""" Data Export Endpoints for Mitai Jinkendo Handles CSV, JSON, and ZIP exports with photos. """ import os import csv import io import json import zipfile from pathlib import Path from typing import Optional from datetime import datetime from decimal import Decimal from fastapi import APIRouter, HTTPException, Header, Depends from fastapi.responses import StreamingResponse, Response from db import get_db, get_cursor, r2d from auth import require_auth, check_feature_access, increment_feature_usage from routers.profiles import get_pid router = APIRouter(prefix="/api/export", tags=["export"]) PHOTOS_DIR = Path(os.getenv("PHOTOS_DIR", "./photos")) @router.get("/csv") def export_csv(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)): """Export all data as CSV.""" pid = get_pid(x_profile_id) # Check feature access (v9c feature system) access = check_feature_access(pid, 'data_export') if not access['allowed']: if access['reason'] == 'feature_disabled': raise HTTPException(403, "Export ist für dein Tier nicht verfügbar") elif access['reason'] == 'limit_exceeded': raise HTTPException(429, f"Monatliches Export-Limit erreicht ({access['limit']} Exporte)") else: raise HTTPException(403, f"Zugriff verweigert: {access['reason']}") # Build CSV output = io.StringIO() writer = csv.writer(output) # Header writer.writerow(["Typ", "Datum", "Wert", "Details"]) # Weight with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT date, weight, note FROM weight_log WHERE profile_id=%s ORDER BY date", (pid,)) for r in cur.fetchall(): writer.writerow(["Gewicht", r['date'], f"{float(r['weight'])}kg", r['note'] or ""]) # Circumferences cur.execute("SELECT date, c_waist, c_belly, c_hip FROM circumference_log WHERE profile_id=%s ORDER BY date", (pid,)) for r in cur.fetchall(): details = f"Taille:{float(r['c_waist'])}cm Bauch:{float(r['c_belly'])}cm Hüfte:{float(r['c_hip'])}cm" writer.writerow(["Umfänge", r['date'], "", details]) # Caliper cur.execute("SELECT date, body_fat_pct, lean_mass FROM caliper_log WHERE profile_id=%s ORDER BY date", (pid,)) for r in cur.fetchall(): writer.writerow(["Caliper", r['date'], f"{float(r['body_fat_pct'])}%", f"Magermasse:{float(r['lean_mass'])}kg"]) # Nutrition cur.execute("SELECT date, kcal, protein_g FROM nutrition_log WHERE profile_id=%s ORDER BY date", (pid,)) for r in cur.fetchall(): writer.writerow(["Ernährung", r['date'], f"{float(r['kcal'])}kcal", f"Protein:{float(r['protein_g'])}g"]) # Activity cur.execute("SELECT date, activity_type, duration_min, kcal_active FROM activity_log WHERE profile_id=%s ORDER BY date", (pid,)) for r in cur.fetchall(): writer.writerow(["Training", r['date'], r['activity_type'], f"{float(r['duration_min'])}min {float(r['kcal_active'])}kcal"]) output.seek(0) # Increment export usage counter (v9c feature system) increment_feature_usage(pid, 'data_export') return StreamingResponse( iter([output.getvalue()]), media_type="text/csv", headers={"Content-Disposition": f"attachment; filename=mitai-export-{pid}.csv"} ) @router.get("/json") def export_json(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)): """Export all data as JSON.""" pid = get_pid(x_profile_id) # Check feature access (v9c feature system) access = check_feature_access(pid, 'data_export') if not access['allowed']: if access['reason'] == 'feature_disabled': raise HTTPException(403, "Export ist für dein Tier nicht verfügbar") elif access['reason'] == 'limit_exceeded': raise HTTPException(429, f"Monatliches Export-Limit erreicht ({access['limit']} Exporte)") else: raise HTTPException(403, f"Zugriff verweigert: {access['reason']}") # Collect all data data = {} with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,)) data['profile'] = r2d(cur.fetchone()) cur.execute("SELECT * FROM weight_log WHERE profile_id=%s ORDER BY date", (pid,)) data['weight'] = [r2d(r) for r in cur.fetchall()] cur.execute("SELECT * FROM circumference_log WHERE profile_id=%s ORDER BY date", (pid,)) data['circumferences'] = [r2d(r) for r in cur.fetchall()] cur.execute("SELECT * FROM caliper_log WHERE profile_id=%s ORDER BY date", (pid,)) data['caliper'] = [r2d(r) for r in cur.fetchall()] cur.execute("SELECT * FROM nutrition_log WHERE profile_id=%s ORDER BY date", (pid,)) data['nutrition'] = [r2d(r) for r in cur.fetchall()] cur.execute("SELECT * FROM activity_log WHERE profile_id=%s ORDER BY date", (pid,)) data['activity'] = [r2d(r) for r in cur.fetchall()] cur.execute("SELECT * FROM ai_insights WHERE profile_id=%s ORDER BY created DESC", (pid,)) data['insights'] = [r2d(r) for r in cur.fetchall()] def decimal_handler(obj): if isinstance(obj, Decimal): return float(obj) return str(obj) json_str = json.dumps(data, indent=2, default=decimal_handler) # Increment export usage counter (v9c feature system) increment_feature_usage(pid, 'data_export') return Response( content=json_str, media_type="application/json", headers={"Content-Disposition": f"attachment; filename=mitai-export-{pid}.json"} ) @router.get("/zip") def export_zip(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)): """Export all data as ZIP (CSV + JSON + photos) per specification.""" pid = get_pid(x_profile_id) # Check feature access (v9c feature system) access = check_feature_access(pid, 'data_export') if not access['allowed']: if access['reason'] == 'feature_disabled': raise HTTPException(403, "Export ist für dein Tier nicht verfügbar") elif access['reason'] == 'limit_exceeded': raise HTTPException(429, f"Monatliches Export-Limit erreicht ({access['limit']} Exporte)") else: raise HTTPException(403, f"Zugriff verweigert: {access['reason']}") # Get profile with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,)) prof = r2d(cur.fetchone()) # Helper: CSV writer with UTF-8 BOM + semicolon def write_csv(zf, filename, rows, columns): if not rows: return output = io.StringIO() writer = csv.writer(output, delimiter=';') writer.writerow(columns) for r in rows: writer.writerow([ '' if r.get(col) is None else (float(r[col]) if isinstance(r.get(col), Decimal) else r[col]) for col in columns ]) # UTF-8 with BOM for Excel csv_bytes = '\ufeff'.encode('utf-8') + output.getvalue().encode('utf-8') zf.writestr(f"data/{filename}", csv_bytes) # Create ZIP zip_buffer = io.BytesIO() export_date = datetime.now().strftime('%Y-%m-%d') profile_name = prof.get('name', 'export') with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: with get_db() as conn: cur = get_cursor(conn) # 1. README.txt readme = f"""Mitai Jinkendo – Datenexport Version: 2 Exportiert am: {export_date} Profil: {profile_name} Inhalt: - profile.json: Profildaten und Einstellungen - data/*.csv: Messdaten (Semikolon-getrennt, UTF-8) - insights/: KI-Auswertungen (JSON) - photos/: Progress-Fotos (JPEG) Import: Dieser Export kann in Mitai Jinkendo unter Einstellungen → Import → "Mitai Backup importieren" wieder eingespielt werden. Format-Version 2 (ab v9b): Alle CSV-Dateien sind UTF-8 mit BOM kodiert. Trennzeichen: Semikolon (;) Datumsformat: YYYY-MM-DD """ zf.writestr("README.txt", readme.encode('utf-8')) # 2. profile.json (ohne Passwort-Hash) cur.execute("SELECT COUNT(*) as c FROM weight_log WHERE profile_id=%s", (pid,)) w_count = cur.fetchone()['c'] cur.execute("SELECT COUNT(*) as c FROM nutrition_log WHERE profile_id=%s", (pid,)) n_count = cur.fetchone()['c'] cur.execute("SELECT COUNT(*) as c FROM activity_log WHERE profile_id=%s", (pid,)) a_count = cur.fetchone()['c'] cur.execute("SELECT COUNT(*) as c FROM photos WHERE profile_id=%s", (pid,)) p_count = cur.fetchone()['c'] profile_data = { "export_version": "2", "export_date": export_date, "app": "Mitai Jinkendo", "profile": { "name": prof.get('name'), "email": prof.get('email'), "sex": prof.get('sex'), "height": float(prof['height']) if prof.get('height') else None, "birth_year": prof['dob'].year if prof.get('dob') else None, "goal_weight": float(prof['goal_weight']) if prof.get('goal_weight') else None, "goal_bf_pct": float(prof['goal_bf_pct']) if prof.get('goal_bf_pct') else None, "avatar_color": prof.get('avatar_color'), "auth_type": prof.get('auth_type'), "session_days": prof.get('session_days'), "ai_enabled": prof.get('ai_enabled'), "tier": prof.get('tier') }, "stats": { "weight_entries": w_count, "nutrition_entries": n_count, "activity_entries": a_count, "photos": p_count } } zf.writestr("profile.json", json.dumps(profile_data, indent=2, ensure_ascii=False).encode('utf-8')) # 3-7. CSV exports (weight, circumferences, caliper, nutrition, activity) cur.execute("SELECT id, date, weight, note, source, created FROM weight_log WHERE profile_id=%s ORDER BY date", (pid,)) write_csv(zf, "weight.csv", [r2d(r) for r in cur.fetchall()], ['id','date','weight','note','source','created']) cur.execute("SELECT id, date, c_waist, c_hip, c_chest, c_neck, c_arm, c_thigh, c_calf, notes, created FROM circumference_log WHERE profile_id=%s ORDER BY date", (pid,)) rows = [r2d(r) for r in cur.fetchall()] for r in rows: r['waist'] = r.pop('c_waist', None); r['hip'] = r.pop('c_hip', None) r['chest'] = r.pop('c_chest', None); r['neck'] = r.pop('c_neck', None) r['upper_arm'] = r.pop('c_arm', None); r['thigh'] = r.pop('c_thigh', None) r['calf'] = r.pop('c_calf', None); r['forearm'] = None; r['note'] = r.pop('notes', None) write_csv(zf, "circumferences.csv", rows, ['id','date','waist','hip','chest','neck','upper_arm','thigh','calf','forearm','note','created']) cur.execute("SELECT id, date, sf_chest, sf_abdomen, sf_thigh, sf_triceps, sf_subscap, sf_suprailiac, sf_axilla, sf_method, body_fat_pct, notes, created FROM caliper_log WHERE profile_id=%s ORDER BY date", (pid,)) rows = [r2d(r) for r in cur.fetchall()] for r in rows: r['chest'] = r.pop('sf_chest', None); r['abdomen'] = r.pop('sf_abdomen', None) r['thigh'] = r.pop('sf_thigh', None); r['tricep'] = r.pop('sf_triceps', None) r['subscapular'] = r.pop('sf_subscap', None); r['suprailiac'] = r.pop('sf_suprailiac', None) r['midaxillary'] = r.pop('sf_axilla', None); r['method'] = r.pop('sf_method', None) r['bf_percent'] = r.pop('body_fat_pct', None); r['note'] = r.pop('notes', None) write_csv(zf, "caliper.csv", rows, ['id','date','chest','abdomen','thigh','tricep','subscapular','suprailiac','midaxillary','method','bf_percent','note','created']) cur.execute("SELECT id, date, kcal, protein_g, fat_g, carbs_g, source, created FROM nutrition_log WHERE profile_id=%s ORDER BY date", (pid,)) rows = [r2d(r) for r in cur.fetchall()] for r in rows: r['meal_name'] = ''; r['protein'] = r.pop('protein_g', None) r['fat'] = r.pop('fat_g', None); r['carbs'] = r.pop('carbs_g', None) r['fiber'] = None; r['note'] = '' write_csv(zf, "nutrition.csv", rows, ['id','date','meal_name','kcal','protein','fat','carbs','fiber','note','source','created']) cur.execute("SELECT id, date, activity_type, duration_min, kcal_active, hr_avg, hr_max, distance_km, notes, source, created FROM activity_log WHERE profile_id=%s ORDER BY date", (pid,)) rows = [r2d(r) for r in cur.fetchall()] for r in rows: r['name'] = r['activity_type']; r['type'] = r.pop('activity_type', None) r['kcal'] = r.pop('kcal_active', None); r['heart_rate_avg'] = r.pop('hr_avg', None) r['heart_rate_max'] = r.pop('hr_max', None); r['note'] = r.pop('notes', None) write_csv(zf, "activity.csv", rows, ['id','date','name','type','duration_min','kcal','heart_rate_avg','heart_rate_max','distance_km','note','source','created']) # 8. insights/ai_insights.json cur.execute("SELECT id, scope, content, created FROM ai_insights WHERE profile_id=%s ORDER BY created DESC", (pid,)) insights = [] for r in cur.fetchall(): rd = r2d(r) insights.append({ "id": rd['id'], "scope": rd['scope'], "created": rd['created'].isoformat() if hasattr(rd['created'], 'isoformat') else str(rd['created']), "result": rd['content'] }) if insights: zf.writestr("insights/ai_insights.json", json.dumps(insights, indent=2, ensure_ascii=False).encode('utf-8')) # 9. photos/ cur.execute("SELECT * FROM photos WHERE profile_id=%s ORDER BY date", (pid,)) photos = [r2d(r) for r in cur.fetchall()] for i, photo in enumerate(photos): photo_path = Path(PHOTOS_DIR) / photo['path'] if photo_path.exists(): filename = f"{photo.get('date') or export_date}_{i+1}{photo_path.suffix}" zf.write(photo_path, f"photos/{filename}") zip_buffer.seek(0) filename = f"mitai-export-{profile_name.replace(' ','-')}-{export_date}.zip" # Increment export usage counter (v9c feature system) increment_feature_usage(pid, 'data_export') return StreamingResponse( iter([zip_buffer.getvalue()]), media_type="application/zip", headers={"Content-Disposition": f"attachment; filename={filename}"} )