- Introduced `diagnose_blood_pressure_row` and `diagnose_activity_row` functions to validate and analyze blood pressure and activity data from CSV imports. - Updated the CSV import logic to handle combined datetime columns for blood pressure and activity, improving data integrity during import. - Enhanced type conversion specifications to include `start_time` for blood pressure and activity, ensuring accurate data mapping. - Added tests to validate the new diagnosis functions and their integration with existing import processes, ensuring robustness and reliability. - Updated frontend messages to provide clearer guidance on blood pressure and activity data handling during CSV imports.
386 lines
14 KiB
Python
386 lines
14 KiB
Python
"""
|
|
Smoke-Tests für Universal-CSV-Import (Executor + Apple-Schlaf-Parser).
|
|
|
|
Nutzt einen minimalen Fake-Cursor (kein PostgreSQL), damit die Pipelines bei jedem
|
|
pytest-Lauf mitlaufen.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime as dt
|
|
import uuid
|
|
|
|
import pytest
|
|
|
|
from csv_parser.executor import diagnose_vitals_row, run_universal_csv_import
|
|
from csv_parser.sleep_apple_import import detect_apple_sleep_csv_format
|
|
from csv_parser.type_converter import build_row_after_mapping, diagnose_row_mapping
|
|
|
|
|
|
class _SeqCursor:
|
|
"""Minimaler Cursor: execute protokolliert; fetchone liefert vorgegebene Sequenz."""
|
|
|
|
def __init__(self, fetch_sequence: list) -> None:
|
|
self.executes: list[tuple[str, tuple | None]] = []
|
|
self._fetch = list(fetch_sequence)
|
|
|
|
def execute(self, sql: str, params=None) -> None:
|
|
self.executes.append((sql, params))
|
|
|
|
def fetchone(self):
|
|
if self._fetch:
|
|
return self._fetch.pop(0)
|
|
return None
|
|
|
|
|
|
# Fixed id passed as the second argument to run_universal_csv_import in the tests below
# (presumably the profile id — confirm against the executor's signature).
PID = "00000000-0000-0000-0000-000000000001"
|
|
|
|
|
|
def test_detect_apple_sleep_summary_vs_segment():
    """The header columns decide between the "summary" and "segments" formats."""
    summary_header = ["Start", "End", "Total Sleep (hr)", "Core (hr)"]
    segment_header = ["Start", "End", "Duration (hr)", "Value"]

    assert detect_apple_sleep_csv_format(summary_header) == "summary"
    assert detect_apple_sleep_csv_format(segment_header) == "segments"
|
|
|
|
|
|
def test_run_universal_import_sleep_one_night_inserts(monkeypatch):
    """One summary row leads to an INSERT; the preceding SELECT has no hit."""
    csv_text = (
        "Start,End,Total Sleep (hr),Core (hr),Deep (hr),REM (hr),Awake (hr)\n"
        "2024-01-15 22:00:00,2024-01-16 06:00:00,8.0,5.0,1.0,1.5,0.5\n"
    )
    # First fetchone() yields None, the second one the row with id 101.
    cursor = _SeqCursor([None, {"id": 101}])

    result = run_universal_csv_import(cursor, PID, "sleep", csv_text, "sleep.csv", {})

    assert result["rows_total"] >= 1
    assert result["rows_imported"] >= 1
    executed_sql = [sql for sql, _params in cursor.executes]
    assert any("INSERT INTO sleep_log" in sql for sql in executed_sql)
|
|
|
|
|
|
def test_run_universal_import_activity_insert(monkeypatch):
    """One workout row with full start/end datetimes yields one new activity_log INSERT."""
    # Replace the training-type resolver with a stub that returns (None, None, None).
    monkeypatch.setattr(
        "csv_parser.executor._resolve_training_type_for_activity",
        lambda *_a, **_k: (None, None, None),
    )

    header = "Workout Type,Start,End,Duration,Distance (km),Active Energy (kcal)\n"
    data_row = "Running,2024-01-15 08:00:00,2024-01-15 09:00:00,1:00:00,10.0,500\n"
    csv_text = header + data_row

    column_to_field = {
        "Workout Type": "activity_type",
        "Start": "start_time",
        "End": "end_time",
        "Duration": "duration_min",
        "Distance (km)": "distance_km",
        "Active Energy (kcal)": "kcal_active",
    }
    conversions = {
        "start_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True},
        "end_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True},
        "duration_min": {
            "type": "duration",
            "format": "HH:MM:SS",
            "target_unit": "minutes",
            "flexible": True,
        },
        "distance_km": {"type": "float", "decimal_separator": ".", "flexible": True},
        "kcal_active": {"type": "float", "decimal_separator": ".", "flexible": True},
    }
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": column_to_field,
        "type_conversions": conversions,
    }

    inserted_id = str(uuid.uuid4())
    cursor = _SeqCursor([None, {"id": inserted_id}])

    result = run_universal_csv_import(cursor, PID, "activity", csv_text, "act.csv", mapping)

    assert result["rows_imported"] == 1
    assert result["new_entries"] == 1
    insert_statements = [
        sql for sql, _params in cursor.executes if "INSERT INTO activity_log" in sql
    ]
    assert insert_statements
|
|
|
|
|
|
def test_run_universal_import_vitals_baseline_upsert_insert_path():
    """One vitals row whose upsert reports ``inserted`` is counted as imported."""
    csv_text = (
        "Start,Resting Heart Rate,Heart Rate Variability,VO2 Max\n"
        "2024-01-15 07:00:00,55,45,42.5\n"
    )
    column_to_field = {
        "Start": "date",
        "Resting Heart Rate": "resting_hr",
        "Heart Rate Variability": "hrv",
        "VO2 Max": "vo2_max",
    }
    conversions = {
        "date": {
            "type": "datetime",
            "format": "yyyy-mm-dd HH:MM:SS",
            "extract": "date_only",
            "flexible": True,
        },
        "resting_hr": {"type": "int", "flexible": True},
        "hrv": {"type": "int", "flexible": True},
        "vo2_max": {"type": "float", "decimal_separator": ".", "flexible": True},
    }
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": column_to_field,
        "type_conversions": conversions,
    }
    # The only prepared row is handed out by the first fetchone() call.
    upsert_row = {"inserted": True, "id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"}
    cursor = _SeqCursor([upsert_row])

    result = run_universal_csv_import(cursor, PID, "vitals_baseline", csv_text, "v.csv", mapping)

    assert result["rows_imported"] == 1
    executed_sql = [sql for sql, _params in cursor.executes]
    assert any("INSERT INTO vitals_baseline" in sql for sql in executed_sql)
|
|
|
|
|
|
def test_run_universal_import_wide_german_vitals_with_english_template_slots():
    """Wide German Apple export, but the template only names English columns -> resolvable via aliases."""
    header = (
        "Datum/Uhrzeit,Aktive Energie (kJ),Ruhepuls (count/min),Atemfrequenz (count/min)"
        ",Blutsauerstoffsättigung (%),Herzfrequenzvariabilität (ms),VO2 max (ml/(kg·min))\n"
    )
    data_row = "2026-04-03 00:00:00,,53,15.61,95.22,37.26,\n"
    csv_text = header + data_row

    # None of these English template names appears literally in the CSV header above.
    column_to_field = {
        "Start": "date",
        "Resting Heart Rate": "resting_hr",
        "Heart Rate Variability": "hrv",
        "VO2 Max": "vo2_max",
        "Oxygen Saturation": "spo2",
        "Respiratory Rate": "respiratory_rate",
    }
    conversions = {
        "date": {
            "type": "datetime",
            "format": "yyyy-mm-dd HH:MM:SS",
            "extract": "date_only",
            "flexible": True,
        },
        "resting_hr": {"type": "int", "flexible": True},
        "hrv": {"type": "int", "flexible": True},
        "vo2_max": {"type": "float", "decimal_separator": "auto", "flexible": True},
        "spo2": {"type": "int", "flexible": True},
        "respiratory_rate": {"type": "float", "decimal_separator": "auto", "flexible": True},
    }
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": column_to_field,
        "type_conversions": conversions,
    }
    upsert_row = {"inserted": True, "id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"}
    cursor = _SeqCursor([upsert_row])

    result = run_universal_csv_import(cursor, PID, "vitals_baseline", csv_text, "wide.csv", mapping)

    assert result["rows_errors"] == 0
    assert result["rows_imported"] == 1
|
|
|
|
|
|
def test_run_universal_import_vitals_baseline_two_rows_same_day_averages():
    """Two rows for the same day produce a single upsert carrying the per-column means."""
    csv_text = (
        "Start,Resting Heart Rate,Heart Rate Variability,VO2 Max\n"
        "2024-01-15 07:00:00,50,40,42.0\n"
        "2024-01-15 18:00:00,60,50,43.0\n"
    )
    column_to_field = {
        "Start": "date",
        "Resting Heart Rate": "resting_hr",
        "Heart Rate Variability": "hrv",
        "VO2 Max": "vo2_max",
    }
    conversions = {
        "date": {
            "type": "datetime",
            "format": "yyyy-mm-dd HH:MM:SS",
            "extract": "date_only",
            "flexible": True,
        },
        "resting_hr": {"type": "int", "flexible": True},
        "hrv": {"type": "int", "flexible": True},
        "vo2_max": {"type": "float", "decimal_separator": ".", "flexible": True},
    }
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": column_to_field,
        "type_conversions": conversions,
    }
    upsert_row = {"inserted": True, "id": "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"}
    cursor = _SeqCursor([upsert_row])

    result = run_universal_csv_import(cursor, PID, "vitals_baseline", csv_text, "v2.csv", mapping)

    assert result["rows_total"] == 2
    assert result["rows_imported"] == 1

    upserts = [entry for entry in cursor.executes if "INSERT INTO vitals_baseline" in entry[0]]
    assert len(upserts) == 1

    _sql, params = upserts[0]
    # (50 + 60) / 2, (40 + 50) / 2 and (42.0 + 43.0) / 2 respectively.
    assert params[2] == 55
    assert params[3] == 45
    assert params[4] == 42.5
|
|
|
|
|
|
def test_run_universal_import_activity_garmin_time_plus_date_columns(monkeypatch):
    """Date lives in its own column; the time column holds a time of day only (as with Garmin)."""
    # Replace the training-type resolver with a stub that returns (None, None, None).
    monkeypatch.setattr(
        "csv_parser.executor._resolve_training_type_for_activity",
        lambda *_a, **_k: (None, None, None),
    )

    header = "Activity Type,Date,Time,Duration,Distance,Calories,Avg HR\n"
    data_row = "Run,2024-01-20,08:30:00,0:45:00,8.0,400,140\n"
    csv_text = header + data_row

    column_to_field = {
        "Activity Type": "activity_type",
        "Date": "date",
        "Time": "start_time",
        "Duration": "duration_min",
        "Distance": "distance_km",
        "Calories": "kcal_active",
        "Avg HR": "hr_avg",
    }
    conversions = {
        "date": {"type": "date", "format": "yyyy-mm-dd", "flexible": True},
        "start_time": {"type": "time", "format": "HH:MM:SS", "flexible": True},
        "duration_min": {
            "type": "duration",
            "format": "HH:MM:SS",
            "target_unit": "minutes",
            "flexible": True,
        },
        "distance_km": {"type": "float", "decimal_separator": ".", "flexible": True},
        "kcal_active": {"type": "float", "decimal_separator": ".", "flexible": True},
        "hr_avg": {"type": "int", "flexible": True},
    }
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": column_to_field,
        "type_conversions": conversions,
    }

    inserted_id = str(uuid.uuid4())
    cursor = _SeqCursor([None, {"id": inserted_id}])

    result = run_universal_csv_import(cursor, PID, "activity", csv_text, "garmin.csv", mapping)

    assert result["rows_imported"] == 1

    # The duplicate key must contain the date plus the combined start time.
    rendered_params = [str(params) for _sql, params in cursor.executes if params]
    assert any("2024-01-20 08:30:00" in rendered for rendered in rendered_params)
|
|
|
|
|
|
def test_run_universal_import_nutrition_two_rows_same_day_aggregates_to_one_row():
    """Module default: several CSV rows per day are summed into a single nutrition_log entry."""
    csv_text = (
        "Date,Kalorien,Protein,Fett,KH\n"
        "2024-01-15,500,10,20,30\n"
        "2024-01-15,300,5,10,15\n"
    )
    column_to_field = {
        "Date": "date",
        "Kalorien": "kcal",
        "Protein": "protein_g",
        "Fett": "fat_g",
        "KH": "carbs_g",
    }
    conversions = {
        "date": {"type": "date", "format": "yyyy-mm-dd", "flexible": True},
        "kcal": {"type": "float", "decimal_separator": ".", "flexible": True},
        "protein_g": {"type": "float", "decimal_separator": ".", "flexible": True},
        "fat_g": {"type": "float", "decimal_separator": ".", "flexible": True},
        "carbs_g": {"type": "float", "decimal_separator": ".", "flexible": True},
    }
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": column_to_field,
        "type_conversions": conversions,
    }
    cursor = _SeqCursor([None])

    result = run_universal_csv_import(cursor, PID, "nutrition", csv_text, "n.csv", mapping)

    assert result["rows_total"] == 2
    assert result["rows_imported"] == 1

    inserts = [entry for entry in cursor.executes if "INSERT INTO nutrition_log" in entry[0]]
    assert len(inserts) == 1

    _sql, params = inserts[0]
    # Parameter layout: (eid, profile_id, iso, kcal, prot, fat, carbs)
    assert params[3] == 800.0
    assert params[4] == 15.0
    assert params[5] == 30.0
    assert params[6] == 45.0
|
|
|
|
|
|
def test_run_universal_import_weight_two_rows_same_day_last_value():
    """Several weight rows per day -> default: the last value in the file wins."""
    csv_text = "Date,Weight\n2024-01-15,85.0\n2024-01-15,83.5\n"
    conversions = {
        "date": {"type": "date", "format": "yyyy-mm-dd", "flexible": True},
        "weight": {"type": "float", "decimal_separator": ".", "flexible": True},
    }
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": {"Date": "date", "Weight": "weight"},
        "type_conversions": conversions,
    }
    cursor = _SeqCursor([None])

    result = run_universal_csv_import(cursor, PID, "weight", csv_text, "w.csv", mapping)

    assert result["rows_total"] == 2
    assert result["rows_imported"] == 1

    inserts = [entry for entry in cursor.executes if "INSERT INTO weight_log" in entry[0]]
    assert len(inserts) == 1

    _sql, params = inserts[0]
    # 83.5 is the second (last) weight listed for 2024-01-15.
    assert params[3] == 83.5
|
|
|
|
|
|
def test_activity_alias_maps_german_workout_wide_columns():
    """German wide-export columns are resolved for the ``activity`` module.

    Only ``Workout Type`` is mapped explicitly, and that name does not occur in
    the row. The assertions expect the values of ``Trainingsart``,
    ``Datum/Uhrzeit`` and ``Dauer`` to show up as ``activity_type``,
    ``start_time`` and ``duration_min`` regardless (presumably through the
    module's column aliases — see ``build_row_after_mapping``).
    """
    # build_row_after_mapping is already imported at module level, so no
    # function-local re-import is needed here.
    raw_row = {
        "Datum/Uhrzeit": "2026-04-03 08:00:00",
        "Trainingsart": "Laufen",
        "Dauer": "0:45:00",
    }
    field_mappings = {"Workout Type": "activity_type"}
    conversions = {
        "activity_type": {"type": "string"},
        "start_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True},
        "duration_min": {
            "type": "duration",
            "format": "HH:MM:SS",
            "target_unit": "minutes",
            "flexible": True,
        },
    }

    mapped = build_row_after_mapping(raw_row, field_mappings, conversions, module="activity")

    assert str(mapped.get("activity_type")) == "Laufen"
    assert isinstance(mapped.get("start_time"), dt.datetime)
    # "0:45:00" expressed in the target unit "minutes".
    assert mapped.get("duration_min") == 45.0
|
|
|
|
|
|
def test_blood_pressure_alias_combined_datetime_column():
    """A combined ``Datum/Uhrzeit`` column ends up as a ``start_time`` datetime.

    The explicit mapping only names ``Date`` (which the row does not contain),
    and the conversions only cover ``systolic`` / ``diastolic``. The assertions
    expect ``start_time`` to be a ``datetime`` and ``systolic`` to convert to
    120 (presumably through the ``blood_pressure`` module's column aliases).
    """
    # build_row_after_mapping is already imported at module level, so no
    # function-local re-import is needed here.
    raw_row = {
        "Datum/Uhrzeit": "2026-04-03 10:30:00",
        "Systolisch (mmHg)": "120",
        "Diastolisch (mmHg)": "80",
    }
    field_mappings = {"Date": "measured_date"}
    conversions = {
        "systolic": {"type": "int", "flexible": True},
        "diastolic": {"type": "int", "flexible": True},
    }

    mapped = build_row_after_mapping(raw_row, field_mappings, conversions, module="blood_pressure")

    assert isinstance(mapped.get("start_time"), dt.datetime)
    assert int(mapped.get("systolic")) == 120
|
|
|
|
|
|
def test_diagnose_vitals_row_and_mapping_smoke():
    """Smoke-test ``diagnose_vitals_row`` and ``diagnose_row_mapping`` on one German vitals row."""
    raw_row = {"Datum/Uhrzeit": "2026-04-03 00:00:00", "Ruhepuls (count/min)": "53"}
    field_mappings = {
        "Datum/Uhrzeit": "date",
        "Ruhepuls (count/min)": "resting_hr",
    }
    conversions = {
        "date": {
            "type": "datetime",
            "format": "yyyy-mm-dd HH:MM:SS",
            "extract": "date_only",
            "flexible": True,
        },
        "resting_hr": {"type": "int", "flexible": True},
    }

    typed_row = build_row_after_mapping(
        raw_row, field_mappings, conversions, module="vitals_baseline"
    )

    # Row-level diagnosis of the already typed row.
    vitals_diag = diagnose_vitals_row(typed_row)
    assert vitals_diag["date_coerced_iso"] == "2026-04-03"
    assert vitals_diag["would_pass_prefilter"] is True

    # Mapping-level diagnosis, reusing the typed row computed above.
    mapping_diag = diagnose_row_mapping(
        raw_row,
        field_mappings,
        conversions,
        module="vitals_baseline",
        mapped_typed=typed_row,
    )
    assert str(mapping_diag["mapped"]["date"]).startswith("2026-04-03")
    assert any(
        column["csv_column"] == "Datum/Uhrzeit" and column["source"] == "template"
        for column in mapping_diag["per_column"]
    )
|