refactor transform code into own module

This commit is contained in:
Bruce Röttgers
2025-04-30 14:01:20 +02:00
parent 3330667fa7
commit b18387a83c
6 changed files with 273 additions and 246 deletions

3
transformers/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .fetch_mappings import fetch_mappings
from .wiki_dump_handler import WikiDumpHandler
from .parser import WikivoyageParser

View File

@@ -0,0 +1,42 @@
import codecs
import re
import zlib
from logging import getLogger

import aiohttp
logger = getLogger(__name__)


async def fetch_mappings() -> dict[str, str]:
    """
    Download and gunzip the page_props SQL dump and extract
    page→wikibase_item mappings.

    Returns:
        dict mapping page id (str) to wikidata item id (str, e.g. "Q90").
    """
    sql_url = (
        "https://dumps.wikimedia.org/"
        "enwikivoyage/latest/"
        "enwikivoyage-latest-page_props.sql.gz"
    )
    # 16 + MAX_WBITS makes zlib accept the gzip header/trailer.
    decomp = zlib.decompressobj(16 + zlib.MAX_WBITS)
    # Incremental decoder: a multi-byte UTF-8 sequence split across two
    # chunks must not be silently dropped (plain .decode(errors="ignore")
    # per chunk would discard the partial sequence at each boundary).
    decoder = codecs.getincrementaldecoder("utf-8")("ignore")
    # regex for tuples: (page,'prop','value',NULL_or_number)
    tuple_re = re.compile(r"\((\d+),'([^']+)','([^']+)',(NULL|[\d\.]+)\)")
    buffer = ""
    mappings: dict[str, str] = {}

    def harvest() -> None:
        """Consume complete tuples from the buffer, keeping the tail."""
        nonlocal buffer
        last_end = 0
        for m in tuple_re.finditer(buffer):
            page_id, prop, value = m.group(1), m.group(2), m.group(3)
            if prop == "wikibase_item":
                logger.debug(f"Found mapping {page_id} -> {value}")
                mappings[page_id] = value
            last_end = m.end()
        # Drop consumed text so tuples are never re-scanned on the next
        # chunk; cap the unmatched tail to bound memory while still
        # bridging a tuple split across chunk boundaries.
        buffer = buffer[last_end:][-1000:]

    async with aiohttp.ClientSession() as session:
        async with session.get(sql_url) as resp:
            resp.raise_for_status()
            async for chunk in resp.content.iter_chunked(1024 * 1024):
                data = decomp.decompress(chunk)
                if not data:
                    continue
                buffer += decoder.decode(data)
                harvest()
            # Flush whatever is still buffered in the decompressor and
            # the decoder, then scan one final time.
            buffer += decoder.decode(decomp.flush(), final=True)
            harvest()
    return mappings

348
transformers/parser.py Normal file
View File

@@ -0,0 +1,348 @@
"""Where the magic happens: parsing wikitext into a structured JSON tree."""
import mwparserfromhell as mwp
import mwparserfromhell.nodes as nodes
import json
from typing import Dict
# Templates whose parameters describe the whole document; their params
# are hoisted into the root node's "properties" rather than emitted as
# content nodes (see WikivoyageParser._handle_document_template).
DOCUMENT_TEMPLATES = [
    "pagebanner", "mapframe", "routebox", "geo", "isPartOf",
    "usablecity", "guidecity", "outlinecity"
]

