"""
Activity Tracking Endpoints for Mitai Jinkendo

Handles workout/activity logging, statistics, and Apple Health CSV import.
"""
import calendar
import csv
import io
import logging
import re
import uuid
from datetime import date
from typing import Optional

from fastapi import APIRouter, HTTPException, UploadFile, File, Header, Depends, Query

from db import get_db, get_cursor, r2d
from auth import require_auth, check_feature_access, increment_feature_usage
from models import ActivityEntry, ActivityMetricsReplace
from routers.profiles import get_pid
from feature_logger import log_feature_usage
from quality_filter import get_quality_filter_sql
from data_layer.activity_persistence_orchestrator import (
    get_mappable_activity_field_catalog,
    insert_activity_from_entry,
    run_activity_post_write_hooks,
    update_activity_from_entry,
    find_activity_duplicate_id,
    update_activity_columns,
    insert_activity_csv_minimal,
    run_activity_post_write_hooks_import,
    new_activity_id,
)
from data_layer.activity_time_normalize import normalize_activity_start
from data_layer.activity_session_metrics import enrich_sessions_with_metrics

router = APIRouter(prefix="/api/activity", tags=["activity"])
logger = logging.getLogger(__name__)

_MONTH_RE = re.compile(r"^(\d{4})-(\d{2})$")


def _month_date_bounds(ym: str) -> tuple[date, date]:
    """Return the first and last calendar day of the month given as "YYYY-MM".

    Raises:
        HTTPException(400): if *ym* is not of the form YYYY-MM, if the year is
            0000 (not representable by ``datetime.date``), or if the month is
            outside 1..12.
    """
    m = _MONTH_RE.match((ym or "").strip())
    if not m:
        raise HTTPException(status_code=400, detail="month muss YYYY-MM sein")
    y, mo = int(m.group(1)), int(m.group(2))
    if y < 1:
        # "0000-xx" matches the regex but date() rejects year 0; answer 400
        # instead of letting a ValueError surface as a 500.
        raise HTTPException(status_code=400, detail="month muss YYYY-MM sein")
    if mo < 1 or mo > 12:
        raise HTTPException(status_code=400, detail="Ungültiger Monat")
    last = calendar.monthrange(y, mo)[1]
    return date(y, mo, 1), date(y, mo, last)


# Window definition used with ROW_NUMBER() to fold rows that share profile,
# date, type, start time, rounded duration and rounded active kcal. Within a
# partition the most recently created row (then highest id) gets row number 1.
_ACTIVITY_DEDUP_WINDOW = """
    PARTITION BY
        al.profile_id,
        al.date,
        COALESCE(al.activity_type, ''),
        COALESCE(al.start_time::text, ''),
        COALESCE(ROUND(al.duration_min::numeric, 1), '-999999'::numeric),
        COALESCE(ROUND(al.kcal_active::numeric, 1), '-999999'::numeric)
    ORDER BY al.created DESC NULLS LAST, al.id DESC
"""


def _activity_rows_after_list_query(cur):
    """Fetch all rows of a dedup query as dicts, dropping the helper column.

    Rows for which ``r2d`` yields a falsy value are skipped; the internal
    ``_dup_rn`` row-number column is removed from every returned dict.
    """
    rows = []
    for r in cur.fetchall():
        d = r2d(r)
        if not d:
            continue
        d.pop("_dup_rn", None)
        rows.append(d)
    return rows


def _return_activity_list_rows(cur, rows: list) -> list:
    """Layer 1: attach merged session_metrics like the detail path (batch)."""
    enrich_sessions_with_metrics(cur, rows)
    return rows


# Evaluation import with error handling (Phase 1.2)
try:
    from evaluation_helper import evaluate_and_save_activity
    EVALUATION_AVAILABLE = True
except Exception as e:
    logger.warning(f"[AUTO-EVAL] Evaluation system not available: {e}")
    EVALUATION_AVAILABLE = False
    evaluate_and_save_activity = None


