1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
"""
Ingest run tracking using the raw schema (raw.ingest_run / raw.ingest_error).
"""
from scripts.common.db import execute, execute_scalar
def start_ingest_run(parser, source_system, notes=None):
    """Insert a new raw.ingest_run row and return its id.

    ``parser`` and ``source_system`` are stored as given. A falsy ``notes``
    (``None`` or an empty string) is stored as the empty string.
    """
    insert_sql = (
        "INSERT INTO raw.ingest_run (parser, source_system, notes) "
        "VALUES (%s, %s, %s) RETURNING id"
    )
    note_text = notes if notes else ""
    return execute_scalar(insert_sql, (parser, source_system, note_text))
def finish_ingest_run(run_id, files_scanned, files_matched, rows_inserted):
    """Flag the ingest_run identified by ``run_id`` as 'done'.

    Sets ``finished_at`` to the database's ``now()`` and records the three
    counters (``files_scanned``, ``files_matched``, ``rows_inserted``) on
    that row.
    """
    update_sql = (
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'done', "
        "files_scanned = %s, files_matched = %s, rows_inserted = %s "
        "WHERE id = %s"
    )
    # Parameter order follows the placeholders: counters first, id last.
    params = (files_scanned, files_matched, rows_inserted, run_id)
    execute(update_sql, params)
def fail_ingest_run(run_id):
    """Flag the ingest_run identified by ``run_id`` as 'error'.

    Also stamps ``finished_at`` with the database's ``now()``.
    """
    update_sql = (
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' WHERE id = %s"
    )
    params = (run_id,)
    execute(update_sql, params)
def expire_stale_runs(minutes=60):
    """Mark any ingest_run still 'running' after N minutes as 'error'.

    Args:
        minutes: Age threshold in minutes. Rows whose ``started_at`` is
            older than ``now() - minutes`` and whose status is still
            'running' are updated.

    Returns:
        True if at least one run was expired, False otherwise.
    """
    # The placeholder must sit OUTSIDE the quoted interval literal: a
    # parameter inside '...' is either not bound at all (server-side
    # binding) or is interpolated with its own quoting (client-side, e.g.
    # when a string is passed), producing invalid SQL. Multiplying a bound
    # number by a fixed one-minute interval is equivalent and safe.
    sql = (
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' "
        "WHERE status = 'running' "
        "AND started_at < now() - %s * interval '1 minute' "
        "RETURNING id"
    )
    # NOTE(review): assumes execute_scalar yields None when the statement
    # returns no rows — confirm against scripts.common.db.
    expired_id = execute_scalar(sql, (minutes,))
    return expired_id is not None
def log_ingest_error(run_id, source_archive, source_path, error,
                     source_document_id=None, stage=None):
    """Record one failure row in raw.ingest_error for the given run.

    Falsy ``source_archive`` / ``source_path`` values are stored as empty
    strings, and ``error`` is converted with ``str()`` before insertion.
    ``source_document_id`` and ``stage`` are passed through unchanged.
    """
    archive = source_archive if source_archive else ""
    path = source_path if source_path else ""
    message = str(error)
    insert_sql = (
        "INSERT INTO raw.ingest_error "
        "(ingest_run_id, source_archive, source_path, source_document_id, stage, error) "
        "VALUES (%s, %s, %s, %s, %s, %s)"
    )
    row = (run_id, archive, path, source_document_id, stage, message)
    execute(insert_sql, row)
|