"""
Correlation Metrics Data Layer

Provides structured correlation analysis and plateau detection functions.

Functions:
- calculate_lag_correlation(): Lag correlation between variables
- calculate_correlation_sleep_recovery(): Sleep-recovery correlation
- calculate_plateau_detected(): Plateau detection (weight, strength, endurance)
- calculate_top_drivers(): Top drivers for current goals
- calculate_correlation_confidence(): Confidence level for correlations

All functions return structured data (dict) or simple values.
Use placeholder_resolver.py for formatted strings for AI.

Phase 0c: Multi-Layer Architecture
Version: 1.0
"""

from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
from datetime import datetime, timedelta, date
from db import get_db, get_cursor, r2d
import statistics

from data_layer.nutrition_body_merge import build_merged_daily_nutrition_body_rows
from data_layer.nutrition_metrics import estimate_tdee_kcal_from_latest_weight

# Lag correlation (Issue #53): same TDEE logic as nutrition_metrics / nutrition_viz
MIN_PAIRS_LAG_CORR = 15
LAG_CORR_LOOKBACK_DAYS = 120

# Hard upper bound for the lag sweep, regardless of what the caller asks for.
_MAX_LAG_DAYS_CAP = 28


def calculate_lag_correlation(profile_id: str, var1: str, var2: str, max_lag_days: int = 14) -> Optional[Dict]:
    """
    Pearson correlation with a lag sweep (Issue 53, data layer).

    Supported variable pairs (matching is case-insensitive and ignores
    surrounding whitespace):

    - C1: ``energy`` / ``energy_balance`` vs. ``weight`` -- daily balance
      (kcal - TDEE from ``estimate_tdee_kcal_from_latest_weight``) vs. weight
      change from day t to t+L, L >= 1.
    - C2: ``protein`` vs. ``lbm`` -- protein (g) vs. lean-mass change from day
      t to t+L, based on ``build_merged_daily_nutrition_body_rows``, L >= 1.
    - C3: ``training_load`` / ``load`` vs. ``hrv`` or ``rhr`` -- summed
      ``duration_min`` per day vs. the vital value on day t+L, L >= 0.

    Returns:
        A dict containing, among others, ``best_lag`` / ``best_lag_days``,
        ``correlation``, ``interpretation`` and optionally ``lag_details``
        (r and n per lag). A lag only qualifies as best lag when it has at
        least ``MIN_PAIRS_LAG_CORR`` pairs. Returns ``None`` for an
        unsupported variable combination.
    """
    aliases = {
        "energy": "energy",
        "energy_balance": "energy",
        "training_load": "training_load",
        "load": "training_load",
    }
    v1 = (var1 or "").strip().lower()
    v1 = aliases.get(v1, v1)
    # Normalise var2 like var1 so "HRV" or " weight " are not silently rejected.
    v2 = var2.strip().lower() if isinstance(var2, str) else var2

    if v1 == "energy" and v2 == "weight":
        return _normalize_lag_payload(_correlate_energy_weight(profile_id, max_lag_days))
    if v1 == "protein" and v2 == "lbm":
        return _normalize_lag_payload(_correlate_protein_lbm(profile_id, max_lag_days))
    if v1 == "training_load" and v2 in ("hrv", "rhr"):
        return _normalize_lag_payload(_correlate_load_vitals(profile_id, v2, max_lag_days))
    return None


def _normalize_lag_payload(raw: Optional[Dict]) -> Optional[Dict]:
    """Return a copy of *raw* that always carries ``best_lag_days``.

    Charts expect ``best_lag_days`` while the layer functions emit
    ``best_lag``; the value is mirrored when only the latter is set.
    An empty or ``None`` payload yields ``None``.
    """
    if not raw:
        return None
    out = dict(raw)
    if out.get("best_lag_days") is None and out.get("best_lag") is not None:
        out["best_lag_days"] = out["best_lag"]
    return out


