aboutsummaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--scripts/common/__init__.py28
-rw-r--r--scripts/common/db.py198
-rw-r--r--scripts/common/filing.py93
-rw-r--r--scripts/common/ingest.py57
-rw-r--r--scripts/common/normalize.py120
-rw-r--r--scripts/common/xml.py129
-rw-r--r--scripts/extract/__init__.py0
-rw-r--r--scripts/extract/irs_990_pdf.py699
-rw-r--r--scripts/parse/__init__.py0
-rw-r--r--scripts/parse/irs_990.py691
-rw-r--r--scripts/parse/irs_990ez.py449
-rw-r--r--scripts/parse/irs_990pf.py544
-rw-r--r--scripts/parse/irs_bmf.py231
-rw-r--r--scripts/seed.py81
14 files changed, 3320 insertions, 0 deletions
diff --git a/scripts/common/__init__.py b/scripts/common/__init__.py
new file mode 100644
index 0000000..f06ee26
--- /dev/null
+++ b/scripts/common/__init__.py
@@ -0,0 +1,28 @@
+"""
+Shared infrastructure for the 990 data pipeline (v2).
+
+This package provides the single authoritative implementation of
+normalization, XML helpers, DB access, and ingest tracking for
+all parsers under scripts/parse/, scripts/fetch/, and scripts/extract/.
+
+Old parsers in scripts/ still use scripts/parse_common.py directly.
+"""
+
+import zipfile_deflate64 # noqa: F401
+
+from scripts.common.db import (
+ execute, execute_scalar, execute_all, execute_transaction, copy_rows,
+ # Legacy (shell-based, for old parsers)
+ psql, psql_scalar, psql_query_values, insert_rows,
+)
+from scripts.common.normalize import normalize_ein, parse_numeric, map_form_type, is_placeholder
+from scripts.common.xml import (
+ text, strip_ns, leaf_paths, extract_filing_metadata,
+ derive_source_document_id,
+)
+from scripts.common.ingest import (
+ start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
+)
+from scripts.common.filing import (
+ upsert_raw_filing, record_raw_filing_source,
+)
diff --git a/scripts/common/db.py b/scripts/common/db.py
new file mode 100644
index 0000000..bbd220c
--- /dev/null
+++ b/scripts/common/db.py
@@ -0,0 +1,198 @@
+"""
+Database access layer using psycopg2 with connection pooling.
+
+All DB access in the v2 pipeline goes through this module.
+"""
+
+import csv
+import io
+import sys
+
+import psycopg2
+import psycopg2.pool
+import psycopg2.extras
+
# Connection parameters for the local dockerized Postgres instance.
# NOTE(review): credentials are hard-coded; acceptable for a local dev
# pipeline, but move to environment variables if this ever runs elsewhere.
DB_HOST = "localhost"
DB_PORT = 35434
DB_USER = "tidyindex"
DB_PASSWORD = "tidyindex"
DB_NAME = "tidyindex"
+
# Process-wide pool, created on first use so importing this module is cheap.
_pool = None


def get_pool():
    """Return the shared connection pool, creating it on first call."""
    global _pool
    if _pool is None:
        _pool = psycopg2.pool.SimpleConnectionPool(
            minconn=1,
            maxconn=4,
            host=DB_HOST,
            port=DB_PORT,
            user=DB_USER,
            password=DB_PASSWORD,
            dbname=DB_NAME,
        )
    return _pool


def get_conn():
    """Check a connection out of the shared pool."""
    return get_pool().getconn()


def put_conn(conn):
    """Check a connection back into the shared pool."""
    get_pool().putconn(conn)
+
+
def execute(sql, params=None):
    """Run a statement with no result set; commits on success, rolls back on error."""
    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(sql, params)
        finally:
            cur.close()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        put_conn(conn)
+
+
def execute_scalar(sql, params=None):
    """Run a query and return the first column of the first row, or None."""
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            first = cur.fetchone()
        conn.commit()
        return None if not first else first[0]
    except Exception:
        conn.rollback()
        raise
    finally:
        put_conn(conn)
+
+
def execute_all(sql, params=None):
    """Run a query and return every result row as a list of tuples."""
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            result = cur.fetchall()
        conn.commit()
        return result
    except Exception:
        conn.rollback()
        raise
    finally:
        put_conn(conn)
+
+
def execute_transaction(fn):
    """Run fn(conn) inside a single transaction.

    Commits when fn returns normally; rolls back when it raises. fn should
    open cursors on the supplied connection but must NOT commit itself —
    this wrapper owns the transaction boundary.
    """
    conn = get_conn()
    try:
        value = fn(conn)
        conn.commit()
        return value
    except Exception:
        conn.rollback()
        raise
    finally:
        put_conn(conn)
+
+
def copy_rows(table, columns, rows):
    """Bulk-load dict rows into `table` via COPY FROM; returns rows inserted.

    Rows are serialized to CSV in memory; keys not listed in `columns`
    are ignored, and missing keys are written as empty (NULL in CSV COPY).
    Returns 0 immediately for an empty row list without touching the DB.
    """
    if not rows:
        return 0

    payload = io.StringIO()
    writer = csv.DictWriter(payload, fieldnames=columns, extrasaction="ignore")
    writer.writerows(rows)
    payload.seek(0)

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.copy_expert(
                f"COPY {table} ({','.join(columns)}) FROM STDIN WITH (FORMAT csv)",
                payload,
            )
            inserted = cur.rowcount
        conn.commit()
        return inserted
    except Exception:
        conn.rollback()
        raise
    finally:
        put_conn(conn)
+
+
+# ============================================================
+# Legacy compatibility (for old parsers using parse_common.py)
+# These shell-based functions are NOT used by v2 code.
+# ============================================================
+
+import subprocess
+
+_DB_CONTAINER = "tidyindex-postgres"
+
+
def psql(sql):
    """Execute SQL via docker exec psql. Legacy — use execute() instead.

    Exits the whole process (code 1) on any psql failure, matching the
    old shell-pipeline behavior.
    """
    result = subprocess.run(
        ["docker", "exec", "-i", _DB_CONTAINER, "psql", "-U", DB_USER, "-d", DB_NAME],
        input=sql,
        capture_output=True,
        text=True,
    )
    if result.returncode == 0:
        return result.stdout
    print(f"PSQL ERROR: {result.stderr}", file=sys.stderr)
    sys.exit(1)
