aboutsummaryrefslogtreecommitdiff
path: root/scripts/parse/irs_bmf.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/parse/irs_bmf.py')
-rw-r--r--scripts/parse/irs_bmf.py231
1 files changed, 231 insertions, 0 deletions
diff --git a/scripts/parse/irs_bmf.py b/scripts/parse/irs_bmf.py
new file mode 100644
index 0000000..cf02575
--- /dev/null
+++ b/scripts/parse/irs_bmf.py
@@ -0,0 +1,231 @@
+"""
+Load IRS Business Master File (BMF) CSVs into raw.bmf.
+
+Usage:
+ python -m scripts.parse.irs_bmf
+ python -m scripts.parse.irs_bmf data/irs/bmf/eo1.csv data/irs/bmf/eo2.csv
+"""
+
+import csv
+import os
+import sys
+
+from psycopg2.extras import execute_batch
+
+from scripts.common.db import execute_transaction
+from scripts.common.ingest import (
+ start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
+)
+from scripts.common.normalize import normalize_ein
+
# Default directory scanned for *.csv files when no paths are given on the CLI.
DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "..", "data", "irs", "bmf")
# Identifiers recorded by the ingest-run bookkeeping helpers.
PARSER_NAME = "load_raw_bmf"
SOURCE_SYSTEM = "irs_bmf"

# Column list for raw.bmf, in insert order. row_to_record produces a dict with
# exactly these keys, and upsert_records builds its INSERT/UPDATE from this
# list, so the two must stay in sync with the raw.bmf table definition.
# NOTE(review): "grp" carries the CSV "GROUP" field — presumably renamed
# because GROUP is an SQL reserved word; confirm against the raw.bmf schema.
BMF_COLUMNS = [
    "ein",
    "name",
    "ico",
    "street",
    "city",
    "state",
    "zip",
    "grp",
    "subsection",
    "affiliation",
    "classification",
    "ruling",
    "deductibility",
    "foundation",
    "activity",
    "organization",
    "status",
    "tax_period",
    "asset_cd",
    "income_cd",
    "filing_req_cd",
    "pf_filing_req_cd",
    "acct_pd",
    "asset_amt",
    "income_amt",
    "revenue_amt",
    "ntee_cd",
    "sort_name",
    "source_file",
    "ingest_run_id",
]
+
+
def parse_bigint(value):
    """Parse a CSV amount cell into an int, mapping None/blank to None.

    Raises ValueError (from int()) for non-numeric text; callers treat
    that as a bad row.
    """
    if value is None:
        return None
    text = str(value).strip()
    return int(text) if text else None
+
+
def row_to_record(row, source_file, ingest_run_id):
    """Map one BMF CSV row (header-keyed dict) onto a raw.bmf record dict.

    Blank text fields become None, the three amount fields are parsed as
    integers, and provenance (source_file, ingest_run_id) is attached.
    Raises ValueError when the EIN cannot be normalized.
    """
    ein = normalize_ein(row.get("EIN"))
    if not ein:
        raise ValueError(f"Cannot normalize EIN: {row.get('EIN')!r}")

    # DB column -> CSV header for the plain text fields.
    # "grp" is sourced from the "GROUP" header.
    text_columns = {
        "name": "NAME",
        "ico": "ICO",
        "street": "STREET",
        "city": "CITY",
        "state": "STATE",
        "zip": "ZIP",
        "grp": "GROUP",
        "subsection": "SUBSECTION",
        "affiliation": "AFFILIATION",
        "classification": "CLASSIFICATION",
        "ruling": "RULING",
        "deductibility": "DEDUCTIBILITY",
        "foundation": "FOUNDATION",
        "activity": "ACTIVITY",
        "organization": "ORGANIZATION",
        "status": "STATUS",
        "tax_period": "TAX_PERIOD",
        "asset_cd": "ASSET_CD",
        "income_cd": "INCOME_CD",
        "filing_req_cd": "FILING_REQ_CD",
        "pf_filing_req_cd": "PF_FILING_REQ_CD",
        "acct_pd": "ACCT_PD",
        "ntee_cd": "NTEE_CD",
        "sort_name": "SORT_NAME",
    }
    amount_columns = (
        ("asset_amt", "ASSET_AMT"),
        ("income_amt", "INCOME_AMT"),
        ("revenue_amt", "REVENUE_AMT"),
    )

    record = {"ein": ein}
    for column, header in text_columns.items():
        # Empty strings collapse to NULL.
        record[column] = row.get(header) or None
    for column, header in amount_columns:
        record[column] = parse_bigint(row.get(header))
    record["source_file"] = source_file
    record["ingest_run_id"] = ingest_run_id
    return record
