shinkan-jinkendo/backend/run_migrations.py
Lars 1f2c8ea0f1
Some checks failed
Deploy Development / deploy (push) Successful in 37s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 6s
Test Suite / playwright-tests (push) Failing after 1m55s
feat: enhance database migration handling and health check endpoint
- Updated migration logic in main.py to allow skipping migrations during local development with SKIP_DB_MIGRATE environment variable.
- Improved error handling for migration failures, ensuring the application does not start if migrations are incomplete.
- Added a new health check endpoint (/api/health/ready) to verify database connection and essential tables, aiding in production debugging.
- Enhanced run_migrations.py to support ordered execution of migration files and improved transaction handling.
- Updated requirements.txt to include sqlparse for SQL statement parsing when psql is unavailable.
2026-04-29 12:29:39 +02:00

281 lines
8.1 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Shinkan Jinkendo — Datenbank-Migrationen
**Idempotent** über `schema_migrations`: jede numerische Datei `migrations/*.sql` höchstens einmal
als „erfolgreich“ eingetragen; bei erneutem Start werden nur noch fehlende Dateien abgearbeitet.
**Reihenfolge:** Alle `NNN_*.sql` nach führender Zahl (001 vor 009 vor 010 …), bei gleicher Zahl
alphabetisch nach Dateinamen — nicht bloß String-Sortierung (vermeidet z.B. `10_` vor `9_`).
**Pro Datei eine Transaktion:** Entweder `psql -1 -f` (wie bei psql/pg_dump üblich) oder Fallback
über `sqlparse.split` und einzelne `cursor.execute` in einer DB-Transaktion.
Nach erfolgreicher Ausführung: `INSERT INTO schema_migrations (migration)` (mit `ON CONFLICT DO NOTHING`).
"""
import os
import re
import shutil
import subprocess
import sys
import time
from typing import List, Tuple
import psycopg2
import sqlparse
def _db_params():
return {
"host": os.getenv("DB_HOST", "localhost"),
"port": os.getenv("DB_PORT", "5432"),
"dbname": os.getenv("DB_NAME", "shinkan_dev"),
"user": os.getenv("DB_USER", "shinkan_dev"),
"password": os.getenv("DB_PASSWORD", "dev_password"),
}
def get_db_connection():
"""PostgreSQL-Verbindung mit Retry (Container-Start)."""
p = _db_params()
max_retries = 30
for i in range(max_retries):
try:
conn = psycopg2.connect(
host=p["host"],
port=p["port"],
database=p["dbname"],
user=p["user"],
password=p["password"],
)
conn.autocommit = False
print(f"✓ Connected to database: {p['dbname']}")
return conn
except psycopg2.OperationalError:
if i < max_retries - 1:
print(f"Waiting for database... ({i+1}/{max_retries})")
time.sleep(2)
else:
print(f"✗ Failed to connect to database after {max_retries} attempts")
raise
def init_migrations_table(conn):
with conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS schema_migrations (
id SERIAL PRIMARY KEY,
migration VARCHAR(255) UNIQUE NOT NULL,
executed_at TIMESTAMP DEFAULT NOW()
)
"""
)
conn.commit()
print("✓ schema_migrations initialisiert")
_LEADING_DIGITS = re.compile(r"^(\d+)")
def _migration_sort_key_from_stem(stem: str) -> Tuple[int, str]:
"""Sortierung: numerisches Präfix am Anfang des Stems, dann voller Stem (stabil)."""
m = _LEADING_DIGITS.match(stem)
n = int(m.group(1)) if m else 0
return (n, stem)
def get_migration_files_ordered(migrations_dir: str) -> List[Tuple[str, str]]:
rows = []
for filename in os.listdir(migrations_dir):
if not filename.endswith(".sql"):
continue
if not filename[0].isdigit():
continue
stem = filename[:-4]
rows.append((stem, os.path.join(migrations_dir, filename)))
rows.sort(key=lambda item: _migration_sort_key_from_stem(item[0]))
return rows
def get_executed(conn) -> set:
with conn.cursor() as cur:
cur.execute("SELECT migration FROM schema_migrations")
return set(r[0] for r in cur.fetchall())
def get_pending(conn, migrations_dir: str):
executed = get_executed(conn)
pending = []
for name, path in get_migration_files_ordered(migrations_dir):
if name not in executed:
pending.append((name, path))
return pending
def _split_statements(sql_text: str) -> List[str]:
"""Fallback ohne psql — einzelne Statements (sqlparse.split)."""
stripped = sql_text.strip()
if not stripped:
return []
parts = sqlparse.split(stripped)
return [p.strip() for p in parts if p and p.strip()]
def _run_file_with_psql(filepath: str) -> Tuple[bool, str]:
"""
Ganze Datei wie von psql dokumentiert (-1 eine Transaktion, ON_ERROR_STOP).
Unter Windows oft nicht verfügbar → zurück False ohne Fehlertext wenn kein Binary.
"""
psql = shutil.which("psql")
if not psql:
return False, ""
p = _db_params()
env = os.environ.copy()
env["PGPASSWORD"] = str(p["password"])
cmd = [
psql,
"-h",
p["host"],
"-p",
str(p["port"]),
"-U",
p["user"],
"-d",
p["dbname"],
"-v",
"ON_ERROR_STOP=1",
"-1",
"-f",
filepath,
]
proc = subprocess.run(
cmd,
env=env,
capture_output=True,
text=True,
timeout=7200,
)
if proc.returncode != 0:
tail = (
(proc.stderr or "").strip()
+ "\n"
+ (proc.stdout or "").strip()
).strip()
return False, tail[:8000] or f"exit {proc.returncode}"
out = (proc.stdout or "").strip()
return True, out
def _record_migration(conn, migration_name: str) -> None:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO schema_migrations (migration)
VALUES (%s)
ON CONFLICT (migration) DO NOTHING
""",
(migration_name,),
)
def run_migration(conn, migration_name: str, filepath: str) -> bool:
print(f"Running migration: {migration_name}")
detail_suffix = ""
try:
if shutil.which("psql"):
ok, diag = _run_file_with_psql(filepath)
if not ok:
print(f" ✗ psql fehlgeschlagen:\n{diag or '(kein Output)'}")
conn.rollback()
return False
detail_suffix = "(psql -1)"
else:
try:
with open(filepath, "r", encoding="utf-8") as fh:
body = fh.read()
except OSError as e:
print(f" ✗ kann Datei nicht lesen: {e}")
conn.rollback()
return False
statements = _split_statements(body)
with conn.cursor() as cur:
if not statements:
print(
f" ⚠ keine ausführbaren Statements (leer?) — "
f"Eintrag trotzdem: {migration_name}"
)
else:
for stmt in statements:
cur.execute(stmt)
detail_suffix = f"(psycopg2 + sqlparse, {len(statements)} Statements)"
_record_migration(conn, migration_name)
conn.commit()
print(f"{migration_name} erfolgreich {detail_suffix}")
return True
except Exception as e:
conn.rollback()
print(f"{migration_name}: {e}")
return False
def main():
print("=" * 60)
print("Shinkan Jinkendo — Database Migrations")
print("(Warteschlange: nur fehlende *.sql — idempotent)")
print("=" * 60)
migrations_dir = "/app/migrations"
if not os.path.isdir(migrations_dir):
migrations_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "migrations")
try:
conn = get_db_connection()
init_migrations_table(conn)
pending = get_pending(conn, migrations_dir)
if not pending:
print("✓ Keine ausstehenden Migrationen — Schema aktuell.")
conn.close()
return 0
print(f"\n{len(pending)} ausstehende Migration(en):")
for n, _ in pending:
print(f" - {n}")
print()
failed = None
for migration_name, filepath in pending:
if not run_migration(conn, migration_name, filepath):
failed = migration_name
break
conn.close()
print("\n" + "=" * 60)
if failed:
print(f"✗ Abbruch nach: {failed}")
print(" (Bereits erfolgreiche Dateien dieser Session sind committed.)")
print("=" * 60)
return 1
print(f"{len(pending)} Migration(s) angewendet — Schema aktuell.")
print("=" * 60)
return 0
except Exception as e:
print(f"\n✗ Fehler: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())