mitai-jinkendo/backend/db.py
Lars 548d733048
All checks were successful
Deploy Development / deploy (push) Successful in 56s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
refactor: move init_db() to db.py
Phase 1.1 - Database-Logik konsolidieren

ÄNDERUNGEN:
- init_db() von main.py nach db.py verschoben
- main.py importiert init_db von db
- startup_event() ruft db.init_db() auf
- Keine funktionalen Änderungen

DATEIEN:
- backend/db.py: +60 Zeilen (init_db Funktion)
- backend/main.py: -48 Zeilen (init_db entfernt, import hinzugefügt)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 09:49:46 +01:00

196 lines
5.6 KiB
Python

"""
PostgreSQL Database Connector for Mitai Jinkendo (v9b)
Provides connection pooling and helper functions for database operations.
Compatible drop-in replacement for the previous SQLite get_db() pattern.
"""
import os
from contextlib import contextmanager
from typing import Optional, Dict, Any, List
import psycopg2
from psycopg2.extras import RealDictCursor
import psycopg2.pool
# Module-wide connection pool shared by every get_db() call.
# Starts as None; init_pool() creates it on first use.
_pool: Optional[psycopg2.pool.SimpleConnectionPool] = None
def init_pool():
    """
    Create the module-level PostgreSQL connection pool if it does not exist yet.

    Connection settings come from environment variables, with these fallbacks:
        DB_HOST="postgres", DB_PORT="5432", DB_NAME="mitai",
        DB_USER="mitai", DB_PASSWORD=""

    The pool holds between 1 and 10 connections. Calling this function again
    once the pool exists does nothing.
    """
    global _pool
    if _pool is not None:
        return

    # Read host/port once so the pool and the log line report the same values.
    db_host = os.getenv("DB_HOST", "postgres")
    db_port = os.getenv("DB_PORT", "5432")

    # NOTE(review): psycopg2 documents SimpleConnectionPool as not shareable
    # across threads - confirm callers never use get_db() from multiple threads.
    _pool = psycopg2.pool.SimpleConnectionPool(
        minconn=1,
        maxconn=10,
        host=db_host,
        port=int(db_port),
        database=os.getenv("DB_NAME", "mitai"),
        user=os.getenv("DB_USER", "mitai"),
        password=os.getenv("DB_PASSWORD", ""),
    )
    print(f"✓ PostgreSQL connection pool initialized ({db_host}:{db_port})")
@contextmanager
def get_db():
    """
    Borrow a connection from the pool for the duration of a ``with`` block.

    The pool is created on demand via init_pool() if it does not exist yet.

    Usage:
        with get_db() as conn:
            cur = conn.cursor()
            cur.execute("SELECT * FROM profiles")
            rows = cur.fetchall()

    When the block finishes normally the transaction is committed; when it
    raises an Exception the transaction is rolled back and the exception is
    re-raised. The connection is handed back to the pool in either case.
    """
    if _pool is None:
        init_pool()

    connection = _pool.getconn()
    try:
        yield connection
        # Commit stays inside the try so a failing commit is rolled back too.
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        _pool.putconn(connection)
def get_cursor(conn):
    """
    Open a cursor on ``conn`` that yields rows as dict-like objects.

    The cursor is created with psycopg2's RealDictCursor factory, so each
    fetched row can be accessed by column name: row['column_name'].
    Intended to mirror the earlier sqlite3.Row access pattern.
    """
    dict_cursor = conn.cursor(cursor_factory=RealDictCursor)
    return dict_cursor
def r2d(row) -> Optional[Dict[str, Any]]:
    """
    Turn a fetched row into a plain dict (compatibility helper).

    Args:
        row: A mapping-like row (e.g. RealDictRow from psycopg2), or None.

    Returns:
        A new dict built from ``row``, or None when ``row`` is None or
        otherwise falsy (such as an empty mapping).
    """
    if not row:
        return None
    return dict(row)
