- Updated the Gitea issues index to reflect the latest state as of 2026-04-11, adding issue #76 to the list. - Refined data handling in `activity_metrics.py`, `body_metrics.py`, `nutrition_metrics.py`, and `scores.py` to ensure consistent float conversions for calculations, improving accuracy in metric evaluations. - Enhanced the calculation logic for various metrics to handle potential None values more robustly, ensuring smoother data processing and improved reliability across the application. These changes improve the clarity of the Gitea issues documentation and enhance the overall accuracy and reliability of health and fitness metrics.
1161 lines
36 KiB
Python
1161 lines
36 KiB
Python
"""
|
||
Nutrition Metrics Data Layer
|
||
|
||
Provides structured data for nutrition tracking and analysis.
|
||
|
||
Functions:
|
||
- get_nutrition_average_data(): Average calories, protein, carbs, fat
|
||
- get_nutrition_days_data(): Number of days with nutrition data
|
||
- get_protein_targets_data(): Protein targets based on weight
|
||
- get_energy_balance_data(): Energy balance calculation
|
||
- get_protein_adequacy_data(): Protein adequacy score
|
||
- get_macro_consistency_data(): Macro consistency analysis
|
||
|
||
All functions return structured data (dict) without formatting.
|
||
Use placeholder_resolver.py for formatted strings for AI.
|
||
|
||
Phase 0c: Multi-Layer Architecture
|
||
Version: 1.0
|
||
"""
|
||
|
||
from datetime import date, datetime, timedelta
from typing import Any, Dict, List, Optional

from data_layer.utils import calculate_confidence, safe_float, safe_int
from db import get_db, get_cursor, r2d
|
||
|
||
# Legacy heuristic (kcal per kg bodyweight per day): used as the TDEE fallback
# when the profile lacks the demographics needed for Mifflin–St Jeor.
TDEE_KCAL_PER_KG_BODYWEIGHT = 32.5

# Physical activity level (moderate activity) multiplied onto the
# Mifflin–St Jeor BMR when height, sex, dob and weight are all available.
TDEE_PAL_MODERATE = 1.55
|
||
|
||
|
||
def _age_years_from_dob(dob) -> Optional[int]:
|
||
if dob is None:
|
||
return None
|
||
try:
|
||
if isinstance(dob, str):
|
||
birth = datetime.strptime(dob[:10], "%Y-%m-%d").date()
|
||
else:
|
||
birth = dob
|
||
today = date.today()
|
||
return today.year - birth.year - ((today.month, today.day) < (birth.month, birth.day))
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _mifflin_st_jeor_bmr_kcal(
|
||
weight_kg: float, height_cm: float, age_years: int, sex_is_male: bool
|
||
) -> float:
|
||
if sex_is_male:
|
||
return 10.0 * weight_kg + 6.25 * height_cm - 5.0 * age_years + 5.0
|
||
return 10.0 * weight_kg + 6.25 * height_cm - 5.0 * age_years - 161.0
|
||
|
||
|
||
def estimate_tdee_kcal_from_latest_weight(profile_id: str) -> Optional[float]:
    """Estimate total daily energy expenditure (kcal/day) for a profile.

    Primary path: Mifflin–St Jeor BMR multiplied by ``TDEE_PAL_MODERATE``,
    used when the latest weight plus ``profiles.height``, ``profiles.sex``
    and ``profiles.dob`` are all usable.

    Fallback path: latest weight (kg) multiplied by
    ``TDEE_KCAL_PER_KG_BODYWEIGHT`` (legacy heuristic).

    Returns:
        The estimate in kcal/day, or ``None`` if no weight is on record.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT weight FROM weight_log
               WHERE profile_id=%s ORDER BY date DESC LIMIT 1""",
            (profile_id,),
        )
        latest = cur.fetchone()
        if not latest or latest["weight"] is None:
            return None
        weight_kg = float(latest["weight"])

        cur.execute(
            "SELECT height, sex, dob FROM profiles WHERE id=%s",
            (profile_id,),
        )
        profile = cur.fetchone()

    fallback = weight_kg * TDEE_KCAL_PER_KG_BODYWEIGHT

    has_demographics = (
        profile
        and profile.get("height")
        and profile.get("sex") is not None
        and profile.get("dob")
    )
    if not has_demographics:
        return fallback

    height_cm = float(profile["height"])
    age = _age_years_from_dob(profile["dob"])
    # Plausibility window: implausible age/height falls back to the heuristic.
    if age is None or not (10 < age < 120) or not (height_cm > 50):
        return fallback

    male_labels = ("m", "male", "männlich", "mann")
    is_male = str(profile["sex"]).strip().lower() in male_labels
    bmr = _mifflin_st_jeor_bmr_kcal(weight_kg, height_cm, age, is_male)
    # A BMR at or below 400 kcal is treated as unusable.
    if bmr > 400:
        return bmr * TDEE_PAL_MODERATE
    return fallback
