mitai-jinkendo/backend/tests/test_activity_session_metrics.py
Lars 94bb4a8199
All checks were successful
Deploy Development / deploy (push) Successful in 55s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
feat: Update activity session metrics handling to skip source field inserts
- Modified the `replace_activity_session_metrics` function to skip EAV inserts for parameters with a defined `source_field`, aligning with the existing logic in `upsert_session_metrics_from_csv_mapped`.
- Enhanced the frontend logic to filter out metrics associated with `source_field` during payload construction, improving data integrity and user experience.
- Added unit tests to validate the new behavior, ensuring that metrics with `source_field` are correctly excluded from inserts.
2026-04-16 12:29:01 +02:00

436 lines
13 KiB
Python

"""Unit tests for data_layer.activity_session_metrics (no DB for most cases)."""
import uuid
from unittest.mock import patch
import pytest
from data_layer.activity_session_metrics import (
ActivitySessionMetricsError,
enrich_sessions_with_metrics,
merge_column_backed_and_eav_metrics,
merge_parameter_schema_rows,
replace_activity_session_metrics,
resolve_activity_attribute_schema,
upsert_session_metrics_from_csv_mapped,
_row_value_tuple,
_validate_single_value,
)
def _tp_row(
pid: int,
key: str,
*,
data_type: str = "integer",
cat_sort: int = 0,
cat_required: bool = False,
name_de: str = "X",
name_en: str = "X",
param_category: str = "physical",
unit: str | None = None,
validation_rules: dict | None = None,
source_field: str | None = None,
):
return {
"training_parameter_id": pid,
"cat_sort": cat_sort,
"cat_required": cat_required,
"cat_ui_group": None,
"key": key,
"name_de": name_de,
"name_en": name_en,
"param_category": param_category,
"data_type": data_type,
"unit": unit,
"validation_rules": validation_rules or {},
"source_field": source_field,
}
def _ttp_row(
pid: int,
key: str,
*,
typ_sort: int | None = None,
typ_required: bool | None = None,
typ_ui_group: str | None = None,
data_type: str = "integer",
name_de: str = "X",
name_en: str = "X",
param_category: str = "physical",
unit: str | None = None,
validation_rules: dict | None = None,
source_field: str | None = None,
):
return {
"training_parameter_id": pid,
"typ_sort": typ_sort,
"typ_required": typ_required,
"typ_ui_group": typ_ui_group,
"key": key,
"name_de": name_de,
"name_en": name_en,
"param_category": param_category,
"data_type": data_type,
"unit": unit,
"validation_rules": validation_rules or {},
"source_field": source_field,
}
def test_merge_category_only_sorted_by_sort_order_then_key():
cat = [
_tp_row(2, "zebra", cat_sort=10),
_tp_row(1, "alpha", cat_sort=5),
]
merged = merge_parameter_schema_rows(cat, [])
assert [m["key"] for m in merged] == ["alpha", "zebra"]
assert merged[0]["required"] is False
def test_merge_type_overrides_required_and_sort():
cat = [_tp_row(1, "rpe", cat_sort=0, cat_required=False)]
typ = [_ttp_row(1, "rpe", typ_sort=99, typ_required=True)]
merged = merge_parameter_schema_rows(cat, typ)
assert len(merged) == 1
assert merged[0]["sort_order"] == 99
assert merged[0]["required"] is True
def test_merge_type_adds_parameter_not_in_category():
typ = [_ttp_row(7, "cadence", typ_sort=1, typ_required=True, data_type="integer")]
merged = merge_parameter_schema_rows([], typ)
assert len(merged) == 1
assert merged[0]["key"] == "cadence"
assert merged[0]["required"] is True
def test_validate_integer_range():
_validate_single_value("integer", 5, {"min": 1, "max": 10})
with pytest.raises(ActivitySessionMetricsError) as ei:
_validate_single_value("integer", 0, {"min": 1})
assert ei.value.status_code == 400
def test_validate_float_accepts_int():
_validate_single_value("float", 3, {"min": 0, "max": 10})
def test_validate_boolean_rejects_int():
with pytest.raises(ActivitySessionMetricsError):
_validate_single_value("boolean", 1, {})
def test_row_value_tuple_mapping():
assert _row_value_tuple("integer", 42) == (None, 42, None, None)
assert _row_value_tuple("float", 1.5) == (1.5, None, None, None)
assert _row_value_tuple("string", "hi") == (None, None, "hi", None)
assert _row_value_tuple("boolean", True) == (None, None, None, True)
class _FakeCursor:
"""Sequences fetchone/fetchall for resolve_activity_attribute_schema."""
def __init__(self, fetchone_chain, fetchall_chain):
self._fetchone = list(fetchone_chain)
self._fetchall = list(fetchall_chain)
self.executes: list[tuple] = []
def execute(self, sql, params=None):
self.executes.append((sql, params))
def fetchone(self):
return self._fetchone.pop(0)
def fetchall(self):
return self._fetchall.pop(0)
def test_resolve_with_explicit_category_no_type():
cur = _FakeCursor(
fetchone_chain=[],
fetchall_chain=[
[
_tp_row(1, "rpe", cat_sort=0),
],
],
)
out = resolve_activity_attribute_schema(cur, "cardio", None)
assert len(out) == 1
assert out[0]["key"] == "rpe"
assert len(cur.executes) == 1
def test_resolve_loads_category_from_training_type_id():
cur = _FakeCursor(
fetchone_chain=[{"category": "strength"}],
fetchall_chain=[
[_tp_row(1, "rpe", cat_sort=0)],
[],
],
)
out = resolve_activity_attribute_schema(cur, None, 42)
assert len(out) == 1
assert cur.executes[0][1] == (42,)
@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema", return_value=[])
def test_enrich_sessions_batch(mock_resolve):
aid = str(uuid.uuid4())
bid = str(uuid.uuid4())
class _Cur:
def __init__(self):
self.params = None
self._fetch_n = 0
def execute(self, sql, params=None):
self.sql = sql
self.params = params
def fetchall(self):
self._fetch_n += 1
if self._fetch_n == 1:
return [
{
"id": uuid.UUID(aid),
"training_category": None,
"training_type_id": None,
},
{
"id": uuid.UUID(bid),
"training_category": None,
"training_type_id": None,
},
]
return [
{
"activity_log_id": uuid.UUID(aid),
"training_parameter_id": 3,
"key": "rpe",
"data_type": "integer",
"unit": None,
"value_num": None,
"value_int": 7,
"value_text": None,
"value_bool": None,
},
]
sessions = [{"id": aid}, {"id": bid}]
enrich_sessions_with_metrics(_Cur(), sessions)
assert sessions[0]["session_metrics"][0]["value"] == 7
assert sessions[0]["session_metrics"][0]["key"] == "rpe"
assert sessions[1]["session_metrics"] == []
def test_merge_column_backed_prefers_column_over_stale_eav():
schema = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"validation_rules": {},
"source_field": "hr_avg",
}
]
eav = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"value": 99.0,
}
]
out = merge_column_backed_and_eav_metrics({"hr_avg": 140.0}, schema, eav)
assert len(out) == 1
assert out[0]["value"] == 140.0
def test_merge_falls_back_to_eav_when_column_empty():
schema = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"validation_rules": {},
"source_field": "hr_avg",
}
]
eav = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"value": 99.0,
}
]
out = merge_column_backed_and_eav_metrics({"hr_avg": None}, schema, eav)
assert len(out) == 1
assert out[0]["value"] == 99.0
def test_merge_keeps_eav_only_keys():
schema = []
eav = [
{
"training_parameter_id": 2,
"key": "custom_param",
"data_type": "string",
"unit": None,
"value": "x",
}
]
out = merge_column_backed_and_eav_metrics({}, schema, eav)
assert len(out) == 1
assert out[0]["key"] == "custom_param"
def test_merge_eav_primary_falls_back_to_legacy_hr_min_column():
"""Kanon: min_hr ohne source_field / ohne EAV — Lesefallback Spalte hr_min."""
schema = [
{
"training_parameter_id": 9,
"key": "min_hr",
"data_type": "integer",
"unit": "bpm",
"validation_rules": {},
"source_field": None,
}
]
out = merge_column_backed_and_eav_metrics({"hr_min": 88}, schema, [])
assert len(out) == 1
assert out[0]["key"] == "min_hr"
assert out[0]["value"] == 88
@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema")
def test_upsert_csv_skips_eav_when_source_field_maps_activity_log(mock_schema):
"""Parameter mit source_field: kanonisch activity_log — kein doppeltes EAV (z. B. avg_hr → hr_avg)."""
mock_schema.return_value = [
{
"key": "avg_hr",
"training_parameter_id": 42,
"data_type": "integer",
"validation_rules": {"min": 30, "max": 220},
"source_field": "hr_avg",
}
]
class Cur:
def __init__(self):
self.asm_inserts = 0
def execute(self, sql, params=None):
if "INSERT INTO activity_session_metrics" in sql:
self.asm_inserts += 1
def fetchone(self):
return {"profile_id": "00000000-0000-0000-0000-000000000001"}
cur = Cur()
upsert_session_metrics_from_csv_mapped(
cur,
"00000000-0000-0000-0000-000000000001",
"00000000-0000-0000-0000-000000000002",
{"avg_hr": 130},
"cardio",
1,
)
assert cur.asm_inserts == 0
@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema")
def test_upsert_csv_writes_eav_when_no_source_field(mock_schema):
mock_schema.return_value = [
{
"key": "custom_note",
"training_parameter_id": 99,
"data_type": "string",
"validation_rules": {},
"source_field": None,
}
]
class Cur:
def __init__(self):
self.asm_inserts = 0
def execute(self, sql, params=None):
if "INSERT INTO activity_session_metrics" in sql:
self.asm_inserts += 1
def fetchone(self):
return {"profile_id": "00000000-0000-0000-0000-000000000001"}
cur = Cur()
upsert_session_metrics_from_csv_mapped(
cur,
"00000000-0000-0000-0000-000000000001",
"00000000-0000-0000-0000-000000000002",
{"custom_note": "x"},
"cardio",
1,
)
assert cur.asm_inserts == 1
@patch("data_layer.activity_session_metrics.fetch_activity_session_metrics", return_value=[])
@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema")
def test_replace_metrics_skips_inserts_for_source_field_column(mock_schema, mock_fetch):
"""PUT /metrics: keine EAV-Zeile für Parameter mit source_field (kanonisch activity_log)."""
mock_schema.return_value = [
{
"key": "avg_hr",
"training_parameter_id": 1,
"data_type": "integer",
"validation_rules": {},
"source_field": "hr_avg",
"required": False,
},
{
"key": "custom_x",
"training_parameter_id": 2,
"data_type": "string",
"validation_rules": {},
"source_field": None,
"required": False,
},
]
class Cur:
def __init__(self):
self.asm_inserts = 0
def execute(self, sql, params=None):
self._sql = sql
if "INSERT INTO activity_session_metrics" in sql:
self.asm_inserts += 1
def fetchone(self):
if "FROM activity_log" in getattr(self, "_sql", ""):
return {
"id": "00000000-0000-0000-0000-000000000002",
"profile_id": "00000000-0000-0000-0000-000000000001",
"training_category": "cardio",
"training_type_id": 1,
}
return None
cur = Cur()
replace_activity_session_metrics(
cur,
"00000000-0000-0000-0000-000000000001",
"00000000-0000-0000-0000-000000000002",
[
{"parameter_key": "avg_hr", "value": 120},
{"parameter_key": "custom_x", "value": "y"},
],
)
assert cur.asm_inserts == 1