diff options
| author | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
|---|---|---|
| committer | benj <benj@rse8.com> | 2026-04-10 11:13:57 +0800 |
| commit | 6605e2cc428e3bdaa174ccc432941eab8c5d61cb (patch) | |
| tree | 52f9d176c2ce1a80adb2ea2ac31cd12d3a29c0db /scripts/parse | |
| parent | 493746b14c1251a45b061d2e3edd9160c929d2b9 (diff) | |
| download | tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.gz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.bz2 tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.lz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.xz tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.tar.zst tidyindex-6605e2cc428e3bdaa174ccc432941eab8c5d61cb.zip | |
ensure parsers do not parse and store raw XML fields
Diffstat (limited to '')
| -rw-r--r-- | scripts/parse/__init__.py | 0 | ||||
| -rw-r--r-- | scripts/parse/irs_990.py | 691 | ||||
| -rw-r--r-- | scripts/parse/irs_990ez.py | 449 | ||||
| -rw-r--r-- | scripts/parse/irs_990pf.py | 544 | ||||
| -rw-r--r-- | scripts/parse/irs_bmf.py | 231 |
5 files changed, 1915 insertions, 0 deletions
diff --git a/scripts/parse/__init__.py b/scripts/parse/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/scripts/parse/__init__.py diff --git a/scripts/parse/irs_990.py b/scripts/parse/irs_990.py new file mode 100644 index 0000000..3a5cc2d --- /dev/null +++ b/scripts/parse/irs_990.py @@ -0,0 +1,691 @@ +""" +Parse IRS Form 990 XML files into the new raw schema. + +Populates: raw.filing, raw.filing_source, raw.form_990, + raw.grant_990, raw.schedule_o. + +Usage: + python -m scripts.parse.irs_990 data/irs/xml-zips/*.zip + python -m scripts.parse.irs_990 data/irs/xml-missing/202100139349100100_public.xml +""" + +import io +import os +import sys +import zipfile + +from lxml import etree + +from scripts.common.db import execute_transaction +from scripts.common.normalize import normalize_ein, parse_numeric, is_placeholder +from scripts.common.xml import ( + NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata, +) +from scripts.common.ingest import ( + start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, +) +from scripts.common.filing import ( + upsert_raw_filing, record_raw_filing_source, +) + +PARSER_NAME = "parse_irs_990" +SOURCE_SYSTEM = "irs_xml" + +# Standalone XML files use this as source_archive +STANDALONE_ARCHIVE = "__standalone__" + +# Module-level truthy set used by every classification check. +_TRUTHY = {"X", "x", "1", "true"} + +# IRS ReturnTypeCd values that map to Form 990 (includes amended return variant). +FORM_990_RETURN_TYPES = {"990", "990A"} + +# Schedule I RecipientTable rows (one xpath — modern schema only). 
GRANT_XPATHS = [
    f".//{{{NS}}}IRS990ScheduleI/{{{NS}}}RecipientTable",
]

SCHEDULE_I_XPATH = f".//{{{NS}}}IRS990ScheduleI"
SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO"


# ============================================================
# Grant extraction (Schedule I RecipientTable)
# ============================================================

def extract_grant(g, line_number):
    """Build one raw.grant_990 row dict from a Schedule I RecipientTable element.

    Tag names drift slightly across IRS schema vintages, so each field is
    resolved by trying the modern element name first and then the legacy
    variant.

    Returns None for stub rows that carry neither a recipient name nor any
    grant amount — these show up in real filings as malformed RecipientTable
    entries (e.g. a lone PurposeOfGrantTxt with no recipient or amount).
    """
    if len(g) == 0:
        return None

    def first(*paths):
        # Left-fold the candidates exactly like the equivalent `or` chain.
        found = None
        for path in paths:
            found = found or text(g, path)
        return found

    cash_raw = text(g, "irs:CashGrantAmt")
    non_cash_raw = text(g, "irs:NonCashAssistanceAmt")
    name_line1 = first(
        "irs:RecipientBusinessName/irs:BusinessNameLine1Txt",
        "irs:RecipientBusinessName/irs:BusinessNameLine1",
    )
    name_line2 = first(
        "irs:RecipientBusinessName/irs:BusinessNameLine2Txt",
        "irs:RecipientBusinessName/irs:BusinessNameLine2",
    )

    # Stub row: no identity and no amounts — emit nothing.
    if (name_line1 is None and name_line2 is None
            and cash_raw is None and non_cash_raw is None):
        return None

    return {
        "line_number": line_number,
        "recipient_name": name_line1,
        "recipient_name2": name_line2,
        "recipient_ein": normalize_ein(text(g, "irs:RecipientEIN")),
        "address_line1": first(
            "irs:USAddress/irs:AddressLine1Txt",
            "irs:USAddress/irs:AddressLine1",
            "irs:ForeignAddress/irs:AddressLine1Txt",
            "irs:ForeignAddress/irs:AddressLine1",
        ),
        "address_line2": first(
            "irs:USAddress/irs:AddressLine2Txt",
            "irs:USAddress/irs:AddressLine2",
            "irs:ForeignAddress/irs:AddressLine2Txt",
            "irs:ForeignAddress/irs:AddressLine2",
        ),
        "city": first(
            "irs:USAddress/irs:CityNm",
            "irs:USAddress/irs:City",
            "irs:ForeignAddress/irs:CityNm",
            "irs:ForeignAddress/irs:City",
        ),
        "state": first(
            "irs:USAddress/irs:StateAbbreviationCd",
            "irs:USAddress/irs:State",
            "irs:ForeignAddress/irs:ProvinceOrStateNm",
        ),
        "zip": first(
            "irs:USAddress/irs:ZIPCd",
            "irs:USAddress/irs:ZIPCode",
        ),
        "country": text(g, "irs:ForeignAddress/irs:CountryCd"),
        "foreign_postal_code": text(g, "irs:ForeignAddress/irs:ForeignPostalCd"),
        "cash_grant_amt_raw": cash_raw,
        "cash_grant_amt": parse_numeric(cash_raw),
        "non_cash_amt_raw": non_cash_raw,
        "non_cash_amt": parse_numeric(non_cash_raw),
        "non_cash_desc": text(g, "irs:NonCashAssistanceDesc"),
        "valuation_method": text(g, "irs:ValuationMethodUsedDesc"),
        "purpose": text(g, "irs:PurposeOfGrantTxt"),
        "irc_section": text(g, "irs:IRCSectionDesc"),
    }


def find_all_grants(tree):
    """Collect every Schedule I RecipientTable element, in document order."""
    return [el for xpath in GRANT_XPATHS for el in tree.findall(xpath)]
