| field | value | date |
|---|---|---|
| author | benj <benj@rse8.com> | 2026-05-01 09:36:21 +0800 |
| committer | benj <benj@rse8.com> | 2026-05-01 09:36:21 +0800 |
| commit | 850f4f826b536d913235e174dc07aef74e51bf60 (patch) | |
| tree | a2806da6c0ed5c48d21178e0c6c280d5a40ccd38 /scripts | |
| parent | 6605e2cc428e3bdaa174ccc432941eab8c5d61cb (diff) | |
Diffstat (limited to 'scripts')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | scripts/common/filing.py | 11 |
| -rw-r--r-- | scripts/parse/irs_990.py | 40 |
| -rw-r--r-- | scripts/parse/irs_990ez.py | 38 |
| -rw-r--r-- | scripts/parse/irs_990pf.py | 44 |
4 files changed, 102 insertions, 31 deletions
diff --git a/scripts/common/filing.py b/scripts/common/filing.py
index 44bcabc..3c7aaf3 100644
--- a/scripts/common/filing.py
+++ b/scripts/common/filing.py
@@ -5,7 +5,7 @@ Provides upsert/record operations for raw.filing and raw.filing_source.
 Form-specific child row operations live in the parsers themselves.
 """

-from scripts.common.db import execute_scalar
+from scripts.common.db import execute_all, execute_scalar


 def _execute_scalar(conn, sql, params=None):
@@ -91,3 +91,12 @@ def record_raw_filing_source(raw_filing_id, ingest_run_id, source_archive, sourc
             f"already mapped to raw_filing_id={existing_filing_id}, "
             f"but trying to map to {raw_filing_id}"
         )
+
+
+def get_seen_source_paths(source_archive):
+    """Return the set of source_path values already committed for an archive."""
+    rows = execute_all(
+        "SELECT source_path FROM raw.filing_source WHERE source_archive = %s",
+        (source_archive,),
+    )
+    return {row[0] for row in rows}
diff --git a/scripts/parse/irs_990.py b/scripts/parse/irs_990.py
index 3a5cc2d..b3751b6 100644
--- a/scripts/parse/irs_990.py
+++ b/scripts/parse/irs_990.py
@@ -25,7 +25,7 @@ from scripts.common.ingest import (
     start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
 )
 from scripts.common.filing import (
-    upsert_raw_filing, record_raw_filing_source,
+    upsert_raw_filing, record_raw_filing_source, get_seen_source_paths,
 )

 PARSER_NAME = "parse_irs_990"
@@ -496,16 +496,16 @@ def process_filing(tree, source_document_id, source_archive, source_path, ingest
         ]

         _replace_children(
-            conn, raw_filing_id, filing_form_data, grant_rows, schedule_o_rows, xml_rows,
+            conn, raw_filing_id, filing_form_data, grant_rows, schedule_o_rows,
         )

-        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990 + grants + schedule_o + xml fields
-        return 3 + len(grant_rows) + len(schedule_o_rows) + len(xml_rows)
+        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990 + grants + schedule_o
+        return 3 + len(grant_rows) + len(schedule_o_rows)

     return execute_transaction(_do)


-def _replace_children(conn, raw_filing_id, form_data, grant_rows, schedule_o_rows, xml_rows):
+def _replace_children(conn, raw_filing_id, form_data, grant_rows, schedule_o_rows):
     """Delete and re-insert all child rows for a filing using the caller's transaction."""
     form_columns = list(form_data.keys())
     form_placeholders = ", ".join(["%s"] * len(form_columns))
@@ -598,13 +598,28 @@ def process_zip(zip_path, ingest_run_id):
     with zf:
         names = [n for n in zf.namelist() if n.endswith(".xml")]
+        seen_paths = get_seen_source_paths(basename)
+        skipped_existing = 0

-        print(f"Processing {basename}: {len(names)} XML files")
+        print(
+            f"Processing {basename}: {len(names)} XML files "
+            f"({len(seen_paths)} already seen)"
+        )

         files_scanned = 0
         files_matched = 0
         total_rows = 0

         for i, name in enumerate(names):
+            if name in seen_paths:
+                skipped_existing += 1
+                if (i + 1) % 1000 == 0:
+                    print(
+                        f"  ...{i + 1}/{len(names)} files, "
+                        f"{skipped_existing} skipped existing, "
+                        f"{files_matched} matched, {total_rows} rows"
+                    )
+                continue
+
             # Count every ZIP member as scanned, even ones we fail to read —
             # otherwise read failures silently shrink the scanned total and
             # make run-level metrics misleading.
@@ -626,9 +641,16 @@ def process_zip(zip_path, ingest_run_id):
                 total_rows += rows

             if (i + 1) % 1000 == 0:
-                print(f"  ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows")
-
-        print(f"  Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows")
+                print(
+                    f"  ...{i + 1}/{len(names)} files, "
+                    f"{skipped_existing} skipped existing, "
+                    f"{files_matched} matched, {total_rows} rows"
+                )
+
+        print(
+            f"  Done: {files_scanned} scanned, {skipped_existing} skipped existing, "
+            f"{files_matched} matched, {total_rows} rows"
+        )

     return files_scanned, files_matched, total_rows
diff --git a/scripts/parse/irs_990ez.py b/scripts/parse/irs_990ez.py
index bea4fdd..3606b88 100644
--- a/scripts/parse/irs_990ez.py
+++ b/scripts/parse/irs_990ez.py
@@ -30,7 +30,7 @@ from scripts.common.ingest import (
     start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
 )
 from scripts.common.filing import (
-    upsert_raw_filing, record_raw_filing_source,
+    upsert_raw_filing, record_raw_filing_source, get_seen_source_paths,
 )

 PARSER_NAME = "parse_irs_990ez"
@@ -270,11 +270,11 @@ def process_filing(tree, source_document_id, source_archive, source_path, ingest
         ]

         _replace_children(
-            conn, raw_filing_id, filing_form_data, schedule_o_rows, xml_rows,
+            conn, raw_filing_id, filing_form_data, schedule_o_rows,
         )

-        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990ez + schedule_o + xml fields
-        return 3 + len(schedule_o_rows) + len(xml_rows)
+        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990ez + schedule_o
+        return 3 + len(schedule_o_rows)

     return execute_transaction(_do)
@@ -356,13 +356,28 @@ def process_zip(zip_path, ingest_run_id):
     with zf:
         names = [n for n in zf.namelist() if n.endswith(".xml")]
+        seen_paths = get_seen_source_paths(basename)
+        skipped_existing = 0

-        print(f"Processing {basename}: {len(names)} XML files")
+        print(
+            f"Processing {basename}: {len(names)} XML files "
+            f"({len(seen_paths)} already seen)"
+        )

         files_scanned = 0
         files_matched = 0
         total_rows = 0

         for i, name in enumerate(names):
+            if name in seen_paths:
+                skipped_existing += 1
+                if (i + 1) % 1000 == 0:
+                    print(
+                        f"  ...{i + 1}/{len(names)} files, "
+                        f"{skipped_existing} skipped existing, "
+                        f"{files_matched} matched, {total_rows} rows"
+                    )
+                continue
+
             # Count every ZIP member as scanned, even ones we fail to read —
             # otherwise read failures silently shrink the scanned total and
             # make run-level metrics misleading.
@@ -384,9 +399,16 @@ def process_zip(zip_path, ingest_run_id):
                 total_rows += rows

             if (i + 1) % 1000 == 0:
-                print(f"  ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows")
-
-        print(f"  Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows")
+                print(
+                    f"  ...{i + 1}/{len(names)} files, "
+                    f"{skipped_existing} skipped existing, "
+                    f"{files_matched} matched, {total_rows} rows"
+                )
+
+        print(
+            f"  Done: {files_scanned} scanned, {skipped_existing} skipped existing, "
+            f"{files_matched} matched, {total_rows} rows"
+        )

     return files_scanned, files_matched, total_rows
diff --git a/scripts/parse/irs_990pf.py b/scripts/parse/irs_990pf.py
index 3d245b8..218c7fb 100644
--- a/scripts/parse/irs_990pf.py
+++ b/scripts/parse/irs_990pf.py
@@ -25,7 +25,7 @@ from scripts.common.ingest import (
     start_ingest_run, finish_ingest_run, fail_ingest_run, log_ingest_error,
 )
 from scripts.common.filing import (
-    upsert_raw_filing, record_raw_filing_source,
+    upsert_raw_filing, record_raw_filing_source, get_seen_source_paths,
 )

 PARSER_NAME = "parse_irs_990pf"
@@ -349,10 +349,6 @@ def process_filing(tree, source_document_id, source_archive, source_path, ingest
     form_data = extract_form_990pf(tree)
     form_data["grant_detail_status"] = compute_grant_detail_status(grant_elements, extracted_grants)
-    root = tree.getroot()
-    return_header = root.find(f"{{{NS}}}ReturnHeader")
-    return_data = root.find(f"{{{NS}}}ReturnData")
-
     def _do(conn):
         raw_filing_id = upsert_raw_filing(
             SOURCE_SYSTEM, source_document_id, metadata, ingest_run_id, conn=conn
         )
@@ -367,15 +363,15 @@ def process_filing(tree, source_document_id, source_archive, source_path, ingest
             for row in extracted_grants
         ]

-        _replace_children(conn, raw_filing_id, filing_form_data, grant_rows, xml_rows)
+        _replace_children(conn, raw_filing_id, filing_form_data, grant_rows)

-        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990pf + grants + xml fields
-        return 3 + len(grant_rows) + len(xml_rows)
+        # 1 raw_filing + 1 raw_filing_source + 1 raw_form_990pf + grants
+        return 3 + len(grant_rows)

     return execute_transaction(_do)


-def _replace_children(conn, raw_filing_id, form_data, grant_rows, xml_rows):
+def _replace_children(conn, raw_filing_id, form_data, grant_rows):
     """Delete and re-insert all child rows for a filing using the caller's transaction."""
     form_columns = list(form_data.keys())
     form_placeholders = ", ".join(["%s"] * len(form_columns))
@@ -451,13 +447,28 @@ def process_zip(zip_path, ingest_run_id):
     with zf:
         names = [n for n in zf.namelist() if n.endswith(".xml")]
+        seen_paths = get_seen_source_paths(basename)
+        skipped_existing = 0

-        print(f"Processing {basename}: {len(names)} XML files")
+        print(
+            f"Processing {basename}: {len(names)} XML files "
+            f"({len(seen_paths)} already seen)"
+        )

         files_scanned = 0
         files_matched = 0
         total_rows = 0

         for i, name in enumerate(names):
+            if name in seen_paths:
+                skipped_existing += 1
+                if (i + 1) % 1000 == 0:
+                    print(
+                        f"  ...{i + 1}/{len(names)} files, "
+                        f"{skipped_existing} skipped existing, "
+                        f"{files_matched} matched, {total_rows} rows"
+                    )
+                continue
+
             # Count every ZIP member as scanned, even ones we fail to read —
             # otherwise read failures silently shrink the scanned total and
             # make run-level metrics misleading.
@@ -479,9 +490,16 @@ def process_zip(zip_path, ingest_run_id):
                 total_rows += rows

             if (i + 1) % 1000 == 0:
-                print(f"  ...{i + 1}/{len(names)} files, {files_matched} matched, {total_rows} rows")
-
-        print(f"  Done: {files_scanned} scanned, {files_matched} matched, {total_rows} rows")
+                print(
+                    f"  ...{i + 1}/{len(names)} files, "
+                    f"{skipped_existing} skipped existing, "
+                    f"{files_matched} matched, {total_rows} rows"
+                )
+
+        print(
+            f"  Done: {files_scanned} scanned, {skipped_existing} skipped existing, "
+            f"{files_matched} matched, {total_rows} rows"
+        )

     return files_scanned, files_matched, total_rows
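The net effect of this commit is to make each parser's process_zip() resumable: before iterating the archive, it loads the set of source_path values already committed for that archive (via the new get_seen_source_paths helper) and skips those members, so re-running an interrupted ingest only does the remaining work. The following is a minimal, self-contained sketch of that skip pattern. The helper name and loop shape mirror the diff above, but the in-memory stand-in for raw.filing_source, the archive name, and the member paths are invented for illustration and are not part of the repository.

```python
# Sketch of the resume-skip pattern added to process_zip() in this commit.
# The real code queries raw.filing_source; here a set of (archive, path)
# tuples stands in for rows that a previous run already committed.

_COMMITTED = {
    ("2026_TEOS_XML_01A.zip", "990_2026_01/doc_0001.xml"),
    ("2026_TEOS_XML_01A.zip", "990_2026_01/doc_0002.xml"),
}


def get_seen_source_paths(source_archive):
    """Return source_path values already committed for an archive (sketch)."""
    return {path for archive, path in _COMMITTED if archive == source_archive}


def process_archive(source_archive, names):
    """Walk archive members, skipping anything an earlier run already committed."""
    seen_paths = get_seen_source_paths(source_archive)
    skipped_existing = 0
    files_scanned = 0

    for name in names:
        if name in seen_paths:
            # Already committed by a previous (possibly interrupted) run:
            # skip the parse/upsert work entirely.
            skipped_existing += 1
            continue
        files_scanned += 1
        # ... parse the XML and upsert raw_filing / raw_filing_source here ...

    return files_scanned, skipped_existing


if __name__ == "__main__":
    members = [
        "990_2026_01/doc_0001.xml",
        "990_2026_01/doc_0002.xml",
        "990_2026_01/doc_0003.xml",
    ]
    scanned, skipped = process_archive("2026_TEOS_XML_01A.zip", members)
    print(f"{scanned} scanned, {skipped} skipped existing")  # 1 scanned, 2 skipped existing
```

As the new progress lines show, skipped members are reported separately and are not included in files_scanned, so the scanned/matched/rows totals continue to describe only the work done in the current run.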
