diff --git a/package-lock.json b/package-lock.json
deleted file mode 100644
index 69e4130..0000000
--- a/package-lock.json
+++ /dev/null
@@ -1,183 +0,0 @@
-{
-  "name": "mapvoyage-extract",
-  "lockfileVersion": 3,
-  "requires": true,
-  "packages": {
-    "": {
-      "dependencies": {
-        "sax": "^1.4.1",
-        "unbzip2-stream": "^1.4.3"
-      },
-      "devDependencies": {
-        "@types/node": "^22.14.0",
-        "@types/sax": "^1.2.7",
-        "@types/unbzip2-stream": "^1.4.3",
-        "prettier": "^3.4.2",
-        "typescript": "^5.8.2"
-      }
-    },
-    "node_modules/@types/node": {
-      "version": "22.14.0",
-      "resolved": "https://registry.npmjs.org/@types/node/-/node-22.14.0.tgz",
-      "integrity": "sha512-Kmpl+z84ILoG+3T/zQFyAJsU6EPTmOCj8/2+83fSN6djd6I4o7uOuGIH6vq3PrjY5BGitSbFuMN18j3iknubbA==",
-      "dev": true,
-      "license": "MIT",
-      "dependencies": {
-        "undici-types": "~6.21.0"
-      }
-    },
-    "node_modules/@types/sax": {
-      "version": "1.2.7",
-      "resolved": "https://registry.npmjs.org/@types/sax/-/sax-1.2.7.tgz",
-      "integrity": "sha512-rO73L89PJxeYM3s3pPPjiPgVVcymqU490g0YO5n5By0k2Erzj6tay/4lr1CHAAU4JyOWd1rpQ8bCf6cZfHU96A==",
-      "dev": true,
-      "license": "MIT",
-      "dependencies": {
-        "@types/node": "*"
-      }
-    },
-    "node_modules/@types/through": {
-      "version": "0.0.33",
-      "resolved": "https://registry.npmjs.org/@types/through/-/through-0.0.33.tgz",
-      "integrity": "sha512-HsJ+z3QuETzP3cswwtzt2vEIiHBk/dCcHGhbmG5X3ecnwFD/lPrMpliGXxSCg03L9AhrdwA4Oz/qfspkDW+xGQ==",
-      "dev": true,
-      "license": "MIT",
-      "dependencies": {
-        "@types/node": "*"
-      }
-    },
-    "node_modules/@types/unbzip2-stream": {
-      "version": "1.4.3",
-      "resolved": "https://registry.npmjs.org/@types/unbzip2-stream/-/unbzip2-stream-1.4.3.tgz",
-      "integrity": "sha512-D8X5uuJRISqc8YtwL8jNW2FpPdUOCYXbfD6zNROCTbVXK9nawucxh10tVXE3MPjnHdRA1LvB0zDxVya/lBsnYw==",
-      "dev": true,
-      "license": "MIT",
-      "dependencies": {
-        "@types/through": "*"
-      }
-    },
-    "node_modules/base64-js": {
-      "version": "1.5.1",
-      "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz",
-      "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==",
-      "funding": [
-        {
-          "type": "github",
-          "url": "https://github.com/sponsors/feross"
-        },
-        {
-          "type": "patreon",
-          "url": "https://www.patreon.com/feross"
-        },
-        {
-          "type": "consulting",
-          "url": "https://feross.org/support"
-        }
-      ],
-      "license": "MIT"
-    },
-    "node_modules/buffer": {
-      "version": "5.7.1",
-      "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz",
-      "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==",
-      "funding": [
-        {
-          "type": "github",
-          "url": "https://github.com/sponsors/feross"
-        },
-        {
-          "type": "patreon",
-          "url": "https://www.patreon.com/feross"
-        },
-        {
-          "type": "consulting",
-          "url": "https://feross.org/support"
-        }
-      ],
-      "license": "MIT",
-      "dependencies": {
-        "base64-js": "^1.3.1",
-        "ieee754": "^1.1.13"
-      }
-    },
-    "node_modules/ieee754": {
-      "version": "1.2.1",
-      "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz",
-      "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==",
-      "funding": [
-        {
-          "type": "github",
-          "url": "https://github.com/sponsors/feross"
-        },
-        {
-          "type": "patreon",
-          "url": "https://www.patreon.com/feross"
-        },
-        {
-          "type": "consulting",
-          "url": "https://feross.org/support"
-        }
-      ],
-      "license": "BSD-3-Clause"
-    },
-    "node_modules/prettier": {
-      "version": "3.5.3",
-      "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.5.3.tgz",
-      "integrity": "sha512-QQtaxnoDJeAkDvDKWCLiwIXkTgRhwYDEQCghU9Z6q03iyek/rxRh/2lC3HB7P8sWT2xC/y5JDctPLBIGzHKbhw==",
-      "dev": true,
-      "license": "MIT",
-      "bin": {
-        "prettier": "bin/prettier.cjs"
-      },
-      "engines": {
-        "node": ">=14"
-      },
-      "funding": {
-        "url": "https://github.com/prettier/prettier?sponsor=1"
-      }
-    },
-    "node_modules/sax": {
-      "version": "1.4.1",
-      "resolved": "https://registry.npmjs.org/sax/-/sax-1.4.1.tgz",
-      "integrity": "sha512-+aWOz7yVScEGoKNd4PA10LZ8sk0A/z5+nXQG5giUO5rprX9jgYsTdov9qCchZiPIZezbZH+jRut8nPodFAX4Jg==",
-      "license": "ISC"
-    },
-    "node_modules/through": {
-      "version": "2.3.8",
-      "resolved": "https://registry.npmjs.org/through/-/through-2.3.8.tgz",
-      "integrity": "sha512-w89qg7PI8wAdvX60bMDP+bFoD5Dvhm9oLheFp5O4a2QF0cSBGsBX4qZmadPMvVqlLJBBci+WqGGOAPvcDeNSVg==",
-      "license": "MIT"
-    },
-    "node_modules/typescript": {
-      "version": "5.8.2",
-      "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.8.2.tgz",
-      "integrity": "sha512-aJn6wq13/afZp/jT9QZmwEjDqqvSGp1VT5GVg+f/t6/oVyrgXM6BY1h9BRh/O5p3PlUPAe+WuiEZOmb/49RqoQ==",
-      "dev": true,
-      "license": "Apache-2.0",
-      "bin": {
-        "tsc": "bin/tsc",
-        "tsserver": "bin/tsserver"
-      },
-      "engines": {
-        "node": ">=14.17"
-      }
-    },
-    "node_modules/unbzip2-stream": {
-      "version": "1.4.3",
-      "resolved": "https://registry.npmjs.org/unbzip2-stream/-/unbzip2-stream-1.4.3.tgz",
-      "integrity": "sha512-mlExGW4w71ebDJviH16lQLtZS32VKqsSfk80GCfUlwT/4/hNRFsoscrF/c++9xinkMzECL1uL9DDwXqFWkruPg==",
-      "license": "MIT",
-      "dependencies": {
-        "buffer": "^5.2.1",
-        "through": "^2.3.8"
-      }
-    },
-    "node_modules/undici-types": {
-      "version": "6.21.0",
-      "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz",
-      "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==",
-      "dev": true,
-      "license": "MIT"
-    }
-  }
-}
diff --git a/package.json b/package.json
deleted file mode 100644
index 3ff3289..0000000
--- a/package.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
-  "private": true,
-  "packageManager": "pnpm@9.13.2+sha512.88c9c3864450350e65a33587ab801acf946d7c814ed1134da4a924f6df5a2120fd36b46aab68f7cd1d413149112d53c7db3a4136624cfd00ff1846a0c6cef48a",
-  "devDependencies": {
-    "@types/node": "^22.14.0",
-    "@types/sax": "^1.2.7",
-    "@types/unbzip2-stream": "^1.4.3",
-    "prettier": "^3.4.2",
-    "typescript": "^5.8.2"
-  },
-  "dependencies": {
-    "sax": "^1.4.1",
-    "unbzip2-stream": "^1.4.3"
-  }
-}
diff --git a/split-dump.ts b/split-dump.ts
deleted file mode 100644
index 782e951..0000000
--- a/split-dump.ts
+++ /dev/null
@@ -1,192 +0,0 @@
-import fs from "fs";
-import https from "https";
-import path from "path";
-import sax from "sax";
-import bz2 from "unbzip2-stream";
-import { createGunzip } from "zlib";
-
-// Local storage configuration
-const OUTPUT_FOLDER = "myfolder";
-
-// --- Step 1: Fetch mappings from SQL dump ---
-async function fetchMappings(): Promise<Record<string, string>> {
-  return new Promise((resolve, reject) => {
-    const sqlUrl =
-      "https://dumps.wikimedia.org/enwikivoyage/latest/enwikivoyage-latest-page_props.sql.gz";
-    https
-      .get(sqlUrl, (res) => {
-        if (res.statusCode !== 200) {
-          return reject(
-            new Error(`Failed to get SQL dump, status code: ${res.statusCode}`),
-          );
-        }
-        const gunzip = createGunzip();
-        let buffer = "";
-        const mappings: Record<string, string> = {};
-        res.pipe(gunzip);
-        gunzip.on("data", (chunk: Buffer) => {
-          buffer += chunk.toString();
-          const regex = /\((\d+),'([^']+)','([^']+)',(NULL|[\d\.]+)\)/g;
-          let match: RegExpExecArray | null;
-          while ((match = regex.exec(buffer)) !== null) {
-            const [, pp_page, pp_propname, pp_value] = match;
-            if (pp_propname === "wikibase_item") {
-              mappings[pp_page] = pp_value;
-            }
-          }
-          // Keep a tail to handle chunk splits
-          if (buffer.length > 1000) {
-            buffer = buffer.slice(-1000);
-          }
-        });
-        gunzip.on("end", () => resolve(mappings));
-        gunzip.on("error", reject);
-      })
-      .on("error", reject);
-  });
-}
-
-// --- Helper to save file locally ---
-let saveCount = 0;
-function saveToLocalFile(filename: string, data: string): Promise<void> {
-  return new Promise((resolve, reject) => {
-    // Create directory if it doesn't exist
-    if (!fs.existsSync(OUTPUT_FOLDER)) {
-      fs.mkdirSync(OUTPUT_FOLDER, { recursive: true });
-    }
-
-    const filePath = path.join(OUTPUT_FOLDER, filename);
-    fs.writeFile(filePath, data, (err) => {
-      if (err) {
-        reject(err);
-      } else {
-        console.log(`File saved successfully (${++saveCount}): ${filePath}`);
-        resolve();
-      }
-    });
-  });
-}
-
-// Simple semaphore to limit concurrency
-class Semaphore {
-  private tasks: (() => void)[] = [];
-  private count: number;
-  constructor(count: number) {
-    this.count = count;
-  }
-  async acquire(): Promise<() => void> {
-    return new Promise((release) => {
-      const task = () => {
-        this.count--;
-        release(() => {
-          this.count++;
-          if (this.tasks.length > 0) {
-            const next = this.tasks.shift()!;
-            next();
-          }
-        });
-      };
-      if (this.count > 0) {
-        task();
-      } else {
-        this.tasks.push(task);
-      }
-    });
-  }
-}
-
-// --- Step 3: Process the XML dump ---
-async function processXML(mappings: Record<string, string>): Promise<void> {
-  return new Promise((resolve, reject) => {
-    const xmlUrl =
-      "https://dumps.wikimedia.org/enwikivoyage/latest/enwikivoyage-latest-pages-articles.xml.bz2";
-    https
-      .get(xmlUrl, (res) => {
-        if (res.statusCode !== 200) {
-          return reject(
-            new Error(`Failed to fetch XML dump: ${res.statusCode}`),
-          );
-        }
-        // Pipe through bz2 decompressor
-        const stream = res.pipe(bz2());
-        // Use sax for streaming XML parsing
-        const parser = sax.createStream(true, {});
-        let currentPageId: string | null = null;
-        let currentText: string | null = null;
-        let inPage = false;
-        let inRevision = false;
-        let inText = false;
-        let currentTag: string | null = null; // Track current tag
-        parser.on("opentag", (node) => {
-          currentTag = node.name; // Track current tag
-          if (node.name === "page") {
-            inPage = true;
-            currentPageId = null;
-            currentText = null;
-          } else if (node.name === "revision") {
-            inRevision = true;
-          } else if (inRevision && node.name === "text") {
-            inText = true;
-          }
-        });
-        parser.on("closetag", (tagName) => {
-          if (tagName === "page") {
-            if (
-              typeof currentPageId == "string" &&
-              currentText !== null &&
-              !!mappings[currentPageId]
-            ) {
-              const wikidataId = mappings[currentPageId];
-              const filename = `${wikidataId}.wiki.txt`;
-
-              // Make a copy as the value will continue changing
-              const textToSave = currentText.toString();
-
-
-              saveToLocalFile(filename, textToSave).catch((err) =>
-                console.error(`Save error for page ${currentPageId}:`, err)
-              );
-            }
-            // Reset state for the next page
-            inPage = false;
-            currentPageId = null;
-            currentText = null;
-          } else if (tagName === "revision") {
-            inRevision = false;
-          } else if (tagName === "text") {
-            inText = false;
-          }
-          currentTag = null; // Reset current tag
-        });
-        parser.on("text", (text) => {
-          const trimmedText = text.trim();
-          if (!trimmedText) return;
-          if (currentTag === "id" && inPage && !inRevision && !currentPageId) {
-            currentPageId = trimmedText;
-          } else if (inText) {
-            currentText = (currentText || "") + trimmedText;
-          }
-        });
-        parser.on("error", reject);
-        parser.on("end", resolve);
-        stream.pipe(parser);
-      })
-      .on("error", reject);
-  });
-}
-
-// --- Main integration ---
-async function main() {
-  try {
-    console.log("Fetching mappings from SQL dump...");
-    const mappings = await fetchMappings();
-    console.log(`Fetched ${Object.keys(mappings).length} mappings.`);
-    console.log("Processing XML dump...");
-    await processXML(mappings);
-    console.log("Processing complete.");
-  } catch (err) {
-    console.error("Error:", err);
-  }
-}
-
-main().then(() => process.exit());
\ No newline at end of file
diff --git a/transform-documents.py b/transform-documents.py
index 758c227..8e43a33 100644
--- a/transform-documents.py
+++ b/transform-documents.py
@@ -1,30 +1,20 @@
+#!/usr/bin/env python3
 import os
