mitai-jinkendo/backend/routers/nutrition.py
Lars 7ac9752c3d
All checks were successful
Deploy Development / deploy (push) Successful in 50s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: enhance nutrition data processing and visualization with new correlation insights
- Refactored the `calculate_lag_correlation` function to normalize lag payloads and improve correlation calculations for various nutrition metrics.
- Introduced a new function `build_nutrition_correlation_heuristic_items` to generate heuristic insights based on merged nutrition data, enhancing user understanding of dietary impacts on weight and body composition.
- Updated the `get_nutrition_history_viz_bundle` function to include daily calorie balance and protein vs. lean mass data, providing a comprehensive view of nutrition trends.
- Enhanced the frontend to visualize calorie balance and protein vs. lean mass insights, improving the user experience with clear graphical representations of dietary correlations.
2026-04-20 13:45:28 +02:00

266 lines
12 KiB
Python

"""
Nutrition Tracking Endpoints for Mitai Jinkendo
Handles nutrition data, FDDB CSV import, correlations, and weekly aggregates.
"""
import csv
import io
import uuid
import logging
from typing import Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException, UploadFile, File, Header, Depends
from db import get_db, get_cursor, r2d
from auth import require_auth, check_feature_access, increment_feature_usage
from routers.profiles import get_pid
from feature_logger import log_feature_usage
from data_layer.nutrition_body_merge import build_merged_daily_nutrition_body_rows
router = APIRouter(prefix="/api/nutrition", tags=["nutrition"])
logger = logging.getLogger(__name__)
# ── Helper ────────────────────────────────────────────────────────────────────
def _pf(s):
"""Parse float from string (handles comma decimal separator)."""
try: return float(str(s).replace(',','.').strip())
except: return 0.0
# ── Endpoints ─────────────────────────────────────────────────────────────────
@router.post("/import-csv")
async def import_nutrition_csv(file: UploadFile=File(...), x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Import FDDB nutrition CSV."""
pid = get_pid(x_profile_id)
# Phase 4: Check feature access and ENFORCE
# Note: CSV import can create many entries - we check once before import
access = check_feature_access(pid, 'nutrition_entries')
log_feature_usage(pid, 'nutrition_entries', access, 'import_csv')
if not access['allowed']:
logger.warning(
f"[FEATURE-LIMIT] User {pid} blocked: "
f"nutrition_entries {access['reason']} (used: {access['used']}, limit: {access['limit']})"
)
raise HTTPException(
status_code=403,
detail=f"Limit erreicht: Du hast das Kontingent für Ernährungseinträge überschritten ({access['used']}/{access['limit']}). "
f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
)
raw = await file.read()
try: text = raw.decode('utf-8')
except: text = raw.decode('latin-1')
if text.startswith('\ufeff'): text = text[1:]
if not text.strip(): raise HTTPException(400,"Leere Datei")
reader = csv.DictReader(io.StringIO(text), delimiter=';')
days: dict = {}
count = 0
for row in reader:
rd = row.get('datum_tag_monat_jahr_stunde_minute','').strip().strip('"')
if not rd: continue
try:
p = rd.split(' ')[0].split('.')
iso = f"{p[2]}-{p[1]}-{p[0]}"
except: continue
days.setdefault(iso,{'kcal':0,'fat_g':0,'carbs_g':0,'protein_g':0})
days[iso]['kcal'] += _pf(row.get('kj',0))/4.184
days[iso]['fat_g'] += _pf(row.get('fett_g',0))
days[iso]['carbs_g'] += _pf(row.get('kh_g',0))
days[iso]['protein_g'] += _pf(row.get('protein_g',0))
count+=1
inserted=0
new_entries=0
with get_db() as conn:
cur = get_cursor(conn)
for iso,vals in days.items():
kcal=round(vals['kcal'],1); fat=round(vals['fat_g'],1)
carbs=round(vals['carbs_g'],1); prot=round(vals['protein_g'],1)
cur.execute("SELECT id FROM nutrition_log WHERE profile_id=%s AND date=%s",(pid,iso))
is_new = not cur.fetchone()
if not is_new:
# UPDATE existing
cur.execute("UPDATE nutrition_log SET kcal=%s,protein_g=%s,fat_g=%s,carbs_g=%s WHERE profile_id=%s AND date=%s",
(kcal,prot,fat,carbs,pid,iso))
else:
# INSERT new
cur.execute("INSERT INTO nutrition_log (id,profile_id,date,kcal,protein_g,fat_g,carbs_g,source,created) VALUES (%s,%s,%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)",
(str(uuid.uuid4()),pid,iso,kcal,prot,fat,carbs))
new_entries += 1
inserted+=1
# Phase 2: Increment usage counter for each new entry created
for _ in range(new_entries):
increment_feature_usage(pid, 'nutrition_entries')
return {"rows_parsed":count,"days_imported":inserted,"new_entries":new_entries,
"date_range":{"from":min(days) if days else None,"to":max(days) if days else None}}
@router.post("")
def create_nutrition(date: str, kcal: float, protein_g: float, fat_g: float, carbs_g: float,
x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Create or update nutrition entry for a specific date."""
pid = get_pid(x_profile_id)
# Validate date format
try:
datetime.strptime(date, '%Y-%m-%d')
except ValueError:
raise HTTPException(400, "Ungültiges Datumsformat. Erwartet: YYYY-MM-DD")
with get_db() as conn:
cur = get_cursor(conn)
# Check if entry exists
cur.execute("SELECT id FROM nutrition_log WHERE profile_id=%s AND date=%s", (pid, date))
existing = cur.fetchone()
if existing:
# UPDATE existing entry
cur.execute("""
UPDATE nutrition_log
SET kcal=%s, protein_g=%s, fat_g=%s, carbs_g=%s, source='manual'
WHERE id=%s AND profile_id=%s
""", (round(kcal,1), round(protein_g,1), round(fat_g,1), round(carbs_g,1), existing['id'], pid))
return {"success": True, "mode": "updated", "id": existing['id']}
else:
# Phase 4: Check feature access before INSERT
access = check_feature_access(pid, 'nutrition_entries')
log_feature_usage(pid, 'nutrition_entries', access, 'create')
if not access['allowed']:
logger.warning(
f"[FEATURE-LIMIT] User {pid} blocked: "
f"nutrition_entries {access['reason']} (used: {access['used']}, limit: {access['limit']})"
)
raise HTTPException(
status_code=403,
detail=f"Limit erreicht: Du hast das Kontingent für Ernährungseinträge überschritten ({access['used']}/{access['limit']}). "
f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
)
# INSERT new entry
new_id = str(uuid.uuid4())
cur.execute("""
INSERT INTO nutrition_log (id, profile_id, date, kcal, protein_g, fat_g, carbs_g, source, created)
VALUES (%s, %s, %s, %s, %s, %s, %s, 'manual', CURRENT_TIMESTAMP)
""", (new_id, pid, date, round(kcal,1), round(protein_g,1), round(fat_g,1), round(carbs_g,1)))
# Phase 2: Increment usage counter
increment_feature_usage(pid, 'nutrition_entries')
return {"success": True, "mode": "created", "id": new_id}
@router.get("")
def list_nutrition(limit: int=365, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Get nutrition entries for current profile."""
pid = get_pid(x_profile_id)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT * FROM nutrition_log WHERE profile_id=%s ORDER BY date DESC LIMIT %s", (pid,limit))
return [r2d(r) for r in cur.fetchall()]
@router.get("/by-date/{date}")
def get_nutrition_by_date(date: str, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Get nutrition entry for a specific date."""
pid = get_pid(x_profile_id)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT * FROM nutrition_log WHERE profile_id=%s AND date=%s", (pid, date))
row = cur.fetchone()
return r2d(row) if row else None
@router.get("/correlations")
def nutrition_correlations(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Get nutrition data correlated with weight and body fat (Layer 1 Merge, siehe nutrition_body_merge)."""
pid = get_pid(x_profile_id)
return build_merged_daily_nutrition_body_rows(pid)
@router.get("/weekly")
def nutrition_weekly(weeks: int=16, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Get nutrition data aggregated by week."""
pid = get_pid(x_profile_id)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT * FROM nutrition_log WHERE profile_id=%s ORDER BY date DESC LIMIT %s",(pid,weeks*7))
rows=[r2d(r) for r in cur.fetchall()]
if not rows: return []
wm={}
for d in rows:
# Handle both datetime.date objects (from DB) and strings
date_obj = d['date'] if hasattr(d['date'], 'strftime') else datetime.strptime(d['date'],'%Y-%m-%d')
wk = date_obj.strftime('%Y-W%V')
wm.setdefault(wk,[]).append(d)
result=[]
for wk in sorted(wm):
en=wm[wk]; n=len(en)
def avg(k): return round(sum(float(e.get(k) or 0) for e in en)/n,1)
result.append({'week':wk,'days':n,'kcal':avg('kcal'),'protein_g':avg('protein_g'),'fat_g':avg('fat_g'),'carbs_g':avg('carbs_g')})
return result
@router.get("/import-history")
def import_history(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Get import history by grouping entries by created timestamp."""
pid = get_pid(x_profile_id)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT
DATE(created) as import_date,
COUNT(*) as count,
MIN(date) as date_from,
MAX(date) as date_to,
MAX(created) as last_created
FROM nutrition_log
WHERE profile_id=%s AND source='csv'
GROUP BY DATE(created)
ORDER BY DATE(created) DESC
""", (pid,))
return [r2d(r) for r in cur.fetchall()]
@router.put("/{entry_id}")
def update_nutrition(entry_id: str, kcal: float, protein_g: float, fat_g: float, carbs_g: float,
x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Update nutrition entry macros."""
pid = get_pid(x_profile_id)
with get_db() as conn:
cur = get_cursor(conn)
# Verify ownership
cur.execute("SELECT id FROM nutrition_log WHERE id=%s AND profile_id=%s", (entry_id, pid))
if not cur.fetchone():
raise HTTPException(404, "Eintrag nicht gefunden")
cur.execute("""
UPDATE nutrition_log
SET kcal=%s, protein_g=%s, fat_g=%s, carbs_g=%s
WHERE id=%s AND profile_id=%s
""", (round(kcal,1), round(protein_g,1), round(fat_g,1), round(carbs_g,1), entry_id, pid))
return {"success": True}
@router.delete("/{entry_id}")
def delete_nutrition(entry_id: str, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Delete nutrition entry."""
pid = get_pid(x_profile_id)
with get_db() as conn:
cur = get_cursor(conn)
# Verify ownership
cur.execute("SELECT id FROM nutrition_log WHERE id=%s AND profile_id=%s", (entry_id, pid))
if not cur.fetchone():
raise HTTPException(404, "Eintrag nicht gefunden")
cur.execute("DELETE FROM nutrition_log WHERE id=%s AND profile_id=%s", (entry_id, pid))
return {"success": True}