aboutsummaryrefslogtreecommitdiff
path: root/scripts/common/ingest.py
blob: 9adb6da753ddd3b414bd7da0c40abd25b6913300 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
"""
Ingest run tracking using the raw schema (raw.ingest_run / raw.ingest_error).
"""

from scripts.common.db import execute, execute_scalar


def start_ingest_run(parser, source_system, notes=None):
    """Insert a new raw.ingest_run row and return the id generated for it.

    Args:
        parser: value stored in the ``parser`` column.
        source_system: value stored in the ``source_system`` column.
        notes: optional free text; any falsy value (including None) is
            stored as an empty string.

    Returns:
        Whatever ``execute_scalar`` yields for ``RETURNING id``.
    """
    sql = (
        "INSERT INTO raw.ingest_run (parser, source_system, notes) "
        "VALUES (%s, %s, %s) RETURNING id"
    )
    params = (parser, source_system, notes if notes else "")
    return execute_scalar(sql, params)


def finish_ingest_run(run_id, files_scanned, files_matched, rows_inserted):
    """Close out an ingest_run with status 'done' and its final counters.

    Stamps ``finished_at`` with the database's ``now()`` and records the
    number of files scanned and matched and the number of rows inserted
    for the run identified by ``run_id``.
    """
    counters = (files_scanned, files_matched, rows_inserted)
    execute(
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'done', "
        "files_scanned = %s, files_matched = %s, rows_inserted = %s "
        "WHERE id = %s",
        counters + (run_id,),
    )


def fail_ingest_run(run_id):
    """Close out an ingest_run with status 'error', stamping finished_at."""
    query = (
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' "
        "WHERE id = %s"
    )
    execute(query, (run_id,))


def expire_stale_runs(minutes=60):
    """Mark any ingest_run still 'running' after N minutes as 'error'.

    Expired runs get ``status = 'error'`` and ``finished_at = now()``.

    Args:
        minutes: age threshold; a 'running' run whose ``started_at`` is
            older than ``now() - minutes`` is expired.

    Returns:
        True if at least one run was expired, False otherwise.
    """
    # The threshold is bound as a real parameter and scaled by a one-minute
    # interval. Writing the placeholder inside a quoted literal
    # ('%s minutes') only works by accident with client-side interpolation
    # of ints. It produces broken SQL for other value types and is not
    # recognised as a parameter at all by server-side binding drivers.
    expired_id = execute_scalar(
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' "
        "WHERE status = 'running' "
        "AND started_at < now() - %s * interval '1 minute' "
        "RETURNING id",
        (minutes,),
    )
    # NOTE(review): assumes execute_scalar returns None when the UPDATE
    # matched no rows (as the original `is not None` check implies).
    return expired_id is not None


def log_ingest_error(run_id, source_archive, source_path, error,
                     source_document_id=None, stage=None):
    """Record one failure in raw.ingest_error, attached to ingest run ``run_id``.

    Falsy ``source_archive`` / ``source_path`` values are stored as empty
    strings. ``error`` is converted with ``str()`` before it is stored.
    ``source_document_id`` and ``stage`` are passed through unchanged.
    """
    archive = source_archive if source_archive else ""
    path = source_path if source_path else ""
    message = str(error)
    execute(
        "INSERT INTO raw.ingest_error "
        "(ingest_run_id, source_archive, source_path, source_document_id, stage, error) "
        "VALUES (%s, %s, %s, %s, %s, %s)",
        (run_id, archive, path, source_document_id, stage, message),
    )