diff options
| author | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
|---|---|---|
| committer | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
| commit | 6605e2cc428e3bdaa174ccc432941eab8c5d61cb (patch) | |
| tree | 52f9d176c2ce1a80adb2ea2ac31cd12d3a29c0db /scripts/parse/irs_990.py | |
| parent | 493746b14c1251a45b061d2e3edd9160c929d2b9 (diff) | |
| download | tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.gz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.bz2 tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.lz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.xz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.zst tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.zip | |
ensure parsers do not parse and store raw XML fields
Diffstat (limited to 'scripts/parse/irs_990.py')
| -rw-r--r-- | scripts/parse/irs_990.py | 691 |
1 files changed, 691 insertions, 0 deletions
"""
Parse IRS Form 990 XML files into the new raw schema.

Populates: raw.filing, raw.filing_source, raw.form_990,
           raw.grant_990, raw.schedule_o.

Usage:
    python -m scripts.parse.irs_990 data/irs/xml-zips/*.zip
    python -m scripts.parse.irs_990 data/irs/xml-missing/202100139349100100_public.xml
"""

import io
import os
import sys
import zipfile

from lxml import etree

from scripts.common.db import execute_transaction
from scripts.common.normalize import normalize_ein, parse_numeric, is_placeholder
from scripts.common.xml import (
    NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata,
)
from scripts.common.ingest import (
    start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
)
from scripts.common.filing import (
    upsert_raw_filing, record_raw_filing_source,
)

PARSER_NAME = "parse_irs_990"
SOURCE_SYSTEM = "irs_xml"

# Standalone XML files use this as source_archive
STANDALONE_ARCHIVE = "__standalone__"

# Module-level truthy set used by every classification check.
_TRUTHY = {"X", "x", "1", "true"}

# IRS ReturnTypeCd values that map to Form 990 (includes amended return variant).
FORM_990_RETURN_TYPES = {"990", "990A"}

# Schedule I RecipientTable rows (one xpath — modern schema only).
GRANT_XPATHS = [
    f".//{{{NS}}}IRS990ScheduleI/{{{NS}}}RecipientTable",
]

SCHEDULE_I_XPATH = f".//{{{NS}}}IRS990ScheduleI"
SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO"


# ============================================================
# Grant extraction (Schedule I RecipientTable)
# ============================================================

def extract_grant(g, line_number):
    """Extract a single Schedule I grant row from a RecipientTable element.

    Child element names vary slightly across IRS schema versions, so each
    field tries the modern tag first and falls back to the older variant.

    Returns None for stub rows that lack both a recipient name and any
    grant amount — these appear in real filings as malformed RecipientTable
    entries (e.g. just a PurposeOfGrantTxt with no recipient or amount).
    """
    if len(g) == 0:
        return None

    cash_raw = text(g, "irs:CashGrantAmt")
    non_cash_raw = text(g, "irs:NonCashAssistanceAmt")
    recipient_name = (
        text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1Txt")
        or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1")
    )
    recipient_name2 = (
        text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2Txt")
        or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2")
    )

    if (recipient_name is None and recipient_name2 is None
            and cash_raw is None and non_cash_raw is None):
        return None

    return {
        "line_number": line_number,
        "recipient_name": recipient_name,
        "recipient_name2": recipient_name2,
        "recipient_ein": normalize_ein(text(g, "irs:RecipientEIN")),
        "address_line1": (
            text(g, "irs:USAddress/irs:AddressLine1Txt")
            or text(g, "irs:USAddress/irs:AddressLine1")
            or text(g, "irs:ForeignAddress/irs:AddressLine1Txt")
            or text(g, "irs:ForeignAddress/irs:AddressLine1")
        ),
        "address_line2": (
            text(g, "irs:USAddress/irs:AddressLine2Txt")
            or text(g, "irs:USAddress/irs:AddressLine2")
            or text(g, "irs:ForeignAddress/irs:AddressLine2Txt")
            or text(g, "irs:ForeignAddress/irs:AddressLine2")
        ),
        "city": (
            text(g, "irs:USAddress/irs:CityNm")
            or text(g, "irs:USAddress/irs:City")
            or text(g, "irs:ForeignAddress/irs:CityNm")
            or text(g, "irs:ForeignAddress/irs:City")
        ),
        "state": (
            text(g, "irs:USAddress/irs:StateAbbreviationCd")
            or text(g, "irs:USAddress/irs:State")
            or text(g, "irs:ForeignAddress/irs:ProvinceOrStateNm")
        ),
        "zip": (
            text(g, "irs:USAddress/irs:ZIPCd")
            or text(g, "irs:USAddress/irs:ZIPCode")
        ),
        "country": text(g, "irs:ForeignAddress/irs:CountryCd"),
        "foreign_postal_code": text(g, "irs:ForeignAddress/irs:ForeignPostalCd"),
        "cash_grant_amt_raw": cash_raw,
        "cash_grant_amt": parse_numeric(cash_raw),
        "non_cash_amt_raw": non_cash_raw,
        "non_cash_amt": parse_numeric(non_cash_raw),
        "non_cash_desc": text(g, "irs:NonCashAssistanceDesc"),
        "valuation_method": text(g, "irs:ValuationMethodUsedDesc"),
        "purpose": text(g, "irs:PurposeOfGrantTxt"),
        "irc_section": text(g, "irs:IRCSectionDesc"),
    }


