mitai-jinkendo/backend/data_layer/recovery_chart_payloads.py
Lars 6c962bf6e5
All checks were successful
Deploy Development / deploy (push) Successful in 1m2s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: enhance recovery metrics and dashboard with sleep debt calculations and improved visualizations
- Introduced new constants for sleep debt calculations, including target hours and rolling window days.
- Added a function to calculate sleep debt over a specified window, aligning with KPI logic.
- Updated SQL queries in recovery chart payloads to ensure accurate data retrieval for sleep metrics.
- Enhanced the RecoveryDashboardOverview component to reflect changes in sleep debt visualization and descriptions, improving user understanding of metrics.
- Refined chart labels and notes for clarity, ensuring users can easily interpret recovery and sleep data.
2026-04-20 12:47:03 +02:00

556 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Chart.js-Payloads für Recovery (R1–R5) — gemeinsam mit routers/charts und recovery-dashboard-viz.
Ausgelagert aus routers/charts.py (Issue 53 / Layer 1).
"""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Set
from db import get_db, get_cursor
from data_layer.recovery_metrics import (
SLEEP_DEBT_ROLLING_WINDOW_DAYS,
SLEEP_DEBT_TARGET_HOURS_DEFAULT,
calculate_hrv_vs_baseline_pct,
calculate_recovery_score_v2,
calculate_rhr_vs_baseline_pct,
calculate_sleep_debt_hours,
get_sleep_duration_data,
get_sleep_quality_data,
sleep_debt_sum_hours_in_window,
)
from data_layer.utils import calculate_confidence, safe_float, serialize_dates
from data_layer.vital_signs_assessment import build_vital_items_from_rows
def build_recovery_score_chart_payload(profile_id: str, days: int) -> Dict[str, Any]:
    """Build the Chart.js line payload shown for the recovery score.

    The plotted series is the per-day HRV reading from ``vitals_baseline``
    clamped to 0-100. The weighted KPI value returned by
    ``calculate_recovery_score_v2`` is only reported in the metadata as
    ``current_score``.

    Args:
        profile_id: Profile whose ``vitals_baseline`` rows are read.
        days: Requested look-back window in days; clamped to 7..90.

    Returns:
        A dict with ``chart_type``, ``data`` (labels and datasets) and
        ``metadata``. When no KPI score is available the payload is empty
        with confidence ``"insufficient"``. When a score exists but the
        window has no rows, a single point for today carrying the KPI
        score is returned.
    """
    days = max(7, min(days, 90))

    current_score = calculate_recovery_score_v2(profile_id)
    if current_score is None:
        return {
            "chart_type": "line",
            "data": {"labels": [], "datasets": []},
            "metadata": {
                "confidence": "insufficient",
                "data_points": 0,
                "message": "Keine Recovery-Daten vorhanden",
            },
        }

    cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT date, resting_hr, hrv
               FROM vitals_baseline
               WHERE profile_id=%s AND date >= %s
               ORDER BY date""",
            (profile_id, cutoff),
        )
        rows = cur.fetchall()

    if not rows:
        # No history in the window: show only the current KPI score for today.
        return {
            "chart_type": "line",
            "data": {
                "labels": [datetime.now().strftime("%Y-%m-%d")],
                "datasets": [
                    {
                        "label": "Recovery Score",
                        "data": [current_score],
                        "borderColor": "#1D9E75",
                        "backgroundColor": "rgba(29, 158, 117, 0.1)",
                        "borderWidth": 2,
                        "tension": 0.3,
                        "fill": True,
                    }
                ],
            },
            "metadata": {
                "confidence": "low",
                "data_points": 1,
                "current_score": current_score,
            },
        }

    labels = [row["date"].isoformat() for row in rows]
    # A day without an HRV reading becomes a gap (None) instead of an invented
    # mid-scale value: the series is labelled as the raw HRV reading, so a
    # placeholder would be indistinguishable from a real measurement.
    values = [
        None if row["hrv"] is None else min(100, max(0, safe_float(row["hrv"])))
        for row in rows
    ]

    return {
        "chart_type": "line",
        "data": {
            "labels": labels,
            "datasets": [
                {
                    "label": "HRV (ms, auf 0–100 begrenzt) — nicht der KPI Recovery-Score",
                    "data": values,
                    "borderColor": "#1D9E75",
                    "backgroundColor": "rgba(29, 158, 117, 0.1)",
                    "borderWidth": 2,
                    "tension": 0.3,
                    "fill": True,
                }
            ],
        },
        "metadata": serialize_dates(
            {
                "confidence": calculate_confidence(len(rows), days, "general"),
                "data_points": len(rows),
                "current_score": current_score,
                "chart_series_kind": "hrv_ms_clamped",
                "kpi_score_source": "calculate_recovery_score_v2",
                "note": "Kurve = HRV-Rohwert (ms) begrenzt auf 0–100, nur Verlaufsorientierung. "
                "KPI-Kachel «Recovery-Score» = gewichteter Score (HRV, RHR, Schlaf, …).",
            }
        ),
    }
