- Enhanced the `merge_column_backed_and_eav_metrics` function so that, when both a legacy column and an EAV value are present, the legacy column takes precedence.
- Revised the documentation in `ACTIVITY_PRODUCTION_ARCHITECTURE_AND_PHASES.md` and `ACTIVITY_SCALAR_KANON_TABLE.md` to reflect the new read logic and to clarify how metrics are handled.
- Updated `activity_data_canon.py` to specify the new merge behavior, ensuring consistent data retrieval.
- Added unit tests for the new logic, confirming that legacy columns are preferred when available.
407 lines
13 KiB
Python
407 lines
13 KiB
Python
"""
|
|
Central persistence for activity_log plus EAV side effects (evaluation).

All write paths (REST, universal CSV, legacy upload) converge here.

Field catalogue for CSV mappings: get_mappable_activity_field_catalog()
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import datetime as dt
|
|
import logging
|
|
import uuid
|
|
from typing import Any, Dict, List, Mapping, Optional
|
|
|
|
from models import ActivityEntry
|
|
|
|
from csv_parser.module_registry import get_module_definition
|
|
from data_layer.activity_data_canon import get_activity_module_registry_field_keys
|
|
|
|
logger = logging.getLogger(__name__)

# Optional dependency: when evaluation_helper cannot be imported, the
# auto-evaluation hooks in this module become no-ops and activity writes
# keep working without it.
try:
    from evaluation_helper import evaluate_and_save_activity as _evaluate_and_save_activity

    _EVALUATION_AVAILABLE = True
except Exception as _import_error:  # pragma: no cover
    _evaluate_and_save_activity = None
    _EVALUATION_AVAILABLE = False
    # Record why the feature is off instead of disabling it silently;
    # otherwise a broken evaluation_helper looks like "no evaluations".
    logger.warning(
        "evaluation_helper not importable; activity auto-evaluation disabled: %s",
        _import_error,
    )
|
|
|
|
|
|
def find_activity_duplicate_id(
    cur,
    profile_id: str,
    date_iso: str,
    start_time: Optional[Any],
) -> Optional[str]:
    """Look up an activity_log row with the same profile, date and start time.

    The start time is compared with ``IS NOT DISTINCT FROM``, so a ``None``
    start time matches rows whose start_time is NULL.

    Returns:
        The ``id`` of the first fetched row as a string, or ``None`` when the
        query yields no row.
    """
    query = """
        SELECT id FROM activity_log
        WHERE profile_id = %s AND date = %s::date
          AND start_time IS NOT DISTINCT FROM %s::time
        """
    lookup_values = (profile_id, date_iso, start_time)
    cur.execute(query, lookup_values)
    match = cur.fetchone()
    if not match:
        return None
    # Rows are accessed by column name, i.e. a mapping-style row is expected.
    return str(match["id"])
|
|
|
|
|
|
# The CSV executor sets date, start time, end time and activity type
# explicitly (after normalisation); this patch must not overwrite them.
_ACTIVITY_CSV_REGISTRY_EXCLUDE = frozenset(
    ("date", "start_time", "end_time", "activity_type")
)
|
|
|
|
|
|
def activity_registry_field_keys() -> frozenset[str]:
    """Return the field keys of the activity module registry.

    Same set as ``ACTIVITY_MODULE_REGISTRY_FIELD_KEYS``; the lookup is
    delegated to ``get_activity_module_registry_field_keys`` so the registry
    stays the single source.
    """
    registry_keys = get_activity_module_registry_field_keys()
    return registry_keys
|
|
|
|
|
|
def activity_csv_registry_updates_from_mapped(mapped: Mapping[str, Any]) -> Dict[str, Any]:
    """Build activity_log column updates from a mapped CSV row.

    Only fields declared in the "activity" module registry are considered
    (core columns). Training-parameter keys that exist only in
    training_parameters go through EAV, not through this function.

    Args:
        mapped: Mapping of target field key to raw value.

    Returns:
        Dict of field key to coerced value. A field is left out when it is in
        ``_ACTIVITY_CSV_REGISTRY_EXCLUDE``, missing from ``mapped``, empty or
        blank, or not coercible to its registry type:

        * ``float``: rounded to one decimal; ``hr_avg``/``hr_max`` must also
          lie within 20..280.
        * ``int``: parsed as float, then rounded to the nearest integer.
        * ``datetime``: ``YYYY-MM-DD HH:MM:SS`` (dates get ``00:00:00``);
          strings are passed through stripped.
        * ``date``: ISO date; strings are passed through stripped.
        * anything else: ``str(raw).strip()``.

        NaN and infinite numbers are treated as not coercible. An empty dict
        is returned when no "activity" module definition exists.
    """
    import math  # function-scope import: only the finite-number check needs it

    mod = get_module_definition("activity")
    if not mod:
        return {}
    fields = mod.get("fields") or {}
    out: Dict[str, Any] = {}

    def _finite(v: Any) -> float | None:
        """Parse ``v`` as a finite float; blank, invalid, NaN or infinite -> None."""
        if v is None or (isinstance(v, str) and not v.strip()):
            return None
        try:
            x = float(v)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: integers too large to be represented as float.
            return None
        # "nan"/"inf" parse successfully but are never valid measurements.
        return x if math.isfinite(x) else None

    def _sf(v: Any) -> float | None:
        """Finite float rounded to one decimal place, else None."""
        x = _finite(v)
        return None if x is None else round(x, 1)

    def _si(v: Any) -> int | None:
        """Finite number rounded to the nearest integer, else None."""
        x = _finite(v)
        return None if x is None else int(round(x))

    def _hr(v: Any) -> float | None:
        """Heart rate: like ``_sf`` but restricted to the range 20..280."""
        x = _sf(v)
        if x is None or x < 20 or x > 280:
            return None
        return x

    for key, spec in fields.items():
        if key in _ACTIVITY_CSV_REGISTRY_EXCLUDE or key not in mapped:
            continue
        raw = mapped[key]
        if raw is None or raw == "":
            continue
        if isinstance(raw, str) and not raw.strip():
            continue
        # A registry entry without a spec is treated as a plain string field.
        typ = (spec or {}).get("type", "string")
        if typ == "float":
            v = _hr(raw) if key in ("hr_avg", "hr_max") else _sf(raw)
            if v is not None:
                out[key] = v
        elif typ == "int":
            v = _si(raw)
            if v is not None:
                out[key] = v
        elif typ == "datetime":
            if isinstance(raw, dt.datetime):
                out[key] = raw.strftime("%Y-%m-%d %H:%M:%S")
            elif isinstance(raw, dt.date):
                out[key] = f"{raw.isoformat()} 00:00:00"
            elif isinstance(raw, str) and raw.strip():
                out[key] = raw.strip()
        elif typ == "date":
            # datetime is a subclass of date, so it has to be tested first;
            # otherwise the time part would end up in the date value.
            if isinstance(raw, dt.datetime):
                out[key] = raw.date().isoformat()
            elif isinstance(raw, dt.date):
                out[key] = raw.isoformat()
            elif isinstance(raw, str) and raw.strip():
                out[key] = raw.strip()
        else:
            out[key] = str(raw).strip()

    return out
|
|
|
|
|
|
def insert_activity_from_entry(cur, profile_id: str, eid: str, e: ActivityEntry) -> None:
    """Insert one activity_log row built from an ActivityEntry (manual API path).

    Values come from ``e.model_dump()``. Fields marked as required below are
    read with ``[...]`` (a missing key raises ``KeyError``); the others are
    read with ``.get`` and default to ``None``. ``created`` is filled by the
    statement itself via CURRENT_TIMESTAMP.
    """
    data = e.model_dump()
    statement = """INSERT INTO activity_log (id,profile_id,date,start_time,end_time,activity_type,duration_min,kcal_active,kcal_resting,
        hr_avg,hr_max,hr_min,distance_km,pace_min_per_km,cadence,avg_power,elevation_gain,
        temperature_celsius,humidity_percent,avg_hr_percent,kcal_per_km,rpe,source,notes,
        training_type_id,training_category,training_subcategory,created)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,
        %s,%s,%s,%s,%s,%s,%s,%s,
        %s,%s,%s,%s,%s,%s,%s,
        %s,%s,%s,CURRENT_TIMESTAMP)"""
    # (field name, required) in column order, following id and profile_id.
    field_plan = (
        ("date", True),
        ("start_time", True),
        ("end_time", True),
        ("activity_type", True),
        ("duration_min", True),
        ("kcal_active", True),
        ("kcal_resting", True),
        ("hr_avg", True),
        ("hr_max", True),
        ("hr_min", False),
        ("distance_km", True),
        ("pace_min_per_km", False),
        ("cadence", False),
        ("avg_power", False),
        ("elevation_gain", False),
        ("temperature_celsius", False),
        ("humidity_percent", False),
        ("avg_hr_percent", False),
        ("kcal_per_km", False),
        ("rpe", True),
        ("source", True),
        ("notes", True),
        ("training_type_id", False),
        ("training_category", False),
        ("training_subcategory", False),
    )
    values = [eid, profile_id]
    for name, required in field_plan:
        values.append(data[name] if required else data.get(name))
    cur.execute(statement, tuple(values))
|
|
|
|
|
|
def update_activity_from_entry(cur, profile_id: str, eid: str, e: ActivityEntry) -> None:
    """Overwrite an activity_log row with all fields of an ActivityEntry (REST PUT).

    Every key of ``e.model_dump()`` becomes a ``key=%s`` assignment; the row
    is addressed by both id and profile_id.
    """
    payload = e.model_dump()
    assignments = ", ".join(["{}=%s".format(column) for column in payload])
    statement = "UPDATE activity_log SET " + assignments + " WHERE id=%s AND profile_id=%s"
    arguments = [*payload.values(), eid, profile_id]
    cur.execute(statement, arguments)
|
|
|
|
|
|
def update_activity_columns(
    cur,
    profile_id: str,
    eid: str,
    updates: Dict[str, Any],
) -> None:
    """Partially update an activity_log row with only the given columns (imports).

    Args:
        cur: Database cursor used to execute the statement.
        profile_id: Owner of the row; part of the WHERE clause.
        eid: Id of the activity_log row.
        updates: Column name -> new value. An empty mapping is a no-op.

    Raises:
        ValueError: If a key of ``updates`` is not a plain identifier. Column
            names cannot be bound as query parameters and are interpolated
            into the statement text, so anything else is rejected before the
            statement is built.
    """
    if not updates:
        return
    invalid = [k for k in updates if not (isinstance(k, str) and k.isidentifier())]
    if invalid:
        raise ValueError(f"invalid activity_log column name(s): {invalid!r}")
    assignments = ", ".join(f"{column} = %s" for column in updates)
    cur.execute(
        f"UPDATE activity_log SET {assignments} WHERE id = %s AND profile_id = %s",
        [*updates.values(), eid, profile_id],
    )
|
|
|
|
|
|
def insert_activity_csv_minimal(
    cur,
    profile_id: str,
    eid: str,
    *,
    date_iso: str,
    start_time: Any,
    end_time: Any,
    activity_type: str,
    duration_min: Any,
    kcal_active: Any,
    kcal_resting: Any,
    hr_avg: Any,
    hr_max: Any,
    distance_km: Any,
    training_type_id: Any,
    training_category: Any,
    training_subcategory: Any,
    source: str,
) -> None:
    """Insert a minimal activity_log row (universal CSV path).

    All values are passed through to the statement unchanged; ``created`` is
    filled by the statement itself via CURRENT_TIMESTAMP.
    """
    statement = """
        INSERT INTO activity_log (
            id, profile_id, date, start_time, end_time, activity_type, duration_min,
            kcal_active, kcal_resting, hr_avg, hr_max, distance_km,
            source, training_type_id, training_category, training_subcategory, created
        )
        VALUES (%s,%s,%s,%s,%s,%s,%s,
                %s,%s,%s,%s,%s,
                %s,%s,%s,%s,CURRENT_TIMESTAMP)
        """
    # Grouped like the column lines of the statement above.
    identity = (eid, profile_id, date_iso, start_time, end_time, activity_type, duration_min)
    metrics = (kcal_active, kcal_resting, hr_avg, hr_max, distance_km)
    classification = (source, training_type_id, training_category, training_subcategory)
    cur.execute(statement, identity + metrics + classification)
|
|
|
|
|
|
def run_activity_post_write_hooks(cur, profile_id: str, eid: str) -> None:
    """Run auto-evaluation for a stored activity, if evaluation is available.

    No column -> EAV sync happens here; that is covered by the read path
    ``merge_column_backed_and_eav_metrics``.

    The activity is re-read from activity_log and evaluated only when the row
    exists and has a truthy ``training_type_id``. Evaluation errors are
    logged and never propagated to the caller.
    """
    if not (_EVALUATION_AVAILABLE and _evaluate_and_save_activity):
        return

    # NOTE(review): the row is selected by id only, while the UPDATE helpers
    # in this module also filter by profile_id - confirm this is intended.
    cur.execute(
        """
        SELECT id, profile_id, date, training_type_id, duration_min,
               hr_avg, hr_max, distance_km, kcal_active, kcal_resting,
               rpe, pace_min_per_km, cadence, elevation_gain
        FROM activity_log
        WHERE id = %s
        """,
        (eid,),
    )
    fetched = cur.fetchone()
    if not fetched:
        return

    activity = dict(fetched)
    type_id = activity.get("training_type_id")
    if not type_id:
        return

    try:
        _evaluate_and_save_activity(cur, eid, activity, type_id, profile_id)
    except Exception as failure:
        logger.error("[AUTO-EVAL] activity %s: %s", eid, failure)
|
|
|
|
|
|
def run_activity_post_write_hooks_import(
    cur,
    profile_id: str,
    eid: str,
    *,
    workout_date: str,
    training_type_id: Optional[int],
    duration_min: Any,
    hr_avg: Any,
    hr_max: Any,
    distance_km: Any,
    kcal_active: Any,
    kcal_resting: Any,
) -> None:
    """Run auto-evaluation after an import, using the values handed in.

    Unlike ``run_activity_post_write_hooks`` the activity is not re-read from
    the database; ``rpe``, ``pace_min_per_km``, ``cadence`` and
    ``elevation_gain`` are passed to the evaluation as ``None``. No
    column -> EAV sync happens here. Nothing is done without a truthy
    ``training_type_id``; evaluation errors are logged as warnings and never
    propagated.
    """
    if not (_EVALUATION_AVAILABLE and training_type_id and _evaluate_and_save_activity):
        return

    try:
        activity = {
            "id": eid,
            "profile_id": profile_id,
            "date": workout_date,
            "training_type_id": training_type_id,
            "duration_min": duration_min,
            "hr_avg": hr_avg,
            "hr_max": hr_max,
            "distance_km": distance_km,
            "kcal_active": kcal_active,
            "kcal_resting": kcal_resting,
        }
        # Metrics this hook does not receive are passed on as None.
        activity.update(
            dict.fromkeys(("rpe", "pace_min_per_km", "cadence", "elevation_gain"))
        )
        _evaluate_and_save_activity(cur, eid, activity, training_type_id, profile_id)
    except Exception as failure:
        logger.warning("[activity import] Auto-Eval fehlgeschlagen: %s", failure)
|
|
|
|
|
|
def merge_activity_csv_module_fields(
    cur,
    static_fields: Dict[str, Any],
) -> Dict[str, Any]:
    """Extend the static activity registry fields with all active training parameters.

    Uses the same source as ``get_mappable_activity_field_catalog`` for its
    ``training_parameters`` entry, so these keys show up in the admin CSV
    target list, in validation and in the import row aggregation.

    ``static_fields`` is copied, not modified; a parameter whose key already
    exists in the result is skipped. Parameter types map to ``int``
    ("integer"), ``float`` ("float" or no type) and ``string`` (anything
    else, including "boolean").
    """
    merged = dict(static_fields)
    cur.execute(
        """
        SELECT key, data_type, unit, name_de
        FROM training_parameters
        WHERE is_active = true
        ORDER BY key
        """
    )
    mapping_types = {"integer": "int", "float": "float"}
    for param in cur.fetchall():
        name = param["key"]
        if name in merged:
            continue
        declared = param["data_type"] or "float"
        entry: Dict[str, Any] = {
            "type": mapping_types.get(declared, "string"),
            "required": False,
            "from_training_parameter": True,
        }
        unit = param.get("unit")
        if unit:
            entry["unit"] = unit
        label = param.get("name_de")
        if label:
            entry["label_de"] = label
        merged[name] = entry
    return merged
|
|
|
|
|
|
def get_mappable_activity_field_catalog(cur, profile_id: str) -> Dict[str, Any]:
    """Return the fields that configurable import mappings may target.

    The result has three entries:

    * ``core_fields``: the fields of the "activity" module registry, each
      targeting the activity_log column of the same name, sorted by key.
    * ``training_parameters``: all active training parameters (global). When
      applied to a session, keys that are not part of
      resolve_activity_attribute_schema(category/type) are discarded.
    * ``notes``: a human-readable hint about that filtering.

    ``profile_id`` is reserved for future per-profile filtering and is not
    used yet.
    """
    _ = profile_id
    registry_fields = (get_module_definition("activity") or {}).get("fields") or {}

    core_fields: List[Dict[str, Any]] = []
    for field_key in sorted(registry_fields):
        field_spec = registry_fields[field_key] or {}
        core_fields.append(
            {
                "key": field_key,
                "target": "activity_log",
                "column": field_key,
                "data_type": field_spec.get("type", "string"),
                "required": bool(field_spec.get("required")),
                "unit": field_spec.get("unit"),
                "label_de": field_spec.get("label_de") or field_key,
            }
        )

    cur.execute(
        """
        SELECT id, key, name_de, name_en, category AS param_category,
               data_type, unit, source_field
        FROM training_parameters
        WHERE is_active = true
        ORDER BY key
        """
    )
    parameters = list(map(dict, cur.fetchall()))

    hint = (
        "training_parameters listet alle aktiven Keys. Pro Session werden Werte ignoriert, "
        "die für deren training_category/training_type_id nicht im Attribut-Schema vorkommen."
    )
    return {
        "core_fields": core_fields,
        "training_parameters": parameters,
        "notes": hint,
    }
|
|
|
|
|
|
def new_activity_id() -> str:
    """Return a newly generated random UUID (version 4) in string form."""
    fresh = uuid.uuid4()
    return str(fresh)
|