mitai-jinkendo/backend/routers/sleep.py
Lars 1644b34d5c
All checks were successful
Deploy Development / deploy (push) Successful in 43s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
fix: manual sleep entry creation + import overwrite protection
Critical fixes:
1. Added "+ Schlaf erfassen" button back (was missing!)
   - Opens NewEntryForm component inline
   - Default: 450 min (7h 30min), quality 3
   - Collapsible detail view
   - Live plausibility check

2. Fixed import overwriting manual entries
   - Problem: ON CONFLICT WHERE clause didn't prevent updates
   - Solution: Explicit if/else logic
     - If manual entry exists → skip (don't touch)
     - If non-manual entry exists → UPDATE
     - If no entry exists → INSERT
   - Properly counts imported vs skipped

Test results:
 CSV import with drag & drop
 Inline editing
 Segment timeline view with colors
 Source badges (Manual/Apple Health)
 Plausibility check (backend + frontend)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 13:43:02 +01:00

659 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Sleep Module Router (v9d Phase 2b)
Endpoints:
- CRUD: list, create/upsert, update, delete
- Stats: 7-day average, trends, phase distribution, sleep debt
- Correlations: sleep ↔ resting HR, training, weight (Phase 2e)
"""
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from pydantic import BaseModel
from typing import Literal
from datetime import datetime, timedelta
import csv
import io
import json
from auth import require_auth
from db import get_db, get_cursor
router = APIRouter(prefix="/api/sleep", tags=["sleep"])
# ── Models ────────────────────────────────────────────────────────────────────
class SleepCreate(BaseModel):
date: str # YYYY-MM-DD
bedtime: str | None = None # HH:MM
wake_time: str | None = None # HH:MM
duration_minutes: int
quality: int | None = None # 1-5
wake_count: int | None = None
deep_minutes: int | None = None
rem_minutes: int | None = None
light_minutes: int | None = None
awake_minutes: int | None = None
note: str = ""
source: Literal['manual', 'apple_health', 'garmin'] = 'manual'
class SleepResponse(BaseModel):
id: int
profile_id: str
date: str
bedtime: str | None
wake_time: str | None
duration_minutes: int
duration_formatted: str
quality: int | None
wake_count: int | None
deep_minutes: int | None
rem_minutes: int | None
light_minutes: int | None
awake_minutes: int | None
sleep_segments: list | None
sleep_efficiency: float | None
deep_percent: float | None
rem_percent: float | None
note: str
source: str
created_at: str
class SleepStatsResponse(BaseModel):
avg_duration_minutes: float
avg_quality: float | None
total_nights: int
nights_below_goal: int
sleep_goal_minutes: int
class SleepDebtResponse(BaseModel):
sleep_debt_minutes: int
sleep_debt_formatted: str
days_analyzed: int
sleep_goal_minutes: int
# ── Helper Functions ──────────────────────────────────────────────────────────
def format_duration(minutes: int) -> str:
"""Convert minutes to 'Xh Ymin' format."""
hours = minutes // 60
mins = minutes % 60
return f"{hours}h {mins}min"
def calculate_sleep_efficiency(duration_min: int, awake_min: int | None) -> float | None:
"""Sleep efficiency = duration / (duration + awake) * 100."""
if awake_min is None or awake_min == 0:
return None
total = duration_min + awake_min
return round((duration_min / total) * 100, 1) if total > 0 else None
def calculate_phase_percent(phase_min: int | None, duration_min: int) -> float | None:
"""Calculate phase percentage of total duration."""
if phase_min is None or duration_min == 0:
return None
return round((phase_min / duration_min) * 100, 1)
def row_to_sleep_response(row: dict) -> SleepResponse:
"""Convert DB row to SleepResponse."""
return SleepResponse(
id=row['id'],
profile_id=row['profile_id'],
date=str(row['date']),
bedtime=str(row['bedtime']) if row['bedtime'] else None,
wake_time=str(row['wake_time']) if row['wake_time'] else None,
duration_minutes=row['duration_minutes'],
duration_formatted=format_duration(row['duration_minutes']),
quality=row['quality'],
wake_count=row['wake_count'],
deep_minutes=row['deep_minutes'],
rem_minutes=row['rem_minutes'],
light_minutes=row['light_minutes'],
awake_minutes=row['awake_minutes'],
sleep_segments=row['sleep_segments'],
sleep_efficiency=calculate_sleep_efficiency(row['duration_minutes'], row['awake_minutes']),
deep_percent=calculate_phase_percent(row['deep_minutes'], row['duration_minutes']),
rem_percent=calculate_phase_percent(row['rem_minutes'], row['duration_minutes']),
note=row['note'] or "",
source=row['source'],
created_at=str(row['created_at'])
)
# ── CRUD Endpoints ────────────────────────────────────────────────────────────
@router.get("")
def list_sleep(
limit: int = 90,
session: dict = Depends(require_auth)
):
"""List sleep entries for current profile (last N days)."""
pid = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT * FROM sleep_log
WHERE profile_id = %s
ORDER BY date DESC
LIMIT %s
""", (pid, limit))
rows = cur.fetchall()
return [row_to_sleep_response(row) for row in rows]
@router.get("/by-date/{date}")
def get_sleep_by_date(
date: str,
session: dict = Depends(require_auth)
):
"""Get sleep entry for specific date."""
pid = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT * FROM sleep_log
WHERE profile_id = %s AND date = %s
""", (pid, date))
row = cur.fetchone()
if not row:
raise HTTPException(404, "No sleep entry for this date")
return row_to_sleep_response(row)
@router.post("")
def create_sleep(
data: SleepCreate,
session: dict = Depends(require_auth)
):
"""Create or update sleep entry (upsert by date)."""
pid = session['profile_id']
# Convert empty strings to None for TIME fields
bedtime = data.bedtime if data.bedtime else None
wake_time = data.wake_time if data.wake_time else None
# Plausibility check: phases should sum to duration (tolerance: 5 min)
if any([data.deep_minutes, data.rem_minutes, data.light_minutes, data.awake_minutes]):
phase_sum = (data.deep_minutes or 0) + (data.rem_minutes or 0) + (data.light_minutes or 0) + (data.awake_minutes or 0)
diff = abs(data.duration_minutes - phase_sum)
if diff > 5:
raise HTTPException(
400,
f"Plausibilitätsprüfung fehlgeschlagen: Phasen-Summe ({phase_sum} min) weicht um {diff} min von Gesamtdauer ({data.duration_minutes} min) ab. Max. Toleranz: 5 min."
)
with get_db() as conn:
cur = get_cursor(conn)
# Upsert: INSERT ... ON CONFLICT DO UPDATE
cur.execute("""
INSERT INTO sleep_log (
profile_id, date, bedtime, wake_time, duration_minutes,
quality, wake_count, deep_minutes, rem_minutes, light_minutes,
awake_minutes, note, source, updated_at
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP
)
ON CONFLICT (profile_id, date) DO UPDATE SET
bedtime = EXCLUDED.bedtime,
wake_time = EXCLUDED.wake_time,
duration_minutes = EXCLUDED.duration_minutes,
quality = EXCLUDED.quality,
wake_count = EXCLUDED.wake_count,
deep_minutes = EXCLUDED.deep_minutes,
rem_minutes = EXCLUDED.rem_minutes,
light_minutes = EXCLUDED.light_minutes,
awake_minutes = EXCLUDED.awake_minutes,
note = EXCLUDED.note,
source = EXCLUDED.source,
updated_at = CURRENT_TIMESTAMP
RETURNING *
""", (
pid, data.date, bedtime, wake_time, data.duration_minutes,
data.quality, data.wake_count, data.deep_minutes, data.rem_minutes,
data.light_minutes, data.awake_minutes, data.note, data.source
))
row = cur.fetchone()
conn.commit()
return row_to_sleep_response(row)
@router.put("/{id}")
def update_sleep(
id: int,
data: SleepCreate,
session: dict = Depends(require_auth)
):
"""Update existing sleep entry by ID."""
pid = session['profile_id']
# Convert empty strings to None for TIME fields
bedtime = data.bedtime if data.bedtime else None
wake_time = data.wake_time if data.wake_time else None
# Plausibility check: phases should sum to duration (tolerance: 5 min)
if any([data.deep_minutes, data.rem_minutes, data.light_minutes, data.awake_minutes]):
phase_sum = (data.deep_minutes or 0) + (data.rem_minutes or 0) + (data.light_minutes or 0) + (data.awake_minutes or 0)
diff = abs(data.duration_minutes - phase_sum)
if diff > 5:
raise HTTPException(
400,
f"Plausibilitätsprüfung fehlgeschlagen: Phasen-Summe ({phase_sum} min) weicht um {diff} min von Gesamtdauer ({data.duration_minutes} min) ab. Max. Toleranz: 5 min."
)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
UPDATE sleep_log SET
date = %s,
bedtime = %s,
wake_time = %s,
duration_minutes = %s,
quality = %s,
wake_count = %s,
deep_minutes = %s,
rem_minutes = %s,
light_minutes = %s,
awake_minutes = %s,
note = %s,
updated_at = CURRENT_TIMESTAMP
WHERE id = %s AND profile_id = %s
RETURNING *
""", (
data.date, bedtime, wake_time, data.duration_minutes,
data.quality, data.wake_count, data.deep_minutes, data.rem_minutes,
data.light_minutes, data.awake_minutes, data.note, id, pid
))
row = cur.fetchone()
if not row:
raise HTTPException(404, "Sleep entry not found")
conn.commit()
return row_to_sleep_response(row)
@router.delete("/{id}")
def delete_sleep(
id: int,
session: dict = Depends(require_auth)
):
"""Delete sleep entry."""
pid = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
DELETE FROM sleep_log
WHERE id = %s AND profile_id = %s
""", (id, pid))
conn.commit()
return {"deleted": id}
# ── Stats Endpoints ───────────────────────────────────────────────────────────
@router.get("/stats")
def get_sleep_stats(
days: int = 7,
session: dict = Depends(require_auth)
):
"""Get sleep statistics (average duration, quality, nights below goal)."""
pid = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
# Get sleep goal from profile
cur.execute("SELECT sleep_goal_minutes FROM profiles WHERE id = %s", (pid,))
profile = cur.fetchone()
sleep_goal = profile['sleep_goal_minutes'] if profile and profile['sleep_goal_minutes'] else 450
# Calculate stats
cur.execute("""
SELECT
AVG(duration_minutes)::FLOAT as avg_duration,
AVG(quality)::FLOAT as avg_quality,
COUNT(*) as total_nights,
COUNT(CASE WHEN duration_minutes < %s THEN 1 END) as nights_below_goal
FROM sleep_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '%s days'
""", (sleep_goal, pid, days))
stats = cur.fetchone()
return SleepStatsResponse(
avg_duration_minutes=round(stats['avg_duration'], 1) if stats['avg_duration'] else 0,
avg_quality=round(stats['avg_quality'], 1) if stats['avg_quality'] else None,
total_nights=stats['total_nights'],
nights_below_goal=stats['nights_below_goal'],
sleep_goal_minutes=sleep_goal
)
@router.get("/debt")
def get_sleep_debt(
days: int = 14,
session: dict = Depends(require_auth)
):
"""Calculate sleep debt over last N days."""
pid = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
# Get sleep goal
cur.execute("SELECT sleep_goal_minutes FROM profiles WHERE id = %s", (pid,))
profile = cur.fetchone()
sleep_goal = profile['sleep_goal_minutes'] if profile and profile['sleep_goal_minutes'] else 450
# Calculate debt
cur.execute("""
SELECT
SUM(%s - duration_minutes) as debt_minutes,
COUNT(*) as nights_analyzed
FROM sleep_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '%s days'
""", (sleep_goal, pid, days))
result = cur.fetchone()
debt_min = int(result['debt_minutes']) if result['debt_minutes'] else 0
nights = result['nights_analyzed'] if result['nights_analyzed'] else 0
# Format debt
if debt_min == 0:
formatted = "0 kein Defizit"
elif debt_min > 0:
formatted = f"+{format_duration(debt_min)}"
else:
formatted = f"{format_duration(abs(debt_min))}"
return SleepDebtResponse(
sleep_debt_minutes=debt_min,
sleep_debt_formatted=formatted,
days_analyzed=nights,
sleep_goal_minutes=sleep_goal
)
@router.get("/trend")
def get_sleep_trend(
days: int = 30,
session: dict = Depends(require_auth)
):
"""Get sleep duration and quality trend over time."""
pid = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT
date,
duration_minutes,
quality
FROM sleep_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '%s days'
ORDER BY date ASC
""", (pid, days))
rows = cur.fetchall()
return [
{
"date": str(row['date']),
"duration_minutes": row['duration_minutes'],
"quality": row['quality']
}
for row in rows
]
@router.get("/phases")
def get_sleep_phases(
days: int = 30,
session: dict = Depends(require_auth)
):
"""Get sleep phase distribution (deep, REM, light, awake) over time."""
pid = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT
date,
deep_minutes,
rem_minutes,
light_minutes,
awake_minutes,
duration_minutes
FROM sleep_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '%s days'
AND (deep_minutes IS NOT NULL OR rem_minutes IS NOT NULL)
ORDER BY date ASC
""", (pid, days))
rows = cur.fetchall()
return [
{
"date": str(row['date']),
"deep_minutes": row['deep_minutes'],
"rem_minutes": row['rem_minutes'],
"light_minutes": row['light_minutes'],
"awake_minutes": row['awake_minutes'],
"deep_percent": calculate_phase_percent(row['deep_minutes'], row['duration_minutes']),
"rem_percent": calculate_phase_percent(row['rem_minutes'], row['duration_minutes'])
}
for row in rows
]
# ── Import Endpoints ──────────────────────────────────────────────────────────
@router.post("/import/apple-health")
async def import_apple_health_sleep(
file: UploadFile = File(...),
session: dict = Depends(require_auth)
):
"""
Import sleep data from Apple Health CSV export.
Expected CSV format:
Start,End,Duration (hr),Value,Source
2026-03-14 22:44:23,2026-03-14 23:00:19,0.266,Kern,Apple Watch
- Aggregates segments by night (wake date)
- Maps German phase names: Kern→light, REM→rem, Tief→deep, Wach→awake
- Stores raw segments in JSONB
- Does NOT overwrite manual entries (source='manual')
"""
pid = session['profile_id']
# Read CSV
content = await file.read()
csv_text = content.decode('utf-8-sig') # Handle BOM
reader = csv.DictReader(io.StringIO(csv_text))
# Phase mapping (German → English)
phase_map = {
'Kern': 'light',
'REM': 'rem',
'Tief': 'deep',
'Wach': 'awake',
'Schlafend': None # Ignore initial sleep entry
}
# Parse segments
segments = []
for row in reader:
phase_de = row['Value'].strip()
phase_en = phase_map.get(phase_de)
if phase_en is None: # Skip "Schlafend"
continue
start_dt = datetime.strptime(row['Start'], '%Y-%m-%d %H:%M:%S')
end_dt = datetime.strptime(row['End'], '%Y-%m-%d %H:%M:%S')
duration_hr = float(row['Duration (hr)'])
duration_min = int(duration_hr * 60)
segments.append({
'start': start_dt,
'end': end_dt,
'duration_min': duration_min,
'phase': phase_en
})
# Sort segments chronologically
segments.sort(key=lambda s: s['start'])
# Group segments into nights (gap-based)
# If gap between segments > 2 hours → new night
nights = []
current_night = None
for seg in segments:
# Start new night if:
# 1. First segment
# 2. Gap > 2 hours since last segment
if current_night is None or (seg['start'] - current_night['wake_time']).total_seconds() > 7200:
current_night = {
'bedtime': seg['start'],
'wake_time': seg['end'],
'segments': [],
'deep_minutes': 0,
'rem_minutes': 0,
'light_minutes': 0,
'awake_minutes': 0
}
nights.append(current_night)
# Add segment to current night
current_night['segments'].append(seg)
current_night['wake_time'] = max(current_night['wake_time'], seg['end'])
current_night['bedtime'] = min(current_night['bedtime'], seg['start'])
# Sum phases
if seg['phase'] == 'deep':
current_night['deep_minutes'] += seg['duration_min']
elif seg['phase'] == 'rem':
current_night['rem_minutes'] += seg['duration_min']
elif seg['phase'] == 'light':
current_night['light_minutes'] += seg['duration_min']
elif seg['phase'] == 'awake':
current_night['awake_minutes'] += seg['duration_min']
# Convert nights list to dict with wake_date as key
nights_dict = {}
for night in nights:
wake_date = night['wake_time'].date() # Date when you woke up
nights_dict[wake_date] = night
# Insert nights
imported = 0
skipped = 0
with get_db() as conn:
cur = get_cursor(conn)
for date, night in nights_dict.items():
# Calculate total duration (sum of all phases)
duration_minutes = (
night['deep_minutes'] +
night['rem_minutes'] +
night['light_minutes'] +
night['awake_minutes']
)
# Calculate wake_count (number of awake segments)
wake_count = sum(1 for seg in night['segments'] if seg['phase'] == 'awake')
# Prepare JSONB segments with full datetime
sleep_segments = [
{
'phase': seg['phase'],
'start': seg['start'].isoformat(), # Full datetime: 2026-03-21T22:30:00
'end': seg['end'].isoformat(), # Full datetime: 2026-03-21T23:15:00
'duration_min': seg['duration_min']
}
for seg in night['segments']
]
# Check if manual entry exists - do NOT overwrite
cur.execute("""
SELECT id, source FROM sleep_log
WHERE profile_id = %s AND date = %s
""", (pid, date))
existing = cur.fetchone()
if existing and existing['source'] == 'manual':
skipped += 1
continue # Skip - don't overwrite manual entries
# Upsert (only if not manual)
# If entry exists and is NOT manual → update
# If entry doesn't exist → insert
if existing:
# Update existing non-manual entry
cur.execute("""
UPDATE sleep_log SET
bedtime = %s,
wake_time = %s,
duration_minutes = %s,
wake_count = %s,
deep_minutes = %s,
rem_minutes = %s,
light_minutes = %s,
awake_minutes = %s,
sleep_segments = %s,
source = 'apple_health',
updated_at = CURRENT_TIMESTAMP
WHERE id = %s AND profile_id = %s
""", (
night['bedtime'].time(),
night['wake_time'].time(),
duration_minutes,
wake_count,
night['deep_minutes'],
night['rem_minutes'],
night['light_minutes'],
night['awake_minutes'],
json.dumps(sleep_segments),
existing['id'],
pid
))
else:
# Insert new entry
cur.execute("""
INSERT INTO sleep_log (
profile_id, date, bedtime, wake_time, duration_minutes,
wake_count, deep_minutes, rem_minutes, light_minutes, awake_minutes,
sleep_segments, source, created_at, updated_at
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'apple_health', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
)
""", (
pid,
date,
night['bedtime'].time(),
night['wake_time'].time(),
duration_minutes,
wake_count,
night['deep_minutes'],
night['rem_minutes'],
night['light_minutes'],
night['awake_minutes'],
json.dumps(sleep_segments)
))
imported += 1
conn.commit()
return {
"imported": imported,
"skipped": skipped,
"total_nights": len(nights_dict),
"message": f"{imported} Nächte importiert, {skipped} übersprungen (manuelle Einträge)"
}