mitai-jinkendo/backend/calculations/body_metrics.py
Lars dd3a4111fc
All checks were successful
Deploy Development / deploy (push) Successful in 47s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
fix: Phase 0b - fix remaining calculation errors
Fixes applied:
1. WHERE clause column names (total_sleep_min → duration_minutes, resting_heart_rate → resting_hr)
2. COUNT() column names (avg_heart_rate → hr_avg, quality_label → rpe)
3. Type errors (Decimal * float) - convert to float before multiplication
4. rest_days table (type column removed in migration 010, now uses rest_config JSONB)
5. c_thigh_l → c_thigh (no separate left/right columns)
6. focus_area_definitions queries (focus_area_id → key, label_de → name_de)

Missing functions implemented:
- goal_utils.get_active_goals() - queries goals table for active goals
- goal_utils.get_goal_by_id() - gets single goal
- calculations.scores.calculate_category_progress() - maps categories to score functions

Changes:
- activity_metrics.py: Fixed Decimal/float type errors, rest_config JSONB, data quality query
- recovery_metrics.py: Fixed all WHERE clause column names
- body_metrics.py: Fixed c_thigh column reference
- scores.py: Fixed focus_area queries, added calculate_category_progress()
- goal_utils.py: Added get_active_goals(), get_goal_by_id()

All calculation functions should now work with correct schema.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-28 08:39:31 +01:00

555 lines
17 KiB
Python

"""
Body Metrics Calculation Engine
Implements K1-K5 from visualization concept:
- K1: Weight trend + goal projection
- K2: Weight/FM/LBM multi-line chart
- K3: Circumference panel
- K4: Recomposition detector
- K5: Body progress score (goal-mode dependent)
All calculations include data quality/confidence assessment.
"""
import statistics
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple

from db import get_db, get_cursor
# ============================================================================
# K1: Weight Trend Calculations
# ============================================================================
def calculate_weight_7d_median(profile_id: str) -> Optional[float]:
    """Return the median weight logged over the last 7 days.

    The median is used instead of the latest reading to reduce daily noise.

    Args:
        profile_id: Profile whose ``weight_log`` rows are read.

    Returns:
        The median rounded to one decimal place as a ``float``, or ``None``
        when fewer than 4 usable measurements exist in the window.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT weight
            FROM weight_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '7 days'
            ORDER BY date DESC
        """, (profile_id,))
        # Skip NULL weights: statistics.median() cannot order None values.
        weights = [
            row['weight'] for row in cur.fetchall()
            if row['weight'] is not None
        ]

    if len(weights) < 4:  # Need at least 4 measurements
        return None

    # The driver may return Decimal values; round first (exact for Decimal),
    # then coerce so the declared float return type actually holds.
    return float(round(statistics.median(weights), 1))
def calculate_weight_28d_slope(profile_id: str) -> Optional[float]:
    """Return the weight trend over the last 28 days in kg/day.

    Thin wrapper that delegates to ``_calculate_weight_slope`` with a
    28-day window and passes its result through unchanged.
    """
    window_days = 28
    return _calculate_weight_slope(profile_id, window_days)
def calculate_weight_90d_slope(profile_id: str) -> Optional[float]:
    """Return the weight trend over the last 90 days in kg/day.

    Thin wrapper that delegates to ``_calculate_weight_slope`` with a
    90-day window and passes its result through unchanged.
    """
    window_days = 90
    return _calculate_weight_slope(profile_id, window_days)
