| author | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
|---|---|---|
| committer | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
| commit | 6605e2cc428e3bdaa174ccc432941eab8c5d61cb | |
| tree | 52f9d176c2ce1a80adb2ea2ac31cd12d3a29c0db | |
| parent | 493746b14c1251a45b061d2e3edd9160c929d2b9 | |
ensure parsers do not parse and store raw XML fields
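The diff below pairs each parsed amount with a verbatim `*_raw` column (e.g. `raw.grant_990pf.amount_raw` alongside `amount`). A minimal sketch of that convention using the `parse_numeric` helper added in this commit; the `row` dict is illustrative, not code from the diff:

```python
from scripts.common.normalize import parse_numeric

# Keep the source string verbatim; store the typed value alongside it.
amount_raw = "(1,500)"               # exactly as printed in the filing
amount = parse_numeric(amount_raw)   # '-1500': a clean numeric string, or None
row = {"amount_raw": amount_raw, "amount": amount}
```

A NULL `amount` next to a populated `amount_raw` then flags placeholder values like 'SEE ATTACHED' without losing the original text.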
Diffstat

| -rw-r--r-- | migrations/001_raw_schema.sql | 535 |
| -rw-r--r-- | requirements.txt | 6 |
| -rw-r--r-- | scripts/common/__init__.py | 28 |
| -rw-r--r-- | scripts/common/db.py | 198 |
| -rw-r--r-- | scripts/common/filing.py | 93 |
| -rw-r--r-- | scripts/common/ingest.py | 57 |
| -rw-r--r-- | scripts/common/normalize.py | 120 |
| -rw-r--r-- | scripts/common/xml.py | 129 |
| -rw-r--r-- | scripts/extract/__init__.py | 0 |
| -rw-r--r-- | scripts/extract/irs_990_pdf.py | 699 |
| -rw-r--r-- | scripts/parse/__init__.py | 0 |
| -rw-r--r-- | scripts/parse/irs_990.py | 691 |
| -rw-r--r-- | scripts/parse/irs_990ez.py | 449 |
| -rw-r--r-- | scripts/parse/irs_990pf.py | 544 |
| -rw-r--r-- | scripts/parse/irs_bmf.py | 231 |
| -rw-r--r-- | scripts/seed.py | 81 |
16 files changed, 3861 insertions, 0 deletions
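For `source_system = 'irs_xml'`, `raw.filing.source_document_id` holds the IRS `object_id`, so ingestion completeness against `raw.irs_manifest` can be audited with a single join. A sketch of such a reconciliation pass, using the `execute_all` helper added below (the query itself is illustrative, not part of this commit):

```python
from scripts.common.db import execute_all

# Manifest entries with no ingested filing yet.
missing = execute_all(
    """
    SELECT m.object_id, m.ein, m.return_type
    FROM raw.irs_manifest m
    LEFT JOIN raw.filing f
      ON f.source_system = 'irs_xml'
     AND f.source_document_id = m.object_id
    WHERE f.id IS NULL
    """
)
print(f"{len(missing)} manifest entries not yet ingested")
```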
diff --git a/migrations/001_raw_schema.sql b/migrations/001_raw_schema.sql new file mode 100644 index 0000000..6661de7 --- /dev/null +++ b/migrations/001_raw_schema.sql @@ -0,0 +1,535 @@ +-- Migration 001: Raw ingestion schema +-- +-- Design principles: +-- - Every row traces back to: which ingest run, which source system, +-- which document, and which filing. +-- - raw.filing is the dedup anchor: one row per unique filing per source system. +-- - Per-form tables (raw.form_990, raw.form_990pf, raw.form_990ez) hold +-- filing-level summary fields, 1:1 with raw.filing. +-- - Grant tables stay split by form type (different column sets). +-- - raw.filing_source tracks every place a filing was seen (which archive, which path). +-- - IRS manifest data lives in its own table for completeness auditing. +-- - Nothing here touches canonicalization — that's a later layer. +-- +-- Parser conventions (EIN normalization, source_document_id derivation, +-- line_number stability, amount parsing, placeholder detection) are +-- enforced in code at scripts/common/, not in documentation. + +BEGIN; + +CREATE SCHEMA IF NOT EXISTS raw; + +-- ============================================================ +-- 1. Reference tables +-- ============================================================ + +-- Data sources. Add rows as new sources come online — no schema changes needed. +CREATE TABLE IF NOT EXISTS raw.source_system ( + code TEXT PRIMARY KEY, -- e.g. 'irs_xml', 'ny_ag' + display_name TEXT NOT NULL, + url TEXT, -- home page or API docs + notes TEXT +); + +-- IRS form types. Includes amended and other variants. +-- New form types from future sources get a row here, not a schema change. +CREATE TABLE IF NOT EXISTS raw.form_type ( + code TEXT PRIMARY KEY, -- e.g. '990', '990PF' + display_name TEXT NOT NULL, + notes TEXT +); + +-- Grant detail completeness status. +-- Tracks whether parsed grant rows fully represent the filing's grants. +CREATE TABLE IF NOT EXISTS raw.grant_detail_status ( + code TEXT PRIMARY KEY, + display_name TEXT NOT NULL, + notes TEXT +); + +-- Reference tables are seeded by scripts/seed.py, not by this migration. + +-- ============================================================ +-- 2. Ingest tracking +-- ============================================================ + +-- One row per invocation of a parser/loader script. +CREATE TABLE IF NOT EXISTS raw.ingest_run ( + id SERIAL PRIMARY KEY, + parser TEXT NOT NULL, -- e.g. 'parse_990pf', 'parse_990_schedule_i' + source_system TEXT NOT NULL REFERENCES raw.source_system(code), + started_at TIMESTAMPTZ NOT NULL DEFAULT now(), + finished_at TIMESTAMPTZ, + status TEXT NOT NULL DEFAULT 'running' + CHECK (status IN ('running', 'done', 'error')), + files_scanned INT, + files_matched INT, + rows_inserted INT, + notes TEXT -- free-form (CLI args, etc.) +); + +-- Per-file errors during ingestion. +CREATE TABLE IF NOT EXISTS raw.ingest_error ( + id SERIAL PRIMARY KEY, + ingest_run_id INT REFERENCES raw.ingest_run(id), + source_archive TEXT, -- zip filename or equivalent container + source_path TEXT, -- file within archive, or standalone file path + source_document_id TEXT, -- parsed document id, if available at error time + stage TEXT, -- processing stage: 'read', 'parse_xml', 'extract', 'db_write', etc. + error TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- ============================================================ +-- 3. 
IRS filing manifest (from IRS index CSVs) +-- ============================================================ + +-- One row per (object_id, index_year) pair. The same filing can appear in +-- multiple annual index releases, so object_id alone is not unique here. +CREATE TABLE IF NOT EXISTS raw.irs_manifest ( + id SERIAL PRIMARY KEY, + return_id TEXT, + filing_type TEXT, -- 'EFILE' + ein TEXT NOT NULL, + tax_period TEXT, -- YYYYMM + sub_date TEXT, -- submission date as published + taxpayer_name TEXT, + return_type TEXT NOT NULL, -- '990', '990EZ', '990PF', amended variants + dln TEXT, + object_id TEXT NOT NULL, + index_year INT NOT NULL, -- which year's index CSV this came from + UNIQUE (object_id, index_year) +); + +CREATE INDEX IF NOT EXISTS idx_raw_irs_manifest_ein ON raw.irs_manifest (ein); +CREATE INDEX IF NOT EXISTS idx_raw_irs_manifest_return_type ON raw.irs_manifest (return_type); +CREATE INDEX IF NOT EXISTS idx_raw_irs_manifest_object_id ON raw.irs_manifest (object_id); + +-- ============================================================ +-- 4. Raw filing table (dedup anchor) +-- ============================================================ + +-- One row per unique filing we have actually ingested. +-- Unique on (source_system, source_document_id): object_id works for IRS XML, +-- but other sources will have their own document identifiers. +-- +-- For source_system = 'irs_xml', source_document_id is the object_id, +-- which can be joined to irs_manifest.object_id for reconciliation. +CREATE TABLE IF NOT EXISTS raw.filing ( + id SERIAL PRIMARY KEY, + source_system TEXT NOT NULL REFERENCES raw.source_system(code), + source_document_id TEXT NOT NULL, -- IRS: object_id; NY AG: filing_id; etc. + ein TEXT NOT NULL CHECK (ein ~ '^\d{9}$'), + filer_name TEXT, + form_type TEXT NOT NULL REFERENCES raw.form_type(code), + tax_year INT, + tax_period_begin DATE, -- from XML TaxPeriodBeginDt + tax_period_end DATE, -- from XML TaxPeriodEndDt + return_version TEXT, + return_timestamp TIMESTAMPTZ, -- from XML ReturnTs + source_url TEXT, -- canonical lookup URL where available + ingest_run_id INT REFERENCES raw.ingest_run(id), + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (source_system, source_document_id) +); + +CREATE INDEX IF NOT EXISTS idx_raw_filing_ein ON raw.filing (ein); +CREATE INDEX IF NOT EXISTS idx_raw_filing_form_type ON raw.filing (form_type); +CREATE INDEX IF NOT EXISTS idx_raw_filing_source_document_id ON raw.filing (source_document_id); +CREATE INDEX IF NOT EXISTS idx_raw_filing_tax_year ON raw.filing (tax_year); + +-- ============================================================ +-- 5. Filing provenance (where each filing was seen) +-- ============================================================ + +-- A filing can appear in multiple archives (overlapping ZIPs, re-releases). +-- This table records every sighting so we can debug path inconsistencies +-- and understand archive overlap without losing information. +-- +-- The UNIQUE on (source_archive, source_path) prevents the same archive/path +-- from mapping to different filings — a given file in a given ZIP is one filing. +CREATE TABLE IF NOT EXISTS raw.filing_source ( + id SERIAL PRIMARY KEY, + raw_filing_id INT NOT NULL REFERENCES raw.filing(id), + ingest_run_id INT REFERENCES raw.ingest_run(id), + source_archive TEXT NOT NULL, -- e.g. 'download990xml_2020_1.zip' + source_path TEXT NOT NULL, -- e.g. 
'Cycles_202042_202052/202013089349101246_public.xml' + seen_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (source_archive, source_path) +); + +CREATE INDEX IF NOT EXISTS idx_raw_filing_source_archive ON raw.filing_source (source_archive); +CREATE INDEX IF NOT EXISTS idx_raw_filing_source_filing ON raw.filing_source (raw_filing_id); + +-- ============================================================ +-- 6. Filing summary tables (per form type, 1:1 with raw.filing) +-- ============================================================ + +-- Form 990: public charity / large exempt org filing summary. +-- Holds Part I summary, balance sheet totals, governance counts, +-- and Schedule I metadata (since Schedule I is 1:1 with the 990 filing). +CREATE TABLE IF NOT EXISTS raw.form_990 ( + id SERIAL PRIMARY KEY, + raw_filing_id INT NOT NULL UNIQUE REFERENCES raw.filing(id), + + -- filer identity (as reported on this specific return) + filer_name2 TEXT, -- BusinessNameLine2Txt + filer_address_line1 TEXT, + filer_address_line2 TEXT, + filer_city TEXT, + filer_state TEXT, + filer_zip TEXT, + filer_country TEXT, -- only for foreign filers + filer_foreign_postal_code TEXT, + phone TEXT, + website TEXT, + + -- classification + is_501c3 BOOLEAN, + section_501c_type TEXT, -- '4', '6', '7', etc. if not 501c3 + org_type TEXT, -- 'corp', 'trust', 'assoc', 'other' + group_return BOOLEAN, + group_exemption_num TEXT, + formation_year TEXT, + legal_domicile_state TEXT, + mission TEXT, + accounting_method TEXT, -- 'cash', 'accrual', 'other' + + -- filing status flags + is_amended BOOLEAN DEFAULT false, + is_final BOOLEAN DEFAULT false, + is_initial BOOLEAN DEFAULT false, + is_terminated BOOLEAN DEFAULT false, + + -- Part I: current year summary + gross_receipts NUMERIC, + cy_contributions_grants NUMERIC, + cy_program_service_revenue NUMERIC, + cy_investment_income NUMERIC, + cy_other_revenue NUMERIC, + cy_total_revenue NUMERIC, + cy_grants_paid NUMERIC, + cy_benefits_to_members NUMERIC, + cy_salaries_benefits NUMERIC, + cy_fundraising_expense NUMERIC, + cy_other_expenses NUMERIC, + cy_total_expenses NUMERIC, + cy_revenue_less_expenses NUMERIC, + + -- Part I: prior year summary (NULL for first-year filers) + py_total_revenue NUMERIC, + py_total_expenses NUMERIC, + + -- balance sheet (Part I summary / Part X) + total_assets_boy NUMERIC, + total_assets_eoy NUMERIC, + total_liabilities_boy NUMERIC, + total_liabilities_eoy NUMERIC, + net_assets_boy NUMERIC, + net_assets_eoy NUMERIC, + + -- governance / workforce + total_employees INT, + total_volunteers INT, + voting_members INT, + independent_voting_members INT, + + -- Part IX: functional expense breakdown + program_services_expense NUMERIC, + management_general_expense NUMERIC, + fundraising_expense_ix NUMERIC, -- Part IX col D total + + -- Part VIII: revenue detail + government_grants NUMERIC, + total_contributions NUMERIC, + total_program_service_rev NUMERIC, + investment_income NUMERIC, + + -- UBI + gross_ubi NUMERIC, + net_ubi NUMERIC, + + -- Schedule I metadata (1:1 with filing) + sched_i_grant_records_maintained BOOLEAN, + sched_i_501c3_org_count INT, + sched_i_other_org_count INT, + sched_i_total_grants_amt NUMERIC, -- for reconciliation vs grant rows + + -- grant detail completeness + grant_detail_status TEXT NOT NULL DEFAULT 'unresolved' + REFERENCES raw.grant_detail_status(code), + + -- officer / signer + principal_officer TEXT, + officer_name TEXT, + officer_title TEXT, + signature_date DATE, + preparer_firm TEXT +); + +-- Form 990-PF: private foundation filing 
summary. +CREATE TABLE IF NOT EXISTS raw.form_990pf ( + id SERIAL PRIMARY KEY, + raw_filing_id INT NOT NULL UNIQUE REFERENCES raw.filing(id), + + -- filer identity (as reported on this specific return) + filer_name2 TEXT, + filer_address_line1 TEXT, + filer_address_line2 TEXT, + filer_city TEXT, + filer_state TEXT, + filer_zip TEXT, + filer_country TEXT, + filer_foreign_postal_code TEXT, + phone TEXT, + website TEXT, + + -- classification + is_501c3_pf BOOLEAN, + is_4947a1_trust BOOLEAN, + is_private_operating BOOLEAN, + accounting_method TEXT, -- 'cash', 'accrual' + + -- filing status flags + is_amended BOOLEAN DEFAULT false, + is_final BOOLEAN DEFAULT false, + is_initial BOOLEAN DEFAULT false, + + -- Part I: revenue and expenses + contributions_received NUMERIC, + interest_revenue NUMERIC, + dividends_revenue NUMERIC, + net_gain_sale_assets NUMERIC, + total_revenue NUMERIC, + total_net_investment_income NUMERIC, + compensation_officers NUMERIC, + total_operating_expenses NUMERIC, + contributions_paid NUMERIC, + total_expenses NUMERIC, + total_charitable_disbursements NUMERIC, + excess_revenue_over_expenses NUMERIC, + net_investment_income NUMERIC, + adjusted_net_income NUMERIC, + + -- Part II: balance sheets + total_assets_boy NUMERIC, + total_assets_eoy NUMERIC, + total_assets_eoy_fmv NUMERIC, + total_liabilities_boy NUMERIC, + total_liabilities_eoy NUMERIC, + net_assets_boy NUMERIC, + net_assets_eoy NUMERIC, + fmv_assets_eoy NUMERIC, -- cover-page FMV figure + + -- Part X-XII: distribution calculations + minimum_investment_return NUMERIC, + distributable_amount NUMERIC, + qualifying_distributions NUMERIC, + + -- excise tax + excise_tax_amount NUMERIC, + + -- Part XV: supplementary information totals (for reconciliation) + total_grants_paid NUMERIC, + total_grants_approved_future NUMERIC, + + -- state registration + state_of_registration TEXT, + + -- grant detail completeness + grant_detail_status TEXT NOT NULL DEFAULT 'unresolved' + REFERENCES raw.grant_detail_status(code), + + -- officer / signer + officer_name TEXT, + officer_title TEXT, + signature_date DATE, + preparer_firm TEXT +); + +-- Form 990-EZ: smaller exempt org filing summary. +-- Parser not yet built, but schema should exist so the table set is complete. +CREATE TABLE IF NOT EXISTS raw.form_990ez ( + id SERIAL PRIMARY KEY, + raw_filing_id INT NOT NULL UNIQUE REFERENCES raw.filing(id), + + -- filer identity + filer_name2 TEXT, + filer_address_line1 TEXT, + filer_city TEXT, + filer_state TEXT, + filer_zip TEXT, + filer_country TEXT, + phone TEXT, + website TEXT, + + -- classification + is_501c3 BOOLEAN, + section_501c_type TEXT, + group_exemption_num TEXT, + + -- filing status flags + is_amended BOOLEAN DEFAULT false, + is_final BOOLEAN DEFAULT false, + is_initial BOOLEAN DEFAULT false, + + -- Part I: revenue, expenses, assets + gross_receipts NUMERIC, + contributions_gifts_grants NUMERIC, + program_service_revenue NUMERIC, + investment_income NUMERIC, + total_revenue NUMERIC, + grants_paid NUMERIC, + salaries_compensation NUMERIC, + total_expenses NUMERIC, + revenue_less_expenses NUMERIC, + total_assets_boy NUMERIC, + total_assets_eoy NUMERIC, + total_liabilities_boy NUMERIC, + total_liabilities_eoy NUMERIC, + net_assets_boy NUMERIC, + net_assets_eoy NUMERIC, + + -- grant detail completeness + -- 990-EZ grant detail often lives in Schedule O or attachments, + -- not in a structured repeated table like Schedule I. 
+ grant_detail_status TEXT NOT NULL DEFAULT 'unresolved' + REFERENCES raw.grant_detail_status(code), + has_schedule_o BOOLEAN, + + -- officer / signer + officer_name TEXT, + officer_title TEXT, + signature_date DATE, + preparer_firm TEXT +); + +-- ============================================================ +-- 7. Raw grant tables (per form type) +-- ============================================================ + +-- 990-PF Part XV grants (private foundation → recipient) +-- +-- line_number is the ordinal position of this grant within the filing's +-- grant list (1-based). Together with raw_filing_id it forms a natural +-- dedup key: re-ingesting the same filing cannot create duplicate rows. +CREATE TABLE IF NOT EXISTS raw.grant_990pf ( + id SERIAL PRIMARY KEY, + raw_filing_id INT NOT NULL REFERENCES raw.filing(id), + line_number INT NOT NULL, + -- recipient fields + recipient_name TEXT, + recipient_name2 TEXT, + recipient_person_name TEXT, -- non-null = individual grant + address_line1 TEXT, + address_line2 TEXT, + city TEXT, + state TEXT, + zip TEXT, + country TEXT, + foreign_postal_code TEXT, + -- grant fields (raw text preserved, typed column alongside) + amount_raw TEXT, + amount NUMERIC, + purpose TEXT, + foundation_status TEXT, + relationship TEXT, + UNIQUE (raw_filing_id, line_number) +); + +CREATE INDEX IF NOT EXISTS idx_raw_grant_990pf_filing ON raw.grant_990pf (raw_filing_id); + +-- 990 Schedule I grants (public charity → recipient) +CREATE TABLE IF NOT EXISTS raw.grant_990 ( + id SERIAL PRIMARY KEY, + raw_filing_id INT NOT NULL REFERENCES raw.filing(id), + line_number INT NOT NULL, + -- recipient fields + recipient_name TEXT, + recipient_name2 TEXT, + recipient_ein TEXT CHECK (recipient_ein IS NULL OR recipient_ein ~ '^\d{9}$'), + address_line1 TEXT, + address_line2 TEXT, + city TEXT, + state TEXT, + zip TEXT, + country TEXT, + foreign_postal_code TEXT, + -- grant fields (raw text preserved, typed columns alongside) + cash_grant_amt_raw TEXT, + cash_grant_amt NUMERIC, + non_cash_amt_raw TEXT, + non_cash_amt NUMERIC, + non_cash_desc TEXT, + valuation_method TEXT, + purpose TEXT, + irc_section TEXT, + UNIQUE (raw_filing_id, line_number) +); + +CREATE INDEX IF NOT EXISTS idx_raw_grant_990_filing ON raw.grant_990 (raw_filing_id); + +-- ============================================================ +-- 8. Schedule O / narrative text +-- ============================================================ + +-- Preserves narrative text from Schedule O and supplemental information. +-- Both 990 and 990-EZ push grant detail and explanatory text here. +-- Not fully structured — stores raw text blocks with whatever section +-- reference the filing provides. +CREATE TABLE IF NOT EXISTS raw.schedule_o ( + id SERIAL PRIMARY KEY, + raw_filing_id INT NOT NULL REFERENCES raw.filing(id), + line_number INT NOT NULL, -- ordinal within the schedule + form_line_ref TEXT, -- e.g. 'SCHEDULE I, PART I, LINE 2' + explanation TEXT, + UNIQUE (raw_filing_id, line_number) +); + +CREATE INDEX IF NOT EXISTS idx_raw_schedule_o_filing ON raw.schedule_o (raw_filing_id); + +-- ============================================================ +-- 9. BMF (organization registry reference source) +-- ============================================================ + +-- Current-snapshot IRS EO BMF rows, minimally transformed from the published +-- CSVs. This is a registry/reference source, not a filing source. 
+CREATE TABLE IF NOT EXISTS raw.bmf ( + ein TEXT PRIMARY KEY CHECK (ein ~ '^\d{9}$'), + name TEXT, + ico TEXT, + street TEXT, + city TEXT, + state TEXT, + zip TEXT, + grp TEXT, + subsection TEXT, + affiliation TEXT, + classification TEXT, + ruling TEXT, + deductibility TEXT, + foundation TEXT, + activity TEXT, + organization TEXT, + status TEXT, + tax_period TEXT, + asset_cd TEXT, + income_cd TEXT, + filing_req_cd TEXT, + pf_filing_req_cd TEXT, + acct_pd TEXT, + asset_amt BIGINT, + income_amt BIGINT, + revenue_amt BIGINT, + ntee_cd TEXT, + sort_name TEXT, + source_file TEXT, + ingest_run_id INT REFERENCES raw.ingest_run(id), + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_raw_bmf_name ON raw.bmf (name); +CREATE INDEX IF NOT EXISTS idx_raw_bmf_state ON raw.bmf (state); +CREATE INDEX IF NOT EXISTS idx_raw_bmf_ntee_cd ON raw.bmf (ntee_cd); + +COMMIT; diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f0eb38c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +lxml==6.0.2 +psycopg2-binary==2.9.11 +anthropic +pymupdf +pdfplumber +zipfile-deflate64==0.2.0 diff --git a/scripts/common/__init__.py b/scripts/common/__init__.py new file mode 100644 index 0000000..f06ee26 --- /dev/null +++ b/scripts/common/__init__.py @@ -0,0 +1,28 @@ +""" +Shared infrastructure for the 990 data pipeline (v2). + +This package provides the single authoritative implementation of +normalization, XML helpers, DB access, and ingest tracking for +all parsers under scripts/parse/, scripts/fetch/, and scripts/extract/. + +Old parsers in scripts/ still use scripts/parse_common.py directly. +""" + +import zipfile_deflate64 # noqa: F401 + +from scripts.common.db import ( + execute, execute_scalar, execute_all, execute_transaction, copy_rows, + # Legacy (shell-based, for old parsers) + psql, psql_scalar, psql_query_values, insert_rows, +) +from scripts.common.normalize import normalize_ein, parse_numeric, map_form_type, is_placeholder +from scripts.common.xml import ( + text, strip_ns, leaf_paths, extract_filing_metadata, + derive_source_document_id, +) +from scripts.common.ingest import ( + start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, +) +from scripts.common.filing import ( + upsert_raw_filing, record_raw_filing_source, +) diff --git a/scripts/common/db.py b/scripts/common/db.py new file mode 100644 index 0000000..bbd220c --- /dev/null +++ b/scripts/common/db.py @@ -0,0 +1,198 @@ +""" +Database access layer using psycopg2 with connection pooling. + +All DB access in the v2 pipeline goes through this module. +""" + +import csv +import io +import sys + +import psycopg2 +import psycopg2.pool +import psycopg2.extras + +DB_HOST = "localhost" +DB_PORT = 35434 +DB_USER = "tidyindex" +DB_PASSWORD = "tidyindex" +DB_NAME = "tidyindex" + +_pool = None + + +def get_pool(): + """Get or create the connection pool (lazy singleton).""" + global _pool + if _pool is None: + _pool = psycopg2.pool.SimpleConnectionPool( + minconn=1, + maxconn=4, + host=DB_HOST, + port=DB_PORT, + user=DB_USER, + password=DB_PASSWORD, + dbname=DB_NAME, + ) + return _pool + + +def get_conn(): + """Get a connection from the pool.""" + return get_pool().getconn() + + +def put_conn(conn): + """Return a connection to the pool.""" + get_pool().putconn(conn) + + +def execute(sql, params=None): + """Execute SQL (no result). 
Auto-commits.""" + conn = get_conn() + try: + with conn.cursor() as cur: + cur.execute(sql, params) + conn.commit() + except Exception: + conn.rollback() + raise + finally: + put_conn(conn) + + +def execute_scalar(sql, params=None): + """Execute SQL and return a single scalar value, or None.""" + conn = get_conn() + try: + with conn.cursor() as cur: + cur.execute(sql, params) + row = cur.fetchone() + conn.commit() + return row[0] if row else None + except Exception: + conn.rollback() + raise + finally: + put_conn(conn) + + +def execute_all(sql, params=None): + """Execute SQL and return all rows as a list of tuples.""" + conn = get_conn() + try: + with conn.cursor() as cur: + cur.execute(sql, params) + rows = cur.fetchall() + conn.commit() + return rows + except Exception: + conn.rollback() + raise + finally: + put_conn(conn) + + +def execute_transaction(fn): + """Run fn(conn) inside a transaction. Commits on success, rolls back on error. + + fn receives a connection with autocommit off. fn should use conn.cursor() + to execute statements. Do NOT commit inside fn — this wrapper handles it. + """ + conn = get_conn() + try: + result = fn(conn) + conn.commit() + return result + except Exception: + conn.rollback() + raise + finally: + put_conn(conn) + + +def copy_rows(table, columns, rows): + """Bulk insert rows via COPY FROM. Returns count of inserted rows.""" + if not rows: + return 0 + + buf = io.StringIO() + writer = csv.DictWriter(buf, fieldnames=columns, extrasaction="ignore") + for row in rows: + writer.writerow(row) + buf.seek(0) + + conn = get_conn() + try: + with conn.cursor() as cur: + cur.copy_expert( + f"COPY {table} ({','.join(columns)}) FROM STDIN WITH (FORMAT csv)", + buf, + ) + count = cur.rowcount + conn.commit() + return count + except Exception: + conn.rollback() + raise + finally: + put_conn(conn) + + +# ============================================================ +# Legacy compatibility (for old parsers using parse_common.py) +# These shell-based functions are NOT used by v2 code. +# ============================================================ + +import subprocess + +_DB_CONTAINER = "tidyindex-postgres" + + +def psql(sql): + """Execute SQL via docker exec psql. 
Legacy — use execute() instead.""" + result = subprocess.run( + ["docker", "exec", "-i", _DB_CONTAINER, "psql", "-U", DB_USER, "-d", DB_NAME], + input=sql, + capture_output=True, + text=True, + ) + if result.returncode != 0: + print(f"PSQL ERROR: {result.stderr}", file=sys.stderr) + sys.exit(1) + return result.stdout + + +def psql_scalar(sql): + """Legacy — use execute_scalar() instead.""" + result = subprocess.run( + [ + "docker", "exec", "-i", _DB_CONTAINER, + "psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A", + ], + input=sql, + capture_output=True, + text=True, + ) + if result.returncode != 0: + print(f"PSQL ERROR: {result.stderr}", file=sys.stderr) + sys.exit(1) + for line in result.stdout.strip().split("\n"): + line = line.strip() + if line and not line.startswith("INSERT") and not line.startswith("UPDATE") and not line.startswith("DELETE"): + return line + return None + + +def psql_query_values(sql): + """Legacy — use execute_all() instead.""" + result = psql(sql) + lines = result.strip().split("\n") + if len(lines) >= 3: + return [line.strip() for line in lines[2:-1]] + return [] + + +def insert_rows(table, columns, rows): + """Legacy — use copy_rows() instead.""" + return copy_rows(table, columns, rows) diff --git a/scripts/common/filing.py b/scripts/common/filing.py new file mode 100644 index 0000000..44bcabc --- /dev/null +++ b/scripts/common/filing.py @@ -0,0 +1,93 @@ +""" +Shared filing helpers for the new raw ingestion schema. + +Provides upsert/record operations for raw.filing and raw.filing_source. +Form-specific child row operations live in the parsers themselves. +""" + +from scripts.common.db import execute_scalar + + +def _execute_scalar(conn, sql, params=None): + """Execute SQL and return a single scalar value using an optional transaction conn.""" + if conn is None: + return execute_scalar(sql, params) + + with conn.cursor() as cur: + cur.execute(sql, params) + row = cur.fetchone() + return row[0] if row else None + + +def upsert_raw_filing(source_system, source_document_id, metadata, ingest_run_id, conn=None): + """Insert or update a raw.filing row. Returns the raw_filing_id. + + On conflict (same source_system + source_document_id), updates ingest_run_id + to the current run (meaning "most recently touched by this run"). 
+ """ + return _execute_scalar( + conn, + """ + INSERT INTO raw.filing ( + source_system, source_document_id, ein, filer_name, form_type, + tax_year, tax_period_begin, tax_period_end, + return_version, return_timestamp, ingest_run_id + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (source_system, source_document_id) + DO UPDATE SET + ein = EXCLUDED.ein, + filer_name = COALESCE(EXCLUDED.filer_name, raw.filing.filer_name), + form_type = EXCLUDED.form_type, + tax_year = COALESCE(EXCLUDED.tax_year, raw.filing.tax_year), + tax_period_begin = COALESCE(EXCLUDED.tax_period_begin, raw.filing.tax_period_begin), + tax_period_end = COALESCE(EXCLUDED.tax_period_end, raw.filing.tax_period_end), + return_version = COALESCE(EXCLUDED.return_version, raw.filing.return_version), + return_timestamp = COALESCE(EXCLUDED.return_timestamp, raw.filing.return_timestamp), + source_url = COALESCE(EXCLUDED.source_url, raw.filing.source_url), + ingest_run_id = EXCLUDED.ingest_run_id + RETURNING id + """, + ( + source_system, source_document_id, metadata["ein"], + metadata.get("filer_name"), metadata["form_type"], + metadata.get("tax_year"), metadata.get("tax_period_begin"), + metadata.get("tax_period_end"), metadata.get("return_version"), + metadata.get("return_timestamp"), ingest_run_id, + ), + ) + + +def record_raw_filing_source(raw_filing_id, ingest_run_id, source_archive, source_path, conn=None): + """Record that a filing was seen at a specific archive+path. + + If (source_archive, source_path) already exists, verifies it points to + the same raw_filing_id. Raises ValueError if there's a conflict. + """ + new_id = _execute_scalar( + conn, + """ + INSERT INTO raw.filing_source (raw_filing_id, ingest_run_id, source_archive, source_path) + VALUES (%s, %s, %s, %s) + ON CONFLICT (source_archive, source_path) DO NOTHING + RETURNING id + """, + (raw_filing_id, ingest_run_id, source_archive, source_path), + ) + + if new_id is not None: + return # inserted successfully + + # Row already existed — verify ownership + existing_filing_id = _execute_scalar( + conn, + "SELECT raw_filing_id FROM raw.filing_source " + "WHERE source_archive = %s AND source_path = %s", + (source_archive, source_path), + ) + + if existing_filing_id is not None and int(existing_filing_id) != raw_filing_id: + raise ValueError( + f"Filing source conflict: {source_archive}:{source_path} " + f"already mapped to raw_filing_id={existing_filing_id}, " + f"but trying to map to {raw_filing_id}" + ) diff --git a/scripts/common/ingest.py b/scripts/common/ingest.py new file mode 100644 index 0000000..9adb6da --- /dev/null +++ b/scripts/common/ingest.py @@ -0,0 +1,57 @@ +""" +Ingest run tracking using the raw schema (raw.ingest_run / raw.ingest_error). 
+""" + +from scripts.common.db import execute, execute_scalar + + +def start_ingest_run(parser, source_system, notes=None): + """Create an ingest_run row and return its id.""" + return execute_scalar( + "INSERT INTO raw.ingest_run (parser, source_system, notes) " + "VALUES (%s, %s, %s) RETURNING id", + (parser, source_system, notes or ""), + ) + + +def finish_ingest_run(run_id, files_scanned, files_matched, rows_inserted): + """Mark an ingest_run as done.""" + execute( + "UPDATE raw.ingest_run SET finished_at = now(), status = 'done', " + "files_scanned = %s, files_matched = %s, rows_inserted = %s " + "WHERE id = %s", + (files_scanned, files_matched, rows_inserted, run_id), + ) + + +def fail_ingest_run(run_id): + """Mark an ingest_run as error.""" + execute( + "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' WHERE id = %s", + (run_id,), + ) + + +def expire_stale_runs(minutes=60): + """Mark any ingest_run still 'running' after N minutes as 'error'. + + Returns True if at least one run was expired, False otherwise. + """ + return execute_scalar( + "UPDATE raw.ingest_run SET finished_at = now(), status = 'error' " + "WHERE status = 'running' AND started_at < now() - interval '%s minutes' " + "RETURNING id", + (minutes,), + ) is not None + + +def log_ingest_error(run_id, source_archive, source_path, error, + source_document_id=None, stage=None): + """Log an error to ingest_error.""" + execute( + "INSERT INTO raw.ingest_error " + "(ingest_run_id, source_archive, source_path, source_document_id, stage, error) " + "VALUES (%s, %s, %s, %s, %s, %s)", + (run_id, source_archive or "", source_path or "", + source_document_id, stage, str(error)), + ) diff --git a/scripts/common/normalize.py b/scripts/common/normalize.py new file mode 100644 index 0000000..b06afed --- /dev/null +++ b/scripts/common/normalize.py @@ -0,0 +1,120 @@ +""" +Single authoritative implementations of data normalization. + +Every parser must use these — never hand-roll EIN cleanup, +amount parsing, form type mapping, or placeholder detection. +""" + +import re + + +def normalize_ein(raw): + """Normalize an EIN to 9-digit zero-padded string. + + Returns None if the input can't be normalized to exactly 9 digits. + + >>> normalize_ein('04-3567890') + '043567890' + >>> normalize_ein('43567890') + '043567890' + >>> normalize_ein(' 04-3567890 ') + '043567890' + >>> normalize_ein(None) + >>> normalize_ein('') + >>> normalize_ein('not-an-ein') + """ + if not raw: + return None + cleaned = re.sub(r'[^0-9]', '', str(raw).strip()) + if not cleaned: + return None + padded = cleaned.zfill(9) + if len(padded) != 9: + return None + return padded + + +def parse_numeric(raw): + """Parse a raw string to a clean numeric string for DB insertion. + + Returns None for non-numeric values like 'SEE ATTACHED'. 
+ + >>> parse_numeric('1,234,567') + '1234567' + >>> parse_numeric('$1,234.56') + '1234.56' + >>> parse_numeric('(500)') + '-500' + >>> parse_numeric('SEE ATTACHED') + >>> parse_numeric(None) + """ + if not raw: + return None + s = str(raw).strip() + if not s: + return None + + s = s.replace('$', '').replace(',', '').strip() + + # Parenthesized negatives: (500) -> -500 + if s.startswith('(') and s.endswith(')'): + s = '-' + s[1:-1] + + try: + float(s) + return s + except ValueError: + return None + + +# IRS ReturnTypeCd -> our form_type reference table code +_FORM_TYPE_MAP = { + '990': '990', + '990PF': '990PF', + '990EZ': '990EZ', + '990O': '990O', + '990T': '990T', + '990A': '990A', + '990PA': '990PA', + '990EA': '990EA', +} + + +def map_form_type(return_type_cd): + """Map an IRS ReturnTypeCd to our form_type code. + + Raises ValueError if unknown — caller should log and skip the filing. + + >>> map_form_type('990PF') + '990PF' + """ + if not return_type_cd: + raise ValueError("Empty return type") + code = return_type_cd.strip() + if code in _FORM_TYPE_MAP: + return _FORM_TYPE_MAP[code] + raise ValueError(f"Unknown return type: {code!r}") + + +_PLACEHOLDER_RE = re.compile( + r'^(SEE\s+(ATTACHED|STATEMENT|SCHEDULE|PART\s|CONTINUATION)|' + r'VARIOUS|MULTIPLE|N/?A|NONE|--+|\.+)$', + re.IGNORECASE, +) + + +def is_placeholder(value): + """Check if a text value is a placeholder rather than real data. + + >>> is_placeholder('SEE ATTACHED') + True + >>> is_placeholder('VARIOUS') + True + >>> is_placeholder('BOYS AND GIRLS CLUB') + False + >>> is_placeholder(None) + False + """ + if not value: + return False + return bool(_PLACEHOLDER_RE.match(value.strip())) diff --git a/scripts/common/xml.py b/scripts/common/xml.py new file mode 100644 index 0000000..b7e80cd --- /dev/null +++ b/scripts/common/xml.py @@ -0,0 +1,129 @@ +""" +XML parsing helpers. + +Single authoritative implementations of XML field extraction, +source document ID derivation, and leaf path building. +""" + +import re + +NS = "http://www.irs.gov/efile" +NS_MAP = {"irs": NS} + +_OBJECT_ID_RE = re.compile(r'(\d{15,20})_public\.xml$') + + +def strip_ns(tag): + """Strip XML namespace: '{http://...}Foo' -> 'Foo'. + + Non-element lxml nodes such as comments expose a non-string ``tag``. + Those are skipped by the leaf-path walker rather than treated as fields. + """ + if not isinstance(tag, str): + return None + if tag.startswith("{"): + return tag.split("}", 1)[1] + return tag + + +def text(el, xpath): + """Extract text from an XML element by xpath. Returns None if not found.""" + if el is None: + return None + found = el.find(xpath, NS_MAP) + return found.text.strip() if found is not None and found.text else None + + +# Values an IRS indicator element may carry to mean "yes / true". +_TRUTHY = frozenset({"X", "x", "1", "true"}) + + +def text_bool(el, xpath): + """Extract an IRS indicator element as True/False, or None if the tag is absent. + + Unlike `text(...) in _TRUTHY`, this preserves the difference between + "filer explicitly left this unchecked" and "tag not present in the XML + at all." Critical for the raw layer, where we want to keep NULL for + missing-in-source and reserve False for an explicit non-truthy value. + """ + val = text(el, xpath) + if val is None: + return None + return val in _TRUTHY + + +def leaf_paths(el, prefix=""): + """Yield (path, value) for every leaf element under el. 
+ + Path uses '/' separator with namespace stripped: + 'IRS990PF/AnalysisOfRevenueAndExpenses/TotalRevAndExpnssAmt' + """ + tag = strip_ns(el.tag) + if tag is None: + return + full = f"{prefix}{tag}" if prefix else tag + children = [child for child in el if isinstance(child.tag, str)] + if not children: + yield full, (el.text or "").strip() + else: + for child in children: + yield from leaf_paths(child, full + "/") + + +def derive_source_document_id(source_system, filename): + """Derive source_document_id from a filename. + + For irs_xml / irs_pdf: extracts the object_id (numeric stem). + Directory prefixes are stripped. + + >>> derive_source_document_id('irs_xml', 'Cycles_202242_202252/202213089349101246_public.xml') + '202213089349101246' + >>> derive_source_document_id('irs_xml', '202213089349101246_public.xml') + '202213089349101246' + """ + if source_system in ('irs_xml', 'irs_pdf'): + m = _OBJECT_ID_RE.search(filename) + if not m: + raise ValueError(f"Cannot extract object_id from: {filename!r}") + return m.group(1) + raise NotImplementedError( + f"derive_source_document_id not implemented for {source_system!r}" + ) + + +def extract_filing_metadata(tree): + """Extract filing-level metadata from an XML tree for raw_filing. + + Returns a dict with: ein, filer_name, form_type, tax_year, + tax_period_begin, tax_period_end, return_version, return_timestamp. + + EIN is normalized, form_type is mapped. Raises ValueError if the + filing cannot be identified. + """ + from scripts.common.normalize import normalize_ein, map_form_type + + root = tree.getroot() + + raw_ein = text(tree, ".//irs:Filer/irs:EIN") + ein = normalize_ein(raw_ein) + if not ein: + raise ValueError(f"Cannot normalize EIN: {raw_ein!r}") + + return_type_cd = text(tree, ".//irs:ReturnTypeCd") + form_type = map_form_type(return_type_cd) + + filer_name = ( + text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1Txt") + or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1") + ) + + return { + "ein": ein, + "filer_name": filer_name, + "form_type": form_type, + "tax_year": text(tree, ".//irs:TaxYr"), + "tax_period_begin": text(tree, ".//irs:TaxPeriodBeginDt"), + "tax_period_end": text(tree, ".//irs:TaxPeriodEndDt"), + "return_version": root.get("returnVersion"), + "return_timestamp": text(tree, ".//irs:ReturnTs"), + } diff --git a/scripts/extract/__init__.py b/scripts/extract/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/scripts/extract/__init__.py diff --git a/scripts/extract/irs_990_pdf.py b/scripts/extract/irs_990_pdf.py new file mode 100644 index 0000000..1d1209c --- /dev/null +++ b/scripts/extract/irs_990_pdf.py @@ -0,0 +1,699 @@ +""" +Source-agnostic 990-PF PDF grant extractor. + +Takes a path to a local PDF and returns structured grant data. Side-effect +free with respect to any database; only external call is to Anthropic's API. + +A separate loader consumes ExtractionResult and writes to raw.* tables. 
+ +Usage (CLI): + python -m scripts.extract.irs_990_pdf path/to/file.pdf + python -m scripts.extract.irs_990_pdf path/to/file.pdf --json + python -m scripts.extract.irs_990_pdf path/to/file.pdf --tax-year 2021 + python -m scripts.extract.irs_990_pdf path/to/file.pdf --source-label ny_ag + +Usage (programmatic): + from scripts.extract.irs_990_pdf import extract_from_pdf + result = extract_from_pdf("data/tmp/pdf_test/marley_2017.pdf", tax_year=2017) + for grant in result.grants: + print(grant.recipient_name, grant.amount) +""" + +from __future__ import annotations + +import argparse +import base64 +import json +import re +import subprocess +import sys +from dataclasses import dataclass, field, asdict +from decimal import Decimal, InvalidOperation +from pathlib import Path +from typing import Any + +import anthropic +import fitz # pymupdf +import pdfplumber + +from scripts.common.normalize import is_placeholder, parse_numeric + + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +MODEL = "claude-haiku-4-5-20251001" + +# Text-path quality heuristic. If the number of grants extracted is less than +# num_grant_pages * MIN_GRANTS_PER_PAGE, we flag the result as low-yield and +# fall back to vision. This is an initial heuristic and may be tuned. +MIN_GRANTS_PER_PAGE = 1 + +# Minimum total text characters across all pages before we trust the text layer. +MIN_TEXT_LAYER_CHARS = 200 + +# Standard 990-PF form has 13 pages; grant attachments come after. +ATTACHMENT_START_PAGE_IDX = 13 # 0-based; corresponds to page 14 + +# Vision scan stops after this many consecutive non-grant pages (once any +# grant page has been found). +VISION_CONSECUTIVE_NO_LIMIT = 3 + +# DPIs used for vision rendering. +TRIAGE_DPI = 100 +EXTRACTION_DPI = 150 + +GRANT_PAGE_KEYWORDS = [ + r"GRANTS AND CONTRIBUTIONS PAID", + r"SUPPLEMENTARY INFORMATION", + r"PART XIV", + r"PART XV", + r"SCHEDULE OF.*GRANT", + r"GRANTS PAID", + r"CONTRIBUTIONS PAID", +] +_GRANT_PAGE_RE = re.compile("|".join(GRANT_PAGE_KEYWORDS)) + +# Loose marker for "this PDF is a 990-PF". NY AG PDFs may contain only the +# CHAR500 state cover form with no federal return attached — ~46% of that +# corpus, based on sampling. Those should short-circuit before we waste any +# Haiku calls. +_IS_990PF_RE = re.compile(r"FORM\s*990-?PF") + +TEXT_EXTRACTION_PROMPT = """Extract ALL individual grants from this 990-PF page text. + +Return a JSON array. 
Each grant should have: +- recipient_name: organization name (if grant is to an org) +- recipient_person_name: person's name (if grant is to an individual) +- address_line1: street address (if present) +- address_line2: second address line (if present) +- city: city (if present) +- state: state abbreviation (if present) +- zip: zip code (if present) +- country: country (if present, only when non-US) +- foreign_postal_code: foreign postal code (if present) +- amount: dollar amount as string (digits only, no $ or commas) +- purpose: purpose of grant (if present) +- foundation_status: recipient status like PC, NC, PF (if present) +- relationship: relationship of recipient to foundation (if present) + +IMPORTANT RULES: +- Do NOT include total/subtotal rows +- Do NOT include header rows or column labels +- If there are multiple year columns, extract ONLY the most recent year +- If recipient name is "SEE ATTACHED", "VARIOUS", "N/A", "NONE", or a similar placeholder, skip it +- Return [] if no individual grants are found + +Return ONLY the JSON array, no other text.""" + +VISION_EXTRACTION_PROMPT = """Extract ALL individual grants from this 990-PF page image. + +Return a JSON array. Each grant should have: +- recipient_name: organization name (if grant is to an org) +- recipient_person_name: person's name (if grant is to an individual) +- address_line1: street address (if visible) +- address_line2: second address line (if visible) +- city: city +- state: state abbreviation +- zip: zip code (if visible) +- country: country (only if non-US) +- foreign_postal_code: foreign postal code (if present) +- amount: dollar amount as string (digits only, no $ or commas) +- purpose: purpose of grant +- foundation_status: recipient status like PC, NC, PF (if present) +- relationship: relationship of recipient to foundation (if present) + +IMPORTANT RULES: +- Do NOT include total/subtotal rows +- Do NOT include header rows +- If there are multiple year columns, extract ONLY the most recent year +- If recipient name is "SEE ATTACHED", "VARIOUS", "N/A", "NONE", or a similar placeholder, skip it +- Return [] if no individual grants are found + +Return ONLY the JSON array, no other text.""" + +TRIAGE_PROMPT = ( + "Is this page a table of grant or contribution recipients listing " + "individual organization names with addresses and dollar amounts? " + "Answer ONLY yes or no." 
+) + + +# --------------------------------------------------------------------------- +# Anthropic client (lazy singleton so importing this module is cheap) +# --------------------------------------------------------------------------- + +_client: anthropic.Anthropic | None = None + + +def _get_client() -> anthropic.Anthropic: + global _client + if _client is None: + api_key = subprocess.run( + ["pass", "show", "anthropic.com/api.anthropic.com/apikey"], + capture_output=True, text=True, + ).stdout.strip() + _client = anthropic.Anthropic(api_key=api_key) + return _client + + +# --------------------------------------------------------------------------- +# Dataclasses +# --------------------------------------------------------------------------- + +@dataclass +class ExtractedGrant: + line_number: int + recipient_name: str | None = None + recipient_name2: str | None = None + recipient_person_name: str | None = None + address_line1: str | None = None + address_line2: str | None = None + city: str | None = None + state: str | None = None + zip: str | None = None + country: str | None = None + foreign_postal_code: str | None = None + amount_raw: str | None = None + amount: Decimal | None = None + purpose: str | None = None + foundation_status: str | None = None + relationship: str | None = None + + +@dataclass +class ExtractionResult: + success: bool + grants: list[ExtractedGrant] + # 'supplemented' — ≥1 grant extracted + # 'no_grants' — extractor ran end-to-end, found nothing usable + # 'not_a_990pf' — PDF is readable but isn't a 990-PF (e.g. CHAR500 cover) + # None — catastrophic failure (success=False) + grant_detail_status: str | None + # 'pdfplumber+haiku_text' — text path was used + # 'haiku_vision_attempted' — vision path was used (success not implied) + # 'skipped_not_990pf' — short-circuited; no API calls made + # 'failed' — catastrophic failure (success=False) + method: str + diagnostics: dict = field(default_factory=dict) + + @property + def total_amount(self) -> Decimal: + return sum((g.amount or Decimal(0)) for g in self.grants) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _parse_haiku_response(text: str) -> list[dict]: + """Parse JSON from Haiku response, handling markdown code blocks.""" + text = text.strip() + if text.startswith("```"): + # Drop opening fence + optional language tag + parts = text.split("\n", 1) + if len(parts) == 2: + text = parts[1] + text = text.rsplit("```", 1)[0] + text = text.strip() + try: + parsed = json.loads(text) + except json.JSONDecodeError: + return [] + if not isinstance(parsed, list): + return [] + return [x for x in parsed if isinstance(x, dict)] + + +def _is_grant_page_text(text: str) -> bool: + return bool(_GRANT_PAGE_RE.search(text.upper())) + + +def _clean_str(value: Any) -> str | None: + if value is None: + return None + s = str(value).strip() + return s or None + + +def _normalize_grant(raw: dict) -> ExtractedGrant | None: + """Map a loose Haiku JSON dict to an ExtractedGrant. + + Returns None if the row looks like a placeholder/header artifact. + line_number is set to 0 here; it's reassigned later after dedupe. + """ + recipient_name = _clean_str(raw.get("recipient_name")) + recipient_person_name = _clean_str(raw.get("recipient_person_name")) + + # Safety net: prompt already filters these, but double-check. 
+ if recipient_name and is_placeholder(recipient_name): + recipient_name = None + if recipient_person_name and is_placeholder(recipient_person_name): + recipient_person_name = None + + amount_raw = _clean_str(raw.get("amount")) + amount_numeric: Decimal | None = None + if amount_raw is not None: + parsed = parse_numeric(amount_raw) + if parsed is not None: + try: + amount_numeric = Decimal(parsed) + except InvalidOperation: + amount_numeric = None + + return ExtractedGrant( + line_number=0, + recipient_name=recipient_name, + recipient_name2=_clean_str(raw.get("recipient_name2")), + recipient_person_name=recipient_person_name, + address_line1=_clean_str(raw.get("address_line1")), + address_line2=_clean_str(raw.get("address_line2")), + city=_clean_str(raw.get("city")), + state=_clean_str(raw.get("state")), + zip=_clean_str(raw.get("zip")), + country=_clean_str(raw.get("country")), + foreign_postal_code=_clean_str(raw.get("foreign_postal_code")), + amount_raw=amount_raw, + amount=amount_numeric, + purpose=_clean_str(raw.get("purpose")), + foundation_status=_clean_str(raw.get("foundation_status")), + relationship=_clean_str(raw.get("relationship")), + ) + + +def _year_hint(tax_year: int | None) -> str: + if tax_year is None: + return "" + return f"\n\nThis filing is for tax year {tax_year}." + + +def _postprocess(grants: list[ExtractedGrant]) -> list[ExtractedGrant]: + """Drop placeholders / amount-less rows, dedupe, and reassign line_number.""" + out: list[ExtractedGrant] = [] + seen: set[tuple] = set() + + for g in grants: + # Drop rows with no recipient at all. + if not g.recipient_name and not g.recipient_person_name: + continue + # Drop placeholder recipients that slipped through (belt-and-suspenders). + if g.recipient_name and is_placeholder(g.recipient_name): + continue + if g.recipient_person_name and is_placeholder(g.recipient_person_name): + continue + # Drop rows with no amount text at all (Haiku didn't see an amount column). + # Keep rows where amount_raw == '0' and amount == Decimal(0). + if g.amount is None and not g.amount_raw: + continue + + key = ( + (g.recipient_name or "").upper(), + (g.city or "").upper(), + (g.state or "").upper(), + g.amount_raw or "", + ) + if key in seen: + continue + seen.add(key) + out.append(g) + + for i, g in enumerate(out, start=1): + g.line_number = i + return out + + +# --------------------------------------------------------------------------- +# Text path +# --------------------------------------------------------------------------- + +def _extract_text_layer( + pdf_path: Path, + tax_year: int | None, + diagnostics: dict, +) -> tuple[list[ExtractedGrant], str, int]: + """Extract grants via pdfplumber text + Haiku text parsing. + + Returns (grants, status, num_grant_pages). + status ∈ {'ok', 'not_a_990pf', 'no_text_layer', 'no_grant_pages', + 'haiku_empty', 'low_yield', 'error'} + + 'not_a_990pf' means the PDF has a readable text layer but doesn't look + like a 990-PF at all (e.g. a standalone NY State CHAR500 cover form). + The caller should short-circuit on this — there's nothing for the vision + path to find either. 
+ """ + try: + pdf = pdfplumber.open(pdf_path) + except Exception as exc: + diagnostics["text_path_error"] = str(exc) + return [], "error", 0 + + try: + page_texts: list[tuple[int, str]] = [] + total_chars = 0 + for idx, page in enumerate(pdf.pages, start=1): + text = page.extract_text() or "" + page_texts.append((idx, text)) + total_chars += len(text) + + diagnostics["text_layer_chars"] = total_chars + diagnostics["pages_total"] = len(page_texts) + + if total_chars < MIN_TEXT_LAYER_CHARS: + return [], "no_text_layer", 0 + + # Must look like a 990-PF at all. If not, don't bother with Haiku — + # and don't fall through to vision, since it'd just burn triage calls + # scanning a non-990-PF document end to end. + joined_upper = "\n".join(t for _, t in page_texts).upper() + if not _IS_990PF_RE.search(joined_upper): + return [], "not_a_990pf", 0 + + grant_pages = [ + (num, text) for num, text in page_texts + if len(text) > 100 and _is_grant_page_text(text) + ] + diagnostics["grant_pages_identified"] = len(grant_pages) + + if not grant_pages: + return [], "no_grant_pages", 0 + finally: + pdf.close() + + client = _get_client() + year_hint = _year_hint(tax_year) + + raw_grants: list[dict] = [] + try: + for page_num, text in grant_pages: + resp = client.messages.create( + model=MODEL, + max_tokens=4096, + messages=[{ + "role": "user", + "content": f"{TEXT_EXTRACTION_PROMPT}{year_hint}\n\nPage text:\n{text}", + }], + ) + raw_grants.extend(_parse_haiku_response(resp.content[0].text)) + except anthropic.APIError as exc: + # Discard partial results — fall back to vision cleanly. + diagnostics["text_path_error"] = f"{type(exc).__name__}: {exc}" + return [], "error", len(grant_pages) + + if not raw_grants: + return [], "haiku_empty", len(grant_pages) + + grants: list[ExtractedGrant] = [] + for raw in raw_grants: + g = _normalize_grant(raw) + if g is not None: + grants.append(g) + + if len(grants) < len(grant_pages) * MIN_GRANTS_PER_PAGE: + return grants, "low_yield", len(grant_pages) + + return grants, "ok", len(grant_pages) + + +# --------------------------------------------------------------------------- +# Vision path +# --------------------------------------------------------------------------- + +def _render_page_b64(page, dpi: int) -> str: + pix = page.get_pixmap(dpi=dpi) + return base64.standard_b64encode(pix.tobytes("png")).decode("utf-8") + + +def _extract_vision( + pdf_path: Path, + tax_year: int | None, + diagnostics: dict, +) -> tuple[list[ExtractedGrant], int, int]: + """Extract grants via Haiku vision over rendered page images. + + Returns (grants, pages_scanned, pages_extracted). Raises anthropic.APIError + on catastrophic API failure (caller handles). + """ + client = _get_client() + year_hint = _year_hint(tax_year) + + doc = fitz.open(pdf_path) + try: + total_pages = len(doc) + diagnostics.setdefault("pages_total", total_pages) + + if total_pages <= ATTACHMENT_START_PAGE_IDX: + diagnostics["vision_pages_scanned"] = 0 + diagnostics["vision_pages_extracted"] = 0 + return [], 0, 0 + + # Phase 1: triage pages to find grant tables. 
+ grant_page_indices: list[int] = [] + consecutive_no = 0 + found_any = False + pages_scanned = 0 + + for i in range(ATTACHMENT_START_PAGE_IDX, total_pages): + pages_scanned += 1 + b64 = _render_page_b64(doc[i], dpi=TRIAGE_DPI) + resp = client.messages.create( + model=MODEL, + max_tokens=10, + messages=[{ + "role": "user", + "content": [ + {"type": "image", + "source": {"type": "base64", "media_type": "image/png", "data": b64}}, + {"type": "text", "text": TRIAGE_PROMPT}, + ], + }], + ) + is_grant = "yes" in resp.content[0].text.lower() + if is_grant: + grant_page_indices.append(i) + consecutive_no = 0 + found_any = True + else: + consecutive_no += 1 + if found_any and consecutive_no >= VISION_CONSECUTIVE_NO_LIMIT: + break + + diagnostics["vision_pages_scanned"] = pages_scanned + + if not grant_page_indices: + diagnostics["vision_pages_extracted"] = 0 + return [], pages_scanned, 0 + + # Phase 2: extract from identified grant pages. + raw_grants: list[dict] = [] + pages_extracted = 0 + for i in grant_page_indices: + b64 = _render_page_b64(doc[i], dpi=EXTRACTION_DPI) + resp = client.messages.create( + model=MODEL, + max_tokens=4096, + messages=[{ + "role": "user", + "content": [ + {"type": "image", + "source": {"type": "base64", "media_type": "image/png", "data": b64}}, + {"type": "text", "text": f"{VISION_EXTRACTION_PROMPT}{year_hint}"}, + ], + }], + ) + page_grants = _parse_haiku_response(resp.content[0].text) + if page_grants: + pages_extracted += 1 + raw_grants.extend(page_grants) + + diagnostics["vision_pages_extracted"] = pages_extracted + finally: + doc.close() + + grants: list[ExtractedGrant] = [] + for raw in raw_grants: + g = _normalize_grant(raw) + if g is not None: + grants.append(g) + + return grants, pages_scanned, pages_extracted + + +# --------------------------------------------------------------------------- +# Top-level extractor +# --------------------------------------------------------------------------- + +def extract_from_pdf( + pdf_path: str | Path, + tax_year: int | None = None, + source_label: str | None = None, +) -> ExtractionResult: + """Extract grants from a single 990-PF PDF. + + Stateless: reads only the file at pdf_path and calls Anthropic's API. + Writes nothing. See module docstring for full contract. + """ + pdf_path = Path(pdf_path) + + diagnostics: dict = { + "tax_year_hint": tax_year, + "source_label": source_label, + } + + if not pdf_path.exists(): + diagnostics["error"] = f"PDF not found: {pdf_path}" + return ExtractionResult( + success=False, + grants=[], + grant_detail_status=None, + method="failed", + diagnostics=diagnostics, + ) + + # Text path + try: + text_grants, text_status, num_grant_pages = _extract_text_layer( + pdf_path, tax_year, diagnostics + ) + except Exception as exc: + # pdfplumber can blow up on malformed PDFs — don't let that kill us, + # fall through to the vision path like any other text failure. + text_grants = [] + text_status = "error" + num_grant_pages = 0 + diagnostics.setdefault("text_path_error", f"{type(exc).__name__}: {exc}") + + diagnostics["text_path_status"] = text_status + + if text_status == "ok": + grants = _postprocess(text_grants) + return ExtractionResult( + success=True, + grants=grants, + grant_detail_status="supplemented" if grants else "no_grants", + method="pdfplumber+haiku_text", + diagnostics=diagnostics, + ) + + # Short-circuit: PDF has a readable text layer but isn't a 990-PF + # (e.g. a standalone NY State CHAR500 cover form). Don't run vision — + # there's nothing in the document for it to find. 
+ if text_status == "not_a_990pf": + return ExtractionResult( + success=True, + grants=[], + grant_detail_status="not_a_990pf", + method="skipped_not_990pf", + diagnostics=diagnostics, + ) + + # Vision fallback — replaces text output entirely. + try: + vision_grants, _scanned, _extracted = _extract_vision( + pdf_path, tax_year, diagnostics + ) + except anthropic.APIError as exc: + diagnostics["error"] = f"{type(exc).__name__}: {exc}" + return ExtractionResult( + success=False, + grants=[], + grant_detail_status=None, + method="failed", + diagnostics=diagnostics, + ) + except Exception as exc: + diagnostics["error"] = f"{type(exc).__name__}: {exc}" + return ExtractionResult( + success=False, + grants=[], + grant_detail_status=None, + method="failed", + diagnostics=diagnostics, + ) + + grants = _postprocess(vision_grants) + return ExtractionResult( + success=True, + grants=grants, + grant_detail_status="supplemented" if grants else "no_grants", + method="haiku_vision_attempted", + diagnostics=diagnostics, + ) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def _result_to_jsonable(result: ExtractionResult) -> dict: + def grant_to_dict(g: ExtractedGrant) -> dict: + d = asdict(g) + if d["amount"] is not None: + d["amount"] = str(d["amount"]) + return d + + return { + "success": result.success, + "method": result.method, + "grant_detail_status": result.grant_detail_status, + "diagnostics": result.diagnostics, + "grants": [grant_to_dict(g) for g in result.grants], + "total_amount": str(result.total_amount), + } + + +def _print_table(result: ExtractionResult) -> None: + print(f"success: {result.success}") + print(f"method: {result.method}") + print(f"grant_detail_status: {result.grant_detail_status}") + print(f"grants: {len(result.grants)}") + print(f"total_amount: ${result.total_amount:,}") + print("diagnostics:") + for k, v in result.diagnostics.items(): + print(f" {k}: {v}") + if not result.grants: + return + print() + print(f"{'#':>4} {'recipient':<45} {'city':<20} {'st':<3} {'amount':>12}") + print("-" * 90) + for g in result.grants[:50]: + name = (g.recipient_name or g.recipient_person_name or "")[:45] + city = (g.city or "")[:20] + state = (g.state or "")[:3] + amt = f"${g.amount:,.0f}" if g.amount is not None else (g.amount_raw or "") + print(f"{g.line_number:>4} {name:<45} {city:<20} {state:<3} {amt:>12}") + if len(result.grants) > 50: + print(f"... and {len(result.grants) - 50} more") + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + prog="scripts.extract.irs_990_pdf", + description="Extract grant data from a 990-PF PDF.", + ) + parser.add_argument("pdf_path", help="Path to a local PDF file.") + parser.add_argument("--tax-year", type=int, default=None, + help="Tax year hint passed to the extraction prompts.") + parser.add_argument("--source-label", default=None, + help="Diagnostic label for the PDF source (e.g. 
'ny_ag').") + parser.add_argument("--json", action="store_true", + help="Emit the full result as JSON instead of a table.") + args = parser.parse_args(argv) + + result = extract_from_pdf( + args.pdf_path, + tax_year=args.tax_year, + source_label=args.source_label, + ) + + if args.json: + print(json.dumps(_result_to_jsonable(result), indent=2)) + else: + _print_table(result) + + return 0 if result.success else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/parse/__init__.py b/scripts/parse/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/scripts/parse/__init__.py diff --git a/scripts/parse/irs_990.py b/scripts/parse/irs_990.py new file mode 100644 index 0000000..3a5cc2d --- /dev/null +++ b/scripts/parse/irs_990.py @@ -0,0 +1,691 @@ +""" +Parse IRS Form 990 XML files into the new raw schema. + +Populates: raw.filing, raw.filing_source, raw.form_990, + raw.grant_990, raw.schedule_o. + +Usage: + python -m scripts.parse.irs_990 data/irs/xml-zips/*.zip + python -m scripts.parse.irs_990 data/irs/xml-missing/202100139349100100_public.xml +""" + +import io +import os +import sys +import zipfile + +from lxml import etree + +from scripts.common.db import execute_transaction +from scripts.common.normalize import normalize_ein, parse_numeric, is_placeholder +from scripts.common.xml import ( + NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata, +) +from scripts.common.ingest import ( + start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, +) +from scripts.common.filing import ( + upsert_raw_filing, record_raw_filing_source, +) + +PARSER_NAME = "parse_irs_990" +SOURCE_SYSTEM = "irs_xml" + +# Standalone XML files use this as source_archive +STANDALONE_ARCHIVE = "__standalone__" + +# Module-level truthy set used by every classification check. +_TRUTHY = {"X", "x", "1", "true"} + +# IRS ReturnTypeCd values that map to Form 990 (includes amended return variant). +FORM_990_RETURN_TYPES = {"990", "990A"} + +# Schedule I RecipientTable rows (one xpath — modern schema only). +GRANT_XPATHS = [ + f".//{{{NS}}}IRS990ScheduleI/{{{NS}}}RecipientTable", +] + +SCHEDULE_I_XPATH = f".//{{{NS}}}IRS990ScheduleI" +SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO" + + +# ============================================================ +# Grant extraction (Schedule I RecipientTable) +# ============================================================ + +def extract_grant(g, line_number): + """Extract a single Schedule I grant row from a RecipientTable element. + + Child element names vary slightly across IRS schema versions, so each + field tries the modern tag first and falls back to the older variant. + + Returns None for stub rows that lack both a recipient name and any + grant amount — these appear in real filings as malformed RecipientTable + entries (e.g. just a PurposeOfGrantTxt with no recipient or amount). 
+ """ + if len(g) == 0: + return None + + cash_raw = text(g, "irs:CashGrantAmt") + non_cash_raw = text(g, "irs:NonCashAssistanceAmt") + recipient_name = ( + text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1Txt") + or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1") + ) + recipient_name2 = ( + text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2Txt") + or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2") + ) + + if (recipient_name is None and recipient_name2 is None + and cash_raw is None and non_cash_raw is None): + return None + + return { + "line_number": line_number, + "recipient_name": recipient_name, + "recipient_name2": recipient_name2, + "recipient_ein": normalize_ein(text(g, "irs:RecipientEIN")), + "address_line1": ( + text(g, "irs:USAddress/irs:AddressLine1Txt") + or text(g, "irs:USAddress/irs:AddressLine1") + or text(g, "irs:ForeignAddress/irs:AddressLine1Txt") + or text(g, "irs:ForeignAddress/irs:AddressLine1") + ), + "address_line2": ( + text(g, "irs:USAddress/irs:AddressLine2Txt") + or text(g, "irs:USAddress/irs:AddressLine2") + or text(g, "irs:ForeignAddress/irs:AddressLine2Txt") + or text(g, "irs:ForeignAddress/irs:AddressLine2") + ), + "city": ( + text(g, "irs:USAddress/irs:CityNm") + or text(g, "irs:USAddress/irs:City") + or text(g, "irs:ForeignAddress/irs:CityNm") + or text(g, "irs:ForeignAddress/irs:City") + ), + "state": ( + text(g, "irs:USAddress/irs:StateAbbreviationCd") + or text(g, "irs:USAddress/irs:State") + or text(g, "irs:ForeignAddress/irs:ProvinceOrStateNm") + ), + "zip": ( + text(g, "irs:USAddress/irs:ZIPCd") + or text(g, "irs:USAddress/irs:ZIPCode") + ), + "country": text(g, "irs:ForeignAddress/irs:CountryCd"), + "foreign_postal_code": text(g, "irs:ForeignAddress/irs:ForeignPostalCd"), + "cash_grant_amt_raw": cash_raw, + "cash_grant_amt": parse_numeric(cash_raw), + "non_cash_amt_raw": non_cash_raw, + "non_cash_amt": parse_numeric(non_cash_raw), + "non_cash_desc": text(g, "irs:NonCashAssistanceDesc"), + "valuation_method": text(g, "irs:ValuationMethodUsedDesc"), + "purpose": text(g, "irs:PurposeOfGrantTxt"), + "irc_section": text(g, "irs:IRCSectionDesc"), + } + + +def find_all_grants(tree): + """Find all Schedule I RecipientTable elements.""" + grants = [] + for xpath in GRANT_XPATHS: + grants.extend(tree.findall(xpath)) + return grants + + +# ============================================================ +# Schedule O narrative extraction +# ============================================================ + +def extract_schedule_o(tree): + """Extract every SupplementalInformationDetail entry from Schedule O. + + Returns a list of dicts (without raw_filing_id — added in process_filing). + line_number is the 1-based ordinal position among emitted rows. Entries + with no narrative text in any of the explanation fields are skipped to + avoid emitting low-signal placeholder rows. 
+ """ + rows = [] + sched_o = tree.find(SCHEDULE_O_XPATH) + if sched_o is None: + return rows + + details = sched_o.findall(f"{{{NS}}}SupplementalInformationDetail") + for d in details: + explanation = ( + text(d, "irs:ExplanationTxt") + or text(d, "irs:MediumExplanationTxt") + or text(d, "irs:ShortExplanationTxt") + ) + if explanation is None: + continue + rows.append({ + "line_number": len(rows) + 1, + "form_line_ref": text(d, "irs:FormAndLineReferenceDesc"), + "explanation": explanation, + }) + return rows + + +# ============================================================ +# Form 990 summary extraction +# ============================================================ + +def extract_form_990(tree): + """Extract filing-level summary fields for raw.form_990.""" + f990 = f".//{{{NS}}}IRS990" + + # Filer address from ReturnHeader (with old-schema fallbacks) + filer_addr = { + "filer_name2": ( + text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt") + or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2") + ), + "filer_address_line1": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1") + ), + "filer_address_line2": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt") + or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2") + ), + "filer_city": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:USAddress/irs:City") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City") + ), + "filer_state": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:State") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm") + ), + "filer_zip": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode") + ), + "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"), + "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"), + "phone": text(tree, ".//irs:Filer/irs:PhoneNum"), + "website": text(tree, f"{f990}/irs:WebsiteAddressTxt"), + } + + # Classification + org_type = None + if text(tree, f"{f990}/irs:TypeOfOrganizationCorpInd") in _TRUTHY: + org_type = "corp" + elif text(tree, f"{f990}/irs:TypeOfOrganizationTrustInd") in _TRUTHY: + org_type = "trust" + elif text(tree, f"{f990}/irs:TypeOfOrganizationAssocInd") in _TRUTHY: + org_type = "assoc" + elif text(tree, f"{f990}/irs:TypeOfOrganizationOtherInd") in _TRUTHY: + org_type = "other" + + method_cash = text(tree, f"{f990}/irs:MethodOfAccountingCashInd") + method_accrual = text(tree, f"{f990}/irs:MethodOfAccountingAccrualInd") + method_other = text(tree, f"{f990}/irs:MethodOfAccountingOtherInd") + if method_cash in _TRUTHY: + accounting_method = "cash" + elif method_accrual in _TRUTHY: + accounting_method = "accrual" + elif method_other in _TRUTHY: + accounting_method = "other" + else: + accounting_method = None + + # The 501(c) subsection number lives in an attribute, not in the + # element body. (The body is typically "X" — the checkbox indicator.) 
+ # A 501(c)(3) filing sets Organization501c3Ind instead and leaves this + # attribute unset; we deliberately don't infer "3" in that case, to + # keep raw provenance honest. + section_501c_type = None + el_501c = tree.find(f"{f990}/irs:Organization501cInd", NS_MAP) + if el_501c is not None: + section_501c_type = el_501c.get("organization501cTypeTxt") + + classification = { + "is_501c3": text_bool(tree, f"{f990}/irs:Organization501c3Ind"), + "section_501c_type": section_501c_type, + "org_type": org_type, + "group_return": text_bool(tree, f"{f990}/irs:GroupReturnForAffiliatesInd"), + "group_exemption_num": text(tree, f"{f990}/irs:GroupExemptionNum"), + "formation_year": text(tree, f"{f990}/irs:FormationYr"), + "legal_domicile_state": text(tree, f"{f990}/irs:LegalDomicileStateCd"), + "mission": ( + text(tree, f"{f990}/irs:ActivityOrMissionDesc") + or text(tree, f"{f990}/irs:MissionDesc") + ), + "accounting_method": accounting_method, + } + + # Filing status flags + status = { + "is_amended": text_bool(tree, f"{f990}/irs:AmendedReturnInd"), + "is_initial": text_bool(tree, f"{f990}/irs:InitialReturnInd"), + "is_final": text_bool(tree, f"{f990}/irs:FinalReturnInd"), + "is_terminated": text_bool(tree, f"{f990}/irs:TerminateOperationsInd"), + } + + # Part I: current year summary + part_i_cy = {} + part_i_cy_fields = { + "gross_receipts": "GrossReceiptsAmt", + "cy_contributions_grants": "CYContributionsGrantsAmt", + "cy_program_service_revenue": "CYProgramServiceRevenueAmt", + "cy_investment_income": "CYInvestmentIncomeAmt", + "cy_other_revenue": "CYOtherRevenueAmt", + "cy_total_revenue": "CYTotalRevenueAmt", + "cy_grants_paid": "CYGrantsAndSimilarPaidAmt", + "cy_benefits_to_members": "CYBenefitsPaidToMembersAmt", + "cy_salaries_benefits": "CYSalariesCompEmpBnftPaidAmt", + "cy_fundraising_expense": "CYTotalFundraisingExpenseAmt", + "cy_other_expenses": "CYOtherExpensesAmt", + "cy_total_expenses": "CYTotalExpensesAmt", + "cy_revenue_less_expenses": "CYRevenuesLessExpensesAmt", + } + for col, elem in part_i_cy_fields.items(): + part_i_cy[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}")) + + # Part I: prior year summary + part_i_py = { + "py_total_revenue": parse_numeric(text(tree, f"{f990}/irs:PYTotalRevenueAmt")), + "py_total_expenses": parse_numeric(text(tree, f"{f990}/irs:PYTotalExpensesAmt")), + } + + # Balance sheet (Part I summary / Part X) + balance_sheet = {} + bs_fields = { + "total_assets_boy": "TotalAssetsBOYAmt", + "total_assets_eoy": "TotalAssetsEOYAmt", + "total_liabilities_boy": "TotalLiabilitiesBOYAmt", + "total_liabilities_eoy": "TotalLiabilitiesEOYAmt", + "net_assets_boy": "NetAssetsOrFundBalancesBOYAmt", + "net_assets_eoy": "NetAssetsOrFundBalancesEOYAmt", + } + for col, elem in bs_fields.items(): + balance_sheet[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}")) + + # Governance / workforce + workforce = { + "total_employees": parse_numeric(text(tree, f"{f990}/irs:TotalEmployeeCnt")), + "total_volunteers": parse_numeric(text(tree, f"{f990}/irs:TotalVolunteersCnt")), + "voting_members": parse_numeric( + text(tree, f"{f990}/irs:VotingMembersGoverningBodyCnt") + or text(tree, f"{f990}/irs:GoverningBodyVotingMembersCnt") + ), + "independent_voting_members": parse_numeric( + text(tree, f"{f990}/irs:VotingMembersIndependentCnt") + or text(tree, f"{f990}/irs:IndependentVotingMemberCnt") + ), + } + + # Part IX: functional expense breakdown + func_exp = { + "program_services_expense": parse_numeric( + text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ProgramServicesAmt") + 
), + "management_general_expense": parse_numeric( + text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ManagementAndGeneralAmt") + ), + "fundraising_expense_ix": parse_numeric( + text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:FundraisingAmt") + ), + } + + # Part VIII: revenue detail + revenue_detail = { + "government_grants": parse_numeric(text(tree, f"{f990}/irs:GovernmentGrantsAmt")), + "total_contributions": parse_numeric(text(tree, f"{f990}/irs:TotalContributionsAmt")), + "total_program_service_rev": parse_numeric( + text(tree, f"{f990}/irs:TotalProgramServiceRevenueAmt") + ), + "investment_income": parse_numeric( + text(tree, f"{f990}/irs:InvestmentIncomeGrp/irs:TotalRevenueColumnAmt") + ), + } + + # UBI + ubi = { + "gross_ubi": parse_numeric(text(tree, f"{f990}/irs:TotalGrossUBIAmt")), + "net_ubi": parse_numeric(text(tree, f"{f990}/irs:NetUnrelatedBusTxblIncmAmt")), + } + + # Schedule I metadata (1:1 with the filing) + sched_i_el = tree.find(SCHEDULE_I_XPATH) + if sched_i_el is not None: + sched_i = { + "sched_i_grant_records_maintained": text_bool(sched_i_el, "irs:GrantRecordsMaintainedInd"), + "sched_i_501c3_org_count": parse_numeric(text(sched_i_el, "irs:Total501c3OrgCnt")), + "sched_i_other_org_count": parse_numeric(text(sched_i_el, "irs:TotalOtherOrgCnt")), + "sched_i_total_grants_amt": None, + } + else: + sched_i = { + "sched_i_grant_records_maintained": None, + "sched_i_501c3_org_count": None, + "sched_i_other_org_count": None, + "sched_i_total_grants_amt": None, + } + + # Officer / signer + officer = { + "principal_officer": text(tree, f"{f990}/irs:PrincipalOfficerNm"), + "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"), + "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"), + "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"), + "preparer_firm": ( + text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt") + or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1") + ), + } + + return { + **filer_addr, **classification, **status, + **part_i_cy, **part_i_py, **balance_sheet, + **workforce, **func_exp, **revenue_detail, **ubi, + **sched_i, **officer, + } + + +# ============================================================ +# Grant detail status +# ============================================================ + +def compute_grant_detail_status(sched_i_element, grant_elements, grant_rows, cy_grants_paid): + """Determine grant detail completeness for a Form 990 filing. + + Uses filing-level context (Schedule I presence + Part I CYGrantsAndSimilarPaidAmt) + to distinguish "no Schedule I because no grants" from "no Schedule I but + grants reported on Part I". + """ + if sched_i_element is None: + # No Schedule I — but check whether Part I reports grants paid. 
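+        # A positive amount means grants were paid but their detail isn't in
+        # this XML — classify as "unresolved" rather than "no_grants"; only a
+        # null/zero Part I amount justifies "no_grants".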
+        if cy_grants_paid is not None and float(cy_grants_paid) > 0:
+            return "unresolved"
+        return "no_grants"
+
+    if not grant_elements:
+        return "unresolved"
+
+    if not grant_rows:
+        return "unresolved"
+
+    placeholder_count = sum(
+        1 for r in grant_rows
+        if is_placeholder(r.get("recipient_name"))
+        or (is_placeholder(r.get("cash_grant_amt_raw"))
+            and is_placeholder(r.get("non_cash_amt_raw")))
+    )
+
+    if placeholder_count == len(grant_rows):
+        return "placeholder_only"
+    if placeholder_count > 0:
+        return "see_attached"
+    return "complete"
+
+
+# ============================================================
+# Per-filing processing
+# ============================================================
+
+GRANT_COLUMNS = [
+    "raw_filing_id", "line_number",
+    "recipient_name", "recipient_name2", "recipient_ein",
+    "address_line1", "address_line2", "city", "state", "zip",
+    "country", "foreign_postal_code",
+    "cash_grant_amt_raw", "cash_grant_amt",
+    "non_cash_amt_raw", "non_cash_amt",
+    "non_cash_desc", "valuation_method", "purpose", "irc_section",
+]
+
+SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"]
+
+
+def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id):
+    """Process a single Form 990 filing. All child writes are transactional."""
+
+    # Extract filing metadata
+    metadata = extract_filing_metadata(tree)
+
+    # Locate Schedule I element (used both for grant extraction and status logic)
+    sched_i_element = tree.find(SCHEDULE_I_XPATH)
+
+    # Extract grants from Schedule I RecipientTable rows
+    grant_elements = find_all_grants(tree)
+    extracted_grants = []
+    for i, g in enumerate(grant_elements, start=1):
+        row = extract_grant(g, i)
+        if row is not None:
+            extracted_grants.append(row)
+
+    # Extract Schedule O narrative entries
+    schedule_o_entries = extract_schedule_o(tree)
+
+    # Extract form summary
+    form_data = extract_form_990(tree)
+    form_data["grant_detail_status"] = compute_grant_detail_status(
+        sched_i_element, grant_elements, extracted_grants, form_data.get("cy_grants_paid"),
+    )
+
+    def _do(conn):
+        raw_filing_id = upsert_raw_filing(
+            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
+        )
+        record_raw_filing_source(
+            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
+        )
+
+        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
+        grant_rows = [
+            {**row, "raw_filing_id": raw_filing_id}
+            for row in extracted_grants
+        ]
+        schedule_o_rows = [
+            {**row, "raw_filing_id": raw_filing_id}
+            for row in schedule_o_entries
+        ]
+
+        _replace_children(
+            conn, raw_filing_id, filing_form_data, grant_rows, schedule_o_rows,
+        )
+
+        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990 + grants + schedule_o
+        return 3 + len(grant_rows) + len(schedule_o_rows)
+
+    return execute_transaction(_do)
+
+
+def _replace_children(conn, raw_filing_id, form_data, grant_rows, schedule_o_rows):
+    """Delete and re-insert all child rows for a filing using the caller's transaction."""
+    from psycopg2.extras import execute_batch
+
+    form_columns = list(form_data.keys())
+    form_placeholders = ", ".join(["%s"] * len(form_columns))
+    form_values = [form_data[col] for col in form_columns]
+
+    grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS))
+    schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS))
+
+    with conn.cursor() as cur:
+        # Delete old child rows
+        cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,))
+        cur.execute("DELETE FROM raw.grant_990 WHERE raw_filing_id = %s", (raw_filing_id,))
+        cur.execute("DELETE FROM raw.form_990 WHERE raw_filing_id = %s", (raw_filing_id,))
+
+        # Insert form summary
+        cur.execute(
+            f"INSERT INTO raw.form_990 ({', '.join(form_columns)}) "
+            f"VALUES ({form_placeholders})",
+            form_values,
+        )
+
+        # Insert grants
+        if grant_rows:
+            grant_values = [
+                [row.get(col) for col in GRANT_COLUMNS]
+                for row in grant_rows
+            ]
+            execute_batch(
+                cur,
+                f"INSERT INTO raw.grant_990 ({', '.join(GRANT_COLUMNS)}) "
+                f"VALUES ({grant_placeholders})",
+                grant_values,
+            )
+
+        # Insert Schedule O narrative
+        if schedule_o_rows:
+            schedule_o_values = [
+                [row.get(col) for col in SCHEDULE_O_COLUMNS]
+                for row in schedule_o_rows
+            ]
+            execute_batch(
+                cur,
+                f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) "
+                f"VALUES ({schedule_o_placeholders})",
+                schedule_o_values,
+            )
+
+
+# ============================================================
+# ZIP / file processing
+# ============================================================
+
+def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
+    """Parse XML bytes and process if it's a Form 990. Returns rows inserted or None if skipped."""
+    try:
+        tree = etree.parse(io.BytesIO(xml_bytes))
+    except etree.XMLSyntaxError as e:
+        log_ingest_error(ingest_run_id, source_archive, source_path,
+                         f"XML parse error: {e}", stage="parse_xml")
+        return None
+
+    ret_type = text(tree, ".//irs:ReturnTypeCd")
+    if ret_type not in FORM_990_RETURN_TYPES:
+        return None
+
+    source_document_id = None
+    try:
+        source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
+        return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id)
+    except Exception as e:
+        stage = "process_filing" if source_document_id else "derive_source_document_id"
+        log_ingest_error(ingest_run_id, source_archive, source_path, e,
+                         source_document_id=source_document_id, stage=stage)
+        raise
+
+
+def process_zip(zip_path, ingest_run_id):
+    """Process all XMLs in a ZIP file."""
+    basename = os.path.basename(zip_path)
+    try:
+        zf = zipfile.ZipFile(zip_path)
+    except zipfile.BadZipFile as e:
+        log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip")
+        print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr)
+        return 0, 0, 0
+
+    with zf:
+        names = [n for n in zf.namelist() if n.endswith(".xml")]
+
+        print(f"Processing {basename}: {len(names)} XML files")
+        files_scanned = 0
+        files_matched = 0
+        total_rows = 0
+
+        for i, name in enumerate(names):
+            # Count every ZIP member as scanned, even ones we fail to read —
+            # otherwise read failures silently shrink the scanned total and
+            # make run-level metrics misleading.
+ files_scanned += 1 + try: + xml_bytes = zf.read(name) + except Exception as e: + log_ingest_error(ingest_run_id, basename, name, e, stage="read") + continue + + try: + rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id) + except Exception as e: + print(f" ERROR in {name}: {e}", file=sys.stderr) + continue + + if rows is not None: + files_matched += 1 + total_rows += rows + + if (i + 1) % 1000 == 0: + print(f" ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows") + + print(f" Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows") + return files_scanned, files_matched, total_rows + + +def main(): + args = sys.argv[1:] + if not args: + print(f"Usage: python -m scripts.parse.irs_990 <zip_or_xml_files...>", file=sys.stderr) + sys.exit(1) + + notes = " ".join(os.path.basename(a) for a in args) + ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes) + + grand_scanned = 0 + grand_matched = 0 + grand_rows = 0 + + try: + for path in args: + if path.endswith(".zip"): + scanned, matched, rows = process_zip(path, ingest_run_id) + grand_scanned += scanned + grand_matched += matched + grand_rows += rows + + elif path.endswith(".xml"): + xml_name = os.path.basename(path) + # Count before I/O, so read failures still show up in scanned. + grand_scanned += 1 + try: + with open(path, "rb") as f: + xml_bytes = f.read() + except Exception as e: + log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read") + print(f"ERROR reading {path}: {e}", file=sys.stderr) + continue + + try: + rows = process_xml_bytes( + xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id + ) + except Exception as e: + print(f"ERROR in {path}: {e}", file=sys.stderr) + continue + + if rows is not None: + grand_matched += 1 + grand_rows += rows + else: + print(f"Skipping unknown file type: {path}", file=sys.stderr) + + finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows) + except Exception: + fail_ingest_run(ingest_run_id) + raise + + print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.") + + +if __name__ == "__main__": + main() diff --git a/scripts/parse/irs_990ez.py b/scripts/parse/irs_990ez.py new file mode 100644 index 0000000..bea4fdd --- /dev/null +++ b/scripts/parse/irs_990ez.py @@ -0,0 +1,449 @@ +""" +Parse IRS Form 990-EZ XML files into the new raw schema. + +Populates: raw.filing, raw.filing_source, raw.form_990ez, + raw.schedule_o. + +Form 990-EZ has no structured grant recipient table — line 10 of Part I +aggregates grants into a single amount and instructs the filer to "list +in Schedule O." v1 scope is preservation (filing facts + Schedule O), +not grant normalization. 
+ +Usage: + python -m scripts.parse.irs_990ez data/irs/xml-zips/*.zip + python -m scripts.parse.irs_990ez data/irs/xml-missing/202100139349100100_public.xml +""" + +import io +import os +import sys +import zipfile + +from lxml import etree + +from scripts.common.db import execute_transaction +from scripts.common.normalize import parse_numeric +from scripts.common.xml import ( + NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata, +) +from scripts.common.ingest import ( + start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, +) +from scripts.common.filing import ( + upsert_raw_filing, record_raw_filing_source, +) + +PARSER_NAME = "parse_irs_990ez" +SOURCE_SYSTEM = "irs_xml" + +# Standalone XML files use this as source_archive +STANDALONE_ARCHIVE = "__standalone__" + +# IRS ReturnTypeCd values that map to Form 990-EZ (includes amended variant). +FORM_990EZ_RETURN_TYPES = {"990EZ", "990EA"} + +SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO" + + +# ============================================================ +# Schedule O narrative extraction +# ============================================================ + +def extract_schedule_o(tree): + """Extract every SupplementalInformationDetail entry from Schedule O. + + Returns a list of dicts (without raw_filing_id — added in process_filing). + line_number is the 1-based ordinal position among emitted rows. Entries + with no narrative text in any of the explanation fields are skipped to + avoid emitting low-signal placeholder rows. + """ + rows = [] + sched_o = tree.find(SCHEDULE_O_XPATH) + if sched_o is None: + return rows + + details = sched_o.findall(f"{{{NS}}}SupplementalInformationDetail") + for d in details: + explanation = ( + text(d, "irs:ExplanationTxt") + or text(d, "irs:MediumExplanationTxt") + or text(d, "irs:ShortExplanationTxt") + ) + if explanation is None: + continue + rows.append({ + "line_number": len(rows) + 1, + "form_line_ref": text(d, "irs:FormAndLineReferenceDesc"), + "explanation": explanation, + }) + return rows + + +# ============================================================ +# Form 990-EZ summary extraction +# ============================================================ + +def extract_form_990ez(tree): + """Extract filing-level summary fields for raw.form_990ez.""" + ez = f".//{{{NS}}}IRS990EZ" + + # Filer identity from ReturnHeader (with old-schema fallbacks). 
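+    # e.g. newer filings carry <CityNm>ALBANY</CityNm>, older ones
+    # <City>ALBANY</City>; each pair below tries the newer tag first
+    # (city value invented for the example).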
+ filer = { + "filer_name2": ( + text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt") + or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2") + ), + "filer_address_line1": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1") + ), + "filer_city": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:USAddress/irs:City") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City") + ), + "filer_state": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:State") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm") + ), + "filer_zip": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode") + ), + "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"), + "phone": text(tree, ".//irs:Filer/irs:PhoneNum"), + "website": text(tree, f"{ez}/irs:WebsiteAddressTxt"), + } + + # Classification. The 990-EZ schema puts the 501(c) subsection in an + # attribute on Organization501cInd (e.g. <Organization501cInd + # organization501cTypeTxt="7"/>), not in element text. A 501(c)(3) filing + # instead sets <Organization501c3Ind>X</Organization501c3Ind> with no + # separate subsection element — in that case we leave section_501c_type + # NULL rather than inferring "3" from the boolean flag. + section_501c_type = None + el_501c = tree.find(f"{ez}/irs:Organization501cInd", NS_MAP) + if el_501c is not None: + section_501c_type = el_501c.get("organization501cTypeTxt") + + classification = { + "is_501c3": text_bool(tree, f"{ez}/irs:Organization501c3Ind"), + "section_501c_type": section_501c_type, + "group_exemption_num": text(tree, f"{ez}/irs:GroupExemptionNum"), + } + + # Filing status flags + status = { + "is_amended": text_bool(tree, f"{ez}/irs:AmendedReturnInd"), + "is_initial": text_bool(tree, f"{ez}/irs:InitialReturnInd"), + "is_final": text_bool(tree, f"{ez}/irs:FinalReturnInd"), + } + + # Part I: revenue / expenses + part_i = {} + part_i_fields = { + "gross_receipts": "GrossReceiptsAmt", + "contributions_gifts_grants": "ContributionsGiftsGrantsEtcAmt", + "program_service_revenue": "ProgramServiceRevenueAmt", + "investment_income": "InvestmentIncomeAmt", + "total_revenue": "TotalRevenueAmt", + "grants_paid": "GrantsAndSimilarAmountsPaidAmt", + "salaries_compensation": "SalariesOtherCompEmplBnftAmt", + "total_expenses": "TotalExpensesAmt", + "revenue_less_expenses": "ExcessOrDeficitForYearAmt", + } + for col, elem in part_i_fields.items(): + part_i[col] = parse_numeric(text(tree, f"{ez}/irs:{elem}")) + + # Part II: balance sheet (BOY/EOY child elements under group wrappers). 
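+    # e.g. (amounts invented for the example):
+    #   <Form990TotalAssetsGrp>
+    #     <BOYAmt>51000</BOYAmt>
+    #     <EOYAmt>63500</EOYAmt>
+    #   </Form990TotalAssetsGrp>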
+ balance_sheet = { + "total_assets_boy": parse_numeric( + text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:BOYAmt") + ), + "total_assets_eoy": parse_numeric( + text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:EOYAmt") + ), + "total_liabilities_boy": parse_numeric( + text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:BOYAmt") + ), + "total_liabilities_eoy": parse_numeric( + text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:EOYAmt") + ), + "net_assets_boy": parse_numeric( + text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:BOYAmt") + ), + "net_assets_eoy": parse_numeric( + text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:EOYAmt") + ), + } + + # Schedule O presence + sched_o_presence = { + "has_schedule_o": tree.find(SCHEDULE_O_XPATH) is not None, + } + + # Officer / signer + officer = { + "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"), + "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"), + "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"), + "preparer_firm": ( + text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt") + or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1") + ), + } + + return { + **filer, **classification, **status, + **part_i, **balance_sheet, + **sched_o_presence, **officer, + } + + +# ============================================================ +# Grant detail status +# ============================================================ + +def compute_grant_detail_status(form_data, schedule_o_rows): + """Determine grant detail completeness for a Form 990-EZ filing. + + 990-EZ has no recipient table to inspect. Line 10 (grants_paid) is an + aggregate and grant detail is instructed to be listed in Schedule O, but + the raw XML gives us no way to tell that a given Schedule O entry is + *the* line-10 narrative rather than unrelated supplemental text. For a + provenance-strict raw layer we therefore only report what we can verify + from the structured fields: + + - no_grants: line 10 is null or 0 (no grants reported). + - unresolved: line 10 is positive. Whether that detail lives in Schedule + O, an attachment, or nowhere is left to a downstream classifier. + + The schedule_o_rows argument is intentionally unused; it's kept in the + signature so callers (and a future classifier) have a single place to + evolve the logic. + """ + del schedule_o_rows # intentionally unused — see docstring + grants_paid = form_data.get("grants_paid") + if grants_paid is None or float(grants_paid) == 0: + return "no_grants" + return "unresolved" + + +# ============================================================ +# Per-filing processing +# ============================================================ + +SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"] + + + +def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id): + """Process a single Form 990-EZ filing. 
All child writes are transactional."""
+
+    metadata = extract_filing_metadata(tree)
+    schedule_o_entries = extract_schedule_o(tree)
+    form_data = extract_form_990ez(tree)
+    form_data["grant_detail_status"] = compute_grant_detail_status(
+        form_data, schedule_o_entries,
+    )
+
+    def _do(conn):
+        raw_filing_id = upsert_raw_filing(
+            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
+        )
+        record_raw_filing_source(
+            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
+        )
+
+        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
+        schedule_o_rows = [
+            {**row, "raw_filing_id": raw_filing_id}
+            for row in schedule_o_entries
+        ]
+
+        _replace_children(
+            conn, raw_filing_id, filing_form_data, schedule_o_rows,
+        )
+
+        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990ez + schedule_o
+        return 3 + len(schedule_o_rows)
+
+    return execute_transaction(_do)
+
+
+def _replace_children(conn, raw_filing_id, form_data, schedule_o_rows):
+    """Delete and re-insert all child rows for a filing using the caller's transaction."""
+    from psycopg2.extras import execute_batch
+
+    form_columns = list(form_data.keys())
+    form_placeholders = ", ".join(["%s"] * len(form_columns))
+    form_values = [form_data[col] for col in form_columns]
+
+    schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS))
+
+    with conn.cursor() as cur:
+        # Delete old child rows
+        cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,))
+        cur.execute("DELETE FROM raw.form_990ez WHERE raw_filing_id = %s", (raw_filing_id,))
+
+        # Insert form summary
+        cur.execute(
+            f"INSERT INTO raw.form_990ez ({', '.join(form_columns)}) "
+            f"VALUES ({form_placeholders})",
+            form_values,
+        )
+
+        # Insert Schedule O narrative
+        if schedule_o_rows:
+            schedule_o_values = [
+                [row.get(col) for col in SCHEDULE_O_COLUMNS]
+                for row in schedule_o_rows
+            ]
+            execute_batch(
+                cur,
+                f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) "
+                f"VALUES ({schedule_o_placeholders})",
+                schedule_o_values,
+            )
+
+
+# ============================================================
+# ZIP / file processing
+# ============================================================
+
+def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
+    """Parse XML bytes and process if it's a Form 990-EZ.
Returns rows inserted or None if skipped.""" + try: + tree = etree.parse(io.BytesIO(xml_bytes)) + except etree.XMLSyntaxError as e: + log_ingest_error(ingest_run_id, source_archive, source_path, + f"XML parse error: {e}", stage="parse_xml") + return None + + ret_type = text(tree, ".//irs:ReturnTypeCd") + if ret_type not in FORM_990EZ_RETURN_TYPES: + return None + + source_document_id = None + try: + source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path) + return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id) + except Exception as e: + stage = "process_filing" if source_document_id else "derive_source_document_id" + log_ingest_error(ingest_run_id, source_archive, source_path, e, + source_document_id=source_document_id, stage=stage) + raise + + +def process_zip(zip_path, ingest_run_id): + """Process all XMLs in a ZIP file.""" + basename = os.path.basename(zip_path) + try: + zf = zipfile.ZipFile(zip_path) + except zipfile.BadZipFile as e: + log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip") + print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr) + return 0, 0, 0 + + with zf: + names = [n for n in zf.namelist() if n.endswith(".xml")] + + print(f"Processing {basename}: {len(names)} XML files") + files_scanned = 0 + files_matched = 0 + total_rows = 0 + + for i, name in enumerate(names): + # Count every ZIP member as scanned, even ones we fail to read — + # otherwise read failures silently shrink the scanned total and + # make run-level metrics misleading. + files_scanned += 1 + try: + xml_bytes = zf.read(name) + except Exception as e: + log_ingest_error(ingest_run_id, basename, name, e, stage="read") + continue + + try: + rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id) + except Exception as e: + print(f" ERROR in {name}: {e}", file=sys.stderr) + continue + + if rows is not None: + files_matched += 1 + total_rows += rows + + if (i + 1) % 1000 == 0: + print(f" ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows") + + print(f" Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows") + return files_scanned, files_matched, total_rows + + +def main(): + args = sys.argv[1:] + if not args: + print(f"Usage: python -m scripts.parse.irs_990ez <zip_or_xml_files...>", file=sys.stderr) + sys.exit(1) + + notes = " ".join(os.path.basename(a) for a in args) + ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes) + + grand_scanned = 0 + grand_matched = 0 + grand_rows = 0 + + try: + for path in args: + if path.endswith(".zip"): + scanned, matched, rows = process_zip(path, ingest_run_id) + grand_scanned += scanned + grand_matched += matched + grand_rows += rows + + elif path.endswith(".xml"): + xml_name = os.path.basename(path) + # Count before I/O, so read failures still show up in scanned. 
+ grand_scanned += 1 + try: + with open(path, "rb") as f: + xml_bytes = f.read() + except Exception as e: + log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read") + print(f"ERROR reading {path}: {e}", file=sys.stderr) + continue + + try: + rows = process_xml_bytes( + xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id + ) + except Exception as e: + print(f"ERROR in {path}: {e}", file=sys.stderr) + continue + + if rows is not None: + grand_matched += 1 + grand_rows += rows + else: + print(f"Skipping unknown file type: {path}", file=sys.stderr) + + finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows) + except Exception: + fail_ingest_run(ingest_run_id) + raise + + print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.") + + +if __name__ == "__main__": + main() diff --git a/scripts/parse/irs_990pf.py b/scripts/parse/irs_990pf.py new file mode 100644 index 0000000..3d245b8 --- /dev/null +++ b/scripts/parse/irs_990pf.py @@ -0,0 +1,544 @@ +""" +Parse IRS 990-PF XML files into the new raw schema. + +Populates: raw.filing, raw.filing_source, raw.form_990pf, + raw.grant_990pf. + +Usage: + python -m scripts.parse.irs_990pf data/irs/xml-zips/*.zip + python -m scripts.parse.irs_990pf data/irs/xml-missing/202100139349100100_public.xml +""" + +import io +import os +import sys +import zipfile + +from lxml import etree + +from scripts.common.db import execute_transaction +from scripts.common.normalize import parse_numeric, is_placeholder +from scripts.common.xml import ( + NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata, +) +from scripts.common.ingest import ( + start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, +) +from scripts.common.filing import ( + upsert_raw_filing, record_raw_filing_source, +) + +PARSER_NAME = "parse_irs_990pf" +SOURCE_SYSTEM = "irs_xml" + +# Standalone XML files use this as source_archive +STANDALONE_ARCHIVE = "__standalone__" + +# Grant element XPaths — three schema variants across IRS versions +GRANT_XPATHS = [ + f".//{{{NS}}}SupplementaryInformationGrp/{{{NS}}}GrantOrContributionPdDurYrGrp", + f".//{{{NS}}}SupplementaryInformation/{{{NS}}}GrantOrContriPaidDuringYear", + f".//{{{NS}}}SupplementaryInfomation/{{{NS}}}GrantOrContriPaidDuringYear", # IRS typo +] + + +# ============================================================ +# Grant extraction +# ============================================================ + +def extract_grant(g, line_number): + """Extract a single grant row from a grant XML element. + + Child element names vary across IRS schema versions, so each field + tries the modern tag first, then falls back to the older variant. 
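+
+    Modern/older tag pairs handled below include, e.g.:
+        Amt <-> Amount, RecipientPersonNm <-> RecipientPersonName,
+        GrantOrContributionPurposeTxt <-> PurposeOfGrantOrContriTxt.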
+ """ + if len(g) == 0: + return None + + amount_raw = ( + text(g, "irs:Amt") + or text(g, "irs:Amount") + ) + + return { + "line_number": line_number, + "recipient_name": ( + text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1Txt") + or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1") + ), + "recipient_name2": ( + text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2Txt") + or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2") + ), + "recipient_person_name": ( + text(g, "irs:RecipientPersonNm") + or text(g, "irs:RecipientPersonName") + ), + "address_line1": ( + text(g, "irs:RecipientUSAddress/irs:AddressLine1Txt") + or text(g, "irs:RecipientUSAddress/irs:AddressLine1") + or text(g, "irs:RecipientForeignAddress/irs:AddressLine1Txt") + or text(g, "irs:RecipientForeignAddress/irs:AddressLine1") + ), + "address_line2": ( + text(g, "irs:RecipientUSAddress/irs:AddressLine2Txt") + or text(g, "irs:RecipientUSAddress/irs:AddressLine2") + or text(g, "irs:RecipientForeignAddress/irs:AddressLine2Txt") + or text(g, "irs:RecipientForeignAddress/irs:AddressLine2") + ), + "city": ( + text(g, "irs:RecipientUSAddress/irs:CityNm") + or text(g, "irs:RecipientUSAddress/irs:City") + or text(g, "irs:RecipientForeignAddress/irs:CityNm") + or text(g, "irs:RecipientForeignAddress/irs:City") + ), + "state": ( + text(g, "irs:RecipientUSAddress/irs:StateAbbreviationCd") + or text(g, "irs:RecipientUSAddress/irs:State") + or text(g, "irs:RecipientForeignAddress/irs:ProvinceOrStateNm") + ), + "zip": ( + text(g, "irs:RecipientUSAddress/irs:ZIPCd") + or text(g, "irs:RecipientUSAddress/irs:ZIPCode") + ), + "country": text(g, "irs:RecipientForeignAddress/irs:CountryCd"), + "foreign_postal_code": text(g, "irs:RecipientForeignAddress/irs:ForeignPostalCd"), + "amount_raw": amount_raw, + "amount": parse_numeric(amount_raw), + "purpose": ( + text(g, "irs:GrantOrContributionPurposeTxt") + or text(g, "irs:PurposeOfGrantOrContriTxt") + ), + "foundation_status": ( + text(g, "irs:RecipientFoundationStatusTxt") + or text(g, "irs:RecipientFoundationStatusCd") + ), + "relationship": ( + text(g, "irs:RecipientRelationshipTxt") + or text(g, "irs:RecipientRelationship") + ), + } + + +def find_all_grants(tree): + """Find all grant elements across schema variants.""" + grants = [] + for xpath in GRANT_XPATHS: + grants.extend(tree.findall(xpath)) + return grants + + +# ============================================================ +# Form 990-PF summary extraction +# ============================================================ + +def extract_form_990pf(tree): + """Extract filing-level summary fields for raw_form_990pf.""" + pf = f".//{{{NS}}}IRS990PF" + a = f"{pf}/{{{NS}}}AnalysisOfRevenueAndExpenses" + bs = f"{pf}/{{{NS}}}Form990PFBalanceSheetsGrp" + si = f"{pf}/{{{NS}}}SupplementaryInformationGrp" + sa = f"{pf}/{{{NS}}}StatementsRegardingActyGrp" + + # Filer address from ReturnHeader. Each field tries the modern (*Txt / + # *Cd / CityNm / StateAbbreviationCd / ZIPCd) tag first and falls back + # to the old-schema variant used by older filings. 
+ filer_addr = { + "filer_name2": ( + text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt") + or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2") + ), + "filer_address_line1": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1") + ), + "filer_address_line2": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt") + or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2") + ), + "filer_city": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:USAddress/irs:City") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City") + ), + "filer_state": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:State") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm") + ), + "filer_zip": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode") + ), + "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"), + "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"), + "phone": text(tree, ".//irs:Filer/irs:PhoneNum"), + } + + # Classification. text_bool preserves None (tag absent) vs False + # (tag present but not ticked); the accounting_method scan treats both + # the same because we're looking for the first truthy branch. + method_cash = text_bool(tree, f"{pf}/irs:MethodOfAccountingCashInd") + method_accrual = text_bool(tree, f"{pf}/irs:MethodOfAccountingAccrualInd") + + classification = { + "is_501c3_pf": text_bool(tree, f"{pf}/irs:Organization501c3ExemptPFInd"), + "is_4947a1_trust": text_bool(tree, f"{pf}/irs:Organization4947a1TrtdPFInd"), + "is_private_operating": text_bool(tree, f"{sa}/irs:PrivateOperatingFoundationInd"), + "accounting_method": ( + "cash" if method_cash + else ("accrual" if method_accrual else None) + ), + } + + # Filing status. is_initial is true when either of two mutually-exclusive + # indicator tags is set; we preserve None when both tags are absent. 
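+    # (std, former) -> is_initial: (None, None) -> None; otherwise truthy-or,
+    # e.g. (True, None) -> True, (None, False) -> False, (False, True) -> True.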
+ initial_std = text_bool(tree, f"{pf}/irs:InitialReturnInd") + initial_former = text_bool(tree, f"{pf}/irs:InitialReturnFormerPubChrtyInd") + if initial_std is None and initial_former is None: + is_initial = None + else: + is_initial = bool(initial_std) or bool(initial_former) + + status = { + "is_amended": text_bool(tree, f"{pf}/irs:AmendedReturnInd"), + "is_initial": is_initial, + "is_final": text_bool(tree, f"{pf}/irs:FinalReturnInd"), + } + + # Part I: revenue and expenses + part_i = {} + part_i_fields = { + "contributions_received": f"{a}/irs:ContriRcvdRevAndExpnssAmt", + "interest_revenue": f"{a}/irs:InterestOnSavRevAndExpnssAmt", + "dividends_revenue": f"{a}/irs:DividendsRevAndExpnssAmt", + "net_gain_sale_assets": f"{a}/irs:NetGainSaleAstRevAndExpnssAmt", + "total_revenue": f"{a}/irs:TotalRevAndExpnssAmt", + "total_net_investment_income": f"{a}/irs:TotalNetInvstIncmAmt", + "compensation_officers": f"{a}/irs:CompOfcrDirTrstRevAndExpnssAmt", + "total_operating_expenses": f"{a}/irs:TotOprExpensesRevAndExpnssAmt", + "contributions_paid": f"{a}/irs:ContriPaidRevAndExpnssAmt", + "total_expenses": f"{a}/irs:TotalExpensesRevAndExpnssAmt", + "total_charitable_disbursements": f"{a}/irs:TotalExpensesDsbrsChrtblAmt", + "excess_revenue_over_expenses": f"{a}/irs:ExcessRevenueOverExpensesAmt", + "net_investment_income": f"{a}/irs:NetInvestmentIncomeAmt", + "adjusted_net_income": f"{a}/irs:AdjustedNetIncomeAmt", + } + for col, xpath in part_i_fields.items(): + part_i[col] = parse_numeric(text(tree, xpath)) + + # Part II: balance sheets + part_ii = {} + part_ii_fields = { + "total_assets_boy": f"{bs}/irs:TotalAssetsBOYAmt", + "total_assets_eoy": f"{bs}/irs:TotalAssetsEOYAmt", + "total_assets_eoy_fmv": f"{bs}/irs:TotalAssetsEOYFMVAmt", + "total_liabilities_boy": f"{bs}/irs:TotalLiabilitiesBOYAmt", + "total_liabilities_eoy": f"{bs}/irs:TotalLiabilitiesEOYAmt", + "net_assets_boy": f"{bs}/irs:TotNetAstOrFundBalancesBOYAmt", + "net_assets_eoy": f"{bs}/irs:TotNetAstOrFundBalancesEOYAmt", + "fmv_assets_eoy": f"{pf}/irs:FMVAssetsEOYAmt", + } + for col, xpath in part_ii_fields.items(): + part_ii[col] = parse_numeric(text(tree, xpath)) + + # Parts X-XII + dist = { + "minimum_investment_return": parse_numeric( + text(tree, f"{pf}/irs:MinimumInvestmentReturnGrp/irs:MinimumInvestmentReturnAmt") + ), + "distributable_amount": parse_numeric( + text(tree, f"{pf}/irs:DistributableAmountGrp/irs:DistributableAsAdjustedAmt") + ), + "qualifying_distributions": parse_numeric( + text(tree, f"{pf}/irs:QualifyingDistriPartXIIGrp/irs:QualifyingDistributionsAmt") + ), + "excise_tax_amount": parse_numeric( + text(tree, f"{pf}/irs:ExciseTaxBasedOnInvstIncmGrp/irs:InvestmentIncomeExciseTaxAmt") + ), + } + + # Part XV totals + xv = { + "total_grants_paid": parse_numeric( + text(tree, f"{si}/irs:TotalGrantOrContriPdDurYrAmt") + ), + "total_grants_approved_future": parse_numeric( + text(tree, f"{si}/irs:TotalGrantOrContriApprvFutAmt") + ), + } + + # Misc + misc = { + "website": text(tree, f"{sa}/irs:WebsiteAddressTxt"), + "state_of_registration": text(tree, f"{sa}/irs:OrgReportOrRegisterStateCd"), + } + + # Officer / signer + officer = { + "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"), + "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"), + "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"), + "preparer_firm": ( + text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt") + or text(tree, 
".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1") + ), + } + + return {**filer_addr, **classification, **status, **part_i, **part_ii, **dist, **xv, **misc, **officer} + + +# ============================================================ +# Grant detail status +# ============================================================ + +def compute_grant_detail_status(grant_elements, grant_rows): + """Determine grant detail completeness status.""" + if not grant_elements: + return "no_grants" + if not grant_rows: + return "unresolved" + + placeholder_count = sum( + 1 for r in grant_rows + if is_placeholder(r.get("recipient_name")) + or is_placeholder(r.get("amount_raw")) + ) + + if placeholder_count == len(grant_rows): + return "placeholder_only" + if placeholder_count > 0: + return "see_attached" + return "complete" + + +# ============================================================ +# Per-filing processing +# ============================================================ + +GRANT_COLUMNS = [ + "raw_filing_id", "line_number", + "recipient_name", "recipient_name2", "recipient_person_name", + "address_line1", "address_line2", "city", "state", "zip", + "country", "foreign_postal_code", + "amount_raw", "amount", "purpose", "foundation_status", "relationship", +] + + + +def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id): + """Process a single 990-PF filing. All child writes are transactional.""" + + # Extract filing metadata + metadata = extract_filing_metadata(tree) + + # Extract grants + grant_elements = find_all_grants(tree) + extracted_grants = [] + for i, g in enumerate(grant_elements, start=1): + row = extract_grant(g, i) + if row is not None: + extracted_grants.append(row) + + # Extract form summary + form_data = extract_form_990pf(tree) + form_data["grant_detail_status"] = compute_grant_detail_status(grant_elements, extracted_grants) + + root = tree.getroot() + return_header = root.find(f"{{{NS}}}ReturnHeader") + return_data = root.find(f"{{{NS}}}ReturnData") + + def _do(conn): + raw_filing_id = upsert_raw_filing( + SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn + ) + record_raw_filing_source( + raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn + ) + + filing_form_data = {**form_data, "raw_filing_id": raw_filing_id} + grant_rows = [ + {**row, "raw_filing_id": raw_filing_id} + for row in extracted_grants + ] + + _replace_children(conn, raw_filing_id, filing_form_data, grant_rows, xml_rows) + + # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990pf + grants + xml fields + return 3 + len(grant_rows) + len(xml_rows) + + return execute_transaction(_do) + + +def _replace_children(conn, raw_filing_id, form_data, grant_rows, xml_rows): + """Delete and re-insert all child rows for a filing using the caller's transaction.""" + form_columns = list(form_data.keys()) + form_placeholders = ", ".join(["%s"] * len(form_columns)) + form_values = [form_data[col] for col in form_columns] + + grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS)) + + + with conn.cursor() as cur: + # Delete old child rows + cur.execute("DELETE FROM raw.grant_990pf WHERE raw_filing_id = %s", (raw_filing_id,)) + cur.execute("DELETE FROM raw.form_990pf WHERE raw_filing_id = %s", (raw_filing_id,)) + + # Insert form summary + cur.execute( + f"INSERT INTO raw.form_990pf ({', '.join(form_columns)}) " + f"VALUES ({form_placeholders})", + form_values, + ) + + # Insert grants + if grant_rows: + from psycopg2.extras import execute_batch + 
+            grant_values = [
+                [row.get(col) for col in GRANT_COLUMNS]
+                for row in grant_rows
+            ]
+            execute_batch(
+                cur,
+                f"INSERT INTO raw.grant_990pf ({', '.join(GRANT_COLUMNS)}) "
+                f"VALUES ({grant_placeholders})",
+                grant_values,
+            )
+
+
+# ============================================================
+# ZIP / file processing
+# ============================================================
+
+def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
+    """Parse XML bytes and process if it's a 990-PF. Returns rows inserted or None if skipped."""
+    try:
+        tree = etree.parse(io.BytesIO(xml_bytes))
+    except etree.XMLSyntaxError as e:
+        log_ingest_error(ingest_run_id, source_archive, source_path,
+                         f"XML parse error: {e}", stage="parse_xml")
+        return None
+
+    ret_type = text(tree, ".//irs:ReturnTypeCd")
+    if ret_type != "990PF":
+        return None
+
+    source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
+    try:
+        return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id)
+    except Exception as e:
+        log_ingest_error(ingest_run_id, source_archive, source_path, e,
+                         source_document_id=source_document_id, stage="process_filing")
+        raise
+
+
+def process_zip(zip_path, ingest_run_id):
+    """Process all XMLs in a ZIP file."""
+    basename = os.path.basename(zip_path)
+    try:
+        zf = zipfile.ZipFile(zip_path)
+    except zipfile.BadZipFile as e:
+        log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip")
+        print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr)
+        return 0, 0, 0
+
+    with zf:
+        names = [n for n in zf.namelist() if n.endswith(".xml")]
+
+        print(f"Processing {basename}: {len(names)} XML files")
+        files_scanned = 0
+        files_matched = 0
+        total_rows = 0
+
+        for i, name in enumerate(names):
+            # Count every ZIP member as scanned, even ones we fail to read —
+            # otherwise read failures silently shrink the scanned total and
+            # make run-level metrics misleading.
+            files_scanned += 1
+            try:
+                xml_bytes = zf.read(name)
+            except Exception as e:
+                log_ingest_error(ingest_run_id, basename, name, e, stage="read")
+                continue
+
+            try:
+                rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id)
+            except Exception as e:
+                print(f"  ERROR in {name}: {e}", file=sys.stderr)
+                continue
+
+            if rows is not None:
+                files_matched += 1
+                total_rows += rows
+
+            if (i + 1) % 1000 == 0:
+                print(f"  ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows")
+
+    print(f"  Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows")
+    return files_scanned, files_matched, total_rows
+
+
+def main():
+    args = sys.argv[1:]
+    if not args:
+        print("Usage: python -m scripts.parse.irs_990pf <zip_or_xml_files...>", file=sys.stderr)
+        sys.exit(1)
+
+    notes = " ".join(os.path.basename(a) for a in args)
+    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)
+
+    grand_scanned = 0
+    grand_matched = 0
+    grand_rows = 0
+
+    try:
+        for path in args:
+            if path.endswith(".zip"):
+                scanned, matched, rows = process_zip(path, ingest_run_id)
+                grand_scanned += scanned
+                grand_matched += matched
+                grand_rows += rows
+
+            elif path.endswith(".xml"):
+                xml_name = os.path.basename(path)
+                # Count before I/O, so read failures still show up in scanned.
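+                # Same policy as the per-member counting in process_zip.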
+ grand_scanned += 1 + try: + with open(path, "rb") as f: + xml_bytes = f.read() + except Exception as e: + log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read") + print(f"ERROR reading {path}: {e}", file=sys.stderr) + continue + + try: + rows = process_xml_bytes( + xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id + ) + except Exception as e: + print(f"ERROR in {path}: {e}", file=sys.stderr) + continue + + if rows is not None: + grand_matched += 1 + grand_rows += rows + else: + print(f"Skipping unknown file type: {path}", file=sys.stderr) + + finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows) + except Exception: + fail_ingest_run(ingest_run_id) + raise + + print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.") + + +if __name__ == "__main__": + main() diff --git a/scripts/parse/irs_bmf.py b/scripts/parse/irs_bmf.py new file mode 100644 index 0000000..cf02575 --- /dev/null +++ b/scripts/parse/irs_bmf.py @@ -0,0 +1,231 @@ +""" +Load IRS Business Master File (BMF) CSVs into raw.bmf. + +Usage: + python -m scripts.parse.irs_bmf + python -m scripts.parse.irs_bmf data/irs/bmf/eo1.csv data/irs/bmf/eo2.csv +""" + +import csv +import os +import sys + +from psycopg2.extras import execute_batch + +from scripts.common.db import execute_transaction +from scripts.common.ingest import ( + start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, +) +from scripts.common.normalize import normalize_ein + +DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "..", "data", "irs", "bmf") +PARSER_NAME = "load_raw_bmf" +SOURCE_SYSTEM = "irs_bmf" + +BMF_COLUMNS = [ + "ein", + "name", + "ico", + "street", + "city", + "state", + "zip", + "grp", + "subsection", + "affiliation", + "classification", + "ruling", + "deductibility", + "foundation", + "activity", + "organization", + "status", + "tax_period", + "asset_cd", + "income_cd", + "filing_req_cd", + "pf_filing_req_cd", + "acct_pd", + "asset_amt", + "income_amt", + "revenue_amt", + "ntee_cd", + "sort_name", + "source_file", + "ingest_run_id", +] + + +def parse_bigint(value): + if value is None: + return None + s = str(value).strip() + if not s: + return None + return int(s) + + +def row_to_record(row, source_file, ingest_run_id): + ein = normalize_ein(row.get("EIN")) + if not ein: + raise ValueError(f"Cannot normalize EIN: {row.get('EIN')!r}") + + return { + "ein": ein, + "name": row.get("NAME") or None, + "ico": row.get("ICO") or None, + "street": row.get("STREET") or None, + "city": row.get("CITY") or None, + "state": row.get("STATE") or None, + "zip": row.get("ZIP") or None, + "grp": row.get("GROUP") or None, + "subsection": row.get("SUBSECTION") or None, + "affiliation": row.get("AFFILIATION") or None, + "classification": row.get("CLASSIFICATION") or None, + "ruling": row.get("RULING") or None, + "deductibility": row.get("DEDUCTIBILITY") or None, + "foundation": row.get("FOUNDATION") or None, + "activity": row.get("ACTIVITY") or None, + "organization": row.get("ORGANIZATION") or None, + "status": row.get("STATUS") or None, + "tax_period": row.get("TAX_PERIOD") or None, + "asset_cd": row.get("ASSET_CD") or None, + "income_cd": row.get("INCOME_CD") or None, + "filing_req_cd": row.get("FILING_REQ_CD") or None, + "pf_filing_req_cd": row.get("PF_FILING_REQ_CD") or None, + "acct_pd": row.get("ACCT_PD") or None, + "asset_amt": parse_bigint(row.get("ASSET_AMT")), + "income_amt": parse_bigint(row.get("INCOME_AMT")), + "revenue_amt": 
parse_bigint(row.get("REVENUE_AMT")),
+        "ntee_cd": row.get("NTEE_CD") or None,
+        "sort_name": row.get("SORT_NAME") or None,
+        "source_file": source_file,
+        "ingest_run_id": ingest_run_id,
+    }
+
+
+def upsert_records(records):
+    if not records:
+        return 0
+
+    placeholders = ", ".join(["%s"] * len(BMF_COLUMNS))
+    # On EIN conflict, refresh every non-key column from the incoming row.
+    update_cols = [col for col in BMF_COLUMNS if col != "ein"]
+    set_clause = ", ".join(f"{col} = EXCLUDED.{col}" for col in update_cols)
+    sql = (
+        f"INSERT INTO raw.bmf ({', '.join(BMF_COLUMNS)}) "
+        f"VALUES ({placeholders}) "
+        f"ON CONFLICT (ein) DO UPDATE SET {set_clause}"
+    )
+
+    values = [[record.get(col) for col in BMF_COLUMNS] for record in records]
+
+    def _do(conn):
+        with conn.cursor() as cur:
+            execute_batch(cur, sql, values, page_size=1000)
+        return len(values)
+
+    return execute_transaction(_do)
+
+
+def discover_files(args):
+    if args:
+        return args
+    return sorted(
+        os.path.join(DATA_DIR, name)
+        for name in os.listdir(DATA_DIR)
+        if name.endswith(".csv")
+    )
+
+
+def main():
+    files = discover_files(sys.argv[1:])
+    if not files:
+        print(f"No CSV files found in {DATA_DIR}", file=sys.stderr)
+        sys.exit(1)
+
+    notes = " ".join(os.path.basename(path) for path in files)
+    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)
+
+    files_scanned = 0
+    files_matched = 0
+    rows_inserted = 0
+
+    try:
+        for csv_path in files:
+            basename = os.path.basename(csv_path)
+            files_scanned += 1
+            print(f"Loading {basename}...")
+
+            batch = []
+            file_rows = 0
+            file_errors = 0
+
+            with open(csv_path, newline="", encoding="utf-8-sig") as f:
+                reader = csv.DictReader(f)
+                for line_number, row in enumerate(reader, start=2):  # row 1 is the header
+                    try:
+                        batch.append(row_to_record(row, basename, ingest_run_id))
+                    except Exception as e:
+                        file_errors += 1
+                        log_ingest_error(
+                            ingest_run_id,
+                            basename,
+                            f"line:{line_number}",
+                            e,
+                            source_document_id=row.get("EIN"),
+                            stage="normalize_row",
+                        )
+                        continue
+
+                    if len(batch) >= 5000:
+                        rows_inserted += upsert_records(batch)
+                        file_rows += len(batch)
+                        batch = []
+
+            if batch:
+                rows_inserted += upsert_records(batch)
+                file_rows += len(batch)
+
+            files_matched += 1
+            print(f"  {basename}: {file_rows:,} rows loaded, {file_errors} errors")
+
+        finish_ingest_run(ingest_run_id, files_scanned, files_matched, rows_inserted)
+    except Exception:
+        fail_ingest_run(ingest_run_id)
+        raise
+
+    print(
+        f"Done. {files_scanned} files scanned, "
+        f"{files_matched} files loaded, {rows_inserted:,} rows upserted."
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/seed.py b/scripts/seed.py
new file mode 100644
index 0000000..72c12bd
--- /dev/null
+++ b/scripts/seed.py
@@ -0,0 +1,81 @@
+"""
+Seed raw-layer reference tables with initial values.
+
+Idempotent — safe to run repeatedly. Uses ON CONFLICT DO UPDATE
+so display_name, url, and notes stay current if you change them here.
+
+Usage: python scripts/seed.py
+"""
+
+import os
+import sys
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+from scripts.common.db import execute
+
+
+def seed_source_system():
+    execute("""
+        INSERT INTO raw.source_system (code, display_name, url, notes) VALUES
+        ('irs_xml', 'IRS Bulk XML E-Files', 'https://www.irs.gov/charities-non-profits/form-990-series-downloads', NULL),
+        ('irs_pdf', 'IRS PDF Filings', NULL, NULL),
+        ('irs_bmf', 'IRS EO BMF Extract', 'https://www.irs.gov/charities-non-profits/exempt-organizations-business-master-file-extract-eo-bmf', NULL),
+        ('ny_ag', 'New York Attorney General', 'https://www.charitiesnys.com/RegistrySearch/search_charities.jsp', NULL),
+        ('ca_ag', 'California Attorney General', 'https://rct.doj.ca.gov/Verification/Web/Search.aspx', NULL),
+        ('fl_ag', 'Florida Charities', NULL, NULL),
+        ('tx_ag', 'Texas Secretary of State', NULL, NULL),
+        ('nj_ag', 'New Jersey Charities', NULL, NULL),
+        ('il_ag', 'Illinois Attorney General', NULL, NULL),
+        ('pa_ag', 'Pennsylvania Charities', NULL, NULL),
+        ('tn_ag', 'Tennessee Secretary of State', NULL, NULL)
+        ON CONFLICT (code) DO UPDATE SET
+            display_name = EXCLUDED.display_name,
+            url = EXCLUDED.url,
+            notes = EXCLUDED.notes;
+    """)
+
+
+def seed_form_type():
+    execute("""
+        INSERT INTO raw.form_type (code, display_name, notes) VALUES
+        ('990', 'Form 990', NULL),
+        ('990PF', 'Form 990-PF', NULL),
+        ('990EZ', 'Form 990-EZ', NULL),
+        ('990O', 'Form 990-O', NULL),
+        ('990T', 'Form 990-T', NULL),
+        ('990A', 'Form 990 (Amended)', NULL),
+        ('990PA', 'Form 990-PF (Amended)', NULL),
+        ('990EA', 'Form 990-EZ (Amended)', NULL)
+        ON CONFLICT (code) DO UPDATE SET
+            display_name = EXCLUDED.display_name,
+            notes = EXCLUDED.notes;
+    """)
+
+
+def seed_grant_detail_status():
+    execute("""
+        INSERT INTO raw.grant_detail_status (code, display_name, notes) VALUES
+        ('complete', 'Complete', 'All grants are in raw grant rows'),
+        ('see_attached', 'See Attached', 'Filing says SEE ATTACHED for grant detail'),
+        ('schedule_o', 'Schedule O', 'Grant detail is in Schedule O narrative'),
+        ('placeholder_only', 'Placeholder Only', 'Only placeholder rows (VARIOUS, SEE STATEMENT, etc.)'),
+        ('no_grants', 'No Grants', 'Filing has no grant schedule or zero grants reported'),
+        ('unresolved', 'Unresolved', 'Completeness not yet determined'),
+        ('supplemented', 'Supplemented', 'Detail recovered from supplemental source (PDF, state AG)')
+        ON CONFLICT (code) DO UPDATE SET
+            display_name = EXCLUDED.display_name,
+            notes = EXCLUDED.notes;
+    """)
+
+
+def main():
+    print("Seeding reference tables...")
+    seed_source_system()
+    print("  source_system: done")
+    seed_form_type()
+    print("  form_type: done")
+    seed_grant_detail_status()
+    print("  grant_detail_status: done")
+    print("All reference tables seeded.")
+
+
+if __name__ == "__main__":
+    main()
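+
+# Typical bootstrap order (a sketch; assumes psql and a DATABASE_URL-style
+# connection string, neither of which this script itself checks):
+#
+#     psql "$DATABASE_URL" -f migrations/001_raw_schema.sql
+#     python scripts/seed.py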
