From 0bf0238ef1d13180dcceec84438b84b6e25ce9b0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bruce=20R=C3=B6ttgers?=
Date: Mon, 29 Sep 2025 17:51:42 +0200
Subject: [PATCH] move remaining files

---
 .../tests}/fixtures/boston_input.txt    |   0
 .../tests}/fixtures/boston_output.json  |   0
 .../tests}/test_parser_json_snippets.py |   0
 src/transformers/wiki_dump_handler.py   |  37 ++++---
 transformers/wiki_dump_handler.py       | 100 ------------------
 5 files changed, 21 insertions(+), 116 deletions(-)
 rename {tests => src/tests}/fixtures/boston_input.txt (100%)
 rename {tests => src/tests}/fixtures/boston_output.json (100%)
 rename {tests => src/tests}/test_parser_json_snippets.py (100%)
 delete mode 100644 transformers/wiki_dump_handler.py

diff --git a/tests/fixtures/boston_input.txt b/src/tests/fixtures/boston_input.txt
similarity index 100%
rename from tests/fixtures/boston_input.txt
rename to src/tests/fixtures/boston_input.txt
diff --git a/tests/fixtures/boston_output.json b/src/tests/fixtures/boston_output.json
similarity index 100%
rename from tests/fixtures/boston_output.json
rename to src/tests/fixtures/boston_output.json
diff --git a/tests/test_parser_json_snippets.py b/src/tests/test_parser_json_snippets.py
similarity index 100%
rename from tests/test_parser_json_snippets.py
rename to src/tests/test_parser_json_snippets.py
diff --git a/src/transformers/wiki_dump_handler.py b/src/transformers/wiki_dump_handler.py
index eecf022..5b561cd 100644
--- a/src/transformers/wiki_dump_handler.py
+++ b/src/transformers/wiki_dump_handler.py
@@ -5,20 +5,19 @@ from .parser import WikivoyageParser
 
 logger = getLogger(__name__)
 
+
 class WikiDumpHandler(xml.sax.ContentHandler):
     """
     SAX handler that, for each <page> whose <id> is in mappings,
     collects the <text> and schedules an async task to parse
-    and write via the user‐supplied handler.
+    and write via the user‐supplied handler(s).
""" - def __init__(self, mappings, handler, max_concurrent): + def __init__(self, mappings, handlers): super().__init__() self.mappings = mappings - self.handler = handler - self.sem = ( - asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None - ) + # Support a single handler or a list of handlers + self.handlers = handlers self.tasks: list[asyncio.Task] = [] self.currentTag: str | None = None @@ -26,6 +25,7 @@ class WikiDumpHandler(xml.sax.ContentHandler): self.inRevision = False self.inText = False self.currentPageId: str | None = None + self.currentTitle: str | None = None self.currentText: list[str] = [] def startElement(self, name, attrs): @@ -34,6 +34,7 @@ class WikiDumpHandler(xml.sax.ContentHandler): logger.debug("start page") self.inPage = True self.currentPageId = None + self.currentTitle = None self.currentText = [] elif name == "revision": logger.debug("start revision") @@ -49,18 +50,17 @@ class WikiDumpHandler(xml.sax.ContentHandler): if pid and pid in self.mappings: wd_id = self.mappings[pid] text = "".join(self.currentText) + title = self.currentTitle logger.debug(f"scheduled {wd_id} for handling") # schedule processing - if self.sem: - task = asyncio.create_task(self._bounded_process(text, wd_id)) - else: - task = asyncio.create_task(self._process(text, wd_id)) + task = asyncio.create_task(self._process(text, wd_id, title)) self.tasks.append(task) else: logger.debug(f"page {pid} without wikidata id, skipping...") # reset self.inPage = self.inRevision = self.inText = False self.currentPageId = None + self.currentTitle = None self.currentText = [] elif name == "revision": logger.debug("end revision") @@ -81,16 +81,21 @@ class WikiDumpHandler(xml.sax.ContentHandler): content_stripped = content.strip() if content_stripped: # Only process non-empty ID content self.currentPageId = content_stripped + elif self.currentTag == "title" and self.inPage: + if self.currentTitle is None: + self.currentTitle = content + else: + self.currentTitle += content elif self.inText: # Always append text content, even if it's just whitespace or newlines self.currentText.append(content) - async def _process(self, text: str, uid: str): + async def _process(self, text: str, uid: str, title: str): parser = WikivoyageParser() entry = parser.parse(text) - await self.handler.write_entry(entry, uid) + entry["properties"]["title"] = title - async def _bounded_process(self, text: str, uid: str): - # Only run N at once - async with self.sem: - await self._process(text, uid) + # Write to all handlers concurrently + await asyncio.gather( + *[handler.write_entry(entry, uid) for handler in self.handlers] + ) diff --git a/transformers/wiki_dump_handler.py b/transformers/wiki_dump_handler.py deleted file mode 100644 index c566f44..0000000 --- a/transformers/wiki_dump_handler.py +++ /dev/null @@ -1,100 +0,0 @@ -from logging import getLogger -import xml.sax -import asyncio -from .parser import WikivoyageParser - -logger = getLogger(__name__) - -class WikiDumpHandler(xml.sax.ContentHandler): - """ - SAX handler that, for each whose is in mappings, - collects the and schedules an async task to parse - and write via the user‐supplied handler(s). 
- """ - - def __init__(self, mappings, handlers): - super().__init__() - self.mappings = mappings - # Support a single handler or a list of handlers - self.handlers = handlers - self.tasks: list[asyncio.Task] = [] - - self.currentTag: str | None = None - self.inPage = False - self.inRevision = False - self.inText = False - self.currentPageId: str | None = None - self.currentTitle: str | None = None - self.currentText: list[str] = [] - - def startElement(self, name, attrs): - self.currentTag = name - if name == "page": - logger.debug("start page") - self.inPage = True - self.currentPageId = None - self.currentTitle = None - self.currentText = [] - elif name == "revision": - logger.debug("start revision") - self.inRevision = True - elif name == "text" and self.inRevision: - logger.debug("start text") - self.inText = True - - def endElement(self, name): - if name == "page": - logger.debug("end page") - pid = self.currentPageId - if pid and pid in self.mappings: - wd_id = self.mappings[pid] - text = "".join(self.currentText) - title = self.currentTitle - logger.debug(f"scheduled {wd_id} for handling") - # schedule processing - task = asyncio.create_task(self._process(text, wd_id, title)) - self.tasks.append(task) - else: - logger.debug(f"page {pid} without wikidata id, skipping...") - # reset - self.inPage = self.inRevision = self.inText = False - self.currentPageId = None - self.currentTitle = None - self.currentText = [] - elif name == "revision": - logger.debug("end revision") - self.inRevision = False - elif name == "text": - logger.debug("end text") - self.inText = False - self.currentTag = None - - def characters(self, content): - # Only filter whitespace for ID fields, preserve all content for text - if ( - self.currentTag == "id" - and self.inPage - and not self.inRevision - and not self.currentPageId - ): - content_stripped = content.strip() - if content_stripped: # Only process non-empty ID content - self.currentPageId = content_stripped - elif self.currentTag == "title" and self.inPage: - if self.currentTitle is None: - self.currentTitle = content - else: - self.currentTitle += content - elif self.inText: - # Always append text content, even if it's just whitespace or newlines - self.currentText.append(content) - - async def _process(self, text: str, uid: str, title: str): - parser = WikivoyageParser() - entry = parser.parse(text) - entry['properties']['title'] = title - - # Write to all handlers concurrently - await asyncio.gather(*[ - handler.write_entry(entry, uid) for handler in self.handlers - ])