def _calculate_weight_slope(profile_id: str, days: int) -> Optional[float]:
    """Fit a least-squares line through the weights logged in the last ``days`` days.

    Args:
        profile_id: Profile whose ``weight_log`` rows are read.
        days: Length of the look-back window in days.

    Returns:
        The slope in kg/day rounded to 4 decimals (negative = weight loss),
        or ``None`` when there are fewer than ``max(18, 60% of days)``
        measurements or all measurements share the same date.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        # Bind the day count as a number and scale a fixed interval.
        # A placeholder inside a quoted literal ('%s days') only works by
        # accident of client-side interpolation of an int.
        cur.execute("""
            SELECT date, weight
            FROM weight_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - %s * INTERVAL '1 day'
            ORDER BY date
        """, (profile_id, days))
        rows = cur.fetchall()

    # Convert to float up front: the x-deviations below are floats, and
    # float * Decimal raises TypeError if the driver returns Decimal weights.
    data = [
        (row['date'], float(row['weight']))
        for row in rows
        if row['weight'] is not None
    ]

    # Need minimum data points based on period
    min_points = max(18, int(days * 0.6))  # 60% coverage
    if len(data) < min_points:
        return None

    # Express each date as days elapsed since the first measurement.
    start_date = data[0][0]
    x_values = [(date - start_date).days for date, _ in data]
    y_values = [weight for _, weight in data]

    # Ordinary least-squares slope: cov(x, y) / var(x).
    n = len(data)
    x_mean = sum(x_values) / n
    y_mean = sum(y_values) / n
    numerator = sum(
        (x - x_mean) * (y - y_mean) for x, y in zip(x_values, y_values)
    )
    denominator = sum((x - x_mean) ** 2 for x in x_values)
    if denominator == 0:
        # All points fall on the same day; no slope can be determined.
        return None

    return round(numerator / denominator, 4)  # kg/day
def calculate_goal_projection_date(profile_id: str, goal_id: str) -> Optional[str]:
    """Project the date on which a weight goal is reached at the 28-day trend.

    Args:
        profile_id: Profile whose 28-day weight slope is used.
        goal_id: Goal to project; it must have ``goal_type == 'weight'``.

    Returns:
        ISO date string, or ``None`` when the goal is missing or not a weight
        goal, its current/target value is unset, there is no usable trend
        (no slope or a flat slope), the trend points away from the target,
        or the projection lies more than 2 years (730 days) ahead.
    """
    from goal_utils import get_goal_by_id

    goal = get_goal_by_id(goal_id)
    if not goal or goal['goal_type'] != 'weight':
        return None

    slope = calculate_weight_28d_slope(profile_id)
    if not slope:  # None (no trend available) or 0 (flat trend)
        return None

    current = goal['current_value']
    target = goal['target_value']
    if current is None or target is None:
        return None

    # Goal values may arrive as Decimal while the slope is a float; mixing
    # the two in a division raises TypeError, so normalise to float first.
    days_needed = (float(target) - float(current)) / float(slope)

    # Unrealistic if >2 years or negative
    if days_needed < 0 or days_needed > 730:
        return None

    projection_date = datetime.now().date() + timedelta(days=int(days_needed))
    return projection_date.isoformat()
def calculate_goal_progress_pct(current: float, target: float, start: float) -> int:
    """Return goal progress as a whole percentage clamped to 0-100.

    Progress is the share of the distance from ``start`` to ``target`` that
    ``current`` has covered. Overshooting the target yields 100 and moving
    away from it yields 0.

    Args:
        current: Present value of the tracked metric.
        target: Value the goal aims for.
        start: Value at the time the goal was set.

    Returns:
        Integer in ``[0, 100]``. When ``start == target`` the result is 100
        if ``current`` equals the target, otherwise 0.
    """
    # Accept any mix of int / float / Decimal without TypeError.
    current, target, start = float(current), float(target), float(start)

    if start == target:
        return 100 if current == target else 0

    progress = ((current - start) / (target - start)) * 100
    # Remove binary floating-point noise before truncating, so a value such
    # as 28.999999999999996 counts as 29 rather than 28.
    progress = round(progress, 6)
    return max(0, min(100, int(progress)))
# ============================================================================
# K2: Fat Mass / Lean Mass Calculations
# ============================================================================
def calculate_fm_28d_change(profile_id: str) -> Optional[float]:
    """Return the fat-mass change (kg) across the last 28 days.

    Delegates to ``_calculate_body_composition_change`` with metric ``'fm'``.
    """
    return _calculate_body_composition_change(profile_id, metric='fm', days=28)
def calculate_lbm_28d_change(profile_id: str) -> Optional[float]:
    """Return the lean-body-mass change (kg) across the last 28 days.

    Delegates to ``_calculate_body_composition_change`` with metric ``'lbm'``.
    """
    return _calculate_body_composition_change(profile_id, metric='lbm', days=28)
def _calculate_body_composition_change(profile_id: str, metric: str, days: int) -> Optional[float]:
    """Return the change in fat mass or lean mass over the last ``days`` days.

    Only days with both a ``weight_log`` entry and a ``caliper_log`` body-fat
    percentage on the same date are usable. The change is the newest usable
    day minus the oldest usable day inside the window.

    Args:
        profile_id: Profile whose logs are read.
        metric: ``'fm'`` for fat mass; any other value is treated as lean
            body mass (``'lbm'``).
        days: Length of the look-back window in days.

    Returns:
        The change rounded to 2 decimals as a ``float``, or ``None`` when
        fewer than two usable days exist.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        # Bind the day count as a number rather than inside a quoted literal.
        cur.execute("""
            SELECT w.date, w.weight, c.body_fat_pct
            FROM weight_log w
            LEFT JOIN caliper_log c ON w.profile_id = c.profile_id
                AND w.date = c.date
            WHERE w.profile_id = %s
              AND w.date >= CURRENT_DATE - %s * INTERVAL '1 day'
            ORDER BY w.date DESC
        """, (profile_id, days))
        rows = cur.fetchall()

    # Keep (weight, body-fat %) pairs as floats, newest first. Converting here
    # avoids Decimal/float mixing and keeps the float return type honest.
    samples = [
        (float(row['weight']), float(row['body_fat_pct']))
        for row in rows
        if row['body_fat_pct'] is not None and row['weight'] is not None
    ]
    if len(samples) < 2:
        return None

    def _mass(weight: float, bf_pct: float) -> float:
        """Return fat mass or lean mass for one sample, per ``metric``."""
        fat_mass = weight * (bf_pct / 100)
        return fat_mass if metric == 'fm' else weight - fat_mass

    recent, oldest = samples[0], samples[-1]
    return round(_mass(*recent) - _mass(*oldest), 2)
