"""
Parse IRS Form 990-EZ XML files into the new raw schema.
Populates: raw.filing, raw.filing_source, raw.form_990ez,
raw.schedule_o.
Form 990-EZ has no structured grant recipient table — line 10 of Part I
aggregates grants into a single amount and instructs the filer to "list
in Schedule O." v1 scope is preservation (filing facts + Schedule O),
not grant normalization.
Usage:
python -m scripts.parse.irs_990ez data/irs/xml-zips/*.zip
python -m scripts.parse.irs_990ez data/irs/xml-missing/202100139349100100_public.xml
"""
import io
import os
import sys
import zipfile
from lxml import etree
from scripts.common.db import execute_transaction
from scripts.common.normalize import parse_numeric
from scripts.common.xml import (
NS, NS_MAP, text, text_bool, derive_source_document_id, extract_filing_metadata,
)
from scripts.common.ingest import (
start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
)
from scripts.common.filing import (
upsert_raw_filing, record_raw_filing_source, get_seen_source_paths,
)
PARSER_NAME = "parse_irs_990ez"
SOURCE_SYSTEM = "irs_xml"
# Standalone XML files use this as source_archive
STANDALONE_ARCHIVE = "__standalone__"
# IRS ReturnTypeCd values that map to Form 990-EZ (includes amended variant).
FORM_990EZ_RETURN_TYPES = {"990EZ", "990EA"}
SCHEDULE_O_XPATH = f".//{{{NS}}}IRS990ScheduleO"
# ============================================================
# Schedule O narrative extraction
# ============================================================
def extract_schedule_o(tree):
    """Extract every SupplementalInformationDetail entry from Schedule O.

    Returns a list of dicts (without raw_filing_id — added in process_filing).
    line_number is the 1-based ordinal position among emitted rows. Entries
    with no narrative text in any of the explanation fields are skipped to
    avoid emitting low-signal placeholder rows.
    """
    schedule = tree.find(SCHEDULE_O_XPATH)
    if schedule is None:
        return []

    entries = []
    for detail in schedule.findall(f"{{{NS}}}SupplementalInformationDetail"):
        # Prefer the long-form narrative; fall back to shorter variants.
        narrative = (
            text(detail, "irs:ExplanationTxt")
            or text(detail, "irs:MediumExplanationTxt")
            or text(detail, "irs:ShortExplanationTxt")
        )
        if narrative is None:
            continue
        entries.append({
            "line_number": len(entries) + 1,
            "form_line_ref": text(detail, "irs:FormAndLineReferenceDesc"),
            "explanation": narrative,
        })
    return entries