def build_hrv_rhr_baseline_chart_payload(profile_id: str, days: int) -> Dict[str, Any]:
    """Assemble the dual-axis HRV / resting-heart-rate line chart payload.

    Args:
        profile_id: Profile whose ``vitals_baseline`` rows are read.
        days: Requested look-back window in days; clamped to 7..90.

    Returns:
        A dict with ``chart_type``, ``data`` and ``metadata``. HRV is bound
        to axis ``y1`` and RHR to ``y2``. The metadata carries the window
        averages and the baseline deltas from
        ``calculate_hrv_vs_baseline_pct`` / ``calculate_rhr_vs_baseline_pct``.
        An empty payload with confidence ``"insufficient"`` is returned when
        the window holds no rows.
    """
    days = min(max(days, 7), 90)

    since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            """SELECT date, resting_hr, hrv
               FROM vitals_baseline
               WHERE profile_id=%s AND date >= %s
               ORDER BY date""",
            (profile_id, since),
        )
        records = cursor.fetchall()

    if not records:
        empty_meta = {
            "confidence": "insufficient",
            "data_points": 0,
            "message": "Keine Vitalwerte vorhanden",
        }
        return {
            "chart_type": "line",
            "data": {"labels": [], "datasets": []},
            "metadata": empty_meta,
        }

    def column(name: str) -> list:
        # Falsy cells (NULL, but also 0) are emitted as gaps.
        series = []
        for record in records:
            cell = record[name]
            series.append(safe_float(cell) if cell else None)
        return series

    day_labels = [record["date"].isoformat() for record in records]
    hrv_series = column("hrv")
    rhr_series = column("resting_hr")

    hrv_delta_pct = calculate_hrv_vs_baseline_pct(profile_id)
    rhr_delta_pct = calculate_rhr_vs_baseline_pct(profile_id)

    # NOTE(review): when a series has no readings at all, the averages fall
    # back to the placeholders 50 (HRV) and 60 (RHR) rather than None.
    hrv_present = [value for value in hrv_series if value is not None]
    if hrv_present:
        mean_hrv = sum(hrv_present) / len(hrv_present)
    else:
        mean_hrv = 50
    rhr_present = [value for value in rhr_series if value is not None]
    if rhr_present:
        mean_rhr = sum(rhr_present) / len(rhr_present)
    else:
        mean_rhr = 60

    hrv_dataset = {
        "label": "HRV (ms)",
        "data": hrv_series,
        "borderColor": "#1D9E75",
        "backgroundColor": "rgba(29, 158, 117, 0.1)",
        "borderWidth": 2,
        "tension": 0.3,
        "yAxisID": "y1",
        "fill": False,
    }
    rhr_dataset = {
        "label": "RHR (bpm)",
        "data": rhr_series,
        "borderColor": "#3B82F6",
        "backgroundColor": "rgba(59, 130, 246, 0.1)",
        "borderWidth": 2,
        "tension": 0.3,
        "yAxisID": "y2",
        "fill": False,
    }

    row_count = len(records)
    summary = {
        "confidence": calculate_confidence(row_count, days, "general"),
        "data_points": row_count,
        "avg_hrv": round(mean_hrv, 1),
        "avg_rhr": round(mean_rhr, 1),
        "hrv_vs_baseline_pct": hrv_delta_pct,
        "rhr_vs_baseline_pct": rhr_delta_pct,
    }
    return {
        "chart_type": "line",
        "data": {"labels": day_labels, "datasets": [hrv_dataset, rhr_dataset]},
        "metadata": serialize_dates(summary),
    }
