mitai-jinkendo/backend/data_layer/activity_session_metrics.py
Lars 8d0a6dd487
All checks were successful
Deploy Development / deploy (push) Successful in 54s
Build Test / pytest-backend (push) Successful in 5s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Refactor activity data handling to use dynamic registry fields
- Replaced hardcoded keys in `activity_data_canon.py` with a dynamic retrieval method from the module registry, ensuring that `ACTIVITY_MODULE_REGISTRY_FIELD_KEYS` reflects the current configuration.
- Updated `activity_persistence_orchestrator.py` to utilize the new dynamic field retrieval function, enhancing consistency across the data layer.
- Modified `activity_session_metrics.py` to reference the dynamic field keys, improving maintainability and reducing redundancy in the codebase.
2026-04-16 12:14:39 +02:00

671 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Activity session metrics (EAV) and resolved attribute schema — Layer 1.
See: .claude/docs/technical/ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md
"""
from __future__ import annotations

import logging
import math
from decimal import Decimal
from typing import Any, Dict, List, Mapping, Optional, Sequence

from data_layer.activity_data_canon import (
    ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM,
    ACTIVITY_MODULE_REGISTRY_FIELD_KEYS,
)
logger = logging.getLogger(__name__)

# activity_log columns that a CSV parameter mapping must never overwrite:
# they are set from the training-type mapping or by the system.
ACTIVITY_LOG_PATCH_FORBIDDEN = frozenset(
    (
        "created",
        "date",
        "id",
        "profile_id",
        "source",
        "training_category",
        "training_subcategory",
        "training_type_id",
    )
)
class ActivitySessionMetricsError(Exception):
    """Layer-1 error carrying an HTTP-style status code.

    Routers translate it into an HTTP response (404/400).

    Attributes:
        status_code: HTTP status the router should answer with.
        detail: Human-readable error message (also the exception message).
    """

    def __init__(self, status_code: int, detail: str):
        super().__init__(detail)
        self.status_code, self.detail = status_code, detail
def _effective_training_category(
cur, training_category: Optional[str], training_type_id: Optional[int]
) -> Optional[str]:
if training_category:
return training_category.strip() or None
if training_type_id is None:
return None
cur.execute("SELECT category FROM training_types WHERE id = %s", (training_type_id,))
row = cur.fetchone()
if row and row.get("category"):
return row["category"]
return None
def _schema_entry_from_row(
    r: Mapping[str, Any], sort_order: Any, required: bool, ui_group: Any
) -> Dict[str, Any]:
    """Build one schema entry from a joined training_parameters row plus assignment fields."""
    return {
        "training_parameter_id": r["training_parameter_id"],
        "key": r["key"],
        "name_de": r["name_de"],
        "name_en": r["name_en"],
        "param_category": r["param_category"],
        "data_type": r["data_type"],
        "unit": r["unit"],
        "validation_rules": r["validation_rules"] or {},
        "source_field": r["source_field"],
        "sort_order": sort_order,
        "required": required,
        "ui_group": ui_group,
    }


def merge_parameter_schema_rows(
    category_rows: Sequence[Dict[str, Any]],
    type_rows: Sequence[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Merge category assignments and type assignments into one sorted schema list.

    Pure function (no DB access). Row shapes match the SELECTs in
    resolve_activity_attribute_schema (``cat_*`` / ``typ_*`` aliases).

    Category rows form the base. A type row either adds a new parameter or
    overrides ``sort_order`` / ``required`` / ``ui_group`` of an existing one,
    but only where its value is not NULL. A NULL sort order counts as 0 for
    both row kinds, so sorting never compares None with an int.

    Returns:
        Schema entries sorted by (sort_order, key).
    """
    merged: Dict[int, Dict[str, Any]] = {}
    for r in category_rows:
        cat_sort = r["cat_sort"]
        merged[r["training_parameter_id"]] = _schema_entry_from_row(
            r,
            sort_order=cat_sort if cat_sort is not None else 0,
            required=bool(r["cat_required"]),
            ui_group=r["cat_ui_group"],
        )
    for r in type_rows:
        pid = r["training_parameter_id"]
        typ_sort = r["typ_sort"]
        typ_required = r["typ_required"]
        typ_ui_group = r["typ_ui_group"]
        base = merged.get(pid)
        if base is None:
            # Parameter assigned only to the training type.
            merged[pid] = _schema_entry_from_row(
                r,
                sort_order=typ_sort if typ_sort is not None else 0,
                required=bool(typ_required),
                ui_group=typ_ui_group,
            )
            continue
        # Type-level override: NULL means "inherit from the category".
        if typ_sort is not None:
            base["sort_order"] = typ_sort
        if typ_required is not None:
            base["required"] = bool(typ_required)
        if typ_ui_group is not None:
            base["ui_group"] = typ_ui_group
    return sorted(merged.values(), key=lambda x: (x["sort_order"], x["key"]))