# ============================================================
# Form 990-EZ summary extraction
# ============================================================
def extract_form_990ez(tree):
    """Extract filing-level summary fields for raw.form_990ez.

    Builds one flat dict (keyed by raw.form_990ez column name) from several
    logical sections: filer identity, 501(c) classification, filing-status
    flags, Part I revenue/expenses, Part II balance sheet, Schedule O
    presence, and officer/preparer fields. All lookups go through the shared
    text / text_bool / parse_numeric helpers, so absent elements produce
    null-ish values rather than raising.
    """
    ez = f".//{{{NS}}}IRS990EZ"
    # Filer identity from ReturnHeader (with old-schema fallbacks).
    # Each field tries the current IRS element name first, then an older
    # schema variant; US address forms are preferred over foreign ones.
    filer = {
        "filer_name2": (
            text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2Txt")
            or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine2")
        ),
        "filer_address_line1": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:AddressLine1")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1Txt")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:AddressLine1")
        ),
        "filer_city": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:City")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CityNm")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:City")
        ),
        "filer_state": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:StateAbbreviationCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:State")
            or text(tree, ".//irs:Filer/irs:ForeignAddress/irs:ProvinceOrStateNm")
        ),
        "filer_zip": (
            text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCd")
            or text(tree, ".//irs:Filer/irs:USAddress/irs:ZIPCode")
        ),
        "filer_country": text(tree, ".//irs:Filer/irs:ForeignAddress/irs:CountryCd"),
        "phone": text(tree, ".//irs:Filer/irs:PhoneNum"),
        "website": text(tree, f"{ez}/irs:WebsiteAddressTxt"),
    }
    # Classification. The 990-EZ schema puts the 501(c) subsection in an
    # attribute on Organization501cInd (e.g. organization501cTypeTxt="4"),
    # not in element text. A 501(c)(3) filing instead checks the boolean
    # Organization501c3Ind element with no separate subsection element —
    # in that case we leave section_501c_type NULL rather than inferring
    # "3" from the boolean flag.
    section_501c_type = None
    el_501c = tree.find(f"{ez}/irs:Organization501cInd", NS_MAP)
    if el_501c is not None:
        section_501c_type = el_501c.get("organization501cTypeTxt")
    classification = {
        "is_501c3": text_bool(tree, f"{ez}/irs:Organization501c3Ind"),
        "section_501c_type": section_501c_type,
        "group_exemption_num": text(tree, f"{ez}/irs:GroupExemptionNum"),
    }
    # Filing status flags (amended / initial / final checkboxes).
    status = {
        "is_amended": text_bool(tree, f"{ez}/irs:AmendedReturnInd"),
        "is_initial": text_bool(tree, f"{ez}/irs:InitialReturnInd"),
        "is_final": text_bool(tree, f"{ez}/irs:FinalReturnInd"),
    }
    # Part I: revenue / expenses. Column name -> IRS element name; values
    # are coerced with parse_numeric.
    part_i = {}
    part_i_fields = {
        "gross_receipts": "GrossReceiptsAmt",
        "contributions_gifts_grants": "ContributionsGiftsGrantsEtcAmt",
        "program_service_revenue": "ProgramServiceRevenueAmt",
        "investment_income": "InvestmentIncomeAmt",
        "total_revenue": "TotalRevenueAmt",
        "grants_paid": "GrantsAndSimilarAmountsPaidAmt",
        "salaries_compensation": "SalariesOtherCompEmplBnftAmt",
        "total_expenses": "TotalExpensesAmt",
        "revenue_less_expenses": "ExcessOrDeficitForYearAmt",
    }
    for col, elem in part_i_fields.items():
        part_i[col] = parse_numeric(text(tree, f"{ez}/irs:{elem}"))
    # Part II: balance sheet (BOY/EOY child elements under group wrappers).
    balance_sheet = {
        "total_assets_boy": parse_numeric(
            text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:BOYAmt")
        ),
        "total_assets_eoy": parse_numeric(
            text(tree, f"{ez}/irs:Form990TotalAssetsGrp/irs:EOYAmt")
        ),
        "total_liabilities_boy": parse_numeric(
            text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:BOYAmt")
        ),
        "total_liabilities_eoy": parse_numeric(
            text(tree, f"{ez}/irs:SumOfTotalLiabilitiesGrp/irs:EOYAmt")
        ),
        "net_assets_boy": parse_numeric(
            text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:BOYAmt")
        ),
        "net_assets_eoy": parse_numeric(
            text(tree, f"{ez}/irs:NetAssetsOrFundBalancesGrp/irs:EOYAmt")
        ),
    }
    # Schedule O presence: a boolean flag, independent of whether any of its
    # entries survive extract_schedule_o's empty-narrative filter.
    sched_o_presence = {
        "has_schedule_o": tree.find(SCHEDULE_O_XPATH) is not None,
    }
    # Officer / signer (ReturnHeader-level, hence the bare .// searches).
    officer = {
        "officer_name": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonNm"),
        "officer_title": text(tree, ".//irs:BusinessOfficerGrp/irs:PersonTitleTxt"),
        "signature_date": text(tree, ".//irs:BusinessOfficerGrp/irs:SignatureDt"),
        "preparer_firm": (
            text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1Txt")
            or text(tree, ".//irs:PreparerFirmGrp/irs:PreparerFirmName/irs:BusinessNameLine1")
        ),
    }
    return {
        **filer, **classification, **status,
        **part_i, **balance_sheet,
        **sched_o_presence, **officer,
    }
# ============================================================
# Grant detail status
# ============================================================
def compute_grant_detail_status(form_data, schedule_o_rows):
    """Determine grant detail completeness for a Form 990-EZ filing.

    990-EZ has no recipient table to inspect. Line 10 (grants_paid) is an
    aggregate and grant detail is instructed to be listed in Schedule O, but
    the raw XML gives us no way to tell that a given Schedule O entry is
    *the* line-10 narrative rather than unrelated supplemental text. For a
    provenance-strict raw layer we therefore only report what we can verify
    from the structured fields:
    - no_grants: line 10 is null or 0 (no grants reported).
    - unresolved: line 10 is positive. Whether that detail lives in Schedule
      O, an attachment, or nowhere is left to a downstream classifier.

    The schedule_o_rows argument is intentionally unused; it's kept in the
    signature so callers (and a future classifier) have a single place to
    evolve the logic.
    """
    del schedule_o_rows  # intentionally unused — see docstring
    line_10_amount = form_data.get("grants_paid")
    if line_10_amount is None:
        return "no_grants"
    if float(line_10_amount) == 0:
        return "no_grants"
    return "unresolved"