@router.get("")
def list_activity(
    limit: int = Query(200, ge=1, le=50_000),
    offset: int = Query(0, ge=0, le=100_000, description="SQL OFFSET für Pagination"),
    days: Optional[int] = Query(None, ge=1, le=4000, description="Nur Einträge mit date >= HEUTE − days (Kalendertage)"),
    month: Optional[str] = Query(
        None,
        description='Kalendermonat "YYYY-MM" (ganzer Monat; schließt days und offset aus)',
    ),
    skip_quality_filter: bool = Query(
        False,
        description="True = alle Einträge des Profils (ohne quality_label-Filter). Für /activity Erfassung.",
    ),
    collapse_duplicate_sessions: bool = Query(
        False,
        description="True = Sessions mit gleichem Datum/Typ/Startzeit/Dauer/Kcal falten (neueste Zeile behalten).",
    ),
    session: dict = Depends(require_auth),
):
    """Get activity entries for current profile.

    Optional *days* filters by calendar window (not the same as *limit*).
    *month* selects one whole calendar month and is mutually exclusive with
    *days* and a non-zero *offset* (HTTP 400 otherwise). Rows are ordered by
    date, start time and id, newest first, and are enriched with session
    metrics before being returned.
    """
    # Always the profile of the authenticated session (X-Profile-Id is not used here).
    pid = str(session["profile_id"])
    with get_db() as conn:
        cur = get_cursor(conn)

        # Issue #31: quality filter — can be switched off for the /activity
        # capture page via skip_quality_filter.
        if skip_quality_filter:
            quality_filter = ""
            quality_filter_al = ""
        else:
            cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
            profile = r2d(cur.fetchone())
            # Two variants of the same filter: unqualified columns for the
            # plain queries, "al."-prefixed for the aliased dedup sub-query.
            quality_filter = get_quality_filter_sql(profile or {}, "")
            quality_filter_al = get_quality_filter_sql(profile or {}, "al.")

        if month:
            if days is not None:
                raise HTTPException(status_code=400, detail="month und days schließen sich aus")
            if offset != 0:
                raise HTTPException(status_code=400, detail="month und offset schließen sich aus")
            d0, d1 = _month_date_bounds(month)
            if collapse_duplicate_sessions:
                cur.execute(
                    f"""
                    SELECT d.* FROM (
                        SELECT al.*,
                            ROW_NUMBER() OVER (
                                {_ACTIVITY_DEDUP_WINDOW}
                            ) AS _dup_rn
                        FROM activity_log al
                        WHERE al.profile_id = %s
                        {quality_filter_al}
                        AND al.date >= %s AND al.date <= %s
                    ) d
                    WHERE d._dup_rn = 1
                    ORDER BY d.date DESC, d.start_time DESC NULLS LAST, d.id DESC
                    LIMIT %s
                    """,
                    (pid, d0, d1, limit),
                )
                return _return_activity_list_rows(cur, _activity_rows_after_list_query(cur))
            cur.execute(
                f"""
                SELECT * FROM activity_log
                WHERE profile_id=%s
                {quality_filter}
                AND date >= %s AND date <= %s
                ORDER BY date DESC, start_time DESC NULLS LAST, id DESC
                LIMIT %s
                """,
                (pid, d0, d1, limit),
            )
            return _return_activity_list_rows(
                cur, [r2d(r) for r in cur.fetchall()]
            )

        if days is not None:
            if collapse_duplicate_sessions:
                cur.execute(
                    f"""
                    SELECT d.* FROM (
                        SELECT al.*,
                            ROW_NUMBER() OVER (
                                {_ACTIVITY_DEDUP_WINDOW}
                            ) AS _dup_rn
                        FROM activity_log al
                        WHERE al.profile_id = %s
                        {quality_filter_al}
                        AND al.date >= (CURRENT_DATE - %s::integer)
                    ) d
                    WHERE d._dup_rn = 1
                    ORDER BY d.date DESC, d.start_time DESC NULLS LAST, d.id DESC
                    LIMIT %s OFFSET %s
                    """,
                    (pid, days, limit, offset),
                )
                return _return_activity_list_rows(cur, _activity_rows_after_list_query(cur))
            cur.execute(
                f"""
                SELECT * FROM activity_log
                WHERE profile_id=%s
                {quality_filter}
                AND date >= (CURRENT_DATE - %s::integer)
                ORDER BY date DESC, start_time DESC NULLS LAST, id DESC
                LIMIT %s OFFSET %s
                """,
                (pid, days, limit, offset),
            )
        else:
            if collapse_duplicate_sessions:
                cur.execute(
                    f"""
                    SELECT d.* FROM (
                        SELECT al.*,
                            ROW_NUMBER() OVER (
                                {_ACTIVITY_DEDUP_WINDOW}
                            ) AS _dup_rn
                        FROM activity_log al
                        WHERE al.profile_id = %s
                        {quality_filter_al}
                    ) d
                    WHERE d._dup_rn = 1
                    ORDER BY d.date DESC, d.start_time DESC NULLS LAST, d.id DESC
                    LIMIT %s OFFSET %s
                    """,
                    (pid, limit, offset),
                )
                return _return_activity_list_rows(cur, _activity_rows_after_list_query(cur))
            cur.execute(
                f"""
                SELECT * FROM activity_log
                WHERE profile_id=%s
                {quality_filter}
                ORDER BY date DESC, start_time DESC NULLS LAST, id DESC
                LIMIT %s OFFSET %s
                """,
                (pid, limit, offset),
            )
        return _return_activity_list_rows(cur, [r2d(r) for r in cur.fetchall()])


