""" Smoke-Tests für Universal-CSV-Import (Executor + Apple-Schlaf-Parser). Nutzt einen minimalen Fake-Cursor (kein PostgreSQL), damit die Pipelines bei jedem pytest-Lauf mitlaufen. """ from __future__ import annotations import uuid import pytest from csv_parser.executor import run_universal_csv_import from csv_parser.sleep_apple_import import detect_apple_sleep_csv_format class _SeqCursor: """Minimaler Cursor: execute protokolliert; fetchone liefert vorgegebene Sequenz.""" def __init__(self, fetch_sequence: list) -> None: self.executes: list[tuple[str, tuple | None]] = [] self._fetch = list(fetch_sequence) def execute(self, sql: str, params=None) -> None: self.executes.append((sql, params)) def fetchone(self): if self._fetch: return self._fetch.pop(0) return None PID = "00000000-0000-0000-0000-000000000001" def test_detect_apple_sleep_summary_vs_segment(): assert detect_apple_sleep_csv_format(["Start", "End", "Total Sleep (hr)", "Core (hr)"]) == "summary" assert detect_apple_sleep_csv_format(["Start", "End", "Duration (hr)", "Value"]) == "segments" def test_run_universal_import_sleep_one_night_inserts(monkeypatch): """Eine Summary-Zeile → INSERT; SELECT vorher ohne Treffer.""" text = ( "Start,End,Total Sleep (hr),Core (hr),Deep (hr),REM (hr),Awake (hr)\n" "2024-01-15 22:00:00,2024-01-16 06:00:00,8.0,5.0,1.0,1.5,0.5\n" ) cur = _SeqCursor( [ None, {"id": 101}, ] ) out = run_universal_csv_import(cur, PID, "sleep", text, "sleep.csv", {}) assert out["rows_total"] >= 1 assert out["rows_imported"] >= 1 assert any("INSERT INTO sleep_log" in q[0] for q in cur.executes) def test_run_universal_import_activity_insert(monkeypatch): monkeypatch.setattr( "csv_parser.executor._resolve_training_type_for_activity", lambda *_a, **_k: (None, None, None), ) text = ( "Workout Type,Start,End,Duration,Distance (km),Active Energy (kcal)\n" "Running,2024-01-15 08:00:00,2024-01-15 09:00:00,1:00:00,10.0,500\n" ) mapping = { "delimiter": ",", "has_header": True, "field_mappings": { "Workout Type": "activity_type", "Start": "start_time", "End": "end_time", "Duration": "duration_min", "Distance (km)": "distance_km", "Active Energy (kcal)": "kcal_active", }, "type_conversions": { "start_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True}, "end_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True}, "duration_min": { "type": "duration", "format": "HH:MM:SS", "target_unit": "minutes", "flexible": True, }, "distance_km": {"type": "float", "decimal_separator": ".", "flexible": True}, "kcal_active": {"type": "float", "decimal_separator": ".", "flexible": True}, }, } new_id = str(uuid.uuid4()) cur = _SeqCursor([None, {"id": new_id}]) out = run_universal_csv_import(cur, PID, "activity", text, "act.csv", mapping) assert out["rows_imported"] == 1 assert out["new_entries"] == 1 assert any("INSERT INTO activity_log" in q[0] for q in cur.executes) def test_run_universal_import_vitals_baseline_upsert_insert_path(): text = ( "Start,Resting Heart Rate,Heart Rate Variability,VO2 Max\n" "2024-01-15 07:00:00,55,45,42.5\n" ) mapping = { "delimiter": ",", "has_header": True, "field_mappings": { "Start": "date", "Resting Heart Rate": "resting_hr", "Heart Rate Variability": "hrv", "VO2 Max": "vo2_max", }, "type_conversions": { "date": { "type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "extract": "date_only", "flexible": True, }, "resting_hr": {"type": "int", "flexible": True}, "hrv": {"type": "int", "flexible": True}, "vo2_max": {"type": "float", "decimal_separator": ".", "flexible": True}, }, } cur = _SeqCursor([{"inserted": True, "id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"}]) out = run_universal_csv_import(cur, PID, "vitals_baseline", text, "v.csv", mapping) assert out["rows_imported"] == 1 assert any("INSERT INTO vitals_baseline" in q[0] for q in cur.executes) def test_run_universal_import_activity_garmin_time_plus_date_columns(monkeypatch): """Datum in eigener Spalte, Uhrzeit wie bei Garmin nur als Uhrzeit.""" monkeypatch.setattr( "csv_parser.executor._resolve_training_type_for_activity", lambda *_a, **_k: (None, None, None), ) text = ( "Activity Type,Date,Time,Duration,Distance,Calories,Avg HR\n" "Run,2024-01-20,08:30:00,0:45:00,8.0,400,140\n" ) mapping = { "delimiter": ",", "has_header": True, "field_mappings": { "Activity Type": "activity_type", "Date": "date", "Time": "start_time", "Duration": "duration_min", "Distance": "distance_km", "Calories": "kcal_active", "Avg HR": "hr_avg", }, "type_conversions": { "date": {"type": "date", "format": "yyyy-mm-dd", "flexible": True}, "start_time": {"type": "time", "format": "HH:MM:SS", "flexible": True}, "duration_min": { "type": "duration", "format": "HH:MM:SS", "target_unit": "minutes", "flexible": True, }, "distance_km": {"type": "float", "decimal_separator": ".", "flexible": True}, "kcal_active": {"type": "float", "decimal_separator": ".", "flexible": True}, "hr_avg": {"type": "int", "flexible": True}, }, } new_id = str(uuid.uuid4()) cur = _SeqCursor([None, {"id": new_id}]) out = run_universal_csv_import(cur, PID, "activity", text, "garmin.csv", mapping) assert out["rows_imported"] == 1 # Duplicate-Key muss Datum + kombinierte Startzeit enthalten assert any( params and "2024-01-20 08:30:00" in str(params) for _sql, params in cur.executes if params ) def test_run_universal_import_nutrition_two_rows_same_day_aggregates_to_one_row(): """Modul-Default: mehrere CSV-Zeilen pro Tag → Summe, ein nutrition_log-Eintrag.""" text = ( "Date,Kalorien,Protein,Fett,KH\n" "2024-01-15,500,10,20,30\n" "2024-01-15,300,5,10,15\n" ) mapping = { "delimiter": ",", "has_header": True, "field_mappings": { "Date": "date", "Kalorien": "kcal", "Protein": "protein_g", "Fett": "fat_g", "KH": "carbs_g", }, "type_conversions": { "date": {"type": "date", "format": "yyyy-mm-dd", "flexible": True}, "kcal": {"type": "float", "decimal_separator": ".", "flexible": True}, "protein_g": {"type": "float", "decimal_separator": ".", "flexible": True}, "fat_g": {"type": "float", "decimal_separator": ".", "flexible": True}, "carbs_g": {"type": "float", "decimal_separator": ".", "flexible": True}, }, } cur = _SeqCursor([None]) out = run_universal_csv_import(cur, PID, "nutrition", text, "n.csv", mapping) assert out["rows_total"] == 2 assert out["rows_imported"] == 1 insert_sqls = [q for q in cur.executes if "INSERT INTO nutrition_log" in q[0]] assert len(insert_sqls) == 1 params = insert_sqls[0][1] # (eid, profile_id, iso, kcal, prot, fat, carbs) assert params[3] == 800.0 assert params[4] == 15.0 assert params[5] == 30.0 assert params[6] == 45.0