diff --git a/src/main.py b/src/main.py index 7f43550..b597e17 100644 --- a/src/main.py +++ b/src/main.py @@ -54,8 +54,8 @@ async def process_dump( sax_parser = xml.sax.make_parser() dump_handler = WikiDumpHandler(mappings, handlers) sax_parser.setContentHandler(dump_handler) - - async with aiohttp.ClientSession() as session: + timeout = aiohttp.ClientTimeout(total = 5000) + async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(xml_url) as resp: resp.raise_for_status() async for chunk in resp.content.iter_chunked(1024 * 1024): diff --git a/src/output_handlers/base_handler.py b/src/output_handlers/base_handler.py index 38b6922..ac8149f 100644 --- a/src/output_handlers/base_handler.py +++ b/src/output_handlers/base_handler.py @@ -32,6 +32,7 @@ class BaseHandler(ABC): self.semaphore = None if max_concurrent > 0: self.semaphore = asyncio.Semaphore(max_concurrent) + self.logger.info(f"Handler initialized with fail_on_error={self.fail_on_error}, max_concurrent={max_concurrent}") return self diff --git a/src/output_handlers/s3.py b/src/output_handlers/s3.py index 7214c39..6cc288a 100644 --- a/src/output_handlers/s3.py +++ b/src/output_handlers/s3.py @@ -1,9 +1,7 @@ """Handler that writes asynchronously.""" from .base_handler import BaseHandler import json -import aiobotocore -from aiobotocore.session import get_session -from asyncio import TimeoutError +from aiobotocore.session import AioSession from contextlib import AsyncExitStack class S3Handler(BaseHandler): @@ -21,10 +19,9 @@ class S3Handler(BaseHandler): self = await super().create(**kwargs) self.bucket_name = bucket_name - self.session = get_session() self.exit_stack = AsyncExitStack() - session = aiobotocore.session.AioSession() + session = AioSession() self.client = await self.exit_stack.enter_async_context( session.create_client( service_name = 's3', @@ -54,24 +51,21 @@ class S3Handler(BaseHandler): data = json.dumps(entry).encode('utf-8') try: response = await self.client.put_object( - Bucket=self.bucket_name, - Key=f"{uid}.json", - Body=data + Bucket = self.bucket_name, + Key = f"{uid}.json", + Body = data ) - except TimeoutError: - self.logger.error(f"Timeout error while writing entry {uid} to bucket {self.bucket_name}.") - return False - - if response['ResponseMetadata']['HTTPStatusCode'] == 200: - self.logger.info(f"Successfully wrote entry {uid} to bucket {self.bucket_name}.") + if response['ResponseMetadata']['HTTPStatusCode'] not in (200, 201): + raise Exception(f"Response: {response}") return True - else: - self.logger.error(f"Failed to write entry {uid} to bucket {self.bucket_name}. Status code: {response['ResponseMetadata']['HTTPStatusCode']}") + + except: + self.logger.exception(f"Failed to write entry {uid} to bucket {self.bucket_name}.") return False async def close(self): await self.client.close() - await self._exit_stack.__aexit__(None, None, None) + await self.exit_stack.__aexit__(None, None, None) await super().close() diff --git a/src/transformers/wiki_dump_handler.py b/src/transformers/wiki_dump_handler.py index 5b561cd..ea3766c 100644 --- a/src/transformers/wiki_dump_handler.py +++ b/src/transformers/wiki_dump_handler.py @@ -97,5 +97,6 @@ class WikiDumpHandler(xml.sax.ContentHandler): # Write to all handlers concurrently await asyncio.gather( - *[handler.write_entry(entry, uid) for handler in self.handlers] + *[handler.write_entry(entry, uid) for handler in self.handlers], + return_exceptions = True, )