mitai-jinkendo/backend/tests/test_csv_import_executor.py
Lars 5b96bd4f75
All checks were successful
Deploy Development / deploy (push) Successful in 55s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
feat(csv-import): Add blood pressure and activity row diagnosis functionality
- Introduced `diagnose_blood_pressure_row` and `diagnose_activity_row` functions to validate and analyze blood pressure and activity data from CSV imports.
- Updated the CSV import logic to handle combined datetime columns for blood pressure and activity, improving data integrity during import.
- Enhanced type conversion specifications to include `start_time` for blood pressure and activity, ensuring accurate data mapping.
- Added tests to validate the new diagnosis functions and their integration with existing import processes, ensuring robustness and reliability.
- Updated frontend messages to provide clearer guidance on blood pressure and activity data handling during CSV imports.
2026-04-10 16:43:00 +02:00

386 lines
14 KiB
Python

"""
Smoke-Tests für Universal-CSV-Import (Executor + Apple-Schlaf-Parser).
Nutzt einen minimalen Fake-Cursor (kein PostgreSQL), damit die Pipelines bei jedem
pytest-Lauf mitlaufen.
"""
from __future__ import annotations
import datetime as dt
import uuid
import pytest
from csv_parser.executor import diagnose_vitals_row, run_universal_csv_import
from csv_parser.sleep_apple_import import detect_apple_sleep_csv_format
from csv_parser.type_converter import build_row_after_mapping, diagnose_row_mapping
class _SeqCursor:
"""Minimaler Cursor: execute protokolliert; fetchone liefert vorgegebene Sequenz."""
def __init__(self, fetch_sequence: list) -> None:
self.executes: list[tuple[str, tuple | None]] = []
self._fetch = list(fetch_sequence)
def execute(self, sql: str, params=None) -> None:
self.executes.append((sql, params))
def fetchone(self):
if self._fetch:
return self._fetch.pop(0)
return None
# Fixed UUID-shaped id passed as the second positional argument to every
# run_universal_csv_import call below (presumably the profile id — confirm
# against the executor signature).
PID = "00000000-0000-0000-0000-000000000001"
def test_detect_apple_sleep_summary_vs_segment():
    """The header columns decide between the "summary" and "segments" formats."""
    assert detect_apple_sleep_csv_format(["Start", "End", "Total Sleep (hr)", "Core (hr)"]) == "summary"
    assert detect_apple_sleep_csv_format(["Start", "End", "Duration (hr)", "Value"]) == "segments"
def test_run_universal_import_sleep_one_night_inserts(monkeypatch):
    """One summary row results in an INSERT; the SELECT before it finds no match."""
    # NOTE(review): the monkeypatch fixture is requested but not used in this test.
    text = (
        "Start,End,Total Sleep (hr),Core (hr),Deep (hr),REM (hr),Awake (hr)\n"
        "2024-01-15 22:00:00,2024-01-16 06:00:00,8.0,5.0,1.0,1.5,0.5\n"
    )
    # Results handed out by successive fetchone() calls: first None, then a row
    # carrying an id (presumably lookup miss, then the inserted row — confirm).
    cur = _SeqCursor(
        [
            None,
            {"id": 101},
        ]
    )
    out = run_universal_csv_import(cur, PID, "sleep", text, "sleep.csv", {})
    assert out["rows_total"] >= 1
    assert out["rows_imported"] >= 1
    # The recorded statements must include an insert into sleep_log.
    assert any("INSERT INTO sleep_log" in q[0] for q in cur.executes)
def test_run_universal_import_activity_insert(monkeypatch):
    """A single workout row with start/end timestamps is imported as one new activity_log entry."""
    # Stub the training-type resolver so it always returns three Nones.
    monkeypatch.setattr(
        "csv_parser.executor._resolve_training_type_for_activity",
        lambda *_a, **_k: (None, None, None),
    )
    text = (
        "Workout Type,Start,End,Duration,Distance (km),Active Energy (kcal)\n"
        "Running,2024-01-15 08:00:00,2024-01-15 09:00:00,1:00:00,10.0,500\n"
    )
    mapping = {
        "delimiter": ",",
        "has_header": True,
        # CSV column name -> target field name.
        "field_mappings": {
            "Workout Type": "activity_type",
            "Start": "start_time",
            "End": "end_time",
            "Duration": "duration_min",
            "Distance (km)": "distance_km",
            "Active Energy (kcal)": "kcal_active",
        },
        # Target field name -> conversion spec.
        "type_conversions": {
            "start_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True},
            "end_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True},
            "duration_min": {
                "type": "duration",
                "format": "HH:MM:SS",
                "target_unit": "minutes",
                "flexible": True,
            },
            "distance_km": {"type": "float", "decimal_separator": ".", "flexible": True},
            "kcal_active": {"type": "float", "decimal_separator": ".", "flexible": True},
        },
    }
    new_id = str(uuid.uuid4())
    # fetchone() yields None first, then a row with the freshly generated id.
    cur = _SeqCursor([None, {"id": new_id}])
    out = run_universal_csv_import(cur, PID, "activity", text, "act.csv", mapping)
    assert out["rows_imported"] == 1
    assert out["new_entries"] == 1
    assert any("INSERT INTO activity_log" in q[0] for q in cur.executes)
