mitai-jinkendo/backend/routers/exportdata.py
Lars 49e9c9c214
All checks were successful
Deploy Development / deploy (push) Successful in 1m0s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Integrate caliper data enrichment and weight loading in API responses
- Enhanced the caliper listing and export functionalities to include enriched data from weight logs.
- Updated the upsert and update operations to utilize new composition functions for body composition calculations.
- Refactored the CaliperScreen component to streamline payload construction by removing unnecessary parameters.
2026-04-06 06:08:37 +02:00

363 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Data Export Endpoints for Mitai Jinkendo
Handles CSV, JSON, and ZIP exports (the ZIP archive additionally bundles photos).
"""
import os
import csv
import io
import json
import logging
import zipfile
from pathlib import Path
from typing import Optional
from datetime import datetime
from decimal import Decimal
from fastapi import APIRouter, HTTPException, Header, Depends
from fastapi.responses import StreamingResponse, Response
from db import get_db, get_cursor, r2d
from auth import require_auth, check_feature_access, increment_feature_usage
from routers.profiles import get_pid
from feature_logger import log_feature_usage
from caliper_composition import enrich_caliper_row_for_response, load_weight_rows
# Every route declared in this module is mounted below /api/export.
router = APIRouter(prefix="/api/export", tags=["export"])
logger = logging.getLogger(__name__)
# Base directory that photo paths are resolved against; the PHOTOS_DIR
# environment variable overrides the "./photos" default.
PHOTOS_DIR = Path(os.environ.get("PHOTOS_DIR", "./photos"))
@router.get("/csv")
def export_csv(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export all data of the active profile as one CSV download.

    The file has the columns ``Typ``, ``Datum``, ``Wert`` and ``Details`` and
    lists weight, circumference, caliper, nutrition and activity rows (in that
    order, each section sorted by date).

    NULL measurements are rendered as empty values instead of being passed to
    ``float()``, so a single incomplete log entry no longer aborts the export.

    Args:
        x_profile_id: Profile id from the request header, resolved via ``get_pid``.
        session: Auth session injected by ``require_auth`` (not read here).

    Returns:
        A ``text/csv`` ``StreamingResponse`` served as an attachment.

    Raises:
        HTTPException: 403 when ``check_feature_access`` does not allow
            ``data_export`` for this profile.
    """
    pid = get_pid(x_profile_id)

    # Phase 4: Check feature access and ENFORCE
    access = check_feature_access(pid, 'data_export')
    log_feature_usage(pid, 'data_export', access, 'export_csv')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"data_export {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für Daten-Exporte überschritten ({access['used']}/{access['limit']}). "
                   f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )

    def fmt(value, unit, label=""):
        """Render ``<label><float(value)><unit>``; a NULL value renders as ''."""
        if value is None:
            return ""
        return f"{label}{float(value)}{unit}"

    def joined(*parts):
        """Join the non-empty parts with single spaces."""
        return " ".join(part for part in parts if part)

    # Build CSV
    output = io.StringIO()
    writer = csv.writer(output)

    # Header
    writer.writerow(["Typ", "Datum", "Wert", "Details"])

    with get_db() as conn:
        cur = get_cursor(conn)

        # Weight
        cur.execute("SELECT date, weight, note FROM weight_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            writer.writerow(["Gewicht", r['date'], fmt(r['weight'], "kg"), r['note'] or ""])

        # Circumferences: only measurements that are present end up in the details.
        cur.execute("SELECT date, c_waist, c_belly, c_hip FROM circumference_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            details = joined(
                fmt(r['c_waist'], "cm", "Taille:"),
                fmt(r['c_belly'], "cm", "Bauch:"),
                fmt(r['c_hip'], "cm", "Hüfte:"),
            )
            writer.writerow(["Umfänge", r['date'], "", details])

        # Caliper (per the original note: lean mass is filled in from weight +
        # body-fat % by the enrichment helper when it is empty in the DB)
        cur.execute(
            "SELECT date, body_fat_pct, lean_mass, fat_mass FROM caliper_log WHERE profile_id=%s ORDER BY date",
            (pid,),
        )
        cal_rows = [r2d(r) for r in cur.fetchall()]
        weight_rows = load_weight_rows(conn, pid)
        for r in cal_rows:
            enrich_caliper_row_for_response(r, weight_rows)
            # Rows without a body-fat value are not exported.
            if r.get("body_fat_pct") is None:
                continue
            bf = float(r["body_fat_pct"])
            writer.writerow(["Caliper", r["date"], f"{bf}%", fmt(r.get("lean_mass"), "kg", "Magermasse:")])

        # Nutrition
        cur.execute("SELECT date, kcal, protein_g FROM nutrition_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            writer.writerow(["Ernährung", r['date'], fmt(r['kcal'], "kcal"), fmt(r['protein_g'], "g", "Protein:")])

        # Activity
        cur.execute("SELECT date, activity_type, duration_min, kcal_active FROM activity_log WHERE profile_id=%s ORDER BY date", (pid,))
        for r in cur.fetchall():
            details = joined(fmt(r['duration_min'], "min"), fmt(r['kcal_active'], "kcal"))
            writer.writerow(["Training", r['date'], r['activity_type'], details])

    # Phase 2: Increment usage counter
    increment_feature_usage(pid, 'data_export')

    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=mitai-export-{pid}.csv"}
    )
@router.get("/json")
def export_json(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export all data of the active profile as one JSON download.

    The document contains the keys ``profile``, ``weight``, ``circumferences``,
    ``caliper``, ``nutrition``, ``activity`` and ``insights``. Caliper rows are
    passed through ``enrich_caliper_row_for_response`` together with the
    exported weight rows.

    Credential-like profile columns (any key containing ``hash`` or
    ``password``) are removed before serialisation, matching the ZIP export,
    which also omits the password hash.

    Args:
        x_profile_id: Profile id from the request header, resolved via ``get_pid``.
        session: Auth session injected by ``require_auth`` (not read here).

    Returns:
        An ``application/json`` ``Response`` served as an attachment.

    Raises:
        HTTPException: 403 when ``check_feature_access`` does not allow
            ``data_export`` for this profile.
    """
    pid = get_pid(x_profile_id)

    # Phase 4: Check feature access and ENFORCE
    access = check_feature_access(pid, 'data_export')
    log_feature_usage(pid, 'data_export', access, 'export_json')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"data_export {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für Daten-Exporte überschritten ({access['used']}/{access['limit']}). "
                   f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )

    # NOTE(review): name-based heuristic because the profiles schema is not
    # visible here; confirm it covers the actual credential column(s).
    sensitive_markers = ("hash", "password")

    # Per-profile tables, in the order they appear in the export.
    section_queries = (
        ('weight', "SELECT * FROM weight_log WHERE profile_id=%s ORDER BY date"),
        ('circumferences', "SELECT * FROM circumference_log WHERE profile_id=%s ORDER BY date"),
        ('caliper', "SELECT * FROM caliper_log WHERE profile_id=%s ORDER BY date"),
        ('nutrition', "SELECT * FROM nutrition_log WHERE profile_id=%s ORDER BY date"),
        ('activity', "SELECT * FROM activity_log WHERE profile_id=%s ORDER BY date"),
        ('insights', "SELECT * FROM ai_insights WHERE profile_id=%s ORDER BY created DESC"),
    )

    # Collect all data
    data = {}
    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        profile = r2d(cur.fetchone())
        if isinstance(profile, dict):
            # Never ship credentials in a user-facing export.
            profile = {
                key: value
                for key, value in profile.items()
                if not any(marker in str(key).lower() for marker in sensitive_markers)
            }
        data['profile'] = profile

        for section, query in section_queries:
            cur.execute(query, (pid,))
            data[section] = [r2d(r) for r in cur.fetchall()]

    # Reuse the already fetched weight rows for the caliper enrichment.
    w_rows = [{'date': x['date'], 'weight': x['weight']} for x in data.get('weight', [])]
    for row in data.get('caliper', []):
        enrich_caliper_row_for_response(row, w_rows)

    def decimal_handler(obj):
        """json.dumps fallback: Decimal becomes float, anything else its str()."""
        if isinstance(obj, Decimal):
            return float(obj)
        return str(obj)

    json_str = json.dumps(data, indent=2, default=decimal_handler)

    # Phase 2: Increment usage counter
    increment_feature_usage(pid, 'data_export')

    return Response(
        content=json_str,
        media_type="application/json",
        headers={"Content-Disposition": f"attachment; filename=mitai-export-{pid}.json"}
    )
@router.get("/zip")
def export_zip(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Export all data of the active profile as a ZIP archive.

    Archive layout:
        README.txt                  description of the export format
        profile.json                profile fields (no password hash) + entry counts
        data/*.csv                  weight, circumferences, caliper, nutrition, activity
        insights/ai_insights.json   AI insights (only when there are any)
        photos/*                    photo files found below ``PHOTOS_DIR``

    Args:
        x_profile_id: Profile id from the request header, resolved via ``get_pid``.
        session: Auth session injected by ``require_auth`` (not read here).

    Returns:
        An ``application/zip`` ``StreamingResponse`` served as an attachment.

    Raises:
        HTTPException: 403 when ``check_feature_access`` does not allow
            ``data_export`` for this profile.
    """
    # Function-scope import: only needed for the download file name below.
    from urllib.parse import quote

    pid = get_pid(x_profile_id)

    # Phase 4: Check feature access and ENFORCE
    access = check_feature_access(pid, 'data_export')
    log_feature_usage(pid, 'data_export', access, 'export_zip')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"data_export {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für Daten-Exporte überschritten ({access['used']}/{access['limit']}). "
                   f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )

    # Get profile
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        prof = r2d(cur.fetchone())

    def write_csv(zf, filename, rows, columns):
        """Write ``rows`` as ``data/<filename>`` (semicolon-separated, UTF-8 with BOM).

        Nothing is written when ``rows`` is empty. NULL cells become empty
        strings and Decimal cells are converted to float.
        """
        if not rows:
            return

        def cell(value):
            if value is None:
                return ''
            return float(value) if isinstance(value, Decimal) else value

        output = io.StringIO()
        writer = csv.writer(output, delimiter=';')
        writer.writerow(columns)
        for r in rows:
            writer.writerow([cell(r.get(col)) for col in columns])
        # UTF-8 with BOM for Excel
        csv_bytes = '\ufeff'.encode('utf-8') + output.getvalue().encode('utf-8')
        zf.writestr(f"data/{filename}", csv_bytes)

    def rename(row, pairs):
        """Move each DB column to its export name (missing columns become None)."""
        for old, new in pairs:
            row[new] = row.pop(old, None)

    # Create ZIP
    zip_buffer = io.BytesIO()
    export_date = datetime.now().strftime('%Y-%m-%d')
    # ``.get('name', 'export')`` would return None for a NULL name and crash
    # on ``.replace`` further down, so fall back explicitly.
    profile_name = str(prof.get('name') or 'export')

    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        with get_db() as conn:
            cur = get_cursor(conn)

            def count(query):
                """Run a ``SELECT COUNT(*) as c`` query for this profile."""
                cur.execute(query, (pid,))
                return cur.fetchone()['c']

            def fetch(query):
                """Run a per-profile query and return the rows as dicts."""
                cur.execute(query, (pid,))
                return [r2d(r) for r in cur.fetchall()]

            # 1. README.txt
            readme_lines = (
                'Mitai Jinkendo Datenexport',
                'Version: 2',
                f'Exportiert am: {export_date}',
                f'Profil: {profile_name}',
                'Inhalt:',
                '- profile.json: Profildaten und Einstellungen',
                '- data/*.csv: Messdaten (Semikolon-getrennt, UTF-8)',
                '- insights/: KI-Auswertungen (JSON)',
                '- photos/: Progress-Fotos (JPEG)',
                'Import:',
                'Dieser Export kann in Mitai Jinkendo unter',
                'Einstellungen → Import → "Mitai Backup importieren"',
                'wieder eingespielt werden.',
                'Format-Version 2 (ab v9b):',
                'Alle CSV-Dateien sind UTF-8 mit BOM kodiert.',
                'Trennzeichen: Semikolon (;)',
                'Datumsformat: YYYY-MM-DD',
            )
            readme = "\n".join(readme_lines) + "\n"
            zf.writestr("README.txt", readme.encode('utf-8'))

            # 2. profile.json (without password hash)
            w_count = count("SELECT COUNT(*) as c FROM weight_log WHERE profile_id=%s")
            n_count = count("SELECT COUNT(*) as c FROM nutrition_log WHERE profile_id=%s")
            a_count = count("SELECT COUNT(*) as c FROM activity_log WHERE profile_id=%s")
            p_count = count("SELECT COUNT(*) as c FROM photos WHERE profile_id=%s")

            profile_data = {
                "export_version": "2",
                "export_date": export_date,
                "app": "Mitai Jinkendo",
                "profile": {
                    "name": prof.get('name'),
                    "email": prof.get('email'),
                    "sex": prof.get('sex'),
                    "height": float(prof['height']) if prof.get('height') else None,
                    "birth_year": prof['dob'].year if prof.get('dob') else None,
                    "goal_weight": float(prof['goal_weight']) if prof.get('goal_weight') else None,
                    "goal_bf_pct": float(prof['goal_bf_pct']) if prof.get('goal_bf_pct') else None,
                    "avatar_color": prof.get('avatar_color'),
                    "auth_type": prof.get('auth_type'),
                    "session_days": prof.get('session_days'),
                    "ai_enabled": prof.get('ai_enabled'),
                    "tier": prof.get('tier')
                },
                "stats": {
                    "weight_entries": w_count,
                    "nutrition_entries": n_count,
                    "activity_entries": a_count,
                    "photos": p_count
                }
            }
            zf.writestr("profile.json", json.dumps(profile_data, indent=2, ensure_ascii=False).encode('utf-8'))

            # 3-7. CSV exports (weight, circumferences, caliper, nutrition, activity)
            rows = fetch("SELECT id, date, weight, note, source, created FROM weight_log WHERE profile_id=%s ORDER BY date")
            write_csv(zf, "weight.csv", rows, ['id','date','weight','note','source','created'])

            rows = fetch("SELECT id, date, c_waist, c_hip, c_chest, c_neck, c_arm, c_thigh, c_calf, notes, created FROM circumference_log WHERE profile_id=%s ORDER BY date")
            for r in rows:
                rename(r, (
                    ('c_waist', 'waist'), ('c_hip', 'hip'), ('c_chest', 'chest'),
                    ('c_neck', 'neck'), ('c_arm', 'upper_arm'), ('c_thigh', 'thigh'),
                    ('c_calf', 'calf'), ('notes', 'note'),
                ))
                # Column of the export format that is not selected from the DB.
                r['forearm'] = None
            write_csv(zf, "circumferences.csv", rows, ['id','date','waist','hip','chest','neck','upper_arm','thigh','calf','forearm','note','created'])

            rows = fetch("SELECT id, date, sf_chest, sf_abdomen, sf_thigh, sf_triceps, sf_subscap, sf_suprailiac, sf_axilla, sf_method, body_fat_pct, notes, created FROM caliper_log WHERE profile_id=%s ORDER BY date")
            for r in rows:
                rename(r, (
                    ('sf_chest', 'chest'), ('sf_abdomen', 'abdomen'), ('sf_thigh', 'thigh'),
                    ('sf_triceps', 'tricep'), ('sf_subscap', 'subscapular'),
                    ('sf_suprailiac', 'suprailiac'), ('sf_axilla', 'midaxillary'),
                    ('sf_method', 'method'), ('body_fat_pct', 'bf_percent'), ('notes', 'note'),
                ))
            write_csv(zf, "caliper.csv", rows, ['id','date','chest','abdomen','thigh','tricep','subscapular','suprailiac','midaxillary','method','bf_percent','note','created'])

            rows = fetch("SELECT id, date, kcal, protein_g, fat_g, carbs_g, source, created FROM nutrition_log WHERE profile_id=%s ORDER BY date")
            for r in rows:
                rename(r, (('protein_g', 'protein'), ('fat_g', 'fat'), ('carbs_g', 'carbs')))
                # Columns of the export format that are not selected from the DB.
                r['meal_name'] = ''
                r['fiber'] = None
                r['note'] = ''
            write_csv(zf, "nutrition.csv", rows, ['id','date','meal_name','kcal','protein','fat','carbs','fiber','note','source','created'])

            rows = fetch("SELECT id, date, activity_type, duration_min, kcal_active, hr_avg, hr_max, distance_km, notes, source, created FROM activity_log WHERE profile_id=%s ORDER BY date")
            for r in rows:
                # The activity type is exported twice: as ``name`` and as ``type``.
                r['name'] = r['activity_type']
                rename(r, (
                    ('activity_type', 'type'), ('kcal_active', 'kcal'),
                    ('hr_avg', 'heart_rate_avg'), ('hr_max', 'heart_rate_max'),
                    ('notes', 'note'),
                ))
            write_csv(zf, "activity.csv", rows, ['id','date','name','type','duration_min','kcal','heart_rate_avg','heart_rate_max','distance_km','note','source','created'])

            # 8. insights/ai_insights.json
            insights = []
            for rd in fetch("SELECT id, scope, content, created FROM ai_insights WHERE profile_id=%s ORDER BY created DESC"):
                created = rd['created']
                insights.append({
                    "id": rd['id'],
                    "scope": rd['scope'],
                    "created": created.isoformat() if hasattr(created, 'isoformat') else str(created),
                    "result": rd['content']
                })
            if insights:
                zf.writestr("insights/ai_insights.json", json.dumps(insights, indent=2, ensure_ascii=False).encode('utf-8'))

            # 9. photos/
            photos = fetch("SELECT * FROM photos WHERE profile_id=%s ORDER BY date")
            for i, photo in enumerate(photos):
                rel_path = photo.get('path')
                if not rel_path:
                    # A NULL/empty path would resolve to PHOTOS_DIR itself.
                    continue
                photo_path = Path(PHOTOS_DIR) / rel_path
                # Only regular files are archived; missing files and directories are skipped.
                if photo_path.is_file():
                    arcname = f"{photo.get('date') or export_date}_{i+1}{photo_path.suffix}"
                    zf.write(photo_path, f"photos/{arcname}")

    # The profile name is user-controlled, so it must not reach the header raw:
    # send a quoted ASCII-only fallback plus the exact name as RFC 5987 ``filename*``.
    download_name = f"mitai-export-{profile_name.replace(' ','-')}-{export_date}.zip"
    ascii_name = "".join(
        ch if ch.isascii() and (ch.isalnum() or ch in "-_.") else "_"
        for ch in download_name
    )
    encoded_name = quote(download_name, safe="")
    disposition = f'attachment; filename="{ascii_name}"; ' + "filename*=UTF-8''" + encoded_name

    # Phase 2: Increment usage counter
    increment_feature_usage(pid, 'data_export')

    return StreamingResponse(
        iter([zip_buffer.getvalue()]),
        media_type="application/zip",
        headers={"Content-Disposition": disposition}
    )