""" Parse IRS Form 990-EZ XML files into the new raw schema. Populates: raw.filing, raw.filing_source, raw.form_990ez, raw.schedule_o. Form 990-EZ has no structured grant recipient table — line 10 of Part I aggregates grants into a single amount and instructs the filer to "list in Schedule O." v1 scope is preservation (filing facts + Schedule O), not grant normalization. Usage: python -m scripts.parse.irs_990ez data/irs/xml-zips/*.zip python -m scripts.parse.irs_990ez data/irs/xml-missing/202100139349100100_public.xml """ import io import os import sys import zipfile from lxml import etree from scripts.common.db import execute_transaction from scripts.common.normalize import parse_numeric from scripts.common.xml import ( NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata, ) from scripts.common.ingest import ( start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, ) from scripts.common.filing import ( upsert_raw_filing, record_raw_filing_source, ) PARSER_NAME = "parse_irs_990ez" SOURCE_SYSTEM = "irs_xml" # Standalone XML files use this as source_archive STANDALONE_ARCHIVE = "__standalone__" # IRS ReturnTypeCd values that map to Form 990-EZ (includes amended variant). FORM_990EZ_RETURN_TYPES = {"990EZ", "990EA"} SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO" # ============================================================ # Schedule O narrative extraction # ============================================================ def extract_schedule_o(tree): """Extract every SupplementalInformationDetail entry from Schedule O. Returns a list of dicts (without raw_filing_id — added in process_filing). line_number is the 1-based ordinal position among emitted rows. Entries with no narrative text in any of the explanation fields are skipped to avoid emitting low-signal placeholder rows. """ rows = [] sched_o = tree.find(SCHEDULE_O_XPATH) if sched_o is None: return rows details = sched_o.findall(f"{{{NS}}}SupplementalInformationDetail") for d in details: explanation = ( text(d, "irs:ExplanationTxt") or text(d, "irs:MediumExplanationTxt") or text(d, "irs:ShortExplanationTxt") ) if explanation is None: continue rows.append({ "line_number": len(rows) + 1, "form_line_ref": text(d, "irs:FormAndLineReferenceDesc"), "explanation": explanation, }) return rows # ============================================================ # Form 990-EZ summary extraction # ============================================================ def extract_form_990ez(tree): """Extract filing-level summary fields for raw.form_990ez.""" ez = f".//{{{NS}}}IRS990EZ" # Filer identity from ReturnHeader (with old-schema fallbacks). filer = { "filer_name2": ( text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt") or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2") ), "filer_address_line1": ( text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt") or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1") ), "filer_city": ( text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm") or text(tree, ".//irs:Filer/irs:USAddress/irs:City") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City") ), "filer_state": ( text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd") or text(tree, ".//irs:Filer/irs:USAddress/irs:State") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm") ), "filer_zip": ( text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd") or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode") ), "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"), "phone": text(tree, ".//irs:Filer/irs:PhoneNum"), "website": text(tree, f"{ez}/irs:WebsiteAddressTxt"), } # Classification. The 990-EZ schema puts the 501(c) subsection in an # attribute on Organization501cInd (e.g. ), not in element text. A 501(c)(3) filing # instead sets X with no # separate subsection element — in that case we leave section_501c_type # NULL rather than inferring "3" from the boolean flag. section_501c_type = None el_501c = tree.find(f"{ez}/irs:Organization501cInd", NS_MAP) if el_501c is not None: section_501c_type = el_501c.get("organization501cTypeTxt") classification = { "is_501c3": text_bool(tree, f"{ez}/irs:Organization501c3Ind"), "section_501c_type": section_501c_type, "group_exemption_num": text(tree, f"{ez}/irs:GroupExemptionNum"), } # Filing status flags status = { "is_amended": text_bool(tree, f"{ez}/irs:AmendedReturnInd"), "is_initial": text_bool(tree, f"{ez}/irs:InitialReturnInd"), "is_final": text_bool(tree, f"{ez}/irs:FinalReturnInd"), } # Part I: revenue / expenses part_i = {} part_i_fields = { "gross_receipts": "GrossReceiptsAmt", "contributions_gifts_grants": "ContributionsGiftsGrantsEtcAmt", "program_service_revenue": "ProgramServiceRevenueAmt", "investment_income": "InvestmentIncomeAmt", "total_revenue": "TotalRevenueAmt", "grants_paid": "GrantsAndSimilarAmountsPaidAmt", "salaries_compensation": "SalariesOtherCompEmplBnftAmt", "total_expenses": "TotalExpensesAmt", "revenue_less_expenses": "ExcessOrDeficitForYearAmt", } for col, elem in part_i_fields.items(): part_i[col] = parse_numeric(text(tree, f"{ez}/irs:{elem}")) # Part II: balance sheet (BOY/EOY child elements under group wrappers). balance_sheet = { "total_assets_boy": parse_numeric( text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:BOYAmt") ), "total_assets_eoy": parse_numeric( text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:EOYAmt") ), "total_liabilities_boy": parse_numeric( text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:BOYAmt") ), "total_liabilities_eoy": parse_numeric( text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:EOYAmt") ), "net_assets_boy": parse_numeric( text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:BOYAmt") ), "net_assets_eoy": parse_numeric( text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:EOYAmt") ), } # Schedule O presence sched_o_presence = { "has_schedule_o": tree.find(SCHEDULE_O_XPATH) is not None, } # Officer / signer officer = { "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"), "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"), "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"), "preparer_firm": ( text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt") or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1") ), } return { **filer, **classification, **status, **part_i, **balance_sheet, **sched_o_presence, **officer, } # ============================================================ # Grant detail status # ============================================================ def compute_grant_detail_status(form_data, schedule_o_rows): """Determine grant detail completeness for a Form 990-EZ filing. 990-EZ has no recipient table to inspect. Line 10 (grants_paid) is an aggregate and grant detail is instructed to be listed in Schedule O, but the raw XML gives us no way to tell that a given Schedule O entry is *the* line-10 narrative rather than unrelated supplemental text. For a provenance-strict raw layer we therefore only report what we can verify from the structured fields: - no_grants: line 10 is null or 0 (no grants reported). - unresolved: line 10 is positive. Whether that detail lives in Schedule O, an attachment, or nowhere is left to a downstream classifier. The schedule_o_rows argument is intentionally unused; it's kept in the signature so callers (and a future classifier) have a single place to evolve the logic. """ del schedule_o_rows # intentionally unused — see docstring grants_paid = form_data.get("grants_paid") if grants_paid is None or float(grants_paid) == 0: return "no_grants" return "unresolved" # ============================================================ # Per-filing processing # ============================================================ SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"] def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id): """Process a single Form 990-EZ filing. All child writes are transactional.""" metadata = extract_filing_metadata(tree) schedule_o_entries = extract_schedule_o(tree) form_data = extract_form_990ez(tree) form_data["grant_detail_status"] = compute_grant_detail_status( form_data, schedule_o_entries, ) root = tree.getroot() return_header = root.find(f"{{{NS}}}ReturnHeader") return_data = root.find(f"{{{NS}}}ReturnData") def _do(conn): raw_filing_id = upsert_raw_filing( SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn ) record_raw_filing_source( raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn ) filing_form_data = {**form_data, "raw_filing_id": raw_filing_id} schedule_o_rows = [ {**row, "raw_filing_id": raw_filing_id} for row in schedule_o_entries ] _replace_children( conn, raw_filing_id, filing_form_data, schedule_o_rows, xml_rows, ) # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990ez + schedule_o + xml fields return 3 + len(schedule_o_rows) + len(xml_rows) return execute_transaction(_do) def _replace_children(conn, raw_filing_id, form_data, schedule_o_rows): """Delete and re-insert all child rows for a filing using the caller's transaction.""" form_columns = list(form_data.keys()) form_placeholders = ", ".join(["%s"] * len(form_columns)) form_values = [form_data[col] for col in form_columns] schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS)) with conn.cursor() as cur: # Delete old child rows cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,)) cur.execute("DELETE FROM raw.form_990ez WHERE raw_filing_id = %s", (raw_filing_id,)) # Insert form summary cur.execute( f"INSERT INTO raw.form_990ez ({', '.join(form_columns)}) " f"VALUES ({form_placeholders})", form_values, ) # Insert Schedule O narrative if schedule_o_rows: from psycopg2.extras import execute_batch schedule_o_values = [ [row.get(col) for col in SCHEDULE_O_COLUMNS] for row in schedule_o_rows ] execute_batch( cur, f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) " f"VALUES ({schedule_o_placeholders})", schedule_o_values, ) # Insert XML fields # ============================================================ # ZIP / file processing # ============================================================ def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id): """Parse XML bytes and process if it's a Form 990-EZ. Returns rows inserted or None if skipped.""" try: tree = etree.parse(io.BytesIO(xml_bytes)) except etree.XMLSyntaxError as e: log_ingest_error(ingest_run_id, source_archive, source_path, f"XML parse error: {e}", stage="parse_xml") return None ret_type = text(tree, ".//irs:ReturnTypeCd") if ret_type not in FORM_990EZ_RETURN_TYPES: return None source_document_id = None try: source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path) return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id) except Exception as e: stage = "process_filing" if source_document_id else "derive_source_document_id" log_ingest_error(ingest_run_id, source_archive, source_path, e, source_document_id=source_document_id, stage=stage) raise def process_zip(zip_path, ingest_run_id): """Process all XMLs in a ZIP file.""" basename = os.path.basename(zip_path) try: zf = zipfile.ZipFile(zip_path) except zipfile.BadZipFile as e: log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip") print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr) return 0, 0, 0 with zf: names = [n for n in zf.namelist() if n.endswith(".xml")] print(f"Processing {basename}: {len(names)} XML files") files_scanned = 0 files_matched = 0 total_rows = 0 for i, name in enumerate(names): # Count every ZIP member as scanned, even ones we fail to read — # otherwise read failures silently shrink the scanned total and # make run-level metrics misleading. files_scanned += 1 try: xml_bytes = zf.read(name) except Exception as e: log_ingest_error(ingest_run_id, basename, name, e, stage="read") continue try: rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id) except Exception as e: print(f" ERROR in {name}: {e}", file=sys.stderr) continue if rows is not None: files_matched += 1 total_rows += rows if (i + 1) % 1000 == 0: print(f" ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows") print(f" Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows") return files_scanned, files_matched, total_rows def main(): args = sys.argv[1:] if not args: print(f"Usage: python -m scripts.parse.irs_990ez ", file=sys.stderr) sys.exit(1) notes = " ".join(os.path.basename(a) for a in args) ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes) grand_scanned = 0 grand_matched = 0 grand_rows = 0 try: for path in args: if path.endswith(".zip"): scanned, matched, rows = process_zip(path, ingest_run_id) grand_scanned += scanned grand_matched += matched grand_rows += rows elif path.endswith(".xml"): xml_name = os.path.basename(path) # Count before I/O, so read failures still show up in scanned. grand_scanned += 1 try: with open(path, "rb") as f: xml_bytes = f.read() except Exception as e: log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read") print(f"ERROR reading {path}: {e}", file=sys.stderr) continue try: rows = process_xml_bytes( xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id ) except Exception as e: print(f"ERROR in {path}: {e}", file=sys.stderr) continue if rows is not None: grand_matched += 1 grand_rows += rows else: print(f"Skipping unknown file type: {path}", file=sys.stderr) finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows) except Exception: fail_ingest_run(ingest_run_id) raise print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.") if __name__ == "__main__": main()