|
||
|
||
|
||
def _get_profile_goal_mode(profile_id: str) -> str:
    """Return the profile's ``goal_mode`` (stripped, lower-cased).

    Falls back to ``"health"`` when the profile row is missing or its
    ``goal_mode`` is empty/blank.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT goal_mode FROM profiles WHERE id=%s", (profile_id,))
        record = cur.fetchone()

    raw_mode = record.get("goal_mode") if record else None
    if not raw_mode:
        return "health"
    normalized = str(raw_mode).strip().lower()
    return normalized or "health"
|
||
|
||
|
||
def get_nutrition_average_data(profile_id: str, days: int = 30) -> Dict:
    """Return average daily intake for kcal and all three macros.

    Averages are taken over calendar days (per-day sums), not over raw
    log rows.

    Args:
        profile_id: User profile ID
        days: Analysis window (default 30)

    Returns:
        {
            "kcal_avg": float,
            "protein_avg": float,
            "carbs_avg": float,
            "fat_avg": float,
            "data_points": int,     # number of days with log rows
            "confidence": str,
            "days_analyzed": int
        }
        With no data in the window all averages are 0.0 and confidence is
        "insufficient".

    Migration from Phase 0b:
        OLD: get_nutrition_avg(pid, field, days) per field
        NEW: All macros in one call
    """
    cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT
                   AVG(daily_kcal) AS kcal_avg,
                   AVG(daily_protein) AS protein_avg,
                   AVG(daily_carbs) AS carbs_avg,
                   AVG(daily_fat) AS fat_avg,
                   COUNT(*)::int AS day_count
               FROM (
                   SELECT date,
                       COALESCE(SUM(kcal), 0)::float AS daily_kcal,
                       COALESCE(SUM(protein_g), 0)::float AS daily_protein,
                       COALESCE(SUM(carbs_g), 0)::float AS daily_carbs,
                       COALESCE(SUM(fat_g), 0)::float AS daily_fat
                   FROM nutrition_log
                   WHERE profile_id=%s AND date >= %s
                   GROUP BY date
               ) AS daily""",
            (profile_id, cutoff),
        )
        summary = cur.fetchone()

    if not summary or summary["day_count"] == 0:
        return {
            "kcal_avg": 0.0,
            "protein_avg": 0.0,
            "carbs_avg": 0.0,
            "fat_avg": 0.0,
            "data_points": 0,
            "confidence": "insufficient",
            "days_analyzed": days
        }

    logged_days = summary["day_count"]
    result = {
        column: safe_float(summary[column])
        for column in ("kcal_avg", "protein_avg", "carbs_avg", "fat_avg")
    }
    result["data_points"] = logged_days
    result["confidence"] = calculate_confidence(logged_days, days, "general")
    result["days_analyzed"] = days
    return result
|
||
|
||
|
||
def get_nutrition_days_data(
    profile_id: str,
    days: int = 30
) -> Dict:
    """Count the calendar days that have nutrition log rows in the window.

    Args:
        profile_id: User profile ID
        days: Analysis window (default 30)

    Returns:
        {
            "days_with_data": int,
            "days_analyzed": int,
            "coverage_pct": float,   # always within 0.0-100.0
            "confidence": str
        }
    """
    cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT COUNT(DISTINCT date) as days
               FROM nutrition_log
               WHERE profile_id=%s AND date >= %s""",
            (profile_id, cutoff)
        )
        row = cur.fetchone()

    days_with_data = row['days'] if row else 0

    if days > 0:
        # The cutoff date itself is included, so up to days + 1 distinct
        # dates can match; cap the ratio so coverage never exceeds 100 %.
        coverage_pct = min(100.0, days_with_data / days * 100)
    else:
        # Keep the documented float type for a degenerate window.
        coverage_pct = 0.0

    confidence = calculate_confidence(days_with_data, days, "general")

    return {
        "days_with_data": days_with_data,
        "days_analyzed": days,
        "coverage_pct": coverage_pct,
        "confidence": confidence
    }
|
||
|
||
|
||
def get_protein_targets_data(
    profile_id: str
) -> Dict:
    """Calculate daily protein targets from the most recent logged weight.

    Targets:
        - Low: 1.6 g/kg (maintenance)
        - High: 2.2 g/kg (muscle building)

    Args:
        profile_id: User profile ID

    Returns:
        {
            "current_weight": float,
            "protein_target_low": float,   # 1.6 g/kg
            "protein_target_high": float,  # 2.2 g/kg
            "confidence": str              # "high" or "insufficient"
        }
        When there is no weight entry, or the latest entry is NULL or not
        positive, all numbers are 0.0 and confidence is "insufficient".
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT weight FROM weight_log
               WHERE profile_id=%s ORDER BY date DESC LIMIT 1""",
            (profile_id,)
        )
        row = cur.fetchone()

    weight = 0.0
    if row and row['weight'] is not None:
        weight = safe_float(row['weight'])

    # A NULL or non-positive weight cannot yield meaningful targets; report
    # it as insufficient instead of returning zero targets labelled "high".
    if not (weight > 0):
        return {
            "current_weight": 0.0,
            "protein_target_low": 0.0,
            "protein_target_high": 0.0,
            "confidence": "insufficient"
        }

    return {
        "current_weight": weight,
        "protein_target_low": weight * 1.6,
        "protein_target_high": weight * 2.2,
        "confidence": "high"
    }
|
||
|
||
|
||
def get_energy_balance_data(profile_id: str, days: int = 7) -> Dict:
    """Return the mean energy balance (intake minus estimated TDEE), kcal/day.

    Intake: mean of the daily kcal totals (sum per calendar day).
    TDEE: estimate_tdee_kcal_from_latest_weight (MSJ x PAL or kg fallback).

    Returns:
        Dict with keys "energy_balance", "avg_intake", "estimated_tdee",
        "status" ("deficit" below -200, "surplus" above 200, otherwise
        "maintenance"; "unknown" without data or TDEE), "confidence",
        "days_analyzed" and "data_points".
    """

    def _unknown(intake: float, point_count: int) -> Dict:
        # Shared shape for "no intake rows" and "no TDEE estimate".
        return {
            "energy_balance": 0.0,
            "avg_intake": intake,
            "estimated_tdee": 0.0,
            "status": "unknown",
            "confidence": "insufficient",
            "days_analyzed": days,
            "data_points": point_count,
        }

    cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT date, SUM(kcal)::float AS daily_kcal
               FROM nutrition_log
               WHERE profile_id=%s AND date >= %s AND kcal IS NOT NULL
               GROUP BY date
               ORDER BY date""",
            (profile_id, cutoff),
        )
        daily_rows = cur.fetchall()

    if not daily_rows:
        return _unknown(0.0, 0)

    intake_per_day = [safe_float(entry["daily_kcal"]) for entry in daily_rows]
    data_points = len(intake_per_day)
    avg_intake = sum(intake_per_day) / data_points

    estimated_tdee = estimate_tdee_kcal_from_latest_weight(profile_id)
    if estimated_tdee is None:
        return _unknown(avg_intake, data_points)

    energy_balance = avg_intake - estimated_tdee
    if energy_balance < -200:
        status = "deficit"
    else:
        status = "surplus" if energy_balance > 200 else "maintenance"

    return {
        "energy_balance": energy_balance,
        "avg_intake": avg_intake,
        "estimated_tdee": estimated_tdee,
        "status": status,
        "confidence": calculate_confidence(data_points, days, "general"),
        "days_analyzed": days,
        "data_points": data_points
    }