def _iso_date_key(d: Any) -> str:
    """Return the first 10 characters (``YYYY-MM-DD``) of a date-like value.

    ``None`` maps to the empty string; objects with ``isoformat()`` use it,
    everything else goes through ``str()``.
    """
    if d is None:
        return ""
    text = str(d.isoformat()) if hasattr(d, "isoformat") else str(d)
    return text[:10]


def _parse_iso_to_date(ds: str) -> Optional[date]:
    """Parse the leading ``YYYY-MM-DD`` of *ds*; return ``None`` if not possible."""
    if not ds or len(ds) < 10:
        return None
    try:
        return date.fromisoformat(ds[:10])
    except ValueError:
        return None


def _shift_date_key(ds: str, lag: int) -> Optional[str]:
    """Return the ISO date key *lag* days after *ds*, or ``None`` if *ds* is unparseable."""
    d0 = _parse_iso_to_date(ds)
    if d0 is None:
        return None
    return (d0 + timedelta(days=lag)).isoformat()


def _pearson_r(xs: List[float], ys: List[float]) -> Optional[float]:
    """Pearson correlation of two equally long series.

    Returns ``None`` when fewer than ``MIN_PAIRS_LAG_CORR`` pairs are given,
    the lengths differ, or either series has (numerically) zero variance.
    The result is clamped to [-1.0, 1.0].
    """
    n = len(xs)
    if n < MIN_PAIRS_LAG_CORR or n != len(ys):
        return None
    mx = sum(xs) / n
    my = sum(ys) / n
    num = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    dx = sum((x - mx) ** 2 for x in xs)
    dy = sum((y - my) ** 2 for y in ys)
    if dx <= 1e-12 or dy <= 1e-12:
        return None
    r = num / ((dx ** 0.5) * (dy ** 0.5))
    return float(max(-1.0, min(1.0, r)))


def _direction_from_r(r: float) -> str:
    """Map r to ``"positive"`` / ``"negative"`` / ``"none"`` with a +-0.05 dead band."""
    if r > 0.05:
        return "positive"
    if r < -0.05:
        return "negative"
    return "none"


def _lag_confidence(n_pairs: int, r: float) -> str:
    """Confidence label for a lag result; delegates to ``calculate_correlation_confidence``."""
    return calculate_correlation_confidence(n_pairs, abs(r))


def _clamp_max_lag(max_lag: int) -> int:
    """Coerce *max_lag* to an int within ``0 .. _MAX_LAG_DAYS_CAP``."""
    return max(0, min(int(max_lag), _MAX_LAG_DAYS_CAP))


def _sweep_lags(
    lags: Iterable[int],
    pairs_for_lag: Callable[[int], Tuple[List[float], List[float]]],
) -> Tuple[Optional[Tuple[int, float, int]], List[Dict[str, Any]]]:
    """Run ``_pearson_r`` for every lag and pick the lag with the largest |r|.

    Args:
        lags: Lags (in days) to evaluate, in order.
        pairs_for_lag: Callback returning the paired ``(xs, ys)`` for one lag.

    Returns:
        ``(best, lag_details)`` where ``best`` is ``(lag, r, n_pairs)`` or
        ``None`` when no lag produced a valid r, and ``lag_details`` lists
        ``{"lag", "n_pairs", "r"}`` for every evaluated lag. On ties the
        earliest lag wins (strict ``>`` comparison).
    """
    best: Optional[Tuple[int, float, int]] = None
    lag_details: List[Dict[str, Any]] = []
    for lag in lags:
        xs, ys = pairs_for_lag(lag)
        r = _pearson_r(xs, ys)
        n_p = len(xs)
        lag_details.append({"lag": lag, "n_pairs": n_p, "r": None if r is None else round(r, 4)})
        if r is None:
            continue
        if best is None or abs(r) > abs(best[1]):
            best = (lag, r, n_p)
    return best, lag_details


