aboutsummaryrefslogtreecommitdiff
path: root/scripts/common
diff options
context:
space:
mode:
authorbenj <benj@rse8.com>2026-04-10 11:13:57 +0800
committerbenj <benj@rse8.com>2026-04-10 11:13:57 +0800
commit6605e2cc428e3bdaa174ccc432941eab8c5d61cb (patch)
tree52f9d176c2ce1a80adb2ea2ac31cd12d3a29c0db /scripts/common
parent493746b14c1251a45b061d2e3edd9160c929d2b9 (diff)
downloadtidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.gz
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.bz2
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.lz
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.xz
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.zst
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.zip
ensure parsers do not parse and store raw XML fields
Diffstat (limited to 'scripts/common')
-rw-r--r--scripts/common/__init__.py28
-rw-r--r--scripts/common/db.py198
-rw-r--r--scripts/common/filing.py93
-rw-r--r--scripts/common/ingest.py57
-rw-r--r--scripts/common/normalize.py120
-rw-r--r--scripts/common/xml.py129
6 files changed, 625 insertions, 0 deletions
diff --git a/scripts/common/__init__.py b/scripts/common/__init__.py
new file mode 100644
index 0000000..f06ee26
--- /dev/null
+++ b/scripts/common/__init__.py
@@ -0,0 +1,28 @@
+"""
+Shared infrastructure for the 990 data pipeline (v2).
+
+This package provides the single authoritative implementation of
+normalization, XML helpers, DB access, and ingest tracking for
+all parsers under scripts/parse/, scripts/fetch/, and scripts/extract/.
+
+Old parsers in scripts/ still use scripts/parse_common.py directly.
+"""
+
+import zipfile_deflate64 # noqa: F401
+
+from scripts.common.db import (
+ execute, execute_scalar, execute_all, execute_transaction, copy_rows,
+ # Legacy (shell-based, for old parsers)
+ psql, psql_scalar, psql_query_values, insert_rows,
+)
+from scripts.common.normalize import normalize_ein, parse_numeric, map_form_type, is_placeholder
+from scripts.common.xml import (
+ text, strip_ns, leaf_paths, extract_filing_metadata,
+ derive_source_document_id,
+)
+from scripts.common.ingest import (
+ start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
+)
+from scripts.common.filing import (
+ upsert_raw_filing, record_raw_filing_source,
+)
diff --git a/scripts/common/db.py b/scripts/common/db.py
new file mode 100644
index 0000000..bbd220c
--- /dev/null
+++ b/scripts/common/db.py
@@ -0,0 +1,198 @@
+"""
+Database access layer using psycopg2 with connection pooling.
+
+All DB access in the v2 pipeline goes through this module.
+"""
+
+import csv
+import io
+import sys
+
+import psycopg2
+import psycopg2.pool
+import psycopg2.extras
+
+DB_HOST = "localhost"
+DB_PORT = 35434
+DB_USER = "tidyindex"
+DB_PASSWORD = "tidyindex"
+DB_NAME = "tidyindex"
+
+_pool = None
+
+
def get_pool():
    """Return the process-wide connection pool, creating it on first use."""
    global _pool
    if _pool is not None:
        return _pool
    # Lazy singleton: the pool is only built when the first caller needs it.
    _pool = psycopg2.pool.SimpleConnectionPool(
        minconn=1,
        maxconn=4,
        host=DB_HOST,
        port=DB_PORT,
        user=DB_USER,
        password=DB_PASSWORD,
        dbname=DB_NAME,
    )
    return _pool
+
+
def get_conn():
    """Check a connection out of the shared pool."""
    pool = get_pool()
    return pool.getconn()
+
+
def put_conn(conn):
    """Hand a connection back to the shared pool."""
    pool = get_pool()
    pool.putconn(conn)
+
+
def execute(sql, params=None):
    """Execute a statement that produces no result set. Auto-commits.

    Rolls back and re-raises on any error; the connection is always
    returned to the pool.
    """
    conn = get_conn()
    try:
        cursor = conn.cursor()
        with cursor:
            cursor.execute(sql, params)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        put_conn(conn)
+
+
def execute_scalar(sql, params=None):
    """Execute SQL and return the first column of the first row, or None."""
    conn = get_conn()
    try:
        cursor = conn.cursor()
        with cursor:
            cursor.execute(sql, params)
            first = cursor.fetchone()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        put_conn(conn)
    return first[0] if first else None