def find_all_grants(tree):
    """Find all Schedule I RecipientTable elements."""
    grants = []
    for xpath in GRANT_XPATHS:
        grants.extend(tree.findall(xpath))
    return grants


# ============================================================
# Schedule O narrative extraction
# ============================================================

def extract_schedule_o(tree):
    """Extract every SupplementalInformationDetail entry from Schedule O.

    Returns a list of dicts (without raw_filing_id — added in process_filing).
    line_number is the 1-based ordinal position among emitted rows. Entries
    with no narrative text in any of the explanation fields are skipped to
    avoid emitting low-signal placeholder rows.
    """
    rows = []
    sched_o = tree.find(SCHEDULE_O_XPATH)
    if sched_o is None:
        return rows

    details = sched_o.findall(f"{{{NS}}}SupplementalInformationDetail")
    for d in details:
        explanation = (
            text(d, "irs:ExplanationTxt")
            or text(d, "irs:MediumExplanationTxt")
            or text(d, "irs:ShortExplanationTxt")
        )
        if explanation is None:
            continue
        rows.append({
            "line_number": len(rows) + 1,
            "form_line_ref": text(d, "irs:FormAndLineReferenceDesc"),
            "explanation": explanation,
        })
    return rows


# ============================================================
# Form 990 summary extraction
# ============================================================

