diff options
| author | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
|---|---|---|
| committer | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
| commit | 6605e2cc428e3bdaa174ccc432941eab8c5d61cb (patch) | |
| tree | 52f9d176c2ce1a80adb2ea2ac31cd12d3a29c0db /scripts/parse/irs_990ez.py | |
| parent | 493746b14c1251a45b061d2e3edd9160c929d2b9 (diff) | |
| download | tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.gz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.bz2 tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.lz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.xz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.zst tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.zip | |
ensure parsers do not parse and store raw XML fields
Diffstat (limited to '')
| -rw-r--r-- | scripts/parse/irs_990ez.py | 449 |
1 files changed, 449 insertions, 0 deletions
"""
Parse IRS Form 990-EZ XML files into the new raw schema.

Populates: raw.filing, raw.filing_source, raw.form_990ez,
           raw.schedule_o.

Form 990-EZ has no structured grant recipient table — line 10 of Part I
aggregates grants into a single amount and instructs the filer to "list
in Schedule O." v1 scope is preservation (filing facts + Schedule O),
not grant normalization.

Usage:
    python -m scripts.parse.irs_990ez data/irs/xml-zips/*.zip
    python -m scripts.parse.irs_990ez data/irs/xml-missing/202100139349100100_public.xml
"""

import io
import os
import sys
import zipfile

from lxml import etree

from scripts.common.db import execute_transaction
from scripts.common.normalize import parse_numeric
from scripts.common.xml import (
    NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata,
)
from scripts.common.ingest import (
    start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
)
from scripts.common.filing import (
    upsert_raw_filing, record_raw_filing_source,
)

PARSER_NAME = "parse_irs_990ez"
SOURCE_SYSTEM = "irs_xml"

# Standalone XML files use this as source_archive
STANDALONE_ARCHIVE = "__standalone__"

# IRS ReturnTypeCd values that map to Form 990-EZ (includes amended variant).
FORM_990EZ_RETURN_TYPES = {"990EZ", "990EA"}

SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO"


# ============================================================
# Schedule O narrative extraction
# ============================================================

def extract_schedule_o(tree):
    """Extract every SupplementalInformationDetail entry from Schedule O.

    Returns a list of dicts (without raw_filing_id — added in process_filing).
    line_number is the 1-based ordinal position among emitted rows. Entries
    with no narrative text in any of the explanation fields are skipped to
    avoid emitting low-signal placeholder rows.
    """
    rows = []
    sched_o = tree.find(SCHEDULE_O_XPATH)
    if sched_o is None:
        return rows

    details = sched_o.findall(f"{{{NS}}}SupplementalInformationDetail")
    for d in details:
        # The schema has several explanation element variants; take the
        # first one that is present.
        explanation = (
            text(d, "irs:ExplanationTxt")
            or text(d, "irs:MediumExplanationTxt")
            or text(d, "irs:ShortExplanationTxt")
        )
        if explanation is None:
            continue
        rows.append({
            "line_number": len(rows) + 1,
            "form_line_ref": text(d, "irs:FormAndLineReferenceDesc"),
            "explanation": explanation,
        })
    return rows


# ============================================================
# Form 990-EZ summary extraction
# ============================================================

