mitai-jinkendo/backend/routers/exportdata.py
Lars df0165bee3
All checks were successful
Deploy Development / deploy (push) Successful in 1m0s
Build Test / pytest-backend (push) Successful in 9s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s
feat: add relaxed arm circumference measurement and update related features
- Introduced `c_arm_relaxed` to the CircumferenceEntry model for tracking relaxed arm measurements.
- Updated database schema to include `c_arm_relaxed` in the circumference_log table.
- Implemented calculation for 28-day relaxed arm circumference change with `calculate_arm_relaxed_28d_delta`.
- Enhanced placeholder resolver and registration to support new relaxed arm measurement.
- Updated frontend components to accommodate the new measurement, including forms and CSV exports.
- Improved documentation and guide data to reflect the addition of relaxed arm measurements.
2026-04-19 10:34:51 +02:00

436 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Data Export Endpoints for Mitai Jinkendo
Handles CSV, JSON, and ZIP exports with photos.
"""
import os
import csv
import io
import json
import logging
import zipfile
from pathlib import Path
from typing import Optional
from datetime import datetime
from decimal import Decimal
from fastapi import APIRouter, HTTPException, Header, Depends
from fastapi.responses import StreamingResponse, Response
from db import get_db, get_cursor, r2d
from auth import require_auth, check_feature_access, increment_feature_usage
from routers.profiles import get_pid
from feature_logger import log_feature_usage
from caliper_composition import enrich_caliper_row_for_response, load_weight_rows
from data_layer.activity_session_metrics import enrich_sessions_with_metrics
from data_layer.utils import serialize_dates
from routers.photos import resolve_photo_path
# Router for the export endpoints in this module; all routes are served under /api/export.
router = APIRouter(prefix="/api/export", tags=["export"])
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Photo directory, taken from the PHOTOS_DIR environment variable (default: ./photos).
# NOTE(review): not referenced by the endpoints in the reviewed portion of this file;
# photo locations are obtained through resolve_photo_path() instead — confirm it is still needed.
PHOTOS_DIR = Path(os.getenv("PHOTOS_DIR", "./photos"))
def _fmt_measure(value, unit, label=""):
    """Render one numeric fragment for the flat CSV export.

    Returns ``"<label><float(value)><unit>"``, or an empty string when the
    database value is NULL so the caller can simply drop the fragment instead
    of crashing on ``float(None)``.
    """
    if value is None:
        return ""
    return f"{label}{float(value)}{unit}"


def _join_present(fragments, sep=" "):
    """Join the non-empty fragments with *sep* (empty ones are skipped)."""
    return sep.join(fragment for fragment in fragments if fragment)


@router.get("/csv")
def export_csv(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export all data as one flat CSV file.

    The file has the columns ``Typ, Datum, Wert, Details`` and contains one
    row per weight, circumference, caliper, nutrition and activity entry of
    the profile that ``get_pid`` resolves from ``x_profile_id``.

    NULL measurement values are rendered as empty fragments; rows whose
    values are all present are formatted exactly as before.

    Returns:
        StreamingResponse: the CSV as ``text/csv`` attachment.

    Raises:
        HTTPException: 403 when ``check_feature_access`` reports that the
            ``data_export`` feature is not allowed for this profile.
    """
    pid = get_pid(x_profile_id)

    # Phase 4: Check feature access and ENFORCE
    access = check_feature_access(pid, 'data_export')
    log_feature_usage(pid, 'data_export', access, 'export_csv')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"data_export {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für Daten-Exporte überschritten ({access['used']}/{access['limit']}). "
                   f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )

    # Build CSV
    output = io.StringIO()
    writer = csv.writer(output)

    # Header
    writer.writerow(["Typ", "Datum", "Wert", "Details"])

    with get_db() as conn:
        cur = get_cursor(conn)

        # Weight
        cur.execute("SELECT date, weight, note FROM weight_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            writer.writerow(["Gewicht", r['date'], _fmt_measure(r['weight'], "kg"), r['note'] or ""])

        # Circumferences: only the sites that were actually measured are listed.
        cur.execute("SELECT date, c_waist, c_belly, c_hip FROM circumference_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            details = _join_present([
                _fmt_measure(r['c_waist'], "cm", "Taille:"),
                _fmt_measure(r['c_belly'], "cm", "Bauch:"),
                _fmt_measure(r['c_hip'], "cm", "Hüfte:"),
            ])
            writer.writerow(["Umfänge", r['date'], "", details])

        # Caliper (lean mass is backfilled from weight + body-fat % when empty in the DB)
        cur.execute(
            "SELECT date, body_fat_pct, lean_mass, fat_mass FROM caliper_log WHERE profile_id=%s ORDER BY date",
            (pid,),
        )
        cal_rows = [r2d(r) for r in cur.fetchall()]
        weight_rows = load_weight_rows(conn, pid)
        for r in cal_rows:
            enrich_caliper_row_for_response(r, weight_rows)
            # Entries without a body-fat value carry nothing to report in this format.
            if r.get("body_fat_pct") is None:
                continue
            writer.writerow([
                "Caliper",
                r["date"],
                _fmt_measure(r["body_fat_pct"], "%"),
                _fmt_measure(r.get("lean_mass"), "kg", "Magermasse:"),
            ])

        # Nutrition
        cur.execute("SELECT date, kcal, protein_g FROM nutrition_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            writer.writerow([
                "Ernährung",
                r['date'],
                _fmt_measure(r['kcal'], "kcal"),
                _fmt_measure(r['protein_g'], "g", "Protein:"),
            ])

        # Activity (layer 1: merged session_metrics are appended to Details)
        cur.execute(
            "SELECT id, date, activity_type, duration_min, kcal_active FROM activity_log WHERE profile_id=%s ORDER BY date",
            (pid,),
        )
        act_rows = [r2d(r) for r in cur.fetchall()]
        enrich_sessions_with_metrics(cur, act_rows)
        for r in act_rows:
            base = _join_present([
                _fmt_measure(r['duration_min'], "min"),
                _fmt_measure(r['kcal_active'], "kcal"),
            ])
            eav_parts = []
            for m in r.get("session_metrics") or []:
                k, v = m.get("key"), m.get("value")
                if k is None or v is None:
                    continue
                eav_parts.append(f"{k}={v}")
            details = _join_present([base, "; ".join(eav_parts)], " | ")
            writer.writerow(["Training", r["date"], r["activity_type"], details])

    # Phase 2: Increment usage counter
    increment_feature_usage(pid, 'data_export')

    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=mitai-export-{pid}.csv"}
    )
