aboutsummaryrefslogtreecommitdiff
path: root/scripts/common/xml.py
blob: b7e80cd2ed1f0416a2688fc1f247800c2402e1c8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
XML parsing helpers.

Single authoritative implementations of XML field extraction,
source document ID derivation, and leaf path building.
"""

import re

# XML namespace URI that this module's xpath lookups resolve against.
NS = "http://www.irs.gov/efile"
# Prefix-to-URI map handed to ``find``; xpaths in this module spell the
# namespace with the ``irs:`` prefix.
NS_MAP = {"irs": NS}

# Captures a run of 15-20 digits that sits directly before ``_public.xml`` at
# the end of the string; used to pull the object_id out of a filename.
_OBJECT_ID_RE = re.compile(r'(\d{15,20})_public\.xml$')


def strip_ns(tag):
    """Return the local part of an element tag, dropping any ``{uri}`` prefix.

    ``'{http://...}Foo'`` becomes ``'Foo'``; a tag that does not begin with
    ``{`` is handed back unchanged.

    A ``tag`` that is not a string yields ``None``. lxml exposes such tags on
    non-element nodes (comments, for example), and the leaf-path walker uses
    the ``None`` result to skip them instead of reporting them as fields.
    """
    if isinstance(tag, str):
        if tag[:1] != "{":
            return tag
        # Whatever follows the first closing brace is the local name.
        return tag.split("}", 1)[1]
    return None


def text(el, xpath):
    """Return the stripped text of the first node matching ``xpath`` under ``el``.

    The lookup goes through ``el.find`` with the module's ``irs`` namespace
    map. ``None`` comes back when ``el`` is ``None``, when nothing matches,
    or when the matched node has no text. Text made up solely of whitespace
    is returned as an empty string.
    """
    if el is None:
        return None
    node = el.find(xpath, NS_MAP)
    if node is None:
        return None
    raw = node.text
    if not raw:
        return None
    return raw.strip()


# Values an IRS indicator element may carry to mean "yes / true".
# Membership is an exact string match: "X" is accepted in either case, while
# "true" is accepted in lowercase only; any other text counts as not truthy.
_TRUTHY = frozenset({"X", "x", "1", "true"})


def text_bool(el, xpath):
    """Read an IRS indicator element as a tri-state: True, False, or None.

    The value is fetched with ``text``. When that lookup yields ``None`` the
    result is ``None``; otherwise it is ``True`` if the text is one of the
    ``_TRUTHY`` values and ``False`` for anything else.

    Compared with a bare ``text(...) in _TRUTHY``, this keeps "the source
    XML had no value here" apart from "the filer supplied a non-truthy
    value". The raw layer depends on that: NULL is kept for
    missing-in-source, and False is reserved for an explicit non-truthy
    value.
    """
    raw = text(el, xpath)
    return None if raw is None else raw in _TRUTHY


def leaf_paths(el, prefix=""):
    """Generate a (path, value) pair for each leaf element at or below ``el``.

    Paths are namespace-free tag names joined by '/', for example
    'IRS990PF/AnalysisOfRevenueAndExpenses/TotalRevAndExpnssAmt'.
    The value is the element's text with surrounding whitespace removed,
    or an empty string when the element has no text.

    A node whose tag is not a string is skipped entirely, and such nodes
    are ignored when deciding whether an element is a leaf.
    """
    name = strip_ns(el.tag)
    if name is None:
        return

    path = name
    if prefix:
        path = f"{prefix}{name}"

    # Only children with string tags count as nested elements.
    nested = []
    for node in el:
        if isinstance(node.tag, str):
            nested.append(node)

    if nested:
        for node in nested:
            yield from leaf_paths(node, path + "/")
        return

    yield path, (el.text or "").strip()


def derive_source_document_id(source_system, filename):
    """Compute the source_document_id that corresponds to a filename.

    For the 'irs_xml' and 'irs_pdf' source systems the id is the object_id:
    the 15-20 digit number sitting directly before ``_public.xml`` at the
    end of ``filename``. Any leading directory components are ignored.

    Raises ValueError when no object_id can be found in ``filename`` and
    NotImplementedError for any other ``source_system``.

    >>> derive_source_document_id('irs_xml', 'Cycles_202242_202252/202213089349101246_public.xml')
    '202213089349101246'
    >>> derive_source_document_id('irs_xml', '202213089349101246_public.xml')
    '202213089349101246'
    """
    if source_system not in ('irs_xml', 'irs_pdf'):
        raise NotImplementedError(
            f"derive_source_document_id not implemented for {source_system!r}"
        )

    match = _OBJECT_ID_RE.search(filename)
    if match is None:
        raise ValueError(f"Cannot extract object_id from: {filename!r}")
    return match.group(1)


def extract_filing_metadata(tree):
    """Collect the filing-level fields that raw_filing needs from an XML tree.

    The returned dict has the keys: ein, filer_name, form_type, tax_year,
    tax_period_begin, tax_period_end, return_version, return_timestamp.

    The EIN is passed through ``normalize_ein`` and the return type code
    through ``map_form_type``. ``filer_name`` prefers BusinessNameLine1Txt
    and falls back to BusinessNameLine1. ``return_version`` is read from the
    ``returnVersion`` attribute of the root element; every other field is
    element text.

    Raises ValueError when the normalized EIN is empty, i.e. the filing
    cannot be identified.
    """
    # Imported here rather than at module level, as in the original layout.
    from scripts.common.normalize import normalize_ein, map_form_type

    root = tree.getroot()

    raw_ein = text(tree, ".//irs:Filer/irs:EIN")
    ein = normalize_ein(raw_ein)
    if not ein:
        raise ValueError(f"Cannot normalize EIN: {raw_ein!r}")

    form_type = map_form_type(text(tree, ".//irs:ReturnTypeCd"))

    # Try the candidate name elements in order; the first non-empty one wins.
    filer_name = None
    for name_xpath in (
        ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1Txt",
        ".//irs:Filer/irs:BusinessName/irs:BusinessNameLine1",
    ):
        filer_name = text(tree, name_xpath)
        if filer_name:
            break

    metadata = {
        "ein": ein,
        "filer_name": filer_name,
        "form_type": form_type,
    }
    for key, field_xpath in (
        ("tax_year", ".//irs:TaxYr"),
        ("tax_period_begin", ".//irs:TaxPeriodBeginDt"),
        ("tax_period_end", ".//irs:TaxPeriodEndDt"),
    ):
        metadata[key] = text(tree, field_xpath)
    metadata["return_version"] = root.get("returnVersion")
    metadata["return_timestamp"] = text(tree, ".//irs:ReturnTs")
    return metadata