diff options
| author | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
|---|---|---|
| committer | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
| commit | 6605e2cc428e3bdaa174ccc432941eab8c5d61cb (patch) | |
| tree | 52f9d176c2ce1a80adb2ea2ac31cd12d3a29c0db /scripts/parse/irs_990pf.py | |
| parent | 493746b14c1251a45b061d2e3edd9160c929d2b9 (diff) | |
| download | tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.gz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.bz2 tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.lz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.xz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.zst tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.zip | |
ensure parsers do not parse and store raw XML fields
Diffstat (limited to 'scripts/parse/irs_990pf.py')
| -rw-r--r-- | scripts/parse/irs_990pf.py | 544 |
1 file changed, 544 insertions, 0 deletions
"""
Parse IRS 990-PF XML files into the new raw schema.

Populates: raw.filing, raw.filing_source, raw.form_990pf,
           raw.grant_990pf.

Usage:
    python -m scripts.parse.irs_990pf data/irs/xml-zips/*.zip
    python -m scripts.parse.irs_990pf data/irs/xml-missing/202100139349100100_public.xml
"""

import io
import os
import sys
import zipfile

from lxml import etree

from scripts.common.db import execute_transaction
from scripts.common.normalize import parse_numeric, is_placeholder
from scripts.common.xml import (
    NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata,
)
from scripts.common.ingest import (
    start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
)
from scripts.common.filing import (
    upsert_raw_filing, record_raw_filing_source,
)

PARSER_NAME = "parse_irs_990pf"
SOURCE_SYSTEM = "irs_xml"

# Standalone XML files use this as source_archive
STANDALONE_ARCHIVE = "__standalone__"

# Grant element XPaths — three schema variants across IRS versions
GRANT_XPATHS = [
    f".//{{{NS}}}SupplementaryInformationGrp/{{{NS}}}GrantOrContributionPdDurYrGrp",
    f".//{{{NS}}}SupplementaryInformation/{{{NS}}}GrantOrContriPaidDuringYear",
    f".//{{{NS}}}SupplementaryInfomation/{{{NS}}}GrantOrContriPaidDuringYear",  # IRS typo
]


# ============================================================
# Grant extraction
# ============================================================

def extract_grant(g, line_number):
    """Extract a single grant row from a grant XML element.

    Child element names vary across IRS schema versions, so each field
    tries the modern tag first, then falls back to the older variant.

    Returns a dict keyed by raw.grant_990pf column names, or None when
    the element has no children at all (an empty placeholder node).
    """
    if len(g) == 0:
        return None

    amount_raw = (
        text(g, "irs:Amt")
        or text(g, "irs:Amount")
    )

    return {
        "line_number": line_number,
        "recipient_name": (
            text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1Txt")
            or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1")
        ),
        "recipient_name2": (
            text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2Txt")
            or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2")
        ),
        "recipient_person_name": (
            text(g, "irs:RecipientPersonNm")
            or text(g, "irs:RecipientPersonName")
        ),
        "address_line1": (
            text(g, "irs:RecipientUSAddress/irs:AddressLine1Txt")
            or text(g, "irs:RecipientUSAddress/irs:AddressLine1")
            or text(g, "irs:RecipientForeignAddress/irs:AddressLine1Txt")
            or text(g, "irs:RecipientForeignAddress/irs:AddressLine1")
        ),
        "address_line2": (
            text(g, "irs:RecipientUSAddress/irs:AddressLine2Txt")
            or text(g, "irs:RecipientUSAddress/irs:AddressLine2")
            or text(g, "irs:RecipientForeignAddress/irs:AddressLine2Txt")
            or text(g, "irs:RecipientForeignAddress/irs:AddressLine2")
        ),
        "city": (
            text(g, "irs:RecipientUSAddress/irs:CityNm")
            or text(g, "irs:RecipientUSAddress/irs:City")
            or text(g, "irs:RecipientForeignAddress/irs:CityNm")
            or text(g, "irs:RecipientForeignAddress/irs:City")
        ),
        "state": (
            text(g, "irs:RecipientUSAddress/irs:StateAbbreviationCd")
            or text(g, "irs:RecipientUSAddress/irs:State")
            or text(g, "irs:RecipientForeignAddress/irs:ProvinceOrStateNm")
        ),
        "zip": (
            text(g, "irs:RecipientUSAddress/irs:ZIPCd")
            or text(g, "irs:RecipientUSAddress/irs:ZIPCode")
        ),
        "country": text(g, "irs:RecipientForeignAddress/irs:CountryCd"),
        "foreign_postal_code": text(g, "irs:RecipientForeignAddress/irs:ForeignPostalCd"),
        "amount_raw": amount_raw,
        "amount": parse_numeric(amount_raw),
        "purpose": (
            text(g, "irs:GrantOrContributionPurposeTxt")
            or text(g, "irs:PurposeOfGrantOrContriTxt")
        ),
        "foundation_status": (
            text(g, "irs:RecipientFoundationStatusTxt")
            or text(g, "irs:RecipientFoundationStatusCd")
        ),
        "relationship": (
            text(g, "irs:RecipientRelationshipTxt")
            or text(g, "irs:RecipientRelationship")
        ),
    }


