"""
Ingest run tracking using the raw schema (raw.ingest_run / raw.ingest_error).
"""

from scripts.common.db import execute, execute_scalar


def start_ingest_run(parser, source_system, notes=None):
    """Create an ingest_run row and return its id.

    Args:
        parser: Name of the parser performing the ingest.
        source_system: Identifier of the upstream system being ingested.
        notes: Optional free-text notes; ``None`` is stored as ``""``.

    Returns:
        The ``id`` of the new ``raw.ingest_run`` row, as returned by
        ``execute_scalar`` for the ``RETURNING id`` clause.
    """
    # NOTE(review): status is not set here; presumably the column defaults to
    # 'running' (expire_stale_runs relies on that) — confirm against the DDL.
    return execute_scalar(
        "INSERT INTO raw.ingest_run (parser, source_system, notes) "
        "VALUES (%s, %s, %s) RETURNING id",
        (parser, source_system, notes or ""),
    )


def finish_ingest_run(run_id, files_scanned, files_matched, rows_inserted):
    """Mark an ingest_run as done and record its counters.

    Sets ``finished_at`` to the database's ``now()`` and ``status`` to
    ``'done'``.

    Args:
        run_id: Id of the ``raw.ingest_run`` row to update.
        files_scanned: Number of files examined during the run.
        files_matched: Number of files the parser accepted.
        rows_inserted: Number of rows written by the run.
    """
    execute(
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'done', "
        "files_scanned = %s, files_matched = %s, rows_inserted = %s "
        "WHERE id = %s",
        (files_scanned, files_matched, rows_inserted, run_id),
    )


def fail_ingest_run(run_id):
    """Mark an ingest_run as error.

    Sets ``finished_at`` to the database's ``now()`` and ``status`` to
    ``'error'``.

    Args:
        run_id: Id of the ``raw.ingest_run`` row to update.
    """
    execute(
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' WHERE id = %s",
        (run_id,),
    )


def expire_stale_runs(minutes=60):
    """Mark any ingest_run still 'running' after N minutes as 'error'.

    Args:
        minutes: Age threshold in minutes; runs whose ``started_at`` is older
            than ``now() - minutes`` and still ``'running'`` are expired.

    Returns:
        True if at least one run was expired, False otherwise.
    """
    # The threshold is bound as a real query parameter and multiplied by a
    # one-minute interval. Placing the placeholder inside a quoted literal
    # (interval '%s minutes') only worked by accident with client-side
    # interpolation of an int, and breaks for string values or drivers that
    # use server-side parameter binding.
    return execute_scalar(
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' "
        "WHERE status = 'running' AND started_at < now() - %s * interval '1 minute' "
        "RETURNING id",
        (minutes,),
    ) is not None


def log_ingest_error(run_id, source_archive, source_path, error,
                     source_document_id=None, stage=None):
    """Log an error to raw.ingest_error.

    Args:
        run_id: Id of the ``raw.ingest_run`` the error belongs to.
        source_archive: Archive the failing input came from; ``None`` is
            stored as ``""``.
        source_path: Path of the failing input; ``None`` is stored as ``""``.
        error: The error (e.g. an exception); stored as ``str(error)``.
        source_document_id: Optional id of the source document.
        stage: Optional label for the processing stage that failed.
    """
    execute(
        "INSERT INTO raw.ingest_error "
        "(ingest_run_id, source_archive, source_path, source_document_id, stage, error) "
        "VALUES (%s, %s, %s, %s, %s, %s)",
        (run_id, source_archive or "", source_path or "", source_document_id, stage, str(error)),
    )