def resolve_activity_attribute_schema(
    cur,
    training_category: Optional[str],
    training_type_id: Optional[int],
) -> List[Dict[str, Any]]:
    """Return the merged parameter definitions for UI and validation.

    Loads the active parameters assigned to the effective training category
    (base) and to the training type (overrides/additions), then merges them
    with merge_parameter_schema_rows. The result is sorted by sort_order, then key.
    """
    category_sql = """
        SELECT
        tcp.training_parameter_id,
        tcp.sort_order AS cat_sort,
        tcp.required AS cat_required,
        tcp.ui_group AS cat_ui_group,
        tp.key, tp.name_de, tp.name_en, tp.category AS param_category,
        tp.data_type, tp.unit, tp.validation_rules, tp.source_field
        FROM training_category_parameter tcp
        JOIN training_parameters tp ON tp.id = tcp.training_parameter_id
        WHERE tcp.training_category = %s AND tp.is_active = true
        """
    type_sql = """
        SELECT
        ttp.training_parameter_id,
        ttp.sort_order AS typ_sort,
        ttp.required AS typ_required,
        ttp.ui_group AS typ_ui_group,
        tp.key, tp.name_de, tp.name_en, tp.category AS param_category,
        tp.data_type, tp.unit, tp.validation_rules, tp.source_field
        FROM training_type_parameter ttp
        JOIN training_parameters tp ON tp.id = ttp.training_parameter_id
        WHERE ttp.training_type_id = %s AND tp.is_active = true
        """

    def _load(sql: str, param: Any) -> List[Dict[str, Any]]:
        cur.execute(sql, (param,))
        return list(cur.fetchall())

    effective_category = _effective_training_category(cur, training_category, training_type_id)
    category_rows: List[Dict[str, Any]] = (
        _load(category_sql, effective_category) if effective_category else []
    )
    type_rows: List[Dict[str, Any]] = (
        _load(type_sql, training_type_id) if training_type_id is not None else []
    )
    return merge_parameter_schema_rows(category_rows, type_rows)
