mirror of
				https://github.com/bcye/structured-wikivoyage-exports.git
				synced 2025-10-31 07:02:44 +00:00 
			
		
		
		
	implement stability changes and slight cleanup
This commit is contained in:
		| @@ -52,3 +52,10 @@ class BaseHandler(ABC): | ||||
|             self.logger.error(f"Failed to write entry with UID {uid}") | ||||
|             if self.fail_on_error: | ||||
|                 raise Exception(f"Failed to write entry with UID {uid}") | ||||
|  | ||||
|  | ||||
|     async def close(self): | ||||
|         """ | ||||
|         Closes the handler. This method should be overridden by subclasses if they need to perform any cleanup operations. | ||||
|         """ | ||||
|         pass | ||||
|   | ||||
| @@ -12,27 +12,33 @@ class BunnyStorageHandler(BaseHandler): | ||||
|         keepalive_timeout: int = 75, | ||||
|     ): | ||||
|         super().__init__(fail_on_error=fail_on_error) | ||||
|         self.region = region | ||||
|         self.base_path = base_path | ||||
|         self.api_key = api_key | ||||
|  | ||||
|         # no explicit 'limit'; use the default (100) | ||||
|         self._connector = aiohttp.TCPConnector( | ||||
|             keepalive_timeout=keepalive_timeout | ||||
|         ) | ||||
|         self._session = aiohttp.ClientSession(connector=self._connector) | ||||
|  | ||||
|     async def _write_entry(self, entry: dict, uid: str) -> bool: | ||||
|         url = f"https://{self.region}.bunnycdn.com/{self.base_path}/{uid}.json" | ||||
|         headers = { | ||||
|             "AccessKey": self.api_key, | ||||
|         self.base_url = f"https://{region}.bunnycdn.com/{base_path}" | ||||
|         self.headers = { | ||||
|             "AccessKey": api_key, | ||||
|             "Content-Type": "application/json", | ||||
|             "accept": "application/json", | ||||
|         } | ||||
|  | ||||
|         # initialized later, in a guaranteed async context | ||||
|         self._connector = None | ||||
|         self._session = None | ||||
|         self._keepalive_timeout = keepalive_timeout | ||||
|  | ||||
|     async def setup_connector(self): | ||||
|         if self._session is None: | ||||
|             self._connector = aiohttp.TCPConnector( | ||||
|                 # limit is implicitly set to 100 | ||||
|                 keepalive_timeout = self._keepalive_timeout, | ||||
|             ) | ||||
|             self._session = aiohttp.ClientSession(connector=self._connector) | ||||
|  | ||||
|     async def _write_entry(self, entry: dict, uid: str) -> bool: | ||||
|         await self.setup_connector() | ||||
|         payload = json.dumps(entry).encode("utf-8") | ||||
|         url = f"{self.base_url}/{uid}.json" | ||||
|  | ||||
|         try: | ||||
|             async with self._session.put(url, data=payload, headers=headers) as resp: | ||||
|             async with self._session.put(url, data=payload, headers=self.headers) as resp: | ||||
|                 if resp.status in (200, 201, 204): | ||||
|                     return True | ||||
|                 body = await resp.text() | ||||
|   | ||||
| @@ -349,7 +349,6 @@ class WikivoyageParser: | ||||
|  | ||||
| async def process_file( | ||||
|     input_file: Path, | ||||
|     parser: WikivoyageParser, | ||||
|     handler, | ||||
| ) -> None: | ||||
|     """ | ||||
| @@ -358,6 +357,7 @@ async def process_file( | ||||
|     """ | ||||
|      | ||||
|     text = input_file.read_text(encoding="utf-8") | ||||
|     parser = WikivoyageParser() | ||||
|     entry = parser.parse(text)  # assume returns a dict | ||||
|     uid = input_file.stem | ||||
|  | ||||
| @@ -374,12 +374,15 @@ def gather_handler_kwargs(handler_name: str) -> dict: | ||||
|     for env_key, val in os.environ.items(): | ||||
|         if not env_key.startswith(prefix): | ||||
|             continue | ||||
|         param = env_key[len(prefix) :].lower() | ||||
|         # try to cast ints | ||||
|         param = env_key.replace(prefix, "").lower() | ||||
|         # cast ints | ||||
|         if val.isdigit(): | ||||
|             val = int(val) | ||||
|         # cast bools | ||||
|         elif val.lower() in ("true", "false"): | ||||
|             val = val.lower() == "true" | ||||
|         kwargs[param] = val | ||||
|  | ||||
|     print(f"Handler kwargs: {kwargs}") | ||||
|     return kwargs | ||||
|  | ||||
| async def main(): | ||||
| @@ -412,10 +415,7 @@ async def main(): | ||||
|     # 5. Instantiate | ||||
|     handler = HandlerCls(**handler_kwargs) | ||||
|  | ||||
|     # 6. Prepare parser | ||||
|     parser = WikivoyageParser() | ||||
|  | ||||
|     # 7. Which dir to walk? | ||||
|     # 6. Which dir to walk? | ||||
|     input_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".") | ||||
|     txt_files = list(input_dir.rglob("*.txt")) | ||||
|  | ||||
| @@ -423,7 +423,7 @@ async def main(): | ||||
|         print(f"No .txt files found under {input_dir}") | ||||
|         sys.exit(1) | ||||
|  | ||||
|     # 7) read concurrency setting | ||||
|     # 7. read concurrency setting | ||||
|     try: | ||||
|         max_conc = int(os.getenv("MAX_CONCURRENT", "0")) | ||||
|     except ValueError: | ||||
| @@ -434,11 +434,11 @@ async def main(): | ||||
|         print("Error: MAX_CONCURRENT must be >= 0") | ||||
|         sys.exit(1) | ||||
|  | ||||
|     # 8) schedule tasks | ||||
|     # 8. schedule tasks | ||||
|     if max_conc == 0: | ||||
|         # unbounded | ||||
|         tasks = [ | ||||
|             asyncio.create_task(process_file(txt, parser, handler)) | ||||
|             asyncio.create_task(process_file(txt, handler)) | ||||
|             for txt in txt_files | ||||
|         ] | ||||
|     else: | ||||
| @@ -447,22 +447,20 @@ async def main(): | ||||
|  | ||||
|         async def bounded(txt): | ||||
|             async with sem: | ||||
|                 return await process_file(txt, parser, handler) | ||||
|                 return await process_file(txt, handler) | ||||
|  | ||||
|         tasks = [ | ||||
|             asyncio.create_task(bounded(txt)) | ||||
|             for txt in txt_files | ||||
|         ] | ||||
|  | ||||
|     # 9) run them all | ||||
|     # 9. run them all | ||||
|     await asyncio.gather(*tasks) | ||||
|     await handler.close() | ||||
|  | ||||
|  | ||||
|     print("All done.") | ||||
|      | ||||
|     if hasattr(handler, "close"):  | ||||
|         await handler.close() | ||||
|  | ||||
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     asyncio.run(main()) | ||||
|     asyncio.run(main()) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user