mitai-jinkendo/backend/routers/caliper.py
Lars ddcd2f4350
All checks were successful
Deploy Development / deploy (push) Successful in 34s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 13s
feat: v9c Phase 2 - Backend Non-Blocking Logging (12 Endpoints)
PHASE 2: Backend Non-Blocking Logging - KOMPLETT

Instrumentierte Endpoints (12):
- Data: weight, circumference, caliper, nutrition, activity, photos (6)
- AI: insights/run/{slug}, insights/pipeline (2)
- Export: csv, json, zip (3)
- Import: zip (1)

Pattern implementiert:
- check_feature_access() VOR Operation (non-blocking)
- [FEATURE-LIMIT] Logging wenn Limit überschritten
- increment_feature_usage() NACH Operation
- Alte Permission-Checks bleiben aktiv

Features geprüft:
- weight_entries, circumference_entries, caliper_entries
- nutrition_entries, activity_entries, photos
- ai_calls, ai_pipeline
- data_export, data_import

Monitoring: 1-2 Wochen Log-Only-Phase
Logs zeigen: Wie oft würde blockiert werden?
Nächste Phase: Frontend Display (Usage-Counter)

Phase 1 (Cleanup) + Phase 2 (Logging) vollständig!

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 21:59:33 +01:00

95 lines
3.8 KiB
Python

"""
Caliper/Skinfold Tracking Endpoints for Mitai Jinkendo
Handles body fat measurements via skinfold caliper (4 methods supported).
"""
import uuid
import logging
from typing import Optional
from fastapi import APIRouter, Header, Depends
from db import get_db, get_cursor, r2d
from auth import require_auth, check_feature_access, increment_feature_usage
from models import CaliperEntry
from routers.profiles import get_pid
# Router for the caliper endpoints; every route declared below is registered
# under the /api/caliper prefix and tagged "caliper".
router = APIRouter(prefix="/api/caliper", tags=["caliper"])
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@router.get("")
def list_caliper(limit: int=100, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Return the most recent caliper rows of the resolved profile.

    Args:
        limit: Maximum number of rows to return (passed to SQL ``LIMIT``).
        x_profile_id: Profile id header value; resolved through ``get_pid``.
        session: Result of ``require_auth``. Not read here; the dependency
            is declared so the request must be authenticated.

    Returns:
        A list with one ``r2d``-converted item per ``caliper_log`` row,
        ordered by ``date`` descending.
    """
    profile_id = get_pid(x_profile_id)
    query = "SELECT * FROM caliper_log WHERE profile_id=%s ORDER BY date DESC LIMIT %s"
    with get_db() as connection:
        cursor = get_cursor(connection)
        cursor.execute(query, (profile_id, limit))
        rows = cursor.fetchall()
        return list(map(r2d, rows))
@router.post("")
def upsert_caliper(e: CaliperEntry, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Create or update the caliper entry for ``e.date`` (upsert by date).

    Looks for an existing ``caliper_log`` row of the resolved profile with
    the same date. If one exists, every dumped model field except ``date``
    is overwritten on that row; otherwise a new row with a fresh UUID is
    inserted and the ``caliper_entries`` usage counter is incremented.

    Args:
        e: Caliper measurement payload.
        x_profile_id: Profile id header value; resolved through ``get_pid``.
        session: Result of ``require_auth``. Not read here; the dependency
            is declared so the request must be authenticated.

    Returns:
        Dict with the affected row ``id`` and the entry ``date``.
    """
    pid = get_pid(x_profile_id)

    # Phase 2: feature-access check is log-only; it never blocks the request.
    access = check_feature_access(pid, 'caliper_entries')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} would be blocked: "
            f"caliper_entries {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT id FROM caliper_log WHERE profile_id=%s AND date=%s", (pid,e.date))
        ex = cur.fetchone()
        d = e.model_dump()

        if ex:
            # UPDATE existing entry. The date is the upsert key and stays
            # untouched. Column names come from the model's own field names,
            # not from request-controlled keys.
            eid = ex['id']
            update_fields = {k: v for k, v in d.items() if k != 'date'}
            sets = ', '.join(f"{k}=%s" for k in update_fields)
            # Also scope by profile_id, consistent with the PUT and DELETE
            # endpoints of this router.
            cur.execute(f"UPDATE caliper_log SET {sets} WHERE id=%s AND profile_id=%s",
                        [*update_fields.values(), eid, pid])
        else:
            # INSERT new entry
            eid = str(uuid.uuid4())
            cur.execute("""INSERT INTO caliper_log
                (id,profile_id,date,sf_method,sf_chest,sf_axilla,sf_triceps,sf_subscap,sf_suprailiac,
                sf_abdomen,sf_thigh,sf_calf_med,sf_lowerback,sf_biceps,body_fat_pct,lean_mass,fat_mass,notes,created)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP)""",
                (eid,pid,d['date'],d['sf_method'],d['sf_chest'],d['sf_axilla'],d['sf_triceps'],
                 d['sf_subscap'],d['sf_suprailiac'],d['sf_abdomen'],d['sf_thigh'],d['sf_calf_med'],
                 d['sf_lowerback'],d['sf_biceps'],d['body_fat_pct'],d['lean_mass'],d['fat_mass'],d['notes']))

            # Phase 2: count usage only when a new row was actually created;
            # updates of an existing date must not consume quota.
            increment_feature_usage(pid, 'caliper_entries')

    return {"id":eid,"date":e.date}
@router.put("/{eid}")
def update_caliper(eid: str, e: CaliperEntry, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Overwrite all fields of one caliper entry owned by the resolved profile.

    Args:
        eid: Id of the ``caliper_log`` row to update (path parameter).
        e: Caliper measurement payload; every dumped field is written.
        x_profile_id: Profile id header value; resolved through ``get_pid``.
        session: Result of ``require_auth``. Not read here; the dependency
            is declared so the request must be authenticated.

    Returns:
        Dict containing the given ``id``. The affected row count is not
        checked, so the same dict is returned when no row matched.
    """
    profile_id = get_pid(x_profile_id)
    with get_db() as connection:
        fields = e.model_dump()
        cursor = get_cursor(connection)
        # One "column=%s" assignment per model field, in dump order.
        set_clause = ', '.join(f'{column}=%s' for column in fields)
        params = [*fields.values(), eid, profile_id]
        cursor.execute(
            f"UPDATE caliper_log SET {set_clause} WHERE id=%s AND profile_id=%s",
            params,
        )
    return {"id": eid}
@router.delete("/{eid}")
def delete_caliper(eid: str, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Remove one caliper entry owned by the resolved profile.

    Args:
        eid: Id of the ``caliper_log`` row to delete (path parameter).
        x_profile_id: Profile id header value; resolved through ``get_pid``.
        session: Result of ``require_auth``. Not read here; the dependency
            is declared so the request must be authenticated.

    Returns:
        ``{"ok": True}``. The affected row count is not checked, so the
        same dict is returned when no row matched.
    """
    profile_id = get_pid(x_profile_id)
    statement = "DELETE FROM caliper_log WHERE id=%s AND profile_id=%s"
    with get_db() as connection:
        cursor = get_cursor(connection)
        cursor.execute(statement, (eid, profile_id))
    return {"ok": True}