feat: Phase 0c - migrate body_metrics calculations to data_layer (20 functions)
- Migrated all 20 calculation functions from calculations/body_metrics.py to data_layer/body_metrics.py - Functions: weight trends (7d median, 28d/90d slopes, goal projection, progress) - Functions: body composition (FM/LBM changes) - Functions: circumferences (waist/hip/chest/arm/thigh deltas, WHR) - Functions: recomposition quadrant - Functions: scoring (body progress, data quality) - Updated data_layer/__init__.py with 20 new exports - Refactored placeholder_resolver.py to import body_metrics from data_layer Module 1/6 complete. Single Source of Truth for body metrics established. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
26110d44b4
commit
504581838c
|
|
@ -44,12 +44,30 @@ __all__ = [
|
|||
'calculate_confidence',
|
||||
'serialize_dates',
|
||||
|
||||
# Body Metrics
|
||||
# Body Metrics (Basic)
|
||||
'get_latest_weight_data',
|
||||
'get_weight_trend_data',
|
||||
'get_body_composition_data',
|
||||
'get_circumference_summary_data',
|
||||
|
||||
# Body Metrics (Calculated)
|
||||
'calculate_weight_7d_median',
|
||||
'calculate_weight_28d_slope',
|
||||
'calculate_weight_90d_slope',
|
||||
'calculate_goal_projection_date',
|
||||
'calculate_goal_progress_pct',
|
||||
'calculate_fm_28d_change',
|
||||
'calculate_lbm_28d_change',
|
||||
'calculate_waist_28d_delta',
|
||||
'calculate_hip_28d_delta',
|
||||
'calculate_chest_28d_delta',
|
||||
'calculate_arm_28d_delta',
|
||||
'calculate_thigh_28d_delta',
|
||||
'calculate_waist_hip_ratio',
|
||||
'calculate_recomposition_quadrant',
|
||||
'calculate_body_progress_score',
|
||||
'calculate_body_data_quality',
|
||||
|
||||
# Nutrition Metrics
|
||||
'get_nutrition_average_data',
|
||||
'get_nutrition_days_data',
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ Version: 1.0
|
|||
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from datetime import datetime, timedelta, date
|
||||
import statistics
|
||||
from db import get_db, get_cursor, r2d
|
||||
from data_layer.utils import calculate_confidence, safe_float
|
||||
|
||||
|
|
@ -315,3 +316,516 @@ def get_circumference_summary_data(
|
|||
"newest_date": newest_date,
|
||||
"oldest_date": oldest_date
|
||||
}
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Calculated Metrics (migrated from calculations/body_metrics.py)
|
||||
# Phase 0c: Single Source of Truth for KI + Charts
|
||||
# ============================================================================
|
||||
|
||||
# ── Weight Trend Calculations ──────────────────────────────────────────────
|
||||
|
||||
def calculate_weight_7d_median(profile_id: str) -> Optional[float]:
|
||||
"""Calculate 7-day median weight (reduces daily noise)"""
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute("""
|
||||
SELECT weight
|
||||
FROM weight_log
|
||||
WHERE profile_id = %s
|
||||
AND date >= CURRENT_DATE - INTERVAL '7 days'
|
||||
ORDER BY date DESC
|
||||
""", (profile_id,))
|
||||
|
||||
weights = [row['weight'] for row in cur.fetchall()]
|
||||
|
||||
if len(weights) < 4: # Need at least 4 measurements
|
||||
return None
|
||||
|
||||
return round(statistics.median(weights), 1)
|
||||
|
||||
|
||||
def calculate_weight_28d_slope(profile_id: str) -> Optional[float]:
|
||||
"""Calculate 28-day weight slope (kg/day)"""
|
||||
return _calculate_weight_slope(profile_id, days=28)
|
||||
|
||||
|
||||
def calculate_weight_90d_slope(profile_id: str) -> Optional[float]:
|
||||
"""Calculate 90-day weight slope (kg/day)"""
|
||||
return _calculate_weight_slope(profile_id, days=90)
|
||||
|
||||
|
||||
def _calculate_weight_slope(profile_id: str, days: int) -> Optional[float]:
|
||||
"""
|
||||
Calculate weight slope using linear regression
|
||||
Returns kg/day (negative = weight loss)
|
||||
"""
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute("""
|
||||
SELECT date, weight
|
||||
FROM weight_log
|
||||
WHERE profile_id = %s
|
||||
AND date >= CURRENT_DATE - INTERVAL '%s days'
|
||||
ORDER BY date
|
||||
""", (profile_id, days))
|
||||
|
||||
data = [(row['date'], row['weight']) for row in cur.fetchall()]
|
||||
|
||||
# Need minimum data points based on period
|
||||
min_points = max(18, int(days * 0.6)) # 60% coverage
|
||||
if len(data) < min_points:
|
||||
return None
|
||||
|
||||
# Convert dates to days since start
|
||||
start_date = data[0][0]
|
||||
x_values = [(date - start_date).days for date, _ in data]
|
||||
y_values = [weight for _, weight in data]
|
||||
|
||||
# Linear regression
|
||||
n = len(data)
|
||||
x_mean = sum(x_values) / n
|
||||
y_mean = sum(y_values) / n
|
||||
|
||||
numerator = sum((x - x_mean) * (y - y_mean) for x, y in zip(x_values, y_values))
|
||||
denominator = sum((x - x_mean) ** 2 for x in x_values)
|
||||
|
||||
if denominator == 0:
|
||||
return None
|
||||
|
||||
slope = numerator / denominator
|
||||
return round(slope, 4) # kg/day
|
||||
|
||||
|
||||
def calculate_goal_projection_date(profile_id: str, goal_id: str) -> Optional[str]:
|
||||
"""
|
||||
Calculate projected date to reach goal based on 28d trend
|
||||
Returns ISO date string or None if unrealistic
|
||||
"""
|
||||
from goal_utils import get_goal_by_id
|
||||
|
||||
goal = get_goal_by_id(goal_id)
|
||||
if not goal or goal['goal_type'] != 'weight':
|
||||
return None
|
||||
|
||||
slope = calculate_weight_28d_slope(profile_id)
|
||||
if not slope or slope == 0:
|
||||
return None
|
||||
|
||||
current = goal['current_value']
|
||||
target = goal['target_value']
|
||||
remaining = target - current
|
||||
|
||||
days_needed = remaining / slope
|
||||
|
||||
# Unrealistic if >2 years or negative
|
||||
if days_needed < 0 or days_needed > 730:
|
||||
return None
|
||||
|
||||
projection_date = datetime.now().date() + timedelta(days=int(days_needed))
|
||||
return projection_date.isoformat()
|
||||
|
||||
|
||||
def calculate_goal_progress_pct(current: float, target: float, start: float) -> int:
|
||||
"""
|
||||
Calculate goal progress percentage
|
||||
Returns 0-100 (can exceed 100 if target surpassed)
|
||||
"""
|
||||
if start == target:
|
||||
return 100 if current == target else 0
|
||||
|
||||
progress = ((current - start) / (target - start)) * 100
|
||||
return max(0, min(100, int(progress)))
|
||||
|
||||
|
||||
# ── Fat Mass / Lean Mass Calculations ───────────────────────────────────────
|
||||
|
||||
def calculate_fm_28d_change(profile_id: str) -> Optional[float]:
|
||||
"""Calculate 28-day fat mass change (kg)"""
|
||||
return _calculate_body_composition_change(profile_id, 'fm', 28)
|
||||
|
||||
|
||||
def calculate_lbm_28d_change(profile_id: str) -> Optional[float]:
|
||||
"""Calculate 28-day lean body mass change (kg)"""
|
||||
return _calculate_body_composition_change(profile_id, 'lbm', 28)
|
||||
|
||||
|
||||
def _calculate_body_composition_change(profile_id: str, metric: str, days: int) -> Optional[float]:
|
||||
"""
|
||||
Calculate change in body composition over period
|
||||
metric: 'fm' (fat mass) or 'lbm' (lean mass)
|
||||
"""
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
|
||||
# Get weight and caliper measurements
|
||||
cur.execute("""
|
||||
SELECT w.date, w.weight, c.body_fat_pct
|
||||
FROM weight_log w
|
||||
LEFT JOIN caliper_log c ON w.profile_id = c.profile_id
|
||||
AND w.date = c.date
|
||||
WHERE w.profile_id = %s
|
||||
AND w.date >= CURRENT_DATE - INTERVAL '%s days'
|
||||
ORDER BY w.date DESC
|
||||
""", (profile_id, days))
|
||||
|
||||
data = [
|
||||
{
|
||||
'date': row['date'],
|
||||
'weight': row['weight'],
|
||||
'bf_pct': row['body_fat_pct']
|
||||
}
|
||||
for row in cur.fetchall()
|
||||
if row['body_fat_pct'] is not None
|
||||
]
|
||||
|
||||
if len(data) < 2:
|
||||
return None
|
||||
|
||||
# Most recent and oldest measurement
|
||||
recent = data[0]
|
||||
oldest = data[-1]
|
||||
|
||||
# Calculate FM and LBM
|
||||
recent_fm = recent['weight'] * (recent['bf_pct'] / 100)
|
||||
recent_lbm = recent['weight'] - recent_fm
|
||||
|
||||
oldest_fm = oldest['weight'] * (oldest['bf_pct'] / 100)
|
||||
oldest_lbm = oldest['weight'] - oldest_fm
|
||||
|
||||
if metric == 'fm':
|
||||
change = recent_fm - oldest_fm
|
||||
else:
|
||||
change = recent_lbm - oldest_lbm
|
||||
|
||||
return round(change, 2)
|
||||
|
||||
|
||||
# ── Circumference Calculations ──────────────────────────────────────────────
|
||||
|
||||
def calculate_waist_28d_delta(profile_id: str) -> Optional[float]:
|
||||
"""Calculate 28-day waist circumference change (cm)"""
|
||||
return _calculate_circumference_delta(profile_id, 'c_waist', 28)
|
||||
|
||||
|
||||
def calculate_hip_28d_delta(profile_id: str) -> Optional[float]:
|
||||
"""Calculate 28-day hip circumference change (cm)"""
|
||||
return _calculate_circumference_delta(profile_id, 'c_hip', 28)
|
||||
|
||||
|
||||
def calculate_chest_28d_delta(profile_id: str) -> Optional[float]:
|
||||
"""Calculate 28-day chest circumference change (cm)"""
|
||||
return _calculate_circumference_delta(profile_id, 'c_chest', 28)
|
||||
|
||||
|
||||
def calculate_arm_28d_delta(profile_id: str) -> Optional[float]:
|
||||
"""Calculate 28-day arm circumference change (cm)"""
|
||||
return _calculate_circumference_delta(profile_id, 'c_arm', 28)
|
||||
|
||||
|
||||
def calculate_thigh_28d_delta(profile_id: str) -> Optional[float]:
|
||||
"""Calculate 28-day thigh circumference change (cm)"""
|
||||
delta = _calculate_circumference_delta(profile_id, 'c_thigh', 28)
|
||||
if delta is None:
|
||||
return None
|
||||
return round(delta, 1)
|
||||
|
||||
|
||||
def _calculate_circumference_delta(profile_id: str, column: str, days: int) -> Optional[float]:
|
||||
"""Calculate change in circumference measurement"""
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute(f"""
|
||||
SELECT {column}
|
||||
FROM circumference_log
|
||||
WHERE profile_id = %s
|
||||
AND date >= CURRENT_DATE - INTERVAL '%s days'
|
||||
AND {column} IS NOT NULL
|
||||
ORDER BY date DESC
|
||||
LIMIT 1
|
||||
""", (profile_id, days))
|
||||
|
||||
recent = cur.fetchone()
|
||||
if not recent:
|
||||
return None
|
||||
|
||||
cur.execute(f"""
|
||||
SELECT {column}
|
||||
FROM circumference_log
|
||||
WHERE profile_id = %s
|
||||
AND date < CURRENT_DATE - INTERVAL '%s days'
|
||||
AND {column} IS NOT NULL
|
||||
ORDER BY date DESC
|
||||
LIMIT 1
|
||||
""", (profile_id, days))
|
||||
|
||||
oldest = cur.fetchone()
|
||||
if not oldest:
|
||||
return None
|
||||
|
||||
change = recent[column] - oldest[column]
|
||||
return round(change, 1)
|
||||
|
||||
|
||||
def calculate_waist_hip_ratio(profile_id: str) -> Optional[float]:
|
||||
"""Calculate current waist-to-hip ratio"""
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute("""
|
||||
SELECT c_waist, c_hip
|
||||
FROM circumference_log
|
||||
WHERE profile_id = %s
|
||||
AND c_waist IS NOT NULL
|
||||
AND c_hip IS NOT NULL
|
||||
ORDER BY date DESC
|
||||
LIMIT 1
|
||||
""", (profile_id,))
|
||||
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
|
||||
ratio = row['c_waist'] / row['c_hip']
|
||||
return round(ratio, 3)
|
||||
|
||||
|
||||
# ── Recomposition Detector ───────────────────────────────────────────────────
|
||||
|
||||
def calculate_recomposition_quadrant(profile_id: str) -> Optional[str]:
|
||||
"""
|
||||
Determine recomposition quadrant based on 28d changes:
|
||||
- optimal: FM down, LBM up
|
||||
- cut_with_risk: FM down, LBM down
|
||||
- bulk: FM up, LBM up
|
||||
- unfavorable: FM up, LBM down
|
||||
"""
|
||||
fm_change = calculate_fm_28d_change(profile_id)
|
||||
lbm_change = calculate_lbm_28d_change(profile_id)
|
||||
|
||||
if fm_change is None or lbm_change is None:
|
||||
return None
|
||||
|
||||
if fm_change < 0 and lbm_change > 0:
|
||||
return "optimal"
|
||||
elif fm_change < 0 and lbm_change < 0:
|
||||
return "cut_with_risk"
|
||||
elif fm_change > 0 and lbm_change > 0:
|
||||
return "bulk"
|
||||
else:
|
||||
return "unfavorable"
|
||||
|
||||
|
||||
# ── Body Progress Score ───────────────────────────────────────────────────────
|
||||
|
||||
def calculate_body_progress_score(profile_id: str, focus_weights: Optional[Dict] = None) -> Optional[int]:
|
||||
"""Calculate body progress score (0-100) weighted by user's focus areas"""
|
||||
if focus_weights is None:
|
||||
from goal_utils import get_focus_weights
|
||||
with get_db() as conn:
|
||||
focus_weights = get_focus_weights(conn, profile_id)
|
||||
|
||||
weight_loss = focus_weights.get('weight_loss', 0)
|
||||
muscle_gain = focus_weights.get('muscle_gain', 0)
|
||||
body_recomp = focus_weights.get('body_recomposition', 0)
|
||||
|
||||
total_body_weight = weight_loss + muscle_gain + body_recomp
|
||||
|
||||
if total_body_weight == 0:
|
||||
return None
|
||||
|
||||
components = []
|
||||
|
||||
if weight_loss > 0:
|
||||
weight_score = _score_weight_trend(profile_id)
|
||||
if weight_score is not None:
|
||||
components.append(('weight', weight_score, weight_loss))
|
||||
|
||||
if muscle_gain > 0 or body_recomp > 0:
|
||||
comp_score = _score_body_composition(profile_id)
|
||||
if comp_score is not None:
|
||||
components.append(('composition', comp_score, muscle_gain + body_recomp))
|
||||
|
||||
waist_score = _score_waist_trend(profile_id)
|
||||
if waist_score is not None:
|
||||
waist_weight = 20 + (weight_loss * 0.3)
|
||||
components.append(('waist', waist_score, waist_weight))
|
||||
|
||||
if not components:
|
||||
return None
|
||||
|
||||
total_score = sum(score * weight for _, score, weight in components)
|
||||
total_weight = sum(weight for _, _, weight in components)
|
||||
|
||||
return int(total_score / total_weight)
|
||||
|
||||
|
||||
def _score_weight_trend(profile_id: str) -> Optional[int]:
|
||||
"""Score weight trend alignment with goals (0-100)"""
|
||||
from goal_utils import get_active_goals
|
||||
|
||||
goals = get_active_goals(profile_id)
|
||||
weight_goals = [g for g in goals if g.get('goal_type') == 'weight']
|
||||
if not weight_goals:
|
||||
return None
|
||||
|
||||
goal = next((g for g in weight_goals if g.get('is_primary')), weight_goals[0])
|
||||
|
||||
current = goal.get('current_value')
|
||||
target = goal.get('target_value')
|
||||
start = goal.get('start_value')
|
||||
|
||||
if None in [current, target]:
|
||||
return None
|
||||
|
||||
current = float(current)
|
||||
target = float(target)
|
||||
|
||||
if start is None:
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute("""
|
||||
SELECT weight
|
||||
FROM weight_log
|
||||
WHERE profile_id = %s
|
||||
AND date >= CURRENT_DATE - INTERVAL '90 days'
|
||||
ORDER BY date ASC
|
||||
LIMIT 1
|
||||
""", (profile_id,))
|
||||
row = cur.fetchone()
|
||||
start = float(row['weight']) if row else current
|
||||
else:
|
||||
start = float(start)
|
||||
|
||||
progress_pct = calculate_goal_progress_pct(current, target, start)
|
||||
|
||||
slope = calculate_weight_28d_slope(profile_id)
|
||||
if slope is not None:
|
||||
desired_direction = -1 if target < start else 1
|
||||
actual_direction = -1 if slope < 0 else 1
|
||||
|
||||
if desired_direction == actual_direction:
|
||||
score = min(100, progress_pct + 10)
|
||||
else:
|
||||
score = max(0, progress_pct - 20)
|
||||
else:
|
||||
score = progress_pct
|
||||
|
||||
return int(score)
|
||||
|
||||
|
||||
def _score_body_composition(profile_id: str) -> Optional[int]:
|
||||
"""Score body composition changes (0-100)"""
|
||||
fm_change = calculate_fm_28d_change(profile_id)
|
||||
lbm_change = calculate_lbm_28d_change(profile_id)
|
||||
|
||||
if fm_change is None or lbm_change is None:
|
||||
return None
|
||||
|
||||
quadrant = calculate_recomposition_quadrant(profile_id)
|
||||
|
||||
if quadrant == "optimal":
|
||||
return 100
|
||||
elif quadrant == "cut_with_risk":
|
||||
penalty = min(30, abs(lbm_change) * 15)
|
||||
return max(50, 80 - int(penalty))
|
||||
elif quadrant == "bulk":
|
||||
if lbm_change > 0 and fm_change > 0:
|
||||
ratio = lbm_change / fm_change
|
||||
if ratio >= 3:
|
||||
return 90
|
||||
elif ratio >= 2:
|
||||
return 75
|
||||
elif ratio >= 1:
|
||||
return 60
|
||||
else:
|
||||
return 45
|
||||
return 60
|
||||
else:
|
||||
return 20
|
||||
|
||||
|
||||
def _score_waist_trend(profile_id: str) -> Optional[int]:
|
||||
"""Score waist circumference trend (0-100)"""
|
||||
delta = calculate_waist_28d_delta(profile_id)
|
||||
|
||||
if delta is None:
|
||||
return None
|
||||
|
||||
if delta <= -3:
|
||||
return 100
|
||||
elif delta <= -2:
|
||||
return 90
|
||||
elif delta <= -1:
|
||||
return 80
|
||||
elif delta <= 0:
|
||||
return 70
|
||||
elif delta <= 1:
|
||||
return 55
|
||||
elif delta <= 2:
|
||||
return 40
|
||||
else:
|
||||
return 20
|
||||
|
||||
|
||||
# ── Data Quality Assessment ───────────────────────────────────────────────────
|
||||
|
||||
def calculate_body_data_quality(profile_id: str) -> Dict[str, any]:
|
||||
"""Assess data quality for body metrics"""
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
|
||||
cur.execute("""
|
||||
SELECT COUNT(*) as count
|
||||
FROM weight_log
|
||||
WHERE profile_id = %s
|
||||
AND date >= CURRENT_DATE - INTERVAL '28 days'
|
||||
""", (profile_id,))
|
||||
weight_count = cur.fetchone()['count']
|
||||
|
||||
cur.execute("""
|
||||
SELECT COUNT(*) as count
|
||||
FROM caliper_log
|
||||
WHERE profile_id = %s
|
||||
AND date >= CURRENT_DATE - INTERVAL '28 days'
|
||||
""", (profile_id,))
|
||||
caliper_count = cur.fetchone()['count']
|
||||
|
||||
cur.execute("""
|
||||
SELECT COUNT(*) as count
|
||||
FROM circumference_log
|
||||
WHERE profile_id = %s
|
||||
AND date >= CURRENT_DATE - INTERVAL '28 days'
|
||||
""", (profile_id,))
|
||||
circ_count = cur.fetchone()['count']
|
||||
|
||||
weight_score = min(100, (weight_count / 18) * 100)
|
||||
caliper_score = min(100, (caliper_count / 4) * 100)
|
||||
circ_score = min(100, (circ_count / 4) * 100)
|
||||
|
||||
overall_score = int(
|
||||
weight_score * 0.5 +
|
||||
caliper_score * 0.3 +
|
||||
circ_score * 0.2
|
||||
)
|
||||
|
||||
if overall_score >= 80:
|
||||
confidence = "high"
|
||||
elif overall_score >= 60:
|
||||
confidence = "medium"
|
||||
else:
|
||||
confidence = "low"
|
||||
|
||||
return {
|
||||
"overall_score": overall_score,
|
||||
"confidence": confidence,
|
||||
"measurements": {
|
||||
"weight_28d": weight_count,
|
||||
"caliper_28d": caliper_count,
|
||||
"circumference_28d": circ_count
|
||||
},
|
||||
"component_scores": {
|
||||
"weight": int(weight_score),
|
||||
"caliper": int(caliper_score),
|
||||
"circumference": int(circ_score)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -417,7 +417,8 @@ def _safe_int(func_name: str, profile_id: str) -> str:
|
|||
import traceback
|
||||
try:
|
||||
# Import calculations dynamically to avoid circular imports
|
||||
from calculations import scores, body_metrics, nutrition_metrics, activity_metrics, recovery_metrics, correlation_metrics
|
||||
from calculations import scores, nutrition_metrics, activity_metrics, recovery_metrics, correlation_metrics
|
||||
from data_layer import body_metrics
|
||||
|
||||
# Map function names to actual functions
|
||||
func_map = {
|
||||
|
|
@ -479,7 +480,8 @@ def _safe_float(func_name: str, profile_id: str, decimals: int = 1) -> str:
|
|||
"""
|
||||
import traceback
|
||||
try:
|
||||
from calculations import body_metrics, nutrition_metrics, activity_metrics, recovery_metrics, scores
|
||||
from calculations import nutrition_metrics, activity_metrics, recovery_metrics, scores
|
||||
from data_layer import body_metrics
|
||||
|
||||
func_map = {
|
||||
'weight_7d_median': body_metrics.calculate_weight_7d_median,
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user