def extract_form_990(tree):
    """Extract filing-level summary fields for raw.form_990."""
    f990 = f".//{{{NS}}}IRS990"

    # Filer address from ReturnHeader (with old-schema fallbacks)
    filer_addr = {
        "filer_name2": (
            text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt")
            or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2")
        ),
        "filer_address_line1": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1")
        ),
        "filer_address_line2": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2")
        ),
        "filer_city": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:City")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City")
        ),
        "filer_state": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:State")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm")
        ),
        "filer_zip": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode")
        ),
        "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"),
        "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"),
        "phone": text(tree, ".//irs:Filer/irs:PhoneNum"),
        "website": text(tree, f"{f990}/irs:WebsiteAddressTxt"),
    }

    # Classification
    org_type = None
    if text(tree, f"{f990}/irs:TypeOfOrganizationCorpInd") in _TRUTHY:
        org_type = "corp"
    elif text(tree, f"{f990}/irs:TypeOfOrganizationTrustInd") in _TRUTHY:
        org_type = "trust"
    elif text(tree, f"{f990}/irs:TypeOfOrganizationAssocInd") in _TRUTHY:
        org_type = "assoc"
    elif text(tree, f"{f990}/irs:TypeOfOrganizationOtherInd") in _TRUTHY:
        org_type = "other"

    method_cash = text(tree, f"{f990}/irs:MethodOfAccountingCashInd")
    method_accrual = text(tree, f"{f990}/irs:MethodOfAccountingAccrualInd")
    method_other = text(tree, f"{f990}/irs:MethodOfAccountingOtherInd")
    if method_cash in _TRUTHY:
        accounting_method = "cash"
    elif method_accrual in _TRUTHY:
        accounting_method = "accrual"
    elif method_other in _TRUTHY:
        accounting_method = "other"
    else:
        accounting_method = None

    # The 501(c) subsection number lives in an attribute, not in the
    # element body. (The body is typically "X" — the checkbox indicator.)
    # A 501(c)(3) filing sets Organization501c3Ind instead and leaves this
    # attribute unset; we deliberately don't infer "3" in that case, to
    # keep raw provenance honest.
    section_501c_type = None
    el_501c = tree.find(f"{f990}/irs:Organization501cInd", NS_MAP)
    if el_501c is not None:
        section_501c_type = el_501c.get("organization501cTypeTxt")

    classification = {
        "is_501c3": text_bool(tree, f"{f990}/irs:Organization501c3Ind"),
        "section_501c_type": section_501c_type,
        "org_type": org_type,
        "group_return": text_bool(tree, f"{f990}/irs:GroupReturnForAffiliatesInd"),
        "group_exemption_num": text(tree, f"{f990}/irs:GroupExemptionNum"),
        "formation_year": text(tree, f"{f990}/irs:FormationYr"),
        "legal_domicile_state": text(tree, f"{f990}/irs:LegalDomicileStateCd"),
        "mission": (
            text(tree, f"{f990}/irs:ActivityOrMissionDesc")
            or text(tree, f"{f990}/irs:MissionDesc")
        ),
        "accounting_method": accounting_method,
    }

    # Filing status flags
    status = {
        "is_amended": text_bool(tree, f"{f990}/irs:AmendedReturnInd"),
        "is_initial": text_bool(tree, f"{f990}/irs:InitialReturnInd"),
        "is_final": text_bool(tree, f"{f990}/irs:FinalReturnInd"),
        "is_terminated": text_bool(tree, f"{f990}/irs:TerminateOperationsInd"),
    }

    # Part I: current year summary
    part_i_cy = {}
    part_i_cy_fields = {
        "gross_receipts": "GrossReceiptsAmt",
        "cy_contributions_grants": "CYContributionsGrantsAmt",
        "cy_program_service_revenue": "CYProgramServiceRevenueAmt",
        "cy_investment_income": "CYInvestmentIncomeAmt",
        "cy_other_revenue": "CYOtherRevenueAmt",
        "cy_total_revenue": "CYTotalRevenueAmt",
        "cy_grants_paid": "CYGrantsAndSimilarPaidAmt",
        "cy_benefits_to_members": "CYBenefitsPaidToMembersAmt",
        "cy_salaries_benefits": "CYSalariesCompEmpBnftPaidAmt",
        "cy_fundraising_expense": "CYTotalFundraisingExpenseAmt",
        "cy_other_expenses": "CYOtherExpensesAmt",
        "cy_total_expenses": "CYTotalExpensesAmt",
        "cy_revenue_less_expenses": "CYRevenuesLessExpensesAmt",
    }
    for col, elem in part_i_cy_fields.items():
        part_i_cy[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}"))

    # Part I: prior year summary
    part_i_py = {
        "py_total_revenue": parse_numeric(text(tree, f"{f990}/irs:PYTotalRevenueAmt")),
        "py_total_expenses": parse_numeric(text(tree, f"{f990}/irs:PYTotalExpensesAmt")),
    }

    # Balance sheet (Part I summary / Part X)
    balance_sheet = {}
    bs_fields = {
        "total_assets_boy": "TotalAssetsBOYAmt",
        "total_assets_eoy": "TotalAssetsEOYAmt",
        "total_liabilities_boy": "TotalLiabilitiesBOYAmt",
        "total_liabilities_eoy": "TotalLiabilitiesEOYAmt",
        "net_assets_boy": "NetAssetsOrFundBalancesBOYAmt",
        "net_assets_eoy": "NetAssetsOrFundBalancesEOYAmt",
    }
    for col, elem in bs_fields.items():
        balance_sheet[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}"))

    # Governance / workforce
    workforce = {
        "total_employees": parse_numeric(text(tree, f"{f990}/irs:TotalEmployeeCnt")),
        "total_volunteers": parse_numeric(text(tree, f"{f990}/irs:TotalVolunteersCnt")),
        "voting_members": parse_numeric(
            text(tree, f"{f990}/irs:VotingMembersGoverningBodyCnt")
            or text(tree, f"{f990}/irs:GoverningBodyVotingMembersCnt")
        ),
        "independent_voting_members": parse_numeric(
            text(tree, f"{f990}/irs:VotingMembersIndependentCnt")
            or text(tree, f"{f990}/irs:IndependentVotingMemberCnt")
        ),
    }

    # Part IX: functional expense breakdown
    func_exp = {
        "program_services_expense": parse_numeric(
            text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ProgramServicesAmt")
        ),
        "management_general_expense": parse_numeric(
            text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ManagementAndGeneralAmt")
        ),
        "fundraising_expense_ix": parse_numeric(
            text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:FundraisingAmt")
        ),
    }

    # Part VIII: revenue detail
    revenue_detail = {
        "government_grants": parse_numeric(text(tree, f"{f990}/irs:GovernmentGrantsAmt")),
        "total_contributions": parse_numeric(text(tree, f"{f990}/irs:TotalContributionsAmt")),
        "total_program_service_rev": parse_numeric(
            text(tree, f"{f990}/irs:TotalProgramServiceRevenueAmt")
        ),
        "investment_income": parse_numeric(
            text(tree, f"{f990}/irs:InvestmentIncomeGrp/irs:TotalRevenueColumnAmt")
        ),
    }

    # UBI
    ubi = {
        "gross_ubi": parse_numeric(text(tree, f"{f990}/irs:TotalGrossUBIAmt")),
        "net_ubi": parse_numeric(text(tree, f"{f990}/irs:NetUnrelatedBusTxblIncmAmt")),
    }

    # Schedule I metadata (1:1 with the filing)
    sched_i_el = tree.find(SCHEDULE_I_XPATH)
    if sched_i_el is not None:
        sched_i = {
            "sched_i_grant_records_maintained": text_bool(sched_i_el, "irs:GrantRecordsMaintainedInd"),
            "sched_i_501c3_org_count": parse_numeric(text(sched_i_el, "irs:Total501c3OrgCnt")),
            "sched_i_other_org_count": parse_numeric(text(sched_i_el, "irs:TotalOtherOrgCnt")),
            "sched_i_total_grants_amt": None,
        }
    else:
        sched_i = {
            "sched_i_grant_records_maintained": None,
            "sched_i_501c3_org_count": None,
            "sched_i_other_org_count": None,
            "sched_i_total_grants_amt": None,
        }

    # Officer / signer
    officer = {
        "principal_officer": text(tree, f"{f990}/irs:PrincipalOfficerNm"),
        "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"),
        "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"),
        "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"),
        "preparer_firm": (
            text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt")
            or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1")
        ),
    }

    return {
        **filer_addr, **classification, **status,
        **part_i_cy, **part_i_py, **balance_sheet,
        **workforce, **func_exp, **revenue_detail, **ubi,
        **sched_i, **officer,
    }