# ============================================================
# Per-filing processing
# ============================================================
# Insert column order for raw.schedule_o; _replace_children builds both the
# column list and the per-row value lists from this.
SCHEDULE_O_COLUMNS = ["raw_filing_id", "line_number", "form_line_ref", "explanation"]
def process_filing(tree, source_document_id, source_archive, source_path, ingest_run_id):
    """Process a single Form 990-EZ filing. All child writes are transactional.

    Extracts header metadata, the form summary, and Schedule O narrative,
    then — inside one transaction — upserts raw.filing, records provenance
    in raw.filing_source, and replaces the child raw.form_990ez /
    raw.schedule_o rows.

    Returns:
        Number of rows written: filing + filing_source + form summary,
        plus one per Schedule O entry.
    """
    # Removed: unused root/ReturnHeader/ReturnData lookups (dead locals).
    metadata = extract_filing_metadata(tree)
    schedule_o_entries = extract_schedule_o(tree)
    form_data = extract_form_990ez(tree)
    form_data["grant_detail_status"] = compute_grant_detail_status(
        form_data, schedule_o_entries,
    )

    def _do(conn):
        raw_filing_id = upsert_raw_filing(
            SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
        )
        record_raw_filing_source(
            raw_filing_id, ingest_run_id, source_archive, source_path, conn=conn
        )
        # Children only learn their parent id inside the transaction.
        filing_form_data = {**form_data, "raw_filing_id": raw_filing_id}
        schedule_o_rows = [
            {**row, "raw_filing_id": raw_filing_id}
            for row in schedule_o_entries
        ]
        _replace_children(
            conn, raw_filing_id, filing_form_data, schedule_o_rows,
        )
        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990ez + schedule_o
        return 3 + len(schedule_o_rows)

    return execute_transaction(_do)
def _replace_children(conn, raw_filing_id, form_data, schedule_o_rows):
    """Delete and re-insert all child rows for a filing using the caller's transaction.

    Re-parsing a filing is a full replace: stale raw.schedule_o and
    raw.form_990ez rows are deleted first, then the fresh extraction is
    inserted. The caller owns commit/rollback.

    Args:
        conn: open DB connection whose transaction the caller controls.
        raw_filing_id: parent raw.filing id.
        form_data: column -> value dict for raw.form_990ez (includes
            raw_filing_id).
        schedule_o_rows: list of dicts keyed per SCHEDULE_O_COLUMNS.
    """
    # Function-local import keeps psycopg2.extras off the module import path.
    from psycopg2.extras import execute_batch

    form_columns = list(form_data.keys())
    form_placeholders = ", ".join(["%s"] * len(form_columns))
    form_values = [form_data[col] for col in form_columns]
    schedule_o_placeholders = ", ".join(["%s"] * len(SCHEDULE_O_COLUMNS))
    with conn.cursor() as cur:
        # Delete old child rows
        cur.execute("DELETE FROM raw.schedule_o WHERE raw_filing_id = %s", (raw_filing_id,))
        cur.execute("DELETE FROM raw.form_990ez WHERE raw_filing_id = %s", (raw_filing_id,))
        # Insert form summary. Column names come from code-defined dicts
        # (never user input), so interpolating them into the SQL is safe;
        # all values are bound parameters.
        cur.execute(
            f"INSERT INTO raw.form_990ez ({', '.join(form_columns)}) "
            f"VALUES ({form_placeholders})",
            form_values,
        )
        # Insert Schedule O narrative
        if schedule_o_rows:
            schedule_o_values = [
                [row.get(col) for col in SCHEDULE_O_COLUMNS]
                for row in schedule_o_rows
            ]
            execute_batch(
                cur,
                f"INSERT INTO raw.schedule_o ({', '.join(SCHEDULE_O_COLUMNS)}) "
                f"VALUES ({schedule_o_placeholders})",
                schedule_o_values,
            )
# ============================================================
# ZIP / file processing
# ============================================================
def process_xml_bytes(xml_bytes, source_archive, source_path, ingest_run_id):
    """Parse XML bytes and process if it's a Form 990-EZ. Returns rows inserted or None if skipped."""
    try:
        tree = etree.parse(io.BytesIO(xml_bytes))
    except etree.XMLSyntaxError as exc:
        log_ingest_error(
            ingest_run_id, source_archive, source_path,
            f"XML parse error: {exc}", stage="parse_xml",
        )
        return None
    # Not a 990-EZ (or its amended variant)? Quietly skip — other parsers
    # handle other return types.
    if text(tree, ".//irs:ReturnTypeCd") not in FORM_990EZ_RETURN_TYPES:
        return None
    source_document_id = None
    try:
        source_document_id = derive_source_document_id(SOURCE_SYSTEM, source_path)
        return process_filing(
            tree, source_document_id, source_archive, source_path, ingest_run_id
        )
    except Exception as exc:
        # A populated document id means derivation succeeded, so the failure
        # happened while processing the filing itself.
        failed_stage = "process_filing" if source_document_id else "derive_source_document_id"
        log_ingest_error(
            ingest_run_id, source_archive, source_path, exc,
            source_document_id=source_document_id, stage=failed_stage,
        )
        raise
