update the other handlers to reflect the changes

2025-10-15 17:42:45 +02:00
parent c48b326660
commit 012a57674e
4 changed files with 48 additions and 36 deletions

View File

@@ -14,6 +14,8 @@ class BaseHandler(ABC):
     logger = logging.getLogger(__name__)
     _successful_writes = 0
     _failed_writes = 0
+    fail_on_error: bool
+    semaphore: asyncio.Semaphore = None
     @classmethod
     @abstractmethod
@@ -27,13 +29,12 @@ class BaseHandler(ABC):
                 0 means unlimited concurrency.
             **kwargs: Additional keyword arguments for specific handler implementations.
         """
-        self = cls(**kwargs)
-        self.fail_on_error = fail_on_error
-        self.semaphore = None
+        obj = cls(**kwargs)
+        obj.fail_on_error = fail_on_error
         if max_concurrent > 0:
-            self.semaphore = asyncio.Semaphore(max_concurrent)
-        self.logger.info(f"Handler initialized with fail_on_error={self.fail_on_error}, max_concurrent={max_concurrent}")
-        return self
+            obj.semaphore = asyncio.Semaphore(max_concurrent)
+        obj.logger.info(f"Handler initialized with fail_on_error={obj.fail_on_error}, max_concurrent={max_concurrent}")
+        return obj
     @abstractmethod
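The net effect of this hunk is that handlers are no longer constructed directly; they are built through the awaited create() factory, which wires up fail_on_error and the optional asyncio.Semaphore. Below is a minimal sketch of a concrete subclass against the new contract; the DummyHandler name, the import path, and the assumption that create() and _write_entry() are the only abstract members are illustrative, not taken from the repository.

    import asyncio
    from handlers.base_handler import BaseHandler  # assumed package path

    class DummyHandler(BaseHandler):
        # Illustrative subclass: prints instead of writing anywhere.
        @classmethod
        async def create(cls, **kwargs) -> "DummyHandler":
            # The base create() instantiates cls, then sets fail_on_error and the semaphore.
            return await super().create(**kwargs)

        async def _write_entry(self, entry: dict, uid: str) -> bool:
            print(f"would write {uid}: {entry}")
            return True

    async def main():
        handler = await DummyHandler.create(fail_on_error=False, max_concurrent=10)
        await handler._write_entry({"k": "v"}, uid="demo")

    asyncio.run(main())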

View File

@@ -3,38 +3,39 @@ import aiohttp
 from .base_handler import BaseHandler
 class BunnyStorageHandler(BaseHandler):
-    def __init__(
-        self,
+    base_url: str
+    headers: dict
+    _session: aiohttp.ClientSession
+    _connector: aiohttp.TCPConnector
+
+    @classmethod
+    async def create(
+        cls,
         region: str,
         base_path: str,
         api_key: str,
-        fail_on_error: bool = True,
         keepalive_timeout: int = 75,
         **kwargs,
-    ):
-        super().__init__(fail_on_error=fail_on_error, **kwargs)
-        self.base_url = f"https://{region}.bunnycdn.com/{base_path}"
-        self.headers = {
+    ) -> "BunnyStorageHandler":
+        obj = await super().create(**kwargs)
+        obj.base_url = f"https://{region}.bunnycdn.com/{base_path}"
+        obj.headers = {
             "AccessKey": api_key,
             "Content-Type": "application/json",
             "accept": "application/json",
         }
-        # initialized later, in a guaranteed async context
-        self._connector = None
-        self._session = None
-        self._keepalive_timeout = keepalive_timeout
-
-    async def setup_connector(self):
-        if self._session is None:
-            self._connector = aiohttp.TCPConnector(
-                # limit is implicitly set to 100
-                keepalive_timeout = self._keepalive_timeout,
-            )
-            self._session = aiohttp.ClientSession(connector=self._connector)
+        # setup the aiohttp session and connector
+        obj._connector = aiohttp.TCPConnector(
+            # limit is implicitly set to 100
+            keepalive_timeout = keepalive_timeout,
+        )
+        obj._session = aiohttp.ClientSession(connector=obj._connector)
+        return obj
     async def _write_entry(self, entry: dict, uid: str) -> bool:
-        await self.setup_connector()
         payload = json.dumps(entry).encode("utf-8")
         url = f"{self.base_url}/{uid}.json"
@@ -50,6 +51,7 @@ class BunnyStorageHandler(BaseHandler):
             self.logger.exception(f"Exception while uploading UID={uid}")
             return False
     async def close(self):
         await self._session.close()
         await self._connector.close()
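Because the connector and session are now opened eagerly inside create() rather than lazily in setup_connector(), the factory must be awaited from a running event loop, and the caller remains responsible for close(). A hedged usage sketch follows; the region, storage-zone and API-key values are placeholders, and _write_entry() is called directly purely for illustration.

    import asyncio
    from handlers.bunny_storage_handler import BunnyStorageHandler  # assumed module path

    async def main():
        # create() now opens the aiohttp session up front, so it needs an event loop.
        handler = await BunnyStorageHandler.create(
            region="storage",           # placeholder region
            base_path="my-zone",        # placeholder storage zone
            api_key="<bunny-api-key>",  # placeholder key
        )
        try:
            ok = await handler._write_entry({"hello": "world"}, uid="example-uid")
            print("uploaded:", ok)
        finally:
            # the session/connector are no longer created on demand, so always close them
            await handler.close()

    asyncio.run(main())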

View File

@@ -8,7 +8,10 @@ class FilesystemHandler(BaseHandler):
""" """
Handler that writes files to the filesystem. Handler that writes files to the filesystem.
""" """
def __init__(self, output_dir: str, **kwargs): output_dir: Path
@classmethod
async def create(cls, output_dir: str, **kwargs) -> "FilesystemHandler":
""" """
Initializes the FileSystemHandler with the specified output directory. Initializes the FileSystemHandler with the specified output directory.
@@ -16,11 +19,12 @@ class FilesystemHandler(BaseHandler):
             output_dir (str): The directory where files will be written.
             **kwargs: Additional keyword arguments for the BaseHandler.
         """
-        super().__init__(**kwargs)
-        self.output_dir = Path(output_dir)
+        obj = await super().create(**kwargs)
+        obj.output_dir = Path(output_dir)
         # Ensure the target directory exists
-        self.output_dir.mkdir(parents=True, exist_ok=True)
-        self.logger.info(f"Output directory set to {self.output_dir}")
+        obj.output_dir.mkdir(parents=True, exist_ok=True)
+        obj.logger.info(f"Output directory set to {obj.output_dir}")
+        return obj
     async def _write_entry(self, entry: dict, uid: str) -> bool:
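For call sites, the FilesystemHandler change only affects how the object is obtained; the output directory is still created up front. A small before/after sketch, where the handlers.filesystem_handler import path and the "out" directory are assumptions:

    import asyncio
    from handlers.filesystem_handler import FilesystemHandler  # assumed module path

    async def main():
        # before this commit (synchronous constructor):
        #   handler = FilesystemHandler(output_dir="out")
        # after this commit (async factory, must run inside an event loop):
        handler = await FilesystemHandler.create(output_dir="out")

    asyncio.run(main())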

View File

@@ -2,12 +2,17 @@
 from .base_handler import BaseHandler
 import json
 from aiobotocore.session import AioSession
+from aiobotocore.client import AioBaseClient
 from contextlib import AsyncExitStack
 class S3Handler(BaseHandler):
     """
     Handler that writes files to an S3 bucket asynchronously.
     """
+    bucket_name: str
+    client: AioBaseClient
+    exit_stack: AsyncExitStack
+
     @classmethod
     async def create(cls, url: str, access_key: str, secret_key: str, bucket_name: str, **kwargs) -> "S3Handler":
         """
@@ -16,13 +21,13 @@ class S3Handler(BaseHandler):
         Args:
             **kwargs: Additional keyword arguments for the BaseHandler.
         """
-        self = await super().create(**kwargs)
-        self.bucket_name = bucket_name
-        self.exit_stack = AsyncExitStack()
+        obj = await super().create(**kwargs)
+        obj.bucket_name = bucket_name
+        obj.exit_stack = AsyncExitStack()
         session = AioSession()
-        self.client = await self.exit_stack.enter_async_context(
+        obj.client = await obj.exit_stack.enter_async_context(
             session.create_client(
                 service_name = 's3',
                 # region_name='us-west-2',
@@ -32,8 +37,8 @@ class S3Handler(BaseHandler):
             )
         )
-        await self._ensure_bucket_exists()
-        return self
+        await obj._ensure_bucket_exists()
+        return obj
     async def _ensure_bucket_exists(self):
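Storing the AsyncExitStack on the instance is what keeps the aiobotocore client alive past create(); whatever teardown the handler exposes eventually has to unwind that stack. A sketch of the expected shape, assuming a close() method that this excerpt does not show:

    # Hypothetical teardown for S3Handler; the actual repository may define close() differently.
    async def close(self) -> None:
        # Unwinds the AsyncExitStack entered in create(), which in turn
        # releases the aiobotocore client created via enter_async_context().
        await self.exit_stack.aclose()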