""" Recovery Metrics Data Layer Provides structured data for recovery tracking and analysis. Functions: - get_sleep_duration_data(): Average sleep duration - get_sleep_quality_data(): Sleep quality score (Deep+REM %) - get_rest_days_data(): Rest day count and types All functions return structured data (dict) without formatting. Use placeholder_resolver.py for formatted strings for AI. Phase 0c: Multi-Layer Architecture Version: 1.0 """ from typing import Dict, List, Optional from datetime import datetime, timedelta, date from db import get_db, get_cursor, r2d from data_layer.utils import calculate_confidence, safe_float, safe_int def get_sleep_duration_data( profile_id: str, days: int = 7 ) -> Dict: """ Calculate average sleep duration. Args: profile_id: User profile ID days: Analysis window (default 7) Returns: { "avg_duration_hours": float, "avg_duration_minutes": int, "total_nights": int, "nights_with_data": int, "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_sleep_avg_duration(pid, days) formatted string NEW: Structured data with hours and minutes """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT sleep_segments FROM sleep_log WHERE profile_id=%s AND date >= %s ORDER BY date DESC""", (profile_id, cutoff) ) rows = cur.fetchall() if not rows: return { "avg_duration_hours": 0.0, "avg_duration_minutes": 0, "total_nights": 0, "nights_with_data": 0, "confidence": "insufficient", "days_analyzed": days } total_minutes = 0 nights_with_data = 0 for row in rows: segments = row['sleep_segments'] if segments: night_minutes = sum(seg.get('duration_min', 0) for seg in segments) if night_minutes > 0: total_minutes += night_minutes nights_with_data += 1 if nights_with_data == 0: return { "avg_duration_hours": 0.0, "avg_duration_minutes": 0, "total_nights": len(rows), "nights_with_data": 0, "confidence": "insufficient", "days_analyzed": days } avg_minutes = 
int(total_minutes / nights_with_data) avg_hours = avg_minutes / 60 confidence = calculate_confidence(nights_with_data, days, "general") return { "avg_duration_hours": round(avg_hours, 1), "avg_duration_minutes": avg_minutes, "total_nights": len(rows), "nights_with_data": nights_with_data, "confidence": confidence, "days_analyzed": days } def get_sleep_quality_data( profile_id: str, days: int = 7 ) -> Dict: """ Calculate sleep quality score (Deep+REM percentage). Args: profile_id: User profile ID days: Analysis window (default 7) Returns: { "quality_score": float, # 0-100, Deep+REM percentage "avg_deep_rem_minutes": int, "avg_total_minutes": int, "avg_light_minutes": int, "avg_awake_minutes": int, "nights_analyzed": int, "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_sleep_avg_quality(pid, days) formatted string NEW: Complete sleep phase breakdown """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT sleep_segments FROM sleep_log WHERE profile_id=%s AND date >= %s ORDER BY date DESC""", (profile_id, cutoff) ) rows = cur.fetchall() if not rows: return { "quality_score": 0.0, "avg_deep_rem_minutes": 0, "avg_total_minutes": 0, "avg_light_minutes": 0, "avg_awake_minutes": 0, "nights_analyzed": 0, "confidence": "insufficient", "days_analyzed": days } total_quality = 0 total_deep_rem = 0 total_light = 0 total_awake = 0 total_all = 0 count = 0 for row in rows: segments = row['sleep_segments'] if segments: # Note: segments use 'phase' key, stored lowercase (deep, rem, light, awake) deep_rem_min = sum(s.get('duration_min', 0) for s in segments if s.get('phase') in ['deep', 'rem']) light_min = sum(s.get('duration_min', 0) for s in segments if s.get('phase') == 'light') awake_min = sum(s.get('duration_min', 0) for s in segments if s.get('phase') == 'awake') total_min = sum(s.get('duration_min', 0) for s in segments) if total_min > 0: quality_pct = (deep_rem_min 
/ total_min) * 100 total_quality += quality_pct total_deep_rem += deep_rem_min total_light += light_min total_awake += awake_min total_all += total_min count += 1 if count == 0: return { "quality_score": 0.0, "avg_deep_rem_minutes": 0, "avg_total_minutes": 0, "avg_light_minutes": 0, "avg_awake_minutes": 0, "nights_analyzed": 0, "confidence": "insufficient", "days_analyzed": days } avg_quality = total_quality / count avg_deep_rem = int(total_deep_rem / count) avg_total = int(total_all / count) avg_light = int(total_light / count) avg_awake = int(total_awake / count) confidence = calculate_confidence(count, days, "general") return { "quality_score": round(avg_quality, 1), "avg_deep_rem_minutes": avg_deep_rem, "avg_total_minutes": avg_total, "avg_light_minutes": avg_light, "avg_awake_minutes": avg_awake, "nights_analyzed": count, "confidence": confidence, "days_analyzed": days } def get_rest_days_data( profile_id: str, days: int = 30 ) -> Dict: """ Get rest days count and breakdown by type. 
Args: profile_id: User profile ID days: Analysis window (default 30) Returns: { "total_rest_days": int, "rest_types": { "muscle_recovery": int, "cardio_recovery": int, "mental_rest": int, "deload": int, "injury": int }, "rest_frequency": float, # days per week "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_rest_days_count(pid, days) formatted string NEW: Complete breakdown by rest type """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') # Get total distinct rest days cur.execute( """SELECT COUNT(DISTINCT date) as count FROM rest_days WHERE profile_id=%s AND date >= %s""", (profile_id, cutoff) ) total_row = cur.fetchone() total_count = total_row['count'] if total_row else 0 # Get breakdown by focus type cur.execute( """SELECT focus, COUNT(*) as count FROM rest_days WHERE profile_id=%s AND date >= %s GROUP BY focus""", (profile_id, cutoff) ) type_rows = cur.fetchall() rest_types = { "muscle_recovery": 0, "cardio_recovery": 0, "mental_rest": 0, "deload": 0, "injury": 0 } for row in type_rows: focus = row['focus'] if focus in rest_types: rest_types[focus] = row['count'] # Calculate frequency (rest days per week) rest_frequency = (total_count / days * 7) if days > 0 else 0.0 confidence = calculate_confidence(total_count, days, "general") return { "total_rest_days": total_count, "rest_types": rest_types, "rest_frequency": round(rest_frequency, 1), "confidence": confidence, "days_analyzed": days } # ============================================================================ # Calculated Metrics (migrated from calculations/recovery_metrics.py) # ============================================================================ # These functions return simple values for placeholders and scoring. # Use get_*_data() functions above for structured chart data. 
def calculate_recovery_score_v2(profile_id: str) -> Optional[int]:
    """
    Improved recovery/readiness score (0-100)

    Components:
    - HRV status (25%)
    - RHR status (20%)
    - Sleep duration (20%)
    - Sleep debt (10%)
    - Sleep regularity (10%)
    - Recent load balance (10%)
    - Data quality (5%)

    Components whose scorer returns None are left out and the remaining
    weights are renormalised, so the result is a weighted average over the
    available components only.

    Returns:
        Integer score (truncated), or None when no component is available.
    """
    # (name, scorer, weight in percent) - evaluated in this order.
    scorers = (
        ('hrv', _score_hrv_vs_baseline, 25),
        ('rhr', _score_rhr_vs_baseline, 20),
        ('sleep_duration', _score_sleep_duration, 20),
        ('sleep_debt', _score_sleep_debt, 10),
        ('regularity', _score_sleep_regularity, 10),
        ('load', _score_recent_load_balance, 10),
        ('data_quality', _score_recovery_data_quality, 5),
    )

    components = []
    for name, scorer, weight in scorers:
        score = scorer(profile_id)
        if score is not None:
            components.append((name, score, weight))

    # NOTE(review): _score_recovery_data_quality always returns a value, so
    # the "no components" case looks unreachable in practice - confirm whether
    # a profile without any recovery data should yield None instead.
    return _weighted_score(components)


