- Introduced `resolve_activity_attribute_schema_for_csv_import` to enhance the handling of training parameters during CSV imports, allowing for the inclusion of active parameters not present in the category/type profile. - Updated `apply_activity_mapped_column_aliases` and `upsert_session_metrics_from_csv_mapped` to utilize the new CSV import function, ensuring comprehensive mapping and insertion of metrics. - Added unit tests to validate the new functionality and ensure correct behavior when handling mapped training parameters during CSV imports.
794 lines
28 KiB
Python
794 lines
28 KiB
Python
"""
|
||
Activity session metrics (EAV) and resolved attribute schema — Layer 1.
|
||
|
||
See: .claude/docs/technical/ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
from decimal import Decimal
|
||
from typing import Any, Dict, List, Mapping, Optional, Sequence
|
||
|
||
from data_layer.activity_data_canon import (
|
||
ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM,
|
||
ACTIVITY_MODULE_REGISTRY_FIELD_KEYS,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Diese Spalten nicht aus CSV-Parameter-Zuordnung überschreiben (kommen aus Typ-Mapping / System).
|
||
ACTIVITY_LOG_PATCH_FORBIDDEN = frozenset(
|
||
{
|
||
"id",
|
||
"profile_id",
|
||
"date",
|
||
"created",
|
||
"training_type_id",
|
||
"training_category",
|
||
"training_subcategory",
|
||
"source",
|
||
}
|
||
)
|
||
|
||
|
||
def _parameter_value_stored_in_eav_only(spec: Mapping[str, Any], parameter_key: str) -> bool:
|
||
"""False = kanonisch activity_log (Modul-Registry oder training_parameters.source_field)."""
|
||
if parameter_key in ACTIVITY_MODULE_REGISTRY_FIELD_KEYS:
|
||
return False
|
||
sf = spec.get("source_field")
|
||
if sf is not None and str(sf).strip():
|
||
return False
|
||
return True
|
||
|
||
|
||
class ActivitySessionMetricsError(Exception):
|
||
"""Raised by Layer 1; routers map to HTTP (404/400)."""
|
||
|
||
def __init__(self, status_code: int, detail: str):
|
||
self.status_code = status_code
|
||
self.detail = detail
|
||
super().__init__(detail)
|
||
|
||
|
||
def _effective_training_category(
|
||
cur, training_category: Optional[str], training_type_id: Optional[int]
|
||
) -> Optional[str]:
|
||
if training_category:
|
||
return training_category.strip() or None
|
||
if training_type_id is None:
|
||
return None
|
||
cur.execute("SELECT category FROM training_types WHERE id = %s", (training_type_id,))
|
||
row = cur.fetchone()
|
||
if row and row.get("category"):
|
||
return row["category"]
|
||
return None
|
||
|
||
|
||
def merge_parameter_schema_rows(
|
||
category_rows: Sequence[Dict[str, Any]],
|
||
type_rows: Sequence[Dict[str, Any]],
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
Pure merge: category assignments + type assignments → sorted schema list.
|
||
Row shapes match SELECTs in resolve_activity_attribute_schema (cat_sort / typ_* aliases).
|
||
"""
|
||
merged: Dict[int, Dict[str, Any]] = {}
|
||
|
||
for r in category_rows:
|
||
pid = r["training_parameter_id"]
|
||
merged[pid] = {
|
||
"training_parameter_id": pid,
|
||
"key": r["key"],
|
||
"name_de": r["name_de"],
|
||
"name_en": r["name_en"],
|
||
"param_category": r["param_category"],
|
||
"data_type": r["data_type"],
|
||
"unit": r["unit"],
|
||
"validation_rules": r["validation_rules"] or {},
|
||
"source_field": r["source_field"],
|
||
"sort_order": r["cat_sort"],
|
||
"required": bool(r["cat_required"]),
|
||
"ui_group": r["cat_ui_group"],
|
||
}
|
||
|
||
for r in type_rows:
|
||
pid = r["training_parameter_id"]
|
||
base = merged.get(pid)
|
||
if base is None:
|
||
merged[pid] = {
|
||
"training_parameter_id": pid,
|
||
"key": r["key"],
|
||
"name_de": r["name_de"],
|
||
"name_en": r["name_en"],
|
||
"param_category": r["param_category"],
|
||
"data_type": r["data_type"],
|
||
"unit": r["unit"],
|
||
"validation_rules": r["validation_rules"] or {},
|
||
"source_field": r["source_field"],
|
||
"sort_order": r["typ_sort"] if r["typ_sort"] is not None else 0,
|
||
"required": bool(r["typ_required"]) if r["typ_required"] is not None else False,
|
||
"ui_group": r["typ_ui_group"],
|
||
}
|
||
else:
|
||
if r["typ_sort"] is not None:
|
||
base["sort_order"] = r["typ_sort"]
|
||
if r["typ_required"] is not None:
|
||
base["required"] = bool(r["typ_required"])
|
||
if r["typ_ui_group"] is not None:
|
||
base["ui_group"] = r["typ_ui_group"]
|
||
|
||
out = list(merged.values())
|
||
out.sort(key=lambda x: (x["sort_order"], x["key"]))
|
||
return out
|
||
|
||
|
||
def resolve_activity_attribute_schema(
|
||
cur,
|
||
training_category: Optional[str],
|
||
training_type_id: Optional[int],
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
Merged parameter definitions for UI / validation (category base + type overrides/additions).
|
||
Sorted by sort_order, then key.
|
||
"""
|
||
cat = _effective_training_category(cur, training_category, training_type_id)
|
||
category_rows: List[Dict[str, Any]] = []
|
||
type_rows: List[Dict[str, Any]] = []
|
||
|
||
if cat:
|
||
cur.execute(
|
||
"""
|
||
SELECT
|
||
tcp.training_parameter_id,
|
||
tcp.sort_order AS cat_sort,
|
||
tcp.required AS cat_required,
|
||
tcp.ui_group AS cat_ui_group,
|
||
tp.key, tp.name_de, tp.name_en, tp.category AS param_category,
|
||
tp.data_type, tp.unit, tp.validation_rules, tp.source_field
|
||
FROM training_category_parameter tcp
|
||
JOIN training_parameters tp ON tp.id = tcp.training_parameter_id
|
||
WHERE tcp.training_category = %s AND tp.is_active = true
|
||
""",
|
||
(cat,),
|
||
)
|
||
category_rows = list(cur.fetchall())
|
||
|
||
if training_type_id is not None:
|
||
cur.execute(
|
||
"""
|
||
SELECT
|
||
ttp.training_parameter_id,
|
||
ttp.sort_order AS typ_sort,
|
||
ttp.required AS typ_required,
|
||
ttp.ui_group AS typ_ui_group,
|
||
tp.key, tp.name_de, tp.name_en, tp.category AS param_category,
|
||
tp.data_type, tp.unit, tp.validation_rules, tp.source_field
|
||
FROM training_type_parameter ttp
|
||
JOIN training_parameters tp ON tp.id = ttp.training_parameter_id
|
||
WHERE ttp.training_type_id = %s AND tp.is_active = true
|
||
""",
|
||
(training_type_id,),
|
||
)
|
||
type_rows = list(cur.fetchall())
|
||
|
||
return merge_parameter_schema_rows(category_rows, type_rows)
|
||
|
||
|
||
def resolve_activity_attribute_schema_for_csv_import(
|
||
cur,
|
||
training_category: Optional[str],
|
||
training_type_id: Optional[int],
|
||
mapped: Mapping[str, Any],
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
Wie resolve_activity_attribute_schema, plus alle aktiven training_parameters, deren key in
|
||
``mapped`` vorkommt (nicht Modul-Registry), aber nicht in Kategorie/Typ-Profil — z. B. wenn
|
||
``activity_type_mappings`` fehlt oder der Parameter nur für andere Typen gebucht ist.
|
||
|
||
Damit schreibt der Universal-CSV-Import EAV für gültig gemappte Zielfelder wie bei cd29c7d,
|
||
sobald der Parameter in ``training_parameters`` existiert.
|
||
"""
|
||
base = resolve_activity_attribute_schema(cur, training_category, training_type_id)
|
||
by_key: Dict[str, Dict[str, Any]] = {s["key"]: s for s in base}
|
||
|
||
for k, raw in mapped.items():
|
||
if raw is None or raw == "":
|
||
continue
|
||
if k in by_key:
|
||
continue
|
||
if k in ACTIVITY_MODULE_REGISTRY_FIELD_KEYS:
|
||
continue
|
||
cur.execute(
|
||
"""
|
||
SELECT
|
||
id,
|
||
key,
|
||
name_de,
|
||
name_en,
|
||
category AS param_category,
|
||
data_type,
|
||
unit,
|
||
validation_rules,
|
||
source_field
|
||
FROM training_parameters
|
||
WHERE key = %s AND is_active = true
|
||
LIMIT 1
|
||
""",
|
||
(k,),
|
||
)
|
||
row = cur.fetchone()
|
||
if not row:
|
||
continue
|
||
pid = int(row["id"])
|
||
if any(s["training_parameter_id"] == pid for s in by_key.values()):
|
||
continue
|
||
by_key[k] = {
|
||
"training_parameter_id": pid,
|
||
"key": row["key"],
|
||
"name_de": row["name_de"],
|
||
"name_en": row["name_en"],
|
||
"param_category": row["param_category"],
|
||
"data_type": row["data_type"],
|
||
"unit": row["unit"],
|
||
"validation_rules": row["validation_rules"] or {},
|
||
"source_field": row["source_field"],
|
||
"sort_order": 100_000,
|
||
"required": False,
|
||
"ui_group": None,
|
||
}
|
||
|
||
out = list(by_key.values())
|
||
out.sort(key=lambda x: (x.get("sort_order", 0), x["key"]))
|
||
return out
|
||
|
||
|
||
def _validation_rules_dict(raw: Any) -> Dict[str, Any]:
|
||
if isinstance(raw, dict):
|
||
return raw
|
||
return {}
|
||
|
||
|
||
def _validate_single_value(data_type: str, value: Any, rules: Dict[str, Any]) -> None:
|
||
if data_type == "integer":
|
||
if not isinstance(value, int) or isinstance(value, bool):
|
||
raise ActivitySessionMetricsError(400, f"Erwartet integer, erhalten: {type(value).__name__}")
|
||
if "min" in rules and value < rules["min"]:
|
||
raise ActivitySessionMetricsError(400, f"Wert unter min ({rules['min']})")
|
||
if "max" in rules and value > rules["max"]:
|
||
raise ActivitySessionMetricsError(400, f"Wert über max ({rules['max']})")
|
||
elif data_type == "float":
|
||
if isinstance(value, bool) or not isinstance(value, (int, float, Decimal)):
|
||
raise ActivitySessionMetricsError(400, f"Erwartet Zahl, erhalten: {type(value).__name__}")
|
||
v = float(value)
|
||
if "min" in rules and v < float(rules["min"]):
|
||
raise ActivitySessionMetricsError(400, f"Wert unter min ({rules['min']})")
|
||
if "max" in rules and v > float(rules["max"]):
|
||
raise ActivitySessionMetricsError(400, f"Wert über max ({rules['max']})")
|
||
elif data_type == "string":
|
||
if not isinstance(value, str):
|
||
raise ActivitySessionMetricsError(400, f"Erwartet string, erhalten: {type(value).__name__}")
|
||
if rules.get("not_empty") and not value.strip():
|
||
raise ActivitySessionMetricsError(400, "Leerer String nicht erlaubt")
|
||
if "max_length" in rules and len(value) > int(rules["max_length"]):
|
||
raise ActivitySessionMetricsError(400, f"String zu lang (max {rules['max_length']})")
|
||
allowed = rules.get("allowed_values")
|
||
if allowed and value not in allowed:
|
||
raise ActivitySessionMetricsError(400, "Wert nicht in erlaubter Menge")
|
||
elif data_type == "boolean":
|
||
if not isinstance(value, bool):
|
||
raise ActivitySessionMetricsError(400, f"Erwartet boolean, erhalten: {type(value).__name__}")
|
||
else:
|
||
raise ActivitySessionMetricsError(400, f"Unbekannter data_type: {data_type}")
|
||
|
||
|
||
def _row_value_tuple(data_type: str, value: Any) -> tuple:
|
||
if data_type == "integer":
|
||
return (None, int(value), None, None)
|
||
if data_type == "float":
|
||
return (float(value), None, None, None)
|
||
if data_type == "string":
|
||
return (None, None, str(value), None)
|
||
if data_type == "boolean":
|
||
return (None, None, None, bool(value))
|
||
raise ValueError(data_type)
|
||
|
||
|
||
def _coerce_raw_value_for_parameter(data_type: str, raw: Any) -> Any:
|
||
"""Wert aus activity_log-Spalte in den Typ bringen, den training_parameters.data_type erwartet."""
|
||
if data_type == "integer":
|
||
if isinstance(raw, bool):
|
||
raise TypeError("boolean nicht als integer erlaubt")
|
||
if isinstance(raw, str):
|
||
s = raw.strip().replace(",", ".")
|
||
return int(round(float(s)))
|
||
return int(round(float(raw)))
|
||
if data_type == "float":
|
||
if isinstance(raw, str):
|
||
s = raw.strip().replace(",", ".")
|
||
return float(s)
|
||
return float(raw)
|
||
if data_type == "string":
|
||
return str(raw) if raw is not None else ""
|
||
if data_type == "boolean":
|
||
if isinstance(raw, bool):
|
||
return raw
|
||
s = str(raw).strip().lower()
|
||
if s in ("true", "1", "t", "yes"):
|
||
return True
|
||
if s in ("false", "0", "f", "no", ""):
|
||
return False
|
||
raise TypeError(f"boolean-Koercion nicht möglich: {raw!r}")
|
||
raise ValueError(data_type)
|
||
|
||
|
||
def apply_activity_mapped_column_aliases_from_schema(
|
||
mapped: Mapping[str, Any],
|
||
schema: Sequence[Dict[str, Any]],
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
training_parameters.key weicht oft von activity_log-Spalte ab (z. B. avg_hr → hr_avg).
|
||
Kopiert Werte auf die Spalte, wenn die Spalte leer ist, damit CSV/Registry activity_log befüllt.
|
||
"""
|
||
m = dict(mapped)
|
||
for s in schema:
|
||
sf = s.get("source_field")
|
||
if not sf or not str(sf).strip():
|
||
continue
|
||
col = str(sf).strip()
|
||
pkey = s["key"]
|
||
if pkey == col:
|
||
continue
|
||
col_v = m.get(col)
|
||
if col_v is not None and col_v != "":
|
||
continue
|
||
pk_v = m.get(pkey)
|
||
if pk_v is None or pk_v == "":
|
||
continue
|
||
m[col] = pk_v
|
||
return m
|
||
|
||
|
||
def apply_activity_mapped_column_aliases(
|
||
cur,
|
||
mapped: Mapping[str, Any],
|
||
training_category: Optional[str],
|
||
training_type_id: Optional[int],
|
||
) -> Dict[str, Any]:
|
||
schema = resolve_activity_attribute_schema_for_csv_import(
|
||
cur, training_category, training_type_id, mapped
|
||
)
|
||
return apply_activity_mapped_column_aliases_from_schema(mapped, schema)
|
||
|
||
|
||
def upsert_session_metrics_from_csv_mapped(
|
||
cur,
|
||
profile_id: str,
|
||
activity_log_id: str,
|
||
mapped: Mapping[str, Any],
|
||
training_category: Optional[str],
|
||
training_type_id: Optional[int],
|
||
) -> None:
|
||
"""
|
||
EAV für Trainingsparameter aus CSV (nur Keys ohne activity_log-Spalte / ohne source_field).
|
||
|
||
Parameter mit gesetztem source_field sind kanonisch in activity_log — kein EAV-Schreiben (vermeidet
|
||
Doppelung zu avg_hr vs. hr_avg o. Ä.). Keys im activity-CSV-Modul werden ebenfalls übersprungen.
|
||
"""
|
||
cur.execute(
|
||
"SELECT profile_id FROM activity_log WHERE id = %s",
|
||
(activity_log_id,),
|
||
)
|
||
row = cur.fetchone()
|
||
if not row or str(row["profile_id"]) != str(profile_id):
|
||
return
|
||
schema = resolve_activity_attribute_schema_for_csv_import(
|
||
cur, training_category, training_type_id, mapped
|
||
)
|
||
for spec in schema:
|
||
pkey = spec["key"]
|
||
if pkey not in mapped:
|
||
continue
|
||
raw = mapped[pkey]
|
||
if raw is None or raw == "":
|
||
continue
|
||
if not _parameter_value_stored_in_eav_only(spec, pkey):
|
||
continue
|
||
tid = spec["training_parameter_id"]
|
||
dt = spec["data_type"]
|
||
rules = _validation_rules_dict(spec["validation_rules"])
|
||
try:
|
||
coerced = _coerce_raw_value_for_parameter(dt, raw)
|
||
_validate_single_value(dt, coerced, rules)
|
||
except (ActivitySessionMetricsError, TypeError, ValueError) as ex:
|
||
logger.warning("CSV EAV skipped %s: %s", pkey, ex)
|
||
continue
|
||
vn, vi, vt, vb = _row_value_tuple(dt, coerced)
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO activity_session_metrics (
|
||
activity_log_id, training_parameter_id,
|
||
value_num, value_int, value_text, value_bool, updated_at
|
||
) VALUES (%s, %s, %s, %s, %s, %s, NOW())
|
||
ON CONFLICT (activity_log_id, training_parameter_id)
|
||
DO UPDATE SET
|
||
value_num = EXCLUDED.value_num,
|
||
value_int = EXCLUDED.value_int,
|
||
value_text = EXCLUDED.value_text,
|
||
value_bool = EXCLUDED.value_bool,
|
||
updated_at = NOW()
|
||
""",
|
||
(activity_log_id, tid, vn, vi, vt, vb),
|
||
)
|
||
|
||
|
||
def merge_column_backed_and_eav_metrics(
|
||
header: Mapping[str, Any],
|
||
schema: Sequence[Dict[str, Any]],
|
||
eav_metrics: Sequence[Dict[str, Any]],
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
Effektive Metrikliste: Pro Schema-Parameter mit source_field gilt activity_log als kanonisch, wenn
|
||
die Spalte befüllt und koerzierbar ist; sonst Fallback EAV. Reine EAV-Parameter (ohne Spalte oder
|
||
leere Spalte) kommen aus EAV. Verhindert doppelte Semantik ohne Schreib-Sync.
|
||
"""
|
||
eav_by_key = {m["key"]: m for m in eav_metrics}
|
||
merged: List[Dict[str, Any]] = []
|
||
keys_handled: set[str] = set()
|
||
|
||
for s in schema:
|
||
k = s["key"]
|
||
tid = s["training_parameter_id"]
|
||
dt = s["data_type"]
|
||
unit = s.get("unit")
|
||
sf = s.get("source_field")
|
||
|
||
used_column = False
|
||
if sf and isinstance(sf, str) and str(sf).strip():
|
||
col = str(sf).strip()
|
||
if col in header and header[col] is not None:
|
||
try:
|
||
val = _coerce_raw_value_for_parameter(dt, header[col])
|
||
merged.append(
|
||
{
|
||
"training_parameter_id": tid,
|
||
"key": k,
|
||
"data_type": dt,
|
||
"unit": unit,
|
||
"value": val,
|
||
}
|
||
)
|
||
used_column = True
|
||
keys_handled.add(k)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
|
||
if used_column:
|
||
continue
|
||
if k in eav_by_key:
|
||
merged.append(dict(eav_by_key[k]))
|
||
keys_handled.add(k)
|
||
continue
|
||
|
||
legacy_col = ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM.get(k)
|
||
if legacy_col and legacy_col in header and header[legacy_col] is not None:
|
||
try:
|
||
val = _coerce_raw_value_for_parameter(dt, header[legacy_col])
|
||
merged.append(
|
||
{
|
||
"training_parameter_id": tid,
|
||
"key": k,
|
||
"data_type": dt,
|
||
"unit": unit,
|
||
"value": val,
|
||
}
|
||
)
|
||
keys_handled.add(k)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
|
||
for m in eav_metrics:
|
||
if m["key"] in keys_handled:
|
||
continue
|
||
merged.append(dict(m))
|
||
|
||
merged.sort(key=lambda x: x["key"])
|
||
return merged
|
||
|
||
|
||
def sync_column_backed_session_metrics(cur, profile_id: str, activity_log_id: str) -> None:
|
||
"""
|
||
[Veraltet / nicht mehr in Schreibpfaden aufgerufen]
|
||
|
||
Früher: EAV spiegelte activity_log-Spalten für Parameter mit source_field.
|
||
Kanon: Spaltenwerte werden bei merge_column_backed_and_eav_metrics beim Lesen berücksichtigt; keine
|
||
doppelte Speicherung. Funktion bleibt für optionale Admin-/Reparatur-Skripte.
|
||
"""
|
||
cur.execute("SELECT * FROM activity_log WHERE id = %s", (activity_log_id,))
|
||
row = cur.fetchone()
|
||
if not row or str(row["profile_id"]) != str(profile_id):
|
||
return
|
||
header = dict(row)
|
||
schema = resolve_activity_attribute_schema(
|
||
cur, header.get("training_category"), header.get("training_type_id")
|
||
)
|
||
for spec in schema:
|
||
sf = spec.get("source_field")
|
||
if sf is None or (isinstance(sf, str) and not str(sf).strip()):
|
||
continue
|
||
col = str(sf).strip()
|
||
if col not in header:
|
||
continue
|
||
raw = header[col]
|
||
tid = spec["training_parameter_id"]
|
||
dt = spec["data_type"]
|
||
rules = _validation_rules_dict(spec["validation_rules"])
|
||
|
||
if raw is None:
|
||
cur.execute(
|
||
"""
|
||
DELETE FROM activity_session_metrics
|
||
WHERE activity_log_id = %s AND training_parameter_id = %s
|
||
""",
|
||
(activity_log_id, tid),
|
||
)
|
||
continue
|
||
|
||
try:
|
||
coerced = _coerce_raw_value_for_parameter(dt, raw)
|
||
_validate_single_value(dt, coerced, rules)
|
||
except (ActivitySessionMetricsError, TypeError, ValueError) as ex:
|
||
logger.warning(
|
||
"sync_column_backed_session_metrics: überspringe %s (Spalte %s): %s",
|
||
spec.get("key"),
|
||
col,
|
||
ex,
|
||
)
|
||
continue
|
||
|
||
vn, vi, vt, vb = _row_value_tuple(dt, coerced)
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO activity_session_metrics (
|
||
activity_log_id, training_parameter_id,
|
||
value_num, value_int, value_text, value_bool, updated_at
|
||
) VALUES (%s, %s, %s, %s, %s, %s, NOW())
|
||
ON CONFLICT (activity_log_id, training_parameter_id)
|
||
DO UPDATE SET
|
||
value_num = EXCLUDED.value_num,
|
||
value_int = EXCLUDED.value_int,
|
||
value_text = EXCLUDED.value_text,
|
||
value_bool = EXCLUDED.value_bool,
|
||
updated_at = NOW()
|
||
""",
|
||
(activity_log_id, tid, vn, vi, vt, vb),
|
||
)
|
||
|
||
|
||
def fetch_activity_session_metrics(cur, activity_log_id: str) -> List[Dict[str, Any]]:
|
||
cur.execute(
|
||
"""
|
||
SELECT
|
||
m.id,
|
||
m.activity_log_id,
|
||
m.training_parameter_id,
|
||
m.value_num,
|
||
m.value_int,
|
||
m.value_text,
|
||
m.value_bool,
|
||
tp.key,
|
||
tp.data_type,
|
||
tp.unit
|
||
FROM activity_session_metrics m
|
||
JOIN training_parameters tp ON tp.id = m.training_parameter_id
|
||
WHERE m.activity_log_id = %s
|
||
ORDER BY tp.key
|
||
""",
|
||
(activity_log_id,),
|
||
)
|
||
rows = cur.fetchall()
|
||
out: List[Dict[str, Any]] = []
|
||
for r in rows:
|
||
dt = r["data_type"]
|
||
if dt == "integer":
|
||
val = int(r["value_int"]) if r["value_int"] is not None else None
|
||
elif dt == "float":
|
||
val = float(r["value_num"]) if r["value_num"] is not None else None
|
||
elif dt == "string":
|
||
val = r["value_text"]
|
||
else:
|
||
val = r["value_bool"]
|
||
out.append(
|
||
{
|
||
"training_parameter_id": r["training_parameter_id"],
|
||
"key": r["key"],
|
||
"data_type": dt,
|
||
"unit": r["unit"],
|
||
"value": val,
|
||
}
|
||
)
|
||
return out
|
||
|
||
|
||
def replace_activity_session_metrics(
|
||
cur,
|
||
profile_id: str,
|
||
activity_log_id: str,
|
||
metrics: Sequence[Dict[str, Any]],
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
Full replace of EAV rows for this session. metrics: [{ "parameter_key": str, "value": ... }, ...]
|
||
"""
|
||
cur.execute(
|
||
"""
|
||
SELECT id, profile_id, training_category, training_type_id
|
||
FROM activity_log WHERE id = %s
|
||
""",
|
||
(activity_log_id,),
|
||
)
|
||
row = cur.fetchone()
|
||
if not row or str(row["profile_id"]) != str(profile_id):
|
||
raise ActivitySessionMetricsError(404, "Aktivität nicht gefunden")
|
||
|
||
schema = resolve_activity_attribute_schema(
|
||
cur, row.get("training_category"), row.get("training_type_id")
|
||
)
|
||
by_key = {s["key"]: s for s in schema}
|
||
payload_by_key: Dict[str, Dict[str, Any]] = {}
|
||
for item in metrics:
|
||
raw_k = item.get("parameter_key")
|
||
if raw_k is None or not str(raw_k).strip():
|
||
raise ActivitySessionMetricsError(400, "parameter_key fehlt")
|
||
k = str(raw_k).strip()
|
||
if k not in by_key:
|
||
raise ActivitySessionMetricsError(400, f"Unbekannter oder nicht zugewiesener Parameter: {k}")
|
||
payload_by_key[k] = item
|
||
|
||
for s in schema:
|
||
if not s["required"]:
|
||
continue
|
||
itk = s["key"]
|
||
if not _parameter_value_stored_in_eav_only(s, itk):
|
||
continue
|
||
hit = payload_by_key.get(itk)
|
||
if hit is None or hit.get("value") is None:
|
||
raise ActivitySessionMetricsError(400, f"Pflichtfeld fehlt: {itk}")
|
||
|
||
cur.execute(
|
||
"DELETE FROM activity_session_metrics WHERE activity_log_id = %s",
|
||
(activity_log_id,),
|
||
)
|
||
|
||
for item in metrics:
|
||
k = str(item["parameter_key"]).strip()
|
||
spec = by_key[k]
|
||
if not _parameter_value_stored_in_eav_only(spec, k):
|
||
continue
|
||
val = item.get("value")
|
||
if val is None:
|
||
if spec["required"] and _parameter_value_stored_in_eav_only(spec, k):
|
||
raise ActivitySessionMetricsError(400, f"Pflichtfeld fehlt: {k}")
|
||
continue
|
||
rules = _validation_rules_dict(spec["validation_rules"])
|
||
_validate_single_value(spec["data_type"], val, rules)
|
||
vn, vi, vt, vb = _row_value_tuple(spec["data_type"], val)
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO activity_session_metrics (
|
||
activity_log_id, training_parameter_id,
|
||
value_num, value_int, value_text, value_bool, updated_at
|
||
) VALUES (%s, %s, %s, %s, %s, %s, NOW())
|
||
""",
|
||
(activity_log_id, spec["training_parameter_id"], vn, vi, vt, vb),
|
||
)
|
||
|
||
# Kein sync_column_backed nach PUT /metrics: der Request ist maßgeblich für EAV. Ein Spalten-Sync würde
|
||
# Werte aus nicht mitgeschriebenen activity_log-Spalten wieder verwerfen.
|
||
|
||
return fetch_activity_session_metrics(cur, activity_log_id)
|
||
|
||
|
||
def get_activity_session_logical_unit(cur, profile_id: str, activity_log_id: str) -> Dict[str, Any]:
|
||
cur.execute("SELECT * FROM activity_log WHERE id = %s", (activity_log_id,))
|
||
row = cur.fetchone()
|
||
if not row or str(row["profile_id"]) != str(profile_id):
|
||
raise ActivitySessionMetricsError(404, "Aktivität nicht gefunden")
|
||
|
||
header = dict(row)
|
||
schema = resolve_activity_attribute_schema(
|
||
cur, header.get("training_category"), header.get("training_type_id")
|
||
)
|
||
metrics = fetch_activity_session_metrics(cur, activity_log_id)
|
||
merged_metrics = merge_column_backed_and_eav_metrics(header, schema, metrics)
|
||
return {
|
||
"header": header,
|
||
"schema": schema,
|
||
"metrics": merged_metrics,
|
||
}
|
||
|
||
|
||
def enrich_sessions_with_metrics(cur, sessions: List[Dict[str, Any]]) -> None:
|
||
"""
|
||
Mutates each session dict: adds key 'session_metrics' (list).
|
||
|
||
Kombiniert EAV mit activity_log-Spalten für Parameter mit source_field (kanonisch: Spalte),
|
||
analog zu get_activity_session_logical_unit – ohne doppelte EAV-Speicherung beim Import.
|
||
"""
|
||
if not sessions:
|
||
return
|
||
ids = [str(s["id"]) for s in sessions if s.get("id")]
|
||
if not ids:
|
||
return
|
||
ph = ",".join(["%s"] * len(ids))
|
||
|
||
cur.execute(
|
||
f"SELECT * FROM activity_log WHERE id IN ({ph})",
|
||
ids,
|
||
)
|
||
headers_by_id: Dict[str, Dict[str, Any]] = {}
|
||
for r in cur.fetchall():
|
||
h = dict(r)
|
||
headers_by_id[str(h["id"])] = h
|
||
|
||
cur.execute(
|
||
f"""
|
||
SELECT
|
||
m.activity_log_id,
|
||
m.training_parameter_id,
|
||
tp.key,
|
||
tp.data_type,
|
||
tp.unit,
|
||
m.value_num,
|
||
m.value_int,
|
||
m.value_text,
|
||
m.value_bool
|
||
FROM activity_session_metrics m
|
||
JOIN training_parameters tp ON tp.id = m.training_parameter_id
|
||
WHERE m.activity_log_id IN ({ph})
|
||
ORDER BY m.activity_log_id, tp.key
|
||
""",
|
||
ids,
|
||
)
|
||
by_act: Dict[str, List[Dict[str, Any]]] = {}
|
||
for r in cur.fetchall():
|
||
aid = str(r["activity_log_id"])
|
||
dt = r["data_type"]
|
||
if dt == "integer":
|
||
val = int(r["value_int"]) if r["value_int"] is not None else None
|
||
elif dt == "float":
|
||
val = float(r["value_num"]) if r["value_num"] is not None else None
|
||
elif dt == "string":
|
||
val = r["value_text"]
|
||
else:
|
||
val = r["value_bool"]
|
||
by_act.setdefault(aid, []).append(
|
||
{
|
||
"training_parameter_id": r["training_parameter_id"],
|
||
"key": r["key"],
|
||
"data_type": dt,
|
||
"unit": r["unit"],
|
||
"value": val,
|
||
}
|
||
)
|
||
|
||
schema_cache: Dict[tuple[Any, Any], List[Dict[str, Any]]] = {}
|
||
|
||
def _schema(cat: Any, tid: Any) -> List[Dict[str, Any]]:
|
||
cache_key = (cat, tid)
|
||
if cache_key not in schema_cache:
|
||
schema_cache[cache_key] = resolve_activity_attribute_schema(cur, cat, tid)
|
||
return schema_cache[cache_key]
|
||
|
||
for s in sessions:
|
||
aid = str(s.get("id"))
|
||
header = headers_by_id.get(aid)
|
||
if not header:
|
||
s["session_metrics"] = []
|
||
continue
|
||
schema = _schema(header.get("training_category"), header.get("training_type_id"))
|
||
eav_list = by_act.get(aid, [])
|
||
merged = merge_column_backed_and_eav_metrics(header, schema, eav_list)
|
||
s["session_metrics"] = [
|
||
{"key": m["key"], "data_type": m["data_type"], "unit": m["unit"], "value": m["value"]}
|
||
for m in merged
|
||
]
|