aboutsummaryrefslogtreecommitdiff
path: root/scripts/common/ingest.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--scripts/common/ingest.py57
1 files changed, 57 insertions, 0 deletions
diff --git a/scripts/common/ingest.py b/scripts/common/ingest.py
new file mode 100644
index 0000000..9adb6da
--- /dev/null
+++ b/scripts/common/ingest.py
@@ -0,0 +1,57 @@
+"""
+Ingest run tracking using the raw schema (raw.ingest_run / raw.ingest_error).
+"""
+
+from scripts.common.db import execute, execute_scalar
+
+
+def start_ingest_run(parser, source_system, notes=None):
+ """Create an ingest_run row and return its id."""
+ return execute_scalar(
+ "INSERT INTO raw.ingest_run (parser, source_system, notes) "
+ "VALUES (%s, %s, %s) RETURNING id",
+ (parser, source_system, notes or ""),
+ )
+
+
+def finish_ingest_run(run_id, files_scanned, files_matched, rows_inserted):
+ """Mark an ingest_run as done."""
+ execute(
+ "UPDATE raw.ingest_run SET finished_at = now(), status = 'done', "
+ "files_scanned = %s, files_matched = %s, rows_inserted = %s "
+ "WHERE id = %s",
+ (files_scanned, files_matched, rows_inserted, run_id),
+ )
+
+
+def fail_ingest_run(run_id):
+ """Mark an ingest_run as error."""
+ execute(
+ "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' WHERE id = %s",
+ (run_id,),
+ )
+
+
+def expire_stale_runs(minutes=60):
+ """Mark any ingest_run still 'running' after N minutes as 'error'.
+
+ Returns True if at least one run was expired, False otherwise.
+ """
+ return execute_scalar(
+ "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' "
+ "WHERE status = 'running' AND started_at < now() - interval '%s minutes' "
+ "RETURNING id",
+ (minutes,),
+ ) is not None
+
+
+def log_ingest_error(run_id, source_archive, source_path, error,
+                     source_document_id=None, stage=None):
+    """Insert one row into raw.ingest_error for the given ingest run.
+
+    Falsy ``source_archive`` / ``source_path`` values are stored as empty
+    strings; ``source_document_id`` and ``stage`` are passed through as given
+    (``None`` by default); ``error`` is stored as ``str(error)``.
+    """
+    insert_sql = (
+        "INSERT INTO raw.ingest_error "
+        "(ingest_run_id, source_archive, source_path, source_document_id, stage, error) "
+        "VALUES (%s, %s, %s, %s, %s, %s)"
+    )
+    # Values are listed in the same order as the column list above.
+    row_values = (
+        run_id,
+        source_archive or "",
+        source_path or "",
+        source_document_id,
+        stage,
+        str(error),
+    )
+    execute(insert_sql, row_values)