def _weighted_score(components) -> Optional[int]:
    """Return the truncated weighted mean of (name, score, weight) tuples, or None if empty."""
    if not components:
        return None
    total_score = sum(score * weight for _, score, weight in components)
    total_weight = sum(weight for _, _, weight in components)
    return int(total_score / total_weight)


# Columns of vitals_baseline that may be interpolated into the baseline query.
_VITALS_COLUMNS = ("hrv", "resting_hr")


def _fetch_recent_and_baseline_avg(profile_id: str, column: str) -> Optional[tuple]:
    """
    Fetch (recent_avg, baseline_avg) of a vitals_baseline column as floats.

    recent   = average over rows dated CURRENT_DATE - 3 days or later
    baseline = average over rows from CURRENT_DATE - 28 days up to (not
               including) CURRENT_DATE - 3 days

    Returns None when either average is missing or zero.

    Raises:
        ValueError: if `column` is not one of the whitelisted vitals columns
            (the name is interpolated into SQL, so it must never be caller data).
    """
    if column not in _VITALS_COLUMNS:
        raise ValueError(f"Unsupported vitals column: {column!r}")

    recent_sql = f"""
        SELECT AVG({column}) AS recent_avg
        FROM vitals_baseline
        WHERE profile_id = %s
          AND {column} IS NOT NULL
          AND date >= CURRENT_DATE - INTERVAL '3 days'
    """
    baseline_sql = f"""
        SELECT AVG({column}) AS baseline_avg
        FROM vitals_baseline
        WHERE profile_id = %s
          AND {column} IS NOT NULL
          AND date >= CURRENT_DATE - INTERVAL '28 days'
          AND date < CURRENT_DATE - INTERVAL '3 days'
    """

    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute(recent_sql, (profile_id,))
        recent_row = cur.fetchone()
        if not recent_row or not recent_row['recent_avg']:
            return None

        cur.execute(baseline_sql, (profile_id,))
        baseline_row = cur.fetchone()
        if not baseline_row or not baseline_row['baseline_avg']:
            return None

    # The driver may hand back Decimal for AVG(); normalise so callers do
    # plain float arithmetic and return real floats.
    return float(recent_row['recent_avg']), float(baseline_row['baseline_avg'])


