mitai-jinkendo/backend/migrate_to_postgres.py
Lars 6845397866
All checks were successful
Deploy Development / deploy (push) Successful in 55s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 12s
fix: Migration-Fehler - meas_id Spalte in ai_insights
PROBLEM:
- Backend crasht beim Start auf Prod
- Migration schlägt fehl: column 'meas_id' does not exist
- SQLite ai_insights hat Legacy-Spalte meas_id
- PostgreSQL schema hat diese Spalte nicht mehr

FIX:
- COLUMN_WHITELIST für ai_insights hinzugefügt
- Nur erlaubte Spalten werden migriert:
  id, profile_id, scope, content, created
- meas_id wird beim Import gefiltert

DATEIEN:
- backend/migrate_to_postgres.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 08:39:36 +01:00

389 lines
13 KiB
Python

#!/usr/bin/env python3
"""
SQLite → PostgreSQL Migration Script für Mitai Jinkendo (v9a → v9b)
Migrates all data from SQLite to PostgreSQL with type conversions and validation.
Usage:
# Inside Docker container:
python migrate_to_postgres.py
# Or locally with custom paths:
DATA_DIR=./data DB_HOST=localhost python migrate_to_postgres.py
Environment Variables:
SQLite Source:
DATA_DIR (default: ./data)
PostgreSQL Target:
DB_HOST (default: postgres)
DB_PORT (default: 5432)
DB_NAME (default: mitai)
DB_USER (default: mitai)
DB_PASSWORD (required)
"""
import os
import sys
import sqlite3
from pathlib import Path
from typing import Dict, Any, List, Optional
import psycopg2
from psycopg2.extras import execute_values, RealDictCursor
# ================================================================
# CONFIGURATION
# ================================================================
# SQLite Source: directory that holds the legacy bodytrack.db file.
DATA_DIR = Path(os.getenv("DATA_DIR", "./data"))
SQLITE_DB = DATA_DIR / "bodytrack.db"
# PostgreSQL Target: every parameter is overridable via environment
# variables. DB_PASSWORD intentionally has no usable default; main()
# aborts when it is empty.
PG_CONFIG = {
    'host': os.getenv("DB_HOST", "postgres"),
    'port': int(os.getenv("DB_PORT", "5432")),
    'database': os.getenv("DB_NAME", "mitai"),
    'user': os.getenv("DB_USER", "mitai"),
    'password': os.getenv("DB_PASSWORD", "")
}
# Tables to migrate (in order - respects foreign keys)
TABLES = [
    'profiles',
    'sessions',
    'ai_usage',
    'ai_prompts',
    'weight_log',
    'circumference_log',
    'caliper_log',
    'nutrition_log',
    'activity_log',
    'photos',
    'ai_insights',
]
# Columns that need INTEGER (0/1) → BOOLEAN conversion
# (SQLite has no BOOLEAN type; these are stored as 0/1 integers there.)
BOOLEAN_COLUMNS = {
    'profiles': ['ai_enabled', 'export_enabled'],
    'ai_prompts': ['active'],
}
# Column whitelist for tables that have schema differences.
# Only these columns will be migrated (filters out legacy columns,
# e.g. ai_insights.meas_id exists in SQLite but not in PostgreSQL).
COLUMN_WHITELIST = {
    'ai_insights': ['id', 'profile_id', 'scope', 'content', 'created'],
}
# ================================================================
# CONVERSION HELPERS
# ================================================================
def convert_value(value: Any, column: str, table: str) -> Any:
    """
    Translate a single SQLite value into a PostgreSQL-compatible one.

    Args:
        value: Raw value as read from SQLite.
        column: Name of the column the value belongs to.
        table: Name of the table the value belongs to.

    Returns:
        The value, converted where required for PostgreSQL.
    """
    # NULLs need no conversion.
    if value is None:
        return None
    # PostgreSQL rejects '' for DATE columns, so map empty strings to NULL.
    if column == 'date' and isinstance(value, str) and not value.strip():
        return None
    # SQLite stores booleans as 0/1 integers; cast the configured columns.
    if column in BOOLEAN_COLUMNS.get(table, ()):
        return bool(value)
    # Everything else (text timestamps, UUIDs, numerics) passes through;
    # psycopg2/PostgreSQL adapt these automatically.
    return value
def convert_row(row: Dict[str, Any], table: str) -> Dict[str, Any]:
    """
    Apply convert_value() to every column of one SQLite row.

    Args:
        row: Mapping of column name to raw SQLite value.
        table: Table the row came from (controls boolean conversion).

    Returns:
        A new dict with every value converted for PostgreSQL.
    """
    converted: Dict[str, Any] = {}
    for column, value in row.items():
        converted[column] = convert_value(value, column, table)
    return converted