def test_run_universal_import_vitals_baseline_upsert_insert_path():
    """A single vitals row is imported and an INSERT INTO vitals_baseline statement is issued."""
    text = (
        "Start,Resting Heart Rate,Heart Rate Variability,VO2 Max\n"
        "2024-01-15 07:00:00,55,45,42.5\n"
    )
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": {
            "Start": "date",
            "Resting Heart Rate": "resting_hr",
            "Heart Rate Variability": "hrv",
            "VO2 Max": "vo2_max",
        },
        "type_conversions": {
            # The timestamp column is mapped to "date" with extract="date_only".
            "date": {
                "type": "datetime",
                "format": "yyyy-mm-dd HH:MM:SS",
                "extract": "date_only",
                "flexible": True,
            },
            "resting_hr": {"type": "int", "flexible": True},
            "hrv": {"type": "int", "flexible": True},
            "vo2_max": {"type": "float", "decimal_separator": ".", "flexible": True},
        },
    }
    # The only queued fetchone() result reports inserted=True plus an id.
    cur = _SeqCursor([{"inserted": True, "id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"}])
    out = run_universal_csv_import(cur, PID, "vitals_baseline", text, "v.csv", mapping)
    assert out["rows_imported"] == 1
    assert any("INSERT INTO vitals_baseline" in q[0] for q in cur.executes)
def test_run_universal_import_wide_german_vitals_with_english_template_slots():
    """Wide German Apple export with only English template columns: must be recognised via aliases."""
    # The CSV header is German, while the template below only names English columns.
    text = (
        "Datum/Uhrzeit,Aktive Energie (kJ),Ruhepuls (count/min),Atemfrequenz (count/min)"
        ",Blutsauerstoffsättigung (%),Herzfrequenzvariabilität (ms),VO2 max (ml/(kg·min))\n"
        "2026-04-03 00:00:00,,53,15.61,95.22,37.26,\n"
    )
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": {
            "Start": "date",
            "Resting Heart Rate": "resting_hr",
            "Heart Rate Variability": "hrv",
            "VO2 Max": "vo2_max",
            "Oxygen Saturation": "spo2",
            "Respiratory Rate": "respiratory_rate",
        },
        "type_conversions": {
            "date": {
                "type": "datetime",
                "format": "yyyy-mm-dd HH:MM:SS",
                "extract": "date_only",
                "flexible": True,
            },
            "resting_hr": {"type": "int", "flexible": True},
            "hrv": {"type": "int", "flexible": True},
            "vo2_max": {"type": "float", "decimal_separator": "auto", "flexible": True},
            "spo2": {"type": "int", "flexible": True},
            "respiratory_rate": {"type": "float", "decimal_separator": "auto", "flexible": True},
        },
    }
    cur = _SeqCursor([{"inserted": True, "id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"}])
    out = run_universal_csv_import(cur, PID, "vitals_baseline", text, "wide.csv", mapping)
    # The row must go through without errors despite the header/template mismatch.
    assert out["rows_errors"] == 0
    assert out["rows_imported"] == 1
