mitai-jinkendo/backend/db_init.py
Lars 913b485500
All checks were successful
Deploy Development / deploy (push) Successful in 34s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
fix: only process numbered migrations (XXX_*.sql pattern)
Modified run_migrations() to only process files whose names match the regex ^\d{3}_.*\.sql$ (three digits, an underscore, then anything ending in .sql).
This prevents utility scripts (check_features.sql) and manually applied
migrations (v9c_*.sql) from being executed.

Only properly numbered migrations like 003_add_email_verification.sql
will be processed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 10:08:56 +01:00

246 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""
Database initialization script for PostgreSQL.
Replaces psql commands in startup.sh with pure Python.
"""
import os
import sys
import time
import psycopg2
from psycopg2 import OperationalError
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME", "mitai_dev")
DB_USER = os.getenv("DB_USER", "mitai_dev")
DB_PASSWORD = os.getenv("DB_PASSWORD", "")
def get_connection():
    """Open a new PostgreSQL connection from the module-level DB_* settings.

    Returns:
        A new psycopg2 connection. The caller owns it and must close it.

    Raises:
        Whatever ``psycopg2.connect`` raises when the connection cannot be
        established (``wait_for_postgres`` retries on ``OperationalError``).
    """
    return psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        # "dbname" is the documented keyword; "database" is a deprecated alias.
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
    )
def wait_for_postgres(max_retries=30, delay=2):
    """Poll PostgreSQL until a connection succeeds or the retries run out.

    Args:
        max_retries: Maximum number of connection attempts.
        delay: Seconds to sleep between two failed attempts.

    Returns:
        True as soon as a connection could be opened and closed, False when
        every attempt failed with ``OperationalError``.
    """
    print("\nChecking PostgreSQL connection...")
    for attempt in range(1, max_retries + 1):
        try:
            get_connection().close()
        except OperationalError:
            print(f" Waiting for PostgreSQL... (attempt {attempt}/{max_retries})")
            # Sleeping after the final attempt would only delay the failure.
            if attempt < max_retries:
                time.sleep(delay)
        else:
            print("✓ PostgreSQL ready")
            return True
    print(f"✗ PostgreSQL not ready after {max_retries} attempts")
    return False
def check_table_exists(table_name="profiles"):
    """Report whether a table with the given name exists in the public schema.

    Args:
        table_name: Name looked up in ``information_schema.tables``.

    Returns:
        True if at least one matching table is found; False if none is found
        or if the lookup fails (the error is printed, not raised).
    """
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT COUNT(*)
                FROM information_schema.tables
                WHERE table_schema='public' AND table_name=%s
                """,
                (table_name,),
            )
            count = cur.fetchone()[0]
        return count > 0
    except Exception as e:
        print(f"Error checking table: {e}")
        return False
    finally:
        # Close the connection on the error path too, so it is never leaked.
        if conn is not None:
            conn.close()
def load_schema(schema_file="/app/schema.sql"):
    """Execute the SQL in ``schema_file`` against the database and commit it.

    Args:
        schema_file: Path of the SQL file to run.

    Returns:
        True when the file was executed and committed; False when reading the
        file or running the SQL failed (the error is printed, not raised).
    """
    conn = None
    try:
        # Explicit encoding so decoding does not depend on the container locale.
        with open(schema_file, "r", encoding="utf-8") as f:
            schema_sql = f.read()
        conn = get_connection()
        with conn.cursor() as cur:
            cur.execute(schema_sql)
        conn.commit()
        print("✓ Schema loaded from schema.sql")
        return True
    except Exception as e:
        print(f"✗ Error loading schema: {e}")
        return False
    finally:
        # Close the connection on the error path too, so it is never leaked.
        if conn is not None:
            conn.close()
def get_profile_count():
    """Count the rows of the ``profiles`` table.

    Returns:
        The row count, or -1 when the query fails (the error is printed, not
        raised).
    """
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM profiles")
            return cur.fetchone()[0]
    except Exception as e:
        print(f"Error getting profile count: {e}")
        return -1
    finally:
        # Close the connection on the error path too, so it is never leaked.
        if conn is not None:
            conn.close()
