aboutsummaryrefslogtreecommitdiff
path: root/scripts/common/xml.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--scripts/common/xml.py129
1 files changed, 129 insertions, 0 deletions
diff --git a/scripts/common/xml.py b/scripts/common/xml.py
new file mode 100644
index 0000000..b7e80cd
--- /dev/null
+++ b/scripts/common/xml.py
@@ -0,0 +1,129 @@
+"""
+XML parsing helpers.
+
+Single authoritative implementations of XML field extraction,
+source document ID derivation, and leaf path building.
+"""
+
+import re
+
# Namespace URI that the ``irs:`` prefix used in this module's xpaths resolves to.
NS = "http://www.irs.gov/efile"
# Prefix map handed to ``find`` so xpaths can be written as ``irs:Tag``.
NS_MAP = {"irs": NS}

# Object id: a run of 15-20 digits sitting directly before ``_public.xml`` at
# the very end of the name.  The pattern is used with ``.search``, so without
# the ``(?<!\d)`` lookbehind an over-long digit run (21+ digits) would still
# match on its trailing 20 digits and yield a silently truncated id; with it,
# such names simply do not match.
_OBJECT_ID_RE = re.compile(r'(?<!\d)(\d{15,20})_public\.xml$')
+
+
def strip_ns(tag):
    """Return *tag* without its leading ``{namespace}`` part.

    ``'{http://...}Foo'`` becomes ``'Foo'``; a tag that does not start with
    ``{`` is returned unchanged.

    Non-element lxml nodes such as comments expose a non-string ``tag``.
    For those this returns None, which the leaf-path walker uses as the
    signal to skip the node rather than treat it as a field.
    """
    if isinstance(tag, str):
        qualified = tag.startswith("{")
        # Split on the first '}' only, so a '}' in the local part survives.
        return tag.split("}", 1)[1] if qualified else tag
    return None
+
+
def text(el, xpath):
    """Return the stripped text of the first element matching *xpath* under *el*.

    The xpath is resolved with ``NS_MAP``, so IRS elements are addressed as
    ``irs:Tag``.

    Returns None when *el* is None, when nothing matches, or when the matched
    element carries no text. Whitespace-only content counts as "no text", so
    ``<Tag/>``, ``<Tag></Tag>`` and ``<Tag>  </Tag>`` all give None instead of
    the last one leaking through as an empty string.
    """
    if el is None:
        return None
    found = el.find(xpath, NS_MAP)
    if found is None or found.text is None:
        return None
    # ``or None`` folds the empty string left after stripping into None.
    return found.text.strip() or None
+
+
# Values an IRS indicator element may carry to mean "yes / true".
_TRUTHY = frozenset({"X", "x", "1", "true"})


def text_bool(el, xpath):
    """Extract an IRS indicator element as True/False, or None if the tag is absent.

    Unlike `text(...) in _TRUTHY`, this preserves the difference between
    "filer explicitly left this unchecked" and "tag not present in the XML
    at all." Critical for the raw layer, where we want to keep NULL for
    missing-in-source and reserve False for an explicit non-truthy value.

    The element is looked up directly rather than through ``text()``:
    ``text()`` returns None for a tag that is present but empty, which would
    make such a tag indistinguishable from a missing one.

    Returns:
        None if *el* is None or nothing matches *xpath* (resolved with
        ``NS_MAP``); otherwise True when the element's stripped text is one
        of ``_TRUTHY`` and False for any other content, including none.
    """
    if el is None:
        return None
    found = el.find(xpath, NS_MAP)
    if found is None:
        return None
    # A present tag always yields a bool; empty text is simply not truthy.
    return (found.text or "").strip() in _TRUTHY
