Merge pull request #18 from bcye/feature/only-python

Integrate Node Script into Python
Bruce 2025-04-30 21:12:51 +02:00 committed by GitHub
commit 729d4adc62
8 changed files with 195 additions and 442 deletions

View File

@@ -1,30 +1,21 @@
 #!/usr/bin/env python3
 import os
-from pathlib import Path
 import sys
+import re
+import zlib
+import bz2
 import asyncio
-import importlib
 import logging
+import importlib
+import xml.sax
+from pathlib import Path
 from dotenv import load_dotenv
-from parser import WikivoyageParser
+import aiohttp
+from transformers import fetch_mappings, WikiDumpHandler, WikivoyageParser

 logger = logging.getLogger(__name__)

-
-async def process_file(
-    input_file: Path,
-    handler,
-) -> None:
-    """
-    Parse one wiki file and hand the resulting entry off to our handler.
-    Uses the filename (sans suffix) as the unique UID.
-    """
-    text = input_file.read_text(encoding="utf-8")
-    parser = WikivoyageParser()
-    entry = parser.parse(text)  # assume returns a dict
-    uid = input_file.stem
-
-    await handler.write_entry(entry, uid)
-
 def gather_handler_kwargs(handler_name: str) -> dict:
     """
     Find all ENV vars starting with HANDLER_<NAME>_ and turn them into kwargs.
@@ -47,12 +38,41 @@ def gather_handler_kwargs(handler_name: str) -> dict:
     logger.debug(f"Handler kwargs: {kwargs}")
     return kwargs

-async def main():
+
+async def process_dump(
+    mappings: dict[str, str], handler, max_concurrent: int
+):
+    """
+    Stream-download the bzip2-compressed XML dump and feed to SAX.
+    """
+    xml_url = (
+        "https://dumps.wikimedia.org/"
+        "enwikivoyage/latest/"
+        "enwikivoyage-latest-pages-articles.xml.bz2"
+    )
+    decomp = bz2.BZ2Decompressor()
+    sax_parser = xml.sax.make_parser()
+    dump_handler = WikiDumpHandler(mappings, handler, max_concurrent)
+    sax_parser.setContentHandler(dump_handler)
+
+    async with aiohttp.ClientSession() as session:
+        async with session.get(xml_url) as resp:
+            resp.raise_for_status()
+            async for chunk in resp.content.iter_chunked(1024 * 1024):
+                data = decomp.decompress(chunk)
+                if not data:
+                    continue
+                text = data.decode("utf-8", errors="ignore")
+                sax_parser.feed(text)
+
+    sax_parser.close()
+    if dump_handler.tasks:
+        await asyncio.gather(*dump_handler.tasks)
+
+
+async def main():
     # 1. Which handler to load?
     handler_name = os.getenv("HANDLER")
     if not handler_name:
-        print("Error: set ENV HANDLER (e.g. 'filesystem')")
+        logger.error("Error: set ENV HANDLER (e.g. 'filesystem')")
         sys.exit(1)

     # 2. Dynamic import
@@ -60,31 +80,25 @@ async def main():
     try:
         mod = importlib.import_module(module_path)
     except ImportError as e:
-        print(f"Error loading handler module {module_path}: {e}")
+        logger.error(f"Error loading handler module {module_path}: {e}")
         sys.exit(1)

     # 3. Find the class: e.g. "sftp" → "SftpHandler"
     class_name = handler_name.title().replace("_", "") + "Handler"
     if not hasattr(mod, class_name):
-        print(f"{module_path} defines no class {class_name}")
+        logger.error(f"{module_path} defines no class {class_name}")
         sys.exit(1)
     HandlerCls = getattr(mod, class_name)
+    logger.info(f"Using handler from {module_path}")

     # 4. Build kwargs from ENV
     handler_kwargs = gather_handler_kwargs(handler_name)

     # 5. Instantiate
     handler = HandlerCls(**handler_kwargs)

-    # 6. Which dir to walk?
-    input_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".")
-    txt_files = list(input_dir.rglob("*.txt"))
-    if not txt_files:
-        logger.info(f"No .txt files found under {input_dir}")
-        sys.exit(1)
-
-    # 7. read concurrency setting
+    # 6. read concurrency setting
     try:
         max_conc = int(os.getenv("MAX_CONCURRENT", "0"))
     except ValueError:
@@ -93,31 +107,18 @@ async def main():
     if max_conc < 0:
         raise ValueError("MAX_CONCURRENT must be >= 0")

-    # 8. schedule tasks
-    if max_conc == 0:
-        # unbounded
-        tasks = [
-            asyncio.create_task(process_file(txt, handler))
-            for txt in txt_files
-        ]
-    else:
-        # bounded by semaphore
-        sem = asyncio.Semaphore(max_conc)
-
-        async def bounded(txt):
-            async with sem:
-                return await process_file(txt, handler)
-
-        tasks = [
-            asyncio.create_task(bounded(txt))
-            for txt in txt_files
-        ]
-
-    # 9. run them all
-    await asyncio.gather(*tasks)
+    # 7. Fetch mappings
+    logger.info("Fetching mappings from SQL dump…")
+    mappings = await fetch_mappings()
+    logger.info(f"Got {len(mappings)} wikibase_item mappings.")

+    # 8. Stream & split the XML dump
+    logger.info("Processing XML dump…")
+    await process_dump(mappings, handler, max_conc)
+
+    # 5. Finish up
     await handler.close()
     logger.info("All done.")

package-lock.json (generated, 183 lines deleted)

@@ -1,183 +0,0 @@
{
"name": "mapvoyage-extract",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"dependencies": {
"sax": "^1.4.1",
"unbzip2-stream": "^1.4.3"
},
"devDependencies": {
"@types/node": "^22.14.0",
"@types/sax": "^1.2.7",
"@types/unbzip2-stream": "^1.4.3",
"prettier": "^3.4.2",
"typescript": "^5.8.2"
}
},
"node_modules/@types/node": {
"version": "22.14.0",
"resolved": "https://registry.npmjs.org/@types/node/-/node-22.14.0.tgz",
"integrity": "sha512-Kmpl+z84ILoG+3T/zQFyAJsU6EPTmOCj8/2+83fSN6djd6I4o7uOuGIH6vq3PrjY5BGitSbFuMN18j3iknubbA==",
"dev": true,
"license": "MIT",
"dependencies": {
"undici-types": "~6.21.0"
}
},
"node_modules/@types/sax": {
"version": "1.2.7",
"resolved": "https://registry.npmjs.org/@types/sax/-/sax-1.2.7.tgz",
"integrity": "sha512-rO73L89PJxeYM3s3pPPjiPgVVcymqU490g0YO5n5By0k2Erzj6tay/4lr1CHAAU4JyOWd1rpQ8bCf6cZfHU96A==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@types/through": {
"version": "0.0.33",
"resolved": "https://registry.npmjs.org/@types/through/-/through-0.0.33.tgz",
"integrity": "sha512-HsJ+z3QuETzP3cswwtzt2vEIiHBk/dCcHGhbmG5X3ecnwFD/lPrMpliGXxSCg03L9AhrdwA4Oz/qfspkDW+xGQ==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@types/unbzip2-stream": {
"version": "1.4.3",
"resolved": "https://registry.npmjs.org/@types/unbzip2-stream/-/unbzip2-stream-1.4.3.tgz",
"integrity": "sha512-D8X5uuJRISqc8YtwL8jNW2FpPdUOCYXbfD6zNROCTbVXK9nawucxh10tVXE3MPjnHdRA1LvB0zDxVya/lBsnYw==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/through": "*"
}
},
"node_modules/base64-js": {
"version": "1.5.1",
"resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz",
"integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT"
},
"node_modules/buffer": {
"version": "5.7.1",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz",
"integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "MIT",
"dependencies": {
"base64-js": "^1.3.1",
"ieee754": "^1.1.13"
}
},
"node_modules/ieee754": {
"version": "1.2.1",
"resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz",
"integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"license": "BSD-3-Clause"
},
"node_modules/prettier": {
"version": "3.5.3",
"resolved": "https://registry.npmjs.org/prettier/-/prettier-3.5.3.tgz",
"integrity": "sha512-QQtaxnoDJeAkDvDKWCLiwIXkTgRhwYDEQCghU9Z6q03iyek/rxRh/2lC3HB7P8sWT2xC/y5JDctPLBIGzHKbhw==",
"dev": true,
"license": "MIT",
"bin": {
"prettier": "bin/prettier.cjs"
},
"engines": {
"node": ">=14"
},
"funding": {
"url": "https://github.com/prettier/prettier?sponsor=1"
}
},
"node_modules/sax": {
"version": "1.4.1",
"resolved": "https://registry.npmjs.org/sax/-/sax-1.4.1.tgz",
"integrity": "sha512-+aWOz7yVScEGoKNd4PA10LZ8sk0A/z5+nXQG5giUO5rprX9jgYsTdov9qCchZiPIZezbZH+jRut8nPodFAX4Jg==",
"license": "ISC"
},
"node_modules/through": {
"version": "2.3.8",
"resolved": "https://registry.npmjs.org/through/-/through-2.3.8.tgz",
"integrity": "sha512-w89qg7PI8wAdvX60bMDP+bFoD5Dvhm9oLheFp5O4a2QF0cSBGsBX4qZmadPMvVqlLJBBci+WqGGOAPvcDeNSVg==",
"license": "MIT"
},
"node_modules/typescript": {
"version": "5.8.2",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-5.8.2.tgz",
"integrity": "sha512-aJn6wq13/afZp/jT9QZmwEjDqqvSGp1VT5GVg+f/t6/oVyrgXM6BY1h9BRh/O5p3PlUPAe+WuiEZOmb/49RqoQ==",
"dev": true,
"license": "Apache-2.0",
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
},
"engines": {
"node": ">=14.17"
}
},
"node_modules/unbzip2-stream": {
"version": "1.4.3",
"resolved": "https://registry.npmjs.org/unbzip2-stream/-/unbzip2-stream-1.4.3.tgz",
"integrity": "sha512-mlExGW4w71ebDJviH16lQLtZS32VKqsSfk80GCfUlwT/4/hNRFsoscrF/c++9xinkMzECL1uL9DDwXqFWkruPg==",
"license": "MIT",
"dependencies": {
"buffer": "^5.2.1",
"through": "^2.3.8"
}
},
"node_modules/undici-types": {
"version": "6.21.0",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz",
"integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==",
"dev": true,
"license": "MIT"
}
}
}

View File

@@ -1,14 +0,0 @@
{
  "private": true,
  "devDependencies": {
    "@types/node": "^22.14.0",
    "@types/sax": "^1.2.7",
    "@types/unbzip2-stream": "^1.4.3",
    "prettier": "^3.4.2",
    "typescript": "^5.8.2"
  },
  "dependencies": {
    "sax": "^1.4.1",
    "unbzip2-stream": "^1.4.3"
  }
}

View File

@@ -1,192 +0,0 @@
import fs from "fs";
import https from "https";
import path from "path";
import sax from "sax";
import bz2 from "unbzip2-stream";
import { createGunzip } from "zlib";

// Local storage configuration
const OUTPUT_FOLDER = "myfolder";

// --- Step 1: Fetch mappings from SQL dump ---
async function fetchMappings(): Promise<Record<string, string>> {
  return new Promise((resolve, reject) => {
    const sqlUrl =
      "https://dumps.wikimedia.org/enwikivoyage/latest/enwikivoyage-latest-page_props.sql.gz";
    https
      .get(sqlUrl, (res) => {
        if (res.statusCode !== 200) {
          return reject(
            new Error(`Failed to get SQL dump, status code: ${res.statusCode}`),
          );
        }
        const gunzip = createGunzip();
        let buffer = "";
        const mappings: Record<string, string> = {};
        res.pipe(gunzip);
        gunzip.on("data", (chunk: Buffer) => {
          buffer += chunk.toString();
          const regex = /\((\d+),'([^']+)','([^']+)',(NULL|[\d\.]+)\)/g;
          let match: RegExpExecArray | null;
          while ((match = regex.exec(buffer)) !== null) {
            const [, pp_page, pp_propname, pp_value] = match;
            if (pp_propname === "wikibase_item") {
              mappings[pp_page] = pp_value;
            }
          }
          // Keep a tail to handle chunk splits
          if (buffer.length > 1000) {
            buffer = buffer.slice(-1000);
          }
        });
        gunzip.on("end", () => resolve(mappings));
        gunzip.on("error", reject);
      })
      .on("error", reject);
  });
}

// --- Helper to save file locally ---
let saveCount = 0;
function saveToLocalFile(filename: string, data: string): Promise<void> {
  return new Promise((resolve, reject) => {
    // Create directory if it doesn't exist
    if (!fs.existsSync(OUTPUT_FOLDER)) {
      fs.mkdirSync(OUTPUT_FOLDER, { recursive: true });
    }
    const filePath = path.join(OUTPUT_FOLDER, filename);
    fs.writeFile(filePath, data, (err) => {
      if (err) {
        reject(err);
      } else {
        console.log(`File saved successfully (${++saveCount}): ${filePath}`);
        resolve();
      }
    });
  });
}

// Simple semaphore to limit concurrency
class Semaphore {
  private tasks: (() => void)[] = [];
  private count: number;

  constructor(count: number) {
    this.count = count;
  }

  async acquire(): Promise<() => void> {
    return new Promise((release) => {
      const task = () => {
        this.count--;
        release(() => {
          this.count++;
          if (this.tasks.length > 0) {
            const next = this.tasks.shift()!;
            next();
          }
        });
      };
      if (this.count > 0) {
        task();
      } else {
        this.tasks.push(task);
      }
    });
  }
}

// --- Step 3: Process the XML dump ---
async function processXML(mappings: Record<string, string>): Promise<void> {
  return new Promise((resolve, reject) => {
    const xmlUrl =
      "https://dumps.wikimedia.org/enwikivoyage/latest/enwikivoyage-latest-pages-articles.xml.bz2";
    https
      .get(xmlUrl, (res) => {
        if (res.statusCode !== 200) {
          return reject(
            new Error(`Failed to fetch XML dump: ${res.statusCode}`),
          );
        }
        // Pipe through bz2 decompressor
        const stream = res.pipe(bz2());
        // Use sax for streaming XML parsing
        const parser = sax.createStream(true, {});
        let currentPageId: string | null = null;
        let currentText: string | null = null;
        let inPage = false;
        let inRevision = false;
        let inText = false;
        let currentTag: string | null = null; // Track current tag

        parser.on("opentag", (node) => {
          currentTag = node.name; // Track current tag
          if (node.name === "page") {
            inPage = true;
            currentPageId = null;
            currentText = null;
          } else if (node.name === "revision") {
            inRevision = true;
          } else if (inRevision && node.name === "text") {
            inText = true;
          }
        });

        parser.on("closetag", (tagName) => {
          if (tagName === "page") {
            if (
              typeof currentPageId == "string" &&
              currentText !== null &&
              !!mappings[currentPageId]
            ) {
              const wikidataId = mappings[currentPageId];
              const filename = `${wikidataId}.wiki.txt`;
              // Make a copy as the value will continue changing
              const textToSave = currentText.toString();
              saveToLocalFile(filename, textToSave).catch((err) =>
                console.error(`Save error for page ${currentPageId}:`, err)
              );
            }
            // Reset state for the next page
            inPage = false;
            currentPageId = null;
            currentText = null;
          } else if (tagName === "revision") {
            inRevision = false;
          } else if (tagName === "text") {
            inText = false;
          }
          currentTag = null; // Reset current tag
        });

        parser.on("text", (text) => {
          const trimmedText = text.trim();
          if (!trimmedText) return;
          if (currentTag === "id" && inPage && !inRevision && !currentPageId) {
            currentPageId = trimmedText;
          } else if (inText) {
            currentText = (currentText || "") + trimmedText;
          }
        });

        parser.on("error", reject);
        parser.on("end", resolve);

        stream.pipe(parser);
      })
      .on("error", reject);
  });
}

// --- Main integration ---
async function main() {
  try {
    console.log("Fetching mappings from SQL dump...");
    const mappings = await fetchMappings();
    console.log(`Fetched ${Object.keys(mappings).length} mappings.`);
    console.log("Processing XML dump...");
    await processXML(mappings);
    console.log("Processing complete.");
  } catch (err) {
    console.error("Error:", err);
  }
}

main().then(() => process.exit());

transformers/__init__.py (new file, 3 lines added)

@@ -0,0 +1,3 @@
from .fetch_mappings import fetch_mappings
from .wiki_dump_handler import WikiDumpHandler
from .parser import WikivoyageParser

View File

@@ -0,0 +1,42 @@
from logging import getLogger
import zlib
import re

import aiohttp

logger = getLogger(__name__)


async def fetch_mappings() -> dict[str, str]:
    """
    Download and gunzip the page_props SQL dump, extract
    page → wikibase_item mappings.
    """
    sql_url = (
        "https://dumps.wikimedia.org/"
        "enwikivoyage/latest/"
        "enwikivoyage-latest-page_props.sql.gz"
    )
    # decompress gzip
    decomp = zlib.decompressobj(16 + zlib.MAX_WBITS)
    # regex for tuples: (page,'prop','value',NULL_or_number)
    tuple_re = re.compile(r"\((\d+),'([^']+)','([^']+)',(NULL|[\d\.]+)\)")
    buffer = ""
    mappings: dict[str, str] = {}

    async with aiohttp.ClientSession() as session:
        async with session.get(sql_url) as resp:
            resp.raise_for_status()
            async for chunk in resp.content.iter_chunked(1024 * 1024):
                data = decomp.decompress(chunk)
                if not data:
                    continue
                text = data.decode("utf-8", errors="ignore")
                buffer += text

                for m in tuple_re.finditer(buffer):
                    page_id, prop, value = m.group(1), m.group(2), m.group(3)
                    if prop == "wikibase_item":
                        logger.debug(f"Found mapping {page_id} -> {value}")
                        mappings[page_id] = value

                # keep tail to handle split tuples
                if len(buffer) > 1000:
                    buffer = buffer[-1000:]

    return mappings
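
Since the page_props INSERT statements arrive as one long stream of value tuples, the easiest way to see what tuple_re captures is to run it over a fragment of the same shape. The snippet below is a standalone illustration only: the sample SQL literal is invented, while the pattern is the one used above.

import re

tuple_re = re.compile(r"\((\d+),'([^']+)','([^']+)',(NULL|[\d\.]+)\)")

# Made-up fragment in the shape of the page_props dump:
sample = "INSERT INTO `page_props` VALUES (1042,'wikibase_item','Q1781',NULL),(1043,'page_image','Foo.jpg',NULL);"

mappings = {}
for m in tuple_re.finditer(sample):
    page_id, prop, value = m.group(1), m.group(2), m.group(3)
    if prop == "wikibase_item":
        mappings[page_id] = value

print(mappings)  # {'1042': 'Q1781'}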

View File

@@ -0,0 +1,96 @@
from logging import getLogger
import xml.sax
import asyncio

from .parser import WikivoyageParser

logger = getLogger(__name__)


class WikiDumpHandler(xml.sax.ContentHandler):
    """
    SAX handler that, for each <page> whose <id> is in mappings,
    collects the <text> and schedules an async task to parse
    and write via the user-supplied handler.
    """

    def __init__(self, mappings, handler, max_concurrent):
        super().__init__()
        self.mappings = mappings
        self.handler = handler
        self.sem = (
            asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None
        )
        self.tasks: list[asyncio.Task] = []

        self.currentTag: str | None = None
        self.inPage = False
        self.inRevision = False
        self.inText = False
        self.currentPageId: str | None = None
        self.currentText: list[str] = []

    def startElement(self, name, attrs):
        self.currentTag = name
        if name == "page":
            logger.debug("start page")
            self.inPage = True
            self.currentPageId = None
            self.currentText = []
        elif name == "revision":
            logger.debug("start revision")
            self.inRevision = True
        elif name == "text" and self.inRevision:
            logger.debug("start text")
            self.inText = True

    def endElement(self, name):
        if name == "page":
            logger.debug("end page")
            pid = self.currentPageId
            if pid and pid in self.mappings:
                wd_id = self.mappings[pid]
                text = "".join(self.currentText)
                logger.debug(f"scheduled {wd_id} for handling")
                # schedule processing
                if self.sem:
                    task = asyncio.create_task(self._bounded_process(text, wd_id))
                else:
                    task = asyncio.create_task(self._process(text, wd_id))
                self.tasks.append(task)
            else:
                logger.debug(f"page {pid} without wikidata id, skipping...")
            # reset
            self.inPage = self.inRevision = self.inText = False
            self.currentPageId = None
            self.currentText = []
        elif name == "revision":
            logger.debug("end revision")
            self.inRevision = False
        elif name == "text":
            logger.debug("end text")
            self.inText = False
        self.currentTag = None

    def characters(self, content):
        # Only filter whitespace for ID fields, preserve all content for text
        if (
            self.currentTag == "id"
            and self.inPage
            and not self.inRevision
            and not self.currentPageId
        ):
            content_stripped = content.strip()
            if content_stripped:  # Only process non-empty ID content
                self.currentPageId = content_stripped
        elif self.inText:
            # Always append text content, even if it's just whitespace or newlines
            self.currentText.append(content)

    async def _process(self, text: str, uid: str):
        parser = WikivoyageParser()
        entry = parser.parse(text)
        await self.handler.write_entry(entry, uid)

    async def _bounded_process(self, text: str, uid: str):
        # Only run N at once
        async with self.sem:
            await self._process(text, uid)
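
The handler object that WikiDumpHandler receives is only ever used through two awaited calls in this PR, write_entry(entry, uid) and close(), where entry is whatever WikivoyageParser.parse() returns (a dict, per the old main script). A minimal sketch of a compatible handler, purely hypothetical and not part of the repository, could look like this:

import json
from pathlib import Path


class ExampleFilesystemHandler:
    """Hypothetical handler sketch: writes each parsed entry as <uid>.json."""

    def __init__(self, output_dir="./output"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    async def write_entry(self, entry: dict, uid: str) -> None:
        # uid is the wikidata id chosen by WikiDumpHandler (e.g. "Q1781")
        (self.output_dir / f"{uid}.json").write_text(
            json.dumps(entry, ensure_ascii=False)
        )

    async def close(self) -> None:
        # nothing to flush for plain files
        pass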