aboutsummaryrefslogtreecommitdiff
path: root/scripts/common/db.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/common/db.py')
-rw-r--r--scripts/common/db.py198
1 files changed, 198 insertions, 0 deletions
diff --git a/scripts/common/db.py b/scripts/common/db.py
new file mode 100644
index 0000000..bbd220c
--- /dev/null
+++ b/scripts/common/db.py
@@ -0,0 +1,198 @@
+"""
+Database access layer using psycopg2 with connection pooling.
+
+All DB access in the v2 pipeline goes through this module.
+"""
+
+import csv
+import io
+import sys
+
+import psycopg2
+import psycopg2.pool
+import psycopg2.extras
+
+DB_HOST = "localhost"
+DB_PORT = 35434
+DB_USER = "tidyindex"
+DB_PASSWORD = "tidyindex"
+DB_NAME = "tidyindex"
+
+_pool = None
+
+
+def get_pool():
+ """Get or create the connection pool (lazy singleton)."""
+ global _pool
+ if _pool is None:
+ _pool = psycopg2.pool.SimpleConnectionPool(
+ minconn=1,
+ maxconn=4,
+ host=DB_HOST,
+ port=DB_PORT,
+ user=DB_USER,
+ password=DB_PASSWORD,
+ dbname=DB_NAME,
+ )
+ return _pool
+
+
+def get_conn():
+ """Get a connection from the pool."""
+ return get_pool().getconn()
+
+
+def put_conn(conn):
+ """Return a connection to the pool."""
+ get_pool().putconn(conn)
+
+
+def execute(sql, params=None):
+ """Execute SQL (no result). Auto-commits."""
+ conn = get_conn()
+ try:
+ with conn.cursor() as cur:
+ cur.execute(sql, params)
+ conn.commit()
+ except Exception:
+ conn.rollback()
+ raise
+ finally:
+ put_conn(conn)
+
+
+def execute_scalar(sql, params=None):
+ """Execute SQL and return a single scalar value, or None."""
+ conn = get_conn()
+ try:
+ with conn.cursor() as cur:
+ cur.execute(sql, params)
+ row = cur.fetchone()
+ conn.commit()
+ return row[0] if row else None
+ except Exception:
+ conn.rollback()
+ raise
+ finally:
+ put_conn(conn)
+
+
+def execute_all(sql, params=None):
+ """Execute SQL and return all rows as a list of tuples."""
+ conn = get_conn()
+ try:
+ with conn.cursor() as cur:
+ cur.execute(sql, params)
+ rows = cur.fetchall()
+ conn.commit()
+ return rows
+ except Exception:
+ conn.rollback()
+ raise
+ finally:
+ put_conn(conn)
+
+
+def execute_transaction(fn):
+ """Run fn(conn) inside a transaction. Commits on success, rolls back on error.
+
+ fn receives a connection with autocommit off. fn should use conn.cursor()
+ to execute statements. Do NOT commit inside fn — this wrapper handles it.
+ """
+ conn = get_conn()
+ try:
+ result = fn(conn)
+ conn.commit()
+ return result
+ except Exception:
+ conn.rollback()
+ raise
+ finally:
+ put_conn(conn)
+
+
+def copy_rows(table, columns, rows):
+ """Bulk insert rows via COPY FROM. Returns count of inserted rows."""
+ if not rows:
+ return 0
+
+ buf = io.StringIO()
+ writer = csv.DictWriter(buf, fieldnames=columns, extrasaction="ignore")
+ for row in rows:
+ writer.writerow(row)
+ buf.seek(0)
+
+ conn = get_conn()
+ try:
+ with conn.cursor() as cur:
+ cur.copy_expert(
+ f"COPY {table} ({','.join(columns)}) FROM STDIN WITH (FORMAT csv)",
+ buf,
+ )
+ count = cur.rowcount
+ conn.commit()
+ return count
+ except Exception:
+ conn.rollback()
+ raise
+ finally:
+ put_conn(conn)
+
+
+# ============================================================
+# Legacy compatibility (for old parsers using parse_common.py)
+# These shell-based functions are NOT used by v2 code.
+# ============================================================
+
+import subprocess
+
+_DB_CONTAINER = "tidyindex-postgres"
+
+
+def psql(sql):
+ """Execute SQL via docker exec psql. Legacy — use execute() instead."""
+ result = subprocess.run(
+ ["docker", "exec", "-i", _DB_CONTAINER, "psql", "-U", DB_USER, "-d", DB_NAME],
+ input=sql,
+ capture_output=True,
+ text=True,
+ )
+ if result.returncode != 0:
+ print(f"PSQL ERROR: {result.stderr}", file=sys.stderr)
+ sys.exit(1)
+ return result.stdout
+
+
+def psql_scalar(sql):
+ """Legacy — use execute_scalar() instead."""
+ result = subprocess.run(
+ [
+ "docker", "exec", "-i", _DB_CONTAINER,
+ "psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A",
+ ],
+ input=sql,
+ capture_output=True,
+ text=True,
+ )
+ if result.returncode != 0:
+ print(f"PSQL ERROR: {result.stderr}", file=sys.stderr)
+ sys.exit(1)
+ for line in result.stdout.strip().split("\n"):
+ line = line.strip()
+ if line and not line.startswith("INSERT") and not line.startswith("UPDATE") and not line.startswith("DELETE"):
+ return line
+ return None
+
+
+def psql_query_values(sql):
+ """Legacy — use execute_all() instead."""
+ result = psql(sql)
+ lines = result.strip().split("\n")
+ if len(lines) >= 3:
+ return [line.strip() for line in lines[2:-1]]
+ return []
+
+
+def insert_rows(table, columns, rows):
+ """Legacy — use copy_rows() instead."""
+ return copy_rows(table, columns, rows)