def extract_form_990ez(tree):
    """Extract filing-level summary fields for raw.form_990ez."""
    ez = f".//{{{NS}}}IRS990EZ"

    # Filer identity from ReturnHeader (with old-schema fallbacks).
    filer = {
        "filer_name2": (
            text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt")
            or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2")
        ),
        "filer_address_line1": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1")
        ),
        "filer_city": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:City")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City")
        ),
        "filer_state": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:State")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm")
        ),
        "filer_zip": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode")
        ),
        "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"),
        "phone": text(tree, ".//irs:Filer/irs:PhoneNum"),
        "website": text(tree, f"{ez}/irs:WebsiteAddressTxt"),
    }

    # Classification. The 990-EZ schema puts the 501(c) subsection in an
    # attribute on Organization501cInd (e.g. <Organization501cInd
    # organization501cTypeTxt="7"/>), not in element text. A 501(c)(3) filing
    # instead sets <Organization501c3Ind>X</Organization501c3Ind> with no
    # separate subsection element — in that case we leave section_501c_type
    # NULL rather than inferring "3" from the boolean flag.
    section_501c_type = None
    el_501c = tree.find(f"{ez}/irs:Organization501cInd", NS_MAP)
    if el_501c is not None:
        section_501c_type = el_501c.get("organization501cTypeTxt")

    classification = {
        "is_501c3": text_bool(tree, f"{ez}/irs:Organization501c3Ind"),
        "section_501c_type": section_501c_type,
        "group_exemption_num": text(tree, f"{ez}/irs:GroupExemptionNum"),
    }

    # Filing status flags
    status = {
        "is_amended": text_bool(tree, f"{ez}/irs:AmendedReturnInd"),
        "is_initial": text_bool(tree, f"{ez}/irs:InitialReturnInd"),
        "is_final": text_bool(tree, f"{ez}/irs:FinalReturnInd"),
    }

    # Part I: revenue / expenses
    part_i = {}
    part_i_fields = {
        "gross_receipts": "GrossReceiptsAmt",
        "contributions_gifts_grants": "ContributionsGiftsGrantsEtcAmt",
        "program_service_revenue": "ProgramServiceRevenueAmt",
        "investment_income": "InvestmentIncomeAmt",
        "total_revenue": "TotalRevenueAmt",
        "grants_paid": "GrantsAndSimilarAmountsPaidAmt",
        "salaries_compensation": "SalariesOtherCompEmplBnftAmt",
        "total_expenses": "TotalExpensesAmt",
        "revenue_less_expenses": "ExcessOrDeficitForYearAmt",
    }
    for col, elem in part_i_fields.items():
        part_i[col] = parse_numeric(text(tree, f"{ez}/irs:{elem}"))

    # Part II: balance sheet (BOY/EOY child elements under group wrappers).
    balance_sheet = {
        "total_assets_boy": parse_numeric(
            text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:BOYAmt")
        ),
        "total_assets_eoy": parse_numeric(
            text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:EOYAmt")
        ),
        "total_liabilities_boy": parse_numeric(
            text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:BOYAmt")
        ),
        "total_liabilities_eoy": parse_numeric(
            text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:EOYAmt")
        ),
        "net_assets_boy": parse_numeric(
            text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:BOYAmt")
        ),
        "net_assets_eoy": parse_numeric(
            text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:EOYAmt")
        ),
    }

    # Schedule O presence
    sched_o_presence = {
        "has_schedule_o": tree.find(SCHEDULE_O_XPATH) is not None,
    }

    # Officer / signer
    officer = {
        "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"),
        "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"),
        "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"),
        "preparer_firm": (
            text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt")
            or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1")
        ),
    }

    return {
        **filer, **classification, **status,
        **part_i, **balance_sheet,
        **sched_o_presence, **officer,
    }


# ============================================================
# Grant detail status
# ============================================================

def compute_grant_detail_status(form_data, schedule_o_rows):
    """Determine grant detail completeness for a Form 990-EZ filing.

    990-EZ has no recipient table to inspect. Line 10 (grants_paid) is an
    aggregate and grant detail is instructed to be listed in Schedule O, but
    the raw XML gives us no way to tell that a given Schedule O entry is
    *the* line-10 narrative rather than unrelated supplemental text. For a
    provenance-strict raw layer we therefore only report what we can verify
    from the structured fields:

    - no_grants: line 10 is null or 0 (no grants reported).
    - unresolved: line 10 is positive. Whether that detail lives in Schedule
      O, an attachment, or nowhere is left to a downstream classifier.

    The schedule_o_rows argument is intentionally unused; it's kept in the
    signature so callers (and a future classifier) have a single place to
    evolve the logic.
    """
    del schedule_o_rows  # intentionally unused — see docstring
    grants_paid = form_data.get("grants_paid")
    if grants_paid is None or float(grants_paid) == 0:
        return "no_grants"
    return "unresolved"


# ============================================================
# Per-filing processing
# ============================================================

SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"]


def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id):
    """Process a single Form 990-EZ filing. All child writes are transactional."""

    metadata = extract_filing_metadata(tree)
    schedule_o_entries = extract_schedule_o(tree)
    form_data = extract_form_990ez(tree)
    form_data["grant_detail_status"] = compute_grant_detail_status(
        form_data, schedule_o_entries,
    )

    def _do(conn):
        raw_filing_id = upsert_raw_filing(
            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
        )
        record_raw_filing_source(
            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
        )

        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
        schedule_o_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in schedule_o_entries
        ]

        _replace_children(
            conn, raw_filing_id, filing_form_data, schedule_o_rows,
        )

        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990ez + schedule_o rows
        return 3 + len(schedule_o_rows)

    return execute_transaction(_do)


