switched to aiobotocore

2025-09-29 12:03:39 +02:00
parent 0c28033a44
commit c9825ecf90
5 changed files with 552 additions and 437 deletions

View File

@@ -5,6 +5,7 @@ description = "Add your description here"
readme = "README.md" readme = "README.md"
requires-python = ">=3.12" requires-python = ">=3.12"
dependencies = [ dependencies = [
"aiobotocore>=2.24.2",
"aiofiles>=24.1.0", "aiofiles>=24.1.0",
"aiohttp>=3.11.16", "aiohttp>=3.11.16",
"asyncssh>=2.20.0", "asyncssh>=2.20.0",

View File

@@ -116,7 +116,7 @@ async def main():
handler_kwargs["max_concurrent"] = max_conc handler_kwargs["max_concurrent"] = max_conc
# Instantiate # Instantiate
handler = HandlerCls(**handler_kwargs) handler = await HandlerCls.create(**handler_kwargs)
handlers.append(handler) handlers.append(handler)
# 4. Fetch mappings # 4. Fetch mappings
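
For context, the call site changes because __init__ cannot be awaited: setup that performs I/O has to move into an async factory method. A minimal sketch of the pattern, independent of this repository (class name and setup are hypothetical):

import asyncio

class ExampleHandler:
    """Hypothetical handler illustrating the async-factory pattern used above."""

    @classmethod
    async def create(cls, **kwargs) -> "ExampleHandler":
        self = cls()
        # async setup (opening clients, checking buckets, ...) can be awaited here,
        # which would not be possible inside a plain __init__
        await asyncio.sleep(0)
        return self

async def main():
    handler_kwargs = {}
    # before: handler = ExampleHandler(**handler_kwargs)
    handler = await ExampleHandler.create(**handler_kwargs)
    print(handler)

asyncio.run(main())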

View File

@@ -15,7 +15,9 @@ class BaseHandler(ABC):
     _successful_writes = 0
     _failed_writes = 0
 
-    def __init__(self, fail_on_error: bool = True, max_concurrent=0, **kwargs):
+    @classmethod
+    @abstractmethod
+    async def create(cls, fail_on_error: bool = True, max_concurrent=0, **kwargs) -> "BaseHandler":
         """
         Initializes the BaseHandler with optional parameters.
@@ -25,10 +27,12 @@ class BaseHandler(ABC):
                 0 means unlimited concurrency.
             **kwargs: Additional keyword arguments for specific handler implementations.
         """
+        self = cls(**kwargs)
         self.fail_on_error = fail_on_error
         self.semaphore = None
         if max_concurrent > 0:
             self.semaphore = asyncio.Semaphore(max_concurrent)
+        return self
 
     @abstractmethod
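
To make the new contract concrete, a minimal subclass could look like the sketch below. The class is hypothetical and assumes _write_entry is the remaining abstract hook (as implemented by S3Handler further down); the base create() instantiates the class, sets fail_on_error, and builds the optional semaphore.

from .base_handler import BaseHandler  # assumes the same handlers package as above

class NullHandler(BaseHandler):
    """Hypothetical handler that discards entries; only demonstrates the create() contract."""

    @classmethod
    async def create(cls, **kwargs) -> "NullHandler":
        # chain to the base factory: it calls cls(**kwargs) and wires the common attributes
        self = await super().create(**kwargs)
        return self

    async def _write_entry(self, entry: dict, uid: str) -> bool:
        # no-op write; a real handler would persist the entry here
        return True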

View File

@@ -1,103 +1,77 @@
"""Handler that writes asynchronously.""" """Handler that writes asynchronously."""
from .base_handler import BaseHandler from .base_handler import BaseHandler
import json import json
from minio import Minio import aiobotocore
# making async calls to minio requires some wrapping from aiobotocore.session import get_session
import concurrent.futures from asyncio import TimeoutError
import asyncio from contextlib import AsyncExitStack
from io import BytesIO
import urllib3
class S3Handler(BaseHandler): class S3Handler(BaseHandler):
""" """
Handler that writes files to an S3 bucket asynchronously. Handler that writes files to an S3 bucket asynchronously.
""" """
def __init__(self, url: str, access_key: str, secret_key: str, bucket_name: str, **kwargs): @classmethod
async def create(cls, url: str, access_key: str, secret_key: str, bucket_name: str, **kwargs) -> "S3Handler":
""" """
Initializes the Handler with the specified S3 endpoint and bucket name. Initializes the Handler with the specified S3 endpoint and bucket name.
Args: Args:
**kwargs: Additional keyword arguments for the BaseHandler. **kwargs: Additional keyword arguments for the BaseHandler.
""" """
super().__init__(**kwargs) self = await super().create(**kwargs)
self.bucket_name = bucket_name self.bucket_name = bucket_name
# minio uses urllib3 so we need to set the connection pool limit according to max_concurrent self.session = get_session()
max_concurrent = kwargs.get("max_concurrent") self.exit_stack = AsyncExitStack()
# usually 0 is used to indicate no concurrence - in this setup that corresponds to a single worker
max_concurrent = max(1, max_concurrent)
http_client = urllib3.PoolManager(num_pools=max_concurrent) session = aiobotocore.session.AioSession()
self.client = await self.exit_stack.enter_async_context(
self.s3_client = Minio( session.create_client(
url, service_name = 's3',
access_key = access_key, # region_name='us-west-2',
secret_key = secret_key, aws_secret_access_key = secret_key,
secure = True, aws_access_key_id = access_key,
http_client = http_client endpoint_url = url,
)
) )
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) await self._ensure_bucket_exists()
self._ensure_bucket_exists() return self
def _ensure_bucket_exists(self): async def _ensure_bucket_exists(self):
""" """
Ensures that the specified S3 bucket exists, tries to create it if it does not. Ensures that the specified S3 bucket exists, but does not create it if it doesn't.
""" """
if not self.s3_client.bucket_exists(self.bucket_name): # this will raise an error if the bucket does not exist
try: await self.client.head_bucket(Bucket=self.bucket_name)
self.s3_client.make_bucket(self.bucket_name)
self.logger.info(f"Created bucket: {self.bucket_name}")
except Exception as e:
self.logger.error(f"Error creating bucket {self.bucket_name}: {e}")
raise
else:
self.logger.debug(f"Bucket {self.bucket_name} already exists.")
 
     async def _write_entry(self, entry: dict, uid: str) -> bool:
         """
         Asynchronously writes a single entry to the bucket.
-
-        Args:
-            entry (dict): The entry to write (will be JSON-encoded).
-            uid (str): The unique identifier for the entry.
-
-        Returns:
-            bool: True if the entry was written successfully, False otherwise.
         """
-        loop = asyncio.get_running_loop()
-
-        def sync_put():
-            # put requires an object that implements read
-            entry_json = json.dumps(entry).encode("utf-8")
-            size = len(entry_json)  # size in bytes
-            entry_bytes = BytesIO(entry_json)
-            result = self.s3_client.put_object(
-                bucket_name = self.bucket_name,
-                object_name = f"{uid}.json",
-                data = entry_bytes,
-                length = size,
-                content_type = "application/json"
-            )
-            self.logger.debug(f"Got result {result}")
-            return result
-
-        # run the put operation in a thread pool to avoid blocking the event loop
+        data = json.dumps(entry).encode('utf-8')
         try:
-            result = await loop.run_in_executor(self.executor, sync_put)
-            if not result:
-                raise Exception("Minio operation failed without exception.")
-            self.logger.debug(f"Successfully wrote entry with UID {uid} to bucket {self.bucket_name}.")
-            return True
-        except Exception as e:
-            self.logger.error(f"Error writing entry with UID {uid} to bucket {self.bucket_name}: {e}")
+            response = await self.client.put_object(
+                Bucket=self.bucket_name,
+                Key=f"{uid}.json",
+                Body=data
+            )
+        except TimeoutError:
+            self.logger.error(f"Timeout error while writing entry {uid} to bucket {self.bucket_name}.")
             return False
+        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
+            self.logger.info(f"Successfully wrote entry {uid} to bucket {self.bucket_name}.")
+            return True
+        else:
+            self.logger.error(f"Failed to write entry {uid} to bucket {self.bucket_name}. Status code: {response['ResponseMetadata']['HTTPStatusCode']}")
+            return False
 
     async def close(self):
-        self.executor.shutdown(wait=True)
-        self.logger.info("Executor shut down.")
+        await self.client.close()
+        await self.exit_stack.__aexit__(None, None, None)
+        await super().close()
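
For reference, the aiobotocore calls the handler now relies on can be exercised on their own roughly as follows; endpoint, credentials, and bucket name are placeholders, and create_client is used directly as an async context manager instead of the AsyncExitStack above.

import asyncio
import json
from aiobotocore.session import get_session

async def put_one(entry: dict, uid: str) -> None:
    session = get_session()
    async with session.create_client(
        "s3",
        endpoint_url="https://s3.example.com",   # placeholder endpoint
        aws_access_key_id="ACCESS_KEY",          # placeholder credentials
        aws_secret_access_key="SECRET_KEY",
    ) as client:
        # raises a ClientError if the bucket is missing, mirroring _ensure_bucket_exists()
        await client.head_bucket(Bucket="my-bucket")
        response = await client.put_object(
            Bucket="my-bucket",
            Key=f"{uid}.json",
            Body=json.dumps(entry).encode("utf-8"),
        )
        print(response["ResponseMetadata"]["HTTPStatusCode"])

asyncio.run(put_one({"hello": "world"}, "example-uid"))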

uv.lock (generated, 854 changes): file diff suppressed because it is too large.