def _correlate_energy_weight(profile_id: str, max_lag: int) -> Optional[Dict]:
    """
    Pearson: daily energy balance (kcal - TDEE, as in nutrition_metrics) vs.
    weight difference from day t to day t+L, for L = 1 .. max_lag
    (capped at 28). The best lag is the one with the largest |r|.

    Lag 0 is skipped on purpose: the same-day weight delta is always 0.

    Returns:
        Result dict. When no TDEE estimate is available or no lag has enough
        pairs, ``correlation`` is ``None`` and ``reason`` names the cause.
    """
    tdee = estimate_tdee_kcal_from_latest_weight(profile_id)
    if tdee is None or float(tdee) <= 0:
        return {
            "best_lag": None,
            "correlation": None,
            "direction": "none",
            "confidence": "insufficient",
            "data_points": 0,
            "interpretation": "Keine TDEE-Schätzung möglich (Gewicht/Demografie).",
            "reason": "no_tdee",
        }
    tdee_f = float(tdee)

    cutoff = (datetime.now() - timedelta(days=LAG_CORR_LOOKBACK_DAYS)).strftime("%Y-%m-%d")
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT date::date AS d, SUM(kcal)::float AS kcal
            FROM nutrition_log
            WHERE profile_id = %s AND date >= %s::date AND kcal IS NOT NULL
            GROUP BY date
            ORDER BY date
            """,
            (profile_id, cutoff),
        )
        kcal_rows = cur.fetchall()
        cur.execute(
            """
            SELECT date::date AS d, weight::float AS weight
            FROM weight_log
            WHERE profile_id = %s AND date >= %s::date AND weight IS NOT NULL
            ORDER BY date
            """,
            (profile_id, cutoff),
        )
        w_rows = cur.fetchall()

    # Daily balance = logged kcal minus the single TDEE estimate.
    balance_by: Dict[str, float] = {
        _iso_date_key(row["d"]): float(row["kcal"] or 0) - tdee_f for row in kcal_rows
    }
    weight_by: Dict[str, float] = {_iso_date_key(row["d"]): float(row["weight"]) for row in w_rows}
    days = sorted(balance_by)

    def pairs_for_lag(lag: int) -> Tuple[List[float], List[float]]:
        xs: List[float] = []
        ys: List[float] = []
        for ds in days:
            ds1 = _shift_date_key(ds, lag)
            if ds1 is None:
                continue
            w0 = weight_by.get(ds)
            w1 = weight_by.get(ds1)
            if w0 is None or w1 is None:
                continue
            xs.append(balance_by[ds])
            ys.append(w1 - w0)
        return xs, ys

    best, lag_details = _sweep_lags(range(1, _clamp_max_lag(max_lag) + 1), pairs_for_lag)

    if best is None:
        return {
            "best_lag": None,
            "correlation": None,
            "direction": "none",
            "confidence": "insufficient",
            "data_points": 0,
            "interpretation": "Zu wenige gepaarte Tage mit Ernährung, Gewicht und gewähltem Lag.",
            "reason": "insufficient_pairs",
            "lag_details": lag_details,
            "tdee_kcal_used": round(tdee_f, 0),
        }

    # ``best`` is a 3-tuple; the previous 4-way unpack raised ValueError here.
    lag_b, r_b, n_b = best
    direction = _direction_from_r(r_b)
    conf = _lag_confidence(n_b, r_b)
    interp = (
        f"Tagesbilanz (kcal − TDEE ~{tdee_f:.0f}) vs. Gewichtsänderung nach {lag_b} Tagen: "
        f"r ≈ {r_b:.2f} ({direction}). "
        f"Basierend auf {n_b} Kalendertagen mit vollständigen Paaren."
    )
    return {
        "best_lag": lag_b,
        "correlation": round(r_b, 4),
        "direction": direction,
        "confidence": conf,
        "data_points": n_b,
        "interpretation": interp,
        "lag_details": lag_details,
        "tdee_kcal_used": round(tdee_f, 0),
    }


def _correlate_protein_lbm(profile_id: str, max_lag: int) -> Optional[Dict]:
    """
    Pearson: protein (g/day) vs. lean-mass difference (kg) from day t to t+L,
    for L = 1 .. max_lag (capped at 28).

    Data comes from ``build_merged_daily_nutrition_body_rows`` (fields
    ``date``, ``protein_g``, ``lean_mass``). NOTE(review): the original note
    says lean mass is caliper-based and forward-filled by the merge module --
    confirm there.
    """
    merged = build_merged_daily_nutrition_body_rows(profile_id)
    if not merged:
        return {
            "best_lag": None,
            "correlation": None,
            "direction": "none",
            "confidence": "insufficient",
            "data_points": 0,
            "interpretation": "Keine zusammengeführten Ernährungs-/Körperdaten.",
            "reason": "no_merged_rows",
        }

    protein_by: Dict[str, float] = {}
    lbm_by: Dict[str, float] = {}
    for row in merged:
        ds = _iso_date_key(row.get("date"))
        if not ds:
            continue
        pg = row.get("protein_g")
        lm = row.get("lean_mass")
        if pg is not None:
            protein_by[ds] = float(pg)
        if lm is not None:
            lbm_by[ds] = float(lm)
    days = sorted(protein_by)

    def pairs_for_lag(lag: int) -> Tuple[List[float], List[float]]:
        xs: List[float] = []
        ys: List[float] = []
        for ds in days:
            if ds not in lbm_by:
                continue
            ds1 = _shift_date_key(ds, lag)
            if ds1 is None or ds1 not in lbm_by:
                continue
            xs.append(protein_by[ds])
            ys.append(lbm_by[ds1] - lbm_by[ds])
        return xs, ys

    best, lag_details = _sweep_lags(range(1, _clamp_max_lag(max_lag) + 1), pairs_for_lag)

    if best is None:
        return {
            "best_lag": None,
            "correlation": None,
            "direction": "none",
            "confidence": "insufficient",
            "data_points": 0,
            "interpretation": "Zu wenige Tage mit Protein und Magermasse (Caliper) für die gewählten Lags.",
            "reason": "insufficient_pairs",
            "lag_details": lag_details,
        }

    lag_b, r_b, n_b = best
    direction = _direction_from_r(r_b)
    conf = _lag_confidence(n_b, r_b)
    interp = (
        f"Protein (g/Tag) vs. Magermasse-Änderung nach {lag_b} Tagen: r ≈ {r_b:.2f} ({direction}). "
        f"{n_b} gepaarte Tage."
    )
    return {
        "best_lag": lag_b,
        "correlation": round(r_b, 4),
        "direction": direction,
        "confidence": conf,
        "data_points": n_b,
        "interpretation": interp,
        "lag_details": lag_details,
    }


def _correlate_load_vitals(profile_id: str, vital: str, max_lag: int) -> Optional[Dict]:
    """
    Pearson: daily training load (sum of ``duration_min``) vs. a vital value
    (HRV or resting heart rate) on calendar day t+lag, for
    lag = 0 .. max_lag (capped at 28). With lag >= 1 this pairs the load of
    one day with the vital value of a later day.

    Args:
        vital: ``"hrv"`` selects the ``hrv`` column; any other value selects
            ``resting_hr``.
    """
    # Column name comes from this fixed two-value choice, never from user
    # input, so interpolating it into the SQL text below is safe.
    col = "hrv" if vital == "hrv" else "resting_hr"
    cutoff = (datetime.now() - timedelta(days=LAG_CORR_LOOKBACK_DAYS)).strftime("%Y-%m-%d")
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT date::text AS d, COALESCE(SUM(duration_min), 0)::float AS minutes
            FROM activity_log
            WHERE profile_id = %s AND date >= %s::date
              AND duration_min IS NOT NULL AND duration_min > 0
            GROUP BY date
            ORDER BY date
            """,
            (profile_id, cutoff),
        )
        load_rows = cur.fetchall()
        cur.execute(
            f"""
            SELECT date::text AS d, {col}::float AS v
            FROM vitals_baseline
            WHERE profile_id = %s AND date >= %s::date AND {col} IS NOT NULL
            ORDER BY date
            """,
            (profile_id, cutoff),
        )
        vit_rows = cur.fetchall()

    load_by = {str(row["d"])[:10]: float(row["minutes"] or 0) for row in load_rows}
    vital_by = {str(row["d"])[:10]: float(row["v"]) for row in vit_rows}
    days = sorted(load_by)
    vlabel = "HRV (ms)" if vital == "hrv" else "Ruhepuls (bpm)"

    def pairs_for_lag(lag: int) -> Tuple[List[float], List[float]]:
        xs: List[float] = []
        ys: List[float] = []
        for ds in days:
            ds1 = _shift_date_key(ds, lag)
            if ds1 is None or ds1 not in vital_by:
                continue
            xs.append(load_by[ds])
            ys.append(vital_by[ds1])
        return xs, ys

    best, lag_details = _sweep_lags(range(0, _clamp_max_lag(max_lag) + 1), pairs_for_lag)

    if best is None:
        return {
            "best_lag": None,
            "correlation": None,
            "direction": "none",
            "confidence": "insufficient",
            "data_points": 0,
            "interpretation": f"Zu wenige gepaarte Tage mit Training und {vlabel}.",
            "reason": "insufficient_pairs",
            "lag_details": lag_details,
            "vital": vital,
        }

    lag_b, r_b, n_b = best
    direction = _direction_from_r(r_b)
    conf = _lag_confidence(n_b, r_b)
    interp = (
        f"Trainingsminuten/Tag vs. {vlabel} nach {lag_b} Tagen Lag: r ≈ {r_b:.2f} ({direction}). "
        f"{n_b} Paare."
    )
    return {
        "best_lag": lag_b,
        "correlation": round(r_b, 4),
        "direction": direction,
        "confidence": conf,
        "data_points": n_b,
        "interpretation": interp,
        "lag_details": lag_details,
        "vital": vital,
    }


