""" Activity session metrics (EAV) and resolved attribute schema — Layer 1. See: .claude/docs/technical/ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md """ from __future__ import annotations import logging from decimal import Decimal from typing import Any, Dict, List, Optional, Sequence logger = logging.getLogger(__name__) class ActivitySessionMetricsError(Exception): """Raised by Layer 1; routers map to HTTP (404/400).""" def __init__(self, status_code: int, detail: str): self.status_code = status_code self.detail = detail super().__init__(detail) def _effective_training_category( cur, training_category: Optional[str], training_type_id: Optional[int] ) -> Optional[str]: if training_category: return training_category.strip() or None if training_type_id is None: return None cur.execute("SELECT category FROM training_types WHERE id = %s", (training_type_id,)) row = cur.fetchone() if row and row.get("category"): return row["category"] return None def merge_parameter_schema_rows( category_rows: Sequence[Dict[str, Any]], type_rows: Sequence[Dict[str, Any]], ) -> List[Dict[str, Any]]: """ Pure merge: category assignments + type assignments → sorted schema list. Row shapes match SELECTs in resolve_activity_attribute_schema (cat_sort / typ_* aliases). """ merged: Dict[int, Dict[str, Any]] = {} for r in category_rows: pid = r["training_parameter_id"] merged[pid] = { "training_parameter_id": pid, "key": r["key"], "name_de": r["name_de"], "name_en": r["name_en"], "param_category": r["param_category"], "data_type": r["data_type"], "unit": r["unit"], "validation_rules": r["validation_rules"] or {}, "source_field": r["source_field"], "sort_order": r["cat_sort"], "required": bool(r["cat_required"]), "ui_group": r["cat_ui_group"], } for r in type_rows: pid = r["training_parameter_id"] base = merged.get(pid) if base is None: merged[pid] = { "training_parameter_id": pid, "key": r["key"], "name_de": r["name_de"], "name_en": r["name_en"], "param_category": r["param_category"], "data_type": r["data_type"], "unit": r["unit"], "validation_rules": r["validation_rules"] or {}, "source_field": r["source_field"], "sort_order": r["typ_sort"] if r["typ_sort"] is not None else 0, "required": bool(r["typ_required"]) if r["typ_required"] is not None else False, "ui_group": r["typ_ui_group"], } else: if r["typ_sort"] is not None: base["sort_order"] = r["typ_sort"] if r["typ_required"] is not None: base["required"] = bool(r["typ_required"]) if r["typ_ui_group"] is not None: base["ui_group"] = r["typ_ui_group"] out = list(merged.values()) out.sort(key=lambda x: (x["sort_order"], x["key"])) return out def resolve_activity_attribute_schema( cur, training_category: Optional[str], training_type_id: Optional[int], ) -> List[Dict[str, Any]]: """ Merged parameter definitions for UI / validation (category base + type overrides/additions). Sorted by sort_order, then key. """ cat = _effective_training_category(cur, training_category, training_type_id) category_rows: List[Dict[str, Any]] = [] type_rows: List[Dict[str, Any]] = [] if cat: cur.execute( """ SELECT tcp.training_parameter_id, tcp.sort_order AS cat_sort, tcp.required AS cat_required, tcp.ui_group AS cat_ui_group, tp.key, tp.name_de, tp.name_en, tp.category AS param_category, tp.data_type, tp.unit, tp.validation_rules, tp.source_field FROM training_category_parameter tcp JOIN training_parameters tp ON tp.id = tcp.training_parameter_id WHERE tcp.training_category = %s AND tp.is_active = true """, (cat,), ) category_rows = list(cur.fetchall()) if training_type_id is not None: cur.execute( """ SELECT ttp.training_parameter_id, ttp.sort_order AS typ_sort, ttp.required AS typ_required, ttp.ui_group AS typ_ui_group, tp.key, tp.name_de, tp.name_en, tp.category AS param_category, tp.data_type, tp.unit, tp.validation_rules, tp.source_field FROM training_type_parameter ttp JOIN training_parameters tp ON tp.id = ttp.training_parameter_id WHERE ttp.training_type_id = %s AND tp.is_active = true """, (training_type_id,), ) type_rows = list(cur.fetchall()) return merge_parameter_schema_rows(category_rows, type_rows) def _validation_rules_dict(raw: Any) -> Dict[str, Any]: if isinstance(raw, dict): return raw return {} def _validate_single_value(data_type: str, value: Any, rules: Dict[str, Any]) -> None: if data_type == "integer": if not isinstance(value, int) or isinstance(value, bool): raise ActivitySessionMetricsError(400, f"Erwartet integer, erhalten: {type(value).__name__}") if "min" in rules and value < rules["min"]: raise ActivitySessionMetricsError(400, f"Wert unter min ({rules['min']})") if "max" in rules and value > rules["max"]: raise ActivitySessionMetricsError(400, f"Wert über max ({rules['max']})") elif data_type == "float": if isinstance(value, bool) or not isinstance(value, (int, float, Decimal)): raise ActivitySessionMetricsError(400, f"Erwartet Zahl, erhalten: {type(value).__name__}") v = float(value) if "min" in rules and v < float(rules["min"]): raise ActivitySessionMetricsError(400, f"Wert unter min ({rules['min']})") if "max" in rules and v > float(rules["max"]): raise ActivitySessionMetricsError(400, f"Wert über max ({rules['max']})") elif data_type == "string": if not isinstance(value, str): raise ActivitySessionMetricsError(400, f"Erwartet string, erhalten: {type(value).__name__}") if rules.get("not_empty") and not value.strip(): raise ActivitySessionMetricsError(400, "Leerer String nicht erlaubt") if "max_length" in rules and len(value) > int(rules["max_length"]): raise ActivitySessionMetricsError(400, f"String zu lang (max {rules['max_length']})") allowed = rules.get("allowed_values") if allowed and value not in allowed: raise ActivitySessionMetricsError(400, "Wert nicht in erlaubter Menge") elif data_type == "boolean": if not isinstance(value, bool): raise ActivitySessionMetricsError(400, f"Erwartet boolean, erhalten: {type(value).__name__}") else: raise ActivitySessionMetricsError(400, f"Unbekannter data_type: {data_type}") def _row_value_tuple(data_type: str, value: Any) -> tuple: if data_type == "integer": return (None, int(value), None, None) if data_type == "float": return (float(value), None, None, None) if data_type == "string": return (None, None, str(value), None) if data_type == "boolean": return (None, None, None, bool(value)) raise ValueError(data_type) def _coerce_raw_value_for_parameter(data_type: str, raw: Any) -> Any: """Wert aus activity_log-Spalte in den Typ bringen, den training_parameters.data_type erwartet.""" if data_type == "integer": if isinstance(raw, bool): raise TypeError("boolean nicht als integer erlaubt") return int(round(float(raw))) if data_type == "float": return float(raw) if data_type == "string": return str(raw) if raw is not None else "" if data_type == "boolean": if isinstance(raw, bool): return raw s = str(raw).strip().lower() if s in ("true", "1", "t", "yes"): return True if s in ("false", "0", "f", "no", ""): return False raise TypeError(f"boolean-Koercion nicht möglich: {raw!r}") raise ValueError(data_type) def sync_column_backed_session_metrics(cur, profile_id: str, activity_log_id: str) -> None: """ EAV-Zeilen für alle Schema-Parameter mit gesetztem source_field aus der activity_log-Zeile schreiben (Upsert) bzw. bei NULL in der Quellspalte löschen. Reine Layer-1-Logik; keine Router-Abhängigkeit. Synchron mit Übergangsphase: activity_log bleibt kanonisch für klassische Spalten; EAV spiegelt dieselben Werte für Profil/Platzhalter/Detail-API, ohne replace_activity_session_metrics aufzurufen. """ cur.execute("SELECT * FROM activity_log WHERE id = %s", (activity_log_id,)) row = cur.fetchone() if not row or str(row["profile_id"]) != str(profile_id): return header = dict(row) schema = resolve_activity_attribute_schema( cur, header.get("training_category"), header.get("training_type_id") ) for spec in schema: sf = spec.get("source_field") if sf is None or (isinstance(sf, str) and not str(sf).strip()): continue col = str(sf).strip() if col not in header: continue raw = header[col] tid = spec["training_parameter_id"] dt = spec["data_type"] rules = _validation_rules_dict(spec["validation_rules"]) if raw is None: cur.execute( """ DELETE FROM activity_session_metrics WHERE activity_log_id = %s AND training_parameter_id = %s """, (activity_log_id, tid), ) continue try: coerced = _coerce_raw_value_for_parameter(dt, raw) _validate_single_value(dt, coerced, rules) except (ActivitySessionMetricsError, TypeError, ValueError) as ex: logger.warning( "sync_column_backed_session_metrics: überspringe %s (Spalte %s): %s", spec.get("key"), col, ex, ) continue vn, vi, vt, vb = _row_value_tuple(dt, coerced) cur.execute( """ INSERT INTO activity_session_metrics ( activity_log_id, training_parameter_id, value_num, value_int, value_text, value_bool, updated_at ) VALUES (%s, %s, %s, %s, %s, %s, NOW()) ON CONFLICT (activity_log_id, training_parameter_id) DO UPDATE SET value_num = EXCLUDED.value_num, value_int = EXCLUDED.value_int, value_text = EXCLUDED.value_text, value_bool = EXCLUDED.value_bool, updated_at = NOW() """, (activity_log_id, tid, vn, vi, vt, vb), ) def fetch_activity_session_metrics(cur, activity_log_id: str) -> List[Dict[str, Any]]: cur.execute( """ SELECT m.id, m.activity_log_id, m.training_parameter_id, m.value_num, m.value_int, m.value_text, m.value_bool, tp.key, tp.data_type, tp.unit FROM activity_session_metrics m JOIN training_parameters tp ON tp.id = m.training_parameter_id WHERE m.activity_log_id = %s ORDER BY tp.key """, (activity_log_id,), ) rows = cur.fetchall() out: List[Dict[str, Any]] = [] for r in rows: dt = r["data_type"] if dt == "integer": val = int(r["value_int"]) if r["value_int"] is not None else None elif dt == "float": val = float(r["value_num"]) if r["value_num"] is not None else None elif dt == "string": val = r["value_text"] else: val = r["value_bool"] out.append( { "training_parameter_id": r["training_parameter_id"], "key": r["key"], "data_type": dt, "unit": r["unit"], "value": val, } ) return out def replace_activity_session_metrics( cur, profile_id: str, activity_log_id: str, metrics: Sequence[Dict[str, Any]], ) -> List[Dict[str, Any]]: """ Full replace of EAV rows for this session. metrics: [{ "parameter_key": str, "value": ... }, ...] """ cur.execute( """ SELECT id, profile_id, training_category, training_type_id FROM activity_log WHERE id = %s """, (activity_log_id,), ) row = cur.fetchone() if not row or str(row["profile_id"]) != str(profile_id): raise ActivitySessionMetricsError(404, "Aktivität nicht gefunden") schema = resolve_activity_attribute_schema( cur, row.get("training_category"), row.get("training_type_id") ) by_key = {s["key"]: s for s in schema} payload_keys = set() for item in metrics: raw_k = item.get("parameter_key") if raw_k is None or not str(raw_k).strip(): raise ActivitySessionMetricsError(400, "parameter_key fehlt") k = str(raw_k).strip() if k not in by_key: raise ActivitySessionMetricsError(400, f"Unbekannter oder nicht zugewiesener Parameter: {k}") payload_keys.add(k) for s in schema: if s["required"] and s["key"] not in payload_keys: raise ActivitySessionMetricsError(400, f"Pflichtfeld fehlt: {s['key']}") cur.execute( "DELETE FROM activity_session_metrics WHERE activity_log_id = %s", (activity_log_id,), ) for item in metrics: k = str(item["parameter_key"]).strip() spec = by_key[k] rules = _validation_rules_dict(spec["validation_rules"]) _validate_single_value(spec["data_type"], item.get("value"), rules) vn, vi, vt, vb = _row_value_tuple(spec["data_type"], item["value"]) cur.execute( """ INSERT INTO activity_session_metrics ( activity_log_id, training_parameter_id, value_num, value_int, value_text, value_bool, updated_at ) VALUES (%s, %s, %s, %s, %s, %s, NOW()) """, (activity_log_id, spec["training_parameter_id"], vn, vi, vt, vb), ) # Kein sync_column_backed nach PUT /metrics: der Request ist maßgeblich für EAV. Ein Spalten-Sync würde # Werte aus nicht mitgeschriebenen activity_log-Spalten wieder verwerfen. return fetch_activity_session_metrics(cur, activity_log_id) def get_activity_session_logical_unit(cur, profile_id: str, activity_log_id: str) -> Dict[str, Any]: cur.execute("SELECT * FROM activity_log WHERE id = %s", (activity_log_id,)) row = cur.fetchone() if not row or str(row["profile_id"]) != str(profile_id): raise ActivitySessionMetricsError(404, "Aktivität nicht gefunden") header = dict(row) schema = resolve_activity_attribute_schema( cur, header.get("training_category"), header.get("training_type_id") ) metrics = fetch_activity_session_metrics(cur, activity_log_id) by_key = {m["key"]: m for m in metrics} merged_metrics: List[Dict[str, Any]] = list(metrics) for s in schema: k = s["key"] if k in by_key: continue sf = s.get("source_field") if not sf or (isinstance(sf, str) and not str(sf).strip()): continue col = str(sf).strip() if col not in header: continue raw = header.get(col) if raw is None: continue dt = s["data_type"] try: val = _coerce_raw_value_for_parameter(dt, raw) except (TypeError, ValueError): continue merged_metrics.append( { "training_parameter_id": s["training_parameter_id"], "key": k, "data_type": dt, "unit": s.get("unit"), "value": val, } ) merged_metrics.sort(key=lambda x: x["key"]) return { "header": header, "schema": schema, "metrics": merged_metrics, } def enrich_sessions_with_metrics(cur, sessions: List[Dict[str, Any]]) -> None: """Mutates each session dict: adds key 'session_metrics' (list) when sessions non-empty.""" if not sessions: return ids = [str(s["id"]) for s in sessions if s.get("id")] if not ids: return ph = ",".join(["%s"] * len(ids)) cur.execute( f""" SELECT m.activity_log_id, tp.key, tp.data_type, tp.unit, m.value_num, m.value_int, m.value_text, m.value_bool FROM activity_session_metrics m JOIN training_parameters tp ON tp.id = m.training_parameter_id WHERE m.activity_log_id IN ({ph}) ORDER BY m.activity_log_id, tp.key """, ids, ) by_act: Dict[str, List[Dict[str, Any]]] = {} for r in cur.fetchall(): aid = str(r["activity_log_id"]) dt = r["data_type"] if dt == "integer": val = int(r["value_int"]) if r["value_int"] is not None else None elif dt == "float": val = float(r["value_num"]) if r["value_num"] is not None else None elif dt == "string": val = r["value_text"] else: val = r["value_bool"] by_act.setdefault(aid, []).append( {"key": r["key"], "data_type": dt, "unit": r["unit"], "value": val} ) for s in sessions: aid = str(s.get("id")) s["session_metrics"] = by_act.get(aid, [])