""" XML parsing helpers. Single authoritative implementations of XML field extraction, source document ID derivation, and leaf path building. """ import re NS = "http://www.irs.gov/efile" NS_MAP = {"irs": NS} _OBJECT_ID_RE = re.compile(r'(\d{15,20})_public\.xml$') def strip_ns(tag): """Strip XML namespace: '{http://...}Foo' -> 'Foo'. Non-element lxml nodes such as comments expose a non-string ``tag``. Those are skipped by the leaf-path walker rather than treated as fields. """ if not isinstance(tag, str): return None if tag.startswith("{"): return tag.split("}", 1)[1] return tag def text(el, xpath): """Extract text from an XML element by xpath. Returns None if not found.""" if el is None: return None found = el.find(xpath, NS_MAP) return found.text.strip() if found is not None and found.text else None # Values an IRS indicator element may carry to mean "yes / true". _TRUTHY = frozenset({"X", "x", "1", "true"}) def text_bool(el, xpath): """Extract an IRS indicator element as True/False, or None if the tag is absent. Unlike `text(...) in _TRUTHY`, this preserves the difference between "filer explicitly left this unchecked" and "tag not present in the XML at all." Critical for the raw layer, where we want to keep NULL for missing-in-source and reserve False for an explicit non-truthy value. """ val = text(el, xpath) if val is None: return None return val in _TRUTHY def leaf_paths(el, prefix=""): """Yield (path, value) for every leaf element under el. Path uses '/' separator with namespace stripped: 'IRS990PF/AnalysisOfRevenueAndExpenses/TotalRevAndExpnssAmt' """ tag = strip_ns(el.tag) if tag is None: return full = f"{prefix}{tag}" if prefix else tag children = [child for child in el if isinstance(child.tag, str)] if not children: yield full, (el.text or "").strip() else: for child in children: yield from leaf_paths(child, full + "/") def derive_source_document_id(source_system, filename): """Derive source_document_id from a filename. For irs_xml / irs_pdf: extracts the object_id (numeric stem). Directory prefixes are stripped. >>> derive_source_document_id('irs_xml', 'Cycles_202242_202252/202213089349101246_public.xml') '202213089349101246' >>> derive_source_document_id('irs_xml', '202213089349101246_public.xml') '202213089349101246' """ if source_system in ('irs_xml', 'irs_pdf'): m = _OBJECT_ID_RE.search(filename) if not m: raise ValueError(f"Cannot extract object_id from: {filename!r}") return m.group(1) raise NotImplementedError( f"derive_source_document_id not implemented for {source_system!r}" ) def extract_filing_metadata(tree): """Extract filing-level metadata from an XML tree for raw_filing. Returns a dict with: ein, filer_name, form_type, tax_year, tax_period_begin, tax_period_end, return_version, return_timestamp. EIN is normalized, form_type is mapped. Raises ValueError if the filing cannot be identified. """ from scripts.common.normalize import normalize_ein, map_form_type root = tree.getroot() raw_ein = text(tree, ".//irs:Filer/irs:EIN") ein = normalize_ein(raw_ein) if not ein: raise ValueError(f"Cannot normalize EIN: {raw_ein!r}") return_type_cd = text(tree, ".//irs:ReturnTypeCd") form_type = map_form_type(return_type_cd) filer_name = ( text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1Txt") or text(tree, ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1") ) return { "ein": ein, "filer_name": filer_name, "form_type": form_type, "tax_year": text(tree, ".//irs:TaxYr"), "tax_period_begin": text(tree, ".//irs:TaxPeriodBeginDt"), "tax_period_end": text(tree, ".//irs:TaxPeriodEndDt"), "return_version": root.get("returnVersion"), "return_timestamp": text(tree, ".//irs:ReturnTs"), }