# ============================================================================
# C4: Sleep vs. Recovery Correlation
# ============================================================================

def calculate_correlation_sleep_recovery(profile_id: str) -> Optional[Dict]:
    """
    Correlate sleep quality/duration with recovery score.

    NOTE: placeholder -- the returned values are hard-coded and do not depend
    on *profile_id* or any stored data.
    """
    # TODO: Implement full correlation
    return {
        'correlation': 0.65,  # Strong positive (expected)
        'direction': 'positive',
        'confidence': 'high',
        'data_points': 28
    }


# ============================================================================
# C6: Plateau Detector
# ============================================================================

def calculate_plateau_detected(profile_id: str) -> Optional[Dict]:
    """
    Detect if the user is in a plateau, based on the highest-weighted focus area.

    Returns:
        ``None`` when no focus weights exist or the top focus area has no
        plateau detector; otherwise a dict such as::

            {
                'plateau_detected': True/False,
                'plateau_type': 'weight_loss'/'strength'/'endurance'/None,
                'confidence': 'high'/'medium'/'low',
                'duration_days': X,
                'top_factors': [list of potential causes]
            }

        Negative results only carry ``plateau_detected`` (and possibly
        ``reason``).
    """
    from data_layer.scores import get_user_focus_weights

    focus_weights = get_user_focus_weights(profile_id)
    if not focus_weights:
        return None

    # Primary focus area = key with the largest weight.
    top_focus = max(focus_weights, key=focus_weights.get)

    if top_focus in ('körpergewicht', 'körperfett'):
        return _detect_weight_plateau(profile_id)
    if top_focus == 'kraftaufbau':
        return _detect_strength_plateau(profile_id)
    if top_focus == 'cardio':
        return _detect_endurance_plateau(profile_id)
    return None


