aboutsummaryrefslogtreecommitdiff
path: root/scripts/parse/irs_990.py
diff options
context:
space:
mode:
authorbenj <benj@rse8.com>2026-04-10 11:13:57 +0800
committerbenj <benj@rse8.com>2026-04-10 11:13:57 +0800
commit6605e2cc428e3bdaa174ccc432941eab8c5d61cb (patch)
tree52f9d176c2ce1a80adb2ea2ac31cd12d3a29c0db /scripts/parse/irs_990.py
parent493746b14c1251a45b061d2e3edd9160c929d2b9 (diff)
downloadtidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.gz
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.bz2
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.lz
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.xz
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.zst
tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.zip
ensure parsers do not parse and store raw XML fields
Diffstat (limited to '')
-rw-r--r--scripts/parse/irs_990.py691
1 file changed, 691 insertions, 0 deletions
diff --git a/scripts/parse/irs_990.py b/scripts/parse/irs_990.py
new file mode 100644
index 0000000..3a5cc2d
--- /dev/null
+++ b/scripts/parse/irs_990.py
@@ -0,0 +1,691 @@
+"""
+Parse IRS Form 990 XML files into the new raw schema.
+
+Populates: raw.filing, raw.filing_source, raw.form_990,
+ raw.grant_990, raw.schedule_o.
+
+Usage:
+ python -m scripts.parse.irs_990 data/irs/xml-zips/*.zip
+ python -m scripts.parse.irs_990 data/irs/xml-missing/202100139349100100_public.xml
+"""
+
+import io
+import os
+import sys
+import zipfile
+
+from lxml import etree
+
+from scripts.common.db import execute_transaction
+from scripts.common.normalize import normalize_ein, parse_numeric, is_placeholder
+from scripts.common.xml import (
+ NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata,
+)
+from scripts.common.ingest import (
+ start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
+)
+from scripts.common.filing import (
+ upsert_raw_filing, record_raw_filing_source,
+)
+
# Identifiers recorded with every ingest run and raw filing row.
PARSER_NAME = "parse_irs_990"
SOURCE_SYSTEM = "irs_xml"

# Standalone XML files (not read from a ZIP) use this as source_archive.
STANDALONE_ARCHIVE = "__standalone__"

# Module-level truthy set used by every classification check.
# IRS checkbox-indicator elements carry one of these literal bodies.
_TRUTHY = {"X", "x", "1", "true"}

# IRS ReturnTypeCd values that map to Form 990 (includes amended return variant).
FORM_990_RETURN_TYPES = {"990", "990A"}

# Schedule I RecipientTable rows (one xpath — modern schema only).
GRANT_XPATHS = [
    f".//{{{NS}}}IRS990ScheduleI/{{{NS}}}RecipientTable",
]

