shinkan-jinkendo/backend/db.py
Lars e4451e1362
Some checks failed
Test Suite / playwright-tests (push) Waiting to run
Deploy Development / deploy (push) Successful in 43s
Test Suite / pytest-backend (push) Failing after 1s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Has been cancelled
Enhance Exercise Management and AI Integration
- Updated the exercise form to include a tabbed navigation structure, improving user experience with sections for Stammdaten, Anleitung, Einordnung, Varianten, and Medien & Mehr.
- Introduced the concept of **Freigabelevel** (visibility level) in the UI, replacing previous terminology for clarity and consistency across components.
- Implemented new AI endpoints for exercise suggestions and regeneration, allowing for dynamic content generation without direct database writes.
- Removed the legacy `is_primary` flag from exercise skills in the UI, ensuring that intensity levels (`niedrig`, `mittel`, `hoch`) are the primary focus for skill management.
- Enhanced the variant management process with improved saving mechanisms and UI updates to reflect changes more intuitively.
2026-05-22 07:52:31 +02:00

203 lines
5.8 KiB
Python

"""
PostgreSQL Database Connector for Mitai Jinkendo (v9b)
Provides connection pooling and helper functions for database operations.
Compatible drop-in replacement for the previous SQLite get_db() pattern.
"""
import os
from contextlib import contextmanager
from typing import Optional, Dict, Any, List
import psycopg2
from psycopg2.extras import RealDictCursor
import psycopg2.pool
# Module-wide connection pool shared by every get_db() call.
# Stays None until init_pool() creates it.
_pool: Optional[psycopg2.pool.SimpleConnectionPool] = None
def init_pool():
    """Create the module-level PostgreSQL connection pool if none exists yet.

    Connection settings come from the environment variables DB_HOST, DB_PORT,
    DB_NAME, DB_USER and DB_PASSWORD, each with the fallback shown below.
    When the pool already exists the function returns without doing anything.
    """
    global _pool
    if _pool is not None:
        return

    settings = {
        "host": os.getenv("DB_HOST", "postgres"),
        "port": int(os.getenv("DB_PORT", "5432")),
        "database": os.getenv("DB_NAME", "mitai"),
        "user": os.getenv("DB_USER", "mitai"),
        "password": os.getenv("DB_PASSWORD", ""),
    }
    _pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=10, **settings)
    print(
        f"[OK] PostgreSQL connection pool initialized ({os.getenv('DB_HOST', 'postgres')}:{os.getenv('DB_PORT', '5432')})"
    )
@contextmanager
def get_db():
    """
    Context manager that lends out a pooled database connection.

    Usage:
        with get_db() as conn:
            cur = conn.cursor()
            cur.execute("SELECT * FROM profiles")
            rows = cur.fetchall()

    Commits when the with-block finishes normally. When the block raises,
    the transaction is rolled back and the same exception is re-raised.
    The connection is handed back to the pool in every case.

    Yields:
        A connection taken from the module-level pool; the pool is created
        on first use.

    Raises:
        Whatever the with-block (or the commit) raised. A failure of the
        rollback itself is reported but never replaces that original error.
    """
    if _pool is None:
        init_pool()
    conn = _pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        try:
            conn.rollback()
        except Exception as rollback_error:
            # A failing rollback (e.g. the connection died) must not mask the
            # error that triggered it, so report it and fall through to the
            # bare raise, which re-raises the original exception.
            print(f"[WARN] Rollback failed: {rollback_error}")
        raise
    finally:
        _pool.putconn(conn)
def get_cursor(conn):
    """
    Open a cursor on ``conn`` that uses RealDictCursor as its cursor factory.

    Rows fetched through it support access by column name:
    {'column_name': value, ...}
    This keeps call sites compatible with the earlier sqlite3.Row usage.
    """
    dict_cursor = conn.cursor(cursor_factory=RealDictCursor)
    return dict_cursor
