mirror of
https://github.com/bcye/structured-wikivoyage-exports.git
synced 2025-11-01 07:32:45 +00:00
tried some home-made async
This commit is contained in:
@@ -5,13 +5,15 @@ from minio import Minio
|
|||||||
# making async calls to minio requires some wrapping
|
# making async calls to minio requires some wrapping
|
||||||
import concurrent.futures
|
import concurrent.futures
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from io import BytesIO
|
||||||
|
import urllib3
|
||||||
|
|
||||||
|
|
||||||
class S3Handler(BaseHandler):
|
class S3Handler(BaseHandler):
|
||||||
"""
|
"""
|
||||||
Handler that writes files to an S3 bucket asynchronously.
|
Handler that writes files to an S3 bucket asynchronously.
|
||||||
"""
|
"""
|
||||||
def __init__(self, s3_url: str, s3_access_key: str, s3_secret_key: str, s3_bucket_name: str, **kwargs):
|
def __init__(self, url: str, access_key: str, secret_key: str, bucket_name: str, **kwargs):
|
||||||
"""
|
"""
|
||||||
Initializes the Handler with the specified S3 endpoint and bucket name.
|
Initializes the Handler with the specified S3 endpoint and bucket name.
|
||||||
|
|
||||||
@@ -20,15 +22,24 @@ class S3Handler(BaseHandler):
|
|||||||
"""
|
"""
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
self.s3_client = Minio(
|
self.bucket_name = bucket_name
|
||||||
s3_url,
|
|
||||||
access_key=s3_access_key,
|
|
||||||
secret_key=s3_secret_key,
|
|
||||||
secure=True
|
|
||||||
)
|
|
||||||
self.bucket_name = s3_bucket_name
|
|
||||||
|
|
||||||
self.executor = concurrent.futures.ThreadPoolExecutor()
|
# minio uses urllib3 so we need to set the connection pool limit according to max_concurrent
|
||||||
|
max_concurrent = kwargs.get("max_concurrent")
|
||||||
|
# usually 0 is used to indicate no concurrency - in this setup that corresponds to a single worker
|
||||||
|
max_concurrent = max(1, max_concurrent)
|
||||||
|
|
||||||
|
http_client = urllib3.PoolManager(num_pools=max_concurrent)
|
||||||
|
|
||||||
|
self.s3_client = Minio(
|
||||||
|
url,
|
||||||
|
access_key = access_key,
|
||||||
|
secret_key = secret_key,
|
||||||
|
secure = True,
|
||||||
|
http_client = http_client
|
||||||
|
)
|
||||||
|
|
||||||
|
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent)
|
||||||
self._ensure_bucket_exists()
|
self._ensure_bucket_exists()
|
||||||
|
|
||||||
|
|
||||||
@@ -61,18 +72,22 @@ class S3Handler(BaseHandler):
|
|||||||
|
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
|
||||||
def sync_put():
|
def sync_put():
|
||||||
entry_json = json.dumps(entry).encode('utf-8')
|
# put requires an object that implements read
|
||||||
self.s3_client.put_object(
|
entry_json = json.dumps(entry).encode("utf-8")
|
||||||
|
size = len(entry_json) # size in bytes
|
||||||
|
entry_bytes = BytesIO(entry_json)
|
||||||
|
result = self.s3_client.put_object(
|
||||||
bucket_name = self.bucket_name,
|
bucket_name = self.bucket_name,
|
||||||
object_name = f"{uid}.json",
|
object_name = f"{uid}.json",
|
||||||
data = entry_json,
|
data = entry_bytes,
|
||||||
length = len(entry_json),
|
length = size,
|
||||||
content_type = 'application/json'
|
content_type = "application/json"
|
||||||
)
|
)
|
||||||
|
self.logger.debug(f"Got result {result}")
|
||||||
|
return result
|
||||||
|
|
||||||
# rub the put operation in a thread pool to avoid blocking the event loop
|
# run the put operation in a thread pool to avoid blocking the event loop
|
||||||
try:
|
try:
|
||||||
result = await loop.run_in_executor(self.executor, sync_put)
|
result = await loop.run_in_executor(self.executor, sync_put)
|
||||||
if not result:
|
if not result:
|
||||||
@@ -82,3 +97,7 @@ class S3Handler(BaseHandler):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"Error writing entry with UID {uid} to bucket {self.bucket_name}: {e}")
|
self.logger.error(f"Error writing entry with UID {uid} to bucket {self.bucket_name}: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
async def close(self):
|
||||||
|
self.executor.shutdown(wait=True)
|
||||||
|
self.logger.info("Executor shut down.")
|
||||||
|
|||||||
Reference in New Issue
Block a user