+
+
def psql_scalar(sql):
    """Legacy — use execute_scalar() instead.

    Runs psql with -t -A (tuples only, unaligned) and returns the first
    real value line, skipping command-tag echoes and blanks.
    """
    result = subprocess.run(
        [
            "docker", "exec", "-i", _DB_CONTAINER,
            "psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A",
        ],
        input=sql,
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        print(f"PSQL ERROR: {result.stderr}", file=sys.stderr)
        sys.exit(1)
    # Command tags like "INSERT 0 1" are echoed even in tuples-only mode.
    skip_prefixes = ("INSERT", "UPDATE", "DELETE")
    for raw_line in result.stdout.strip().split("\n"):
        candidate = raw_line.strip()
        if candidate and not candidate.startswith(skip_prefixes):
            return candidate
    return None
+
+
def psql_query_values(sql):
    """Legacy — use execute_all() instead.

    Parses default aligned psql output: header row, separator row,
    data rows, then an "(N rows)" footer.
    """
    output_lines = psql(sql).strip().split("\n")
    if len(output_lines) < 3:
        return []
    return [line.strip() for line in output_lines[2:-1]]
+
+
def insert_rows(table, columns, rows):
    """Legacy alias kept for old parsers — delegates to copy_rows()."""
    return copy_rows(table, columns, rows)
diff --git a/scripts/common/filing.py b/scripts/common/filing.py
new file mode 100644
index 0000000..44bcabc
--- /dev/null
+++ b/scripts/common/filing.py
@@ -0,0 +1,93 @@
+"""
+Shared filing helpers for the new raw ingestion schema.
+
+Provides upsert/record operations for raw.filing and raw.filing_source.
+Form-specific child row operations live in the parsers themselves.
+"""
+
+from scripts.common.db import execute_scalar
+
+
+def _execute_scalar(conn, sql, params=None):
+ """Execute SQL and return a single scalar value using an optional transaction conn."""
+ if conn is None:
+ return execute_scalar(sql, params)
+
+ with conn.cursor() as cur:
+ cur.execute(sql, params)
+ row = cur.fetchone()
+ return row[0] if row else None
+
+
def upsert_raw_filing(source_system, source_document_id, metadata, ingest_run_id, conn=None):
    """Insert or update a raw.filing row. Returns the raw_filing_id.

    On conflict (same source_system + source_document_id), refreshes the
    descriptive fields and stamps ingest_run_id with the current run
    (meaning "most recently touched by this run").
    """
    # NOTE(review): source_url is not in the INSERT column list, so
    # EXCLUDED.source_url is always the column default (presumably NULL),
    # making its COALESCE a no-op — confirm whether source_url should be
    # carried in metadata and inserted here.
    sql = """
        INSERT INTO raw.filing (
            source_system, source_document_id, ein, filer_name, form_type,
            tax_year, tax_period_begin, tax_period_end,
            return_version, return_timestamp, ingest_run_id
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (source_system, source_document_id)
        DO UPDATE SET
            ein = EXCLUDED.ein,
            filer_name = COALESCE(EXCLUDED.filer_name, raw.filing.filer_name),
            form_type = EXCLUDED.form_type,
            tax_year = COALESCE(EXCLUDED.tax_year, raw.filing.tax_year),
            tax_period_begin = COALESCE(EXCLUDED.tax_period_begin, raw.filing.tax_period_begin),
            tax_period_end = COALESCE(EXCLUDED.tax_period_end, raw.filing.tax_period_end),
            return_version = COALESCE(EXCLUDED.return_version, raw.filing.return_version),
            return_timestamp = COALESCE(EXCLUDED.return_timestamp, raw.filing.return_timestamp),
            source_url = COALESCE(EXCLUDED.source_url, raw.filing.source_url),
            ingest_run_id = EXCLUDED.ingest_run_id
        RETURNING id
    """
    params = (
        source_system,
        source_document_id,
        metadata["ein"],
        metadata.get("filer_name"),
        metadata["form_type"],
        metadata.get("tax_year"),
        metadata.get("tax_period_begin"),
        metadata.get("tax_period_end"),
        metadata.get("return_version"),
        metadata.get("return_timestamp"),
        ingest_run_id,
    )
    return _execute_scalar(conn, sql, params)
+
+
def record_raw_filing_source(raw_filing_id, ingest_run_id, source_archive, source_path, conn=None):
    """Record that a filing was seen at a specific archive+path.

    Idempotent on (source_archive, source_path): a second sighting is a
    no-op when it maps to the same filing, and raises ValueError when it
    maps to a different one.
    """
    inserted_id = _execute_scalar(
        conn,
        """
        INSERT INTO raw.filing_source (raw_filing_id, ingest_run_id, source_archive, source_path)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (source_archive, source_path) DO NOTHING
        RETURNING id
        """,
        (raw_filing_id, ingest_run_id, source_archive, source_path),
    )
    if inserted_id is not None:
        # Fresh insert — nothing to verify.
        return

    # Conflict path: the (archive, path) pair already exists; make sure it
    # belongs to the same filing before accepting silently.
    existing_filing_id = _execute_scalar(
        conn,
        "SELECT raw_filing_id FROM raw.filing_source "
        "WHERE source_archive = %s AND source_path = %s",
        (source_archive, source_path),
    )
    if existing_filing_id is not None and int(existing_filing_id) != raw_filing_id:
        raise ValueError(
            f"Filing source conflict: {source_archive}:{source_path} "
            f"already mapped to raw_filing_id={existing_filing_id}, "
            f"but trying to map to {raw_filing_id}"
        )
diff --git a/scripts/common/ingest.py b/scripts/common/ingest.py
new file mode 100644
index 0000000..9adb6da
--- /dev/null
+++ b/scripts/common/ingest.py
@@ -0,0 +1,57 @@
+"""
+Ingest run tracking using the raw schema (raw.ingest_run / raw.ingest_error).
+"""
+
+from scripts.common.db import execute, execute_scalar
+
+
def start_ingest_run(parser, source_system, notes=None):
    """Create an ingest_run row and return its id. notes defaults to ''."""
    sql = (
        "INSERT INTO raw.ingest_run (parser, source_system, notes) "
        "VALUES (%s, %s, %s) RETURNING id"
    )
    return execute_scalar(sql, (parser, source_system, notes or ""))
+
+
def finish_ingest_run(run_id, files_scanned, files_matched, rows_inserted):
    """Mark an ingest_run as done and record its final counters."""
    sql = (
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'done', "
        "files_scanned = %s, files_matched = %s, rows_inserted = %s "
        "WHERE id = %s"
    )
    execute(sql, (files_scanned, files_matched, rows_inserted, run_id))
+
+
def fail_ingest_run(run_id):
    """Mark an ingest_run as error (counters are left untouched)."""
    sql = "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' WHERE id = %s"
    execute(sql, (run_id,))
+
+
def expire_stale_runs(minutes=60):
    """Mark any ingest_run still 'running' after N minutes as 'error'.

    Returns True if at least one run was expired, False otherwise.

    The cutoff is computed as ``%s * interval '1 minute'`` rather than by
    interpolating %s inside a quoted interval literal: psycopg2 adapts
    string parameters with surrounding quotes, so ``interval '%s minutes'``
    only happens to work for plain ints and corrupts the SQL for anything
    else.
    """
    expired_id = execute_scalar(
        "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' "
        "WHERE status = 'running' "
        "AND started_at < now() - %s * interval '1 minute' "
        "RETURNING id",
        (minutes,),
    )
    return expired_id is not None
+
+
def log_ingest_error(run_id, source_archive, source_path, error,
                     source_document_id=None, stage=None):
    """Log an error row to raw.ingest_error; archive/path default to ''."""
    params = (
        run_id,
        source_archive or "",
        source_path or "",
        source_document_id,
        stage,
        str(error),
    )
    execute(
        "INSERT INTO raw.ingest_error "
        "(ingest_run_id, source_archive, source_path, source_document_id, stage, error) "
        "VALUES (%s, %s, %s, %s, %s, %s)",
        params,
    )
diff --git a/scripts/common/normalize.py b/scripts/common/normalize.py
new file mode 100644
index 0000000..b06afed
--- /dev/null
+++ b/scripts/common/normalize.py
@@ -0,0 +1,120 @@
+"""
+Single authoritative implementations of data normalization.
+
+Every parser must use these — never hand-roll EIN cleanup,
+amount parsing, form type mapping, or placeholder detection.
+"""
+
+import re
+
+
def normalize_ein(raw):
    """Normalize an EIN to a 9-digit zero-padded string.

    Returns None when the input can't be normalized to exactly 9 digits.

    >>> normalize_ein('04-3567890')
    '043567890'
    >>> normalize_ein('43567890')
    '043567890'
    >>> normalize_ein(' 04-3567890 ')
    '043567890'
    >>> normalize_ein(None)
    >>> normalize_ein('')
    >>> normalize_ein('not-an-ein')
    """
    if not raw:
        return None
    digits = re.sub(r'[^0-9]', '', str(raw).strip())
    # Too few digits is recoverable by zero-padding; too many is not.
    if not digits or len(digits) > 9:
        return None
    return digits.zfill(9)
+
+
def parse_numeric(raw):
    """Parse a raw string to a clean numeric string for DB insertion.

    Returns None for non-numeric values like 'SEE ATTACHED', and for
    strings float() tolerates but a SQL numeric column does not:
    NaN, infinities, and underscore digit separators.

    >>> parse_numeric('1,234,567')
    '1234567'
    >>> parse_numeric('$1,234.56')
    '1234.56'
    >>> parse_numeric('(500)')
    '-500'
    >>> parse_numeric('SEE ATTACHED')
    >>> parse_numeric(None)
    >>> parse_numeric('NaN')
    """
    if not raw:
        return None
    s = str(raw).strip()
    if not s:
        return None

    s = s.replace('$', '').replace(',', '').strip()

    # Parenthesized negatives: (500) -> -500
    if s.startswith('(') and s.endswith(')'):
        s = '-' + s[1:-1]

    # float() accepts '1_000'; DB numeric parsing does not.
    if '_' in s:
        return None

    try:
        value = float(s)
    except ValueError:
        return None

    # Reject NaN/inf: float() parses 'nan'/'inf'/'infinity', but they are
    # not clean numerics for insertion. (value != value) is the NaN test.
    if value != value or value in (float('inf'), float('-inf')):
        return None

    return s
+
+
# IRS ReturnTypeCd -> our form_type reference table code (keys uppercase).
_FORM_TYPE_MAP = {
    '990': '990',
    '990PF': '990PF',
    '990EZ': '990EZ',
    '990O': '990O',
    '990T': '990T',
    '990A': '990A',
    '990PA': '990PA',
    '990EA': '990EA',
}


def map_form_type(return_type_cd):
    """Map an IRS ReturnTypeCd to our form_type code.

    Matching is whitespace- and case-insensitive (codes are uppercase in
    the map). Raises ValueError if unknown — caller should log and skip
    the filing.

    >>> map_form_type('990PF')
    '990PF'
    >>> map_form_type('990pf')
    '990PF'
    """
    if not return_type_cd:
        raise ValueError("Empty return type")
    code = return_type_cd.strip().upper()
    try:
        return _FORM_TYPE_MAP[code]
    except KeyError:
        raise ValueError(f"Unknown return type: {code!r}") from None
+
+
# Anchored, case-insensitive match of the full (stripped) value against
# the common "no real data here" fillers seen in filings.
_PLACEHOLDER_RE = re.compile(
    r'^(SEE\s+(ATTACHED|STATEMENT|SCHEDULE|PART\s|CONTINUATION)|'
    r'VARIOUS|MULTIPLE|N/?A|NONE|--+|\.+)$',
    re.IGNORECASE,
)


def is_placeholder(value):
    """Check whether a text value is a placeholder rather than real data.

    >>> is_placeholder('SEE ATTACHED')
    True
    >>> is_placeholder('VARIOUS')
    True
    >>> is_placeholder('BOYS AND GIRLS CLUB')
    False
    >>> is_placeholder(None)
    False
    """
    if not value:
        return False
    return _PLACEHOLDER_RE.match(value.strip()) is not None
diff --git a/scripts/common/xml.py b/scripts/common/xml.py
new file mode 100644
index 0000000..b7e80cd
--- /dev/null
+++ b/scripts/common/xml.py
@@ -0,0 +1,129 @@
+"""
+XML parsing helpers.
+
+Single authoritative implementations of XML field extraction,
+source document ID derivation, and leaf path building.
+"""
+
+import re
+
# IRS e-file XML namespace; all return elements live under this URI.
NS = "http://www.irs.gov/efile"
# Prefix map for .find() calls — use the 'irs:' prefix in xpaths.
NS_MAP = {"irs": NS}

# Captures the numeric object_id stem of '<object_id>_public.xml' filenames.
_OBJECT_ID_RE = re.compile(r'(\d{15,20})_public\.xml$')
+
+
def strip_ns(tag):
    """Strip an XML namespace wrapper: '{http://...}Foo' -> 'Foo'.

    Returns None for non-string tags (lxml comments/PIs expose a
    non-string ``tag``); the leaf-path walker skips those rather than
    treating them as fields.
    """
    if not isinstance(tag, str):
        return None
    if not tag.startswith("{"):
        return tag
    return tag.split("}", 1)[1]
+
+
def text(el, xpath):
    """Find xpath under el (with the irs: namespace map) and return its
    stripped text, or None when the element or its text is absent."""
    if el is None:
        return None
    node = el.find(xpath, NS_MAP)
    if node is None or not node.text:
        return None
    return node.text.strip()
+
+
# Values an IRS indicator element may carry to mean "yes / true".
_TRUTHY = frozenset({"X", "x", "1", "true"})


def text_bool(el, xpath):
    """Extract an IRS indicator element as True/False, or None when absent.

    Preserves the three-way distinction the raw layer cares about:
      None  -> tag missing from the source XML entirely
      False -> tag present with an explicitly non-truthy value
      True  -> tag present with a truthy marker ('X', 'x', '1', 'true')
    """
    raw = text(el, xpath)
    return None if raw is None else raw in _TRUTHY
+
+
def leaf_paths(el, prefix=""):
    """Yield (path, value) for every leaf element under el.

    Paths are '/'-joined local names with namespaces stripped, e.g.
    'IRS990PF/AnalysisOfRevenueAndExpenses/TotalRevAndExpnssAmt'.
    """
    local = strip_ns(el.tag)
    if local is None:
        # Comment / processing-instruction node — nothing to emit.
        return
    path = prefix + local
    element_children = [c for c in el if isinstance(c.tag, str)]
    if element_children:
        for child in element_children:
            yield from leaf_paths(child, path + "/")
    else:
        yield path, (el.text or "").strip()
+
+
def derive_source_document_id(source_system, filename):
    """Derive source_document_id from a filename.

    For irs_xml / irs_pdf this is the numeric object_id stem of
    '<object_id>_public.xml'; any directory prefix is ignored.

    >>> derive_source_document_id('irs_xml', 'Cycles_202242_202252/202213089349101246_public.xml')
    '202213089349101246'
    >>> derive_source_document_id('irs_xml', '202213089349101246_public.xml')
    '202213089349101246'
    """
    if source_system not in ('irs_xml', 'irs_pdf'):
        raise NotImplementedError(
            f"derive_source_document_id not implemented for {source_system!r}"
        )
    match = _OBJECT_ID_RE.search(filename)
    if match is None:
        raise ValueError(f"Cannot extract object_id from: {filename!r}")
    return match.group(1)
+
+
def extract_filing_metadata(tree):
    """Extract filing-level metadata from an XML tree for raw_filing.

    Returns a dict with: ein, filer_name, form_type, tax_year,
    tax_period_begin, tax_period_end, return_version, return_timestamp.

    EIN is normalized and form_type mapped through the shared helpers;
    raises ValueError when the filing cannot be identified.
    """
    # Local import avoids a module-level import cycle with normalize.
    from scripts.common.normalize import normalize_ein, map_form_type

    root = tree.getroot()

    raw_ein = text(tree, ".//irs:Filer/irs:EIN")
    ein = normalize_ein(raw_ein)
    if not ein:
        raise ValueError(f"Cannot normalize EIN: {raw_ein!r}")

    form_type = map_form_type(text(tree, ".//irs:ReturnTypeCd"))

    filer_name = text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1Txt")
    if filer_name is None:
        # Fallback spelling without the 'Txt' suffix — presumably older
        # schema versions; confirm against the schema history.
        filer_name = text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1")

    return {
        "ein": ein,
        "filer_name": filer_name,
        "form_type": form_type,
        "tax_year": text(tree, ".//irs:TaxYr"),
        "tax_period_begin": text(tree, ".//irs:TaxPeriodBeginDt"),
        "tax_period_end": text(tree, ".//irs:TaxPeriodEndDt"),
        "return_version": root.get("returnVersion"),
        "return_timestamp": text(tree, ".//irs:ReturnTs"),
    }
diff --git a/scripts/extract/__init__.py b/scripts/extract/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/scripts/extract/__init__.py
diff --git a/scripts/extract/irs_990_pdf.py b/scripts/extract/irs_990_pdf.py
new file mode 100644
index 0000000..1d1209c
--- /dev/null
+++ b/scripts/extract/irs_990_pdf.py
@@ -0,0 +1,699 @@
+"""
+Source-agnostic 990-PF PDF grant extractor.
+
+Takes a path to a local PDF and returns structured grant data. Side-effect
+free with respect to any database; only external call is to Anthropic's API.
+
+A separate loader consumes ExtractionResult and writes to raw.* tables.
+
+Usage (CLI):
+ python -m scripts.extract.irs_990_pdf path/to/file.pdf
+ python -m scripts.extract.irs_990_pdf path/to/file.pdf --json
+ python -m scripts.extract.irs_990_pdf path/to/file.pdf --tax-year 2021
+ python -m scripts.extract.irs_990_pdf path/to/file.pdf --source-label ny_ag
+
+Usage (programmatic):
+ from scripts.extract.irs_990_pdf import extract_from_pdf
+ result = extract_from_pdf("data/tmp/pdf_test/marley_2017.pdf", tax_year=2017)
+ for grant in result.grants:
+ print(grant.recipient_name, grant.amount)
+"""
+
+from __future__ import annotations
+
+import argparse
+import base64
+import json
+import re
+import subprocess
+import sys
+from dataclasses import dataclass, field, asdict
+from decimal import Decimal, InvalidOperation
+from pathlib import Path
+from typing import Any
+
+import anthropic
+import fitz # pymupdf
+import pdfplumber
+
+from scripts.common.normalize import is_placeholder, parse_numeric
+
+
+# ---------------------------------------------------------------------------
+# Constants
+# ---------------------------------------------------------------------------
+
# Model used for both the text and vision extraction paths.
MODEL = "claude-haiku-4-5-20251001"

# Text-path quality heuristic. If the number of grants extracted is less than
# num_grant_pages * MIN_GRANTS_PER_PAGE, we flag the result as low-yield and
# fall back to vision. This is an initial heuristic and may be tuned.
MIN_GRANTS_PER_PAGE = 1

# Minimum total text characters across all pages before we trust the text layer.
MIN_TEXT_LAYER_CHARS = 200

# Standard 990-PF form has 13 pages; grant attachments come after.
ATTACHMENT_START_PAGE_IDX = 13  # 0-based; corresponds to page 14

# Vision scan stops after this many consecutive non-grant pages (once any
# grant page has been found).
VISION_CONSECUTIVE_NO_LIMIT = 3

# DPIs used for vision rendering (low for yes/no triage, higher for extraction).
TRIAGE_DPI = 100
EXTRACTION_DPI = 150

# Regex alternatives matched against upper-cased page text to spot
# likely grant-table pages.
GRANT_PAGE_KEYWORDS = [
    r"GRANTS AND CONTRIBUTIONS PAID",
    r"SUPPLEMENTARY INFORMATION",
    r"PART XIV",
    r"PART XV",
    r"SCHEDULE OF.*GRANT",
    r"GRANTS PAID",
    r"CONTRIBUTIONS PAID",
]
_GRANT_PAGE_RE = re.compile("|".join(GRANT_PAGE_KEYWORDS))

# Loose marker for "this PDF is a 990-PF". NY AG PDFs may contain only the
# CHAR500 state cover form with no federal return attached — ~46% of that
# corpus, based on sampling. Those should short-circuit before we waste any
# Haiku calls.
_IS_990PF_RE = re.compile(r"FORM\s*990-?PF")
+
# Prompt for the text path: the page's extracted text is appended after it.
TEXT_EXTRACTION_PROMPT = """Extract ALL individual grants from this 990-PF page text.

Return a JSON array. Each grant should have:
- recipient_name: organization name (if grant is to an org)
- recipient_person_name: person's name (if grant is to an individual)
- address_line1: street address (if present)
- address_line2: second address line (if present)
- city: city (if present)
- state: state abbreviation (if present)
- zip: zip code (if present)
- country: country (if present, only when non-US)
- foreign_postal_code: foreign postal code (if present)
- amount: dollar amount as string (digits only, no $ or commas)
- purpose: purpose of grant (if present)
- foundation_status: recipient status like PC, NC, PF (if present)
- relationship: relationship of recipient to foundation (if present)

IMPORTANT RULES:
- Do NOT include total/subtotal rows
- Do NOT include header rows or column labels
- If there are multiple year columns, extract ONLY the most recent year
- If recipient name is "SEE ATTACHED", "VARIOUS", "N/A", "NONE", or a similar placeholder, skip it
- Return [] if no individual grants are found

Return ONLY the JSON array, no other text."""

# Prompt for the vision path: sent alongside a rendered page image.
VISION_EXTRACTION_PROMPT = """Extract ALL individual grants from this 990-PF page image.

Return a JSON array. Each grant should have:
- recipient_name: organization name (if grant is to an org)
- recipient_person_name: person's name (if grant is to an individual)
- address_line1: street address (if visible)
- address_line2: second address line (if visible)
- city: city
- state: state abbreviation
- zip: zip code (if visible)
- country: country (only if non-US)
- foreign_postal_code: foreign postal code (if present)
- amount: dollar amount as string (digits only, no $ or commas)
- purpose: purpose of grant
- foundation_status: recipient status like PC, NC, PF (if present)
- relationship: relationship of recipient to foundation (if present)

IMPORTANT RULES:
- Do NOT include total/subtotal rows
- Do NOT include header rows
- If there are multiple year columns, extract ONLY the most recent year
- If recipient name is "SEE ATTACHED", "VARIOUS", "N/A", "NONE", or a similar placeholder, skip it
- Return [] if no individual grants are found

Return ONLY the JSON array, no other text."""

# Cheap yes/no prompt used to triage rendered pages before full extraction.
TRIAGE_PROMPT = (
    "Is this page a table of grant or contribution recipients listing "
    "individual organization names with addresses and dollar amounts? "
    "Answer ONLY yes or no."
)
+
+
+# ---------------------------------------------------------------------------
+# Anthropic client (lazy singleton so importing this module is cheap)
+# ---------------------------------------------------------------------------
+
_client: anthropic.Anthropic | None = None


def _get_client() -> anthropic.Anthropic:
    """Return the shared Anthropic client, creating it on first use.

    The API key is read from `pass`. check=True surfaces a missing or
    failing `pass` entry immediately (CalledProcessError) instead of
    silently constructing a client with an empty key and failing later
    with a confusing auth error.
    """
    global _client
    if _client is None:
        proc = subprocess.run(
            ["pass", "show", "anthropic.com/api.anthropic.com/apikey"],
            capture_output=True, text=True, check=True,
        )
        _client = anthropic.Anthropic(api_key=proc.stdout.strip())
    return _client
+
+
+# ---------------------------------------------------------------------------
+# Dataclasses
+# ---------------------------------------------------------------------------
+
@dataclass
class ExtractedGrant:
    """One grant row extracted from a 990-PF attachment page.

    Typically only one of recipient_name / recipient_person_name is set
    (the extraction prompt distinguishes org vs individual grantees).
    amount_raw keeps the model's cleaned string; amount is its parsed
    Decimal, or None when unparseable.
    """
    # 1-based position within the final deduped list; set to 0 by
    # _normalize_grant and reassigned in _postprocess.
    line_number: int
    recipient_name: str | None = None
    recipient_name2: str | None = None
    recipient_person_name: str | None = None
    address_line1: str | None = None
    address_line2: str | None = None
    city: str | None = None
    state: str | None = None
    zip: str | None = None
    country: str | None = None
    foreign_postal_code: str | None = None
    amount_raw: str | None = None
    amount: Decimal | None = None
    purpose: str | None = None
    foundation_status: str | None = None
    relationship: str | None = None
+
+
@dataclass
class ExtractionResult:
    """Outcome of one PDF extraction attempt.

    grant_detail_status values:
      'supplemented' — ≥1 grant extracted
      'no_grants'    — extractor ran end-to-end, found nothing usable
      'not_a_990pf'  — PDF is readable but isn't a 990-PF (e.g. CHAR500 cover)
      None           — catastrophic failure (success=False)

    method values:
      'pdfplumber+haiku_text'  — text path was used
      'haiku_vision_attempted' — vision path was used (success not implied)
      'skipped_not_990pf'      — short-circuited; no API calls made
      'failed'                 — catastrophic failure (success=False)
    """
    success: bool
    grants: list[ExtractedGrant]
    grant_detail_status: str | None
    method: str
    diagnostics: dict = field(default_factory=dict)

    @property
    def total_amount(self) -> Decimal:
        """Sum of extracted amounts; None amounts count as 0.

        Uses an explicit Decimal(0) start value so an empty grant list
        yields Decimal('0') rather than int 0, keeping the declared
        return type accurate.
        """
        return sum(((g.amount or Decimal(0)) for g in self.grants), Decimal(0))
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _parse_haiku_response(text: str) -> list[dict]:
+ """Parse JSON from Haiku response, handling markdown code blocks."""
+ text = text.strip()
+ if text.startswith("```"):
+ # Drop opening fence + optional language tag
+ parts = text.split("\n", 1)
+ if len(parts) == 2:
+ text = parts[1]
+ text = text.rsplit("```", 1)[0]
+ text = text.strip()
+ try:
+ parsed = json.loads(text)
+ except json.JSONDecodeError:
+ return []
+ if not isinstance(parsed, list):
+ return []
+ return [x for x in parsed if isinstance(x, dict)]
+
+
def _is_grant_page_text(text: str) -> bool:
    """True when the upper-cased page text hits any grant-table keyword."""
    return _GRANT_PAGE_RE.search(text.upper()) is not None
+
+
+def _clean_str(value: Any) -> str | None:
+ if value is None:
+ return None
+ s = str(value).strip()
+ return s or None
+
+
def _normalize_grant(raw: dict) -> ExtractedGrant | None:
    """Map a loose Haiku JSON dict to an ExtractedGrant.

    Placeholder recipient names ('SEE ATTACHED', 'VARIOUS', ...) are
    nulled out rather than dropping the row — rows without any recipient
    are filtered later, in _postprocess. line_number is set to 0 here and
    reassigned after dedupe.
    """
    name = _clean_str(raw.get("recipient_name"))
    person = _clean_str(raw.get("recipient_person_name"))

    # Safety net: the prompt already filters placeholders, but double-check.
    if name and is_placeholder(name):
        name = None
    if person and is_placeholder(person):
        person = None

    amount_raw = _clean_str(raw.get("amount"))
    amount: Decimal | None = None
    if amount_raw is not None:
        cleaned = parse_numeric(amount_raw)
        if cleaned is not None:
            try:
                amount = Decimal(cleaned)
            except InvalidOperation:
                amount = None

    return ExtractedGrant(
        line_number=0,
        recipient_name=name,
        recipient_name2=_clean_str(raw.get("recipient_name2")),
        recipient_person_name=person,
        address_line1=_clean_str(raw.get("address_line1")),
        address_line2=_clean_str(raw.get("address_line2")),
        city=_clean_str(raw.get("city")),
        state=_clean_str(raw.get("state")),
        zip=_clean_str(raw.get("zip")),
        country=_clean_str(raw.get("country")),
        foreign_postal_code=_clean_str(raw.get("foreign_postal_code")),
        amount_raw=amount_raw,
        amount=amount,
        purpose=_clean_str(raw.get("purpose")),
        foundation_status=_clean_str(raw.get("foundation_status")),
        relationship=_clean_str(raw.get("relationship")),
    )
+
+
+def _year_hint(tax_year: int | None) -> str:
+ if tax_year is None:
+ return ""
+ return f"\n\nThis filing is for tax year {tax_year}."
+
+
+def _postprocess(grants: list[ExtractedGrant]) -> list[ExtractedGrant]:
+ """Drop placeholders / amount-less rows, dedupe, and reassign line_number."""
+ out: list[ExtractedGrant] = []
+ seen: set[tuple] = set()
+
+ for g in grants:
+ # Drop rows with no recipient at all.
+ if not g.recipient_name and not g.recipient_person_name:
+ continue
+ # Drop placeholder recipients that slipped through (belt-and-suspenders).
+ if g.recipient_name and is_placeholder(g.recipient_name):
+ continue
+ if g.recipient_person_name and is_placeholder(g.recipient_person_name):
+ continue
+ # Drop rows with no amount text at all (Haiku didn't see an amount column).
+ # Keep rows where amount_raw == '0' and amount == Decimal(0).
+ if g.amount is None and not g.amount_raw:
+ continue
+
+ key = (
+ (g.recipient_name or "").upper(),
+ (g.city or "").upper(),
+ (g.state or "").upper(),
+ g.amount_raw or "",
+ )
+ if key in seen:
+ continue
+ seen.add(key)
+ out.append(g)
+
+ for i, g in enumerate(out, start=1):
+ g.line_number = i
+ return out
+
+
+# ---------------------------------------------------------------------------
+# Text path
+# ---------------------------------------------------------------------------
+
def _extract_text_layer(
    pdf_path: Path,
    tax_year: int | None,
    diagnostics: dict,
) -> tuple[list[ExtractedGrant], str, int]:
    """Extract grants via pdfplumber text + Haiku text parsing.

    Returns (grants, status, num_grant_pages).
    status ∈ {'ok', 'not_a_990pf', 'no_text_layer', 'no_grant_pages',
    'haiku_empty', 'low_yield', 'error'}

    'not_a_990pf' means the PDF has a readable text layer but doesn't look
    like a 990-PF at all (e.g. a standalone NY State CHAR500 cover form).
    The caller should short-circuit on this — there's nothing for the vision
    path to find either.

    Side effects: records text_layer_chars, pages_total,
    grant_pages_identified, and (on failure) text_path_error into the
    caller-supplied diagnostics dict.
    """
    try:
        pdf = pdfplumber.open(pdf_path)
    except Exception as exc:
        # Corrupt / unreadable PDF — report and let the caller decide.
        diagnostics["text_path_error"] = str(exc)
        return [], "error", 0

    try:
        # Pull the text layer for every page up front (1-based page numbers).
        page_texts: list[tuple[int, str]] = []
        total_chars = 0
        for idx, page in enumerate(pdf.pages, start=1):
            text = page.extract_text() or ""
            page_texts.append((idx, text))
            total_chars += len(text)

        diagnostics["text_layer_chars"] = total_chars
        diagnostics["pages_total"] = len(page_texts)

        # Too little text overall: likely a scanned image — vision's job.
        if total_chars < MIN_TEXT_LAYER_CHARS:
            return [], "no_text_layer", 0

        # Must look like a 990-PF at all. If not, don't bother with Haiku —
        # and don't fall through to vision, since it'd just burn triage calls
        # scanning a non-990-PF document end to end.
        joined_upper = "\n".join(t for _, t in page_texts).upper()
        if not _IS_990PF_RE.search(joined_upper):
            return [], "not_a_990pf", 0

        # Keep only pages that match the grant-table keywords and carry
        # enough text (> 100 chars) to plausibly be a real table.
        grant_pages = [
            (num, text) for num, text in page_texts
            if len(text) > 100 and _is_grant_page_text(text)
        ]
        diagnostics["grant_pages_identified"] = len(grant_pages)

        if not grant_pages:
            return [], "no_grant_pages", 0
    finally:
        pdf.close()

    client = _get_client()
    year_hint = _year_hint(tax_year)

    # One Haiku call per candidate page; responses are JSON arrays of dicts.
    raw_grants: list[dict] = []
    try:
        for page_num, text in grant_pages:
            resp = client.messages.create(
                model=MODEL,
                max_tokens=4096,
                messages=[{
                    "role": "user",
                    "content": f"{TEXT_EXTRACTION_PROMPT}{year_hint}\n\nPage text:\n{text}",
                }],
            )
            raw_grants.extend(_parse_haiku_response(resp.content[0].text))
    except anthropic.APIError as exc:
        # Discard partial results — fall back to vision cleanly.
        diagnostics["text_path_error"] = f"{type(exc).__name__}: {exc}"
        return [], "error", len(grant_pages)

    if not raw_grants:
        return [], "haiku_empty", len(grant_pages)

    grants: list[ExtractedGrant] = []
    for raw in raw_grants:
        g = _normalize_grant(raw)
        if g is not None:
            grants.append(g)

    # Quality gate: fewer grants than pages suggests the text layer is
    # garbled (e.g. column soup) — caller should retry with vision.
    if len(grants) < len(grant_pages) * MIN_GRANTS_PER_PAGE:
        return grants, "low_yield", len(grant_pages)

    return grants, "ok", len(grant_pages)
+
+
+# ---------------------------------------------------------------------------
+# Vision path
+# ---------------------------------------------------------------------------
+
def _render_page_b64(page, dpi: int) -> str:
    """Render a PyMuPDF page at the given DPI and return it as base64 PNG."""
    png_bytes = page.get_pixmap(dpi=dpi).tobytes("png")
    return base64.standard_b64encode(png_bytes).decode("utf-8")
+
+
def _extract_vision(
    pdf_path: Path,
    tax_year: int | None,
    diagnostics: dict,
) -> tuple[list[ExtractedGrant], int, int]:
    """Run Haiku vision over rendered page images to pull grant rows.

    Returns (grants, pages_scanned, pages_extracted). anthropic.APIError
    propagates to the caller on catastrophic API failure.
    """
    client = _get_client()
    hint = _year_hint(tax_year)

    doc = fitz.open(pdf_path)
    try:
        page_count = len(doc)
        diagnostics.setdefault("pages_total", page_count)

        if page_count <= ATTACHMENT_START_PAGE_IDX:
            diagnostics["vision_pages_scanned"] = 0
            diagnostics["vision_pages_extracted"] = 0
            return [], 0, 0

        # Phase 1: cheap low-DPI triage — yes/no per page until we've seen
        # a grant table and then run past the end of it.
        hits: list[int] = []
        misses_in_a_row = 0
        scanned = 0

        for idx in range(ATTACHMENT_START_PAGE_IDX, page_count):
            scanned += 1
            img = _render_page_b64(doc[idx], dpi=TRIAGE_DPI)
            resp = client.messages.create(
                model=MODEL,
                max_tokens=10,
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "image",
                         "source": {"type": "base64", "media_type": "image/png", "data": img}},
                        {"type": "text", "text": TRIAGE_PROMPT},
                    ],
                }],
            )
            if "yes" in resp.content[0].text.lower():
                hits.append(idx)
                misses_in_a_row = 0
            else:
                misses_in_a_row += 1
                # Only stop early once at least one grant page was found.
                if hits and misses_in_a_row >= VISION_CONSECUTIVE_NO_LIMIT:
                    break

        diagnostics["vision_pages_scanned"] = scanned

        if not hits:
            diagnostics["vision_pages_extracted"] = 0
            return [], scanned, 0

        # Phase 2: full-resolution extraction on the triaged pages.
        raw_rows: list[dict] = []
        extracted = 0
        for idx in hits:
            img = _render_page_b64(doc[idx], dpi=EXTRACTION_DPI)
            resp = client.messages.create(
                model=MODEL,
                max_tokens=4096,
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "image",
                         "source": {"type": "base64", "media_type": "image/png", "data": img}},
                        {"type": "text", "text": f"{VISION_EXTRACTION_PROMPT}{hint}"},
                    ],
                }],
            )
            page_rows = _parse_haiku_response(resp.content[0].text)
            if page_rows:
                extracted += 1
                raw_rows.extend(page_rows)

        diagnostics["vision_pages_extracted"] = extracted
    finally:
        doc.close()

    grants = [g for g in (_normalize_grant(r) for r in raw_rows) if g is not None]
    return grants, scanned, extracted
