""" Body Metrics Calculation Engine Implements K1-K5 from visualization concept: - K1: Weight trend + goal projection - K2: Weight/FM/LBM multi-line chart - K3: Circumference panel - K4: Recomposition detector - K5: Body progress score (goal-mode dependent) All calculations include data quality/confidence assessment. """ from datetime import datetime, timedelta from typing import Optional, Dict, Tuple import statistics from db import get_db, get_cursor # ============================================================================ # K1: Weight Trend Calculations # ============================================================================ def calculate_weight_7d_median(profile_id: str) -> Optional[float]: """Calculate 7-day median weight (reduces daily noise)""" with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT weight FROM weight_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '7 days' ORDER BY date DESC """, (profile_id,)) weights = [row['weight'] for row in cur.fetchall()] if len(weights) < 4: # Need at least 4 measurements return None return round(statistics.median(weights), 1) def calculate_weight_28d_slope(profile_id: str) -> Optional[float]: """Calculate 28-day weight slope (kg/day)""" return _calculate_weight_slope(profile_id, days=28) def calculate_weight_90d_slope(profile_id: str) -> Optional[float]: """Calculate 90-day weight slope (kg/day)""" return _calculate_weight_slope(profile_id, days=90) def _calculate_weight_slope(profile_id: str, days: int) -> Optional[float]: """ Calculate weight slope using linear regression Returns kg/day (negative = weight loss) """ with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT date, weight FROM weight_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '%s days' ORDER BY date """, (profile_id, days)) data = [(row['date'], row['weight']) for row in cur.fetchall()] # Need minimum data points based on period min_points = max(18, int(days * 0.6)) # 60% coverage if len(data) < min_points: return None # Convert dates to days since start start_date = data[0][0] x_values = [(date - start_date).days for date, _ in data] y_values = [weight for _, weight in data] # Linear regression n = len(data) x_mean = sum(x_values) / n y_mean = sum(y_values) / n numerator = sum((x - x_mean) * (y - y_mean) for x, y in zip(x_values, y_values)) denominator = sum((x - x_mean) ** 2 for x in x_values) if denominator == 0: return None slope = numerator / denominator return round(slope, 4) # kg/day def calculate_goal_projection_date(profile_id: str, goal_id: str) -> Optional[str]: """ Calculate projected date to reach goal based on 28d trend Returns ISO date string or None if unrealistic """ from goal_utils import get_goal_by_id goal = get_goal_by_id(goal_id) if not goal or goal['goal_type'] != 'weight': return None slope = calculate_weight_28d_slope(profile_id) if not slope or slope == 0: return None current = goal['current_value'] target = goal['target_value'] remaining = target - current days_needed = remaining / slope # Unrealistic if >2 years or negative if days_needed < 0 or days_needed > 730: return None projection_date = datetime.now().date() + timedelta(days=int(days_needed)) return projection_date.isoformat() def calculate_goal_progress_pct(current: float, target: float, start: float) -> int: """ Calculate goal progress percentage Returns 0-100 (can exceed 100 if target surpassed) """ if start == target: return 100 if current == target else 0 progress = ((current - start) / (target - start)) * 100 return max(0, min(100, int(progress))) # ============================================================================ # K2: Fat Mass / Lean Mass Calculations # ============================================================================ def calculate_fm_28d_change(profile_id: str) -> Optional[float]: """Calculate 28-day fat mass change (kg)""" return _calculate_body_composition_change(profile_id, 'fm', 28) def calculate_lbm_28d_change(profile_id: str) -> Optional[float]: """Calculate 28-day lean body mass change (kg)""" return _calculate_body_composition_change(profile_id, 'lbm', 28) def _calculate_body_composition_change(profile_id: str, metric: str, days: int) -> Optional[float]: """ Calculate change in body composition over period metric: 'fm' (fat mass) or 'lbm' (lean mass) """ with get_db() as conn: cur = get_cursor(conn) # Get weight and caliper measurements cur.execute(""" SELECT w.date, w.weight, c.body_fat_pct FROM weight_log w LEFT JOIN caliper_log c ON w.profile_id = c.profile_id AND w.date = c.date WHERE w.profile_id = %s AND w.date >= CURRENT_DATE - INTERVAL '%s days' ORDER BY w.date DESC """, (profile_id, days)) data = [ { 'date': row['date'], 'weight': row['weight'], 'bf_pct': row['body_fat_pct'] } for row in cur.fetchall() if row['body_fat_pct'] is not None # Need BF% for composition ] if len(data) < 2: return None # Most recent and oldest measurement recent = data[0] oldest = data[-1] # Calculate FM and LBM recent_fm = recent['weight'] * (recent['bf_pct'] / 100) recent_lbm = recent['weight'] - recent_fm oldest_fm = oldest['weight'] * (oldest['bf_pct'] / 100) oldest_lbm = oldest['weight'] - oldest_fm if metric == 'fm': change = recent_fm - oldest_fm else: # lbm change = recent_lbm - oldest_lbm return round(change, 2) # ============================================================================ # K3: Circumference Calculations # ============================================================================ def calculate_waist_28d_delta(profile_id: str) -> Optional[float]: """Calculate 28-day waist circumference change (cm)""" return _calculate_circumference_delta(profile_id, 'c_waist', 28) def calculate_hip_28d_delta(profile_id: str) -> Optional[float]: """Calculate 28-day hip circumference change (cm)""" return _calculate_circumference_delta(profile_id, 'c_hip', 28) def calculate_chest_28d_delta(profile_id: str) -> Optional[float]: """Calculate 28-day chest circumference change (cm)""" return _calculate_circumference_delta(profile_id, 'c_chest', 28) def calculate_arm_28d_delta(profile_id: str) -> Optional[float]: """Calculate 28-day arm circumference change (cm)""" return _calculate_circumference_delta(profile_id, 'c_arm', 28) def calculate_thigh_28d_delta(profile_id: str) -> Optional[float]: """Calculate 28-day thigh circumference change (cm)""" delta = _calculate_circumference_delta(profile_id, 'c_thigh', 28) if delta is None: return None return round(delta, 1) def _calculate_circumference_delta(profile_id: str, column: str, days: int) -> Optional[float]: """Calculate change in circumference measurement""" with get_db() as conn: cur = get_cursor(conn) cur.execute(f""" SELECT {column} FROM circumference_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '%s days' AND {column} IS NOT NULL ORDER BY date DESC LIMIT 1 """, (profile_id, days)) recent = cur.fetchone() if not recent: return None cur.execute(f""" SELECT {column} FROM circumference_log WHERE profile_id = %s AND date < CURRENT_DATE - INTERVAL '%s days' AND {column} IS NOT NULL ORDER BY date DESC LIMIT 1 """, (profile_id, days)) oldest = cur.fetchone() if not oldest: return None change = recent[column] - oldest[column] return round(change, 1) def calculate_waist_hip_ratio(profile_id: str) -> Optional[float]: """Calculate current waist-to-hip ratio""" with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT c_waist, c_hip FROM circumference_log WHERE profile_id = %s AND c_waist IS NOT NULL AND c_hip IS NOT NULL ORDER BY date DESC LIMIT 1 """, (profile_id,)) row = cur.fetchone() if not row: return None ratio = row['c_waist'] / row['c_hip'] return round(ratio, 3) # ============================================================================ # K4: Recomposition Detector # ============================================================================ def calculate_recomposition_quadrant(profile_id: str) -> Optional[str]: """ Determine recomposition quadrant based on 28d changes: - optimal: FM down, LBM up - cut_with_risk: FM down, LBM down - bulk: FM up, LBM up - unfavorable: FM up, LBM down """ fm_change = calculate_fm_28d_change(profile_id) lbm_change = calculate_lbm_28d_change(profile_id) if fm_change is None or lbm_change is None: return None if fm_change < 0 and lbm_change > 0: return "optimal" elif fm_change < 0 and lbm_change < 0: return "cut_with_risk" elif fm_change > 0 and lbm_change > 0: return "bulk" else: # fm_change > 0 and lbm_change < 0 return "unfavorable" # ============================================================================ # K5: Body Progress Score (Dynamic Focus Areas) # ============================================================================ def calculate_body_progress_score(profile_id: str, focus_weights: Optional[Dict] = None) -> Optional[int]: """ Calculate body progress score (0-100) weighted by user's focus areas Components: - Weight trend alignment with goals - FM/LBM changes (recomposition quality) - Circumference changes (especially waist) - Goal progress percentage Weighted dynamically based on user's focus area priorities """ if focus_weights is None: from calculations.scores import get_user_focus_weights focus_weights = get_user_focus_weights(profile_id) # Get all body-related focus area weights (English keys from DB) weight_loss = focus_weights.get('weight_loss', 0) muscle_gain = focus_weights.get('muscle_gain', 0) body_recomp = focus_weights.get('body_recomposition', 0) total_body_weight = weight_loss + muscle_gain + body_recomp if total_body_weight == 0: return None # No body-related goals # Calculate component scores (0-100) components = [] # Weight trend component (if weight loss goal active) if weight_loss > 0: weight_score = _score_weight_trend(profile_id) if weight_score is not None: components.append(('weight', weight_score, weight_loss)) # Body composition component (if muscle gain or recomp goal active) if muscle_gain > 0 or body_recomp > 0: comp_score = _score_body_composition(profile_id) if comp_score is not None: components.append(('composition', comp_score, muscle_gain + body_recomp)) # Waist circumference component (proxy for health) waist_score = _score_waist_trend(profile_id) if waist_score is not None: # Waist gets 20% base weight + bonus from weight loss goals waist_weight = 20 + (weight_loss * 0.3) components.append(('waist', waist_score, waist_weight)) if not components: return None # Weighted average total_score = sum(score * weight for _, score, weight in components) total_weight = sum(weight for _, _, weight in components) return int(total_score / total_weight) def _score_weight_trend(profile_id: str) -> Optional[int]: """Score weight trend alignment with goals (0-100)""" from goal_utils import get_active_goals goals = get_active_goals(profile_id) weight_goals = [g for g in goals if g.get('goal_type') == 'weight'] if not weight_goals: return None # Use primary or first active goal goal = next((g for g in weight_goals if g.get('is_primary')), weight_goals[0]) current = goal.get('current_value') target = goal.get('target_value') start = goal.get('start_value') if None in [current, target]: return None # Convert Decimal to float (PostgreSQL NUMERIC returns Decimal) current = float(current) target = float(target) # If no start_value, use oldest weight in last 90 days if start is None: with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT weight FROM weight_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '90 days' ORDER BY date ASC LIMIT 1 """, (profile_id,)) row = cur.fetchone() start = float(row['weight']) if row else current else: start = float(start) # Progress percentage progress_pct = calculate_goal_progress_pct(current, target, start) # Bonus/penalty based on trend slope = calculate_weight_28d_slope(profile_id) if slope is not None: desired_direction = -1 if target < start else 1 actual_direction = -1 if slope < 0 else 1 if desired_direction == actual_direction: # Moving in right direction score = min(100, progress_pct + 10) else: # Moving in wrong direction score = max(0, progress_pct - 20) else: score = progress_pct return int(score) def _score_body_composition(profile_id: str) -> Optional[int]: """Score body composition changes (0-100)""" fm_change = calculate_fm_28d_change(profile_id) lbm_change = calculate_lbm_28d_change(profile_id) if fm_change is None or lbm_change is None: return None quadrant = calculate_recomposition_quadrant(profile_id) # Scoring by quadrant if quadrant == "optimal": return 100 elif quadrant == "cut_with_risk": # Penalty proportional to LBM loss penalty = min(30, abs(lbm_change) * 15) return max(50, 80 - int(penalty)) elif quadrant == "bulk": # Score based on FM/LBM ratio if lbm_change > 0 and fm_change > 0: ratio = lbm_change / fm_change if ratio >= 3: # 3:1 LBM:FM = excellent bulk return 90 elif ratio >= 2: return 75 elif ratio >= 1: return 60 else: return 45 return 60 else: # unfavorable return 20 def _score_waist_trend(profile_id: str) -> Optional[int]: """Score waist circumference trend (0-100)""" delta = calculate_waist_28d_delta(profile_id) if delta is None: return None # Waist reduction is almost always positive if delta <= -3: # >3cm reduction return 100 elif delta <= -2: return 90 elif delta <= -1: return 80 elif delta <= 0: return 70 elif delta <= 1: return 55 elif delta <= 2: return 40 else: # >2cm increase return 20 # ============================================================================ # Data Quality Assessment # ============================================================================ def calculate_body_data_quality(profile_id: str) -> Dict[str, any]: """ Assess data quality for body metrics Returns dict with quality score and details """ with get_db() as conn: cur = get_cursor(conn) # Weight measurement frequency (last 28 days) cur.execute(""" SELECT COUNT(*) as count FROM weight_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' """, (profile_id,)) weight_count = cur.fetchone()['count'] # Caliper measurement frequency (last 28 days) cur.execute(""" SELECT COUNT(*) as count FROM caliper_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' """, (profile_id,)) caliper_count = cur.fetchone()['count'] # Circumference measurement frequency (last 28 days) cur.execute(""" SELECT COUNT(*) as count FROM circumference_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '28 days' """, (profile_id,)) circ_count = cur.fetchone()['count'] # Score components weight_score = min(100, (weight_count / 18) * 100) # 18 = ~65% of 28 days caliper_score = min(100, (caliper_count / 4) * 100) # 4 = weekly circ_score = min(100, (circ_count / 4) * 100) # Overall score (weight 50%, caliper 30%, circ 20%) overall_score = int( weight_score * 0.5 + caliper_score * 0.3 + circ_score * 0.2 ) # Confidence level if overall_score >= 80: confidence = "high" elif overall_score >= 60: confidence = "medium" else: confidence = "low" return { "overall_score": overall_score, "confidence": confidence, "measurements": { "weight_28d": weight_count, "caliper_28d": caliper_count, "circumference_28d": circ_count }, "component_scores": { "weight": int(weight_score), "caliper": int(caliper_score), "circumference": int(circ_score) } }