mitai-jinkendo/backend/routers/exportdata.py
Lars 38797d687d
All checks were successful
Deploy Development / deploy (push) Successful in 54s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
feat: Enhance export functionality and documentation for activity metrics
- Implemented JSON and CSV export features in `routers/exportdata.py`, including enriched session metrics for activity data.
- Updated `CLAUDE.md` to reflect new export capabilities and added details about the inclusion of `session_metrics_json` in the `activity.csv` file.
- Revised `ACTIVITY_PRODUCTION_ARCHITECTURE_AND_PHASES.md` to document the integration of export features and their relationship with session metrics.
- Improved the overall structure and clarity of the documentation to support the new export functionalities.
2026-04-17 13:09:49 +02:00

411 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Data Export Endpoints for Mitai Jinkendo
Handles CSV, JSON, and ZIP exports with photos.
"""
import os
import csv
import io
import json
import logging
import zipfile
from pathlib import Path
from typing import Optional
from datetime import datetime
from decimal import Decimal
from fastapi import APIRouter, HTTPException, Header, Depends
from fastapi.responses import StreamingResponse, Response
from db import get_db, get_cursor, r2d
from auth import require_auth, check_feature_access, increment_feature_usage
from routers.profiles import get_pid
from feature_logger import log_feature_usage
from caliper_composition import enrich_caliper_row_for_response, load_weight_rows
from data_layer.activity_session_metrics import enrich_sessions_with_metrics
from data_layer.utils import serialize_dates
# Every route declared in this module is served under the /api/export prefix.
router = APIRouter(prefix="/api/export", tags=["export"])
logger = logging.getLogger(__name__)
# Base directory that photo paths are joined onto by the ZIP export;
# taken from the PHOTOS_DIR environment variable, falling back to ./photos.
PHOTOS_DIR = Path(os.getenv("PHOTOS_DIR", "./photos"))
@router.get("/csv")
def export_csv(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export the profile's logged data as one flat CSV download.

    Every log entry becomes one row with the columns ``Typ``, ``Datum``,
    ``Wert`` and ``Details``. Weight, circumference, caliper, nutrition and
    activity logs are written in that order, each sorted by date.

    NULL numeric columns are rendered as empty/omitted fragments instead of
    being passed to ``float()``, so a single incomplete entry no longer
    aborts the whole export.

    Args:
        x_profile_id: Optional profile-id request header, resolved to the
            effective profile id through ``get_pid``.
        session: Result of the ``require_auth`` dependency; it is not read
            inside this function.

    Returns:
        A ``text/csv`` ``StreamingResponse`` sent as an attachment.

    Raises:
        HTTPException: 403 when ``check_feature_access`` reports that the
            ``data_export`` feature is not allowed for this profile.
    """
    pid = get_pid(x_profile_id)

    # Phase 4: check feature access and enforce it.
    access = check_feature_access(pid, 'data_export')
    log_feature_usage(pid, 'data_export', access, 'export_csv')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"data_export {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für Daten-Exporte überschritten ({access['used']}/{access['limit']}). "
                   f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )

    def with_unit(value, unit):
        """Return ``<float><unit>``, or an empty string for a NULL value."""
        return "" if value is None else f"{float(value)}{unit}"

    def labelled(pairs, unit):
        """Join ``label:<float><unit>`` fragments, skipping NULL values."""
        return " ".join(
            f"{label}:{float(value)}{unit}"
            for label, value in pairs
            if value is not None
        )

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["Typ", "Datum", "Wert", "Details"])

    with get_db() as conn:
        cur = get_cursor(conn)

        # Weight
        cur.execute("SELECT date, weight, note FROM weight_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            writer.writerow(["Gewicht", r['date'], with_unit(r['weight'], "kg"), r['note'] or ""])

        # Circumferences: any single measurement may be missing in a row.
        cur.execute("SELECT date, c_waist, c_belly, c_hip FROM circumference_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            details = labelled(
                (("Taille", r['c_waist']), ("Bauch", r['c_belly']), ("Hüfte", r['c_hip'])),
                "cm",
            )
            writer.writerow(["Umfänge", r['date'], "", details])

        # Caliper. Per the original note, enrich_caliper_row_for_response is
        # expected to fill lean mass from weight + body-fat % when the DB
        # value is empty (helper is defined outside this file).
        cur.execute(
            "SELECT date, body_fat_pct, lean_mass, fat_mass FROM caliper_log WHERE profile_id=%s ORDER BY date",
            (pid,),
        )
        cal_rows = [r2d(r) for r in cur.fetchall()]
        weight_rows = load_weight_rows(conn, pid)
        for r in cal_rows:
            enrich_caliper_row_for_response(r, weight_rows)
            if r.get("body_fat_pct") is None:
                # Rows without a body-fat value are not exported.
                continue
            bf = float(r["body_fat_pct"])
            lm = r.get("lean_mass")
            details = f"Magermasse:{float(lm)}kg" if lm is not None else ""
            writer.writerow(["Caliper", r["date"], f"{bf}%", details])

        # Nutrition
        cur.execute("SELECT date, kcal, protein_g FROM nutrition_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            writer.writerow([
                "Ernährung",
                r['date'],
                with_unit(r['kcal'], "kcal"),
                labelled((("Protein", r['protein_g']),), "g"),
            ])

        # Activity: merged session metrics are appended to the details column.
        cur.execute(
            "SELECT id, date, activity_type, duration_min, kcal_active FROM activity_log WHERE profile_id=%s ORDER BY date",
            (pid,),
        )
        act_rows = [r2d(r) for r in cur.fetchall()]
        enrich_sessions_with_metrics(cur, act_rows)
        for r in act_rows:
            base = " ".join(
                part
                for part in (
                    with_unit(r['duration_min'], "min"),
                    with_unit(r['kcal_active'], "kcal"),
                )
                if part
            )
            eav_parts = []
            for m in r.get("session_metrics") or []:
                k, v = m.get("key"), m.get("value")
                if k is None or v is None:
                    continue
                eav_parts.append(f"{k}={v}")
            # "<base> | k=v; k=v" — empty segments are left out entirely.
            details = " | ".join(seg for seg in (base, "; ".join(eav_parts)) if seg)
            writer.writerow(["Training", r["date"], r["activity_type"], details])

    output.seek(0)

    # Phase 2: count this export only after it was built successfully.
    increment_feature_usage(pid, 'data_export')

    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=mitai-export-{pid}.csv"}
    )