+
+
+# ---------------------------------------------------------------------------
+# Top-level extractor
+# ---------------------------------------------------------------------------
+
def extract_from_pdf(
    pdf_path: str | Path,
    tax_year: int | None = None,
    source_label: str | None = None,
) -> ExtractionResult:
    """Extract grants from a single 990-PF PDF.

    Stateless: reads only the file at pdf_path and calls Anthropic's API.
    Writes nothing. See module docstring for full contract.

    Strategy: try the text-layer path first; on any text failure fall back
    to the vision path, except when the text layer proves the document is
    not a 990-PF at all (then we short-circuit with an empty result).
    """
    pdf_path = Path(pdf_path)

    diagnostics: dict = {
        "tax_year_hint": tax_year,
        "source_label": source_label,
    }

    if not pdf_path.exists():
        diagnostics["error"] = f"PDF not found: {pdf_path}"
        return ExtractionResult(
            success=False,
            grants=[],
            grant_detail_status=None,
            method="failed",
            diagnostics=diagnostics,
        )

    # Text path
    try:
        text_grants, text_status, num_grant_pages = _extract_text_layer(
            pdf_path, tax_year, diagnostics
        )
    except Exception as exc:
        # pdfplumber can blow up on malformed PDFs — don't let that kill us,
        # fall through to the vision path like any other text failure.
        text_grants = []
        text_status = "error"
        num_grant_pages = 0
        diagnostics.setdefault("text_path_error", f"{type(exc).__name__}: {exc}")

    diagnostics["text_path_status"] = text_status

    if text_status == "ok":
        grants = _postprocess(text_grants)
        return ExtractionResult(
            success=True,
            grants=grants,
            grant_detail_status="supplemented" if grants else "no_grants",
            method="pdfplumber+haiku_text",
            diagnostics=diagnostics,
        )

    # Short-circuit: PDF has a readable text layer but isn't a 990-PF
    # (e.g. a standalone NY State CHAR500 cover form). Don't run vision —
    # there's nothing in the document for it to find.
    if text_status == "not_a_990pf":
        return ExtractionResult(
            success=True,
            grants=[],
            grant_detail_status="not_a_990pf",
            method="skipped_not_990pf",
            diagnostics=diagnostics,
        )

    # Vision fallback — replaces text output entirely.
    try:
        vision_grants, _scanned, _extracted = _extract_vision(
            pdf_path, tax_year, diagnostics
        )
    except Exception as exc:
        # The original had two except clauses (anthropic.APIError and
        # Exception) with identical bodies; APIError is an Exception
        # subclass, so one handler covers both cases.
        diagnostics["error"] = f"{type(exc).__name__}: {exc}"
        return ExtractionResult(
            success=False,
            grants=[],
            grant_detail_status=None,
            method="failed",
            diagnostics=diagnostics,
        )

    grants = _postprocess(vision_grants)
    return ExtractionResult(
        success=True,
        grants=grants,
        grant_detail_status="supplemented" if grants else "no_grants",
        method="haiku_vision_attempted",
        diagnostics=diagnostics,
    )
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
+
def _result_to_jsonable(result: ExtractionResult) -> dict:
    """Flatten an ExtractionResult into JSON-serializable builtins."""
    def _grant_dict(g: ExtractedGrant) -> dict:
        row = asdict(g)
        # Amounts may not be JSON-serializable directly; stringify them.
        if row["amount"] is not None:
            row["amount"] = str(row["amount"])
        return row

    return {
        "success": result.success,
        "method": result.method,
        "grant_detail_status": result.grant_detail_status,
        "diagnostics": result.diagnostics,
        "grants": [_grant_dict(g) for g in result.grants],
        "total_amount": str(result.total_amount),
    }