|
||
|
||
|
||
def get_protein_adequacy_data(profile_id: str, days: int = 28) -> Dict:
    """Return a protein adequacy score (0-100) with its supporting numbers.

    The score blends two parts:
        - 70 %: share of logged days whose protein total lies inside the
          target band from get_protein_targets_data (1.6-2.2 g/kg)
        - 30 %: average intake relative to the band's midpoint, capped at 100

    Args:
        profile_id: User profile ID
        days: Analysis window (default 28)

    Returns:
        {
            "adequacy_score": int,  # 0-100
            "avg_protein_g": float,
            "target_protein_low": float,
            "target_protein_high": float,
            "protein_g_per_kg": float,
            "days_in_target": int,
            "days_with_data": int,
            "confidence": str
        }
    """
    targets = get_protein_targets_data(profile_id)
    cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT COALESCE(SUM(protein_g), 0)::float AS daily_protein
               FROM nutrition_log
               WHERE profile_id=%s AND date >= %s
               GROUP BY date""",
            (profile_id, cutoff),
        )
        day_rows = cur.fetchall()

    target_low = targets['protein_target_low']
    target_high = targets['protein_target_high']

    if not day_rows or targets.get("confidence") == "insufficient" or targets["current_weight"] <= 0:
        return {
            "adequacy_score": 0,
            "avg_protein_g": 0.0,
            "target_protein_low": target_low,
            "target_protein_high": target_high,
            "protein_g_per_kg": 0.0,
            "days_in_target": 0,
            "days_with_data": 0,
            "confidence": "insufficient"
        }

    body_weight = targets["current_weight"]
    protein_by_day = [safe_float(entry["daily_protein"]) for entry in day_rows]
    days_with_data = len(protein_by_day)
    days_in_target = len(
        [grams for grams in protein_by_day if target_low <= grams <= target_high]
    )

    avg_protein = sum(protein_by_day) / days_with_data
    protein_g_per_kg = avg_protein / body_weight if body_weight > 0 else 0.0

    hit_rate_pct = days_in_target / days_with_data * 100
    band_midpoint = (target_low + target_high) / 2
    intake_ratio = (avg_protein / band_midpoint) if band_midpoint > 0 else 0

    raw_score = int(hit_rate_pct * 0.7 + min(intake_ratio * 100, 100) * 0.3)
    adequacy_score = max(0, min(100, raw_score))

    return {
        "adequacy_score": adequacy_score,
        "avg_protein_g": avg_protein,
        "target_protein_low": target_low,
        "target_protein_high": target_high,
        "protein_g_per_kg": protein_g_per_kg,
        "days_in_target": days_in_target,
        "days_with_data": days_with_data,
        "confidence": calculate_confidence(days_with_data, days, "general")
    }
|
||
|
||
|
||
def get_macro_consistency_data(profile_id: str, days: int = 28) -> Dict:
    """Return a macro consistency score (0-100) plus the underlying stats.

    Each day's protein/carbs/fat totals are converted to their share of the
    day's macro energy (4/4/9 kcal per gram). The score falls as the
    day-to-day standard deviation of those shares rises. Only days with
    positive kcal and positive totals for all three macros are considered,
    and at least three such days are required.

    Args:
        profile_id: User profile ID
        days: Analysis window (default 28)

    Returns:
        {
            "consistency_score": int,  # 0-100 (100 = very consistent)
            "avg_protein_pct": float,
            "avg_carbs_pct": float,
            "avg_fat_pct": float,
            "std_dev_protein": float,  # Standard deviation
            "std_dev_carbs": float,
            "std_dev_fat": float,
            "confidence": str,
            "data_points": int
        }
    """
    import statistics

    def _insufficient(point_count: int) -> Dict:
        # Result used whenever fewer than three usable days exist.
        return {
            "consistency_score": 0,
            "avg_protein_pct": 0.0,
            "avg_carbs_pct": 0.0,
            "avg_fat_pct": 0.0,
            "std_dev_protein": 0.0,
            "std_dev_carbs": 0.0,
            "std_dev_fat": 0.0,
            "confidence": "insufficient",
            "data_points": point_count
        }

    cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT
                   COALESCE(SUM(kcal), 0)::float AS kcal,
                   COALESCE(SUM(protein_g), 0)::float AS protein_g,
                   COALESCE(SUM(carbs_g), 0)::float AS carbs_g,
                   COALESCE(SUM(fat_g), 0)::float AS fat_g
               FROM nutrition_log
               WHERE profile_id=%s AND date >= %s
               GROUP BY date
               HAVING COALESCE(SUM(kcal), 0) > 0
                  AND COALESCE(SUM(protein_g), 0) > 0
                  AND COALESCE(SUM(carbs_g), 0) > 0
                  AND COALESCE(SUM(fat_g), 0) > 0""",
            (profile_id, cutoff),
        )
        day_rows = cur.fetchall()

    if len(day_rows) < 3:
        return _insufficient(len(day_rows))

    protein_pcts = []
    carbs_pcts = []
    fat_pcts = []
    for day in day_rows:
        if safe_float(day['kcal']) == 0:
            continue
        protein_energy = safe_float(day['protein_g']) * 4
        carbs_energy = safe_float(day['carbs_g']) * 4
        fat_energy = safe_float(day['fat_g']) * 9
        macro_energy = protein_energy + carbs_energy + fat_energy
        if macro_energy > 0:
            protein_pcts.append(protein_energy / macro_energy * 100)
            carbs_pcts.append(carbs_energy / macro_energy * 100)
            fat_pcts.append(fat_energy / macro_energy * 100)

    usable_days = len(protein_pcts)
    if usable_days < 3:
        return _insufficient(usable_days)

    # With three or more samples stdev is always defined.
    std_protein = statistics.stdev(protein_pcts)
    std_carbs = statistics.stdev(carbs_pcts)
    std_fat = statistics.stdev(fat_pcts)
    avg_std = (std_protein + std_carbs + std_fat) / 3

    # Linear penalty of 5 points per percentage point of average deviation:
    # 5 % -> 75, 10 % -> 50, 20 % -> 0.
    consistency_score = max(0, min(100, int(100 - (avg_std * 5))))

    return {
        "consistency_score": consistency_score,
        "avg_protein_pct": statistics.mean(protein_pcts),
        "avg_carbs_pct": statistics.mean(carbs_pcts),
        "avg_fat_pct": statistics.mean(fat_pcts),
        "std_dev_protein": std_protein,
        "std_dev_carbs": std_carbs,
        "std_dev_fat": std_fat,
        "confidence": calculate_confidence(usable_days, days, "general"),
        "data_points": usable_days
    }