def _replace_children(conn, raw_filing_id, form_data, schedule_o_rows):
    """Delete and re-insert all child rows for a filing using the caller's transaction."""
    form_columns = list(form_data.keys())
    form_placeholders = ", ".join(["%s"] * len(form_columns))
    form_values = [form_data[col] for col in form_columns]

    schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS))

    with conn.cursor() as cur:
        # Delete old child rows
        cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.form_990ez WHERE raw_filing_id = %s", (raw_filing_id,))

        # Insert form summary
        cur.execute(
            f"INSERT INTO raw.form_990ez ({', '.join(form_columns)}) "
            f"VALUES ({form_placeholders})",
            form_values,
        )

        # Insert Schedule O narrative
        if schedule_o_rows:
            from psycopg2.extras import execute_batch
            schedule_o_values = [
                [row.get(col) for col in SCHEDULE_O_COLUMNS]
                for row in schedule_o_rows
            ]
            execute_batch(
                cur,
                f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) "
                f"VALUES ({schedule_o_placeholders})",
                schedule_o_values,
            )


# ============================================================
# ZIP / file processing
# ============================================================

def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
    """Parse XML bytes and process if it's a Form 990-EZ. Returns rows inserted or None if skipped."""
    try:
        tree = etree.parse(io.BytesIO(xml_bytes))
    except etree.XMLSyntaxError as e:
        log_ingest_error(ingest_run_id, source_archive, source_path,
                         f"XML parse error: {e}", stage="parse_xml")
        return None

    ret_type = text(tree, ".//irs:ReturnTypeCd")
    if ret_type not in FORM_990EZ_RETURN_TYPES:
        return None

    source_document_id = None
    try:
        source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
        return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id)
    except Exception as e:
        stage = "process_filing" if source_document_id else "derive_source_document_id"
        log_ingest_error(ingest_run_id, source_archive, source_path, e,
                         source_document_id=source_document_id, stage=stage)
        raise


def process_zip(zip_path, ingest_run_id):
    """Process all XMLs in a ZIP file."""
    basename = os.path.basename(zip_path)
    try:
        zf = zipfile.ZipFile(zip_path)
    except zipfile.BadZipFile as e:
        log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip")
        print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr)
        return 0, 0, 0

    with zf:
        names = [n for n in zf.namelist() if n.endswith(".xml")]

        print(f"Processing {basename}: {len(names)} XML files")
        files_scanned = 0
        files_matched = 0
        total_rows = 0

        for i, name in enumerate(names):
            # Count every ZIP member as scanned, even ones we fail to read —
            # otherwise read failures silently shrink the scanned total and
            # make run-level metrics misleading.
            files_scanned += 1
            try:
                xml_bytes = zf.read(name)
            except Exception as e:
                log_ingest_error(ingest_run_id, basename, name, e, stage="read")
                continue

            try:
                rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id)
            except Exception as e:
                print(f"  ERROR in {name}: {e}", file=sys.stderr)
                continue

            if rows is not None:
                files_matched += 1
                total_rows += rows

            if (i + 1) % 1000 == 0:
                print(f"  ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows")

    print(f"  Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows")
    return files_scanned, files_matched, total_rows


def main():
    args = sys.argv[1:]
    if not args:
        print("Usage: python -m scripts.parse.irs_990ez <zip_or_xml_files...>", file=sys.stderr)
        sys.exit(1)

    notes = " ".join(os.path.basename(a) for a in args)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    grand_scanned = 0
    grand_matched = 0
    grand_rows = 0

    try:
        for path in args:
            if path.endswith(".zip"):
                scanned, matched, rows = process_zip(path, ingest_run_id)
                grand_scanned += scanned
                grand_matched += matched
                grand_rows += rows

            elif path.endswith(".xml"):
                xml_name = os.path.basename(path)
                # Count before I/O, so read failures still show up in scanned.
                grand_scanned += 1
                try:
                    with open(path, "rb") as f:
                        xml_bytes = f.read()
                except Exception as e:
                    log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read")
                    print(f"ERROR reading {path}: {e}", file=sys.stderr)
                    continue

                try:
                    rows = process_xml_bytes(
                        xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id
                    )
                except Exception as e:
                    print(f"ERROR in {path}: {e}", file=sys.stderr)
                    continue

                if rows is not None:
                    grand_matched += 1
                    grand_rows += rows
            else:
                print(f"Skipping unknown file type: {path}", file=sys.stderr)

        finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows)
    except Exception:
        fail_ingest_run(ingest_run_id)
        raise

    print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.")


if __name__ == "__main__":
    main()
