"""
Data Export Endpoints for Mitai Jinkendo

Handles CSV, JSON, and ZIP exports with photos.
"""
import os
import csv
import io
import json
import re
import zipfile
from pathlib import Path
from typing import Optional
from datetime import datetime
from decimal import Decimal
from urllib.parse import quote

from fastapi import APIRouter, HTTPException, Header, Depends
from fastapi.responses import StreamingResponse, Response

from db import get_db, get_cursor, r2d
from auth import require_auth
from routers.profiles import get_pid

router = APIRouter(prefix="/api/export", tags=["export"])

PHOTOS_DIR = Path(os.getenv("PHOTOS_DIR", "./photos"))

# Anything outside this set is replaced in the plain `filename=` fallback so
# the Content-Disposition header stays ASCII and cannot be broken by quotes,
# semicolons or control characters coming from a user-chosen profile name.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9._-]+")

# Substrings that mark a profile column as credential material.
# NOTE(review): the profiles schema is not visible from this module; confirm
# these markers cover every secret column (the ZIP export documents that the
# password hash must not be exported).
_SENSITIVE_KEY_MARKERS = ("hash", "password")

# JSON export: (key in the output document, query returning the rows).
_JSON_TABLES = (
    ("weight", "SELECT * FROM weight_log WHERE profile_id=%s ORDER BY date"),
    ("circumferences", "SELECT * FROM circumference_log WHERE profile_id=%s ORDER BY date"),
    ("caliper", "SELECT * FROM caliper_log WHERE profile_id=%s ORDER BY date"),
    ("nutrition", "SELECT * FROM nutrition_log WHERE profile_id=%s ORDER BY date"),
    ("activity", "SELECT * FROM activity_log WHERE profile_id=%s ORDER BY date"),
    ("insights", "SELECT * FROM ai_insights WHERE profile_id=%s ORDER BY created DESC"),
)

# ZIP export: (file name, query, ((export column, db column or None), ...)).
# A db column of None produces an always-empty export column.
_ZIP_CSV_TABLES = (
    (
        "weight.csv",
        "SELECT id, date, weight, note, source, created FROM weight_log WHERE profile_id=%s ORDER BY date",
        (
            ("id", "id"), ("date", "date"), ("weight", "weight"),
            ("note", "note"), ("source", "source"), ("created", "created"),
        ),
    ),
    (
        "circumferences.csv",
        "SELECT id, date, c_waist, c_hip, c_chest, c_neck, c_arm, c_thigh, c_calf, notes, created FROM circumference_log WHERE profile_id=%s ORDER BY date",
        (
            ("id", "id"), ("date", "date"), ("waist", "c_waist"), ("hip", "c_hip"),
            ("chest", "c_chest"), ("neck", "c_neck"), ("upper_arm", "c_arm"),
            ("thigh", "c_thigh"), ("calf", "c_calf"), ("forearm", None),
            ("note", "notes"), ("created", "created"),
        ),
    ),
    (
        "caliper.csv",
        "SELECT id, date, sf_chest, sf_abdomen, sf_thigh, sf_triceps, sf_subscap, sf_suprailiac, sf_axilla, sf_method, body_fat_pct, notes, created FROM caliper_log WHERE profile_id=%s ORDER BY date",
        (
            ("id", "id"), ("date", "date"), ("chest", "sf_chest"),
            ("abdomen", "sf_abdomen"), ("thigh", "sf_thigh"), ("tricep", "sf_triceps"),
            ("subscapular", "sf_subscap"), ("suprailiac", "sf_suprailiac"),
            ("midaxillary", "sf_axilla"), ("method", "sf_method"),
            ("bf_percent", "body_fat_pct"), ("note", "notes"), ("created", "created"),
        ),
    ),
    (
        "nutrition.csv",
        "SELECT id, date, kcal, protein_g, fat_g, carbs_g, source, created FROM nutrition_log WHERE profile_id=%s ORDER BY date",
        (
            ("id", "id"), ("date", "date"), ("meal_name", None), ("kcal", "kcal"),
            ("protein", "protein_g"), ("fat", "fat_g"), ("carbs", "carbs_g"),
            ("fiber", None), ("note", None), ("source", "source"), ("created", "created"),
        ),
    ),
    (
        "activity.csv",
        "SELECT id, date, activity_type, duration_min, kcal_active, hr_avg, hr_max, distance_km, notes, source, created FROM activity_log WHERE profile_id=%s ORDER BY date",
        (
            ("id", "id"), ("date", "date"), ("name", "activity_type"),
            ("type", "activity_type"), ("duration_min", "duration_min"),
            ("kcal", "kcal_active"), ("heart_rate_avg", "hr_avg"),
            ("heart_rate_max", "hr_max"), ("distance_km", "distance_km"),
            ("note", "notes"), ("source", "source"), ("created", "created"),
        ),
    ),
)