def _detect_weight_plateau(profile_id: str) -> Dict:
    """Detect a weight-loss plateau: flat 28-day weight slope despite a nutrition score > 70."""
    from data_layer.body_metrics import calculate_weight_28d_slope
    from data_layer.nutrition_metrics import calculate_nutrition_score

    slope = calculate_weight_28d_slope(profile_id)
    nutrition_score = calculate_nutrition_score(profile_id)

    if slope is None:
        return {'plateau_detected': False, 'reason': 'Insufficient data'}

    is_plateau = abs(slope) < 0.02 and nutrition_score is not None and nutrition_score > 70
    if not is_plateau:
        return {'plateau_detected': False}

    factors = []

    if nutrition_score > 85:
        factors.append('Hohe Adhärenz trotz Stagnation → mögliche Anpassung des Stoffwechsels')

    # Deficit too small? Compare against None explicitly: a balance of exactly
    # 0 is a valid value (no deficit at all) and must not be skipped.
    from data_layer.nutrition_metrics import calculate_energy_balance_7d
    balance = calculate_energy_balance_7d(profile_id)
    if balance is not None and balance > -200:
        factors.append('Energiedefizit zu gering (<200 kcal/Tag)')

    # Waist shrinking while weight is stable hints at masked fat loss.
    from data_layer.body_metrics import calculate_waist_28d_delta
    waist_delta = calculate_waist_28d_delta(profile_id)
    if waist_delta is not None and waist_delta < -1:
        factors.append('Taillenumfang sinkt → mögliche Wasserretention maskiert Fettabbau')

    return {
        'plateau_detected': True,
        'plateau_type': 'weight_loss',
        'confidence': 'high' if len(factors) >= 2 else 'medium',
        'duration_days': 28,
        'top_factors': factors[:3]
    }