+
+
def execute_all(sql, params=None):
    """Execute SQL and return the full result set as a list of tuples."""
    conn = get_conn()
    try:
        cursor = conn.cursor()
        with cursor:
            cursor.execute(sql, params)
            result = cursor.fetchall()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        put_conn(conn)
    return result
+
+
def execute_transaction(fn):
    """Run fn(conn) inside a transaction. Commits on success, rolls back on error.

    fn receives a connection with autocommit off. fn should use conn.cursor()
    to execute statements. Do NOT commit inside fn — this wrapper handles it.
    """
    conn = get_conn()
    try:
        # Inner try: commit/rollback bracketing. Outer try: pool return.
        try:
            outcome = fn(conn)
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        return outcome
    finally:
        put_conn(conn)
+
+
def copy_rows(table, columns, rows):
    """Bulk insert dict rows via COPY FROM STDIN (CSV). Returns inserted count.

    Rows are dicts keyed by column name; keys outside `columns` are ignored
    and missing keys insert NULL. None maps to SQL NULL; empty strings are
    preserved as empty strings.

    Fix over the previous csv.DictWriter version: that wrote both None and ''
    as an unquoted empty CSV field, which COPY in CSV format reads as NULL —
    so empty-string values were silently turned into NULL. Here NULL gets an
    explicit unquoted marker and every real value is quoted, which COPY never
    treats as NULL.
    """
    if not rows:
        return 0

    def _field(value):
        # Unquoted \N is the NULL marker declared in the COPY options below.
        # Everything else is quoted (with embedded quotes doubled, per CSV
        # rules) so real data — commas, newlines, even the literal string
        # "\N" — can never be mistaken for NULL.
        if value is None:
            return r'\N'
        return '"' + str(value).replace('"', '""') + '"'

    buf = io.StringIO()
    for row in rows:
        buf.write(','.join(_field(row.get(col)) for col in columns))
        buf.write('\n')
    buf.seek(0)

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.copy_expert(
                f"COPY {table} ({','.join(columns)}) FROM STDIN "
                "WITH (FORMAT csv, NULL '\\N')",
                buf,
            )
            count = cur.rowcount
        conn.commit()
        return count
    except Exception:
        conn.rollback()
        raise
    finally:
        put_conn(conn)
+
+
+# ============================================================
+# Legacy compatibility (for old parsers using parse_common.py)
+# These shell-based functions are NOT used by v2 code.
+# ============================================================
+
+import subprocess
+
+_DB_CONTAINER = "tidyindex-postgres"
+
+
def psql(sql):
    """Execute SQL via docker exec psql. Legacy — use execute() instead.

    Prints the psql stderr and exits the whole process with status 1 on
    any failure; returns the raw psql stdout on success.
    """
    proc = subprocess.run(
        ["docker", "exec", "-i", _DB_CONTAINER, "psql", "-U", DB_USER, "-d", DB_NAME],
        input=sql,
        capture_output=True,
        text=True,
    )
    if proc.returncode == 0:
        return proc.stdout
    print(f"PSQL ERROR: {proc.stderr}", file=sys.stderr)
    sys.exit(1)