# ============================================================
# Grant detail status
# ============================================================

def compute_grant_detail_status(sched_i_element, grant_elements, grant_rows, cy_grants_paid):
    """Determine grant detail completeness for a Form 990 filing.

    Uses filing-level context (Schedule I presence + Part I CYGrantsAndSimilarPaidAmt)
    to distinguish "no Schedule I because no grants" from "no Schedule I but
    grants reported on Part I".
    """
    if sched_i_element is None:
        # No Schedule I — but check whether Part I reports grants paid.
        if cy_grants_paid is not None and float(cy_grants_paid) > 0:
            return "unresolved"
        return "no_grants"

    if not grant_elements:
        return "unresolved"

    if not grant_rows:
        return "unresolved"

    placeholder_count = sum(
        1 for r in grant_rows
        if is_placeholder(r.get("recipient_name"))
        or (is_placeholder(r.get("cash_grant_amt_raw"))
            and is_placeholder(r.get("non_cash_amt_raw")))
    )

    if placeholder_count == len(grant_rows):
        return "placeholder_only"
    if placeholder_count > 0:
        return "see_attached"
    return "complete"


# ============================================================
# Per-filing processing
# ============================================================

GRANT_COLUMNS = [
    "raw_filing_id", "line_number",
    "recipient_name", "recipient_name2", "recipient_ein",
    "address_line1", "address_line2", "city", "state", "zip",
    "country", "foreign_postal_code",
    "cash_grant_amt_raw", "cash_grant_amt",
    "non_cash_amt_raw", "non_cash_amt",
    "non_cash_desc", "valuation_method", "purpose", "irc_section",
]

SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"]


def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id):
    """Process a single Form 990 filing. All child writes are transactional.

    Extracts filing metadata, Schedule I grants, Schedule O narratives, and
    the form summary, then upserts raw.filing / raw.filing_source and
    replaces the filing's child rows inside one transaction.

    Returns the number of rows written.
    """
    # Extract filing metadata
    metadata = extract_filing_metadata(tree)

    # Locate Schedule I element (used both for grant extraction and status logic)
    sched_i_element = tree.find(SCHEDULE_I_XPATH)

    # Extract grants from Schedule I RecipientTable rows
    grant_elements = find_all_grants(tree)
    extracted_grants = []
    for i, g in enumerate(grant_elements, start=1):
        row = extract_grant(g, i)
        if row is not None:
            extracted_grants.append(row)

    # Extract Schedule O narrative entries
    schedule_o_entries = extract_schedule_o(tree)

    # Extract form summary
    form_data = extract_form_990(tree)
    form_data["grant_detail_status"] = compute_grant_detail_status(
        sched_i_element, grant_elements, extracted_grants, form_data.get("cy_grants_paid"),
    )

    def _do(conn):
        raw_filing_id = upsert_raw_filing(
            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
        )
        record_raw_filing_source(
            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
        )

        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
        grant_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in extracted_grants
        ]
        schedule_o_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in schedule_o_entries
        ]

        # NOTE: raw XML field rows are intentionally no longer parsed or
        # stored; only the structured child tables are written.
        _replace_children(
            conn, raw_filing_id, filing_form_data, grant_rows, schedule_o_rows,
        )

        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990 + grants + schedule_o
        return 3 + len(grant_rows) + len(schedule_o_rows)

    return execute_transaction(_do)


def _replace_children(conn, raw_filing_id, form_data, grant_rows, schedule_o_rows):
    """Delete and re-insert all child rows for a filing using the caller's transaction."""
    form_columns = list(form_data.keys())
    form_placeholders = ", ".join(["%s"] * len(form_columns))
    form_values = [form_data[col] for col in form_columns]

    grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS))
    schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS))

    with conn.cursor() as cur:
        # Delete old child rows
        cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.grant_990 WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.form_990 WHERE raw_filing_id = %s", (raw_filing_id,))

        # Insert form summary
        cur.execute(
            f"INSERT INTO raw.form_990 ({', '.join(form_columns)}) "
            f"VALUES ({form_placeholders})",
            form_values,
        )

        if grant_rows or schedule_o_rows:
            # Local import: psycopg2 is only needed when there are batch
            # inserts to perform.
            from psycopg2.extras import execute_batch

        # Insert grants
        if grant_rows:
            grant_values = [
                [row.get(col) for col in GRANT_COLUMNS]
                for row in grant_rows
            ]
            execute_batch(
                cur,
                f"INSERT INTO raw.grant_990 ({', '.join(GRANT_COLUMNS)}) "
                f"VALUES ({grant_placeholders})",
                grant_values,
            )

        # Insert Schedule O narrative
        if schedule_o_rows:
            schedule_o_values = [
                [row.get(col) for col in SCHEDULE_O_COLUMNS]
                for row in schedule_o_rows
            ]
            execute_batch(
                cur,
                f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) "
                f"VALUES ({schedule_o_placeholders})",
                schedule_o_values,
            )


