diff --git a/backend/data_layer/__init__.py b/backend/data_layer/__init__.py index aade0d1..83dffe0 100644 --- a/backend/data_layer/__init__.py +++ b/backend/data_layer/__init__.py @@ -44,12 +44,30 @@ __all__ = [ 'calculate_confidence', 'serialize_dates', - # Body Metrics + # Body Metrics (Basic) 'get_latest_weight_data', 'get_weight_trend_data', 'get_body_composition_data', 'get_circumference_summary_data', + # Body Metrics (Calculated) + 'calculate_weight_7d_median', + 'calculate_weight_28d_slope', + 'calculate_weight_90d_slope', + 'calculate_goal_projection_date', + 'calculate_goal_progress_pct', + 'calculate_fm_28d_change', + 'calculate_lbm_28d_change', + 'calculate_waist_28d_delta', + 'calculate_hip_28d_delta', + 'calculate_chest_28d_delta', + 'calculate_arm_28d_delta', + 'calculate_thigh_28d_delta', + 'calculate_waist_hip_ratio', + 'calculate_recomposition_quadrant', + 'calculate_body_progress_score', + 'calculate_body_data_quality', + # Nutrition Metrics 'get_nutrition_average_data', 'get_nutrition_days_data', diff --git a/backend/data_layer/body_metrics.py b/backend/data_layer/body_metrics.py index ce6237c..ec65ec2 100644 --- a/backend/data_layer/body_metrics.py +++ b/backend/data_layer/body_metrics.py @@ -18,6 +18,7 @@ Version: 1.0 from typing import Dict, List, Optional, Tuple from datetime import datetime, timedelta, date +import statistics from db import get_db, get_cursor, r2d from data_layer.utils import calculate_confidence, safe_float @@ -315,3 +316,516 @@ def get_circumference_summary_data( "newest_date": newest_date, "oldest_date": oldest_date } + + +# ============================================================================ +# Calculated Metrics (migrated from calculations/body_metrics.py) +# Phase 0c: Single Source of Truth for KI + Charts +# ============================================================================ + +# ── Weight Trend Calculations ────────────────────────────────────────────── + +def calculate_weight_7d_median(profile_id: str) -> Optional[float]: + """Calculate 7-day median weight (reduces daily noise)""" + with get_db() as conn: + cur = get_cursor(conn) + cur.execute(""" + SELECT weight + FROM weight_log + WHERE profile_id = %s + AND date >= CURRENT_DATE - INTERVAL '7 days' + ORDER BY date DESC + """, (profile_id,)) + + weights = [row['weight'] for row in cur.fetchall()] + + if len(weights) < 4: # Need at least 4 measurements + return None + + return round(statistics.median(weights), 1) + + +def calculate_weight_28d_slope(profile_id: str) -> Optional[float]: + """Calculate 28-day weight slope (kg/day)""" + return _calculate_weight_slope(profile_id, days=28) + + +def calculate_weight_90d_slope(profile_id: str) -> Optional[float]: + """Calculate 90-day weight slope (kg/day)""" + return _calculate_weight_slope(profile_id, days=90) + + +def _calculate_weight_slope(profile_id: str, days: int) -> Optional[float]: + """ + Calculate weight slope using linear regression + Returns kg/day (negative = weight loss) + """ + with get_db() as conn: + cur = get_cursor(conn) + cur.execute(""" + SELECT date, weight + FROM weight_log + WHERE profile_id = %s + AND date >= CURRENT_DATE - INTERVAL '%s days' + ORDER BY date + """, (profile_id, days)) + + data = [(row['date'], row['weight']) for row in cur.fetchall()] + + # Need minimum data points based on period + min_points = max(18, int(days * 0.6)) # 60% coverage + if len(data) < min_points: + return None + + # Convert dates to days since start + start_date = data[0][0] + x_values = [(date - start_date).days for date, _ in data] + y_values = [weight for _, weight in data] + + # Linear regression + n = len(data) + x_mean = sum(x_values) / n + y_mean = sum(y_values) / n + + numerator = sum((x - x_mean) * (y - y_mean) for x, y in zip(x_values, y_values)) + denominator = sum((x - x_mean) ** 2 for x in x_values) + + if denominator == 0: + return None + + slope = numerator / denominator + return round(slope, 4) # kg/day + + +def calculate_goal_projection_date(profile_id: str, goal_id: str) -> Optional[str]: + """ + Calculate projected date to reach goal based on 28d trend + Returns ISO date string or None if unrealistic + """ + from goal_utils import get_goal_by_id + + goal = get_goal_by_id(goal_id) + if not goal or goal['goal_type'] != 'weight': + return None + + slope = calculate_weight_28d_slope(profile_id) + if not slope or slope == 0: + return None + + current = goal['current_value'] + target = goal['target_value'] + remaining = target - current + + days_needed = remaining / slope + + # Unrealistic if >2 years or negative + if days_needed < 0 or days_needed > 730: + return None + + projection_date = datetime.now().date() + timedelta(days=int(days_needed)) + return projection_date.isoformat() + + +def calculate_goal_progress_pct(current: float, target: float, start: float) -> int: + """ + Calculate goal progress percentage + Returns 0-100 (can exceed 100 if target surpassed) + """ + if start == target: + return 100 if current == target else 0 + + progress = ((current - start) / (target - start)) * 100 + return max(0, min(100, int(progress))) + + +# ── Fat Mass / Lean Mass Calculations ─────────────────────────────────────── + +def calculate_fm_28d_change(profile_id: str) -> Optional[float]: + """Calculate 28-day fat mass change (kg)""" + return _calculate_body_composition_change(profile_id, 'fm', 28) + + +def calculate_lbm_28d_change(profile_id: str) -> Optional[float]: + """Calculate 28-day lean body mass change (kg)""" + return _calculate_body_composition_change(profile_id, 'lbm', 28) + + +def _calculate_body_composition_change(profile_id: str, metric: str, days: int) -> Optional[float]: + """ + Calculate change in body composition over period + metric: 'fm' (fat mass) or 'lbm' (lean mass) + """ + with get_db() as conn: + cur = get_cursor(conn) + + # Get weight and caliper measurements + cur.execute(""" + SELECT w.date, w.weight, c.body_fat_pct + FROM weight_log w + LEFT JOIN caliper_log c ON w.profile_id = c.profile_id + AND w.date = c.date + WHERE w.profile_id = %s + AND w.date >= CURRENT_DATE - INTERVAL '%s days' + ORDER BY w.date DESC + """, (profile_id, days)) + + data = [ + { + 'date': row['date'], + 'weight': row['weight'], + 'bf_pct': row['body_fat_pct'] + } + for row in cur.fetchall() + if row['body_fat_pct'] is not None + ] + + if len(data) < 2: + return None + + # Most recent and oldest measurement + recent = data[0] + oldest = data[-1] + + # Calculate FM and LBM + recent_fm = recent['weight'] * (recent['bf_pct'] / 100) + recent_lbm = recent['weight'] - recent_fm + + oldest_fm = oldest['weight'] * (oldest['bf_pct'] / 100) + oldest_lbm = oldest['weight'] - oldest_fm + + if metric == 'fm': + change = recent_fm - oldest_fm + else: + change = recent_lbm - oldest_lbm + + return round(change, 2) + + +# ── Circumference Calculations ────────────────────────────────────────────── + +def calculate_waist_28d_delta(profile_id: str) -> Optional[float]: + """Calculate 28-day waist circumference change (cm)""" + return _calculate_circumference_delta(profile_id, 'c_waist', 28) + + +def calculate_hip_28d_delta(profile_id: str) -> Optional[float]: + """Calculate 28-day hip circumference change (cm)""" + return _calculate_circumference_delta(profile_id, 'c_hip', 28) + + +def calculate_chest_28d_delta(profile_id: str) -> Optional[float]: + """Calculate 28-day chest circumference change (cm)""" + return _calculate_circumference_delta(profile_id, 'c_chest', 28) + + +def calculate_arm_28d_delta(profile_id: str) -> Optional[float]: + """Calculate 28-day arm circumference change (cm)""" + return _calculate_circumference_delta(profile_id, 'c_arm', 28) + + +def calculate_thigh_28d_delta(profile_id: str) -> Optional[float]: + """Calculate 28-day thigh circumference change (cm)""" + delta = _calculate_circumference_delta(profile_id, 'c_thigh', 28) + if delta is None: + return None + return round(delta, 1) + + +def _calculate_circumference_delta(profile_id: str, column: str, days: int) -> Optional[float]: + """Calculate change in circumference measurement""" + with get_db() as conn: + cur = get_cursor(conn) + cur.execute(f""" + SELECT {column} + FROM circumference_log + WHERE profile_id = %s + AND date >= CURRENT_DATE - INTERVAL '%s days' + AND {column} IS NOT NULL + ORDER BY date DESC + LIMIT 1 + """, (profile_id, days)) + + recent = cur.fetchone() + if not recent: + return None + + cur.execute(f""" + SELECT {column} + FROM circumference_log + WHERE profile_id = %s + AND date < CURRENT_DATE - INTERVAL '%s days' + AND {column} IS NOT NULL + ORDER BY date DESC + LIMIT 1 + """, (profile_id, days)) + + oldest = cur.fetchone() + if not oldest: + return None + + change = recent[column] - oldest[column] + return round(change, 1) + + +def calculate_waist_hip_ratio(profile_id: str) -> Optional[float]: + """Calculate current waist-to-hip ratio""" + with get_db() as conn: + cur = get_cursor(conn) + cur.execute(""" + SELECT c_waist, c_hip + FROM circumference_log + WHERE profile_id = %s + AND c_waist IS NOT NULL + AND c_hip IS NOT NULL + ORDER BY date DESC + LIMIT 1 + """, (profile_id,)) + + row = cur.fetchone() + if not row: + return None + + ratio = row['c_waist'] / row['c_hip'] + return round(ratio, 3) + + +# ── Recomposition Detector ─────────────────────────────────────────────────── + +def calculate_recomposition_quadrant(profile_id: str) -> Optional[str]: + """ + Determine recomposition quadrant based on 28d changes: + - optimal: FM down, LBM up + - cut_with_risk: FM down, LBM down + - bulk: FM up, LBM up + - unfavorable: FM up, LBM down + """ + fm_change = calculate_fm_28d_change(profile_id) + lbm_change = calculate_lbm_28d_change(profile_id) + + if fm_change is None or lbm_change is None: + return None + + if fm_change < 0 and lbm_change > 0: + return "optimal" + elif fm_change < 0 and lbm_change < 0: + return "cut_with_risk" + elif fm_change > 0 and lbm_change > 0: + return "bulk" + else: + return "unfavorable" + + +# ── Body Progress Score ─────────────────────────────────────────────────────── + +def calculate_body_progress_score(profile_id: str, focus_weights: Optional[Dict] = None) -> Optional[int]: + """Calculate body progress score (0-100) weighted by user's focus areas""" + if focus_weights is None: + from goal_utils import get_focus_weights + with get_db() as conn: + focus_weights = get_focus_weights(conn, profile_id) + + weight_loss = focus_weights.get('weight_loss', 0) + muscle_gain = focus_weights.get('muscle_gain', 0) + body_recomp = focus_weights.get('body_recomposition', 0) + + total_body_weight = weight_loss + muscle_gain + body_recomp + + if total_body_weight == 0: + return None + + components = [] + + if weight_loss > 0: + weight_score = _score_weight_trend(profile_id) + if weight_score is not None: + components.append(('weight', weight_score, weight_loss)) + + if muscle_gain > 0 or body_recomp > 0: + comp_score = _score_body_composition(profile_id) + if comp_score is not None: + components.append(('composition', comp_score, muscle_gain + body_recomp)) + + waist_score = _score_waist_trend(profile_id) + if waist_score is not None: + waist_weight = 20 + (weight_loss * 0.3) + components.append(('waist', waist_score, waist_weight)) + + if not components: + return None + + total_score = sum(score * weight for _, score, weight in components) + total_weight = sum(weight for _, _, weight in components) + + return int(total_score / total_weight) + + +def _score_weight_trend(profile_id: str) -> Optional[int]: + """Score weight trend alignment with goals (0-100)""" + from goal_utils import get_active_goals + + goals = get_active_goals(profile_id) + weight_goals = [g for g in goals if g.get('goal_type') == 'weight'] + if not weight_goals: + return None + + goal = next((g for g in weight_goals if g.get('is_primary')), weight_goals[0]) + + current = goal.get('current_value') + target = goal.get('target_value') + start = goal.get('start_value') + + if None in [current, target]: + return None + + current = float(current) + target = float(target) + + if start is None: + with get_db() as conn: + cur = get_cursor(conn) + cur.execute(""" + SELECT weight + FROM weight_log + WHERE profile_id = %s + AND date >= CURRENT_DATE - INTERVAL '90 days' + ORDER BY date ASC + LIMIT 1 + """, (profile_id,)) + row = cur.fetchone() + start = float(row['weight']) if row else current + else: + start = float(start) + + progress_pct = calculate_goal_progress_pct(current, target, start) + + slope = calculate_weight_28d_slope(profile_id) + if slope is not None: + desired_direction = -1 if target < start else 1 + actual_direction = -1 if slope < 0 else 1 + + if desired_direction == actual_direction: + score = min(100, progress_pct + 10) + else: + score = max(0, progress_pct - 20) + else: + score = progress_pct + + return int(score) + + +def _score_body_composition(profile_id: str) -> Optional[int]: + """Score body composition changes (0-100)""" + fm_change = calculate_fm_28d_change(profile_id) + lbm_change = calculate_lbm_28d_change(profile_id) + + if fm_change is None or lbm_change is None: + return None + + quadrant = calculate_recomposition_quadrant(profile_id) + + if quadrant == "optimal": + return 100 + elif quadrant == "cut_with_risk": + penalty = min(30, abs(lbm_change) * 15) + return max(50, 80 - int(penalty)) + elif quadrant == "bulk": + if lbm_change > 0 and fm_change > 0: + ratio = lbm_change / fm_change + if ratio >= 3: + return 90 + elif ratio >= 2: + return 75 + elif ratio >= 1: + return 60 + else: + return 45 + return 60 + else: + return 20 + + +def _score_waist_trend(profile_id: str) -> Optional[int]: + """Score waist circumference trend (0-100)""" + delta = calculate_waist_28d_delta(profile_id) + + if delta is None: + return None + + if delta <= -3: + return 100 + elif delta <= -2: + return 90 + elif delta <= -1: + return 80 + elif delta <= 0: + return 70 + elif delta <= 1: + return 55 + elif delta <= 2: + return 40 + else: + return 20 + + +# ── Data Quality Assessment ─────────────────────────────────────────────────── + +def calculate_body_data_quality(profile_id: str) -> Dict[str, any]: + """Assess data quality for body metrics""" + with get_db() as conn: + cur = get_cursor(conn) + + cur.execute(""" + SELECT COUNT(*) as count + FROM weight_log + WHERE profile_id = %s + AND date >= CURRENT_DATE - INTERVAL '28 days' + """, (profile_id,)) + weight_count = cur.fetchone()['count'] + + cur.execute(""" + SELECT COUNT(*) as count + FROM caliper_log + WHERE profile_id = %s + AND date >= CURRENT_DATE - INTERVAL '28 days' + """, (profile_id,)) + caliper_count = cur.fetchone()['count'] + + cur.execute(""" + SELECT COUNT(*) as count + FROM circumference_log + WHERE profile_id = %s + AND date >= CURRENT_DATE - INTERVAL '28 days' + """, (profile_id,)) + circ_count = cur.fetchone()['count'] + + weight_score = min(100, (weight_count / 18) * 100) + caliper_score = min(100, (caliper_count / 4) * 100) + circ_score = min(100, (circ_count / 4) * 100) + + overall_score = int( + weight_score * 0.5 + + caliper_score * 0.3 + + circ_score * 0.2 + ) + + if overall_score >= 80: + confidence = "high" + elif overall_score >= 60: + confidence = "medium" + else: + confidence = "low" + + return { + "overall_score": overall_score, + "confidence": confidence, + "measurements": { + "weight_28d": weight_count, + "caliper_28d": caliper_count, + "circumference_28d": circ_count + }, + "component_scores": { + "weight": int(weight_score), + "caliper": int(caliper_score), + "circumference": int(circ_score) + } + } diff --git a/backend/placeholder_resolver.py b/backend/placeholder_resolver.py index 4e72961..f3c1744 100644 --- a/backend/placeholder_resolver.py +++ b/backend/placeholder_resolver.py @@ -417,7 +417,8 @@ def _safe_int(func_name: str, profile_id: str) -> str: import traceback try: # Import calculations dynamically to avoid circular imports - from calculations import scores, body_metrics, nutrition_metrics, activity_metrics, recovery_metrics, correlation_metrics + from calculations import scores, nutrition_metrics, activity_metrics, recovery_metrics, correlation_metrics + from data_layer import body_metrics # Map function names to actual functions func_map = { @@ -479,7 +480,8 @@ def _safe_float(func_name: str, profile_id: str, decimals: int = 1) -> str: """ import traceback try: - from calculations import body_metrics, nutrition_metrics, activity_metrics, recovery_metrics, scores + from calculations import nutrition_metrics, activity_metrics, recovery_metrics, scores + from data_layer import body_metrics func_map = { 'weight_7d_median': body_metrics.calculate_weight_7d_median,