"""Unit tests for data_layer.activity_session_metrics (no DB for most cases).""" import uuid from unittest.mock import patch import pytest from data_layer.activity_session_metrics import ( ActivitySessionMetricsError, apply_activity_mapped_column_aliases_from_schema, enrich_sessions_with_metrics, merge_column_backed_and_eav_metrics, merge_parameter_schema_rows, replace_activity_session_metrics, resolve_activity_attribute_schema, upsert_session_metrics_from_csv_mapped, _row_value_tuple, _validate_single_value, ) def _tp_row( pid: int, key: str, *, data_type: str = "integer", cat_sort: int = 0, cat_required: bool = False, name_de: str = "X", name_en: str = "X", param_category: str = "physical", unit: str | None = None, validation_rules: dict | None = None, source_field: str | None = None, ): return { "training_parameter_id": pid, "cat_sort": cat_sort, "cat_required": cat_required, "cat_ui_group": None, "key": key, "name_de": name_de, "name_en": name_en, "param_category": param_category, "data_type": data_type, "unit": unit, "validation_rules": validation_rules or {}, "source_field": source_field, } def _ttp_row( pid: int, key: str, *, typ_sort: int | None = None, typ_required: bool | None = None, typ_ui_group: str | None = None, data_type: str = "integer", name_de: str = "X", name_en: str = "X", param_category: str = "physical", unit: str | None = None, validation_rules: dict | None = None, source_field: str | None = None, ): return { "training_parameter_id": pid, "typ_sort": typ_sort, "typ_required": typ_required, "typ_ui_group": typ_ui_group, "key": key, "name_de": name_de, "name_en": name_en, "param_category": param_category, "data_type": data_type, "unit": unit, "validation_rules": validation_rules or {}, "source_field": source_field, } def test_merge_category_only_sorted_by_sort_order_then_key(): cat = [ _tp_row(2, "zebra", cat_sort=10), _tp_row(1, "alpha", cat_sort=5), ] merged = merge_parameter_schema_rows(cat, []) assert [m["key"] for m in merged] == ["alpha", "zebra"] assert merged[0]["required"] is False def test_merge_type_overrides_required_and_sort(): cat = [_tp_row(1, "rpe", cat_sort=0, cat_required=False)] typ = [_ttp_row(1, "rpe", typ_sort=99, typ_required=True)] merged = merge_parameter_schema_rows(cat, typ) assert len(merged) == 1 assert merged[0]["sort_order"] == 99 assert merged[0]["required"] is True def test_merge_type_adds_parameter_not_in_category(): typ = [_ttp_row(7, "cadence", typ_sort=1, typ_required=True, data_type="integer")] merged = merge_parameter_schema_rows([], typ) assert len(merged) == 1 assert merged[0]["key"] == "cadence" assert merged[0]["required"] is True def test_validate_integer_range(): _validate_single_value("integer", 5, {"min": 1, "max": 10}) with pytest.raises(ActivitySessionMetricsError) as ei: _validate_single_value("integer", 0, {"min": 1}) assert ei.value.status_code == 400 def test_validate_float_accepts_int(): _validate_single_value("float", 3, {"min": 0, "max": 10}) def test_validate_boolean_rejects_int(): with pytest.raises(ActivitySessionMetricsError): _validate_single_value("boolean", 1, {}) def test_row_value_tuple_mapping(): assert _row_value_tuple("integer", 42) == (None, 42, None, None) assert _row_value_tuple("float", 1.5) == (1.5, None, None, None) assert _row_value_tuple("string", "hi") == (None, None, "hi", None) assert _row_value_tuple("boolean", True) == (None, None, None, True) class _FakeCursor: """Sequences fetchone/fetchall for resolve_activity_attribute_schema.""" def __init__(self, fetchone_chain, fetchall_chain): self._fetchone = list(fetchone_chain) self._fetchall = list(fetchall_chain) self.executes: list[tuple] = [] def execute(self, sql, params=None): self.executes.append((sql, params)) def fetchone(self): return self._fetchone.pop(0) def fetchall(self): return self._fetchall.pop(0) def test_resolve_with_explicit_category_no_type(): cur = _FakeCursor( fetchone_chain=[], fetchall_chain=[ [ _tp_row(1, "rpe", cat_sort=0), ], ], ) out = resolve_activity_attribute_schema(cur, "cardio", None) assert len(out) == 1 assert out[0]["key"] == "rpe" assert len(cur.executes) == 1 def test_resolve_loads_category_from_training_type_id(): cur = _FakeCursor( fetchone_chain=[{"category": "strength"}], fetchall_chain=[ [_tp_row(1, "rpe", cat_sort=0)], [], ], ) out = resolve_activity_attribute_schema(cur, None, 42) assert len(out) == 1 assert cur.executes[0][1] == (42,) @patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema", return_value=[]) def test_enrich_sessions_batch(mock_resolve): aid = str(uuid.uuid4()) bid = str(uuid.uuid4()) class _Cur: def __init__(self): self.params = None self._fetch_n = 0 def execute(self, sql, params=None): self.sql = sql self.params = params def fetchall(self): self._fetch_n += 1 if self._fetch_n == 1: return [ { "id": uuid.UUID(aid), "training_category": None, "training_type_id": None, }, { "id": uuid.UUID(bid), "training_category": None, "training_type_id": None, }, ] return [ { "activity_log_id": uuid.UUID(aid), "training_parameter_id": 3, "key": "rpe", "data_type": "integer", "unit": None, "value_num": None, "value_int": 7, "value_text": None, "value_bool": None, }, ] sessions = [{"id": aid}, {"id": bid}] enrich_sessions_with_metrics(_Cur(), sessions) assert sessions[0]["session_metrics"][0]["value"] == 7 assert sessions[0]["session_metrics"][0]["key"] == "rpe" assert sessions[1]["session_metrics"] == [] def test_merge_column_backed_prefers_column_over_stale_eav(): schema = [ { "training_parameter_id": 1, "key": "hr_avg", "data_type": "float", "unit": "bpm", "validation_rules": {}, "source_field": "hr_avg", } ] eav = [ { "training_parameter_id": 1, "key": "hr_avg", "data_type": "float", "unit": "bpm", "value": 99.0, } ] out = merge_column_backed_and_eav_metrics({"hr_avg": 140.0}, schema, eav) assert len(out) == 1 assert out[0]["value"] == 140.0 def test_merge_falls_back_to_eav_when_column_empty(): schema = [ { "training_parameter_id": 1, "key": "hr_avg", "data_type": "float", "unit": "bpm", "validation_rules": {}, "source_field": "hr_avg", } ] eav = [ { "training_parameter_id": 1, "key": "hr_avg", "data_type": "float", "unit": "bpm", "value": 99.0, } ] out = merge_column_backed_and_eav_metrics({"hr_avg": None}, schema, eav) assert len(out) == 1 assert out[0]["value"] == 99.0 def test_merge_keeps_eav_only_keys(): schema = [] eav = [ { "training_parameter_id": 2, "key": "custom_param", "data_type": "string", "unit": None, "value": "x", } ] out = merge_column_backed_and_eav_metrics({}, schema, eav) assert len(out) == 1 assert out[0]["key"] == "custom_param" def test_apply_mapped_aliases_copies_avg_hr_to_hr_avg(): schema = [ { "key": "avg_hr", "training_parameter_id": 1, "source_field": "hr_avg", "data_type": "integer", "unit": "bpm", "validation_rules": {}, } ] out = apply_activity_mapped_column_aliases_from_schema({"avg_hr": 118}, schema) assert out["avg_hr"] == 118 assert out["hr_avg"] == 118 def test_apply_mapped_aliases_does_not_overwrite_existing_column(): schema = [ { "key": "avg_hr", "training_parameter_id": 1, "source_field": "hr_avg", "data_type": "integer", "unit": "bpm", "validation_rules": {}, } ] out = apply_activity_mapped_column_aliases_from_schema({"avg_hr": 999, "hr_avg": 120}, schema) assert out["hr_avg"] == 120 @patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema") def test_upsert_csv_skips_parameter_with_source_field(mock_schema): """Kein INSERT in activity_session_metrics für Spalten-Parameter (avg_hr → hr_avg).""" mock_schema.return_value = [ { "key": "avg_hr", "training_parameter_id": 42, "data_type": "integer", "validation_rules": {"min": 30, "max": 220}, "source_field": "hr_avg", } ] class Cur: def __init__(self): self.asm_inserts = 0 def execute(self, sql, params=None): if "INSERT INTO activity_session_metrics" in sql: self.asm_inserts += 1 def fetchone(self): return {"profile_id": "00000000-0000-0000-0000-000000000001"} cur = Cur() upsert_session_metrics_from_csv_mapped( cur, "00000000-0000-0000-0000-000000000001", "00000000-0000-0000-0000-000000000002", {"avg_hr": 130}, "cardio", 1, ) assert cur.asm_inserts == 0 @patch("data_layer.activity_session_metrics.fetch_activity_session_metrics") @patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema") def test_replace_metrics_skips_column_backed_kcal(mock_schema, mock_fetch): """PUT /metrics: keine EAV-Zeile für kcal_active (liegt in activity_log).""" pid = str(uuid.uuid4()) eid = str(uuid.uuid4()) mock_schema.return_value = [ { "training_parameter_id": 1, "key": "kcal_active", "data_type": "float", "validation_rules": {}, "source_field": "kcal_active", "required": False, }, { "training_parameter_id": 2, "key": "custom_reps", "data_type": "integer", "validation_rules": {"min": 0}, "source_field": None, "required": False, }, ] mock_fetch.return_value = [] class Cur: def __init__(self): self.asm_inserts = 0 def execute(self, sql, params=None): if "INSERT INTO activity_session_metrics" in sql: self.asm_inserts += 1 def fetchone(self): return { "profile_id": pid, "training_category": "strength", "training_type_id": 1, } cur = Cur() replace_activity_session_metrics( cur, pid, eid, [ {"parameter_key": "kcal_active", "value": 450.0}, {"parameter_key": "custom_reps", "value": 12}, ], ) assert cur.asm_inserts == 1 mock_fetch.assert_called_once_with(cur, eid) def test_merge_eav_primary_falls_back_to_legacy_hr_min_column(): """Kanon: min_hr ohne source_field / ohne EAV — Lesefallback Spalte hr_min.""" schema = [ { "training_parameter_id": 9, "key": "min_hr", "data_type": "integer", "unit": "bpm", "validation_rules": {}, "source_field": None, } ] out = merge_column_backed_and_eav_metrics({"hr_min": 88}, schema, []) assert len(out) == 1 assert out[0]["key"] == "min_hr" assert out[0]["value"] == 88 def test_activity_module_registry_field_keys_match_csv_module_definition(): """Phase A: Kanon-Spine = module_registry „activity“.fields (keine zweite Liste).""" from csv_parser.module_registry import get_module_definition from data_layer.activity_data_canon import ( ACTIVITY_MODULE_REGISTRY_FIELD_KEYS, get_activity_module_registry_field_keys, ) mod = get_module_definition("activity") assert mod is not None expected = frozenset((mod.get("fields") or {}).keys()) assert get_activity_module_registry_field_keys() == expected assert ACTIVITY_MODULE_REGISTRY_FIELD_KEYS == expected