# Clark-notation paths locating the Schedule I / Schedule O elements.
SCHEDULE_I_XPATH = f".//{{{NS}}}IRS990ScheduleI"
SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO"
+
+
+# ============================================================
+# Grant extraction (Schedule I RecipientTable)
+# ============================================================
+
def extract_grant(g, line_number):
    """Build one grant dict from a Schedule I RecipientTable element.

    Child element names drifted across IRS schema revisions, so each
    field walks a list of candidate paths (modern tag first, older
    variant as fallback) and keeps the first non-empty hit.

    Returns None for childless elements and for stub rows carrying
    neither a recipient name nor any grant amount — real filings contain
    such malformed RecipientTable entries (e.g. only a PurposeOfGrantTxt
    with no recipient or amount).
    """
    if len(g) == 0:
        return None

    def first(*paths):
        # Mirrors an `a or b or ...` chain exactly: the first truthy
        # lookup wins; otherwise the last lookup's result (possibly
        # None) is returned.
        val = None
        for path in paths:
            val = text(g, path)
            if val:
                return val
        return val

    cash_raw = text(g, "irs:CashGrantAmt")
    non_cash_raw = text(g, "irs:NonCashAssistanceAmt")
    recipient_name = first(
        "irs:RecipientBusinessName/irs:BusinessNameLine1Txt",
        "irs:RecipientBusinessName/irs:BusinessNameLine1",
    )
    recipient_name2 = first(
        "irs:RecipientBusinessName/irs:BusinessNameLine2Txt",
        "irs:RecipientBusinessName/irs:BusinessNameLine2",
    )

    # Stub row: no recipient and no amounts at all.
    if (recipient_name is None and recipient_name2 is None
            and cash_raw is None and non_cash_raw is None):
        return None

    return {
        "line_number": line_number,
        "recipient_name": recipient_name,
        "recipient_name2": recipient_name2,
        "recipient_ein": normalize_ein(text(g, "irs:RecipientEIN")),
        "address_line1": first(
            "irs:USAddress/irs:AddressLine1Txt",
            "irs:USAddress/irs:AddressLine1",
            "irs:ForeignAddress/irs:AddressLine1Txt",
            "irs:ForeignAddress/irs:AddressLine1",
        ),
        "address_line2": first(
            "irs:USAddress/irs:AddressLine2Txt",
            "irs:USAddress/irs:AddressLine2",
            "irs:ForeignAddress/irs:AddressLine2Txt",
            "irs:ForeignAddress/irs:AddressLine2",
        ),
        "city": first(
            "irs:USAddress/irs:CityNm",
            "irs:USAddress/irs:City",
            "irs:ForeignAddress/irs:CityNm",
            "irs:ForeignAddress/irs:City",
        ),
        "state": first(
            "irs:USAddress/irs:StateAbbreviationCd",
            "irs:USAddress/irs:State",
            "irs:ForeignAddress/irs:ProvinceOrStateNm",
        ),
        "zip": first(
            "irs:USAddress/irs:ZIPCd",
            "irs:USAddress/irs:ZIPCode",
        ),
        "country": text(g, "irs:ForeignAddress/irs:CountryCd"),
        "foreign_postal_code": text(g, "irs:ForeignAddress/irs:ForeignPostalCd"),
        "cash_grant_amt_raw": cash_raw,
        "cash_grant_amt": parse_numeric(cash_raw),
        "non_cash_amt_raw": non_cash_raw,
        "non_cash_amt": parse_numeric(non_cash_raw),
        "non_cash_desc": text(g, "irs:NonCashAssistanceDesc"),
        "valuation_method": text(g, "irs:ValuationMethodUsedDesc"),
        "purpose": text(g, "irs:PurposeOfGrantTxt"),
        "irc_section": text(g, "irs:IRCSectionDesc"),
    }
+
+
def find_all_grants(tree):
    """Collect every Schedule I RecipientTable element in document order."""
    return [el for xpath in GRANT_XPATHS for el in tree.findall(xpath)]
+
+
+# ============================================================
+# Schedule O narrative extraction
+# ============================================================
+
def extract_schedule_o(tree):
    """Pull every SupplementalInformationDetail narrative from Schedule O.

    Returns a list of row dicts without raw_filing_id (the caller adds
    it). line_number is the 1-based ordinal among the rows actually
    emitted; details with no text in any of the explanation variants are
    dropped so we never store low-signal placeholder rows.
    """
    sched_o = tree.find(SCHEDULE_O_XPATH)
    if sched_o is None:
        return []

    rows = []
    for detail in sched_o.findall(f"{{{NS}}}SupplementalInformationDetail"):
        # Same semantics as `a or b or c`: first truthy explanation
        # wins; otherwise the last lookup's result is kept.
        narrative = None
        for tag in ("irs:ExplanationTxt",
                    "irs:MediumExplanationTxt",
                    "irs:ShortExplanationTxt"):
            narrative = text(detail, tag)
            if narrative:
                break
        if narrative is None:
            continue
        rows.append({
            "line_number": len(rows) + 1,
            "form_line_ref": text(detail, "irs:FormAndLineReferenceDesc"),
            "explanation": narrative,
        })
    return rows
