From 0ada91a8ce7734bfccf71ebbe563ac024aae22e8 Mon Sep 17 00:00:00 2001 From: Remy Moll Date: Wed, 23 Apr 2025 14:01:45 +0200 Subject: [PATCH] implement stability changes and slight cleanup --- output_handlers/base_handler.py | 7 +++++++ output_handlers/bunny_storage.py | 36 +++++++++++++++++++------------- transform-documents.py | 34 ++++++++++++++---------------- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/output_handlers/base_handler.py b/output_handlers/base_handler.py index 3574dac..67d4886 100644 --- a/output_handlers/base_handler.py +++ b/output_handlers/base_handler.py @@ -52,3 +52,10 @@ class BaseHandler(ABC): self.logger.error(f"Failed to write entry with UID {uid}") if self.fail_on_error: raise Exception(f"Failed to write entry with UID {uid}") + + + async def close(self): + """ + Closes the handler. This method should be overridden by subclasses if they need to perform any cleanup operations. + """ + pass diff --git a/output_handlers/bunny_storage.py b/output_handlers/bunny_storage.py index 75c3dd3..553fbbb 100644 --- a/output_handlers/bunny_storage.py +++ b/output_handlers/bunny_storage.py @@ -12,27 +12,33 @@ class BunnyStorageHandler(BaseHandler): keepalive_timeout: int = 75, ): super().__init__(fail_on_error=fail_on_error) - self.region = region - self.base_path = base_path - self.api_key = api_key - - # no explicit 'limit'; use the default (100) - self._connector = aiohttp.TCPConnector( - keepalive_timeout=keepalive_timeout - ) - self._session = aiohttp.ClientSession(connector=self._connector) - - async def _write_entry(self, entry: dict, uid: str) -> bool: - url = f"https://{self.region}.bunnycdn.com/{self.base_path}/{uid}.json" - headers = { - "AccessKey": self.api_key, + self.base_url = f"https://{region}.bunnycdn.com/{base_path}" + self.headers = { + "AccessKey": api_key, "Content-Type": "application/json", "accept": "application/json", } + + # initialized later, in a guaranteed async context + self._connector = None + self._session = None + self._keepalive_timeout = keepalive_timeout + + async def setup_connector(self): + if self._session is None: + self._connector = aiohttp.TCPConnector( + # limit is implicitly set to 100 + keepalive_timeout = self._keepalive_timeout, + ) + self._session = aiohttp.ClientSession(connector=self._connector) + + async def _write_entry(self, entry: dict, uid: str) -> bool: + await self.setup_connector() payload = json.dumps(entry).encode("utf-8") + url = f"{self.base_url}/{uid}.json" try: - async with self._session.put(url, data=payload, headers=headers) as resp: + async with self._session.put(url, data=payload, headers=self.headers) as resp: if resp.status in (200, 201, 204): return True body = await resp.text() diff --git a/transform-documents.py b/transform-documents.py index 3e7a51d..1a5a9bc 100644 --- a/transform-documents.py +++ b/transform-documents.py @@ -349,7 +349,6 @@ class WikivoyageParser: async def process_file( input_file: Path, - parser: WikivoyageParser, handler, ) -> None: """ @@ -358,6 +357,7 @@ async def process_file( """ text = input_file.read_text(encoding="utf-8") + parser = WikivoyageParser() entry = parser.parse(text) # assume returns a dict uid = input_file.stem @@ -374,12 +374,15 @@ def gather_handler_kwargs(handler_name: str) -> dict: for env_key, val in os.environ.items(): if not env_key.startswith(prefix): continue - param = env_key[len(prefix) :].lower() - # try to cast ints + param = env_key.replace(prefix, "").lower() + # cast ints if val.isdigit(): val = int(val) + # cast bools + elif val.lower() in ("true", "false"): + val = val.lower() == "true" kwargs[param] = val - + print(f"Handler kwargs: {kwargs}") return kwargs async def main(): @@ -412,10 +415,7 @@ async def main(): # 5. Instantiate handler = HandlerCls(**handler_kwargs) - # 6. Prepare parser - parser = WikivoyageParser() - - # 7. Which dir to walk? + # 6. Which dir to walk? input_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".") txt_files = list(input_dir.rglob("*.txt")) @@ -423,7 +423,7 @@ async def main(): print(f"No .txt files found under {input_dir}") sys.exit(1) - # 7) read concurrency setting + # 7. read concurrency setting try: max_conc = int(os.getenv("MAX_CONCURRENT", "0")) except ValueError: @@ -434,11 +434,11 @@ async def main(): print("Error: MAX_CONCURRENT must be >= 0") sys.exit(1) - # 8) schedule tasks + # 8. schedule tasks if max_conc == 0: # unbounded tasks = [ - asyncio.create_task(process_file(txt, parser, handler)) + asyncio.create_task(process_file(txt, handler)) for txt in txt_files ] else: @@ -447,22 +447,20 @@ async def main(): async def bounded(txt): async with sem: - return await process_file(txt, parser, handler) + return await process_file(txt, handler) tasks = [ asyncio.create_task(bounded(txt)) for txt in txt_files ] - # 9) run them all + # 9. run them all await asyncio.gather(*tasks) + await handler.close() + print("All done.") - - if hasattr(handler, "close"): - await handler.close() - if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main())