|
||
|
||
|
||
# ============================================================================
|
||
# Calculated Metrics (migrated from calculations/nutrition_metrics.py)
|
||
# ============================================================================
|
||
# These functions return simple values for placeholders.
|
||
# Use get_*_data() functions above for structured chart data.
|
||
|
||
|
||
def calculate_energy_balance_7d(profile_id: str) -> Optional[float]:
    """Return the 7-day mean energy balance (kcal/day), rounded to whole kcal.

    Uses get_energy_balance_data(profile_id, 7). Returns None when fewer than
    four days have intake data or no positive TDEE estimate is available.
    """
    snapshot = get_energy_balance_data(profile_id, 7)
    enough_days = snapshot["data_points"] >= 4
    tdee = snapshot.get("estimated_tdee") or 0
    if not enough_days or tdee <= 0:
        return None
    return round(float(snapshot["energy_balance"]), 0)
|
||
|
||
|
||
def calculate_energy_deficit_surplus(profile_id: str, days: int = 7) -> Optional[str]:
    """Classify the mean energy balance as deficit, maintenance or surplus.

    The ``days`` window is now honoured; previously it was accepted but the
    7-day balance was always used. For the default of 7 days the behaviour is
    the same as before: at least 4 days with intake data and a positive TDEE
    estimate are required, and the balance is rounded to whole kcal before
    classification.

    Args:
        profile_id: User profile ID
        days: Analysis window in days (default 7)

    Returns:
        'deficit' (below -200 kcal/day), 'surplus' (above 200 kcal/day),
        'maintenance' otherwise, or None when data is insufficient.
    """
    import math

    data = get_energy_balance_data(profile_id, days)

    # NOTE(review): for windows other than 7 days the 4-of-7 minimum is
    # scaled proportionally — confirm this matches the intended coverage rule.
    min_points = max(1, math.ceil(days * 4 / 7))
    if data["data_points"] < min_points:
        return None

    tdee = data.get("estimated_tdee") or 0
    if tdee <= 0:
        return None

    balance = round(float(data["energy_balance"]), 0)

    if balance < -200:
        return 'deficit'
    if balance > 200:
        return 'surplus'
    return 'maintenance'
