- Updated the CSV import logic to merge active training parameters with static fields for the activity module, improving field mapping accuracy.
- Enhanced validation functions to incorporate dynamic field definitions based on active training parameters, ensuring better data integrity during imports.
- Refactored related functions to streamline the handling of CSV templates and field mappings, improving maintainability and clarity.
- Added new utility functions for resolving activity log column patches and upserting session metrics from CSV, enhancing the overall import functionality.
327 lines
10 KiB
Python
327 lines
10 KiB
Python
"""
|
|
Central persistence for activity_log plus its EAV side effects (evaluation, columns -> EAV).

All write paths (REST, universal CSV, legacy upload) converge here.

Field catalog for CSV mappings: get_mappable_activity_field_catalog()
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from models import ActivityEntry
|
|
|
|
from csv_parser.module_registry import get_module_definition
|
|
from data_layer.activity_session_metrics import sync_column_backed_session_metrics
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Auto-evaluation is optional: if the helper cannot be imported for any
# reason, the module still loads and the post-write hooks skip evaluation.
try:
    from evaluation_helper import evaluate_and_save_activity as _evaluate_and_save_activity
except Exception:  # pragma: no cover
    _evaluate_and_save_activity = None
    _EVALUATION_AVAILABLE = False
else:
    _EVALUATION_AVAILABLE = True
|
|
|
|
|
|
def find_activity_duplicate_id(
    cur,
    profile_id: str,
    date_iso: str,
    start_time: Optional[Any],
) -> Optional[str]:
    """Return the id of an activity_log row matching profile, date and start time.

    The start time is compared with ``IS NOT DISTINCT FROM``, so a ``None``
    start time matches rows whose ``start_time`` is NULL.

    Args:
        cur: DB cursor whose ``fetchone()`` yields a row indexable by column name.
        profile_id: Owner profile of the activity.
        date_iso: Activity date; cast to ``date`` by the query.
        start_time: Activity start time (cast to ``time``) or ``None``.

    Returns:
        The id of the first fetched row as a string, or ``None`` if nothing matched.
    """
    lookup_sql = """
        SELECT id FROM activity_log
        WHERE profile_id = %s AND date = %s::date
          AND start_time IS NOT DISTINCT FROM %s::time
        """
    cur.execute(lookup_sql, (profile_id, date_iso, start_time))
    match = cur.fetchone()
    if not match:
        return None
    return str(match["id"])
|
|
|
|
|
|
def insert_activity_from_entry(cur, profile_id: str, eid: str, e: ActivityEntry) -> None:
    """Insert one activity_log row built from an ActivityEntry (manual API path).

    The column list, the ``%s`` placeholders and the parameter tuple are all
    derived from the single column table below, so the three can never get out
    of step when a column is added or removed.

    Args:
        cur: DB cursor used to execute the INSERT.
        profile_id: Owner profile written to ``profile_id``.
        eid: Primary key written to ``id``.
        e: Entry whose ``model_dump()`` provides the column values.

    Raises:
        KeyError: If the dumped entry lacks one of the required keys.
    """
    dumped = e.model_dump()

    # (column, required). Required values are read with dumped[column] and
    # raise KeyError when absent; optional ones fall back to None (SQL NULL).
    entry_columns = (
        ("date", True),
        ("start_time", True),
        ("end_time", True),
        ("activity_type", True),
        ("duration_min", True),
        ("kcal_active", True),
        ("kcal_resting", True),
        ("hr_avg", True),
        ("hr_max", True),
        ("hr_min", False),
        ("distance_km", True),
        ("pace_min_per_km", False),
        ("cadence", False),
        ("avg_power", False),
        ("elevation_gain", False),
        ("temperature_celsius", False),
        ("humidity_percent", False),
        ("avg_hr_percent", False),
        ("kcal_per_km", False),
        ("rpe", True),
        ("source", True),
        ("notes", True),
        ("training_type_id", False),
        ("training_category", False),
        ("training_subcategory", False),
    )

    columns = ["id", "profile_id"]
    params = [eid, profile_id]
    for column, required in entry_columns:
        columns.append(column)
        params.append(dumped[column] if required else dumped.get(column))

    # One placeholder per bound value; ``created`` is filled by the database.
    placeholders = ",".join(["%s"] * len(params))
    cur.execute(
        f"INSERT INTO activity_log ({','.join(columns)},created) "
        f"VALUES ({placeholders},CURRENT_TIMESTAMP)",
        tuple(params),
    )
|
|
|
|
|
|
def update_activity_from_entry(cur, profile_id: str, eid: str, e: ActivityEntry) -> None:
    """Overwrite every dumped field of an activity_log row (REST PUT).

    Each key of ``e.model_dump()`` becomes a ``key=%s`` assignment; the row is
    selected by both ``id`` and ``profile_id``.

    Args:
        cur: DB cursor used to execute the UPDATE.
        profile_id: Owner profile the row must belong to.
        eid: Id of the row to update.
        e: Entry whose dumped fields supply column names and values.
    """
    fields = e.model_dump()
    assignments = ", ".join(["{}=%s".format(column) for column in fields])
    statement = "UPDATE activity_log SET " + assignments + " WHERE id=%s AND profile_id=%s"
    bound_values = [*fields.values(), eid, profile_id]
    cur.execute(statement, bound_values)
|
|
|
|
|
|
def update_activity_columns(
    cur,
    profile_id: str,
    eid: str,
    updates: Dict[str, Any],
) -> None:
    """Partially update an activity_log row with only the given columns (imports).

    Column names cannot be bound as query parameters and are interpolated into
    the statement, so every key must be a plain identifier. Anything else is
    rejected before any SQL is built or executed.

    Args:
        cur: DB cursor used to execute the UPDATE.
        profile_id: Owner profile the row must belong to.
        eid: Id of the row to update.
        updates: Mapping of column name to new value; empty means no-op.

    Raises:
        ValueError: If a key of ``updates`` is not a plain identifier.
    """
    if not updates:
        return

    # Values are bound as parameters, but keys land in the SQL text itself.
    invalid = [
        name
        for name in updates
        if not (isinstance(name, str) and name.isidentifier())
    ]
    if invalid:
        raise ValueError(f"Invalid activity_log column name(s): {invalid!r}")

    assignments = ", ".join(f"{name} = %s" for name in updates)
    cur.execute(
        f"UPDATE activity_log SET {assignments} WHERE id = %s AND profile_id = %s",
        [*updates.values(), eid, profile_id],
    )
|
|
|
|
|
|
def insert_activity_csv_minimal(
    cur,
    profile_id: str,
    eid: str,
    *,
    date_iso: str,
    start_time: Any,
    end_time: Any,
    activity_type: str,
    duration_min: Any,
    kcal_active: Any,
    kcal_resting: Any,
    hr_avg: Any,
    hr_max: Any,
    distance_km: Any,
    training_type_id: Any,
    training_category: Any,
    training_subcategory: Any,
    source: str,
) -> None:
    """Insert a minimal activity_log row (universal CSV import).

    Only the sixteen columns named in the statement are written; ``created``
    is set to ``CURRENT_TIMESTAMP`` by the database.

    Args:
        cur: DB cursor used to execute the INSERT.
        profile_id: Owner profile written to ``profile_id``.
        eid: Primary key written to ``id``.
        date_iso: Value for the ``date`` column.
        start_time, end_time, activity_type, duration_min, kcal_active,
        kcal_resting, hr_avg, hr_max, distance_km, training_type_id,
        training_category, training_subcategory, source: Values for the
            identically named columns.
    """
    insert_sql = """
        INSERT INTO activity_log (
            id, profile_id, date, start_time, end_time, activity_type, duration_min,
            kcal_active, kcal_resting, hr_avg, hr_max, distance_km,
            source, training_type_id, training_category, training_subcategory, created
        )
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP)
        """
    # Same order as the column list above: `source` precedes the training_* columns.
    identity = (eid, profile_id, date_iso, start_time, end_time, activity_type, duration_min)
    metrics = (kcal_active, kcal_resting, hr_avg, hr_max, distance_km)
    classification = (source, training_type_id, training_category, training_subcategory)
    cur.execute(insert_sql, identity + metrics + classification)
|
|
|
|
|
|
def run_activity_post_write_hooks(cur, profile_id: str, eid: str) -> None:
    """Run auto-evaluation (when available) and mirror activity_log columns to EAV.

    The evaluation step re-reads the row by id and only runs when the row
    exists and has a truthy ``training_type_id``. Evaluation errors are logged
    and swallowed; the session-metric sync always runs afterwards.

    Args:
        cur: DB cursor shared by the read, the evaluator and the sync.
        profile_id: Owner profile passed to the evaluator and the sync.
        eid: Id of the activity_log row that was just written.
    """
    evaluator = _evaluate_and_save_activity if _EVALUATION_AVAILABLE else None
    if evaluator:
        cur.execute(
            """
            SELECT id, profile_id, date, training_type_id, duration_min,
                   hr_avg, hr_max, distance_km, kcal_active, kcal_resting,
                   rpe, pace_min_per_km, cadence, elevation_gain
            FROM activity_log
            WHERE id = %s
            """,
            (eid,),
        )
        fetched = cur.fetchone()
        snapshot = dict(fetched) if fetched else {}
        type_id = snapshot.get("training_type_id")
        if type_id:
            try:
                evaluator(cur, eid, snapshot, type_id, profile_id)
            except Exception as eval_error:
                # Best effort: a failed evaluation must not abort the write path.
                logger.error("[AUTO-EVAL] activity %s: %s", eid, eval_error)

    sync_column_backed_session_metrics(cur, str(profile_id), str(eid))
|
|
|
|
|
|
def run_activity_post_write_hooks_import(
    cur,
    profile_id: str,
    eid: str,
    *,
    workout_date: str,
    training_type_id: Optional[int],
    duration_min: Any,
    hr_avg: Any,
    hr_max: Any,
    distance_km: Any,
    kcal_active: Any,
    kcal_resting: Any,
) -> None:
    """Run evaluation and EAV sync after a legacy import, using caller-supplied values.

    The evaluation context is built from the arguments rather than re-read from
    the database; ``rpe``, ``pace_min_per_km``, ``cadence`` and
    ``elevation_gain`` are passed as ``None``. Evaluation is skipped when the
    evaluator is unavailable or ``training_type_id`` is falsy. Evaluation
    errors are logged as warnings and swallowed; the session-metric sync
    always runs afterwards.

    Args:
        cur: DB cursor shared by the evaluator and the sync.
        profile_id: Owner profile of the activity.
        eid: Id of the imported activity_log row.
        workout_date: Value passed to the evaluator as ``date``.
        training_type_id: Training type; evaluation only runs when truthy.
        duration_min, hr_avg, hr_max, distance_km, kcal_active, kcal_resting:
            Metric values forwarded to the evaluator unchanged.
    """
    should_evaluate = bool(
        _EVALUATION_AVAILABLE and training_type_id and _evaluate_and_save_activity
    )
    if should_evaluate:
        context = dict(
            id=eid,
            profile_id=profile_id,
            date=workout_date,
            training_type_id=training_type_id,
            duration_min=duration_min,
            hr_avg=hr_avg,
            hr_max=hr_max,
            distance_km=distance_km,
            kcal_active=kcal_active,
            kcal_resting=kcal_resting,
            rpe=None,
            pace_min_per_km=None,
            cadence=None,
            elevation_gain=None,
        )
        try:
            _evaluate_and_save_activity(cur, eid, context, training_type_id, profile_id)
        except Exception as eval_err:
            # Best effort: the import continues even if evaluation fails.
            logger.warning("[activity import] Auto-Eval fehlgeschlagen: %s", eval_err)

    sync_column_backed_session_metrics(cur, str(profile_id), str(eid))
|
|
|
|
|
|
def merge_activity_csv_module_fields(
    cur,
    static_fields: Dict[str, Any],
) -> Dict[str, Any]:
    """Build the CSV field map for the activity module.

    Starts from a shallow copy of ``static_fields`` and adds one entry per
    active row of ``training_parameters`` whose key is not already present.
    Static fields therefore always win on a key clash.

    Parameter data types map to ``"int"`` (``integer``) or ``"float"``
    (``float`` or missing); every other data type, including ``boolean``,
    maps to ``"string"``. ``unit`` and ``label_de`` are only set when the row
    has a truthy ``unit`` / ``name_de``.

    Args:
        cur: DB cursor whose ``fetchall()`` yields dict-like rows.
        static_fields: Field specs from the static registry; not mutated.

    Returns:
        A new dict of field key to field spec.
    """
    type_by_data_type = {"integer": "int", "float": "float"}
    merged = dict(static_fields)

    cur.execute(
        """
        SELECT key, data_type, unit, name_de
        FROM training_parameters
        WHERE is_active = true
        ORDER BY key
        """
    )
    for param in cur.fetchall():
        param_key = param["key"]
        if param_key not in merged:
            entry: Dict[str, Any] = {
                "type": type_by_data_type.get(param["data_type"] or "float", "string"),
                "required": False,
                "from_training_parameter": True,
            }
            unit = param.get("unit")
            if unit:
                entry["unit"] = unit
            label = param.get("name_de")
            if label:
                entry["label_de"] = label
            merged[param_key] = entry

    return merged
|
|
|
|
|
|
def get_mappable_activity_field_catalog(cur, profile_id: str) -> Dict[str, Any]:
    """Return the fields available as targets for configurable import mappings.

    The result has three keys:

    * ``core_fields``: one entry per field of the ``activity`` module
      definition, targeting ``activity_log`` and sorted by key.
    * ``training_parameters``: every active row of ``training_parameters``
      as a plain dict, ordered by key.
    * ``notes``: a fixed explanatory string.

    Args:
        cur: DB cursor used to read ``training_parameters``.
        profile_id: Currently unused; reserved for future per-profile filtering.
    """
    del profile_id  # accepted for interface stability only

    module_def = get_module_definition("activity") or {}
    registry_fields = module_def.get("fields") or {}

    def _describe(name: str, raw_spec: Any) -> Dict[str, Any]:
        # A missing/None spec is treated as an empty one.
        spec = raw_spec or {}
        return {
            "key": name,
            "target": "activity_log",
            "column": name,
            "data_type": spec.get("type", "string"),
            "required": bool(spec.get("required")),
            "unit": spec.get("unit"),
            "label_de": name,
        }

    core_fields: List[Dict[str, Any]] = sorted(
        (_describe(name, raw_spec) for name, raw_spec in registry_fields.items()),
        key=lambda entry: entry["key"],
    )

    cur.execute(
        """
        SELECT id, key, name_de, name_en, category AS param_category,
               data_type, unit, source_field
        FROM training_parameters
        WHERE is_active = true
        ORDER BY key
        """
    )
    parameters = list(map(dict, cur.fetchall()))

    catalog_notes = (
        "training_parameters listet alle aktiven Keys. Pro Session werden Werte ignoriert, "
        "die für deren training_category/training_type_id nicht im Attribut-Schema vorkommen."
    )
    return {
        "core_fields": core_fields,
        "training_parameters": parameters,
        "notes": catalog_notes,
    }
|
|
|
|
|
|
def new_activity_id() -> str:
    """Return a newly generated random UUID (version 4) in its string form."""
    fresh = uuid.uuid4()
    return str(fresh)
|