"""
Load IRS Business Master File (BMF) CSVs into raw.bmf.

Each input CSV (IRS Exempt Organizations BMF extract) is streamed row by
row, normalized into a flat record keyed on EIN, and bulk-upserted into
the raw.bmf table in batches. Progress and per-row failures are tracked
through the common ingest-run bookkeeping helpers.

Usage:
    python -m scripts.parse.irs_bmf
    python -m scripts.parse.irs_bmf data/irs/bmf/eo1.csv data/irs/bmf/eo2.csv
"""

import csv
import os
import sys

from psycopg2.extras import execute_batch

from scripts.common.db import execute_transaction
from scripts.common.ingest import (
    start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
)
from scripts.common.normalize import normalize_ein

DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "..", "data", "irs", "bmf")
PARSER_NAME = "load_raw_bmf"
SOURCE_SYSTEM = "irs_bmf"

# Target columns of raw.bmf, in INSERT order.
BMF_COLUMNS = [
    "ein",
    "name",
    "ico",
    "street",
    "city",
    "state",
    "zip",
    "grp",
    "subsection",
    "affiliation",
    "classification",
    "ruling",
    "deductibility",
    "foundation",
    "activity",
    "organization",
    "status",
    "tax_period",
    "asset_cd",
    "income_cd",
    "filing_req_cd",
    "pf_filing_req_cd",
    "acct_pd",
    "asset_amt",
    "income_amt",
    "revenue_amt",
    "ntee_cd",
    "sort_name",
    "source_file",
    "ingest_run_id",
]

# raw.bmf column -> source CSV header, for the plain text fields.
# ("grp" is the table column for the CSV's GROUP header; presumably renamed
# because GROUP is an SQL reserved word -- confirm against the schema.)
_TEXT_FIELDS = {
    "name": "NAME",
    "ico": "ICO",
    "street": "STREET",
    "city": "CITY",
    "state": "STATE",
    "zip": "ZIP",
    "grp": "GROUP",
    "subsection": "SUBSECTION",
    "affiliation": "AFFILIATION",
    "classification": "CLASSIFICATION",
    "ruling": "RULING",
    "deductibility": "DEDUCTIBILITY",
    "foundation": "FOUNDATION",
    "activity": "ACTIVITY",
    "organization": "ORGANIZATION",
    "status": "STATUS",
    "tax_period": "TAX_PERIOD",
    "asset_cd": "ASSET_CD",
    "income_cd": "INCOME_CD",
    "filing_req_cd": "FILING_REQ_CD",
    "pf_filing_req_cd": "PF_FILING_REQ_CD",
    "acct_pd": "ACCT_PD",
    "ntee_cd": "NTEE_CD",
    "sort_name": "SORT_NAME",
}

# raw.bmf column -> source CSV header, for the bigint amount fields.
_BIGINT_FIELDS = {
    "asset_amt": "ASSET_AMT",
    "income_amt": "INCOME_AMT",
    "revenue_amt": "REVENUE_AMT",
}


def parse_bigint(value):
    """Parse *value* into an int, returning None for None/blank input.

    Raises ValueError (from int()) for non-integer text; callers treat
    that as a per-row error and log it.
    """
    if value is None:
        return None
    s = str(value).strip()
    if not s:
        return None
    return int(s)


def row_to_record(row, source_file, ingest_run_id):
    """Convert one CSV row (a DictReader dict) into a raw.bmf record dict.

    Empty-string text fields are stored as None (SQL NULL); amount fields
    are parsed to ints. Raises ValueError if the EIN cannot be normalized.
    """
    ein = normalize_ein(row.get("EIN"))
    if not ein:
        raise ValueError(f"Cannot normalize EIN: {row.get('EIN')!r}")

    record = {"ein": ein}
    for column, header in _TEXT_FIELDS.items():
        record[column] = row.get(header) or None
    for column, header in _BIGINT_FIELDS.items():
        record[column] = parse_bigint(row.get(header))
    record["source_file"] = source_file
    record["ingest_run_id"] = ingest_run_id
    return record


