aboutsummaryrefslogtreecommitdiff
path: root/scripts/common/filing.py
blob: 3c7aaf3908663e6462be27bfdd1a10be327f8e57 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""
Shared filing helpers for the new raw ingestion schema.

Provides upsert/record operations for raw.filing and raw.filing_source, plus a
lookup of the source paths already recorded for an archive.
Form-specific child row operations live in the parsers themselves.
"""

from scripts.common.db import execute_all, execute_scalar


def _execute_scalar(conn, sql, params=None):
    """Run *sql* and return the first column of the first result row.

    When ``conn`` is None the query is delegated to the shared
    ``execute_scalar`` helper. Otherwise it runs on a cursor of the supplied
    connection, so it takes part in the caller's transaction. Returns None
    when the query yields no row.
    """
    if conn is not None:
        with conn.cursor() as cursor:
            cursor.execute(sql, params)
            first_row = cursor.fetchone()
        if not first_row:
            return None
        return first_row[0]
    return execute_scalar(sql, params)


def upsert_raw_filing(source_system, source_document_id, metadata, ingest_run_id, conn=None):
    """Insert or update a raw.filing row. Returns the raw_filing_id.

    ``metadata`` must provide ``"ein"`` and ``"form_type"``. The optional keys
    ``"filer_name"``, ``"tax_year"``, ``"tax_period_begin"``,
    ``"tax_period_end"``, ``"return_version"``, ``"return_timestamp"`` and
    ``"source_url"`` are read with ``.get`` and default to None.

    On conflict (same source_system + source_document_id) the existing row is
    updated in place:

    * ``ein`` and ``form_type`` are always overwritten with the new values;
    * the optional columns are overwritten only when the new value is
      non-NULL (COALESCE), so a sparser re-ingest does not erase data that an
      earlier run captured;
    * ``ingest_run_id`` is set to the current run (meaning "most recently
      touched by this run").

    Args:
        source_system: Identifier of the upstream system the filing came from.
        source_document_id: The filing's id within ``source_system``.
        metadata: Mapping of filing attributes (see keys above).
        ingest_run_id: Id of the ingest run performing this upsert.
        conn: Optional open connection; when given, the statement runs on it
            (inside the caller's transaction) instead of the shared helper.

    Returns:
        The ``id`` of the inserted or updated raw.filing row.

    Raises:
        KeyError: if ``metadata`` is a dict lacking ``"ein"`` or ``"form_type"``.
    """
    return _execute_scalar(
        conn,
        # source_url is part of the INSERT column list so that EXCLUDED.source_url
        # carries the incoming value. Without it, the DO UPDATE clause would only
        # ever see the column default, and metadata["source_url"] would be dropped.
        """
        INSERT INTO raw.filing (
            source_system, source_document_id, ein, filer_name, form_type,
            tax_year, tax_period_begin, tax_period_end,
            return_version, return_timestamp, source_url, ingest_run_id
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (source_system, source_document_id)
        DO UPDATE SET
            ein = EXCLUDED.ein,
            filer_name = COALESCE(EXCLUDED.filer_name, raw.filing.filer_name),
            form_type = EXCLUDED.form_type,
            tax_year = COALESCE(EXCLUDED.tax_year, raw.filing.tax_year),
            tax_period_begin = COALESCE(EXCLUDED.tax_period_begin, raw.filing.tax_period_begin),
            tax_period_end = COALESCE(EXCLUDED.tax_period_end, raw.filing.tax_period_end),
            return_version = COALESCE(EXCLUDED.return_version, raw.filing.return_version),
            return_timestamp = COALESCE(EXCLUDED.return_timestamp, raw.filing.return_timestamp),
            source_url = COALESCE(EXCLUDED.source_url, raw.filing.source_url),
            ingest_run_id = EXCLUDED.ingest_run_id
        RETURNING id
        """,
        (
            source_system, source_document_id, metadata["ein"],
            metadata.get("filer_name"), metadata["form_type"],
            metadata.get("tax_year"), metadata.get("tax_period_begin"),
            metadata.get("tax_period_end"), metadata.get("return_version"),
            metadata.get("return_timestamp"), metadata.get("source_url"),
            ingest_run_id,
        ),
    )


def record_raw_filing_source(raw_filing_id, ingest_run_id, source_archive, source_path, conn=None):
    """Record that a filing was seen at a specific archive+path.

    Inserts a raw.filing_source row mapping (source_archive, source_path) to
    ``raw_filing_id``. If that (source_archive, source_path) pair already
    exists, the existing row is left untouched (its ingest_run_id is NOT
    refreshed). The function only verifies that the row points to the same
    raw_filing_id.

    Args:
        raw_filing_id: Id of the raw.filing row the source belongs to.
        ingest_run_id: Id of the ingest run recording this source.
        source_archive: Archive the filing was read from.
        source_path: Path of the filing inside ``source_archive``.
        conn: Optional open connection; when given, both statements run on it
            (inside the caller's transaction) instead of the shared helper.

    Raises:
        ValueError: if (source_archive, source_path) is already mapped to a
            different raw_filing_id.
    """
    new_id = _execute_scalar(
        conn,
        """
        INSERT INTO raw.filing_source (raw_filing_id, ingest_run_id, source_archive, source_path)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (source_archive, source_path) DO NOTHING
        RETURNING id
        """,
        (raw_filing_id, ingest_run_id, source_archive, source_path),
    )

    if new_id is not None:
        return  # inserted successfully

    # ON CONFLICT DO NOTHING returns no row, so look up the existing owner.
    existing_filing_id = _execute_scalar(
        conn,
        "SELECT raw_filing_id FROM raw.filing_source "
        "WHERE source_archive = %s AND source_path = %s",
        (source_archive, source_path),
    )

    # NOTE(review): existing_filing_id can be None only if the conflicting row
    # disappeared between the two statements; that case is treated as
    # "nothing to verify".
    # Normalise both sides to int. Without this, an id handed over as a
    # different type (e.g. "5" vs 5) would be reported as a spurious conflict.
    if existing_filing_id is not None and int(existing_filing_id) != int(raw_filing_id):
        raise ValueError(
            f"Filing source conflict: {source_archive}:{source_path} "
            f"already mapped to raw_filing_id={existing_filing_id}, "
            f"but trying to map to {raw_filing_id}"
        )


def get_seen_source_paths(source_archive):
    """Collect the source_path values already committed for *source_archive*.

    Queries raw.filing_source through the shared ``execute_all`` helper and
    returns the first column of every row as a set (empty when the archive
    has no recorded paths).
    """
    query = "SELECT source_path FROM raw.filing_source WHERE source_archive = %s"
    return {record[0] for record in execute_all(query, (source_archive,))}