- Migrated all 16 calculation functions from calculations/nutrition_metrics.py to data_layer/nutrition_metrics.py - Functions: Energy balance (7d calculation, deficit/surplus classification) - Functions: Protein adequacy (g/kg, days in target, 28d score) - Functions: Macro consistency (score, intake volatility) - Functions: Nutrition scoring (main score with focus weights, calorie/macro adherence helpers) - Functions: Energy availability warning (with severity levels and recommendations) - Functions: Data quality assessment - Functions: Fiber/sugar averages (TODO stubs) - Updated data_layer/__init__.py with 12 new exports - Refactored placeholder_resolver.py to import nutrition_metrics from data_layer Module 2/6 complete. Single Source of Truth for nutrition metrics established. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1094 lines
34 KiB
Python
1094 lines
34 KiB
Python
"""
|
||
Nutrition Metrics Data Layer
|
||
|
||
Provides structured data for nutrition tracking and analysis.
|
||
|
||
Functions:
|
||
- get_nutrition_average_data(): Average calories, protein, carbs, fat
|
||
- get_nutrition_days_data(): Number of days with nutrition data
|
||
- get_protein_targets_data(): Protein targets based on weight
|
||
- get_energy_balance_data(): Energy balance calculation
|
||
- get_protein_adequacy_data(): Protein adequacy score
|
||
- get_macro_consistency_data(): Macro consistency analysis
|
||
|
||
All functions return structured data (dict) without formatting.
|
||
Use placeholder_resolver.py for formatted strings for AI.
|
||
|
||
Phase 0c: Multi-Layer Architecture
|
||
Version: 1.0
|
||
"""
|
||
|
||
from typing import Dict, List, Optional
|
||
from datetime import datetime, timedelta, date
|
||
from db import get_db, get_cursor, r2d
|
||
from data_layer.utils import calculate_confidence, safe_float, safe_int
|
||
|
||
|
||
def get_nutrition_average_data(
    profile_id: str,
    days: int = 30
) -> Dict:
    """
    Return the mean logged value of every macro for one profile.

    A single aggregate query over ``nutrition_log`` covers all rows dated on
    or after the cutoff (now minus ``days`` days, formatted YYYY-MM-DD).

    Args:
        profile_id: User profile ID
        days: Size of the look-back window in days (default 30)

    Returns:
        {
            "kcal_avg": float,
            "protein_avg": float,
            "carbs_avg": float,
            "fat_avg": float,
            "data_points": int,     # number of matching rows
            "confidence": str,      # "insufficient" when nothing matched
            "days_analyzed": int    # echoes ``days``
        }

    Migration from Phase 0b:
        OLD: get_nutrition_avg(pid, field, days) per field
        NEW: All macros in one call
    """
    macro_keys = ("kcal_avg", "protein_avg", "carbs_avg", "fat_avg")

    with get_db() as conn:
        cur = get_cursor(conn)
        window_start = datetime.now() - timedelta(days=days)
        cutoff = window_start.strftime('%Y-%m-%d')

        cur.execute(
            """SELECT
                AVG(kcal) as kcal_avg,
                AVG(protein_g) as protein_avg,
                AVG(carbs_g) as carbs_avg,
                AVG(fat_g) as fat_avg,
                COUNT(*) as data_points
            FROM nutrition_log
            WHERE profile_id=%s AND date >= %s""",
            (profile_id, cutoff)
        )
        stats = cur.fetchone()

        # No row at all, or an aggregate over zero rows: report zeros.
        if not stats or stats['data_points'] == 0:
            empty = {key: 0.0 for key in macro_keys}
            empty["data_points"] = 0
            empty["confidence"] = "insufficient"
            empty["days_analyzed"] = days
            return empty

        n_rows = stats['data_points']
        confidence = calculate_confidence(n_rows, days, "general")

        result = {key: safe_float(stats[key]) for key in macro_keys}
        result["data_points"] = n_rows
        result["confidence"] = confidence
        result["days_analyzed"] = days
        return result