def _detect_strength_plateau(profile_id: str) -> Dict:
    """Detect a strength plateau: 28-day LBM change below 0.3 despite an activity score > 75."""
    from data_layer.body_metrics import calculate_lbm_28d_change
    from data_layer.activity_metrics import calculate_activity_score
    from data_layer.recovery_metrics import calculate_recovery_score_v2

    lbm_change = calculate_lbm_28d_change(profile_id)
    activity_score = calculate_activity_score(profile_id)
    recovery_score = calculate_recovery_score_v2(profile_id)

    if lbm_change is None:
        return {'plateau_detected': False, 'reason': 'Insufficient data'}

    is_plateau = abs(lbm_change) < 0.3 and activity_score is not None and activity_score > 75
    if not is_plateau:
        return {'plateau_detected': False}

    factors = []

    # Explicit None checks: a score of 0 is the worst case and must count.
    if recovery_score is not None and recovery_score < 60:
        factors.append('Recovery Score niedrig → möglicherweise Übertraining')

    from data_layer.nutrition_metrics import calculate_protein_adequacy_28d
    protein_score = calculate_protein_adequacy_28d(profile_id)
    if protein_score is not None and protein_score < 70:
        factors.append('Proteinzufuhr unter Zielbereich')

    from data_layer.activity_metrics import calculate_monotony_score
    monotony = calculate_monotony_score(profile_id)
    if monotony is not None and monotony > 2.0:
        factors.append('Hohe Trainingsmonotonie → Stimulus-Anpassung')

    return {
        'plateau_detected': True,
        'plateau_type': 'strength',
        'confidence': 'medium',
        'duration_days': 28,
        'top_factors': factors[:3]
    }


def _detect_endurance_plateau(profile_id: str) -> Dict:
    """Endurance plateau detection -- not implemented yet; always reports no plateau."""
    # Imported ahead of the planned implementation (currently unused).
    from data_layer.activity_metrics import calculate_training_minutes_week, calculate_monotony_score
    from data_layer.recovery_metrics import calculate_vo2max_trend_28d

    # TODO: Implement when vitals_baseline.vo2_max is populated
    return {'plateau_detected': False, 'reason': 'VO2max tracking not yet implemented'}


