mitai-jinkendo/backend/apply_v9c_migration.py
Lars 73bea5ee86
All checks were successful
Deploy Development / deploy (push) Successful in 33s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
feat: v9c Phase 1 - Feature consolidation & cleanup migration
PHASE 1: Cleanup & Analyse
- Feature-Konsolidierung: export_csv/json/zip → data_export (1 Feature)
- Umbenennung: csv_import → data_import
- Auto-Migration bei Container-Start (apply_v9c_migration.py)
- Diagnose-Script (check_features.sql)

Lessons Learned angewendet:
- Ein Feature für Export, nicht drei
- Migration ist idempotent (kann mehrfach laufen)
- Zeigt BEFORE/AFTER State im Log

Finaler Feature-Katalog (10 statt 13):
- Data: weight, circumference, caliper, nutrition, activity, photos
- AI: ai_calls, ai_pipeline
- Export/Import: data_export, data_import

Tier Limits:
- FREE: 30 data entries, 0 AI/export/import
- BASIC: unlimited data, 3 AI/month, 5 export/month, 3 import/month
- PREMIUM/SELFHOSTED: unlimited

Migration läuft automatisch auf dev UND prod beim Container-Start.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 18:57:39 +01:00

254 lines
7.6 KiB
Python

#!/usr/bin/env python3
"""
Apply v9c Subscription System Migration
This script checks if v9c migration is needed and applies it.
Run automatically on container startup via main.py startup event.
"""
import os
import psycopg2
from psycopg2.extras import RealDictCursor
def get_db_connection():
"""Get PostgreSQL connection."""
return psycopg2.connect(
host=os.getenv("DB_HOST", "postgres"),
port=int(os.getenv("DB_PORT", 5432)),
database=os.getenv("DB_NAME", "mitai_prod"),
user=os.getenv("DB_USER", "mitai_prod"),
password=os.getenv("DB_PASSWORD", ""),
cursor_factory=RealDictCursor
)
def migration_needed(conn):
"""Check if v9c migration is needed."""
cur = conn.cursor()
# Check if tiers table exists
cur.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'tiers'
)
""")
tiers_exists = cur.fetchone()['exists']
# Check if features table exists
cur.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'features'
)
""")
features_exists = cur.fetchone()['exists']
cur.close()
# Migration needed if either table is missing
return not (tiers_exists and features_exists)
def apply_migration():
"""Apply v9c migration if needed."""
print("[v9c Migration] Checking if migration is needed...")
try:
conn = get_db_connection()
if not migration_needed(conn):
print("[v9c Migration] Already applied, skipping.")
conn.close()
# Even if main migration is done, check cleanup
apply_cleanup_migration()
return
print("[v9c Migration] Applying subscription system migration...")
# Read migration SQL
migration_path = os.path.join(
os.path.dirname(__file__),
"migrations",
"v9c_subscription_system.sql"
)
with open(migration_path, 'r', encoding='utf-8') as f:
migration_sql = f.read()
# Execute migration
cur = conn.cursor()
cur.execute(migration_sql)
conn.commit()
cur.close()
conn.close()
print("[v9c Migration] ✅ Migration completed successfully!")
# Apply fix migration if exists
fix_migration_path = os.path.join(
os.path.dirname(__file__),
"migrations",
"v9c_fix_features.sql"
)
if os.path.exists(fix_migration_path):
print("[v9c Migration] Applying feature fixes...")
with open(fix_migration_path, 'r', encoding='utf-8') as f:
fix_sql = f.read()
conn = get_db_connection()
cur = conn.cursor()
cur.execute(fix_sql)
conn.commit()
cur.close()
conn.close()
print("[v9c Migration] ✅ Feature fixes applied!")
# Verify tables created
conn = get_db_connection()
cur = conn.cursor()
cur.execute("""
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name IN ('tiers', 'features', 'tier_limits', 'access_grants', 'coupons')
ORDER BY table_name
""")
tables = [r['table_name'] for r in cur.fetchall()]
print(f"[v9c Migration] Created tables: {', '.join(tables)}")
# Verify initial data
cur.execute("SELECT COUNT(*) as count FROM tiers")
tier_count = cur.fetchone()['count']
cur.execute("SELECT COUNT(*) as count FROM features")
feature_count = cur.fetchone()['count']
cur.execute("SELECT COUNT(*) as count FROM tier_limits")
limit_count = cur.fetchone()['count']
print(f"[v9c Migration] Initial data: {tier_count} tiers, {feature_count} features, {limit_count} tier limits")
cur.close()
conn.close()
# After successful migration, apply cleanup
apply_cleanup_migration()
except Exception as e:
print(f"[v9c Migration] ❌ Error: {e}")
raise
def cleanup_features_needed(conn):
"""Check if feature cleanup migration is needed."""
cur = conn.cursor()
# Check if old export features still exist
cur.execute("""
SELECT COUNT(*) as count FROM features
WHERE id IN ('export_csv', 'export_json', 'export_zip')
""")
old_exports = cur.fetchone()['count']
# Check if csv_import needs to be renamed
cur.execute("""
SELECT COUNT(*) as count FROM features
WHERE id = 'csv_import'
""")
old_import = cur.fetchone()['count']
cur.close()
# Cleanup needed if old features exist
return old_exports > 0 or old_import > 0
def apply_cleanup_migration():
"""Apply v9c feature cleanup migration."""
print("[v9c Cleanup] Checking if cleanup migration is needed...")
try:
conn = get_db_connection()
if not cleanup_features_needed(conn):
print("[v9c Cleanup] Already applied, skipping.")
conn.close()
return
print("[v9c Cleanup] Applying feature consolidation...")
# Show BEFORE state
cur = conn.cursor()
cur.execute("SELECT id, name FROM features ORDER BY category, id")
features_before = [f"{r['id']} ({r['name']})" for r in cur.fetchall()]
print(f"[v9c Cleanup] Features BEFORE: {len(features_before)} features")
for f in features_before:
print(f" - {f}")
cur.close()
# Read cleanup migration SQL
cleanup_path = os.path.join(
os.path.dirname(__file__),
"migrations",
"v9c_cleanup_features.sql"
)
if not os.path.exists(cleanup_path):
print(f"[v9c Cleanup] ⚠️ Cleanup migration file not found: {cleanup_path}")
conn.close()
return
with open(cleanup_path, 'r', encoding='utf-8') as f:
cleanup_sql = f.read()
# Execute cleanup migration
cur = conn.cursor()
cur.execute(cleanup_sql)
conn.commit()
cur.close()
# Show AFTER state
cur = conn.cursor()
cur.execute("SELECT id, name, category FROM features ORDER BY category, id")
features_after = cur.fetchall()
print(f"[v9c Cleanup] Features AFTER: {len(features_after)} features")
# Group by category
categories = {}
for f in features_after:
cat = f['category'] or 'other'
if cat not in categories:
categories[cat] = []
categories[cat].append(f"{f['id']} ({f['name']})")
for cat, feats in sorted(categories.items()):
print(f" {cat.upper()}:")
for f in feats:
print(f" - {f}")
# Verify tier_limits updated
cur.execute("""
SELECT tier_id, feature_id, limit_value
FROM tier_limits
WHERE feature_id IN ('data_export', 'data_import')
ORDER BY tier_id, feature_id
""")
limits = cur.fetchall()
print(f"[v9c Cleanup] Tier limits for data_export/data_import:")
for lim in limits:
limit_str = 'unlimited' if lim['limit_value'] is None else lim['limit_value']
print(f" {lim['tier_id']}.{lim['feature_id']} = {limit_str}")
cur.close()
conn.close()
print("[v9c Cleanup] ✅ Feature cleanup completed successfully!")
except Exception as e:
print(f"[v9c Cleanup] ❌ Error: {e}")
raise
if __name__ == "__main__":
apply_migration()