def test_run_universal_import_vitals_baseline_two_rows_same_day_averages():
    """Two vitals rows on the same day produce one upsert carrying the averaged values."""
    text = (
        "Start,Resting Heart Rate,Heart Rate Variability,VO2 Max\n"
        "2024-01-15 07:00:00,50,40,42.0\n"
        "2024-01-15 18:00:00,60,50,43.0\n"
    )
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": {
            "Start": "date",
            "Resting Heart Rate": "resting_hr",
            "Heart Rate Variability": "hrv",
            "VO2 Max": "vo2_max",
        },
        "type_conversions": {
            "date": {
                "type": "datetime",
                "format": "yyyy-mm-dd HH:MM:SS",
                "extract": "date_only",
                "flexible": True,
            },
            "resting_hr": {"type": "int", "flexible": True},
            "hrv": {"type": "int", "flexible": True},
            "vo2_max": {"type": "float", "decimal_separator": ".", "flexible": True},
        },
    }
    cur = _SeqCursor([{"inserted": True, "id": "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"}])
    out = run_universal_csv_import(cur, PID, "vitals_baseline", text, "v2.csv", mapping)
    assert out["rows_total"] == 2
    assert out["rows_imported"] == 1
    upsert = [q for q in cur.executes if "INSERT INTO vitals_baseline" in q[0]]
    assert len(upsert) == 1
    params = upsert[0][1]
    # Expected values are the means of the two CSV rows:
    # (50 + 60) / 2, (40 + 50) / 2 and (42.0 + 43.0) / 2.
    assert params[2] == 55
    assert params[3] == 45
    assert params[4] == 42.5
def test_run_universal_import_activity_garmin_time_plus_date_columns(monkeypatch):
    """Date lives in its own column; the time column holds only a time of day, as with Garmin."""
    # Stub the training-type resolver so it always returns three Nones.
    monkeypatch.setattr(
        "csv_parser.executor._resolve_training_type_for_activity",
        lambda *_a, **_k: (None, None, None),
    )
    text = (
        "Activity Type,Date,Time,Duration,Distance,Calories,Avg HR\n"
        "Run,2024-01-20,08:30:00,0:45:00,8.0,400,140\n"
    )
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": {
            "Activity Type": "activity_type",
            "Date": "date",
            "Time": "start_time",
            "Duration": "duration_min",
            "Distance": "distance_km",
            "Calories": "kcal_active",
            "Avg HR": "hr_avg",
        },
        "type_conversions": {
            "date": {"type": "date", "format": "yyyy-mm-dd", "flexible": True},
            # start_time is declared as a bare time here, not a full datetime.
            "start_time": {"type": "time", "format": "HH:MM:SS", "flexible": True},
            "duration_min": {
                "type": "duration",
                "format": "HH:MM:SS",
                "target_unit": "minutes",
                "flexible": True,
            },
            "distance_km": {"type": "float", "decimal_separator": ".", "flexible": True},
            "kcal_active": {"type": "float", "decimal_separator": ".", "flexible": True},
            "hr_avg": {"type": "int", "flexible": True},
        },
    }
    new_id = str(uuid.uuid4())
    cur = _SeqCursor([None, {"id": new_id}])
    out = run_universal_csv_import(cur, PID, "activity", text, "garmin.csv", mapping)
    assert out["rows_imported"] == 1
    # The duplicate key must contain the date combined with the start time.
    # Statements without parameters are skipped by the ``if params`` filter.
    assert any(
        "2024-01-20 08:30:00" in str(params)
        for _sql, params in cur.executes
        if params
    )
def test_run_universal_import_nutrition_two_rows_same_day_aggregates_to_one_row():
    """Module default: several CSV rows per day are summed into one nutrition_log entry."""
    text = (
        "Date,Kalorien,Protein,Fett,KH\n"
        "2024-01-15,500,10,20,30\n"
        "2024-01-15,300,5,10,15\n"
    )
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": {
            "Date": "date",
            "Kalorien": "kcal",
            "Protein": "protein_g",
            "Fett": "fat_g",
            "KH": "carbs_g",
        },
        "type_conversions": {
            "date": {"type": "date", "format": "yyyy-mm-dd", "flexible": True},
            "kcal": {"type": "float", "decimal_separator": ".", "flexible": True},
            "protein_g": {"type": "float", "decimal_separator": ".", "flexible": True},
            "fat_g": {"type": "float", "decimal_separator": ".", "flexible": True},
            "carbs_g": {"type": "float", "decimal_separator": ".", "flexible": True},
        },
    }
    cur = _SeqCursor([None])
    out = run_universal_csv_import(cur, PID, "nutrition", text, "n.csv", mapping)
    assert out["rows_total"] == 2
    assert out["rows_imported"] == 1
    insert_sqls = [q for q in cur.executes if "INSERT INTO nutrition_log" in q[0]]
    assert len(insert_sqls) == 1
    params = insert_sqls[0][1]
    # Parameter layout: (eid, profile_id, iso, kcal, prot, fat, carbs).
    # Each expected value is the sum over both CSV rows.
    assert params[3] == 800.0
    assert params[4] == 15.0
    assert params[5] == 30.0
    assert params[6] == 45.0