# Wikivoyage listing templates; each becomes its own typed node in the
# tree (see WikivoyageParser._handle_listing_template).
LISTING_TEMPLATES = [
    "see", "do", "buy", "eat", "drink", "sleep", "listing"
]
class WikivoyageParser:
    """
    Parse Wikivoyage wikitext into a structured JSON tree.

    mwparserfromhell tokenizes the wikitext; the node stream is then
    folded into nested ``{"type", "properties", "children"}`` dicts
    rooted at a single "root" node.
    """

    def __init__(self):
        # Root of the output tree: document-wide template params land in
        # its "properties", content nodes in "children".
        self.root = {
            "type": "root",
            "properties": {},
            "children": []
        }
        # The section new content nodes are appended to.
        self.current_section = self.root

    def parse(self, wikitext: str) -> Dict:
        """Parse wikitext and return the structured JSON tree."""
        # Reset state so a single parser instance can be reused.
        self.root = {
            "type": "root",
            "properties": {},
            "children": []
        }
        self.current_section = self.root
        parsed = mwp.parse(wikitext)
        self._process_nodes(parsed)
        return self.root

    def _process_nodes(self, wikicode):
        """Walk the node stream, accumulating inline text and flushing
        it whenever a structural node (heading/template) is reached."""
        current_text = ""
        for node in wikicode.nodes:
            if isinstance(node, nodes.heading.Heading):
                # Flush pending text before opening a new section.
                if current_text:
                    self._add_text_node(current_text)
                    current_text = ""
                self._handle_heading(node)
            elif isinstance(node, nodes.template.Template):
                # Flush pending text before emitting the template node.
                if current_text:
                    self._add_text_node(current_text)
                    current_text = ""
                self._handle_template(node)
            elif isinstance(node, nodes.text.Text):
                current_text += str(node.value)
            elif isinstance(node, nodes.tag.Tag):
                current_text += self._convert_tag_to_markdown(node)
            elif isinstance(node, nodes.wikilink.Wikilink):
                current_text += self._convert_wikilink_to_markdown(node)
            elif isinstance(node, nodes.external_link.ExternalLink):
                current_text += self._convert_external_link_to_markdown(node)
            elif isinstance(node, nodes.comment.Comment):
                pass  # comments are dropped from the output
            else:
                # Unknown node types degrade to their raw wikitext.
                current_text += str(node)
        # Flush any trailing text.
        if current_text:
            self._add_text_node(current_text)

    def _add_text_node(self, text: str):
        """Append a markdown text node to the current section; silently
        ignores whitespace-only text."""
        if not text.strip():
            return
        text_node = {
            "type": "text",
            "properties": {
                "markdown": text.strip()
            },
            "children": []
        }
        self.current_section["children"].append(text_node)

    def _handle_heading(self, heading_node):
        """Open a new section beneath the nearest shallower section."""
        level = heading_node.level
        title = str(heading_node.title).strip()
        section = {
            "type": "section",
            "properties": {
                "title": title,
                "level": level
            },
            "children": []
        }
        # Level-1 headings always attach to the root; deeper headings
        # attach to the most recently opened section with a smaller level.
        parent = self.root
        if level > 1:
            for candidate in reversed(self._get_all_sections()):
                if candidate["properties"]["level"] < level:
                    parent = candidate
                    break
        parent["children"].append(section)
        self.current_section = section

    def _get_all_sections(self):
        """Return all section nodes in document (pre-)order."""
        sections = []

        def collect_sections(node):
            if node["type"] == "section":
                sections.append(node)
            for child in node["children"]:
                if child["type"] == "section":
                    collect_sections(child)

        collect_sections(self.root)
        return sections

    def _handle_template(self, template_node):
        """Dispatch a template to the document/listing/generic handler."""
        template_name = str(template_node.name).strip().lower()
        if template_name in DOCUMENT_TEMPLATES:
            self._handle_document_template(template_node)
        elif template_name in LISTING_TEMPLATES:
            self._handle_listing_template(template_node)
        else:
            self._handle_other_template(template_node)

    @staticmethod
    def _extract_params(template_node) -> Dict[str, str]:
        """Return a template's parameters as a {name: value} dict."""
        return {
            str(param.name).strip(): str(param.value).strip()
            for param in template_node.params
        }

    def _handle_document_template(self, template_node):
        """Store a document-wide template's params on the root node.

        The last occurrence wins if the same template appears twice.
        """
        template_name = str(template_node.name).strip().lower()
        self.root["properties"][template_name] = self._extract_params(template_node)

    def _handle_listing_template(self, template_node):
        """Emit a listing template (see/do/buy/eat/...) as a typed node."""
        template_name = str(template_node.name).strip().lower()
        properties = {}
        for param in template_node.params:
            name = str(param.name).strip()
            if name == "content":
                # The free-text body is converted to markdown.
                properties[name] = self._convert_wikicode_to_markdown(param.value)
            else:
                properties[name] = str(param.value).strip()
        listing_node = {
            "type": template_name,
            "properties": properties,
            "children": []
        }
        self.current_section["children"].append(listing_node)

    def _handle_other_template(self, template_node):
        """Emit any other template as a generic "template" node."""
        generic_node = {
            "type": "template",
            "properties": {
                "name": str(template_node.name).strip().lower(),
                "params": self._extract_params(template_node),
            },
            "children": []
        }
        self.current_section["children"].append(generic_node)

    def _convert_wikicode_to_markdown(self, wikicode) -> str:
        """Convert arbitrary wikicode to a markdown string."""
        markdown = ""
        for node in wikicode.nodes:
            if isinstance(node, nodes.text.Text):
                markdown += str(node.value)
            elif isinstance(node, nodes.tag.Tag):
                markdown += self._convert_tag_to_markdown(node)
            elif isinstance(node, nodes.wikilink.Wikilink):
                markdown += self._convert_wikilink_to_markdown(node)
            elif isinstance(node, nodes.external_link.ExternalLink):
                markdown += self._convert_external_link_to_markdown(node)
            else:
                # For other nodes, just use their string representation.
                markdown += str(node)
        return markdown.strip()

    def _convert_tag_to_markdown(self, tag_node) -> str:
        """Convert an HTML tag node to its markdown equivalent."""
        tag = str(tag_node.tag).lower()
        # Convert the body recursively so nested tags are handled.
        # Self-closing tags (e.g. <br/>) have no contents; fall back to
        # "" (the previous str(contents) fallback rendered "None" for
        # unknown self-closing tags).
        content = (
            self._convert_wikicode_to_markdown(tag_node.contents)
            if tag_node.contents else ""
        )
        if tag in ('b', 'strong'):
            return f"**{content}**"
        elif tag in ('i', 'em'):
            return f"*{content}*"
        elif tag == 'u':
            return f"_{content}_"
        elif tag in ('strike', 's', 'del'):
            return f"~~{content}~~"
        elif tag == 'code':
            return f"`{content}`"
        elif tag == 'pre':
            return f"```\n{content}\n```"
        elif tag == 'br':
            return "\n"
        elif tag == 'hr':
            return "\n---\n"
        elif tag in ('h1', 'h2', 'h3', 'h4', 'h5', 'h6'):
            level = int(tag[1])
            return f"\n{'#' * level} {content}\n"
        elif tag == 'a':
            href = ""
            for attr in tag_node.attributes:
                if str(attr.name).lower() == 'href':
                    href = str(attr.value)
                    break
            return f"[{content}]({href})"
        elif tag == 'img':
            src = alt = ""
            for attr in tag_node.attributes:
                if str(attr.name).lower() == 'src':
                    src = str(attr.value)
                elif str(attr.name).lower() == 'alt':
                    alt = str(attr.value)
            return f"![{alt}]({src})"
        else:
            # Unknown tags contribute only their converted content.
            return content

    def _convert_wikilink_to_markdown(self, wikilink_node) -> str:
        """Convert a wikilink to markdown; the page title doubles as
        the link text when none was given."""
        title = str(wikilink_node.title)
        if wikilink_node.text:
            return f"[{str(wikilink_node.text)}]({title})"
        return f"[{title}]({title})"

    def _convert_external_link_to_markdown(self, link_node) -> str:
        """Convert an external link to markdown; bare URLs pass through."""
        url = str(link_node.url)
        if link_node.title:
            return f"[{str(link_node.title)}]({url})"
        return url

    def export_json(self, root=None, indent=2) -> str:
        """Serialize a tree (default: self.root) to a JSON string."""
        if root is None:
            root = self.root
        return json.dumps(root, indent=indent)

