From 60c13fb9eca2a3f9169d2507ed80a39678ff8c7c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bruce=20R=C3=B6ttgers?=
Date: Fri, 16 May 2025 20:27:59 +0200
Subject: [PATCH 1/3] support multiple handlers

---
 main.py                           | 70 +++++++++++++++++--------------
 transformers/wiki_dump_handler.py | 13 ++++--
 2 files changed, 48 insertions(+), 35 deletions(-)

diff --git a/main.py b/main.py
index 3347b7a..3394e02 100644
--- a/main.py
+++ b/main.py
@@ -40,7 +40,7 @@ def gather_handler_kwargs(handler_name: str) -> dict:
 
 
 async def process_dump(
-    mappings: dict[str, str], handler, max_concurrent: int
+    mappings: dict[str, str], handlers, max_concurrent: int
 ):
     """
     Stream-download the bzip2-compressed XML dump and feed to SAX.
@@ -52,7 +52,7 @@ async def process_dump(
     )
     decomp = bz2.BZ2Decompressor()
     sax_parser = xml.sax.make_parser()
-    dump_handler = WikiDumpHandler(mappings, handler, max_concurrent)
+    dump_handler = WikiDumpHandler(mappings, handlers, max_concurrent)
     sax_parser.setContentHandler(dump_handler)
 
     async with aiohttp.ClientSession() as session:
@@ -69,36 +69,45 @@ async def process_dump(
     await asyncio.gather(*dump_handler.tasks)
 
 
 async def main():
-    # 1. Which handler to load?
-    handler_name = os.getenv("HANDLER")
-    if not handler_name:
-        logger.error("Error: set ENV HANDLER (e.g. 'filesystem')")
+    # 1. Which handler(s) to load?
+    handler_names = os.getenv("HANDLER", "").split(",")
+    if not handler_names or not handler_names[0]:
+        logger.error("Error: set ENV HANDLER (e.g. 'filesystem' or 'filesystem,sftp')")
         sys.exit(1)
 
-    # 2. Dynamic import
-    module_path = f"output_handlers.{handler_name}"
-    try:
-        mod = importlib.import_module(module_path)
-    except ImportError as e:
-        logger.error(f"Error loading handler module {module_path}: {e}")
-        sys.exit(1)
+    handlers = []
+
+    # 2. Load each handler
+    for handler_name in handler_names:
+        handler_name = handler_name.strip()
+        if not handler_name:
+            continue
+
+        # Dynamic import
+        module_path = f"output_handlers.{handler_name}"
+        try:
+            mod = importlib.import_module(module_path)
+        except ImportError as e:
+            logger.error(f"Error loading handler module {module_path}: {e}")
+            sys.exit(1)
 
-    # 3. Find the class: e.g. "sftp" → "SftpHandler"
-    class_name = handler_name.title().replace("_", "") + "Handler"
-    if not hasattr(mod, class_name):
-        logger.error(f"{module_path} defines no class {class_name}")
-        sys.exit(1)
-    HandlerCls = getattr(mod, class_name)
+        # Find the class: e.g. "sftp" → "SftpHandler"
+        class_name = handler_name.title().replace("_", "") + "Handler"
+        if not hasattr(mod, class_name):
+            logger.error(f"{module_path} defines no class {class_name}")
+            sys.exit(1)
+        HandlerCls = getattr(mod, class_name)
 
-    logger.info(f"Using handler from {module_path}")
+        logger.info(f"Using handler from {module_path}")
 
-    # 4. Build kwargs from ENV
-    handler_kwargs = gather_handler_kwargs(handler_name)
+        # Build kwargs from ENV
+        handler_kwargs = gather_handler_kwargs(handler_name)
 
-    # 5. Instantiate
-    handler = HandlerCls(**handler_kwargs)
+        # Instantiate
+        handler = HandlerCls(**handler_kwargs)
+        handlers.append(handler)
 
-    # 6. read concurrency setting
+    # 3. read concurrency setting
     try:
         max_conc = int(os.getenv("MAX_CONCURRENT", "0"))
     except ValueError:
@@ -107,18 +116,17 @@ async def main():
     if max_conc < 0:
         raise ValueError("MAX_CONCURRENT must be >= 0")
-
-    # 7. Fetch mappings
+    # 4. Fetch mappings
     logger.info("Fetching mappings from SQL dump…")
     mappings = await fetch_mappings()
     logger.info(f"Got {len(mappings)} wikibase_item mappings.")
 
-    # 8. Stream & split the XML dump
+    # 5. Stream & split the XML dump
     logger.info("Processing XML dump…")
-    await process_dump(mappings, handler, max_conc)
+    await process_dump(mappings, handlers, max_conc)
 
-    # 5. Finish up
-    await handler.close()
+    # 6. Finish up
+    await asyncio.gather(*[handler.close() for handler in handlers])
     logger.info("All done.")
 
diff --git a/transformers/wiki_dump_handler.py b/transformers/wiki_dump_handler.py
index 1528227..48a1c7e 100644
--- a/transformers/wiki_dump_handler.py
+++ b/transformers/wiki_dump_handler.py
@@ -9,13 +9,14 @@ class WikiDumpHandler(xml.sax.ContentHandler):
     """
     SAX handler that, for each <page> whose <id> is in mappings,
     collects the <text> and schedules an async task to parse
-    and write via the user‐supplied handler.
+    and write via the user‐supplied handler(s).
     """
 
-    def __init__(self, mappings, handler, max_concurrent):
+    def __init__(self, mappings, handlers, max_concurrent):
         super().__init__()
         self.mappings = mappings
-        self.handler = handler
+        # Support a single handler or a list of handlers
+        self.handlers = handlers
         self.sem = (
             asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None
         )
@@ -98,7 +99,11 @@ class WikiDumpHandler(xml.sax.ContentHandler):
             parser = WikivoyageParser()
             entry = parser.parse(text)
             entry['properties']['title'] = title
-            await self.handler.write_entry(entry, uid)
+
+            # Write to all handlers concurrently
+            await asyncio.gather(*[
+                handler.write_entry(entry, uid) for handler in self.handlers
+            ])
 
     async def _bounded_process(self, text: str, uid: str, title: str):
         # Only run N at once
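
Note (illustration, not part of the patch): the class-name convention used in
main.py above can be sanity-checked on its own. The handler names below are
hypothetical inputs; the derivation expression is copied from the patch.

    # "sftp" -> SftpHandler, "bunny_storage" -> BunnyStorageHandler
    for handler_name in "filesystem,bunny_storage".split(","):
        class_name = handler_name.strip().title().replace("_", "") + "Handler"
        print(handler_name, "->", class_name)
    # filesystem -> FilesystemHandler
    # bunny_storage -> BunnyStorageHandler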
From 5031f33ea2615afba44f7139e1fff2229513d205 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bruce=20R=C3=B6ttgers?=
Date: Fri, 16 May 2025 20:32:21 +0200
Subject: [PATCH 2/3] move semaphore to handler level

---
 main.py                           | 29 ++++++++++++++++-------------
 output_handlers/base_handler.py   | 14 ++++++++++++--
 transformers/wiki_dump_handler.py | 15 ++------------
 3 files changed, 30 insertions(+), 28 deletions(-)

diff --git a/main.py b/main.py
index 3394e02..5327258 100644
--- a/main.py
+++ b/main.py
@@ -40,7 +40,7 @@ def gather_handler_kwargs(handler_name: str) -> dict:
 
 
 async def process_dump(
-    mappings: dict[str, str], handlers, max_concurrent: int
+    mappings: dict[str, str], handlers
 ):
     """
     Stream-download the bzip2-compressed XML dump and feed to SAX.
@@ -52,7 +52,7 @@ async def process_dump(
     )
     decomp = bz2.BZ2Decompressor()
     sax_parser = xml.sax.make_parser()
-    dump_handler = WikiDumpHandler(mappings, handlers, max_concurrent)
+    dump_handler = WikiDumpHandler(mappings, handlers)
     sax_parser.setContentHandler(dump_handler)
 
     async with aiohttp.ClientSession() as session:
@@ -75,9 +75,18 @@ async def main():
         logger.error("Error: set ENV HANDLER (e.g. 'filesystem' or 'filesystem,sftp')")
         sys.exit(1)
 
+    # 2. Read concurrency setting
+    try:
+        max_conc = int(os.getenv("MAX_CONCURRENT", "0"))
+    except ValueError:
+        raise ValueError("MAX_CONCURRENT must be an integer")
+
+    if max_conc < 0:
+        raise ValueError("MAX_CONCURRENT must be >= 0")
+
     handlers = []
 
-    # 2. Load each handler
+    # 3. Load each handler
     for handler_name in handler_names:
         handler_name = handler_name.strip()
         if not handler_name:
@@ -102,20 +111,14 @@ async def main():
 
         # Build kwargs from ENV
         handler_kwargs = gather_handler_kwargs(handler_name)
+
+        # Add max_concurrent to kwargs
+        handler_kwargs["max_concurrent"] = max_conc
 
         # Instantiate
        handler = HandlerCls(**handler_kwargs)
         handlers.append(handler)
 
-    # 3. read concurrency setting
-    try:
-        max_conc = int(os.getenv("MAX_CONCURRENT", "0"))
-    except ValueError:
-        raise ValueError("MAX_CONCURRENT must be an integer")
-
-    if max_conc < 0:
-        raise ValueError("MAX_CONCURRENT must be >= 0")
-
     # 4. Fetch mappings
     logger.info("Fetching mappings from SQL dump…")
     mappings = await fetch_mappings()
@@ -123,7 +126,7 @@ async def main():
 
     # 5. Stream & split the XML dump
     logger.info("Processing XML dump…")
-    await process_dump(mappings, handlers, max_conc)
+    await process_dump(mappings, handlers)  # concurrency is now enforced inside each handler
 
     # 6. Finish up
     await asyncio.gather(*[handler.close() for handler in handlers])

diff --git a/output_handlers/base_handler.py b/output_handlers/base_handler.py
index bd8f20b..706b085 100644
--- a/output_handlers/base_handler.py
+++ b/output_handlers/base_handler.py
@@ -1,6 +1,7 @@
 """Reference handler for output handlers."""
 from abc import ABC, abstractmethod
 import logging
+import asyncio
 
 
 
@@ -14,15 +15,20 @@ class BaseHandler(ABC):
     _successful_writes = 0
     _failed_writes = 0
 
-    def __init__(self, fail_on_error: bool = True, **kwargs):
+    def __init__(self, fail_on_error: bool = True, max_concurrent: int = 0, **kwargs):
         """
         Initializes the BaseHandler with optional parameters.
 
         Args:
             fail_on_error (bool): If True, the handler will raise an exception on error. Defaults to True.
+            max_concurrent (int): Maximum number of concurrent write operations.
+                0 means unlimited concurrency. Defaults to 0.
             **kwargs: Additional keyword arguments for specific handler implementations.
         """
         self.fail_on_error = fail_on_error
+        self.semaphore = None
+        if max_concurrent > 0:
+            self.semaphore = asyncio.Semaphore(max_concurrent)
 
 
     @abstractmethod
@@ -47,7 +53,11 @@ class BaseHandler(ABC):
             entry (dict): The entry to write (will be JSON-encoded).
             uid (str): The unique identifier for the entry. The default id provided by wikivoyage is recommended.
         """
-        success = await self._write_entry(entry, uid)
+        if self.semaphore:
+            async with self.semaphore:
+                success = await self._write_entry(entry, uid)
+        else:
+            success = await self._write_entry(entry, uid)
         if success:
             self.logger.debug(f"Successfully wrote entry with UID {uid}")
             self._successful_writes += 1

diff --git a/transformers/wiki_dump_handler.py b/transformers/wiki_dump_handler.py
index 48a1c7e..c566f44 100644
--- a/transformers/wiki_dump_handler.py
+++ b/transformers/wiki_dump_handler.py
@@ -12,14 +12,11 @@ class WikiDumpHandler(xml.sax.ContentHandler):
     and write via the user‐supplied handler(s).
     """
 
-    def __init__(self, mappings, handlers, max_concurrent):
+    def __init__(self, mappings, handlers):
         super().__init__()
         self.mappings = mappings
         # Support a single handler or a list of handlers
         self.handlers = handlers
-        self.sem = (
-            asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None
-        )
         self.tasks: list[asyncio.Task] = []
 
         self.currentTag: str | None = None
@@ -55,10 +52,7 @@ class WikiDumpHandler(xml.sax.ContentHandler):
                 title = self.currentTitle
                 logger.debug(f"scheduled {wd_id} for handling")
                 # schedule processing
-                if self.sem:
-                    task = asyncio.create_task(self._bounded_process(text, wd_id, title))
-                else:
-                    task = asyncio.create_task(self._process(text, wd_id, title))
+                task = asyncio.create_task(self._process(text, wd_id, title))
                 self.tasks.append(task)
             else:
                 logger.debug(f"page {pid} without wikidata id, skipping...")
@@ -104,8 +98,3 @@ class WikiDumpHandler(xml.sax.ContentHandler):
             await asyncio.gather(*[
                 handler.write_entry(entry, uid) for handler in self.handlers
             ])
-
-    async def _bounded_process(self, text: str, uid: str, title: str):
-        # Only run N at once
-        async with self.sem:
-            await self._process(text, uid, title)
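
Note (illustration, not part of the patch): the gating that write_entry now
performs can be exercised in isolation. A minimal, runnable sketch of the same
semaphore pattern; DemoHandler and its sleep-based _write_entry are stand-ins,
not project code.

    import asyncio

    class DemoHandler:
        def __init__(self, max_concurrent: int = 0):
            # Same construction as BaseHandler: 0 means unlimited
            self.semaphore = asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None
            self.in_flight = 0

        async def write_entry(self, entry: dict, uid: str) -> None:
            if self.semaphore:
                async with self.semaphore:
                    await self._write_entry(entry, uid)
            else:
                await self._write_entry(entry, uid)

        async def _write_entry(self, entry: dict, uid: str) -> None:
            self.in_flight += 1
            assert self.in_flight <= 2, "semaphore should cap concurrency at 2"
            await asyncio.sleep(0.01)  # stand-in for real I/O
            self.in_flight -= 1

    async def demo() -> None:
        handler = DemoHandler(max_concurrent=2)
        await asyncio.gather(*(handler.write_entry({}, str(i)) for i in range(10)))

    asyncio.run(demo())

Moving the limit down to the handler means each output gets its own budget:
a slow target no longer throttles writes to a fast one, which appears to be
the point of taking the semaphore out of WikiDumpHandler.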
From be28fddeb53e97da9ac023ce33b9f8c8a87bb01f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bruce=20R=C3=B6ttgers?=
Date: Fri, 16 May 2025 20:42:31 +0200
Subject: [PATCH 3/3] accept kwargs to forward max conc

---
 output_handlers/bunny_storage.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/output_handlers/bunny_storage.py b/output_handlers/bunny_storage.py
index 5958d9f..3d033ed 100644
--- a/output_handlers/bunny_storage.py
+++ b/output_handlers/bunny_storage.py
@@ -10,8 +10,9 @@ class BunnyStorageHandler(BaseHandler):
         api_key: str,
         fail_on_error: bool = True,
         keepalive_timeout: int = 75,
+        **kwargs,
     ):
-        super().__init__(fail_on_error=fail_on_error)
+        super().__init__(fail_on_error=fail_on_error, **kwargs)
         self.base_url = f"https://{region}.bunnycdn.com/{base_path}"
         self.headers = {
             "AccessKey": api_key,