"""Shared filing helpers for the new raw ingestion schema.

Provides upsert/record operations for raw.filing and raw.filing_source.
Form-specific child row operations live in the parsers themselves.
"""

from scripts.common.db import execute_scalar


def _execute_scalar(conn, sql, params=None):
    """Execute SQL and return a single scalar value.

    Args:
        conn: Optional open connection. When given, the statement runs on a
            cursor obtained from it (no commit is issued here); when
            ``None``, the call is delegated to
            ``scripts.common.db.execute_scalar``.
        sql: SQL text using ``%s`` placeholders.
        params: Optional sequence of values bound to the placeholders.

    Returns:
        The first column of the first result row, or ``None`` when the
        statement produced no row.
    """
    if conn is None:
        return execute_scalar(sql, params)
    with conn.cursor() as cur:
        cur.execute(sql, params)
        row = cur.fetchone()
    return row[0] if row else None


def _ids_match(left, right):
    """Return True when two row ids denote the same integer id.

    Both sides are normalised with ``int()`` so that an id handed over as
    ``str`` or ``Decimal`` compares equal to the same id held as ``int``.
    Values that cannot be converted fall back to plain equality.
    """
    try:
        return int(left) == int(right)
    except (TypeError, ValueError):
        return left == right


def upsert_raw_filing(source_system, source_document_id, metadata, ingest_run_id, conn=None):
    """Insert or update a raw.filing row and return its id.

    On conflict (same source_system + source_document_id) the existing row
    is refreshed:

    * ``ein``, ``form_type`` and ``ingest_run_id`` are always overwritten
      (``ingest_run_id`` thus means "most recently touched by this run").
    * ``filer_name``, ``tax_year``, ``tax_period_begin``, ``tax_period_end``,
      ``return_version``, ``return_timestamp`` and ``source_url`` are only
      overwritten when the new value is not NULL.

    Args:
        source_system: Identifier of the system the filing came from.
        source_document_id: Document id, unique within ``source_system``.
        metadata: Mapping with required keys ``"ein"`` and ``"form_type"``
            and optional keys ``"filer_name"``, ``"tax_year"``,
            ``"tax_period_begin"``, ``"tax_period_end"``,
            ``"return_version"``, ``"return_timestamp"`` and
            ``"source_url"``. Missing optional keys are stored as NULL.
        ingest_run_id: Id of the ingest run performing the upsert.
        conn: Optional connection to run the statement on.

    Returns:
        The ``id`` of the inserted or updated raw.filing row.

    Raises:
        KeyError: If ``metadata`` lacks ``"ein"`` or ``"form_type"``.
    """
    # source_url is part of the INSERT so that EXCLUDED.source_url carries the
    # caller's value; previously the column was only referenced in the
    # DO UPDATE clause, which made that assignment unable to ever store a URL.
    return _execute_scalar(
        conn,
        """
        INSERT INTO raw.filing (
            source_system, source_document_id, ein, filer_name, form_type,
            tax_year, tax_period_begin, tax_period_end,
            return_version, return_timestamp, source_url, ingest_run_id
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (source_system, source_document_id) DO UPDATE SET
            ein = EXCLUDED.ein,
            filer_name = COALESCE(EXCLUDED.filer_name, raw.filing.filer_name),
            form_type = EXCLUDED.form_type,
            tax_year = COALESCE(EXCLUDED.tax_year, raw.filing.tax_year),
            tax_period_begin = COALESCE(EXCLUDED.tax_period_begin, raw.filing.tax_period_begin),
            tax_period_end = COALESCE(EXCLUDED.tax_period_end, raw.filing.tax_period_end),
            return_version = COALESCE(EXCLUDED.return_version, raw.filing.return_version),
            return_timestamp = COALESCE(EXCLUDED.return_timestamp, raw.filing.return_timestamp),
            source_url = COALESCE(EXCLUDED.source_url, raw.filing.source_url),
            ingest_run_id = EXCLUDED.ingest_run_id
        RETURNING id
        """,
        (
            source_system,
            source_document_id,
            metadata["ein"],
            metadata.get("filer_name"),
            metadata["form_type"],
            metadata.get("tax_year"),
            metadata.get("tax_period_begin"),
            metadata.get("tax_period_end"),
            metadata.get("return_version"),
            metadata.get("return_timestamp"),
            metadata.get("source_url"),
            ingest_run_id,
        ),
    )


def record_raw_filing_source(raw_filing_id, ingest_run_id, source_archive, source_path, conn=None):
    """Record that a filing was seen at a specific archive+path.

    If (source_archive, source_path) already exists, verifies it points to
    the same raw_filing_id; the existing row is left untouched.

    Args:
        raw_filing_id: Id of the raw.filing row the source belongs to.
        ingest_run_id: Id of the ingest run that observed the source.
        source_archive: Archive the filing was found in.
        source_path: Path of the filing inside ``source_archive``.
        conn: Optional connection to run the statements on.

    Returns:
        None.

    Raises:
        ValueError: If the archive+path is already mapped to a different
            raw_filing_id.
    """
    new_id = _execute_scalar(
        conn,
        """
        INSERT INTO raw.filing_source (raw_filing_id, ingest_run_id, source_archive, source_path)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (source_archive, source_path) DO NOTHING
        RETURNING id
        """,
        (raw_filing_id, ingest_run_id, source_archive, source_path),
    )
    if new_id is not None:
        return  # inserted successfully

    # Row already existed (DO NOTHING returns no row) — verify ownership.
    existing_filing_id = _execute_scalar(
        conn,
        "SELECT raw_filing_id FROM raw.filing_source "
        "WHERE source_archive = %s AND source_path = %s",
        (source_archive, source_path),
    )
    # A missing row on re-read is not treated as a conflict. Ids are compared
    # after integer normalisation so an equal id passed as str/Decimal does
    # not trigger a spurious conflict.
    if existing_filing_id is not None and not _ids_match(existing_filing_id, raw_filing_id):
        raise ValueError(
            f"Filing source conflict: {source_archive}:{source_path} "
            f"already mapped to raw_filing_id={existing_filing_id}, "
            f"but trying to map to {raw_filing_id}"
        )