def build_sleep_duration_quality_chart_payload(profile_id: str, days: int) -> Dict[str, Any]:
    """Assemble the sleep duration / quality line chart payload.

    Args:
        profile_id: Profile whose ``sleep_log`` rows are read.
        days: Requested look-back window in days; clamped to 7..90.

    Returns:
        A dict with ``chart_type``, ``data`` and ``metadata``. Duration in
        hours is bound to axis ``y1`` and the percentage series to ``y2``.
        An empty payload with confidence ``"insufficient"`` is returned when
        ``get_sleep_duration_data`` reports insufficient confidence or the
        window holds no rows.
    """
    days = min(max(days, 7), 90)

    duration_summary = get_sleep_duration_data(profile_id, days)
    quality_summary = get_sleep_quality_data(profile_id, days)

    def empty_payload(message: str) -> Dict[str, Any]:
        return {
            "chart_type": "line",
            "data": {"labels": [], "datasets": []},
            "metadata": {
                "confidence": "insufficient",
                "data_points": 0,
                "message": message,
            },
        }

    if duration_summary["confidence"] == "insufficient":
        return empty_payload("Keine Schlafdaten vorhanden")

    since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            """SELECT date, duration_minutes
               FROM sleep_log
               WHERE profile_id=%s AND date >= %s
               ORDER BY date""",
            (profile_id, since),
        )
        nights = cursor.fetchall()

    if not nights:
        return empty_payload("Keine Schlafdaten")

    day_labels = [night["date"].isoformat() for night in nights]

    hours_series = []
    for night in nights:
        minutes = night["duration_minutes"]
        # Falsy durations (NULL, but also 0) are emitted as gaps.
        hours_series.append(safe_float(minutes) / 60 if minutes else None)

    # NOTE(review): the "quality" percentage is derived purely from duration
    # relative to a fixed 8 h night and is not capped at 100.
    percent_series = []
    for hours in hours_series:
        percent_series.append((hours / 8 * 100) if hours else None)

    duration_dataset = {
        "label": "Schlafdauer (h)",
        "data": hours_series,
        "borderColor": "#3B82F6",
        "backgroundColor": "rgba(59, 130, 246, 0.1)",
        "borderWidth": 2,
        "tension": 0.3,
        "yAxisID": "y1",
        "fill": True,
    }
    quality_dataset = {
        "label": "Qualität (%)",
        "data": percent_series,
        "borderColor": "#1D9E75",
        "backgroundColor": "rgba(29, 158, 117, 0.1)",
        "borderWidth": 2,
        "tension": 0.3,
        "yAxisID": "y2",
        "fill": False,
    }

    summary = {
        "confidence": duration_summary["confidence"],
        "data_points": len(nights),
        "avg_duration_hours": round(duration_summary["avg_duration_hours"], 1),
        "sleep_quality_score": quality_summary.get("quality_score", 0),
    }
    return {
        "chart_type": "line",
        "data": {"labels": day_labels, "datasets": [duration_dataset, quality_dataset]},
        "metadata": serialize_dates(summary),
    }