def _rate_hrv_deviation(deviation_pct: float) -> int:
    """Map HRV deviation from baseline (percent) to a 20-100 score; higher HRV = better."""
    if deviation_pct >= 10:
        return 100
    elif deviation_pct >= 5:
        return 90
    elif deviation_pct >= 0:
        return 75
    elif deviation_pct >= -5:
        return 60
    elif deviation_pct >= -10:
        return 45
    else:
        return max(20, 45 + int(deviation_pct * 2))


def _score_hrv_vs_baseline(profile_id: str) -> Optional[int]:
    """Score HRV relative to 28d baseline (0-100)"""
    averages = _fetch_recent_and_baseline_avg(profile_id, 'hrv')
    if averages is None:
        return None
    recent_hrv, baseline_hrv = averages

    # Calculate percentage deviation
    deviation_pct = ((recent_hrv - baseline_hrv) / baseline_hrv) * 100
    return _rate_hrv_deviation(deviation_pct)


def _rate_rhr_difference(difference: float) -> int:
    """Map RHR difference from baseline (bpm) to a 20-100 score; lower RHR = better."""
    if difference <= -3:
        return 100
    elif difference <= -1:
        return 90
    elif difference <= 1:
        return 75
    elif difference <= 3:
        return 60
    elif difference <= 5:
        return 45
    else:
        # For any difference above 5 bpm the formula is already below the
        # floor, so this branch effectively returns 20.
        return max(20, int(45 - (difference * 5)))


def _score_rhr_vs_baseline(profile_id: str) -> Optional[int]:
    """Score RHR relative to 28d baseline (0-100)"""
    averages = _fetch_recent_and_baseline_avg(profile_id, 'resting_hr')
    if averages is None:
        return None
    recent_rhr, baseline_rhr = averages

    # Calculate difference (bpm)
    return _rate_rhr_difference(recent_rhr - baseline_rhr)


