""" PostgreSQL Database Connector for Mitai Jinkendo (v9b) Provides connection pooling and helper functions for database operations. Compatible drop-in replacement for the previous SQLite get_db() pattern. """ import os from contextlib import contextmanager from typing import Optional, Dict, Any, List import psycopg2 from psycopg2.extras import RealDictCursor import psycopg2.pool # Global connection pool _pool: Optional[psycopg2.pool.SimpleConnectionPool] = None def init_pool(): """Initialize PostgreSQL connection pool.""" global _pool if _pool is None: _pool = psycopg2.pool.SimpleConnectionPool( minconn=1, maxconn=10, host=os.getenv("DB_HOST", "postgres"), port=int(os.getenv("DB_PORT", "5432")), database=os.getenv("DB_NAME", "mitai"), user=os.getenv("DB_USER", "mitai"), password=os.getenv("DB_PASSWORD", "") ) print(f"✓ PostgreSQL connection pool initialized ({os.getenv('DB_HOST', 'postgres')}:{os.getenv('DB_PORT', '5432')})") @contextmanager def get_db(): """ Context manager for database connections. Usage: with get_db() as conn: cur = conn.cursor() cur.execute("SELECT * FROM profiles") rows = cur.fetchall() Auto-commits on success, auto-rolls back on exception. """ if _pool is None: init_pool() conn = _pool.getconn() try: yield conn conn.commit() except Exception: conn.rollback() raise finally: _pool.putconn(conn) def get_cursor(conn): """ Get cursor with RealDictCursor for dict-like row access. Returns rows as dictionaries: {'column_name': value, ...} Compatible with previous sqlite3.Row behavior. """ return conn.cursor(cursor_factory=RealDictCursor) def r2d(row) -> Optional[Dict[str, Any]]: """ Convert row to dict (compatibility helper). Args: row: RealDictRow from psycopg2 Returns: Dictionary or None if row is None """ return dict(row) if row else None def execute_one(conn, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]: """ Execute query and return one row as dict. Args: conn: Database connection from get_db() query: SQL query with %s placeholders params: Tuple of parameters Returns: Dictionary with column:value pairs, or None if no row found Example: profile = execute_one(conn, "SELECT * FROM profiles WHERE id=%s", (pid,)) if profile: print(profile['name']) """ with get_cursor(conn) as cur: cur.execute(query, params) row = cur.fetchone() return r2d(row) def execute_all(conn, query: str, params: tuple = ()) -> List[Dict[str, Any]]: """ Execute query and return all rows as list of dicts. Args: conn: Database connection from get_db() query: SQL query with %s placeholders params: Tuple of parameters Returns: List of dictionaries (one per row) Example: weights = execute_all(conn, "SELECT * FROM weight_log WHERE profile_id=%s ORDER BY date DESC", (pid,) ) for w in weights: print(w['date'], w['weight']) """ with get_cursor(conn) as cur: cur.execute(query, params) rows = cur.fetchall() return [r2d(r) for r in rows] def execute_write(conn, query: str, params: tuple = ()) -> None: """ Execute INSERT/UPDATE/DELETE query. Args: conn: Database connection from get_db() query: SQL query with %s placeholders params: Tuple of parameters Example: execute_write(conn, "UPDATE profiles SET name=%s WHERE id=%s", ("New Name", pid) ) """ with get_cursor(conn) as cur: cur.execute(query, params) def init_db(): """ Initialize database with required data. Ensures critical data exists (e.g., pipeline master prompt). Safe to call multiple times - checks before inserting. Called automatically on app startup. """ try: with get_db() as conn: cur = get_cursor(conn) # Check if table exists first cur.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'ai_prompts' ) as table_exists """) if not cur.fetchone()['table_exists']: print("⚠️ ai_prompts table doesn't exist yet - skipping pipeline prompt creation") return # Ensure "pipeline" master prompt exists cur.execute("SELECT COUNT(*) as count FROM ai_prompts WHERE slug='pipeline'") if cur.fetchone()['count'] == 0: cur.execute(""" INSERT INTO ai_prompts (slug, name, description, template, active, sort_order) VALUES ( 'pipeline', 'Mehrstufige Gesamtanalyse', 'Master-Schalter für die gesamte Pipeline. Deaktiviere diese Analyse, um die Pipeline komplett zu verstecken.', 'PIPELINE_MASTER', true, -10 ) """) conn.commit() print("✓ Pipeline master prompt created") except Exception as e: print(f"⚠️ Could not create pipeline prompt: {e}") # Don't fail startup - prompt can be created manually