# Reconstructed from commit 6605e2cc428e3bdaa174ccc432941eab8c5d61cb
# ("ensure parsers do not parse and store raw XML fields"), which created
# scripts/common/db.py.
"""
Database access layer using psycopg2 with connection pooling.

All DB access in the v2 pipeline goes through this module.
"""

import contextlib
import csv
import io
import subprocess
import sys

import psycopg2
import psycopg2.pool
import psycopg2.extras

DB_HOST = "localhost"
DB_PORT = 35434
DB_USER = "tidyindex"
DB_PASSWORD = "tidyindex"
DB_NAME = "tidyindex"

# Lazily created by get_pool(); stays None until the first connection request.
_pool = None


def get_pool():
    """Get or create the connection pool (lazy singleton).

    The pool is a ``SimpleConnectionPool`` holding between 1 and 4
    connections, configured from the module-level ``DB_*`` constants.
    """
    global _pool
    if _pool is None:
        _pool = psycopg2.pool.SimpleConnectionPool(
            minconn=1,
            maxconn=4,
            host=DB_HOST,
            port=DB_PORT,
            user=DB_USER,
            password=DB_PASSWORD,
            dbname=DB_NAME,
        )
    return _pool


def get_conn():
    """Get a connection from the pool.

    The caller is responsible for handing it back with put_conn().
    """
    return get_pool().getconn()


def put_conn(conn):
    """Return a connection to the pool."""
    get_pool().putconn(conn)


@contextlib.contextmanager
def _transaction():
    """Yield a pooled connection wrapped in a single transaction.

    Commits when the ``with`` body finishes normally. On any ``Exception``
    (including one raised by the commit itself) the transaction is rolled
    back and the exception re-raised. The connection always goes back to
    the pool, whatever happened.
    """
    conn = get_conn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        put_conn(conn)


def execute(sql, params=None):
    """Execute SQL (no result). Auto-commits.

    ``params`` is passed straight through to ``cursor.execute``.
    Rolls back and re-raises if the statement or the commit fails.
    """
    with _transaction() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, params)


def execute_scalar(sql, params=None):
    """Execute SQL and return a single scalar value, or None.

    Returns the first column of the first row, or None when the statement
    produced no row. Commits on success, rolls back and re-raises on error.
    """
    with _transaction() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            row = cur.fetchone()
    return row[0] if row else None


def execute_all(sql, params=None):
    """Execute SQL and return all rows as a list of tuples.

    Commits on success, rolls back and re-raises on error.
    """
    with _transaction() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            rows = cur.fetchall()
    return rows


def execute_transaction(fn):
    """Run fn(conn) inside a transaction. Commits on success, rolls back on error.

    fn receives a connection with autocommit off. fn should use conn.cursor()
    to execute statements. Do NOT commit inside fn — this wrapper handles it.

    Returns whatever fn returns.
    """
    with _transaction() as conn:
        result = fn(conn)
    return result


def copy_rows(table, columns, rows):
    """Bulk insert rows via COPY FROM. Returns count of inserted rows.

    ``rows`` is a sequence of mappings keyed by column name. Keys that are
    not listed in ``columns`` are ignored; None values and missing keys are
    written as empty CSV fields. An empty ``rows`` returns 0 without
    touching the database.

    ``table`` and ``columns`` are interpolated directly into the COPY
    statement, so they must be trusted identifiers, never user input.
    """
    if not rows:
        return 0

    # Serialise everything into an in-memory CSV that COPY can stream from.
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=columns, extrasaction="ignore")
    writer.writerows(rows)
    buf.seek(0)

    with _transaction() as conn:
        with conn.cursor() as cur:
            cur.copy_expert(
                f"COPY {table} ({','.join(columns)}) FROM STDIN WITH (FORMAT csv)",
                buf,
            )
            count = cur.rowcount
    return count


# ============================================================
# Legacy compatibility (for old parsers using parse_common.py)
# These shell-based functions are NOT used by v2 code.
# ============================================================

_DB_CONTAINER = "tidyindex-postgres"


def _run_psql(sql, *extra_args):
    """Pipe ``sql`` into psql inside the DB container and return its stdout.

    ``extra_args`` are appended to the psql command line. On a non-zero
    exit status the captured stderr is printed and the process exits with
    status 1 (legacy behaviour that the old parsers rely on).
    """
    result = subprocess.run(
        [
            "docker", "exec", "-i", _DB_CONTAINER,
            "psql", "-U", DB_USER, "-d", DB_NAME, *extra_args,
        ],
        input=sql,
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        print(f"PSQL ERROR: {result.stderr}", file=sys.stderr)
        sys.exit(1)
    return result.stdout


def psql(sql):
    """Execute SQL via docker exec psql. Legacy — use execute() instead.

    Returns psql's raw stdout; exits the process with status 1 on failure.
    """
    return _run_psql(sql)


def psql_scalar(sql):
    """Legacy — use execute_scalar() instead.

    Runs psql with ``-t -A`` and returns the first non-empty output line
    that is not an INSERT/UPDATE/DELETE command tag, or None if there is
    no such line. The value is returned as a stripped string.
    """
    output = _run_psql(sql, "-t", "-A")
    for line in output.strip().split("\n"):
        line = line.strip()
        # Skip the status tags psql prints for data-modifying statements.
        if line and not line.startswith(("INSERT", "UPDATE", "DELETE")):
            return line
    return None


def psql_query_values(sql):
    """Legacy — use execute_all() instead.

    Returns the stripped output lines of psql(sql), dropping the first two
    lines and the last one (presumably the column header, the separator and
    the row-count footer of psql's default output — confirm against a real
    run). Returns an empty list when there are fewer than three lines.
    """
    result = psql(sql)
    lines = result.strip().split("\n")
    if len(lines) >= 3:
        return [line.strip() for line in lines[2:-1]]
    return []


def insert_rows(table, columns, rows):
    """Legacy — use copy_rows() instead."""
    return copy_rows(table, columns, rows)