|
||
|
||
|
||
def get_nutrition_days_data(
    profile_id: str,
    days: int = 30
) -> Dict:
    """
    Report on how many distinct dates nutrition was logged in the window.

    Args:
        profile_id: User profile ID
        days: Size of the look-back window in days (default 30)

    Returns:
        {
            "days_with_data": int,   # COUNT(DISTINCT date) in the window
            "days_analyzed": int,    # echoes ``days``
            "coverage_pct": float,   # days_with_data / days * 100 (0 if days <= 0)
            "confidence": str
        }
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        window_start = datetime.now() - timedelta(days=days)
        cutoff = window_start.strftime('%Y-%m-%d')

        cur.execute(
            """SELECT COUNT(DISTINCT date) as days
            FROM nutrition_log
            WHERE profile_id=%s AND date >= %s""",
            (profile_id, cutoff)
        )
        result_row = cur.fetchone()

        logged_days = 0
        if result_row:
            logged_days = result_row['days']

        # Guard against a non-positive window to avoid dividing by zero.
        if days > 0:
            coverage = logged_days / days * 100
        else:
            coverage = 0

        return {
            "days_with_data": logged_days,
            "days_analyzed": days,
            "coverage_pct": coverage,
            "confidence": calculate_confidence(logged_days, days, "general")
        }
|
||
|
||
|
||
def get_protein_targets_data(
    profile_id: str
) -> Dict:
    """
    Calculate protein targets based on the most recent logged weight.

    Targets:
    - Low: 1.6 g/kg (maintenance)
    - High: 2.2 g/kg (muscle building)

    Args:
        profile_id: User profile ID

    Returns:
        {
            "current_weight": float,
            "protein_target_low": float,   # 1.6 g/kg
            "protein_target_high": float,  # 2.2 g/kg
            "confidence": str              # "high", or "insufficient" if no usable weight
        }

        When there is no weight entry, or the latest entry has no positive
        weight value, all numeric fields are 0.0 and confidence is
        "insufficient".
    """
    no_weight = {
        "current_weight": 0.0,
        "protein_target_low": 0.0,
        "protein_target_high": 0.0,
        "confidence": "insufficient"
    }

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT weight FROM weight_log
            WHERE profile_id=%s ORDER BY date DESC LIMIT 1""",
            (profile_id,)
        )
        row = cur.fetchone()

        if not row:
            return no_weight

        weight = safe_float(row['weight'])

        # A missing or non-positive weight cannot yield meaningful targets;
        # reporting 0.0 targets as "high" confidence would mislead callers.
        if not weight or weight <= 0:
            return no_weight

        return {
            "current_weight": weight,
            "protein_target_low": weight * 1.6,
            "protein_target_high": weight * 2.2,
            "confidence": "high"
        }
