""" Activity Metrics Data Layer Provides structured data for training tracking and analysis. Functions: - get_activity_summary_data(): Count, total duration, calories, averages - get_activity_detail_data(): Detailed activity log entries - get_training_type_distribution_data(): Training category percentages - get_training_frequency_by_type_data(): Häufigkeit & Intensität pro activity_type - get_training_inter_session_gap_data(): Pausen zwischen Einheiten (Stunden) - get_training_sessions_recent_weeks_data(): Wochen-JSON für KI-Kontext - get_training_parameters_ki_glossary_data(): Parameter-Katalog (Feld, Namen, Beschreibungen) für KI All functions return structured data (dict) without formatting. Use placeholder_resolver.py for formatted strings for AI. Phase 0c: Multi-Layer Architecture Version: 1.0 """ from typing import Dict, List, Optional, Any from datetime import datetime, timedelta, date, time import statistics from db import get_db, get_cursor, r2d from data_layer.activity_session_metrics import enrich_sessions_with_metrics from data_layer.utils import calculate_confidence, safe_float, safe_int, serialize_dates def get_activity_summary_data( profile_id: str, days: int = 14 ) -> Dict: """ Get activity summary statistics. Args: profile_id: User profile ID days: Analysis window (default 14) Returns: { "activity_count": int, "total_duration_min": int, "total_kcal": int, "avg_duration_min": int, "avg_kcal_per_session": int, "sessions_per_week": float, "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_activity_summary(pid, days) formatted string NEW: Structured data with all metrics """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT COUNT(*) as count, SUM(duration_min) as total_min, SUM(kcal_active) as total_kcal FROM activity_log WHERE profile_id=%s AND date >= %s""", (profile_id, cutoff) ) row = cur.fetchone() if not row or row['count'] == 0: return { "activity_count": 0, "total_duration_min": 0, "total_kcal": 0, "avg_duration_min": 0, "avg_kcal_per_session": 0, "sessions_per_week": 0.0, "confidence": "insufficient", "days_analyzed": days } activity_count = row['count'] total_min = safe_int(row['total_min']) total_kcal = safe_int(row['total_kcal']) avg_duration = int(total_min / activity_count) if activity_count > 0 else 0 avg_kcal = int(total_kcal / activity_count) if activity_count > 0 else 0 sessions_per_week = (activity_count / days * 7) if days > 0 else 0.0 confidence = calculate_confidence(activity_count, days, "general") return { "activity_count": activity_count, "total_duration_min": total_min, "total_kcal": total_kcal, "avg_duration_min": avg_duration, "avg_kcal_per_session": avg_kcal, "sessions_per_week": round(sessions_per_week, 1), "confidence": confidence, "days_analyzed": days } def get_activity_detail_data( profile_id: str, days: int = 14, limit: int = 50 ) -> Dict: """ Get detailed activity log entries. Args: profile_id: User profile ID days: Analysis window (default 14) limit: Maximum entries to return (default 50) Returns: { "activities": [ { "date": date, "activity_type": str, "duration_min": int, "kcal_active": int, "hr_avg": int | None, "training_category": str | None, "session_metrics": list | None, # EAV (enrich_sessions_with_metrics) }, ... ], "total_count": int, "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_activity_detail(pid, days) formatted string list NEW: Structured array with all fields """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT id, date, activity_type, duration_min, kcal_active, hr_avg, training_category FROM activity_log WHERE profile_id=%s AND date >= %s ORDER BY date DESC LIMIT %s""", (profile_id, cutoff, limit), ) rows = cur.fetchall() if not rows: return { "activities": [], "total_count": 0, "confidence": "insufficient", "days_analyzed": days, } activities = [] for row in rows: activities.append( { "id": str(row["id"]), "date": row["date"], "activity_type": row["activity_type"], "duration_min": safe_int(row["duration_min"]), "kcal_active": safe_int(row["kcal_active"]), "hr_avg": safe_int(row["hr_avg"]) if row.get("hr_avg") else None, "training_category": row.get("training_category"), } ) enrich_sessions_with_metrics(cur, activities) confidence = calculate_confidence(len(activities), days, "general") return { "activities": activities, "total_count": len(activities), "confidence": confidence, "days_analyzed": days, } def get_training_type_distribution_data( profile_id: str, days: int = 14 ) -> Dict: """ Calculate training category distribution. Args: profile_id: User profile ID days: Analysis window (default 14) Returns: { "distribution": [ { "category": str, "count": int, "percentage": float }, ... ], "total_sessions": int, "categorized_sessions": int, "uncategorized_sessions": int, "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_trainingstyp_verteilung(pid, days) top 3 formatted NEW: Complete distribution with percentages """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') # Get categorized activities cur.execute( """SELECT training_category, COUNT(*) as count FROM activity_log WHERE profile_id=%s AND date >= %s AND training_category IS NOT NULL GROUP BY training_category ORDER BY count DESC""", (profile_id, cutoff) ) rows = cur.fetchall() # Get total activity count (including uncategorized) cur.execute( """SELECT COUNT(*) as total FROM activity_log WHERE profile_id=%s AND date >= %s""", (profile_id, cutoff) ) total_row = cur.fetchone() total_sessions = total_row['total'] if total_row else 0 if not rows or total_sessions == 0: return { "distribution": [], "total_sessions": total_sessions, "categorized_sessions": 0, "uncategorized_sessions": total_sessions, "confidence": "insufficient", "days_analyzed": days } categorized_count = sum(row['count'] for row in rows) uncategorized_count = total_sessions - categorized_count distribution = [] for row in rows: count = row['count'] percentage = (count / total_sessions * 100) if total_sessions > 0 else 0 distribution.append({ "category": row['training_category'], "count": count, "percentage": round(percentage, 1) }) confidence = calculate_confidence(categorized_count, days, "general") return { "distribution": distribution, "total_sessions": total_sessions, "categorized_sessions": categorized_count, "uncategorized_sessions": uncategorized_count, "confidence": confidence, "days_analyzed": days } # ============================================================================ # Calculated Metrics (migrated from calculations/activity_metrics.py) # ============================================================================ # These functions return simple values for placeholders and scoring. # Use get_*_data() functions above for structured chart data. def calculate_training_minutes_week(profile_id: str) -> Optional[int]: """Calculate total training minutes last 7 days""" with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT SUM(duration_min) as total_minutes FROM activity_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '7 days' """, (profile_id,)) row = cur.fetchone() return int(row['total_minutes']) if row and row['total_minutes'] else None def calculate_training_frequency_7d(profile_id: str) -> Optional[int]: """Calculate number of training sessions last 7 days""" with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT COUNT(*) as session_count FROM activity_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '7 days' """, (profile_id,)) row = cur.fetchone() return int(row['session_count']) if row else None def calculate_quality_sessions_pct(profile_id: str) -> Optional[int]: """Calculate percentage of quality sessions (good or better) last 28 days""" with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT COUNT(*) as total, COUNT(*) FILTER (WHERE quality_label IN ('excellent', 'very_good', 'good')) as quality_count FROM activity_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' """, (profile_id,)) row = cur.fetchone() if not row or row['total'] == 0: return None pct = (row['quality_count'] / row['total']) * 100 return int(pct) # ============================================================================ # A2: Intensity Distribution (Proxy-based) # ============================================================================ def calculate_intensity_proxy_distribution(profile_id: str) -> Optional[Dict]: """ Calculate intensity distribution (proxy until HR zones available) Returns dict: {'low': X, 'moderate': Y, 'high': Z} in minutes """ with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT duration_min, hr_avg, hr_max FROM activity_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' """, (profile_id,)) activities = cur.fetchall() if not activities: return None low_min = 0 moderate_min = 0 high_min = 0 for activity in activities: duration = activity['duration_min'] avg_hr = activity['hr_avg'] max_hr = activity['hr_max'] # Simple proxy classification if avg_hr: # Rough HR-based classification (assumes max HR ~190) if avg_hr < 120: low_min += duration elif avg_hr < 150: moderate_min += duration else: high_min += duration else: # Fallback: assume moderate moderate_min += duration return { 'low': low_min, 'moderate': moderate_min, 'high': high_min } # ============================================================================ # A4: Ability Balance Calculations # ============================================================================ def calculate_ability_balance(profile_id: str) -> Optional[Dict]: """ Calculate ability balance from training_types.abilities Returns dict with scores per ability dimension (0-100) """ with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT a.duration_min, tt.abilities FROM activity_log a JOIN training_types tt ON a.training_category = tt.category WHERE a.profile_id = %s AND a.date >= CURRENT_DATE - INTERVAL '28 days' AND tt.abilities IS NOT NULL """, (profile_id,)) activities = cur.fetchall() if not activities: return None # Accumulate ability load (duration × ability weight) ability_loads = { 'strength': 0, 'endurance': 0, 'mental': 0, 'coordination': 0, 'mobility': 0 } for activity in activities: duration = activity['duration_min'] abilities = activity['abilities'] # JSONB if not abilities: continue for ability, weight in abilities.items(): if ability in ability_loads: ability_loads[ability] += duration * weight # Normalize to 0-100 scale max_load = max(ability_loads.values()) if ability_loads else 1 if max_load == 0: return None normalized = { ability: int((load / max_load) * 100) for ability, load in ability_loads.items() } return normalized def calculate_ability_balance_strength(profile_id: str) -> Optional[int]: """Get strength ability score""" balance = calculate_ability_balance(profile_id) return balance['strength'] if balance else None def calculate_ability_balance_endurance(profile_id: str) -> Optional[int]: """Get endurance ability score""" balance = calculate_ability_balance(profile_id) return balance['endurance'] if balance else None def calculate_ability_balance_mental(profile_id: str) -> Optional[int]: """Get mental ability score""" balance = calculate_ability_balance(profile_id) return balance['mental'] if balance else None def calculate_ability_balance_coordination(profile_id: str) -> Optional[int]: """Get coordination ability score""" balance = calculate_ability_balance(profile_id) return balance['coordination'] if balance else None def calculate_ability_balance_mobility(profile_id: str) -> Optional[int]: """Get mobility ability score""" balance = calculate_ability_balance(profile_id) return balance['mobility'] if balance else None # ============================================================================ # A5: Load Monitoring (Proxy-based) # ============================================================================ def calculate_proxy_internal_load_7d(profile_id: str) -> Optional[int]: """ Calculate proxy internal load (last 7 days) Formula: duration × intensity_factor × quality_factor """ intensity_factors = {'low': 1.0, 'moderate': 1.5, 'high': 2.0} quality_factors = { 'excellent': 1.15, 'very_good': 1.05, 'good': 1.0, 'acceptable': 0.9, 'poor': 0.75, 'excluded': 0.0 } with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT duration_min, hr_avg, rpe FROM activity_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '7 days' """, (profile_id,)) activities = cur.fetchall() if not activities: return None total_load = 0 for activity in activities: duration = activity['duration_min'] avg_hr = activity['hr_avg'] # Map RPE to quality (rpe 8-10 = excellent, 6-7 = good, 4-5 = moderate, <4 = poor) rpe = activity.get('rpe') if rpe and rpe >= 8: quality = 'excellent' elif rpe and rpe >= 6: quality = 'good' elif rpe and rpe >= 4: quality = 'moderate' else: quality = 'good' # default # Determine intensity if avg_hr: if avg_hr < 120: intensity = 'low' elif avg_hr < 150: intensity = 'moderate' else: intensity = 'high' else: intensity = 'moderate' load = float(duration) * intensity_factors[intensity] * quality_factors.get(quality, 1.0) total_load += load return int(total_load) def calculate_monotony_score(profile_id: str) -> Optional[float]: """ Calculate training monotony (last 7 days) Monotony = mean daily load / std dev daily load Higher = more monotonous """ with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT date, SUM(duration_min) as daily_duration FROM activity_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '7 days' GROUP BY date ORDER BY date """, (profile_id,)) daily_loads = [float(row['daily_duration']) for row in cur.fetchall() if row['daily_duration']] if len(daily_loads) < 4: return None mean_load = sum(daily_loads) / len(daily_loads) std_dev = statistics.stdev(daily_loads) if std_dev == 0: return None monotony = mean_load / std_dev return round(monotony, 2) def calculate_strain_score(profile_id: str) -> Optional[int]: """ Calculate training strain (last 7 days) Strain = weekly load × monotony """ weekly_load = calculate_proxy_internal_load_7d(profile_id) monotony = calculate_monotony_score(profile_id) if weekly_load is None or monotony is None: return None strain = weekly_load * monotony return int(strain) # ============================================================================ # A6: Activity Goal Alignment Score (Dynamic Focus Areas) # ============================================================================ def calculate_activity_score(profile_id: str, focus_weights: Optional[Dict] = None) -> Optional[int]: """ Activity goal alignment score 0-100 Weighted by user's activity-related focus areas """ if focus_weights is None: from data_layer.scores import get_user_focus_weights focus_weights = get_user_focus_weights(profile_id) # Activity-related focus areas (English keys from DB); Gewichte float (kein Decimal×float) strength = float(focus_weights.get('strength', 0) or 0) strength_endurance = float(focus_weights.get('strength_endurance', 0) or 0) power = float(focus_weights.get('power', 0) or 0) total_strength = strength + strength_endurance + power aerobic = float(focus_weights.get('aerobic_endurance', 0) or 0) anaerobic = float(focus_weights.get('anaerobic_endurance', 0) or 0) cardiovascular = float(focus_weights.get('cardiovascular_health', 0) or 0) total_cardio = aerobic + anaerobic + cardiovascular flexibility = float(focus_weights.get('flexibility', 0) or 0) mobility = float(focus_weights.get('mobility', 0) or 0) balance = float(focus_weights.get('balance', 0) or 0) reaction = float(focus_weights.get('reaction', 0) or 0) rhythm = float(focus_weights.get('rhythm', 0) or 0) coordination = float(focus_weights.get('coordination', 0) or 0) total_ability = flexibility + mobility + balance + reaction + rhythm + coordination total_activity_weight = total_strength + total_cardio + total_ability if total_activity_weight == 0: return None # No activity goals components = [] # 1. Weekly minutes (general activity volume) minutes = calculate_training_minutes_week(profile_id) if minutes is not None: # WHO: 150-300 min/week if 150 <= minutes <= 300: minutes_score = 100 elif minutes < 150: minutes_score = max(40, (minutes / 150) * 100) else: minutes_score = max(80, 100 - ((minutes - 300) / 10)) # Volume relevant for all activity types (20% base weight) components.append(('minutes', minutes_score, total_activity_weight * 0.2)) # 2. Quality sessions (always relevant) quality_pct = calculate_quality_sessions_pct(profile_id) if quality_pct is not None: # Quality gets 10% base weight components.append(('quality', quality_pct, total_activity_weight * 0.1)) # 3. Strength presence (if strength focus active) if total_strength > 0: strength_score = _score_strength_presence(profile_id) if strength_score is not None: components.append(('strength', strength_score, total_strength)) # 4. Cardio presence (if cardio focus active) if total_cardio > 0: cardio_score = _score_cardio_presence(profile_id) if cardio_score is not None: components.append(('cardio', cardio_score, total_cardio)) # 5. Ability balance (if mobility/coordination focus active) if total_ability > 0: balance_score = _score_ability_balance(profile_id) if balance_score is not None: components.append(('balance', balance_score, total_ability)) if not components: return None # Weighted average (float: DB-Aggregate können Decimal sein) total_score = sum(float(score) * float(weight) for _, score, weight in components) total_weight = sum(float(weight) for _, _, weight in components) return int(total_score / total_weight) def _score_strength_presence(profile_id: str) -> Optional[int]: """Score strength training presence (0-100)""" with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT COUNT(DISTINCT date) as strength_days FROM activity_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '7 days' AND training_category = 'strength' """, (profile_id,)) row = cur.fetchone() if not row: return None strength_days = row['strength_days'] # Target: 2-4 days/week if 2 <= strength_days <= 4: return 100 elif strength_days == 1: return 60 elif strength_days == 5: return 85 elif strength_days == 0: return 0 else: return 70 def _score_cardio_presence(profile_id: str) -> Optional[int]: """Score cardio training presence (0-100)""" with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT COUNT(DISTINCT date) as cardio_days, SUM(duration_min) as cardio_minutes FROM activity_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '7 days' AND training_category = 'cardio' """, (profile_id,)) row = cur.fetchone() if not row: return None # psycopg2: SUM() → oft Decimal — vor Mix mit float konvertieren cardio_days = int(row['cardio_days'] or 0) cardio_minutes = float(row['cardio_minutes'] or 0) # Target: 3-5 days/week, 150+ minutes day_score = min(100.0, (cardio_days / 4) * 100) minute_score = min(100.0, (cardio_minutes / 150) * 100) return int((day_score + minute_score) / 2) def _score_ability_balance(profile_id: str) -> Optional[int]: """Score ability balance (0-100)""" balance = calculate_ability_balance(profile_id) if not balance: return None # Good balance = all abilities > 40, std_dev < 30 values = list(balance.values()) min_value = min(values) std_dev = statistics.stdev(values) if len(values) > 1 else 0 # Score based on minimum coverage and balance min_score = min(100, min_value * 2) # Want all > 50 balance_score = max(0, 100 - (std_dev * 2)) # Want low std_dev return int((min_score + balance_score) / 2) # ============================================================================ # A7: Rest Day Compliance # ============================================================================ def calculate_rest_day_compliance(profile_id: str) -> Optional[int]: """ Calculate rest day compliance percentage (last 28 days) Returns percentage of planned rest days that were respected """ with get_db() as conn: cur = get_cursor(conn) # Get planned rest days cur.execute(""" SELECT date, rest_config->>'focus' as rest_type FROM rest_days WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' """, (profile_id,)) rest_days = {row['date']: row['rest_type'] for row in cur.fetchall()} if not rest_days: return None # Check if training occurred on rest days cur.execute(""" SELECT date, training_category FROM activity_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' """, (profile_id,)) training_days = {} for row in cur.fetchall(): if row['date'] not in training_days: training_days[row['date']] = [] training_days[row['date']].append(row['training_category']) # Count compliance compliant = 0 total = len(rest_days) for rest_date, rest_type in rest_days.items(): if rest_date not in training_days: # Full rest = compliant compliant += 1 else: # Check if training violates rest type categories = training_days[rest_date] if rest_type == 'strength_rest' and 'strength' not in categories: compliant += 1 elif rest_type == 'cardio_rest' and 'cardio' not in categories: compliant += 1 # If rest_type == 'recovery', any training = non-compliant compliance_pct = (compliant / total) * 100 return int(compliance_pct) # ============================================================================ # A8: VO2max Development # ============================================================================ def calculate_vo2max_trend_28d(profile_id: str) -> Optional[float]: """Calculate VO2max trend (change over 28 days)""" with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT vo2_max, date FROM vitals_baseline WHERE profile_id = %s AND vo2_max IS NOT NULL AND date >= CURRENT_DATE - INTERVAL '28 days' ORDER BY date DESC """, (profile_id,)) measurements = cur.fetchall() if len(measurements) < 2: return None recent = measurements[0]['vo2_max'] oldest = measurements[-1]['vo2_max'] change = recent - oldest return round(change, 1) # ============================================================================ # Data Quality Assessment # ============================================================================ def calculate_activity_data_quality(profile_id: str) -> Dict[str, any]: """ Assess data quality for activity metrics Returns dict with quality score and details """ with get_db() as conn: cur = get_cursor(conn) # Activity entries last 28 days cur.execute(""" SELECT COUNT(*) as total, COUNT(hr_avg) as with_hr, COUNT(rpe) as with_quality FROM activity_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' """, (profile_id,)) counts = cur.fetchone() total_entries = counts['total'] hr_coverage = counts['with_hr'] / total_entries if total_entries > 0 else 0 quality_coverage = counts['with_quality'] / total_entries if total_entries > 0 else 0 # Score components frequency_score = min(100, (total_entries / 15) * 100) # 15 = ~4 sessions/week hr_score = hr_coverage * 100 quality_score = quality_coverage * 100 # Overall score overall_score = int( frequency_score * 0.5 + hr_score * 0.25 + quality_score * 0.25 ) if overall_score >= 80: confidence = "high" elif overall_score >= 60: confidence = "medium" else: confidence = "low" return { "overall_score": overall_score, "confidence": confidence, "measurements": { "activities_28d": total_entries, "hr_coverage_pct": int(hr_coverage * 100), "quality_coverage_pct": int(quality_coverage * 100) }, "component_scores": { "frequency": int(frequency_score), "hr": int(hr_score), "quality": int(quality_score) } } def _session_sort_ts(row: Dict) -> datetime: """Einheitlicher Zeitstempel für Sortierung und Pausenberechnung.""" d = row["date"] if isinstance(d, str): d = datetime.strptime(d[:10], "%Y-%m-%d").date() st = row.get("start_time") if st is None: t = time(12, 0, 0) else: t = st return datetime.combine(d, t) def get_training_frequency_by_type_data( profile_id: str, days: int = 28, ) -> Dict[str, Any]: """ Pro activity_type (Roh-Label aus Import/Anzeige): Häufigkeit & Intensitätskennzahlen. Returns: { "days_analyzed": int, "confidence": str, "by_type": [ { "activity_type": str, "session_count": int, "sessions_per_week": float, "avg_duration_min": float | None, "avg_kcal_active": float | None, "avg_hr_avg": float | None, "avg_hr_max": float | None, "avg_rpe": float | None, "avg_kcal_per_min": float | None, # grobe Intensität, wenn kcal & Dauer }, ... ], } """ weeks = max(days / 7.0, 0.01) with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") cur.execute( """ SELECT activity_type, COUNT(*)::int AS session_count, AVG(duration_min)::float AS avg_duration_min, AVG(kcal_active)::float AS avg_kcal_active, AVG(hr_avg)::float AS avg_hr_avg, AVG(hr_max)::float AS avg_hr_max, AVG(rpe)::float AS avg_rpe, SUM(COALESCE(duration_min, 0))::float AS sum_duration, SUM(COALESCE(kcal_active, 0))::float AS sum_kcal FROM activity_log WHERE profile_id = %s AND date >= %s GROUP BY activity_type ORDER BY session_count DESC """, (profile_id, cutoff), ) rows = [r2d(r) for r in cur.fetchall()] if not rows: return { "days_analyzed": days, "confidence": "insufficient", "by_type": [], } by_type = [] for r in rows: sc = int(r["session_count"]) sum_dur = float(r["sum_duration"] or 0) sum_kcal = float(r["sum_kcal"] or 0) kcal_per_min = (sum_kcal / sum_dur) if sum_dur > 0 else None by_type.append( { "activity_type": r["activity_type"], "session_count": sc, "sessions_per_week": round(sc / weeks, 2), "avg_duration_min": r["avg_duration_min"], "avg_kcal_active": r["avg_kcal_active"], "avg_hr_avg": r["avg_hr_avg"], "avg_hr_max": r["avg_hr_max"], "avg_rpe": r["avg_rpe"], "avg_kcal_per_min": round(kcal_per_min, 2) if kcal_per_min is not None else None, } ) total_sessions = sum(x["session_count"] for x in by_type) confidence = calculate_confidence(total_sessions, days, "general") return { "days_analyzed": days, "confidence": confidence, "by_type": by_type, } def get_training_inter_session_gap_data( profile_id: str, days: int = 28, ) -> Dict[str, Any]: """ Mittlere/median Pausen zwischen aufeinanderfolgenden Trainingseinheiten (Stunden). Sortierung: Datum + start_time (fehlend → 12:00), dann created. """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") cur.execute( """ SELECT date, start_time, created FROM activity_log WHERE profile_id = %s AND date >= %s ORDER BY date ASC, start_time ASC NULLS LAST, created ASC """, (profile_id, cutoff), ) rows = [r2d(r) for r in cur.fetchall()] if len(rows) < 2: return { "days_analyzed": days, "confidence": "insufficient", "gap_hours_median": None, "gap_hours_mean": None, "gap_hours_min": None, "gaps_count": 0, } gaps = [] prev_ts = None for r in rows: ts = _session_sort_ts(r) if prev_ts is not None: gaps.append((ts - prev_ts).total_seconds() / 3600.0) prev_ts = ts if not gaps: return { "days_analyzed": days, "confidence": "insufficient", "gap_hours_median": None, "gap_hours_mean": None, "gap_hours_min": None, "gaps_count": 0, } gaps_sorted = sorted(gaps) mid = len(gaps_sorted) // 2 median = ( gaps_sorted[mid] if len(gaps_sorted) % 2 else (gaps_sorted[mid - 1] + gaps_sorted[mid]) / 2.0 ) confidence = calculate_confidence(len(rows), days, "general") return { "days_analyzed": days, "confidence": confidence, "gap_hours_median": round(median, 1), "gap_hours_mean": round(statistics.mean(gaps), 1), "gap_hours_min": round(min(gaps), 1), "gaps_count": len(gaps), } def get_training_sessions_recent_weeks_data( profile_id: str, weeks: int = 4, ) -> Dict[str, Any]: """ Letzte Wochen mit Einzeltrainings für KI-Kontext (Dauer, kcal, HF, Typ). weeks: Anzahl zurückliegender ISO-Kalenderwochen (Default 4). """ days = max(weeks * 7, 7) with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") cur.execute( """ SELECT a.id, a.date, a.start_time, a.activity_type, a.training_category, a.duration_min, a.kcal_active, a.hr_avg, a.hr_max, a.rpe, tt.name_de AS training_type_name FROM activity_log a LEFT JOIN training_types tt ON tt.id = a.training_type_id WHERE a.profile_id = %s AND a.date >= %s ORDER BY a.date ASC, a.start_time ASC NULLS LAST, a.created ASC """, (profile_id, cutoff), ) rows = [r2d(r) for r in cur.fetchall()] enrich_sessions_with_metrics(cur, rows) if not rows: return { "weeks": [], "meta": { "weeks_requested": weeks, "days_loaded": days, "session_count": 0, "confidence": "insufficient", }, } by_week: Dict[str, List[Dict]] = {} for r in rows: d = r["date"] if isinstance(d, str): d = datetime.strptime(d[:10], "%Y-%m-%d").date() iso = d.isocalendar() wk = f"{iso.year}-W{iso.week:02d}" if wk not in by_week: by_week[wk] = [] dur = r.get("duration_min") dur_f = float(dur) if dur is not None else None kcal = r.get("kcal_active") kcal_f = float(kcal) if kcal is not None else None hr_a = r.get("hr_avg") hr_m = r.get("hr_max") by_week[wk].append( { "id": str(r["id"]), "date": d, "start_time": str(r["start_time"]) if r.get("start_time") is not None else None, "activity_type": r.get("activity_type"), "training_category": r.get("training_category"), "training_type_name": r.get("training_type_name"), "duration_min": dur_f, "kcal_active": kcal_f, "hr_avg": int(hr_a) if hr_a is not None else None, "hr_max": int(hr_m) if hr_m is not None else None, "rpe": int(r["rpe"]) if r.get("rpe") is not None else None, "session_metrics": r.get("session_metrics", []), } ) week_keys = sorted(by_week.keys()) weeks_out = [{"week_iso": wk, "sessions": by_week[wk]} for wk in week_keys] confidence = calculate_confidence(len(rows), days, "general") return serialize_dates( { "weeks": weeks_out, "meta": { "weeks_requested": weeks, "days_loaded": days, "session_count": len(rows), "confidence": confidence, }, } ) def get_training_parameters_ki_glossary_data(profile_id: str) -> Dict[str, Any]: """ Alle aktiven ``training_parameters`` für KI-Kontext (z. B. neben ``training_sessions_recent_json``). Enthält technischen key, name_de/name_en, description_de/description_en, data_type, unit, category. Args: profile_id: Reserviert für spätere Einschränkung (z. B. nur im Profil vorkommende Keys); aktuell ungenutzt, Signatur bleibt für Platzhalter-Resolver. """ _ = profile_id with get_db() as conn: cur = get_cursor(conn) cur.execute( """ SELECT key, name_de, name_en, description_de, description_en, data_type, unit, category FROM training_parameters WHERE is_active = true ORDER BY category, key """ ) rows = [r2d(r) for r in cur.fetchall()] return { "parameters": rows, "meta": {"count": len(rows), "scope": "global_active_catalog"}, }