+
+
def psql_scalar(sql):
    """Legacy — use execute_scalar() instead.

    Runs psql in tuples-only/unaligned mode and returns the first output
    line that is not a command tag, or None if there is none.
    """
    proc = subprocess.run(
        [
            "docker", "exec", "-i", _DB_CONTAINER,
            "psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A",
        ],
        input=sql,
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        print(f"PSQL ERROR: {proc.stderr}", file=sys.stderr)
        sys.exit(1)
    # Skip command tags (INSERT/UPDATE/DELETE echoes) and blank lines.
    command_tags = ("INSERT", "UPDATE", "DELETE")
    for raw_line in proc.stdout.strip().split("\n"):
        candidate = raw_line.strip()
        if candidate and not candidate.startswith(command_tags):
            return candidate
    return None
+
+
def psql_query_values(sql):
    """Legacy — use execute_all() instead.

    Parses default psql table output: header, separator, data rows,
    then a "(N rows)" footer. Returns the stripped data rows.
    """
    lines = psql(sql).strip().split("\n")
    if len(lines) < 3:
        return []
    return [entry.strip() for entry in lines[2:-1]]
+
+
def insert_rows(table, columns, rows):
    """Legacy alias for copy_rows() — use copy_rows() directly in new code."""
    return copy_rows(table, columns, rows)
diff --git a/scripts/common/filing.py b/scripts/common/filing.py
new file mode 100644
index 0000000..44bcabc
--- /dev/null
+++ b/scripts/common/filing.py
@@ -0,0 +1,93 @@
+"""
+Shared filing helpers for the new raw ingestion schema.
+
+Provides upsert/record operations for raw.filing and raw.filing_source.
+Form-specific child row operations live in the parsers themselves.
+"""
+
+from scripts.common.db import execute_scalar
+
+
def _execute_scalar(conn, sql, params=None):
    """Run sql and return a single scalar value.

    When `conn` is provided, runs on that connection (so the caller's
    transaction owns the statement); otherwise falls back to the pooled
    execute_scalar().
    """
    if conn is None:
        return execute_scalar(sql, params)

    with conn.cursor() as cur:
        cur.execute(sql, params)
        fetched = cur.fetchone()
    if not fetched:
        return None
    return fetched[0]
+
+
def upsert_raw_filing(source_system, source_document_id, metadata, ingest_run_id, conn=None):
    """Insert or update a raw.filing row. Returns the raw_filing_id.

    On conflict (same source_system + source_document_id), updates ingest_run_id
    to the current run (meaning "most recently touched by this run").

    Parameters:
        source_system: source identifier (e.g. 'irs_xml').
        source_document_id: document id, unique within source_system.
        metadata: dict with required keys 'ein' and 'form_type' (KeyError if
            absent); 'filer_name', 'tax_year', 'tax_period_begin',
            'tax_period_end', 'return_version', 'return_timestamp' optional.
        ingest_run_id: id of the current raw.ingest_run row.
        conn: optional transaction connection; when None, the pooled
            single-statement path is used.

    On update, COALESCE keeps existing non-null values wherever the new row
    carries NULL, except ein/form_type which are always overwritten.
    """
    # NOTE(review): source_url is not in the INSERT column list, so
    # EXCLUDED.source_url carries the column default (presumably NULL) and the
    # COALESCE below effectively always keeps the existing source_url —
    # confirm that is intended.
    return _execute_scalar(
        conn,
        """
        INSERT INTO raw.filing (
            source_system, source_document_id, ein, filer_name, form_type,
            tax_year, tax_period_begin, tax_period_end,
            return_version, return_timestamp, ingest_run_id
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (source_system, source_document_id)
        DO UPDATE SET
            ein = EXCLUDED.ein,
            filer_name = COALESCE(EXCLUDED.filer_name, raw.filing.filer_name),
            form_type = EXCLUDED.form_type,
            tax_year = COALESCE(EXCLUDED.tax_year, raw.filing.tax_year),
            tax_period_begin = COALESCE(EXCLUDED.tax_period_begin, raw.filing.tax_period_begin),
            tax_period_end = COALESCE(EXCLUDED.tax_period_end, raw.filing.tax_period_end),
            return_version = COALESCE(EXCLUDED.return_version, raw.filing.return_version),
            return_timestamp = COALESCE(EXCLUDED.return_timestamp, raw.filing.return_timestamp),
            source_url = COALESCE(EXCLUDED.source_url, raw.filing.source_url),
            ingest_run_id = EXCLUDED.ingest_run_id
        RETURNING id
        """,
        (
            source_system, source_document_id, metadata["ein"],
            metadata.get("filer_name"), metadata["form_type"],
            metadata.get("tax_year"), metadata.get("tax_period_begin"),
            metadata.get("tax_period_end"), metadata.get("return_version"),
            metadata.get("return_timestamp"), ingest_run_id,
        ),
    )
+
+
def record_raw_filing_source(raw_filing_id, ingest_run_id, source_archive, source_path, conn=None):
    """Record that a filing was seen at a specific archive+path.

    If (source_archive, source_path) already exists, verifies it points to
    the same raw_filing_id. Raises ValueError if there's a conflict.
    """
    inserted_id = _execute_scalar(
        conn,
        """
        INSERT INTO raw.filing_source (raw_filing_id, ingest_run_id, source_archive, source_path)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (source_archive, source_path) DO NOTHING
        RETURNING id
        """,
        (raw_filing_id, ingest_run_id, source_archive, source_path),
    )
    if inserted_id is not None:
        # Fresh row written — nothing more to check.
        return

    # DO NOTHING fired, so the pair already exists; confirm it belongs to
    # the same filing before declaring success.
    owner_id = _execute_scalar(
        conn,
        "SELECT raw_filing_id FROM raw.filing_source "
        "WHERE source_archive = %s AND source_path = %s",
        (source_archive, source_path),
    )
    if owner_id is None or int(owner_id) == raw_filing_id:
        return
    raise ValueError(
        f"Filing source conflict: {source_archive}:{source_path} "
        f"already mapped to raw_filing_id={owner_id}, "
        f"but trying to map to {raw_filing_id}"
    )
diff --git a/scripts/common/ingest.py b/scripts/common/ingest.py
new file mode 100644
index 0000000..9adb6da
--- /dev/null
+++ b/scripts/common/ingest.py
@@ -0,0 +1,57 @@
+"""
+Ingest run tracking using the raw schema (raw.ingest_run / raw.ingest_error).
+"""
+
+from scripts.common.db import execute, execute_scalar
+
+
def start_ingest_run(parser, source_system, notes=None):
    """Create a raw.ingest_run row and return its id.

    A None `notes` is stored as an empty string.
    """
    sql = (
        "INSERT INTO raw.ingest_run (parser, source_system, notes) "
        "VALUES (%s, %s, %s) RETURNING id"
    )
    return execute_scalar(sql, (parser, source_system, notes or ""))
+
+
def finish_ingest_run(run_id, files_scanned, files_matched, rows_inserted):
    """Mark an ingest_run as done and record its final counters."""
    sql = (
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'done', "
        "files_scanned = %s, files_matched = %s, rows_inserted = %s "
        "WHERE id = %s"
    )
    execute(sql, (files_scanned, files_matched, rows_inserted, run_id))
+
+
def fail_ingest_run(run_id):
    """Mark an ingest_run as failed (status 'error', finished now)."""
    sql = "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' WHERE id = %s"
    execute(sql, (run_id,))
+
+
def expire_stale_runs(minutes=60):
    """Mark any ingest_run still 'running' after N minutes as 'error'.

    Returns True if at least one run was expired, False otherwise.

    The cutoff is computed as `%s * interval '1 minute'` rather than putting
    the placeholder inside a quoted literal (`interval '%s minutes'`): psycopg2
    substitutes an adapted, quoted value wherever %s appears, so the literal
    form only works by accident for ints and produces broken SQL for a string
    argument. Multiplying a unit interval by the parameter is the documented,
    type-safe form.
    """
    # execute_scalar returns the first RETURNING id (or None), which is all
    # we need to report "expired at least one".
    return execute_scalar(
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' "
        "WHERE status = 'running' "
        "AND started_at < now() - %s * interval '1 minute' "
        "RETURNING id",
        (minutes,),
    ) is not None
+
+
def log_ingest_error(run_id, source_archive, source_path, error,
                     source_document_id=None, stage=None):
    """Append a row to raw.ingest_error for this run.

    `error` may be any object; it is stringified. None archive/path are
    stored as empty strings.
    """
    values = (
        run_id,
        source_archive or "",
        source_path or "",
        source_document_id,
        stage,
        str(error),
    )
    execute(
        "INSERT INTO raw.ingest_error "
        "(ingest_run_id, source_archive, source_path, source_document_id, stage, error) "
        "VALUES (%s, %s, %s, %s, %s, %s)",
        values,
    )
diff --git a/scripts/common/normalize.py b/scripts/common/normalize.py
new file mode 100644
index 0000000..b06afed
--- /dev/null
+++ b/scripts/common/normalize.py
@@ -0,0 +1,120 @@
+"""
+Single authoritative implementations of data normalization.
+
+Every parser must use these — never hand-roll EIN cleanup,
+amount parsing, form type mapping, or placeholder detection.
+"""
+
+import re
+
+
def normalize_ein(raw):
    """Normalize an EIN to a 9-digit zero-padded string.

    Returns None if the input can't be normalized to exactly 9 digits.

    >>> normalize_ein('04-3567890')
    '043567890'
    >>> normalize_ein('43567890')
    '043567890'
    >>> normalize_ein(' 04-3567890 ')
    '043567890'
    >>> normalize_ein(None)
    >>> normalize_ein('')
    >>> normalize_ein('not-an-ein')
    """
    if not raw:
        return None
    # Keep digits only, then left-pad; anything that ends up longer than
    # 9 digits was never a valid EIN.
    digits = re.sub(r'[^0-9]', '', str(raw).strip())
    if not digits:
        return None
    candidate = digits.zfill(9)
    return candidate if len(candidate) == 9 else None
+
+
def parse_numeric(raw):
    """Parse a raw value to a clean numeric string for DB insertion.

    Returns None for non-numeric values like 'SEE ATTACHED', and for
    'nan'/'inf' spellings that float() accepts but a numeric column
    should not receive.

    >>> parse_numeric('1,234,567')
    '1234567'
    >>> parse_numeric('$1,234.56')
    '1234.56'
    >>> parse_numeric('(500)')
    '-500'
    >>> parse_numeric('SEE ATTACHED')
    >>> parse_numeric(None)
    >>> parse_numeric(0)
    '0'
    """
    # `if not raw` would also reject a genuine numeric 0 (returning None
    # instead of '0'), so only None counts as missing here; emptiness is
    # checked after stripping.
    if raw is None:
        return None
    s = str(raw).strip()
    if not s:
        return None

    s = s.replace('$', '').replace(',', '').strip()

    # Parenthesized negatives: (500) -> -500
    if s.startswith('(') and s.endswith(')'):
        s = '-' + s[1:-1]

    try:
        value = float(s)
    except ValueError:
        return None
    # float() accepts 'nan', 'inf', 'infinity'; reject them explicitly.
    # (value != value) is the self-inequality test for NaN.
    if value != value or value in (float('inf'), float('-inf')):
        return None
    return s
+
+
# IRS ReturnTypeCd -> our form_type reference table code
_FORM_TYPE_MAP = {
    '990': '990',
    '990PF': '990PF',
    '990EZ': '990EZ',
    '990O': '990O',
    '990T': '990T',
    '990A': '990A',
    '990PA': '990PA',
    '990EA': '990EA',
}


def map_form_type(return_type_cd):
    """Map an IRS ReturnTypeCd to our form_type code.

    Raises ValueError if unknown — caller should log and skip the filing.

    >>> map_form_type('990PF')
    '990PF'
    """
    if not return_type_cd:
        raise ValueError("Empty return type")
    code = return_type_cd.strip()
    try:
        return _FORM_TYPE_MAP[code]
    except KeyError:
        raise ValueError(f"Unknown return type: {code!r}") from None
+
+
# Anchored, case-insensitive pattern for values filers use instead of data:
# "SEE ..." deferrals, VARIOUS/MULTIPLE/NONE, N/A, and dash/dot filler.
_PLACEHOLDER_RE = re.compile(
    r'^(SEE\s+(ATTACHED|STATEMENT|SCHEDULE|PART\s|CONTINUATION)|'
    r'VARIOUS|MULTIPLE|N/?A|NONE|--+|\.+)$',
    re.IGNORECASE,
)


def is_placeholder(value):
    """Check if a text value is a placeholder rather than real data.

    >>> is_placeholder('SEE ATTACHED')
    True
    >>> is_placeholder('VARIOUS')
    True
    >>> is_placeholder('BOYS AND GIRLS CLUB')
    False
    >>> is_placeholder(None)
    False
    """
    if not value:
        return False
    return _PLACEHOLDER_RE.match(value.strip()) is not None
diff --git a/scripts/common/xml.py b/scripts/common/xml.py
new file mode 100644
index 0000000..b7e80cd
--- /dev/null
+++ b/scripts/common/xml.py
@@ -0,0 +1,129 @@
+"""
+XML parsing helpers.
+
+Single authoritative implementations of XML field extraction,
+source document ID derivation, and leaf path building.
+"""
+
+import re
+
+NS = "http://www.irs.gov/efile"
+NS_MAP = {"irs": NS}
+
+_OBJECT_ID_RE = re.compile(r'(\d{15,20})_public\.xml$')
+
+
def strip_ns(tag):
    """Strip XML namespace: '{http://...}Foo' -> 'Foo'.

    Non-element lxml nodes such as comments expose a non-string ``tag``;
    for those this returns None so the leaf-path walker can skip them.
    """
    if not isinstance(tag, str):
        return None
    # tag[:1] is safe on the empty string (startswith equivalent).
    if tag[:1] == "{":
        return tag.split("}", 1)[1]
    return tag
+
+
def text(el, xpath):
    """Extract stripped text from an XML element by xpath; None if absent."""
    if el is None:
        return None
    node = el.find(xpath, NS_MAP)
    if node is None or not node.text:
        return None
    return node.text.strip()
+
+
+# Values an IRS indicator element may carry to mean "yes / true".
+_TRUTHY = frozenset({"X", "x", "1", "true"})
+
+
+def text_bool(el, xpath):
+ """Extract an IRS indicator element as True/False, or None if the tag is absent.
+
+ Unlike `text(...) in _TRUTHY`, this preserves the difference between
+ "filer explicitly left this unchecked" and "tag not present in the XML
+ at all." Critical for the raw layer, where we want to keep NULL for
+ missing-in-source and reserve False for an explicit non-truthy value.
+ """
+ val = text(el, xpath)
+ if val is None:
+ return None
+ return val in _TRUTHY
+
+
+def leaf_paths(el, prefix=""):
+ """Yield (path, value) for every leaf element under el.
+
+ Path uses '/' separator with namespace stripped:
+ 'IRS990PF/AnalysisOfRevenueAndExpenses/TotalRevAndExpnssAmt'
+ """
+ tag = strip_ns(el.tag)
+ if tag is None:
+ return
+ full = f"{prefix}{tag}" if prefix else tag
+ children = [child for child in el if isinstance(child.tag, str)]
+ if not children:
+ yield full, (el.text or "").strip()
+ else:
+ for child in children:
+ yield from leaf_paths(child, full + "/")
+
+
def derive_source_document_id(source_system, filename):
    """Derive source_document_id from a filename.

    For irs_xml / irs_pdf: extracts the object_id (numeric stem).
    Directory prefixes are stripped.

    >>> derive_source_document_id('irs_xml', 'Cycles_202242_202252/202213089349101246_public.xml')
    '202213089349101246'
    >>> derive_source_document_id('irs_xml', '202213089349101246_public.xml')
    '202213089349101246'
    """
    # Guard clause first: only the IRS sources are supported so far.
    if source_system not in ('irs_xml', 'irs_pdf'):
        raise NotImplementedError(
            f"derive_source_document_id not implemented for {source_system!r}"
        )
    match = _OBJECT_ID_RE.search(filename)
    if match is None:
        raise ValueError(f"Cannot extract object_id from: {filename!r}")
    return match.group(1)
+
+
def extract_filing_metadata(tree):
    """Extract filing-level metadata from an XML tree for raw_filing.

    Returns a dict with: ein, filer_name, form_type, tax_year,
    tax_period_begin, tax_period_end, return_version, return_timestamp.

    EIN is normalized, form_type is mapped. Raises ValueError if the
    filing cannot be identified.
    """
    # Function-scope import keeps module load order flexible between
    # scripts.common.xml and scripts.common.normalize.
    from scripts.common.normalize import normalize_ein, map_form_type

    raw_ein = text(tree, ".//irs:Filer/irs:EIN")
    ein = normalize_ein(raw_ein)
    if not ein:
        raise ValueError(f"Cannot normalize EIN: {raw_ein!r}")

    form_type = map_form_type(text(tree, ".//irs:ReturnTypeCd"))

    # Try the newer element name first, then the older variant without
    # the 'Txt' suffix.
    filer_name = (
        text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1Txt")
        or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1")
    )

    root = tree.getroot()
    return {
        "ein": ein,
        "filer_name": filer_name,
        "form_type": form_type,
        "tax_year": text(tree, ".//irs:TaxYr"),
        "tax_period_begin": text(tree, ".//irs:TaxPeriodBeginDt"),
        "tax_period_end": text(tree, ".//irs:TaxPeriodEndDt"),
        "return_version": root.get("returnVersion"),
        "return_timestamp": text(tree, ".//irs:ReturnTs"),
    }