|
||
|
||
|
||
def get_energy_balance_data(
    profile_id: str,
    days: int = 7
) -> Dict:
    """
    Compare average logged calorie intake against a fixed expenditure estimate.

    Note: the expenditure side is a flat 2500 kcal placeholder, not a
    profile-based TDEE, so the result is only a rough indication.

    Args:
        profile_id: User profile ID
        days: Size of the look-back window in days (default 7)

    Returns:
        {
            "energy_balance": float,  # kcal/day (negative = deficit)
            "avg_intake": float,
            "estimated_tdee": float,
            "status": str,            # "deficit" | "surplus" | "maintenance" | "unknown"
            "confidence": str,
            "days_analyzed": int,
            "data_points": int
        }

        "unknown" status with "insufficient" confidence is returned when no
        row with a non-NULL kcal value falls in the window.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        window_start = datetime.now() - timedelta(days=days)
        cutoff = window_start.strftime('%Y-%m-%d')

        # Average intake over the window
        cur.execute(
            """SELECT AVG(kcal) as avg_kcal, COUNT(*) as cnt
            FROM nutrition_log
            WHERE profile_id=%s AND date >= %s AND kcal IS NOT NULL""",
            (profile_id, cutoff)
        )
        intake_row = cur.fetchone()

        if not intake_row or intake_row['cnt'] == 0:
            return {
                "energy_balance": 0.0,
                "avg_intake": 0.0,
                "estimated_tdee": 0.0,
                "status": "unknown",
                "confidence": "insufficient",
                "days_analyzed": days,
                "data_points": 0
            }

        mean_intake = safe_float(intake_row['avg_kcal'])
        n_entries = intake_row['cnt']

        # Flat placeholder for an average adult until profile data is used.
        tdee = 2500.0  # TODO: Calculate from profile (weight, height, age, activity)
        delta = mean_intake - tdee

        # A band of +/-200 kcal/day around zero counts as maintenance.
        status = "maintenance"
        if delta < -200:
            status = "deficit"
        elif delta > 200:
            status = "surplus"

        return {
            "energy_balance": delta,
            "avg_intake": mean_intake,
            "estimated_tdee": tdee,
            "status": status,
            "confidence": calculate_confidence(n_entries, days, "general"),
            "days_analyzed": days,
            "data_points": n_entries
        }
|
||
|
||
|
||
def get_protein_adequacy_data(
    profile_id: str,
    days: int = 28
) -> Dict:
    """
    Score (0-100) how well logged protein intake matches the weight-based targets.

    The targets come from get_protein_targets_data(); if that call finds no
    weight, both targets are 0.0 and protein_g_per_kg is reported as 0.0.

    Score composition:
    - 70%: share of logged days whose protein_g lies within [low, high] target
    - 30%: average protein relative to the target midpoint, capped at 100%

    Args:
        profile_id: User profile ID
        days: Size of the look-back window in days (default 28)

    Returns:
        {
            "adequacy_score": int,          # 0-100
            "avg_protein_g": float,
            "target_protein_low": float,
            "target_protein_high": float,
            "protein_g_per_kg": float,
            "days_in_target": int,
            "days_with_data": int,
            "confidence": str
        }
    """
    targets = get_protein_targets_data(profile_id)
    low_target = targets['protein_target_low']
    high_target = targets['protein_target_high']
    body_weight = targets['current_weight']

    with get_db() as conn:
        cur = get_cursor(conn)
        window_start = datetime.now() - timedelta(days=days)
        cutoff = window_start.strftime('%Y-%m-%d')

        cur.execute(
            """SELECT
                AVG(protein_g) as avg_protein,
                COUNT(*) as cnt,
                SUM(CASE WHEN protein_g >= %s AND protein_g <= %s THEN 1 ELSE 0 END) as days_in_target
            FROM nutrition_log
            WHERE profile_id=%s AND date >= %s AND protein_g IS NOT NULL""",
            (low_target, high_target, profile_id, cutoff)
        )
        agg = cur.fetchone()

        if not agg or agg['cnt'] == 0:
            return {
                "adequacy_score": 0,
                "avg_protein_g": 0.0,
                "target_protein_low": low_target,
                "target_protein_high": high_target,
                "protein_g_per_kg": 0.0,
                "days_in_target": 0,
                "days_with_data": 0,
                "confidence": "insufficient"
            }

        mean_protein = safe_float(agg['avg_protein'])
        n_days = agg['cnt']
        hits = agg['days_in_target']

        if body_weight > 0:
            per_kg = mean_protein / body_weight
        else:
            per_kg = 0.0

        # Percentage of logged days inside the target band.
        hit_rate = (hits / n_days * 100) if n_days > 0 else 0

        # Average intake relative to the middle of the target band.
        midpoint = (low_target + high_target) / 2
        level_ratio = (mean_protein / midpoint) if midpoint > 0 else 0

        # Weighted blend: 70% target days, 30% average level (capped at 100).
        blended = hit_rate * 0.7 + min(level_ratio * 100, 100) * 0.3
        score = int(blended)
        score = max(0, min(100, score))  # clamp to 0-100

        confidence = calculate_confidence(n_days, days, "general")

        return {
            "adequacy_score": score,
            "avg_protein_g": mean_protein,
            "target_protein_low": low_target,
            "target_protein_high": high_target,
            "protein_g_per_kg": per_kg,
            "days_in_target": hits,
            "days_with_data": n_days,
            "confidence": confidence
        }
|
||
|
||
|
||
def get_macro_consistency_data(
    profile_id: str,
    days: int = 28
) -> Dict:
    """
    Score (0-100) how stable the daily macronutrient split is.

    For each qualifying row the protein/carbs/fat grams are converted to
    kcal (4/4/9 kcal per gram) and expressed as a percentage of their sum.
    The score is 100 minus five times the mean of the three standard
    deviations, clamped to 0-100.

    Args:
        profile_id: User profile ID
        days: Size of the look-back window in days (default 28)

    Returns:
        {
            "consistency_score": int,   # 0-100 (100 = very consistent)
            "avg_protein_pct": float,
            "avg_carbs_pct": float,
            "avg_fat_pct": float,
            "std_dev_protein": float,   # standard deviation of the daily %
            "std_dev_carbs": float,
            "std_dev_fat": float,
            "confidence": str,
            "data_points": int
        }

        With fewer than 3 usable rows every numeric field is zero and
        confidence is "insufficient".
    """
    import statistics

    def _insufficient(count):
        # Payload used whenever fewer than three usable days are available.
        return {
            "consistency_score": 0,
            "avg_protein_pct": 0.0,
            "avg_carbs_pct": 0.0,
            "avg_fat_pct": 0.0,
            "std_dev_protein": 0.0,
            "std_dev_carbs": 0.0,
            "std_dev_fat": 0.0,
            "confidence": "insufficient",
            "data_points": count
        }

    with get_db() as conn:
        cur = get_cursor(conn)
        window_start = datetime.now() - timedelta(days=days)
        cutoff = window_start.strftime('%Y-%m-%d')

        cur.execute(
            """SELECT
                protein_g, carbs_g, fat_g, kcal
            FROM nutrition_log
            WHERE profile_id=%s
              AND date >= %s
              AND protein_g IS NOT NULL
              AND carbs_g IS NOT NULL
              AND fat_g IS NOT NULL
              AND kcal > 0
            ORDER BY date""",
            (profile_id, cutoff)
        )
        rows = cur.fetchall()

        if len(rows) < 3:
            return _insufficient(len(rows))

        # One (protein %, carbs %, fat %) tuple per usable day.
        shares = []
        for entry in rows:
            if safe_float(entry['kcal']) == 0:
                continue

            # Grams -> kcal (protein=4, carbs=4, fat=9)
            p_kcal = safe_float(entry['protein_g']) * 4
            c_kcal = safe_float(entry['carbs_g']) * 4
            f_kcal = safe_float(entry['fat_g']) * 9
            macro_sum = p_kcal + c_kcal + f_kcal

            if macro_sum > 0:
                shares.append((
                    p_kcal / macro_sum * 100,
                    c_kcal / macro_sum * 100,
                    f_kcal / macro_sum * 100,
                ))

        if len(shares) < 3:
            return _insufficient(len(shares))

        protein_pcts = [day[0] for day in shares]
        carbs_pcts = [day[1] for day in shares]
        fat_pcts = [day[2] for day in shares]

        mean_protein = statistics.mean(protein_pcts)
        mean_carbs = statistics.mean(carbs_pcts)
        mean_fat = statistics.mean(fat_pcts)

        # At least three samples are guaranteed here, so stdev is defined.
        sd_protein = statistics.stdev(protein_pcts)
        sd_carbs = statistics.stdev(carbs_pcts)
        sd_fat = statistics.stdev(fat_pcts)

        # Lower spread = higher consistency.
        # avg_std of 5% = score 75, 10% = score 50, 20% = score 0
        avg_std = (sd_protein + sd_carbs + sd_fat) / 3
        score = int(100 - (avg_std * 5))
        score = min(100, score)
        score = max(0, score)

        confidence = calculate_confidence(len(shares), days, "general")

        return {
            "consistency_score": score,
            "avg_protein_pct": mean_protein,
            "avg_carbs_pct": mean_carbs,
            "avg_fat_pct": mean_fat,
            "std_dev_protein": sd_protein,
            "std_dev_carbs": sd_carbs,
            "std_dev_fat": sd_fat,
            "confidence": confidence,
            "data_points": len(shares)
        }
|
||
|
||
|
||
# ============================================================================
|
||
# Calculated Metrics (migrated from calculations/nutrition_metrics.py)
|
||
# ============================================================================
|
||
# These functions return simple values for placeholders.
|
||
# Use get_*_data() functions above for structured chart data.
|
||
|
||
|
||
def calculate_energy_balance_7d(profile_id: str) -> Optional[float]:
    """
    Calculate 7-day average energy balance (kcal/day).

    Positive = surplus, negative = deficit.

    Intake is the mean of the logged kcal values from the last 7 days;
    expenditure is estimated as the latest logged body weight x 32.5.

    Args:
        profile_id: User profile ID

    Returns:
        The balance rounded to whole kcal (as a float), or None when fewer
        than 4 kcal entries exist in the window or no usable weight is
        available.

    Migration from Phase 0b:
        Used by placeholders that need single balance value
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # NULL kcal rows are excluded in SQL: they would otherwise count
        # towards the 4-entry minimum and then break sum() with a TypeError.
        cur.execute("""
            SELECT kcal
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '7 days'
              AND kcal IS NOT NULL
            ORDER BY date DESC
        """, (profile_id,))

        calories = [row['kcal'] for row in cur.fetchall()]

        if len(calories) < 4:  # Need at least 4 days
            return None

        avg_intake = float(sum(calories) / len(calories))

        # Get estimated TDEE (simplified - could use Harris-Benedict)
        # For now, use weight-based estimate
        cur.execute("""
            SELECT weight
            FROM weight_log
            WHERE profile_id = %s
            ORDER BY date DESC
            LIMIT 1
        """, (profile_id,))

        weight_row = cur.fetchone()
        # A weight entry with a NULL value is as unusable as no entry at all.
        if not weight_row or weight_row['weight'] is None:
            return None

        # Simple TDEE estimate: bodyweight (kg) x 30-35 (midpoint 32.5)
        # TODO: Improve with activity level, age, gender
        estimated_tdee = float(weight_row['weight']) * 32.5

        balance = avg_intake - estimated_tdee

        return round(balance, 0)