# ============================================================================
# K3: Circumference Calculations
# ============================================================================
def calculate_waist_28d_delta(profile_id: str) -> Optional[float]:
    """Return the waist circumference change (cm) for the 28-day window.

    Delegates to ``_calculate_circumference_delta`` on column ``c_waist``.
    """
    return _calculate_circumference_delta(profile_id, column='c_waist', days=28)
def calculate_hip_28d_delta(profile_id: str) -> Optional[float]:
    """Return the hip circumference change (cm) for the 28-day window.

    Delegates to ``_calculate_circumference_delta`` on column ``c_hip``.
    """
    return _calculate_circumference_delta(profile_id, column='c_hip', days=28)
def calculate_chest_28d_delta(profile_id: str) -> Optional[float]:
    """Return the chest circumference change (cm) for the 28-day window.

    Delegates to ``_calculate_circumference_delta`` on column ``c_chest``.
    """
    return _calculate_circumference_delta(profile_id, column='c_chest', days=28)
def calculate_arm_28d_delta(profile_id: str) -> Optional[float]:
    """Return the arm circumference change (cm) for the 28-day window.

    Delegates to ``_calculate_circumference_delta`` on column ``c_arm``.
    """
    return _calculate_circumference_delta(profile_id, column='c_arm', days=28)
def calculate_thigh_28d_delta(profile_id: str) -> Optional[float]:
    """Calculate 28-day thigh circumference change (cm).

    ``circumference_log`` stores a single ``c_thigh`` column; there are no
    separate left/right thigh columns, so no L/R averaging is performed.
    Querying a non-existent ``c_thigh_r`` column would make the call fail.

    Returns:
        The value produced by ``_calculate_circumference_delta`` for
        ``c_thigh``, which may be ``None``.
    """
    return _calculate_circumference_delta(profile_id, 'c_thigh', 28)