View File

@@ -0,0 +1,96 @@
from logging import getLogger
import xml.sax
import asyncio
from .parser import WikivoyageParser
logger = getLogger(__name__)


class WikiDumpHandler(xml.sax.ContentHandler):
    """
    SAX handler that, for each <page> whose <id> is in mappings,
    collects the <text> and schedules an async task to parse and write
    via the user-supplied handler.

    Must be driven from inside a running asyncio event loop, since
    completed pages are scheduled with asyncio.create_task.
    """

    def __init__(self, mappings, handler, max_concurrent):
        """
        Args:
            mappings: page id (str) -> wikidata id (str); pages absent
                from this dict are skipped.
            handler: object exposing ``await write_entry(entry, uid)``.
            max_concurrent: cap on concurrently processed pages;
                0 or a negative value disables the limit.
        """
        super().__init__()
        self.mappings = mappings
        self.handler = handler
        # Optional concurrency limiter for scheduled page tasks.
        self.sem = (
            asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None
        )
        self.tasks: list[asyncio.Task] = []
        # SAX parsing state.
        self.currentTag: str | None = None
        self.inPage = False
        self.inRevision = False
        self.inText = False
        self.currentPageId: str | None = None
        self.currentText: list[str] = []

    def startElement(self, name, attrs):
        self.currentTag = name
        if name == "page":
            logger.debug("start page")
            self.inPage = True
            self.currentPageId = None
            self.currentText = []
        elif name == "revision":
            logger.debug("start revision")
            self.inRevision = True
        elif name == "text" and self.inRevision:
            logger.debug("start text")
            self.inText = True

    def endElement(self, name):
        if name == "page":
            logger.debug("end page")
            pid = self.currentPageId
            if pid and pid in self.mappings:
                wd_id = self.mappings[pid]
                text = "".join(self.currentText)
                logger.debug(f"scheduled {wd_id} for handling")
                # Schedule async processing; the bounded variant honors
                # the semaphore when one was configured.
                if self.sem:
                    task = asyncio.create_task(self._bounded_process(text, wd_id))
                else:
                    task = asyncio.create_task(self._process(text, wd_id))
                self.tasks.append(task)
            else:
                logger.debug(f"page {pid} without wikidata id, skipping...")
            # Reset per-page state.
            self.inPage = self.inRevision = self.inText = False
            self.currentPageId = None
            self.currentText = []
        elif name == "revision":
            logger.debug("end revision")
            self.inRevision = False
        elif name == "text":
            logger.debug("end text")
            self.inText = False
        self.currentTag = None

    def characters(self, content):
        """
        Buffer character data for the current element.

        SAX may deliver one element's text split over several calls, so
        both the page id and the page text are accumulated instead of
        being taken from the first non-empty call only (the previous
        ``not self.currentPageId`` guard truncated split ids).
        """
        if (
            self.currentTag == "id"
            and self.inPage
            and not self.inRevision
        ):
            part = content.strip()
            if part:
                # Digits may arrive in more than one chunk; concatenate.
                self.currentPageId = (self.currentPageId or "") + part
        elif self.inText:
            # Preserve all text content, even pure whitespace/newlines.
            self.currentText.append(content)

    async def _process(self, text: str, uid: str):
        """Parse one page's wikitext and hand the tree to the writer."""
        parser = WikivoyageParser()
        entry = parser.parse(text)
        await self.handler.write_entry(entry, uid)

    async def _bounded_process(self, text: str, uid: str):
        """Like _process, but at most N pages run at once."""
        async with self.sem:
            await self._process(text, uid)