def find_all_grants(tree):
    """Find all grant elements across schema variants."""
    grants = []
    for xpath in GRANT_XPATHS:
        grants.extend(tree.findall(xpath))
    return grants


# ============================================================
# Form 990-PF summary extraction
# ============================================================

def extract_form_990pf(tree):
    """Extract filing-level summary fields for raw_form_990pf."""
    pf = f".//{{{NS}}}IRS990PF"
    a = f"{pf}/{{{NS}}}AnalysisOfRevenueAndExpenses"
    bs = f"{pf}/{{{NS}}}Form990PFBalanceSheetsGrp"
    si = f"{pf}/{{{NS}}}SupplementaryInformationGrp"
    sa = f"{pf}/{{{NS}}}StatementsRegardingActyGrp"

    # Filer address from ReturnHeader. Each field tries the modern (*Txt /
    # *Cd / CityNm / StateAbbreviationCd / ZIPCd) tag first and falls back
    # to the old-schema variant used by older filings.
    filer_addr = {
        "filer_name2": (
            text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt")
            or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2")
        ),
        "filer_address_line1": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1")
        ),
        "filer_address_line2": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2")
        ),
        "filer_city": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:City")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City")
        ),
        "filer_state": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:State")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm")
        ),
        "filer_zip": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode")
        ),
        "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"),
        "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"),
        "phone": text(tree, ".//irs:Filer/irs:PhoneNum"),
    }

    # Classification. text_bool preserves None (tag absent) vs False
    # (tag present but not ticked); the accounting_method scan treats both
    # the same because we're looking for the first truthy branch.
    method_cash = text_bool(tree, f"{pf}/irs:MethodOfAccountingCashInd")
    method_accrual = text_bool(tree, f"{pf}/irs:MethodOfAccountingAccrualInd")

    classification = {
        "is_501c3_pf": text_bool(tree, f"{pf}/irs:Organization501c3ExemptPFInd"),
        "is_4947a1_trust": text_bool(tree, f"{pf}/irs:Organization4947a1TrtdPFInd"),
        "is_private_operating": text_bool(tree, f"{sa}/irs:PrivateOperatingFoundationInd"),
        "accounting_method": (
            "cash" if method_cash
            else ("accrual" if method_accrual else None)
        ),
    }

    # Filing status. is_initial is true when either of two mutually-exclusive
    # indicator tags is set; we preserve None when both tags are absent.
    initial_std = text_bool(tree, f"{pf}/irs:InitialReturnInd")
    initial_former = text_bool(tree, f"{pf}/irs:InitialReturnFormerPubChrtyInd")
    if initial_std is None and initial_former is None:
        is_initial = None
    else:
        is_initial = bool(initial_std) or bool(initial_former)

    status = {
        "is_amended": text_bool(tree, f"{pf}/irs:AmendedReturnInd"),
        "is_initial": is_initial,
        "is_final": text_bool(tree, f"{pf}/irs:FinalReturnInd"),
    }

    # Part I: revenue and expenses
    part_i = {}
    part_i_fields = {
        "contributions_received": f"{a}/irs:ContriRcvdRevAndExpnssAmt",
        "interest_revenue": f"{a}/irs:InterestOnSavRevAndExpnssAmt",
        "dividends_revenue": f"{a}/irs:DividendsRevAndExpnssAmt",
        "net_gain_sale_assets": f"{a}/irs:NetGainSaleAstRevAndExpnssAmt",
        "total_revenue": f"{a}/irs:TotalRevAndExpnssAmt",
        "total_net_investment_income": f"{a}/irs:TotalNetInvstIncmAmt",
        "compensation_officers": f"{a}/irs:CompOfcrDirTrstRevAndExpnssAmt",
        "total_operating_expenses": f"{a}/irs:TotOprExpensesRevAndExpnssAmt",
        "contributions_paid": f"{a}/irs:ContriPaidRevAndExpnssAmt",
        "total_expenses": f"{a}/irs:TotalExpensesRevAndExpnssAmt",
        "total_charitable_disbursements": f"{a}/irs:TotalExpensesDsbrsChrtblAmt",
        "excess_revenue_over_expenses": f"{a}/irs:ExcessRevenueOverExpensesAmt",
        "net_investment_income": f"{a}/irs:NetInvestmentIncomeAmt",
        "adjusted_net_income": f"{a}/irs:AdjustedNetIncomeAmt",
    }
    for col, xpath in part_i_fields.items():
        part_i[col] = parse_numeric(text(tree, xpath))

    # Part II: balance sheets
    part_ii = {}
    part_ii_fields = {
        "total_assets_boy": f"{bs}/irs:TotalAssetsBOYAmt",
        "total_assets_eoy": f"{bs}/irs:TotalAssetsEOYAmt",
        "total_assets_eoy_fmv": f"{bs}/irs:TotalAssetsEOYFMVAmt",
        "total_liabilities_boy": f"{bs}/irs:TotalLiabilitiesBOYAmt",
        "total_liabilities_eoy": f"{bs}/irs:TotalLiabilitiesEOYAmt",
        "net_assets_boy": f"{bs}/irs:TotNetAstOrFundBalancesBOYAmt",
        "net_assets_eoy": f"{bs}/irs:TotNetAstOrFundBalancesEOYAmt",
        "fmv_assets_eoy": f"{pf}/irs:FMVAssetsEOYAmt",
    }
    for col, xpath in part_ii_fields.items():
        part_ii[col] = parse_numeric(text(tree, xpath))

    # Parts X-XII
    dist = {
        "minimum_investment_return": parse_numeric(
            text(tree, f"{pf}/irs:MinimumInvestmentReturnGrp/irs:MinimumInvestmentReturnAmt")
        ),
        "distributable_amount": parse_numeric(
            text(tree, f"{pf}/irs:DistributableAmountGrp/irs:DistributableAsAdjustedAmt")
        ),
        "qualifying_distributions": parse_numeric(
            text(tree, f"{pf}/irs:QualifyingDistriPartXIIGrp/irs:QualifyingDistributionsAmt")
        ),
        "excise_tax_amount": parse_numeric(
            text(tree, f"{pf}/irs:ExciseTaxBasedOnInvstIncmGrp/irs:InvestmentIncomeExciseTaxAmt")
        ),
    }

    # Part XV totals
    xv = {
        "total_grants_paid": parse_numeric(
            text(tree, f"{si}/irs:TotalGrantOrContriPdDurYrAmt")
        ),
        "total_grants_approved_future": parse_numeric(
            text(tree, f"{si}/irs:TotalGrantOrContriApprvFutAmt")
        ),
    }

    # Misc
    misc = {
        "website": text(tree, f"{sa}/irs:WebsiteAddressTxt"),
        "state_of_registration": text(tree, f"{sa}/irs:OrgReportOrRegisterStateCd"),
    }

    # Officer / signer
    officer = {
        "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"),
        "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"),
        "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"),
        "preparer_firm": (
            text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt")
            or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1")
        ),
    }

    return {**filer_addr, **classification, **status, **part_i, **part_ii, **dist, **xv, **misc, **officer}