|
||
|
||
|
||
def calculate_protein_g_per_kg(profile_id: str) -> Optional[float]:
    """Return average protein intake in g per kg bodyweight (last 7 days).

    Uses the most recent weight entry and per-day protein sums.

    Returns:
        The ratio rounded to two decimals, or None when there is no usable
        weight (missing, NULL or not positive) or fewer than four days with
        protein data.
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # Most recent weight entry.
        cur.execute("""
            SELECT weight
            FROM weight_log
            WHERE profile_id = %s
            ORDER BY date DESC
            LIMIT 1
        """, (profile_id,))
        weight_row = cur.fetchone()

        # Guard against a NULL weight (float(None) raises TypeError) and a
        # zero weight (division by zero further down).
        if not weight_row or weight_row['weight'] is None:
            return None
        weight = float(weight_row['weight'])
        if weight <= 0:
            return None

        # Protein intake aggregated by day (SUM per day).
        cur.execute("""
            SELECT date, SUM(protein_g) as daily_protein
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '7 days'
              AND protein_g IS NOT NULL
            GROUP BY date
            ORDER BY date DESC
        """, (profile_id,))
        daily_protein = [float(row['daily_protein']) for row in cur.fetchall()]

    if len(daily_protein) < 4:  # At least 4 days with data
        return None

    avg_protein = sum(daily_protein) / len(daily_protein)
    return round(avg_protein / weight, 2)
|
||
|
||
|
||
def calculate_protein_days_in_target(profile_id: str, target_low: float = 1.6, target_high: float = 2.2) -> Optional[str]:
    """Count how many recent days met the protein target band.

    The band is ``target_low``..``target_high`` (g per kg) multiplied by the
    most recent logged weight; each day's protein sum over the last 7 days is
    compared against it (bounds inclusive).

    Returns:
        A string in "5/7" format (days in target / days with data), or None
        when there is no usable weight (missing, NULL or not positive) or
        fewer than four days with protein data.
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # Most recent weight entry.
        cur.execute("""
            SELECT weight
            FROM weight_log
            WHERE profile_id = %s
            ORDER BY date DESC
            LIMIT 1
        """, (profile_id,))
        weight_row = cur.fetchone()

        # A NULL weight would crash float(); a zero weight would collapse the
        # target band to 0-0 g and make the result meaningless.
        if not weight_row or weight_row['weight'] is None:
            return None
        weight = float(weight_row['weight'])
        if weight <= 0:
            return None

        # Protein intake aggregated by day (SUM per day).
        cur.execute("""
            SELECT date, SUM(protein_g) as daily_protein
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '7 days'
              AND protein_g IS NOT NULL
            GROUP BY date
            ORDER BY date DESC
        """, (profile_id,))
        daily_data = cur.fetchall()

    total_days = len(daily_data)
    if total_days < 4:  # At least 4 days with data
        return None

    # Absolute target range in grams.
    target_low_g = target_low * weight
    target_high_g = target_high * weight

    days_in_target = sum(
        1
        for row in daily_data
        if target_low_g <= float(row['daily_protein']) <= target_high_g
    )

    return f"{days_in_target}/{total_days}"
|
||
|
||
|
||
def calculate_protein_adequacy_28d(profile_id: str) -> Optional[int]:
    """Return a protein adequacy score 0-100 for the last 28 days.

    Each calendar day's total protein is divided by the average weight in the
    window (g/kg per day). The mean ratio sets a base score (100 inside
    1.6-2.2 g/kg, reduced outside it), and the day-to-day standard deviation
    adds +10, 0 or -10.

    Returns None without a weight in the window or with fewer than 18 days
    of nutrition data.
    """
    import statistics

    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute("""
            SELECT AVG(weight) as avg_weight
            FROM weight_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
        """, (profile_id,))
        weight_row = cur.fetchone()
        if not weight_row or not weight_row['avg_weight']:
            return None
        mean_weight = float(weight_row['avg_weight'])

        cur.execute("""
            SELECT COALESCE(SUM(protein_g), 0)::float AS daily_protein
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
            GROUP BY date
        """, (profile_id,))
        protein_by_day = [float(entry['daily_protein']) for entry in cur.fetchall()]

    if len(protein_by_day) < 18:
        return None

    ratios = [grams / mean_weight for grams in protein_by_day]
    mean_ratio = sum(ratios) / len(ratios)

    if mean_ratio < 1.6:
        # Below the band: 40 points lost per missing g/kg, floor at 40.
        base_score = max(40, 100 - ((1.6 - mean_ratio) * 40))
    elif mean_ratio <= 2.2:
        base_score = 100
    else:
        # Above the band: mild penalty, floor at 80.
        base_score = max(80, 100 - ((mean_ratio - 2.2) * 10))

    spread = statistics.stdev(ratios)
    if spread >= 0.5:
        consistency_bonus = -10
    elif spread >= 0.3:
        consistency_bonus = 0
    else:
        consistency_bonus = 10

    return int(min(100, max(0, base_score + consistency_bonus)))
