"""Derive lean_mass and fat_mass from weight_log + body_fat_pct for caliper entries.""" from datetime import date, datetime from decimal import Decimal from typing import Any, Dict, List, Optional, Tuple from db import get_cursor, r2d def as_date(d: Any) -> Optional[date]: if d is None: return None if isinstance(d, datetime): return d.date() if isinstance(d, date): return d if isinstance(d, str) and len(d) >= 10: return datetime.strptime(d[:10], "%Y-%m-%d").date() return None def _floaty(x: Any) -> float: if isinstance(x, Decimal): return float(x) return float(x) def load_weight_rows(conn: Any, profile_id: str) -> List[Dict[str, Any]]: cur = get_cursor(conn) cur.execute( "SELECT date, weight FROM weight_log WHERE profile_id=%s ORDER BY date", (profile_id,), ) return [r2d(r) for r in cur.fetchall()] def nearest_weight_kg(weight_rows: List[Dict[str, Any]], caliper_date: Any) -> Optional[float]: td = as_date(caliper_date) if td is None or not weight_rows: return None best: Optional[float] = None best_delta: Optional[int] = None for row in weight_rows: wd = as_date(row.get("date")) if wd is None: continue wraw = row.get("weight") if wraw is None: continue delta = abs((wd - td).days) if best_delta is None or delta < best_delta: best_delta = delta best = _floaty(wraw) return best def nearest_weight_kg_from_map(wlog: Dict[Any, Any], target_date: Any) -> Optional[float]: if not wlog: return None td = as_date(target_date) if td is None: return None best_w: Optional[float] = None best_delta: Optional[int] = None for wd, wval in wlog.items(): if wval is None: continue wdate = as_date(wd) if wdate is None: continue delta = abs((wdate - td).days) if best_delta is None or delta < best_delta: best_delta = delta best_w = _floaty(wval) return best_w def compute_lean_fat_kg(weight_kg: float, body_fat_pct: float) -> Tuple[float, float]: lean = round(weight_kg - (weight_kg * body_fat_pct / 100.0), 2) fat = round(weight_kg * body_fat_pct / 100.0, 2) return lean, fat def fill_caliper_body_comp(d: Dict[str, Any], conn: Any, profile_id: str) -> None: """Mutate caliper API payload (model_dump) before INSERT/UPDATE.""" bf = d.get("body_fat_pct") if bf is None: return if d.get("lean_mass") is not None and d.get("fat_mass") is not None: return rows = load_weight_rows(conn, profile_id) w = nearest_weight_kg(rows, d.get("date")) if w is None: return lean, fat = compute_lean_fat_kg(w, _floaty(bf)) d["lean_mass"] = lean d["fat_mass"] = fat def enrich_caliper_row_for_response(row: Dict[str, Any], weight_rows: List[Dict[str, Any]]) -> None: """Fill missing lean_mass / fat_mass for API responses (read path).""" bf = row.get("body_fat_pct") if bf is None: return if row.get("lean_mass") is not None and row.get("fat_mass") is not None: return w = nearest_weight_kg(weight_rows, row.get("date")) if w is None: return lean, fat = compute_lean_fat_kg(w, _floaty(bf)) row["lean_mass"] = lean row["fat_mass"] = fat