def _calculate_circumference_delta(profile_id: str, column: str, days: int) -> Optional[float]:
    """Return the change of one circumference measurement across a window.

    The change is the latest non-NULL value recorded within the last
    ``days`` days minus the latest non-NULL value recorded before that
    window.

    Args:
        profile_id: Profile whose ``circumference_log`` rows are read.
        column: Name of the measurement column (e.g. ``'c_waist'``).
        days: Length of the look-back window in days.

    Returns:
        The difference rounded to one decimal as a ``float``, or ``None``
        when either the in-window or the pre-window measurement is missing.

    Raises:
        ValueError: If ``column`` is not a bare identifier.
    """
    # Identifiers cannot be bound as query parameters, so the column name is
    # spliced into the SQL text. Reject anything that is not a plain
    # identifier so no SQL fragment can be injected through it.
    if not column.isidentifier():
        raise ValueError(f"Invalid circumference column: {column!r}")

    with get_db() as conn:
        cur = get_cursor(conn)

        # Latest measurement inside the window. The day count is bound as a
        # number rather than inside a quoted literal.
        cur.execute(f"""
            SELECT {column}
            FROM circumference_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - %s * INTERVAL '1 day'
              AND {column} IS NOT NULL
            ORDER BY date DESC
            LIMIT 1
        """, (profile_id, days))
        recent = cur.fetchone()
        if not recent:
            return None

        # Latest measurement before the window: the comparison baseline.
        cur.execute(f"""
            SELECT {column}
            FROM circumference_log
            WHERE profile_id = %s
              AND date < CURRENT_DATE - %s * INTERVAL '1 day'
              AND {column} IS NOT NULL
            ORDER BY date DESC
            LIMIT 1
        """, (profile_id, days))
        oldest = cur.fetchone()
        if not oldest:
            return None

    change = recent[column] - oldest[column]
    # Round first (exact if the driver returns Decimal), then coerce so the
    # declared float return type holds.
    return float(round(change, 1))
def calculate_waist_hip_ratio(profile_id: str) -> Optional[float]:
    """Return the current waist-to-hip ratio.

    Uses the most recent ``circumference_log`` row in which both ``c_waist``
    and ``c_hip`` are present.

    Returns:
        The ratio rounded to 3 decimals as a ``float``, or ``None`` when no
        such row exists or the hip value is 0 (ratio undefined).
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT c_waist, c_hip
            FROM circumference_log
            WHERE profile_id = %s
              AND c_waist IS NOT NULL
              AND c_hip IS NOT NULL
            ORDER BY date DESC
            LIMIT 1
        """, (profile_id,))
        row = cur.fetchone()

    if not row:
        return None

    waist = float(row['c_waist'])
    hip = float(row['c_hip'])
    if hip == 0:
        # A zero hip measurement is invalid data; avoid ZeroDivisionError.
        return None

    return round(waist / hip, 3)
# ============================================================================
# K4: Recomposition Detector
# ============================================================================
def calculate_recomposition_quadrant(profile_id: str) -> Optional[str]:
    """Classify the 28-day fat-mass / lean-mass changes into a quadrant.

    Quadrants:
    - ``optimal``: fat mass not up and lean mass not down, with at least one
      of them actually moving (fat loss and/or lean gain)
    - ``cut_with_risk``: fat mass down, lean mass down
    - ``bulk``: fat mass up, lean mass up
    - ``unfavorable``: fat mass up without lean gain, or lean mass down
      without fat loss

    A change of exactly 0 on one axis is treated as neutral rather than
    adverse, so "fat down, lean unchanged" and "fat unchanged, lean up"
    count as ``optimal`` instead of falling through to ``unfavorable``.

    Returns:
        One of the four quadrant names, or ``None`` when either change is
        unavailable or both changes are exactly 0 (no movement to classify).
    """
    fm_change = calculate_fm_28d_change(profile_id)
    lbm_change = calculate_lbm_28d_change(profile_id)
    if fm_change is None or lbm_change is None:
        return None

    if fm_change == 0 and lbm_change == 0:
        # Nothing moved, so no quadrant applies.
        return None

    if fm_change <= 0 and lbm_change >= 0:
        return "optimal"
    if fm_change < 0 and lbm_change < 0:
        return "cut_with_risk"
    if fm_change > 0 and lbm_change > 0:
        return "bulk"
    # Remaining cases: fat up with lean flat/down, or fat flat with lean down.
    return "unfavorable"