+
+
+# ============================================================
+# Form 990 summary extraction
+# ============================================================
+
def extract_form_990(tree):
    """Extract filing-level summary fields for raw.form_990.

    Returns a single flat dict merged from the sections below: filer
    address, classification, filing status flags, Part I current/prior
    year summary, balance sheet, workforce, Part IX functional expenses,
    Part VIII revenue detail, UBI, Schedule I filing-level metadata, and
    officer/signer info. Where the IRS renamed a tag between schema
    versions, the modern name is tried first with the older variant as
    fallback.
    """
    f990 = f".//{{{NS}}}IRS990"

    # Filer address from ReturnHeader (with old-schema fallbacks)
    filer_addr = {
        "filer_name2": (
            text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt")
            or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2")
        ),
        "filer_address_line1": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1")
        ),
        "filer_address_line2": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2")
        ),
        "filer_city": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:City")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City")
        ),
        "filer_state": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:State")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm")
        ),
        "filer_zip": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode")
        ),
        "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"),
        "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"),
        "phone": text(tree, ".//irs:Filer/irs:PhoneNum"),
        "website": text(tree, f"{f990}/irs:WebsiteAddressTxt"),
    }

    # Classification: org type from mutually-exclusive checkbox
    # indicators; the first truthy one wins.
    org_type = None
    if text(tree, f"{f990}/irs:TypeOfOrganizationCorpInd") in _TRUTHY:
        org_type = "corp"
    elif text(tree, f"{f990}/irs:TypeOfOrganizationTrustInd") in _TRUTHY:
        org_type = "trust"
    elif text(tree, f"{f990}/irs:TypeOfOrganizationAssocInd") in _TRUTHY:
        org_type = "assoc"
    elif text(tree, f"{f990}/irs:TypeOfOrganizationOtherInd") in _TRUTHY:
        org_type = "other"

    # Accounting method: also checkbox indicators, first truthy wins.
    method_cash = text(tree, f"{f990}/irs:MethodOfAccountingCashInd")
    method_accrual = text(tree, f"{f990}/irs:MethodOfAccountingAccrualInd")
    method_other = text(tree, f"{f990}/irs:MethodOfAccountingOtherInd")
    if method_cash in _TRUTHY:
        accounting_method = "cash"
    elif method_accrual in _TRUTHY:
        accounting_method = "accrual"
    elif method_other in _TRUTHY:
        accounting_method = "other"
    else:
        accounting_method = None

    # The 501(c) subsection number lives in an attribute, not in the
    # element body. (The body is typically "X" — the checkbox indicator.)
    # A 501(c)(3) filing sets Organization501c3Ind instead and leaves this
    # attribute unset; we deliberately don't infer "3" in that case, to
    # keep raw provenance honest.
    section_501c_type = None
    el_501c = tree.find(f"{f990}/irs:Organization501cInd", NS_MAP)
    if el_501c is not None:
        section_501c_type = el_501c.get("organization501cTypeTxt")

    classification = {
        "is_501c3": text_bool(tree, f"{f990}/irs:Organization501c3Ind"),
        "section_501c_type": section_501c_type,
        "org_type": org_type,
        "group_return": text_bool(tree, f"{f990}/irs:GroupReturnForAffiliatesInd"),
        "group_exemption_num": text(tree, f"{f990}/irs:GroupExemptionNum"),
        "formation_year": text(tree, f"{f990}/irs:FormationYr"),
        "legal_domicile_state": text(tree, f"{f990}/irs:LegalDomicileStateCd"),
        "mission": (
            text(tree, f"{f990}/irs:ActivityOrMissionDesc")
            or text(tree, f"{f990}/irs:MissionDesc")
        ),
        "accounting_method": accounting_method,
    }

    # Filing status flags
    status = {
        "is_amended": text_bool(tree, f"{f990}/irs:AmendedReturnInd"),
        "is_initial": text_bool(tree, f"{f990}/irs:InitialReturnInd"),
        "is_final": text_bool(tree, f"{f990}/irs:FinalReturnInd"),
        "is_terminated": text_bool(tree, f"{f990}/irs:TerminateOperationsInd"),
    }

    # Part I: current year summary (column name -> IRS element name)
    part_i_cy = {}
    part_i_cy_fields = {
        "gross_receipts": "GrossReceiptsAmt",
        "cy_contributions_grants": "CYContributionsGrantsAmt",
        "cy_program_service_revenue": "CYProgramServiceRevenueAmt",
        "cy_investment_income": "CYInvestmentIncomeAmt",
        "cy_other_revenue": "CYOtherRevenueAmt",
        "cy_total_revenue": "CYTotalRevenueAmt",
        "cy_grants_paid": "CYGrantsAndSimilarPaidAmt",
        "cy_benefits_to_members": "CYBenefitsPaidToMembersAmt",
        "cy_salaries_benefits": "CYSalariesCompEmpBnftPaidAmt",
        "cy_fundraising_expense": "CYTotalFundraisingExpenseAmt",
        "cy_other_expenses": "CYOtherExpensesAmt",
        "cy_total_expenses": "CYTotalExpensesAmt",
        "cy_revenue_less_expenses": "CYRevenuesLessExpensesAmt",
    }
    for col, elem in part_i_cy_fields.items():
        part_i_cy[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}"))

    # Part I: prior year summary
    part_i_py = {
        "py_total_revenue": parse_numeric(text(tree, f"{f990}/irs:PYTotalRevenueAmt")),
        "py_total_expenses": parse_numeric(text(tree, f"{f990}/irs:PYTotalExpensesAmt")),
    }

    # Balance sheet (Part I summary / Part X); BOY/EOY = beginning/end of year
    balance_sheet = {}
    bs_fields = {
        "total_assets_boy": "TotalAssetsBOYAmt",
        "total_assets_eoy": "TotalAssetsEOYAmt",
        "total_liabilities_boy": "TotalLiabilitiesBOYAmt",
        "total_liabilities_eoy": "TotalLiabilitiesEOYAmt",
        "net_assets_boy": "NetAssetsOrFundBalancesBOYAmt",
        "net_assets_eoy": "NetAssetsOrFundBalancesEOYAmt",
    }
    for col, elem in bs_fields.items():
        balance_sheet[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}"))

    # Governance / workforce
    workforce = {
        "total_employees": parse_numeric(text(tree, f"{f990}/irs:TotalEmployeeCnt")),
        "total_volunteers": parse_numeric(text(tree, f"{f990}/irs:TotalVolunteersCnt")),
        "voting_members": parse_numeric(
            text(tree, f"{f990}/irs:VotingMembersGoverningBodyCnt")
            or text(tree, f"{f990}/irs:GoverningBodyVotingMembersCnt")
        ),
        "independent_voting_members": parse_numeric(
            text(tree, f"{f990}/irs:VotingMembersIndependentCnt")
            or text(tree, f"{f990}/irs:IndependentVotingMemberCnt")
        ),
    }

    # Part IX: functional expense breakdown
    func_exp = {
        "program_services_expense": parse_numeric(
            text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ProgramServicesAmt")
        ),
        "management_general_expense": parse_numeric(
            text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ManagementAndGeneralAmt")
        ),
        "fundraising_expense_ix": parse_numeric(
            text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:FundraisingAmt")
        ),
    }

    # Part VIII: revenue detail
    revenue_detail = {
        "government_grants": parse_numeric(text(tree, f"{f990}/irs:GovernmentGrantsAmt")),
        "total_contributions": parse_numeric(text(tree, f"{f990}/irs:TotalContributionsAmt")),
        "total_program_service_rev": parse_numeric(
            text(tree, f"{f990}/irs:TotalProgramServiceRevenueAmt")
        ),
        "investment_income": parse_numeric(
            text(tree, f"{f990}/irs:InvestmentIncomeGrp/irs:TotalRevenueColumnAmt")
        ),
    }

    # UBI (unrelated business income)
    ubi = {
        "gross_ubi": parse_numeric(text(tree, f"{f990}/irs:TotalGrossUBIAmt")),
        "net_ubi": parse_numeric(text(tree, f"{f990}/irs:NetUnrelatedBusTxblIncmAmt")),
    }

    # Schedule I metadata (1:1 with the filing)
    sched_i_el = tree.find(SCHEDULE_I_XPATH)
    if sched_i_el is not None:
        sched_i = {
            "sched_i_grant_records_maintained": text_bool(sched_i_el, "irs:GrantRecordsMaintainedInd"),
            "sched_i_501c3_org_count": parse_numeric(text(sched_i_el, "irs:Total501c3OrgCnt")),
            "sched_i_other_org_count": parse_numeric(text(sched_i_el, "irs:TotalOtherOrgCnt")),
            # Always emitted as None by this parser (column reserved).
            "sched_i_total_grants_amt": None,
        }
    else:
        sched_i = {
            "sched_i_grant_records_maintained": None,
            "sched_i_501c3_org_count": None,
            "sched_i_other_org_count": None,
            "sched_i_total_grants_amt": None,
        }

    # Officer / signer
    officer = {
        "principal_officer": text(tree, f"{f990}/irs:PrincipalOfficerNm"),
        "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"),
        "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"),
        "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"),
        "preparer_firm": (
            text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt")
            or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1")
        ),
    }

    # Merge all sections into the single flat raw.form_990 row.
    return {
        **filer_addr, **classification, **status,
        **part_i_cy, **part_i_py, **balance_sheet,
        **workforce, **func_exp, **revenue_detail, **ubi,
        **sched_i, **officer,
    }