def ensure_migration_table():
    """Create the ``schema_migrations`` tracking table if it does not exist.

    Returns:
        True when the statement was executed and committed; False on any
        error (the error is printed, not raised).
    """
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cur:
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS schema_migrations (
                    id SERIAL PRIMARY KEY,
                    filename VARCHAR(255) UNIQUE NOT NULL,
                    applied_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
                )
                """
            )
        conn.commit()
        return True
    except Exception as e:
        print(f"Error creating migration table: {e}")
        return False
    finally:
        # Close the connection on the error path too, so it is never leaked.
        if conn is not None:
            conn.close()
def get_applied_migrations():
    """Return the filenames recorded in ``schema_migrations``, sorted by name.

    Returns:
        A list of filename strings. An empty list is also returned when the
        query fails (the error is printed, not raised), so callers cannot
        tell "nothing applied" apart from "lookup failed".
    """
    conn = None
    try:
        conn = get_connection()
        with conn.cursor() as cur:
            cur.execute("SELECT filename FROM schema_migrations ORDER BY filename")
            return [row[0] for row in cur.fetchall()]
    except Exception as e:
        print(f"Error getting applied migrations: {e}")
        return []
    finally:
        # Close the connection on the error path too, so it is never leaked.
        if conn is not None:
            conn.close()
def apply_migration(filepath, filename):
    """Run one migration file and record it in ``schema_migrations``.

    The migration SQL and the bookkeeping INSERT are committed together, so a
    failure in either leaves neither applied.

    Args:
        filepath: Path of the SQL file to execute.
        filename: Name stored in ``schema_migrations.filename``.

    Returns:
        True when the migration was executed, recorded and committed; False
        on any error (the error is printed, not raised).
    """
    conn = None
    try:
        # Explicit encoding so decoding does not depend on the container locale.
        with open(filepath, "r", encoding="utf-8") as f:
            migration_sql = f.read()
        conn = get_connection()
        with conn.cursor() as cur:
            # Execute migration
            cur.execute(migration_sql)
            # Record migration
            cur.execute(
                "INSERT INTO schema_migrations (filename) VALUES (%s)",
                (filename,),
            )
        conn.commit()
        print(f" ✓ Applied: {filename}")
        return True
    except Exception as e:
        print(f" ✗ Failed to apply {filename}: {e}")
        return False
    finally:
        # Closing without a commit discards any uncommitted work, and it
        # releases the connection on the error path as well.
        if conn is not None:
            conn.close()
def run_migrations(migrations_dir="/app/migrations"):
    """Apply every numbered migration that is not yet recorded as applied.

    Only files named like ``NNN_<anything>.sql`` (three leading digits) are
    considered; they are applied in sorted path order and processing stops at
    the first failure.

    Args:
        migrations_dir: Directory scanned for ``*.sql`` files.

    Returns:
        True when there is nothing to do or every pending migration was
        applied; False when the tracking table could not be ensured or a
        migration failed.
    """
    import glob
    import re

    if not os.path.exists(migrations_dir):
        print("✓ No migrations directory found")
        return True

    # The tracking table must exist before it can be queried.
    if not ensure_migration_table():
        return False

    applied = get_applied_migrations()
    already_applied = set(applied)

    # Keep only numbered migrations (e.g. 001_*.sql), paired with their names.
    numbered = re.compile(r'^\d{3}_.*\.sql$')
    sql_paths = sorted(glob.glob(os.path.join(migrations_dir, "*.sql")))
    named_paths = [(path, os.path.basename(path)) for path in sql_paths]
    migrations = [(path, name) for path, name in named_paths if numbered.match(name)]
    if not migrations:
        print("✓ No migration files found")
        return True

    pending = [(path, name) for path, name in migrations if name not in already_applied]
    if not pending:
        print(f"✓ All {len(applied)} migrations already applied")
        return True

    print(f" Found {len(pending)} pending migration(s)...")
    # all() short-circuits, so nothing runs after the first failed migration.
    return all(apply_migration(path, name) for path, name in pending)
if __name__ == "__main__":
    # Startup sequence: wait for the server, ensure the base schema, apply
    # pending migrations, then optionally import legacy SQLite data.
    banner = "═══════════════════════════════════════════════════════════"
    print(banner)
    print("MITAI JINKENDO - Database Initialization (v9c)")
    print(banner)

    # Nothing else can work until the server accepts connections.
    if not wait_for_postgres():
        sys.exit(1)

    # Load the base schema only when the "profiles" table is missing.
    print("\nChecking database schema...")
    if check_table_exists("profiles"):
        print("✓ Schema already exists")
    else:
        print(" Schema not found, initializing...")
        if not load_schema():
            sys.exit(1)

    # Apply pending numbered migrations.
    print("\nRunning database migrations...")
    if not run_migrations():
        print("✗ Migration failed")
        sys.exit(1)

    # Import legacy SQLite data only into an empty PostgreSQL database.
    print("\nChecking for SQLite data migration...")
    sqlite_db = "/app/data/bodytrack.db"
    profile_count = get_profile_count()
    sqlite_present = os.path.exists(sqlite_db)
    if not sqlite_present:
        print("✓ No SQLite database found (fresh install or already migrated)")
    elif profile_count == 0:
        print(" SQLite database found and PostgreSQL is empty")
        print(" Starting automatic migration...")
        # Imported lazily: the module is only needed on this path.
        try:
            from migrate_to_postgres import main as migrate
            migrate()
        except Exception as e:
            print(f"✗ Migration failed: {e}")
            sys.exit(1)
    elif profile_count > 0:
        print(f"⚠ SQLite DB exists but PostgreSQL already has {profile_count} profiles")
        print(" Skipping migration (already migrated)")
    else:
        # Reached with a negative count, i.e. the -1 that get_profile_count()
        # returns when its query fails.
        print("✓ No migration needed")

    print("\n✓ Database initialization complete")