mitai-jinkendo/backend/routers/sleep.py
Lars 00437a92ab
All checks were successful
Deploy Development / deploy (push) Successful in 56s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s
feat: Enhance sleep module with CSV import functionality and date parsing improvements
2026-04-05 17:35:48 +02:00

780 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Sleep Module Router (v9d Phase 2b)
Endpoints:
- CRUD: list, create/upsert, update, delete
- Stats: 7-day average, trends, phase distribution, sleep debt
- Correlations: sleep ↔ resting HR, training, weight (Phase 2e)
"""
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File
from pydantic import BaseModel
from typing import Literal
from datetime import datetime, timedelta, date
from decimal import Decimal, InvalidOperation
import csv
import io
import json
from auth import require_auth
from db import get_db, get_cursor
# Shared router of the sleep module: every route registered on it is served
# under the /api/sleep prefix and tagged "sleep".
router = APIRouter(prefix="/api/sleep", tags=["sleep"])
def _strip_row_keys(row: dict) -> dict:
    """Return a copy of a CSV row with surrounding whitespace removed.

    Keys are always trimmed (a falsy key such as ``None`` becomes ``""``);
    values are trimmed only when they are strings and are otherwise passed
    through unchanged.
    """
    cleaned = {}
    for key, cell in row.items():
        name = (key or "").strip()
        cleaned[name] = cell.strip() if isinstance(cell, str) else cell
    return cleaned
def _safe_float(value) -> float | None:
if value is None or value == "":
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, Decimal):
return float(value)
try:
s = str(value).strip().replace(",", ".")
return float(s)
except (ValueError, InvalidOperation):
return None
def _parse_apple_sleep_datetime(value: str) -> datetime:
    """Parse a timestamp cell using the first matching known layout.

    Layouts are tried in order: ISO-like with UTC offset, German short year,
    German long year, ISO-like without offset. Only the first layout yields a
    timezone-aware result; the others produce naive datetimes.

    Raises:
        ValueError: if the value is empty/blank or matches none of the
            layouts (the last parsing error is attached as the cause).
    """
    text = (value or "").strip()
    if not text:
        raise ValueError("empty datetime")

    known_layouts = (
        "%Y-%m-%d %H:%M:%S %z",
        "%d.%m.%y %H:%M:%S",
        "%d.%m.%Y %H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
    )
    failure = None
    for layout in known_layouts:
        try:
            parsed = datetime.strptime(text, layout)
        except ValueError as exc:
            failure = exc
            continue
        return parsed
    raise ValueError(f"Unbekanntes Datumsformat: {text!r}") from failure
def _hr_to_minutes(hours: float | None) -> int:
if hours is None:
return 0
return int(round(float(hours) * 60))
def _detect_apple_sleep_csv_format(fieldnames: list[str] | None) -> Literal["segments", "summary"]:
if not fieldnames:
raise HTTPException(
status_code=400,
detail="CSV enthält keine Spaltenüberschriften.",
)
fn = {(f or "").strip() for f in fieldnames}
if {"Start", "End", "Duration (hr)", "Value"}.issubset(fn):
return "segments"
if "Start" in fn and "End" in fn and "Total Sleep (hr)" in fn:
return "summary"
raise HTTPException(
status_code=400,
detail=(
"Unbekanntes Apple-Health-Schlaf-CSV. "
"Erwartet wird entweder der Segment-Export (Spalten Start, End, Duration (hr), Value) "
"oder die Schlafanalyse mit Nacht-Zusammenfassung (u. a. Total Sleep (hr), Core/Light, Deep, REM)."
),
)
def _build_nights_from_apple_summary(reader: csv.DictReader) -> dict[date, dict]:
    """Turn per-night summary rows into night records keyed by wake date.

    Rows without a parsable "Start" and "End" are skipped. The key is the date
    found in the first ten characters of "Date/Time" (or "Datum/Uhrzeit") when
    that parses as YYYY-MM-DD, otherwise the date of "End". Light sleep comes
    from "Core (hr)", falling back to "Light (hr)" and then to 0. A later row
    with the same key replaces an earlier one. "segments" is always empty
    because this layout carries no per-segment data.
    """
    result: dict[date, dict] = {}
    for record in reader:
        fields = _strip_row_keys(record)
        start_text = fields.get("Start") or ""
        end_text = fields.get("End") or ""
        if not (start_text and end_text):
            continue
        try:
            bedtime = _parse_apple_sleep_datetime(start_text)
            wake_time = _parse_apple_sleep_datetime(end_text)
        except ValueError:
            continue

        # Default to the calendar day of waking; an explicit date column wins
        # when it is present and well-formed.
        wake_day = wake_time.date()
        label = (fields.get("Date/Time") or fields.get("Datum/Uhrzeit") or "").strip()
        if label:
            try:
                wake_day = datetime.strptime(label[:10], "%Y-%m-%d").date()
            except ValueError:
                pass

        light_hours = _safe_float(fields.get("Core (hr)"))
        if light_hours is None:
            light_hours = _safe_float(fields.get("Light (hr)")) or 0.0

        result[wake_day] = {
            "bedtime": bedtime,
            "wake_time": wake_time,
            "segments": [],
            "deep_minutes": _hr_to_minutes(_safe_float(fields.get("Deep (hr)"))),
            "rem_minutes": _hr_to_minutes(_safe_float(fields.get("REM (hr)"))),
            "light_minutes": _hr_to_minutes(light_hours),
            "awake_minutes": _hr_to_minutes(_safe_float(fields.get("Awake (hr)"))),
        }
    return result
def _build_nights_from_apple_segments(reader: csv.DictReader, phase_map: dict) -> dict[date, dict]:
    """Group per-segment sleep rows into night records keyed by wake date.

    Args:
        reader: CSV reader over rows with "Start", "End", "Duration (hr)" and
            "Value" columns.
        phase_map: maps the raw "Value" label to a phase name; rows whose
            label is missing from the map or maps to ``None`` are ignored.

    Returns:
        Mapping of wake date to a night dict holding "bedtime", "wake_time",
        the contributing "segments" and minute totals per phase.

    Rows with unparsable timestamps or durations are skipped. Segment minutes
    are rounded via ``_hr_to_minutes`` (the same conversion the summary import
    uses); truncating instead would turn e.g. 0.083 h into 4 minutes and
    under-count every night by up to a minute per segment.
    """
    # A pause longer than this between the running wake time and the next
    # segment start begins a new night.
    max_gap_seconds = 7200
    tracked_phases = ("deep", "rem", "light", "awake")

    segments = []
    for raw in reader:
        row = _strip_row_keys(raw)
        phase_en = phase_map.get((row.get("Value") or "").strip())
        if phase_en is None:
            continue
        try:
            start_dt = _parse_apple_sleep_datetime(row.get("Start") or "")
            end_dt = _parse_apple_sleep_datetime(row.get("End") or "")
            duration_hr = _safe_float(row.get("Duration (hr)"))
            if duration_hr is None:
                continue
            # Non-finite durations make the rounding raise; treat such rows
            # like any other unparsable row.
            duration_min = _hr_to_minutes(duration_hr)
        except (ValueError, TypeError, OverflowError):
            continue
        segments.append({
            "start": start_dt,
            "end": end_dt,
            "duration_min": duration_min,
            "phase": phase_en,
        })

    segments.sort(key=lambda s: s["start"])

    nights = []
    current_night = None
    for seg in segments:
        starts_new_night = (
            current_night is None
            or (seg["start"] - current_night["wake_time"]).total_seconds() > max_gap_seconds
        )
        if starts_new_night:
            current_night = {
                "bedtime": seg["start"],
                "wake_time": seg["end"],
                "segments": [],
                "deep_minutes": 0,
                "rem_minutes": 0,
                "light_minutes": 0,
                "awake_minutes": 0,
            }
            nights.append(current_night)

        current_night["segments"].append(seg)
        current_night["wake_time"] = max(current_night["wake_time"], seg["end"])
        current_night["bedtime"] = min(current_night["bedtime"], seg["start"])

        phase = seg["phase"]
        if phase in tracked_phases:
            current_night[f"{phase}_minutes"] += seg["duration_min"]

    # NOTE(review): two sleep periods ending on the same calendar day (e.g. a
    # night and a later nap) share a key, so the later one replaces the
    # earlier one - confirm this is intended.
    nights_dict: dict[date, dict] = {}
    for night in nights:
        nights_dict[night["wake_time"].date()] = night
    return nights_dict
# ── Models ────────────────────────────────────────────────────────────────────
# Request body for creating or updating one night of sleep.
# The formats noted on the fields are conventions only; this model declares no
# validators beyond the type annotations.
class SleepCreate(BaseModel):
    date: str  # YYYY-MM-DD
    bedtime: str | None = None  # HH:MM
    wake_time: str | None = None  # HH:MM
    duration_minutes: int
    quality: int | None = None  # 1-5
    wake_count: int | None = None
    # Optional per-phase breakdown, in minutes.
    deep_minutes: int | None = None
    rem_minutes: int | None = None
    light_minutes: int | None = None
    awake_minutes: int | None = None
    note: str = ""
    # Origin of the entry; defaults to a manually entered record.
    source: Literal["manual", "apple_health", "garmin"] = "manual"
# Response shape for a single stored sleep entry, including derived values.
class SleepResponse(BaseModel):
    id: int
    profile_id: str
    date: str
    bedtime: str | None
    wake_time: str | None
    duration_minutes: int
    duration_formatted: str  # human-readable form of duration_minutes
    quality: int | None
    wake_count: int | None
    # Per-phase breakdown, in minutes.
    deep_minutes: int | None
    rem_minutes: int | None
    light_minutes: int | None
    awake_minutes: int | None
    sleep_segments: list | None  # raw segments, if any were stored
    # Derived metrics; None when they cannot be computed.
    sleep_efficiency: float | None
    deep_percent: float | None
    rem_percent: float | None
    note: str
    source: str
    created_at: str
# Aggregated sleep statistics over a look-back window.
class SleepStatsResponse(BaseModel):
    avg_duration_minutes: float
    avg_quality: float | None  # None when no quality value is available
    total_nights: int
    nights_below_goal: int  # nights shorter than sleep_goal_minutes
    sleep_goal_minutes: int
# Accumulated difference between the sleep goal and the recorded sleep.
class SleepDebtResponse(BaseModel):
    sleep_debt_minutes: int
    sleep_debt_formatted: str  # display string for sleep_debt_minutes
    days_analyzed: int
    sleep_goal_minutes: int
# ── Helper Functions ──────────────────────────────────────────────────────────
def format_duration(minutes: int) -> str:
    """Render a minute count as 'Xh Ymin', e.g. 125 -> '2h 5min'."""
    whole_hours, remainder = divmod(minutes, 60)
    return f"{whole_hours}h {remainder}min"
def calculate_sleep_efficiency(duration_min: int, awake_min: int | None) -> float | None:
"""Sleep efficiency = duration / (duration + awake) * 100."""
if awake_min is None or awake_min == 0:
return None
total = duration_min + awake_min
return round((duration_min / total) * 100, 1) if total > 0 else None
def calculate_phase_percent(phase_min: int | None, duration_min: int) -> float | None:
"""Calculate phase percentage of total duration."""
if phase_min is None or duration_min == 0:
return None
return round((phase_min / duration_min) * 100, 1)
def row_to_sleep_response(row: dict) -> SleepResponse:
    """Build the API response model from one ``sleep_log`` row.

    Date, time and timestamp columns are stringified (falsy bedtime/wake time
    become ``None``), a missing note becomes ``""``, and the formatted
    duration, sleep efficiency and deep/REM percentages are derived from the
    stored minute counts.
    """
    total_minutes = row['duration_minutes']
    deep = row['deep_minutes']
    rem = row['rem_minutes']
    awake = row['awake_minutes']
    bedtime = row['bedtime']
    wake_time = row['wake_time']

    return SleepResponse(
        id=row['id'],
        profile_id=row['profile_id'],
        date=str(row['date']),
        bedtime=str(bedtime) if bedtime else None,
        wake_time=str(wake_time) if wake_time else None,
        duration_minutes=total_minutes,
        duration_formatted=format_duration(total_minutes),
        quality=row['quality'],
        wake_count=row['wake_count'],
        deep_minutes=deep,
        rem_minutes=rem,
        light_minutes=row['light_minutes'],
        awake_minutes=awake,
        sleep_segments=row['sleep_segments'],
        sleep_efficiency=calculate_sleep_efficiency(total_minutes, awake),
        deep_percent=calculate_phase_percent(deep, total_minutes),
        rem_percent=calculate_phase_percent(rem, total_minutes),
        note=row['note'] or "",
        source=row['source'],
        created_at=str(row['created_at']),
    )
# ── CRUD Endpoints ────────────────────────────────────────────────────────────
@router.get("")
def list_sleep(
    limit: int = 90,
    session: dict = Depends(require_auth)
):
    """List sleep entries for current profile (last N days)."""
    # Newest entries first; `limit` caps the number of rows returned.
    query = """
        SELECT * FROM sleep_log
        WHERE profile_id = %s
        ORDER BY date DESC
        LIMIT %s
    """
    profile_id = session['profile_id']
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(query, (profile_id, limit))
        entries = cursor.fetchall()
        return list(map(row_to_sleep_response, entries))
@router.get("/by-date/{date}")
def get_sleep_by_date(
    date: str,
    session: dict = Depends(require_auth)
):
    """Get sleep entry for specific date."""
    # Lookup is scoped to the authenticated profile; a missing row is a 404.
    query = """
        SELECT * FROM sleep_log
        WHERE profile_id = %s AND date = %s
    """
    profile_id = session['profile_id']
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(query, (profile_id, date))
        entry = cursor.fetchone()
        if entry:
            return row_to_sleep_response(entry)
        raise HTTPException(404, "No sleep entry for this date")
@router.post("")
def create_sleep(
    data: SleepCreate,
    session: dict = Depends(require_auth)
):
    """Create or update sleep entry (upsert by date)."""
    profile_id = session['profile_id']

    # TIME columns: an empty string from the client is stored as NULL.
    bedtime = data.bedtime or None
    wake_time = data.wake_time or None

    # Plausibility check: when any sleep phase is given, deep + REM + light
    # must match the duration within 5 minutes. Awake time is tracked
    # separately and is not part of the sleep duration.
    phase_minutes = (data.deep_minutes, data.rem_minutes, data.light_minutes)
    if any(phase_minutes):
        sleep_phase_sum = sum(value or 0 for value in phase_minutes)
        diff = abs(data.duration_minutes - sleep_phase_sum)
        if diff > 5:
            raise HTTPException(
                400,
                f"Plausibilitätsprüfung fehlgeschlagen: Schlafphasen-Summe ({sleep_phase_sum} min) weicht um {diff} min von Schlafdauer ({data.duration_minutes} min) ab. Max. Toleranz: 5 min. Hinweis: Wachphasen werden nicht zur Schlafdauer gezählt."
            )

    # One row per (profile, date): insert, or overwrite the existing row.
    upsert_sql = """
        INSERT INTO sleep_log (
            profile_id, date, bedtime, wake_time, duration_minutes,
            quality, wake_count, deep_minutes, rem_minutes, light_minutes,
            awake_minutes, note, source, updated_at
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP
        )
        ON CONFLICT (profile_id, date) DO UPDATE SET
            bedtime = EXCLUDED.bedtime,
            wake_time = EXCLUDED.wake_time,
            duration_minutes = EXCLUDED.duration_minutes,
            quality = EXCLUDED.quality,
            wake_count = EXCLUDED.wake_count,
            deep_minutes = EXCLUDED.deep_minutes,
            rem_minutes = EXCLUDED.rem_minutes,
            light_minutes = EXCLUDED.light_minutes,
            awake_minutes = EXCLUDED.awake_minutes,
            note = EXCLUDED.note,
            source = EXCLUDED.source,
            updated_at = CURRENT_TIMESTAMP
        RETURNING *
    """
    params = (
        profile_id, data.date, bedtime, wake_time, data.duration_minutes,
        data.quality, data.wake_count, data.deep_minutes, data.rem_minutes,
        data.light_minutes, data.awake_minutes, data.note, data.source,
    )

    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(upsert_sql, params)
        saved = cursor.fetchone()
        conn.commit()
        return row_to_sleep_response(saved)
@router.put("/{id}")
def update_sleep(
    id: int,
    data: SleepCreate,
    session: dict = Depends(require_auth)
):
    """Update existing sleep entry by ID."""
    profile_id = session['profile_id']

    # TIME columns: an empty string from the client is stored as NULL.
    bedtime = data.bedtime or None
    wake_time = data.wake_time or None

    # Plausibility check: when any sleep phase is given, deep + REM + light
    # must match the duration within 5 minutes. Awake time is tracked
    # separately and is not part of the sleep duration.
    phase_minutes = (data.deep_minutes, data.rem_minutes, data.light_minutes)
    if any(phase_minutes):
        sleep_phase_sum = sum(value or 0 for value in phase_minutes)
        diff = abs(data.duration_minutes - sleep_phase_sum)
        if diff > 5:
            raise HTTPException(
                400,
                f"Plausibilitätsprüfung fehlgeschlagen: Schlafphasen-Summe ({sleep_phase_sum} min) weicht um {diff} min von Schlafdauer ({data.duration_minutes} min) ab. Max. Toleranz: 5 min. Hinweis: Wachphasen werden nicht zur Schlafdauer gezählt."
            )

    # The profile filter keeps a user from editing another profile's row.
    # `source` is deliberately absent from the SET list, as in the original.
    update_sql = """
        UPDATE sleep_log SET
            date = %s,
            bedtime = %s,
            wake_time = %s,
            duration_minutes = %s,
            quality = %s,
            wake_count = %s,
            deep_minutes = %s,
            rem_minutes = %s,
            light_minutes = %s,
            awake_minutes = %s,
            note = %s,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = %s AND profile_id = %s
        RETURNING *
    """
    params = (
        data.date, bedtime, wake_time, data.duration_minutes,
        data.quality, data.wake_count, data.deep_minutes, data.rem_minutes,
        data.light_minutes, data.awake_minutes, data.note, id, profile_id,
    )

    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(update_sql, params)
        updated = cursor.fetchone()
        if not updated:
            raise HTTPException(404, "Sleep entry not found")
        conn.commit()
        return row_to_sleep_response(updated)
@router.delete("/{id}")
def delete_sleep(
    id: int,
    session: dict = Depends(require_auth)
):
    """Delete sleep entry."""
    # Only rows of the authenticated profile can be removed. The response is
    # the same whether or not a row matched.
    delete_sql = """
        DELETE FROM sleep_log
        WHERE id = %s AND profile_id = %s
    """
    profile_id = session['profile_id']
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(delete_sql, (id, profile_id))
        conn.commit()
        return {"deleted": id}
# ── Stats Endpoints ───────────────────────────────────────────────────────────
@router.get("/stats")
def get_sleep_stats(
    days: int = 7,
    session: dict = Depends(require_auth)
):
    """Get sleep statistics (average duration, quality, nights below goal)."""
    pid = session['profile_id']
    with get_db() as conn:
        cur = get_cursor(conn)

        # Sleep goal comes from the profile; 450 minutes is used when the
        # profile has no (or a zero) goal.
        cur.execute("SELECT sleep_goal_minutes FROM profiles WHERE id = %s", (pid,))
        profile = cur.fetchone()
        sleep_goal = profile['sleep_goal_minutes'] if profile and profile['sleep_goal_minutes'] else 450

        # The look-back window is passed as a real bound parameter and
        # multiplied by a one-day interval. A placeholder inside a quoted
        # literal (INTERVAL '%s days') only works by accident with client-side
        # interpolation of an int.
        cur.execute("""
            SELECT
                AVG(duration_minutes)::FLOAT as avg_duration,
                AVG(quality)::FLOAT as avg_quality,
                COUNT(*) as total_nights,
                COUNT(CASE WHEN duration_minutes < %s THEN 1 END) as nights_below_goal
            FROM sleep_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - (%s * INTERVAL '1 day')
        """, (sleep_goal, pid, days))
        stats = cur.fetchone()

        return SleepStatsResponse(
            avg_duration_minutes=round(stats['avg_duration'], 1) if stats['avg_duration'] else 0,
            avg_quality=round(stats['avg_quality'], 1) if stats['avg_quality'] else None,
            total_nights=stats['total_nights'],
            nights_below_goal=stats['nights_below_goal'],
            sleep_goal_minutes=sleep_goal
        )
@router.get("/debt")
def get_sleep_debt(
    days: int = 14,
    session: dict = Depends(require_auth)
):
    """Calculate sleep debt over last N days."""
    pid = session['profile_id']
    with get_db() as conn:
        cur = get_cursor(conn)

        # Sleep goal comes from the profile; 450 minutes is used when the
        # profile has no (or a zero) goal.
        cur.execute("SELECT sleep_goal_minutes FROM profiles WHERE id = %s", (pid,))
        profile = cur.fetchone()
        sleep_goal = profile['sleep_goal_minutes'] if profile and profile['sleep_goal_minutes'] else 450

        # Debt = sum of (goal - slept) over the recorded nights in the window.
        # The window is a real bound parameter multiplied by a one-day
        # interval rather than a placeholder inside a quoted literal.
        cur.execute("""
            SELECT
                SUM(%s - duration_minutes) as debt_minutes,
                COUNT(*) as nights_analyzed
            FROM sleep_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - (%s * INTERVAL '1 day')
        """, (sleep_goal, pid, days))
        result = cur.fetchone()

        debt_min = int(result['debt_minutes']) if result['debt_minutes'] else 0
        nights = result['nights_analyzed'] if result['nights_analyzed'] else 0

        # Positive debt (deficit) is shown with "+", negative debt (surplus)
        # with "-", so the two cases stay distinguishable in the display text.
        if debt_min == 0:
            formatted = "0 kein Defizit"
        elif debt_min > 0:
            formatted = f"+{format_duration(debt_min)}"
        else:
            formatted = f"-{format_duration(abs(debt_min))}"

        return SleepDebtResponse(
            sleep_debt_minutes=debt_min,
            sleep_debt_formatted=formatted,
            days_analyzed=nights,
            sleep_goal_minutes=sleep_goal
        )
@router.get("/trend")
def get_sleep_trend(
    days: int = 30,
    session: dict = Depends(require_auth)
):
    """Get sleep duration and quality trend over time."""
    pid = session['profile_id']
    with get_db() as conn:
        cur = get_cursor(conn)
        # Oldest entry first. The window is a real bound parameter multiplied
        # by a one-day interval rather than a placeholder inside a quoted
        # literal (INTERVAL '%s days').
        cur.execute("""
            SELECT
                date,
                duration_minutes,
                quality
            FROM sleep_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - (%s * INTERVAL '1 day')
            ORDER BY date ASC
        """, (pid, days))
        rows = cur.fetchall()

        return [
            {
                "date": str(row['date']),
                "duration_minutes": row['duration_minutes'],
                "quality": row['quality']
            }
            for row in rows
        ]
@router.get("/phases")
def get_sleep_phases(
    days: int = 30,
    session: dict = Depends(require_auth)
):
    """Get sleep phase distribution (deep, REM, light, awake) over time."""
    pid = session['profile_id']
    with get_db() as conn:
        cur = get_cursor(conn)
        # Only nights with at least a deep or REM value are returned, oldest
        # first. The window is a real bound parameter multiplied by a one-day
        # interval rather than a placeholder inside a quoted literal.
        cur.execute("""
            SELECT
                date,
                deep_minutes,
                rem_minutes,
                light_minutes,
                awake_minutes,
                duration_minutes
            FROM sleep_log
            WHERE profile_id = %s
              AND date >= CURRENT_DATE - (%s * INTERVAL '1 day')
              AND (deep_minutes IS NOT NULL OR rem_minutes IS NOT NULL)
            ORDER BY date ASC
        """, (pid, days))
        rows = cur.fetchall()

        return [
            {
                "date": str(row['date']),
                "deep_minutes": row['deep_minutes'],
                "rem_minutes": row['rem_minutes'],
                "light_minutes": row['light_minutes'],
                "awake_minutes": row['awake_minutes'],
                "deep_percent": calculate_phase_percent(row['deep_minutes'], row['duration_minutes']),
                "rem_percent": calculate_phase_percent(row['rem_minutes'], row['duration_minutes'])
            }
            for row in rows
        ]
# ── Import Endpoints ──────────────────────────────────────────────────────────
@router.post("/import/apple-health")
async def import_apple_health_sleep(
    file: UploadFile = File(...),
    session: dict = Depends(require_auth)
):
    """
    Import sleep data from Apple Health CSV export.
    Supports:
    - Segment-Export: Start, End, Duration (hr), Value (Kern/Tief/REM/Wach oder Core/Deep/…)
    - Schlafanalyse (Nacht-Zusammenfassung): Date/Time, Start, End, Total Sleep (hr),
    Core/Light (hr), Deep (hr), REM (hr), Awake (hr), …
    - Aggregates segments by night (wake date) where applicable
    - Stores Roh-Segmente in JSONB (bei Zusammenfassung oft leer)
    - Überschreibt keine manuellen Einträge (source='manual')
    """
    pid = session['profile_id']

    # The upload is untrusted input: an undecodable or malformed file is a
    # client error (400), not a server error.
    content = await file.read()
    try:
        csv_text = content.decode('utf-8-sig')
    except UnicodeDecodeError as exc:
        raise HTTPException(
            status_code=400,
            detail="CSV-Datei konnte nicht gelesen werden (erwartet wird UTF-8).",
        ) from exc

    # Raw "Value" labels (German and English) mapped to phase names. Labels
    # mapped to None are listed for documentation; they are ignored exactly
    # like unknown labels.
    phase_map = {
        "Kern": "light",
        "Core": "light",
        "Light": "light",
        "REM": "rem",
        "Tief": "deep",
        "Deep": "deep",
        "Wach": "awake",
        "Awake": "awake",
        "Schlafend": None,
        "Asleep": None,
        "In Bed": None,
    }

    try:
        reader = csv.DictReader(io.StringIO(csv_text))
        fmt = _detect_apple_sleep_csv_format(reader.fieldnames)
        if fmt == "summary":
            nights_dict = _build_nights_from_apple_summary(reader)
        else:
            nights_dict = _build_nights_from_apple_segments(reader, phase_map)
    except csv.Error as exc:
        raise HTTPException(
            status_code=400,
            detail="CSV-Datei ist fehlerhaft formatiert.",
        ) from exc

    if not nights_dict:
        raise HTTPException(
            status_code=400,
            detail="Keine importierbaren Schlafzeilen gefunden (prüfe Start/Ende und Format).",
        )

    imported = 0
    skipped = 0
    with get_db() as conn:
        cur = get_cursor(conn)
        # `wake_date` (not `date`) so the imported `date` class is not shadowed.
        for wake_date, night in nights_dict.items():
            # Manual entries are never overwritten by an import.
            cur.execute("""
                SELECT id, source FROM sleep_log
                WHERE profile_id = %s AND date = %s
            """, (pid, wake_date))
            existing = cur.fetchone()
            if existing and existing['source'] == 'manual':
                skipped += 1
                continue

            # Sleep duration = deep + REM + light; awake time is tracked
            # separately and is not part of the sleep duration.
            duration_minutes = (
                night['deep_minutes'] +
                night['rem_minutes'] +
                night['light_minutes']
            )

            # Number of awake segments (0 for summary imports, which carry no
            # segments).
            wake_count = sum(1 for seg in night['segments'] if seg['phase'] == 'awake')

            # Segments are stored as JSON with ISO-formatted timestamps.
            sleep_segments = [
                {
                    'phase': seg['phase'],
                    'start': seg['start'].isoformat(),
                    'end': seg['end'].isoformat(),
                    'duration_min': seg['duration_min']
                }
                for seg in night['segments']
            ]

            # Column values shared by the UPDATE and INSERT statements, in the
            # order both statements list them.
            night_values = (
                night['bedtime'].time(),
                night['wake_time'].time(),
                duration_minutes,
                wake_count,
                night['deep_minutes'],
                night['rem_minutes'],
                night['light_minutes'],
                night['awake_minutes'],
                json.dumps(sleep_segments),
            )

            if existing:
                # Existing non-manual entry: replace its imported values.
                cur.execute("""
                    UPDATE sleep_log SET
                        bedtime = %s,
                        wake_time = %s,
                        duration_minutes = %s,
                        wake_count = %s,
                        deep_minutes = %s,
                        rem_minutes = %s,
                        light_minutes = %s,
                        awake_minutes = %s,
                        sleep_segments = %s,
                        source = 'apple_health',
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = %s AND profile_id = %s
                """, night_values + (existing['id'], pid))
            else:
                cur.execute("""
                    INSERT INTO sleep_log (
                        profile_id, date, bedtime, wake_time, duration_minutes,
                        wake_count, deep_minutes, rem_minutes, light_minutes, awake_minutes,
                        sleep_segments, source, created_at, updated_at
                    ) VALUES (
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'apple_health', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
                    )
                """, (pid, wake_date) + night_values)

            # Counts both newly inserted and updated nights.
            imported += 1

        conn.commit()

    return {
        "imported": imported,
        "skipped": skipped,
        "total_nights": len(nights_dict),
        "message": f"{imported} Nächte importiert, {skipped} übersprungen (manuelle Einträge)"
    }