support multiple handlers

This commit is contained in:
Bruce Röttgers 2025-05-16 20:27:59 +02:00
parent f73046bd65
commit 60c13fb9ec
2 changed files with 48 additions and 35 deletions

42
main.py
View File

@@ -40,7 +40,7 @@ def gather_handler_kwargs(handler_name: str) -> dict:
async def process_dump(
mappings: dict[str, str], handler, max_concurrent: int
mappings: dict[str, str], handlers, max_concurrent: int
):
"""
Stream-download the bzip2-compressed XML dump and feed to SAX.
@@ -52,7 +52,7 @@ async def process_dump(
)
decomp = bz2.BZ2Decompressor()
sax_parser = xml.sax.make_parser()
dump_handler = WikiDumpHandler(mappings, handler, max_concurrent)
dump_handler = WikiDumpHandler(mappings, handlers, max_concurrent)
sax_parser.setContentHandler(dump_handler)
async with aiohttp.ClientSession() as session:
@@ -69,13 +69,21 @@ async def process_dump(
await asyncio.gather(*dump_handler.tasks)
async def main():
# 1. Which handler to load?
handler_name = os.getenv("HANDLER")
if not handler_name:
logger.error("Error: set ENV HANDLER (e.g. 'filesystem')")
# 1. Which handler(s) to load?
handler_names = os.getenv("HANDLER", "").split(",")
if not handler_names or not handler_names[0]:
logger.error("Error: set ENV HANDLER (e.g. 'filesystem' or 'filesystem,sftp')")
sys.exit(1)
# 2. Dynamic import
handlers = []
# 2. Load each handler
for handler_name in handler_names:
handler_name = handler_name.strip()
if not handler_name:
continue
# Dynamic import
module_path = f"output_handlers.{handler_name}"
try:
mod = importlib.import_module(module_path)
@@ -83,7 +91,7 @@ async def main():
logger.error(f"Error loading handler module {module_path}: {e}")
sys.exit(1)
# 3. Find the class: e.g. "sftp" → "SftpHandler"
# Find the class: e.g. "sftp" → "SftpHandler"
class_name = handler_name.title().replace("_", "") + "Handler"
if not hasattr(mod, class_name):
logger.error(f"{module_path} defines no class {class_name}")
@@ -92,13 +100,14 @@ async def main():
logger.info(f"Using handler from {module_path}")
# 4. Build kwargs from ENV
# Build kwargs from ENV
handler_kwargs = gather_handler_kwargs(handler_name)
# 5. Instantiate
# Instantiate
handler = HandlerCls(**handler_kwargs)
handlers.append(handler)
# 6. read concurrency setting
# 3. read concurrency setting
try:
max_conc = int(os.getenv("MAX_CONCURRENT", "0"))
except ValueError:
@@ -107,18 +116,17 @@ async def main():
if max_conc < 0:
raise ValueError("MAX_CONCURRENT must be >= 0")
# 7. Fetch mappings
# 4. Fetch mappings
logger.info("Fetching mappings from SQL dump…")
mappings = await fetch_mappings()
logger.info(f"Got {len(mappings)} wikibase_item mappings.")
# 8. Stream & split the XML dump
# 5. Stream & split the XML dump
logger.info("Processing XML dump…")
await process_dump(mappings, handler, max_conc)
await process_dump(mappings, handlers, max_conc)
# 5. Finish up
await handler.close()
# 6. Finish up
await asyncio.gather(*[handler.close() for handler in handlers])
logger.info("All done.")

View File

@@ -9,13 +9,14 @@ class WikiDumpHandler(xml.sax.ContentHandler):
"""
SAX handler that, for each <page> whose <id> is in mappings,
collects the <text> and schedules an async task to parse
and write via the user-supplied handler.
and write via the user-supplied handler(s).
"""
def __init__(self, mappings, handler, max_concurrent):
def __init__(self, mappings, handlers, max_concurrent):
super().__init__()
self.mappings = mappings
self.handler = handler
# Support a single handler or a list of handlers
self.handlers = handlers
self.sem = (
asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None
)
@@ -98,7 +99,11 @@ class WikiDumpHandler(xml.sax.ContentHandler):
parser = WikivoyageParser()
entry = parser.parse(text)
entry['properties']['title'] = title
await self.handler.write_entry(entry, uid)
# Write to all handlers concurrently
await asyncio.gather(*[
handler.write_entry(entry, uid) for handler in self.handlers
])
async def _bounded_process(self, text: str, uid: str, title: str):
# Only run N at once