Merge branch 'main' into feature/docker-multi-stage-builds

This commit is contained in:
Bruce Röttgers
2025-09-29 17:49:23 +02:00
10 changed files with 471 additions and 1379 deletions

View File

@@ -9,16 +9,14 @@ class WikiDumpHandler(xml.sax.ContentHandler):
"""
SAX handler that, for each <page> whose <id> is in mappings,
collects the <text> and schedules an async task to parse
and write via the usersupplied handler.
and write via the usersupplied handler(s).
"""
def __init__(self, mappings, handler, max_concurrent):
def __init__(self, mappings, handlers):
super().__init__()
self.mappings = mappings
self.handler = handler
self.sem = (
asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None
)
# Support a single handler or a list of handlers
self.handlers = handlers
self.tasks: list[asyncio.Task] = []
self.currentTag: str | None = None
@@ -54,10 +52,7 @@ class WikiDumpHandler(xml.sax.ContentHandler):
title = self.currentTitle
logger.debug(f"scheduled {wd_id} for handling")
# schedule processing
if self.sem:
task = asyncio.create_task(self._bounded_process(text, wd_id, title))
else:
task = asyncio.create_task(self._process(text, wd_id, title))
task = asyncio.create_task(self._process(text, wd_id, title))
self.tasks.append(task)
else:
logger.debug(f"page {pid} without wikidata id, skipping...")
@@ -98,9 +93,8 @@ class WikiDumpHandler(xml.sax.ContentHandler):
parser = WikivoyageParser()
entry = parser.parse(text)
entry['properties']['title'] = title
await self.handler.write_entry(entry, uid)
async def _bounded_process(self, text: str, uid: str, title: str):
# Only run N at once
async with self.sem:
await self._process(text, uid, title)
# Write to all handlers concurrently
await asyncio.gather(*[
handler.write_entry(entry, uid) for handler in self.handlers
])