diff --git a/.claude/docs/technical/ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md b/.claude/docs/technical/ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md index 604834f..0b8edcc 100644 --- a/.claude/docs/technical/ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md +++ b/.claude/docs/technical/ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md @@ -10,6 +10,14 @@ --- +## 0. CSV-Import & Doppel-EAV (Kanon) + +- Vor Schreibzugriff: **`apply_activity_mapped_column_aliases`** kopiert Werte von `training_parameters.key` auf `source_field`-Spalte, wenn die Spalte leer ist (z. B. `avg_hr` → `hr_avg`). +- **`activity_csv_registry_updates_from_mapped`** ist die **einzige** Quelle für `activity_log`-Kernspalten aus dem Mapping (Keys = `module_registry.activity.fields`); der Executor **liest** keine parallelen `mapped.get("hr_avg")`-Pfade mehr. +- Plausible Zahlen: **`min`/`max`** in den Feld-Specs der Registry (keine HF-speziellen Key-Listen im Executor). +- **`upsert_session_metrics_from_csv_mapped`** schreibt **keine** EAV-Zeilen für Parameter mit gesetztem **`source_field`** (kanonisch `activity_log`). +- **Migration 058:** Entfernt bestehende redundante EAV-Zeilen für alle Parameter mit `source_field`. + ## 1. Produktions-Migrationen (Pflicht) - **Nur additive Änderungen** bis zur Stabilisierung: neue Tabellen/Spalten **nullable**, kein `DROP COLUMN` / `DELETE` von Altbestand in derselben Story. diff --git a/backend/csv_parser/executor.py b/backend/csv_parser/executor.py index 67c78c7..69f1c06 100644 --- a/backend/csv_parser/executor.py +++ b/backend/csv_parser/executor.py @@ -763,23 +763,6 @@ def _import_vitals_baseline( } -def _sf_act(val: Any) -> float | None: - try: - return round(float(val), 1) if val is not None else None - except (TypeError, ValueError): - return None - - -def _activity_hr_bpm(val: Any) -> float | None: - """Plausible Herzfrequenz (Import); größere Werte oft Fehlzuordnung (z. B. Schrittzahl) → NUMERIC-Overflow.""" - v = _sf_act(val) - if v is None: - return None - if v < 20 or v > 280: - return None - return v - - def _looks_like_time_only(s: str) -> bool: t = s.strip() if not t or " " in t: @@ -815,7 +798,10 @@ def _import_activity( run_activity_post_write_hooks_import, update_activity_columns, ) - from data_layer.activity_session_metrics import upsert_session_metrics_from_csv_mapped + from data_layer.activity_session_metrics import ( + apply_activity_mapped_column_aliases, + upsert_session_metrics_from_csv_mapped, + ) rows_total = 0 inserted = 0 @@ -873,19 +859,6 @@ def _import_activity( else: end_str = "" - duration_min = mapped.get("duration_min") - if duration_min is not None: - try: - duration_min = round(float(duration_min), 1) - except (TypeError, ValueError): - duration_min = None - - kcal_a = _sf_act(mapped.get("kcal_active")) - kcal_r = _sf_act(mapped.get("kcal_resting")) - hr_a = _activity_hr_bpm(mapped.get("hr_avg")) - hr_m = _activity_hr_bpm(mapped.get("hr_max")) - dist = _sf_act(mapped.get("distance_km")) - wtype = str(activity_type).strip() iso = date_d.isoformat() _, workout_start_t = normalize_activity_start(start_key) @@ -896,6 +869,8 @@ def _import_activity( training_type_id, training_category, training_subcategory = _resolve_training_type_for_activity( cur, wtype, profile_id ) + mapped = apply_activity_mapped_column_aliases(cur, dict(mapped), training_category, training_type_id) + # Nur Modul-Registry (Zielstruktur) + Mapping — keine parallelen hardcodierten CSV-Schlüssel. registry_updates = activity_csv_registry_updates_from_mapped(mapped) existing_id = find_activity_duplicate_id(cur, profile_id, iso, workout_start_t) @@ -904,12 +879,6 @@ def _import_activity( "start_time": workout_start_t, "end_time": end_str or None, "activity_type": wtype, - "duration_min": duration_min, - "kcal_active": kcal_a, - "kcal_resting": kcal_r, - "hr_avg": hr_a, - "hr_max": hr_m, - "distance_km": dist, "training_type_id": training_type_id, "training_category": training_category, "training_subcategory": training_subcategory, @@ -930,12 +899,12 @@ def _import_activity( start_time=workout_start_t, end_time=end_str or None, activity_type=wtype, - duration_min=duration_min, - kcal_active=kcal_a, - kcal_resting=kcal_r, - hr_avg=hr_a, - hr_max=hr_m, - distance_km=dist, + duration_min=registry_updates.get("duration_min"), + kcal_active=registry_updates.get("kcal_active"), + kcal_resting=registry_updates.get("kcal_resting"), + hr_avg=registry_updates.get("hr_avg"), + hr_max=registry_updates.get("hr_max"), + distance_km=registry_updates.get("distance_km"), training_type_id=training_type_id, training_category=training_category, training_subcategory=training_subcategory, @@ -954,12 +923,12 @@ def _import_activity( str(aid), workout_date=iso, training_type_id=training_type_id, - duration_min=duration_min, - hr_avg=hr_a, - hr_max=hr_m, - distance_km=dist, - kcal_active=kcal_a, - kcal_resting=kcal_r, + duration_min=registry_updates.get("duration_min"), + hr_avg=registry_updates.get("hr_avg"), + hr_max=registry_updates.get("hr_max"), + distance_km=registry_updates.get("distance_km"), + kcal_active=registry_updates.get("kcal_active"), + kcal_resting=registry_updates.get("kcal_resting"), ) upsert_session_metrics_from_csv_mapped( cur, diff --git a/backend/csv_parser/module_registry.py b/backend/csv_parser/module_registry.py index 3786327..5564fc2 100644 --- a/backend/csv_parser/module_registry.py +++ b/backend/csv_parser/module_registry.py @@ -65,7 +65,13 @@ MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = { "max": 220, "label_de": "Herzfrequenz max (bpm)", }, - "rpe": {"type": "int", "required": False, "label_de": "RPE (1–10)"}, + "rpe": { + "type": "int", + "required": False, + "min": 1, + "max": 10, + "label_de": "RPE (1–10)", + }, "notes": {"type": "string", "required": False, "label_de": "Notiz"}, }, "derive_date_from_datetime_field": "start_time", diff --git a/backend/data_layer/activity_persistence_orchestrator.py b/backend/data_layer/activity_persistence_orchestrator.py index 6085550..46feac4 100644 --- a/backend/data_layer/activity_persistence_orchestrator.py +++ b/backend/data_layer/activity_persistence_orchestrator.py @@ -83,11 +83,20 @@ def activity_csv_registry_updates_from_mapped(mapped: Mapping[str, Any]) -> Dict except (TypeError, ValueError): return None - def _hr(v: Any) -> float | None: - x = _sf(v) - if x is None or x < 20 or x > 280: - return None - return x + def _within_num_bounds(v: float | int, spec: dict, *, as_float: bool) -> bool: + mn = spec.get("min") + mx = spec.get("max") + if mn is not None: + if as_float and v < float(mn): + return False + if not as_float and v < int(mn): + return False + if mx is not None: + if as_float and v > float(mx): + return False + if not as_float and v > int(mx): + return False + return True for key, spec in fields.items(): if key in _ACTIVITY_CSV_REGISTRY_EXCLUDE: @@ -101,11 +110,15 @@ def activity_csv_registry_updates_from_mapped(mapped: Mapping[str, Any]) -> Dict continue typ = spec.get("type", "string") if typ == "float": - v = _hr(raw) if key in ("hr_avg", "hr_max") else _sf(raw) + v = _sf(raw) + if v is not None and not _within_num_bounds(v, spec, as_float=True): + v = None if v is not None: out[key] = v elif typ == "int": v = _si(raw) + if v is not None and not _within_num_bounds(v, spec, as_float=False): + v = None if v is not None: out[key] = v elif typ == "datetime": diff --git a/backend/data_layer/activity_session_metrics.py b/backend/data_layer/activity_session_metrics.py index b5ef19d..8ca9c88 100644 --- a/backend/data_layer/activity_session_metrics.py +++ b/backend/data_layer/activity_session_metrics.py @@ -241,6 +241,43 @@ def _coerce_raw_value_for_parameter(data_type: str, raw: Any) -> Any: raise ValueError(data_type) +def apply_activity_mapped_column_aliases_from_schema( + mapped: Mapping[str, Any], + schema: Sequence[Dict[str, Any]], +) -> Dict[str, Any]: + """ + training_parameters.key weicht oft von activity_log-Spalte ab (z. B. avg_hr → hr_avg). + Kopiert Werte auf die Spalte, wenn die Spalte leer ist, damit CSV/Registry activity_log befüllt. + """ + m = dict(mapped) + for s in schema: + sf = s.get("source_field") + if not sf or not str(sf).strip(): + continue + col = str(sf).strip() + pkey = s["key"] + if pkey == col: + continue + col_v = m.get(col) + if col_v is not None and col_v != "": + continue + pk_v = m.get(pkey) + if pk_v is None or pk_v == "": + continue + m[col] = pk_v + return m + + +def apply_activity_mapped_column_aliases( + cur, + mapped: Mapping[str, Any], + training_category: Optional[str], + training_type_id: Optional[int], +) -> Dict[str, Any]: + schema = resolve_activity_attribute_schema(cur, training_category, training_type_id) + return apply_activity_mapped_column_aliases_from_schema(mapped, schema) + + def upsert_session_metrics_from_csv_mapped( cur, profile_id: str, @@ -250,10 +287,10 @@ def upsert_session_metrics_from_csv_mapped( training_type_id: Optional[int], ) -> None: """ - EAV für Trainingsparameter aus CSV (nur Keys, die nicht im activity-Modul-Registry liegen). + EAV für Trainingsparameter aus CSV (nur Keys ohne activity_log-Spalte / ohne source_field). - Kernfelder (Datum, Start, Distanz, HF, …) schreibt der Executor nach activity_log; - hier keine doppelten EAV-Zeilen für dieselben Registry-Keys. + Parameter mit gesetztem source_field sind kanonisch in activity_log — kein EAV-Schreiben (vermeidet + Doppelung zu avg_hr vs. hr_avg o. Ä.). Keys im activity-CSV-Modul werden ebenfalls übersprungen. """ cur.execute( "SELECT profile_id FROM activity_log WHERE id = %s", @@ -274,6 +311,9 @@ def upsert_session_metrics_from_csv_mapped( continue if pkey in activity_registry_keys: continue + sf_raw = spec.get("source_field") + if sf_raw is not None and str(sf_raw).strip(): + continue tid = spec["training_parameter_id"] dt = spec["data_type"] rules = _validation_rules_dict(spec["validation_rules"]) diff --git a/backend/migrations/058_remove_redundant_eav_for_column_backed_parameters.sql b/backend/migrations/058_remove_redundant_eav_for_column_backed_parameters.sql new file mode 100644 index 0000000..c50b312 --- /dev/null +++ b/backend/migrations/058_remove_redundant_eav_for_column_backed_parameters.sql @@ -0,0 +1,14 @@ +-- Migration 058: EAV-Zeilen entfernen, die nur activity_log-Spalten spiegeln (source_field gesetzt). +-- Kanon: merge_column_backed_and_eav_metrics liest diese Werte aus activity_log; Doppelzeilen vermeiden. +-- Date: 2026-04-15 + +DELETE FROM activity_session_metrics asm +USING training_parameters tp +WHERE asm.training_parameter_id = tp.id + AND tp.source_field IS NOT NULL + AND trim(tp.source_field) <> ''; + +DO $$ +BEGIN + RAISE NOTICE 'Migration 058: removed EAV rows for column-backed training_parameters (source_field set)'; +END $$; diff --git a/backend/tests/test_activity_session_metrics.py b/backend/tests/test_activity_session_metrics.py index b8d8bc2..02dfe2d 100644 --- a/backend/tests/test_activity_session_metrics.py +++ b/backend/tests/test_activity_session_metrics.py @@ -7,10 +7,12 @@ import pytest from data_layer.activity_session_metrics import ( ActivitySessionMetricsError, + apply_activity_mapped_column_aliases_from_schema, enrich_sessions_with_metrics, merge_column_backed_and_eav_metrics, merge_parameter_schema_rows, resolve_activity_attribute_schema, + upsert_session_metrics_from_csv_mapped, _row_value_tuple, _validate_single_value, ) @@ -289,6 +291,73 @@ def test_merge_keeps_eav_only_keys(): assert out[0]["key"] == "custom_param" +def test_apply_mapped_aliases_copies_avg_hr_to_hr_avg(): + schema = [ + { + "key": "avg_hr", + "training_parameter_id": 1, + "source_field": "hr_avg", + "data_type": "integer", + "unit": "bpm", + "validation_rules": {}, + } + ] + out = apply_activity_mapped_column_aliases_from_schema({"avg_hr": 118}, schema) + assert out["avg_hr"] == 118 + assert out["hr_avg"] == 118 + + +def test_apply_mapped_aliases_does_not_overwrite_existing_column(): + schema = [ + { + "key": "avg_hr", + "training_parameter_id": 1, + "source_field": "hr_avg", + "data_type": "integer", + "unit": "bpm", + "validation_rules": {}, + } + ] + out = apply_activity_mapped_column_aliases_from_schema({"avg_hr": 999, "hr_avg": 120}, schema) + assert out["hr_avg"] == 120 + + +@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema") +def test_upsert_csv_skips_parameter_with_source_field(mock_schema): + """Kein INSERT in activity_session_metrics für Spalten-Parameter (avg_hr → hr_avg).""" + mock_schema.return_value = [ + { + "key": "avg_hr", + "training_parameter_id": 42, + "data_type": "integer", + "validation_rules": {"min": 30, "max": 220}, + "source_field": "hr_avg", + } + ] + + class Cur: + def __init__(self): + self.asm_inserts = 0 + + def execute(self, sql, params=None): + if "INSERT INTO activity_session_metrics" in sql: + self.asm_inserts += 1 + + def fetchone(self): + return {"profile_id": "00000000-0000-0000-0000-000000000001"} + + cur = Cur() + upsert_session_metrics_from_csv_mapped( + cur, + "00000000-0000-0000-0000-000000000001", + "00000000-0000-0000-0000-000000000002", + {"avg_hr": 130}, + "cardio", + 1, + ) + assert cur.asm_inserts == 0 + + def test_merge_eav_primary_falls_back_to_legacy_hr_min_column(): """Kanon: min_hr ohne source_field / ohne EAV — Lesefallback Spalte hr_min.""" schema = [