_README_TEMPLATE = """Mitai Jinkendo – Datenexport
Version: 2
Exportiert am: {export_date}
Profil: {profile_name}

Inhalt:
- profile.json: Profildaten und Einstellungen
- data/*.csv: Messdaten (Semikolon-getrennt, UTF-8)
- insights/: KI-Auswertungen (JSON)
- photos/: Progress-Fotos (JPEG)

Import:
Dieser Export kann in Mitai Jinkendo unter
Einstellungen → Import → "Mitai Backup importieren"
wieder eingespielt werden.

Format-Version 2 (ab v9b):
Alle CSV-Dateien sind UTF-8 mit BOM kodiert.
Trennzeichen: Semikolon (;)
Datumsformat: YYYY-MM-DD
"""


def _export_forbidden() -> HTTPException:
    """Build the 403 error returned when a profile may not be exported."""
    return HTTPException(403, "Export ist für dieses Profil deaktiviert")


def _require_export_enabled(pid) -> None:
    """Raise HTTP 403 unless the profile exists and has `export_enabled` set."""
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT export_enabled FROM profiles WHERE id=%s", (pid,))
        row = cur.fetchone()
    if not row or not row['export_enabled']:
        raise _export_forbidden()


def _content_disposition(filename: str) -> str:
    """Return an attachment Content-Disposition value for `filename`.

    Emits an ASCII-only quoted `filename=` fallback plus a percent-encoded
    `filename*=` (RFC 5987), so arbitrary profile names can neither break the
    header syntax nor fail header encoding.
    """
    fallback = _UNSAFE_FILENAME_CHARS.sub("-", filename).strip("-") or "export"
    encoded = quote(filename, safe="")
    return f"attachment; filename=\"{fallback}\"; filename*=UTF-8''{encoded}"


def _measure(value, unit: str, label: str = "") -> str:
    """Format `value` as `<label><float><unit>`, or '' when the value is NULL."""
    if value is None:
        return ""
    return f"{label}{float(value)}{unit}"


def _join_parts(*parts: str) -> str:
    """Join the non-empty fragments with single spaces."""
    return " ".join(part for part in parts if part)


def _opt_float(value):
    """Convert a truthy numeric value to float; falsy values (NULL, 0) become None."""
    return float(value) if value else None


def _strip_secrets(profile: dict) -> dict:
    """Return a copy of `profile` without credential-like columns."""
    return {
        key: value
        for key, value in profile.items()
        if not any(marker in str(key).lower() for marker in _SENSITIVE_KEY_MARKERS)
    }


def _json_default(obj):
    """json.dumps fallback: Decimal becomes float, everything else its str()."""
    if isinstance(obj, Decimal):
        return float(obj)
    return str(obj)


def _csv_cell(value):
    """Map a DB value to a CSV cell: NULL becomes '', Decimal becomes float."""
    if value is None:
        return ''
    if isinstance(value, Decimal):
        return float(value)
    return value


def _write_csv(zf, filename: str, rows, columns) -> None:
    """Write `rows` as `data/<filename>` into the open ZIP archive `zf`.

    `columns` is a sequence of (export column, db column or None) pairs.
    Nothing is written when `rows` is empty. The file is semicolon-separated
    and UTF-8 encoded with a BOM so that Excel detects the encoding.
    """
    if not rows:
        return
    output = io.StringIO()
    writer = csv.writer(output, delimiter=';')
    writer.writerow([export_name for export_name, _ in columns])
    for row in rows:
        writer.writerow([
            '' if source is None else _csv_cell(row.get(source))
            for _, source in columns
        ])
    # 'utf-8-sig' prepends the UTF-8 BOM.
    zf.writestr(f"data/{filename}", output.getvalue().encode('utf-8-sig'))