def _rate_sleep_duration(avg_sleep_hours: float) -> int:
    """Map average nightly sleep (hours) to a 40-100 score; target is 7-9 hours."""
    if 7 <= avg_sleep_hours <= 9:
        return 100
    elif 6.5 <= avg_sleep_hours < 7:
        return 85
    elif 6 <= avg_sleep_hours < 6.5:
        return 70
    elif avg_sleep_hours >= 9.5:
        return 85  # Too much sleep can indicate fatigue
    else:
        # Reached for < 6h and for the 9-9.5h gap between the bands above.
        return max(40, int(avg_sleep_hours * 10))


def _score_sleep_duration(profile_id: str) -> Optional[int]:
    """Score recent sleep duration (0-100)"""
    avg_sleep_hours = calculate_sleep_avg_duration_7d(profile_id)
    if avg_sleep_hours is None:
        return None
    return _rate_sleep_duration(avg_sleep_hours)


def _rate_sleep_debt(debt_hours: float) -> int:
    """Map accumulated sleep debt (hours) to a 30-100 integer score; less debt = better."""
    if debt_hours <= 1:
        return 100
    elif debt_hours <= 3:
        return 85
    elif debt_hours <= 5:
        return 70
    elif debt_hours <= 8:
        return 55
    else:
        # int() keeps the declared integer return type (debt_hours is a float).
        return max(30, int(100 - (debt_hours * 8)))


def _score_sleep_debt(profile_id: str) -> Optional[int]:
    """Score sleep debt (0-100)"""
    debt_hours = calculate_sleep_debt_hours(profile_id)
    if debt_hours is None:
        return None
    return _rate_sleep_debt(debt_hours)


def _rate_sleep_regularity(regularity_proxy: float) -> int:
    """Map mean absolute bedtime shift (minutes) to a 30-100 score; lower shift = better."""
    if regularity_proxy <= 30:
        return 100
    elif regularity_proxy <= 45:
        return 85
    elif regularity_proxy <= 60:
        return 70
    elif regularity_proxy <= 90:
        return 55
    else:
        return max(30, 100 - int(regularity_proxy / 2))


def _score_sleep_regularity(profile_id: str) -> Optional[int]:
    """Score sleep regularity (0-100)"""
    regularity_proxy = calculate_sleep_regularity_proxy(profile_id)
    if regularity_proxy is None:
        return None
    return _rate_sleep_regularity(regularity_proxy)


def _rate_load(load_3d: float) -> int:
    """Map 3-day proxy load to a 30-100 integer score.

    Proxy load: 0-300 = low, 300-600 = moderate, >600 = high
    """
    if load_3d < 300:
        # Under-loading
        return 90
    elif load_3d <= 600:
        # Optimal
        return 100
    elif load_3d <= 900:
        # High but manageable
        return 75
    elif load_3d <= 1200:
        # Very high
        return 55
    else:
        # Excessive; int() keeps the declared integer return type.
        return max(30, int(100 - (load_3d / 20)))


def _score_recent_load_balance(profile_id: str) -> Optional[int]:
    """Score recent training load balance (0-100)"""
    load_3d = calculate_recent_load_balance_3d(profile_id)
    if load_3d is None:
        return None
    return _rate_load(load_3d)


def _score_recovery_data_quality(profile_id: str) -> Optional[int]:
    """Score data quality for recovery metrics (0-100)"""
    return calculate_recovery_data_quality(profile_id)['overall_score']


# ============================================================================
# Individual Recovery Metrics
# ============================================================================

def calculate_hrv_vs_baseline_pct(profile_id: str) -> Optional[float]:
    """Calculate HRV deviation from baseline (percentage), rounded to 1 decimal."""
    averages = _fetch_recent_and_baseline_avg(profile_id, 'hrv')
    if averages is None:
        return None
    recent, baseline = averages
    return round(((recent - baseline) / baseline) * 100, 1)


