1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
"""
Shared filing helpers for the new raw ingestion schema.
Provides upsert/record operations for raw.filing and raw.filing_source.
Form-specific child row operations live in the parsers themselves.
"""
from scripts.common.db import execute_all, execute_scalar
def _execute_scalar(conn, sql, params=None):
    """Run *sql* and return the first column of the first result row, or None.

    Args:
        conn: Optional open connection. When None, the shared ``execute_scalar``
            helper runs the statement instead. When given, the statement runs on
            a cursor obtained from ``conn.cursor()`` and is NOT committed here,
            so the caller keeps control of the transaction.
        sql: SQL text with ``%s`` placeholders.
        params: Optional parameter sequence bound to the placeholders.

    Returns:
        The first column of the first row, or None when no row came back.
    """
    if conn is not None:
        with conn.cursor() as cursor:
            cursor.execute(sql, params)
            first_row = cursor.fetchone()
            return None if not first_row else first_row[0]
    return execute_scalar(sql, params)
def upsert_raw_filing(source_system, source_document_id, metadata, ingest_run_id, conn=None):
    """Insert or update a raw.filing row and return its raw_filing_id.

    Args:
        source_system: Upstream system identifier; together with
            ``source_document_id`` it is the conflict key.
        source_document_id: Document identifier within ``source_system``.
        metadata: Mapping of filing attributes. ``"ein"`` and ``"form_type"``
            are required (a missing key raises KeyError). ``"filer_name"``,
            ``"tax_year"``, ``"tax_period_begin"``, ``"tax_period_end"``,
            ``"return_version"``, ``"return_timestamp"`` and ``"source_url"``
            are optional and default to NULL.
        ingest_run_id: Id of the ingestion run inserting/touching this filing.
        conn: Optional open connection. When given, the statement runs on it
            and is not committed here; when None, the shared helper is used.

    On conflict (same source_system + source_document_id):
      * ``ein`` and ``form_type`` are overwritten with the new values;
      * every optional column is overwritten only when the new value is
        non-NULL (COALESCE keeps the stored value otherwise);
      * ``ingest_run_id`` is set to the current run, meaning
        "most recently touched by this run".

    Returns:
        The ``id`` of the inserted or updated raw.filing row.
    """
    return _execute_scalar(
        conn,
        """
        INSERT INTO raw.filing (
            source_system, source_document_id, ein, filer_name, form_type,
            tax_year, tax_period_begin, tax_period_end,
            return_version, return_timestamp, source_url, ingest_run_id
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (source_system, source_document_id)
        DO UPDATE SET
            ein = EXCLUDED.ein,
            filer_name = COALESCE(EXCLUDED.filer_name, raw.filing.filer_name),
            form_type = EXCLUDED.form_type,
            tax_year = COALESCE(EXCLUDED.tax_year, raw.filing.tax_year),
            tax_period_begin = COALESCE(EXCLUDED.tax_period_begin, raw.filing.tax_period_begin),
            tax_period_end = COALESCE(EXCLUDED.tax_period_end, raw.filing.tax_period_end),
            return_version = COALESCE(EXCLUDED.return_version, raw.filing.return_version),
            return_timestamp = COALESCE(EXCLUDED.return_timestamp, raw.filing.return_timestamp),
            source_url = COALESCE(EXCLUDED.source_url, raw.filing.source_url),
            ingest_run_id = EXCLUDED.ingest_run_id
        RETURNING id
        """,
        (
            source_system, source_document_id, metadata["ein"],
            metadata.get("filer_name"), metadata["form_type"],
            metadata.get("tax_year"), metadata.get("tax_period_begin"),
            metadata.get("tax_period_end"), metadata.get("return_version"),
            # source_url was previously never inserted, so the COALESCE on
            # EXCLUDED.source_url above could never pick up a caller value.
            metadata.get("return_timestamp"), metadata.get("source_url"),
            ingest_run_id,
        ),
    )
def record_raw_filing_source(raw_filing_id, ingest_run_id, source_archive, source_path, conn=None):
    """Record that a filing was seen at a specific archive + path.

    Inserts a raw.filing_source row. If a row for (source_archive, source_path)
    already exists it is left untouched (its ingest_run_id is NOT updated),
    and the function only checks that it belongs to the same raw_filing_id.

    Args:
        raw_filing_id: Id of the raw.filing row the path should map to.
        ingest_run_id: Id of the ingestion run recording the sighting.
        source_archive: Archive the filing was found in.
        source_path: Path of the filing inside ``source_archive``.
        conn: Optional open connection; when given, both statements run on it
            and nothing is committed here.

    Raises:
        ValueError: if (source_archive, source_path) is already mapped to a
            different raw_filing_id.
    """
    new_id = _execute_scalar(
        conn,
        """
        INSERT INTO raw.filing_source (raw_filing_id, ingest_run_id, source_archive, source_path)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (source_archive, source_path) DO NOTHING
        RETURNING id
        """,
        (raw_filing_id, ingest_run_id, source_archive, source_path),
    )
    if new_id is not None:
        return  # inserted successfully; nothing to verify

    # ON CONFLICT DO NOTHING returns no row, so look up who owns the path.
    existing_filing_id = _execute_scalar(
        conn,
        "SELECT raw_filing_id FROM raw.filing_source "
        "WHERE source_archive = %s AND source_path = %s",
        (source_archive, source_path),
    )
    if existing_filing_id is None:
        # NOTE(review): the conflicting row was not visible on re-read
        # (presumably removed between the two statements); nothing to verify.
        return
    # Normalise both sides: comparing int(...) to a str id would always
    # differ and raise a spurious conflict.
    if int(existing_filing_id) != int(raw_filing_id):
        raise ValueError(
            f"Filing source conflict: {source_archive}:{source_path} "
            f"already mapped to raw_filing_id={existing_filing_id}, "
            f"but trying to map to {raw_filing_id}"
        )
def get_seen_source_paths(source_archive):
    """Collect every source_path stored in raw.filing_source for *source_archive*.

    The query goes through the shared ``execute_all`` helper and takes no
    transaction connection, so rows written in a caller's still-open
    transaction are presumably not included (only committed ones).

    Returns:
        A set of source_path values (the first column of each result row).
    """
    query = "SELECT source_path FROM raw.filing_source WHERE source_archive = %s"
    records = execute_all(query, (source_archive,))
    return {record[0] for record in records}
|