+ """ + rows = [] + sched_o = tree.find(SCHEDULE_O_XPATH) + if sched_o is None: + return rows + + details = sched_o.findall(f"{{{NS}}}SupplementalInformationDetail") + for d in details: + explanation = ( + text(d, "irs:ExplanationTxt") + or text(d, "irs:MediumExplanationTxt") + or text(d, "irs:ShortExplanationTxt") + ) + if explanation is None: + continue + rows.append({ + "line_number": len(rows) + 1, + "form_line_ref": text(d, "irs:FormAndLineReferenceDesc"), + "explanation": explanation, + }) + return rows + + +# ============================================================ +# Form 990 summary extraction +# ============================================================ + +def extract_form_990(tree): + """Extract filing-level summary fields for raw.form_990.""" + f990 = f".//{{{NS}}}IRS990" + + # Filer address from ReturnHeader (with old-schema fallbacks) + filer_addr = { + "filer_name2": ( + text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt") + or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2") + ), + "filer_address_line1": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1") + ), + "filer_address_line2": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt") + or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2") + ), + "filer_city": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:USAddress/irs:City") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City") + ), + "filer_state": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd") + or text(tree, 
".//irs:Filer/irs:USAddress/irs:State") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm") + ), + "filer_zip": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode") + ), + "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"), + "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"), + "phone": text(tree, ".//irs:Filer/irs:PhoneNum"), + "website": text(tree, f"{f990}/irs:WebsiteAddressTxt"), + } + + # Classification + org_type = None + if text(tree, f"{f990}/irs:TypeOfOrganizationCorpInd") in _TRUTHY: + org_type = "corp" + elif text(tree, f"{f990}/irs:TypeOfOrganizationTrustInd") in _TRUTHY: + org_type = "trust" + elif text(tree, f"{f990}/irs:TypeOfOrganizationAssocInd") in _TRUTHY: + org_type = "assoc" + elif text(tree, f"{f990}/irs:TypeOfOrganizationOtherInd") in _TRUTHY: + org_type = "other" + + method_cash = text(tree, f"{f990}/irs:MethodOfAccountingCashInd") + method_accrual = text(tree, f"{f990}/irs:MethodOfAccountingAccrualInd") + method_other = text(tree, f"{f990}/irs:MethodOfAccountingOtherInd") + if method_cash in _TRUTHY: + accounting_method = "cash" + elif method_accrual in _TRUTHY: + accounting_method = "accrual" + elif method_other in _TRUTHY: + accounting_method = "other" + else: + accounting_method = None + + # The 501(c) subsection number lives in an attribute, not in the + # element body. (The body is typically "X" — the checkbox indicator.) + # A 501(c)(3) filing sets Organization501c3Ind instead and leaves this + # attribute unset; we deliberately don't infer "3" in that case, to + # keep raw provenance honest. 
+ section_501c_type = None + el_501c = tree.find(f"{f990}/irs:Organization501cInd", NS_MAP) + if el_501c is not None: + section_501c_type = el_501c.get("organization501cTypeTxt") + + classification = { + "is_501c3": text_bool(tree, f"{f990}/irs:Organization501c3Ind"), + "section_501c_type": section_501c_type, + "org_type": org_type, + "group_return": text_bool(tree, f"{f990}/irs:GroupReturnForAffiliatesInd"), + "group_exemption_num": text(tree, f"{f990}/irs:GroupExemptionNum"), + "formation_year": text(tree, f"{f990}/irs:FormationYr"), + "legal_domicile_state": text(tree, f"{f990}/irs:LegalDomicileStateCd"), + "mission": ( + text(tree, f"{f990}/irs:ActivityOrMissionDesc") + or text(tree, f"{f990}/irs:MissionDesc") + ), + "accounting_method": accounting_method, + } + + # Filing status flags + status = { + "is_amended": text_bool(tree, f"{f990}/irs:AmendedReturnInd"), + "is_initial": text_bool(tree, f"{f990}/irs:InitialReturnInd"), + "is_final": text_bool(tree, f"{f990}/irs:FinalReturnInd"), + "is_terminated": text_bool(tree, f"{f990}/irs:TerminateOperationsInd"), + } + + # Part I: current year summary + part_i_cy = {} + part_i_cy_fields = { + "gross_receipts": "GrossReceiptsAmt", + "cy_contributions_grants": "CYContributionsGrantsAmt", + "cy_program_service_revenue": "CYProgramServiceRevenueAmt", + "cy_investment_income": "CYInvestmentIncomeAmt", + "cy_other_revenue": "CYOtherRevenueAmt", + "cy_total_revenue": "CYTotalRevenueAmt", + "cy_grants_paid": "CYGrantsAndSimilarPaidAmt", + "cy_benefits_to_members": "CYBenefitsPaidToMembersAmt", + "cy_salaries_benefits": "CYSalariesCompEmpBnftPaidAmt", + "cy_fundraising_expense": "CYTotalFundraisingExpenseAmt", + "cy_other_expenses": "CYOtherExpensesAmt", + "cy_total_expenses": "CYTotalExpensesAmt", + "cy_revenue_less_expenses": "CYRevenuesLessExpensesAmt", + } + for col, elem in part_i_cy_fields.items(): + part_i_cy[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}")) + + # Part I: prior year summary + part_i_py = { + 
"py_total_revenue": parse_numeric(text(tree, f"{f990}/irs:PYTotalRevenueAmt")), + "py_total_expenses": parse_numeric(text(tree, f"{f990}/irs:PYTotalExpensesAmt")), + } + + # Balance sheet (Part I summary / Part X) + balance_sheet = {} + bs_fields = { + "total_assets_boy": "TotalAssetsBOYAmt", + "total_assets_eoy": "TotalAssetsEOYAmt", + "total_liabilities_boy": "TotalLiabilitiesBOYAmt", + "total_liabilities_eoy": "TotalLiabilitiesEOYAmt", + "net_assets_boy": "NetAssetsOrFundBalancesBOYAmt", + "net_assets_eoy": "NetAssetsOrFundBalancesEOYAmt", + } + for col, elem in bs_fields.items(): + balance_sheet[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}")) + + # Governance / workforce + workforce = { + "total_employees": parse_numeric(text(tree, f"{f990}/irs:TotalEmployeeCnt")), + "total_volunteers": parse_numeric(text(tree, f"{f990}/irs:TotalVolunteersCnt")), + "voting_members": parse_numeric( + text(tree, f"{f990}/irs:VotingMembersGoverningBodyCnt") + or text(tree, f"{f990}/irs:GoverningBodyVotingMembersCnt") + ), + "independent_voting_members": parse_numeric( + text(tree, f"{f990}/irs:VotingMembersIndependentCnt") + or text(tree, f"{f990}/irs:IndependentVotingMemberCnt") + ), + } + + # Part IX: functional expense breakdown + func_exp = { + "program_services_expense": parse_numeric( + text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ProgramServicesAmt") + ), + "management_general_expense": parse_numeric( + text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ManagementAndGeneralAmt") + ), + "fundraising_expense_ix": parse_numeric( + text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:FundraisingAmt") + ), + } + + # Part VIII: revenue detail + revenue_detail = { + "government_grants": parse_numeric(text(tree, f"{f990}/irs:GovernmentGrantsAmt")), + "total_contributions": parse_numeric(text(tree, f"{f990}/irs:TotalContributionsAmt")), + "total_program_service_rev": parse_numeric( + text(tree, f"{f990}/irs:TotalProgramServiceRevenueAmt") + ), + 
"investment_income": parse_numeric( + text(tree, f"{f990}/irs:InvestmentIncomeGrp/irs:TotalRevenueColumnAmt") + ), + } + + # UBI + ubi = { + "gross_ubi": parse_numeric(text(tree, f"{f990}/irs:TotalGrossUBIAmt")), + "net_ubi": parse_numeric(text(tree, f"{f990}/irs:NetUnrelatedBusTxblIncmAmt")), + } + + # Schedule I metadata (1:1 with the filing) + sched_i_el = tree.find(SCHEDULE_I_XPATH) + if sched_i_el is not None: + sched_i = { + "sched_i_grant_records_maintained": text_bool(sched_i_el, "irs:GrantRecordsMaintainedInd"), + "sched_i_501c3_org_count": parse_numeric(text(sched_i_el, "irs:Total501c3OrgCnt")), + "sched_i_other_org_count": parse_numeric(text(sched_i_el, "irs:TotalOtherOrgCnt")), + "sched_i_total_grants_amt": None, + } + else: + sched_i = { + "sched_i_grant_records_maintained": None, + "sched_i_501c3_org_count": None, + "sched_i_other_org_count": None, + "sched_i_total_grants_amt": None, + } + + # Officer / signer + officer = { + "principal_officer": text(tree, f"{f990}/irs:PrincipalOfficerNm"), + "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"), + "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"), + "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"), + "preparer_firm": ( + text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt") + or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1") + ), + } + + return { + **filer_addr, **classification, **status, + **part_i_cy, **part_i_py, **balance_sheet, + **workforce, **func_exp, **revenue_detail, **ubi, + **sched_i, **officer, + } + + +# ============================================================ +# Grant detail status +# ============================================================ + +def compute_grant_detail_status(sched_i_element, grant_elements, grant_rows, cy_grants_paid): + """Determine grant detail completeness for a Form 990 filing. 
# ============================================================
# Per-filing processing
# ============================================================