+
+
+# ============================================================
+# Grant detail status
+# ============================================================
+
def compute_grant_detail_status(sched_i_element, grant_elements, grant_rows, cy_grants_paid):
    """Classify how complete a Form 990 filing's grant detail is.

    Combines Schedule I presence with Part I's CYGrantsAndSimilarPaidAmt
    so that "no Schedule I because there were no grants" is distinguished
    from "grants reported on Part I but no detail available".

    Returns one of: "no_grants", "unresolved", "placeholder_only",
    "see_attached", "complete".
    """
    if sched_i_element is None:
        # Schedule I absent entirely. If Part I still reports grants
        # paid, detail exists somewhere we cannot see.
        part_i_has_grants = cy_grants_paid is not None and float(cy_grants_paid) > 0
        return "unresolved" if part_i_has_grants else "no_grants"

    # Schedule I present but no RecipientTable rows (or only stub rows
    # that extraction rejected): detail is unaccounted for.
    if not grant_elements or not grant_rows:
        return "unresolved"

    def _is_placeholder_row(row):
        # A row counts as placeholder when its name is a placeholder, or
        # when BOTH raw amounts are placeholders.
        if is_placeholder(row.get("recipient_name")):
            return True
        return (is_placeholder(row.get("cash_grant_amt_raw"))
                and is_placeholder(row.get("non_cash_amt_raw")))

    n_placeholder = sum(1 for row in grant_rows if _is_placeholder_row(row))

    if n_placeholder == len(grant_rows):
        return "placeholder_only"
    if n_placeholder:
        return "see_attached"
    return "complete"
