#!/usr/bin/env python3
"""
Shinkan Jinkendo — database migrations.

**Idempotent** via `schema_migrations`: every numbered file `migrations/*.sql`
is recorded as successful at most once; on a later start only the files that
are still missing are processed.

**Ordering:** all `NNN_*.sql` files by their leading number (001 before 009
before 010 ...), ties broken alphabetically by file name — not a plain string
sort (which would, for example, put `10_` before `9_`).

**One transaction per file:** either `psql -1 -f` (as is usual for
psql/pg_dump scripts) or, as a fallback, `sqlparse.split` plus individual
`cursor.execute` calls inside one DB transaction.

After a successful run: `INSERT INTO schema_migrations (migration)` (with
`ON CONFLICT DO NOTHING`).
"""

import os
import re
import shutil
import subprocess
import sys
import time
from typing import List, Tuple

import psycopg2
import sqlparse


def _db_params():
    """Return the connection settings, read from DB_* environment variables.

    Each value falls back to a local development default when the variable
    is not set. All values are strings (including the port).
    """
    return {
        "host": os.getenv("DB_HOST", "localhost"),
        "port": os.getenv("DB_PORT", "5432"),
        "dbname": os.getenv("DB_NAME", "shinkan_dev"),
        "user": os.getenv("DB_USER", "shinkan_dev"),
        "password": os.getenv("DB_PASSWORD", "dev_password"),
    }


def get_db_connection():
    """Open a PostgreSQL connection, retrying while the server starts up.

    Up to 30 attempts are made with a 2 second pause between them, which
    covers the case where the database container is not ready yet.

    Returns:
        An open psycopg2 connection with autocommit disabled.

    Raises:
        psycopg2.OperationalError: if the last attempt fails as well.
    """
    p = _db_params()
    max_retries = 30
    for i in range(max_retries):
        try:
            conn = psycopg2.connect(
                host=p["host"],
                port=p["port"],
                database=p["dbname"],
                user=p["user"],
                password=p["password"],
            )
            conn.autocommit = False
            print(f"[OK] Connected to database: {p['dbname']}")
            return conn
        except psycopg2.OperationalError:
            if i < max_retries - 1:
                print(f"Waiting for database... ({i+1}/{max_retries})")
                time.sleep(2)
            else:
                print(f"[FAIL] Failed to connect to database after {max_retries} attempts")
                raise