def r2d(row) -> Optional[Dict[str, Any]]:
    """
    Convert a row to a plain dict (compatibility helper).

    Args:
        row: Mapping-like row (e.g. a RealDictRow from psycopg2) or None

    Returns:
        A new dictionary built from the row, or None if row is None
    """
    # Test identity rather than truthiness: an empty row is still a row and
    # must come back as {} instead of being mistaken for "no row".
    if row is None:
        return None
    return dict(row)
def execute_one(conn, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
    """
    Run a query and hand back its first row as a dict.

    Args:
        conn: Database connection from get_db()
        query: SQL query with %s placeholders
        params: Tuple of values bound to the placeholders

    Returns:
        Dictionary of column:value pairs, or None when the query yields no row

    Example:
        profile = execute_one(conn, "SELECT * FROM profiles WHERE id=%s", (pid,))
        if profile:
            print(profile['name'])
    """
    with get_cursor(conn) as cursor:
        cursor.execute(query, params)
        return r2d(cursor.fetchone())
def execute_all(conn, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
    """
    Run a query and hand back every row as a dict.

    Args:
        conn: Database connection from get_db()
        query: SQL query with %s placeholders
        params: Tuple of values bound to the placeholders

    Returns:
        List with one dictionary per result row (empty when nothing matched)

    Example:
        weights = execute_all(conn,
            "SELECT * FROM weight_log WHERE profile_id=%s ORDER BY date DESC",
            (pid,)
        )
        for w in weights:
            print(w['date'], w['weight'])
    """
    with get_cursor(conn) as cursor:
        cursor.execute(query, params)
        fetched = cursor.fetchall()
    return list(map(r2d, fetched))
def execute_write(conn, query: str, params: tuple = ()) -> None:
    """
    Run a data-modifying statement (INSERT/UPDATE/DELETE) and discard any result.

    Nothing is committed here; the statement becomes part of the
    connection's current transaction.

    Args:
        conn: Database connection from get_db()
        query: SQL statement with %s placeholders
        params: Tuple of values bound to the placeholders

    Example:
        execute_write(conn,
            "UPDATE profiles SET name=%s WHERE id=%s",
            ("New Name", pid)
        )
    """
    cursor_ctx = get_cursor(conn)
    with cursor_ctx as cursor:
        cursor.execute(query, params)
def init_db():
    """
    Seed the database with rows the application requires.

    Currently this ensures the "pipeline" master prompt exists in ai_prompts.
    The function checks before inserting, so it is safe to call repeatedly,
    and it skips the work when the ai_prompts table has not been created yet.

    Any error is reported as a warning and swallowed on purpose: a missing
    prompt must not prevent the caller from starting up, and the row can be
    created manually later.
    """
    try:
        # The cursor is used as a context manager so it is closed
        # deterministically, matching the execute_* helpers in this module.
        with get_db() as conn, get_cursor(conn) as cur:
            # Check if table exists first
            cur.execute("""
                SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'public'
                    AND table_name = 'ai_prompts'
                ) as table_exists
            """)
            if not cur.fetchone()['table_exists']:
                print("[WARN] ai_prompts table doesn't exist yet - skipping pipeline prompt creation")
                return

            # Ensure "pipeline" master prompt exists
            cur.execute("SELECT COUNT(*) as count FROM ai_prompts WHERE slug='pipeline'")
            if cur.fetchone()['count'] == 0:
                cur.execute("""
                    INSERT INTO ai_prompts (
                        slug, display_name, description, template,
                        category, output_format, active, sort_order
                    )
                    VALUES (
                        'pipeline',
                        'Mehrstufige Gesamtanalyse',
                        'Master-Schalter fuer die gesamte Pipeline. Deaktiviere diese Zeile um die Pipeline zu verstecken.',
                        'PIPELINE_MASTER',
                        'admin',
                        'text',
                        true,
                        -10
                    )
                """)
                # Commit before announcing success so the message is only
                # printed once the row is actually persisted.
                conn.commit()
                print("[OK] Pipeline master prompt created")
    except Exception as e:
        print(f"[WARN] Could not create pipeline prompt: {e}")
        # Don't fail startup - prompt can be created manually