+
+
+# ============================================================
+# Per-filing processing
+# ============================================================
+
# Insert column order for raw.grant_990. Row dicts are read via .get(),
# so a missing key is inserted as NULL.
GRANT_COLUMNS = [
    "raw_filing_id", "line_number",
    "recipient_name", "recipient_name2", "recipient_ein",
    "address_line1", "address_line2", "city", "state", "zip",
    "country", "foreign_postal_code",
    "cash_grant_amt_raw", "cash_grant_amt",
    "non_cash_amt_raw", "non_cash_amt",
    "non_cash_desc", "valuation_method", "purpose", "irc_section",
]

# Insert column order for raw.schedule_o.
SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"]
+
+
+
def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id):
    """Process a single Form 990 filing. All child writes are transactional.

    Extracts the form summary, Schedule I grants, and Schedule O
    narratives, then — inside one transaction — upserts raw.filing,
    records raw.filing_source, and replaces all child rows.

    Returns the total number of rows written.

    Fix: the original referenced an undefined `xml_rows` variable
    (NameError on every filing) — a leftover from the removed raw-XML
    field storage. Raw XML fields are intentionally no longer stored.
    Also removed the unused root/ReturnHeader/ReturnData lookups.
    """
    metadata = extract_filing_metadata(tree)

    # Schedule I element is needed both as extraction context and for
    # grant_detail_status classification.
    sched_i_element = tree.find(SCHEDULE_I_XPATH)

    # Extract grants from Schedule I RecipientTable rows; line_number is
    # the 1-based position among all RecipientTable elements.
    grant_elements = find_all_grants(tree)
    extracted_grants = []
    for i, g in enumerate(grant_elements, start=1):
        row = extract_grant(g, i)
        if row is not None:
            extracted_grants.append(row)

    # Extract Schedule O narrative entries
    schedule_o_entries = extract_schedule_o(tree)

    # Extract form summary and attach the grant-detail classification.
    form_data = extract_form_990(tree)
    form_data["grant_detail_status"] = compute_grant_detail_status(
        sched_i_element, grant_elements, extracted_grants, form_data.get("cy_grants_paid"),
    )

    def _do(conn):
        raw_filing_id = upsert_raw_filing(
            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
        )
        record_raw_filing_source(
            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
        )

        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
        grant_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in extracted_grants
        ]
        schedule_o_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in schedule_o_entries
        ]

        # Raw XML fields are no longer parsed or stored; pass an empty
        # list for the deprecated xml_rows argument.
        _replace_children(
            conn, raw_filing_id, filing_form_data, grant_rows, schedule_o_rows, [],
        )

        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990 + grants + schedule_o
        return 3 + len(grant_rows) + len(schedule_o_rows)

    return execute_transaction(_do)
