diff options
Diffstat (limited to '')
| -rw-r--r-- | scripts/common/xml.py | 129 |
1 files changed, 129 insertions, 0 deletions
"""
XML parsing helpers.

Single authoritative implementations of XML field extraction,
source document ID derivation, and leaf path building.
"""

import re

NS = "http://www.irs.gov/efile"
NS_MAP = {"irs": NS}

# Trailing "<15-20 digits>_public.xml" of a filename; group 1 is the object_id.
_OBJECT_ID_RE = re.compile(r'(\d{15,20})_public\.xml$')


def strip_ns(tag):
    """Strip XML namespace: '{http://...}Foo' -> 'Foo'.

    Non-element lxml nodes such as comments expose a non-string ``tag``.
    Those are skipped by the leaf-path walker rather than treated as fields.

    Returns the local tag name, or None when ``tag`` is not a string.
    """
    if not isinstance(tag, str):
        return None
    if tag.startswith("{"):
        return tag.split("}", 1)[1]
    return tag


def text(el, xpath):
    """Extract stripped text from an XML element by xpath.

    ``xpath`` is resolved with ``el.find`` using the ``irs`` prefix from
    ``NS_MAP``.

    Returns None when ``el`` is None, when nothing matches, or when the
    matched element carries no text. Whitespace-only text counts as "no
    text", so ``<A/>``, ``<A></A>`` and ``<A> </A>`` all yield None instead
    of a mix of None and ''.
    """
    if el is None:
        return None
    found = el.find(xpath, NS_MAP)
    if found is None or not found.text:
        return None
    # ``or None`` collapses a whitespace-only value to None (see docstring).
    return found.text.strip() or None


# Values an IRS indicator element may carry to mean "yes / true".
_TRUTHY = frozenset({"X", "x", "1", "true"})


def text_bool(el, xpath):
    """Extract an IRS indicator element as True/False, or None if the tag is absent.

    Unlike `text(...) in _TRUTHY`, this preserves the difference between
    "filer explicitly left this unchecked" and "tag not present in the XML
    at all." Critical for the raw layer, where we want to keep NULL for
    missing-in-source and reserve False for an explicit non-truthy value.

    Returns None when ``el`` is None or the xpath matches nothing; otherwise
    True if the element's stripped text is in ``_TRUTHY``, else False.
    """
    if el is None:
        return None
    # Look the element up directly rather than going through text(): text()
    # returns None for a present-but-empty tag, which would be
    # indistinguishable from an absent tag here.
    found = el.find(xpath, NS_MAP)
    if found is None:
        return None
    return (found.text or "").strip() in _TRUTHY


def leaf_paths(el, prefix=""):
    """Yield (path, value) for every leaf element under el.

    Path uses '/' separator with namespace stripped:
      'IRS990PF/AnalysisOfRevenueAndExpenses/TotalRevAndExpnssAmt'

    A leaf is an element with no element children; its value is the
    element's stripped text ('' when it has none). Nodes whose ``tag`` is not
    a string (e.g. comments) are neither yielded nor counted as children.
    ``prefix`` is the already-built parent path including its trailing '/'.
    """
    tag = strip_ns(el.tag)
    if tag is None:
        return
    full = f"{prefix}{tag}" if prefix else tag
    # Only real elements count as children; comment-like nodes are ignored.
    children = [child for child in el if isinstance(child.tag, str)]
    if not children:
        yield full, (el.text or "").strip()
    else:
        for child in children:
            yield from leaf_paths(child, full + "/")


def derive_source_document_id(source_system, filename):
    """Derive source_document_id from a filename.

    For irs_xml / irs_pdf: extracts the object_id (numeric stem).
    Directory prefixes are stripped.

    Raises ValueError when the filename does not end in
    '<15-20 digits>_public.xml', and NotImplementedError for any other
    ``source_system``.

    >>> derive_source_document_id('irs_xml', 'Cycles_202242_202252/202213089349101246_public.xml')
    '202213089349101246'
    >>> derive_source_document_id('irs_xml', '202213089349101246_public.xml')
    '202213089349101246'
    """
    if source_system in ('irs_xml', 'irs_pdf'):
        m = _OBJECT_ID_RE.search(filename)
        if not m:
            raise ValueError(f"Cannot extract object_id from: {filename!r}")
        return m.group(1)
    raise NotImplementedError(
        f"derive_source_document_id not implemented for {source_system!r}"
    )


def extract_filing_metadata(tree):
    """Extract filing-level metadata from an XML tree for raw_filing.

    Returns a dict with: ein, filer_name, form_type, tax_year,
    tax_period_begin, tax_period_end, return_version, return_timestamp.

    EIN is normalized, form_type is mapped. Raises ValueError if the
    filing cannot be identified (``normalize_ein`` returns a falsy value).
    Fields other than ``ein`` and ``form_type`` are None when the source
    element is missing or has no text; ``return_version`` is read from the
    root element's ``returnVersion`` attribute.
    """
    # Imported here rather than at module top, as in the original file.
    from scripts.common.normalize import normalize_ein, map_form_type

    root = tree.getroot()

    raw_ein = text(tree, ".//irs:Filer/irs:EIN")
    ein = normalize_ein(raw_ein)
    if not ein:
        raise ValueError(f"Cannot normalize EIN: {raw_ein!r}")

    return_type_cd = text(tree, ".//irs:ReturnTypeCd")
    form_type = map_form_type(return_type_cd)

    # Two element spellings are tried for the first business-name line.
    filer_name = (
        text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1Txt")
        or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1")
    )

    return {
        "ein": ein,
        "filer_name": filer_name,
        "form_type": form_type,
        "tax_year": text(tree, ".//irs:TaxYr"),
        "tax_period_begin": text(tree, ".//irs:TaxPeriodBeginDt"),
        "tax_period_end": text(tree, ".//irs:TaxPeriodEndDt"),
        "return_version": root.get("returnVersion"),
        "return_timestamp": text(tree, ".//irs:ReturnTs"),
    }
