#!/usr/bin/env python3
"""
SQLite → PostgreSQL Migration Script for Mitai Jinkendo (v9a → v9b)

Migrates all data from SQLite to PostgreSQL with type conversions and
validation.

Usage:
    # Inside Docker container:
    python migrate_to_postgres.py

    # Or locally with custom paths:
    DATA_DIR=./data DB_HOST=localhost python migrate_to_postgres.py

Environment Variables:
    SQLite Source:
        DATA_DIR     (default: ./data)
    PostgreSQL Target:
        DB_HOST      (default: postgres)
        DB_PORT      (default: 5432)
        DB_NAME      (default: mitai)
        DB_USER      (default: mitai)
        DB_PASSWORD  (required)
"""
import os
import sys
import sqlite3
from pathlib import Path
from typing import Dict, Any, List, Optional

import psycopg2
from psycopg2.extras import execute_values, RealDictCursor

# ================================================================
# CONFIGURATION
# ================================================================

# SQLite Source
DATA_DIR = Path(os.getenv("DATA_DIR", "./data"))
SQLITE_DB = DATA_DIR / "bodytrack.db"

# PostgreSQL Target
PG_CONFIG = {
    'host': os.getenv("DB_HOST", "postgres"),
    'port': int(os.getenv("DB_PORT", "5432")),
    'database': os.getenv("DB_NAME", "mitai"),
    'user': os.getenv("DB_USER", "mitai"),
    'password': os.getenv("DB_PASSWORD", "")
}

# Tables to migrate (in order - respects foreign keys)
TABLES = [
    'profiles',
    'sessions',
    'ai_usage',
    'ai_prompts',
    'weight_log',
    'circumference_log',
    'caliper_log',
    'nutrition_log',
    'activity_log',
    'photos',
    'ai_insights',
]

# Columns that need INTEGER (0/1) → BOOLEAN conversion
BOOLEAN_COLUMNS = {
    'profiles': ['ai_enabled', 'export_enabled'],
    'ai_prompts': ['active'],
}


# ================================================================
# CONVERSION HELPERS
# ================================================================

def convert_value(value: Any, column: str, table: str) -> Any:
    """
    Convert a single SQLite value to a PostgreSQL-compatible format.

    Args:
        value: Raw value from SQLite
        column: Column name
        table: Table name

    Returns:
        Converted value suitable for PostgreSQL
    """
    # NULL values pass through
    if value is None:
        return None

    # Empty string → NULL for DATE columns
    # (PostgreSQL doesn't accept '' for the DATE type)
    if isinstance(value, str) and value.strip() == '' and column == 'date':
        return None

    # INTEGER → BOOLEAN conversion for columns listed in BOOLEAN_COLUMNS
    if table in BOOLEAN_COLUMNS and column in BOOLEAN_COLUMNS[table]:
        return bool(value)

    # All other values pass through
    # (PostgreSQL handles TEXT timestamps, UUIDs, and numerics automatically)
    return value


def convert_row(row: Dict[str, Any], table: str) -> Dict[str, Any]:
    """
    Convert an entire row from SQLite to PostgreSQL format.

    Args:
        row: Dictionary with column:value pairs from SQLite
        table: Table name

    Returns:
        Converted dictionary
    """
    return {
        column: convert_value(value, column, table)
        for column, value in row.items()
    }


# ================================================================
# MIGRATION LOGIC
# ================================================================

def get_sqlite_rows(table: str) -> List[Dict[str, Any]]:
    """
    Fetch all rows from a SQLite table.

    Args:
        table: Table name

    Returns:
        List of dictionaries (one per row); empty list if the table
        does not exist in the SQLite source (OK, might be new in v9b).
    """
    conn = sqlite3.connect(SQLITE_DB)
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    try:
        # Table names come from the fixed TABLES list, not user input,
        # so f-string interpolation is safe here.
        rows = cur.execute(f"SELECT * FROM {table}").fetchall()
        return [dict(row) for row in rows]
    except sqlite3.OperationalError as e:
        # Table doesn't exist in SQLite (OK, might be new in v9b)
        print(f"  ⚠ Table '{table}' not found in SQLite: {e}")
        return []
    finally:
        conn.close()