# ============================================================================
# K5: Body Progress Score (Dynamic Focus Areas)
# ============================================================================
def calculate_body_progress_score(profile_id: str, focus_weights: Optional[Dict] = None) -> Optional[int]:
    """Calculate the body progress score (0-100) weighted by focus areas.

    Components, each scored 0-100 and combined as a weighted average:
    - Weight trend / goal progress (weight: ``körpergewicht``)
    - Body composition, i.e. recomposition quality
      (weight: ``körperfett`` + ``muskelmasse``)
    - Waist circumference trend (weight: 20 + 0.3 * ``körperfett``)

    A component whose score is unavailable (``None``) is left out of the
    average.

    Args:
        profile_id: Profile to score.
        focus_weights: Mapping of focus-area key to weight. When ``None`` it
            is loaded via ``calculations.scores.get_user_focus_weights``.

    Returns:
        The weighted average truncated to an ``int``, or ``None`` when the
        three body-related focus weights sum to 0 or no component could be
        scored.
    """
    if focus_weights is None:
        from calculations.scores import get_user_focus_weights
        focus_weights = get_user_focus_weights(profile_id)

    def _focus_weight(key: str) -> float:
        # A missing or None entry counts as 0. float() keeps Decimal weights
        # from raising TypeError when multiplied by 0.3 further down.
        return float(focus_weights.get(key) or 0)

    # Get all body-related focus area weights
    body_weight = _focus_weight('körpergewicht')
    body_fat_weight = _focus_weight('körperfett')
    muscle_weight = _focus_weight('muskelmasse')

    if body_weight + body_fat_weight + muscle_weight == 0:
        return None  # No body-related goals

    # (name, score 0-100, weight) per available component
    components = []

    # Weight trend component (if weight goal active)
    if body_weight > 0:
        weight_score = _score_weight_trend(profile_id)
        if weight_score is not None:
            components.append(('weight', weight_score, body_weight))

    # Body composition component (if BF% or LBM goal active)
    if body_fat_weight > 0 or muscle_weight > 0:
        comp_score = _score_body_composition(profile_id)
        if comp_score is not None:
            components.append(
                ('composition', comp_score, body_fat_weight + muscle_weight)
            )

    # Waist circumference component (proxy for health); included whenever
    # waist data is available.
    waist_score = _score_waist_trend(profile_id)
    if waist_score is not None:
        # Waist gets a base weight of 20 plus a bonus from the body-fat focus
        waist_weight = 20 + (body_fat_weight * 0.3)
        components.append(('waist', waist_score, waist_weight))

    if not components:
        return None

    # Weighted average
    total_score = sum(score * weight for _, score, weight in components)
    total_weight = sum(weight for _, _, weight in components)
    return int(total_score / total_weight)
def _score_weight_trend(profile_id: str) -> Optional[int]:
    """Score how well the weight trend matches the weight goal (0-100).

    The base score is the goal progress percentage. If a 28-day slope is
    available, 10 points are added when the slope's sign matches the
    direction from start to target, otherwise 20 points are subtracted. The
    result is kept within 0-100.

    Returns:
        The score, or ``None`` when there is no weight goal or the selected
        goal lacks a current, target or start value.
    """
    from goal_utils import get_goals_by_type

    weight_goals = get_goals_by_type(profile_id, 'weight')
    if not weight_goals:
        return None

    # Prefer the goal flagged as primary; otherwise fall back to the first.
    selected = weight_goals[0]
    for candidate in weight_goals:
        if candidate.get('is_primary'):
            selected = candidate
            break

    current = selected.get('current_value')
    target = selected.get('target_value')
    start = selected.get('start_value', current)
    if current is None or target is None or start is None:
        return None

    base = calculate_goal_progress_pct(current, target, start)

    trend = calculate_weight_28d_slope(profile_id)
    if trend is None:
        # No trend available: progress alone is the score.
        return int(base)

    wants_loss = target < start
    is_losing = trend < 0
    if wants_loss == is_losing:
        # Trend points toward the target.
        return int(min(100, base + 10))
    # Trend points away from the target.
    return int(max(0, base - 20))