def process_zip(zip_path, ingest_run_id):
    """Process all XMLs in a ZIP file.

    Skips members already recorded for this archive, logs (but does not
    abort on) per-member read/parse failures, and prints a progress line
    every 1000 members.

    Returns:
        (files_scanned, files_matched, total_rows) for run-level metrics.
    """
    basename = os.path.basename(zip_path)
    try:
        zf = zipfile.ZipFile(zip_path)
    except zipfile.BadZipFile as e:
        log_ingest_error(ingest_run_id, basename, basename, e, stage="open_zip")
        print(f"Skipping bad ZIP {basename}: {e}", file=sys.stderr)
        return 0, 0, 0
    with zf:
        names = [n for n in zf.namelist() if n.endswith(".xml")]
        seen_paths = get_seen_source_paths(basename)
        skipped_existing = 0
        print(
            f"Processing {basename}: {len(names)} XML files "
            f"({len(seen_paths)} already seen)"
        )
        files_scanned = 0
        files_matched = 0
        total_rows = 0

        def _progress(done):
            # Single definition of the periodic progress line (was duplicated
            # in two loop branches). Reads the loop counters from the
            # enclosing scope at call time.
            if done % 1000 == 0:
                print(
                    f" ...{done}/{len(names)} files, "
                    f"{skipped_existing} skipped existing, "
                    f"{files_matched} matched, {total_rows} rows"
                )

        for i, name in enumerate(names):
            if name in seen_paths:
                skipped_existing += 1
                _progress(i + 1)
                continue
            # Count every ZIP member as scanned, even ones we fail to read —
            # otherwise read failures silently shrink the scanned total and
            # make run-level metrics misleading.
            files_scanned += 1
            try:
                xml_bytes = zf.read(name)
            except Exception as e:
                log_ingest_error(ingest_run_id, basename, name, e, stage="read")
                continue
            try:
                rows = process_xml_bytes(xml_bytes, basename, name, ingest_run_id)
            except Exception as e:
                # Already logged by process_xml_bytes before re-raising.
                print(f" ERROR in {name}: {e}", file=sys.stderr)
                continue
            if rows is not None:
                files_matched += 1
                total_rows += rows
            _progress(i + 1)
        print(
            f" Done: {files_scanned} scanned, {skipped_existing} skipped existing, "
            f"{files_matched} matched, {total_rows} rows"
        )
        return files_scanned, files_matched, total_rows
def main():
    """CLI entry point: parse 990-EZ filings from ZIP archives and/or standalone XMLs.

    Accumulates run-level metrics into one ingest run record; marks the run
    failed and re-raises on any unhandled error.
    """
    args = sys.argv[1:]
    if not args:
        # Restored the argument placeholder that was missing from the usage
        # message (see the module docstring for examples).
        print(
            "Usage: python -m scripts.parse.irs_990ez <zip-or-xml> [<zip-or-xml> ...]",
            file=sys.stderr,
        )
        sys.exit(1)
    notes = " ".join(os.path.basename(a) for a in args)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)
    grand_scanned = 0
    grand_matched = 0
    grand_rows = 0
    try:
        for path in args:
            if path.endswith(".zip"):
                scanned, matched, rows = process_zip(path, ingest_run_id)
                grand_scanned += scanned
                grand_matched += matched
                grand_rows += rows
            elif path.endswith(".xml"):
                xml_name = os.path.basename(path)
                # Count before I/O, so read failures still show up in scanned.
                grand_scanned += 1
                try:
                    with open(path, "rb") as f:
                        xml_bytes = f.read()
                except Exception as e:
                    log_ingest_error(ingest_run_id, STANDALONE_ARCHIVE, xml_name, e, stage="read")
                    print(f"ERROR reading {path}: {e}", file=sys.stderr)
                    continue
                try:
                    rows = process_xml_bytes(
                        xml_bytes, STANDALONE_ARCHIVE, xml_name, ingest_run_id
                    )
                except Exception as e:
                    # Already logged by process_xml_bytes before re-raising.
                    print(f"ERROR in {path}: {e}", file=sys.stderr)
                    continue
                if rows is not None:
                    grand_matched += 1
                    grand_rows += rows
            else:
                print(f"Skipping unknown file type: {path}", file=sys.stderr)
        finish_ingest_run(ingest_run_id, grand_scanned, grand_matched, grand_rows)
    except Exception:
        fail_ingest_run(ingest_run_id)
        raise
    print(f"\nDone. {grand_scanned} files scanned, {grand_matched} matched, {grand_rows} rows.")


if __name__ == "__main__":
    main()