def migrate_table(pg_conn, table: str) -> Dict[str, int]:
    """
    Migrate one table from SQLite to PostgreSQL.

    Args:
        pg_conn: PostgreSQL connection
        table: Table name

    Returns:
        Dictionary with stats: {'sqlite_count': N, 'postgres_count': M},
        where postgres_count is the number of rows this run inserted
        (measured as a before/after count delta, so pre-existing rows in
        the target table do not produce false verification mismatches).

    Raises:
        psycopg2.Error: If the batch insert fails (after printing the error).
    """
    print(f"  Migrating '{table}'...", end=' ', flush=True)

    # Fetch from SQLite
    sqlite_rows = get_sqlite_rows(table)
    sqlite_count = len(sqlite_rows)

    if sqlite_count == 0:
        print("(empty)")
        return {'sqlite_count': 0, 'postgres_count': 0}

    # Convert rows
    converted_rows = [convert_row(row, table) for row in sqlite_rows]

    # Get column names (all rows share the SELECT * column set)
    columns = list(converted_rows[0].keys())
    cols_str = ', '.join(columns)

    pg_cur = pg_conn.cursor()

    # Count existing rows BEFORE inserting so we can report how many
    # rows this migration actually added (not the table's total).
    pg_cur.execute(f"SELECT COUNT(*) FROM {table}")
    pre_existing = pg_cur.fetchone()[0]

    # Build INSERT query; execute_values expands the single %s into
    # multi-row VALUES lists.
    query = f"INSERT INTO {table} ({cols_str}) VALUES %s"

    # Prepare values (list of tuples, in column order)
    values = [
        tuple(row[col] for col in columns)
        for row in converted_rows
    ]

    # Batch insert with execute_values (faster than executemany)
    try:
        execute_values(pg_cur, query, values, page_size=100)
    except psycopg2.Error as e:
        print(f"\n  ✗ Insert failed: {e}")
        raise

    # Verify row count: rows added = total after insert minus pre-existing
    pg_cur.execute(f"SELECT COUNT(*) FROM {table}")
    postgres_count = pg_cur.fetchone()[0] - pre_existing

    print(f"✓ {sqlite_count} rows → {postgres_count} rows")

    return {
        'sqlite_count': sqlite_count,
        'postgres_count': postgres_count
    }


def verify_migration(pg_conn, stats: Dict[str, Dict[str, int]]):
    """
    Verify migration integrity and print sample data.

    Compares per-table SQLite source counts against rows inserted into
    PostgreSQL, then dumps one sample profile and the latest weight entry.
    Exits with status 1 on any count mismatch.

    Args:
        pg_conn: PostgreSQL connection
        stats: Migration stats per table (from migrate_table)
    """
    print("\n═══════════════════════════════════════════════════════════")
    print("VERIFICATION")
    print("═══════════════════════════════════════════════════════════")

    all_ok = True
    for table, counts in stats.items():
        sqlite_count = counts['sqlite_count']
        postgres_count = counts['postgres_count']
        status = "✓" if sqlite_count == postgres_count else "✗"
        print(f"  {status} {table:20s} SQLite: {sqlite_count:5d} → PostgreSQL: {postgres_count:5d}")
        if sqlite_count != postgres_count:
            all_ok = False

    # Sample some data
    print("\n───────────────────────────────────────────────────────────")
    print("SAMPLE DATA (first profile)")
    print("───────────────────────────────────────────────────────────")
    cur = pg_conn.cursor(cursor_factory=RealDictCursor)
    cur.execute("SELECT * FROM profiles LIMIT 1")
    profile = cur.fetchone()
    if profile:
        for key, value in dict(profile).items():
            print(f"  {key:20s} = {value}")
    else:
        print("  (no profiles found)")

    print("\n───────────────────────────────────────────────────────────")
    print("SAMPLE DATA (latest weight entry)")
    print("───────────────────────────────────────────────────────────")
    cur.execute("SELECT * FROM weight_log ORDER BY date DESC LIMIT 1")
    weight = cur.fetchone()
    if weight:
        for key, value in dict(weight).items():
            print(f"  {key:20s} = {value}")
    else:
        print("  (no weight entries found)")

    print("\n═══════════════════════════════════════════════════════════")
    if all_ok:
        print("✓ MIGRATION SUCCESSFUL - All row counts match!")
    else:
        print("✗ MIGRATION FAILED - Row count mismatch detected!")
        sys.exit(1)