def execute_one(conn, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
    """
    Run a query and return its first row as a dict.

    Args:
        conn: Database connection obtained from get_db()
        query: SQL text using %s placeholders
        params: Values bound to the placeholders

    Returns:
        The first result row converted via r2d(), or None when the query
        produced no row.

    Example:
        profile = execute_one(conn, "SELECT * FROM profiles WHERE id=%s", (pid,))
        if profile:
            print(profile['name'])
    """
    with get_cursor(conn) as cursor:
        cursor.execute(query, params)
        first_row = cursor.fetchone()
        return r2d(first_row)
def execute_all(conn, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
    """
    Run a query and return every result row as a dict.

    Args:
        conn: Database connection obtained from get_db()
        query: SQL text using %s placeholders
        params: Values bound to the placeholders

    Returns:
        A list with one r2d()-converted entry per fetched row, in the order
        the database returned them.

    Example:
        weights = execute_all(conn,
            "SELECT * FROM weight_log WHERE profile_id=%s ORDER BY date DESC",
            (pid,)
        )
        for w in weights:
            print(w['date'], w['weight'])
    """
    with get_cursor(conn) as cursor:
        cursor.execute(query, params)
        fetched = cursor.fetchall()
        return list(map(r2d, fetched))
def execute_write(conn, query: str, params: tuple = ()) -> None:
    """
    Run a data-modifying statement (INSERT/UPDATE/DELETE).

    The statement is only executed here; committing is left to the caller's
    connection handling (get_db() commits when its ``with`` block exits
    without an exception).

    Args:
        conn: Database connection obtained from get_db()
        query: SQL text using %s placeholders
        params: Values bound to the placeholders

    Example:
        execute_write(conn,
            "UPDATE profiles SET name=%s WHERE id=%s",
            ("New Name", pid)
        )
    """
    with get_cursor(conn) as cursor:
        cursor.execute(query, params)
def init_db():
    """
    Seed the database with data the application requires.

    Currently this ensures the "pipeline" master prompt row exists in the
    ``ai_prompts`` table:

    - If ``public.ai_prompts`` does not exist, a warning is printed and
      nothing is inserted.
    - If no row with slug='pipeline' exists, one is inserted and committed.
    - Otherwise nothing is changed, so repeated calls are safe.

    Intended to be called on app startup. This function never raises: any
    exception is caught and reported via print so startup can continue and
    the prompt can be created manually.
    """
    try:
        with get_db() as conn:
            # Use the cursor as a context manager (like the execute_* helpers)
            # so it is closed on every exit path, including the early return.
            with get_cursor(conn) as cur:
                # Check if table exists first
                cur.execute("""
                    SELECT EXISTS (
                        SELECT FROM information_schema.tables
                        WHERE table_schema = 'public'
                        AND table_name = 'ai_prompts'
                    ) as table_exists
                """)
                if not cur.fetchone()['table_exists']:
                    print("⚠️ ai_prompts table doesn't exist yet - skipping pipeline prompt creation")
                    return

                # Ensure "pipeline" master prompt exists
                cur.execute("SELECT COUNT(*) as count FROM ai_prompts WHERE slug='pipeline'")
                if cur.fetchone()['count'] == 0:
                    cur.execute("""
                        INSERT INTO ai_prompts (slug, name, description, template, active, sort_order)
                        VALUES (
                            'pipeline',
                            'Mehrstufige Gesamtanalyse',
                            'Master-Schalter für die gesamte Pipeline. Deaktiviere diese Analyse, um die Pipeline komplett zu verstecken.',
                            'PIPELINE_MASTER',
                            true,
                            -10
                        )
                    """)
                    # Commit explicitly so the success message is only printed
                    # after the row is stored; get_db() commits again on exit.
                    conn.commit()
                    print("✓ Pipeline master prompt created")
    except Exception as e:
        # Deliberate best-effort: don't fail startup - prompt can be created manually
        print(f"⚠️ Could not create pipeline prompt: {e}")