mitai-jinkendo/backend/csv_parser/sleep_apple_import.py
Lars 41cc0ed2a8
Some checks failed
Deploy Development / deploy (push) Successful in 54s
Build Test / pytest-backend-csv (push) Failing after 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat(csv-import): Enhance Apple sleep CSV import functionality
- Integrated date parsing improvements using dateutil for better handling of various date formats in sleep data.
- Added total sleep hours to the nights dictionary for comprehensive sleep analysis.
- Updated the import logic to handle cases where sleep duration is zero, providing appropriate warnings.
- Enhanced the CSV import interface to detect Apple sleep CSV format and provide user feedback on template selection.
- Improved the admin CSV template editor to accommodate new sleep import requirements and clarify usage instructions.
2026-04-10 07:52:04 +02:00

351 lines
12 KiB
Python

"""
Apple-Health-Schlaf-CSV → sleep_log (für Universal-Import und /api/sleep/import).
Nutzt dieselbe Logik wie der Sleep-Router, ohne HTTPException — arbeitet auf einem übergebenen Cursor.
"""
from __future__ import annotations
import csv
import io
import json
import logging
from datetime import date, datetime
from decimal import Decimal, InvalidOperation
from typing import Any, Literal
from dateutil import parser as dateutil_parser
logger = logging.getLogger(__name__)
def _strip_row_keys(row: dict) -> dict:
return {(k or "").strip(): (v.strip() if isinstance(v, str) else v) for k, v in row.items()}
def _safe_float(value: Any) -> float | None:
if value is None or value == "":
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, Decimal):
return float(value)
try:
s = str(value).strip().replace(",", ".")
return float(s)
except (ValueError, InvalidOperation):
return None
def _parse_apple_sleep_datetime(value: str) -> datetime:
raw = (value or "").strip()
if not raw:
raise ValueError("empty datetime")
fmts = (
"%Y-%m-%d %H:%M:%S %z",
"%d.%m.%y %H:%M:%S",
"%d.%m.%Y %H:%M:%S",
"%Y-%m-%d %H:%M:%S",
)
for fmt in fmts:
try:
return datetime.strptime(raw, fmt)
except ValueError:
continue
try:
return dateutil_parser.parse(raw, dayfirst=False)
except (ValueError, TypeError, OverflowError) as e:
raise ValueError(f"Unbekanntes Datumsformat: {raw!r}") from e
def _hr_to_minutes(hours: float | None) -> int:
if hours is None:
return 0
return int(round(float(hours) * 60))
def detect_apple_sleep_csv_format(fieldnames: list[str] | None) -> Literal["segments", "summary"]:
if not fieldnames:
raise ValueError("CSV enthält keine Spaltenüberschriften.")
fn = {(f or "").strip() for f in fieldnames}
if {"Start", "End", "Duration (hr)", "Value"}.issubset(fn):
return "segments"
if "Start" in fn and "End" in fn and "Total Sleep (hr)" in fn:
return "summary"
raise ValueError(
"Unbekanntes Apple-Health-Schlaf-CSV. Erwartet: Segment-Export oder Schlafanalyse-Zusammenfassung."
)
def _build_nights_from_apple_summary(reader: csv.DictReader) -> dict[date, dict]:
nights_dict: dict[date, dict] = {}
for raw in reader:
row = _strip_row_keys(raw)
start_s = row.get("Start") or ""
end_s = row.get("End") or ""
if not start_s or not end_s:
continue
try:
start_dt = _parse_apple_sleep_datetime(start_s)
end_dt = _parse_apple_sleep_datetime(end_s)
except ValueError:
continue
dt_key = (row.get("Date/Time") or row.get("Datum/Uhrzeit") or "").strip()
if dt_key:
try:
wake_d = datetime.strptime(dt_key[:10], "%Y-%m-%d").date()
except ValueError:
wake_d = end_dt.date()
else:
wake_d = end_dt.date()
core_hr = _safe_float(row.get("Core (hr)"))
if core_hr is None:
core_hr = _safe_float(row.get("Light (hr)")) or 0.0
deep_min = _hr_to_minutes(_safe_float(row.get("Deep (hr)")))
rem_min = _hr_to_minutes(_safe_float(row.get("REM (hr)")))
light_min = _hr_to_minutes(core_hr)
awake_min = _hr_to_minutes(_safe_float(row.get("Awake (hr)")))
total_sleep_hr = _safe_float(row.get("Total Sleep (hr)"))
nights_dict[wake_d] = {
"bedtime": start_dt,
"wake_time": end_dt,
"segments": [],
"deep_minutes": deep_min,
"rem_minutes": rem_min,
"light_minutes": light_min,
"awake_minutes": awake_min,
"total_sleep_hr": total_sleep_hr,
}
return nights_dict
def _build_nights_from_apple_segments(reader: csv.DictReader, phase_map: dict) -> dict[date, dict]:
segments = []
for raw in reader:
row = _strip_row_keys(raw)
phase_key = (row.get("Value") or "").strip()
phase_en = phase_map.get(phase_key)
if phase_en is None:
continue
try:
start_dt = _parse_apple_sleep_datetime(row.get("Start") or "")
end_dt = _parse_apple_sleep_datetime(row.get("End") or "")
duration_hr = _safe_float(row.get("Duration (hr)"))
if duration_hr is None:
continue
except (ValueError, TypeError):
continue
duration_min = int(duration_hr * 60)
segments.append({
"start": start_dt,
"end": end_dt,
"duration_min": duration_min,
"phase": phase_en,
})
segments.sort(key=lambda s: s["start"])
nights = []
current_night = None
for seg in segments:
if current_night is None or (seg["start"] - current_night["wake_time"]).total_seconds() > 7200:
current_night = {
"bedtime": seg["start"],
"wake_time": seg["end"],
"segments": [],
"deep_minutes": 0,
"rem_minutes": 0,
"light_minutes": 0,
"awake_minutes": 0,
}
nights.append(current_night)
current_night["segments"].append(seg)
current_night["wake_time"] = max(current_night["wake_time"], seg["end"])
current_night["bedtime"] = min(current_night["bedtime"], seg["start"])
if seg["phase"] == "deep":
current_night["deep_minutes"] += seg["duration_min"]
elif seg["phase"] == "rem":
current_night["rem_minutes"] += seg["duration_min"]
elif seg["phase"] == "light":
current_night["light_minutes"] += seg["duration_min"]
elif seg["phase"] == "awake":
current_night["awake_minutes"] += seg["duration_min"]
nights_dict: dict[date, dict] = {}
for night in nights:
wake_date = night["wake_time"].date()
nights_dict[wake_date] = night
return nights_dict
def import_apple_sleep_nights(cur, profile_id: str, text: str) -> dict[str, Any]:
"""
Schreibt in sleep_log. Kein conn.commit — Aufrufer rollt Transaktion.
Gibt Statistik im Executor-Format zurück.
"""
csv_text = text.replace("\r\n", "\n").replace("\r", "\n")
if csv_text.startswith("\ufeff"):
csv_text = csv_text[1:]
reader = csv.DictReader(io.StringIO(csv_text))
fmt = detect_apple_sleep_csv_format(reader.fieldnames)
phase_map = {
"Kern": "light",
"Core": "light",
"Light": "light",
"REM": "rem",
"Tief": "deep",
"Deep": "deep",
"Wach": "awake",
"Awake": "awake",
"Schlafend": None,
"Asleep": None,
"In Bed": None,
}
if fmt == "summary":
nights_dict = _build_nights_from_apple_summary(reader)
else:
nights_dict = _build_nights_from_apple_segments(reader, phase_map)
if not nights_dict:
raise ValueError(
"Keine importierbaren Schlafzeilen gefunden (prüfe Start/Ende und Format)."
)
inserted = 0
updated = 0
skipped = 0
error_details: list[dict[str, Any]] = []
affected_ids: list[str] = []
row_hint = 0
for wake_date, night in nights_dict.items():
row_hint += 1
phase_sum = (
night["deep_minutes"] + night["rem_minutes"] + night["light_minutes"]
)
total_hr = night.get("total_sleep_hr")
fallback_min = int(round(float(total_hr) * 60)) if total_hr is not None else 0
duration_minutes = phase_sum if phase_sum > 0 else fallback_min
if duration_minutes <= 0:
logger.warning(
"Sleep import: überspringe %s — Dauer 0 (Phasen-Summe und Total Sleep (hr) leer/0).",
wake_date,
)
error_details.append(
{
"row": row_hint,
"error": f"Schlafdauer für {wake_date} ist 0 — Phasen oder Total Sleep (hr) fehlen.",
}
)
continue
wake_count = sum(1 for seg in night["segments"] if seg["phase"] == "awake")
sleep_segments = [
{
"phase": seg["phase"],
"start": seg["start"].isoformat(),
"end": seg["end"].isoformat(),
"duration_min": seg["duration_min"],
}
for seg in night["segments"]
]
cur.execute(
"""
SELECT id, source FROM sleep_log
WHERE profile_id = %s AND date = %s
""",
(profile_id, wake_date),
)
existing = cur.fetchone()
if existing and existing["source"] == "manual":
skipped += 1
continue
try:
if existing:
cur.execute(
"""
UPDATE sleep_log SET
bedtime = %s,
wake_time = %s,
duration_minutes = %s,
wake_count = %s,
deep_minutes = %s,
rem_minutes = %s,
light_minutes = %s,
awake_minutes = %s,
sleep_segments = %s,
source = 'apple_health',
updated_at = CURRENT_TIMESTAMP
WHERE id = %s AND profile_id = %s
RETURNING id
""",
(
night["bedtime"].time(),
night["wake_time"].time(),
duration_minutes,
wake_count,
night["deep_minutes"],
night["rem_minutes"],
night["light_minutes"],
night["awake_minutes"],
json.dumps(sleep_segments),
existing["id"],
profile_id,
),
)
row = cur.fetchone()
updated += 1
if row and row.get("id"):
affected_ids.append(str(row["id"]))
else:
cur.execute(
"""
INSERT INTO sleep_log (
profile_id, date, bedtime, wake_time, duration_minutes,
wake_count, deep_minutes, rem_minutes, light_minutes, awake_minutes,
sleep_segments, source, created_at, updated_at
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 'apple_health', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
)
RETURNING id
""",
(
profile_id,
wake_date,
night["bedtime"].time(),
night["wake_time"].time(),
duration_minutes,
wake_count,
night["deep_minutes"],
night["rem_minutes"],
night["light_minutes"],
night["awake_minutes"],
json.dumps(sleep_segments),
),
)
row = cur.fetchone()
inserted += 1
if row and row.get("id"):
affected_ids.append(str(row["id"]))
except Exception as e:
logger.warning("Sleep import row failed: %s", e)
error_details.append({"row": row_hint, "error": str(e)})
return {
"rows_total": len(nights_dict),
"inserted": inserted,
"updated": updated,
"skipped": skipped,
"new_entries": inserted,
"error_details": error_details,
"affected_ids": affected_ids,
}