# ================================================================
# MAIN
# ================================================================

def main():
    """Run the full migration: preflight checks, migrate, commit, verify."""
    print("═══════════════════════════════════════════════════════════")
    print("MITAI JINKENDO - SQLite → PostgreSQL Migration (v9a → v9b)")
    print("═══════════════════════════════════════════════════════════\n")

    # Check SQLite DB exists
    if not SQLITE_DB.exists():
        print(f"✗ SQLite database not found: {SQLITE_DB}")
        print(f"  Set DATA_DIR environment variable if needed.")
        sys.exit(1)
    print(f"✓ SQLite source: {SQLITE_DB}")
    print(f"✓ PostgreSQL target: {PG_CONFIG['user']}@{PG_CONFIG['host']}:{PG_CONFIG['port']}/{PG_CONFIG['database']}\n")

    # Check PostgreSQL password
    if not PG_CONFIG['password']:
        print("✗ DB_PASSWORD environment variable not set!")
        sys.exit(1)

    # Connect to PostgreSQL
    print("Connecting to PostgreSQL...", end=' ', flush=True)
    try:
        pg_conn = psycopg2.connect(**PG_CONFIG)
        print("✓")
    except psycopg2.Error as e:
        print(f"\n✗ Connection failed: {e}")
        print("\nTroubleshooting:")
        print("  - Is PostgreSQL running? (docker compose ps)")
        print("  - Is DB_PASSWORD correct?")
        print("  - Is the schema initialized? (schema.sql loaded?)")
        sys.exit(1)

    # Check if schema is initialized (probe for the 'profiles' table)
    print("Checking PostgreSQL schema...", end=' ', flush=True)
    cur = pg_conn.cursor()
    cur.execute("""
        SELECT COUNT(*) FROM information_schema.tables
        WHERE table_schema = 'public' AND table_name = 'profiles'
    """)
    if cur.fetchone()[0] == 0:
        print("\n✗ Schema not initialized!")
        print("\nRun this first:")
        print("  docker compose exec backend python -c \"from main import init_db; init_db()\"")
        print("  Or manually load schema.sql")
        sys.exit(1)
    print("✓")

    # Check if PostgreSQL is empty; warn before creating duplicates
    print("Checking if PostgreSQL is empty...", end=' ', flush=True)
    cur.execute("SELECT COUNT(*) FROM profiles")
    existing_profiles = cur.fetchone()[0]
    if existing_profiles > 0:
        print(f"\n⚠ WARNING: PostgreSQL already has {existing_profiles} profiles!")
        response = input("  Continue anyway? This will create duplicates! (yes/no): ")
        if response.lower() != 'yes':
            print("Migration cancelled.")
            sys.exit(0)
    else:
        print("✓")

    print("\n───────────────────────────────────────────────────────────")
    print("MIGRATION")
    print("───────────────────────────────────────────────────────────")

    stats = {}
    try:
        for table in TABLES:
            stats[table] = migrate_table(pg_conn, table)

        # Commit all changes as one transaction: either every table
        # migrates or none do.
        pg_conn.commit()
        print("\n✓ All changes committed to PostgreSQL")

    except Exception as e:
        print(f"\n✗ Migration failed: {e}")
        print("Rolling back...")
        pg_conn.rollback()
        pg_conn.close()
        sys.exit(1)

    # Verification (exits non-zero itself on mismatch)
    verify_migration(pg_conn, stats)

    # Cleanup
    pg_conn.close()

    print("\n✓ Migration complete!")
    print("\nNext steps:")
    print("  1. Test login with existing credentials")
    print("  2. Check Dashboard (weight chart, stats)")
    print("  3. Verify KI-Analysen work")
    print("  4. If everything works: commit + push to develop")


if __name__ == '__main__':
    main()