""" Parse IRS Form 990 XML files into the new raw schema. Populates: raw.filing, raw.filing_source, raw.form_990, raw.grant_990, raw.schedule_o. Usage: python -m scripts.parse.irs_990 data/irs/xml-zips/*.zip python -m scripts.parse.irs_990 data/irs/xml-missing/202100139349100100_public.xml """ import io import os import sys import zipfile from lxml import etree from scripts.common.db import execute_transaction from scripts.common.normalize import normalize_ein, parse_numeric, is_placeholder from scripts.common.xml import ( NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata, ) from scripts.common.ingest import ( start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, ) from scripts.common.filing import ( upsert_raw_filing, record_raw_filing_source, ) PARSER_NAME = "parse_irs_990" SOURCE_SYSTEM = "irs_xml" # Standalone XML files use this as source_archive STANDALONE_ARCHIVE = "__standalone__" # Module-level truthy set used by every classification check. _TRUTHY = {"X", "x", "1", "true"} # IRS ReturnTypeCd values that map to Form 990 (includes amended return variant). FORM_990_RETURN_TYPES = {"990", "990A"} # Schedule I RecipientTable rows (one xpath — modern schema only). GRANT_XPATHS = [ f".//{{{NS}}}IRS990ScheduleI/{{{NS}}}RecipientTable", ] SCHEDULE_I_XPATH = f".//{{{NS}}}IRS990ScheduleI" SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO" # ============================================================ # Grant extraction (Schedule I RecipientTable) # ============================================================ def extract_grant(g, line_number): """Extract a single Schedule I grant row from a RecipientTable element. Child element names vary slightly across IRS schema versions, so each field tries the modern tag first and falls back to the older variant. Returns None for stub rows that lack both a recipient name and any grant amount — these appear in real filings as malformed RecipientTable entries (e.g. just a PurposeOfGrantTxt with no recipient or amount). """ if len(g) == 0: return None cash_raw = text(g, "irs:CashGrantAmt") non_cash_raw = text(g, "irs:NonCashAssistanceAmt") recipient_name = ( text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1Txt") or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine1") ) recipient_name2 = ( text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2Txt") or text(g, "irs:RecipientBusinessName/irs:BusinessNameLine2") ) if (recipient_name is None and recipient_name2 is None and cash_raw is None and non_cash_raw is None): return None return { "line_number": line_number, "recipient_name": recipient_name, "recipient_name2": recipient_name2, "recipient_ein": normalize_ein(text(g, "irs:RecipientEIN")), "address_line1": ( text(g, "irs:USAddress/irs:AddressLine1Txt") or text(g, "irs:USAddress/irs:AddressLine1") or text(g, "irs:ForeignAddress/irs:AddressLine1Txt") or text(g, "irs:ForeignAddress/irs:AddressLine1") ), "address_line2": ( text(g, "irs:USAddress/irs:AddressLine2Txt") or text(g, "irs:USAddress/irs:AddressLine2") or text(g, "irs:ForeignAddress/irs:AddressLine2Txt") or text(g, "irs:ForeignAddress/irs:AddressLine2") ), "city": ( text(g, "irs:USAddress/irs:CityNm") or text(g, "irs:USAddress/irs:City") or text(g, "irs:ForeignAddress/irs:CityNm") or text(g, "irs:ForeignAddress/irs:City") ), "state": ( text(g, "irs:USAddress/irs:StateAbbreviationCd") or text(g, "irs:USAddress/irs:State") or text(g, "irs:ForeignAddress/irs:ProvinceOrStateNm") ), "zip": ( text(g, "irs:USAddress/irs:ZIPCd") or text(g, "irs:USAddress/irs:ZIPCode") ), "country": text(g, "irs:ForeignAddress/irs:CountryCd"), "foreign_postal_code": text(g, "irs:ForeignAddress/irs:ForeignPostalCd"), "cash_grant_amt_raw": cash_raw, "cash_grant_amt": parse_numeric(cash_raw), "non_cash_amt_raw": non_cash_raw, "non_cash_amt": parse_numeric(non_cash_raw), "non_cash_desc": text(g, "irs:NonCashAssistanceDesc"), "valuation_method": text(g, "irs:ValuationMethodUsedDesc"), "purpose": text(g, "irs:PurposeOfGrantTxt"), "irc_section": text(g, "irs:IRCSectionDesc"), } def find_all_grants(tree): """Find all Schedule I RecipientTable elements.""" grants = [] for xpath in GRANT_XPATHS: grants.extend(tree.findall(xpath)) return grants # ============================================================ # Schedule O narrative extraction # ============================================================ def extract_schedule_o(tree): """Extract every SupplementalInformationDetail entry from Schedule O. Returns a list of dicts (without raw_filing_id — added in process_filing). line_number is the 1-based ordinal position among emitted rows. Entries with no narrative text in any of the explanation fields are skipped to avoid emitting low-signal placeholder rows. """ rows = [] sched_o = tree.find(SCHEDULE_O_XPATH) if sched_o is None: return rows details = sched_o.findall(f"{{{NS}}}SupplementalInformationDetail") for d in details: explanation = ( text(d, "irs:ExplanationTxt") or text(d, "irs:MediumExplanationTxt") or text(d, "irs:ShortExplanationTxt") ) if explanation is None: continue rows.append({ "line_number": len(rows) + 1, "form_line_ref": text(d, "irs:FormAndLineReferenceDesc"), "explanation": explanation, }) return rows # ============================================================ # Form 990 summary extraction # ============================================================ def extract_form_990(tree): """Extract filing-level summary fields for raw.form_990.""" f990 = f".//{{{NS}}}IRS990" # Filer address from ReturnHeader (with old-schema fallbacks) filer_addr = { "filer_name2": ( text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt") or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2") ), "filer_address_line1": ( text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt") or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1") ), "filer_address_line2": ( text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2Txt") or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine2") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2Txt") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine2") ), "filer_city": ( text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm") or text(tree, ".//irs:Filer/irs:USAddress/irs:City") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City") ), "filer_state": ( text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd") or text(tree, ".//irs:Filer/irs:USAddress/irs:State") or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm") ), "filer_zip": ( text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd") or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode") ), "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"), "filer_foreign_postal_code": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ForeignPostalCd"), "phone": text(tree, ".//irs:Filer/irs:PhoneNum"), "website": text(tree, f"{f990}/irs:WebsiteAddressTxt"), } # Classification org_type = None if text(tree, f"{f990}/irs:TypeOfOrganizationCorpInd") in _TRUTHY: org_type = "corp" elif text(tree, f"{f990}/irs:TypeOfOrganizationTrustInd") in _TRUTHY: org_type = "trust" elif text(tree, f"{f990}/irs:TypeOfOrganizationAssocInd") in _TRUTHY: org_type = "assoc" elif text(tree, f"{f990}/irs:TypeOfOrganizationOtherInd") in _TRUTHY: org_type = "other" method_cash = text(tree, f"{f990}/irs:MethodOfAccountingCashInd") method_accrual = text(tree, f"{f990}/irs:MethodOfAccountingAccrualInd") method_other = text(tree, f"{f990}/irs:MethodOfAccountingOtherInd") if method_cash in _TRUTHY: accounting_method = "cash" elif method_accrual in _TRUTHY: accounting_method = "accrual" elif method_other in _TRUTHY: accounting_method = "other" else: accounting_method = None # The 501(c) subsection number lives in an attribute, not in the # element body. (The body is typically "X" — the checkbox indicator.) # A 501(c)(3) filing sets Organization501c3Ind instead and leaves this # attribute unset; we deliberately don't infer "3" in that case, to # keep raw provenance honest. section_501c_type = None el_501c = tree.find(f"{f990}/irs:Organization501cInd", NS_MAP) if el_501c is not None: section_501c_type = el_501c.get("organization501cTypeTxt") classification = { "is_501c3": text_bool(tree, f"{f990}/irs:Organization501c3Ind"), "section_501c_type": section_501c_type, "org_type": org_type, "group_return": text_bool(tree, f"{f990}/irs:GroupReturnForAffiliatesInd"), "group_exemption_num": text(tree, f"{f990}/irs:GroupExemptionNum"), "formation_year": text(tree, f"{f990}/irs:FormationYr"), "legal_domicile_state": text(tree, f"{f990}/irs:LegalDomicileStateCd"), "mission": ( text(tree, f"{f990}/irs:ActivityOrMissionDesc") or text(tree, f"{f990}/irs:MissionDesc") ), "accounting_method": accounting_method, } # Filing status flags status = { "is_amended": text_bool(tree, f"{f990}/irs:AmendedReturnInd"), "is_initial": text_bool(tree, f"{f990}/irs:InitialReturnInd"), "is_final": text_bool(tree, f"{f990}/irs:FinalReturnInd"), "is_terminated": text_bool(tree, f"{f990}/irs:TerminateOperationsInd"), } # Part I: current year summary part_i_cy = {} part_i_cy_fields = { "gross_receipts": "GrossReceiptsAmt", "cy_contributions_grants": "CYContributionsGrantsAmt", "cy_program_service_revenue": "CYProgramServiceRevenueAmt", "cy_investment_income": "CYInvestmentIncomeAmt", "cy_other_revenue": "CYOtherRevenueAmt", "cy_total_revenue": "CYTotalRevenueAmt", "cy_grants_paid": "CYGrantsAndSimilarPaidAmt", "cy_benefits_to_members": "CYBenefitsPaidToMembersAmt", "cy_salaries_benefits": "CYSalariesCompEmpBnftPaidAmt", "cy_fundraising_expense": "CYTotalFundraisingExpenseAmt", "cy_other_expenses": "CYOtherExpensesAmt", "cy_total_expenses": "CYTotalExpensesAmt", "cy_revenue_less_expenses": "CYRevenuesLessExpensesAmt", } for col, elem in part_i_cy_fields.items(): part_i_cy[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}")) # Part I: prior year summary part_i_py = { "py_total_revenue": parse_numeric(text(tree, f"{f990}/irs:PYTotalRevenueAmt")), "py_total_expenses": parse_numeric(text(tree, f"{f990}/irs:PYTotalExpensesAmt")), } # Balance sheet (Part I summary / Part X) balance_sheet = {} bs_fields = { "total_assets_boy": "TotalAssetsBOYAmt", "total_assets_eoy": "TotalAssetsEOYAmt", "total_liabilities_boy": "TotalLiabilitiesBOYAmt", "total_liabilities_eoy": "TotalLiabilitiesEOYAmt", "net_assets_boy": "NetAssetsOrFundBalancesBOYAmt", "net_assets_eoy": "NetAssetsOrFundBalancesEOYAmt", } for col, elem in bs_fields.items(): balance_sheet[col] = parse_numeric(text(tree, f"{f990}/irs:{elem}")) # Governance / workforce workforce = { "total_employees": parse_numeric(text(tree, f"{f990}/irs:TotalEmployeeCnt")), "total_volunteers": parse_numeric(text(tree, f"{f990}/irs:TotalVolunteersCnt")), "voting_members": parse_numeric( text(tree, f"{f990}/irs:VotingMembersGoverningBodyCnt") or text(tree, f"{f990}/irs:GoverningBodyVotingMembersCnt") ), "independent_voting_members": parse_numeric( text(tree, f"{f990}/irs:VotingMembersIndependentCnt") or text(tree, f"{f990}/irs:IndependentVotingMemberCnt") ), } # Part IX: functional expense breakdown func_exp = { "program_services_expense": parse_numeric( text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ProgramServicesAmt") ), "management_general_expense": parse_numeric( text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:ManagementAndGeneralAmt") ), "fundraising_expense_ix": parse_numeric( text(tree, f"{f990}/irs:TotalFunctionalExpensesGrp/irs:FundraisingAmt") ), } # Part VIII: revenue detail revenue_detail = { "government_grants": parse_numeric(text(tree, f"{f990}/irs:GovernmentGrantsAmt")), "total_contributions": parse_numeric(text(tree, f"{f990}/irs:TotalContributionsAmt")), "total_program_service_rev": parse_numeric( text(tree, f"{f990}/irs:TotalProgramServiceRevenueAmt") ), "investment_income": parse_numeric( text(tree, f"{f990}/irs:InvestmentIncomeGrp/irs:TotalRevenueColumnAmt") ), } # UBI ubi = { "gross_ubi": parse_numeric(text(tree, f"{f990}/irs:TotalGrossUBIAmt")), "net_ubi": parse_numeric(text(tree, f"{f990}/irs:NetUnrelatedBusTxblIncmAmt")), } # Schedule I metadata (1:1 with the filing) sched_i_el = tree.find(SCHEDULE_I_XPATH) if sched_i_el is not None: sched_i = { "sched_i_grant_records_maintained": text_bool(sched_i_el, "irs:GrantRecordsMaintainedInd"), "sched_i_501c3_org_count": parse_numeric(text(sched_i_el, "irs:Total501c3OrgCnt")), "sched_i_other_org_count": parse_numeric(text(sched_i_el, "irs:TotalOtherOrgCnt")), "sched_i_total_grants_amt": None, } else: sched_i = { "sched_i_grant_records_maintained": None, "sched_i_501c3_org_count": None, "sched_i_other_org_count": None, "sched_i_total_grants_amt": None, } # Officer / signer officer = { "principal_officer": text(tree, f"{f990}/irs:PrincipalOfficerNm"), "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"), "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"), "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"), "preparer_firm": ( text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt") or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1") ), } return { **filer_addr, **classification, **status, **part_i_cy, **part_i_py, **balance_sheet, **workforce, **func_exp, **revenue_detail, **ubi, **sched_i, **officer, } # ============================================================ # Grant detail status # ============================================================ def compute_grant_detail_status(sched_i_element, grant_elements, grant_rows, cy_grants_paid): """Determine grant detail completeness for a Form 990 filing. Uses filing-level context (Schedule I presence + Part I CYGrantsAndSimilarPaidAmt) to distinguish "no Schedule I because no grants" from "no Schedule I but grants reported on Part I". """ if sched_i_element is None: # No Schedule I — but check whether Part I reports grants paid. if cy_grants_paid is not None and float(cy_grants_paid) > 0: return "unresolved" return "no_grants" if not grant_elements: return "unresolved" if not grant_rows: return "unresolved" placeholder_count = sum( 1 for r in grant_rows if is_placeholder(r.get("recipient_name")) or (is_placeholder(r.get("cash_grant_amt_raw")) and is_placeholder(r.get("non_cash_amt_raw"))) ) if placeholder_count == len(grant_rows): return "placeholder_only" if placeholder_count > 0: return "see_attached" return "complete" # ============================================================ # Per-filing processing # ============================================================ GRANT_COLUMNS = [ "raw_filing_id", "line_number", "recipient_name", "recipient_name2", "recipient_ein", "address_line1", "address_line2", "city", "state", "zip", "country", "foreign_postal_code", "cash_grant_amt_raw", "cash_grant_amt", "non_cash_amt_raw", "non_cash_amt", "non_cash_desc", "valuation_method", "purpose", "irc_section", ] SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"] def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id): """Process a single Form 990 filing. All child writes are transactional.""" # Extract filing metadata metadata = extract_filing_metadata(tree) # Locate Schedule I element (used both for grant extraction and status logic) sched_i_element = tree.find(SCHEDULE_I_XPATH) # Extract grants from Schedule I RecipientTable rows grant_elements = find_all_grants(tree) extracted_grants = [] for i, g in enumerate(grant_elements, start=1): row = extract_grant(g, i) if row is not None: extracted_grants.append(row) # Extract Schedule O narrative entries schedule_o_entries = extract_schedule_o(tree) # Extract form summary form_data = extract_form_990(tree) form_data["grant_detail_status"] = compute_grant_detail_status( sched_i_element, grant_elements, extracted_grants, form_data.get("cy_grants_paid"), ) root = tree.getroot() return_header = root.find(f"{{{NS}}}ReturnHeader") return_data = root.find(f"{{{NS}}}ReturnData") def _do(conn): raw_filing_id = upsert_raw_filing( SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn ) record_raw_filing_source( raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn ) filing_form_data = {**form_data, "raw_filing_id": raw_filing_id} grant_rows = [ {**row, "raw_filing_id": raw_filing_id} for row in extracted_grants ] schedule_o_rows = [ {**row, "raw_filing_id": raw_filing_id} for row in schedule_o_entries ] _replace_children( conn, raw_filing_id, filing_form_data, grant_rows, schedule_o_rows, xml_rows, ) # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990 + grants + schedule_o + xml fields return 3 + len(grant_rows) + len(schedule_o_rows) + len(xml_rows) return execute_transaction(_do) def _replace_children(conn, raw_filing_id, form_data, grant_rows, schedule_o_rows, xml_rows): """Delete and re-insert all child rows for a filing using the caller's transaction.""" form_columns = list(form_data.keys()) form_placeholders = ", ".join(["%s"] * len(form_columns)) form_values = [form_data[col] for col in form_columns] grant_placeholders = ", ".join(["%s"] * len(GRANT_COLUMNS)) schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS)) with conn.cursor() as cur: # Delete old child rows cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,)) cur.execute("DELETE FROM raw.grant_990 WHERE raw_filing_id = %s", (raw_filing_id,)) cur.execute("DELETE FROM raw.form_990 WHERE raw_filing_id = %s", (raw_filing_id,)) # Insert form summary cur.execute( f"INSERT INTO raw.form_990 ({', '.join(form_columns)}) " f"VALUES ({form_placeholders})", form_values, ) # Insert grants if grant_rows: from psycopg2.extras import execute_batch grant_values = [ [row.get(col) for col in GRANT_COLUMNS] for row in grant_rows ] execute_batch( cur, f"INSERT INTO raw.grant_990 ({', '.join(GRANT_COLUMNS)}) " f"VALUES ({grant_placeholders})", grant_values, ) # Insert Schedule O narrative if schedule_o_rows: from psycopg2.extras import execute_batch schedule_o_values = [ [row.get(col) for col in SCHEDULE_O_COLUMNS] for row in schedule_o_rows ] execute_batch( cur, f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) " f"VALUES ({schedule_o_placeholders})", schedule_o_values, ) # Insert XML fields # ============================================================ # ZIP / file processing # ============================================================ def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id): """Parse XML bytes and process if it's a Form 990. Returns rows inserted or None if skipped.""" try: tree = etree.parse(io.BytesIO(xml_bytes)) except etree.XMLSyntaxError as e: log_ingest_error(ingest_run_id, source_archive, source_path, f"XML parse error: {e}", stage="parse_xml") return None ret_type = text(tree, ".//irs:ReturnTypeCd") if ret_type not in FORM_990_RETURN_TYPES: return None source_document_id = None try: source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path) return process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id) except Exception as e: stage = "process_filing" if source_document_id else "derive_source_document_id" log_ingest_error(ingest_run_id, source_archive, source_path, e, source_document_id=source_document_id, stage=stage) raise def process_zip(zip_path, ingest_run_id): """Process all XMLs in a ZIP file.""" basename = os.path.basename(zip_path) try: zf = zipfile.ZipFile(zip_path) except zipfile.BadZipFile as e: log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip") print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr) return 0, 0, 0 with zf: names = [n for n in zf.namelist() if n.endswith(".xml")] print(f"Processing {basename}: {len(names)} XML files") files_scanned = 0 files_matched = 0 total_rows = 0 for i, name in enumerate(names): # Count every ZIP member as scanned, even ones we fail to read — # otherwise read failures silently shrink the scanned total and # make run-level metrics misleading. files_scanned += 1 try: xml_bytes = zf.read(name) except Exception as e: log_ingest_error(ingest_run_id, basename, name, e, stage="read") continue try: rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id) except Exception as e: print(f" ERROR in {name}: {e}", file=sys.stderr) continue if rows is not None: files_matched += 1 total_rows += rows if (i + 1) % 1000 == 0: print(f" ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows") print(f" Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows") return files_scanned, files_matched, total_rows def main(): args = sys.argv[1:] if not args: print(f"Usage: python -m scripts.parse.irs_990 ", file=sys.stderr) sys.exit(1) notes = " ".join(os.path.basename(a) for a in args) ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes) grand_scanned = 0 grand_matched = 0 grand_rows = 0 try: for path in args: if path.endswith(".zip"): scanned, matched, rows = process_zip(path, ingest_run_id) grand_scanned += scanned grand_matched += matched grand_rows += rows elif path.endswith(".xml"): xml_name = os.path.basename(path) # Count before I/O, so read failures still show up in scanned. grand_scanned += 1 try: with open(path, "rb") as f: xml_bytes = f.read() except Exception as e: log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read") print(f"ERROR reading {path}: {e}", file=sys.stderr) continue try: rows = process_xml_bytes( xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id ) except Exception as e: print(f"ERROR in {path}: {e}", file=sys.stderr) continue if rows is not None: grand_matched += 1 grand_rows += rows else: print(f"Skipping unknown file type: {path}", file=sys.stderr) finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows) except Exception: fail_ingest_run(ingest_run_id) raise print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.") if __name__ == "__main__": main()