aboutsummaryrefslogtreecommitdiff
path: root/scripts/common/filing.py
diff options
context:
space:
mode:
authorbenj <benj@rse8.com>2026-04-10 11:13:57 +0800
committerbenj <benj@rse8.com>2026-04-10 11:13:57 +0800
commit6605e2cc428e3bdaa174ccc432941eab8c5d61cb (patch)
tree52f9d176c2ce1a80adb2ea2ac31cd12d3a29c0db /scripts/common/filing.py
parent493746b14c1251a45b061d2e3edd9160c929d2b9 (diff)
downloadtidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.gz
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.bz2
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.lz
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.xz
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.zst
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.zip
ensure parsers do not parse and store raw XML fields
Diffstat (limited to '')
-rw-r--r--scripts/common/filing.py93
1 files changed, 93 insertions, 0 deletions
diff --git a/scripts/common/filing.py b/scripts/common/filing.py
new file mode 100644
index 0000000..44bcabc
--- /dev/null
+++ b/scripts/common/filing.py
@@ -0,0 +1,93 @@
+"""
+Shared filing helpers for the new raw ingestion schema.
+
+Provides upsert/record operations for raw.filing and raw.filing_source.
+Form-specific child row operations live in the parsers themselves.
+"""
+
+from scripts.common.db import execute_scalar
+
+
+def _execute_scalar(conn, sql, params=None):
+ """Execute SQL and return a single scalar value using an optional transaction conn."""
+ if conn is None:
+ return execute_scalar(sql, params)
+
+ with conn.cursor() as cur:
+ cur.execute(sql, params)
+ row = cur.fetchone()
+ return row[0] if row else None
+
+
+def upsert_raw_filing(source_system, source_document_id, metadata, ingest_run_id, conn=None):
+ """Insert or update a raw.filing row. Returns the raw_filing_id.
+
+ On conflict (same source_system + source_document_id), updates ingest_run_id
+ to the current run (meaning "most recently touched by this run").
+ """
+ return _execute_scalar(
+ conn,
+ """
+ INSERT INTO raw.filing (
+ source_system, source_document_id, ein, filer_name, form_type,
+ tax_year, tax_period_begin, tax_period_end,
+ return_version, return_timestamp, ingest_run_id
+ ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+ ON CONFLICT (source_system, source_document_id)
+ DO UPDATE SET
+ ein = EXCLUDED.ein,
+ filer_name = COALESCE(EXCLUDED.filer_name, raw.filing.filer_name),
+ form_type = EXCLUDED.form_type,
+ tax_year = COALESCE(EXCLUDED.tax_year, raw.filing.tax_year),
+ tax_period_begin = COALESCE(EXCLUDED.tax_period_begin, raw.filing.tax_period_begin),
+ tax_period_end = COALESCE(EXCLUDED.tax_period_end, raw.filing.tax_period_end),
+ return_version = COALESCE(EXCLUDED.return_version, raw.filing.return_version),
+ return_timestamp = COALESCE(EXCLUDED.return_timestamp, raw.filing.return_timestamp),
+ source_url = COALESCE(EXCLUDED.source_url, raw.filing.source_url),
+ ingest_run_id = EXCLUDED.ingest_run_id
+ RETURNING id
+ """,
+ (
+ source_system, source_document_id, metadata["ein"],
+ metadata.get("filer_name"), metadata["form_type"],
+ metadata.get("tax_year"), metadata.get("tax_period_begin"),
+ metadata.get("tax_period_end"), metadata.get("return_version"),
+ metadata.get("return_timestamp"), ingest_run_id,
+ ),
+ )
+
+
+def record_raw_filing_source(raw_filing_id, ingest_run_id, source_archive, source_path, conn=None):
+ """Record that a filing was seen at a specific archive+path.
+
+ If (source_archive, source_path) already exists, verifies it points to
+ the same raw_filing_id. Raises ValueError if there's a conflict.
+ """
+ new_id = _execute_scalar(
+ conn,
+ """
+ INSERT INTO raw.filing_source (raw_filing_id, ingest_run_id, source_archive, source_path)
+ VALUES (%s, %s, %s, %s)
+ ON CONFLICT (source_archive, source_path) DO NOTHING
+ RETURNING id
+ """,
+ (raw_filing_id, ingest_run_id, source_archive, source_path),
+ )
+
+ if new_id is not None:
+ return # inserted successfully
+
+ # Row already existed — verify ownership
+ existing_filing_id = _execute_scalar(
+ conn,
+ "SELECT raw_filing_id FROM raw.filing_source "
+ "WHERE source_archive = %s AND source_path = %s",
+ (source_archive, source_path),
+ )
+
+ if existing_filing_id is not None and int(existing_filing_id) != raw_filing_id:
+ raise ValueError(
+ f"Filing source conflict: {source_archive}:{source_path} "
+ f"already mapped to raw_filing_id={existing_filing_id}, "
+ f"but trying to map to {raw_filing_id}"
+ )