|
||
|
||
|
||
def calculate_macro_consistency_score(profile_id: str) -> Optional[int]:
    """Return a macro consistency score 0-100 for the last 28 days.

    Based on the coefficient of variation (stdev / mean) of daily totals for
    kcal and each macro — daily sums, not raw log rows. Zero-valued macro
    days are excluded from that macro's CV.

    Returns None with fewer than 18 days that have kcal > 0, or when no CV
    can be computed.
    """
    import statistics

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT
                COALESCE(SUM(kcal), 0)::float AS dk,
                COALESCE(SUM(protein_g), 0)::float AS dp,
                COALESCE(SUM(fat_g), 0)::float AS df,
                COALESCE(SUM(carbs_g), 0)::float AS dc
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
            GROUP BY date
            HAVING COALESCE(SUM(kcal), 0) > 0
        """, (profile_id,))
        day_rows = cur.fetchall()

    if len(day_rows) < 18:
        return None

    def _coefficient_of_variation(series):
        # Undefined for fewer than two samples or a zero mean.
        if len(series) < 2:
            return None
        center = sum(series) / len(series)
        if center == 0:
            return None
        return statistics.stdev(series) / center

    candidate_cvs = [_coefficient_of_variation([day['dk'] for day in day_rows])]
    for column in ('dp', 'df', 'dc'):
        non_zero = [day[column] for day in day_rows if day[column]]
        candidate_cvs.append(_coefficient_of_variation(non_zero))

    usable_cvs = [value for value in candidate_cvs if value is not None]
    if not usable_cvs:
        return None

    avg_cv = sum(usable_cvs) / len(usable_cvs)

    # Tiered scoring: the first upper bound the average CV stays below wins.
    for upper_bound, tier_score in ((0.2, 100), (0.3, 85), (0.4, 70), (0.5, 55)):
        if avg_cv < upper_bound:
            return int(tier_score)
    return int(max(30, 100 - (avg_cv * 100)))
|
||
|
||
|
||
def calculate_intake_volatility(profile_id: str) -> Optional[str]:
    """Classify intake volatility from the macro consistency score.

    Returns:
        'stable' (score >= 80), 'moderate' (score >= 60), 'high' otherwise,
        or None when no consistency score is available.
    """
    score = calculate_macro_consistency_score(profile_id)
    if score is None:
        return None
    if score >= 80:
        return 'stable'
    return 'moderate' if score >= 60 else 'high'
|
||
|
||
|
||
def calculate_nutrition_score(profile_id: str, focus_weights: Optional[Dict] = None) -> Optional[int]:
    """Return the nutrition adherence score 0-100.

    Component scores are averaged using the user's nutrition-related focus
    weights. Macro balance is always considered (weighted at 20 % of the
    total nutrition weight) once any nutrition focus is set.

    Args:
        profile_id: User profile ID
        focus_weights: Optional mapping of focus area key to weight; loaded
            via data_layer.scores.get_user_focus_weights when omitted.

    Returns:
        The weighted score truncated to int, or None when the user has no
        nutrition focus or no component could be scored.
    """
    if focus_weights is None:
        # Imported lazily to avoid a circular dependency.
        from data_layer.scores import get_user_focus_weights
        focus_weights = get_user_focus_weights(profile_id)

    def _weight_of(area: str) -> float:
        # Weights are coerced to float; missing or falsy values count as 0.
        return float(focus_weights.get(area, 0) or 0)

    protein_w = _weight_of('protein_intake')
    calorie_w = _weight_of('calorie_balance')
    consistency_w = _weight_of('macro_consistency')
    meal_timing_w = _weight_of('meal_timing')
    hydration_w = _weight_of('hydration')

    total_w = protein_w + calorie_w + consistency_w + meal_timing_w + hydration_w
    if total_w == 0:
        return None  # No nutrition goals

    # (score, weight) pairs, evaluated in a fixed order:
    # calories, protein, consistency, macro balance.
    weighted = []
    goal_scorers = (
        (calorie_w, _score_calorie_adherence),
        (protein_w, calculate_protein_adequacy_28d),
        (consistency_w, calculate_macro_consistency_score),
    )
    for goal_weight, scorer in goal_scorers:
        if goal_weight > 0:
            component = scorer(profile_id)
            if component is not None:
                weighted.append((component, goal_weight))

    if total_w > 0:
        macro_component = _score_macro_balance(profile_id)
        if macro_component is not None:
            weighted.append((macro_component, total_w * 0.2))

    if not weighted:
        return None

    # float(): values coming from the DB may be Decimal.
    numerator = sum(float(score) * float(weight) for score, weight in weighted)
    denominator = sum(float(weight) for _, weight in weighted)
    return int(numerator / denominator)
|
||
|
||
|
||
def _score_calorie_adherence(profile_id: str) -> Optional[int]:
    """Score calorie adherence (0-100) from the 7d balance and goal_mode.

    The scoring curve depends on profiles.goal_mode: "weight_loss" rewards a
    moderate deficit, "strength"/"recomposition" reward a small surplus, and
    every other mode rewards staying close to zero balance.

    Returns None when no 7-day energy balance is available.
    """
    balance = calculate_energy_balance_7d(profile_id)
    if balance is None:
        return None

    goal_mode = _get_profile_goal_mode(profile_id)
    kcal = float(balance)

    def _deficit_curve(value: float) -> int:
        # Check order matters: earlier rules win for overlapping ranges.
        if -550 <= value <= -250:
            return 100
        if value > 450:
            return 38
        if -750 <= value < -550 or -250 < value <= 120:
            return 82
        if value < -1200:
            return 52
        if -950 <= value < -750 or 120 < value <= 350:
            return 68
        return 58

    def _surplus_curve(value: float) -> int:
        if 80 <= value <= 480:
            return 100
        if -120 <= value < 80 or 480 < value <= 700:
            return 86
        if -380 <= value < -120:
            return 68
        if value > 850:
            return 54
        if value < -650:
            return 44
        return 72

    def _maintenance_curve(value: float) -> int:
        distance = abs(value)
        for limit, points in ((200, 100), (400, 84), (650, 70), (900, 55)):
            if distance <= limit:
                return points
        return 40

    if goal_mode == "weight_loss":
        curve = _deficit_curve
    elif goal_mode in ("strength", "recomposition"):
        curve = _surplus_curve
    else:
        curve = _maintenance_curve
    return curve(kcal)
|
||
|
||
|
||
def _score_macro_balance(profile_id: str) -> Optional[int]:
    """Score the average macro split over the last 28 days (40-100).

    Macros are summed per calendar day (only log rows where protein, fat and
    carbs are all present) and cast to float, then converted to each macro's
    share of the day's macro energy (4/9/4 kcal per gram). The mean shares
    are compared against these ranges and points are deducted per percentage
    point outside them:
        - Protein: 20-35 %
        - Fat:     20-35 %
        - Carbs:   30-55 %

    Working on per-day sums keeps the 18-item minimum a count of days, in
    line with the other 28-day metrics. The float cast avoids a TypeError
    when the DB driver returns Decimal and the carbs penalty multiplies
    by 1.5.

    Returns:
        Score clamped to 40-100, or None with fewer than 18 days of complete
        macro data or when no day has any macro energy.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT
                COALESCE(SUM(protein_g), 0)::float AS protein_g,
                COALESCE(SUM(fat_g), 0)::float AS fat_g,
                COALESCE(SUM(carbs_g), 0)::float AS carbs_g
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
              AND protein_g IS NOT NULL
              AND fat_g IS NOT NULL
              AND carbs_g IS NOT NULL
            GROUP BY date
        """, (profile_id,))
        data = cur.fetchall()

    if len(data) < 18:
        return None

    # Per-day macro shares as (protein_pct, fat_pct, carbs_pct).
    macro_pcts = []
    for row in data:
        protein_kcal = float(row['protein_g']) * 4
        fat_kcal = float(row['fat_g']) * 9
        carbs_kcal = float(row['carbs_g']) * 4
        total_kcal = protein_kcal + fat_kcal + carbs_kcal
        if total_kcal == 0:
            continue
        macro_pcts.append((
            protein_kcal / total_kcal * 100,
            fat_kcal / total_kcal * 100,
            carbs_kcal / total_kcal * 100,
        ))

    if not macro_pcts:
        return None

    day_count = len(macro_pcts)
    avg_protein_pct = sum(p for p, _, _ in macro_pcts) / day_count
    avg_fat_pct = sum(f for _, f, _ in macro_pcts) / day_count
    avg_carbs_pct = sum(c for _, _, c in macro_pcts) / day_count

    score = 100

    # Protein: too little costs 2 points per point, too much only 1.
    if avg_protein_pct < 20:
        score -= (20 - avg_protein_pct) * 2
    elif avg_protein_pct > 35:
        score -= (avg_protein_pct - 35) * 1

    # Fat: 2 points per point in either direction.
    if avg_fat_pct < 20:
        score -= (20 - avg_fat_pct) * 2
    elif avg_fat_pct > 35:
        score -= (avg_fat_pct - 35) * 2

    # Carbs: 1.5 points per point in either direction.
    if avg_carbs_pct < 30:
        score -= (30 - avg_carbs_pct) * 1.5
    elif avg_carbs_pct > 55:
        score -= (avg_carbs_pct - 55) * 1.5

    return max(40, min(100, int(score)))
|
||
|
||
|
||
def calculate_energy_availability_warning(profile_id: str) -> Optional[Dict]:
    """Build a heuristic low-energy-availability warning.

    Combines four signals: a large 7-day energy deficit, a low recovery
    score, falling lean body mass and poor sleep quality. Each signal adds a
    message and may raise the severity.

    Signals are tested with ``is not None`` rather than truthiness, so a
    legitimate score of 0 (the worst possible value) still triggers its
    warning instead of being skipped as "no data".

    Returns:
        None when no signal fires, otherwise
        {
            'severity': 'low' | 'medium' | 'high',
            'warnings': list of message strings,
            'recommendation': str
        }
    """
    warnings = []
    severity = 'none'  # none, low, medium, high

    # 1. Sustained large deficit
    balance = calculate_energy_balance_7d(profile_id)
    if balance is not None and balance < -800:
        warnings.append('Anhaltend großes Energiedefizit (>800 kcal/Tag)')
        severity = 'medium'

        if balance < -1200:
            warnings.append('Sehr großes Energiedefizit (>1200 kcal/Tag)')
            severity = 'high'

    # 2. Recovery score (lazy import, kept local as in the rest of the file)
    from data_layer.recovery_metrics import calculate_recovery_score_v2
    recovery = calculate_recovery_score_v2(profile_id)
    if recovery is not None and recovery < 50:
        warnings.append('Recovery Score niedrig (<50)')
        if severity == 'none':
            severity = 'low'
        elif severity == 'medium':
            severity = 'high'

    # 3. Lean body mass trend
    from data_layer.body_metrics import calculate_lbm_28d_change
    lbm_change = calculate_lbm_28d_change(profile_id)
    if lbm_change is not None and lbm_change < -1.0:
        warnings.append('Magermasse sinkt (>1kg in 28 Tagen)')
        if severity == 'none':
            severity = 'low'
        elif severity in ['low', 'medium']:
            severity = 'high'

    # 4. Sleep quality (only ever raises 'none' to 'low')
    from data_layer.recovery_metrics import calculate_sleep_quality_7d
    sleep_quality = calculate_sleep_quality_7d(profile_id)
    if sleep_quality is not None and sleep_quality < 60:
        warnings.append('Schlafqualität verschlechtert')
        if severity == 'none':
            severity = 'low'

    if not warnings:
        return None

    return {
        'severity': severity,
        'warnings': warnings,
        'recommendation': _get_energy_warning_recommendation(severity)
    }
|
||
|
||
|
||
def _get_energy_warning_recommendation(severity: str) -> str:
|
||
"""Get recommendation text based on severity"""
|
||
if severity == 'high':
|
||
return ("Mögliche Unterversorgung erkannt. Erwäge eine Reduktion des Energiedefizits, "
|
||
"Erhöhung der Proteinzufuhr und mehr Erholung. Dies ist keine medizinische Diagnose.")
|
||
elif severity == 'medium':
|
||
return ("Hinweise auf aggressives Defizit. Beobachte Recovery, Schlaf und Magermasse genau.")
|
||
else:
|
||
return ("Leichte Hinweise auf Belastung. Monitoring empfohlen.")
|
||
|
||
|
||
def calculate_fiber_avg_7d(profile_id: str) -> Optional[float]:
    """Average fiber intake (g/day) over the last 7 days.

    Placeholder: always returns None for now.
    """
    # TODO: Implement when fiber column added to nutrition_log
    return None
|
||
|
||
|
||
def calculate_sugar_avg_7d(profile_id: str) -> Optional[float]:
    """Average sugar intake (g/day) over the last 7 days.

    Placeholder: always returns None for now.
    """
    # TODO: Implement when sugar column added to nutrition_log
    return None
|
||
|
||
|
||
def calculate_nutrition_data_quality(profile_id: str) -> Dict[str, Any]:
    """Assess data quality of the nutrition log over the last 28 days.

    Components:
        - frequency (50 %): distinct logged days relative to 21 days
          (75 % of 28), capped at 100. Counting distinct days rather than
          raw entries stops several entries on one day from inflating it.
        - protein (30 %): share of entries with a protein value
        - macros (20 %): share of entries with both fat and carbs values
          (the smaller of the two counts)

    Returns:
        {
            "overall_score": int,
            "confidence": "high" (>= 80) | "medium" (>= 60) | "low",
            "measurements": {
                "entries_28d": int,           # raw log rows
                "protein_coverage_pct": int,
                "macro_coverage_pct": int
            },
            "component_scores": {"frequency": int, "protein": int, "macros": int}
        }
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # Nutrition entries last 28 days
        cur.execute("""
            SELECT COUNT(*) as total,
                   COUNT(DISTINCT date) as days_logged,
                   COUNT(protein_g) as with_protein,
                   COUNT(fat_g) as with_fat,
                   COUNT(carbs_g) as with_carbs
            FROM nutrition_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
        """, (profile_id,))
        counts = cur.fetchone()

    total_entries = counts['total']
    days_logged = counts['days_logged']

    if total_entries > 0:
        protein_coverage = counts['with_protein'] / total_entries
        macro_coverage = min(counts['with_fat'], counts['with_carbs']) / total_entries
    else:
        protein_coverage = 0
        macro_coverage = 0

    # Score components
    frequency_score = min(100, (days_logged / 21) * 100)  # 21 = 75% of 28 days
    protein_score = protein_coverage * 100
    macro_score = macro_coverage * 100

    # Overall score (frequency 50%, protein 30%, macros 20%)
    overall_score = int(
        frequency_score * 0.5 +
        protein_score * 0.3 +
        macro_score * 0.2
    )

    # Confidence level
    if overall_score >= 80:
        confidence = "high"
    elif overall_score >= 60:
        confidence = "medium"
    else:
        confidence = "low"

    return {
        "overall_score": overall_score,
        "confidence": confidence,
        "measurements": {
            "entries_28d": total_entries,
            "protein_coverage_pct": int(protein_coverage * 100),
            "macro_coverage_pct": int(macro_coverage * 100)
        },
        "component_scores": {
            "frequency": int(frequency_score),
            "protein": int(protein_score),
            "macros": int(macro_score)
        }
    }
|