# ================================================================
# MIGRATION LOGIC
# ================================================================
def get_sqlite_rows(table: str) -> List[Dict[str, Any]]:
    """
    Load every row of one SQLite table as a list of plain dicts.

    Args:
        table: Table name.

    Returns:
        One dict per row; an empty list when the table does not exist in
        the source DB (expected for tables that are new in v9b).
    """
    connection = sqlite3.connect(SQLITE_DB)
    connection.row_factory = sqlite3.Row  # rows become dict-convertible
    try:
        result = connection.cursor().execute(f"SELECT * FROM {table}")
        return [dict(record) for record in result.fetchall()]
    except sqlite3.OperationalError as e:
        # Missing table is not fatal: warn and migrate nothing for it.
        print(f" ⚠ Table '{table}' not found in SQLite: {e}")
        return []
    finally:
        connection.close()
def migrate_table(pg_conn, table: str) -> Dict[str, int]:
    """
    Migrate one table from SQLite to PostgreSQL.

    Reads all rows from the SQLite source, applies type conversions,
    drops legacy columns via COLUMN_WHITELIST, and bulk-inserts the
    result. The caller owns the transaction (commit/rollback).

    Args:
        pg_conn: Open PostgreSQL connection.
        table: Table name (must exist in the PostgreSQL schema).

    Returns:
        Dictionary with stats: {'sqlite_count': N, 'postgres_count': M}

    Raises:
        psycopg2.Error: If the bulk insert fails (error printed first).
    """
    print(f" Migrating '{table}'...", end=' ', flush=True)
    # Fetch from SQLite (empty list also covers tables absent in the source)
    sqlite_rows = get_sqlite_rows(table)
    sqlite_count = len(sqlite_rows)
    if sqlite_count == 0:
        print("(empty)")
        return {'sqlite_count': 0, 'postgres_count': 0}
    # Per-value conversions (e.g. INTEGER 0/1 → BOOLEAN, '' → NULL dates)
    converted_rows = [convert_row(row, table) for row in sqlite_rows]
    # Get column names - filter if whitelist exists for this table
    if table in COLUMN_WHITELIST:
        allowed_columns = COLUMN_WHITELIST[table]
        # Filter rows to only include allowed columns (drops legacy columns
        # such as ai_insights.meas_id that no longer exist in PostgreSQL)
        converted_rows = [
            {k: v for k, v in row.items() if k in allowed_columns}
            for row in converted_rows
        ]
        columns = allowed_columns
    else:
        # No whitelist: take the column set from the first converted row
        columns = list(converted_rows[0].keys())
    cols_str = ', '.join(columns)
    # FIX: removed unused 'placeholders' local — execute_values expands the
    # single "VALUES %s" template itself, so per-column %s were never used.
    # Insert into PostgreSQL
    pg_cur = pg_conn.cursor()
    # Build INSERT query (identifiers come from the trusted constants above)
    query = f"INSERT INTO {table} ({cols_str}) VALUES %s"
    # Prepare values (list of tuples) in a fixed column order.
    # NOTE(review): row[col] raises KeyError if a whitelisted column is
    # missing from the SQLite schema — confirm all whitelist columns exist.
    values = [
        tuple(row[col] for col in columns)
        for row in converted_rows
    ]
    # Batch insert with execute_values (faster than executemany)
    try:
        execute_values(pg_cur, query, values, page_size=100)
    except psycopg2.Error as e:
        print(f"\n ✗ Insert failed: {e}")
        raise
    # Verify row count (assumes the target table started empty)
    pg_cur.execute(f"SELECT COUNT(*) FROM {table}")
    postgres_count = pg_cur.fetchone()[0]
    print(f"{sqlite_count} rows → {postgres_count} rows")
    return {
        'sqlite_count': sqlite_count,
        'postgres_count': postgres_count
    }
def verify_migration(pg_conn, stats: Dict[str, Dict[str, int]]):
    """
    Verify migration integrity.

    Compares SQLite vs. PostgreSQL row counts per table and prints sample
    rows for a quick manual check. Exits with status 1 on any mismatch.

    Args:
        pg_conn: PostgreSQL connection
        stats: Migration stats per table (as returned by migrate_table)
    """
    print("\n═══════════════════════════════════════════════════════════")
    print("VERIFICATION")
    print("═══════════════════════════════════════════════════════════")
    all_ok = True
    for table, counts in stats.items():
        sqlite_count = counts['sqlite_count']
        postgres_count = counts['postgres_count']
        # FIX: both branches previously produced an empty marker, making
        # matching and mismatching tables visually indistinguishable.
        status = "✓" if sqlite_count == postgres_count else "✗"
        print(f" {status} {table:20s} SQLite: {sqlite_count:5d} → PostgreSQL: {postgres_count:5d}")
        if sqlite_count != postgres_count:
            all_ok = False
    # Sample some data for a manual plausibility check
    print("\n───────────────────────────────────────────────────────────")
    print("SAMPLE DATA (first profile)")
    print("───────────────────────────────────────────────────────────")
    cur = pg_conn.cursor(cursor_factory=RealDictCursor)
    cur.execute("SELECT * FROM profiles LIMIT 1")
    profile = cur.fetchone()
    if profile:
        for key, value in dict(profile).items():
            print(f" {key:20s} = {value}")
    else:
        print(" (no profiles found)")
    print("\n───────────────────────────────────────────────────────────")
    print("SAMPLE DATA (latest weight entry)")
    print("───────────────────────────────────────────────────────────")
    cur.execute("SELECT * FROM weight_log ORDER BY date DESC LIMIT 1")
    weight = cur.fetchone()
    if weight:
        for key, value in dict(weight).items():
            print(f" {key:20s} = {value}")
    else:
        print(" (no weight entries found)")
    print("\n═══════════════════════════════════════════════════════════")
    if all_ok:
        print("✓ MIGRATION SUCCESSFUL - All row counts match!")
    else:
        print("✗ MIGRATION FAILED - Row count mismatch detected!")
        sys.exit(1)