-from pathlib import Path
 import sys
+import re
+import zlib
+import bz2
 import asyncio
-import importlib
 import logging
+import importlib
+import xml.sax
+from pathlib import Path
 from dotenv import load_dotenv
+import aiohttp
 
 from parser import WikivoyageParser
 
 logger = logging.getLogger(__name__)
-async def process_file(
-    input_file: Path,
-    handler,
-) -> None:
-    """
-    Parse one wiki file and hand the resulting entry off to our handler.
-    Uses the filename (sans suffix) as the unique UID.
-    """
-
-    text = input_file.read_text(encoding="utf-8")
-    parser = WikivoyageParser()
-    entry = parser.parse(text) # assume returns a dict
-    uid = input_file.stem
-
-    await handler.write_entry(entry, uid)
-
 def gather_handler_kwargs(handler_name: str) -> dict:
     """
     Find all ENV vars starting with HANDLER__ and turn them into kwargs.
@@ -47,12 +37,154 @@ def gather_handler_kwargs(handler_name: str) -> dict:
     logger.debug(f"Handler kwargs: {kwargs}")
     return kwargs
 
-async def main():
+async def fetch_mappings() -> dict[str, str]:
+    """
+    Download and gunzip the page_props SQL dump, extract
+    page→wikibase_item mappings.
+    """
+    sql_url = (
+        "https://dumps.wikimedia.org/"
+        "enwikivoyage/latest/"
+        "enwikivoyage-latest-page_props.sql.gz"
+    )
+    # decompress gzip
+    decomp = zlib.decompressobj(16 + zlib.MAX_WBITS)
+    # regex for tuples: (page,'prop','value',NULL_or_number)
+    tuple_re = re.compile(r"\((\d+),'([^']+)','([^']+)',(NULL|[\d\.]+)\)")
+    buffer = ""
+    mappings: dict[str, str] = {}
+    async with aiohttp.ClientSession() as session:
+        async with session.get(sql_url) as resp:
+            resp.raise_for_status()
+            async for chunk in resp.content.iter_chunked(1024 * 1024):
+                data = decomp.decompress(chunk)
+                if not data:
+                    continue
+                text = data.decode("utf-8", errors="ignore")
+                buffer += text
+                for m in tuple_re.finditer(buffer):
+                    page_id, prop, value = m.group(1), m.group(2), m.group(3)
+                    if prop == "wikibase_item":
+                        mappings[page_id] = value
+                # keep tail to handle split tuples
+                if len(buffer) > 1000:
+                    buffer = buffer[-1000:]
+    return mappings
+
+class WikiDumpHandler(xml.sax.ContentHandler):
+    """
+    SAX handler that, for each <page> whose <id> is in mappings,
+    collects the <text> and schedules an async task to parse
+    and write via the user-supplied handler.
+    """
+
+    def __init__(self, mappings, handler, max_concurrent):
+        super().__init__()
+        self.mappings = mappings
+        self.handler = handler
+        self.sem = (
+            asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None
+        )
+        self.tasks: list[asyncio.Task] = []
+
+        self.currentTag: str | None = None
+        self.inPage = False
+        self.inRevision = False
+        self.inText = False
+        self.currentPageId: str | None = None
+        self.currentText: list[str] = []
+
+    def startElement(self, name, attrs):
+        self.currentTag = name
+        if name == "page":
+            self.inPage = True
+            self.currentPageId = None
+            self.currentText = []
+        elif name == "revision":
+            self.inRevision = True
+        elif name == "text" and self.inRevision:
+            self.inText = True
+
+    def endElement(self, name):
+        if name == "page":
+            pid = self.currentPageId
+            if pid and pid in self.mappings:
+                wd_id = self.mappings[pid]
+                text = "".join(self.currentText)
+                # schedule processing
+                if self.sem:
+                    task = asyncio.create_task(self._bounded_process(text, wd_id))
+                else:
+                    task = asyncio.create_task(self._process(text, wd_id))
+                self.tasks.append(task)
+            # reset
+            self.inPage = self.inRevision = self.inText = False
+            self.currentPageId = None
+            self.currentText = []
+        elif name == "revision":
+            self.inRevision = False
+        elif name == "text":
+            self.inText = False
+        self.currentTag = None
+
+    def characters(self, content):
+        if not content.strip():
+            return
+        if (
+            self.currentTag == "id"
+            and self.inPage
+            and not self.inRevision
+            and not self.currentPageId
+        ):
+            self.currentPageId = content.strip()
+        elif self.inText:
+            self.currentText.append(content)
+
+    async def _process(self, text: str, uid: str):
+        parser = WikivoyageParser()
+        entry = parser.parse(text)
+        await self.handler.write_entry(entry, uid)
+
+    async def _bounded_process(self, text: str, uid: str):
+        # Only run N at once
+        async with self.sem:
+            await self._process(text, uid)
+
+async def process_dump(
+    mappings: dict[str, str], handler, max_concurrent: int
+):
+    """
+    Stream-download the bzip2-compressed XML dump and feed to SAX.
+    """
+    xml_url = (
+        "https://dumps.wikimedia.org/"
+        "enwikivoyage/latest/"
+        "enwikivoyage-latest-pages-articles.xml.bz2"
+    )
+    decomp = bz2.BZ2Decompressor()
+    sax_parser = xml.sax.make_parser()
+    dump_handler = WikiDumpHandler(mappings, handler, max_concurrent)
+    sax_parser.setContentHandler(dump_handler)
+
+    async with aiohttp.ClientSession() as session:
+        async with session.get(xml_url) as resp:
+            resp.raise_for_status()
+            async for chunk in resp.content.iter_chunked(1024 * 1024):
+                data = decomp.decompress(chunk)
+                if not data:
+                    continue
+                text = data.decode("utf-8", errors="ignore")
+                sax_parser.feed(text)
+    sax_parser.close()
+    if dump_handler.tasks:
+        await asyncio.gather(*dump_handler.tasks)
+
+async def main():
     # 1. Which handler to load?
     handler_name = os.getenv("HANDLER")
     if not handler_name:
-        print("Error: set ENV HANDLER (e.g. 'filesystem')")
+        logger.error("Error: set ENV HANDLER (e.g. 'filesystem')")
         sys.exit(1)
 
     # 2. Dynamic import
@@ -60,31 +192,25 @@ async def main():
     try:
         mod = importlib.import_module(module_path)
    except ImportError as e:
-        print(f"Error loading handler module {module_path}: {e}")
+        logger.error(f"Error loading handler module {module_path}: {e}")
         sys.exit(1)
 
     # 3. Find the class: e.g. "sftp" → "SftpHandler"
     class_name = handler_name.title().replace("_", "") + "Handler"
     if not hasattr(mod, class_name):
-        print(f"{module_path} defines no class {class_name}")
+        logger.error(f"{module_path} defines no class {class_name}")
         sys.exit(1)
     HandlerCls = getattr(mod, class_name)
 
+    logger.info(f"Using handler from {module_path}")
+
     # 4. Build kwargs from ENV
     handler_kwargs = gather_handler_kwargs(handler_name)
 
     # 5. Instantiate
     handler = HandlerCls(**handler_kwargs)
 
-    # 6. Which dir to walk?
-    input_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".")
-    txt_files = list(input_dir.rglob("*.txt"))
-
-    if not txt_files:
-        logger.info(f"No .txt files found under {input_dir}")
-        sys.exit(1)
-
-    # 7. read concurrency setting
+    # 6. read concurrency setting
     try:
         max_conc = int(os.getenv("MAX_CONCURRENT", "0"))
     except ValueError:
@@ -93,31 +219,18 @@
     if max_conc < 0:
         raise ValueError("MAX_CONCURRENT must be >= 0")
 
-    # 8. schedule tasks
-    if max_conc == 0:
-        # unbounded
-        tasks = [
-            asyncio.create_task(process_file(txt, handler))
-            for txt in txt_files
-        ]
-    else:
-        # bounded by semaphore
-        sem = asyncio.Semaphore(max_conc)
-        async def bounded(txt):
-            async with sem:
-                return await process_file(txt, handler)
+    # 7. Fetch mappings
+    logger.info("Fetching mappings from SQL dump…")
+    mappings = await fetch_mappings()
+    logger.info(f"Got {len(mappings)} wikibase_item mappings.")
 
-        tasks = [
-            asyncio.create_task(bounded(txt))
-            for txt in txt_files
-        ]
+    # 8. Stream & split the XML dump
+    logger.info("Processing XML dump…")
+    await process_dump(mappings, handler, max_conc)
 
-    # 9. run them all
-    await asyncio.gather(*tasks)
+    # 9. Finish up
     await handler.close()
-
-    logger.info("All done.")