""" Sleep Module Router (v9d Phase 2b) Endpoints: - CRUD: list, create/upsert, update, delete - Stats: 7-day average, trends, phase distribution, sleep debt - Correlations: sleep ↔ resting HR, training, weight (Phase 2e) """ from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from pydantic import BaseModel from typing import Literal from datetime import datetime, timedelta import csv import io import json from auth import require_auth from db import get_db, get_cursor router = APIRouter(prefix="/api/sleep", tags=["sleep"]) # ── Models ──────────────────────────────────────────────────────────────────── class SleepCreate(BaseModel): date: str # YYYY-MM-DD bedtime: str | None = None # HH:MM wake_time: str | None = None # HH:MM duration_minutes: int quality: int | None = None # 1-5 wake_count: int | None = None deep_minutes: int | None = None rem_minutes: int | None = None light_minutes: int | None = None awake_minutes: int | None = None note: str = "" source: Literal['manual', 'apple_health', 'garmin'] = 'manual' class SleepResponse(BaseModel): id: int profile_id: str date: str bedtime: str | None wake_time: str | None duration_minutes: int duration_formatted: str quality: int | None wake_count: int | None deep_minutes: int | None rem_minutes: int | None light_minutes: int | None awake_minutes: int | None sleep_segments: list | None sleep_efficiency: float | None deep_percent: float | None rem_percent: float | None note: str source: str created_at: str class SleepStatsResponse(BaseModel): avg_duration_minutes: float avg_quality: float | None total_nights: int nights_below_goal: int sleep_goal_minutes: int class SleepDebtResponse(BaseModel): sleep_debt_minutes: int sleep_debt_formatted: str days_analyzed: int sleep_goal_minutes: int # ── Helper Functions ────────────────────────────────────────────────────────── def format_duration(minutes: int) -> str: """Convert minutes to 'Xh Ymin' format.""" hours = minutes // 60 mins = minutes % 60 return f"{hours}h {mins}min" def calculate_sleep_efficiency(duration_min: int, awake_min: int | None) -> float | None: """Sleep efficiency = duration / (duration + awake) * 100.""" if awake_min is None or awake_min == 0: return None total = duration_min + awake_min return round((duration_min / total) * 100, 1) if total > 0 else None def calculate_phase_percent(phase_min: int | None, duration_min: int) -> float | None: """Calculate phase percentage of total duration.""" if phase_min is None or duration_min == 0: return None return round((phase_min / duration_min) * 100, 1) def row_to_sleep_response(row: dict) -> SleepResponse: """Convert DB row to SleepResponse.""" return SleepResponse( id=row['id'], profile_id=row['profile_id'], date=str(row['date']), bedtime=str(row['bedtime']) if row['bedtime'] else None, wake_time=str(row['wake_time']) if row['wake_time'] else None, duration_minutes=row['duration_minutes'], duration_formatted=format_duration(row['duration_minutes']), quality=row['quality'], wake_count=row['wake_count'], deep_minutes=row['deep_minutes'], rem_minutes=row['rem_minutes'], light_minutes=row['light_minutes'], awake_minutes=row['awake_minutes'], sleep_segments=row['sleep_segments'], sleep_efficiency=calculate_sleep_efficiency(row['duration_minutes'], row['awake_minutes']), deep_percent=calculate_phase_percent(row['deep_minutes'], row['duration_minutes']), rem_percent=calculate_phase_percent(row['rem_minutes'], row['duration_minutes']), note=row['note'] or "", source=row['source'], created_at=str(row['created_at']) ) # ── CRUD Endpoints ──────────────────────────────────────────────────────────── @router.get("") def list_sleep( limit: int = 90, session: dict = Depends(require_auth) ): """List sleep entries for current profile (last N days).""" pid = session['profile_id'] with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT * FROM sleep_log WHERE profile_id = %s ORDER BY date DESC LIMIT %s """, (pid, limit)) rows = cur.fetchall() return [row_to_sleep_response(row) for row in rows] @router.get("/by-date/{date}") def get_sleep_by_date( date: str, session: dict = Depends(require_auth) ): """Get sleep entry for specific date.""" pid = session['profile_id'] with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT * FROM sleep_log WHERE profile_id = %s AND date = %s """, (pid, date)) row = cur.fetchone() if not row: raise HTTPException(404, "No sleep entry for this date") return row_to_sleep_response(row) @router.post("") def create_sleep( data: SleepCreate, session: dict = Depends(require_auth) ): """Create or update sleep entry (upsert by date).""" pid = session['profile_id'] # Convert empty strings to None for TIME fields bedtime = data.bedtime if data.bedtime else None wake_time = data.wake_time if data.wake_time else None # Plausibility check: sleep phases (deep+rem+light) should sum to duration # Note: awake_minutes is NOT part of sleep duration (tracked separately) if any([data.deep_minutes, data.rem_minutes, data.light_minutes]): sleep_phase_sum = (data.deep_minutes or 0) + (data.rem_minutes or 0) + (data.light_minutes or 0) diff = abs(data.duration_minutes - sleep_phase_sum) if diff > 5: raise HTTPException( 400, f"Plausibilitätsprüfung fehlgeschlagen: Schlafphasen-Summe ({sleep_phase_sum} min) weicht um {diff} min von Schlafdauer ({data.duration_minutes} min) ab. Max. Toleranz: 5 min. Hinweis: Wachphasen werden nicht zur Schlafdauer gezählt." ) with get_db() as conn: cur = get_cursor(conn) # Upsert: INSERT ... ON CONFLICT DO UPDATE cur.execute(""" INSERT INTO sleep_log ( profile_id, date, bedtime, wake_time, duration_minutes, quality, wake_count, deep_minutes, rem_minutes, light_minutes, awake_minutes, note, source, updated_at ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP ) ON CONFLICT (profile_id, date) DO UPDATE SET bedtime = EXCLUDED.bedtime, wake_time = EXCLUDED.wake_time, duration_minutes = EXCLUDED.duration_minutes, quality = EXCLUDED.quality, wake_count = EXCLUDED.wake_count, deep_minutes = EXCLUDED.deep_minutes, rem_minutes = EXCLUDED.rem_minutes, light_minutes = EXCLUDED.light_minutes, awake_minutes = EXCLUDED.awake_minutes, note = EXCLUDED.note, source = EXCLUDED.source, updated_at = CURRENT_TIMESTAMP RETURNING * """, ( pid, data.date, bedtime, wake_time, data.duration_minutes, data.quality, data.wake_count, data.deep_minutes, data.rem_minutes, data.light_minutes, data.awake_minutes, data.note, data.source )) row = cur.fetchone() conn.commit() return row_to_sleep_response(row) @router.put("/{id}") def update_sleep( id: int, data: SleepCreate, session: dict = Depends(require_auth) ): """Update existing sleep entry by ID.""" pid = session['profile_id'] # Convert empty strings to None for TIME fields bedtime = data.bedtime if data.bedtime else None wake_time = data.wake_time if data.wake_time else None # Plausibility check: sleep phases (deep+rem+light) should sum to duration # Note: awake_minutes is NOT part of sleep duration (tracked separately) if any([data.deep_minutes, data.rem_minutes, data.light_minutes]): sleep_phase_sum = (data.deep_minutes or 0) + (data.rem_minutes or 0) + (data.light_minutes or 0) diff = abs(data.duration_minutes - sleep_phase_sum) if diff > 5: raise HTTPException( 400, f"Plausibilitätsprüfung fehlgeschlagen: Schlafphasen-Summe ({sleep_phase_sum} min) weicht um {diff} min von Schlafdauer ({data.duration_minutes} min) ab. Max. Toleranz: 5 min. Hinweis: Wachphasen werden nicht zur Schlafdauer gezählt." ) with get_db() as conn: cur = get_cursor(conn) cur.execute(""" UPDATE sleep_log SET date = %s, bedtime = %s, wake_time = %s, duration_minutes = %s, quality = %s, wake_count = %s, deep_minutes = %s, rem_minutes = %s, light_minutes = %s, awake_minutes = %s, note = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s AND profile_id = %s RETURNING * """, ( data.date, bedtime, wake_time, data.duration_minutes, data.quality, data.wake_count, data.deep_minutes, data.rem_minutes, data.light_minutes, data.awake_minutes, data.note, id, pid )) row = cur.fetchone() if not row: raise HTTPException(404, "Sleep entry not found") conn.commit() return row_to_sleep_response(row) @router.delete("/{id}") def delete_sleep( id: int, session: dict = Depends(require_auth) ): """Delete sleep entry.""" pid = session['profile_id'] with get_db() as conn: cur = get_cursor(conn) cur.execute(""" DELETE FROM sleep_log WHERE id = %s AND profile_id = %s """, (id, pid)) conn.commit() return {"deleted": id} # ── Stats Endpoints ─────────────────────────────────────────────────────────── @router.get("/stats") def get_sleep_stats( days: int = 7, session: dict = Depends(require_auth) ): """Get sleep statistics (average duration, quality, nights below goal).""" pid = session['profile_id'] with get_db() as conn: cur = get_cursor(conn) # Get sleep goal from profile cur.execute("SELECT sleep_goal_minutes FROM profiles WHERE id = %s", (pid,)) profile = cur.fetchone() sleep_goal = profile['sleep_goal_minutes'] if profile and profile['sleep_goal_minutes'] else 450 # Calculate stats cur.execute(""" SELECT AVG(duration_minutes)::FLOAT as avg_duration, AVG(quality)::FLOAT as avg_quality, COUNT(*) as total_nights, COUNT(CASE WHEN duration_minutes < %s THEN 1 END) as nights_below_goal FROM sleep_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '%s days' """, (sleep_goal, pid, days)) stats = cur.fetchone() return SleepStatsResponse( avg_duration_minutes=round(stats['avg_duration'], 1) if stats['avg_duration'] else 0, avg_quality=round(stats['avg_quality'], 1) if stats['avg_quality'] else None, total_nights=stats['total_nights'], nights_below_goal=stats['nights_below_goal'], sleep_goal_minutes=sleep_goal ) @router.get("/debt") def get_sleep_debt( days: int = 14, session: dict = Depends(require_auth) ): """Calculate sleep debt over last N days.""" pid = session['profile_id'] with get_db() as conn: cur = get_cursor(conn) # Get sleep goal cur.execute("SELECT sleep_goal_minutes FROM profiles WHERE id = %s", (pid,)) profile = cur.fetchone() sleep_goal = profile['sleep_goal_minutes'] if profile and profile['sleep_goal_minutes'] else 450 # Calculate debt cur.execute(""" SELECT SUM(%s - duration_minutes) as debt_minutes, COUNT(*) as nights_analyzed FROM sleep_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '%s days' """, (sleep_goal, pid, days)) result = cur.fetchone() debt_min = int(result['debt_minutes']) if result['debt_minutes'] else 0 nights = result['nights_analyzed'] if result['nights_analyzed'] else 0 # Format debt if debt_min == 0: formatted = "0 – kein Defizit" elif debt_min > 0: formatted = f"+{format_duration(debt_min)}" else: formatted = f"−{format_duration(abs(debt_min))}" return SleepDebtResponse( sleep_debt_minutes=debt_min, sleep_debt_formatted=formatted, days_analyzed=nights, sleep_goal_minutes=sleep_goal ) @router.get("/trend") def get_sleep_trend( days: int = 30, session: dict = Depends(require_auth) ): """Get sleep duration and quality trend over time.""" pid = session['profile_id'] with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT date, duration_minutes, quality FROM sleep_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '%s days' ORDER BY date ASC """, (pid, days)) rows = cur.fetchall() return [ { "date": str(row['date']), "duration_minutes": row['duration_minutes'], "quality": row['quality'] } for row in rows ] @router.get("/phases") def get_sleep_phases( days: int = 30, session: dict = Depends(require_auth) ): """Get sleep phase distribution (deep, REM, light, awake) over time.""" pid = session['profile_id'] with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT date, deep_minutes, rem_minutes, light_minutes, awake_minutes, duration_minutes FROM sleep_log WHERE profile_id = %s AND date >= CURRENT_DATE - INTERVAL '%s days' AND (deep_minutes IS NOT NULL OR rem_minutes IS NOT NULL) ORDER BY date ASC """, (pid, days)) rows = cur.fetchall() return [ { "date": str(row['date']), "deep_minutes": row['deep_minutes'], "rem_minutes": row['rem_minutes'], "light_minutes": row['light_minutes'], "awake_minutes": row['awake_minutes'], "deep_percent": calculate_phase_percent(row['deep_minutes'], row['duration_minutes']), "rem_percent": calculate_phase_percent(row['rem_minutes'], row['duration_minutes']) } for row in rows ] # ── Import Endpoints ────────────────────────────────────────────────────────── @router.post("/import/apple-health") async def import_apple_health_sleep( file: UploadFile = File(...), session: dict = Depends(require_auth) ): """ Import sleep data from Apple Health CSV export. Expected CSV format: Start,End,Duration (hr),Value,Source 2026-03-14 22:44:23,2026-03-14 23:00:19,0.266,Kern,Apple Watch - Aggregates segments by night (wake date) - Maps German phase names: Kern→light, REM→rem, Tief→deep, Wach→awake - Stores raw segments in JSONB - Does NOT overwrite manual entries (source='manual') """ pid = session['profile_id'] # Read CSV content = await file.read() csv_text = content.decode('utf-8-sig') # Handle BOM reader = csv.DictReader(io.StringIO(csv_text)) # Phase mapping (German → English) phase_map = { 'Kern': 'light', 'REM': 'rem', 'Tief': 'deep', 'Wach': 'awake', 'Schlafend': None # Ignore initial sleep entry } # Parse segments segments = [] for row in reader: phase_de = row['Value'].strip() phase_en = phase_map.get(phase_de) if phase_en is None: # Skip "Schlafend" continue start_dt = datetime.strptime(row['Start'], '%Y-%m-%d %H:%M:%S') end_dt = datetime.strptime(row['End'], '%Y-%m-%d %H:%M:%S') duration_hr = float(row['Duration (hr)']) duration_min = int(duration_hr * 60) segments.append({ 'start': start_dt, 'end': end_dt, 'duration_min': duration_min, 'phase': phase_en }) # Sort segments chronologically segments.sort(key=lambda s: s['start']) # Group segments into nights (gap-based) # If gap between segments > 2 hours → new night nights = [] current_night = None for seg in segments: # Start new night if: # 1. First segment # 2. Gap > 2 hours since last segment if current_night is None or (seg['start'] - current_night['wake_time']).total_seconds() > 7200: current_night = { 'bedtime': seg['start'], 'wake_time': seg['end'], 'segments': [], 'deep_minutes': 0, 'rem_minutes': 0, 'light_minutes': 0, 'awake_minutes': 0 } nights.append(current_night) # Add segment to current night current_night['segments'].append(seg) current_night['wake_time'] = max(current_night['wake_time'], seg['end']) current_night['bedtime'] = min(current_night['bedtime'], seg['start']) # Sum phases if seg['phase'] == 'deep': current_night['deep_minutes'] += seg['duration_min'] elif seg['phase'] == 'rem': current_night['rem_minutes'] += seg['duration_min'] elif seg['phase'] == 'light': current_night['light_minutes'] += seg['duration_min'] elif seg['phase'] == 'awake': current_night['awake_minutes'] += seg['duration_min'] # Convert nights list to dict with wake_date as key nights_dict = {} for night in nights: wake_date = night['wake_time'].date() # Date when you woke up nights_dict[wake_date] = night # Insert nights imported = 0 skipped = 0 with get_db() as conn: cur = get_cursor(conn) for date, night in nights_dict.items(): # Calculate sleep duration (deep + rem + light, WITHOUT awake) # Note: awake_minutes tracked separately, not part of sleep duration duration_minutes = ( night['deep_minutes'] + night['rem_minutes'] + night['light_minutes'] ) # Calculate wake_count (number of awake segments) wake_count = sum(1 for seg in night['segments'] if seg['phase'] == 'awake') # Prepare JSONB segments with full datetime sleep_segments = [ { 'phase': seg['phase'], 'start': seg['start'].isoformat(), # Full datetime: 2026-03-21T22:30:00 'end': seg['end'].isoformat(), # Full datetime: 2026-03-21T23:15:00 'duration_min': seg['duration_min'] } for seg in night['segments'] ] # Check if manual entry exists - do NOT overwrite cur.execute(""" SELECT id, source FROM sleep_log WHERE profile_id = %s AND date = %s """, (pid, date)) existing = cur.fetchone() if existing and existing['source'] == 'manual': skipped += 1 continue # Skip - don't overwrite manual entries # Upsert (only if not manual) # If entry exists and is NOT manual → update # If entry doesn't exist → insert if existing: # Update existing non-manual entry cur.execute(""" UPDATE sleep_log SET bedtime = %s, wake_time = %s, duration_minutes = %s, wake_count = %s, deep_minutes = %s, rem_minutes = %s, light_minutes = %s, awake_minutes = %s, sleep_segments = %s, source = 'apple_health', updated_at = CURRENT_TIMESTAMP WHERE id = %s AND profile_id = %s """, ( night['bedtime'].time(), night['wake_time'].time(), duration_minutes, wake_count, night['deep_minutes'], night['rem_minutes'], night['light_minutes'], night['awake_minutes'], json.dumps(sleep_segments), existing['id'], pid )) else: # Insert new entry cur.execute(""" INSERT INTO sleep_log ( profile_id, date, bedtime, wake_time, duration_minutes, wake_count, deep_minutes, rem_minutes, light_minutes, awake_minutes, sleep_segments, source, created_at, updated_at ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'apple_health', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP ) """, ( pid, date, night['bedtime'].time(), night['wake_time'].time(), duration_minutes, wake_count, night['deep_minutes'], night['rem_minutes'], night['light_minutes'], night['awake_minutes'], json.dumps(sleep_segments) )) imported += 1 conn.commit() return { "imported": imported, "skipped": skipped, "total_nights": len(nights_dict), "message": f"{imported} Nächte importiert, {skipped} übersprungen (manuelle Einträge)" }