feat: Enhance activity metrics handling and documentation
All checks were successful
Deploy Development / deploy (push) Successful in 1m1s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s

- Updated the README to include new activity production architecture and phases, improving clarity on the development roadmap.
- Enhanced the `ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE` with details on the target architecture and phase plan for production readiness.
- Introduced a new function `merge_column_backed_and_eav_metrics` to streamline the merging of metrics from column-backed and EAV sources, ensuring data integrity and reducing duplication.
- Refactored session metrics handling to eliminate deprecated synchronization methods, improving the overall efficiency of data processing.
- Added unit tests for the new merging logic, ensuring robust validation of metrics handling.
This commit is contained in:
Lars 2026-04-15 16:59:11 +02:00
parent 58ddde6b1e
commit ca8cee990b
9 changed files with 619 additions and 82 deletions

View File

@ -55,6 +55,7 @@ _Dieser Ordner `.claude/docs/` ist per `.gitignore`-Ausnahme **versioniert** (Sp
| Dashboard-Lab-Widgets | `technical/DASHBOARD_WIDGETS_AGENT_GUIDE.md` | Widget-Katalog + Registrierung (siehe Guide) |
| Training Profiler / Resolver | `technical/TRAINING_PROFILE_RESOLVER_LAYER1.md`, `functional/TRAINING_TYPE_PROFILES.md` | Resolver-Module wie im Guide genannt |
| Universal CSV Import | `technical/UNIVERSAL_CSV_IMPORT_AGENT_GUIDE.md` | `backend/csv_parser/`, `routers/csv_import.py`, `routers/admin_csv_templates.py` |
| Aktivität Produktionsreife | `technical/ACTIVITY_PRODUCTION_ARCHITECTURE_AND_PHASES.md` (+ EAV-Guide) | `backend/data_layer/activity_session_metrics.py`, `activity_metrics.py`, CSV-Orchestrierung |
| Mitgliedschaft / Features | `technical/MEMBERSHIP_SYSTEM.md`, `architecture/FEATURE_ENFORCEMENT.md` | `backend/auth.py`, Feature-Logging, Router mit Enforcement |
---
@ -114,6 +115,8 @@ _Dieser Ordner `.claude/docs/` ist per `.gitignore`-Ausnahme **versioniert** (Sp
| `TRAINING_TYPE_PROFILES_TECHNICAL.md` | Trainingsprofile technisch |
| `UNIVERSAL_CSV_IMPORT_AGENT_GUIDE.md` | Universal CSV: Registry, Executor, Vorlagen, Agent-Checkliste |
| `ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md` | Session-Metriken EAV, Attributprofile, Layer-1, Prod-Migration |
| `ACTIVITY_PRODUCTION_ARCHITECTURE_AND_PHASES.md` | **Zielarchitektur** Aktivität (Spine/EAV/Composites/Import/Layer 12) + **Phasenplan AF** Produktionsreife |
| *(Code)* `backend/data_layer/activity_data_canon.py` | **Kanon** activity CSV-Modul vs. EAV-primär; Legacy-Lesefallback |
| `V9D_PHASE2_VITALS_SLEEP.md` | v9d Vitalwerte/Schlaf (Release-Bezug) |
---

View File

@ -0,0 +1,171 @@
# Aktivität: Zielarchitektur & Phasenplan (Produktionsreife)
**Stand:** 2026-04-14
**Status:** Normative Zielrichtung für `activity_log`, EAV, Composites, Import, Layer 1/2.
**Ergänzt:** `ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md` (Ist-Modell, APIs, Tests).
---
## 1. Leitprinzipien
| Prinzip | Bedeutung |
|---------|-----------|
| **Layer 1 = Single Source of Truth** | Alle Auswertungen (Charts, Scores, strukturierte Platzhalter) lesen **nur** über `data_layer` (kanonische Funktionen). Keine parallele SQL-Logik in Routern oder im Placeholder-Resolver für Aktivität. |
| **Eine semantische Größe, eine kanonische Quelle** | Kein Dauer-Sync derselben Bedeutung in `activity_log`-Spalte **und** EAV. Übergang: dokumentierte Abschaltung, nicht implizites Driften. |
| **Spine vs. Parameter** | `activity_log` trägt Identität, Zeit, Typ, Notizen, Audit + **heiße** universelle Skalare (siehe §2.2). Alles Typ-/Admin-Dynamische über EAV. |
| **Composites = Archetyp im Code, Konfiguration in der DB** | Struktur (7+2 Archetypen) und Validierung **versioniert im Repo**; Admin **wählt** Archetyp, **benennt** Slots, **bindet** Sportarten, **mappt** CSV → `(parameter_id, slot_key)`. Kein freies JSON-Schema im Admin. |
| **Import explizit** | Jede CSV-Spalte hat ein klares Ziel: Spine-Spalte, skalarer Parameter oder **Slot** eines Composite-Parameters. Typkonvertierung zentral (Executor / Converter), nicht verteilt. |
---
## 2. Zielarchitektur (Gesamtbild)
### 2.1 Schichtenmodell
```
[CSV / UI / API Write]
Orchestrator & Router (Auth, Transaktionen, Feature-Checks)
Persistenz: activity_log (Spine + heiße Skalare) + activity_session_metrics (EAV)
Layer 1: data_layer (activity_session_metrics.py, activity_metrics.py, …)
Layer 2a/2b: Platzhalter-Resolver (Formatierung), Chart-Endpoints (Chart.js-Shapes)
KI / UI / Export
```
- **Orchestrator:** Schreibpfad, Konsistenz nach Write (kein zweites „Lesen der Wahrheit“ neben Layer 1; optional nur Post-Write-Hooks).
- **Resolver:** für Aktivität **kein** direkter DB-Zugriff; nur Aufruf von Layer 1.
### 2.2 `activity_log` (Spine + heiße Skalare)
**Maschinenlesbarer Kanon:** `backend/data_layer/activity_data_canon.py` (`ACTIVITY_MODULE_REGISTRY_FIELD_KEYS`, `ACTIVITY_EAV_PRIMARY_PARAMETER_KEYS`, Legacy-Lesefallback für EAV-primäre Parameter).
**Immer (fachlich minimal + listenfähig):** `id`, `profile_id`, Kalender-/Zeitfenster (`date`, `started_at`/`ended_at`, ggf. `start_time`/`end_time` bis Konsolidierung), `duration_min`, `training_type_id` (+ ggf. denormalisierte Kategorie), Legacy `activity_type`, `notes`, `source`, `created`.
**Heiße Skalare (CSV-Modul + `source_field` nach Migration 057):** u.a. `kcal_active`, `kcal_resting`, `distance_km`, `hr_avg`/`hr_max` (Parameter `avg_hr`/`max_hr`), `duration_min`, `rpe` für Listen und Standard-Aggregate ohne EAV-Join.
**EAV-primär (erweiterte Metriken):** z.B. Kadenz, Pace, Leistung, Höhe, Umgebung — `training_parameters.source_field` = NULL; Import schreibt EAV; bei leerem EAV optional Lesefallback auf bestehende `activity_log`-Spalte (Migration 057 + Merge-Logik).
**Session-Qualität / Auswertungsblob:** z.B. `evaluation`, `quality_label`, `overall_score` **kein** EAV-Parameter-Raster; semantisch „Ergebnis der Einheit“.
**Nicht dauerhaft doppelt:** dieselbe Semantik nicht parallel pflegen; siehe entfallener Spalte→EAV-Schreib-Sync, Lesepfad `merge_column_backed_and_eav_metrics`.
### 2.3 EAV (`activity_session_metrics`)
- **Skalare:** ein `training_parameter`, genau eine `value_*`-Spalte (wie heute).
- **Composites:** ein `training_parameter` pro Composite-Instanz, **ein** gespeichertes Dokument pro Session (serialisiert z.B. in `value_text` als JSON **oder** künftig dedizierte JSONB-Spalte technische Entscheidung in eigener Migration, Vertrag im Archetyp).
- **Merge-/Schema-Logik:** weiterhin zentral in `activity_session_metrics.py` (effektives Schema aus Kategorie + Typ-Overrides).
### 2.4 Composite-Metamodell (Ziel)
**Archetypen (Code, begrenzte Menge):** u.a. Band-/Zonenverteilung, Sequenz-/Übergangsprofil, Intervallblock-, Ereignis-/Aktions-, Kopplungs-/Effizienz-, Modellparameter-Profil; optional Technik-/Zyklus-, Readiness-/Recovery-Profil.
**Pro Archetyp:** feste strukturelle Regeln (erlaubte Slots, Typen, Pflicht/Optional), Validator + Version.
**In der DB (Admin):** Zuordnung „Parameter X hat Archetyp A“, Slot-Labels (DE/EN), Einheiten, Aktivierung pro Sportart/Kategorie, Sortierung.
**Import:** CSV-Spalten → `(training_parameter_id, slot_key)` mit stabilen Keys (`z1_sec`, …), nie nur „Spaltenreihenfolge“.
### 2.5 Universal CSV & Admin
- Vorlagen: Mapping inkl. **Composite-Slots** und Typkonvertierung (vollständige Matrix Ziel).
- UI: Trennung **Kern activity_log** vs. **Parameter/EAV** vs. **Composite-Blöcke** (optisch/UX), um Doppel-Tabellen-Chaos zu vermeiden.
### 2.6 Layer 2 (Platzhalter & Diagramme)
- Datenbezug **nur** Layer 1.
- Registry-Einträge: `data_layer_module` / `data_layer_function` pflegen; Composite-Auswertung ggf. über Hilfsfunktionen, die JSON → normierte Struktur für Prompts/Charts liefern.
---
## 3. Ist → Soll (Kurz)
| Bereich | Ist (typisch) | Soll |
|---------|----------------|------|
| Schreibpfad | Teilweise Doppelhaltung Spalte ↔ EAV, Sync-Hooks | Kanon + gezielte Abschaltung; eine Quelle pro Semantik |
| Lesepfad | Layer 1 wächst; Legacy-Spalten noch relevant | `get_activity_session_logical_unit` / `activity_metrics` als alleinige Wahrheit für Consumer |
| Composites | Noch nicht im Einklang mit EAV-Metamodell | Archetypen + Slot-Admin + ein Dokument pro Parameter/Session |
| Import | Mapping teilweise; Typkonvertierung lückenhaft | Vollständige Konvertierung + Composite-Zusammenbau |
| Resolver | Aktivität sauber über Layer 1 | Profil/Focus ggf. später ebenfalls aus Layer 1 |
---
## 4. Vorgehensmodell (Phasen)
Phasen sind **sequentiell** wo „Abhängigkeit“ steht; Teile können parallel (z.B. UI-Polish) laufen, wenn der Kanon steht.
### Phase A Kanon & Abschaltplan (Grundlage)
**Inhalt:** Schriftliche **Kanon-Tabelle**: pro Messgröße genau eine Quelle (`activity_log` | `eav_scalar` | `eav_composite` | `session_quality`). Liste der Keys, für die **Sync/Spiegelung** endet.
**Definition of Done:** Review im Team; Referenz in diesem Dokument oder Verweis auf Gitea-Kommentar; keine Code-Änderung zwingend.
**Erster konkreter Schritt:** Kanon-Tabelle als Checkliste (Spreadsheet oder Gitea-Issue) **eine Zeile pro Semantik**.
---
### Phase B Lesepfad härten (Layer 1)
**Inhalt:** Sicherstellen, dass **alle** relevanten Consumer (mind. `activity_metrics` für Platzhalter/Charts, Activity-Detail-API) dieselbe Merge-/Fallback-Logik nutzen; Legacy-Spalten nur noch als dokumentierter Fallback bis Enddatum.
**Definition of Done:** Kurze Audit-Liste „Router/Resolver greifen nicht an Aktivität vorbei“; Tests oder manuelle Stichprobe für Detail + ein Chart + 2 Platzhalter.
**Abhängigkeit:** Phase A für „welche Spalten noch Fallback sind“.
---
### Phase C Schreibpfad entschlacken
**Inhalt:** Orchestrierung/CSV: kein Schreiben derselben Semantik an zwei Orten; `sync_column_backed_session_metrics` (o. ä.) **stufig abschalten** oder auf Notfall-Flag; Import schreibt gemäß Kanon.
**Definition of Done:** Deploy auf Prod mit Monitoring; Stichprobe Import + manuelle Bearbeitung; keine Regression in Listenansicht.
**Abhängigkeit:** Phase A + B (sonst Lücken beim Lesen).
---
### Phase D Composite MVP
**Inhalt:** Ein Archetyp end-to-end (z.B. **Band-/Zonenverteilung**): Code-Validator, DB-Binding (Parameter + Slots), Admin-UI minimal, Import **5 Spalten → ein JSON-Dokument** mit festen Keys, Layer-1-Read (Roh + optional `expand_*`).
**Definition of Done:** Eine Sportart/Kategorie befüllbar; Dokumentation des JSON-Vertrags im Repo; pytest für Validator/Zusammenbau wo möglich.
**Abhängigkeit:** Phase A (Kanon „Composites nur als Dokument, nicht doppelt in Spalten“).
---
### Phase E Composite-Ausbau & Typkonvertierung Import
**Inhalt:** Weitere Archetypen nach Priorität; Universal-CSV **vollständige** Typkonvertierung für alle gemappten Ziele; Dialog-/Mapping-Konzept (Kern vs. Parameter vs. Composite).
**Definition of Done:** Matrix „Zieltyp × Converter“ gepflegt; Admin-Flow reviewt.
---
### Phase F Produktionshärtung
**Inhalt:** Performance-Indizes bei Bedarf; Observability (Import-Fehler, Validierungs-Fails); Resolver/Profil optional komplett ohne `get_db` für domänische Daten; Doku + Gitea-Issues geschlossen/aktualisiert.
---
## 5. Was zuerst?
**Sofort (nächster Schritt):** **Phase A Kanon-Tabelle** (eine Semantik pro Zeile, eine Quelle). Ohne diese Entscheidung riskieren Phase B/C falsche Abschaltungen.
Direkt danach: **Phase B** (Lesepfad), dann **Phase C** (Schreibpfad), dann **Phase D** (ein Composite-MVP).
---
## 6. Referenzen
- `ACTIVITY_SESSION_METRICS_EAV_AGENT_GUIDE.md` Tabellen, APIs, Tests, Backfill-Hinweise
- `UNIVERSAL_CSV_IMPORT_AGENT_GUIDE.md` Executor, Vorlagen, Typen
- `PLACEHOLDER_REGISTRY_FRAMEWORK.md` Layer-2-Registrierung
- `functional/DATA_ARCHITECTURE.md` fachliche Datenarchitektur (Querschnitt)
---
**Version:** 1.0 · Bei Meilensteinen Phasen AF hier Status-Zeile ergänzen (Datum + kurz „erledigt/in Arbeit“).

View File

@ -4,6 +4,10 @@
**Status:** Kern-Backend (Migration 054, Layer 1, Admin- & Nutzer-API) umgesetzt; Admin-UI & CSV-Mapping folgen.
**Ziel:** Sportspezifische **Attributprofile** (Kategorie + optional Trainingstyp-Override) administrierbar; Messwerte pro Session in **EAV**; **alle Auswertungen** sollen künftig über **Layer 1** (`data_layer`) laufen.
**Zielarchitektur, Phasenplan (Produktionsreife):** [`ACTIVITY_PRODUCTION_ARCHITECTURE_AND_PHASES.md`](./ACTIVITY_PRODUCTION_ARCHITECTURE_AND_PHASES.md) Kanon `activity_log`/EAV, Composites, Import, Layer 1/2, Reihenfolge AF.
**Kanon (Code):** `backend/data_layer/activity_data_canon.py` (Repo-Root) — CSV-Modul `activity` vs. EAV-primär; Migration **057**.
---
## 1. Produktions-Migrationen (Pflicht)
@ -41,7 +45,9 @@
| Modul | Pfad | Aufgabe |
|-------|------|---------|
| Session-Metriken & Schema | `backend/data_layer/activity_session_metrics.py` | `resolve_activity_attribute_schema`, `fetch_activity_session_metrics`, `replace_activity_session_metrics`, `get_activity_session_logical_unit`, `enrich_sessions_with_metrics`. |
| Session-Metriken & Schema | `backend/data_layer/activity_session_metrics.py` | `resolve_activity_attribute_schema`, `fetch_activity_session_metrics`, `replace_activity_session_metrics`, `get_activity_session_logical_unit`, `enrich_sessions_with_metrics`, `merge_column_backed_and_eav_metrics`. |
**Spalten vs. EAV (Lesepfad):** `merge_column_backed_and_eav_metrics` / `get_activity_session_logical_unit` / `enrich_sessions_with_metrics` werten Parameter mit `source_field` **primär aus `activity_log`** aus; EAV ist Fallback (z.B. Legacy) oder für Parameter ohne Spalte. **Kein** automatischer Spalte→EAV-Schreib-Sync mehr in `run_activity_post_write_hooks` / Import-Hooks (vermeidet Doppelhaltung).
**Regeln für Agenten:**
@ -81,10 +87,21 @@ Router: `backend/routers/admin_training_parameters.py`, `backend/routers/admin_a
## 5. Agent-Checkliste (nächste Iterationen)
Siehe **Phasen AF** in [`ACTIVITY_PRODUCTION_ARCHITECTURE_AND_PHASES.md`](./ACTIVITY_PRODUCTION_ARCHITECTURE_AND_PHASES.md). Kurz:
- [ ] **Phase A:** Kanon-Tabelle (eine Quelle pro Semantik).
- [ ] **Phase B:** Lesepfad Layer 1 härten (Consumer-Audit).
- [ ] **Phase C:** Schreibpfad: Doppelhaltung / Sync stufenweise abschalten.
- [ ] **Phase D:** Composite-MVP (ein Archetyp E2E).
- [ ] **Phase E:** Archetypen ausbauen + CSV-Typkonvertierung vollständig + Mapping-UX.
- [ ] **Phase F:** Härtung Prod (Indizes, Observability, Doku).
Legacy-Punkte:
- [x] Admin-UI: `frontend/src/pages/AdminActivityAttributeProfilesPage.jsx`, Route `/admin/activity-attribute-profiles`, Admin-Nav-Gruppe „Trainingstypen“.
- [x] `/activity` Frontend: Bearbeiten lädt `GET /api/activity/{id}`, dynamische Felder + `PUT /api/activity/{id}/metrics`.
- [ ] Universal CSV: Mapping-Spalten → `training_parameters.key` + Schreiben in EAV (Executor).
- [ ] Optional: Backfill `activity_log.*``activity_session_metrics` nach `source_field`.
- [ ] Universal CSV: Mapping inkl. EAV/Composite-Ziele + Executor (fortlaufend).
- [ ] Optional: Backfill / Abschluss `source_field`-Pfad nach Kanon (Phase A/C).
- [ ] Dedupe Polar/Apple: nach stabilen `started_at`/`ended_at` + Policy (eigenes Issue).
---

View File

@ -34,6 +34,8 @@ MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = {
},
},
},
# Kanon: nur Kern/spine + „heiße“ Metriken → activity_log. Erweiterte Parameter → training_parameters / EAV
# (siehe backend/data_layer/activity_data_canon.py).
"activity": {
"table": "activity_log",
"fields": {
@ -63,16 +65,7 @@ MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = {
"max": 220,
"label_de": "Herzfrequenz max (bpm)",
},
"hr_min": {"type": "int", "required": False, "label_de": "Herzfrequenz min (bpm)"},
"rpe": {"type": "int", "required": False, "label_de": "RPE (110)"},
"pace_min_per_km": {"type": "float", "required": False, "label_de": "Tempo (min/km)"},
"cadence": {"type": "int", "required": False, "label_de": "Kadenz"},
"avg_power": {"type": "int", "required": False, "label_de": "Leistung Ø (W)"},
"elevation_gain": {"type": "int", "required": False, "label_de": "Höhenmeter / Aufstieg"},
"temperature_celsius": {"type": "float", "required": False, "label_de": "Temperatur (°C)"},
"humidity_percent": {"type": "int", "required": False, "label_de": "Luftfeuchtigkeit (%)"},
"avg_hr_percent": {"type": "float", "required": False, "label_de": "HF Ø (% von max)"},
"kcal_per_km": {"type": "float", "required": False, "label_de": "Kalorien pro km"},
"notes": {"type": "string", "required": False, "label_de": "Notiz"},
},
"derive_date_from_datetime_field": "start_time",

View File

@ -0,0 +1,78 @@
"""
Kanonische Aufteilung activity_log vs. EAV für Aktivitätssessions.
Single Source für: welche Felder das CSV-/Registry-Modul activity direkt in activity_log schreibt,
und welche training_parameters primär über EAV laufen (mit optionalem Lesefallback auf Legacy-Spalten).
Normative Doku: .claude/docs/technical/ACTIVITY_PRODUCTION_ARCHITECTURE_AND_PHASES.md
"""
from __future__ import annotations
from typing import Dict, Final
# ── activity_log: Modul „activity“ (Universal-CSV-Kern) ───────────────────────
# Nur diese Keys erscheinen in csv_parser.module_registry MODULE_DEFINITIONS["activity"].fields.
# Alles Weitere: training_parameters + EAV (Import über upsert_session_metrics_from_csv_mapped).
ACTIVITY_MODULE_REGISTRY_FIELD_KEYS: Final[frozenset[str]] = frozenset(
{
"date",
"start_time",
"end_time",
"activity_type",
"duration_min",
"kcal_active",
"kcal_resting",
"distance_km",
"hr_avg",
"hr_max",
"rpe",
"notes",
}
)
# Parameter-Keys (training_parameters.key), die primär in EAV geführt werden; source_field nach Migration 057 NULL.
# Lesefallback: activity_log-Spalte unter ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM, falls EAV leer.
ACTIVITY_EAV_PRIMARY_PARAMETER_KEYS: Final[frozenset[str]] = frozenset(
{
"min_hr",
"pace_min_per_km",
"cadence",
"avg_power",
"elevation_gain",
"temperature_celsius",
"humidity_percent",
"avg_hr_percent",
"kcal_per_km",
}
)
# Spaltenname activity_log für Legacy-Lesefallback (Merge), wenn EAV für den Parameter fehlt.
ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM: Final[Dict[str, str]] = {
"min_hr": "hr_min",
"pace_min_per_km": "pace_min_per_km",
"cadence": "cadence",
"avg_power": "avg_power",
"elevation_gain": "elevation_gain",
"temperature_celsius": "temperature_celsius",
"humidity_percent": "humidity_percent",
"avg_hr_percent": "avg_hr_percent",
"kcal_per_km": "kcal_per_km",
}
# Spalten, die mit training_parameters.source_field (nach Migration 057) noch activity_log abbilden.
# Erweiterte Metriken sind EAV-primär — nicht hier auflisten.
ACTIVITY_LOG_PATCHABLE_COLUMNS: Final[frozenset[str]] = frozenset(
{
"start_time",
"end_time",
"activity_type",
"duration_min",
"kcal_active",
"kcal_resting",
"hr_avg",
"hr_max",
"distance_km",
"rpe",
"notes",
}
)

View File

@ -15,7 +15,6 @@ from typing import Any, Dict, List, Mapping, Optional
from models import ActivityEntry
from csv_parser.module_registry import get_module_definition
from data_layer.activity_session_metrics import sync_column_backed_session_metrics
logger = logging.getLogger(__name__)
@ -248,7 +247,7 @@ def insert_activity_csv_minimal(
def run_activity_post_write_hooks(cur, profile_id: str, eid: str) -> None:
"""Auto-Eval (falls aktiv) + EAV-Spiegel aus activity_log-Spalten."""
"""Auto-Eval (falls aktiv). Kein Spalte→EAV-Sync: Lesepfad merge_column_backed_and_eav_metrics."""
if _EVALUATION_AVAILABLE and _evaluate_and_save_activity:
cur.execute(
"""
@ -269,7 +268,6 @@ def run_activity_post_write_hooks(cur, profile_id: str, eid: str) -> None:
_evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, profile_id)
except Exception as eval_error:
logger.error("[AUTO-EVAL] activity %s: %s", eid, eval_error)
sync_column_backed_session_metrics(cur, str(profile_id), str(eid))
def run_activity_post_write_hooks_import(
@ -286,7 +284,7 @@ def run_activity_post_write_hooks_import(
kcal_active: Any,
kcal_resting: Any,
) -> None:
"""Eval + EAV nach Legacy-Import mit vorgebautem Kontext-Dict."""
"""Auto-Eval nach Import. Kein Spalte→EAV-Sync (siehe run_activity_post_write_hooks)."""
if _EVALUATION_AVAILABLE and training_type_id and _evaluate_and_save_activity:
try:
activity_dict = {
@ -308,7 +306,6 @@ def run_activity_post_write_hooks_import(
_evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, profile_id)
except Exception as eval_err:
logger.warning("[activity import] Auto-Eval fehlgeschlagen: %s", eval_err)
sync_column_backed_session_metrics(cur, str(profile_id), str(eid))
def merge_activity_csv_module_fields(

View File

@ -10,36 +10,10 @@ from decimal import Decimal
from typing import Any, Dict, List, Mapping, Optional, Sequence
from csv_parser.module_registry import get_module_definition
from data_layer.activity_data_canon import ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM
logger = logging.getLogger(__name__)
# activity_log-Spalten, die per training_parameters.source_field aus CSV (Parameter-Key) befüllt werden dürfen.
# Muss mit sync_column_backed_session_metrics übereinstimmen (inkl. Kernmetriken wie hr_avg).
ACTIVITY_LOG_PATCHABLE_COLUMNS = frozenset(
{
"start_time",
"end_time",
"activity_type",
"duration_min",
"kcal_active",
"kcal_resting",
"hr_avg",
"hr_max",
"hr_min",
"distance_km",
"rpe",
"pace_min_per_km",
"cadence",
"avg_power",
"elevation_gain",
"temperature_celsius",
"humidity_percent",
"avg_hr_percent",
"kcal_per_km",
"notes",
}
)
# Diese Spalten nicht aus CSV-Parameter-Zuordnung überschreiben (kommen aus Typ-Mapping / System).
ACTIVITY_LOG_PATCH_FORBIDDEN = frozenset(
{
@ -328,13 +302,87 @@ def upsert_session_metrics_from_csv_mapped(
)
def merge_column_backed_and_eav_metrics(
header: Mapping[str, Any],
schema: Sequence[Dict[str, Any]],
eav_metrics: Sequence[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""
Effektive Metrikliste: Pro Schema-Parameter mit source_field gilt activity_log als kanonisch, wenn
die Spalte befüllt und koerzierbar ist; sonst Fallback EAV. Reine EAV-Parameter (ohne Spalte oder
leere Spalte) kommen aus EAV. Verhindert doppelte Semantik ohne Schreib-Sync.
"""
eav_by_key = {m["key"]: m for m in eav_metrics}
merged: List[Dict[str, Any]] = []
keys_handled: set[str] = set()
for s in schema:
k = s["key"]
tid = s["training_parameter_id"]
dt = s["data_type"]
unit = s.get("unit")
sf = s.get("source_field")
used_column = False
if sf and isinstance(sf, str) and str(sf).strip():
col = str(sf).strip()
if col in header and header[col] is not None:
try:
val = _coerce_raw_value_for_parameter(dt, header[col])
merged.append(
{
"training_parameter_id": tid,
"key": k,
"data_type": dt,
"unit": unit,
"value": val,
}
)
used_column = True
keys_handled.add(k)
except (TypeError, ValueError):
pass
if used_column:
continue
if k in eav_by_key:
merged.append(dict(eav_by_key[k]))
keys_handled.add(k)
continue
legacy_col = ACTIVITY_LOG_LEGACY_COLUMN_FOR_EAV_PRIMARY_PARAM.get(k)
if legacy_col and legacy_col in header and header[legacy_col] is not None:
try:
val = _coerce_raw_value_for_parameter(dt, header[legacy_col])
merged.append(
{
"training_parameter_id": tid,
"key": k,
"data_type": dt,
"unit": unit,
"value": val,
}
)
keys_handled.add(k)
except (TypeError, ValueError):
pass
for m in eav_metrics:
if m["key"] in keys_handled:
continue
merged.append(dict(m))
merged.sort(key=lambda x: x["key"])
return merged
def sync_column_backed_session_metrics(cur, profile_id: str, activity_log_id: str) -> None:
"""
EAV-Zeilen für alle Schema-Parameter mit gesetztem source_field aus der activity_log-Zeile
schreiben (Upsert) bzw. bei NULL in der Quellspalte löschen. Reine Layer-1-Logik; keine Router-Abhängigkeit.
[Veraltet / nicht mehr in Schreibpfaden aufgerufen]
Synchron mit Übergangsphase: activity_log bleibt kanonisch für klassische Spalten; EAV spiegelt dieselben
Werte für Profil/Platzhalter/Detail-API, ohne replace_activity_session_metrics aufzurufen.
Früher: EAV spiegelte activity_log-Spalten für Parameter mit source_field.
Kanon: Spaltenwerte werden bei merge_column_backed_and_eav_metrics beim Lesen berücksichtigt; keine
doppelte Speicherung. Funktion bleibt für optionale Admin-/Reparatur-Skripte.
"""
cur.execute("SELECT * FROM activity_log WHERE id = %s", (activity_log_id,))
row = cur.fetchone()
@ -527,36 +575,7 @@ def get_activity_session_logical_unit(cur, profile_id: str, activity_log_id: str
cur, header.get("training_category"), header.get("training_type_id")
)
metrics = fetch_activity_session_metrics(cur, activity_log_id)
by_key = {m["key"]: m for m in metrics}
merged_metrics: List[Dict[str, Any]] = list(metrics)
for s in schema:
k = s["key"]
if k in by_key:
continue
sf = s.get("source_field")
if not sf or (isinstance(sf, str) and not str(sf).strip()):
continue
col = str(sf).strip()
if col not in header:
continue
raw = header.get(col)
if raw is None:
continue
dt = s["data_type"]
try:
val = _coerce_raw_value_for_parameter(dt, raw)
except (TypeError, ValueError):
continue
merged_metrics.append(
{
"training_parameter_id": s["training_parameter_id"],
"key": k,
"data_type": dt,
"unit": s.get("unit"),
"value": val,
}
)
merged_metrics.sort(key=lambda x: x["key"])
merged_metrics = merge_column_backed_and_eav_metrics(header, schema, metrics)
return {
"header": header,
"schema": schema,
@ -565,17 +584,33 @@ def get_activity_session_logical_unit(cur, profile_id: str, activity_log_id: str
def enrich_sessions_with_metrics(cur, sessions: List[Dict[str, Any]]) -> None:
"""Mutates each session dict: adds key 'session_metrics' (list) when sessions non-empty."""
"""
Mutates each session dict: adds key 'session_metrics' (list).
Kombiniert EAV mit activity_log-Spalten für Parameter mit source_field (kanonisch: Spalte),
analog zu get_activity_session_logical_unit ohne doppelte EAV-Speicherung beim Import.
"""
if not sessions:
return
ids = [str(s["id"]) for s in sessions if s.get("id")]
if not ids:
return
ph = ",".join(["%s"] * len(ids))
cur.execute(
f"SELECT * FROM activity_log WHERE id IN ({ph})",
ids,
)
headers_by_id: Dict[str, Dict[str, Any]] = {}
for r in cur.fetchall():
h = dict(r)
headers_by_id[str(h["id"])] = h
cur.execute(
f"""
SELECT
m.activity_log_id,
m.training_parameter_id,
tp.key,
tp.data_type,
tp.unit,
@ -603,8 +638,33 @@ def enrich_sessions_with_metrics(cur, sessions: List[Dict[str, Any]]) -> None:
else:
val = r["value_bool"]
by_act.setdefault(aid, []).append(
{"key": r["key"], "data_type": dt, "unit": r["unit"], "value": val}
{
"training_parameter_id": r["training_parameter_id"],
"key": r["key"],
"data_type": dt,
"unit": r["unit"],
"value": val,
}
)
schema_cache: Dict[tuple[Any, Any], List[Dict[str, Any]]] = {}
def _schema(cat: Any, tid: Any) -> List[Dict[str, Any]]:
cache_key = (cat, tid)
if cache_key not in schema_cache:
schema_cache[cache_key] = resolve_activity_attribute_schema(cur, cat, tid)
return schema_cache[cache_key]
for s in sessions:
aid = str(s.get("id"))
s["session_metrics"] = by_act.get(aid, [])
header = headers_by_id.get(aid)
if not header:
s["session_metrics"] = []
continue
schema = _schema(header.get("training_category"), header.get("training_type_id"))
eav_list = by_act.get(aid, [])
merged = merge_column_backed_and_eav_metrics(header, schema, eav_list)
s["session_metrics"] = [
{"key": m["key"], "data_type": m["data_type"], "unit": m["unit"], "value": m["value"]}
for m in merged
]

View File

@ -0,0 +1,115 @@
-- Migration 057: Kanon EAV-primär für erweiterte Trainingsmetriken
-- Date: 2026-04-15
-- activity_log-Spalten bleiben erhalten (Lesefallback / API); training_parameters.source_field
-- wird für diese Keys entfernt. Idempotenter EAV-Backfill aus Spalten (wie 055), dann source_field NULL.
-- Siehe: backend/data_layer/activity_data_canon.py
-- min_hr (Spalte hr_min)
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
)
SELECT a.id, tp.id, NULL, a.hr_min, NULL, NULL, NOW()
FROM activity_log a
JOIN training_parameters tp ON tp.key = 'min_hr' AND tp.is_active = true
WHERE a.hr_min IS NOT NULL
ON CONFLICT (activity_log_id, training_parameter_id) DO NOTHING;
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
)
SELECT a.id, tp.id, a.pace_min_per_km::double precision, NULL, NULL, NULL, NOW()
FROM activity_log a
JOIN training_parameters tp ON tp.key = 'pace_min_per_km' AND tp.is_active = true
WHERE a.pace_min_per_km IS NOT NULL
ON CONFLICT (activity_log_id, training_parameter_id) DO NOTHING;
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
)
SELECT a.id, tp.id, NULL, a.cadence, NULL, NULL, NOW()
FROM activity_log a
JOIN training_parameters tp ON tp.key = 'cadence' AND tp.is_active = true
WHERE a.cadence IS NOT NULL
ON CONFLICT (activity_log_id, training_parameter_id) DO NOTHING;
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
)
SELECT a.id, tp.id, NULL, a.avg_power, NULL, NULL, NOW()
FROM activity_log a
JOIN training_parameters tp ON tp.key = 'avg_power' AND tp.is_active = true
WHERE a.avg_power IS NOT NULL
ON CONFLICT (activity_log_id, training_parameter_id) DO NOTHING;
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
)
SELECT a.id, tp.id, NULL, a.elevation_gain, NULL, NULL, NOW()
FROM activity_log a
JOIN training_parameters tp ON tp.key = 'elevation_gain' AND tp.is_active = true
WHERE a.elevation_gain IS NOT NULL
ON CONFLICT (activity_log_id, training_parameter_id) DO NOTHING;
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
)
SELECT a.id, tp.id, a.temperature_celsius::double precision, NULL, NULL, NULL, NOW()
FROM activity_log a
JOIN training_parameters tp ON tp.key = 'temperature_celsius' AND tp.is_active = true
WHERE a.temperature_celsius IS NOT NULL
ON CONFLICT (activity_log_id, training_parameter_id) DO NOTHING;
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
)
SELECT a.id, tp.id, NULL, a.humidity_percent, NULL, NULL, NOW()
FROM activity_log a
JOIN training_parameters tp ON tp.key = 'humidity_percent' AND tp.is_active = true
WHERE a.humidity_percent IS NOT NULL
ON CONFLICT (activity_log_id, training_parameter_id) DO NOTHING;
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
)
SELECT a.id, tp.id, a.avg_hr_percent::double precision, NULL, NULL, NULL, NOW()
FROM activity_log a
JOIN training_parameters tp ON tp.key = 'avg_hr_percent' AND tp.is_active = true
WHERE a.avg_hr_percent IS NOT NULL
ON CONFLICT (activity_log_id, training_parameter_id) DO NOTHING;
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
)
SELECT a.id, tp.id, a.kcal_per_km::double precision, NULL, NULL, NULL, NOW()
FROM activity_log a
JOIN training_parameters tp ON tp.key = 'kcal_per_km' AND tp.is_active = true
WHERE a.kcal_per_km IS NOT NULL
ON CONFLICT (activity_log_id, training_parameter_id) DO NOTHING;
UPDATE training_parameters
SET source_field = NULL
WHERE key IN (
'min_hr',
'pace_min_per_km',
'cadence',
'avg_power',
'elevation_gain',
'temperature_celsius',
'humidity_percent',
'avg_hr_percent',
'kcal_per_km'
);
DO $$
BEGIN
RAISE NOTICE 'Migration 057: EAV-primary canon — backfill + source_field cleared for extended metrics';
END $$;

View File

@ -1,12 +1,14 @@
"""Unit tests for data_layer.activity_session_metrics (no DB for most cases)."""
import uuid
from unittest.mock import patch
import pytest
from data_layer.activity_session_metrics import (
ActivitySessionMetricsError,
enrich_sessions_with_metrics,
merge_column_backed_and_eav_metrics,
merge_parameter_schema_rows,
resolve_activity_attribute_schema,
_row_value_tuple,
@ -171,22 +173,39 @@ def test_resolve_loads_category_from_training_type_id():
assert cur.executes[0][1] == (42,)
def test_enrich_sessions_batch():
@patch("data_layer.activity_session_metrics.resolve_activity_attribute_schema", return_value=[])
def test_enrich_sessions_batch(mock_resolve):
aid = str(uuid.uuid4())
bid = str(uuid.uuid4())
class _Cur:
def __init__(self):
self.params = None
self._fetch_n = 0
def execute(self, sql, params=None):
self.sql = sql
self.params = params
def fetchall(self):
self._fetch_n += 1
if self._fetch_n == 1:
return [
{
"id": uuid.UUID(aid),
"training_category": None,
"training_type_id": None,
},
{
"id": uuid.UUID(bid),
"training_category": None,
"training_type_id": None,
},
]
return [
{
"activity_log_id": uuid.UUID(aid),
"training_parameter_id": 3,
"key": "rpe",
"data_type": "integer",
"unit": None,
@ -202,3 +221,87 @@ def test_enrich_sessions_batch():
assert sessions[0]["session_metrics"][0]["value"] == 7
assert sessions[0]["session_metrics"][0]["key"] == "rpe"
assert sessions[1]["session_metrics"] == []
def test_merge_column_backed_prefers_column_over_stale_eav():
schema = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"validation_rules": {},
"source_field": "hr_avg",
}
]
eav = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"value": 99.0,
}
]
out = merge_column_backed_and_eav_metrics({"hr_avg": 140.0}, schema, eav)
assert len(out) == 1
assert out[0]["value"] == 140.0
def test_merge_falls_back_to_eav_when_column_empty():
schema = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"validation_rules": {},
"source_field": "hr_avg",
}
]
eav = [
{
"training_parameter_id": 1,
"key": "hr_avg",
"data_type": "float",
"unit": "bpm",
"value": 99.0,
}
]
out = merge_column_backed_and_eav_metrics({"hr_avg": None}, schema, eav)
assert len(out) == 1
assert out[0]["value"] == 99.0
def test_merge_keeps_eav_only_keys():
schema = []
eav = [
{
"training_parameter_id": 2,
"key": "custom_param",
"data_type": "string",
"unit": None,
"value": "x",
}
]
out = merge_column_backed_and_eav_metrics({}, schema, eav)
assert len(out) == 1
assert out[0]["key"] == "custom_param"
def test_merge_eav_primary_falls_back_to_legacy_hr_min_column():
"""Kanon: min_hr ohne source_field / ohne EAV — Lesefallback Spalte hr_min."""
schema = [
{
"training_parameter_id": 9,
"key": "min_hr",
"data_type": "integer",
"unit": "bpm",
"validation_rules": {},
"source_field": None,
}
]
out = merge_column_backed_and_eav_metrics({"hr_min": 88}, schema, [])
assert len(out) == 1
assert out[0]["key"] == "min_hr"
assert out[0]["value"] == 88