@router.get("/json")
def export_json(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export the profile and all of its logs as one JSON download.

    The document has the keys ``profile``, ``weight``, ``circumferences``,
    ``caliper``, ``nutrition``, ``activity`` and ``insights``. Activity rows
    are enriched with session metrics and caliper rows are passed through
    ``enrich_caliper_row_for_response`` before serialisation.

    Credential-like profile columns (name contains ``hash`` or ``password``)
    are removed from ``profile`` so the download does not carry the stored
    password hash, in line with the ZIP export's profile.json.

    Args:
        x_profile_id: Optional profile-id request header, resolved to the
            effective profile id through ``get_pid``.
        session: Result of the ``require_auth`` dependency; it is not read
            inside this function.

    Returns:
        An ``application/json`` ``Response`` sent as an attachment.

    Raises:
        HTTPException: 403 when ``check_feature_access`` reports that the
            ``data_export`` feature is not allowed for this profile.
    """
    pid = get_pid(x_profile_id)

    # Phase 4: check feature access and enforce it.
    access = check_feature_access(pid, 'data_export')
    log_feature_usage(pid, 'data_export', access, 'export_json')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"data_export {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für Daten-Exporte überschritten ({access['used']}/{access['limit']}). "
                   f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )

    def is_secret_column(column):
        """Tell whether a profile column name looks like stored credentials."""
        lowered = str(column).lower()
        return "hash" in lowered or "password" in lowered

    def fetch_rows(cur, sql):
        """Run a per-profile query and return its rows as plain dicts."""
        cur.execute(sql, (pid,))
        return [r2d(r) for r in cur.fetchall()]

    data = {}
    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        profile = r2d(cur.fetchone())
        if profile:
            # NOTE(review): the exact credential column names are not visible
            # in this file; matching on the name keeps the filter schema-agnostic.
            profile = {k: v for k, v in dict(profile).items() if not is_secret_column(k)}
        data['profile'] = profile

        data['weight'] = fetch_rows(cur, "SELECT * FROM weight_log WHERE profile_id=%s ORDER BY date")
        data['circumferences'] = fetch_rows(cur, "SELECT * FROM circumference_log WHERE profile_id=%s ORDER BY date")
        data['caliper'] = fetch_rows(cur, "SELECT * FROM caliper_log WHERE profile_id=%s ORDER BY date")
        data['nutrition'] = fetch_rows(cur, "SELECT * FROM nutrition_log WHERE profile_id=%s ORDER BY date")

        # Activity rows get their session metrics merged in before the dates
        # are converted for JSON output.
        data["activity"] = fetch_rows(cur, "SELECT * FROM activity_log WHERE profile_id=%s ORDER BY date")
        enrich_sessions_with_metrics(cur, data["activity"])
        data["activity"] = serialize_dates(data["activity"])

        data['insights'] = fetch_rows(cur, "SELECT * FROM ai_insights WHERE profile_id=%s ORDER BY created DESC")

    # Caliper enrichment works on the already-loaded weight rows.
    w_rows = [{'date': x['date'], 'weight': x['weight']} for x in data.get('weight', [])]
    for row in data.get('caliper', []):
        enrich_caliper_row_for_response(row, w_rows)

    def decimal_handler(obj):
        """Serialise Decimal as float and anything else unknown as str."""
        if isinstance(obj, Decimal):
            return float(obj)
        return str(obj)

    json_str = json.dumps(data, indent=2, default=decimal_handler)

    # Phase 2: count this export only after it was built successfully.
    increment_feature_usage(pid, 'data_export')

    return Response(
        content=json_str,
        media_type="application/json",
        headers={"Content-Disposition": f"attachment; filename=mitai-export-{pid}.json"}
    )
@router.get("/zip")
def export_zip(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export all data of a profile as a ZIP archive (CSV + JSON + photos).

    Archive layout:
        - ``README.txt``: description of the export format.
        - ``profile.json``: whitelisted profile fields plus entry counts.
        - ``data/*.csv``: one semicolon-separated, BOM-prefixed CSV per log
          table; a table without rows produces no file.
        - ``insights/ai_insights.json``: AI insights, only if any exist.
        - ``photos/``: photo files found below ``PHOTOS_DIR``.

    Args:
        x_profile_id: Optional profile-id request header, resolved to the
            effective profile id through ``get_pid``.
        session: Result of the ``require_auth`` dependency; it is not read
            inside this function.

    Returns:
        An ``application/zip`` ``StreamingResponse`` sent as an attachment.

    Raises:
        HTTPException: 403 when ``check_feature_access`` reports that the
            ``data_export`` feature is not allowed; 404 when no profile row
            exists for the resolved profile id.
    """
    # Local import: only needed for the RFC 5987 ``filename*`` parameter.
    from urllib.parse import quote

    pid = get_pid(x_profile_id)

    # Phase 4: check feature access and enforce it.
    access = check_feature_access(pid, 'data_export')
    log_feature_usage(pid, 'data_export', access, 'export_zip')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"data_export {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für Daten-Exporte überschritten ({access['used']}/{access['limit']}). "
                   f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )

    # Load the profile; a missing row is a client error, not a server crash.
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        profile_row = cur.fetchone()
        if profile_row is None:
            raise HTTPException(status_code=404, detail="Profil nicht gefunden")
        prof = r2d(profile_row)

    def write_csv(zf, filename, rows, columns):
        """Write ``rows`` to ``data/<filename>`` (semicolon CSV, UTF-8 BOM)."""
        if not rows:
            return

        def cell(row, col):
            # NULL becomes an empty cell; Decimal is written as a float.
            value = row.get(col)
            if value is None:
                return ''
            return float(value) if isinstance(value, Decimal) else value

        output = io.StringIO()
        writer = csv.writer(output, delimiter=';')
        writer.writerow(columns)
        for r in rows:
            writer.writerow([cell(r, col) for col in columns])
        # UTF-8 with BOM so Excel detects the encoding.
        csv_bytes = '\ufeff'.encode('utf-8') + output.getvalue().encode('utf-8')
        zf.writestr(f"data/{filename}", csv_bytes)

    def rename_columns(rows, mapping):
        """Rename DB column keys to export column names, in place."""
        for r in rows:
            for db_col, export_col in mapping:
                r[export_col] = r.pop(db_col, None)

    zip_buffer = io.BytesIO()
    export_date = datetime.now().strftime('%Y-%m-%d')
    # ``or`` (not a .get default) so a NULL name also falls back to 'export'.
    profile_name = str(prof.get('name') or 'export')

    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        with get_db() as conn:
            cur = get_cursor(conn)

            # 1. README.txt (text is emitted verbatim, hence column 0 below)
            readme = f"""Mitai Jinkendo Datenexport
Version: 2
Exportiert am: {export_date}
Profil: {profile_name}
Inhalt:
- profile.json: Profildaten und Einstellungen
- data/*.csv: Messdaten (Semikolon-getrennt, UTF-8)
- insights/: KI-Auswertungen (JSON)
- photos/: Progress-Fotos (JPEG)
Import:
Dieser Export kann in Mitai Jinkendo unter
Einstellungen → Import → "Mitai Backup importieren"
wieder eingespielt werden.
activity.csv (optional): Spalte session_metrics_json (JSON-Array, Layer-1-merge)
wird beim Standard-Import ignoriert; für Vollständigkeit/externe Tools.
Format-Version 2 (ab v9b):
Alle CSV-Dateien sind UTF-8 mit BOM kodiert.
Trennzeichen: Semikolon (;)
Datumsformat: YYYY-MM-DD
"""
            zf.writestr("README.txt", readme.encode('utf-8'))

            # 2. profile.json (whitelisted fields only, no password hash)
            cur.execute("SELECT COUNT(*) as c FROM weight_log WHERE profile_id=%s", (pid,))
            w_count = cur.fetchone()['c']
            cur.execute("SELECT COUNT(*) as c FROM nutrition_log WHERE profile_id=%s", (pid,))
            n_count = cur.fetchone()['c']
            cur.execute("SELECT COUNT(*) as c FROM activity_log WHERE profile_id=%s", (pid,))
            a_count = cur.fetchone()['c']
            cur.execute("SELECT COUNT(*) as c FROM photos WHERE profile_id=%s", (pid,))
            p_count = cur.fetchone()['c']

            profile_data = {
                "export_version": "2",
                "export_date": export_date,
                "app": "Mitai Jinkendo",
                "profile": {
                    "name": prof.get('name'),
                    "email": prof.get('email'),
                    "sex": prof.get('sex'),
                    "height": float(prof['height']) if prof.get('height') else None,
                    "birth_year": prof['dob'].year if prof.get('dob') else None,
                    "goal_weight": float(prof['goal_weight']) if prof.get('goal_weight') else None,
                    "goal_bf_pct": float(prof['goal_bf_pct']) if prof.get('goal_bf_pct') else None,
                    "avatar_color": prof.get('avatar_color'),
                    "auth_type": prof.get('auth_type'),
                    "session_days": prof.get('session_days'),
                    "ai_enabled": prof.get('ai_enabled'),
                    "tier": prof.get('tier')
                },
                "stats": {
                    "weight_entries": w_count,
                    "nutrition_entries": n_count,
                    "activity_entries": a_count,
                    "photos": p_count
                }
            }
            zf.writestr("profile.json", json.dumps(profile_data, indent=2, ensure_ascii=False).encode('utf-8'))

            # 3. data/weight.csv
            cur.execute("SELECT id, date, weight, note, source, created FROM weight_log WHERE profile_id=%s ORDER BY date", (pid,))
            write_csv(zf, "weight.csv", [r2d(r) for r in cur.fetchall()], ['id','date','weight','note','source','created'])

            # 4. data/circumferences.csv ('forearm' is exported as an empty column)
            cur.execute("SELECT id, date, c_waist, c_hip, c_chest, c_neck, c_arm, c_thigh, c_calf, notes, created FROM circumference_log WHERE profile_id=%s ORDER BY date", (pid,))
            rows = [r2d(r) for r in cur.fetchall()]
            rename_columns(rows, (
                ('c_waist', 'waist'), ('c_hip', 'hip'), ('c_chest', 'chest'),
                ('c_neck', 'neck'), ('c_arm', 'upper_arm'), ('c_thigh', 'thigh'),
                ('c_calf', 'calf'), ('notes', 'note'),
            ))
            for r in rows:
                r['forearm'] = None
            write_csv(zf, "circumferences.csv", rows, ['id','date','waist','hip','chest','neck','upper_arm','thigh','calf','forearm','note','created'])

            # 5. data/caliper.csv
            cur.execute("SELECT id, date, sf_chest, sf_abdomen, sf_thigh, sf_triceps, sf_subscap, sf_suprailiac, sf_axilla, sf_method, body_fat_pct, notes, created FROM caliper_log WHERE profile_id=%s ORDER BY date", (pid,))
            rows = [r2d(r) for r in cur.fetchall()]
            rename_columns(rows, (
                ('sf_chest', 'chest'), ('sf_abdomen', 'abdomen'), ('sf_thigh', 'thigh'),
                ('sf_triceps', 'tricep'), ('sf_subscap', 'subscapular'),
                ('sf_suprailiac', 'suprailiac'), ('sf_axilla', 'midaxillary'),
                ('sf_method', 'method'), ('body_fat_pct', 'bf_percent'), ('notes', 'note'),
            ))
            write_csv(zf, "caliper.csv", rows, ['id','date','chest','abdomen','thigh','tricep','subscapular','suprailiac','midaxillary','method','bf_percent','note','created'])

            # 6. data/nutrition.csv (meal_name, fiber and note are exported empty)
            cur.execute("SELECT id, date, kcal, protein_g, fat_g, carbs_g, source, created FROM nutrition_log WHERE profile_id=%s ORDER BY date", (pid,))
            rows = [r2d(r) for r in cur.fetchall()]
            rename_columns(rows, (('protein_g', 'protein'), ('fat_g', 'fat'), ('carbs_g', 'carbs')))
            for r in rows:
                r['meal_name'] = ''
                r['fiber'] = None
                r['note'] = ''
            write_csv(zf, "nutrition.csv", rows, ['id','date','meal_name','kcal','protein','fat','carbs','fiber','note','source','created'])

            # 7. data/activity.csv (merged session metrics as a JSON column)
            cur.execute(
                "SELECT id, date, activity_type, duration_min, kcal_active, hr_avg, hr_max, distance_km, notes, source, created FROM activity_log WHERE profile_id=%s ORDER BY date",
                (pid,),
            )
            rows = [r2d(r) for r in cur.fetchall()]
            enrich_sessions_with_metrics(cur, rows)
            for r in rows:
                sm = r.pop("session_metrics", None) or []
                r["session_metrics_json"] = json.dumps(sm, ensure_ascii=False, default=str)
                # The activity type is exported under both 'name' and 'type'.
                r["name"] = r["activity_type"]
            rename_columns(rows, (
                ("activity_type", "type"), ("kcal_active", "kcal"),
                ("hr_avg", "heart_rate_avg"), ("hr_max", "heart_rate_max"),
                ("notes", "note"),
            ))
            write_csv(
                zf,
                "activity.csv",
                rows,
                [
                    "id",
                    "date",
                    "name",
                    "type",
                    "duration_min",
                    "kcal",
                    "heart_rate_avg",
                    "heart_rate_max",
                    "distance_km",
                    "note",
                    "source",
                    "created",
                    "session_metrics_json",
                ],
            )

            # 8. insights/ai_insights.json (only written when insights exist)
            cur.execute("SELECT id, scope, content, created FROM ai_insights WHERE profile_id=%s ORDER BY created DESC", (pid,))
            insights = []
            for r in cur.fetchall():
                rd = r2d(r)
                insights.append({
                    "id": rd['id'],
                    "scope": rd['scope'],
                    "created": rd['created'].isoformat() if hasattr(rd['created'], 'isoformat') else str(rd['created']),
                    "result": rd['content']
                })
            if insights:
                zf.writestr("insights/ai_insights.json", json.dumps(insights, indent=2, ensure_ascii=False).encode('utf-8'))

            # 9. photos/ (rows without a usable file on disk are skipped)
            cur.execute("SELECT * FROM photos WHERE profile_id=%s ORDER BY date", (pid,))
            photos = [r2d(r) for r in cur.fetchall()]
            for i, photo in enumerate(photos):
                stored_path = photo.get('path')
                if not stored_path:
                    continue
                photo_path = Path(PHOTOS_DIR) / stored_path
                if photo_path.is_file():
                    # The running index keeps names unique for same-day photos.
                    arcname = f"{photo.get('date') or export_date}_{i+1}{photo_path.suffix}"
                    zf.write(photo_path, f"photos/{arcname}")

    zip_buffer.seek(0)

    # The profile name is user-controlled: expose only an ASCII-safe token
    # as ``filename`` and the full name via the percent-encoded ``filename*``.
    display_name = f"mitai-export-{profile_name.replace(' ', '-')}-{export_date}.zip"
    filename = "".join(
        ch if ch.isascii() and (ch.isalnum() or ch in "-_.") else "-"
        for ch in display_name
    )
    content_disposition = (
        f"attachment; filename={filename}; "
        f"filename*=UTF-8''{quote(display_name, safe='')}"
    )

    # Phase 2: count this export only after it was built successfully.
    increment_feature_usage(pid, 'data_export')

    return StreamingResponse(
        iter([zip_buffer.getvalue()]),
        media_type="application/zip",
        headers={"Content-Disposition": content_disposition}
    )