"""Unit tests for data_layer.activity_session_metrics (no DB for most cases).""" import uuid from unittest.mock import patch import pytest from data_layer.activity_session_metrics import ( ActivitySessionMetricsError, enrich_sessions_with_metrics, merge_column_backed_and_eav_metrics, merge_parameter_schema_rows, resolve_activity_attribute_schema, _row_value_tuple, _validate_single_value, ) def _tp_row( pid: int, key: str, *, data_type: str = "integer", cat_sort: int = 0, cat_required: bool = False, name_de: str = "X", name_en: str = "X", param_category: str = "physical", unit: str | None = None, validation_rules: dict | None = None, source_field: str | None = None, ): return { "training_parameter_id": pid, "cat_sort": cat_sort, "cat_required": cat_required, "cat_ui_group": None, "key": key, "name_de": name_de, "name_en": name_en, "param_category": param_category, "data_type": data_type, "unit": unit, "validation_rules": validation_rules or {}, "source_field": source_field, } def _ttp_row( pid: int, key: str, *, typ_sort: int | None = None, typ_required: bool | None = None, typ_ui_group: str | None = None, data_type: str = "integer", name_de: str = "X", name_en: str = "X", param_category: str = "physical", unit: str | None = None, validation_rules: dict | None = None, source_field: str | None = None, ): return { "training_parameter_id": pid, "typ_sort": typ_sort, "typ_required": typ_required, "typ_ui_group": typ_ui_group, "key": key, "name_de": name_de, "name_en": name_en, "param_category": param_category, "data_type": data_type, "unit": unit, "validation_rules": validation_rules or {}, "source_field": source_field, } def test_merge_category_only_sorted_by_sort_order_then_key(): cat = [ _tp_row(2, "zebra", cat_sort=10), _tp_row(1, "alpha", cat_sort=5), ] merged = merge_parameter_schema_rows(cat, []) assert [m["key"] for m in merged] == ["alpha", "zebra"] assert merged[0]["required"] is False def test_merge_type_overrides_required_and_sort(): cat = [_tp_row(1, "rpe", cat_sort=0, cat_required=False)] typ = [_ttp_row(1, "rpe", typ_sort=99, typ_required=True)] merged = merge_parameter_schema_rows(cat, typ) assert len(merged) == 1 assert merged[0]["sort_order"] == 99 assert merged[0]["required"] is True def test_merge_type_adds_parameter_not_in_category(): typ = [_ttp_row(7, "cadence", typ_sort=1, typ_required=True, data_type="integer")] merged = merge_parameter_schema_rows([], typ) assert len(merged) == 1 assert merged[0]["key"] == "cadence" assert merged[0]["required"] is True def test_validate_integer_range(): _validate_single_value("integer", 5, {"min": 1, "max": 10}) with pytest.raises(ActivitySessionMetricsError) as ei: _validate_single_value("integer", 0, {"min": 1}) assert ei.value.status_code == 400 def test_validate_float_accepts_int(): _validate_single_value("float", 3, {"min": 0, "max": 10}) def test_validate_boolean_rejects_int(): with pytest.raises(ActivitySessionMetricsError): _validate_single_value("boolean", 1, {}) def test_row_value_tuple_mapping(): assert _row_value_tuple("integer", 42) == (None, 42, None, None) assert _row_value_tuple("float", 1.5) == (1.5, None, None, None) assert _row_value_tuple("string", "hi") == (None, None, "hi", None) assert _row_value_tuple("boolean", True) == (None, None, None, True) class _FakeCursor: """Sequences fetchone/fetchall for resolve_activity_attribute_schema.""" def __init__(self, fetchone_chain, fetchall_chain): self._fetchone = list(fetchone_chain) self._fetchall = list(fetchall_chain) self.executes: list[tuple] = [] def execute(self, sql, params=None): self.executes.append((sql, params)) def fetchone(self): return self._fetchone.pop(0) def fetchall(self): return self._fetchall.pop(0) def test_resolve_with_explicit_category_no_type(): cur = _FakeCursor( fetchone_chain=[], fetchall_chain=[ [ _tp_row(1, "rpe", cat_sort=0), ], ], ) out = resolve_activity_attribute_schema(cur, "cardio", None) assert len(out) == 1 assert out[0]["key"] == "rpe" assert len(cur.executes) == 1 def test_resolve_loads_category_from_training_type_id(): cur = _FakeCursor( fetchone_chain=[{"category": "strength"}], fetchall_chain=[ [_tp_row(1, "rpe", cat_sort=0)], [], ], ) out = resolve_activity_attribute_schema(cur, None, 42) assert len(out) == 1 assert cur.executes[0][1] == (42,) @patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema", return_value=[]) def test_enrich_sessions_batch(mock_resolve): aid = str(uuid.uuid4()) bid = str(uuid.uuid4()) class _Cur: def __init__(self): self.params = None self._fetch_n = 0 def execute(self, sql, params=None): self.sql = sql self.params = params def fetchall(self): self._fetch_n += 1 if self._fetch_n == 1: return [ { "id": uuid.UUID(aid), "training_category": None, "training_type_id": None, }, { "id": uuid.UUID(bid), "training_category": None, "training_type_id": None, }, ] return [ { "activity_log_id": uuid.UUID(aid), "training_parameter_id": 3, "key": "rpe", "data_type": "integer", "unit": None, "value_num": None, "value_int": 7, "value_text": None, "value_bool": None, }, ] sessions = [{"id": aid}, {"id": bid}] enrich_sessions_with_metrics(_Cur(), sessions) assert sessions[0]["session_metrics"][0]["value"] == 7 assert sessions[0]["session_metrics"][0]["key"] == "rpe" assert sessions[1]["session_metrics"] == [] def test_merge_column_backed_prefers_column_over_stale_eav(): schema = [ { "training_parameter_id": 1, "key": "hr_avg", "data_type": "float", "unit": "bpm", "validation_rules": {}, "source_field": "hr_avg", } ] eav = [ { "training_parameter_id": 1, "key": "hr_avg", "data_type": "float", "unit": "bpm", "value": 99.0, } ] out = merge_column_backed_and_eav_metrics({"hr_avg": 140.0}, schema, eav) assert len(out) == 1 assert out[0]["value"] == 140.0 def test_merge_falls_back_to_eav_when_column_empty(): schema = [ { "training_parameter_id": 1, "key": "hr_avg", "data_type": "float", "unit": "bpm", "validation_rules": {}, "source_field": "hr_avg", } ] eav = [ { "training_parameter_id": 1, "key": "hr_avg", "data_type": "float", "unit": "bpm", "value": 99.0, } ] out = merge_column_backed_and_eav_metrics({"hr_avg": None}, schema, eav) assert len(out) == 1 assert out[0]["value"] == 99.0 def test_merge_keeps_eav_only_keys(): schema = [] eav = [ { "training_parameter_id": 2, "key": "custom_param", "data_type": "string", "unit": None, "value": "x", } ] out = merge_column_backed_and_eav_metrics({}, schema, eav) assert len(out) == 1 assert out[0]["key"] == "custom_param" def test_merge_eav_primary_falls_back_to_legacy_hr_min_column(): """Kanon: min_hr ohne source_field / ohne EAV — Lesefallback Spalte hr_min.""" schema = [ { "training_parameter_id": 9, "key": "min_hr", "data_type": "integer", "unit": "bpm", "validation_rules": {}, "source_field": None, } ] out = merge_column_backed_and_eav_metrics({"hr_min": 88}, schema, []) assert len(out) == 1 assert out[0]["key"] == "min_hr" assert out[0]["value"] == 88