# ============================================================
# Grant detail status
# ============================================================

def compute_grant_detail_status(grant_elements, grant_rows):
    """Determine grant detail completeness status.

    Returns one of: "no_grants" (no grant elements at all), "unresolved"
    (elements found but none yielded a row), "placeholder_only" (every row
    is a placeholder), "see_attached" (some placeholders), "complete".
    """
    if not grant_elements:
        return "no_grants"
    if not grant_rows:
        return "unresolved"

    placeholder_count = sum(
        1 for r in grant_rows
        if is_placeholder(r.get("recipient_name"))
        or is_placeholder(r.get("amount_raw"))
    )

    if placeholder_count == len(grant_rows):
        return "placeholder_only"
    if placeholder_count > 0:
        return "see_attached"
    return "complete"


# ============================================================
# Per-filing processing
# ============================================================

GRANT_COLUMNS = [
    "raw_filing_id", "line_number",
    "recipient_name", "recipient_name2", "recipient_person_name",
    "address_line1", "address_line2", "city", "state", "zip",
    "country", "foreign_postal_code",
    "amount_raw", "amount", "purpose", "foundation_status", "relationship",
]


def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id):
    """Process a single 990-PF filing. All child writes are transactional.

    Returns the number of rows written (filing + source + form summary
    + grant rows).
    """
    # Extract filing metadata
    metadata = extract_filing_metadata(tree)

    # Extract grants
    grant_elements = find_all_grants(tree)
    extracted_grants = []
    for i, g in enumerate(grant_elements, start=1):
        row = extract_grant(g, i)
        if row is not None:
            extracted_grants.append(row)

    # Extract form summary
    form_data = extract_form_990pf(tree)
    form_data["grant_detail_status"] = compute_grant_detail_status(grant_elements, extracted_grants)

    def _do(conn):
        raw_filing_id = upsert_raw_filing(
            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
        )
        record_raw_filing_source(
            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
        )

        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
        grant_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in extracted_grants
        ]

        _replace_children(conn, raw_filing_id, filing_form_data, grant_rows)

        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990pf + grants
        return 3 + len(grant_rows)

    return execute_transaction(_do)