def test_run_universal_import_weight_two_rows_same_day_last_value():
    """Several weight rows per day: by default the last value in the file wins."""
    text = "Date,Weight\n2024-01-15,85.0\n2024-01-15,83.5\n"
    mapping = {
        "delimiter": ",",
        "has_header": True,
        "field_mappings": {"Date": "date", "Weight": "weight"},
        "type_conversions": {
            "date": {"type": "date", "format": "yyyy-mm-dd", "flexible": True},
            "weight": {"type": "float", "decimal_separator": ".", "flexible": True},
        },
    }
    cur = _SeqCursor([None])
    out = run_universal_csv_import(cur, PID, "weight", text, "w.csv", mapping)
    assert out["rows_total"] == 2
    assert out["rows_imported"] == 1
    insert_sqls = [q for q in cur.executes if "INSERT INTO weight_log" in q[0]]
    assert len(insert_sqls) == 1
    params = insert_sqls[0][1]
    # 83.5 is the second (last) of the two rows for 2024-01-15.
    assert params[3] == 83.5
def test_activity_alias_maps_german_workout_wide_columns():
    """German workout columns are expected to fill activity fields the template never maps by name."""
    from csv_parser.type_converter import build_row_after_mapping

    row = {
        "Datum/Uhrzeit": "2026-04-03 08:00:00",
        "Trainingsart": "Laufen",
        "Dauer": "0:45:00",
    }
    # The template only names the English "Workout Type" column, which the row lacks.
    fm = {"Workout Type": "activity_type"}
    tc = {
        "activity_type": {"type": "string"},
        "start_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True},
        "duration_min": {
            "type": "duration",
            "format": "HH:MM:SS",
            "target_unit": "minutes",
            "flexible": True,
        },
    }
    out = build_row_after_mapping(row, fm, tc, module="activity")
    assert str(out.get("activity_type")) == "Laufen"
    assert isinstance(out.get("start_time"), dt.datetime)
    # "0:45:00" converted with target_unit="minutes".
    assert out.get("duration_min") == 45.0
def test_blood_pressure_alias_combined_datetime_column():
    """A combined "Datum/Uhrzeit" column is expected to yield a datetime ``start_time`` for blood pressure."""
    from csv_parser.type_converter import build_row_after_mapping

    row = {
        "Datum/Uhrzeit": "2026-04-03 10:30:00",
        "Systolisch (mmHg)": "120",
        "Diastolisch (mmHg)": "80",
    }
    # The template maps only an English "Date" column, which the row does not contain.
    fm = {"Date": "measured_date"}
    tc = {
        "systolic": {"type": "int", "flexible": True},
        "diastolic": {"type": "int", "flexible": True},
    }
    out = build_row_after_mapping(row, fm, tc, module="blood_pressure")
    assert isinstance(out.get("start_time"), dt.datetime)
    assert int(out.get("systolic")) == 120
def test_diagnose_vitals_row_and_mapping_smoke():
    """Smoke test: map one German vitals row, then run both diagnosis helpers on it."""
    fm = {
        "Datum/Uhrzeit": "date",
        "Ruhepuls (count/min)": "resting_hr",
    }
    tc = {
        "date": {
            "type": "datetime",
            "format": "yyyy-mm-dd HH:MM:SS",
            "extract": "date_only",
            "flexible": True,
        },
        "resting_hr": {"type": "int", "flexible": True},
    }
    row = {"Datum/Uhrzeit": "2026-04-03 00:00:00", "Ruhepuls (count/min)": "53"}
    typed = build_row_after_mapping(row, fm, tc, module="vitals_baseline")

    # Row-level diagnosis of the already mapped and typed row.
    d = diagnose_vitals_row(typed)
    assert d["date_coerced_iso"] == "2026-04-03"
    assert d["would_pass_prefilter"] is True

    # Mapping-level diagnosis, reusing the typed row computed above.
    dm = diagnose_row_mapping(row, fm, tc, module="vitals_baseline", mapped_typed=typed)
    assert str(dm["mapped"]["date"]).startswith("2026-04-03")
    # The date column is named in the template, so its source must be "template".
    assert any(c["csv_column"] == "Datum/Uhrzeit" and c["source"] == "template" for c in dm["per_column"])