def _validation_rules_dict(raw: Any) -> Dict[str, Any]:
if isinstance(raw, dict):
return raw
return {}
def _validate_single_value(data_type: str, value: Any, rules: Dict[str, Any]) -> None:
if data_type == "integer":
if not isinstance(value, int) or isinstance(value, bool):
raise ActivitySessionMetricsError(400, f"Erwartet integer, erhalten: {type(value).__name__}")
if "min" in rules and value < rules["min"]:
raise ActivitySessionMetricsError(400, f"Wert unter min ({rules['min']})")
if "max" in rules and value > rules["max"]:
raise ActivitySessionMetricsError(400, f"Wert über max ({rules['max']})")
elif data_type == "float":
if isinstance(value, bool) or not isinstance(value, (int, float, Decimal)):
raise ActivitySessionMetricsError(400, f"Erwartet Zahl, erhalten: {type(value).__name__}")
v = float(value)
if "min" in rules and v < float(rules["min"]):
raise ActivitySessionMetricsError(400, f"Wert unter min ({rules['min']})")
if "max" in rules and v > float(rules["max"]):
raise ActivitySessionMetricsError(400, f"Wert über max ({rules['max']})")
elif data_type == "string":
if not isinstance(value, str):
raise ActivitySessionMetricsError(400, f"Erwartet string, erhalten: {type(value).__name__}")
if rules.get("not_empty") and not value.strip():
raise ActivitySessionMetricsError(400, "Leerer String nicht erlaubt")
if "max_length" in rules and len(value) > int(rules["max_length"]):
raise ActivitySessionMetricsError(400, f"String zu lang (max {rules['max_length']})")
allowed = rules.get("allowed_values")
if allowed and value not in allowed:
raise ActivitySessionMetricsError(400, "Wert nicht in erlaubter Menge")
elif data_type == "boolean":
if not isinstance(value, bool):
raise ActivitySessionMetricsError(400, f"Erwartet boolean, erhalten: {type(value).__name__}")
else:
raise ActivitySessionMetricsError(400, f"Unbekannter data_type: {data_type}")
def _row_value_tuple(data_type: str, value: Any) -> tuple:
if data_type == "integer":
return (None, int(value), None, None)
if data_type == "float":
return (float(value), None, None, None)
if data_type == "string":
return (None, None, str(value), None)
if data_type == "boolean":
return (None, None, None, bool(value))
raise ValueError(data_type)
def _coerce_raw_value_for_parameter(data_type: str, raw: Any) -> Any:
"""Wert aus activity_log-Spalte in den Typ bringen, den training_parameters.data_type erwartet."""
if data_type == "integer":
if isinstance(raw, bool):
raise TypeError("boolean nicht als integer erlaubt")
if isinstance(raw, str):
s = raw.strip().replace(",", ".")
return int(round(float(s)))
return int(round(float(raw)))
if data_type == "float":
if isinstance(raw, str):
s = raw.strip().replace(",", ".")
return float(s)
return float(raw)
if data_type == "string":
return str(raw) if raw is not None else ""
if data_type == "boolean":
if isinstance(raw, bool):
return raw
s = str(raw).strip().lower()
if s in ("true", "1", "t", "yes"):
return True
if s in ("false", "0", "f", "no", ""):
return False
raise TypeError(f"boolean-Koercion nicht möglich: {raw!r}")
raise ValueError(data_type)
def upsert_session_metrics_from_csv_mapped(
    cur,
    profile_id: str,
    activity_log_id: str,
    mapped: Mapping[str, Any],
    training_category: Optional[str],
    training_type_id: Optional[int],
) -> None:
    """Upsert EAV rows for CSV-mapped training parameters of one activity.

    Only keys that are not in the activity module registry are stored here.
    Core fields (date, start, distance, HR, ...) are written to activity_log by
    the executor, so registry keys are skipped to avoid duplicate EAV rows.

    Best effort per value:
      * missing, None, empty or whitespace-only values are skipped;
      * values that cannot be coerced or fail validation are logged and skipped.

    Does nothing if the activity does not exist or belongs to another profile.
    """
    cur.execute(
        "SELECT profile_id FROM activity_log WHERE id = %s",
        (activity_log_id,),
    )
    row = cur.fetchone()
    if not row or str(row["profile_id"]) != str(profile_id):
        return
    schema = resolve_activity_attribute_schema(cur, training_category, training_type_id)
    for spec in schema:
        pkey = spec["key"]
        if pkey in ACTIVITY_MODULE_REGISTRY_FIELD_KEYS or pkey not in mapped:
            continue
        raw = mapped[pkey]
        # Whitespace-only cells count as empty; otherwise "  " would be coerced
        # to False for boolean parameters instead of being skipped like "".
        if raw is None or (isinstance(raw, str) and not raw.strip()):
            continue
        tid = spec["training_parameter_id"]
        dt = spec["data_type"]
        rules = _validation_rules_dict(spec["validation_rules"])
        try:
            coerced = _coerce_raw_value_for_parameter(dt, raw)
            _validate_single_value(dt, coerced, rules)
        except (ActivitySessionMetricsError, TypeError, ValueError, OverflowError) as ex:
            # OverflowError: e.g. "inf" for an integer parameter must not abort the import.
            logger.warning("CSV EAV skipped %s: %s", pkey, ex)
            continue
        vn, vi, vt, vb = _row_value_tuple(dt, coerced)
        cur.execute(
            """
            INSERT INTO activity_session_metrics (
            activity_log_id, training_parameter_id,
            value_num, value_int, value_text, value_bool, updated_at
            ) VALUES (%s, %s, %s, %s, %s, %s, NOW())
            ON CONFLICT (activity_log_id, training_parameter_id)
            DO UPDATE SET
            value_num = EXCLUDED.value_num,
            value_int = EXCLUDED.value_int,
            value_text = EXCLUDED.value_text,
            value_bool = EXCLUDED.value_bool,
            updated_at = NOW()
            """,
            (activity_log_id, tid, vn, vi, vt, vb),
        )
