- Revised the `upsert_session_metrics_from_csv_mapped` function to clarify EAV writing conditions, ensuring only relevant parameters are processed. - Enhanced the `merge_column_backed_and_eav_metrics` function to exclude EAV rows for parameters not present in the schema, improving data integrity. - Updated unit tests to reflect changes in EAV handling and ensure correct functionality when parameters are mapped or not mapped in the profile schema. - Improved frontend logic to prevent duplicate display of metrics already handled in the entry form, enhancing user experience.
405 lines
12 KiB
Python
405 lines
12 KiB
Python
"""Unit tests for data_layer.activity_session_metrics (no DB for most cases)."""
|
|
|
|
import uuid
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from data_layer.activity_session_metrics import (
|
|
ActivitySessionMetricsError,
|
|
enrich_sessions_with_metrics,
|
|
merge_column_backed_and_eav_metrics,
|
|
merge_parameter_schema_rows,
|
|
resolve_activity_attribute_schema,
|
|
upsert_session_metrics_from_csv_mapped,
|
|
_row_value_tuple,
|
|
_validate_single_value,
|
|
)
|
|
|
|
|
|
def _tp_row(
|
|
pid: int,
|
|
key: str,
|
|
*,
|
|
data_type: str = "integer",
|
|
cat_sort: int = 0,
|
|
cat_required: bool = False,
|
|
name_de: str = "X",
|
|
name_en: str = "X",
|
|
param_category: str = "physical",
|
|
unit: str | None = None,
|
|
validation_rules: dict | None = None,
|
|
source_field: str | None = None,
|
|
):
|
|
return {
|
|
"training_parameter_id": pid,
|
|
"cat_sort": cat_sort,
|
|
"cat_required": cat_required,
|
|
"cat_ui_group": None,
|
|
"key": key,
|
|
"name_de": name_de,
|
|
"name_en": name_en,
|
|
"param_category": param_category,
|
|
"data_type": data_type,
|
|
"unit": unit,
|
|
"validation_rules": validation_rules or {},
|
|
"source_field": source_field,
|
|
}
|
|
|
|
|
|
def _ttp_row(
|
|
pid: int,
|
|
key: str,
|
|
*,
|
|
typ_sort: int | None = None,
|
|
typ_required: bool | None = None,
|
|
typ_ui_group: str | None = None,
|
|
data_type: str = "integer",
|
|
name_de: str = "X",
|
|
name_en: str = "X",
|
|
param_category: str = "physical",
|
|
unit: str | None = None,
|
|
validation_rules: dict | None = None,
|
|
source_field: str | None = None,
|
|
):
|
|
return {
|
|
"training_parameter_id": pid,
|
|
"typ_sort": typ_sort,
|
|
"typ_required": typ_required,
|
|
"typ_ui_group": typ_ui_group,
|
|
"key": key,
|
|
"name_de": name_de,
|
|
"name_en": name_en,
|
|
"param_category": param_category,
|
|
"data_type": data_type,
|
|
"unit": unit,
|
|
"validation_rules": validation_rules or {},
|
|
"source_field": source_field,
|
|
}
|
|
|
|
|
|
def test_merge_category_only_sorted_by_sort_order_then_key():
|
|
cat = [
|
|
_tp_row(2, "zebra", cat_sort=10),
|
|
_tp_row(1, "alpha", cat_sort=5),
|
|
]
|
|
merged = merge_parameter_schema_rows(cat, [])
|
|
assert [m["key"] for m in merged] == ["alpha", "zebra"]
|
|
assert merged[0]["required"] is False
|
|
|
|
|
|
def test_merge_type_overrides_required_and_sort():
|
|
cat = [_tp_row(1, "rpe", cat_sort=0, cat_required=False)]
|
|
typ = [_ttp_row(1, "rpe", typ_sort=99, typ_required=True)]
|
|
merged = merge_parameter_schema_rows(cat, typ)
|
|
assert len(merged) == 1
|
|
assert merged[0]["sort_order"] == 99
|
|
assert merged[0]["required"] is True
|
|
|
|
|
|
def test_merge_type_adds_parameter_not_in_category():
|
|
typ = [_ttp_row(7, "cadence", typ_sort=1, typ_required=True, data_type="integer")]
|
|
merged = merge_parameter_schema_rows([], typ)
|
|
assert len(merged) == 1
|
|
assert merged[0]["key"] == "cadence"
|
|
assert merged[0]["required"] is True
|
|
|
|
|
|
def test_validate_integer_range():
|
|
_validate_single_value("integer", 5, {"min": 1, "max": 10})
|
|
with pytest.raises(ActivitySessionMetricsError) as ei:
|
|
_validate_single_value("integer", 0, {"min": 1})
|
|
assert ei.value.status_code == 400
|
|
|
|
|
|
def test_validate_float_accepts_int():
|
|
_validate_single_value("float", 3, {"min": 0, "max": 10})
|
|
|
|
|
|
def test_validate_boolean_rejects_int():
|
|
with pytest.raises(ActivitySessionMetricsError):
|
|
_validate_single_value("boolean", 1, {})
|
|
|
|
|
|
def test_row_value_tuple_mapping():
|
|
assert _row_value_tuple("integer", 42) == (None, 42, None, None)
|
|
assert _row_value_tuple("float", 1.5) == (1.5, None, None, None)
|
|
assert _row_value_tuple("string", "hi") == (None, None, "hi", None)
|
|
assert _row_value_tuple("boolean", True) == (None, None, None, True)
|
|
|
|
|
|
class _FakeCursor:
|
|
"""Sequences fetchone/fetchall for resolve_activity_attribute_schema."""
|
|
|
|
def __init__(self, fetchone_chain, fetchall_chain):
|
|
self._fetchone = list(fetchone_chain)
|
|
self._fetchall = list(fetchall_chain)
|
|
self.executes: list[tuple] = []
|
|
|
|
def execute(self, sql, params=None):
|
|
self.executes.append((sql, params))
|
|
|
|
def fetchone(self):
|
|
return self._fetchone.pop(0)
|
|
|
|
def fetchall(self):
|
|
return self._fetchall.pop(0)
|
|
|
|
|
|
def test_resolve_with_explicit_category_no_type():
|
|
cur = _FakeCursor(
|
|
fetchone_chain=[],
|
|
fetchall_chain=[
|
|
[
|
|
_tp_row(1, "rpe", cat_sort=0),
|
|
],
|
|
],
|
|
)
|
|
out = resolve_activity_attribute_schema(cur, "cardio", None)
|
|
assert len(out) == 1
|
|
assert out[0]["key"] == "rpe"
|
|
assert len(cur.executes) == 1
|
|
|
|
|
|
def test_resolve_loads_category_from_training_type_id():
|
|
cur = _FakeCursor(
|
|
fetchone_chain=[{"category": "strength"}],
|
|
fetchall_chain=[
|
|
[_tp_row(1, "rpe", cat_sort=0)],
|
|
[],
|
|
],
|
|
)
|
|
out = resolve_activity_attribute_schema(cur, None, 42)
|
|
assert len(out) == 1
|
|
assert cur.executes[0][1] == (42,)
|
|
|
|
|
|
@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema", return_value=[])
|
|
def test_enrich_sessions_batch(mock_resolve):
|
|
aid = str(uuid.uuid4())
|
|
bid = str(uuid.uuid4())
|
|
|
|
class _Cur:
|
|
def __init__(self):
|
|
self.params = None
|
|
self._fetch_n = 0
|
|
|
|
def execute(self, sql, params=None):
|
|
self.sql = sql
|
|
self.params = params
|
|
|
|
def fetchall(self):
|
|
self._fetch_n += 1
|
|
if self._fetch_n == 1:
|
|
return [
|
|
{
|
|
"id": uuid.UUID(aid),
|
|
"training_category": None,
|
|
"training_type_id": None,
|
|
},
|
|
{
|
|
"id": uuid.UUID(bid),
|
|
"training_category": None,
|
|
"training_type_id": None,
|
|
},
|
|
]
|
|
return [
|
|
{
|
|
"activity_log_id": uuid.UUID(aid),
|
|
"training_parameter_id": 3,
|
|
"key": "rpe",
|
|
"data_type": "integer",
|
|
"unit": None,
|
|
"value_num": None,
|
|
"value_int": 7,
|
|
"value_text": None,
|
|
"value_bool": None,
|
|
},
|
|
]
|
|
|
|
sessions = [{"id": aid}, {"id": bid}]
|
|
enrich_sessions_with_metrics(_Cur(), sessions)
|
|
assert sessions[0]["session_metrics"] == []
|
|
assert sessions[1]["session_metrics"] == []
|
|
|
|
|
|
def test_merge_column_backed_prefers_column_over_stale_eav():
|
|
schema = [
|
|
{
|
|
"training_parameter_id": 1,
|
|
"key": "hr_avg",
|
|
"data_type": "float",
|
|
"unit": "bpm",
|
|
"validation_rules": {},
|
|
"source_field": "hr_avg",
|
|
}
|
|
]
|
|
eav = [
|
|
{
|
|
"training_parameter_id": 1,
|
|
"key": "hr_avg",
|
|
"data_type": "float",
|
|
"unit": "bpm",
|
|
"value": 99.0,
|
|
}
|
|
]
|
|
out = merge_column_backed_and_eav_metrics({"hr_avg": 140.0}, schema, eav)
|
|
assert len(out) == 1
|
|
assert out[0]["value"] == 140.0
|
|
|
|
|
|
def test_merge_falls_back_to_eav_when_column_empty():
|
|
schema = [
|
|
{
|
|
"training_parameter_id": 1,
|
|
"key": "hr_avg",
|
|
"data_type": "float",
|
|
"unit": "bpm",
|
|
"validation_rules": {},
|
|
"source_field": "hr_avg",
|
|
}
|
|
]
|
|
eav = [
|
|
{
|
|
"training_parameter_id": 1,
|
|
"key": "hr_avg",
|
|
"data_type": "float",
|
|
"unit": "bpm",
|
|
"value": 99.0,
|
|
}
|
|
]
|
|
out = merge_column_backed_and_eav_metrics({"hr_avg": None}, schema, eav)
|
|
assert len(out) == 1
|
|
assert out[0]["value"] == 99.0
|
|
|
|
|
|
def test_merge_ignores_eav_when_parameter_not_in_schema():
|
|
"""Nur tcp/ttp-Schema zählt: verwaiste EAV-Zeilen erscheinen nicht in der effektiven Liste."""
|
|
schema = []
|
|
eav = [
|
|
{
|
|
"training_parameter_id": 2,
|
|
"key": "custom_param",
|
|
"data_type": "string",
|
|
"unit": None,
|
|
"value": "x",
|
|
}
|
|
]
|
|
out = merge_column_backed_and_eav_metrics({}, schema, eav)
|
|
assert out == []
|
|
|
|
|
|
def test_merge_eav_primary_falls_back_to_legacy_hr_min_column():
|
|
"""Kanon: min_hr ohne source_field / ohne EAV — Lesefallback Spalte hr_min."""
|
|
schema = [
|
|
{
|
|
"training_parameter_id": 9,
|
|
"key": "min_hr",
|
|
"data_type": "integer",
|
|
"unit": "bpm",
|
|
"validation_rules": {},
|
|
"source_field": None,
|
|
}
|
|
]
|
|
out = merge_column_backed_and_eav_metrics({"hr_min": 88}, schema, [])
|
|
assert len(out) == 1
|
|
assert out[0]["key"] == "min_hr"
|
|
assert out[0]["value"] == 88
|
|
|
|
|
|
@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema")
|
|
def test_upsert_csv_skips_eav_when_source_field_maps_activity_log(mock_schema):
|
|
"""Parameter mit source_field: kanonisch activity_log — kein doppeltes EAV (z. B. avg_hr → hr_avg)."""
|
|
mock_schema.return_value = [
|
|
{
|
|
"key": "avg_hr",
|
|
"training_parameter_id": 42,
|
|
"data_type": "integer",
|
|
"validation_rules": {"min": 30, "max": 220},
|
|
"source_field": "hr_avg",
|
|
}
|
|
]
|
|
|
|
class Cur:
|
|
def __init__(self):
|
|
self.asm_inserts = 0
|
|
|
|
def execute(self, sql, params=None):
|
|
if "INSERT INTO activity_session_metrics" in sql:
|
|
self.asm_inserts += 1
|
|
|
|
def fetchone(self):
|
|
return {"profile_id": "00000000-0000-0000-0000-000000000001"}
|
|
|
|
cur = Cur()
|
|
upsert_session_metrics_from_csv_mapped(
|
|
cur,
|
|
"00000000-0000-0000-0000-000000000001",
|
|
"00000000-0000-0000-0000-000000000002",
|
|
{"avg_hr": 130},
|
|
"cardio",
|
|
1,
|
|
)
|
|
assert cur.asm_inserts == 0
|
|
|
|
|
|
@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema")
|
|
def test_upsert_csv_writes_eav_when_no_source_field(mock_schema):
|
|
mock_schema.return_value = [
|
|
{
|
|
"key": "custom_note",
|
|
"training_parameter_id": 99,
|
|
"data_type": "string",
|
|
"validation_rules": {},
|
|
"source_field": None,
|
|
}
|
|
]
|
|
|
|
class Cur:
|
|
def __init__(self):
|
|
self.asm_inserts = 0
|
|
|
|
def execute(self, sql, params=None):
|
|
if "INSERT INTO activity_session_metrics" in sql:
|
|
self.asm_inserts += 1
|
|
|
|
def fetchone(self):
|
|
return {"profile_id": "00000000-0000-0000-0000-000000000001"}
|
|
|
|
cur = Cur()
|
|
upsert_session_metrics_from_csv_mapped(
|
|
cur,
|
|
"00000000-0000-0000-0000-000000000001",
|
|
"00000000-0000-0000-0000-000000000002",
|
|
{"custom_note": "x"},
|
|
"cardio",
|
|
1,
|
|
)
|
|
assert cur.asm_inserts == 1
|
|
|
|
|
|
@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema", return_value=[])
|
|
def test_upsert_csv_skips_eav_when_mapped_key_not_in_profile_schema(mock_resolve):
|
|
"""Import-Mapping allein legt kein EAV an — Key muss in tcp/ttp (resolve) vorkommen."""
|
|
class Cur:
|
|
def __init__(self):
|
|
self.asm_inserts = 0
|
|
|
|
def execute(self, sql, params=None):
|
|
if "INSERT INTO activity_session_metrics" in sql:
|
|
self.asm_inserts += 1
|
|
|
|
def fetchone(self):
|
|
return {"profile_id": "00000000-0000-0000-0000-000000000001"}
|
|
|
|
cur = Cur()
|
|
upsert_session_metrics_from_csv_mapped(
|
|
cur,
|
|
"00000000-0000-0000-0000-000000000001",
|
|
"00000000-0000-0000-0000-000000000002",
|
|
{"stola": 12},
|
|
"cardio",
|
|
1,
|
|
)
|
|
assert cur.asm_inserts == 0
|