From 012a57674e7efd1e84ec6409531e16075c171c3b Mon Sep 17 00:00:00 2001 From: Remy Moll Date: Wed, 15 Oct 2025 17:42:45 +0200 Subject: [PATCH] update the other handlers to reflect the changes --- src/output_handlers/base_handler.py | 13 ++++----- src/output_handlers/bunny_storage.py | 40 +++++++++++++++------------- src/output_handlers/filesystem.py | 14 ++++++---- src/output_handlers/s3.py | 17 +++++++----- 4 files changed, 48 insertions(+), 36 deletions(-) diff --git a/src/output_handlers/base_handler.py b/src/output_handlers/base_handler.py index ac8149f..a0ca83e 100644 --- a/src/output_handlers/base_handler.py +++ b/src/output_handlers/base_handler.py @@ -14,6 +14,8 @@ class BaseHandler(ABC): logger = logging.getLogger(__name__) _successful_writes = 0 _failed_writes = 0 + fail_on_error: bool + semaphore: asyncio.Semaphore = None @classmethod @abstractmethod @@ -27,13 +29,12 @@ class BaseHandler(ABC): 0 means unlimited concurrency. **kwargs: Additional keyword arguments for specific handler implementations. """ - self = cls(**kwargs) - self.fail_on_error = fail_on_error - self.semaphore = None + obj = cls(**kwargs) + obj.fail_on_error = fail_on_error if max_concurrent > 0: - self.semaphore = asyncio.Semaphore(max_concurrent) - self.logger.info(f"Handler initialized with fail_on_error={self.fail_on_error}, max_concurrent={max_concurrent}") - return self + obj.semaphore = asyncio.Semaphore(max_concurrent) + obj.logger.info(f"Handler initialized with fail_on_error={obj.fail_on_error}, max_concurrent={max_concurrent}") + return obj @abstractmethod diff --git a/src/output_handlers/bunny_storage.py b/src/output_handlers/bunny_storage.py index 3d033ed..824c027 100644 --- a/src/output_handlers/bunny_storage.py +++ b/src/output_handlers/bunny_storage.py @@ -3,38 +3,39 @@ import aiohttp from .base_handler import BaseHandler class BunnyStorageHandler(BaseHandler): - def __init__( - self, + + base_url: str + headers: dict + _session: aiohttp.ClientSession + _connector: aiohttp.TCPConnector + + @classmethod + async def create( + cls, region: str, base_path: str, api_key: str, - fail_on_error: bool = True, keepalive_timeout: int = 75, **kwargs, - ): - super().__init__(fail_on_error=fail_on_error, **kwargs) - self.base_url = f"https://{region}.bunnycdn.com/{base_path}" - self.headers = { + ) -> "BunnyStorageHandler": + obj = await super().create(**kwargs) + obj.base_url = f"https://{region}.bunnycdn.com/{base_path}" + obj.headers = { "AccessKey": api_key, "Content-Type": "application/json", "accept": "application/json", } - # initialized later, in a guaranteed async context - self._connector = None - self._session = None - self._keepalive_timeout = keepalive_timeout + # setup the aiohttp session and connector + obj._connector = aiohttp.TCPConnector( + # limit is implicitly set to 100 + keepalive_timeout = keepalive_timeout, + ) + obj._session = aiohttp.ClientSession(connector=obj._connector) + return obj - async def setup_connector(self): - if self._session is None: - self._connector = aiohttp.TCPConnector( - # limit is implicitly set to 100 - keepalive_timeout = self._keepalive_timeout, - ) - self._session = aiohttp.ClientSession(connector=self._connector) async def _write_entry(self, entry: dict, uid: str) -> bool: - await self.setup_connector() payload = json.dumps(entry).encode("utf-8") url = f"{self.base_url}/{uid}.json" @@ -50,6 +51,7 @@ class BunnyStorageHandler(BaseHandler): self.logger.exception(f"Exception while uploading UID={uid}") return False + async def close(self): await self._session.close() await self._connector.close() diff --git a/src/output_handlers/filesystem.py b/src/output_handlers/filesystem.py index 855348b..070c8ee 100644 --- a/src/output_handlers/filesystem.py +++ b/src/output_handlers/filesystem.py @@ -8,7 +8,10 @@ class FilesystemHandler(BaseHandler): """ Handler that writes files to the filesystem. """ - def __init__(self, output_dir: str, **kwargs): + output_dir: Path + + @classmethod + async def create(cls, output_dir: str, **kwargs) -> "FilesystemHandler": """ Initializes the FileSystemHandler with the specified output directory. @@ -16,11 +19,12 @@ class FilesystemHandler(BaseHandler): output_dir (str): The directory where files will be written. **kwargs: Additional keyword arguments for the BaseHandler. """ - super().__init__(**kwargs) - self.output_dir = Path(output_dir) + obj = await super().create(**kwargs) + obj.output_dir = Path(output_dir) # Ensure the target directory exists - self.output_dir.mkdir(parents=True, exist_ok=True) - self.logger.info(f"Output directory set to {self.output_dir}") + obj.output_dir.mkdir(parents=True, exist_ok=True) + obj.logger.info(f"Output directory set to {obj.output_dir}") + return obj async def _write_entry(self, entry: dict, uid: str) -> bool: diff --git a/src/output_handlers/s3.py b/src/output_handlers/s3.py index 6cc288a..238aaef 100644 --- a/src/output_handlers/s3.py +++ b/src/output_handlers/s3.py @@ -2,12 +2,17 @@ from .base_handler import BaseHandler import json from aiobotocore.session import AioSession +from aiobotocore.client import AioBaseClient from contextlib import AsyncExitStack class S3Handler(BaseHandler): """ Handler that writes files to an S3 bucket asynchronously. """ + bucket_name: str + client: AioBaseClient + exit_stack: AsyncExitStack + @classmethod async def create(cls, url: str, access_key: str, secret_key: str, bucket_name: str, **kwargs) -> "S3Handler": """ @@ -16,13 +21,13 @@ class S3Handler(BaseHandler): Args: **kwargs: Additional keyword arguments for the BaseHandler. """ - self = await super().create(**kwargs) - self.bucket_name = bucket_name + obj = await super().create(**kwargs) + obj.bucket_name = bucket_name - self.exit_stack = AsyncExitStack() + obj.exit_stack = AsyncExitStack() session = AioSession() - self.client = await self.exit_stack.enter_async_context( + obj.client = await obj.exit_stack.enter_async_context( session.create_client( service_name = 's3', # region_name='us-west-2', @@ -32,8 +37,8 @@ class S3Handler(BaseHandler): ) ) - await self._ensure_bucket_exists() - return self + await obj._ensure_bucket_exists() + return obj async def _ensure_bucket_exists(self):