def build_sleep_debt_chart_payload(profile_id: str, days: int) -> Dict[str, Any]:
    """Assemble the rolling sleep-debt line chart payload.

    Every plotted point is ``sleep_debt_sum_hours_in_window`` evaluated with
    that row's date as the window end. Rows older than the visible window
    are loaded as well so the rolling window of the earliest visible points
    has history to draw on.

    Args:
        profile_id: Profile whose ``sleep_log`` rows are read.
        days: Requested look-back window in days; clamped to 7..90.

    Returns:
        A dict with ``chart_type``, ``data`` and ``metadata``. An empty
        payload with confidence ``"insufficient"`` is returned when
        ``calculate_sleep_debt_hours`` yields ``None`` or no row falls into
        the visible window.
    """
    days = min(max(days, 7), 90)

    def empty_payload(message: str) -> Dict[str, Any]:
        return {
            "chart_type": "line",
            "data": {"labels": [], "datasets": []},
            "metadata": {
                "confidence": "insufficient",
                "data_points": 0,
                "message": message,
            },
        }

    debt_now = calculate_sleep_debt_hours(profile_id)
    if debt_now is None:
        return empty_payload("Keine Schlafdaten für Schulden-Berechnung")

    first_visible_day = (datetime.now() - timedelta(days=days)).date()
    # Load history from before the chart window so that the rolling window is
    # already filled correctly for the earliest visible points.
    lead_in_days = days + SLEEP_DEBT_ROLLING_WINDOW_DAYS + 3
    history_since = (datetime.now() - timedelta(days=lead_in_days)).strftime("%Y-%m-%d")

    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            """SELECT date, duration_minutes
               FROM sleep_log
               WHERE profile_id=%s AND date >= %s
               AND duration_minutes IS NOT NULL
               ORDER BY date ASC""",
            (profile_id, history_since),
        )
        history = [dict(record) for record in cursor.fetchall()]

    def as_day(value: Any) -> Any:
        # Timestamps are reduced to their calendar day; anything else passes through.
        if isinstance(value, datetime):
            return value.date()
        return value

    all_days = (as_day(night.get("date")) for night in history)
    window_ends = [day for day in all_days if day >= first_visible_day]
    if not window_ends:
        return empty_payload("Keine Schlafdaten")

    day_labels = [
        day.isoformat() if hasattr(day, "isoformat") else str(day) for day in window_ends
    ]
    debt_series = [sleep_debt_sum_hours_in_window(history, day) for day in window_ends]

    debt_dataset = {
        "label": f"Schlafschuld (h), rollierend {SLEEP_DEBT_ROLLING_WINDOW_DAYS} Tage — wie KPI",
        "data": debt_series,
        "borderColor": "#EF4444",
        "backgroundColor": "rgba(239, 68, 68, 0.1)",
        "borderWidth": 2,
        "tension": 0.3,
        "fill": True,
    }

    point_count = len(window_ends)
    summary = {
        "confidence": calculate_confidence(point_count, days, "general"),
        "data_points": point_count,
        "current_debt_hours": round(float(debt_now), 1),
        "sleep_debt_target_hours_per_night": SLEEP_DEBT_TARGET_HOURS_DEFAULT,
        "rolling_window_days": SLEEP_DEBT_ROLLING_WINDOW_DAYS,
        "note": "Gleiche Formel wie KPI: Summe der nächtlichen Defizite vs. "
        f"{SLEEP_DEBT_TARGET_HOURS_DEFAULT} h/Nacht im rollierenden {SLEEP_DEBT_ROLLING_WINDOW_DAYS}-Tage-Fenster "
        "(jeder Punkt = Fensterende an dem Datum). Ziel aktuell nicht in den Profileinstellungen änderbar.",
    }
    return {
        "chart_type": "line",
        "data": {"labels": day_labels, "datasets": [debt_dataset]},
        "metadata": serialize_dates(summary),
    }
# Columns of vitals_baseline that count as a vital-sign reading.
VITAL_BASELINE_KEYS = ("resting_hr", "hrv", "vo2_max", "spo2", "respiratory_rate")


def _vitals_row_has_any_value(row: Any) -> bool:
    """Return True when *row* holds a non-None value for at least one vital key."""
    if not row:
        return False
    return any(row.get(key) is not None for key in VITAL_BASELINE_KEYS)


