mitai-jinkendo/backend/tests/test_activity_session_metrics.py
Lars 06f83e2ffc
All checks were successful
Deploy Development / deploy (push) Successful in 53s
Build Test / pytest-backend (push) Successful in 5s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
revert: Wiederherstellung Codezustand von ca8cee9 (ohne Branch-Historie zu überschreiben)
Reverts cd29c7d..026c51b per git revert. Alle zwischenliegenden Commits bleiben in Gitea sichtbar; der Arbeitsbaum entspricht wieder dem Stand von ca8cee9.

Made-with: Cursor
2026-04-16 11:59:23 +02:00

308 lines
8.7 KiB
Python

"""Unit tests for data_layer.activity_session_metrics (no DB for most cases)."""
import uuid
from unittest.mock import patch
import pytest
from data_layer.activity_session_metrics import (
ActivitySessionMetricsError,
enrich_sessions_with_metrics,
merge_column_backed_and_eav_metrics,
merge_parameter_schema_rows,
resolve_activity_attribute_schema,
_row_value_tuple,
_validate_single_value,
)
def _tp_row(
pid: int,
key: str,
*,
data_type: str = "integer",
cat_sort: int = 0,
cat_required: bool = False,
name_de: str = "X",
name_en: str = "X",
param_category: str = "physical",
unit: str | None = None,
validation_rules: dict | None = None,
source_field: str | None = None,
):
return {
"training_parameter_id": pid,
"cat_sort": cat_sort,
"cat_required": cat_required,
"cat_ui_group": None,
"key": key,
"name_de": name_de,
"name_en": name_en,
"param_category": param_category,
"data_type": data_type,
"unit": unit,
"validation_rules": validation_rules or {},
"source_field": source_field,
}
def _ttp_row(
pid: int,
key: str,
*,
typ_sort: int | None = None,
typ_required: bool | None = None,
typ_ui_group: str | None = None,
data_type: str = "integer",
name_de: str = "X",
name_en: str = "X",
param_category: str = "physical",
unit: str | None = None,
validation_rules: dict | None = None,
source_field: str | None = None,
):
return {
"training_parameter_id": pid,
"typ_sort": typ_sort,
"typ_required": typ_required,
"typ_ui_group": typ_ui_group,
"key": key,
"name_de": name_de,
"name_en": name_en,
"param_category": param_category,
"data_type": data_type,
"unit": unit,
"validation_rules": validation_rules or {},
"source_field": source_field,
}
def test_merge_category_only_sorted_by_sort_order_then_key():
cat = [
_tp_row(2, "zebra", cat_sort=10),
_tp_row(1, "alpha", cat_sort=5),
]
merged = merge_parameter_schema_rows(cat, [])
assert [m["key"] for m in merged] == ["alpha", "zebra"]
assert merged[0]["required"] is False
def test_merge_type_overrides_required_and_sort():
cat = [_tp_row(1, "rpe", cat_sort=0, cat_required=False)]
typ = [_ttp_row(1, "rpe", typ_sort=99, typ_required=True)]
merged = merge_parameter_schema_rows(cat, typ)
assert len(merged) == 1
assert merged[0]["sort_order"] == 99
assert merged[0]["required"] is True
def test_merge_type_adds_parameter_not_in_category():
typ = [_ttp_row(7, "cadence", typ_sort=1, typ_required=True, data_type="integer")]
merged = merge_parameter_schema_rows([], typ)
assert len(merged) == 1
assert merged[0]["key"] == "cadence"
assert merged[0]["required"] is True
def test_validate_integer_range():
_validate_single_value("integer", 5, {"min": 1, "max": 10})
with pytest.raises(ActivitySessionMetricsError) as ei:
_validate_single_value("integer", 0, {"min": 1})
assert ei.value.status_code == 400
def test_validate_float_accepts_int():
_validate_single_value("float", 3, {"min": 0, "max": 10})
def test_validate_boolean_rejects_int():
with pytest.raises(ActivitySessionMetricsError):
_validate_single_value("boolean", 1, {})
def test_row_value_tuple_mapping():
assert _row_value_tuple("integer", 42) == (None, 42, None, None)
assert _row_value_tuple("float", 1.5) == (1.5, None, None, None)
assert _row_value_tuple("string", "hi") == (None, None, "hi", None)
assert _row_value_tuple("boolean", True) == (None, None, None, True)
class _FakeCursor:
"""Sequences fetchone/fetchall for resolve_activity_attribute_schema."""
def __init__(self, fetchone_chain, fetchall_chain):
self._fetchone = list(fetchone_chain)
self._fetchall = list(fetchall_chain)
self.executes: list[tuple] = []
def execute(self, sql, params=None):
self.executes.append((sql, params))
def fetchone(self):
return self._fetchone.pop(0)
def fetchall(self):
return self._fetchall.pop(0)
def test_resolve_with_explicit_category_no_type():
cur = _FakeCursor(
fetchone_chain=[],
fetchall_chain=[
[
_tp_row(1, "rpe", cat_sort=0),
],
],
)
out = resolve_activity_attribute_schema(cur, "cardio", None)
assert len(out) == 1
assert out[0]["key"] == "rpe"
assert len(cur.executes) == 1
def test_resolve_loads_category_from_training_type_id():
cur = _FakeCursor(
fetchone_chain=[{"category": "strength"}],
fetchall_chain=[
[_tp_row(1, "rpe", cat_sort=0)],
[],
],
)
out = resolve_activity_attribute_schema(cur, None, 42)
assert len(out) == 1
assert cur.executes[0][1] == (42,)
@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema", return_value=[])
def test_enrich_sessions_batch(mock_resolve):
aid = str(uuid.uuid4())
bid = str(uuid.uuid4())
class _Cur:
def __init__(self):
self.params = None
self._fetch_n = 0
def execute(self, sql, params=None):
self.sql = sql
self.params = params
def fetchall(self):
self._fetch_n += 1
if self._fetch_n == 1:
return [
{
"id": uuid.UUID(aid),
"training_category": None,
"training_type_id": None,
},
{
"id": uuid.UUID(bid),
"training_category": None,
"training_type_id": None,
},
]
return [
{
"activity_log_id": uuid.UUID(aid),
"training_parameter_id": 3,
"key": "rpe",
"data_type": "integer",
"unit": None,
"value_num": None,
"value_int": 7,
"value_text": None,
"value_bool": None,
},
]
sessions = [{"id": aid}, {"id": bid}]
enrich_sessions_with_metrics(_Cur(), sessions)
assert sessions[0]["session_metrics"][0]["value"] == 7
assert sessions[0]["session_metrics"][0]["key"] == "rpe"
assert sessions[1]["session_metrics"] == []
def test_merge_column_backed_prefers_column_over_stale_eav():
schema = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"validation_rules": {},
"source_field": "hr_avg",
}
]
eav = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"value": 99.0,
}
]
out = merge_column_backed_and_eav_metrics({"hr_avg": 140.0}, schema, eav)
assert len(out) == 1
assert out[0]["value"] == 140.0
def test_merge_falls_back_to_eav_when_column_empty():
schema = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"validation_rules": {},
"source_field": "hr_avg",
}
]
eav = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"value": 99.0,
}
]
out = merge_column_backed_and_eav_metrics({"hr_avg": None}, schema, eav)
assert len(out) == 1
assert out[0]["value"] == 99.0
def test_merge_keeps_eav_only_keys():
schema = []
eav = [
{
"training_parameter_id": 2,
"key": "custom_param",
"data_type": "string",
"unit": None,
"value": "x",
}
]
out = merge_column_backed_and_eav_metrics({}, schema, eav)
assert len(out) == 1
assert out[0]["key"] == "custom_param"
def test_merge_eav_primary_falls_back_to_legacy_hr_min_column():
"""Kanon: min_hr ohne source_field / ohne EAV — Lesefallback Spalte hr_min."""
schema = [
{
"training_parameter_id": 9,
"key": "min_hr",
"data_type": "integer",
"unit": "bpm",
"validation_rules": {},
"source_field": None,
}
]
out = merge_column_backed_and_eav_metrics({"hr_min": 88}, schema, [])
assert len(out) == 1
assert out[0]["key"] == "min_hr"
assert out[0]["value"] == 88