def upsert_records(records):
    """Bulk upsert *records* into raw.bmf keyed on ein.

    Returns the number of rows actually written. Duplicate EINs within
    one batch are collapsed (last occurrence wins) because a single
    INSERT ... ON CONFLICT DO UPDATE statement cannot affect the same
    row twice -- PostgreSQL raises a cardinality-violation error.
    Collapsing to the last occurrence matches the update-on-conflict
    semantics the rows would have had across separate batches.
    """
    if not records:
        return 0

    deduped = list({record["ein"]: record for record in records}.values())

    # Generate the SET list from BMF_COLUMNS so it cannot drift from the
    # insert column list.
    update_cols = [col for col in BMF_COLUMNS if col != "ein"]
    sql = (
        f"INSERT INTO raw.bmf ({', '.join(BMF_COLUMNS)}) "
        f"VALUES ({', '.join(['%s'] * len(BMF_COLUMNS))}) "
        "ON CONFLICT (ein) DO UPDATE SET "
        + ", ".join(f"{col} = EXCLUDED.{col}" for col in update_cols)
    )

    values = [[record.get(col) for col in BMF_COLUMNS] for record in deduped]

    def _do(conn):
        with conn.cursor() as cur:
            execute_batch(cur, sql, values, page_size=1000)
        return len(values)

    return execute_transaction(_do)


def discover_files(args):
    """Return the CSV paths to load: CLI args if given, else DATA_DIR/*.csv."""
    if args:
        return args
    return sorted(
        os.path.join(DATA_DIR, name)
        for name in os.listdir(DATA_DIR)
        if name.endswith(".csv")
    )


def main():
    """Load every discovered BMF CSV under a single ingest run.

    Per-row failures are logged via log_ingest_error and skipped; any
    other exception fails the whole run (fail_ingest_run) and re-raises.
    """
    files = discover_files(sys.argv[1:])
    if not files:
        print(f"No CSV files found in {DATA_DIR}", file=sys.stderr)
        sys.exit(1)

    notes = " ".join(os.path.basename(path) for path in files)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    files_scanned = 0
    files_matched = 0
    rows_inserted = 0

    try:
        for csv_path in files:
            basename = os.path.basename(csv_path)
            files_scanned += 1
            print(f"Loading {basename}...")

            batch = []
            file_rows = 0
            file_errors = 0

            # utf-8-sig tolerates the BOM some IRS extracts carry.
            with open(csv_path, newline="", encoding="utf-8-sig") as f:
                reader = csv.DictReader(f)
                # Data starts on physical line 2 (line 1 is the header).
                for line_number, row in enumerate(reader, start=2):
                    try:
                        batch.append(row_to_record(row, basename, ingest_run_id))
                    except Exception as e:
                        file_errors += 1
                        log_ingest_error(
                            ingest_run_id,
                            basename,
                            f"line:{line_number}",
                            e,
                            source_document_id=row.get("EIN"),
                            stage="normalize_row",
                        )
                        continue

                    # Flush in chunks to bound memory on the large extracts.
                    if len(batch) >= 5000:
                        rows_inserted += upsert_records(batch)
                        file_rows += len(batch)
                        batch = []

            if batch:
                rows_inserted += upsert_records(batch)
                file_rows += len(batch)

            files_matched += 1
            print(f"  {basename}: {file_rows:,} rows loaded, {file_errors} errors")

        finish_ingest_run(ingest_run_id, files_scanned, files_matched, rows_inserted)
    except Exception:
        fail_ingest_run(ingest_run_id)
        raise

    print(
        f"Done. {files_scanned} files scanned, "
        f"{files_matched} files loaded, {rows_inserted:,} rows upserted."
    )


if __name__ == "__main__":
    main()