def calculate_rhr_vs_baseline_pct(profile_id: str) -> Optional[float]:
    """Calculate RHR deviation from baseline (percentage), rounded to 1 decimal."""
    averages = _fetch_recent_and_baseline_avg(profile_id, 'resting_hr')
    if averages is None:
        return None
    recent, baseline = averages
    return round(((recent - baseline) / baseline) * 100, 1)


def calculate_sleep_avg_duration_7d(profile_id: str) -> Optional[float]:
    """Calculate average sleep duration (hours) last 7 days

    Returns None when there is no average or the average is 0.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT AVG(duration_minutes) as avg_sleep_min
            FROM sleep_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '7 days'
              AND duration_minutes IS NOT NULL
        """, (profile_id,))
        row = cur.fetchone()

    if not row or not row['avg_sleep_min']:
        return None

    # float() so a Decimal AVG() result still yields the annotated float.
    return round(float(row['avg_sleep_min']) / 60, 1)


def calculate_sleep_debt_hours(profile_id: str) -> Optional[float]:
    """
    Calculate accumulated sleep debt (hours) last 14 days
    Assumes 7.5h target per night

    Nights longer than the target do not reduce the debt. Returns None when
    fewer than 10 nights with a duration are available.
    """
    target_hours = 7.5
    target_minutes = target_hours * 60

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT duration_minutes
            FROM sleep_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '14 days'
              AND duration_minutes IS NOT NULL
            ORDER BY date DESC
        """, (profile_id,))
        sleep_data = [row['duration_minutes'] for row in cur.fetchall()]

    if len(sleep_data) < 10:  # Need at least 10 days
        return None

    # Calculate cumulative debt; float() guards against Decimal column values,
    # which cannot be subtracted from a float.
    total_debt_min = sum(
        max(0.0, target_minutes - float(sleep_min)) for sleep_min in sleep_data
    )
    return round(total_debt_min / 60, 1)


def _mean_bedtime_shift(bedtimes) -> Optional[float]:
    """
    Mean absolute day-to-day bedtime shift in minutes, rounded to 1 decimal.

    `bedtimes` is a chronologically ordered sequence of objects exposing
    `.hour` and `.minute`. Shifts larger than 12 hours are treated as wrapping
    around midnight (e.g. 23:00 -> 01:00 is 120 minutes, not 1320).
    Returns None when fewer than two bedtimes are given.
    """
    minutes = [bedtime.hour * 60 + bedtime.minute for bedtime in bedtimes]

    shifts = []
    for prev_min, curr_min in zip(minutes, minutes[1:]):
        shift = abs(curr_min - prev_min)
        if shift > 720:  # More than 12 hours = wrapped around
            shift = 1440 - shift
        shifts.append(shift)

    if not shifts:
        return None
    return round(sum(shifts) / len(shifts), 1)


def calculate_sleep_regularity_proxy(profile_id: str) -> Optional[float]:
    """
    Sleep regularity proxy: mean absolute shift from previous day (minutes)
    Lower = more regular

    Only bedtimes are compared. Returns None when fewer than 7 nights with
    both bedtime and wake time exist in the last 14 days.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT bedtime, wake_time, date
            FROM sleep_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '14 days'
              AND bedtime IS NOT NULL
              AND wake_time IS NOT NULL
            ORDER BY date
        """, (profile_id,))
        sleep_data = cur.fetchall()

    if len(sleep_data) < 7:
        return None

    return _mean_bedtime_shift([night['bedtime'] for night in sleep_data])


def calculate_recent_load_balance_3d(profile_id: str) -> Optional[int]:
    """Calculate proxy internal load last 3 days

    Simplified 3d load: the summed activity duration (duration-based), with a
    missing sum treated as 0.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT SUM(duration_min) as total_duration
            FROM activity_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '3 days'
        """, (profile_id,))
        row = cur.fetchone()

    if not row:
        return None

    return int(row['total_duration'] or 0)


def _average_sleep_quality(nights) -> Optional[int]:
    """
    Average per-night quality score from rows with `duration_minutes`,
    `deep_minutes` and `rem_minutes`.

    Nights with missing/zero deep or REM minutes, or a zero duration, are
    skipped (a zero duration would otherwise divide by zero). Returns None
    when no night can be scored.
    """
    quality_scores = []
    for night in nights:
        deep = night['deep_minutes']
        rem = night['rem_minutes']
        duration = night['duration_minutes']
        if not (deep and rem and duration):
            continue

        quality_pct = ((deep + rem) / duration) * 100

        # 40-60% deep+REM is good
        if quality_pct >= 45:
            quality_scores.append(100)
        elif quality_pct >= 35:
            quality_scores.append(75)
        elif quality_pct >= 25:
            quality_scores.append(50)
        else:
            quality_scores.append(30)

    if not quality_scores:
        return None
    return int(sum(quality_scores) / len(quality_scores))


def calculate_sleep_quality_7d(profile_id: str) -> Optional[int]:
    """
    Calculate sleep quality score (0-100) based on deep+REM percentage
    Last 7 days

    Returns None when fewer than 4 nights with a duration exist, or when none
    of them can be scored.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT duration_minutes, deep_minutes, rem_minutes
            FROM sleep_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '7 days'
              AND duration_minutes IS NOT NULL
        """, (profile_id,))
        sleep_data = cur.fetchall()

    if len(sleep_data) < 4:
        return None

    return _average_sleep_quality(sleep_data)


# ============================================================================
# Data Quality Assessment
# ============================================================================

def _build_data_quality_report(hrv_count: int, rhr_count: int, sleep_count: int) -> Dict[str, object]:
    """Turn 28-day measurement counts into the data-quality report dict.

    Each component reaches 100 at 21 measurements (75% of 28 days); the
    overall score weights HRV 30%, RHR 30% and sleep 40%.
    """
    # Score components
    hrv_score = min(100, (hrv_count / 21) * 100)  # 21 = 75% coverage
    rhr_score = min(100, (rhr_count / 21) * 100)
    sleep_score = min(100, (sleep_count / 21) * 100)

    # Overall score
    overall_score = int(hrv_score * 0.3 + rhr_score * 0.3 + sleep_score * 0.4)

    if overall_score >= 80:
        confidence = "high"
    elif overall_score >= 60:
        confidence = "medium"
    else:
        confidence = "low"

    return {
        "overall_score": overall_score,
        "confidence": confidence,
        "measurements": {
            "hrv_28d": hrv_count,
            "rhr_28d": rhr_count,
            "sleep_28d": sleep_count
        },
        "component_scores": {
            "hrv": int(hrv_score),
            "rhr": int(rhr_score),
            "sleep": int(sleep_score)
        }
    }


def calculate_recovery_data_quality(profile_id: str) -> Dict[str, object]:
    """
    Assess data quality for recovery metrics
    Returns dict with quality score and details

    Keys: "overall_score" (int 0-100), "confidence" ("high"/"medium"/"low"),
    "measurements" (raw 28-day counts) and "component_scores" (int per source).
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # HRV measurements (28d)
        cur.execute("""
            SELECT COUNT(*) as hrv_count
            FROM vitals_baseline
            WHERE profile_id = %s
              AND hrv IS NOT NULL
              AND date >= CURRENT_DATE - INTERVAL '28 days'
        """, (profile_id,))
        hrv_count = cur.fetchone()['hrv_count']

        # RHR measurements (28d)
        cur.execute("""
            SELECT COUNT(*) as rhr_count
            FROM vitals_baseline
            WHERE profile_id = %s
              AND resting_hr IS NOT NULL
              AND date >= CURRENT_DATE - INTERVAL '28 days'
        """, (profile_id,))
        rhr_count = cur.fetchone()['rhr_count']

        # Sleep measurements (28d)
        cur.execute("""
            SELECT COUNT(*) as sleep_count
            FROM sleep_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - INTERVAL '28 days'
        """, (profile_id,))
        sleep_count = cur.fetchone()['sleep_count']

    return _build_data_quality_report(hrv_count, rhr_count, sleep_count)