All checks were successful
Deploy Development / deploy (push) Successful in 36s
Test Suite / pytest-backend (push) Successful in 29s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 30s
Test Suite / pytest-backend (pull_request) Successful in 23s
Test Suite / lint-backend (pull_request) Successful in 0s
Test Suite / build-frontend (pull_request) Successful in 6s
Test Suite / playwright-tests (pull_request) Successful in 23s
- Updated migration scripts to ensure idempotency and safe execution of SQL statements, preventing errors on repeated runs. - Improved documentation in migration files to clarify the execution order and idempotent practices for database migrations. - Added a comment in main.py to explain the migration process and its conditional execution based on the SKIP_DB_MIGRATE environment variable.
290 lines
8.7 KiB
Python
290 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Shinkan Jinkendo — Datenbank-Migrationen
|
||
|
||
**Deployment:** Beim Start importiert `main.py` dieses Modul und ruft `main()` auf, bevor FastAPI
|
||
die App lädt (`SKIP_DB_MIGRATE=1` nur für Tests oder ohne DB). Jeder Backend-Container-Start
|
||
wendet ausstehende Migrationen an — kein separater Deploy-Schritt nötig.
|
||
|
||
**Idempotent (Runner):** Über `schema_migrations` wird jede Datei `NNN_*.sql` höchstens einmal
|
||
als erfolgreich markiert; wiederholte Starts führen nur noch fehlende Dateien aus.
|
||
|
||
**Idempotent (SQL, empfohlen):** `IF NOT EXISTS`, `ADD COLUMN IF NOT EXISTS`, `CREATE INDEX IF NOT EXISTS`,
|
||
damit ein erneuter Lauf derselben Datei harmlos bleibt. Ohne diese Guards schützt nur der
|
||
`schema_migrations`-Eintrag; ältere Migrationen ohne IF NOT EXISTS können bei manuell gelöschtem
|
||
Eintrag und bestehenden Tabellen fehlschlagen.
|
||
|
||
**Reihenfolge:** Alle `NNN_*.sql` nach führender Zahl (001 vor 009 vor 010 …), bei gleicher Zahl
|
||
alphabetisch nach Dateinamen — nicht bloß String-Sortierung (vermeidet z. B. `10_` vor `9_`).
|
||
|
||
**Pro Datei eine Transaktion:** Entweder `psql -1 -f` (wie bei psql/pg_dump üblich) oder Fallback
|
||
über `sqlparse.split` und einzelne `cursor.execute` in einer DB-Transaktion.
|
||
|
||
Nach erfolgreicher Ausführung: `INSERT INTO schema_migrations (migration)` (mit `ON CONFLICT DO NOTHING`).
|
||
"""
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from typing import List, Tuple
|
||
|
||
import psycopg2
|
||
import sqlparse
|
||
|
||
|
||
def _db_params():
|
||
return {
|
||
"host": os.getenv("DB_HOST", "localhost"),
|
||
"port": os.getenv("DB_PORT", "5432"),
|
||
"dbname": os.getenv("DB_NAME", "shinkan_dev"),
|
||
"user": os.getenv("DB_USER", "shinkan_dev"),
|
||
"password": os.getenv("DB_PASSWORD", "dev_password"),
|
||
}
|
||
|
||
|
||
def get_db_connection():
|
||
"""PostgreSQL-Verbindung mit Retry (Container-Start)."""
|
||
p = _db_params()
|
||
max_retries = 30
|
||
for i in range(max_retries):
|
||
try:
|
||
conn = psycopg2.connect(
|
||
host=p["host"],
|
||
port=p["port"],
|
||
database=p["dbname"],
|
||
user=p["user"],
|
||
password=p["password"],
|
||
)
|
||
conn.autocommit = False
|
||
print(f"[OK] Connected to database: {p['dbname']}")
|
||
return conn
|
||
except psycopg2.OperationalError:
|
||
if i < max_retries - 1:
|
||
print(f"Waiting for database... ({i+1}/{max_retries})")
|
||
time.sleep(2)
|
||
else:
|
||
print(f"[FAIL] Failed to connect to database after {max_retries} attempts")
|
||
raise
|
||
|
||
|
||
def init_migrations_table(conn):
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||
id SERIAL PRIMARY KEY,
|
||
migration VARCHAR(255) UNIQUE NOT NULL,
|
||
executed_at TIMESTAMP DEFAULT NOW()
|
||
)
|
||
"""
|
||
)
|
||
conn.commit()
|
||
print("[OK] schema_migrations initialisiert")
|
||
|
||
|
||
_LEADING_DIGITS = re.compile(r"^(\d+)")
|
||
|
||
|
||
def _migration_sort_key_from_stem(stem: str) -> Tuple[int, str]:
|
||
"""Sortierung: numerisches Präfix am Anfang des Stems, dann voller Stem (stabil)."""
|
||
m = _LEADING_DIGITS.match(stem)
|
||
n = int(m.group(1)) if m else 0
|
||
return (n, stem)
|
||
|
||
|
||
def get_migration_files_ordered(migrations_dir: str) -> List[Tuple[str, str]]:
|
||
rows = []
|
||
for filename in os.listdir(migrations_dir):
|
||
if not filename.endswith(".sql"):
|
||
continue
|
||
if not filename[0].isdigit():
|
||
continue
|
||
stem = filename[:-4]
|
||
rows.append((stem, os.path.join(migrations_dir, filename)))
|
||
rows.sort(key=lambda item: _migration_sort_key_from_stem(item[0]))
|
||
return rows
|
||
|
||
|
||
def get_executed(conn) -> set:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT migration FROM schema_migrations")
|
||
return set(r[0] for r in cur.fetchall())
|
||
|
||
|
||
def get_pending(conn, migrations_dir: str):
|
||
executed = get_executed(conn)
|
||
pending = []
|
||
for name, path in get_migration_files_ordered(migrations_dir):
|
||
if name not in executed:
|
||
pending.append((name, path))
|
||
return pending
|
||
|
||
|
||
def _split_statements(sql_text: str) -> List[str]:
|
||
"""Fallback ohne psql — einzelne Statements (sqlparse.split)."""
|
||
stripped = sql_text.strip()
|
||
if not stripped:
|
||
return []
|
||
parts = sqlparse.split(stripped)
|
||
return [p.strip() for p in parts if p and p.strip()]
|
||
|
||
|
||
def _run_file_with_psql(filepath: str) -> Tuple[bool, str]:
|
||
"""
|
||
Ganze Datei wie von psql dokumentiert (-1 eine Transaktion, ON_ERROR_STOP).
|
||
Unter Windows oft nicht verfügbar → zurück False ohne Fehlertext wenn kein Binary.
|
||
"""
|
||
psql = shutil.which("psql")
|
||
if not psql:
|
||
return False, ""
|
||
|
||
p = _db_params()
|
||
env = os.environ.copy()
|
||
env["PGPASSWORD"] = str(p["password"])
|
||
|
||
cmd = [
|
||
psql,
|
||
"-h",
|
||
p["host"],
|
||
"-p",
|
||
str(p["port"]),
|
||
"-U",
|
||
p["user"],
|
||
"-d",
|
||
p["dbname"],
|
||
"-v",
|
||
"ON_ERROR_STOP=1",
|
||
"-1",
|
||
"-f",
|
||
filepath,
|
||
]
|
||
|
||
proc = subprocess.run(
|
||
cmd,
|
||
env=env,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=7200,
|
||
)
|
||
if proc.returncode != 0:
|
||
tail = (
|
||
(proc.stderr or "").strip()
|
||
+ "\n"
|
||
+ (proc.stdout or "").strip()
|
||
).strip()
|
||
return False, tail[:8000] or f"exit {proc.returncode}"
|
||
out = (proc.stdout or "").strip()
|
||
return True, out
|
||
|
||
|
||
def _record_migration(conn, migration_name: str) -> None:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO schema_migrations (migration)
|
||
VALUES (%s)
|
||
ON CONFLICT (migration) DO NOTHING
|
||
""",
|
||
(migration_name,),
|
||
)
|
||
|
||
|
||
def run_migration(conn, migration_name: str, filepath: str) -> bool:
|
||
print(f"Running migration: {migration_name}")
|
||
|
||
detail_suffix = ""
|
||
try:
|
||
if shutil.which("psql"):
|
||
ok, diag = _run_file_with_psql(filepath)
|
||
if not ok:
|
||
print(f" [FAIL] psql fehlgeschlagen:\n{diag or '(kein Output)'}")
|
||
conn.rollback()
|
||
return False
|
||
detail_suffix = "(psql -1)"
|
||
else:
|
||
try:
|
||
with open(filepath, "r", encoding="utf-8") as fh:
|
||
body = fh.read()
|
||
except OSError as e:
|
||
print(f" [FAIL] kann Datei nicht lesen: {e}")
|
||
conn.rollback()
|
||
return False
|
||
|
||
statements = _split_statements(body)
|
||
with conn.cursor() as cur:
|
||
if not statements:
|
||
print(
|
||
f" [WARN] keine ausführbaren Statements (leer?) — "
|
||
f"Eintrag trotzdem: {migration_name}"
|
||
)
|
||
else:
|
||
for stmt in statements:
|
||
cur.execute(stmt)
|
||
detail_suffix = f"(psycopg2 + sqlparse, {len(statements)} Statements)"
|
||
|
||
_record_migration(conn, migration_name)
|
||
conn.commit()
|
||
print(f" [OK] {migration_name} erfolgreich {detail_suffix}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
conn.rollback()
|
||
print(f" [FAIL] {migration_name}: {e}")
|
||
return False
|
||
|
||
|
||
def main():
|
||
print("=" * 60)
|
||
print("Shinkan Jinkendo — Database Migrations")
|
||
print("(Warteschlange: nur fehlende *.sql — idempotent)")
|
||
print("=" * 60)
|
||
|
||
migrations_dir = "/app/migrations"
|
||
if not os.path.isdir(migrations_dir):
|
||
migrations_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "migrations")
|
||
|
||
try:
|
||
conn = get_db_connection()
|
||
init_migrations_table(conn)
|
||
|
||
pending = get_pending(conn, migrations_dir)
|
||
|
||
if not pending:
|
||
print("[OK] Keine ausstehenden Migrationen — Schema aktuell.")
|
||
conn.close()
|
||
return 0
|
||
|
||
print(f"\n{len(pending)} ausstehende Migration(en):")
|
||
for n, _ in pending:
|
||
print(f" - {n}")
|
||
print()
|
||
|
||
failed = None
|
||
for migration_name, filepath in pending:
|
||
if not run_migration(conn, migration_name, filepath):
|
||
failed = migration_name
|
||
break
|
||
|
||
conn.close()
|
||
|
||
print("\n" + "=" * 60)
|
||
if failed:
|
||
print(f"[FAIL] Abbruch nach: {failed}")
|
||
print(" (Bereits erfolgreiche Dateien dieser Session sind committed.)")
|
||
print("=" * 60)
|
||
return 1
|
||
|
||
print(f"[OK] {len(pending)} Migration(s) angewendet — Schema aktuell.")
|
||
print("=" * 60)
|
||
return 0
|
||
|
||
except Exception as e:
|
||
print(f"\n[FAIL] Fehler: {e}")
|
||
return 1
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|