# ============================================================================
# C7: Multi-Factor Driver Panel
# ============================================================================

def calculate_top_drivers(profile_id: str) -> Optional[List[Dict]]:
    """
    Calculate the top influencing factors for goal progress.

    Each metric that returns a non-``None`` value contributes one driver.

    Returns:
        Up to 8 drivers, sorted 'hinderlich' first, then 'förderlich', then
        'neutral' (stable within each group)::

            [
                {
                    'factor': 'Energiebilanz',
                    'status': 'förderlich'/'neutral'/'hinderlich',
                    'evidence': 'hoch'/'mittel'/'niedrig',
                    'reason': '1-sentence explanation'
                },
                ...
            ]
    """
    from data_layer.nutrition_metrics import calculate_energy_balance_7d, calculate_protein_adequacy_28d
    from data_layer.recovery_metrics import (
        calculate_recovery_score_v2,
        calculate_sleep_avg_duration_7d,
        calculate_sleep_regularity_proxy,
    )
    from data_layer.activity_metrics import (
        calculate_quality_sessions_pct,
        calculate_rest_day_compliance,
        calculate_training_frequency_7d,
    )

    drivers: List[Dict] = []

    def add(factor: str, status: str, evidence: str, reason: str) -> None:
        drivers.append({'factor': factor, 'status': status, 'evidence': evidence, 'reason': reason})

    # 1. Energy balance
    balance = calculate_energy_balance_7d(profile_id)
    if balance is not None:
        if -500 <= balance <= -200:
            status = 'förderlich'
            reason = f'Moderates Defizit ({int(balance)} kcal/Tag) unterstützt Fettabbau'
        elif balance < -800:
            status = 'hinderlich'
            reason = f'Sehr großes Defizit ({int(balance)} kcal/Tag) → Risiko für Magermasseverlust'
        elif balance < -500:
            # -800 .. -500 previously fell through to the "surplus" text
            # although it is a deficit; status stays neutral as before.
            status = 'neutral'
            reason = f'Großes Defizit ({int(balance)} kcal/Tag)'
        elif balance < 200:
            status = 'neutral'
            reason = 'Energiebilanz ausgeglichen'
        else:
            status = 'neutral'
            reason = f'Energieüberschuss ({int(balance)} kcal/Tag)'
        add('Energiebilanz', status, 'hoch', reason)

    # 2. Protein adequacy
    protein_score = calculate_protein_adequacy_28d(profile_id)
    if protein_score is not None:
        if protein_score >= 80:
            status = 'förderlich'
            reason = f'Proteinzufuhr konstant im Zielbereich (Score: {protein_score})'
        elif protein_score >= 60:
            status = 'neutral'
            reason = f'Proteinzufuhr teilweise im Zielbereich (Score: {protein_score})'
        else:
            status = 'hinderlich'
            reason = f'Proteinzufuhr häufig unter Zielbereich (Score: {protein_score})'
        add('Proteinzufuhr', status, 'hoch', reason)

    # 3. Sleep duration
    sleep_hours = calculate_sleep_avg_duration_7d(profile_id)
    if sleep_hours is not None:
        if sleep_hours >= 7:
            status = 'förderlich'
            reason = f'Schlafdauer ausreichend ({sleep_hours:.1f}h/Nacht)'
        elif sleep_hours >= 6.5:
            status = 'neutral'
            reason = f'Schlafdauer knapp ausreichend ({sleep_hours:.1f}h/Nacht)'
        else:
            status = 'hinderlich'
            reason = f'Schlafdauer zu gering ({sleep_hours:.1f}h/Nacht < 7h Empfehlung)'
        add('Schlafdauer', status, 'hoch', reason)

    # 4. Sleep regularity
    regularity = calculate_sleep_regularity_proxy(profile_id)
    if regularity is not None:
        if regularity <= 45:
            status = 'förderlich'
            reason = f'Schlafrhythmus regelmäßig (Abweichung: {int(regularity)} min)'
        elif regularity <= 75:
            status = 'neutral'
            reason = f'Schlafrhythmus moderat variabel (Abweichung: {int(regularity)} min)'
        else:
            status = 'hinderlich'
            reason = f'Schlafrhythmus stark variabel (Abweichung: {int(regularity)} min)'
        add('Schlafregelmäßigkeit', status, 'mittel', reason)

    # 5. Training consistency
    frequency = calculate_training_frequency_7d(profile_id)
    if frequency is not None:
        if 3 <= frequency <= 6:
            status = 'förderlich'
            reason = f'Trainingsfrequenz im Zielbereich ({frequency}× pro Woche)'
        elif frequency < 3:
            # "< 3" (not "<= 2") so a fractional value such as 2.5 is not
            # reported as "very high".
            status = 'hinderlich'
            reason = f'Trainingsfrequenz zu niedrig ({frequency}× pro Woche)'
        else:
            status = 'neutral'
            reason = f'Trainingsfrequenz sehr hoch ({frequency}× pro Woche) → Recovery beachten'
        add('Trainingskonsistenz', status, 'hoch', reason)

    # 6. Quality sessions
    quality_pct = calculate_quality_sessions_pct(profile_id)
    if quality_pct is not None:
        if quality_pct >= 75:
            status = 'förderlich'
            reason = f'{quality_pct}% der Trainings mit guter Qualität'
        elif quality_pct >= 50:
            status = 'neutral'
            reason = f'{quality_pct}% der Trainings mit guter Qualität'
        else:
            status = 'hinderlich'
            reason = f'Nur {quality_pct}% der Trainings mit guter Qualität'
        add('Trainingsqualität', status, 'mittel', reason)

    # 7. Recovery score
    recovery = calculate_recovery_score_v2(profile_id)
    if recovery is not None:
        if recovery >= 70:
            status = 'förderlich'
            reason = f'Recovery Score gut ({recovery}/100)'
        elif recovery >= 50:
            status = 'neutral'
            reason = f'Recovery Score moderat ({recovery}/100)'
        else:
            status = 'hinderlich'
            reason = f'Recovery Score niedrig ({recovery}/100) → mehr Erholung nötig'
        add('Recovery', status, 'hoch', reason)

    # 8. Rest day compliance
    compliance = calculate_rest_day_compliance(profile_id)
    if compliance is not None:
        if compliance >= 80:
            status = 'förderlich'
            reason = f'Ruhetage gut eingehalten ({compliance}%)'
        elif compliance >= 60:
            status = 'neutral'
            reason = f'Ruhetage teilweise eingehalten ({compliance}%)'
        else:
            status = 'hinderlich'
            reason = f'Ruhetage häufig ignoriert ({compliance}%) → Übertrainingsrisiko'
        add('Ruhetagsrespekt', status, 'mittel', reason)

    # Sort by importance: hinderlich first, then förderlich, then neutral
    priority = {'hinderlich': 0, 'förderlich': 1, 'neutral': 2}
    drivers.sort(key=lambda d: priority[d['status']])

    return drivers[:8]  # Top 8 drivers


# ============================================================================
# Confidence/Evidence Levels
# ============================================================================

def calculate_correlation_confidence(data_points: int, correlation: float) -> str:
    """
    Determine the confidence level for a correlation.

    Rules:
    - fewer than 20 data points -> 'low'
    - >= 40 points and |r| >= 0.5 -> 'high'
    - >= 30 points and |r| >= 0.4 -> 'medium'
    - anything else -> 'low'

    Returns:
        'high', 'medium', or 'low'
    """
    strength = abs(correlation)

    # Need sufficient data points
    if data_points < 20:
        return 'low'
    if data_points >= 40 and strength >= 0.5:
        return 'high'
    if data_points >= 30 and strength >= 0.4:
        return 'medium'
    return 'low'