+
+
def upsert_records(records):
    """Upsert a batch of BMF record dicts into raw.bmf, keyed on ein.

    Returns the number of rows sent to the database (0 for an empty batch).
    """
    if not records:
        return 0

    # Every column except the conflict key (ein) is overwritten on conflict.
    update_clause = ", ".join(
        f"{col} = EXCLUDED.{col}" for col in BMF_COLUMNS[1:]
    )
    placeholders = ", ".join(["%s"] * len(BMF_COLUMNS))
    sql = (
        f"INSERT INTO raw.bmf ({', '.join(BMF_COLUMNS)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT (ein) DO UPDATE SET {update_clause}"
    )

    rows = [[record.get(col) for col in BMF_COLUMNS] for record in records]

    def _run(conn):
        with conn.cursor() as cur:
            execute_batch(cur, sql, rows, page_size=1000)
        return len(rows)

    return execute_transaction(_run)
+
+
def discover_files(args):
    """Return the CSV paths to load.

    Explicit CLI arguments win; otherwise every *.csv under DATA_DIR is
    returned in sorted order.
    """
    if args:
        return args
    names = [name for name in os.listdir(DATA_DIR) if name.endswith(".csv")]
    names.sort()  # all paths share the DATA_DIR prefix, so name order == path order
    return [os.path.join(DATA_DIR, name) for name in names]
+
+
def main():
    """CLI entry point: load one or more IRS BMF CSVs into raw.bmf.

    File paths may be passed as arguments; otherwise every *.csv in
    DATA_DIR is loaded. The whole run is tracked via the ingest-run
    bookkeeping helpers; per-row failures are logged and skipped so one
    bad row does not abort a file.
    """
    files = discover_files(sys.argv[1:])
    if not files:
        print(f"No CSV files found in {DATA_DIR}", file=sys.stderr)
        sys.exit(1)

    # Record which files this run covers before touching any data.
    notes = " ".join(os.path.basename(path) for path in files)
    ingest_run_id = start_ingest_run(PARSER_NAME, SOURCE_SYSTEM, notes)

    files_scanned = 0
    files_matched = 0
    rows_inserted = 0

    try:
        for csv_path in files:
            basename = os.path.basename(csv_path)
            files_scanned += 1
            print(f"Loading {basename}...")

            batch = []
            file_rows = 0
            file_errors = 0

            # utf-8-sig strips a leading BOM if the export carries one.
            with open(csv_path, newline="", encoding="utf-8-sig") as f:
                reader = csv.DictReader(f)
                # start=2: physical line 1 is the CSV header row.
                for line_number, row in enumerate(reader, start=2):
                    try:
                        batch.append(row_to_record(row, basename, ingest_run_id))
                    except Exception as e:
                        # Bad row: log it with enough context to find it
                        # again, then keep loading the rest of the file.
                        file_errors += 1
                        log_ingest_error(
                            ingest_run_id,
                            basename,
                            f"line:{line_number}",
                            e,
                            source_document_id=row.get("EIN"),
                            stage="normalize_row",
                        )
                        continue

                    # Flush in chunks to bound memory on large files.
                    if len(batch) >= 5000:
                        rows_inserted += upsert_records(batch)
                        file_rows += len(batch)
                        batch = []

            # Flush the final partial batch for this file.
            if batch:
                rows_inserted += upsert_records(batch)
                file_rows += len(batch)

            files_matched += 1
            print(f"  {basename}: {file_rows:,} rows loaded, {file_errors} errors")

        finish_ingest_run(ingest_run_id, files_scanned, files_matched, rows_inserted)
    except Exception:
        # Mark the run failed in the bookkeeping tables, then re-raise so
        # the caller still sees the original error.
        fail_ingest_run(ingest_run_id)
        raise

    print(
        f"Done. {files_scanned} files scanned, "
        f"{files_matched} files loaded, {rows_inserted:,} rows upserted."
    )


if __name__ == "__main__":
    main()