Set the processing timeout higher, since S3 tasks are somewhat slow and processing the whole batch takes some time

2025-10-14 19:00:25 +02:00
parent c9825ecf90
commit c48b326660
4 changed files with 16 additions and 20 deletions

View File

@@ -54,8 +54,8 @@ async def process_dump(
     sax_parser = xml.sax.make_parser()
     dump_handler = WikiDumpHandler(mappings, handlers)
     sax_parser.setContentHandler(dump_handler)
-    async with aiohttp.ClientSession() as session:
+    timeout = aiohttp.ClientTimeout(total = 5000)
+    async with aiohttp.ClientSession(timeout=timeout) as session:
         async with session.get(xml_url) as resp:
             resp.raise_for_status()
             async for chunk in resp.content.iter_chunked(1024 * 1024):
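The new timeout matters because aiohttp's default ClientTimeout caps the whole request at five minutes, which streaming an entire dump can easily exceed. A minimal sketch of the same pattern, with xml_url and the 5000-second budget taken from the diff and everything else illustrative:

import aiohttp
import asyncio

async def stream_dump(xml_url: str) -> int:
    # total= covers the entire request, including the time spent iterating
    # over the streamed body, so it has to be large enough for the whole batch.
    timeout = aiohttp.ClientTimeout(total=5000)
    size = 0
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(xml_url) as resp:
            resp.raise_for_status()
            async for chunk in resp.content.iter_chunked(1024 * 1024):
                size += len(chunk)  # the real code feeds each chunk to a SAX parser
    return size

# asyncio.run(stream_dump("https://example.org/dump.xml"))  # hypothetical URL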

View File

@@ -32,6 +32,7 @@ class BaseHandler(ABC):
         self.semaphore = None
         if max_concurrent > 0:
             self.semaphore = asyncio.Semaphore(max_concurrent)
+        self.logger.info(f"Handler initialized with fail_on_error={self.fail_on_error}, max_concurrent={max_concurrent}")
         return self
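The only change in this hunk is the extra init log line; the surrounding context shows the optional semaphore that caps concurrency. A hypothetical illustration of how such a semaphore is typically used to gate writes (the repository's actual write path may differ):

import asyncio

class ThrottledWriter:
    """Toy handler showing the max_concurrent / semaphore pattern."""

    def __init__(self, max_concurrent: int = 0):
        self.semaphore = asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None

    async def write_entry(self, entry: dict, uid: str) -> bool:
        if self.semaphore is None:
            return await self._do_write(entry, uid)
        # At most max_concurrent writes are in flight at any time.
        async with self.semaphore:
            return await self._do_write(entry, uid)

    async def _do_write(self, entry: dict, uid: str) -> bool:
        await asyncio.sleep(0)  # stand-in for real I/O
        return True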

View File

@@ -1,9 +1,7 @@
"""Handler that writes asynchronously.""" """Handler that writes asynchronously."""
from .base_handler import BaseHandler from .base_handler import BaseHandler
import json import json
import aiobotocore from aiobotocore.session import AioSession
from aiobotocore.session import get_session
from asyncio import TimeoutError
from contextlib import AsyncExitStack from contextlib import AsyncExitStack
class S3Handler(BaseHandler): class S3Handler(BaseHandler):
@@ -21,10 +19,9 @@ class S3Handler(BaseHandler):
         self = await super().create(**kwargs)
         self.bucket_name = bucket_name
-        self.session = get_session()
         self.exit_stack = AsyncExitStack()
-        session = aiobotocore.session.AioSession()
+        session = AioSession()
         self.client = await self.exit_stack.enter_async_context(
             session.create_client(
                 service_name = 's3',
@@ -54,24 +51,21 @@ class S3Handler(BaseHandler):
         data = json.dumps(entry).encode('utf-8')
         try:
             response = await self.client.put_object(
-                Bucket=self.bucket_name,
-                Key=f"{uid}.json",
-                Body=data
+                Bucket = self.bucket_name,
+                Key = f"{uid}.json",
+                Body = data
             )
-        except TimeoutError:
-            self.logger.error(f"Timeout error while writing entry {uid} to bucket {self.bucket_name}.")
-            return False
-        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
-            self.logger.info(f"Successfully wrote entry {uid} to bucket {self.bucket_name}.")
+            if response['ResponseMetadata']['HTTPStatusCode'] not in (200, 201):
+                raise Exception(f"Response: {response}")
             return True
-        else:
-            self.logger.error(f"Failed to write entry {uid} to bucket {self.bucket_name}. Status code: {response['ResponseMetadata']['HTTPStatusCode']}")
+        except:
+            self.logger.exception(f"Failed to write entry {uid} to bucket {self.bucket_name}.")
             return False
 
     async def close(self):
         await self.client.close()
-        await self._exit_stack.__aexit__(None, None, None)
+        await self.exit_stack.__aexit__(None, None, None)
         await super().close()
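With this change every put_object failure, timeouts included, funnels through a single except block, logger.exception records the traceback, and close() uses the correctly named self.exit_stack attribute. A condensed stand-alone sketch of the same aiobotocore pattern; the bucket name and region are placeholders, and unlike the handler (which keeps its exit stack open across calls) this sketch opens and closes the client per write:

import json
import logging
from contextlib import AsyncExitStack
from aiobotocore.session import AioSession

logger = logging.getLogger("s3_sketch")

async def put_entry(entry: dict, uid: str, bucket: str = "my-bucket") -> bool:
    async with AsyncExitStack() as exit_stack:
        session = AioSession()
        # create_client() returns an async context manager; the exit stack
        # closes the client when the block ends.
        client = await exit_stack.enter_async_context(
            session.create_client(service_name="s3", region_name="eu-central-1")
        )
        try:
            response = await client.put_object(
                Bucket=bucket,
                Key=f"{uid}.json",
                Body=json.dumps(entry).encode("utf-8"),
            )
            if response["ResponseMetadata"]["HTTPStatusCode"] not in (200, 201):
                raise Exception(f"Response: {response}")
            return True
        except Exception:
            # logger.exception logs the message plus the active traceback.
            logger.exception(f"Failed to write entry {uid} to bucket {bucket}.")
            return False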

View File

@@ -97,5 +97,6 @@ class WikiDumpHandler(xml.sax.ContentHandler):
         # Write to all handlers concurrently
         await asyncio.gather(
-            *[handler.write_entry(entry, uid) for handler in self.handlers]
+            *[handler.write_entry(entry, uid) for handler in self.handlers],
+            return_exceptions = True,
         )
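With return_exceptions=True, gather hands any raised exception back as an ordinary result instead of propagating it, so one failing handler no longer aborts the concurrent writes of the others. A toy illustration with hypothetical handlers:

import asyncio

async def ok_handler(entry, uid):
    return True

async def broken_handler(entry, uid):
    raise RuntimeError("boom")

async def main():
    results = await asyncio.gather(
        ok_handler({}, "42"),
        broken_handler({}, "42"),
        return_exceptions=True,
    )
    # -> [True, RuntimeError('boom')]: the failure is returned as a value,
    #    so the other handler's write still completes.
    print(results)

asyncio.run(main())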