+
+
def _replace_children(conn, raw_filing_id, form_data, grant_rows, schedule_o_rows, xml_rows=None):
    """Delete and re-insert all child rows for a filing using the caller's transaction.

    Children are fully replaced (delete + insert) so re-ingesting the
    same filing can never leave stale rows behind.

    Args:
        conn: open DB connection; the caller owns the transaction.
        raw_filing_id: parent raw.filing id the children belong to.
        form_data: column -> value mapping for the single raw.form_990 row.
        grant_rows: dicts keyed by GRANT_COLUMNS for raw.grant_990.
        schedule_o_rows: dicts keyed by SCHEDULE_O_COLUMNS for raw.schedule_o.
        xml_rows: DEPRECATED and ignored — raw XML fields are
            intentionally no longer stored. The parameter survives (now
            with a default) so six-argument call sites keep working.
    """
    # psycopg2 is only needed at write time; import it lazily once here
    # instead of separately inside each insert branch.
    from psycopg2.extras import execute_batch

    form_columns = list(form_data.keys())
    form_placeholders = ", ".join(["%s"] * len(form_columns))
    form_values = [form_data[col] for col in form_columns]

    grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS))
    schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS))

    with conn.cursor() as cur:
        # Delete old child rows (grant/schedule_o first, then the 1:1 form row)
        cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.grant_990 WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.form_990 WHERE raw_filing_id = %s", (raw_filing_id,))

        # Insert form summary
        cur.execute(
            f"INSERT INTO raw.form_990 ({', '.join(form_columns)}) "
            f"VALUES ({form_placeholders})",
            form_values,
        )

        # Insert grants
        if grant_rows:
            grant_values = [
                [row.get(col) for col in GRANT_COLUMNS]
                for row in grant_rows
            ]
            execute_batch(
                cur,
                f"INSERT INTO raw.grant_990 ({', '.join(GRANT_COLUMNS)}) "
                f"VALUES ({grant_placeholders})",
                grant_values,
            )

        # Insert Schedule O narrative
        if schedule_o_rows:
            schedule_o_values = [
                [row.get(col) for col in SCHEDULE_O_COLUMNS]
                for row in schedule_o_rows
            ]
            execute_batch(
                cur,
                f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) "
                f"VALUES ({schedule_o_placeholders})",
                schedule_o_values,
            )