# ============================================================
# ZIP / file processing
# ============================================================

def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
    """Parse XML bytes and process if it's a Form 990. Returns rows inserted or None if skipped."""
    try:
        tree = etree.parse(io.BytesIO(xml_bytes))
    except etree.XMLSyntaxError as e:
        log_ingest_error(ingest_run_id, source_archive, source_path,
                         f"XML parse error: {e}", stage="parse_xml")
        return None

    ret_type = text(tree, ".//irs:ReturnTypeCd")
    if ret_type not in FORM_990_RETURN_TYPES:
        return None

    source_document_id = None
    try:
        source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
        return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id)
    except Exception as e:
        stage = "process_filing" if source_document_id else "derive_source_document_id"
        log_ingest_error(ingest_run_id, source_archive, source_path, e,
                         source_document_id=source_document_id, stage=stage)
        raise


def process_zip(zip_path, ingest_run_id):
    """Process all XMLs in a ZIP file."""
    basename = os.path.basename(zip_path)
    try:
        zf = zipfile.ZipFile(zip_path)
    except zipfile.BadZipFile as e:
        log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip")
        print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr)
        return 0, 0, 0

    with zf:
        names = [n for n in zf.namelist() if n.endswith(".xml")]

        print(f"Processing {basename}: {len(names)} XML files")
        files_scanned = 0
        files_matched = 0
        total_rows = 0

        for i, name in enumerate(names):
            # Count every ZIP member as scanned, even ones we fail to read —
            # otherwise read failures silently shrink the scanned total and
            # make run-level metrics misleading.
            files_scanned += 1
            try:
                xml_bytes = zf.read(name)
            except Exception as e:
                log_ingest_error(ingest_run_id, basename, name, e, stage="read")
                continue

            try:
                rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id)
            except Exception as e:
                print(f"  ERROR in {name}: {e}", file=sys.stderr)
                continue

            if rows is not None:
                files_matched += 1
                total_rows += rows

            if (i + 1) % 1000 == 0:
                print(f"  ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows")

    print(f"  Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows")
    return files_scanned, files_matched, total_rows


def main():
    args = sys.argv[1:]
    if not args:
        print("Usage: python -m scripts.parse.irs_990 <zip_or_xml_files...>", file=sys.stderr)
        sys.exit(1)

    notes = " ".join(os.path.basename(a) for a in args)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    grand_scanned = 0
    grand_matched = 0
    grand_rows = 0

    try:
        for path in args:
            if path.endswith(".zip"):
                scanned, matched, rows = process_zip(path, ingest_run_id)
                grand_scanned += scanned
                grand_matched += matched
                grand_rows += rows

            elif path.endswith(".xml"):
                xml_name = os.path.basename(path)
                # Count before I/O, so read failures still show up in scanned.
                grand_scanned += 1
                try:
                    with open(path, "rb") as f:
                        xml_bytes = f.read()
                except Exception as e:
                    log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read")
                    print(f"ERROR reading {path}: {e}", file=sys.stderr)
                    continue

                try:
                    rows = process_xml_bytes(
                        xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id
                    )
                except Exception as e:
                    print(f"ERROR in {path}: {e}", file=sys.stderr)
                    continue

                if rows is not None:
                    grand_matched += 1
                    grand_rows += rows
            else:
                print(f"Skipping unknown file type: {path}", file=sys.stderr)

        finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows)
    except Exception:
        fail_ingest_run(ingest_run_id)
        raise

    print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.")


if __name__ == "__main__":
    main()