def _replace_children(conn, raw_filing_id, form_data, grant_rows):
    """Delete and re-insert all child rows for a filing using the caller's transaction."""
    form_columns = list(form_data.keys())
    form_placeholders = ", ".join(["%s"] * len(form_columns))
    form_values = [form_data[col] for col in form_columns]

    grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS))

    with conn.cursor() as cur:
        # Delete old child rows
        cur.execute("DELETE FROM raw.grant_990pf WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.form_990pf WHERE raw_filing_id = %s", (raw_filing_id,))

        # Insert form summary
        cur.execute(
            f"INSERT INTO raw.form_990pf ({', '.join(form_columns)}) "
            f"VALUES ({form_placeholders})",
            form_values,
        )

        # Insert grants
        if grant_rows:
            from psycopg2.extras import execute_batch
            grant_values = [
                [row.get(col) for col in GRANT_COLUMNS]
                for row in grant_rows
            ]
            execute_batch(
                cur,
                f"INSERT INTO raw.grant_990pf ({', '.join(GRANT_COLUMNS)}) "
                f"VALUES ({grant_placeholders})",
                grant_values,
            )


# ============================================================
# ZIP / file processing
# ============================================================

def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
    """Parse XML bytes and process if it's a 990-PF. Returns rows inserted or None if skipped."""
    try:
        tree = etree.parse(io.BytesIO(xml_bytes))
    except etree.XMLSyntaxError as e:
        log_ingest_error(ingest_run_id, source_archive, source_path,
                         f"XML parse error: {e}", stage="parse_xml")
        return None

    ret_type = text(tree, ".//irs:ReturnTypeCd")
    if ret_type != "990PF":
        return None

    source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
    try:
        return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id)
    except Exception as e:
        log_ingest_error(ingest_run_id, source_archive, source_path, e,
                         source_document_id=source_document_id, stage="process_filing")
        raise


def process_zip(zip_path, ingest_run_id):
    """Process all XMLs in a ZIP file. Returns (scanned, matched, rows)."""
    basename = os.path.basename(zip_path)
    try:
        zf = zipfile.ZipFile(zip_path)
    except zipfile.BadZipFile as e:
        log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip")
        print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr)
        return 0, 0, 0

    files_scanned = 0
    files_matched = 0
    total_rows = 0

    # The archive must stay open for the whole loop: zf.read() on a
    # closed ZipFile raises ValueError.
    with zf:
        names = [n for n in zf.namelist() if n.endswith(".xml")]
        print(f"Processing {basename}: {len(names)} XML files")

        for i, name in enumerate(names):
            # Count every ZIP member as scanned, even ones we fail to read —
            # otherwise read failures silently shrink the scanned total and
            # make run-level metrics misleading.
            files_scanned += 1
            try:
                xml_bytes = zf.read(name)
            except Exception as e:
                log_ingest_error(ingest_run_id, basename, name, e, stage="read")
                continue

            try:
                rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id)
            except Exception as e:
                print(f"  ERROR in {name}: {e}", file=sys.stderr)
                continue

            if rows is not None:
                files_matched += 1
                total_rows += rows

            if (i + 1) % 1000 == 0:
                print(f"  ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows")

    print(f"  Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows")
    return files_scanned, files_matched, total_rows


def main():
    """CLI entry point: ingest the ZIP/XML paths given on the command line."""
    args = sys.argv[1:]
    if not args:
        print("Usage: python -m scripts.parse.irs_990pf <zip_or_xml_files...>", file=sys.stderr)
        sys.exit(1)

    notes = " ".join(os.path.basename(a) for a in args)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    grand_scanned = 0
    grand_matched = 0
    grand_rows = 0

    try:
        for path in args:
            if path.endswith(".zip"):
                scanned, matched, rows = process_zip(path, ingest_run_id)
                grand_scanned += scanned
                grand_matched += matched
                grand_rows += rows

            elif path.endswith(".xml"):
                xml_name = os.path.basename(path)
                # Count before I/O, so read failures still show up in scanned.
                grand_scanned += 1
                try:
                    with open(path, "rb") as f:
                        xml_bytes = f.read()
                except Exception as e:
                    log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read")
                    print(f"ERROR reading {path}: {e}", file=sys.stderr)
                    continue

                try:
                    rows = process_xml_bytes(
                        xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id
                    )
                except Exception as e:
                    print(f"ERROR in {path}: {e}", file=sys.stderr)
                    continue

                if rows is not None:
                    grand_matched += 1
                    grand_rows += rows
            else:
                print(f"Skipping unknown file type: {path}", file=sys.stderr)

        finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows)
    except Exception:
        fail_ingest_run(ingest_run_id)
        raise

    print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.")


if __name__ == "__main__":
    main()