@router.get("/json")
def export_json(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export all data as one JSON document.

    The document contains the keys ``profile``, ``weight``,
    ``circumferences``, ``caliper``, ``nutrition``, ``activity`` and
    ``insights`` for the profile that ``get_pid`` resolves from
    ``x_profile_id``.

    Credential-like profile columns (any column whose name contains ``hash``
    or ``password``) are removed before serialisation, in line with the ZIP
    export, which also omits the password hash.

    Returns:
        Response: the JSON text as ``application/json`` attachment.

    Raises:
        HTTPException: 403 when ``check_feature_access`` reports that the
            ``data_export`` feature is not allowed for this profile.
    """
    pid = get_pid(x_profile_id)

    # Phase 4: Check feature access and ENFORCE
    access = check_feature_access(pid, 'data_export')
    log_feature_usage(pid, 'data_export', access, 'export_json')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"data_export {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für Daten-Exporte überschritten ({access['used']}/{access['limit']}). "
                   f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )

    # Column-name fragments that mark credential material which must not leave the server.
    sensitive_markers = ("hash", "password")

    # Collect all data
    data = {}
    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        profile = r2d(cur.fetchone())
        if isinstance(profile, dict):
            profile = {
                key: value
                for key, value in profile.items()
                if not any(marker in str(key).lower() for marker in sensitive_markers)
            }
        data['profile'] = profile

        cur.execute("SELECT * FROM weight_log WHERE profile_id=%s ORDER BY date", (pid,))
        data['weight'] = [r2d(r) for r in cur.fetchall()]

        cur.execute("SELECT * FROM circumference_log WHERE profile_id=%s ORDER BY date", (pid,))
        data['circumferences'] = [r2d(r) for r in cur.fetchall()]

        cur.execute("SELECT * FROM caliper_log WHERE profile_id=%s ORDER BY date", (pid,))
        data['caliper'] = [r2d(r) for r in cur.fetchall()]

        cur.execute("SELECT * FROM nutrition_log WHERE profile_id=%s ORDER BY date", (pid,))
        data['nutrition'] = [r2d(r) for r in cur.fetchall()]

        cur.execute("SELECT * FROM activity_log WHERE profile_id=%s ORDER BY date", (pid,))
        data["activity"] = [r2d(r) for r in cur.fetchall()]
        enrich_sessions_with_metrics(cur, data["activity"])
        data["activity"] = serialize_dates(data["activity"])

        cur.execute("SELECT * FROM ai_insights WHERE profile_id=%s ORDER BY created DESC", (pid,))
        data['insights'] = [r2d(r) for r in cur.fetchall()]

        # Caliper rows are enriched against the weight history collected above.
        w_rows = [{'date': x['date'], 'weight': x['weight']} for x in data.get('weight', [])]
        for row in data.get('caliper', []):
            enrich_caliper_row_for_response(row, w_rows)

    def decimal_handler(obj):
        # Decimals become JSON numbers; every other non-JSON type falls back to its string form.
        if isinstance(obj, Decimal):
            return float(obj)
        return str(obj)

    json_str = json.dumps(data, indent=2, default=decimal_handler)

    # Phase 2: Increment usage counter
    increment_feature_usage(pid, 'data_export')

    return Response(
        content=json_str,
        media_type="application/json",
        headers={"Content-Disposition": f"attachment; filename=mitai-export-{pid}.json"}
    )
@router.get("/zip")
def export_zip(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export all data as ZIP (CSV + JSON + photos) per specification.

    Archive layout:
        - ``README.txt``: human-readable description of the export
        - ``profile.json``: selected profile fields plus entry counts
        - ``data/*.csv``: weight, circumferences, caliper, nutrition, activity
          (semicolon separated, UTF-8 with BOM; a file is omitted when the
          profile has no rows for it)
        - ``insights/ai_insights.json``: only when insights exist
        - ``photos/``: every photo whose resolved path exists on disk

    The download filename is built from an ASCII-only form of the profile
    name so that the ``Content-Disposition`` header can always be encoded.

    Returns:
        StreamingResponse: the archive as ``application/zip`` attachment.

    Raises:
        HTTPException: 403 when ``check_feature_access`` reports that the
            ``data_export`` feature is not allowed for this profile;
            404 when no profile row exists for the resolved id.
    """
    pid = get_pid(x_profile_id)

    # Phase 4: Check feature access and ENFORCE
    access = check_feature_access(pid, 'data_export')
    log_feature_usage(pid, 'data_export', access, 'export_zip')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"data_export {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für Daten-Exporte überschritten ({access['used']}/{access['limit']}). "
                   f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )

    # Get profile
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        prof_row = cur.fetchone()
        prof = r2d(prof_row) if prof_row is not None else None
    if not prof:
        # Fail with a clear status instead of an AttributeError further down.
        raise HTTPException(status_code=404, detail="Profil nicht gefunden")

    # Helper: CSV writer with UTF-8 BOM + semicolon
    def write_csv(zf, filename, rows, columns):
        # Empty tables produce no file at all.
        if not rows:
            return

        def cell(value):
            # NULL -> empty cell, Decimal -> float, everything else unchanged.
            if value is None:
                return ''
            return float(value) if isinstance(value, Decimal) else value

        output = io.StringIO()
        writer = csv.writer(output, delimiter=';')
        writer.writerow(columns)
        for r in rows:
            writer.writerow([cell(r.get(col)) for col in columns])
        # UTF-8 with BOM for Excel
        csv_bytes = '\ufeff'.encode('utf-8') + output.getvalue().encode('utf-8')
        zf.writestr(f"data/{filename}", csv_bytes)

    # Helper: move DB column names to the export column names (missing columns become None)
    def rename_columns(row, mapping):
        for db_name, export_name in mapping.items():
            row[export_name] = row.pop(db_name, None)

    # Create ZIP
    zip_buffer = io.BytesIO()
    export_date = datetime.now().strftime('%Y-%m-%d')
    # "or" (not a .get default) so that a NULL name also falls back to 'export'.
    profile_name = prof.get('name') or 'export'

    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        with get_db() as conn:
            cur = get_cursor(conn)

            # 1. README.txt
            readme = f"""Mitai Jinkendo Datenexport
Version: 2
Exportiert am: {export_date}
Profil: {profile_name}
Inhalt:
- profile.json: Profildaten und Einstellungen
- data/*.csv: Messdaten (Semikolon-getrennt, UTF-8)
- insights/: KI-Auswertungen (JSON)
- photos/: Progress-Fotos (JPEG)
Import:
Dieser Export kann in Mitai Jinkendo unter
Einstellungen → Import → "Mitai Backup importieren"
wieder eingespielt werden.
activity.csv (optional): Spalte session_metrics_json (JSON-Array, Layer-1-merge)
wird beim Standard-Import ignoriert; für Vollständigkeit/externe Tools.
Format-Version 2 (ab v9b):
Alle CSV-Dateien sind UTF-8 mit BOM kodiert.
Trennzeichen: Semikolon (;)
Datumsformat: YYYY-MM-DD
"""
            zf.writestr("README.txt", readme.encode('utf-8'))

            # 2. profile.json (without password hash)
            cur.execute("SELECT COUNT(*) as c FROM weight_log WHERE profile_id=%s", (pid,))
            w_count = cur.fetchone()['c']
            cur.execute("SELECT COUNT(*) as c FROM nutrition_log WHERE profile_id=%s", (pid,))
            n_count = cur.fetchone()['c']
            cur.execute("SELECT COUNT(*) as c FROM activity_log WHERE profile_id=%s", (pid,))
            a_count = cur.fetchone()['c']
            cur.execute("SELECT COUNT(*) as c FROM photos WHERE profile_id=%s", (pid,))
            p_count = cur.fetchone()['c']

            profile_data = {
                "export_version": "2",
                "export_date": export_date,
                "app": "Mitai Jinkendo",
                "profile": {
                    "name": prof.get('name'),
                    "email": prof.get('email'),
                    "sex": prof.get('sex'),
                    "height": float(prof['height']) if prof.get('height') else None,
                    "birth_year": prof['dob'].year if prof.get('dob') else None,
                    "goal_weight": float(prof['goal_weight']) if prof.get('goal_weight') else None,
                    "goal_bf_pct": float(prof['goal_bf_pct']) if prof.get('goal_bf_pct') else None,
                    "avatar_color": prof.get('avatar_color'),
                    "auth_type": prof.get('auth_type'),
                    "session_days": prof.get('session_days'),
                    "ai_enabled": prof.get('ai_enabled'),
                    "tier": prof.get('tier')
                },
                "stats": {
                    "weight_entries": w_count,
                    "nutrition_entries": n_count,
                    "activity_entries": a_count,
                    "photos": p_count
                }
            }
            zf.writestr("profile.json", json.dumps(profile_data, indent=2, ensure_ascii=False).encode('utf-8'))

            # 3-7. CSV exports (weight, circumferences, caliper, nutrition, activity)
            cur.execute("SELECT id, date, weight, note, source, created FROM weight_log WHERE profile_id=%s ORDER BY date", (pid,))
            write_csv(zf, "weight.csv", [r2d(r) for r in cur.fetchall()], ['id','date','weight','note','source','created'])

            # NOTE(review): c_belly is exported by the flat CSV endpoint but is not part of
            # this backup file — confirm whether the backup format should carry it as well.
            cur.execute(
                "SELECT id, date, c_waist, c_hip, c_chest, c_neck, c_arm, c_arm_relaxed, c_thigh, c_calf, notes, created FROM circumference_log WHERE profile_id=%s ORDER BY date",
                (pid,),
            )
            rows = [r2d(r) for r in cur.fetchall()]
            for r in rows:
                rename_columns(r, {
                    'c_waist': 'waist',
                    'c_hip': 'hip',
                    'c_chest': 'chest',
                    'c_neck': 'neck',
                    'c_arm': 'upper_arm_contracted',
                    'c_arm_relaxed': 'upper_arm_relaxed',
                    'c_thigh': 'thigh',
                    'c_calf': 'calf',
                    'notes': 'note',
                })
                # Column exists in the export format but has no source column in the query.
                r['forearm'] = None
            write_csv(
                zf,
                "circumferences.csv",
                rows,
                [
                    'id',
                    'date',
                    'waist',
                    'hip',
                    'chest',
                    'neck',
                    'upper_arm_contracted',
                    'upper_arm_relaxed',
                    'thigh',
                    'calf',
                    'forearm',
                    'note',
                    'created',
                ],
            )

            cur.execute("SELECT id, date, sf_chest, sf_abdomen, sf_thigh, sf_triceps, sf_subscap, sf_suprailiac, sf_axilla, sf_method, body_fat_pct, notes, created FROM caliper_log WHERE profile_id=%s ORDER BY date", (pid,))
            rows = [r2d(r) for r in cur.fetchall()]
            for r in rows:
                rename_columns(r, {
                    'sf_chest': 'chest',
                    'sf_abdomen': 'abdomen',
                    'sf_thigh': 'thigh',
                    'sf_triceps': 'tricep',
                    'sf_subscap': 'subscapular',
                    'sf_suprailiac': 'suprailiac',
                    'sf_axilla': 'midaxillary',
                    'sf_method': 'method',
                    'body_fat_pct': 'bf_percent',
                    'notes': 'note',
                })
            write_csv(zf, "caliper.csv", rows, ['id','date','chest','abdomen','thigh','tricep','subscapular','suprailiac','midaxillary','method','bf_percent','note','created'])

            cur.execute("SELECT id, date, kcal, protein_g, fat_g, carbs_g, source, created FROM nutrition_log WHERE profile_id=%s ORDER BY date", (pid,))
            rows = [r2d(r) for r in cur.fetchall()]
            for r in rows:
                rename_columns(r, {'protein_g': 'protein', 'fat_g': 'fat', 'carbs_g': 'carbs'})
                # Columns of the export format that have no source column in the query.
                r['meal_name'] = ''
                r['fiber'] = None
                r['note'] = ''
            write_csv(zf, "nutrition.csv", rows, ['id','date','meal_name','kcal','protein','fat','carbs','fiber','note','source','created'])

            cur.execute(
                "SELECT id, date, activity_type, duration_min, kcal_active, hr_avg, hr_max, distance_km, notes, source, created FROM activity_log WHERE profile_id=%s ORDER BY date",
                (pid,),
            )
            rows = [r2d(r) for r in cur.fetchall()]
            enrich_sessions_with_metrics(cur, rows)
            for r in rows:
                sm = r.pop("session_metrics", None) or []
                r["session_metrics_json"] = json.dumps(sm, ensure_ascii=False, default=str)
                # The activity type is written to both the "name" and the "type" column.
                r["name"] = r["activity_type"]
                rename_columns(r, {
                    "activity_type": "type",
                    "kcal_active": "kcal",
                    "hr_avg": "heart_rate_avg",
                    "hr_max": "heart_rate_max",
                    "notes": "note",
                })
            write_csv(
                zf,
                "activity.csv",
                rows,
                [
                    "id",
                    "date",
                    "name",
                    "type",
                    "duration_min",
                    "kcal",
                    "heart_rate_avg",
                    "heart_rate_max",
                    "distance_km",
                    "note",
                    "source",
                    "created",
                    "session_metrics_json",
                ],
            )

            # 8. insights/ai_insights.json
            cur.execute("SELECT id, scope, content, created FROM ai_insights WHERE profile_id=%s ORDER BY created DESC", (pid,))
            insights = []
            for r in cur.fetchall():
                rd = r2d(r)
                created = rd['created']
                insights.append({
                    "id": rd['id'],
                    "scope": rd['scope'],
                    "created": created.isoformat() if hasattr(created, 'isoformat') else str(created),
                    "result": rd['content']
                })
            if insights:
                zf.writestr("insights/ai_insights.json", json.dumps(insights, indent=2, ensure_ascii=False).encode('utf-8'))

            # 9. photos/
            cur.execute("SELECT * FROM photos WHERE profile_id=%s ORDER BY date", (pid,))
            photos = [r2d(r) for r in cur.fetchall()]
            for i, photo in enumerate(photos):
                photo_path = resolve_photo_path(photo.get('path'))
                # Photos whose file cannot be resolved or no longer exists are skipped.
                if photo_path and photo_path.exists():
                    arcname = f"{photo.get('date') or export_date}_{i+1}{photo_path.suffix}"
                    zf.write(photo_path, f"photos/{arcname}")

    # Header values must be encodable and must not contain separators, quotes or
    # control characters, so everything outside [A-Za-z0-9._-] becomes '-'.
    # Plain ASCII names with spaces yield the same filename as before.
    safe_name = "".join(
        ch if ch.isascii() and (ch.isalnum() or ch in "-_.") else "-"
        for ch in str(profile_name)
    ) or "export"
    filename = f"mitai-export-{safe_name}-{export_date}.zip"

    # Phase 2: Increment usage counter
    increment_feature_usage(pid, 'data_export')

    return StreamingResponse(
        iter([zip_buffer.getvalue()]),
        media_type="application/zip",
        headers={"Content-Disposition": f"attachment; filename={filename}"}
    )