GRANT_COLUMNS = [
    "raw_filing_id", "line_number",
    "recipient_name", "recipient_name2", "recipient_ein",
    "address_line1", "address_line2", "city", "state", "zip",
    "country", "foreign_postal_code",
    "cash_grant_amt_raw", "cash_grant_amt",
    "non_cash_amt_raw", "non_cash_amt",
    "non_cash_desc", "valuation_method", "purpose", "irc_section",
]

SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"]


def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id):
    """Process a single Form 990 filing.  All child writes are transactional.

    Upserts raw.filing / raw.filing_source, then replaces the filing's
    raw.form_990, raw.grant_990 and raw.schedule_o children.

    Returns the number of rows written.

    FIX: the previous version referenced an undefined ``xml_rows`` when
    calling ``_replace_children`` and when computing the row count — a
    guaranteed NameError — and computed unused ``root``/``return_header``/
    ``return_data`` lookups.  These were leftovers of the removed raw-XML
    field storage; all of them are gone now.
    """
    # Extract filing metadata
    metadata = extract_filing_metadata(tree)

    # Locate Schedule I element (used both for grant extraction and status logic)
    sched_i_element = tree.find(SCHEDULE_I_XPATH)

    # Extract grants from Schedule I RecipientTable rows
    grant_elements = find_all_grants(tree)
    extracted_grants = []
    for i, g in enumerate(grant_elements, start=1):
        row = extract_grant(g, i)
        if row is not None:
            extracted_grants.append(row)

    # Extract Schedule O narrative entries
    schedule_o_entries = extract_schedule_o(tree)

    # Extract form summary
    form_data = extract_form_990(tree)
    form_data["grant_detail_status"] = compute_grant_detail_status(
        sched_i_element, grant_elements, extracted_grants, form_data.get("cy_grants_paid"),
    )

    def _do(conn):
        raw_filing_id = upsert_raw_filing(
            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
        )
        record_raw_filing_source(
            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
        )

        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
        grant_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in extracted_grants
        ]
        schedule_o_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in schedule_o_entries
        ]

        _replace_children(
            conn, raw_filing_id, filing_form_data, grant_rows, schedule_o_rows,
        )

        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990 + grants + schedule_o
        return 3 + len(grant_rows) + len(schedule_o_rows)

    return execute_transaction(_do)


def _replace_children(conn, raw_filing_id, form_data, grant_rows, schedule_o_rows):
    """Delete and re-insert all child rows for a filing using the caller's transaction.

    FIX: dropped the unused ``xml_rows`` parameter and the dangling
    "# Insert XML fields" stub (raw XML fields are no longer stored), and
    hoisted the duplicated local ``execute_batch`` import.
    """
    # Local import: psycopg2 is only needed on the write path.
    from psycopg2.extras import execute_batch

    form_columns = list(form_data.keys())
    form_placeholders = ", ".join(["%s"] * len(form_columns))
    form_values = [form_data[col] for col in form_columns]

    grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS))
    schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS))

    with conn.cursor() as cur:
        # Delete old child rows (children first, then the 1:1 form row)
        cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.grant_990 WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.form_990 WHERE raw_filing_id = %s", (raw_filing_id,))

        # Insert form summary
        cur.execute(
            f"INSERT INTO raw.form_990 ({', '.join(form_columns)}) "
            f"VALUES ({form_placeholders})",
            form_values,
        )

        # Insert grants
        if grant_rows:
            grant_values = [
                [row.get(col) for col in GRANT_COLUMNS]
                for row in grant_rows
            ]
            execute_batch(
                cur,
                f"INSERT INTO raw.grant_990 ({', '.join(GRANT_COLUMNS)}) "
                f"VALUES ({grant_placeholders})",
                grant_values,
            )

        # Insert Schedule O narrative
        if schedule_o_rows:
            schedule_o_values = [
                [row.get(col) for col in SCHEDULE_O_COLUMNS]
                for row in schedule_o_rows
            ]
            execute_batch(
                cur,
                f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) "
                f"VALUES ({schedule_o_placeholders})",
                schedule_o_values,
            )
# ============================================================
# ZIP / file processing
# ============================================================

def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
    """Parse XML bytes and process them if they hold a Form 990.

    Returns the number of rows inserted, or None when the document is not a
    Form 990 (or fails to parse — that case is logged, not raised).
    """
    try:
        tree = etree.parse(io.BytesIO(xml_bytes))
    except etree.XMLSyntaxError as e:
        log_ingest_error(ingest_run_id, source_archive, source_path,
                         f"XML parse error: {e}", stage="parse_xml")
        return None

    if text(tree, ".//irs:ReturnTypeCd") not in FORM_990_RETURN_TYPES:
        return None

    source_document_id = None
    try:
        source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
        return process_filing(tree, source_document_id, source_archive,
                              source_path, ingest_run_id)
    except Exception as e:
        failed_stage = "process_filing" if source_document_id else "derive_source_document_id"
        log_ingest_error(ingest_run_id, source_archive, source_path, e,
                         source_document_id=source_document_id, stage=failed_stage)
        raise


def process_zip(zip_path, ingest_run_id):
    """Process all XMLs in a ZIP file.  Returns (scanned, matched, rows)."""
    basename = os.path.basename(zip_path)
    try:
        zf = zipfile.ZipFile(zip_path)
    except zipfile.BadZipFile as e:
        log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip")
        print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr)
        return 0, 0, 0

    with zf:
        members = [n for n in zf.namelist() if n.endswith(".xml")]

        print(f"Processing {basename}: {len(members)} XML files")
        files_scanned = 0
        files_matched = 0
        total_rows = 0

        for index, member in enumerate(members):
            # Count every ZIP member as scanned, even ones we fail to read —
            # otherwise read failures silently shrink the scanned total and
            # make run-level metrics misleading.
            files_scanned += 1
            try:
                payload = zf.read(member)
            except Exception as e:
                log_ingest_error(ingest_run_id, basename, member, e, stage="read")
                continue

            try:
                inserted = process_xml_bytes(payload, basename, member, ingest_run_id)
            except Exception as e:
                print(f"  ERROR in {member}: {e}", file=sys.stderr)
                continue

            if inserted is not None:
                files_matched += 1
                total_rows += inserted

            if (index + 1) % 1000 == 0:
                print(f"  ...{index + 1}/{len(members)} files, "
                      f"{files_matched} matched, {total_rows} rows")

    print(f"  Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows")
    return files_scanned, files_matched, total_rows


def main():
    """CLI entry point: parse every ZIP/XML argument under one ingest run."""
    paths = sys.argv[1:]
    if not paths:
        print("Usage: python -m scripts.parse.irs_990 <zip_or_xml_files...>", file=sys.stderr)
        sys.exit(1)

    notes = " ".join(os.path.basename(p) for p in paths)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    grand_scanned = 0
    grand_matched = 0
    grand_rows = 0

    try:
        for path in paths:
            if path.endswith(".zip"):
                scanned, matched, rows = process_zip(path, ingest_run_id)
                grand_scanned += scanned
                grand_matched += matched
                grand_rows += rows

            elif path.endswith(".xml"):
                xml_name = os.path.basename(path)
                # Count before I/O, so read failures still show up in scanned.
                grand_scanned += 1
                try:
                    with open(path, "rb") as fh:
                        payload = fh.read()
                except Exception as e:
                    log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read")
                    print(f"ERROR reading {path}: {e}", file=sys.stderr)
                    continue

                try:
                    inserted = process_xml_bytes(
                        payload, STANDALONE_ARCHIVE, xml_name, ingest_run_id
                    )
                except Exception as e:
                    print(f"ERROR in {path}: {e}", file=sys.stderr)
                    continue

                if inserted is not None:
                    grand_matched += 1
                    grand_rows += inserted
            else:
                print(f"Skipping unknown file type: {path}", file=sys.stderr)

        finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows)
    except Exception:
        fail_ingest_run(ingest_run_id)
        raise

    print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.")