+
+
def _print_table(result: ExtractionResult) -> None:
    """Print a human-readable summary plus the first 50 extracted grants."""
    print(f"success: {result.success}")
    print(f"method: {result.method}")
    print(f"grant_detail_status: {result.grant_detail_status}")
    print(f"grants: {len(result.grants)}")
    print(f"total_amount: ${result.total_amount:,}")
    print("diagnostics:")
    for key, val in result.diagnostics.items():
        print(f"  {key}: {val}")
    if not result.grants:
        return
    print()
    print(f"{'#':>4} {'recipient':<45} {'city':<20} {'st':<3} {'amount':>12}")
    print("-" * 90)
    for g in result.grants[:50]:
        name = (g.recipient_name or g.recipient_person_name or "")[:45]
        city = (g.city or "")[:20]
        st = (g.state or "")[:3]
        amount = (g.amount_raw or "") if g.amount is None else f"${g.amount:,.0f}"
        print(f"{g.line_number:>4} {name:<45} {city:<20} {st:<3} {amount:>12}")
    hidden = len(result.grants) - 50
    if hidden > 0:
        print(f"... and {hidden} more")
+
+
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: extract grants from one PDF and report results."""
    ap = argparse.ArgumentParser(
        prog="scripts.extract.irs_990_pdf",
        description="Extract grant data from a 990-PF PDF.",
    )
    ap.add_argument("pdf_path", help="Path to a local PDF file.")
    ap.add_argument("--tax-year", type=int, default=None,
                    help="Tax year hint passed to the extraction prompts.")
    ap.add_argument("--source-label", default=None,
                    help="Diagnostic label for the PDF source (e.g. 'ny_ag').")
    ap.add_argument("--json", action="store_true",
                    help="Emit the full result as JSON instead of a table.")
    opts = ap.parse_args(argv)

    result = extract_from_pdf(
        opts.pdf_path,
        tax_year=opts.tax_year,
        source_label=opts.source_label,
    )

    if opts.json:
        print(json.dumps(_result_to_jsonable(result), indent=2))
    else:
        _print_table(result)

    return 0 if result.success else 1
+
+
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())
diff --git a/scripts/parse/__init__.py b/scripts/parse/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/scripts/parse/__init__.py
diff --git a/scripts/parse/irs_990.py b/scripts/parse/irs_990.py
new file mode 100644
index 0000000..3a5cc2d
--- /dev/null
+++ b/scripts/parse/irs_990.py
@@ -0,0 +1,691 @@
+"""
+Parse IRS Form 990 XML files into the new raw schema.
+
+Populates: raw.filing, raw.filing_source, raw.form_990,
+ raw.grant_990, raw.schedule_o.
+
+Usage:
+ python -m scripts.parse.irs_990 data/irs/xml-zips/*.zip
+ python -m scripts.parse.irs_990 data/irs/xml-missing/202100139349100100_public.xml
+"""
+
+import io
+import os
+import sys
+import zipfile
+
+from lxml import etree
+
+from scripts.common.db import execute_transaction
+from scripts.common.normalize import normalize_ein, parse_numeric, is_placeholder
+from scripts.common.xml import (
+ NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata,
+)
+from scripts.common.ingest import (
+ start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
+)
+from scripts.common.filing import (
+ upsert_raw_filing, record_raw_filing_source,
+)
+
# Identifiers recorded on every ingest run / filing row written by this parser.
PARSER_NAME = "parse_irs_990"
SOURCE_SYSTEM = "irs_xml"

# Standalone XML files use this as source_archive
STANDALONE_ARCHIVE = "__standalone__"

# Module-level truthy set used by every classification check.
_TRUTHY = {"X", "x", "1", "true"}

# IRS ReturnTypeCd values that map to Form 990 (includes amended return variant).
FORM_990_RETURN_TYPES = {"990", "990A"}

# Schedule I RecipientTable rows (one xpath — modern schema only).
GRANT_XPATHS = [
    f".//{{{NS}}}IRS990ScheduleI/{{{NS}}}RecipientTable",
]

# Locations of the Schedule I and Schedule O subtrees within ReturnData.
SCHEDULE_I_XPATH = f".//{{{NS}}}IRS990ScheduleI"
SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO"
+
+
+# ============================================================
+# Grant extraction (Schedule I RecipientTable)
+# ============================================================
+
def extract_grant(g, line_number):
    """Build one raw.grant_990 row dict from a Schedule I RecipientTable element.

    Child element names vary slightly across IRS schema versions, so each
    field tries the modern tag first and falls back to the older variant.

    Returns None for stub rows that lack both a recipient name and any
    grant amount — these appear in real filings as malformed RecipientTable
    entries (e.g. just a PurposeOfGrantTxt with no recipient or amount).
    """
    if len(g) == 0:
        return None

    def first(*paths):
        # Emulate an `a or b or ...` chain over text() lookups: return the
        # first truthy value, else the last evaluated one.
        value = None
        for path in paths:
            value = text(g, path)
            if value:
                return value
        return value

    cash_raw = text(g, "irs:CashGrantAmt")
    non_cash_raw = text(g, "irs:NonCashAssistanceAmt")
    name1 = first("irs:RecipientBusinessName/irs:BusinessNameLine1Txt",
                  "irs:RecipientBusinessName/irs:BusinessNameLine1")
    name2 = first("irs:RecipientBusinessName/irs:BusinessNameLine2Txt",
                  "irs:RecipientBusinessName/irs:BusinessNameLine2")

    if (name1 is None and name2 is None
            and cash_raw is None and non_cash_raw is None):
        return None

    return {
        "line_number": line_number,
        "recipient_name": name1,
        "recipient_name2": name2,
        "recipient_ein": normalize_ein(text(g, "irs:RecipientEIN")),
        "address_line1": first(
            "irs:USAddress/irs:AddressLine1Txt",
            "irs:USAddress/irs:AddressLine1",
            "irs:ForeignAddress/irs:AddressLine1Txt",
            "irs:ForeignAddress/irs:AddressLine1",
        ),
        "address_line2": first(
            "irs:USAddress/irs:AddressLine2Txt",
            "irs:USAddress/irs:AddressLine2",
            "irs:ForeignAddress/irs:AddressLine2Txt",
            "irs:ForeignAddress/irs:AddressLine2",
        ),
        "city": first(
            "irs:USAddress/irs:CityNm",
            "irs:USAddress/irs:City",
            "irs:ForeignAddress/irs:CityNm",
            "irs:ForeignAddress/irs:City",
        ),
        "state": first(
            "irs:USAddress/irs:StateAbbreviationCd",
            "irs:USAddress/irs:State",
            "irs:ForeignAddress/irs:ProvinceOrStateNm",
        ),
        "zip": first(
            "irs:USAddress/irs:ZIPCd",
            "irs:USAddress/irs:ZIPCode",
        ),
        "country": text(g, "irs:ForeignAddress/irs:CountryCd"),
        "foreign_postal_code": text(g, "irs:ForeignAddress/irs:ForeignPostalCd"),
        "cash_grant_amt_raw": cash_raw,
        "cash_grant_amt": parse_numeric(cash_raw),
        "non_cash_amt_raw": non_cash_raw,
        "non_cash_amt": parse_numeric(non_cash_raw),
        "non_cash_desc": text(g, "irs:NonCashAssistanceDesc"),
        "valuation_method": text(g, "irs:ValuationMethodUsedDesc"),
        "purpose": text(g, "irs:PurposeOfGrantTxt"),
        "irc_section": text(g, "irs:IRCSectionDesc"),
    }
+
+
def find_all_grants(tree):
    """Return all Schedule I RecipientTable elements, concatenated per xpath."""
    return [el for xpath in GRANT_XPATHS for el in tree.findall(xpath)]
+
+
+# ============================================================
+# Schedule O narrative extraction
+# ============================================================
+
def extract_schedule_o(tree):
    """Pull every SupplementalInformationDetail narrative from Schedule O.

    Returns a list of dicts (without raw_filing_id — added in process_filing).
    line_number is the 1-based ordinal among *emitted* rows; entries with no
    text in any explanation variant are skipped to avoid low-signal
    placeholder rows.
    """
    sched_o = tree.find(SCHEDULE_O_XPATH)
    if sched_o is None:
        return []

    rows = []
    for detail in sched_o.findall(f"{{{NS}}}SupplementalInformationDetail"):
        narrative = (
            text(detail, "irs:ExplanationTxt")
            or text(detail, "irs:MediumExplanationTxt")
            or text(detail, "irs:ShortExplanationTxt")
        )
        if narrative is None:
            continue
        rows.append({
            "line_number": len(rows) + 1,
            "form_line_ref": text(detail, "irs:FormAndLineReferenceDesc"),
            "explanation": narrative,
        })
    return rows
+
+
+# ============================================================
+# Form 990 summary extraction
+# ============================================================
+
def extract_form_990(tree):
    """Extract filing-level summary fields for raw.form_990.

    Returns one flat dict keyed by raw.form_990 column names, merged from
    logical groups: filer address, classification, status flags, Part I
    current/prior-year summary, balance sheet, workforce counts, Part IX
    functional expenses, Part VIII revenue detail, UBI, Schedule I
    metadata, and officer/signer fields. Missing elements come through as
    None; amount fields pass through parse_numeric.
    """
    f990 = f".//{{{NS}}}IRS990"

    # Filer address from ReturnHeader (with old-schema fallbacks)
    filer_addr = {
        "filer_name2": (
            text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt")
            or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2")
        ),
        "filer_address_line1": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1")
        ),
        "filer_address_line2": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2")
        ),
        "filer_city": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:City")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City")
        ),
        "filer_state": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:State")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm")
        ),
        "filer_zip": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode")
        ),
        "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"),
        "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"),
        "phone": text(tree, ".//irs:Filer/irs:PhoneNum"),
        "website": text(tree, f"{f990}/irs:WebsiteAddressTxt"),
    }

    # Classification — the org-type checkboxes are mutually exclusive on
    # the form; first truthy indicator wins.
    org_type = None
    if text(tree, f"{f990}/irs:TypeOfOrganizationCorpInd") in _TRUTHY:
        org_type = "corp"
    elif text(tree, f"{f990}/irs:TypeOfOrganizationTrustInd") in _TRUTHY:
        org_type = "trust"
    elif text(tree, f"{f990}/irs:TypeOfOrganizationAssocInd") in _TRUTHY:
        org_type = "assoc"
    elif text(tree, f"{f990}/irs:TypeOfOrganizationOtherInd") in _TRUTHY:
        org_type = "other"

    # Accounting-method checkboxes, same mutually-exclusive pattern.
    method_cash = text(tree, f"{f990}/irs:MethodOfAccountingCashInd")
    method_accrual = text(tree, f"{f990}/irs:MethodOfAccountingAccrualInd")
    method_other = text(tree, f"{f990}/irs:MethodOfAccountingOtherInd")
    if method_cash in _TRUTHY:
        accounting_method = "cash"
    elif method_accrual in _TRUTHY:
        accounting_method = "accrual"
    elif method_other in _TRUTHY:
        accounting_method = "other"
    else:
        accounting_method = None

    # The 501(c) subsection number lives in an attribute, not in the
    # element body. (The body is typically "X" — the checkbox indicator.)
    # A 501(c)(3) filing sets Organization501c3Ind instead and leaves this
    # attribute unset; we deliberately don't infer "3" in that case, to
    # keep raw provenance honest.
    section_501c_type = None
    el_501c = tree.find(f"{f990}/irs:Organization501cInd", NS_MAP)
    if el_501c is not None:
        section_501c_type = el_501c.get("organization501cTypeTxt")

    classification = {
        "is_501c3": text_bool(tree, f"{f990}/irs:Organization501c3Ind"),
        "section_501c_type": section_501c_type,
        "org_type": org_type,
        "group_return": text_bool(tree, f"{f990}/irs:GroupReturnForAffiliatesInd"),
        "group_exemption_num": text(tree, f"{f990}/irs:GroupExemptionNum"),
        "formation_year": text(tree, f"{f990}/irs:FormationYr"),
        "legal_domicile_state": text(tree, f"{f990}/irs:LegalDomicileStateCd"),
        "mission": (
            text(tree, f"{f990}/irs:ActivityOrMissionDesc")
            or text(tree, f"{f990}/irs:MissionDesc")
        ),
        "accounting_method": accounting_method,
    }

    # Filing status flags
    status = {
        "is_amended": text_bool(tree, f"{f990}/irs:AmendedReturnInd"),
        "is_initial": text_bool(tree, f"{f990}/irs:InitialReturnInd"),
        "is_final": text_bool(tree, f"{f990}/irs:FinalReturnInd"),
        "is_terminated": text_bool(tree, f"{f990}/irs:TerminateOperationsInd"),
    }

    # Part I: current year summary (column name -> IRS element name)
    part_i_cy = {}
    part_i_cy_fields = {
        "gross_receipts": "GrossReceiptsAmt",
        "cy_contributions_grants": "CYContributionsGrantsAmt",
        "cy_program_service_revenue": "CYProgramServiceRevenueAmt",
        "cy_investment_income": "CYInvestmentIncomeAmt",
        "cy_other_revenue": "CYOtherRevenueAmt",
        "cy_total_revenue": "CYTotalRevenueAmt",
        "cy_grants_paid": "CYGrantsAndSimilarPaidAmt",
        "cy_benefits_to_members": "CYBenefitsPaidToMembersAmt",
        "cy_salaries_benefits": "CYSalariesCompEmpBnftPaidAmt",
        "cy_fundraising_expense": "CYTotalFundraisingExpenseAmt",
        "cy_other_expenses": "CYOtherExpensesAmt",
        "cy_total_expenses": "CYTotalExpensesAmt",
        "cy_revenue_less_expenses": "CYRevenuesLessExpensesAmt",
    }
    for col, elem in part_i_cy_fields.items():
        part_i_cy[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}"))

    # Part I: prior year summary
    part_i_py = {
        "py_total_revenue": parse_numeric(text(tree, f"{f990}/irs:PYTotalRevenueAmt")),
        "py_total_expenses": parse_numeric(text(tree, f"{f990}/irs:PYTotalExpensesAmt")),
    }

    # Balance sheet (Part I summary / Part X)
    balance_sheet = {}
    bs_fields = {
        "total_assets_boy": "TotalAssetsBOYAmt",
        "total_assets_eoy": "TotalAssetsEOYAmt",
        "total_liabilities_boy": "TotalLiabilitiesBOYAmt",
        "total_liabilities_eoy": "TotalLiabilitiesEOYAmt",
        "net_assets_boy": "NetAssetsOrFundBalancesBOYAmt",
        "net_assets_eoy": "NetAssetsOrFundBalancesEOYAmt",
    }
    for col, elem in bs_fields.items():
        balance_sheet[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}"))

    # Governance / workforce (member-count tags also drifted across schema
    # versions, hence the or-fallbacks)
    workforce = {
        "total_employees": parse_numeric(text(tree, f"{f990}/irs:TotalEmployeeCnt")),
        "total_volunteers": parse_numeric(text(tree, f"{f990}/irs:TotalVolunteersCnt")),
        "voting_members": parse_numeric(
            text(tree, f"{f990}/irs:VotingMembersGoverningBodyCnt")
            or text(tree, f"{f990}/irs:GoverningBodyVotingMembersCnt")
        ),
        "independent_voting_members": parse_numeric(
            text(tree, f"{f990}/irs:VotingMembersIndependentCnt")
            or text(tree, f"{f990}/irs:IndependentVotingMemberCnt")
        ),
    }

    # Part IX: functional expense breakdown
    func_exp = {
        "program_services_expense": parse_numeric(
            text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ProgramServicesAmt")
        ),
        "management_general_expense": parse_numeric(
            text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ManagementAndGeneralAmt")
        ),
        "fundraising_expense_ix": parse_numeric(
            text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:FundraisingAmt")
        ),
    }

    # Part VIII: revenue detail
    revenue_detail = {
        "government_grants": parse_numeric(text(tree, f"{f990}/irs:GovernmentGrantsAmt")),
        "total_contributions": parse_numeric(text(tree, f"{f990}/irs:TotalContributionsAmt")),
        "total_program_service_rev": parse_numeric(
            text(tree, f"{f990}/irs:TotalProgramServiceRevenueAmt")
        ),
        "investment_income": parse_numeric(
            text(tree, f"{f990}/irs:InvestmentIncomeGrp/irs:TotalRevenueColumnAmt")
        ),
    }

    # UBI (unrelated business income)
    ubi = {
        "gross_ubi": parse_numeric(text(tree, f"{f990}/irs:TotalGrossUBIAmt")),
        "net_ubi": parse_numeric(text(tree, f"{f990}/irs:NetUnrelatedBusTxblIncmAmt")),
    }

    # Schedule I metadata (1:1 with the filing)
    # sched_i_total_grants_amt is always None here — no element is read for
    # it in this parser.
    sched_i_el = tree.find(SCHEDULE_I_XPATH)
    if sched_i_el is not None:
        sched_i = {
            "sched_i_grant_records_maintained": text_bool(sched_i_el, "irs:GrantRecordsMaintainedInd"),
            "sched_i_501c3_org_count": parse_numeric(text(sched_i_el, "irs:Total501c3OrgCnt")),
            "sched_i_other_org_count": parse_numeric(text(sched_i_el, "irs:TotalOtherOrgCnt")),
            "sched_i_total_grants_amt": None,
        }
    else:
        sched_i = {
            "sched_i_grant_records_maintained": None,
            "sched_i_501c3_org_count": None,
            "sched_i_other_org_count": None,
            "sched_i_total_grants_amt": None,
        }

    # Officer / signer
    officer = {
        "principal_officer": text(tree, f"{f990}/irs:PrincipalOfficerNm"),
        "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"),
        "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"),
        "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"),
        "preparer_firm": (
            text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt")
            or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1")
        ),
    }

    # Merge all groups into one flat raw.form_990 row.
    return {
        **filer_addr, **classification, **status,
        **part_i_cy, **part_i_py, **balance_sheet,
        **workforce, **func_exp, **revenue_detail, **ubi,
        **sched_i, **officer,
    }
+
+
+# ============================================================
+# Grant detail status
+# ============================================================
+
def compute_grant_detail_status(sched_i_element, grant_elements, grant_rows, cy_grants_paid):
    """Classify grant-detail completeness for a Form 990 filing.

    Combines Schedule I presence with Part I's CYGrantsAndSimilarPaidAmt to
    tell "no Schedule I because there were no grants" apart from "grants
    reported on Part I but no Schedule I detail".
    """
    if sched_i_element is None:
        # No Schedule I — but Part I may still report grants paid.
        paid = cy_grants_paid is not None and float(cy_grants_paid) > 0
        return "unresolved" if paid else "no_grants"

    # Schedule I exists but yielded no usable rows.
    if not grant_elements or not grant_rows:
        return "unresolved"

    def _is_stub(row):
        # Placeholder recipient name, or both amounts are placeholders.
        return (
            is_placeholder(row.get("recipient_name"))
            or (is_placeholder(row.get("cash_grant_amt_raw"))
                and is_placeholder(row.get("non_cash_amt_raw")))
        )

    stub_count = sum(1 for row in grant_rows if _is_stub(row))
    if stub_count == len(grant_rows):
        return "placeholder_only"
    if stub_count:
        return "see_attached"
    return "complete"
+
+
+# ============================================================
+# Per-filing processing
+# ============================================================
+
# Column lists for the child-row INSERTs in _replace_children. Order here
# defines the order of values bound per row, so keep each list in sync with
# the corresponding extract_* dict keys.
GRANT_COLUMNS = [
    "raw_filing_id", "line_number",
    "recipient_name", "recipient_name2", "recipient_ein",
    "address_line1", "address_line2", "city", "state", "zip",
    "country", "foreign_postal_code",
    "cash_grant_amt_raw", "cash_grant_amt",
    "non_cash_amt_raw", "non_cash_amt",
    "non_cash_desc", "valuation_method", "purpose", "irc_section",
]

SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"]
+
+
+
def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id):
    """Process a single Form 990 filing. All child writes are transactional.

    Returns the number of rows written, for run-level accounting.
    """
    # Extract filing metadata
    metadata = extract_filing_metadata(tree)

    # Locate Schedule I element (used both for grant extraction and status logic)
    sched_i_element = tree.find(SCHEDULE_I_XPATH)

    # Extract grants from Schedule I RecipientTable rows
    grant_elements = find_all_grants(tree)
    extracted_grants = []
    for i, g in enumerate(grant_elements, start=1):
        row = extract_grant(g, i)
        if row is not None:
            extracted_grants.append(row)

    # Extract Schedule O narrative entries
    schedule_o_entries = extract_schedule_o(tree)

    # Extract form summary
    form_data = extract_form_990(tree)
    form_data["grant_detail_status"] = compute_grant_detail_status(
        sched_i_element, grant_elements, extracted_grants, form_data.get("cy_grants_paid"),
    )

    # BUGFIX: xml_rows was referenced below but never defined, so every
    # filing raised NameError inside the transaction. XML leaf-path
    # extraction is not implemented in this parser yet, so pass an empty
    # list through to _replace_children and the row count.
    # (Also removed unused locals: root / return_header / return_data.)
    xml_rows: list = []

    def _do(conn):
        raw_filing_id = upsert_raw_filing(
            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
        )
        record_raw_filing_source(
            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
        )

        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
        grant_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in extracted_grants
        ]
        schedule_o_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in schedule_o_entries
        ]

        _replace_children(
            conn, raw_filing_id, filing_form_data, grant_rows, schedule_o_rows, xml_rows,
        )

        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990 + grants + schedule_o + xml fields
        return 3 + len(grant_rows) + len(schedule_o_rows) + len(xml_rows)

    return execute_transaction(_do)
+
+
def _replace_children(conn, raw_filing_id, form_data, grant_rows, schedule_o_rows, xml_rows):
    """Delete and re-insert all child rows for a filing using the caller's transaction.

    NOTE: xml_rows is accepted for signature compatibility but is not yet
    persisted — the original body ended with an empty "Insert XML fields"
    section and no insert was ever implemented for it.
    """
    form_columns = list(form_data.keys())
    form_placeholders = ", ".join(["%s"] * len(form_columns))
    form_values = [form_data[col] for col in form_columns]

    grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS))
    schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS))

    with conn.cursor() as cur:
        # Delete old child rows before re-inserting (idempotent re-parse).
        cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.grant_990 WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.form_990 WHERE raw_filing_id = %s", (raw_filing_id,))

        # Insert form summary
        cur.execute(
            f"INSERT INTO raw.form_990 ({', '.join(form_columns)}) "
            f"VALUES ({form_placeholders})",
            form_values,
        )

        # Import once (was duplicated in both branches below); kept local so
        # module import doesn't hard-require psycopg2.extras.
        if grant_rows or schedule_o_rows:
            from psycopg2.extras import execute_batch

        # Insert grants
        if grant_rows:
            execute_batch(
                cur,
                f"INSERT INTO raw.grant_990 ({', '.join(GRANT_COLUMNS)}) "
                f"VALUES ({grant_placeholders})",
                [[row.get(col) for col in GRANT_COLUMNS] for row in grant_rows],
            )

        # Insert Schedule O narrative
        if schedule_o_rows:
            execute_batch(
                cur,
                f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) "
                f"VALUES ({schedule_o_placeholders})",
                [[row.get(col) for col in SCHEDULE_O_COLUMNS] for row in schedule_o_rows],
            )

        # TODO: persist xml_rows once a raw XML leaf-path table exists.
+
+
+# ============================================================
+# ZIP / file processing
+# ============================================================
+
def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
    """Parse XML bytes and process the filing if it's a Form 990.

    Returns rows inserted, or None when the file is skipped (unparseable
    XML or a non-990 return type).
    """
    try:
        tree = etree.parse(io.BytesIO(xml_bytes))
    except etree.XMLSyntaxError as e:
        log_ingest_error(ingest_run_id, source_archive, source_path,
                         f"XML parse error: {e}", stage="parse_xml")
        return None

    if text(tree, ".//irs:ReturnTypeCd") not in FORM_990_RETURN_TYPES:
        return None

    source_document_id = None
    try:
        source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
        return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id)
    except Exception as e:
        # If derive_source_document_id itself failed, the id is still None.
        stage = "process_filing" if source_document_id else "derive_source_document_id"
        log_ingest_error(ingest_run_id, source_archive, source_path, e,
                         source_document_id=source_document_id, stage=stage)
        raise
+
+
def process_zip(zip_path, ingest_run_id):
    """Process every XML member of one ZIP archive.

    Returns (files_scanned, files_matched, rows_inserted).
    """
    basename = os.path.basename(zip_path)
    try:
        zf = zipfile.ZipFile(zip_path)
    except zipfile.BadZipFile as e:
        log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip")
        print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr)
        return 0, 0, 0

    with zf:
        names = [n for n in zf.namelist() if n.endswith(".xml")]

        print(f"Processing {basename}: {len(names)} XML files")
        scanned = matched = rows_total = 0

        for ordinal, member in enumerate(names, start=1):
            # Count every ZIP member as scanned, even ones we fail to read —
            # otherwise read failures silently shrink the scanned total and
            # make run-level metrics misleading.
            scanned += 1
            try:
                payload = zf.read(member)
            except Exception as e:
                log_ingest_error(ingest_run_id, basename, member, e, stage="read")
                continue

            try:
                inserted = process_xml_bytes(payload, basename, member, ingest_run_id)
            except Exception as e:
                print(f"  ERROR in {member}: {e}", file=sys.stderr)
                continue

            if inserted is not None:
                matched += 1
                rows_total += inserted

            if ordinal % 1000 == 0:
                print(f"  ...{ordinal}/{len(names)} files, {matched} matched, {rows_total} rows")

    print(f"  Done: {scanned} scanned, {matched} matched, {rows_total} rows")
    return scanned, matched, rows_total
+
+
def main():
    """CLI entry point: parse the given ZIP / XML paths in one ingest run.

    Accumulates scanned / matched / row totals across all inputs, marks the
    ingest run finished on success, and marks it failed before re-raising
    on any unexpected error.
    """
    args = sys.argv[1:]
    if not args:
        # FIX: was an f-string with no placeholders (needless f prefix).
        print("Usage: python -m scripts.parse.irs_990 <zip_or_xml_files...>", file=sys.stderr)
        sys.exit(1)

    notes = " ".join(os.path.basename(a) for a in args)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    grand_scanned = 0
    grand_matched = 0
    grand_rows = 0

    try:
        for path in args:
            if path.endswith(".zip"):
                scanned, matched, rows = process_zip(path, ingest_run_id)
                grand_scanned += scanned
                grand_matched += matched
                grand_rows += rows

            elif path.endswith(".xml"):
                xml_name = os.path.basename(path)
                # Count before I/O, so read failures still show up in scanned.
                grand_scanned += 1
                try:
                    with open(path, "rb") as f:
                        xml_bytes = f.read()
                except Exception as e:
                    log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read")
                    print(f"ERROR reading {path}: {e}", file=sys.stderr)
                    continue

                try:
                    rows = process_xml_bytes(
                        xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id
                    )
                except Exception as e:
                    print(f"ERROR in {path}: {e}", file=sys.stderr)
                    continue

                if rows is not None:
                    grand_matched += 1
                    grand_rows += rows
            else:
                print(f"Skipping unknown file type: {path}", file=sys.stderr)

        finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows)
    except Exception:
        # Don't leave the run row stuck in a "running" state.
        fail_ingest_run(ingest_run_id)
        raise

    print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.")
+
+
# Allow direct invocation: python -m scripts.parse.irs_990 <files...>
if __name__ == "__main__":
    main()
diff --git a/scripts/parse/irs_990ez.py b/scripts/parse/irs_990ez.py
new file mode 100644
index 0000000..bea4fdd
--- /dev/null
+++ b/scripts/parse/irs_990ez.py
@@ -0,0 +1,449 @@
+"""
+Parse IRS Form 990-EZ XML files into the new raw schema.
+
+Populates: raw.filing, raw.filing_source, raw.form_990ez,
+ raw.schedule_o.
+
+Form 990-EZ has no structured grant recipient table — line 10 of Part I
+aggregates grants into a single amount and instructs the filer to "list
+in Schedule O." v1 scope is preservation (filing facts + Schedule O),
+not grant normalization.
+
+Usage:
+ python -m scripts.parse.irs_990ez data/irs/xml-zips/*.zip
+ python -m scripts.parse.irs_990ez data/irs/xml-missing/202100139349100100_public.xml
+"""
+
+import io
+import os
+import sys
+import zipfile
+
+from lxml import etree
+
+from scripts.common.db import execute_transaction
+from scripts.common.normalize import parse_numeric
+from scripts.common.xml import (
+ NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata,
+)
+from scripts.common.ingest import (
+ start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
+)
+from scripts.common.filing import (
+ upsert_raw_filing, record_raw_filing_source,
+)
+
# Parser identity recorded on the ingest run started by main().
PARSER_NAME = "parse_irs_990ez"
# Source-system tag passed to start_ingest_run / derive_source_document_id.
SOURCE_SYSTEM = "irs_xml"

# Standalone XML files use this as source_archive
STANDALONE_ARCHIVE = "__standalone__"

# IRS ReturnTypeCd values that map to Form 990-EZ (includes amended variant).
FORM_990EZ_RETURN_TYPES = {"990EZ", "990EA"}

# Clark-notation XPath locating the Schedule O block anywhere in the return.
SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO"
+
+
+# ============================================================
+# Schedule O narrative extraction
+# ============================================================
+
def extract_schedule_o(tree):
    """Collect every SupplementalInformationDetail entry from Schedule O.

    Returns a list of dicts without raw_filing_id (the caller adds it).
    line_number is the 1-based ordinal among *emitted* rows; entries whose
    explanation fields are all empty are dropped so we never store
    low-signal placeholder rows.
    """
    sched_o = tree.find(SCHEDULE_O_XPATH)
    if sched_o is None:
        return []

    out = []
    for detail in sched_o.findall(f"{{{NS}}}SupplementalInformationDetail"):
        # Prefer the full explanation; fall back to medium/short variants.
        narrative = (
            text(detail, "irs:ExplanationTxt")
            or text(detail, "irs:MediumExplanationTxt")
            or text(detail, "irs:ShortExplanationTxt")
        )
        if narrative is None:
            continue
        out.append({
            "line_number": len(out) + 1,
            "form_line_ref": text(detail, "irs:FormAndLineReferenceDesc"),
            "explanation": narrative,
        })
    return out
+
+
+# ============================================================
+# Form 990-EZ summary extraction
+# ============================================================
+
def extract_form_990ez(tree):
    """Extract filing-level summary fields for raw.form_990ez.

    Each field tries the modern schema tag first (``*Txt`` / ``*Cd`` /
    ``CityNm`` / ``StateAbbreviationCd`` / ``ZIPCd``) and falls back to the
    old-schema variant used by earlier filings. Returns a flat dict keyed
    by raw.form_990ez column names; fields absent from the XML are None.
    """
    ez = f".//{{{NS}}}IRS990EZ"

    # Filer identity from ReturnHeader (with old-schema fallbacks).
    filer = {
        "filer_name2": (
            text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt")
            or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2")
        ),
        "filer_address_line1": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1")
        ),
        "filer_city": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:City")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City")
        ),
        "filer_state": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:State")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm")
        ),
        "filer_zip": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode")
        ),
        "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"),
        "phone": text(tree, ".//irs:Filer/irs:PhoneNum"),
        "website": text(tree, f"{ez}/irs:WebsiteAddressTxt"),
    }

    # Classification. The 990-EZ schema puts the 501(c) subsection in an
    # attribute on Organization501cInd (e.g. <Organization501cInd
    # organization501cTypeTxt="7"/>), not in element text. A 501(c)(3) filing
    # instead sets <Organization501c3Ind>X</Organization501c3Ind> with no
    # separate subsection element — in that case we leave section_501c_type
    # NULL rather than inferring "3" from the boolean flag.
    section_501c_type = None
    el_501c = tree.find(f"{ez}/irs:Organization501cInd", NS_MAP)
    if el_501c is not None:
        section_501c_type = el_501c.get("organization501cTypeTxt")

    classification = {
        "is_501c3": text_bool(tree, f"{ez}/irs:Organization501c3Ind"),
        "section_501c_type": section_501c_type,
        "group_exemption_num": text(tree, f"{ez}/irs:GroupExemptionNum"),
    }

    # Filing status flags. text_bool preserves None (tag absent) vs False
    # (tag present but not ticked).
    status = {
        "is_amended": text_bool(tree, f"{ez}/irs:AmendedReturnInd"),
        "is_initial": text_bool(tree, f"{ez}/irs:InitialReturnInd"),
        "is_final": text_bool(tree, f"{ez}/irs:FinalReturnInd"),
    }

    # Part I: revenue / expenses — simple element-name-to-column mapping.
    part_i = {}
    part_i_fields = {
        "gross_receipts": "GrossReceiptsAmt",
        "contributions_gifts_grants": "ContributionsGiftsGrantsEtcAmt",
        "program_service_revenue": "ProgramServiceRevenueAmt",
        "investment_income": "InvestmentIncomeAmt",
        "total_revenue": "TotalRevenueAmt",
        "grants_paid": "GrantsAndSimilarAmountsPaidAmt",
        "salaries_compensation": "SalariesOtherCompEmplBnftAmt",
        "total_expenses": "TotalExpensesAmt",
        "revenue_less_expenses": "ExcessOrDeficitForYearAmt",
    }
    for col, elem in part_i_fields.items():
        part_i[col] = parse_numeric(text(tree, f"{ez}/irs:{elem}"))

    # Part II: balance sheet (BOY/EOY child elements under group wrappers).
    balance_sheet = {
        "total_assets_boy": parse_numeric(
            text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:BOYAmt")
        ),
        "total_assets_eoy": parse_numeric(
            text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:EOYAmt")
        ),
        "total_liabilities_boy": parse_numeric(
            text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:BOYAmt")
        ),
        "total_liabilities_eoy": parse_numeric(
            text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:EOYAmt")
        ),
        "net_assets_boy": parse_numeric(
            text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:BOYAmt")
        ),
        "net_assets_eoy": parse_numeric(
            text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:EOYAmt")
        ),
    }

    # Schedule O presence
    sched_o_presence = {
        "has_schedule_o": tree.find(SCHEDULE_O_XPATH) is not None,
    }

    # Officer / signer
    officer = {
        "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"),
        "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"),
        "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"),
        "preparer_firm": (
            text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt")
            or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1")
        ),
    }

    return {
        **filer, **classification, **status,
        **part_i, **balance_sheet,
        **sched_o_presence, **officer,
    }
+
+
+# ============================================================
+# Grant detail status
+# ============================================================
+
def compute_grant_detail_status(form_data, schedule_o_rows):
    """Determine grant detail completeness for a Form 990-EZ filing.

    990-EZ has no recipient table to inspect. Line 10 (grants_paid) is an
    aggregate and grant detail is instructed to be listed in Schedule O, but
    the raw XML gives us no way to tell that a given Schedule O entry is
    *the* line-10 narrative rather than unrelated supplemental text. For a
    provenance-strict raw layer we therefore only report what we can verify
    from the structured fields:

    - no_grants: line 10 is null or 0 (no grants reported).
    - unresolved: line 10 is positive. Whether that detail lives in Schedule
      O, an attachment, or nowhere is left to a downstream classifier.

    schedule_o_rows is intentionally unused; it stays in the signature so
    callers (and a future classifier) have a single place to evolve the
    logic.
    """
    _ = schedule_o_rows  # reserved for a future classifier — see docstring
    amount = form_data.get("grants_paid")
    if amount is not None and float(amount) != 0:
        return "unresolved"
    return "no_grants"
+
+
+# ============================================================
+# Per-filing processing
+# ============================================================
+
# Column order for raw.schedule_o inserts; row dicts are mapped positionally.
SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"]
+
+
+
def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id):
    """Process a single Form 990-EZ filing. All child writes are transactional.

    Returns the number of rows written: filing + filing_source + form
    summary + one per Schedule O narrative entry.
    """
    metadata = extract_filing_metadata(tree)
    schedule_o_entries = extract_schedule_o(tree)
    form_data = extract_form_990ez(tree)
    form_data["grant_detail_status"] = compute_grant_detail_status(
        form_data, schedule_o_entries,
    )

    def _do(conn):
        raw_filing_id = upsert_raw_filing(
            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
        )
        record_raw_filing_source(
            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
        )

        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
        schedule_o_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in schedule_o_entries
        ]

        # BUG FIX: the previous version referenced an undefined ``xml_rows``
        # (leftover from a removed XML-field feature), raising NameError on
        # every filing, and passed it as a fifth argument that
        # _replace_children does not accept.
        _replace_children(conn, raw_filing_id, filing_form_data, schedule_o_rows)

        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990ez + schedule_o rows
        return 3 + len(schedule_o_rows)

    return execute_transaction(_do)
+
+
def _replace_children(conn, raw_filing_id, form_data, schedule_o_rows):
    """Replace all child rows for a filing inside the caller's transaction.

    Old raw.form_990ez / raw.schedule_o rows for the filing are deleted,
    then the fresh form summary and Schedule O narrative rows are inserted.
    """
    form_cols = list(form_data)

    with conn.cursor() as cur:
        # Drop any child rows left over from a previous ingest of this filing.
        cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.form_990ez WHERE raw_filing_id = %s", (raw_filing_id,))

        # Fresh form summary row.
        cur.execute(
            f"INSERT INTO raw.form_990ez ({', '.join(form_cols)}) "
            f"VALUES ({', '.join(['%s'] * len(form_cols))})",
            [form_data[c] for c in form_cols],
        )

        # Fresh Schedule O narrative rows, batched.
        if schedule_o_rows:
            from psycopg2.extras import execute_batch
            execute_batch(
                cur,
                f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) "
                f"VALUES ({', '.join(['%s'] * len(SCHEDULE_O_COLUMNS))})",
                [[r.get(c) for c in SCHEDULE_O_COLUMNS] for r in schedule_o_rows],
            )
+
+
+# ============================================================
+# ZIP / file processing
+# ============================================================
+
def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
    """Parse XML bytes and process if it's a Form 990-EZ.

    Returns the number of rows inserted, or None when the document is
    unparseable or not a 990-EZ return.
    """
    try:
        tree = etree.parse(io.BytesIO(xml_bytes))
    except etree.XMLSyntaxError as e:
        log_ingest_error(ingest_run_id, source_archive, source_path,
                         f"XML parse error: {e}", stage="parse_xml")
        return None

    # Not a 990-EZ (or amended variant): skip silently.
    if text(tree, ".//irs:ReturnTypeCd") not in FORM_990EZ_RETURN_TYPES:
        return None

    doc_id = None
    try:
        doc_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
        return process_filing(tree, doc_id, source_archive, source_path, ingest_run_id)
    except Exception as e:
        # doc_id still None means derivation itself failed.
        failed_stage = "process_filing" if doc_id else "derive_source_document_id"
        log_ingest_error(ingest_run_id, source_archive, source_path, e,
                         source_document_id=doc_id, stage=failed_stage)
        raise
+
+
def process_zip(zip_path, ingest_run_id):
    """Process all XMLs in a ZIP file.

    Returns (files_scanned, files_matched, total_rows).
    """
    basename = os.path.basename(zip_path)
    try:
        zf = zipfile.ZipFile(zip_path)
    except zipfile.BadZipFile as e:
        log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip")
        print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr)
        return 0, 0, 0

    # BUG FIX: the archive must stay open for the whole read loop. The
    # previous version closed it (``with zf:``) immediately after listing
    # member names, so every subsequent ``zf.read(name)`` raised
    # "attempt to use ZIP archive that was already closed" and was logged
    # as a read failure.
    with zf:
        names = [n for n in zf.namelist() if n.endswith(".xml")]

        print(f"Processing {basename}: {len(names)} XML files")
        files_scanned = 0
        files_matched = 0
        total_rows = 0

        for i, name in enumerate(names):
            # Count every ZIP member as scanned, even ones we fail to read —
            # otherwise read failures silently shrink the scanned total and
            # make run-level metrics misleading.
            files_scanned += 1
            try:
                xml_bytes = zf.read(name)
            except Exception as e:
                log_ingest_error(ingest_run_id, basename, name, e, stage="read")
                continue

            try:
                rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id)
            except Exception as e:
                print(f"  ERROR in {name}: {e}", file=sys.stderr)
                continue

            if rows is not None:
                files_matched += 1
                total_rows += rows

            if (i + 1) % 1000 == 0:
                print(f"  ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows")

    print(f"  Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows")
    return files_scanned, files_matched, total_rows
+
+
def main():
    """CLI entry point: parse 990-EZ filings from ZIPs and/or standalone XMLs.

    The whole invocation is tracked as a single ingest run; on any
    unexpected exception the run is marked failed and the exception
    re-raised.
    """
    args = sys.argv[1:]
    if not args:
        # Plain string: the previous f-string had no placeholders (ruff F541).
        print("Usage: python -m scripts.parse.irs_990ez <zip_or_xml_files...>", file=sys.stderr)
        sys.exit(1)

    notes = " ".join(os.path.basename(a) for a in args)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    grand_scanned = 0
    grand_matched = 0
    grand_rows = 0

    try:
        for path in args:
            if path.endswith(".zip"):
                scanned, matched, rows = process_zip(path, ingest_run_id)
                grand_scanned += scanned
                grand_matched += matched
                grand_rows += rows

            elif path.endswith(".xml"):
                xml_name = os.path.basename(path)
                # Count before I/O, so read failures still show up in scanned.
                grand_scanned += 1
                try:
                    with open(path, "rb") as f:
                        xml_bytes = f.read()
                except Exception as e:
                    log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read")
                    print(f"ERROR reading {path}: {e}", file=sys.stderr)
                    continue

                try:
                    rows = process_xml_bytes(
                        xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id
                    )
                except Exception as e:
                    print(f"ERROR in {path}: {e}", file=sys.stderr)
                    continue

                if rows is not None:
                    grand_matched += 1
                    grand_rows += rows
            else:
                print(f"Skipping unknown file type: {path}", file=sys.stderr)

        finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows)
    except Exception:
        # Mark the run failed before propagating so run-level bookkeeping
        # never shows a silently-abandoned run.
        fail_ingest_run(ingest_run_id)
        raise

    print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.")


if __name__ == "__main__":
    main()
diff --git a/scripts/parse/irs_990pf.py b/scripts/parse/irs_990pf.py
new file mode 100644
index 0000000..3d245b8
--- /dev/null
+++ b/scripts/parse/irs_990pf.py
@@ -0,0 +1,544 @@
+"""
+Parse IRS 990-PF XML files into the new raw schema.
+
+Populates: raw.filing, raw.filing_source, raw.form_990pf,
+ raw.grant_990pf.
+
+Usage:
+ python -m scripts.parse.irs_990pf data/irs/xml-zips/*.zip
+ python -m scripts.parse.irs_990pf data/irs/xml-missing/202100139349100100_public.xml
+"""
+
+import io
+import os
+import sys
+import zipfile
+
+from lxml import etree
+
+from scripts.common.db import execute_transaction
+from scripts.common.normalize import parse_numeric, is_placeholder
+from scripts.common.xml import (
+ NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata,
+)
+from scripts.common.ingest import (
+ start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
+)
+from scripts.common.filing import (
+ upsert_raw_filing, record_raw_filing_source,
+)
+
# Parser identity recorded on the ingest run started by main().
PARSER_NAME = "parse_irs_990pf"
# Source-system tag passed to start_ingest_run / derive_source_document_id.
SOURCE_SYSTEM = "irs_xml"

# Standalone XML files use this as source_archive
STANDALONE_ARCHIVE = "__standalone__"

# Grant element XPaths — three schema variants across IRS versions.
# find_all_grants() searches all three, so mixed-era archives still match.
GRANT_XPATHS = [
    f".//{{{NS}}}SupplementaryInformationGrp/{{{NS}}}GrantOrContributionPdDurYrGrp",
    f".//{{{NS}}}SupplementaryInformation/{{{NS}}}GrantOrContriPaidDuringYear",
    f".//{{{NS}}}SupplementaryInfomation/{{{NS}}}GrantOrContriPaidDuringYear",  # IRS typo
]
+
+
+# ============================================================
+# Grant extraction
+# ============================================================
+
def extract_grant(g, line_number):
    """Extract a single grant row from a grant XML element.

    Child element names vary across IRS schema versions, so each field
    tries the modern tag first, then falls back to the older variant.
    Returns None for an element with no children at all.
    """
    if len(g) == 0:
        return None

    def first(*paths):
        # Mirrors an ``or``-chain over text(): return the first truthy
        # value, else whatever the final lookup produced (None/empty).
        value = None
        for path in paths:
            value = text(g, path)
            if value:
                return value
        return value

    raw_amount = first("irs:Amt", "irs:Amount")

    row = {"line_number": line_number}
    row["recipient_name"] = first(
        "irs:RecipientBusinessName/irs:BusinessNameLine1Txt",
        "irs:RecipientBusinessName/irs:BusinessNameLine1",
    )
    row["recipient_name2"] = first(
        "irs:RecipientBusinessName/irs:BusinessNameLine2Txt",
        "irs:RecipientBusinessName/irs:BusinessNameLine2",
    )
    row["recipient_person_name"] = first(
        "irs:RecipientPersonNm",
        "irs:RecipientPersonName",
    )
    row["address_line1"] = first(
        "irs:RecipientUSAddress/irs:AddressLine1Txt",
        "irs:RecipientUSAddress/irs:AddressLine1",
        "irs:RecipientForeignAddress/irs:AddressLine1Txt",
        "irs:RecipientForeignAddress/irs:AddressLine1",
    )
    row["address_line2"] = first(
        "irs:RecipientUSAddress/irs:AddressLine2Txt",
        "irs:RecipientUSAddress/irs:AddressLine2",
        "irs:RecipientForeignAddress/irs:AddressLine2Txt",
        "irs:RecipientForeignAddress/irs:AddressLine2",
    )
    row["city"] = first(
        "irs:RecipientUSAddress/irs:CityNm",
        "irs:RecipientUSAddress/irs:City",
        "irs:RecipientForeignAddress/irs:CityNm",
        "irs:RecipientForeignAddress/irs:City",
    )
    row["state"] = first(
        "irs:RecipientUSAddress/irs:StateAbbreviationCd",
        "irs:RecipientUSAddress/irs:State",
        "irs:RecipientForeignAddress/irs:ProvinceOrStateNm",
    )
    row["zip"] = first(
        "irs:RecipientUSAddress/irs:ZIPCd",
        "irs:RecipientUSAddress/irs:ZIPCode",
    )
    row["country"] = text(g, "irs:RecipientForeignAddress/irs:CountryCd")
    row["foreign_postal_code"] = text(g, "irs:RecipientForeignAddress/irs:ForeignPostalCd")
    row["amount_raw"] = raw_amount
    row["amount"] = parse_numeric(raw_amount)
    row["purpose"] = first(
        "irs:GrantOrContributionPurposeTxt",
        "irs:PurposeOfGrantOrContriTxt",
    )
    row["foundation_status"] = first(
        "irs:RecipientFoundationStatusTxt",
        "irs:RecipientFoundationStatusCd",
    )
    row["relationship"] = first(
        "irs:RecipientRelationshipTxt",
        "irs:RecipientRelationship",
    )
    return row
+
+
def find_all_grants(tree):
    """Find all grant elements across schema variants (see GRANT_XPATHS)."""
    return [el for xpath in GRANT_XPATHS for el in tree.findall(xpath)]
+
+
+# ============================================================
+# Form 990-PF summary extraction
+# ============================================================
+
def extract_form_990pf(tree):
    """Extract filing-level summary fields for raw.form_990pf.

    Each field tries the modern schema tag first (``*Txt`` / ``*Cd`` /
    ``CityNm`` / ``StateAbbreviationCd`` / ``ZIPCd``) and falls back to the
    old-schema variant used by earlier filings. Returns a flat dict keyed
    by raw.form_990pf column names; fields absent from the XML are None.
    """
    pf = f".//{{{NS}}}IRS990PF"
    a = f"{pf}/{{{NS}}}AnalysisOfRevenueAndExpenses"
    bs = f"{pf}/{{{NS}}}Form990PFBalanceSheetsGrp"
    si = f"{pf}/{{{NS}}}SupplementaryInformationGrp"
    sa = f"{pf}/{{{NS}}}StatementsRegardingActyGrp"

    # Filer address from ReturnHeader. Each field tries the modern (*Txt /
    # *Cd / CityNm / StateAbbreviationCd / ZIPCd) tag first and falls back
    # to the old-schema variant used by older filings.
    filer_addr = {
        "filer_name2": (
            text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt")
            or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2")
        ),
        "filer_address_line1": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1")
        ),
        "filer_address_line2": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2")
        ),
        "filer_city": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:City")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City")
        ),
        "filer_state": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:State")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm")
        ),
        "filer_zip": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode")
        ),
        "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"),
        "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"),
        "phone": text(tree, ".//irs:Filer/irs:PhoneNum"),
    }

    # Classification. text_bool preserves None (tag absent) vs False
    # (tag present but not ticked); the accounting_method scan treats both
    # the same because we're looking for the first truthy branch.
    method_cash = text_bool(tree, f"{pf}/irs:MethodOfAccountingCashInd")
    method_accrual = text_bool(tree, f"{pf}/irs:MethodOfAccountingAccrualInd")

    classification = {
        "is_501c3_pf": text_bool(tree, f"{pf}/irs:Organization501c3ExemptPFInd"),
        "is_4947a1_trust": text_bool(tree, f"{pf}/irs:Organization4947a1TrtdPFInd"),
        "is_private_operating": text_bool(tree, f"{sa}/irs:PrivateOperatingFoundationInd"),
        "accounting_method": (
            "cash" if method_cash
            else ("accrual" if method_accrual else None)
        ),
    }

    # Filing status. is_initial is true when either of two mutually-exclusive
    # indicator tags is set; we preserve None when both tags are absent.
    initial_std = text_bool(tree, f"{pf}/irs:InitialReturnInd")
    initial_former = text_bool(tree, f"{pf}/irs:InitialReturnFormerPubChrtyInd")
    if initial_std is None and initial_former is None:
        is_initial = None
    else:
        is_initial = bool(initial_std) or bool(initial_former)

    status = {
        "is_amended": text_bool(tree, f"{pf}/irs:AmendedReturnInd"),
        "is_initial": is_initial,
        "is_final": text_bool(tree, f"{pf}/irs:FinalReturnInd"),
    }

    # Part I: revenue and expenses — column-to-XPath mapping.
    part_i = {}
    part_i_fields = {
        "contributions_received": f"{a}/irs:ContriRcvdRevAndExpnssAmt",
        "interest_revenue": f"{a}/irs:InterestOnSavRevAndExpnssAmt",
        "dividends_revenue": f"{a}/irs:DividendsRevAndExpnssAmt",
        "net_gain_sale_assets": f"{a}/irs:NetGainSaleAstRevAndExpnssAmt",
        "total_revenue": f"{a}/irs:TotalRevAndExpnssAmt",
        "total_net_investment_income": f"{a}/irs:TotalNetInvstIncmAmt",
        "compensation_officers": f"{a}/irs:CompOfcrDirTrstRevAndExpnssAmt",
        "total_operating_expenses": f"{a}/irs:TotOprExpensesRevAndExpnssAmt",
        "contributions_paid": f"{a}/irs:ContriPaidRevAndExpnssAmt",
        "total_expenses": f"{a}/irs:TotalExpensesRevAndExpnssAmt",
        "total_charitable_disbursements": f"{a}/irs:TotalExpensesDsbrsChrtblAmt",
        "excess_revenue_over_expenses": f"{a}/irs:ExcessRevenueOverExpensesAmt",
        "net_investment_income": f"{a}/irs:NetInvestmentIncomeAmt",
        "adjusted_net_income": f"{a}/irs:AdjustedNetIncomeAmt",
    }
    for col, xpath in part_i_fields.items():
        part_i[col] = parse_numeric(text(tree, xpath))

    # Part II: balance sheets
    part_ii = {}
    part_ii_fields = {
        "total_assets_boy": f"{bs}/irs:TotalAssetsBOYAmt",
        "total_assets_eoy": f"{bs}/irs:TotalAssetsEOYAmt",
        "total_assets_eoy_fmv": f"{bs}/irs:TotalAssetsEOYFMVAmt",
        "total_liabilities_boy": f"{bs}/irs:TotalLiabilitiesBOYAmt",
        "total_liabilities_eoy": f"{bs}/irs:TotalLiabilitiesEOYAmt",
        "net_assets_boy": f"{bs}/irs:TotNetAstOrFundBalancesBOYAmt",
        "net_assets_eoy": f"{bs}/irs:TotNetAstOrFundBalancesEOYAmt",
        "fmv_assets_eoy": f"{pf}/irs:FMVAssetsEOYAmt",
    }
    for col, xpath in part_ii_fields.items():
        part_ii[col] = parse_numeric(text(tree, xpath))

    # Parts X-XII: distribution requirement figures.
    dist = {
        "minimum_investment_return": parse_numeric(
            text(tree, f"{pf}/irs:MinimumInvestmentReturnGrp/irs:MinimumInvestmentReturnAmt")
        ),
        "distributable_amount": parse_numeric(
            text(tree, f"{pf}/irs:DistributableAmountGrp/irs:DistributableAsAdjustedAmt")
        ),
        "qualifying_distributions": parse_numeric(
            text(tree, f"{pf}/irs:QualifyingDistriPartXIIGrp/irs:QualifyingDistributionsAmt")
        ),
        "excise_tax_amount": parse_numeric(
            text(tree, f"{pf}/irs:ExciseTaxBasedOnInvstIncmGrp/irs:InvestmentIncomeExciseTaxAmt")
        ),
    }

    # Part XV totals
    xv = {
        "total_grants_paid": parse_numeric(
            text(tree, f"{si}/irs:TotalGrantOrContriPdDurYrAmt")
        ),
        "total_grants_approved_future": parse_numeric(
            text(tree, f"{si}/irs:TotalGrantOrContriApprvFutAmt")
        ),
    }

    # Misc
    misc = {
        "website": text(tree, f"{sa}/irs:WebsiteAddressTxt"),
        "state_of_registration": text(tree, f"{sa}/irs:OrgReportOrRegisterStateCd"),
    }

    # Officer / signer
    officer = {
        "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"),
        "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"),
        "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"),
        "preparer_firm": (
            text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt")
            or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1")
        ),
    }

    return {**filer_addr, **classification, **status, **part_i, **part_ii, **dist, **xv, **misc, **officer}
+
+
+# ============================================================
+# Grant detail status
+# ============================================================
+
def compute_grant_detail_status(grant_elements, grant_rows):
    """Determine grant detail completeness status.

    - "no_grants": no grant elements in the filing at all.
    - "unresolved": elements were present but none yielded a row.
    - "placeholder_only": every extracted row looks like a placeholder.
    - "see_attached": some (but not all) rows look like placeholders.
    - "complete": no placeholder rows.
    """
    if not grant_elements:
        return "no_grants"
    if not grant_rows:
        return "unresolved"

    # A row counts as a placeholder if either its recipient name or its
    # raw amount string matches the placeholder heuristics.
    placeholders = [
        r for r in grant_rows
        if is_placeholder(r.get("recipient_name")) or is_placeholder(r.get("amount_raw"))
    ]

    if len(placeholders) == len(grant_rows):
        return "placeholder_only"
    if placeholders:
        return "see_attached"
    return "complete"
+
+
+# ============================================================
+# Per-filing processing
+# ============================================================
+
# Column order for raw.grant_990pf inserts; row dicts are mapped positionally.
GRANT_COLUMNS = [
    "raw_filing_id", "line_number",
    "recipient_name", "recipient_name2", "recipient_person_name",
    "address_line1", "address_line2", "city", "state", "zip",
    "country", "foreign_postal_code",
    "amount_raw", "amount", "purpose", "foundation_status", "relationship",
]
+
+
+
def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id):
    """Process a single 990-PF filing. All child writes are transactional.

    Returns the number of rows written: filing + filing_source + form
    summary + one per extracted grant.
    """
    # Extract filing metadata
    metadata = extract_filing_metadata(tree)

    # Extract grants, skipping empty grant elements (extract_grant -> None).
    grant_elements = find_all_grants(tree)
    extracted_grants = []
    for i, g in enumerate(grant_elements, start=1):
        row = extract_grant(g, i)
        if row is not None:
            extracted_grants.append(row)

    # Extract form summary
    form_data = extract_form_990pf(tree)
    form_data["grant_detail_status"] = compute_grant_detail_status(grant_elements, extracted_grants)

    def _do(conn):
        raw_filing_id = upsert_raw_filing(
            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
        )
        record_raw_filing_source(
            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
        )

        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
        grant_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in extracted_grants
        ]

        # BUG FIX: the previous version referenced an undefined ``xml_rows``
        # (leftover from a removed XML-field feature), raising NameError on
        # every filing. XML field persistence is not implemented, so pass an
        # empty list for that parameter.
        _replace_children(conn, raw_filing_id, filing_form_data, grant_rows, [])

        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990pf + grants
        return 3 + len(grant_rows)

    return execute_transaction(_do)
+
+
def _replace_children(conn, raw_filing_id, form_data, grant_rows, xml_rows=None):
    """Delete and re-insert all child rows for a filing using the caller's transaction.

    xml_rows is accepted for signature compatibility but ignored: XML field
    persistence is not implemented in this parser (now an optional
    parameter so callers need not pass it).
    """
    del xml_rows  # intentionally unused — XML field rows are not persisted

    form_columns = list(form_data.keys())
    form_placeholders = ", ".join(["%s"] * len(form_columns))
    form_values = [form_data[col] for col in form_columns]

    grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS))

    with conn.cursor() as cur:
        # Delete old child rows from any previous ingest of this filing.
        cur.execute("DELETE FROM raw.grant_990pf WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.form_990pf WHERE raw_filing_id = %s", (raw_filing_id,))

        # Insert form summary
        cur.execute(
            f"INSERT INTO raw.form_990pf ({', '.join(form_columns)}) "
            f"VALUES ({form_placeholders})",
            form_values,
        )

        # Insert grants, batched for throughput.
        if grant_rows:
            from psycopg2.extras import execute_batch
            grant_values = [
                [row.get(col) for col in GRANT_COLUMNS]
                for row in grant_rows
            ]
            execute_batch(
                cur,
                f"INSERT INTO raw.grant_990pf ({', '.join(GRANT_COLUMNS)}) "
                f"VALUES ({grant_placeholders})",
                grant_values,
            )
+
+
+# ============================================================
+# ZIP / file processing
+# ============================================================
+
def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
    """Parse XML bytes and process if it's a 990-PF.

    Returns the number of rows inserted, or None when the document is
    unparseable or not a 990-PF return.
    """
    try:
        tree = etree.parse(io.BytesIO(xml_bytes))
    except etree.XMLSyntaxError as e:
        log_ingest_error(ingest_run_id, source_archive, source_path,
                         f"XML parse error: {e}", stage="parse_xml")
        return None

    ret_type = text(tree, ".//irs:ReturnTypeCd")
    if ret_type != "990PF":
        return None

    # CONSISTENCY FIX: derive the document id inside the try block so a
    # derivation failure is logged with the right stage — previously it ran
    # before the try and its errors were never logged (the 990/990-EZ
    # parsers already handle it this way).
    source_document_id = None
    try:
        source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
        return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id)
    except Exception as e:
        stage = "process_filing" if source_document_id else "derive_source_document_id"
        log_ingest_error(ingest_run_id, source_archive, source_path, e,
                         source_document_id=source_document_id, stage=stage)
        raise
+
+
def process_zip(zip_path, ingest_run_id):
    """Process all XMLs in a ZIP file.

    Returns (files_scanned, files_matched, total_rows).
    """
    basename = os.path.basename(zip_path)
    try:
        zf = zipfile.ZipFile(zip_path)
    except zipfile.BadZipFile as e:
        log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip")
        print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr)
        return 0, 0, 0

    # BUG FIX: the archive must stay open for the whole read loop. The
    # previous version closed it (``with zf:``) immediately after listing
    # member names, so every subsequent ``zf.read(name)`` raised
    # "attempt to use ZIP archive that was already closed" and was logged
    # as a read failure.
    with zf:
        names = [n for n in zf.namelist() if n.endswith(".xml")]

        print(f"Processing {basename}: {len(names)} XML files")
        files_scanned = 0
        files_matched = 0
        total_rows = 0

        for i, name in enumerate(names):
            # Count every ZIP member as scanned, even ones we fail to read —
            # otherwise read failures silently shrink the scanned total and
            # make run-level metrics misleading.
            files_scanned += 1
            try:
                xml_bytes = zf.read(name)
            except Exception as e:
                log_ingest_error(ingest_run_id, basename, name, e, stage="read")
                continue

            try:
                rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id)
            except Exception as e:
                print(f"  ERROR in {name}: {e}", file=sys.stderr)
                continue

            if rows is not None:
                files_matched += 1
                total_rows += rows

            if (i + 1) % 1000 == 0:
                print(f"  ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows")

    print(f"  Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows")
    return files_scanned, files_matched, total_rows
+
+
def main():
    """CLI entry point: parse 990-PF filings from ZIPs and/or standalone XMLs.

    The whole invocation is tracked as a single ingest run; on any
    unexpected exception the run is marked failed and the exception
    re-raised.
    """
    args = sys.argv[1:]
    if not args:
        # Plain string: the previous f-string had no placeholders (ruff F541).
        print("Usage: python -m scripts.parse.irs_990pf <zip_or_xml_files...>", file=sys.stderr)
        sys.exit(1)

    notes = " ".join(os.path.basename(a) for a in args)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    grand_scanned = 0
    grand_matched = 0
    grand_rows = 0

    try:
        for path in args:
            if path.endswith(".zip"):
                scanned, matched, rows = process_zip(path, ingest_run_id)
                grand_scanned += scanned
                grand_matched += matched
                grand_rows += rows

            elif path.endswith(".xml"):
                xml_name = os.path.basename(path)
                # Count before I/O, so read failures still show up in scanned.
                grand_scanned += 1
                try:
                    with open(path, "rb") as f:
                        xml_bytes = f.read()
                except Exception as e:
                    log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read")
                    print(f"ERROR reading {path}: {e}", file=sys.stderr)
                    continue

                try:
                    rows = process_xml_bytes(
                        xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id
                    )
                except Exception as e:
                    print(f"ERROR in {path}: {e}", file=sys.stderr)
                    continue

                if rows is not None:
                    grand_matched += 1
                    grand_rows += rows
            else:
                print(f"Skipping unknown file type: {path}", file=sys.stderr)

        finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows)
    except Exception:
        # Mark the run failed before propagating so run-level bookkeeping
        # never shows a silently-abandoned run.
        fail_ingest_run(ingest_run_id)
        raise

    print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.")


if __name__ == "__main__":
    main()
diff --git a/scripts/parse/irs_bmf.py b/scripts/parse/irs_bmf.py
new file mode 100644
index 0000000..cf02575
--- /dev/null
+++ b/scripts/parse/irs_bmf.py
@@ -0,0 +1,231 @@
+"""
+Load IRS Business Master File (BMF) CSVs into raw.bmf.
+
+Usage:
+ python -m scripts.parse.irs_bmf
+ python -m scripts.parse.irs_bmf data/irs/bmf/eo1.csv data/irs/bmf/eo2.csv
+"""
+
+import csv
+import os
+import sys
+
+from psycopg2.extras import execute_batch
+
+from scripts.common.db import execute_transaction
+from scripts.common.ingest import (
+ start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
+)
+from scripts.common.normalize import normalize_ein
+
# Directory scanned for *.csv input when no paths are given on the command
# line (repo-relative: data/irs/bmf, two levels up from this module).
DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "..", "data", "irs", "bmf")
# Identifiers recorded on each ingest-run row for this loader.
PARSER_NAME = "load_raw_bmf"
SOURCE_SYSTEM = "irs_bmf"

# Column order for raw.bmf inserts; row_to_record must emit exactly these
# keys, and upsert_records builds its VALUES tuples in this order.
# "ein" is the upsert conflict key. Names mostly mirror the IRS EO BMF CSV
# headers; "grp" is filled from the CSV's GROUP column.
BMF_COLUMNS = [
    "ein",
    "name",
    "ico",
    "street",
    "city",
    "state",
    "zip",
    "grp",
    "subsection",
    "affiliation",
    "classification",
    "ruling",
    "deductibility",
    "foundation",
    "activity",
    "organization",
    "status",
    "tax_period",
    "asset_cd",
    "income_cd",
    "filing_req_cd",
    "pf_filing_req_cd",
    "acct_pd",
    "asset_amt",
    "income_amt",
    "revenue_amt",
    "ntee_cd",
    "sort_name",
    "source_file",
    "ingest_run_id",
]
+
+
def parse_bigint(value):
    """Coerce a CSV cell to an int, treating None and blank strings as missing.

    Non-numeric text raises ValueError, which the row-level caller
    catches and logs.
    """
    if value is None:
        return None
    stripped = str(value).strip()
    return int(stripped) if stripped else None
+
+
def row_to_record(row, source_file, ingest_run_id):
    """Convert one BMF CSV row (a DictReader dict) into a raw.bmf record.

    Raises ValueError when the EIN cannot be normalized; the numeric
    amount fields raise on non-numeric text. Both are caught and logged
    by the caller.
    """
    ein = normalize_ein(row.get("EIN"))
    if not ein:
        raise ValueError(f"Cannot normalize EIN: {row.get('EIN')!r}")

    # raw.bmf column -> IRS CSV header for the pass-through text fields
    # (empty strings are coerced to NULL).
    text_sources = {
        "name": "NAME",
        "ico": "ICO",
        "street": "STREET",
        "city": "CITY",
        "state": "STATE",
        "zip": "ZIP",
        "grp": "GROUP",  # CSV header is GROUP; DB column is grp
        "subsection": "SUBSECTION",
        "affiliation": "AFFILIATION",
        "classification": "CLASSIFICATION",
        "ruling": "RULING",
        "deductibility": "DEDUCTIBILITY",
        "foundation": "FOUNDATION",
        "activity": "ACTIVITY",
        "organization": "ORGANIZATION",
        "status": "STATUS",
        "tax_period": "TAX_PERIOD",
        "asset_cd": "ASSET_CD",
        "income_cd": "INCOME_CD",
        "filing_req_cd": "FILING_REQ_CD",
        "pf_filing_req_cd": "PF_FILING_REQ_CD",
        "acct_pd": "ACCT_PD",
        "ntee_cd": "NTEE_CD",
        "sort_name": "SORT_NAME",
    }
    record = {col: (row.get(header) or None) for col, header in text_sources.items()}
    record["ein"] = ein
    record["asset_amt"] = parse_bigint(row.get("ASSET_AMT"))
    record["income_amt"] = parse_bigint(row.get("INCOME_AMT"))
    record["revenue_amt"] = parse_bigint(row.get("REVENUE_AMT"))
    record["source_file"] = source_file
    record["ingest_run_id"] = ingest_run_id
    return record
+
+
def upsert_records(records):
    """Bulk-upsert BMF records into raw.bmf, keyed on ein.

    Returns the number of rows sent; an empty batch is a no-op
    returning 0 without opening a transaction.
    """
    if not records:
        return 0

    # Refresh every column except the conflict key from EXCLUDED, in
    # BMF_COLUMNS order.
    update_columns = [col for col in BMF_COLUMNS if col != "ein"]
    assignments = ", ".join(f"{col} = EXCLUDED.{col}" for col in update_columns)
    sql = (
        f"INSERT INTO raw.bmf ({', '.join(BMF_COLUMNS)}) "
        f"VALUES ({', '.join(['%s'] * len(BMF_COLUMNS))}) "
        f"ON CONFLICT (ein) DO UPDATE SET {assignments}"
    )

    rows = [[record.get(col) for col in BMF_COLUMNS] for record in records]

    def _run(conn):
        with conn.cursor() as cur:
            execute_batch(cur, sql, rows, page_size=1000)
        return len(rows)

    return execute_transaction(_run)
+
+
def discover_files(args):
    """Return the explicit paths given, or every .csv in DATA_DIR (sorted)."""
    if args:
        return args
    csv_names = sorted(n for n in os.listdir(DATA_DIR) if n.endswith(".csv"))
    return [os.path.join(DATA_DIR, n) for n in csv_names]
+
+
def _load_csv(csv_path, ingest_run_id):
    """Load one BMF CSV into raw.bmf in 5000-row batches.

    Rows that fail normalization (bad EIN, non-numeric amounts) are
    logged via log_ingest_error and skipped. Returns
    (rows_upserted, row_errors) for the file.
    """
    basename = os.path.basename(csv_path)
    batch = []
    rows_upserted = 0
    row_errors = 0

    with open(csv_path, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        # Data rows start on physical line 2; line 1 is the header.
        for line_number, row in enumerate(reader, start=2):
            try:
                batch.append(row_to_record(row, basename, ingest_run_id))
            except Exception as e:
                row_errors += 1
                log_ingest_error(
                    ingest_run_id,
                    basename,
                    f"line:{line_number}",
                    e,
                    source_document_id=row.get("EIN"),
                    stage="normalize_row",
                )
                continue

            if len(batch) >= 5000:
                rows_upserted += upsert_records(batch)
                batch = []

    if batch:
        rows_upserted += upsert_records(batch)

    return rows_upserted, row_errors


def main():
    """Load IRS BMF CSVs (explicit args, or everything in DATA_DIR).

    The whole invocation is tracked as one ingest run. A file that
    cannot be read or parsed is logged and skipped — matching the
    per-file error handling of the XML parsers — instead of aborting
    the entire run. Any other unexpected exception marks the run
    failed before re-raising.
    """
    files = discover_files(sys.argv[1:])
    if not files:
        print(f"No CSV files found in {DATA_DIR}", file=sys.stderr)
        sys.exit(1)

    notes = " ".join(os.path.basename(path) for path in files)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    files_scanned = 0
    files_matched = 0
    rows_inserted = 0

    try:
        for csv_path in files:
            basename = os.path.basename(csv_path)
            files_scanned += 1
            print(f"Loading {basename}...")

            try:
                file_rows, file_errors = _load_csv(csv_path, ingest_run_id)
            except Exception as e:
                # One unreadable/corrupt file should not kill the run;
                # rows already upserted from it remain (and are logged).
                log_ingest_error(ingest_run_id, basename, "whole_file", e, stage="read")
                print(f"ERROR loading {basename}: {e}", file=sys.stderr)
                continue

            rows_inserted += file_rows
            files_matched += 1
            print(f" {basename}: {file_rows:,} rows loaded, {file_errors} errors")

        finish_ingest_run(ingest_run_id, files_scanned, files_matched, rows_inserted)
    except Exception:
        fail_ingest_run(ingest_run_id)
        raise

    print(
        f"Done. {files_scanned} files scanned, "
        f"{files_matched} files loaded, {rows_inserted:,} rows upserted."
    )
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/seed.py b/scripts/seed.py
new file mode 100644
index 0000000..72c12bd
--- /dev/null
+++ b/scripts/seed.py
@@ -0,0 +1,81 @@
+"""
+Seed raw-layer reference tables with initial values.
+
+Idempotent — safe to run repeatedly. Uses ON CONFLICT DO UPDATE
+so display_name/notes stay current if you change them here.
+
+Usage: python scripts/seed.py
+"""
+
+import sys
+import os
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+from scripts.common.db import execute
+
+
def seed_source_system():
    """Upsert the reference list of data sources into raw.source_system.

    Idempotent: ON CONFLICT (code) refreshes display_name and url so
    edits here propagate on re-run. Note: unlike seed_grant_detail_status,
    the notes column is not refreshed by the conflict clause.
    """
    execute("""
        INSERT INTO raw.source_system (code, display_name, url, notes) VALUES
        ('irs_xml', 'IRS Bulk XML E-Files', 'https://www.irs.gov/charities-non-profits/form-990-series-downloads', NULL),
        ('irs_pdf', 'IRS PDF Filings', NULL, NULL),
        ('irs_bmf', 'IRS EO BMF Extract', 'https://www.irs.gov/charities-non-profits/exempt-organizations-business-master-file-extract-eo-bmf', NULL),
        ('ny_ag', 'New York Attorney General', 'https://www.charitiesnys.com/RegistrySearch/search_charities.jsp', NULL),
        ('ca_ag', 'California Attorney General', 'https://rct.doj.ca.gov/Verification/Web/Search.aspx', NULL),
        ('fl_ag', 'Florida Charities', NULL, NULL),
        ('tx_ag', 'Texas Secretary of State', NULL, NULL),
        ('nj_ag', 'New Jersey Charities', NULL, NULL),
        ('il_ag', 'Illinois Attorney General', NULL, NULL),
        ('pa_ag', 'Pennsylvania Charities', NULL, NULL),
        ('tn_ag', 'Tennessee Secretary of State', NULL, NULL)
        ON CONFLICT (code) DO UPDATE SET
            display_name = EXCLUDED.display_name,
            url = EXCLUDED.url;
    """)
+
+
def seed_form_type():
    """Upsert the IRS 990-series form-type codes into raw.form_type.

    Idempotent: ON CONFLICT (code) refreshes display_name only.
    """
    execute("""
        INSERT INTO raw.form_type (code, display_name, notes) VALUES
        ('990', 'Form 990', NULL),
        ('990PF', 'Form 990-PF', NULL),
        ('990EZ', 'Form 990-EZ', NULL),
        ('990O', 'Form 990-O', NULL),
        ('990T', 'Form 990-T', NULL),
        ('990A', 'Form 990 (Amended)', NULL),
        ('990PA', 'Form 990-PF (Amended)', NULL),
        ('990EA', 'Form 990-EZ (Amended)', NULL)
        ON CONFLICT (code) DO UPDATE SET
            display_name = EXCLUDED.display_name;
    """)
+
+
def seed_grant_detail_status():
    """Upsert grant-detail completeness codes into raw.grant_detail_status.

    Idempotent: ON CONFLICT (code) refreshes both display_name and notes.
    """
    execute("""
        INSERT INTO raw.grant_detail_status (code, display_name, notes) VALUES
        ('complete', 'Complete', 'All grants are in raw grant rows'),
        ('see_attached', 'See Attached', 'Filing says SEE ATTACHED for grant detail'),
        ('schedule_o', 'Schedule O', 'Grant detail is in Schedule O narrative'),
        ('placeholder_only', 'Placeholder Only', 'Only placeholder rows (VARIOUS, SEE STATEMENT, etc.)'),
        ('no_grants', 'No Grants', 'Filing has no grant schedule or zero grants reported'),
        ('unresolved', 'Unresolved', 'Completeness not yet determined'),
        ('supplemented', 'Supplemented', 'Detail recovered from supplemental source (PDF, state AG)')
        ON CONFLICT (code) DO UPDATE SET
            display_name = EXCLUDED.display_name,
            notes = EXCLUDED.notes;
    """)
+
+
def main():
    """Run every reference-table seeder in order, reporting progress."""
    print("Seeding reference tables...")
    steps = (
        ("source_system", seed_source_system),
        ("form_type", seed_form_type),
        ("grant_detail_status", seed_grant_detail_status),
    )
    for label, seeder in steps:
        seeder()
        print(f" {label}: done")
    print("All reference tables seeded.")
+
+
+if __name__ == "__main__":
+ main()