def _score_body_composition(profile_id: str) -> Optional[int]:
    """Score the 28-day body composition change (0-100).

    Scores by recomposition quadrant:
    - ``optimal``: 100
    - ``cut_with_risk``: 80 minus a lean-loss penalty (15 per kg, capped at
      30), never below 50
    - ``bulk``: 90 / 75 / 60 / 45 for a lean:fat gain ratio of
      >= 3 / >= 2 / >= 1 / below 1
    - anything else: 20

    Returns:
        The score, or ``None`` when the fat-mass or lean-mass change is
        unavailable.
    """
    fat_delta = calculate_fm_28d_change(profile_id)
    lean_delta = calculate_lbm_28d_change(profile_id)
    if fat_delta is None or lean_delta is None:
        return None

    quadrant = calculate_recomposition_quadrant(profile_id)

    if quadrant == "optimal":
        return 100

    if quadrant == "cut_with_risk":
        # Penalty grows with the amount of lean mass lost.
        lean_loss_penalty = min(30, abs(lean_delta) * 15)
        return max(50, 80 - int(lean_loss_penalty))

    if quadrant == "bulk":
        if not (lean_delta > 0 and fat_delta > 0):
            return 60
        gain_ratio = lean_delta / fat_delta
        # (minimum lean:fat ratio, score); 3:1 counts as an excellent bulk.
        for minimum_ratio, score in ((3, 90), (2, 75), (1, 60)):
            if gain_ratio >= minimum_ratio:
                return score
        return 45

    # Unfavorable (or any unrecognised quadrant value).
    return 20
def _score_waist_trend(profile_id: str) -> Optional[int]:
    """Score the 28-day waist circumference change (0-100).

    Larger reductions score higher. The score steps from 100 (delta of -3 cm
    or lower) down to 20 (increase of more than 2 cm).

    Returns:
        The score, or ``None`` when no waist delta is available.
    """
    waist_delta = calculate_waist_28d_delta(profile_id)
    if waist_delta is None:
        return None

    # (inclusive upper bound of the delta in cm, score), checked in order.
    score_bands = (
        (-3, 100),
        (-2, 90),
        (-1, 80),
        (0, 70),
        (1, 55),
        (2, 40),
    )
    for upper_bound, band_score in score_bands:
        if waist_delta <= upper_bound:
            return band_score

    # More than 2 cm increase.
    return 20
# ============================================================================
# Data Quality Assessment
# ============================================================================
def calculate_body_data_quality(profile_id: str) -> Dict[str, Any]:
    """Assess how much body-metric data was logged in the last 28 days.

    Counts rows in ``weight_log``, ``caliper_log`` and ``circumference_log``
    and turns each count into a 0-100 component score: 18 weight entries or
    4 caliper / circumference entries reach 100. The overall score weights
    the components 50% / 30% / 20%.

    Args:
        profile_id: Profile whose logs are counted.

    Returns:
        Dict with:
        - ``overall_score``: weighted score truncated to ``int``
        - ``confidence``: ``"high"`` (>= 80), ``"medium"`` (>= 60) or
          ``"low"``
        - ``measurements``: raw 28-day counts per log
        - ``component_scores``: per-log scores truncated to ``int``
    """
    # Fixed table names defined right here (never caller input), so splicing
    # them into the SQL text is safe.
    tables = ('weight_log', 'caliper_log', 'circumference_log')
    counts = {}

    with get_db() as conn:
        cur = get_cursor(conn)
        for table in tables:
            # Measurement frequency (last 28 days)
            cur.execute(f"""
                SELECT COUNT(*) as count
                FROM {table}
                WHERE profile_id = %s
                  AND date >= CURRENT_DATE - INTERVAL '28 days'
            """, (profile_id,))
            counts[table] = cur.fetchone()['count']

    weight_count = counts['weight_log']
    caliper_count = counts['caliper_log']
    circ_count = counts['circumference_log']

    # Score components
    weight_score = min(100, (weight_count / 18) * 100)  # 18 = ~65% of 28 days
    caliper_score = min(100, (caliper_count / 4) * 100)  # 4 = weekly
    circ_score = min(100, (circ_count / 4) * 100)

    # Overall score (weight 50%, caliper 30%, circ 20%)
    overall_score = int(
        weight_score * 0.5 +
        caliper_score * 0.3 +
        circ_score * 0.2
    )

    # Confidence level
    if overall_score >= 80:
        confidence = "high"
    elif overall_score >= 60:
        confidence = "medium"
    else:
        confidence = "low"

    return {
        "overall_score": overall_score,
        "confidence": confidence,
        "measurements": {
            "weight_28d": weight_count,
            "caliper_28d": caliper_count,
            "circumference_28d": circ_count,
        },
        "component_scores": {
            "weight": int(weight_score),
            "caliper": int(caliper_score),
            "circumference": int(circ_score),
        },
    }