def _merge_vitals_baseline_rows(rows: Any) -> tuple[Optional[Dict[str, Any]], Optional[Any]]:
    """Collapse baseline rows into one record of the newest non-empty values.

    For every key in ``VITAL_BASELINE_KEYS`` the first non-None value in
    iteration order is taken, so *rows* must be sorted newest first
    (``date DESC``). This lets a metric appear even when the most recent
    row leaves it empty.

    Args:
        rows: Sequence of mapping-like rows supporting ``.get`` and
            ``[]`` access, or a falsy value.

    Returns:
        ``(merged, measured_on)``. ``merged`` maps every vital key to its
        newest value (``None`` where no row has one). ``measured_on`` is
        the ``date`` of the newest row that contributed a value. Both are
        ``None`` when *rows* is empty or no row holds any value.
    """
    if not rows:
        return None, None

    merged: Dict[str, Any] = {key: None for key in VITAL_BASELINE_KEYS}
    measured_on = None
    found_value = False
    for row in rows:
        contributed = False
        for key in VITAL_BASELINE_KEYS:
            if merged[key] is None and row.get(key) is not None:
                merged[key] = row[key]
                contributed = True
        if contributed and not found_value:
            # Report the date of the newest row that actually carries a
            # reading; an all-empty newest row must not supply the date.
            found_value = True
            measured_on = row.get("date")
        if all(merged[key] is not None for key in VITAL_BASELINE_KEYS):
            break

    if not found_value:
        return None, None
    return merged, measured_on
def _bp_row_complete(row: Any) -> bool:
    """Return True when *row* has both a systolic and a diastolic value."""
    if not row:
        return False
    return all(row.get(field) is not None for field in ("systolic", "diastolic"))
def _tone_to_bar_value(tone: str) -> float:
    """Map an assessment tone to its relative bar height; unknown tones give 55.0."""
    heights = {
        "good": 88.0,
        "warn": 52.0,
        "bad": 22.0,
        "neutral": 62.0,
    }
    try:
        return heights[tone]
    except KeyError:
        return 55.0