def merge_column_backed_and_eav_metrics(
    header: Mapping[str, Any],
    schema: Sequence[Dict[str, Any]],
    eav_metrics: Sequence[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Build the effective metric list of one activity from its columns and EAV rows.

    Precedence for each schema parameter:
      1. the activity_log column named by ``source_field``, if present, non-NULL
         and coercible (activity_log is canonical);
      2. the stored EAV metric with the same key;
      3. the legacy column from ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM,
         if present, non-NULL and coercible.

    EAV metrics whose key was not handled above (e.g. not in the schema) are
    appended unchanged. Reading columns here avoids keeping EAV copies in sync.

    Returns:
        Metric dicts (training_parameter_id, key, data_type, unit, value), sorted by key.
    """

    def _metric_from_column(spec: Dict[str, Any], column: str) -> Optional[Dict[str, Any]]:
        """Coerce header[column] for spec; None if absent, NULL or not coercible."""
        if column not in header or header[column] is None:
            return None
        try:
            value = _coerce_raw_value_for_parameter(spec["data_type"], header[column])
        except (TypeError, ValueError, OverflowError):
            # Unusable column value: fall back to the next source.
            return None
        return {
            "training_parameter_id": spec["training_parameter_id"],
            "key": spec["key"],
            "data_type": spec["data_type"],
            "unit": spec.get("unit"),
            "value": value,
        }

    eav_by_key = {m["key"]: m for m in eav_metrics}
    merged: List[Dict[str, Any]] = []
    keys_handled: set[str] = set()
    for spec in schema:
        key = spec["key"]
        source_field = spec.get("source_field")
        column = source_field.strip() if isinstance(source_field, str) else ""
        metric = _metric_from_column(spec, column) if column else None
        if metric is None and key in eav_by_key:
            metric = dict(eav_by_key[key])
        if metric is None:
            legacy_col = ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM.get(key)
            if legacy_col:
                metric = _metric_from_column(spec, legacy_col)
        if metric is not None:
            merged.append(metric)
            keys_handled.add(key)
    merged.extend(dict(m) for m in eav_metrics if m["key"] not in keys_handled)
    merged.sort(key=lambda x: x["key"])
    return merged
def sync_column_backed_session_metrics(cur, profile_id: str, activity_log_id: str) -> None:
    """[Deprecated - no longer called from write paths]

    Mirrors activity_log column values into activity_session_metrics for every
    schema parameter that has a ``source_field``:
      * NULL column -> the EAV row is deleted;
      * coercible, valid value -> the EAV row is upserted;
      * anything else -> logged and skipped.

    Canonical behaviour today is different: merge_column_backed_and_eav_metrics
    takes column values into account at read time, so nothing is stored twice.
    This function is kept for optional admin/repair scripts. It does nothing if
    the activity does not exist or belongs to another profile.
    """
    cur.execute("SELECT * FROM activity_log WHERE id = %s", (activity_log_id,))
    row = cur.fetchone()
    if not row or str(row["profile_id"]) != str(profile_id):
        return
    header = dict(row)
    schema = resolve_activity_attribute_schema(
        cur, header.get("training_category"), header.get("training_type_id")
    )
    for spec in schema:
        sf = spec.get("source_field")
        if sf is None or (isinstance(sf, str) and not str(sf).strip()):
            continue
        col = str(sf).strip()
        if col not in header:
            continue
        raw = header[col]
        tid = spec["training_parameter_id"]
        dt = spec["data_type"]
        rules = _validation_rules_dict(spec["validation_rules"])
        if raw is None:
            cur.execute(
                """
                DELETE FROM activity_session_metrics
                WHERE activity_log_id = %s AND training_parameter_id = %s
                """,
                (activity_log_id, tid),
            )
            continue
        try:
            coerced = _coerce_raw_value_for_parameter(dt, raw)
            _validate_single_value(dt, coerced, rules)
        except (ActivitySessionMetricsError, TypeError, ValueError, OverflowError) as ex:
            # OverflowError: e.g. an infinite column value for an integer parameter.
            logger.warning(
                "sync_column_backed_session_metrics: überspringe %s (Spalte %s): %s",
                spec.get("key"),
                col,
                ex,
            )
            continue
        vn, vi, vt, vb = _row_value_tuple(dt, coerced)
        cur.execute(
            """
            INSERT INTO activity_session_metrics (
            activity_log_id, training_parameter_id,
            value_num, value_int, value_text, value_bool, updated_at
            ) VALUES (%s, %s, %s, %s, %s, %s, NOW())
            ON CONFLICT (activity_log_id, training_parameter_id)
            DO UPDATE SET
            value_num = EXCLUDED.value_num,
            value_int = EXCLUDED.value_int,
            value_text = EXCLUDED.value_text,
            value_bool = EXCLUDED.value_bool,
            updated_at = NOW()
            """,
            (activity_log_id, tid, vn, vi, vt, vb),
        )
def fetch_activity_session_metrics(cur, activity_log_id: str) -> List[Dict[str, Any]]:
    """Load the stored EAV metrics of one activity, ordered by parameter key.

    The value is read from the typed column matching the parameter's data_type:
    integer -> value_int (as int), float -> value_num (as float),
    string -> value_text. Any other data_type is read from value_bool.
    A NULL column yields None.
    """
    cur.execute(
        """
        SELECT
        m.id,
        m.activity_log_id,
        m.training_parameter_id,
        m.value_num,
        m.value_int,
        m.value_text,
        m.value_bool,
        tp.key,
        tp.data_type,
        tp.unit
        FROM activity_session_metrics m
        JOIN training_parameters tp ON tp.id = m.training_parameter_id
        WHERE m.activity_log_id = %s
        ORDER BY tp.key
        """,
        (activity_log_id,),
    )

    def _decode(record: Dict[str, Any]) -> Any:
        kind = record["data_type"]
        if kind == "integer":
            stored = record["value_int"]
            return None if stored is None else int(stored)
        if kind == "float":
            stored = record["value_num"]
            return None if stored is None else float(stored)
        if kind == "string":
            return record["value_text"]
        return record["value_bool"]

    return [
        {
            "training_parameter_id": record["training_parameter_id"],
            "key": record["key"],
            "data_type": record["data_type"],
            "unit": record["unit"],
            "value": _decode(record),
        }
        for record in cur.fetchall()
    ]
def replace_activity_session_metrics(
    cur,
    profile_id: str,
    activity_log_id: str,
    metrics: Sequence[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Replace all EAV rows of one activity with the given metrics.

    Args:
        metrics: ``[{"parameter_key": str, "value": ...}, ...]``. An item whose
            value is None means "not set" and is not stored (rejected if the
            parameter is required).

    The whole payload is validated before any row is deleted, so a 400 never
    leaves the activity half-written.

    Returns:
        The stored metrics, as returned by fetch_activity_session_metrics.

    Raises:
        ActivitySessionMetricsError: 404 if the activity does not exist or belongs
            to another profile; 400 for a missing, unknown or duplicate
            parameter_key, a missing required parameter, or an invalid value.
    """
    cur.execute(
        """
        SELECT id, profile_id, training_category, training_type_id
        FROM activity_log WHERE id = %s
        """,
        (activity_log_id,),
    )
    row = cur.fetchone()
    if not row or str(row["profile_id"]) != str(profile_id):
        raise ActivitySessionMetricsError(404, "Aktivität nicht gefunden")
    schema = resolve_activity_attribute_schema(
        cur, row.get("training_category"), row.get("training_type_id")
    )
    by_key = {s["key"]: s for s in schema}

    # 1) Normalise keys; reject unknown and duplicate parameters.
    values_by_key: Dict[str, Any] = {}
    for item in metrics:
        raw_k = item.get("parameter_key")
        if raw_k is None or not str(raw_k).strip():
            raise ActivitySessionMetricsError(400, "parameter_key fehlt")
        k = str(raw_k).strip()
        if k not in by_key:
            raise ActivitySessionMetricsError(400, f"Unbekannter oder nicht zugewiesener Parameter: {k}")
        if k in values_by_key:
            # Two rows for one parameter would violate the
            # (activity_log_id, training_parameter_id) uniqueness on INSERT.
            raise ActivitySessionMetricsError(400, f"Parameter mehrfach angegeben: {k}")
        values_by_key[k] = item.get("value")

    # 2) Required parameters must be present with a non-null value.
    for s in schema:
        if s["required"] and values_by_key.get(s["key"]) is None:
            raise ActivitySessionMetricsError(400, f"Pflichtfeld fehlt: {s['key']}")

    # 3) Validate and convert everything before touching the table.
    insert_params: List[tuple] = []
    for k, val in values_by_key.items():
        if val is None:
            continue  # optional parameter explicitly left empty
        spec = by_key[k]
        dt = spec["data_type"]
        _validate_single_value(dt, val, _validation_rules_dict(spec["validation_rules"]))
        vn, vi, vt, vb = _row_value_tuple(dt, val)
        insert_params.append((activity_log_id, spec["training_parameter_id"], vn, vi, vt, vb))

    # 4) Full replace.
    cur.execute(
        "DELETE FROM activity_session_metrics WHERE activity_log_id = %s",
        (activity_log_id,),
    )
    for params in insert_params:
        cur.execute(
            """
            INSERT INTO activity_session_metrics (
            activity_log_id, training_parameter_id,
            value_num, value_int, value_text, value_bool, updated_at
            ) VALUES (%s, %s, %s, %s, %s, %s, NOW())
            """,
            params,
        )
    # No sync_column_backed after PUT /metrics: the request is authoritative for
    # EAV. A column sync would discard values for activity_log columns that were
    # not written by this request.
    return fetch_activity_session_metrics(cur, activity_log_id)
def get_activity_session_logical_unit(cur, profile_id: str, activity_log_id: str) -> Dict[str, Any]:
    """Return one activity as a logical unit.

    The result has three parts: the activity_log row (``header``), its resolved
    parameter ``schema``, and the effective ``metrics``. The metrics merge
    column-backed values with stored EAV rows.

    Raises:
        ActivitySessionMetricsError: 404 if the activity does not exist or
            belongs to another profile.
    """
    cur.execute("SELECT * FROM activity_log WHERE id = %s", (activity_log_id,))
    found = cur.fetchone()
    owned = bool(found) and str(found["profile_id"]) == str(profile_id)
    if not owned:
        raise ActivitySessionMetricsError(404, "Aktivität nicht gefunden")
    header = dict(found)
    category = header.get("training_category")
    type_id = header.get("training_type_id")
    schema = resolve_activity_attribute_schema(cur, category, type_id)
    stored_metrics = fetch_activity_session_metrics(cur, activity_log_id)
    return {
        "header": header,
        "schema": schema,
        "metrics": merge_column_backed_and_eav_metrics(header, schema, stored_metrics),
    }
def enrich_sessions_with_metrics(cur, sessions: List[Dict[str, Any]]) -> None:
    """Attach merged metrics to each session dict under ``session_metrics`` (in place).

    Combines stored EAV rows with activity_log columns for parameters that have
    a ``source_field`` (the column is canonical), the same way as
    get_activity_session_logical_unit, without storing EAV copies at import.
    Work is batched: one query for all headers and one for all EAV rows, with
    schemas cached per (training_category, training_type_id).

    Every session ends up with a ``session_metrics`` list of
    ``{"key", "data_type", "unit", "value"}`` dicts. Sessions without an id, or
    without a matching activity_log row, get an empty list.
    """
    if not sessions:
        return
    # Deduplicate while keeping order; sessions without an id cannot be matched.
    ids = list(dict.fromkeys(str(s["id"]) for s in sessions if s.get("id")))
    if not ids:
        for s in sessions:
            s["session_metrics"] = []
        return
    # Only "%s" placeholders are interpolated; the ids themselves are bound as parameters.
    ph = ",".join(["%s"] * len(ids))
    cur.execute(
        f"SELECT * FROM activity_log WHERE id IN ({ph})",
        ids,
    )
    headers_by_id: Dict[str, Dict[str, Any]] = {}
    for r in cur.fetchall():
        h = dict(r)
        headers_by_id[str(h["id"])] = h
    cur.execute(
        f"""
        SELECT
        m.activity_log_id,
        m.training_parameter_id,
        tp.key,
        tp.data_type,
        tp.unit,
        m.value_num,
        m.value_int,
        m.value_text,
        m.value_bool
        FROM activity_session_metrics m
        JOIN training_parameters tp ON tp.id = m.training_parameter_id
        WHERE m.activity_log_id IN ({ph})
        ORDER BY m.activity_log_id, tp.key
        """,
        ids,
    )

    def _decode_eav_value(r: Mapping[str, Any]) -> Any:
        """Read the typed value column matching r['data_type'] (others -> value_bool)."""
        dt = r["data_type"]
        if dt == "integer":
            return int(r["value_int"]) if r["value_int"] is not None else None
        if dt == "float":
            return float(r["value_num"]) if r["value_num"] is not None else None
        if dt == "string":
            return r["value_text"]
        return r["value_bool"]

    by_act: Dict[str, List[Dict[str, Any]]] = {}
    for r in cur.fetchall():
        by_act.setdefault(str(r["activity_log_id"]), []).append(
            {
                "training_parameter_id": r["training_parameter_id"],
                "key": r["key"],
                "data_type": r["data_type"],
                "unit": r["unit"],
                "value": _decode_eav_value(r),
            }
        )

    schema_cache: Dict[tuple[Any, Any], List[Dict[str, Any]]] = {}

    def _schema(cat: Any, tid: Any) -> List[Dict[str, Any]]:
        cache_key = (cat, tid)
        if cache_key not in schema_cache:
            schema_cache[cache_key] = resolve_activity_attribute_schema(cur, cat, tid)
        return schema_cache[cache_key]

    for s in sessions:
        aid = str(s["id"]) if s.get("id") else None
        header = headers_by_id.get(aid) if aid is not None else None
        if not header:
            s["session_metrics"] = []
            continue
        schema = _schema(header.get("training_category"), header.get("training_type_id"))
        merged = merge_column_backed_and_eav_metrics(header, schema, by_act.get(aid, []))
        s["session_metrics"] = [
            {"key": m["key"], "data_type": m["data_type"], "unit": m["unit"], "value": m["value"]}
            for m in merged
        ]