From 6605e2cc428e3bdaa174ccc432941eab8c5d61cb Mon Sep 17 00:00:00 2001 From: benj Date: Fri, 10 Apr 2026 11:13:57 +0800 Subject: ensure parsers do not parse and store raw XML fields --- scripts/common/ingest.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 scripts/common/ingest.py (limited to 'scripts/common/ingest.py') diff --git a/scripts/common/ingest.py b/scripts/common/ingest.py new file mode 100644 index 0000000..9adb6da --- /dev/null +++ b/scripts/common/ingest.py @@ -0,0 +1,57 @@ +""" +Ingest run tracking using the raw schema (raw.ingest_run / raw.ingest_error). +""" + +from scripts.common.db import execute, execute_scalar + + +def start_ingest_run(parser, source_system, notes=None): + """Create an ingest_run row and return its id.""" + return execute_scalar( + "INSERT INTO raw.ingest_run (parser, source_system, notes) " + "VALUES (%s, %s, %s) RETURNING id", + (parser, source_system, notes or ""), + ) + + +def finish_ingest_run(run_id, files_scanned, files_matched, rows_inserted): + """Mark an ingest_run as done.""" + execute( + "UPDATE raw.ingest_run SET finished_at = now(), status = 'done', " + "files_scanned = %s, files_matched = %s, rows_inserted = %s " + "WHERE id = %s", + (files_scanned, files_matched, rows_inserted, run_id), + ) + + +def fail_ingest_run(run_id): + """Mark an ingest_run as error.""" + execute( + "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' WHERE id = %s", + (run_id,), + ) + + +def expire_stale_runs(minutes=60): + """Mark any ingest_run still 'running' after N minutes as 'error'. + + Returns True if at least one run was expired, False otherwise. + """ + return execute_scalar( + "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' " + "WHERE status = 'running' AND started_at < now() - interval '%s minutes' " + "RETURNING id", + (minutes,), + ) is not None + + +def log_ingest_error(run_id, source_archive, source_path, error, + source_document_id=None, stage=None): + """Log an error to ingest_error.""" + execute( + "INSERT INTO raw.ingest_error " + "(ingest_run_id, source_archive, source_path, source_document_id, stage, error) " + "VALUES (%s, %s, %s, %s, %s, %s)", + (run_id, source_archive or "", source_path or "", + source_document_id, stage, str(error)), + ) -- cgit v1.2.3