def init_migrations_table(conn):
    """Create the `schema_migrations` bookkeeping table if missing and commit."""
    with conn.cursor() as cur:
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS schema_migrations (
                id SERIAL PRIMARY KEY,
                migration VARCHAR(255) UNIQUE NOT NULL,
                executed_at TIMESTAMP DEFAULT NOW()
            )
            """
        )
    conn.commit()
    print("[OK] schema_migrations initialisiert")


_LEADING_DIGITS = re.compile(r"^(\d+)")


def _migration_sort_key_from_stem(stem: str) -> Tuple[int, str]:
    """Return the sort key for a migration stem.

    The key is (leading number, full stem): numeric ordering first, then the
    stem itself so that files sharing a number sort deterministically. A stem
    without a leading number gets 0.
    """
    m = _LEADING_DIGITS.match(stem)
    n = int(m.group(1)) if m else 0
    return (n, stem)


def get_migration_files_ordered(migrations_dir: str) -> List[Tuple[str, str]]:
    """List the migration files of *migrations_dir* in execution order.

    Only entries ending in `.sql` whose name starts with a digit are
    considered.

    Returns:
        A list of (stem, full path) tuples, where the stem is the file name
        without the `.sql` suffix, sorted by `_migration_sort_key_from_stem`.
    """
    rows = []
    for filename in os.listdir(migrations_dir):
        if not filename.endswith(".sql"):
            continue
        if not filename[0].isdigit():
            continue
        stem = filename[:-4]  # drop ".sql"
        rows.append((stem, os.path.join(migrations_dir, filename)))
    rows.sort(key=lambda item: _migration_sort_key_from_stem(item[0]))
    return rows


def get_executed(conn) -> set:
    """Return the set of migration names already recorded in `schema_migrations`."""
    with conn.cursor() as cur:
        cur.execute("SELECT migration FROM schema_migrations")
        return {row[0] for row in cur.fetchall()}


def get_pending(conn, migrations_dir: str) -> List[Tuple[str, str]]:
    """Return (name, path) for every migration file that is not recorded yet.

    The execution order of `get_migration_files_ordered` is preserved.
    """
    executed = get_executed(conn)
    return [
        (name, path)
        for name, path in get_migration_files_ordered(migrations_dir)
        if name not in executed
    ]


def _split_statements(sql_text: str) -> List[str]:
    """Split *sql_text* into executable statements (fallback without psql).

    Uses `sqlparse.split`. Fragments that contain nothing executable — only
    comments, whitespace or a lone semicolon — are dropped: psycopg2 refuses
    to execute them ("can't execute an empty query"), whereas psql silently
    accepts such files, so keeping them would make the fallback fail on
    files that work fine through psql.

    Returns:
        The statements in file order, each stripped of surrounding
        whitespace; an empty list if nothing executable is found.
    """
    stripped = sql_text.strip()
    if not stripped:
        return []

    statements = []
    for part in sqlparse.split(stripped):
        candidate = part.strip() if part else ""
        if not candidate:
            continue
        # Only used to detect "nothing to execute"; the statement itself is
        # passed on unchanged, comments included.
        without_comments = sqlparse.format(candidate, strip_comments=True)
        if not without_comments.strip().strip(";").strip():
            continue
        statements.append(candidate)
    return statements


def _run_file_with_psql(filepath: str) -> Tuple[bool, str]:
    """Run a whole SQL file through the `psql` command line client.

    The file is executed with `-1` (single transaction) and
    `ON_ERROR_STOP=1`. `-X` keeps a user `~/.psqlrc` from altering the run and
    `-w` makes psql fail instead of waiting for a password prompt. The
    password is handed over through the `PGPASSWORD` environment variable.

    Returns:
        (True, stdout) on success.
        (False, "") when no `psql` binary is found on PATH (often the case
        on Windows).
        (False, diagnostics) when psql exits non-zero; the diagnostics are
        stderr followed by stdout, truncated to 8000 characters, or
        "exit <code>" if psql printed nothing.

    Raises:
        subprocess.TimeoutExpired: if psql runs longer than 7200 seconds.
    """
    psql = shutil.which("psql")
    if not psql:
        return False, ""

    p = _db_params()
    env = os.environ.copy()
    env["PGPASSWORD"] = str(p["password"])
    cmd = [
        psql,
        "-X",
        "-w",
        "-h",
        p["host"],
        "-p",
        str(p["port"]),
        "-U",
        p["user"],
        "-d",
        p["dbname"],
        "-v",
        "ON_ERROR_STOP=1",
        "-1",
        "-f",
        filepath,
    ]
    proc = subprocess.run(
        cmd,
        env=env,
        capture_output=True,
        text=True,
        timeout=7200,
    )
    if proc.returncode != 0:
        tail = (
            (proc.stderr or "").strip() + "\n" + (proc.stdout or "").strip()
        ).strip()
        return False, tail[:8000] or f"exit {proc.returncode}"
    out = (proc.stdout or "").strip()
    return True, out


def _record_migration(conn, migration_name: str) -> None:
    """Insert *migration_name* into `schema_migrations` (no commit).

    `ON CONFLICT DO NOTHING` makes a repeated insert of the same name a no-op.
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO schema_migrations (migration)
            VALUES (%s)
            ON CONFLICT (migration) DO NOTHING
            """,
            (migration_name,),
        )


def run_migration(conn, migration_name: str, filepath: str) -> bool:
    """Apply one migration file and record it in `schema_migrations`.

    If a `psql` binary is on PATH the file is run through it; otherwise the
    file is split with sqlparse and executed statement by statement on
    *conn* inside one transaction together with the bookkeeping insert.

    Note: on the psql path the file is committed by psql in its own session
    before the bookkeeping row is written on *conn*, so the two steps are
    not atomic.

    Returns:
        True if the migration was applied and recorded, False otherwise.
        Failures are printed and *conn* is rolled back; no exception escapes.
    """
    print(f"Running migration: {migration_name}")
    detail_suffix = ""
    try:
        if shutil.which("psql"):
            ok, diag = _run_file_with_psql(filepath)
            if not ok:
                print(f" [FAIL] psql fehlgeschlagen:\n{diag or '(kein Output)'}")
                conn.rollback()
                return False
            detail_suffix = "(psql -1)"
        else:
            try:
                with open(filepath, "r", encoding="utf-8") as fh:
                    body = fh.read()
            except OSError as e:
                print(f" [FAIL] kann Datei nicht lesen: {e}")
                conn.rollback()
                return False

            statements = _split_statements(body)
            with conn.cursor() as cur:
                if not statements:
                    # Nothing to run; the file is still recorded below so it
                    # is not reported as pending on every start.
                    print(
                        f" [WARN] keine ausführbaren Statements (leer?) — "
                        f"Eintrag trotzdem: {migration_name}"
                    )
                else:
                    for stmt in statements:
                        cur.execute(stmt)
            detail_suffix = f"(psycopg2 + sqlparse, {len(statements)} Statements)"

        _record_migration(conn, migration_name)
        conn.commit()
        print(f" [OK] {migration_name} erfolgreich {detail_suffix}")
        return True
    except Exception as e:
        conn.rollback()
        print(f" [FAIL] {migration_name}: {e}")
        return False


def main():
    """Run all pending migrations in order and stop at the first failure.

    The migrations directory is `/app/migrations` if it exists, otherwise
    the `migrations` directory next to this script.

    Returns:
        0 if the schema is up to date afterwards, 1 if a migration failed or
        an unexpected error occurred.
    """
    print("=" * 60)
    print("Shinkan Jinkendo — Database Migrations")
    print("(Warteschlange: nur fehlende *.sql — idempotent)")
    print("=" * 60)

    migrations_dir = "/app/migrations"
    if not os.path.isdir(migrations_dir):
        migrations_dir = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "migrations"
        )

    conn = None
    try:
        conn = get_db_connection()
        init_migrations_table(conn)
        pending = get_pending(conn, migrations_dir)
        # The SELECT above opened a transaction (autocommit is off). End it
        # so this session is not idle-in-transaction — holding a lock on
        # schema_migrations — while psql runs a migration in its own session.
        conn.rollback()

        if not pending:
            print("[OK] Keine ausstehenden Migrationen — Schema aktuell.")
            return 0

        print(f"\n{len(pending)} ausstehende Migration(en):")
        for n, _ in pending:
            print(f" - {n}")
        print()

        failed = None
        for migration_name, filepath in pending:
            if not run_migration(conn, migration_name, filepath):
                failed = migration_name
                break

        print("\n" + "=" * 60)
        if failed:
            print(f"[FAIL] Abbruch nach: {failed}")
            print(" (Bereits erfolgreiche Dateien dieser Session sind committed.)")
            print("=" * 60)
            return 1
        print(f"[OK] {len(pending)} Migration(s) angewendet — Schema aktuell.")
        print("=" * 60)
        return 0
    except Exception as e:
        print(f"\n[FAIL] Fehler: {e}")
        return 1
    finally:
        # Always release the connection, including on the error paths.
        if conn is not None:
            try:
                conn.close()
            except psycopg2.Error:
                # Best effort: a failing close must not mask the exit code.
                pass


if __name__ == "__main__":
    sys.exit(main())