def build_vital_signs_matrix_chart_payload(
    profile_id: str,
    days: int,
    omit_snapshot_keys: Optional[Set[str]] = None,
) -> Dict[str, Any]:
    """Build the bar-chart payload that classifies the latest vital signs.

    Baseline vitals and blood pressure are taken from the look-back window
    first. If the window yields nothing usable, the newest data of the
    profile overall is used instead.

    Args:
        profile_id: Profile whose ``vitals_baseline`` and
            ``blood_pressure_log`` rows are read.
        days: Requested look-back window in days; clamped to 7..365.
        omit_snapshot_keys: Keys passed to ``build_vital_items_from_rows``
            as ``omit_keys``, e.g. ``{'resting_hr', 'hrv'}`` when the same
            assessment is already shown elsewhere. If omitting them leaves
            no item at all, the items are rebuilt without omission.

    Returns:
        A dict with ``chart_type`` ``"bar"``, ``data`` and ``metadata``.
        Each bar is a relative score derived from the item's tone, not a
        physical unit. An empty payload with confidence ``"insufficient"``
        is returned when no item could be built.
    """
    days = max(7, min(days, 365))
    cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")

    bp_row = None
    vitals_measured_at = None
    bp_measured_at = None
    vitals_for_items: Optional[Dict[str, Any]] = None

    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute(
            """SELECT date, resting_hr, hrv, vo2_max, spo2, respiratory_rate
               FROM vitals_baseline
               WHERE profile_id=%s AND date >= %s
               ORDER BY date DESC
               LIMIT 200""",
            (profile_id, cutoff),
        )
        vitals_merged, vitals_date = _merge_vitals_baseline_rows(cur.fetchall())
        if vitals_merged is None:
            # Nothing usable inside the window: fall back to the newest rows overall.
            cur.execute(
                """SELECT date, resting_hr, hrv, vo2_max, spo2, respiratory_rate
                   FROM vitals_baseline
                   WHERE profile_id=%s
                   ORDER BY date DESC
                   LIMIT 400""",
                (profile_id,),
            )
            vitals_merged, vitals_date = _merge_vitals_baseline_rows(cur.fetchall())
        if vitals_merged is not None:
            vitals_for_items = dict(vitals_merged)
            if vitals_date is not None:
                vitals_measured_at = (
                    vitals_date.isoformat() if hasattr(vitals_date, "isoformat") else str(vitals_date)
                )

        cur.execute(
            """SELECT measured_at, systolic, diastolic
               FROM blood_pressure_log
               WHERE profile_id=%s AND measured_at::date >= %s::date
               ORDER BY measured_at DESC
               LIMIT 1""",
            (profile_id, cutoff),
        )
        bp_row = cur.fetchone()
        if bp_row and bp_row.get("measured_at") is not None:
            bp_measured_at = bp_row["measured_at"]
        if not _bp_row_complete(bp_row):
            # Prefer the newest reading that has both values. Ordering by
            # plain recency alone would hand back the same incomplete
            # in-window row again. If no complete reading exists, the
            # newest row overall is still returned.
            cur.execute(
                """SELECT measured_at, systolic, diastolic
                   FROM blood_pressure_log
                   WHERE profile_id=%s
                   ORDER BY (systolic IS NOT NULL AND diastolic IS NOT NULL) DESC,
                            measured_at DESC
                   LIMIT 1""",
                (profile_id,),
            )
            bp_row = cur.fetchone()
            if bp_row and bp_row.get("measured_at") is not None:
                bp_measured_at = bp_row["measured_at"]

    bp_for_items = None
    if bp_row:
        bp_for_items = {"systolic": bp_row.get("systolic"), "diastolic": bp_row.get("diastolic")}

    # Serialise the blood-pressure timestamp once so both return paths agree.
    if not bp_measured_at:
        bp_measured_at_text = None
    elif hasattr(bp_measured_at, "isoformat"):
        bp_measured_at_text = bp_measured_at.isoformat()
    else:
        bp_measured_at_text = str(bp_measured_at)

    items = build_vital_items_from_rows(
        vitals_for_items, bp_for_items, omit_keys=omit_snapshot_keys
    )
    if not items and vitals_for_items and omit_snapshot_keys:
        # Omitting keys removed everything: show the full snapshot instead.
        items = build_vital_items_from_rows(vitals_for_items, bp_for_items, omit_keys=None)

    if not items:
        return {
            "chart_type": "bar",
            "data": {"labels": [], "datasets": []},
            "metadata": {
                "confidence": "insufficient",
                "data_points": 0,
                "message": "Keine Vitalwerte mit Zahlenwerten — Baseline-Vitals und/oder Blutdruck erfassen.",
                "vital_items": [],
                "vitals_measured_at": vitals_measured_at,
                "blood_pressure_measured_at": bp_measured_at_text,
            },
        }

    tone_colors = {"good": "#1D9E75", "warn": "#EF9F27", "bad": "#D85A30"}
    for it in items:
        # The bar height is stored on the item as well as in the dataset.
        it["bar_value"] = round(_tone_to_bar_value(it["tone"]), 1)
    labels_short = [it["label_de"] for it in items]
    bar_values = [it["bar_value"] for it in items]
    colors = [tone_colors.get(it["tone"], "#6B7280") for it in items]

    return {
        "chart_type": "bar",
        "data": {
            "labels": labels_short,
            "datasets": [
                {
                    "label": "Einschätzung (relativ)",
                    "data": bar_values,
                    "backgroundColor": colors,
                    "borderColor": colors,
                    "borderWidth": 1,
                }
            ],
        },
        "metadata": serialize_dates(
            {
                "confidence": "medium",
                "data_points": len(items),
                "note": "Orientierende Zonen, keine Diagnose. Balken = relative Einordnung (nicht körperliche Einheit).",
                "vital_items": items,
                "bar_is_relative_score": True,
                "vitals_measured_at": vitals_measured_at,
                "blood_pressure_measured_at": bp_measured_at_text,
                "disclaimer_de": "Hinweis: Nur Orientierung; bei Beschwerden oder auffälligen Werten ärztlich abklären.",
            }
        ),
    }