Merge pull request #40 from bcye/feature/s3-handler

Feature: S3 handler
Bruce, 2025-10-20 23:02:22 +02:00 (committed by GitHub)
8 changed files with 623 additions and 397 deletions

pyproject.toml

@@ -5,6 +5,7 @@ description = "Add your description here"
 readme = "README.md"
 requires-python = ">=3.12"
 dependencies = [
+    "aiobotocore>=2.24.2",
     "aiofiles>=24.1.0",
     "aiohttp>=3.11.16",
     "asyncssh>=2.20.0",


@@ -54,8 +54,8 @@ async def process_dump(
     sax_parser = xml.sax.make_parser()
     dump_handler = WikiDumpHandler(mappings, handlers)
     sax_parser.setContentHandler(dump_handler)
-    async with aiohttp.ClientSession() as session:
+    timeout = aiohttp.ClientTimeout(total=5000)
+    async with aiohttp.ClientSession(timeout=timeout) as session:
         async with session.get(xml_url) as resp:
             resp.raise_for_status()
             async for chunk in resp.content.iter_chunked(1024 * 1024):
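
Note on the timeout change: aiohttp's default ClientTimeout caps the entire request at 300 seconds, which streaming a multi-gigabyte dump will easily exceed, so the total is raised to 5000 seconds here. A minimal alternative sketch (not part of this diff): disable the total cap and bound only per-read stalls, so a slow but healthy stream is never killed mid-download.

    import aiohttp

    # Sketch only: bound socket reads instead of total wall-clock time.
    timeout = aiohttp.ClientTimeout(total=None, sock_read=60)

    async def stream_dump(xml_url: str) -> None:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(xml_url) as resp:
                resp.raise_for_status()
                async for chunk in resp.content.iter_chunked(1024 * 1024):
                    ...  # feed each chunk to the SAX parser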
@@ -83,15 +83,15 @@ async def main():
     if max_conc < 0:
         raise ValueError("MAX_CONCURRENT must be >= 0")
     handlers = []

     # 3. Load each handler
     for handler_name in handler_names:
         handler_name = handler_name.strip()
         if not handler_name:
             continue

         # Dynamic import
         module_path = f"output_handlers.{handler_name}"
         try:
@@ -111,12 +111,12 @@ async def main():
         # Build kwargs from ENV
         handler_kwargs = gather_handler_kwargs(handler_name)

         # Add max_concurrent to kwargs
         handler_kwargs["max_concurrent"] = max_conc

         # Instantiate
-        handler = HandlerCls(**handler_kwargs)
+        handler = await HandlerCls.create(**handler_kwargs)
         handlers.append(handler)

     # 4. Fetch mappings
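
Handler loading combines a string-driven import with the new awaitable factory. A self-contained sketch of the pattern, assuming (hypothetically) that each module exposes one BaseHandler subclass; the class-name convention below is illustrative, not the repo's actual lookup:

    import importlib

    async def load_handler(handler_name: str, **handler_kwargs):
        # Dynamic import: "s3" -> output_handlers.s3
        module = importlib.import_module(f"output_handlers.{handler_name}")
        # Hypothetical naming convention: "s3" -> S3Handler.
        HandlerCls = getattr(module, f"{handler_name.capitalize()}Handler")
        # __init__ can no longer do async setup, so construction
        # goes through the awaitable create() factory instead.
        return await HandlerCls.create(**handler_kwargs)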

src/output_handlers/base_handler.py

@@ -14,8 +14,12 @@ class BaseHandler(ABC):
     logger = logging.getLogger(__name__)
     _successful_writes = 0
     _failed_writes = 0
+    fail_on_error: bool
+    semaphore: asyncio.Semaphore = None

-    def __init__(self, fail_on_error: bool = True, max_concurrent=0, **kwargs):
+    @classmethod
+    @abstractmethod
+    async def create(cls, fail_on_error: bool = True, max_concurrent=0, **kwargs) -> "BaseHandler":
         """
         Initializes the BaseHandler with optional parameters.
@@ -25,10 +29,12 @@ class BaseHandler(ABC):
                 0 means unlimited concurrency.
             **kwargs: Additional keyword arguments for specific handler implementations.
         """
-        self.fail_on_error = fail_on_error
-        self.semaphore = None
+        obj = cls(**kwargs)
+        obj.fail_on_error = fail_on_error
         if max_concurrent > 0:
-            self.semaphore = asyncio.Semaphore(max_concurrent)
+            obj.semaphore = asyncio.Semaphore(max_concurrent)
+        obj.logger.info(f"Handler initialized with fail_on_error={obj.fail_on_error}, max_concurrent={max_concurrent}")
+        return obj

     @abstractmethod
@@ -38,7 +44,7 @@ class BaseHandler(ABC):
         Args:
             entry (dict): The entry to write (will be JSON-encoded).
-            uid (str): The unique identifier for the entry. The default id provided by wikivoyage is recommended.
+            uid (str): The unique identifier for the entry. The default id provided by wikivoyage is recommended.

         Returns:
             bool: True if the entry was written successfully, False otherwise.
         """
@@ -51,7 +57,7 @@ class BaseHandler(ABC):
         Args:
             entry (dict): The entry to write (will be JSON-encoded).
-            uid (str): The unique identifier for the entry. The default id provided by wikivoyage is recommended.
+            uid (str): The unique identifier for the entry. The default id provided by wikivoyage is recommended.
         """
         if self.semaphore:
             async with self.semaphore:
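
The central refactor: construction moves from __init__ to an abstract async create() classmethod, so subclasses can await I/O (opening sessions, probing buckets) during setup, and the semaphore is created while the event loop is guaranteed to be running. A stripped-down sketch of the pattern, with hypothetical names standing in for the repo's classes:

    import asyncio
    from abc import ABC, abstractmethod

    class AsyncFactoryHandler(ABC):  # stand-in for BaseHandler
        semaphore: asyncio.Semaphore | None = None

        @classmethod
        async def create(cls, max_concurrent: int = 0, **kwargs):
            obj = cls(**kwargs)  # plain synchronous construction
            if max_concurrent > 0:
                # Safe here: create() always runs inside the event loop.
                obj.semaphore = asyncio.Semaphore(max_concurrent)
            return obj

        async def write_entry(self, entry: dict, uid: str) -> bool:
            if self.semaphore:
                async with self.semaphore:  # cap concurrent writes
                    return await self._write_entry(entry, uid)
            return await self._write_entry(entry, uid)

        @abstractmethod
        async def _write_entry(self, entry: dict, uid: str) -> bool: ...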


@@ -3,38 +3,39 @@ import aiohttp
 from .base_handler import BaseHandler


 class BunnyStorageHandler(BaseHandler):
-    def __init__(
-        self,
+    base_url: str
+    headers: dict
+    _session: aiohttp.ClientSession
+    _connector: aiohttp.TCPConnector
+
+    @classmethod
+    async def create(
+        cls,
         region: str,
         base_path: str,
         api_key: str,
         fail_on_error: bool = True,
         keepalive_timeout: int = 75,
         **kwargs,
-    ):
-        super().__init__(fail_on_error=fail_on_error, **kwargs)
-        self.base_url = f"https://{region}.bunnycdn.com/{base_path}"
-        self.headers = {
+    ) -> "BunnyStorageHandler":
+        obj = await super().create(**kwargs)
+        obj.base_url = f"https://{region}.bunnycdn.com/{base_path}"
+        obj.headers = {
             "AccessKey": api_key,
             "Content-Type": "application/json",
             "accept": "application/json",
         }

-        # initialized later, in a guaranteed async context
-        self._connector = None
-        self._session = None
-        self._keepalive_timeout = keepalive_timeout
+        # setup the aiohttp session and connector
+        obj._connector = aiohttp.TCPConnector(
+            # limit is implicitly set to 100
+            keepalive_timeout=keepalive_timeout,
+        )
+        obj._session = aiohttp.ClientSession(connector=obj._connector)
+        return obj

-    async def setup_connector(self):
-        if self._session is None:
-            self._connector = aiohttp.TCPConnector(
-                # limit is implicitly set to 100
-                keepalive_timeout=self._keepalive_timeout,
-            )
-            self._session = aiohttp.ClientSession(connector=self._connector)

     async def _write_entry(self, entry: dict, uid: str) -> bool:
-        await self.setup_connector()
         payload = json.dumps(entry).encode("utf-8")
         url = f"{self.base_url}/{uid}.json"
@@ -50,6 +51,7 @@ class BunnyStorageHandler(BaseHandler):
             self.logger.exception(f"Exception while uploading UID={uid}")
             return False

     async def close(self):
         await self._session.close()
+        await self._connector.close()
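
Because create() always runs under the event loop, the TCPConnector and ClientSession can now be built eagerly, and the lazy setup_connector() indirection disappears. A hedged usage sketch, to be run inside an async context; the region, path, and key below are placeholders, not values from this PR:

    # Hypothetical usage; credentials are placeholders.
    handler = await BunnyStorageHandler.create(
        region="de",
        base_path="my-storage-zone",
        api_key="...",
        keepalive_timeout=75,  # keep connections alive between uploads
    )
    try:
        ok = await handler.write_entry({"title": "Example"}, uid="12345")
    finally:
        await handler.close()  # closes the session, then the connector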


@@ -8,7 +8,10 @@ class FilesystemHandler(BaseHandler):
     """
     Handler that writes files to the filesystem.
     """
-    def __init__(self, output_dir: str, **kwargs):
+    output_dir: Path
+
+    @classmethod
+    async def create(cls, output_dir: str, **kwargs) -> "FilesystemHandler":
         """
         Initializes the FileSystemHandler with the specified output directory.
@@ -16,11 +19,12 @@ class FilesystemHandler(BaseHandler):
             output_dir (str): The directory where files will be written.
             **kwargs: Additional keyword arguments for the BaseHandler.
         """
-        super().__init__(**kwargs)
-        self.output_dir = Path(output_dir)
+        obj = await super().create(**kwargs)
+        obj.output_dir = Path(output_dir)
         # Ensure the target directory exists
-        self.output_dir.mkdir(parents=True, exist_ok=True)
-        self.logger.info(f"Output directory set to {self.output_dir}")
+        obj.output_dir.mkdir(parents=True, exist_ok=True)
+        obj.logger.info(f"Output directory set to {obj.output_dir}")
+        return obj

     async def _write_entry(self, entry: dict, uid: str) -> bool:
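
The handler's _write_entry body is unchanged and not shown in this diff; for orientation, a minimal sketch of what an aiofiles-based implementation could look like (an assumption based on the aiofiles dependency, not the repo's exact code):

    import json
    import aiofiles

    async def _write_entry(self, entry: dict, uid: str) -> bool:
        # Sketch: one JSON file per entry, keyed by uid.
        path = self.output_dir / f"{uid}.json"
        async with aiofiles.open(path, "w", encoding="utf-8") as f:
            await f.write(json.dumps(entry))
        return True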

src/output_handlers/s3.py (new file, 76 lines)

@@ -0,0 +1,76 @@
+"""Handler that writes asynchronously."""
+from .base_handler import BaseHandler
+import json
+from aiobotocore.session import AioSession
+from aiobotocore.client import AioBaseClient
+from contextlib import AsyncExitStack
+
+
+class S3Handler(BaseHandler):
+    """
+    Handler that writes files to an S3 bucket asynchronously.
+    """
+    bucket_name: str
+    client: AioBaseClient
+    exit_stack: AsyncExitStack
+
+    @classmethod
+    async def create(cls, url: str, access_key: str, secret_key: str, bucket_name: str, **kwargs) -> "S3Handler":
+        """
+        Initializes the Handler with the specified S3 endpoint and bucket name.
+
+        Args:
+            **kwargs: Additional keyword arguments for the BaseHandler.
+        """
+        obj = await super().create(**kwargs)
+        obj.bucket_name = bucket_name
+        obj.exit_stack = AsyncExitStack()
+        session = AioSession()
+        obj.client = await obj.exit_stack.enter_async_context(
+            session.create_client(
+                service_name='s3',
+                # region_name='us-west-2',
+                aws_secret_access_key=secret_key,
+                aws_access_key_id=access_key,
+                endpoint_url=url,
+            )
+        )
+        await obj._ensure_bucket_exists()
+        return obj
+
+    async def _ensure_bucket_exists(self):
+        """
+        Ensures that the specified S3 bucket exists, but does not create it if it doesn't.
+        """
+        # this will raise an error if the bucket does not exist
+        await self.client.head_bucket(Bucket=self.bucket_name)
+
+    async def _write_entry(self, entry: dict, uid: str) -> bool:
+        """
+        Asynchronously writes a single entry to the bucket.
+        """
+        data = json.dumps(entry).encode('utf-8')
+        try:
+            response = await self.client.put_object(
+                Bucket=self.bucket_name,
+                Key=f"{uid}.json",
+                Body=data,
+            )
+            if response['ResponseMetadata']['HTTPStatusCode'] not in (200, 201):
+                raise Exception(f"Response: {response}")
+            return True
+        except Exception:
+            self.logger.exception(f"Failed to write entry {uid} to bucket {self.bucket_name}.")
+            return False
+
+    async def close(self):
+        await self.client.close()
+        await self.exit_stack.__aexit__(None, None, None)
+        await super().close()
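
A hedged usage sketch for the new handler, to be run inside an async context; the endpoint, credentials, and bucket below are placeholders (in this project they arrive via gather_handler_kwargs() from ENV):

    # Hypothetical values, e.g. a local MinIO instance.
    handler = await S3Handler.create(
        url="http://localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        bucket_name="wikivoyage",
        max_concurrent=8,  # forwarded to BaseHandler.create via **kwargs
    )
    try:
        await handler.write_entry({"title": "Example"}, uid="12345")
    finally:
        await handler.close()

The head_bucket probe in create() raises a ClientError when the bucket is missing or the credentials are wrong, so misconfiguration surfaces at startup rather than on the first write.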


@@ -97,5 +97,6 @@ class WikiDumpHandler(xml.sax.ContentHandler):
         # Write to all handlers concurrently
         await asyncio.gather(
-            *[handler.write_entry(entry, uid) for handler in self.handlers]
+            *[handler.write_entry(entry, uid) for handler in self.handlers],
+            return_exceptions=True,
         )
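
With return_exceptions=True, one failing handler no longer cancels its siblings mid-write; its exception is returned as a value in the results list instead of propagating out of gather(). A sketch of how those results could be inspected (the PR itself discards them):

    import asyncio
    import logging

    logger = logging.getLogger(__name__)

    async def fan_out(handlers, entry: dict, uid: str) -> None:
        results = await asyncio.gather(
            *[h.write_entry(entry, uid) for h in handlers],
            return_exceptions=True,
        )
        for h, result in zip(handlers, results):
            if isinstance(result, Exception):
                # Hypothetical follow-up; each handler already logs its own failures.
                logger.warning(f"{type(h).__name__} failed for {uid}: {result!r}")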

uv.lock (generated): file diff suppressed because it is too large (854 lines).