#!/usr/bin/env python3 """ Database initialization script for PostgreSQL. Replaces psql commands in startup.sh with pure Python. """ import os import sys import time import psycopg2 from psycopg2 import OperationalError DB_HOST = os.getenv("DB_HOST", "localhost") DB_PORT = os.getenv("DB_PORT", "5432") DB_NAME = os.getenv("DB_NAME", "mitai_dev") DB_USER = os.getenv("DB_USER", "mitai_dev") DB_PASSWORD = os.getenv("DB_PASSWORD", "") def get_connection(): """Get PostgreSQL connection.""" return psycopg2.connect( host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD ) def wait_for_postgres(max_retries=30): """Wait for PostgreSQL to be ready.""" print("\nChecking PostgreSQL connection...") for i in range(1, max_retries + 1): try: conn = get_connection() conn.close() print("✓ PostgreSQL ready") return True except OperationalError: print(f" Waiting for PostgreSQL... (attempt {i}/{max_retries})") time.sleep(2) print(f"✗ PostgreSQL not ready after {max_retries} attempts") return False def check_table_exists(table_name="profiles"): """Check if a table exists.""" try: conn = get_connection() cur = conn.cursor() cur.execute(""" SELECT COUNT(*) FROM information_schema.tables WHERE table_schema='public' AND table_name=%s """, (table_name,)) count = cur.fetchone()[0] cur.close() conn.close() return count > 0 except Exception as e: print(f"Error checking table: {e}") return False def load_schema(schema_file="/app/schema.sql"): """Load schema from SQL file.""" try: with open(schema_file, 'r') as f: schema_sql = f.read() conn = get_connection() cur = conn.cursor() cur.execute(schema_sql) conn.commit() cur.close() conn.close() print("✓ Schema loaded from schema.sql") return True except Exception as e: print(f"✗ Error loading schema: {e}") return False def get_profile_count(): """Get number of profiles in database.""" try: conn = get_connection() cur = conn.cursor() cur.execute("SELECT COUNT(*) FROM profiles") count = cur.fetchone()[0] cur.close() conn.close() return count except Exception as e: print(f"Error getting profile count: {e}") return -1 def ensure_migration_table(): """Create migration tracking table if it doesn't exist.""" try: conn = get_connection() cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS schema_migrations ( id SERIAL PRIMARY KEY, filename VARCHAR(255) UNIQUE NOT NULL, applied_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ) """) conn.commit() cur.close() conn.close() return True except Exception as e: print(f"Error creating migration table: {e}") return False def get_applied_migrations(): """Get list of already applied migrations.""" try: conn = get_connection() cur = conn.cursor() cur.execute("SELECT filename FROM schema_migrations ORDER BY filename") migrations = [row[0] for row in cur.fetchall()] cur.close() conn.close() return migrations except Exception as e: print(f"Error getting applied migrations: {e}") return [] def apply_migration(filepath, filename): """Apply a single migration file.""" try: with open(filepath, 'r') as f: migration_sql = f.read() conn = get_connection() cur = conn.cursor() # Execute migration cur.execute(migration_sql) # Record migration cur.execute( "INSERT INTO schema_migrations (filename) VALUES (%s)", (filename,) ) conn.commit() cur.close() conn.close() print(f" ✓ Applied: {filename}") return True except Exception as e: print(f" ✗ Failed to apply {filename}: {e}") return False def run_migrations(migrations_dir="/app/migrations"): """Run all pending migrations.""" import glob import re if not os.path.exists(migrations_dir): print("✓ No migrations directory found") return True # Ensure migration tracking table exists if not ensure_migration_table(): return False # Get already applied migrations applied = get_applied_migrations() # Get all migration files (only numbered migrations like 001_*.sql) all_files = sorted(glob.glob(os.path.join(migrations_dir, "*.sql"))) migration_pattern = re.compile(r'^\d{3}_.*\.sql$') migration_files = [f for f in all_files if migration_pattern.match(os.path.basename(f))] if not migration_files: print("✓ No migration files found") return True # Apply pending migrations pending = [] for filepath in migration_files: filename = os.path.basename(filepath) if filename not in applied: pending.append((filepath, filename)) if not pending: print(f"✓ All {len(applied)} migrations already applied") return True print(f" Found {len(pending)} pending migration(s)...") for filepath, filename in pending: if not apply_migration(filepath, filename): return False return True if __name__ == "__main__": print("═══════════════════════════════════════════════════════════") print("MITAI JINKENDO - Database Initialization (v9c)") print("═══════════════════════════════════════════════════════════") # Wait for PostgreSQL if not wait_for_postgres(): sys.exit(1) # Check schema print("\nChecking database schema...") if not check_table_exists("profiles"): print(" Schema not found, initializing...") if not load_schema(): sys.exit(1) else: print("✓ Schema already exists") # Run migrations print("\nRunning database migrations...") if not run_migrations(): print("✗ Migration failed") sys.exit(1) # Check for migration print("\nChecking for SQLite data migration...") sqlite_db = "/app/data/bodytrack.db" profile_count = get_profile_count() if os.path.exists(sqlite_db) and profile_count == 0: print(" SQLite database found and PostgreSQL is empty") print(" Starting automatic migration...") # Import and run migration try: from migrate_to_postgres import main as migrate migrate() except Exception as e: print(f"✗ Migration failed: {e}") sys.exit(1) elif os.path.exists(sqlite_db) and profile_count > 0: print(f"⚠ SQLite DB exists but PostgreSQL already has {profile_count} profiles") print(" Skipping migration (already migrated)") elif not os.path.exists(sqlite_db): print("✓ No SQLite database found (fresh install or already migrated)") else: print("✓ No migration needed") print("\n✓ Database initialization complete")