refined handling of a single file object leveraging create and close methods

This commit is contained in:
2025-10-15 18:55:29 +02:00
parent ee8359491a
commit 22c7d48d3d

View File

@@ -1,7 +1,6 @@
"""CSV output handler for writing entries to a single CSV file with ID and title."""
import os
from pathlib import Path
import aiofiles
import logging
from .base_handler import BaseHandler
@@ -12,30 +11,38 @@ class CsvHandler(BaseHandler):
rather than writing the entire JSON document.
"""
def __init__(
self,
output_path: str,
fail_on_error: bool = True,
**kwargs
):
file_writer: any # I believe aiofiles doesn't expose a type for this
@classmethod
async def create(
cls,
output_path: Path,
**kwargs,
) -> "CsvHandler":
"""
Initializes the CSVHandler.
Args:
output_path (str): Path to the CSV file to write to.
fail_on_error (bool): If True, the handler will raise an exception on error.
**kwargs: Additional keyword arguments.
"""
super().__init__(fail_on_error=fail_on_error, **kwargs)
self.output_path = output_path
self.logger = logging.getLogger(__name__)
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(self.output_path), exist_ok=True)
obj = await super().create(**kwargs)
output_path = Path(output_path)
# Create the containging directory if it doesn't exist
output_path.parent.mkdir(parents=True, exist_ok=True)
# Create file with header if it doesn't exist
with open(self.output_path, 'w', encoding='utf-8') as f:
f.write("id,title\n")
if not output_path.exists():
async with aiofiles.open(output_path, mode='w', encoding='utf-8') as file:
await file.write('"id","title"\n')
# open the file and keep it open for appending
obj.file_writer = await aiofiles.open(output_path, mode='a', encoding='utf-8')
# this has type
return obj
async def _write_entry(self, entry: dict, uid: str) -> bool:
"""
@@ -44,31 +51,28 @@ class CsvHandler(BaseHandler):
Args:
entry (dict): The entry to write.
uid (str): The unique identifier for the entry.
Returns:
bool: True if the entry was written successfully, False otherwise.
"""
try:
# Extract title from properties.title
title = ""
if "properties" in entry and "title" in entry["properties"]:
title = entry["properties"]["title"]
title = entry.get("properties", {}).get("title", "")
# Escape quotes in title
title = str(title).replace('"', '""')
# Open file in append mode for each write
async with aiofiles.open(self.output_path, mode='a', encoding='utf-8') as file:
# Write the row
await file.write(f'"{uid}","{title}"\n')
await self.file_writer.write(f'"{uid}","{title}"\n')
return True
except Exception as e:
self.logger.error(f"Error writing entry {uid} to CSV: {str(e)}")
self.logger.exception(f"Error writing entry {uid} to CSV.")
return False
async def close(self):
"""
Performs cleanup and logs statistics.
"""
await super().close()
await self.file_writer.close()
await super().close()