PostgreSQL DATE type doesn't accept empty strings ('').
Convert empty/whitespace date values to NULL during migration.
Fixes: invalid input syntax for type date: ""
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
374 lines
12 KiB
Python
374 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
SQLite → PostgreSQL Migration Script für Mitai Jinkendo (v9a → v9b)
|
|
|
|
Migrates all data from SQLite to PostgreSQL with type conversions and validation.
|
|
|
|
Usage:
|
|
# Inside Docker container:
|
|
python migrate_to_postgres.py
|
|
|
|
# Or locally with custom paths:
|
|
DATA_DIR=./data DB_HOST=localhost python migrate_to_postgres.py
|
|
|
|
Environment Variables:
|
|
SQLite Source:
|
|
DATA_DIR (default: ./data)
|
|
|
|
PostgreSQL Target:
|
|
DB_HOST (default: postgres)
|
|
DB_PORT (default: 5432)
|
|
DB_NAME (default: mitai)
|
|
DB_USER (default: mitai)
|
|
DB_PASSWORD (required)
|
|
"""
|
|
import os
|
|
import sys
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List, Optional
|
|
import psycopg2
|
|
from psycopg2.extras import execute_values, RealDictCursor
|
|
|
|
|
|
# ================================================================
|
|
# CONFIGURATION
|
|
# ================================================================
|
|
|
|
# SQLite Source
|
|
DATA_DIR = Path(os.getenv("DATA_DIR", "./data"))
|
|
SQLITE_DB = DATA_DIR / "bodytrack.db"
|
|
|
|
# PostgreSQL Target
|
|
PG_CONFIG = {
|
|
'host': os.getenv("DB_HOST", "postgres"),
|
|
'port': int(os.getenv("DB_PORT", "5432")),
|
|
'database': os.getenv("DB_NAME", "mitai"),
|
|
'user': os.getenv("DB_USER", "mitai"),
|
|
'password': os.getenv("DB_PASSWORD", "")
|
|
}
|
|
|
|
# Tables to migrate (in order - respects foreign keys)
|
|
TABLES = [
|
|
'profiles',
|
|
'sessions',
|
|
'ai_usage',
|
|
'ai_prompts',
|
|
'weight_log',
|
|
'circumference_log',
|
|
'caliper_log',
|
|
'nutrition_log',
|
|
'activity_log',
|
|
'photos',
|
|
'ai_insights',
|
|
]
|
|
|
|
# Columns that need INTEGER (0/1) → BOOLEAN conversion
|
|
BOOLEAN_COLUMNS = {
|
|
'profiles': ['ai_enabled', 'export_enabled'],
|
|
'ai_prompts': ['active'],
|
|
}
|
|
|
|
|
|
# ================================================================
|
|
# CONVERSION HELPERS
|
|
# ================================================================
|
|
|
|
def convert_value(value: Any, column: str, table: str) -> Any:
|
|
"""
|
|
Convert SQLite value to PostgreSQL-compatible format.
|
|
|
|
Args:
|
|
value: Raw value from SQLite
|
|
column: Column name
|
|
table: Table name
|
|
|
|
Returns:
|
|
Converted value suitable for PostgreSQL
|
|
"""
|
|
# NULL values pass through
|
|
if value is None:
|
|
return None
|
|
|
|
# Empty string → NULL for DATE columns (PostgreSQL doesn't accept '' for DATE type)
|
|
if isinstance(value, str) and value.strip() == '' and column == 'date':
|
|
return None
|
|
|
|
# INTEGER → BOOLEAN conversion
|
|
if table in BOOLEAN_COLUMNS and column in BOOLEAN_COLUMNS[table]:
|
|
return bool(value)
|
|
|
|
# All other values pass through
|
|
# (PostgreSQL handles TEXT timestamps, UUIDs, and numerics automatically)
|
|
return value
|
|
|
|
|
|
def convert_row(row: Dict[str, Any], table: str) -> Dict[str, Any]:
|
|
"""
|
|
Convert entire row from SQLite to PostgreSQL format.
|
|
|
|
Args:
|
|
row: Dictionary with column:value pairs from SQLite
|
|
table: Table name
|
|
|
|
Returns:
|
|
Converted dictionary
|
|
"""
|
|
return {
|
|
column: convert_value(value, column, table)
|
|
for column, value in row.items()
|
|
}
|
|
|
|
|
|
# ================================================================
|
|
# MIGRATION LOGIC
|
|
# ================================================================
|
|
|
|
def get_sqlite_rows(table: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Fetch all rows from SQLite table.
|
|
|
|
Args:
|
|
table: Table name
|
|
|
|
Returns:
|
|
List of dictionaries (one per row)
|
|
"""
|
|
conn = sqlite3.connect(SQLITE_DB)
|
|
conn.row_factory = sqlite3.Row
|
|
cur = conn.cursor()
|
|
|
|
try:
|
|
rows = cur.execute(f"SELECT * FROM {table}").fetchall()
|
|
return [dict(row) for row in rows]
|
|
except sqlite3.OperationalError as e:
|
|
# Table doesn't exist in SQLite (OK, might be new in v9b)
|
|
print(f" ⚠ Table '{table}' not found in SQLite: {e}")
|
|
return []
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def migrate_table(pg_conn, table: str) -> Dict[str, int]:
|
|
"""
|
|
Migrate one table from SQLite to PostgreSQL.
|
|
|
|
Args:
|
|
pg_conn: PostgreSQL connection
|
|
table: Table name
|
|
|
|
Returns:
|
|
Dictionary with stats: {'sqlite_count': N, 'postgres_count': M}
|
|
"""
|
|
print(f" Migrating '{table}'...", end=' ', flush=True)
|
|
|
|
# Fetch from SQLite
|
|
sqlite_rows = get_sqlite_rows(table)
|
|
sqlite_count = len(sqlite_rows)
|
|
|
|
if sqlite_count == 0:
|
|
print("(empty)")
|
|
return {'sqlite_count': 0, 'postgres_count': 0}
|
|
|
|
# Convert rows
|
|
converted_rows = [convert_row(row, table) for row in sqlite_rows]
|
|
|
|
# Get column names
|
|
columns = list(converted_rows[0].keys())
|
|
cols_str = ', '.join(columns)
|
|
placeholders = ', '.join(['%s'] * len(columns))
|
|
|
|
# Insert into PostgreSQL
|
|
pg_cur = pg_conn.cursor()
|
|
|
|
# Build INSERT query
|
|
query = f"INSERT INTO {table} ({cols_str}) VALUES %s"
|
|
|
|
# Prepare values (list of tuples)
|
|
values = [
|
|
tuple(row[col] for col in columns)
|
|
for row in converted_rows
|
|
]
|
|
|
|
# Batch insert with execute_values (faster than executemany)
|
|
try:
|
|
execute_values(pg_cur, query, values, page_size=100)
|
|
except psycopg2.Error as e:
|
|
print(f"\n ✗ Insert failed: {e}")
|
|
raise
|
|
|
|
# Verify row count
|
|
pg_cur.execute(f"SELECT COUNT(*) FROM {table}")
|
|
postgres_count = pg_cur.fetchone()[0]
|
|
|
|
print(f"✓ {sqlite_count} rows → {postgres_count} rows")
|
|
|
|
return {
|
|
'sqlite_count': sqlite_count,
|
|
'postgres_count': postgres_count
|
|
}
|
|
|
|
|
|
def verify_migration(pg_conn, stats: Dict[str, Dict[str, int]]):
|
|
"""
|
|
Verify migration integrity.
|
|
|
|
Args:
|
|
pg_conn: PostgreSQL connection
|
|
stats: Migration stats per table
|
|
"""
|
|
print("\n═══════════════════════════════════════════════════════════")
|
|
print("VERIFICATION")
|
|
print("═══════════════════════════════════════════════════════════")
|
|
|
|
all_ok = True
|
|
|
|
for table, counts in stats.items():
|
|
sqlite_count = counts['sqlite_count']
|
|
postgres_count = counts['postgres_count']
|
|
|
|
status = "✓" if sqlite_count == postgres_count else "✗"
|
|
print(f" {status} {table:20s} SQLite: {sqlite_count:5d} → PostgreSQL: {postgres_count:5d}")
|
|
|
|
if sqlite_count != postgres_count:
|
|
all_ok = False
|
|
|
|
# Sample some data
|
|
print("\n───────────────────────────────────────────────────────────")
|
|
print("SAMPLE DATA (first profile)")
|
|
print("───────────────────────────────────────────────────────────")
|
|
|
|
cur = pg_conn.cursor(cursor_factory=RealDictCursor)
|
|
cur.execute("SELECT * FROM profiles LIMIT 1")
|
|
profile = cur.fetchone()
|
|
|
|
if profile:
|
|
for key, value in dict(profile).items():
|
|
print(f" {key:20s} = {value}")
|
|
else:
|
|
print(" (no profiles found)")
|
|
|
|
print("\n───────────────────────────────────────────────────────────")
|
|
print("SAMPLE DATA (latest weight entry)")
|
|
print("───────────────────────────────────────────────────────────")
|
|
|
|
cur.execute("SELECT * FROM weight_log ORDER BY date DESC LIMIT 1")
|
|
weight = cur.fetchone()
|
|
|
|
if weight:
|
|
for key, value in dict(weight).items():
|
|
print(f" {key:20s} = {value}")
|
|
else:
|
|
print(" (no weight entries found)")
|
|
|
|
print("\n═══════════════════════════════════════════════════════════")
|
|
|
|
if all_ok:
|
|
print("✓ MIGRATION SUCCESSFUL - All row counts match!")
|
|
else:
|
|
print("✗ MIGRATION FAILED - Row count mismatch detected!")
|
|
sys.exit(1)
|
|
|
|
|
|
# ================================================================
|
|
# MAIN
|
|
# ================================================================
|
|
|
|
def main():
|
|
print("═══════════════════════════════════════════════════════════")
|
|
print("MITAI JINKENDO - SQLite → PostgreSQL Migration (v9a → v9b)")
|
|
print("═══════════════════════════════════════════════════════════\n")
|
|
|
|
# Check SQLite DB exists
|
|
if not SQLITE_DB.exists():
|
|
print(f"✗ SQLite database not found: {SQLITE_DB}")
|
|
print(f" Set DATA_DIR environment variable if needed.")
|
|
sys.exit(1)
|
|
|
|
print(f"✓ SQLite source: {SQLITE_DB}")
|
|
print(f"✓ PostgreSQL target: {PG_CONFIG['user']}@{PG_CONFIG['host']}:{PG_CONFIG['port']}/{PG_CONFIG['database']}\n")
|
|
|
|
# Check PostgreSQL password
|
|
if not PG_CONFIG['password']:
|
|
print("✗ DB_PASSWORD environment variable not set!")
|
|
sys.exit(1)
|
|
|
|
# Connect to PostgreSQL
|
|
print("Connecting to PostgreSQL...", end=' ', flush=True)
|
|
try:
|
|
pg_conn = psycopg2.connect(**PG_CONFIG)
|
|
print("✓")
|
|
except psycopg2.Error as e:
|
|
print(f"\n✗ Connection failed: {e}")
|
|
print("\nTroubleshooting:")
|
|
print(" - Is PostgreSQL running? (docker compose ps)")
|
|
print(" - Is DB_PASSWORD correct?")
|
|
print(" - Is the schema initialized? (schema.sql loaded?)")
|
|
sys.exit(1)
|
|
|
|
# Check if schema is initialized
|
|
print("Checking PostgreSQL schema...", end=' ', flush=True)
|
|
cur = pg_conn.cursor()
|
|
cur.execute("""
|
|
SELECT COUNT(*) FROM information_schema.tables
|
|
WHERE table_schema = 'public' AND table_name = 'profiles'
|
|
""")
|
|
if cur.fetchone()[0] == 0:
|
|
print("\n✗ Schema not initialized!")
|
|
print("\nRun this first:")
|
|
print(" docker compose exec backend python -c \"from main import init_db; init_db()\"")
|
|
print(" Or manually load schema.sql")
|
|
sys.exit(1)
|
|
print("✓")
|
|
|
|
# Check if PostgreSQL is empty
|
|
print("Checking if PostgreSQL is empty...", end=' ', flush=True)
|
|
cur.execute("SELECT COUNT(*) FROM profiles")
|
|
existing_profiles = cur.fetchone()[0]
|
|
if existing_profiles > 0:
|
|
print(f"\n⚠ WARNING: PostgreSQL already has {existing_profiles} profiles!")
|
|
response = input(" Continue anyway? This will create duplicates! (yes/no): ")
|
|
if response.lower() != 'yes':
|
|
print("Migration cancelled.")
|
|
sys.exit(0)
|
|
else:
|
|
print("✓")
|
|
|
|
print("\n───────────────────────────────────────────────────────────")
|
|
print("MIGRATION")
|
|
print("───────────────────────────────────────────────────────────")
|
|
|
|
stats = {}
|
|
|
|
try:
|
|
for table in TABLES:
|
|
stats[table] = migrate_table(pg_conn, table)
|
|
|
|
# Commit all changes
|
|
pg_conn.commit()
|
|
print("\n✓ All changes committed to PostgreSQL")
|
|
|
|
except Exception as e:
|
|
print(f"\n✗ Migration failed: {e}")
|
|
print("Rolling back...")
|
|
pg_conn.rollback()
|
|
pg_conn.close()
|
|
sys.exit(1)
|
|
|
|
# Verification
|
|
verify_migration(pg_conn, stats)
|
|
|
|
# Cleanup
|
|
pg_conn.close()
|
|
|
|
print("\n✓ Migration complete!")
|
|
print("\nNext steps:")
|
|
print(" 1. Test login with existing credentials")
|
|
print(" 2. Check Dashboard (weight chart, stats)")
|
|
print(" 3. Verify KI-Analysen work")
|
|
print(" 4. If everything works: commit + push to develop")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|