|
||
|
||
|
||
def calculate_energy_deficit_surplus(profile_id: str, days: int = 7) -> Optional[str]:
    """
    Label the 7-day energy balance as deficit, maintenance or surplus.

    The balance comes from calculate_energy_balance_7d(); anything within
    +/-200 kcal/day of zero is treated as maintenance.

    Args:
        profile_id: User profile ID
        days: Accepted for interface compatibility; currently not used,
            the underlying balance is always computed over 7 days.

    Returns: 'deficit', 'maintenance', 'surplus', or None when no balance
    could be calculated.
    """
    daily_balance = calculate_energy_balance_7d(profile_id)

    if daily_balance is None:
        return None
    if daily_balance > 200:
        return 'surplus'
    if daily_balance < -200:
        return 'deficit'
    return 'maintenance'
|
||
|
||
|
||
def calculate_protein_g_per_kg(profile_id: str) -> Optional[float]:
    """
    Calculate average protein intake in g/kg bodyweight (last 7 days).

    Args:
        profile_id: User profile ID

    Returns:
        Average protein per kg of the latest logged weight, rounded to two
        decimals, or None when there is no usable (positive) weight or
        fewer than 4 protein entries in the window.
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # Get recent weight
        cur.execute("""
            SELECT weight
            FROM weight_log
            WHERE profile_id = %s
            ORDER BY date DESC
            LIMIT 1
        """, (profile_id,))

        weight_row = cur.fetchone()
        if not weight_row or weight_row['weight'] is None:
            return None

        weight = float(weight_row['weight'])
        # Dividing by a zero/negative weight is meaningless (and zero would
        # raise ZeroDivisionError), so treat it as "no data".
        if weight <= 0:
            return None

        # Get protein intake
        cur.execute("""
            SELECT protein_g
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '7 days'
              AND protein_g IS NOT NULL
            ORDER BY date DESC
        """, (profile_id,))

        protein_values = [row['protein_g'] for row in cur.fetchall()]

        if len(protein_values) < 4:  # Need at least 4 days
            return None

        avg_protein = float(sum(protein_values) / len(protein_values))
        protein_per_kg = avg_protein / weight

        return round(protein_per_kg, 2)
|
||
|
||
|
||
def calculate_protein_days_in_target(profile_id: str, target_low: float = 1.6, target_high: float = 2.2) -> Optional[str]:
    """
    Count how many logged days in the last 7 were within the protein target.

    Each day's protein_g is divided by the latest logged weight and compared
    against the inclusive range [target_low, target_high] in g/kg.

    Args:
        profile_id: User profile ID
        target_low: Lower bound in g/kg (default 1.6)
        target_high: Upper bound in g/kg (default 2.2)

    Returns: "<days in target>/<days with data>" (e.g. "5/7"), or None when
    there is no usable (positive) weight or fewer than 4 protein entries.
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # Get recent weight
        cur.execute("""
            SELECT weight
            FROM weight_log
            WHERE profile_id = %s
            ORDER BY date DESC
            LIMIT 1
        """, (profile_id,))

        weight_row = cur.fetchone()
        if not weight_row or weight_row['weight'] is None:
            return None

        weight = float(weight_row['weight'])
        # A zero weight would raise ZeroDivisionError below; a negative one
        # is invalid data. Either way no ratio can be computed.
        if weight <= 0:
            return None

        # Get protein intake last 7 days
        cur.execute("""
            SELECT protein_g, date
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '7 days'
              AND protein_g IS NOT NULL
            ORDER BY date DESC
        """, (profile_id,))

        protein_data = cur.fetchall()

        if len(protein_data) < 4:  # Need at least 4 days
            return None

        total_days = len(protein_data)
        days_in_target = sum(
            1 for row in protein_data
            if target_low <= float(row['protein_g']) / weight <= target_high
        )

        return f"{days_in_target}/{total_days}"