@router.post("")
def create_activity(e: ActivityEntry, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Create new activity entry.

    Checks (and logs) the ``activity_entries`` feature quota first and
    answers 403 when the quota is exhausted. On success the entry is
    inserted, post-write hooks run, and the usage counter is incremented.

    Returns:
        ``{"id": <new uuid>, "date": <entry date>}``
    """
    pid = get_pid(x_profile_id)

    # Phase 4: Check feature access and ENFORCE
    access = check_feature_access(pid, 'activity_entries')
    log_feature_usage(pid, 'activity_entries', access, 'create')

    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"activity_entries {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für Aktivitätseinträge überschritten ({access['used']}/{access['limit']}). "
                   f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )

    eid = str(uuid.uuid4())
    with get_db() as conn:
        cur = get_cursor(conn)
        insert_activity_from_entry(cur, pid, eid, e)
        run_activity_post_write_hooks(cur, pid, eid)

    # Phase 2: Increment usage counter (always for new entries)
    increment_feature_usage(pid, 'activity_entries')

    return {"id":eid,"date":e.date}


@router.get("/stats")
def activity_stats(
    skip_quality_filter: bool = Query(
        False,
        description="True = Statistik-Kacheln ohne Profil-Qualitätsfilter (passend zur /activity-Liste).",
    ),
    session: dict = Depends(require_auth),
):
    """Get activity statistics (last 30 entries).

    ``total_in_profile`` counts all rows of the profile (subject to the
    quality filter unless skipped). The remaining figures are computed over a
    sample of at most 30 newest rows; with *skip_quality_filter* the sample is
    additionally de-duplicated using the shared dedup window.
    """
    pid = str(session["profile_id"])
    with get_db() as conn:
        cur = get_cursor(conn)

        if skip_quality_filter:
            quality_filter = ""
        else:
            cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
            profile = r2d(cur.fetchone())
            quality_filter = get_quality_filter_sql(profile or {}, "")

        cur.execute(
            f"SELECT COUNT(*)::bigint AS c FROM activity_log WHERE profile_id=%s {quality_filter}",
            (pid,),
        )
        total_in_profile = int(cur.fetchone()["c"])

        if skip_quality_filter:
            cur.execute(
                f"""
                SELECT d.* FROM (
                    SELECT al.*,
                        ROW_NUMBER() OVER (
                            {_ACTIVITY_DEDUP_WINDOW}
                        ) AS _dup_rn
                    FROM activity_log al
                    WHERE al.profile_id = %s
                ) d
                WHERE d._dup_rn = 1
                ORDER BY d.date DESC, d.start_time DESC NULLS LAST, d.id DESC
                LIMIT 30
                """,
                (pid,),
            )
            rows = _activity_rows_after_list_query(cur)
        else:
            cur.execute(
                f"""
                SELECT * FROM activity_log
                WHERE profile_id=%s
                {quality_filter}
                ORDER BY date DESC, start_time DESC NULLS LAST, id DESC
                LIMIT 30
                """,
                (pid,),
            )
            rows = [r2d(r) for r in cur.fetchall()]

    if not rows:
        return {
            "count": 0,
            "sample_size": 0,
            "total_in_profile": total_in_profile,
            "total_kcal": 0,
            "total_min": 0,
            "by_type": {},
        }

    # NULL kcal/duration values count as 0 in the sums.
    total_kcal = sum(float(r.get("kcal_active") or 0) for r in rows)
    total_min = sum(float(r.get("duration_min") or 0) for r in rows)
    by_type = {}
    for r in rows:
        t = r["activity_type"]
        by_type.setdefault(t, {"count": 0, "kcal": 0, "min": 0})
        by_type[t]["count"] += 1
        by_type[t]["kcal"] += float(r.get("kcal_active") or 0)
        by_type[t]["min"] += float(r.get("duration_min") or 0)
    return {
        "count": len(rows),
        "sample_size": len(rows),
        "total_in_profile": total_in_profile,
        "total_kcal": round(total_kcal),
        "total_min": round(total_min),
        "by_type": by_type,
    }


@router.get("/uncategorized")
def list_uncategorized_activities(
    session: dict = Depends(require_auth),
):
    """Get activities without assigned training type, grouped by activity_type."""
    pid = str(session["profile_id"])
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT activity_type, COUNT(*) as count,
                   MIN(date) as first_date, MAX(date) as last_date
            FROM activity_log
            WHERE profile_id=%s AND training_type_id IS NULL
            GROUP BY activity_type
            ORDER BY count DESC
            """,
            (pid,),
        )
        return [r2d(r) for r in cur.fetchall()]


