shinkan-jinkendo/backend/run_migrations.py
Lars 4f64cb105b
All checks were successful
Deploy Development / deploy (push) Successful in 36s
Test Suite / pytest-backend (push) Successful in 29s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 30s
Test Suite / pytest-backend (pull_request) Successful in 23s
Test Suite / lint-backend (pull_request) Successful in 0s
Test Suite / build-frontend (pull_request) Successful in 6s
Test Suite / playwright-tests (pull_request) Successful in 23s
feat: enhance database migration handling and media asset tags
- Updated migration scripts to ensure idempotency and safe execution of SQL statements, preventing errors on repeated runs.
- Improved documentation in migration files to clarify the execution order and idempotent practices for database migrations.
- Added a comment in main.py to explain the migration process and its conditional execution based on the SKIP_DB_MIGRATE environment variable.
2026-05-07 16:28:25 +02:00

290 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Shinkan Jinkendo — Datenbank-Migrationen
**Deployment:** Beim Start importiert `main.py` dieses Modul und ruft `main()` auf, bevor FastAPI
die App lädt (`SKIP_DB_MIGRATE=1` nur für Tests oder ohne DB). Jeder Backend-Container-Start
wendet ausstehende Migrationen an — kein separater Deploy-Schritt nötig.
**Idempotent (Runner):** Über `schema_migrations` wird jede Datei `NNN_*.sql` höchstens einmal
als erfolgreich markiert; wiederholte Starts führen nur noch fehlende Dateien aus.
**Idempotent (SQL, empfohlen):** `IF NOT EXISTS`, `ADD COLUMN IF NOT EXISTS`, `CREATE INDEX IF NOT EXISTS`,
damit ein erneuter Lauf derselben Datei harmlos bleibt. Ohne diese Guards schützt nur der
`schema_migrations`-Eintrag; ältere Migrationen ohne IF NOT EXISTS können bei manuell gelöschtem
Eintrag und bestehenden Tabellen fehlschlagen.
**Reihenfolge:** Alle `NNN_*.sql` nach führender Zahl (001 vor 009 vor 010 …), bei gleicher Zahl
alphabetisch nach Dateinamen — nicht bloß String-Sortierung (vermeidet z.B. `10_` vor `9_`).
**Pro Datei eine Transaktion:** Entweder `psql -1 -f` (wie bei psql/pg_dump üblich) oder Fallback
über `sqlparse.split` und einzelne `cursor.execute` in einer DB-Transaktion.
Nach erfolgreicher Ausführung: `INSERT INTO schema_migrations (migration)` (mit `ON CONFLICT DO NOTHING`).
"""
import os
import re
import shutil
import subprocess
import sys
import time
from typing import List, Tuple
import psycopg2
import sqlparse
def _db_params():
return {
"host": os.getenv("DB_HOST", "localhost"),
"port": os.getenv("DB_PORT", "5432"),
"dbname": os.getenv("DB_NAME", "shinkan_dev"),
"user": os.getenv("DB_USER", "shinkan_dev"),
"password": os.getenv("DB_PASSWORD", "dev_password"),
}
def get_db_connection():
"""PostgreSQL-Verbindung mit Retry (Container-Start)."""
p = _db_params()
max_retries = 30
for i in range(max_retries):
try:
conn = psycopg2.connect(
host=p["host"],
port=p["port"],
database=p["dbname"],
user=p["user"],
password=p["password"],
)
conn.autocommit = False
print(f"[OK] Connected to database: {p['dbname']}")
return conn
except psycopg2.OperationalError:
if i < max_retries - 1:
print(f"Waiting for database... ({i+1}/{max_retries})")
time.sleep(2)
else:
print(f"[FAIL] Failed to connect to database after {max_retries} attempts")
raise
def init_migrations_table(conn):
with conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS schema_migrations (
id SERIAL PRIMARY KEY,
migration VARCHAR(255) UNIQUE NOT NULL,
executed_at TIMESTAMP DEFAULT NOW()
)
"""
)
conn.commit()
print("[OK] schema_migrations initialisiert")
_LEADING_DIGITS = re.compile(r"^(\d+)")
def _migration_sort_key_from_stem(stem: str) -> Tuple[int, str]:
"""Sortierung: numerisches Präfix am Anfang des Stems, dann voller Stem (stabil)."""
m = _LEADING_DIGITS.match(stem)
n = int(m.group(1)) if m else 0
return (n, stem)
def get_migration_files_ordered(migrations_dir: str) -> List[Tuple[str, str]]:
rows = []
for filename in os.listdir(migrations_dir):
if not filename.endswith(".sql"):
continue
if not filename[0].isdigit():
continue
stem = filename[:-4]
rows.append((stem, os.path.join(migrations_dir, filename)))
rows.sort(key=lambda item: _migration_sort_key_from_stem(item[0]))
return rows
def get_executed(conn) -> set:
with conn.cursor() as cur:
cur.execute("SELECT migration FROM schema_migrations")
return set(r[0] for r in cur.fetchall())
def get_pending(conn, migrations_dir: str):
executed = get_executed(conn)
pending = []
for name, path in get_migration_files_ordered(migrations_dir):
if name not in executed:
pending.append((name, path))
return pending
def _split_statements(sql_text: str) -> List[str]:
"""Fallback ohne psql — einzelne Statements (sqlparse.split)."""
stripped = sql_text.strip()
if not stripped:
return []
parts = sqlparse.split(stripped)
return [p.strip() for p in parts if p and p.strip()]
def _run_file_with_psql(filepath: str) -> Tuple[bool, str]:
"""
Ganze Datei wie von psql dokumentiert (-1 eine Transaktion, ON_ERROR_STOP).
Unter Windows oft nicht verfügbar → zurück False ohne Fehlertext wenn kein Binary.
"""
psql = shutil.which("psql")
if not psql:
return False, ""
p = _db_params()
env = os.environ.copy()
env["PGPASSWORD"] = str(p["password"])
cmd = [
psql,
"-h",
p["host"],
"-p",
str(p["port"]),
"-U",
p["user"],
"-d",
p["dbname"],
"-v",
"ON_ERROR_STOP=1",
"-1",
"-f",
filepath,
]
proc = subprocess.run(
cmd,
env=env,
capture_output=True,
text=True,
timeout=7200,
)
if proc.returncode != 0:
tail = (
(proc.stderr or "").strip()
+ "\n"
+ (proc.stdout or "").strip()
).strip()
return False, tail[:8000] or f"exit {proc.returncode}"
out = (proc.stdout or "").strip()
return True, out
def _record_migration(conn, migration_name: str) -> None:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO schema_migrations (migration)
VALUES (%s)
ON CONFLICT (migration) DO NOTHING
""",
(migration_name,),
)
def run_migration(conn, migration_name: str, filepath: str) -> bool:
print(f"Running migration: {migration_name}")
detail_suffix = ""
try:
if shutil.which("psql"):
ok, diag = _run_file_with_psql(filepath)
if not ok:
print(f" [FAIL] psql fehlgeschlagen:\n{diag or '(kein Output)'}")
conn.rollback()
return False
detail_suffix = "(psql -1)"
else:
try:
with open(filepath, "r", encoding="utf-8") as fh:
body = fh.read()
except OSError as e:
print(f" [FAIL] kann Datei nicht lesen: {e}")
conn.rollback()
return False
statements = _split_statements(body)
with conn.cursor() as cur:
if not statements:
print(
f" [WARN] keine ausführbaren Statements (leer?) — "
f"Eintrag trotzdem: {migration_name}"
)
else:
for stmt in statements:
cur.execute(stmt)
detail_suffix = f"(psycopg2 + sqlparse, {len(statements)} Statements)"
_record_migration(conn, migration_name)
conn.commit()
print(f" [OK] {migration_name} erfolgreich {detail_suffix}")
return True
except Exception as e:
conn.rollback()
print(f" [FAIL] {migration_name}: {e}")
return False
def main():
print("=" * 60)
print("Shinkan Jinkendo — Database Migrations")
print("(Warteschlange: nur fehlende *.sql — idempotent)")
print("=" * 60)
migrations_dir = "/app/migrations"
if not os.path.isdir(migrations_dir):
migrations_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "migrations")
try:
conn = get_db_connection()
init_migrations_table(conn)
pending = get_pending(conn, migrations_dir)
if not pending:
print("[OK] Keine ausstehenden Migrationen — Schema aktuell.")
conn.close()
return 0
print(f"\n{len(pending)} ausstehende Migration(en):")
for n, _ in pending:
print(f" - {n}")
print()
failed = None
for migration_name, filepath in pending:
if not run_migration(conn, migration_name, filepath):
failed = migration_name
break
conn.close()
print("\n" + "=" * 60)
if failed:
print(f"[FAIL] Abbruch nach: {failed}")
print(" (Bereits erfolgreiche Dateien dieser Session sind committed.)")
print("=" * 60)
return 1
print(f"[OK] {len(pending)} Migration(s) angewendet — Schema aktuell.")
print("=" * 60)
return 0
except Exception as e:
print(f"\n[FAIL] Fehler: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())