From 0c28033a449ec11233dc8d7f693f779475bd4e38 Mon Sep 17 00:00:00 2001
From: Remy Moll
Date: Mon, 29 Sep 2025 00:14:08 +0200
Subject: [PATCH] tried some home-made async

---
 src/output_handlers/s3.py | 53 +++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 37 insertions(+), 16 deletions(-)

diff --git a/src/output_handlers/s3.py b/src/output_handlers/s3.py
index 196613c..a2b268a 100644
--- a/src/output_handlers/s3.py
+++ b/src/output_handlers/s3.py
@@ -5,13 +5,15 @@
 from minio import Minio
 # making async calls to minio requires some wrapping
 import concurrent.futures
 import asyncio
+from io import BytesIO
+import urllib3
 
 
 class S3Handler(BaseHandler):
     """
     Handler that writes files to an S3 bucket asynchronously.
     """
-    def __init__(self, s3_url: str, s3_access_key: str, s3_secret_key: str, s3_bucket_name: str, **kwargs):
+    def __init__(self, url: str, access_key: str, secret_key: str, bucket_name: str, **kwargs):
         """
         Initializes the Handler with the specified S3 endpoint and bucket name.
 
@@ -20,15 +22,24 @@ class S3Handler(BaseHandler):
         """
         super().__init__(**kwargs)
 
-        self.s3_client = Minio(
-            s3_url,
-            access_key=s3_access_key,
-            secret_key=s3_secret_key,
-            secure=True
-        )
-        self.bucket_name = s3_bucket_name
+        self.bucket_name = bucket_name
 
-        self.executor = concurrent.futures.ThreadPoolExecutor()
+        # minio uses urllib3 under the hood, so its per-host connection pool size must match max_concurrent
+        max_concurrent = kwargs.get("max_concurrent", 0)
+        # 0 conventionally means no concurrency - in this setup that corresponds to a single worker
+        max_concurrent = max(1, max_concurrent)
+
+        http_client = urllib3.PoolManager(maxsize=max_concurrent)
+
+        self.s3_client = Minio(
+            url,
+            access_key = access_key,
+            secret_key = secret_key,
+            secure = True,
+            http_client = http_client
+        )
+
+        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent)
 
         self._ensure_bucket_exists()
 
@@ -61,17 +72,23 @@ class S3Handler(BaseHandler):
 
         loop = asyncio.get_running_loop()
 
-        def sync_put():
-            entry_json = json.dumps(entry).encode('utf-8')
-            self.s3_client.put_object(
+        # put_object requires an object that implements read(), hence the BytesIO wrapper
+        entry_json = json.dumps(entry).encode("utf-8")
+        size = len(entry_json)  # size in bytes
+        entry_bytes = BytesIO(entry_json)
+
+        def sync_put():
+            result = self.s3_client.put_object(
                 bucket_name = self.bucket_name,
                 object_name = f"{uid}.json",
-                data = entry_json,
-                length = len(entry_json),
-                content_type = 'application/json'
+                data = entry_bytes,
+                length = size,
+                content_type = "application/json"
             )
+            self.logger.debug(f"Got result {result}")
+            return result
 
-        # rub the put operation in a thread pool to avoid blocking the event loop
+        # run the put operation in a thread pool to avoid blocking the event loop
         try:
             result = await loop.run_in_executor(self.executor, sync_put)
             if not result:
@@ -82,3 +99,7 @@
         except Exception as e:
             self.logger.error(f"Error writing entry with UID {uid} to bucket {self.bucket_name}: {e}")
             return False
+
+    async def close(self):
+        self.executor.shutdown(wait=True)
+        self.logger.info("Executor shut down.")
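
Note on the pattern used above: minio's client is synchronous, so every
put_object() call is funneled through loop.run_in_executor() onto a bounded
ThreadPoolExecutor, and urllib3's per-host connection pool (maxsize) is sized
to the same max_concurrent value so each worker thread can hold a connection
without contention. Below is a minimal, self-contained sketch of that pattern;
fake_put, write_entry and MAX_CONCURRENT are illustrative stand-ins rather
than names from this codebase, and time.sleep() stands in for the blocking
S3 call:

    import asyncio
    import concurrent.futures
    import time

    MAX_CONCURRENT = 4  # stand-in for the handler's max_concurrent kwarg

    # bounded pool, mirroring ThreadPoolExecutor(max_workers=max_concurrent)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT)

    def fake_put(uid: str) -> str:
        # stands in for the blocking s3_client.put_object() call
        time.sleep(0.1)
        return f"stored {uid}"

    async def write_entry(uid: str) -> str:
        loop = asyncio.get_running_loop()
        # hand the blocking call to the pool; the event loop keeps running
        return await loop.run_in_executor(executor, fake_put, uid)

    async def main():
        # many coroutines, but at most MAX_CONCURRENT puts run at any moment
        results = await asyncio.gather(*(write_entry(f"entry-{i}") for i in range(10)))
        print(results)
        executor.shutdown(wait=True)

    asyncio.run(main())

This is why the two pool sizes line up: urllib3 keeps up to maxsize
connections per host, and the executor never runs more than max_workers puts
at once, so no worker thread ever waits for a free connection.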