@router.get("/mappable-fields")
def get_activity_mappable_fields(session: dict = Depends(require_auth)):
    """
    Full catalog for import mappings (activity_log core fields + all active
    training_parameters).

    Values for keys that have no schema for the concrete session are ignored
    during import.
    """
    pid = str(session["profile_id"])
    with get_db() as conn:
        cur = get_cursor(conn)
        return get_mappable_activity_field_catalog(cur, pid)


@router.get("/attribute-schema")
def get_activity_attribute_schema(
    training_category: Optional[str] = Query(None),
    training_type_id: Optional[int] = Query(None),
    session: dict = Depends(require_auth),
):
    """
    Resolved attribute profile (tcp/ttp) for data entry without an existing
    session — same logic as resolve_activity_attribute_schema.

    A blank or whitespace-only *training_category* is treated as not given.
    """
    from data_layer.activity_session_metrics import resolve_activity_attribute_schema

    cat = (training_category or "").strip() or None
    with get_db() as conn:
        cur = get_cursor(conn)
        schema = resolve_activity_attribute_schema(cur, cat, training_type_id)
    return {"schema": schema}


@router.put("/{eid}")
def update_activity(eid: str, e: ActivityEntry, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Update existing activity entry and run the post-write hooks."""
    pid = get_pid(x_profile_id)
    with get_db() as conn:
        cur = get_cursor(conn)
        update_activity_from_entry(cur, pid, eid, e)
        run_activity_post_write_hooks(cur, pid, eid)
    return {"id":eid}


@router.delete("/{eid}")
def delete_activity(eid: str, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Delete activity entry.

    Returns ``{"ok": True}`` whether or not a matching row existed.
    """
    pid = get_pid(x_profile_id)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("DELETE FROM activity_log WHERE id=%s AND profile_id=%s", (eid,pid))
    return {"ok":True}


@router.put("/{eid}/metrics")
def replace_activity_metrics(
    eid: str,
    body: ActivityMetricsReplace,
    session: dict = Depends(require_auth),
):
    """
    Full replacement of the EAV session metrics
    (see ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md).

    ``ActivitySessionMetricsError`` from the data layer is translated into an
    HTTPException carrying its status code and detail.
    """
    from data_layer.activity_session_metrics import (
        ActivitySessionMetricsError,
        replace_activity_session_metrics,
    )

    pid = str(session["profile_id"])
    payload = [m.model_dump() for m in body.metrics]
    try:
        with get_db() as conn:
            cur = get_cursor(conn)
            metrics = replace_activity_session_metrics(cur, pid, eid, payload)
            conn.commit()
    except ActivitySessionMetricsError as err:
        raise HTTPException(err.status_code, err.detail) from err
    return {"id": eid, "metrics": metrics}


@router.get("/{eid}")
def get_activity_session(
    eid: str,
    use_form_schema: bool = Query(
        False,
        description="True: Schema aus Query training_category / training_type_id (Formular), nicht nur DB-Zeile",
    ),
    training_category: Optional[str] = Query(None),
    training_type_id: Optional[int] = Query(None),
    session: dict = Depends(require_auth),
):
    """Session header + resolved schema + EAV metrics (Layer 1).

    ``ActivitySessionMetricsError`` from the data layer is translated into an
    HTTPException carrying its status code and detail.
    """
    from data_layer.activity_session_metrics import (
        ActivitySessionMetricsError,
        get_activity_session_logical_unit,
    )
    from data_layer.utils import serialize_dates

    pid = str(session["profile_id"])
    try:
        with get_db() as conn:
            cur = get_cursor(conn)
            unit = get_activity_session_logical_unit(
                cur,
                pid,
                eid,
                use_form_training_context=use_form_schema,
                form_training_category=training_category,
                form_training_type_id=training_type_id,
            )
    except ActivitySessionMetricsError as err:
        raise HTTPException(err.status_code, err.detail) from err
    unit["header"] = serialize_dates(unit["header"])
    return unit


def get_training_type_for_activity_with_cursor(cur, activity_type: str, profile_id: str | None = None):
    """
    Like get_training_type_for_activity, but using an existing cursor
    (e.g. Universal CSV import).

    Avoids a nested get_db() — with maxconn=1 that would otherwise deadlock
    on the connection pool.

    Lookup order: profile-specific mapping (only when *profile_id* is given),
    then global mapping (profile_id IS NULL).

    Returns:
        (training_type_id, category, subcategory) or (None, None, None)
    """
    if profile_id:
        cur.execute(
            """
            SELECT m.training_type_id, t.category, t.subcategory
            FROM activity_type_mappings m
            JOIN training_types t ON m.training_type_id = t.id
            WHERE m.activity_type = %s AND m.profile_id = %s
            LIMIT 1
            """,
            (activity_type, profile_id),
        )
        row = cur.fetchone()
        if row:
            return (row["training_type_id"], row["category"], row["subcategory"])

    cur.execute(
        """
        SELECT m.training_type_id, t.category, t.subcategory
        FROM activity_type_mappings m
        JOIN training_types t ON m.training_type_id = t.id
        WHERE m.activity_type = %s AND m.profile_id IS NULL
        LIMIT 1
        """,
        (activity_type,),
    )
    row = cur.fetchone()
    if row:
        return (row["training_type_id"], row["category"], row["subcategory"])

    return (None, None, None)


def get_training_type_for_activity(activity_type: str, profile_id: str | None = None):
    """
    Map activity_type to training_type_id using database mappings.

    Priority:
    1. User-specific mapping (profile_id)
    2. Global mapping (profile_id = NULL)
    3. No mapping found → returns (None, None, None)

    Opens its own database connection; code that already holds one should
    call get_training_type_for_activity_with_cursor instead.

    Returns: (training_type_id, category, subcategory) or (None, None, None)
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        return get_training_type_for_activity_with_cursor(cur, activity_type, profile_id)


@router.post("/bulk-categorize")
def bulk_categorize_activities(
    data: dict,
    x_profile_id: Optional[str]=Header(default=None),
    session: dict=Depends(require_auth)
):
    """
    Bulk update training type for activities.

    Also saves the mapping to activity_type_mappings for future imports.

    Body: {
        "activity_type": "Running",
        "training_type_id": 1,
        "training_category": "cardio",
        "training_subcategory": "running"
    }

    Only activities of the given type that have no training type yet are
    updated. Answers 400 when activity_type or training_type_id is missing.
    """
    pid = get_pid(x_profile_id)
    activity_type = data.get('activity_type')
    training_type_id = data.get('training_type_id')
    training_category = data.get('training_category')
    training_subcategory = data.get('training_subcategory')

    if not activity_type or not training_type_id:
        raise HTTPException(400, "activity_type and training_type_id required")

    with get_db() as conn:
        cur = get_cursor(conn)

        # Update existing activities
        cur.execute("""
            UPDATE activity_log
            SET training_type_id = %s,
                training_category = %s,
                training_subcategory = %s
            WHERE profile_id = %s
              AND activity_type = %s
              AND training_type_id IS NULL
        """, (training_type_id, training_category, training_subcategory, pid, activity_type))
        updated_count = cur.rowcount

        # Phase 1.2: Auto-evaluation after bulk categorization
        if EVALUATION_AVAILABLE:
            # Load the activities of this type/training type and evaluate them.
            # NOTE(review): this also selects rows that already carried this
            # training_type_id before the UPDATE, so the evaluated count can
            # exceed updated_count — confirm that this is intended.
            cur.execute("""
                SELECT id, profile_id, date, training_type_id, duration_min,
                       hr_avg, hr_max, distance_km, kcal_active, kcal_resting,
                       rpe, pace_min_per_km, cadence, elevation_gain
                FROM activity_log
                WHERE profile_id = %s
                  AND activity_type = %s
                  AND training_type_id = %s
            """, (pid, activity_type, training_type_id))

            activities_to_evaluate = cur.fetchall()
            evaluated_count = 0
            for activity_row in activities_to_evaluate:
                activity_dict = dict(activity_row)
                try:
                    evaluate_and_save_activity(cur, activity_dict["id"], activity_dict, training_type_id, pid)
                    evaluated_count += 1
                except Exception as eval_error:
                    # Best effort: a failed evaluation must not abort the bulk update.
                    logger.warning(f"[AUTO-EVAL] Failed to evaluate bulk-categorized activity {activity_dict['id']}: {eval_error}")

            logger.info(f"[AUTO-EVAL] Evaluated {evaluated_count}/{updated_count} bulk-categorized activities")

        # Save mapping for future imports (upsert)
        cur.execute("""
            INSERT INTO activity_type_mappings (activity_type, training_type_id, profile_id, source, updated_at)
            VALUES (%s, %s, %s, 'bulk', CURRENT_TIMESTAMP)
            ON CONFLICT (activity_type, profile_id)
            DO UPDATE SET
                training_type_id = EXCLUDED.training_type_id,
                source = 'bulk',
                updated_at = CURRENT_TIMESTAMP
        """, (activity_type, training_type_id, pid))

        logger.info(f"[MAPPING] Saved bulk mapping: {activity_type} → training_type_id {training_type_id} (profile {pid})")

    return {"updated": updated_count, "activity_type": activity_type, "mapping_saved": True}


def _csv_cell(row: dict, key: str) -> str:
    """Return the stripped text of a CSV cell; missing or None cells become ''."""
    # csv.DictReader fills cells of short rows with None, so a plain
    # row.get(key, '').strip() would raise AttributeError on truncated lines.
    return (row.get(key) or "").strip()


def _parse_duration_min(dur: str) -> Optional[float]:
    """Convert an "H:MM:SS" duration into minutes (1 decimal), or None if unparsable."""
    if not dur:
        return None
    try:
        p = dur.split(':')
        return round(int(p[0]) * 60 + int(p[1]) + int(p[2]) / 60, 1)
    except (ValueError, IndexError):
        return None


def _kj_to_kcal(v) -> Optional[int]:
    """Convert a kJ cell to whole kcal (÷ 4.184); empty or unparsable → None."""
    try:
        return round(float(v) / 4.184) if v else None
    except (TypeError, ValueError, OverflowError):
        # ValueError: not a number / NaN; OverflowError: round() of infinity.
        return None


def _to_float_1dp(v) -> Optional[float]:
    """Convert a numeric cell to a float rounded to 1 decimal; empty or unparsable → None."""
    try:
        return round(float(v), 1) if v else None
    except (TypeError, ValueError):
        return None


@router.post("/import-csv")
async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """
    Legacy upload (Apple Health workout CSV column names).

    Persistence goes through activity_persistence_orchestrator — the same
    layer as the Universal CSV import.

    The file is decoded as UTF-8 with a latin-1 fallback; a leading BOM is
    stripped. Rows without workout type or a parsable start are ignored.
    A row that matches an existing activity (per find_activity_duplicate_id)
    updates that activity and is counted as ``skipped``; rows that raise
    during persistence are logged and also counted as ``skipped``.

    Raises:
        HTTPException(400): if the file is empty or whitespace only.

    Returns:
        ``{"inserted": int, "skipped": int, "message": str}``
    """
    pid = get_pid(x_profile_id)
    raw = await file.read()
    try:
        text = raw.decode('utf-8')
    except UnicodeDecodeError:
        text = raw.decode('latin-1')
    if text.startswith('\ufeff'):
        text = text[1:]
    if not text.strip():
        raise HTTPException(400,"Leere Datei")

    reader = csv.DictReader(io.StringIO(text))
    inserted = skipped = 0
    with get_db() as conn:
        cur = get_cursor(conn)
        for row in reader:
            wtype = _csv_cell(row, 'Workout Type')
            start = _csv_cell(row, 'Start')
            if not wtype or not start:
                continue
            workout_date, workout_start_t = normalize_activity_start(start)
            if not workout_date:
                continue

            duration_min = _parse_duration_min(_csv_cell(row, 'Duration'))

            # Map activity_type to training_type_id using database mappings.
            # Reuse the open cursor: the non-cursor variant opens a nested
            # get_db(), which can deadlock on the connection pool (maxconn=1).
            training_type_id, training_category, training_subcategory = (
                get_training_type_for_activity_with_cursor(cur, wtype, pid)
            )

            kcal_a = _kj_to_kcal(row.get("Aktive Energie (kJ)", ""))
            kcal_r = _kj_to_kcal(row.get("Ruheeinträge (kJ)", ""))
            hr_av = _to_float_1dp(row.get("Durchschn. Herzfrequenz (count/min)", ""))
            hr_mx = _to_float_1dp(row.get("Max. Herzfrequenz (count/min)", ""))
            dist_km = _to_float_1dp(row.get("Distanz (km)", ""))

            try:
                existing_id = find_activity_duplicate_id(cur, pid, workout_date, workout_start_t)

                if existing_id:
                    update_activity_columns(
                        cur,
                        pid,
                        str(existing_id),
                        {
                            "start_time": workout_start_t,
                            "end_time": row.get("End", "") or None,
                            "activity_type": wtype,
                            "duration_min": duration_min,
                            "kcal_active": kcal_a,
                            "kcal_resting": kcal_r,
                            "hr_avg": hr_av,
                            "hr_max": hr_mx,
                            "distance_km": dist_km,
                            "training_type_id": training_type_id,
                            "training_category": training_category,
                            "training_subcategory": training_subcategory,
                        },
                    )
                    # Updated duplicates are reported under "skipped".
                    skipped += 1
                    run_activity_post_write_hooks_import(
                        cur,
                        pid,
                        str(existing_id),
                        workout_date=workout_date,
                        training_type_id=training_type_id,
                        duration_min=duration_min,
                        hr_avg=hr_av,
                        hr_max=hr_mx,
                        distance_km=dist_km,
                        kcal_active=kcal_a,
                        kcal_resting=kcal_r,
                    )
                else:
                    new_id = new_activity_id()
                    insert_activity_csv_minimal(
                        cur,
                        pid,
                        new_id,
                        date_iso=workout_date,
                        start_time=workout_start_t,
                        end_time=row.get("End", "") or None,
                        activity_type=wtype,
                        duration_min=duration_min,
                        kcal_active=kcal_a,
                        kcal_resting=kcal_r,
                        hr_avg=hr_av,
                        hr_max=hr_mx,
                        distance_km=dist_km,
                        training_type_id=training_type_id,
                        training_category=training_category,
                        training_subcategory=training_subcategory,
                        source="apple_health",
                    )
                    inserted += 1
                    run_activity_post_write_hooks_import(
                        cur,
                        pid,
                        new_id,
                        workout_date=workout_date,
                        training_type_id=training_type_id,
                        duration_min=duration_min,
                        hr_avg=hr_av,
                        hr_max=hr_mx,
                        distance_km=dist_km,
                        kcal_active=kcal_a,
                        kcal_resting=kcal_r,
                    )
            except Exception as err:
                # Deliberate best effort: one bad row must not abort the import.
                logger.warning(f"Import row failed: {err}")
                skipped+=1
    return {"inserted":inserted,"skipped":skipped,"message":f"{inserted} Trainings importiert"}