+
+# ============================================================
+# ZIP / file processing
+# ============================================================
+
def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
    """Parse one filing's XML bytes; process it when it is a Form 990.

    Returns the number of rows inserted, or None when the document is
    skipped (unparseable XML, or a return type outside
    FORM_990_RETURN_TYPES). Processing failures are logged with the most
    specific stage known, then re-raised to the caller.
    """
    try:
        tree = etree.parse(io.BytesIO(xml_bytes))
    except etree.XMLSyntaxError as exc:
        log_ingest_error(ingest_run_id, source_archive, source_path,
                         f"XML parse error: {exc}", stage="parse_xml")
        return None

    if text(tree, ".//irs:ReturnTypeCd") not in FORM_990_RETURN_TYPES:
        return None

    source_document_id = None
    try:
        source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
        return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id)
    except Exception as exc:
        # If the id derivation itself failed, source_document_id is still None.
        stage = "process_filing" if source_document_id else "derive_source_document_id"
        log_ingest_error(ingest_run_id, source_archive, source_path, exc,
                         source_document_id=source_document_id, stage=stage)
        raise
+
+
def process_zip(zip_path, ingest_run_id):
    """Run every .xml member of a ZIP archive through the parser.

    Returns (files_scanned, files_matched, total_rows). An unreadable
    ZIP is logged and reported as (0, 0, 0).
    """
    basename = os.path.basename(zip_path)
    try:
        archive = zipfile.ZipFile(zip_path)
    except zipfile.BadZipFile as exc:
        log_ingest_error(ingest_run_id, basename, basename, exc, stage="open_zip")
        print(f"Skipping bad ZIP {basename}: {exc}", file=sys.stderr)
        return 0, 0, 0

    with archive:
        members = [n for n in archive.namelist() if n.endswith(".xml")]

        print(f"Processing {basename}: {len(members)} XML files")
        scanned = 0
        matched = 0
        row_total = 0

        for ordinal, member in enumerate(members, start=1):
            # Every member counts as scanned, including ones we fail to
            # read — otherwise read failures silently shrink the scanned
            # total and make run-level metrics misleading.
            scanned += 1
            try:
                payload = archive.read(member)
            except Exception as exc:
                log_ingest_error(ingest_run_id, basename, member, exc, stage="read")
                continue

            try:
                inserted = process_xml_bytes(payload, basename, member, ingest_run_id)
            except Exception as exc:
                print(f"  ERROR in {member}: {exc}", file=sys.stderr)
                continue

            if inserted is not None:
                matched += 1
                row_total += inserted

            if ordinal % 1000 == 0:
                print(f"  ...{ordinal}/{len(members)} files, {matched} matched, {row_total} rows")

    print(f"  Done: {scanned} scanned, {matched} matched, {row_total} rows")
    return scanned, matched, row_total
+
+
def main():
    """CLI entry point: parse IRS 990 filings from ZIPs and/or standalone XMLs.

    One ingest run brackets the whole invocation: it is finished with
    aggregate counts on success and marked failed (then re-raised) on
    any unhandled error.

    Fix: the usage message was an f-string with no placeholders (ruff
    F541); it is now a plain string.
    """
    args = sys.argv[1:]
    if not args:
        print("Usage: python -m scripts.parse.irs_990 <zip_or_xml_files...>", file=sys.stderr)
        sys.exit(1)

    notes = " ".join(os.path.basename(a) for a in args)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    grand_scanned = 0
    grand_matched = 0
    grand_rows = 0

    try:
        for path in args:
            if path.endswith(".zip"):
                scanned, matched, rows = process_zip(path, ingest_run_id)
                grand_scanned += scanned
                grand_matched += matched
                grand_rows += rows

            elif path.endswith(".xml"):
                xml_name = os.path.basename(path)
                # Count before I/O, so read failures still show up in scanned.
                grand_scanned += 1
                try:
                    with open(path, "rb") as f:
                        xml_bytes = f.read()
                except Exception as e:
                    log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read")
                    print(f"ERROR reading {path}: {e}", file=sys.stderr)
                    continue

                try:
                    rows = process_xml_bytes(
                        xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id
                    )
                except Exception as e:
                    print(f"ERROR in {path}: {e}", file=sys.stderr)
                    continue

                if rows is not None:
                    grand_matched += 1
                    grand_rows += rows
            else:
                print(f"Skipping unknown file type: {path}", file=sys.stderr)

        finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows)
    except Exception:
        # Mark the run failed before propagating so no run is left open.
        fail_ingest_run(ingest_run_id)
        raise

    print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.")


if __name__ == "__main__":
    main()