diff options
| author | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
|---|---|---|
| committer | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
| commit | 6605e2cc428e3bdaa174ccc432941eab8c5d61cb (patch) | |
| tree | 52f9d176c2ce1a80adb2ea2ac31cd12d3a29c0db /scripts/common/filing.py | |
| parent | 493746b14c1251a45b061d2e3edd9160c929d2b9 (diff) | |
| download | tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.gz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.bz2 tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.lz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.xz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.zst tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.zip | |
ensure parsers do not parse and store raw XML fields
Diffstat (limited to 'scripts/common/filing.py')
| -rw-r--r-- | scripts/common/filing.py | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/scripts/common/filing.py b/scripts/common/filing.py new file mode 100644 index 0000000..44bcabc --- /dev/null +++ b/scripts/common/filing.py @@ -0,0 +1,93 @@ +""" +Shared filing helpers for the new raw ingestion schema. + +Provides upsert/record operations for raw.filing and raw.filing_source. +Form-specific child row operations live in the parsers themselves. +""" + +from scripts.common.db import execute_scalar + + +def _execute_scalar(conn, sql, params=None): + """Execute SQL and return a single scalar value using an optional transaction conn.""" + if conn is None: + return execute_scalar(sql, params) + + with conn.cursor() as cur: + cur.execute(sql, params) + row = cur.fetchone() + return row[0] if row else None + + +def upsert_raw_filing(source_system, source_document_id, metadata, ingest_run_id, conn=None): + """Insert or update a raw.filing row. Returns the raw_filing_id. + + On conflict (same source_system + source_document_id), updates ingest_run_id + to the current run (meaning "most recently touched by this run"). + """ + return _execute_scalar( + conn, + """ + INSERT INTO raw.filing ( + source_system, source_document_id, ein, filer_name, form_type, + tax_year, tax_period_begin, tax_period_end, + return_version, return_timestamp, ingest_run_id + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (source_system, source_document_id) + DO UPDATE SET + ein = EXCLUDED.ein, + filer_name = COALESCE(EXCLUDED.filer_name, raw.filing.filer_name), + form_type = EXCLUDED.form_type, + tax_year = COALESCE(EXCLUDED.tax_year, raw.filing.tax_year), + tax_period_begin = COALESCE(EXCLUDED.tax_period_begin, raw.filing.tax_period_begin), + tax_period_end = COALESCE(EXCLUDED.tax_period_end, raw.filing.tax_period_end), + return_version = COALESCE(EXCLUDED.return_version, raw.filing.return_version), + return_timestamp = COALESCE(EXCLUDED.return_timestamp, raw.filing.return_timestamp), + source_url = COALESCE(EXCLUDED.source_url, raw.filing.source_url), + ingest_run_id = EXCLUDED.ingest_run_id + RETURNING id + """, + ( + source_system, source_document_id, metadata["ein"], + metadata.get("filer_name"), metadata["form_type"], + metadata.get("tax_year"), metadata.get("tax_period_begin"), + metadata.get("tax_period_end"), metadata.get("return_version"), + metadata.get("return_timestamp"), ingest_run_id, + ), + ) + + +def record_raw_filing_source(raw_filing_id, ingest_run_id, source_archive, source_path, conn=None): + """Record that a filing was seen at a specific archive+path. + + If (source_archive, source_path) already exists, verifies it points to + the same raw_filing_id. Raises ValueError if there's a conflict. + """ + new_id = _execute_scalar( + conn, + """ + INSERT INTO raw.filing_source (raw_filing_id, ingest_run_id, source_archive, source_path) + VALUES (%s, %s, %s, %s) + ON CONFLICT (source_archive, source_path) DO NOTHING + RETURNING id + """, + (raw_filing_id, ingest_run_id, source_archive, source_path), + ) + + if new_id is not None: + return # inserted successfully + + # Row already existed — verify ownership + existing_filing_id = _execute_scalar( + conn, + "SELECT raw_filing_id FROM raw.filing_source " + "WHERE source_archive = %s AND source_path = %s", + (source_archive, source_path), + ) + + if existing_filing_id is not None and int(existing_filing_id) != raw_filing_id: + raise ValueError( + f"Filing source conflict: {source_archive}:{source_path} " + f"already mapped to raw_filing_id={existing_filing_id}, " + f"but trying to map to {raw_filing_id}" + ) |