+
+
def leaf_paths(el, prefix=""):
    """Yield (path, value) for every leaf element under el, in document order.

    Path uses '/' separator with namespace stripped:
    'IRS990PF/AnalysisOfRevenueAndExpenses/TotalRevAndExpnssAmt'

    A leaf is an element with no child whose ``tag`` is a string; its value
    is the element's own text, stripped, or '' when it has none. Nodes whose
    tag is not a string (``strip_ns`` returns None for them) are skipped
    together with anything beneath them.

    The walk uses an explicit stack instead of recursion, so the nesting
    depth of the document is not limited by the interpreter's recursion
    limit and each leaf is yielded straight from this one generator.

    Args:
        el: element to start from.
        prefix: text placed in front of el's own tag; pass it with a
            trailing '/' when it names a parent path.
    """
    pending = [(el, prefix)]
    while pending:
        node, lead = pending.pop()
        name = strip_ns(node.tag)
        if name is None:
            continue
        path = f"{lead}{name}" if lead else name
        kids = [kid for kid in node if isinstance(kid.tag, str)]
        if kids:
            # Push in reverse so pop() hands the children back in document order.
            pending.extend((kid, path + "/") for kid in reversed(kids))
        else:
            yield path, (node.text or "").strip()
+
+
def derive_source_document_id(source_system, filename):
    """Return the source_document_id encoded in *filename*.

    For 'irs_xml' and 'irs_pdf' the id is the digit group captured by
    ``_OBJECT_ID_RE``. The pattern is matched against the end of the
    string, so any directory prefix in front of the name is ignored.

    NOTE(review): 'irs_pdf' goes through the same pattern, which requires the
    name to end in ``_public.xml`` -- confirm PDF sources are named that way.

    Raises:
        ValueError: *filename* does not match the object-id pattern.
        NotImplementedError: *source_system* is not one of the two above.

    >>> derive_source_document_id('irs_xml', 'Cycles_202242_202252/202213089349101246_public.xml')
    '202213089349101246'
    >>> derive_source_document_id('irs_xml', '202213089349101246_public.xml')
    '202213089349101246'
    """
    if source_system not in ('irs_xml', 'irs_pdf'):
        raise NotImplementedError(
            f"derive_source_document_id not implemented for {source_system!r}"
        )

    match = _OBJECT_ID_RE.search(filename)
    if match:
        return match.group(1)
    raise ValueError(f"Cannot extract object_id from: {filename!r}")
+
+
def extract_filing_metadata(tree):
    """Collect the filing-level fields of an XML tree for raw_filing.

    Args:
        tree: parsed document; must offer ``getroot()`` and be searchable
            through ``text()``.

    Returns:
        dict with the keys ein, filer_name, form_type, tax_year,
        tax_period_begin, tax_period_end, return_version and
        return_timestamp. ``ein`` has been passed through ``normalize_ein``
        and ``form_type`` through ``map_form_type``; ``return_version`` is
        the root element's ``returnVersion`` attribute; the remaining values
        are element text as returned by ``text()`` (None when not found).

    Raises:
        ValueError: ``normalize_ein`` returned a falsy value for the
            filer's EIN, i.e. the filing cannot be identified.
    """
    from scripts.common.normalize import normalize_ein, map_form_type

    root = tree.getroot()

    # Without a usable EIN nothing else is worth extracting.
    ein_text = text(tree, ".//irs:Filer/irs:EIN")
    ein = normalize_ein(ein_text)
    if not ein:
        raise ValueError(f"Cannot normalize EIN: {ein_text!r}")

    form_type = map_form_type(text(tree, ".//irs:ReturnTypeCd"))

    # Two element names are tried for the first business-name line, in order;
    # the second is only consulted when the first yields nothing.
    name_paths = (
        ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1Txt",
        ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1",
    )
    filer_name = None
    for candidate in name_paths:
        filer_name = text(tree, candidate)
        if filer_name:
            break

    metadata = {
        "ein": ein,
        "filer_name": filer_name,
        "form_type": form_type,
    }
    metadata["tax_year"] = text(tree, ".//irs:TaxYr")
    metadata["tax_period_begin"] = text(tree, ".//irs:TaxPeriodBeginDt")
    metadata["tax_period_end"] = text(tree, ".//irs:TaxPeriodEndDt")
    metadata["return_version"] = root.get("returnVersion")
    metadata["return_timestamp"] = text(tree, ".//irs:ReturnTs")
    return metadata