""" Load IRS Business Master File (BMF) CSVs into raw.bmf. Usage: python -m scripts.parse.irs_bmf python -m scripts.parse.irs_bmf data/irs/bmf/eo1.csv data/irs/bmf/eo2.csv """ import csv import os import sys from psycopg2.extras import execute_batch from scripts.common.db import execute_transaction from scripts.common.ingest import ( start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error, ) from scripts.common.normalize import normalize_ein DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "..", "data", "irs", "bmf") PARSER_NAME = "load_raw_bmf" SOURCE_SYSTEM = "irs_bmf" BMF_COLUMNS = [ "ein", "name", "ico", "street", "city", "state", "zip", "grp", "subsection", "affiliation", "classification", "ruling", "deductibility", "foundation", "activity", "organization", "status", "tax_period", "asset_cd", "income_cd", "filing_req_cd", "pf_filing_req_cd", "acct_pd", "asset_amt", "income_amt", "revenue_amt", "ntee_cd", "sort_name", "source_file", "ingest_run_id", ] def parse_bigint(value): if value is None: return None s = str(value).strip() if not s: return None return int(s) def row_to_record(row, source_file, ingest_run_id): ein = normalize_ein(row.get("EIN")) if not ein: raise ValueError(f"Cannot normalize EIN: {row.get('EIN')!r}") return { "ein": ein, "name": row.get("NAME") or None, "ico": row.get("ICO") or None, "street": row.get("STREET") or None, "city": row.get("CITY") or None, "state": row.get("STATE") or None, "zip": row.get("ZIP") or None, "grp": row.get("GROUP") or None, "subsection": row.get("SUBSECTION") or None, "affiliation": row.get("AFFILIATION") or None, "classification": row.get("CLASSIFICATION") or None, "ruling": row.get("RULING") or None, "deductibility": row.get("DEDUCTIBILITY") or None, "foundation": row.get("FOUNDATION") or None, "activity": row.get("ACTIVITY") or None, "organization": row.get("ORGANIZATION") or None, "status": row.get("STATUS") or None, "tax_period": row.get("TAX_PERIOD") or None, "asset_cd": row.get("ASSET_CD") or None, "income_cd": row.get("INCOME_CD") or None, "filing_req_cd": row.get("FILING_REQ_CD") or None, "pf_filing_req_cd": row.get("PF_FILING_REQ_CD") or None, "acct_pd": row.get("ACCT_PD") or None, "asset_amt": parse_bigint(row.get("ASSET_AMT")), "income_amt": parse_bigint(row.get("INCOME_AMT")), "revenue_amt": parse_bigint(row.get("REVENUE_AMT")), "ntee_cd": row.get("NTEE_CD") or None, "sort_name": row.get("SORT_NAME") or None, "source_file": source_file, "ingest_run_id": ingest_run_id, } def upsert_records(records): if not records: return 0 placeholders = ", ".join(["%s"] * len(BMF_COLUMNS)) sql = ( f"INSERT INTO raw.bmf ({', '.join(BMF_COLUMNS)}) " f"VALUES ({placeholders}) " "ON CONFLICT (ein) DO UPDATE SET " "name = EXCLUDED.name, " "ico = EXCLUDED.ico, " "street = EXCLUDED.street, " "city = EXCLUDED.city, " "state = EXCLUDED.state, " "zip = EXCLUDED.zip, " "grp = EXCLUDED.grp, " "subsection = EXCLUDED.subsection, " "affiliation = EXCLUDED.affiliation, " "classification = EXCLUDED.classification, " "ruling = EXCLUDED.ruling, " "deductibility = EXCLUDED.deductibility, " "foundation = EXCLUDED.foundation, " "activity = EXCLUDED.activity, " "organization = EXCLUDED.organization, " "status = EXCLUDED.status, " "tax_period = EXCLUDED.tax_period, " "asset_cd = EXCLUDED.asset_cd, " "income_cd = EXCLUDED.income_cd, " "filing_req_cd = EXCLUDED.filing_req_cd, " "pf_filing_req_cd = EXCLUDED.pf_filing_req_cd, " "acct_pd = EXCLUDED.acct_pd, " "asset_amt = EXCLUDED.asset_amt, " "income_amt = EXCLUDED.income_amt, " "revenue_amt = EXCLUDED.revenue_amt, " "ntee_cd = EXCLUDED.ntee_cd, " "sort_name = EXCLUDED.sort_name, " "source_file = EXCLUDED.source_file, " "ingest_run_id = EXCLUDED.ingest_run_id" ) values = [[record.get(col) for col in BMF_COLUMNS] for record in records] def _do(conn): with conn.cursor() as cur: execute_batch(cur, sql, values, page_size=1000) return len(values) return execute_transaction(_do) def discover_files(args): if args: return args return sorted( os.path.join(DATA_DIR, name) for name in os.listdir(DATA_DIR) if name.endswith(".csv") ) def main(): files = discover_files(sys.argv[1:]) if not files: print(f"No CSV files found in {DATA_DIR}", file=sys.stderr) sys.exit(1) notes = " ".join(os.path.basename(path) for path in files) ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes) files_scanned = 0 files_matched = 0 rows_inserted = 0 try: for csv_path in files: basename = os.path.basename(csv_path) files_scanned += 1 print(f"Loading {basename}...") batch = [] file_rows = 0 file_errors = 0 with open(csv_path, newline="", encoding="utf-8-sig") as f: reader = csv.DictReader(f) for line_number, row in enumerate(reader, start=2): try: batch.append(row_to_record(row, basename, ingest_run_id)) except Exception as e: file_errors += 1 log_ingest_error( ingest_run_id, basename, f"line:{line_number}", e, source_document_id=row.get("EIN"), stage="normalize_row", ) continue if len(batch) >= 5000: rows_inserted += upsert_records(batch) file_rows += len(batch) batch = [] if batch: rows_inserted += upsert_records(batch) file_rows += len(batch) files_matched += 1 print(f" {basename}: {file_rows:,} rows loaded, {file_errors} errors") finish_ingest_run(ingest_run_id, files_scanned, files_matched, rows_inserted) except Exception: fail_ingest_run(ingest_run_id) raise print( f"Done. {files_scanned} files scanned, " f"{files_matched} files loaded, {rows_inserted:,} rows upserted." ) if __name__ == "__main__": main()