def _count(cur, table_sql: str, pid) -> int:
    """Run a `SELECT COUNT(*) as c ...` query and return the count."""
    cur.execute(table_sql, (pid,))
    return cur.fetchone()['c']


# NOTE(review): in all three endpoints the profile id comes from the
# X-Profile-Id header via get_pid() and `session` is not consulted here;
# confirm that get_pid()/require_auth enforce that the caller owns the profile.

@router.get("/csv")
def export_csv(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export all data as a single CSV with the columns Typ, Datum, Wert, Details.

    Raises HTTP 403 when the profile is missing or export is disabled.
    NULL measurement values are rendered as empty fragments instead of
    failing the whole export.
    """
    pid = get_pid(x_profile_id)
    _require_export_enabled(pid)

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["Typ", "Datum", "Wert", "Details"])

    with get_db() as conn:
        cur = get_cursor(conn)

        # Weight
        cur.execute("SELECT date, weight, note FROM weight_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            writer.writerow(["Gewicht", r['date'], _measure(r['weight'], "kg"), r['note'] or ""])

        # Circumferences
        cur.execute("SELECT date, c_waist, c_belly, c_hip FROM circumference_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            details = _join_parts(
                _measure(r['c_waist'], "cm", "Taille:"),
                _measure(r['c_belly'], "cm", "Bauch:"),
                _measure(r['c_hip'], "cm", "Hüfte:"),
            )
            writer.writerow(["Umfänge", r['date'], "", details])

        # Caliper
        cur.execute("SELECT date, body_fat_pct, lean_mass FROM caliper_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            writer.writerow([
                "Caliper", r['date'],
                _measure(r['body_fat_pct'], "%"),
                _measure(r['lean_mass'], "kg", "Magermasse:"),
            ])

        # Nutrition
        cur.execute("SELECT date, kcal, protein_g FROM nutrition_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            writer.writerow([
                "Ernährung", r['date'],
                _measure(r['kcal'], "kcal"),
                _measure(r['protein_g'], "g", "Protein:"),
            ])

        # Activity
        cur.execute("SELECT date, activity_type, duration_min, kcal_active FROM activity_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            details = _join_parts(
                _measure(r['duration_min'], "min"),
                _measure(r['kcal_active'], "kcal"),
            )
            writer.writerow(["Training", r['date'], r['activity_type'], details])

    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": _content_disposition(f"mitai-export-{pid}.csv")}
    )


@router.get("/json")
def export_json(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export all data as one JSON document.

    The document holds the profile (without credential-like columns) and the
    full rows of every log table plus the AI insights. Raises HTTP 403 when
    the profile is missing or export is disabled.
    """
    pid = get_pid(x_profile_id)
    _require_export_enabled(pid)

    data = {}
    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        profile_row = cur.fetchone()
        # Never ship password hashes or similar secrets in a user download.
        data['profile'] = _strip_secrets(r2d(profile_row)) if profile_row else None

        for key, query in _JSON_TABLES:
            cur.execute(query, (pid,))
            data[key] = [r2d(r) for r in cur.fetchall()]

    json_str = json.dumps(data, indent=2, default=_json_default)
    return Response(
        content=json_str,
        media_type="application/json",
        headers={"Content-Disposition": _content_disposition(f"mitai-export-{pid}.json")}
    )


@router.get("/zip")
def export_zip(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export all data as ZIP (CSV + JSON + photos) per specification.

    Archive layout: README.txt, profile.json, data/*.csv (only for tables
    that have rows), insights/ai_insights.json (only when insights exist)
    and photos/ (only files that are present on disk).
    Raises HTTP 403 when the profile is missing or export is disabled.
    """
    pid = get_pid(x_profile_id)

    # Check export permission & get profile
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        profile_row = cur.fetchone()
    prof = r2d(profile_row) if profile_row else None
    if not prof or not prof.get('export_enabled'):
        raise _export_forbidden()

    zip_buffer = io.BytesIO()
    export_date = datetime.now().strftime('%Y-%m-%d')
    # A NULL name must not end up as None: it is used in the file name below.
    profile_name = prof.get('name') or 'export'

    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        with get_db() as conn:
            cur = get_cursor(conn)

            # 1. README.txt
            readme = _README_TEMPLATE.format(export_date=export_date, profile_name=profile_name)
            zf.writestr("README.txt", readme.encode('utf-8'))

            # 2. profile.json (without password hash)
            w_count = _count(cur, "SELECT COUNT(*) as c FROM weight_log WHERE profile_id=%s", pid)
            n_count = _count(cur, "SELECT COUNT(*) as c FROM nutrition_log WHERE profile_id=%s", pid)
            a_count = _count(cur, "SELECT COUNT(*) as c FROM activity_log WHERE profile_id=%s", pid)
            p_count = _count(cur, "SELECT COUNT(*) as c FROM photos WHERE profile_id=%s", pid)

            profile_data = {
                "export_version": "2",
                "export_date": export_date,
                "app": "Mitai Jinkendo",
                "profile": {
                    "name": prof.get('name'),
                    "email": prof.get('email'),
                    "sex": prof.get('sex'),
                    "height": _opt_float(prof.get('height')),
                    "birth_year": prof['dob'].year if prof.get('dob') else None,
                    "goal_weight": _opt_float(prof.get('goal_weight')),
                    "goal_bf_pct": _opt_float(prof.get('goal_bf_pct')),
                    "avatar_color": prof.get('avatar_color'),
                    "auth_type": prof.get('auth_type'),
                    "session_days": prof.get('session_days'),
                    "ai_enabled": prof.get('ai_enabled'),
                    "tier": prof.get('tier')
                },
                "stats": {
                    "weight_entries": w_count,
                    "nutrition_entries": n_count,
                    "activity_entries": a_count,
                    "photos": p_count
                }
            }
            zf.writestr("profile.json", json.dumps(profile_data, indent=2, ensure_ascii=False).encode('utf-8'))

            # 3-7. CSV exports (weight, circumferences, caliper, nutrition, activity)
            for csv_name, query, columns in _ZIP_CSV_TABLES:
                cur.execute(query, (pid,))
                _write_csv(zf, csv_name, [r2d(r) for r in cur.fetchall()], columns)

            # 8. insights/ai_insights.json
            cur.execute("SELECT id, scope, content, created FROM ai_insights WHERE profile_id=%s ORDER BY created DESC", (pid,))
            insights = []
            for r in cur.fetchall():
                rd = r2d(r)
                created = rd['created']
                insights.append({
                    "id": rd['id'],
                    "scope": rd['scope'],
                    "created": created.isoformat() if hasattr(created, 'isoformat') else str(created),
                    "result": rd['content']
                })
            if insights:
                zf.writestr("insights/ai_insights.json", json.dumps(insights, indent=2, ensure_ascii=False).encode('utf-8'))

            # 9. photos/
            cur.execute("SELECT * FROM photos WHERE profile_id=%s ORDER BY date", (pid,))
            photos = [r2d(r) for r in cur.fetchall()]
            # The running number counts every photo row, including skipped
            # ones, so file names stay stable when a file is missing on disk.
            for number, photo in enumerate(photos, start=1):
                stored_path = photo.get('path')
                if not stored_path:
                    continue
                photo_path = Path(PHOTOS_DIR) / stored_path
                # is_file() rather than exists(): a directory must not be archived.
                if photo_path.is_file():
                    photo_name = f"{photo.get('date') or export_date}_{number}{photo_path.suffix}"
                    zf.write(photo_path, f"photos/{photo_name}")

    filename = f"mitai-export-{profile_name.replace(' ','-')}-{export_date}.zip"
    return StreamingResponse(
        iter([zip_buffer.getvalue()]),
        media_type="application/zip",
        headers={"Content-Disposition": _content_disposition(filename)}
    )