""" Recovery Metrics Data Layer Provides structured data for recovery tracking and analysis. Functions: - get_sleep_duration_data(): Average sleep duration - get_sleep_quality_data(): Sleep quality score (Deep+REM %) - get_rest_days_data(): Rest day count and types All functions return structured data (dict) without formatting. Use placeholder_resolver.py for formatted strings for AI. Phase 0c: Multi-Layer Architecture Version: 1.0 """ from typing import Dict, List, Optional from datetime import datetime, timedelta, date from db import get_db, get_cursor, r2d from data_layer.utils import calculate_confidence, safe_float, safe_int def get_sleep_duration_data( profile_id: str, days: int = 7 ) -> Dict: """ Calculate average sleep duration. Args: profile_id: User profile ID days: Analysis window (default 7) Returns: { "avg_duration_hours": float, "avg_duration_minutes": int, "total_nights": int, "nights_with_data": int, "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_sleep_avg_duration(pid, days) formatted string NEW: Structured data with hours and minutes """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT sleep_segments FROM sleep_log WHERE profile_id=%s AND date >= %s ORDER BY date DESC""", (profile_id, cutoff) ) rows = cur.fetchall() if not rows: return { "avg_duration_hours": 0.0, "avg_duration_minutes": 0, "total_nights": 0, "nights_with_data": 0, "confidence": "insufficient", "days_analyzed": days } total_minutes = 0 nights_with_data = 0 for row in rows: segments = row['sleep_segments'] if segments: night_minutes = sum(seg.get('duration_min', 0) for seg in segments) if night_minutes > 0: total_minutes += night_minutes nights_with_data += 1 if nights_with_data == 0: return { "avg_duration_hours": 0.0, "avg_duration_minutes": 0, "total_nights": len(rows), "nights_with_data": 0, "confidence": "insufficient", "days_analyzed": days } avg_minutes = int(total_minutes / nights_with_data) avg_hours = avg_minutes / 60 confidence = calculate_confidence(nights_with_data, days, "general") return { "avg_duration_hours": round(avg_hours, 1), "avg_duration_minutes": avg_minutes, "total_nights": len(rows), "nights_with_data": nights_with_data, "confidence": confidence, "days_analyzed": days } def get_sleep_quality_data( profile_id: str, days: int = 7 ) -> Dict: """ Calculate sleep quality score (Deep+REM percentage). Args: profile_id: User profile ID days: Analysis window (default 7) Returns: { "quality_score": float, # 0-100, Deep+REM percentage "avg_deep_rem_minutes": int, "avg_total_minutes": int, "avg_light_minutes": int, "avg_awake_minutes": int, "nights_analyzed": int, "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_sleep_avg_quality(pid, days) formatted string NEW: Complete sleep phase breakdown """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT sleep_segments FROM sleep_log WHERE profile_id=%s AND date >= %s ORDER BY date DESC""", (profile_id, cutoff) ) rows = cur.fetchall() if not rows: return { "quality_score": 0.0, "avg_deep_rem_minutes": 0, "avg_total_minutes": 0, "avg_light_minutes": 0, "avg_awake_minutes": 0, "nights_analyzed": 0, "confidence": "insufficient", "days_analyzed": days } total_quality = 0 total_deep_rem = 0 total_light = 0 total_awake = 0 total_all = 0 count = 0 for row in rows: segments = row['sleep_segments'] if segments: # Note: segments use 'phase' key, stored lowercase (deep, rem, light, awake) deep_rem_min = sum(s.get('duration_min', 0) for s in segments if s.get('phase') in ['deep', 'rem']) light_min = sum(s.get('duration_min', 0) for s in segments if s.get('phase') == 'light') awake_min = sum(s.get('duration_min', 0) for s in segments if s.get('phase') == 'awake') total_min = sum(s.get('duration_min', 0) for s in segments) if total_min > 0: quality_pct = (deep_rem_min / total_min) * 100 total_quality += quality_pct total_deep_rem += deep_rem_min total_light += light_min total_awake += awake_min total_all += total_min count += 1 if count == 0: return { "quality_score": 0.0, "avg_deep_rem_minutes": 0, "avg_total_minutes": 0, "avg_light_minutes": 0, "avg_awake_minutes": 0, "nights_analyzed": 0, "confidence": "insufficient", "days_analyzed": days } avg_quality = total_quality / count avg_deep_rem = int(total_deep_rem / count) avg_total = int(total_all / count) avg_light = int(total_light / count) avg_awake = int(total_awake / count) confidence = calculate_confidence(count, days, "general") return { "quality_score": round(avg_quality, 1), "avg_deep_rem_minutes": avg_deep_rem, "avg_total_minutes": avg_total, "avg_light_minutes": avg_light, "avg_awake_minutes": avg_awake, "nights_analyzed": count, "confidence": confidence, "days_analyzed": days } def get_rest_days_data( profile_id: str, days: int = 30 ) -> Dict: """ Get rest days count and breakdown by type. Args: profile_id: User profile ID days: Analysis window (default 30) Returns: { "total_rest_days": int, "rest_types": { "muscle_recovery": int, "cardio_recovery": int, "mental_rest": int, "deload": int, "injury": int }, "rest_frequency": float, # days per week "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_rest_days_count(pid, days) formatted string NEW: Complete breakdown by rest type """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') # Get total distinct rest days cur.execute( """SELECT COUNT(DISTINCT date) as count FROM rest_days WHERE profile_id=%s AND date >= %s""", (profile_id, cutoff) ) total_row = cur.fetchone() total_count = total_row['count'] if total_row else 0 # Get breakdown by focus type cur.execute( """SELECT focus, COUNT(*) as count FROM rest_days WHERE profile_id=%s AND date >= %s GROUP BY focus""", (profile_id, cutoff) ) type_rows = cur.fetchall() rest_types = { "muscle_recovery": 0, "cardio_recovery": 0, "mental_rest": 0, "deload": 0, "injury": 0 } for row in type_rows: focus = row['focus'] if focus in rest_types: rest_types[focus] = row['count'] # Calculate frequency (rest days per week) rest_frequency = (total_count / days * 7) if days > 0 else 0.0 confidence = calculate_confidence(total_count, days, "general") return { "total_rest_days": total_count, "rest_types": rest_types, "rest_frequency": round(rest_frequency, 1), "confidence": confidence, "days_analyzed": days }