""" Nutrition Metrics Calculation Engine Implements E1-E5 from visualization concept: - E1: Energy balance vs. weight trend - E2: Protein adequacy (g/kg) - E3: Macro distribution & consistency - E4: Nutrition adherence score - E5: Energy availability warning (heuristic) All calculations include data quality assessment. """ from datetime import datetime, timedelta from typing import Optional, Dict, List import statistics from db import get_db, get_cursor # ============================================================================ # E1: Energy Balance Calculations # ============================================================================ def calculate_energy_balance_7d(profile_id: str) -> Optional[float]: """ Calculate 7-day average energy balance (kcal/day) Positive = surplus, Negative = deficit """ with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT kcal FROM nutrition_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '7 days' ORDER BY date DESC """, (profile_id,)) calories = [row['kcal'] for row in cur.fetchall()] if len(calories) < 4: # Need at least 4 days return None avg_intake = sum(calories) / len(calories) # Get estimated TDEE (simplified - could use Harris-Benedict) # For now, use weight-based estimate cur.execute(""" SELECT weight FROM weight_log WHERE profile_id = %s ORDER BY date DESC LIMIT 1 """, (profile_id,)) weight_row = cur.fetchone() if not weight_row: return None # Simple TDEE estimate: bodyweight (kg) × 30-35 # TODO: Improve with activity level, age, gender estimated_tdee = weight_row['weight'] * 32.5 balance = avg_intake - estimated_tdee return round(balance, 0) def calculate_energy_deficit_surplus(profile_id: str, days: int = 7) -> Optional[str]: """ Classify energy balance as deficit/maintenance/surplus Returns: 'deficit', 'maintenance', 'surplus', or None """ balance = calculate_energy_balance_7d(profile_id) if balance is None: return None if balance < -200: return 'deficit' elif balance > 200: return 'surplus' else: return 'maintenance' # ============================================================================ # E2: Protein Adequacy Calculations # ============================================================================ def calculate_protein_g_per_kg(profile_id: str) -> Optional[float]: """Calculate average protein intake in g/kg bodyweight (last 7 days)""" with get_db() as conn: cur = get_cursor(conn) # Get recent weight cur.execute(""" SELECT weight FROM weight_log WHERE profile_id = %s ORDER BY date DESC LIMIT 1 """, (profile_id,)) weight_row = cur.fetchone() if not weight_row: return None weight = weight_row['weight'] # Get protein intake cur.execute(""" SELECT protein_g FROM nutrition_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '7 days' AND protein_g IS NOT NULL ORDER BY date DESC """, (profile_id,)) protein_values = [row['protein_g'] for row in cur.fetchall()] if len(protein_values) < 4: return None avg_protein = sum(protein_values) / len(protein_values) protein_per_kg = avg_protein / weight return round(protein_per_kg, 2) def calculate_protein_days_in_target(profile_id: str, target_low: float = 1.6, target_high: float = 2.2) -> Optional[str]: """ Calculate how many days in last 7 were within protein target Returns: "5/7" format or None """ with get_db() as conn: cur = get_cursor(conn) # Get recent weight cur.execute(""" SELECT weight FROM weight_log WHERE profile_id = %s ORDER BY date DESC LIMIT 1 """, (profile_id,)) weight_row = cur.fetchone() if not weight_row: return None weight = weight_row['weight'] # Get protein intake last 7 days cur.execute(""" SELECT protein_g, date FROM nutrition_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '7 days' AND protein_g IS NOT NULL ORDER BY date DESC """, (profile_id,)) protein_data = cur.fetchall() if len(protein_data) < 4: return None # Count days in target range days_in_target = 0 total_days = len(protein_data) for row in protein_data: protein_per_kg = row['protein_g'] / weight if target_low <= protein_per_kg <= target_high: days_in_target += 1 return f"{days_in_target}/{total_days}" def calculate_protein_adequacy_28d(profile_id: str) -> Optional[int]: """ Protein adequacy score 0-100 (last 28 days) Based on consistency and target achievement """ with get_db() as conn: cur = get_cursor(conn) # Get average weight (28d) cur.execute(""" SELECT AVG(weight) as avg_weight FROM weight_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' """, (profile_id,)) weight_row = cur.fetchone() if not weight_row or not weight_row['avg_weight']: return None weight = weight_row['avg_weight'] # Get protein intake (28d) cur.execute(""" SELECT protein_g FROM nutrition_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' AND protein_g IS NOT NULL """, (profile_id,)) protein_values = [row['protein_g'] for row in cur.fetchall()] if len(protein_values) < 18: # 60% coverage return None # Calculate metrics protein_per_kg_values = [p / weight for p in protein_values] avg_protein_per_kg = sum(protein_per_kg_values) / len(protein_per_kg_values) # Target range: 1.6-2.2 g/kg for active individuals target_mid = 1.9 # Score based on distance from target if 1.6 <= avg_protein_per_kg <= 2.2: base_score = 100 elif avg_protein_per_kg < 1.6: # Below target base_score = max(40, 100 - ((1.6 - avg_protein_per_kg) * 40)) else: # Above target (less penalty) base_score = max(80, 100 - ((avg_protein_per_kg - 2.2) * 10)) # Consistency bonus/penalty std_dev = statistics.stdev(protein_per_kg_values) if std_dev < 0.3: consistency_bonus = 10 elif std_dev < 0.5: consistency_bonus = 0 else: consistency_bonus = -10 final_score = min(100, max(0, base_score + consistency_bonus)) return int(final_score) # ============================================================================ # E3: Macro Distribution & Consistency # ============================================================================ def calculate_macro_consistency_score(profile_id: str) -> Optional[int]: """ Macro consistency score 0-100 (last 28 days) Lower variability = higher score """ with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT kcal, protein_g, fat_g, carbs_g FROM nutrition_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' AND kcal IS NOT NULL ORDER BY date DESC """, (profile_id,)) data = cur.fetchall() if len(data) < 18: return None # Calculate coefficient of variation for each macro def cv(values): """Coefficient of variation (std_dev / mean)""" if not values or len(values) < 2: return None mean = sum(values) / len(values) if mean == 0: return None std_dev = statistics.stdev(values) return std_dev / mean calories_cv = cv([d['kcal'] for d in data]) protein_cv = cv([d['protein_g'] for d in data if d['protein_g']]) fat_cv = cv([d['fat_g'] for d in data if d['fat_g']]) carbs_cv = cv([d['carbs_g'] for d in data if d['carbs_g']]) cv_values = [v for v in [calories_cv, protein_cv, fat_cv, carbs_cv] if v is not None] if not cv_values: return None avg_cv = sum(cv_values) / len(cv_values) # Score: lower CV = higher score # CV < 0.2 = excellent consistency # CV > 0.5 = poor consistency if avg_cv < 0.2: score = 100 elif avg_cv < 0.3: score = 85 elif avg_cv < 0.4: score = 70 elif avg_cv < 0.5: score = 55 else: score = max(30, 100 - (avg_cv * 100)) return int(score) def calculate_intake_volatility(profile_id: str) -> Optional[str]: """ Classify intake volatility: 'stable', 'moderate', 'high' """ consistency = calculate_macro_consistency_score(profile_id) if consistency is None: return None if consistency >= 80: return 'stable' elif consistency >= 60: return 'moderate' else: return 'high' # ============================================================================ # E4: Nutrition Adherence Score (Dynamic Focus Areas) # ============================================================================ def calculate_nutrition_score(profile_id: str, focus_weights: Optional[Dict] = None) -> Optional[int]: """ Nutrition adherence score 0-100 Weighted by user's nutrition-related focus areas """ if focus_weights is None: from calculations.scores import get_user_focus_weights focus_weights = get_user_focus_weights(profile_id) # Nutrition-related focus areas nutrition_focus = { 'ernährung_basis': focus_weights.get('ernährung_basis', 0), 'ernährung_makros': focus_weights.get('ernährung_makros', 0), 'proteinzufuhr': focus_weights.get('proteinzufuhr', 0), 'kalorienbilanz': focus_weights.get('kalorienbilanz', 0), } total_nutrition_weight = sum(nutrition_focus.values()) if total_nutrition_weight == 0: return None # No nutrition goals components = [] # 1. Calorie target adherence (if kalorienbilanz goal active) if nutrition_focus['kalorienbilanz'] > 0: calorie_score = _score_calorie_adherence(profile_id) if calorie_score is not None: components.append(('calories', calorie_score, nutrition_focus['kalorienbilanz'])) # 2. Protein target adherence (always important if any nutrition goal) protein_score = calculate_protein_adequacy_28d(profile_id) if protein_score is not None: # Higher weight if protein-specific goal protein_weight = nutrition_focus['proteinzufuhr'] or (total_nutrition_weight * 0.3) components.append(('protein', protein_score, protein_weight)) # 3. Intake consistency (always relevant) consistency_score = calculate_macro_consistency_score(profile_id) if consistency_score is not None: consistency_weight = total_nutrition_weight * 0.2 components.append(('consistency', consistency_score, consistency_weight)) # 4. Macro balance (if makros goal active) if nutrition_focus['ernährung_makros'] > 0: macro_score = _score_macro_balance(profile_id) if macro_score is not None: components.append(('macros', macro_score, nutrition_focus['ernährung_makros'])) if not components: return None # Weighted average total_score = sum(score * weight for _, score, weight in components) total_weight = sum(weight for _, _, weight in components) return int(total_score / total_weight) def _score_calorie_adherence(profile_id: str) -> Optional[int]: """Score calorie target adherence (0-100)""" # Get goal (if exists) from goal_utils import get_goals_by_type # Check for energy balance goal # For now, use energy balance calculation balance = calculate_energy_balance_7d(profile_id) if balance is None: return None # Score based on whether deficit/surplus aligns with goal # Simplified: assume weight loss goal = deficit is good # TODO: Check actual goal type abs_balance = abs(balance) # Moderate deficit/surplus = good if 200 <= abs_balance <= 500: return 100 elif 100 <= abs_balance <= 700: return 85 elif abs_balance <= 900: return 70 elif abs_balance <= 1200: return 55 else: return 40 def _score_macro_balance(profile_id: str) -> Optional[int]: """Score macro balance (0-100)""" with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT protein_g, fat_g, carbs_g, kcal FROM nutrition_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' AND protein_g IS NOT NULL AND fat_g IS NOT NULL AND carbs_g IS NOT NULL ORDER BY date DESC """, (profile_id,)) data = cur.fetchall() if len(data) < 18: return None # Calculate average macro percentages macro_pcts = [] for row in data: total_kcal = (row['protein_g'] * 4) + (row['fat_g'] * 9) + (row['carbs_g'] * 4) if total_kcal == 0: continue protein_pct = (row['protein_g'] * 4 / total_kcal) * 100 fat_pct = (row['fat_g'] * 9 / total_kcal) * 100 carbs_pct = (row['carbs_g'] * 4 / total_kcal) * 100 macro_pcts.append((protein_pct, fat_pct, carbs_pct)) if not macro_pcts: return None avg_protein_pct = sum(p for p, _, _ in macro_pcts) / len(macro_pcts) avg_fat_pct = sum(f for _, f, _ in macro_pcts) / len(macro_pcts) avg_carbs_pct = sum(c for _, _, c in macro_pcts) / len(macro_pcts) # Reasonable ranges: # Protein: 20-35% # Fat: 20-35% # Carbs: 30-55% score = 100 # Protein score if not (20 <= avg_protein_pct <= 35): if avg_protein_pct < 20: score -= (20 - avg_protein_pct) * 2 else: score -= (avg_protein_pct - 35) * 1 # Fat score if not (20 <= avg_fat_pct <= 35): if avg_fat_pct < 20: score -= (20 - avg_fat_pct) * 2 else: score -= (avg_fat_pct - 35) * 2 # Carbs score if not (30 <= avg_carbs_pct <= 55): if avg_carbs_pct < 30: score -= (30 - avg_carbs_pct) * 1.5 else: score -= (avg_carbs_pct - 55) * 1.5 return max(40, min(100, int(score))) # ============================================================================ # E5: Energy Availability Warning (Heuristic) # ============================================================================ def calculate_energy_availability_warning(profile_id: str) -> Optional[Dict]: """ Heuristic energy availability warning Returns dict with warning level and reasons """ warnings = [] severity = 'none' # none, low, medium, high # 1. Check for sustained large deficit balance = calculate_energy_balance_7d(profile_id) if balance and balance < -800: warnings.append('Anhaltend großes Energiedefizit (>800 kcal/Tag)') severity = 'medium' if balance < -1200: warnings.append('Sehr großes Energiedefizit (>1200 kcal/Tag)') severity = 'high' # 2. Check recovery score from calculations.recovery_metrics import calculate_recovery_score_v2 recovery = calculate_recovery_score_v2(profile_id) if recovery and recovery < 50: warnings.append('Recovery Score niedrig (<50)') if severity == 'none': severity = 'low' elif severity == 'medium': severity = 'high' # 3. Check LBM trend from calculations.body_metrics import calculate_lbm_28d_change lbm_change = calculate_lbm_28d_change(profile_id) if lbm_change and lbm_change < -1.0: warnings.append('Magermasse sinkt (>1kg in 28 Tagen)') if severity == 'none': severity = 'low' elif severity in ['low', 'medium']: severity = 'high' # 4. Check sleep quality from calculations.recovery_metrics import calculate_sleep_quality_7d sleep_quality = calculate_sleep_quality_7d(profile_id) if sleep_quality and sleep_quality < 60: warnings.append('Schlafqualität verschlechtert') if severity == 'none': severity = 'low' if not warnings: return None return { 'severity': severity, 'warnings': warnings, 'recommendation': _get_energy_warning_recommendation(severity) } def _get_energy_warning_recommendation(severity: str) -> str: """Get recommendation text based on severity""" if severity == 'high': return ("Mögliche Unterversorgung erkannt. Erwäge eine Reduktion des Energiedefizits, " "Erhöhung der Proteinzufuhr und mehr Erholung. Dies ist keine medizinische Diagnose.") elif severity == 'medium': return ("Hinweise auf aggressives Defizit. Beobachte Recovery, Schlaf und Magermasse genau.") else: return ("Leichte Hinweise auf Belastung. Monitoring empfohlen.") # ============================================================================ # Additional Helper Metrics # ============================================================================ def calculate_fiber_avg_7d(profile_id: str) -> Optional[float]: """Calculate average fiber intake (g/day) last 7 days""" # TODO: Implement when fiber column added to nutrition_log return None def calculate_sugar_avg_7d(profile_id: str) -> Optional[float]: """Calculate average sugar intake (g/day) last 7 days""" # TODO: Implement when sugar column added to nutrition_log return None # ============================================================================ # Data Quality Assessment # ============================================================================ def calculate_nutrition_data_quality(profile_id: str) -> Dict[str, any]: """ Assess data quality for nutrition metrics Returns dict with quality score and details """ with get_db() as conn: cur = get_cursor(conn) # Nutrition entries last 28 days cur.execute(""" SELECT COUNT(*) as total, COUNT(protein_g) as with_protein, COUNT(fat_g) as with_fat, COUNT(carbs_g) as with_carbs FROM nutrition_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' """, (profile_id,)) counts = cur.fetchone() total_entries = counts['total'] protein_coverage = counts['with_protein'] / total_entries if total_entries > 0 else 0 macro_coverage = min(counts['with_fat'], counts['with_carbs']) / total_entries if total_entries > 0 else 0 # Score components frequency_score = min(100, (total_entries / 21) * 100) # 21 = 75% of 28 days protein_score = protein_coverage * 100 macro_score = macro_coverage * 100 # Overall score (frequency 50%, protein 30%, macros 20%) overall_score = int( frequency_score * 0.5 + protein_score * 0.3 + macro_score * 0.2 ) # Confidence level if overall_score >= 80: confidence = "high" elif overall_score >= 60: confidence = "medium" else: confidence = "low" return { "overall_score": overall_score, "confidence": confidence, "measurements": { "entries_28d": total_entries, "protein_coverage_pct": int(protein_coverage * 100), "macro_coverage_pct": int(macro_coverage * 100) }, "component_scores": { "frequency": int(frequency_score), "protein": int(protein_score), "macros": int(macro_score) } }