+ grand_scanned += 1 + try: + with open(path, "rb") as f: + xml_bytes = f.read() + except Exception as e: + log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read") + print(f"ERROR reading {path}: {e}", file=sys.stderr) + continue + + try: + rows = process_xml_bytes( + xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id + ) + except Exception as e: + print(f"ERROR in {path}: {e}", file=sys.stderr) + continue + + if rows is not None: + grand_matched += 1 + grand_rows += rows + else: + print(f"Skipping unknown file type: {path}", file=sys.stderr) + + finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows) + except Exception: + fail_ingest_run(ingest_run_id) + raise + + print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.") + + +if __name__ == "__main__": + main() diff --git a/scripts/parse/irs_990ez.py b/scripts/parse/irs_990ez.py new file mode 100644 index 0000000..bea4fdd --- /dev/null +++ b/scripts/parse/irs_990ez.py @@ -0,0 +1,449 @@ +""" +Parse IRS Form 990-EZ XML files into the new raw schema. + +Populates: raw.filing, raw.filing_source, raw.form_990ez, + raw.schedule_o. + +Form 990-EZ has no structured grant recipient table — line 10 of Part I +aggregates grants into a single amount and instructs the filer to "list +in Schedule O." v1 scope is preservation (filing facts + Schedule O), +not grant normalization. 
+ +Usage: + python -m scripts.parse.irs_990ez data/irs/xml-zips/*.zip + python -m scripts.parse.irs_990ez data/irs/xml-missing/202100139349100100_public.xml +""" + +import io +import os +import sys +import zipfile + +from lxml import etree + +from scripts.common.db import execute_transaction +from scripts.common.normalize import parse_numeric +from scripts.common.xml import ( + NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata, +) +from scripts.common.ingest import ( + start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, +) +from scripts.common.filing import ( + upsert_raw_filing, record_raw_filing_source, +) + +PARSER_NAME = "parse_irs_990ez" +SOURCE_SYSTEM = "irs_xml" + +# Standalone XML files use this as source_archive +STANDALONE_ARCHIVE = "__standalone__" + +# IRS ReturnTypeCd values that map to Form 990-EZ (includes amended variant). +FORM_990EZ_RETURN_TYPES = {"990EZ", "990EA"} + +SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO" + + +# ============================================================ +# Schedule O narrative extraction +# ============================================================ + +def extract_schedule_o(tree): + """Extract every SupplementalInformationDetail entry from Schedule O. + + Returns a list of dicts (without raw_filing_id — added in process_filing). + line_number is the 1-based ordinal position among emitted rows. Entries + with no narrative text in any of the explanation fields are skipped to + avoid emitting low-signal placeholder rows. 
+ """ + rows = [] + sched_o = tree.find(SCHEDULE_O_XPATH) + if sched_o is None: + return rows + + details = sched_o.findall(f"{{{NS}}}SupplementalInformationDetail") + for d in details: + explanation = ( + text(d, "irs:ExplanationTxt") + or text(d, "irs:MediumExplanationTxt") + or text(d, "irs:ShortExplanationTxt") + ) + if explanation is None: + continue + rows.append({ + "line_number": len(rows) + 1, + "form_line_ref": text(d, "irs:FormAndLineReferenceDesc"), + "explanation": explanation, + }) + return rows + + +# ============================================================ +# Form 990-EZ summary extraction +# ============================================================ + +def extract_form_990ez(tree): + """Extract filing-level summary fields for raw.form_990ez.""" + ez = f".//{{{NS}}}IRS990EZ" + + # Filer identity from ReturnHeader (with old-schema fallbacks). + filer = { + "filer_name2": ( + text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt") + or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2") + ), + "filer_address_line1": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1") + ), + "filer_city": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:USAddress/irs:City") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City") + ), + "filer_state": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:State") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm") + ), + "filer_zip": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode") + ), + "filer_country": text(tree, 
".//irs:Filer/irs:ForeignAddress/irs:CountryCd"), + "phone": text(tree, ".//irs:Filer/irs:PhoneNum"), + "website": text(tree, f"{ez}/irs:WebsiteAddressTxt"), + } + + # Classification. The 990-EZ schema puts the 501(c) subsection in an + # attribute on Organization501cInd (e.g. <Organization501cInd + # organization501cTypeTxt="7"/>), not in element text. A 501(c)(3) filing + # instead sets <Organization501c3Ind>X</Organization501c3Ind> with no + # separate subsection element — in that case we leave section_501c_type + # NULL rather than inferring "3" from the boolean flag. + section_501c_type = None + el_501c = tree.find(f"{ez}/irs:Organization501cInd", NS_MAP) + if el_501c is not None: + section_501c_type = el_501c.get("organization501cTypeTxt") + + classification = { + "is_501c3": text_bool(tree, f"{ez}/irs:Organization501c3Ind"), + "section_501c_type": section_501c_type, + "group_exemption_num": text(tree, f"{ez}/irs:GroupExemptionNum"), + } + + # Filing status flags + status = { + "is_amended": text_bool(tree, f"{ez}/irs:AmendedReturnInd"), + "is_initial": text_bool(tree, f"{ez}/irs:InitialReturnInd"), + "is_final": text_bool(tree, f"{ez}/irs:FinalReturnInd"), + } + + # Part I: revenue / expenses + part_i = {} + part_i_fields = { + "gross_receipts": "GrossReceiptsAmt", + "contributions_gifts_grants": "ContributionsGiftsGrantsEtcAmt", + "program_service_revenue": "ProgramServiceRevenueAmt", + "investment_income": "InvestmentIncomeAmt", + "total_revenue": "TotalRevenueAmt", + "grants_paid": "GrantsAndSimilarAmountsPaidAmt", + "salaries_compensation": "SalariesOtherCompEmplBnftAmt", + "total_expenses": "TotalExpensesAmt", + "revenue_less_expenses": "ExcessOrDeficitForYearAmt", + } + for col, elem in part_i_fields.items(): + part_i[col] = parse_numeric(text(tree, f"{ez}/irs:{elem}")) + + # Part II: balance sheet (BOY/EOY child elements under group wrappers). 
+ balance_sheet = { + "total_assets_boy": parse_numeric( + text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:BOYAmt") + ), + "total_assets_eoy": parse_numeric( + text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:EOYAmt") + ), + "total_liabilities_boy": parse_numeric( + text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:BOYAmt") + ), + "total_liabilities_eoy": parse_numeric( + text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:EOYAmt") + ), + "net_assets_boy": parse_numeric( + text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:BOYAmt") + ), + "net_assets_eoy": parse_numeric( + text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:EOYAmt") + ), + } + + # Schedule O presence + sched_o_presence = { + "has_schedule_o": tree.find(SCHEDULE_O_XPATH) is not None, + } + + # Officer / signer + officer = { + "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"), + "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"), + "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"), + "preparer_firm": ( + text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt") + or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1") + ), + } + + return { + **filer, **classification, **status, + **part_i, **balance_sheet, + **sched_o_presence, **officer, + } + + +# ============================================================ +# Grant detail status +# ============================================================ + +def compute_grant_detail_status(form_data, schedule_o_rows): + """Determine grant detail completeness for a Form 990-EZ filing. + + 990-EZ has no recipient table to inspect. Line 10 (grants_paid) is an + aggregate and grant detail is instructed to be listed in Schedule O, but + the raw XML gives us no way to tell that a given Schedule O entry is + *the* line-10 narrative rather than unrelated supplemental text. 
For a + provenance-strict raw layer we therefore only report what we can verify + from the structured fields: + + - no_grants: line 10 is null or 0 (no grants reported). + - unresolved: line 10 is positive. Whether that detail lives in Schedule + O, an attachment, or nowhere is left to a downstream classifier. + + The schedule_o_rows argument is intentionally unused; it's kept in the + signature so callers (and a future classifier) have a single place to + evolve the logic. + """ + del schedule_o_rows # intentionally unused — see docstring + grants_paid = form_data.get("grants_paid") + if grants_paid is None or float(grants_paid) == 0: + return "no_grants" + return "unresolved" + + +# ============================================================ +# Per-filing processing +# ============================================================ + +SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"] + + + +def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id): + """Process a single Form 990-EZ filing. 
All child writes are transactional.""" + + metadata = extract_filing_metadata(tree) + schedule_o_entries = extract_schedule_o(tree) + form_data = extract_form_990ez(tree) + form_data["grant_detail_status"] = compute_grant_detail_status( + form_data, schedule_o_entries, + ) + + root = tree.getroot() + return_header = root.find(f"{{{NS}}}ReturnHeader") + return_data = root.find(f"{{{NS}}}ReturnData") + + def _do(conn): + raw_filing_id = upsert_raw_filing( + SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn + ) + record_raw_filing_source( + raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn + ) + + filing_form_data = {**form_data, "raw_filing_id": raw_filing_id} + schedule_o_rows = [ + {**row, "raw_filing_id": raw_filing_id} + for row in schedule_o_entries + ] + + _replace_children( + conn, raw_filing_id, filing_form_data, schedule_o_rows, xml_rows, + ) + + # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990ez + schedule_o + xml fields + return 3 + len(schedule_o_rows) + len(xml_rows) + + return execute_transaction(_do) + + +def _replace_children(conn, raw_filing_id, form_data, schedule_o_rows): + """Delete and re-insert all child rows for a filing using the caller's transaction.""" + form_columns = list(form_data.keys()) + form_placeholders = ", ".join(["%s"] * len(form_columns)) + form_values = [form_data[col] for col in form_columns] + + schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS)) + + with conn.cursor() as cur: + # Delete old child rows + cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,)) + cur.execute("DELETE FROM raw.form_990ez WHERE raw_filing_id = %s", (raw_filing_id,)) + + # Insert form summary + cur.execute( + f"INSERT INTO raw.form_990ez ({', '.join(form_columns)}) " + f"VALUES ({form_placeholders})", + form_values, + ) + + # Insert Schedule O narrative + if schedule_o_rows: + from psycopg2.extras import execute_batch + schedule_o_values = [ + [row.get(col) 
for col in SCHEDULE_O_COLUMNS] + for row in schedule_o_rows + ] + execute_batch( + cur, + f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) " + f"VALUES ({schedule_o_placeholders})", + schedule_o_values, + ) + + # Insert XML fields + + +# ============================================================ +# ZIP / file processing +# ============================================================ + +def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id): + """Parse XML bytes and process if it's a Form 990-EZ. Returns rows inserted or None if skipped.""" + try: + tree = etree.parse(io.BytesIO(xml_bytes)) + except etree.XMLSyntaxError as e: + log_ingest_error(ingest_run_id, source_archive, source_path, + f"XML parse error: {e}", stage="parse_xml") + return None + + ret_type = text(tree, ".//irs:ReturnTypeCd") + if ret_type not in FORM_990EZ_RETURN_TYPES: + return None + + source_document_id = None + try: + source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path) + return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id) + except Exception as e: + stage = "process_filing" if source_document_id else "derive_source_document_id" + log_ingest_error(ingest_run_id, source_archive, source_path, e, + source_document_id=source_document_id, stage=stage) + raise + + +def process_zip(zip_path, ingest_run_id): + """Process all XMLs in a ZIP file.""" + basename = os.path.basename(zip_path) + try: + zf = zipfile.ZipFile(zip_path) + except zipfile.BadZipFile as e: + log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip") + print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr) + return 0, 0, 0 + + with zf: + names = [n for n in zf.namelist() if n.endswith(".xml")] + + print(f"Processing {basename}: {len(names)} XML files") + files_scanned = 0 + files_matched = 0 + total_rows = 0 + + for i, name in enumerate(names): + # Count every ZIP member as scanned, even ones we fail to read — + # 
otherwise read failures silently shrink the scanned total and + # make run-level metrics misleading. + files_scanned += 1 + try: + xml_bytes = zf.read(name) + except Exception as e: + log_ingest_error(ingest_run_id, basename, name, e, stage="read") + continue + + try: + rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id) + except Exception as e: + print(f" ERROR in {name}: {e}", file=sys.stderr) + continue + + if rows is not None: + files_matched += 1 + total_rows += rows + + if (i + 1) % 1000 == 0: + print(f" ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows") + + print(f" Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows") + return files_scanned, files_matched, total_rows + + +def main(): + args = sys.argv[1:] + if not args: + print(f"Usage: python -m scripts.parse.irs_990ez <zip_or_xml_files...>", file=sys.stderr) + sys.exit(1) + + notes = " ".join(os.path.basename(a) for a in args) + ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes) + + grand_scanned = 0 + grand_matched = 0 + grand_rows = 0 + + try: + for path in args: + if path.endswith(".zip"): + scanned, matched, rows = process_zip(path, ingest_run_id) + grand_scanned += scanned + grand_matched += matched + grand_rows += rows + + elif path.endswith(".xml"): + xml_name = os.path.basename(path) + # Count before I/O, so read failures still show up in scanned. 
+ grand_scanned += 1 + try: + with open(path, "rb") as f: + xml_bytes = f.read() + except Exception as e: + log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read") + print(f"ERROR reading {path}: {e}", file=sys.stderr) + continue + + try: + rows = process_xml_bytes( + xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id + ) + except Exception as e: + print(f"ERROR in {path}: {e}", file=sys.stderr) + continue + + if rows is not None: + grand_matched += 1 + grand_rows += rows + else: + print(f"Skipping unknown file type: {path}", file=sys.stderr) + + finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows) + except Exception: + fail_ingest_run(ingest_run_id) + raise + + print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.") + + +if __name__ == "__main__": + main() diff --git a/scripts/parse/irs_990pf.py b/scripts/parse/irs_990pf.py new file mode 100644 index 0000000..3d245b8 --- /dev/null +++ b/scripts/parse/irs_990pf.py @@ -0,0 +1,544 @@ +""" +Parse IRS 990-PF XML files into the new raw schema. + +Populates: raw.filing, raw.filing_source, raw.form_990pf, + raw.grant_990pf. 
+ +Usage: + python -m scripts.parse.irs_990pf data/irs/xml-zips/*.zip + python -m scripts.parse.irs_990pf data/irs/xml-missing/202100139349100100_public.xml +""" + +import io +import os +import sys +import zipfile + +from lxml import etree + +from scripts.common.db import execute_transaction +from scripts.common.normalize import parse_numeric, is_placeholder +from scripts.common.xml import ( + NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata, +) +from scripts.common.ingest import ( + start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, +) +from scripts.common.filing import ( + upsert_raw_filing, record_raw_filing_source, +) + +PARSER_NAME = "parse_irs_990pf" +SOURCE_SYSTEM = "irs_xml" + +# Standalone XML files use this as source_archive +STANDALONE_ARCHIVE = "__standalone__" + +# Grant element XPaths — three schema variants across IRS versions +GRANT_XPATHS = [ + f".//{{{NS}}}SupplementaryInformationGrp/{{{NS}}}GrantOrContributionPdDurYrGrp", + f".//{{{NS}}}SupplementaryInformation/{{{NS}}}GrantOrContriPaidDuringYear", + f".//{{{NS}}}SupplementaryInfomation/{{{NS}}}GrantOrContriPaidDuringYear", # IRS typo +] + + +# ============================================================ +# Grant extraction +# ============================================================ + +def extract_grant(g, line_number): + """Extract a single grant row from a grant XML element. + + Child element names vary across IRS schema versions, so each field + tries the modern tag first, then falls back to the older variant. 
+ """ + if len(g) == 0: + return None + + amount_raw = ( + text(g, "irs:Amt") + or text(g, "irs:Amount") + ) + + return { + "line_number": line_number, + "recipient_name": ( + text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1Txt") + or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1") + ), + "recipient_name2": ( + text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2Txt") + or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2") + ), + "recipient_person_name": ( + text(g, "irs:RecipientPersonNm") + or text(g, "irs:RecipientPersonName") + ), + "address_line1": ( + text(g, "irs:RecipientUSAddress/irs:AddressLine1Txt") + or text(g, "irs:RecipientUSAddress/irs:AddressLine1") + or text(g, "irs:RecipientForeignAddress/irs:AddressLine1Txt") + or text(g, "irs:RecipientForeignAddress/irs:AddressLine1") + ), + "address_line2": ( + text(g, "irs:RecipientUSAddress/irs:AddressLine2Txt") + or text(g, "irs:RecipientUSAddress/irs:AddressLine2") + or text(g, "irs:RecipientForeignAddress/irs:AddressLine2Txt") + or text(g, "irs:RecipientForeignAddress/irs:AddressLine2") + ), + "city": ( + text(g, "irs:RecipientUSAddress/irs:CityNm") + or text(g, "irs:RecipientUSAddress/irs:City") + or text(g, "irs:RecipientForeignAddress/irs:CityNm") + or text(g, "irs:RecipientForeignAddress/irs:City") + ), + "state": ( + text(g, "irs:RecipientUSAddress/irs:StateAbbreviationCd") + or text(g, "irs:RecipientUSAddress/irs:State") + or text(g, "irs:RecipientForeignAddress/irs:ProvinceOrStateNm") + ), + "zip": ( + text(g, "irs:RecipientUSAddress/irs:ZIPCd") + or text(g, "irs:RecipientUSAddress/irs:ZIPCode") + ), + "country": text(g, "irs:RecipientForeignAddress/irs:CountryCd"), + "foreign_postal_code": text(g, "irs:RecipientForeignAddress/irs:ForeignPostalCd"), + "amount_raw": amount_raw, + "amount": parse_numeric(amount_raw), + "purpose": ( + text(g, "irs:GrantOrContributionPurposeTxt") + or text(g, "irs:PurposeOfGrantOrContriTxt") + ), + "foundation_status": ( + text(g, 
"irs:RecipientFoundationStatusTxt") + or text(g, "irs:RecipientFoundationStatusCd") + ), + "relationship": ( + text(g, "irs:RecipientRelationshipTxt") + or text(g, "irs:RecipientRelationship") + ), + } + + +def find_all_grants(tree): + """Find all grant elements across schema variants.""" + grants = [] + for xpath in GRANT_XPATHS: + grants.extend(tree.findall(xpath)) + return grants + + +# ============================================================ +# Form 990-PF summary extraction +# ============================================================ + +def extract_form_990pf(tree): + """Extract filing-level summary fields for raw_form_990pf.""" + pf = f".//{{{NS}}}IRS990PF" + a = f"{pf}/{{{NS}}}AnalysisOfRevenueAndExpenses" + bs = f"{pf}/{{{NS}}}Form990PFBalanceSheetsGrp" + si = f"{pf}/{{{NS}}}SupplementaryInformationGrp" + sa = f"{pf}/{{{NS}}}StatementsRegardingActyGrp" + + # Filer address from ReturnHeader. Each field tries the modern (*Txt / + # *Cd / CityNm / StateAbbreviationCd / ZIPCd) tag first and falls back + # to the old-schema variant used by older filings. 
+ filer_addr = { + "filer_name2": ( + text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt") + or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2") + ), + "filer_address_line1": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1") + ), + "filer_address_line2": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt") + or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2") + ), + "filer_city": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:USAddress/irs:City") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City") + ), + "filer_state": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:State") + or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm") + ), + "filer_zip": ( + text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd") + or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode") + ), + "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"), + "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"), + "phone": text(tree, ".//irs:Filer/irs:PhoneNum"), + } + + # Classification. text_bool preserves None (tag absent) vs False + # (tag present but not ticked); the accounting_method scan treats both + # the same because we're looking for the first truthy branch. 
+ method_cash = text_bool(tree, f"{pf}/irs:MethodOfAccountingCashInd") + method_accrual = text_bool(tree, f"{pf}/irs:MethodOfAccountingAccrualInd") + + classification = { + "is_501c3_pf": text_bool(tree, f"{pf}/irs:Organization501c3ExemptPFInd"), + "is_4947a1_trust": text_bool(tree, f"{pf}/irs:Organization4947a1TrtdPFInd"), + "is_private_operating": text_bool(tree, f"{sa}/irs:PrivateOperatingFoundationInd"), + "accounting_method": ( + "cash" if method_cash + else ("accrual" if method_accrual else None) + ), + } + + # Filing status. is_initial is true when either of two mutually-exclusive + # indicator tags is set; we preserve None when both tags are absent. + initial_std = text_bool(tree, f"{pf}/irs:InitialReturnInd") + initial_former = text_bool(tree, f"{pf}/irs:InitialReturnFormerPubChrtyInd") + if initial_std is None and initial_former is None: + is_initial = None + else: + is_initial = bool(initial_std) or bool(initial_former) + + status = { + "is_amended": text_bool(tree, f"{pf}/irs:AmendedReturnInd"), + "is_initial": is_initial, + "is_final": text_bool(tree, f"{pf}/irs:FinalReturnInd"), + } + + # Part I: revenue and expenses + part_i = {} + part_i_fields = { + "contributions_received": f"{a}/irs:ContriRcvdRevAndExpnssAmt", + "interest_revenue": f"{a}/irs:InterestOnSavRevAndExpnssAmt", + "dividends_revenue": f"{a}/irs:DividendsRevAndExpnssAmt", + "net_gain_sale_assets": f"{a}/irs:NetGainSaleAstRevAndExpnssAmt", + "total_revenue": f"{a}/irs:TotalRevAndExpnssAmt", + "total_net_investment_income": f"{a}/irs:TotalNetInvstIncmAmt", + "compensation_officers": f"{a}/irs:CompOfcrDirTrstRevAndExpnssAmt", + "total_operating_expenses": f"{a}/irs:TotOprExpensesRevAndExpnssAmt", + "contributions_paid": f"{a}/irs:ContriPaidRevAndExpnssAmt", + "total_expenses": f"{a}/irs:TotalExpensesRevAndExpnssAmt", + "total_charitable_disbursements": f"{a}/irs:TotalExpensesDsbrsChrtblAmt", + "excess_revenue_over_expenses": f"{a}/irs:ExcessRevenueOverExpensesAmt", + 
"net_investment_income": f"{a}/irs:NetInvestmentIncomeAmt", + "adjusted_net_income": f"{a}/irs:AdjustedNetIncomeAmt", + } + for col, xpath in part_i_fields.items(): + part_i[col] = parse_numeric(text(tree, xpath)) + + # Part II: balance sheets + part_ii = {} + part_ii_fields = { + "total_assets_boy": f"{bs}/irs:TotalAssetsBOYAmt", + "total_assets_eoy": f"{bs}/irs:TotalAssetsEOYAmt", + "total_assets_eoy_fmv": f"{bs}/irs:TotalAssetsEOYFMVAmt", + "total_liabilities_boy": f"{bs}/irs:TotalLiabilitiesBOYAmt", + "total_liabilities_eoy": f"{bs}/irs:TotalLiabilitiesEOYAmt", + "net_assets_boy": f"{bs}/irs:TotNetAstOrFundBalancesBOYAmt", + "net_assets_eoy": f"{bs}/irs:TotNetAstOrFundBalancesEOYAmt", + "fmv_assets_eoy": f"{pf}/irs:FMVAssetsEOYAmt", + } + for col, xpath in part_ii_fields.items(): + part_ii[col] = parse_numeric(text(tree, xpath)) + + # Parts X-XII + dist = { + "minimum_investment_return": parse_numeric( + text(tree, f"{pf}/irs:MinimumInvestmentReturnGrp/irs:MinimumInvestmentReturnAmt") + ), + "distributable_amount": parse_numeric( + text(tree, f"{pf}/irs:DistributableAmountGrp/irs:DistributableAsAdjustedAmt") + ), + "qualifying_distributions": parse_numeric( + text(tree, f"{pf}/irs:QualifyingDistriPartXIIGrp/irs:QualifyingDistributionsAmt") + ), + "excise_tax_amount": parse_numeric( + text(tree, f"{pf}/irs:ExciseTaxBasedOnInvstIncmGrp/irs:InvestmentIncomeExciseTaxAmt") + ), + } + + # Part XV totals + xv = { + "total_grants_paid": parse_numeric( + text(tree, f"{si}/irs:TotalGrantOrContriPdDurYrAmt") + ), + "total_grants_approved_future": parse_numeric( + text(tree, f"{si}/irs:TotalGrantOrContriApprvFutAmt") + ), + } + + # Misc + misc = { + "website": text(tree, f"{sa}/irs:WebsiteAddressTxt"), + "state_of_registration": text(tree, f"{sa}/irs:OrgReportOrRegisterStateCd"), + } + + # Officer / signer + officer = { + "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"), + "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"), + 
"signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"), + "preparer_firm": ( + text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt") + or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1") + ), + } + + return {**filer_addr, **classification, **status, **part_i, **part_ii, **dist, **xv, **misc, **officer} + + +# ============================================================ +# Grant detail status +# ============================================================ + +def compute_grant_detail_status(grant_elements, grant_rows): + """Determine grant detail completeness status.""" + if not grant_elements: + return "no_grants" + if not grant_rows: + return "unresolved" + + placeholder_count = sum( + 1 for r in grant_rows + if is_placeholder(r.get("recipient_name")) + or is_placeholder(r.get("amount_raw")) + ) + + if placeholder_count == len(grant_rows): + return "placeholder_only" + if placeholder_count > 0: + return "see_attached" + return "complete" + + +# ============================================================ +# Per-filing processing +# ============================================================ + +GRANT_COLUMNS = [ + "raw_filing_id", "line_number", + "recipient_name", "recipient_name2", "recipient_person_name", + "address_line1", "address_line2", "city", "state", "zip", + "country", "foreign_postal_code", + "amount_raw", "amount", "purpose", "foundation_status", "relationship", +] + + + +def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id): + """Process a single 990-PF filing. 
All child writes are transactional.""" + + # Extract filing metadata + metadata = extract_filing_metadata(tree) + + # Extract grants + grant_elements = find_all_grants(tree) + extracted_grants = [] + for i, g in enumerate(grant_elements, start=1): + row = extract_grant(g, i) + if row is not None: + extracted_grants.append(row) + + # Extract form summary + form_data = extract_form_990pf(tree) + form_data["grant_detail_status"] = compute_grant_detail_status(grant_elements, extracted_grants) + + root = tree.getroot() + return_header = root.find(f"{{{NS}}}ReturnHeader") + return_data = root.find(f"{{{NS}}}ReturnData") + + def _do(conn): + raw_filing_id = upsert_raw_filing( + SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn + ) + record_raw_filing_source( + raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn + ) + + filing_form_data = {**form_data, "raw_filing_id": raw_filing_id} + grant_rows = [ + {**row, "raw_filing_id": raw_filing_id} + for row in extracted_grants + ] + + _replace_children(conn, raw_filing_id, filing_form_data, grant_rows, xml_rows) + + # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990pf + grants + xml fields + return 3 + len(grant_rows) + len(xml_rows) + + return execute_transaction(_do) + + +def _replace_children(conn, raw_filing_id, form_data, grant_rows, xml_rows): + """Delete and re-insert all child rows for a filing using the caller's transaction.""" + form_columns = list(form_data.keys()) + form_placeholders = ", ".join(["%s"] * len(form_columns)) + form_values = [form_data[col] for col in form_columns] + + grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS)) + + + with conn.cursor() as cur: + # Delete old child rows + cur.execute("DELETE FROM raw.grant_990pf WHERE raw_filing_id = %s", (raw_filing_id,)) + cur.execute("DELETE FROM raw.form_990pf WHERE raw_filing_id = %s", (raw_filing_id,)) + + # Insert form summary + cur.execute( + f"INSERT INTO raw.form_990pf ({', '.join(form_columns)}) " + 
f"VALUES ({form_placeholders})", + form_values, + ) + + # Insert grants + if grant_rows: + from psycopg2.extras import execute_batch + grant_values = [ + [row.get(col) for col in GRANT_COLUMNS] + for row in grant_rows + ] + execute_batch( + cur, + f"INSERT INTO raw.grant_990pf ({', '.join(GRANT_COLUMNS)}) " + f"VALUES ({grant_placeholders})", + grant_values, + ) + + # Insert XML fields + + +# ============================================================ +# ZIP / file processing +# ============================================================ + +def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id): + """Parse XML bytes and process if it's a 990-PF. Returns rows inserted or None if skipped.""" + try: + tree = etree.parse(io.BytesIO(xml_bytes)) + except etree.XMLSyntaxError as e: + log_ingest_error(ingest_run_id, source_archive, source_path, + f"XML parse error: {e}", stage="parse_xml") + return None + + ret_type = text(tree, ".//irs:ReturnTypeCd") + if ret_type != "990PF": + return None + + source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path) + try: + return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id) + except Exception as e: + log_ingest_error(ingest_run_id, source_archive, source_path, e, + source_document_id=source_document_id, stage="process_filing") + raise + + +def process_zip(zip_path, ingest_run_id): + """Process all XMLs in a ZIP file.""" + basename = os.path.basename(zip_path) + try: + zf = zipfile.ZipFile(zip_path) + except zipfile.BadZipFile as e: + log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip") + print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr) + return 0, 0, 0 + + with zf: + names = [n for n in zf.namelist() if n.endswith(".xml")] + + print(f"Processing {basename}: {len(names)} XML files") + files_scanned = 0 + files_matched = 0 + total_rows = 0 + + for i, name in enumerate(names): + # Count every ZIP member as scanned, even ones we 
fail to read — + # otherwise read failures silently shrink the scanned total and + # make run-level metrics misleading. + files_scanned += 1 + try: + xml_bytes = zf.read(name) + except Exception as e: + log_ingest_error(ingest_run_id, basename, name, e, stage="read") + continue + + try: + rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id) + except Exception as e: + print(f" ERROR in {name}: {e}", file=sys.stderr) + continue + + if rows is not None: + files_matched += 1 + total_rows += rows + + if (i + 1) % 1000 == 0: + print(f" ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows") + + print(f" Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows") + return files_scanned, files_matched, total_rows + + +def main(): + args = sys.argv[1:] + if not args: + print(f"Usage: python -m scripts.parse.irs_990pf <zip_or_xml_files...>", file=sys.stderr) + sys.exit(1) + + notes = " ".join(os.path.basename(a) for a in args) + ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes) + + grand_scanned = 0 + grand_matched = 0 + grand_rows = 0 + + try: + for path in args: + if path.endswith(".zip"): + scanned, matched, rows = process_zip(path, ingest_run_id) + grand_scanned += scanned + grand_matched += matched + grand_rows += rows + + elif path.endswith(".xml"): + xml_name = os.path.basename(path) + # Count before I/O, so read failures still show up in scanned. 
+ grand_scanned += 1 + try: + with open(path, "rb") as f: + xml_bytes = f.read() + except Exception as e: + log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read") + print(f"ERROR reading {path}: {e}", file=sys.stderr) + continue + + try: + rows = process_xml_bytes( + xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id + ) + except Exception as e: + print(f"ERROR in {path}: {e}", file=sys.stderr) + continue + + if rows is not None: + grand_matched += 1 + grand_rows += rows + else: + print(f"Skipping unknown file type: {path}", file=sys.stderr) + + finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows) + except Exception: + fail_ingest_run(ingest_run_id) + raise + + print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.") + + +if __name__ == "__main__": + main() diff --git a/scripts/parse/irs_bmf.py b/scripts/parse/irs_bmf.py new file mode 100644 index 0000000..cf02575 --- /dev/null +++ b/scripts/parse/irs_bmf.py @@ -0,0 +1,231 @@ +""" +Load IRS Business Master File (BMF) CSVs into raw.bmf. 
+ +Usage: + python -m scripts.parse.irs_bmf + python -m scripts.parse.irs_bmf data/irs/bmf/eo1.csv data/irs/bmf/eo2.csv +""" + +import csv +import os +import sys + +from psycopg2.extras import execute_batch + +from scripts.common.db import execute_transaction +from scripts.common.ingest import ( + start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, +) +from scripts.common.normalize import normalize_ein + +DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "..", "data", "irs", "bmf") +PARSER_NAME = "load_raw_bmf" +SOURCE_SYSTEM = "irs_bmf" + +BMF_COLUMNS = [ + "ein", + "name", + "ico", + "street", + "city", + "state", + "zip", + "grp", + "subsection", + "affiliation", + "classification", + "ruling", + "deductibility", + "foundation", + "activity", + "organization", + "status", + "tax_period", + "asset_cd", + "income_cd", + "filing_req_cd", + "pf_filing_req_cd", + "acct_pd", + "asset_amt", + "income_amt", + "revenue_amt", + "ntee_cd", + "sort_name", + "source_file", + "ingest_run_id", +] + + +def parse_bigint(value): + if value is None: + return None + s = str(value).strip() + if not s: + return None + return int(s) + + +def row_to_record(row, source_file, ingest_run_id): + ein = normalize_ein(row.get("EIN")) + if not ein: + raise ValueError(f"Cannot normalize EIN: {row.get('EIN')!r}") + + return { + "ein": ein, + "name": row.get("NAME") or None, + "ico": row.get("ICO") or None, + "street": row.get("STREET") or None, + "city": row.get("CITY") or None, + "state": row.get("STATE") or None, + "zip": row.get("ZIP") or None, + "grp": row.get("GROUP") or None, + "subsection": row.get("SUBSECTION") or None, + "affiliation": row.get("AFFILIATION") or None, + "classification": row.get("CLASSIFICATION") or None, + "ruling": row.get("RULING") or None, + "deductibility": row.get("DEDUCTIBILITY") or None, + "foundation": row.get("FOUNDATION") or None, + "activity": row.get("ACTIVITY") or None, + "organization": row.get("ORGANIZATION") or None, + 
"status": row.get("STATUS") or None, + "tax_period": row.get("TAX_PERIOD") or None, + "asset_cd": row.get("ASSET_CD") or None, + "income_cd": row.get("INCOME_CD") or None, + "filing_req_cd": row.get("FILING_REQ_CD") or None, + "pf_filing_req_cd": row.get("PF_FILING_REQ_CD") or None, + "acct_pd": row.get("ACCT_PD") or None, + "asset_amt": parse_bigint(row.get("ASSET_AMT")), + "income_amt": parse_bigint(row.get("INCOME_AMT")), + "revenue_amt": parse_bigint(row.get("REVENUE_AMT")), + "ntee_cd": row.get("NTEE_CD") or None, + "sort_name": row.get("SORT_NAME") or None, + "source_file": source_file, + "ingest_run_id": ingest_run_id, + } + + +def upsert_records(records): + if not records: + return 0 + + placeholders = ", ".join(["%s"] * len(BMF_COLUMNS)) + sql = ( + f"INSERT INTO raw.bmf ({', '.join(BMF_COLUMNS)}) " + f"VALUES ({placeholders}) " + "ON CONFLICT (ein) DO UPDATE SET " + "name = EXCLUDED.name, " + "ico = EXCLUDED.ico, " + "street = EXCLUDED.street, " + "city = EXCLUDED.city, " + "state = EXCLUDED.state, " + "zip = EXCLUDED.zip, " + "grp = EXCLUDED.grp, " + "subsection = EXCLUDED.subsection, " + "affiliation = EXCLUDED.affiliation, " + "classification = EXCLUDED.classification, " + "ruling = EXCLUDED.ruling, " + "deductibility = EXCLUDED.deductibility, " + "foundation = EXCLUDED.foundation, " + "activity = EXCLUDED.activity, " + "organization = EXCLUDED.organization, " + "status = EXCLUDED.status, " + "tax_period = EXCLUDED.tax_period, " + "asset_cd = EXCLUDED.asset_cd, " + "income_cd = EXCLUDED.income_cd, " + "filing_req_cd = EXCLUDED.filing_req_cd, " + "pf_filing_req_cd = EXCLUDED.pf_filing_req_cd, " + "acct_pd = EXCLUDED.acct_pd, " + "asset_amt = EXCLUDED.asset_amt, " + "income_amt = EXCLUDED.income_amt, " + "revenue_amt = EXCLUDED.revenue_amt, " + "ntee_cd = EXCLUDED.ntee_cd, " + "sort_name = EXCLUDED.sort_name, " + "source_file = EXCLUDED.source_file, " + "ingest_run_id = EXCLUDED.ingest_run_id" + ) + + values = [[record.get(col) for col in BMF_COLUMNS] 
for record in records] + + def _do(conn): + with conn.cursor() as cur: + execute_batch(cur, sql, values, page_size=1000) + return len(values) + + return execute_transaction(_do) + + +def discover_files(args): + if args: + return args + return sorted( + os.path.join(DATA_DIR, name) + for name in os.listdir(DATA_DIR) + if name.endswith(".csv") + ) + + +def main(): + files = discover_files(sys.argv[1:]) + if not files: + print(f"No CSV files found in {DATA_DIR}", file=sys.stderr) + sys.exit(1) + + notes = " ".join(os.path.basename(path) for path in files) + ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes) + + files_scanned = 0 + files_matched = 0 + rows_inserted = 0 + + try: + for csv_path in files: + basename = os.path.basename(csv_path) + files_scanned += 1 + print(f"Loading {basename}...") + + batch = [] + file_rows = 0 + file_errors = 0 + + with open(csv_path, newline="", encoding="utf-8-sig") as f: + reader = csv.DictReader(f) + for line_number, row in enumerate(reader, start=2): + try: + batch.append(row_to_record(row, basename, ingest_run_id)) + except Exception as e: + file_errors += 1 + log_ingest_error( + ingest_run_id, + basename, + f"line:{line_number}", + e, + source_document_id=row.get("EIN"), + stage="normalize_row", + ) + continue + + if len(batch) >= 5000: + rows_inserted += upsert_records(batch) + file_rows += len(batch) + batch = [] + + if batch: + rows_inserted += upsert_records(batch) + file_rows += len(batch) + + files_matched += 1 + print(f" {basename}: {file_rows:,} rows loaded, {file_errors} errors") + + finish_ingest_run(ingest_run_id, files_scanned, files_matched, rows_inserted) + except Exception: + fail_ingest_run(ingest_run_id) + raise + + print( + f"Done. {files_scanned} files scanned, " + f"{files_matched} files loaded, {rows_inserted:,} rows upserted." + ) + + +if __name__ == "__main__": + main() |