|
||
|
||
|
||
def calculate_protein_adequacy_28d(profile_id: str) -> Optional[int]:
    """
    Protein adequacy score 0-100 over the last 28 days.

    Daily protein is normalised by the 28-day average weight. The base score
    is 100 when the mean lies in 1.6-2.2 g/kg, is reduced by 40 points per
    g/kg below 1.6 (floor 40) and by 10 points per g/kg above 2.2 (floor 80).
    A consistency adjustment of +10 / 0 / -10 is then applied for a standard
    deviation below 0.3 / below 0.5 / otherwise.

    Returns None when no average weight is available or fewer than 18
    protein entries exist.
    """
    import statistics

    with get_db() as conn:
        cur = get_cursor(conn)

        # Average weight (28d)
        cur.execute("""
            SELECT AVG(weight) as avg_weight
            FROM weight_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
        """, (profile_id,))

        weight_row = cur.fetchone()
        if not weight_row or not weight_row['avg_weight']:
            return None

        mean_weight = float(weight_row['avg_weight'])

        # Protein intake (28d)
        cur.execute("""
            SELECT protein_g
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
              AND protein_g IS NOT NULL
        """, (profile_id,))

        grams_per_day = [float(row['protein_g']) for row in cur.fetchall()]

        if len(grams_per_day) < 18:  # 60% coverage
            return None

        per_kg = [grams / mean_weight for grams in grams_per_day]
        mean_per_kg = sum(per_kg) / len(per_kg)

        # Target range: 1.6-2.2 g/kg for active individuals
        if 1.6 <= mean_per_kg <= 2.2:
            base = 100
        elif mean_per_kg < 1.6:
            shortfall = 1.6 - mean_per_kg
            base = max(40, 100 - (shortfall * 40))
        else:
            # Overshooting is penalised less than undershooting.
            excess = mean_per_kg - 2.2
            base = max(80, 100 - (excess * 10))

        # Consistency bonus/penalty
        spread = statistics.stdev(per_kg)
        if spread < 0.3:
            adjustment = 10
        elif spread < 0.5:
            adjustment = 0
        else:
            adjustment = -10

        clamped = min(100, max(0, base + adjustment))
        return int(clamped)
