mitai-jinkendo/backend/data_layer/activity_session_metrics.py
Lars bc8e9fb7fa
All checks were successful
Deploy Development / deploy (push) Successful in 48s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Enhance training parameters handling and documentation
- Introduced new fields for descriptions in training parameters, improving clarity for AI context in `training_sessions_recent_json`.
- Added a glossary placeholder `{{training_parameters_glossary_md}}` to provide a Markdown table of active training parameters, including names and descriptions.
- Updated the `placeholder_resolver.py` and `activity_metrics.py` to support the new glossary functionality.
- Enhanced the `AdminActivityAttributeProfilesPage` to allow input for descriptions in both German and English, ensuring better context for metrics.
- Revised tests to validate the inclusion of description fields in parameter schema merges and metrics handling.
2026-04-17 20:42:11 +02:00

737 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Activity session metrics (EAV) and resolved attribute schema — Layer 1.
See: .claude/docs/technical/ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md
"""
from __future__ import annotations
import logging
from decimal import Decimal
from typing import Any, Dict, List, Mapping, Optional, Sequence
from data_layer.activity_data_canon import (
ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM,
ACTIVITY_MODULE_REGISTRY_FIELD_KEYS,
)
logger = logging.getLogger(__name__)
# Diese Spalten nicht aus CSV-Parameter-Zuordnung überschreiben (kommen aus Typ-Mapping / System).
ACTIVITY_LOG_PATCH_FORBIDDEN = frozenset(
{
"id",
"profile_id",
"date",
"created",
"training_type_id",
"training_category",
"training_subcategory",
"source",
}
)
class ActivitySessionMetricsError(Exception):
"""Raised by Layer 1; routers map to HTTP (404/400)."""
def __init__(self, status_code: int, detail: str):
self.status_code = status_code
self.detail = detail
super().__init__(detail)
def _effective_training_category(
cur, training_category: Optional[str], training_type_id: Optional[int]
) -> Optional[str]:
if training_category:
return training_category.strip() or None
if training_type_id is None:
return None
cur.execute("SELECT category FROM training_types WHERE id = %s", (training_type_id,))
row = cur.fetchone()
if row and row.get("category"):
return row["category"]
return None
def merge_parameter_schema_rows(
category_rows: Sequence[Dict[str, Any]],
type_rows: Sequence[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""
Pure merge: category assignments + type assignments → sorted schema list.
Row shapes match SELECTs in resolve_activity_attribute_schema (cat_sort / typ_* aliases).
"""
merged: Dict[int, Dict[str, Any]] = {}
for r in category_rows:
pid = r["training_parameter_id"]
merged[pid] = {
"training_parameter_id": pid,
"key": r["key"],
"name_de": r["name_de"],
"name_en": r["name_en"],
"description_de": r.get("description_de"),
"description_en": r.get("description_en"),
"param_category": r["param_category"],
"data_type": r["data_type"],
"unit": r["unit"],
"validation_rules": r["validation_rules"] or {},
"source_field": r["source_field"],
"sort_order": r["cat_sort"],
"required": bool(r["cat_required"]),
"ui_group": r["cat_ui_group"],
}
for r in type_rows:
pid = r["training_parameter_id"]
base = merged.get(pid)
if base is None:
merged[pid] = {
"training_parameter_id": pid,
"key": r["key"],
"name_de": r["name_de"],
"name_en": r["name_en"],
"description_de": r.get("description_de"),
"description_en": r.get("description_en"),
"param_category": r["param_category"],
"data_type": r["data_type"],
"unit": r["unit"],
"validation_rules": r["validation_rules"] or {},
"source_field": r["source_field"],
"sort_order": r["typ_sort"] if r["typ_sort"] is not None else 0,
"required": bool(r["typ_required"]) if r["typ_required"] is not None else False,
"ui_group": r["typ_ui_group"],
}
else:
if r["typ_sort"] is not None:
base["sort_order"] = r["typ_sort"]
if r["typ_required"] is not None:
base["required"] = bool(r["typ_required"])
if r["typ_ui_group"] is not None:
base["ui_group"] = r["typ_ui_group"]
out = list(merged.values())
out.sort(key=lambda x: (x["sort_order"], x["key"]))
return out
def resolve_activity_attribute_schema(
cur,
training_category: Optional[str],
training_type_id: Optional[int],
) -> List[Dict[str, Any]]:
"""
Merged parameter definitions for UI / validation (category base + type overrides/additions).
Sorted by sort_order, then key.
"""
cat = _effective_training_category(cur, training_category, training_type_id)
category_rows: List[Dict[str, Any]] = []
type_rows: List[Dict[str, Any]] = []
if cat:
cur.execute(
"""
SELECT
tcp.training_parameter_id,
tcp.sort_order AS cat_sort,
tcp.required AS cat_required,
tcp.ui_group AS cat_ui_group,
tp.key, tp.name_de, tp.name_en,
tp.description_de, tp.description_en,
tp.category AS param_category,
tp.data_type, tp.unit, tp.validation_rules, tp.source_field
FROM training_category_parameter tcp
JOIN training_parameters tp ON tp.id = tcp.training_parameter_id
WHERE tcp.training_category = %s AND tp.is_active = true
""",
(cat,),
)
category_rows = list(cur.fetchall())
if training_type_id is not None:
cur.execute(
"""
SELECT
ttp.training_parameter_id,
ttp.sort_order AS typ_sort,
ttp.required AS typ_required,
ttp.ui_group AS typ_ui_group,
tp.key, tp.name_de, tp.name_en,
tp.description_de, tp.description_en,
tp.category AS param_category,
tp.data_type, tp.unit, tp.validation_rules, tp.source_field
FROM training_type_parameter ttp
JOIN training_parameters tp ON tp.id = ttp.training_parameter_id
WHERE ttp.training_type_id = %s AND tp.is_active = true
""",
(training_type_id,),
)
type_rows = list(cur.fetchall())
return merge_parameter_schema_rows(category_rows, type_rows)
def _metric_human_labels(schema_row: Mapping[str, Any]) -> Dict[str, Any]:
"""Bezeichnung + Kurzbeschreibung aus training_parameters (KI / Export)."""
return {
"name_de": schema_row.get("name_de"),
"name_en": schema_row.get("name_en"),
"description_de": schema_row.get("description_de"),
"description_en": schema_row.get("description_en"),
}
def _validation_rules_dict(raw: Any) -> Dict[str, Any]:
if isinstance(raw, dict):
return raw
return {}
def _validate_single_value(data_type: str, value: Any, rules: Dict[str, Any]) -> None:
if data_type == "integer":
if not isinstance(value, int) or isinstance(value, bool):
raise ActivitySessionMetricsError(400, f"Erwartet integer, erhalten: {type(value).__name__}")
if "min" in rules and value < rules["min"]:
raise ActivitySessionMetricsError(400, f"Wert unter min ({rules['min']})")
if "max" in rules and value > rules["max"]:
raise ActivitySessionMetricsError(400, f"Wert über max ({rules['max']})")
elif data_type == "float":
if isinstance(value, bool) or not isinstance(value, (int, float, Decimal)):
raise ActivitySessionMetricsError(400, f"Erwartet Zahl, erhalten: {type(value).__name__}")
v = float(value)
if "min" in rules and v < float(rules["min"]):
raise ActivitySessionMetricsError(400, f"Wert unter min ({rules['min']})")
if "max" in rules and v > float(rules["max"]):
raise ActivitySessionMetricsError(400, f"Wert über max ({rules['max']})")
elif data_type == "string":
if not isinstance(value, str):
raise ActivitySessionMetricsError(400, f"Erwartet string, erhalten: {type(value).__name__}")
if rules.get("not_empty") and not value.strip():
raise ActivitySessionMetricsError(400, "Leerer String nicht erlaubt")
if "max_length" in rules and len(value) > int(rules["max_length"]):
raise ActivitySessionMetricsError(400, f"String zu lang (max {rules['max_length']})")
allowed = rules.get("allowed_values")
if allowed and value not in allowed:
raise ActivitySessionMetricsError(400, "Wert nicht in erlaubter Menge")
elif data_type == "boolean":
if not isinstance(value, bool):
raise ActivitySessionMetricsError(400, f"Erwartet boolean, erhalten: {type(value).__name__}")
else:
raise ActivitySessionMetricsError(400, f"Unbekannter data_type: {data_type}")
def _row_value_tuple(data_type: str, value: Any) -> tuple:
if data_type == "integer":
return (None, int(value), None, None)
if data_type == "float":
return (float(value), None, None, None)
if data_type == "string":
return (None, None, str(value), None)
if data_type == "boolean":
return (None, None, None, bool(value))
raise ValueError(data_type)
def _coerce_raw_value_for_parameter(data_type: str, raw: Any) -> Any:
"""Wert aus activity_log-Spalte in den Typ bringen, den training_parameters.data_type erwartet."""
if data_type == "integer":
if isinstance(raw, bool):
raise TypeError("boolean nicht als integer erlaubt")
if isinstance(raw, str):
s = raw.strip().replace(",", ".")
return int(round(float(s)))
return int(round(float(raw)))
if data_type == "float":
if isinstance(raw, str):
s = raw.strip().replace(",", ".")
return float(s)
return float(raw)
if data_type == "string":
return str(raw) if raw is not None else ""
if data_type == "boolean":
if isinstance(raw, bool):
return raw
s = str(raw).strip().lower()
if s in ("true", "1", "t", "yes"):
return True
if s in ("false", "0", "f", "no", ""):
return False
raise TypeError(f"boolean-Koercion nicht möglich: {raw!r}")
raise ValueError(data_type)
def upsert_session_metrics_from_csv_mapped(
cur,
profile_id: str,
activity_log_id: str,
mapped: Mapping[str, Any],
training_category: Optional[str],
training_type_id: Optional[int],
) -> None:
"""
EAV für Trainingsparameter aus CSV.
Es werden nur Parameter geschrieben, die in ``resolve_activity_attribute_schema`` (Kategorie +
Trainingstyp) vorkommen. CSV-Spalten-Mappings sind import-spezifisch und definieren **nicht** das
UI-/Auswertungs-Schema — fehlende tcp/ttp-Zuordnung bedeutet: kein EAV für diesen Key (Werte ggf.
nur in ``activity_log``-Kernfeldern).
Kernfelder schreibt der Executor nach ``activity_log``; hier keine EAV-Zeilen für Registry-Keys.
Hat ein Parameter ``source_field`` (Semantik aus ``activity_log``), wird EAV nur dann **nicht**
geschrieben, wenn diese Spalte nach dem Import bereits befüllt ist — sonst gäbe es doppelte
Speicherung und der Merge würde ohnehin die Spalte bevorzugen. Ist die Spalte leer (z. B. Feld
nur noch über EAV / Custom-Mapping, ohne Registry-Patch), schreibt der Import den Wert aus
``mapped`` nach EAV — analog zum Lesepfad (Spalte zuerst, sonst EAV).
"""
cur.execute("SELECT * FROM activity_log WHERE id = %s", (activity_log_id,))
row = cur.fetchone()
if not row or str(row["profile_id"]) != str(profile_id):
return
header = dict(row)
schema = resolve_activity_attribute_schema(cur, training_category, training_type_id)
for spec in schema:
pkey = spec["key"]
if pkey not in mapped:
continue
raw = mapped[pkey]
if raw is None or raw == "":
continue
if pkey in ACTIVITY_MODULE_REGISTRY_FIELD_KEYS:
continue
sf_raw = spec.get("source_field")
if sf_raw is not None and str(sf_raw).strip():
col = str(sf_raw).strip()
if col in header and header[col] is not None:
continue
tid = spec["training_parameter_id"]
dt = spec["data_type"]
rules = _validation_rules_dict(spec["validation_rules"])
try:
coerced = _coerce_raw_value_for_parameter(dt, raw)
_validate_single_value(dt, coerced, rules)
except (ActivitySessionMetricsError, TypeError, ValueError) as ex:
logger.warning("CSV EAV skipped %s: %s", pkey, ex)
continue
vn, vi, vt, vb = _row_value_tuple(dt, coerced)
cur.execute(
"""
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
) VALUES (%s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (activity_log_id, training_parameter_id)
DO UPDATE SET
value_num = EXCLUDED.value_num,
value_int = EXCLUDED.value_int,
value_text = EXCLUDED.value_text,
value_bool = EXCLUDED.value_bool,
updated_at = NOW()
""",
(activity_log_id, tid, vn, vi, vt, vb),
)
def merge_column_backed_and_eav_metrics(
header: Mapping[str, Any],
schema: Sequence[Dict[str, Any]],
eav_metrics: Sequence[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""
Effektive Metrikliste **nur** für Parameter aus ``schema`` (Kategorie + Trainingstyp / tcp+ttp).
Kanon beim Lesen: **activity_log** schlägt EAV, sobald ein passender Spaltenwert existiert und
koerzierbar ist — in dieser Reihenfolge:
1. ``source_field`` → Spalte
2. Parameter-Key = Registry-Kernfeld (``ACTIVITY_MODULE_REGISTRY_FIELD_KEYS``) → gleichnamige Spalte
3. EAV-primäre Keys → Legacy-Spalte laut ``ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM``
4. sonst EAV
EAV-Zeilen zu Parametern, die nicht im Schema sind, werden nicht ausgegeben.
"""
eav_by_key = {m["key"]: m for m in eav_metrics}
merged: List[Dict[str, Any]] = []
keys_handled: set[str] = set()
for s in schema:
k = s["key"]
tid = s["training_parameter_id"]
dt = s["data_type"]
unit = s.get("unit")
sf = s.get("source_field")
used_column = False
if sf and isinstance(sf, str) and str(sf).strip():
col = str(sf).strip()
if col in header and header[col] is not None:
try:
val = _coerce_raw_value_for_parameter(dt, header[col])
merged.append(
{
"training_parameter_id": tid,
"key": k,
"data_type": dt,
"unit": unit,
"value": val,
**_metric_human_labels(s),
}
)
used_column = True
keys_handled.add(k)
except (TypeError, ValueError):
pass
if used_column:
continue
if k in ACTIVITY_MODULE_REGISTRY_FIELD_KEYS and k in header and header[k] is not None:
try:
val = _coerce_raw_value_for_parameter(dt, header[k])
merged.append(
{
"training_parameter_id": tid,
"key": k,
"data_type": dt,
"unit": unit,
"value": val,
**_metric_human_labels(s),
}
)
keys_handled.add(k)
continue
except (TypeError, ValueError):
pass
legacy_col = ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM.get(k)
if legacy_col and legacy_col in header and header[legacy_col] is not None:
try:
val = _coerce_raw_value_for_parameter(dt, header[legacy_col])
merged.append(
{
"training_parameter_id": tid,
"key": k,
"data_type": dt,
"unit": unit,
"value": val,
**_metric_human_labels(s),
}
)
keys_handled.add(k)
continue
except (TypeError, ValueError):
pass
if k in eav_by_key:
row = dict(eav_by_key[k])
row.update(_metric_human_labels(s))
merged.append(row)
keys_handled.add(k)
merged.sort(key=lambda x: x["key"])
return merged
def sync_column_backed_session_metrics(cur, profile_id: str, activity_log_id: str) -> None:
"""
[Veraltet / nicht mehr in Schreibpfaden aufgerufen]
Früher: EAV spiegelte activity_log-Spalten für Parameter mit source_field.
Kanon: Spaltenwerte werden bei merge_column_backed_and_eav_metrics beim Lesen berücksichtigt; keine
doppelte Speicherung. Funktion bleibt für optionale Admin-/Reparatur-Skripte.
"""
cur.execute("SELECT * FROM activity_log WHERE id = %s", (activity_log_id,))
row = cur.fetchone()
if not row or str(row["profile_id"]) != str(profile_id):
return
header = dict(row)
schema = resolve_activity_attribute_schema(
cur, header.get("training_category"), header.get("training_type_id")
)
for spec in schema:
sf = spec.get("source_field")
if sf is None or (isinstance(sf, str) and not str(sf).strip()):
continue
col = str(sf).strip()
if col not in header:
continue
raw = header[col]
tid = spec["training_parameter_id"]
dt = spec["data_type"]
rules = _validation_rules_dict(spec["validation_rules"])
if raw is None:
cur.execute(
"""
DELETE FROM activity_session_metrics
WHERE activity_log_id = %s AND training_parameter_id = %s
""",
(activity_log_id, tid),
)
continue
try:
coerced = _coerce_raw_value_for_parameter(dt, raw)
_validate_single_value(dt, coerced, rules)
except (ActivitySessionMetricsError, TypeError, ValueError) as ex:
logger.warning(
"sync_column_backed_session_metrics: überspringe %s (Spalte %s): %s",
spec.get("key"),
col,
ex,
)
continue
vn, vi, vt, vb = _row_value_tuple(dt, coerced)
cur.execute(
"""
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
) VALUES (%s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (activity_log_id, training_parameter_id)
DO UPDATE SET
value_num = EXCLUDED.value_num,
value_int = EXCLUDED.value_int,
value_text = EXCLUDED.value_text,
value_bool = EXCLUDED.value_bool,
updated_at = NOW()
""",
(activity_log_id, tid, vn, vi, vt, vb),
)
def fetch_activity_session_metrics(cur, activity_log_id: str) -> List[Dict[str, Any]]:
cur.execute(
"""
SELECT
m.id,
m.activity_log_id,
m.training_parameter_id,
m.value_num,
m.value_int,
m.value_text,
m.value_bool,
tp.key,
tp.data_type,
tp.unit
FROM activity_session_metrics m
JOIN training_parameters tp ON tp.id = m.training_parameter_id
WHERE m.activity_log_id = %s
ORDER BY tp.key
""",
(activity_log_id,),
)
rows = cur.fetchall()
out: List[Dict[str, Any]] = []
for r in rows:
dt = r["data_type"]
if dt == "integer":
val = int(r["value_int"]) if r["value_int"] is not None else None
elif dt == "float":
val = float(r["value_num"]) if r["value_num"] is not None else None
elif dt == "string":
val = r["value_text"]
else:
val = r["value_bool"]
out.append(
{
"training_parameter_id": r["training_parameter_id"],
"key": r["key"],
"data_type": dt,
"unit": r["unit"],
"value": val,
}
)
return out
def replace_activity_session_metrics(
cur,
profile_id: str,
activity_log_id: str,
metrics: Sequence[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""
Full replace of EAV rows for this session. metrics: [{ "parameter_key": str, "value": ... }, ...]
"""
cur.execute(
"""
SELECT id, profile_id, training_category, training_type_id
FROM activity_log WHERE id = %s
""",
(activity_log_id,),
)
row = cur.fetchone()
if not row or str(row["profile_id"]) != str(profile_id):
raise ActivitySessionMetricsError(404, "Aktivität nicht gefunden")
schema = resolve_activity_attribute_schema(
cur, row.get("training_category"), row.get("training_type_id")
)
by_key = {s["key"]: s for s in schema}
payload_by_key: Dict[str, Dict[str, Any]] = {}
for item in metrics:
raw_k = item.get("parameter_key")
if raw_k is None or not str(raw_k).strip():
raise ActivitySessionMetricsError(400, "parameter_key fehlt")
k = str(raw_k).strip()
if k not in by_key:
raise ActivitySessionMetricsError(400, f"Unbekannter oder nicht zugewiesener Parameter: {k}")
payload_by_key[k] = item
for s in schema:
if not s["required"]:
continue
itk = s["key"]
hit = payload_by_key.get(itk)
if hit is None or hit.get("value") is None:
raise ActivitySessionMetricsError(400, f"Pflichtfeld fehlt: {itk}")
cur.execute(
"DELETE FROM activity_session_metrics WHERE activity_log_id = %s",
(activity_log_id,),
)
for item in metrics:
k = str(item["parameter_key"]).strip()
spec = by_key[k]
val = item.get("value")
if val is None:
if spec["required"]:
raise ActivitySessionMetricsError(400, f"Pflichtfeld fehlt: {k}")
continue
rules = _validation_rules_dict(spec["validation_rules"])
_validate_single_value(spec["data_type"], val, rules)
vn, vi, vt, vb = _row_value_tuple(spec["data_type"], val)
cur.execute(
"""
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
) VALUES (%s, %s, %s, %s, %s, %s, NOW())
""",
(activity_log_id, spec["training_parameter_id"], vn, vi, vt, vb),
)
# Kein sync_column_backed nach PUT /metrics: der Request ist maßgeblich für EAV. Ein Spalten-Sync würde
# Werte aus nicht mitgeschriebenen activity_log-Spalten wieder verwerfen.
return fetch_activity_session_metrics(cur, activity_log_id)
def get_activity_session_logical_unit(cur, profile_id: str, activity_log_id: str) -> Dict[str, Any]:
cur.execute("SELECT * FROM activity_log WHERE id = %s", (activity_log_id,))
row = cur.fetchone()
if not row or str(row["profile_id"]) != str(profile_id):
raise ActivitySessionMetricsError(404, "Aktivität nicht gefunden")
header = dict(row)
schema = resolve_activity_attribute_schema(
cur, header.get("training_category"), header.get("training_type_id")
)
metrics = fetch_activity_session_metrics(cur, activity_log_id)
merged_metrics = merge_column_backed_and_eav_metrics(header, schema, metrics)
return {
"header": header,
"schema": schema,
"metrics": merged_metrics,
}
def enrich_sessions_with_metrics(cur, sessions: List[Dict[str, Any]]) -> None:
"""
Mutates each session dict: adds key 'session_metrics' (list).
Kombiniert EAV mit activity_log-Spalten für Parameter mit source_field (kanonisch: Spalte),
analog zu get_activity_session_logical_unit ohne doppelte EAV-Speicherung beim Import.
"""
if not sessions:
return
ids = [str(s["id"]) for s in sessions if s.get("id")]
if not ids:
return
ph = ",".join(["%s"] * len(ids))
cur.execute(
f"SELECT * FROM activity_log WHERE id IN ({ph})",
ids,
)
headers_by_id: Dict[str, Dict[str, Any]] = {}
for r in cur.fetchall():
h = dict(r)
headers_by_id[str(h["id"])] = h
cur.execute(
f"""
SELECT
m.activity_log_id,
m.training_parameter_id,
tp.key,
tp.data_type,
tp.unit,
m.value_num,
m.value_int,
m.value_text,
m.value_bool
FROM activity_session_metrics m
JOIN training_parameters tp ON tp.id = m.training_parameter_id
WHERE m.activity_log_id IN ({ph})
ORDER BY m.activity_log_id, tp.key
""",
ids,
)
by_act: Dict[str, List[Dict[str, Any]]] = {}
for r in cur.fetchall():
aid = str(r["activity_log_id"])
dt = r["data_type"]
if dt == "integer":
val = int(r["value_int"]) if r["value_int"] is not None else None
elif dt == "float":
val = float(r["value_num"]) if r["value_num"] is not None else None
elif dt == "string":
val = r["value_text"]
else:
val = r["value_bool"]
by_act.setdefault(aid, []).append(
{
"training_parameter_id": r["training_parameter_id"],
"key": r["key"],
"data_type": dt,
"unit": r["unit"],
"value": val,
}
)
schema_cache: Dict[tuple[Any, Any], List[Dict[str, Any]]] = {}
def _schema(cat: Any, tid: Any) -> List[Dict[str, Any]]:
cache_key = (cat, tid)
if cache_key not in schema_cache:
schema_cache[cache_key] = resolve_activity_attribute_schema(cur, cat, tid)
return schema_cache[cache_key]
for s in sessions:
aid = str(s.get("id"))
header = headers_by_id.get(aid)
if not header:
s["session_metrics"] = []
continue
schema = _schema(header.get("training_category"), header.get("training_type_id"))
eav_list = by_act.get(aid, [])
merged = merge_column_backed_and_eav_metrics(header, schema, eav_list)
s["session_metrics"] = [
{
"key": m["key"],
"data_type": m["data_type"],
"unit": m["unit"],
"value": m["value"],
"name_de": m.get("name_de"),
"name_en": m.get("name_en"),
"description_de": m.get("description_de"),
"description_en": m.get("description_en"),
}
for m in merged
]