""" Parse IRS 990-PF XML files into the new raw schema. Populates: raw.filing, raw.filing_source, raw.form_990pf, raw.grant_990pf. Usage: python -m scripts.parse.irs_990pf data/irs/xml-zips/*.zip python -m scripts.parse.irs_990pf data/irs/xml-missing/202100139349100100_public.xml """ import io import os import sys import zipfile from lxml import etree from scripts.common.db import execute_transaction from scripts.common.normalize import parse_numeric, is_placeholder from scripts.common.xml import ( NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata, ) from scripts.common.ingest import ( start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, ) from scripts.common.filing import ( upsert_raw_filing, record_raw_filing_source, get_seen_source_paths, ) PARSER_NAME = "parse_irs_990pf" SOURCE_SYSTEM = "irs_xml" # Standalone XML files use this as source_archive STANDALONE_ARCHIVE = "__standalone__" # Grant element XPaths — three schema variants across IRS versions GRANT_XPATHS = [ f".//{{{NS}}}SupplementaryInformationGrp/{{{NS}}}GrantOrContributionPdDurYrGrp", f".//{{{NS}}}SupplementaryInformation/{{{NS}}}GrantOrContriPaidDuringYear", f".//{{{NS}}}SupplementaryInfomation/{{{NS}}}GrantOrContriPaidDuringYear", # IRS typo ] # ============================================================ # Grant extraction # ============================================================ def extract_grant(g, line_number): """Extract a single grant row from a grant XML element. Child element names vary across IRS schema versions, so each field tries the modern tag first, then falls back to the older variant. """ if len(g) == 0: return None amount_raw = ( text(g, "irs:Amt") or text(g, "irs:Amount") ) return { "line_number": line_number, "recipient_name": ( text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1Txt") or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1") ), "recipient_name2": ( text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2Txt") or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2") ), "recipient_person_name": ( text(g, "irs:RecipientPersonNm") or text(g, "irs:RecipientPersonName") ), "address_line1": ( text(g, "irs:RecipientUSAddress/irs:AddressLine1Txt") or text(g, "irs:RecipientUSAddress/irs:AddressLine1") or text(g, "irs:RecipientForeignAddress/irs:AddressLine1Txt") or text(g, "irs:RecipientForeignAddress/irs:AddressLine1") ), "address_line2": ( text(g, "irs:RecipientUSAddress/irs:AddressLine2Txt") or text(g, "irs:RecipientUSAddress/irs:AddressLine2") or text(g, "irs:RecipientForeignAddress/irs:AddressLine2Txt") or text(g, "irs:RecipientForeignAddress/irs:AddressLine2") ), "city": ( text(g, "irs:RecipientUSAddress/irs:CityNm") or text(g, "irs:RecipientUSAddress/irs:City") or text(g, "irs:RecipientForeignAddress/irs:CityNm") or text(g, "irs:RecipientForeignAddress/irs:City") ), "state": ( text(g, "irs:RecipientUSAddress/irs:StateAbbreviationCd") or text(g, "irs:RecipientUSAddress/irs:State") or text(g, "irs:RecipientForeignAddress/irs:ProvinceOrStateNm") ), "zip": ( text(g, "irs:RecipientUSAddress/irs:ZIPCd") or text(g, "irs:RecipientUSAddress/irs:ZIPCode") ), "country": text(g, "irs:RecipientForeignAddress/irs:CountryCd"), "foreign_postal_code": text(g, "irs:RecipientForeignAddress/irs:ForeignPostalCd"), "amount_raw": amount_raw, "amount": parse_numeric(amount_raw), "purpose": ( text(g, "irs:GrantOrContributionPurposeTxt") or text(g, "irs:PurposeOfGrantOrContriTxt") ), "foundation_status": ( text(g, "irs:RecipientFoundationStatusTxt") or text(g, "irs:RecipientFoundationStatusCd") ), "relationship": ( text(g, "irs:RecipientRelationshipTxt") or text(g, "irs:RecipientRelationship") ), } def find_all_grants(tree): """Find all grant elements across schema variants.""" grants = [] for xpath in GRANT_XPATHS: grants.extend(tree.findall(xpath)) return grants # ============================================================ # Form 990-PF summary extraction # ============================================================ def extract_form_990pf(tree): """Extract filing-level summary fields for raw_form_990pf.""" pf = f".//{{{NS}}}IRS990PF" a = f"{pf}/{{{NS}}}AnalysisOfRevenueAndExpenses" bs = f"{pf}/{{{NS}}}Form990PFBalanceSheetsGrp" si = f"{pf}/{{{NS}}}SupplementaryInformationGrp" sa = f"{pf}/{{{NS}}}StatementsRegardingActyGrp" # Filer address from ReturnHeader. Each field tries the modern (*Txt / # *Cd / CityNm / StateAbbreviationCd / ZIPCd) tag first and falls back # to the old-schema variant used by older filings. filer_addr = { "filer_name2": ( text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt") or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2") ), "filer_address_line1": ( text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt") or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1") ), "filer_address_line2": ( text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt") or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2") ), "filer_city": ( text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm") or text(tree, ".//irs:Filer/irs:USAddress/irs:City") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City") ), "filer_state": ( text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd") or text(tree, ".//irs:Filer/irs:USAddress/irs:State") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm") ), "filer_zip": ( text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd") or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode") ), "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"), "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"), "phone": text(tree, ".//irs:Filer/irs:PhoneNum"), } # Classification. text_bool preserves None (tag absent) vs False # (tag present but not ticked); the accounting_method scan treats both # the same because we're looking for the first truthy branch. method_cash = text_bool(tree, f"{pf}/irs:MethodOfAccountingCashInd") method_accrual = text_bool(tree, f"{pf}/irs:MethodOfAccountingAccrualInd") classification = { "is_501c3_pf": text_bool(tree, f"{pf}/irs:Organization501c3ExemptPFInd"), "is_4947a1_trust": text_bool(tree, f"{pf}/irs:Organization4947a1TrtdPFInd"), "is_private_operating": text_bool(tree, f"{sa}/irs:PrivateOperatingFoundationInd"), "accounting_method": ( "cash" if method_cash else ("accrual" if method_accrual else None) ), } # Filing status. is_initial is true when either of two mutually-exclusive # indicator tags is set; we preserve None when both tags are absent. initial_std = text_bool(tree, f"{pf}/irs:InitialReturnInd") initial_former = text_bool(tree, f"{pf}/irs:InitialReturnFormerPubChrtyInd") if initial_std is None and initial_former is None: is_initial = None else: is_initial = bool(initial_std) or bool(initial_former) status = { "is_amended": text_bool(tree, f"{pf}/irs:AmendedReturnInd"), "is_initial": is_initial, "is_final": text_bool(tree, f"{pf}/irs:FinalReturnInd"), } # Part I: revenue and expenses part_i = {} part_i_fields = { "contributions_received": f"{a}/irs:ContriRcvdRevAndExpnssAmt", "interest_revenue": f"{a}/irs:InterestOnSavRevAndExpnssAmt", "dividends_revenue": f"{a}/irs:DividendsRevAndExpnssAmt", "net_gain_sale_assets": f"{a}/irs:NetGainSaleAstRevAndExpnssAmt", "total_revenue": f"{a}/irs:TotalRevAndExpnssAmt", "total_net_investment_income": f"{a}/irs:TotalNetInvstIncmAmt", "compensation_officers": f"{a}/irs:CompOfcrDirTrstRevAndExpnssAmt", "total_operating_expenses": f"{a}/irs:TotOprExpensesRevAndExpnssAmt", "contributions_paid": f"{a}/irs:ContriPaidRevAndExpnssAmt", "total_expenses": f"{a}/irs:TotalExpensesRevAndExpnssAmt", "total_charitable_disbursements": f"{a}/irs:TotalExpensesDsbrsChrtblAmt", "excess_revenue_over_expenses": f"{a}/irs:ExcessRevenueOverExpensesAmt", "net_investment_income": f"{a}/irs:NetInvestmentIncomeAmt", "adjusted_net_income": f"{a}/irs:AdjustedNetIncomeAmt", } for col, xpath in part_i_fields.items(): part_i[col] = parse_numeric(text(tree, xpath)) # Part II: balance sheets part_ii = {} part_ii_fields = { "total_assets_boy": f"{bs}/irs:TotalAssetsBOYAmt", "total_assets_eoy": f"{bs}/irs:TotalAssetsEOYAmt", "total_assets_eoy_fmv": f"{bs}/irs:TotalAssetsEOYFMVAmt", "total_liabilities_boy": f"{bs}/irs:TotalLiabilitiesBOYAmt", "total_liabilities_eoy": f"{bs}/irs:TotalLiabilitiesEOYAmt", "net_assets_boy": f"{bs}/irs:TotNetAstOrFundBalancesBOYAmt", "net_assets_eoy": f"{bs}/irs:TotNetAstOrFundBalancesEOYAmt", "fmv_assets_eoy": f"{pf}/irs:FMVAssetsEOYAmt", } for col, xpath in part_ii_fields.items(): part_ii[col] = parse_numeric(text(tree, xpath)) # Parts X-XII dist = { "minimum_investment_return": parse_numeric( text(tree, f"{pf}/irs:MinimumInvestmentReturnGrp/irs:MinimumInvestmentReturnAmt") ), "distributable_amount": parse_numeric( text(tree, f"{pf}/irs:DistributableAmountGrp/irs:DistributableAsAdjustedAmt") ), "qualifying_distributions": parse_numeric( text(tree, f"{pf}/irs:QualifyingDistriPartXIIGrp/irs:QualifyingDistributionsAmt") ), "excise_tax_amount": parse_numeric( text(tree, f"{pf}/irs:ExciseTaxBasedOnInvstIncmGrp/irs:InvestmentIncomeExciseTaxAmt") ), } # Part XV totals xv = { "total_grants_paid": parse_numeric( text(tree, f"{si}/irs:TotalGrantOrContriPdDurYrAmt") ), "total_grants_approved_future": parse_numeric( text(tree, f"{si}/irs:TotalGrantOrContriApprvFutAmt") ), } # Misc misc = { "website": text(tree, f"{sa}/irs:WebsiteAddressTxt"), "state_of_registration": text(tree, f"{sa}/irs:OrgReportOrRegisterStateCd"), } # Officer / signer officer = { "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"), "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"), "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"), "preparer_firm": ( text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt") or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1") ), } return {**filer_addr, **classification, **status, **part_i, **part_ii, **dist, **xv, **misc, **officer} # ============================================================ # Grant detail status # ============================================================ def compute_grant_detail_status(grant_elements, grant_rows): """Determine grant detail completeness status.""" if not grant_elements: return "no_grants" if not grant_rows: return "unresolved" placeholder_count = sum( 1 for r in grant_rows if is_placeholder(r.get("recipient_name")) or is_placeholder(r.get("amount_raw")) ) if placeholder_count == len(grant_rows): return "placeholder_only" if placeholder_count > 0: return "see_attached" return "complete" # ============================================================ # Per-filing processing # ============================================================ GRANT_COLUMNS = [ "raw_filing_id", "line_number", "recipient_name", "recipient_name2", "recipient_person_name", "address_line1", "address_line2", "city", "state", "zip", "country", "foreign_postal_code", "amount_raw", "amount", "purpose", "foundation_status", "relationship", ] def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id): """Process a single 990-PF filing. All child writes are transactional.""" # Extract filing metadata metadata = extract_filing_metadata(tree) # Extract grants grant_elements = find_all_grants(tree) extracted_grants = [] for i, g in enumerate(grant_elements, start=1): row = extract_grant(g, i) if row is not None: extracted_grants.append(row) # Extract form summary form_data = extract_form_990pf(tree) form_data["grant_detail_status"] = compute_grant_detail_status(grant_elements, extracted_grants) def _do(conn): raw_filing_id = upsert_raw_filing( SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn ) record_raw_filing_source( raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn ) filing_form_data = {**form_data, "raw_filing_id": raw_filing_id} grant_rows = [ {**row, "raw_filing_id": raw_filing_id} for row in extracted_grants ] _replace_children(conn, raw_filing_id, filing_form_data, grant_rows) # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990pf + grants return 3 + len(grant_rows) return execute_transaction(_do) def _replace_children(conn, raw_filing_id, form_data, grant_rows): """Delete and re-insert all child rows for a filing using the caller's transaction.""" form_columns = list(form_data.keys()) form_placeholders = ", ".join(["%s"] * len(form_columns)) form_values = [form_data[col] for col in form_columns] grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS)) with conn.cursor() as cur: # Delete old child rows cur.execute("DELETE FROM raw.grant_990pf WHERE raw_filing_id = %s", (raw_filing_id,)) cur.execute("DELETE FROM raw.form_990pf WHERE raw_filing_id = %s", (raw_filing_id,)) # Insert form summary cur.execute( f"INSERT INTO raw.form_990pf ({', '.join(form_columns)}) " f"VALUES ({form_placeholders})", form_values, ) # Insert grants if grant_rows: from psycopg2.extras import execute_batch grant_values = [ [row.get(col) for col in GRANT_COLUMNS] for row in grant_rows ] execute_batch( cur, f"INSERT INTO raw.grant_990pf ({', '.join(GRANT_COLUMNS)}) " f"VALUES ({grant_placeholders})", grant_values, ) # Insert XML fields # ============================================================ # ZIP / file processing # ============================================================ def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id): """Parse XML bytes and process if it's a 990-PF. Returns rows inserted or None if skipped.""" try: tree = etree.parse(io.BytesIO(xml_bytes)) except etree.XMLSyntaxError as e: log_ingest_error(ingest_run_id, source_archive, source_path, f"XML parse error: {e}", stage="parse_xml") return None ret_type = text(tree, ".//irs:ReturnTypeCd") if ret_type != "990PF": return None source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path) try: return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id) except Exception as e: log_ingest_error(ingest_run_id, source_archive, source_path, e, source_document_id=source_document_id, stage="process_filing") raise def process_zip(zip_path, ingest_run_id): """Process all XMLs in a ZIP file.""" basename = os.path.basename(zip_path) try: zf = zipfile.ZipFile(zip_path) except zipfile.BadZipFile as e: log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip") print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr) return 0, 0, 0 with zf: names = [n for n in zf.namelist() if n.endswith(".xml")] seen_paths = get_seen_source_paths(basename) skipped_existing = 0 print( f"Processing {basename}: {len(names)} XML files " f"({len(seen_paths)} already seen)" ) files_scanned = 0 files_matched = 0 total_rows = 0 for i, name in enumerate(names): if name in seen_paths: skipped_existing += 1 if (i + 1) % 1000 == 0: print( f" ...{i + 1}/{len(names)} files, " f"{skipped_existing} skipped existing, " f"{files_matched} matched, {total_rows} rows" ) continue # Count every ZIP member as scanned, even ones we fail to read — # otherwise read failures silently shrink the scanned total and # make run-level metrics misleading. files_scanned += 1 try: xml_bytes = zf.read(name) except Exception as e: log_ingest_error(ingest_run_id, basename, name, e, stage="read") continue try: rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id) except Exception as e: print(f" ERROR in {name}: {e}", file=sys.stderr) continue if rows is not None: files_matched += 1 total_rows += rows if (i + 1) % 1000 == 0: print( f" ...{i + 1}/{len(names)} files, " f"{skipped_existing} skipped existing, " f"{files_matched} matched, {total_rows} rows" ) print( f" Done: {files_scanned} scanned, {skipped_existing} skipped existing, " f"{files_matched} matched, {total_rows} rows" ) return files_scanned, files_matched, total_rows def main(): args = sys.argv[1:] if not args: print(f"Usage: python -m scripts.parse.irs_990pf ", file=sys.stderr) sys.exit(1) notes = " ".join(os.path.basename(a) for a in args) ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes) grand_scanned = 0 grand_matched = 0 grand_rows = 0 try: for path in args: if path.endswith(".zip"): scanned, matched, rows = process_zip(path, ingest_run_id) grand_scanned += scanned grand_matched += matched grand_rows += rows elif path.endswith(".xml"): xml_name = os.path.basename(path) # Count before I/O, so read failures still show up in scanned. grand_scanned += 1 try: with open(path, "rb") as f: xml_bytes = f.read() except Exception as e: log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read") print(f"ERROR reading {path}: {e}", file=sys.stderr) continue try: rows = process_xml_bytes( xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id ) except Exception as e: print(f"ERROR in {path}: {e}", file=sys.stderr) continue if rows is not None: grand_matched += 1 grand_rows += rows else: print(f"Skipping unknown file type: {path}", file=sys.stderr) finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows) except Exception: fail_ingest_run(ingest_run_id) raise print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.") if __name__ == "__main__": main()