|
||
|
||
|
||
def calculate_macro_consistency_score(profile_id: str) -> Optional[int]:
    """
    Macro consistency score 0-100 over the last 28 days.

    The coefficient of variation (stdev / mean) is computed separately for
    kcal, protein, fat and carbs; their average is mapped onto a tiered
    score where lower variability gives a higher score.

    Returns None with fewer than 18 rows or when no coefficient of
    variation can be computed.
    """
    import statistics

    def _variation(series):
        """Coefficient of variation, or None if it is undefined."""
        if not series or len(series) < 2:
            return None
        center = sum(series) / len(series)
        if center == 0:
            return None
        return statistics.stdev(series) / center

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT kcal, protein_g, fat_g, carbs_g
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
              AND kcal IS NOT NULL
            ORDER BY date DESC
        """, (profile_id,))

        data = cur.fetchall()

        if len(data) < 18:
            return None

        # kcal is non-NULL per the query; macro values that are NULL or 0
        # are left out of their series.
        series_by_metric = [
            [d['kcal'] for d in data],
            [d['protein_g'] for d in data if d['protein_g']],
            [d['fat_g'] for d in data if d['fat_g']],
            [d['carbs_g'] for d in data if d['carbs_g']],
        ]
        cv_values = [
            value
            for value in map(_variation, series_by_metric)
            if value is not None
        ]

        if not cv_values:
            return None

        avg_cv = sum(cv_values) / len(cv_values)

        # Lower CV = higher score.
        # CV < 0.2 = excellent consistency, CV > 0.5 = poor consistency
        tiers = ((0.2, 100), (0.3, 85), (0.4, 70), (0.5, 55))
        for upper_bound, tier_score in tiers:
            if avg_cv < upper_bound:
                return int(tier_score)

        return int(max(30, 100 - (avg_cv * 100)))
|
||
|
||
|
||
def calculate_intake_volatility(profile_id: str) -> Optional[str]:
    """
    Translate the macro consistency score into a volatility label.

    Returns 'stable' for a score of 80 or more, 'moderate' for 60-79,
    'high' below 60, and None when no consistency score is available.
    """
    score = calculate_macro_consistency_score(profile_id)

    if score is None:
        return None
    if score >= 80:
        return 'stable'
    if score >= 60:
        return 'moderate'
    return 'high'
|
||
|
||
|
||
def calculate_nutrition_score(profile_id: str, focus_weights: Optional[Dict] = None) -> Optional[int]:
    """
    Nutrition adherence score 0-100, weighted by the user's focus areas.

    Args:
        profile_id: User profile ID
        focus_weights: Mapping of focus-area key to weight. When omitted it
            is loaded via data_layer.scores.get_user_focus_weights().

    Returns:
        The weighted average (truncated to int) of the available component
        scores, or None when the nutrition-related weights sum to zero or
        no component could be scored.
    """
    if focus_weights is None:
        # Local import to avoid a circular dependency
        from data_layer.scores import get_user_focus_weights
        focus_weights = get_user_focus_weights(profile_id)

    # Nutrition-related focus areas (English keys from DB)
    w_protein = focus_weights.get('protein_intake', 0)
    w_calories = focus_weights.get('calorie_balance', 0)
    w_consistency = focus_weights.get('macro_consistency', 0)
    w_timing = focus_weights.get('meal_timing', 0)
    w_hydration = focus_weights.get('hydration', 0)

    nutrition_weight = w_protein + w_calories + w_consistency + w_timing + w_hydration

    if nutrition_weight == 0:
        return None  # No nutrition goals

    # (score, weight) pairs for every component that produced a score
    weighted = []

    # 1. Calorie target adherence (only if the calorie_balance goal is active)
    if w_calories > 0:
        calorie_score = _score_calorie_adherence(profile_id)
        if calorie_score is not None:
            weighted.append((calorie_score, w_calories))

    # 2. Protein target adherence (only if the protein_intake goal is active)
    if w_protein > 0:
        protein_score = calculate_protein_adequacy_28d(profile_id)
        if protein_score is not None:
            weighted.append((protein_score, w_protein))

    # 3. Intake consistency (only if the macro_consistency goal is active)
    if w_consistency > 0:
        consistency_score = calculate_macro_consistency_score(profile_id)
        if consistency_score is not None:
            weighted.append((consistency_score, w_consistency))

    # 4. Macro balance: weighted at 20% of the total nutrition weight
    if nutrition_weight > 0:
        macro_score = _score_macro_balance(profile_id)
        if macro_score is not None:
            weighted.append((macro_score, nutrition_weight * 0.2))

    if not weighted:
        return None

    numerator = sum(score * weight for score, weight in weighted)
    denominator = sum(weight for _, weight in weighted)

    return int(numerator / denominator)
|
||
|
||
|
||
def _score_calorie_adherence(profile_id: str) -> Optional[int]:
    """
    Score calorie target adherence (0-100) from the 7-day energy balance.

    Only the magnitude of the balance is scored; the direction
    (deficit vs. surplus) is not yet checked against the user's goal.

    Tiers by absolute balance in kcal/day:
        200-500  -> 100
        100-700  -> 85
        <= 900   -> 70  (this also covers magnitudes below 100)
        <= 1200  -> 55
        above    -> 40

    Returns None when no energy balance is available.
    """
    balance = calculate_energy_balance_7d(profile_id)
    if balance is None:
        return None

    # TODO: Check actual goal type instead of scoring the magnitude only
    magnitude = abs(balance)

    # A moderate deficit/surplus is rated best.
    if 200 <= magnitude <= 500:
        return 100
    if 100 <= magnitude <= 700:
        return 85
    if magnitude <= 900:
        return 70
    if magnitude <= 1200:
        return 55
    return 40
|
||
|
||
|
||
def _score_macro_balance(profile_id: str) -> Optional[int]:
    """
    Score macro balance (0-100) over the last 28 days.

    For every row with all three macros present, the energy share of each
    macro is computed (4/9/4 kcal per gram of protein/fat/carbs). The
    average shares are compared with these ranges and points are deducted
    per percentage point outside them:

        Protein: 20-35%  (-2 per point below, -1 per point above)
        Fat:     20-35%  (-2 per point below or above)
        Carbs:   30-55%  (-1.5 per point below or above)

    Returns:
        The score clamped to 40-100, or None with fewer than 18 rows or
        when no row has a non-zero macro energy total.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT protein_g, fat_g, carbs_g, kcal
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
              AND protein_g IS NOT NULL
              AND fat_g IS NOT NULL
              AND carbs_g IS NOT NULL
            ORDER BY date DESC
        """, (profile_id,))

        data = cur.fetchall()

        if len(data) < 18:
            return None

        # Per-day (protein %, fat %, carbs %) of macro-derived energy
        macro_pcts = []
        for row in data:
            # Convert to float up front: the driver may hand back Decimal
            # for numeric columns, and Decimal cannot be combined with the
            # float factor 1.5 used in the carbs penalty below.
            protein_kcal = float(row['protein_g']) * 4
            fat_kcal = float(row['fat_g']) * 9
            carbs_kcal = float(row['carbs_g']) * 4

            total_kcal = protein_kcal + fat_kcal + carbs_kcal
            if total_kcal == 0:
                continue

            macro_pcts.append((
                protein_kcal / total_kcal * 100,
                fat_kcal / total_kcal * 100,
                carbs_kcal / total_kcal * 100,
            ))

        if not macro_pcts:
            return None

        n_days = len(macro_pcts)
        avg_protein_pct = sum(p for p, _, _ in macro_pcts) / n_days
        avg_fat_pct = sum(f for _, f, _ in macro_pcts) / n_days
        avg_carbs_pct = sum(c for _, _, c in macro_pcts) / n_days

        score = 100

        # Protein: too little is penalised twice as hard as too much
        if avg_protein_pct < 20:
            score -= (20 - avg_protein_pct) * 2
        elif avg_protein_pct > 35:
            score -= (avg_protein_pct - 35) * 1

        # Fat
        if avg_fat_pct < 20:
            score -= (20 - avg_fat_pct) * 2
        elif avg_fat_pct > 35:
            score -= (avg_fat_pct - 35) * 2

        # Carbs
        if avg_carbs_pct < 30:
            score -= (30 - avg_carbs_pct) * 1.5
        elif avg_carbs_pct > 55:
            score -= (avg_carbs_pct - 55) * 1.5

        return max(40, min(100, int(score)))
|
||
|
||
|
||
def calculate_energy_availability_warning(profile_id: str) -> Optional[Dict]:
    """
    Heuristic energy availability warning.

    Combines four signals and escalates a severity level
    ('none' -> 'low' -> 'medium' -> 'high'):

    1. 7-day energy balance below -800 kcal/day (medium) or -1200 (high)
    2. Recovery score below 50
    3. Lean body mass change over 28 days below -1.0
    4. 7-day sleep quality below 60

    A metric that is unavailable (None) is skipped; a metric value of 0 is
    a real reading and is evaluated like any other number.

    Args:
        profile_id: User profile ID

    Returns:
        None when no signal fired, otherwise
        {
            'severity': str,        # 'low' | 'medium' | 'high'
            'warnings': List[str],  # user-facing messages
            'recommendation': str
        }
    """
    warnings = []
    severity = 'none'  # none, low, medium, high

    # 1. Check for sustained large deficit
    balance = calculate_energy_balance_7d(profile_id)
    if balance is not None and balance < -800:
        warnings.append('Anhaltend großes Energiedefizit (>800 kcal/Tag)')
        severity = 'medium'

        if balance < -1200:
            warnings.append('Sehr großes Energiedefizit (>1200 kcal/Tag)')
            severity = 'high'

    # 2. Check recovery score
    # (imported locally, like the other cross-module metrics below)
    from data_layer.recovery_metrics import calculate_recovery_score_v2
    recovery = calculate_recovery_score_v2(profile_id)
    # "is not None" rather than truthiness: a score of 0 must still warn.
    if recovery is not None and recovery < 50:
        warnings.append('Recovery Score niedrig (<50)')
        if severity == 'none':
            severity = 'low'
        elif severity == 'medium':
            severity = 'high'

    # 3. Check LBM trend
    from data_layer.body_metrics import calculate_lbm_28d_change
    lbm_change = calculate_lbm_28d_change(profile_id)
    if lbm_change is not None and lbm_change < -1.0:
        warnings.append('Magermasse sinkt (>1kg in 28 Tagen)')
        if severity == 'none':
            severity = 'low'
        elif severity in ['low', 'medium']:
            severity = 'high'

    # 4. Check sleep quality
    from data_layer.recovery_metrics import calculate_sleep_quality_7d
    sleep_quality = calculate_sleep_quality_7d(profile_id)
    if sleep_quality is not None and sleep_quality < 60:
        warnings.append('Schlafqualität verschlechtert')
        if severity == 'none':
            severity = 'low'

    if not warnings:
        return None

    return {
        'severity': severity,
        'warnings': warnings,
        'recommendation': _get_energy_warning_recommendation(severity)
    }
|
||
|
||
|
||
def _get_energy_warning_recommendation(severity: str) -> str:
    """
    Return the user-facing recommendation text for a warning severity.

    'high' and 'medium' each have a dedicated message; every other value
    receives the generic light-strain monitoring message.
    """
    if severity == 'high':
        text = ("Mögliche Unterversorgung erkannt. Erwäge eine Reduktion des Energiedefizits, "
                "Erhöhung der Proteinzufuhr und mehr Erholung. Dies ist keine medizinische Diagnose.")
    elif severity == 'medium':
        text = "Hinweise auf aggressives Defizit. Beobachte Recovery, Schlaf und Magermasse genau."
    else:
        text = "Leichte Hinweise auf Belastung. Monitoring empfohlen."
    return text
|
||
|
||
|
||
def calculate_fiber_avg_7d(profile_id: str) -> Optional[float]:
    """
    Average fiber intake (g/day) over the last 7 days.

    Placeholder: no calculation is performed yet, so the result is always
    None regardless of ``profile_id``.
    """
    # TODO: Implement when fiber column added to nutrition_log
    return None
|
||
|
||
|
||
def calculate_sugar_avg_7d(profile_id: str) -> Optional[float]:
    """
    Average sugar intake (g/day) over the last 7 days.

    Placeholder: no calculation is performed yet, so the result is always
    None regardless of ``profile_id``.
    """
    # TODO: Implement when sugar column added to nutrition_log
    return None
|
||
|
||
|
||
def calculate_nutrition_data_quality(profile_id: str) -> Dict:
    """
    Assess data quality for nutrition metrics over the last 28 days.

    Three component scores (each 0-100) are combined:
    - frequency (50%): number of entries relative to 21 (75% of 28 days)
    - protein (30%): share of entries with a protein value
    - macros (20%): share of entries with fat and carbs values, taken as
      the smaller of the two counts

    Args:
        profile_id: User profile ID

    Returns:
        {
            "overall_score": int,
            "confidence": str,    # "high" (>= 80) | "medium" (>= 60) | "low"
            "measurements": {
                "entries_28d": int,
                "protein_coverage_pct": int,
                "macro_coverage_pct": int
            },
            "component_scores": {
                "frequency": int,
                "protein": int,
                "macros": int
            }
        }
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # Nutrition entries last 28 days; COUNT(col) skips NULL values
        cur.execute("""
            SELECT COUNT(*) as total,
                   COUNT(protein_g) as with_protein,
                   COUNT(fat_g) as with_fat,
                   COUNT(carbs_g) as with_carbs
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
        """, (profile_id,))

        counts = cur.fetchone()

        total_entries = counts['total']
        protein_coverage = counts['with_protein'] / total_entries if total_entries > 0 else 0
        macro_coverage = min(counts['with_fat'], counts['with_carbs']) / total_entries if total_entries > 0 else 0

        # Score components
        frequency_score = min(100, (total_entries / 21) * 100)  # 21 = 75% of 28 days
        protein_score = protein_coverage * 100
        macro_score = macro_coverage * 100

        # Overall score (frequency 50%, protein 30%, macros 20%)
        overall_score = int(
            frequency_score * 0.5 +
            protein_score * 0.3 +
            macro_score * 0.2
        )

        # Confidence level
        if overall_score >= 80:
            confidence = "high"
        elif overall_score >= 60:
            confidence = "medium"
        else:
            confidence = "low"

        return {
            "overall_score": overall_score,
            "confidence": confidence,
            "measurements": {
                "entries_28d": total_entries,
                "protein_coverage_pct": int(protein_coverage * 100),
                "macro_coverage_pct": int(macro_coverage * 100)
            },
            "component_scores": {
                "frequency": int(frequency_score),
                "protein": int(protein_score),
                "macros": int(macro_score)
            }
        }
|