# ================================================================
# MAIN
# ================================================================
def main():
    """
    Entry point: validate environment, migrate all tables, verify, report.

    Exits with status 1 on any hard failure (missing source DB, missing
    password, connection failure, uninitialized schema, migration error,
    or row-count mismatch) and status 0 when the user cancels.
    """
    print("═══════════════════════════════════════════════════════════")
    print("MITAI JINKENDO - SQLite → PostgreSQL Migration (v9a → v9b)")
    print("═══════════════════════════════════════════════════════════\n")
    # Check SQLite DB exists
    if not SQLITE_DB.exists():
        print(f"✗ SQLite database not found: {SQLITE_DB}")
        print(f" Set DATA_DIR environment variable if needed.")
        sys.exit(1)
    print(f"✓ SQLite source: {SQLITE_DB}")
    print(f"✓ PostgreSQL target: {PG_CONFIG['user']}@{PG_CONFIG['host']}:{PG_CONFIG['port']}/{PG_CONFIG['database']}\n")
    # Check PostgreSQL password (required — the empty default would only
    # fail later, at connect time, with a less helpful error)
    if not PG_CONFIG['password']:
        print("✗ DB_PASSWORD environment variable not set!")
        sys.exit(1)
    # Connect to PostgreSQL
    print("Connecting to PostgreSQL...", end=' ', flush=True)
    try:
        pg_conn = psycopg2.connect(**PG_CONFIG)
        # NOTE(review): prints an empty string — a success marker (e.g. "✓")
        # appears to have been lost here; confirm intended output.
        print("")
    except psycopg2.Error as e:
        print(f"\n✗ Connection failed: {e}")
        print("\nTroubleshooting:")
        print(" - Is PostgreSQL running? (docker compose ps)")
        print(" - Is DB_PASSWORD correct?")
        print(" - Is the schema initialized? (schema.sql loaded?)")
        sys.exit(1)
    # Check if schema is initialized (probe for the 'profiles' table)
    print("Checking PostgreSQL schema...", end=' ', flush=True)
    cur = pg_conn.cursor()
    cur.execute("""
        SELECT COUNT(*) FROM information_schema.tables
        WHERE table_schema = 'public' AND table_name = 'profiles'
    """)
    if cur.fetchone()[0] == 0:
        print("\n✗ Schema not initialized!")
        print("\nRun this first:")
        print(" docker compose exec backend python -c \"from main import init_db; init_db()\"")
        print(" Or manually load schema.sql")
        sys.exit(1)
    # NOTE(review): empty-string print — likely a lost success marker.
    print("")
    # Check if PostgreSQL is empty; re-running against a populated target
    # would duplicate every row, so ask for explicit confirmation.
    print("Checking if PostgreSQL is empty...", end=' ', flush=True)
    cur.execute("SELECT COUNT(*) FROM profiles")
    existing_profiles = cur.fetchone()[0]
    if existing_profiles > 0:
        print(f"\n⚠ WARNING: PostgreSQL already has {existing_profiles} profiles!")
        response = input(" Continue anyway? This will create duplicates! (yes/no): ")
        if response.lower() != 'yes':
            print("Migration cancelled.")
            sys.exit(0)
    else:
        # NOTE(review): empty-string print — likely a lost success marker.
        print("")
    print("\n───────────────────────────────────────────────────────────")
    print("MIGRATION")
    print("───────────────────────────────────────────────────────────")
    stats = {}
    try:
        # Migrate in FK-safe order (TABLES ordering); all inserts share
        # one transaction so a failure leaves the target untouched.
        for table in TABLES:
            stats[table] = migrate_table(pg_conn, table)
        # Commit all changes
        pg_conn.commit()
        print("\n✓ All changes committed to PostgreSQL")
    except Exception as e:
        print(f"\n✗ Migration failed: {e}")
        print("Rolling back...")
        pg_conn.rollback()
        pg_conn.close()
        sys.exit(1)
    # Verification (exits with status 1 itself on a count mismatch)
    verify_migration(pg_conn, stats)
    # Cleanup
    pg_conn.close()
    print("\n✓ Migration complete!")
    print("\nNext steps:")
    print(" 1. Test login with existing credentials")
    print(" 2. Check Dashboard (weight chart, stats)")
    print(" 3. Verify KI-Analysen work")
    print(" 4. If everything works: commit + push to develop")


if __name__ == '__main__':
    main()