diff options
| author | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
|---|---|---|
| committer | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
| commit | 6605e2cc428e3bdaa174ccc432941eab8c5d61cb (patch) | |
| tree | 52f9d176c2ce1a80adb2ea2ac31cd12d3a29c0db /scripts/common | |
| parent | 493746b14c1251a45b061d2e3edd9160c929d2b9 (diff) | |
| download | tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.gz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.bz2 tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.lz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.xz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.zst tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.zip | |
ensure parsers do not parse and store raw XML fields
Diffstat (limited to '')
| -rw-r--r-- | scripts/common/__init__.py | 28 |
| -rw-r--r-- | scripts/common/db.py | 198 |
| -rw-r--r-- | scripts/common/filing.py | 93 |
| -rw-r--r-- | scripts/common/ingest.py | 57 |
| -rw-r--r-- | scripts/common/normalize.py | 120 |
| -rw-r--r-- | scripts/common/xml.py | 129 |
6 files changed, 625 insertions, 0 deletions
diff --git a/scripts/common/__init__.py b/scripts/common/__init__.py new file mode 100644 index 0000000..f06ee26 --- /dev/null +++ b/scripts/common/__init__.py @@ -0,0 +1,28 @@ +""" +Shared infrastructure for the 990 data pipeline (v2). + +This package provides the single authoritative implementation of +normalization, XML helpers, DB access, and ingest tracking for +all parsers under scripts/parse/, scripts/fetch/, and scripts/extract/. + +Old parsers in scripts/ still use scripts/parse_common.py directly. +""" + +import zipfile_deflate64 # noqa: F401 + +from scripts.common.db import ( + execute, execute_scalar, execute_all, execute_transaction, copy_rows, + # Legacy (shell-based, for old parsers) + psql, psql_scalar, psql_query_values, insert_rows, +) +from scripts.common.normalize import normalize_ein, parse_numeric, map_form_type, is_placeholder +from scripts.common.xml import ( + text, strip_ns, leaf_paths, extract_filing_metadata, + derive_source_document_id, +) +from scripts.common.ingest import ( + start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, +) +from scripts.common.filing import ( + upsert_raw_filing, record_raw_filing_source, +) diff --git a/scripts/common/db.py b/scripts/common/db.py new file mode 100644 index 0000000..bbd220c --- /dev/null +++ b/scripts/common/db.py @@ -0,0 +1,198 @@ +""" +Database access layer using psycopg2 with connection pooling. + +All DB access in the v2 pipeline goes through this module. 
+""" + +import csv +import io +import sys + +import psycopg2 +import psycopg2.pool +import psycopg2.extras + +DB_HOST = "localhost" +DB_PORT = 35434 +DB_USER = "tidyindex" +DB_PASSWORD = "tidyindex" +DB_NAME = "tidyindex" + +_pool = None + + +def get_pool(): + """Get or create the connection pool (lazy singleton).""" + global _pool + if _pool is None: + _pool = psycopg2.pool.SimpleConnectionPool( + minconn=1, + maxconn=4, + host=DB_HOST, + port=DB_PORT, + user=DB_USER, + password=DB_PASSWORD, + dbname=DB_NAME, + ) + return _pool + + +def get_conn(): + """Get a connection from the pool.""" + return get_pool().getconn() + + +def put_conn(conn): + """Return a connection to the pool.""" + get_pool().putconn(conn) + + +def execute(sql, params=None): + """Execute SQL (no result). Auto-commits.""" + conn = get_conn() + try: + with conn.cursor() as cur: + cur.execute(sql, params) + conn.commit() + except Exception: + conn.rollback() + raise + finally: + put_conn(conn) + + +def execute_scalar(sql, params=None): + """Execute SQL and return a single scalar value, or None.""" + conn = get_conn() + try: + with conn.cursor() as cur: + cur.execute(sql, params) + row = cur.fetchone() + conn.commit() + return row[0] if row else None + except Exception: + conn.rollback() + raise + finally: + put_conn(conn) + + +def execute_all(sql, params=None): + """Execute SQL and return all rows as a list of tuples.""" + conn = get_conn() + try: + with conn.cursor() as cur: + cur.execute(sql, params) + rows = cur.fetchall() + conn.commit() + return rows + except Exception: + conn.rollback() + raise + finally: + put_conn(conn) + + +def execute_transaction(fn): + """Run fn(conn) inside a transaction. Commits on success, rolls back on error. + + fn receives a connection with autocommit off. fn should use conn.cursor() + to execute statements. Do NOT commit inside fn — this wrapper handles it. 
+ """ + conn = get_conn() + try: + result = fn(conn) + conn.commit() + return result + except Exception: + conn.rollback() + raise + finally: + put_conn(conn) + + +def copy_rows(table, columns, rows): + """Bulk insert rows via COPY FROM. Returns count of inserted rows.""" + if not rows: + return 0 + + buf = io.StringIO() + writer = csv.DictWriter(buf, fieldnames=columns, extrasaction="ignore") + for row in rows: + writer.writerow(row) + buf.seek(0) + + conn = get_conn() + try: + with conn.cursor() as cur: + cur.copy_expert( + f"COPY {table} ({','.join(columns)}) FROM STDIN WITH (FORMAT csv)", + buf, + ) + count = cur.rowcount + conn.commit() + return count + except Exception: + conn.rollback() + raise + finally: + put_conn(conn) + + +# ============================================================ +# Legacy compatibility (for old parsers using parse_common.py) +# These shell-based functions are NOT used by v2 code. +# ============================================================ + +import subprocess + +_DB_CONTAINER = "tidyindex-postgres" + + +def psql(sql): + """Execute SQL via docker exec psql. 
Legacy — use execute() instead.""" + result = subprocess.run( + ["docker", "exec", "-i", _DB_CONTAINER, "psql", "-U", DB_USER, "-d", DB_NAME], + input=sql, + capture_output=True, + text=True, + ) + if result.returncode != 0: + print(f"PSQL ERROR: {result.stderr}", file=sys.stderr) + sys.exit(1) + return result.stdout + + +def psql_scalar(sql): + """Legacy — use execute_scalar() instead.""" + result = subprocess.run( + [ + "docker", "exec", "-i", _DB_CONTAINER, + "psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A", + ], + input=sql, + capture_output=True, + text=True, + ) + if result.returncode != 0: + print(f"PSQL ERROR: {result.stderr}", file=sys.stderr) + sys.exit(1) + for line in result.stdout.strip().split("\n"): + line = line.strip() + if line and not line.startswith("INSERT") and not line.startswith("UPDATE") and not line.startswith("DELETE"): + return line + return None + + +def psql_query_values(sql): + """Legacy — use execute_all() instead.""" + result = psql(sql) + lines = result.strip().split("\n") + if len(lines) >= 3: + return [line.strip() for line in lines[2:-1]] + return [] + + +def insert_rows(table, columns, rows): + """Legacy — use copy_rows() instead.""" + return copy_rows(table, columns, rows) diff --git a/scripts/common/filing.py b/scripts/common/filing.py new file mode 100644 index 0000000..44bcabc --- /dev/null +++ b/scripts/common/filing.py @@ -0,0 +1,93 @@ +""" +Shared filing helpers for the new raw ingestion schema. + +Provides upsert/record operations for raw.filing and raw.filing_source. +Form-specific child row operations live in the parsers themselves. 
+""" + +from scripts.common.db import execute_scalar + + +def _execute_scalar(conn, sql, params=None): + """Execute SQL and return a single scalar value using an optional transaction conn.""" + if conn is None: + return execute_scalar(sql, params) + + with conn.cursor() as cur: + cur.execute(sql, params) + row = cur.fetchone() + return row[0] if row else None + + +def upsert_raw_filing(source_system, source_document_id, metadata, ingest_run_id, conn=None): + """Insert or update a raw.filing row. Returns the raw_filing_id. + + On conflict (same source_system + source_document_id), updates ingest_run_id + to the current run (meaning "most recently touched by this run"). + """ + return _execute_scalar( + conn, + """ + INSERT INTO raw.filing ( + source_system, source_document_id, ein, filer_name, form_type, + tax_year, tax_period_begin, tax_period_end, + return_version, return_timestamp, ingest_run_id + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (source_system, source_document_id) + DO UPDATE SET + ein = EXCLUDED.ein, + filer_name = COALESCE(EXCLUDED.filer_name, raw.filing.filer_name), + form_type = EXCLUDED.form_type, + tax_year = COALESCE(EXCLUDED.tax_year, raw.filing.tax_year), + tax_period_begin = COALESCE(EXCLUDED.tax_period_begin, raw.filing.tax_period_begin), + tax_period_end = COALESCE(EXCLUDED.tax_period_end, raw.filing.tax_period_end), + return_version = COALESCE(EXCLUDED.return_version, raw.filing.return_version), + return_timestamp = COALESCE(EXCLUDED.return_timestamp, raw.filing.return_timestamp), + source_url = COALESCE(EXCLUDED.source_url, raw.filing.source_url), + ingest_run_id = EXCLUDED.ingest_run_id + RETURNING id + """, + ( + source_system, source_document_id, metadata["ein"], + metadata.get("filer_name"), metadata["form_type"], + metadata.get("tax_year"), metadata.get("tax_period_begin"), + metadata.get("tax_period_end"), metadata.get("return_version"), + metadata.get("return_timestamp"), ingest_run_id, + ), + ) + + +def 
record_raw_filing_source(raw_filing_id, ingest_run_id, source_archive, source_path, conn=None): + """Record that a filing was seen at a specific archive+path. + + If (source_archive, source_path) already exists, verifies it points to + the same raw_filing_id. Raises ValueError if there's a conflict. + """ + new_id = _execute_scalar( + conn, + """ + INSERT INTO raw.filing_source (raw_filing_id, ingest_run_id, source_archive, source_path) + VALUES (%s, %s, %s, %s) + ON CONFLICT (source_archive, source_path) DO NOTHING + RETURNING id + """, + (raw_filing_id, ingest_run_id, source_archive, source_path), + ) + + if new_id is not None: + return # inserted successfully + + # Row already existed — verify ownership + existing_filing_id = _execute_scalar( + conn, + "SELECT raw_filing_id FROM raw.filing_source " + "WHERE source_archive = %s AND source_path = %s", + (source_archive, source_path), + ) + + if existing_filing_id is not None and int(existing_filing_id) != raw_filing_id: + raise ValueError( + f"Filing source conflict: {source_archive}:{source_path} " + f"already mapped to raw_filing_id={existing_filing_id}, " + f"but trying to map to {raw_filing_id}" + ) diff --git a/scripts/common/ingest.py b/scripts/common/ingest.py new file mode 100644 index 0000000..9adb6da --- /dev/null +++ b/scripts/common/ingest.py @@ -0,0 +1,57 @@ +""" +Ingest run tracking using the raw schema (raw.ingest_run / raw.ingest_error). 
+""" + +from scripts.common.db import execute, execute_scalar + + +def start_ingest_run(parser, source_system, notes=None): + """Create an ingest_run row and return its id.""" + return execute_scalar( + "INSERT INTO raw.ingest_run (parser, source_system, notes) " + "VALUES (%s, %s, %s) RETURNING id", + (parser, source_system, notes or ""), + ) + + +def finish_ingest_run(run_id, files_scanned, files_matched, rows_inserted): + """Mark an ingest_run as done.""" + execute( + "UPDATE raw.ingest_run SET finished_at = now(), status = 'done', " + "files_scanned = %s, files_matched = %s, rows_inserted = %s " + "WHERE id = %s", + (files_scanned, files_matched, rows_inserted, run_id), + ) + + +def fail_ingest_run(run_id): + """Mark an ingest_run as error.""" + execute( + "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' WHERE id = %s", + (run_id,), + ) + + +def expire_stale_runs(minutes=60): + """Mark any ingest_run still 'running' after N minutes as 'error'. + + Returns True if at least one run was expired, False otherwise. + """ + return execute_scalar( + "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' " + "WHERE status = 'running' AND started_at < now() - interval '%s minutes' " + "RETURNING id", + (minutes,), + ) is not None + + +def log_ingest_error(run_id, source_archive, source_path, error, + source_document_id=None, stage=None): + """Log an error to ingest_error.""" + execute( + "INSERT INTO raw.ingest_error " + "(ingest_run_id, source_archive, source_path, source_document_id, stage, error) " + "VALUES (%s, %s, %s, %s, %s, %s)", + (run_id, source_archive or "", source_path or "", + source_document_id, stage, str(error)), + ) diff --git a/scripts/common/normalize.py b/scripts/common/normalize.py new file mode 100644 index 0000000..b06afed --- /dev/null +++ b/scripts/common/normalize.py @@ -0,0 +1,120 @@ +""" +Single authoritative implementations of data normalization. 
+ +Every parser must use these — never hand-roll EIN cleanup, +amount parsing, form type mapping, or placeholder detection. +""" + +import re + + +def normalize_ein(raw): + """Normalize an EIN to 9-digit zero-padded string. + + Returns None if the input can't be normalized to exactly 9 digits. + + >>> normalize_ein('04-3567890') + '043567890' + >>> normalize_ein('43567890') + '043567890' + >>> normalize_ein(' 04-3567890 ') + '043567890' + >>> normalize_ein(None) + >>> normalize_ein('') + >>> normalize_ein('not-an-ein') + """ + if not raw: + return None + cleaned = re.sub(r'[^0-9]', '', str(raw).strip()) + if not cleaned: + return None + padded = cleaned.zfill(9) + if len(padded) != 9: + return None + return padded + + +def parse_numeric(raw): + """Parse a raw string to a clean numeric string for DB insertion. + + Returns None for non-numeric values like 'SEE ATTACHED'. + + >>> parse_numeric('1,234,567') + '1234567' + >>> parse_numeric('$1,234.56') + '1234.56' + >>> parse_numeric('(500)') + '-500' + >>> parse_numeric('SEE ATTACHED') + >>> parse_numeric(None) + """ + if not raw: + return None + s = str(raw).strip() + if not s: + return None + + s = s.replace('$', '').replace(',', '').strip() + + # Parenthesized negatives: (500) -> -500 + if s.startswith('(') and s.endswith(')'): + s = '-' + s[1:-1] + + try: + float(s) + return s + except ValueError: + return None + + +# IRS ReturnTypeCd -> our form_type reference table code +_FORM_TYPE_MAP = { + '990': '990', + '990PF': '990PF', + '990EZ': '990EZ', + '990O': '990O', + '990T': '990T', + '990A': '990A', + '990PA': '990PA', + '990EA': '990EA', +} + + +def map_form_type(return_type_cd): + """Map an IRS ReturnTypeCd to our form_type code. + + Raises ValueError if unknown — caller should log and skip the filing. 
+ + >>> map_form_type('990PF') + '990PF' + """ + if not return_type_cd: + raise ValueError("Empty return type") + code = return_type_cd.strip() + if code in _FORM_TYPE_MAP: + return _FORM_TYPE_MAP[code] + raise ValueError(f"Unknown return type: {code!r}") + + +_PLACEHOLDER_RE = re.compile( + r'^(SEE\s+(ATTACHED|STATEMENT|SCHEDULE|PART\s|CONTINUATION)|' + r'VARIOUS|MULTIPLE|N/?A|NONE|--+|\.+)$', + re.IGNORECASE, +) + + +def is_placeholder(value): + """Check if a text value is a placeholder rather than real data. + + >>> is_placeholder('SEE ATTACHED') + True + >>> is_placeholder('VARIOUS') + True + >>> is_placeholder('BOYS AND GIRLS CLUB') + False + >>> is_placeholder(None) + False + """ + if not value: + return False + return bool(_PLACEHOLDER_RE.match(value.strip())) diff --git a/scripts/common/xml.py b/scripts/common/xml.py new file mode 100644 index 0000000..b7e80cd --- /dev/null +++ b/scripts/common/xml.py @@ -0,0 +1,129 @@ +""" +XML parsing helpers. + +Single authoritative implementations of XML field extraction, +source document ID derivation, and leaf path building. +""" + +import re + +NS = "http://www.irs.gov/efile" +NS_MAP = {"irs": NS} + +_OBJECT_ID_RE = re.compile(r'(\d{15,20})_public\.xml$') + + +def strip_ns(tag): + """Strip XML namespace: '{http://...}Foo' -> 'Foo'. + + Non-element lxml nodes such as comments expose a non-string ``tag``. + Those are skipped by the leaf-path walker rather than treated as fields. + """ + if not isinstance(tag, str): + return None + if tag.startswith("{"): + return tag.split("}", 1)[1] + return tag + + +def text(el, xpath): + """Extract text from an XML element by xpath. Returns None if not found.""" + if el is None: + return None + found = el.find(xpath, NS_MAP) + return found.text.strip() if found is not None and found.text else None + + +# Values an IRS indicator element may carry to mean "yes / true". 
+_TRUTHY = frozenset({"X", "x", "1", "true"}) + + +def text_bool(el, xpath): + """Extract an IRS indicator element as True/False, or None if the tag is absent. + + Unlike `text(...) in _TRUTHY`, this preserves the difference between + "filer explicitly left this unchecked" and "tag not present in the XML + at all." Critical for the raw layer, where we want to keep NULL for + missing-in-source and reserve False for an explicit non-truthy value. + """ + val = text(el, xpath) + if val is None: + return None + return val in _TRUTHY + + +def leaf_paths(el, prefix=""): + """Yield (path, value) for every leaf element under el. + + Path uses '/' separator with namespace stripped: + 'IRS990PF/AnalysisOfRevenueAndExpenses/TotalRevAndExpnssAmt' + """ + tag = strip_ns(el.tag) + if tag is None: + return + full = f"{prefix}{tag}" if prefix else tag + children = [child for child in el if isinstance(child.tag, str)] + if not children: + yield full, (el.text or "").strip() + else: + for child in children: + yield from leaf_paths(child, full + "/") + + +def derive_source_document_id(source_system, filename): + """Derive source_document_id from a filename. + + For irs_xml / irs_pdf: extracts the object_id (numeric stem). + Directory prefixes are stripped. + + >>> derive_source_document_id('irs_xml', 'Cycles_202242_202252/202213089349101246_public.xml') + '202213089349101246' + >>> derive_source_document_id('irs_xml', '202213089349101246_public.xml') + '202213089349101246' + """ + if source_system in ('irs_xml', 'irs_pdf'): + m = _OBJECT_ID_RE.search(filename) + if not m: + raise ValueError(f"Cannot extract object_id from: {filename!r}") + return m.group(1) + raise NotImplementedError( + f"derive_source_document_id not implemented for {source_system!r}" + ) + + +def extract_filing_metadata(tree): + """Extract filing-level metadata from an XML tree for raw_filing. 
+ + Returns a dict with: ein, filer_name, form_type, tax_year, + tax_period_begin, tax_period_end, return_version, return_timestamp. + + EIN is normalized, form_type is mapped. Raises ValueError if the + filing cannot be identified. + """ + from scripts.common.normalize import normalize_ein, map_form_type + + root = tree.getroot() + + raw_ein = text(tree, ".//irs:Filer/irs:EIN") + ein = normalize_ein(raw_ein) + if not ein: + raise ValueError(f"Cannot normalize EIN: {raw_ein!r}") + + return_type_cd = text(tree, ".//irs:ReturnTypeCd") + form_type = map_form_type(return_type_cd) + + filer_name = ( + text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1Txt") + or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1") + ) + + return { + "ein": ein, + "filer_name": filer_name, + "form_type": form_type, + "tax_year": text(tree, ".//irs:TaxYr"), + "tax_period_begin": text(tree, ".//irs:TaxPeriodBeginDt"), + "tax_period_end": text(tree, ".//irs:TaxPeriodEndDt"), + "return_version": root.get("returnVersion"), + "return_timestamp": text(tree, ".//irs:ReturnTs"), + } |
