mitai-jinkendo/backend/data_layer/activity_persistence_orchestrator.py
Lars 574af61349
All checks were successful
Deploy Development / deploy (push) Successful in 47s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Enhance CSV import and validation for activity module
- Updated the CSV import logic to merge active training parameters with static fields for the activity module, improving field mapping accuracy.
- Enhanced validation functions to incorporate dynamic field definitions based on active training parameters, ensuring better data integrity during imports.
- Refactored related functions to streamline the process of handling CSV templates and field mappings, improving maintainability and clarity.
- Added new utility functions for resolving activity log column patches and upserting session metrics from CSV, enhancing the overall import functionality.
2026-04-15 08:12:58 +02:00

327 lines
10 KiB
Python

"""
Zentrale Persistenz für activity_log + EAV-Nebenwirkungen (Eval, Spalten→EAV).
Alle Schreibpfade (REST, Universal-CSV, Legacy-Upload) laufen hier zusammen.
Feld-Katalog für CSV-Mappings: get_mappable_activity_field_catalog()
"""
from __future__ import annotations
import logging
import uuid
from typing import Any, Dict, List, Optional
from models import ActivityEntry
from csv_parser.module_registry import get_module_definition
from data_layer.activity_session_metrics import sync_column_backed_session_metrics
logger = logging.getLogger(__name__)

# Optional dependency: automatic session evaluation. If evaluation_helper cannot
# be imported, the post-write hooks skip auto-eval and only run the EAV sync.
try:
    from evaluation_helper import evaluate_and_save_activity as _evaluate_and_save_activity
except Exception as _eval_import_error:  # pragma: no cover
    # Deliberately broad (best-effort), but never silent: an error raised while
    # importing evaluation_helper would otherwise disable auto-eval unnoticed.
    logger.warning(
        "[AUTO-EVAL] evaluation_helper import failed, auto-eval disabled: %s",
        _eval_import_error,
    )
    _evaluate_and_save_activity = None
    _EVALUATION_AVAILABLE = False
else:
    _EVALUATION_AVAILABLE = True
def find_activity_duplicate_id(
    cur,
    profile_id: str,
    date_iso: str,
    start_time: Optional[Any],
) -> Optional[str]:
    """Return the id of an existing activity_log row occupying the same slot.

    A row matches when it belongs to *profile_id*, is dated *date_iso* and has
    the same start_time. ``IS NOT DISTINCT FROM`` makes a ``None`` start_time
    match rows whose start_time is NULL. The id is read via ``row["id"]``, so
    *cur* must yield mapping-like rows. Returns ``None`` when nothing matches.
    """
    query = """
        SELECT id FROM activity_log
        WHERE profile_id = %s AND date = %s::date
        AND start_time IS NOT DISTINCT FROM %s::time
        """
    cur.execute(query, (profile_id, date_iso, start_time))
    match = cur.fetchone()
    if not match:
        return None
    return str(match["id"])
# ActivityEntry fields inserted by insert_activity_from_entry, in column order
# (after id and profile_id). Single source of truth for columns, placeholders
# and parameter order.
_ENTRY_INSERT_FIELDS = (
    "date",
    "start_time",
    "end_time",
    "activity_type",
    "duration_min",
    "kcal_active",
    "kcal_resting",
    "hr_avg",
    "hr_max",
    "hr_min",
    "distance_km",
    "pace_min_per_km",
    "cadence",
    "avg_power",
    "elevation_gain",
    "temperature_celsius",
    "humidity_percent",
    "avg_hr_percent",
    "kcal_per_km",
    "rpe",
    "source",
    "notes",
    "training_type_id",
    "training_category",
    "training_subcategory",
)
# Fields read with dict.get() (inserted as NULL when absent from model_dump());
# every other field is read with [] and must be present (KeyError otherwise).
_ENTRY_OPTIONAL_FIELDS = frozenset(
    {
        "hr_min",
        "pace_min_per_km",
        "cadence",
        "avg_power",
        "elevation_gain",
        "temperature_celsius",
        "humidity_percent",
        "avg_hr_percent",
        "kcal_per_km",
        "training_type_id",
        "training_category",
        "training_subcategory",
    }
)
# Placeholders are generated from the column list, so their count can never
# drift from the columns/parameters; `created` is filled by the database.
_INSERT_FROM_ENTRY_SQL = (
    "INSERT INTO activity_log (id, profile_id, "
    + ", ".join(_ENTRY_INSERT_FIELDS)
    + ", created) VALUES ("
    + ", ".join(["%s"] * (len(_ENTRY_INSERT_FIELDS) + 2))
    + ", CURRENT_TIMESTAMP)"
)


def insert_activity_from_entry(cur, profile_id: str, eid: str, e: ActivityEntry) -> None:
    """INSERT one activity_log row from an ActivityEntry (manual API path).

    Args:
        cur: DB cursor; only ``execute`` is used.
        profile_id: Owner profile id, stored in ``profile_id``.
        eid: Id for the new row, stored in ``id``.
        e: Entry whose ``model_dump()`` supplies the column values.

    Raises:
        KeyError: If a field outside ``_ENTRY_OPTIONAL_FIELDS`` is missing from
            ``e.model_dump()``.
    """
    d = e.model_dump()
    values = tuple(
        d.get(field) if field in _ENTRY_OPTIONAL_FIELDS else d[field]
        for field in _ENTRY_INSERT_FIELDS
    )
    cur.execute(_INSERT_FROM_ENTRY_SQL, (eid, profile_id, *values))
def update_activity_from_entry(cur, profile_id: str, eid: str, e: ActivityEntry) -> None:
    """Overwrite every column of one activity_log row from an ActivityEntry (REST PUT).

    Each key of ``e.model_dump()`` becomes a ``column=%s`` assignment with the
    value bound as a parameter. The row is addressed by *eid* and must belong
    to *profile_id*.
    """
    payload = e.model_dump()
    assignments = ", ".join(f"{column}=%s" for column in payload)
    params = [*payload.values(), eid, profile_id]
    cur.execute(
        f"UPDATE activity_log SET {assignments} WHERE id=%s AND profile_id=%s",
        params,
    )
def update_activity_columns(
    cur,
    profile_id: str,
    eid: str,
    updates: Dict[str, Any],
) -> None:
    """Partial UPDATE of one activity_log row, touching only the given columns (imports).

    Keys of *updates* are column names and are interpolated into the SQL text,
    because identifiers cannot be bound as parameters. Each key must therefore
    be a plain ASCII identifier; otherwise ValueError is raised before any SQL
    is sent. Values are passed as bound parameters. An empty *updates* is a no-op.

    Raises:
        ValueError: If any key is not a plain ASCII identifier.
    """
    if not updates:
        return
    invalid = [
        key
        for key in updates
        if not (isinstance(key, str) and key.isascii() and key.isidentifier())
    ]
    if invalid:
        raise ValueError(f"invalid activity_log column name(s): {invalid!r}")
    assignments = ", ".join(f"{key} = %s" for key in updates)
    cur.execute(
        f"UPDATE activity_log SET {assignments} WHERE id = %s AND profile_id = %s",
        [*updates.values(), eid, profile_id],
    )
def insert_activity_csv_minimal(
    cur,
    profile_id: str,
    eid: str,
    *,
    date_iso: str,
    start_time: Any,
    end_time: Any,
    activity_type: str,
    duration_min: Any,
    kcal_active: Any,
    kcal_resting: Any,
    hr_avg: Any,
    hr_max: Any,
    distance_km: Any,
    training_type_id: Any,
    training_category: Any,
    training_subcategory: Any,
    source: str,
) -> None:
    """INSERT a minimal activity_log row (Universal-CSV import).

    Each column is listed next to the value it receives, and the column list,
    placeholders and parameter tuple are all generated from that one sequence,
    so they cannot get out of step. ``created`` is set by the database via
    ``CURRENT_TIMESTAMP``.
    """
    row = (
        ("id", eid),
        ("profile_id", profile_id),
        ("date", date_iso),
        ("start_time", start_time),
        ("end_time", end_time),
        ("activity_type", activity_type),
        ("duration_min", duration_min),
        ("kcal_active", kcal_active),
        ("kcal_resting", kcal_resting),
        ("hr_avg", hr_avg),
        ("hr_max", hr_max),
        ("distance_km", distance_km),
        ("source", source),
        ("training_type_id", training_type_id),
        ("training_category", training_category),
        ("training_subcategory", training_subcategory),
    )
    columns = ", ".join(column for column, _ in row)
    placeholders = ", ".join("%s" for _ in row)
    cur.execute(
        f"INSERT INTO activity_log ({columns}, created) "
        f"VALUES ({placeholders}, CURRENT_TIMESTAMP)",
        tuple(value for _, value in row),
    )
def run_activity_post_write_hooks(cur, profile_id: str, eid: str) -> None:
    """Run side effects after an activity_log row was written.

    1. Auto-eval: only if evaluation_helper was importable. The row is re-read
       by *eid*, and if it has a truthy training_type_id it is passed to
       evaluate_and_save_activity. Any exception is logged with its traceback
       and not re-raised.
    2. EAV mirror: sync_column_backed_session_metrics always runs afterwards.
    """
    if _EVALUATION_AVAILABLE and _evaluate_and_save_activity:
        cur.execute(
            """
            SELECT id, profile_id, date, training_type_id, duration_min,
            hr_avg, hr_max, distance_km, kcal_active, kcal_resting,
            rpe, pace_min_per_km, cadence, elevation_gain
            FROM activity_log
            WHERE id = %s
            """,
            (eid,),
        )
        activity_row = cur.fetchone()
        if activity_row:
            activity_dict = dict(activity_row)
            training_type_id = activity_dict.get("training_type_id")
            if training_type_id:
                try:
                    _evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, profile_id)
                except Exception as eval_error:
                    # NOTE(review): eval shares *cur*. If it failed on a SQL
                    # statement, PostgreSQL aborts the surrounding transaction
                    # and the sync below would fail too (unless autocommit).
                    # A SAVEPOINT around this call would isolate it — confirm
                    # the connection mode before adding one.
                    logger.exception("[AUTO-EVAL] activity %s: %s", eid, eval_error)
    sync_column_backed_session_metrics(cur, str(profile_id), str(eid))
def run_activity_post_write_hooks_import(
    cur,
    profile_id: str,
    eid: str,
    *,
    workout_date: str,
    training_type_id: Optional[int],
    duration_min: Any,
    hr_avg: Any,
    hr_max: Any,
    distance_km: Any,
    kcal_active: Any,
    kcal_resting: Any,
) -> None:
    """Run auto-eval and the EAV mirror after a legacy import, without re-reading the row.

    The context dict has the same keys as the row selected in
    run_activity_post_write_hooks(). Metrics the legacy import does not supply
    (rpe, pace_min_per_km, cadence, elevation_gain) are None. Eval runs only if
    evaluation_helper was importable and *training_type_id* is truthy. An eval
    failure is logged at WARNING level with its traceback and not re-raised.
    sync_column_backed_session_metrics always runs.
    """
    if _EVALUATION_AVAILABLE and training_type_id and _evaluate_and_save_activity:
        activity_dict = {
            "id": eid,
            "profile_id": profile_id,
            "date": workout_date,
            "training_type_id": training_type_id,
            "duration_min": duration_min,
            "hr_avg": hr_avg,
            "hr_max": hr_max,
            "distance_km": distance_km,
            "kcal_active": kcal_active,
            "kcal_resting": kcal_resting,
            "rpe": None,
            "pace_min_per_km": None,
            "cadence": None,
            "elevation_gain": None,
        }
        try:
            _evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, profile_id)
        except Exception as eval_err:
            # Best-effort: keep the import going, but keep the traceback.
            logger.warning("[activity import] Auto-Eval fehlgeschlagen: %s", eval_err, exc_info=True)
    sync_column_backed_session_metrics(cur, str(profile_id), str(eid))
def merge_activity_csv_module_fields(
    cur,
    static_fields: Dict[str, Any],
) -> Dict[str, Any]:
    """Build the CSV field map for the activity module.

    Starts from a shallow copy of the static registry fields (*static_fields*
    itself is not modified). It then appends one spec per active
    training_parameters row, in key order. A key already present is kept
    unchanged. Parameter specs are never required, carry
    ``from_training_parameter=True``, and get ``unit`` / ``label_de`` only when
    the row has a non-empty value. This is the same parameter source as
    get_mappable_activity_field_catalog's training_parameters, so these keys
    show up in the admin CSV target list, in validation and in import row
    aggregation.
    """
    # training_parameters.data_type -> CSV field type. A missing data_type
    # counts as "float"; anything unlisted (e.g. "boolean") maps to "string".
    type_by_data_type = {"integer": "int", "float": "float"}
    merged: Dict[str, Any] = dict(static_fields)
    cur.execute(
        """
        SELECT key, data_type, unit, name_de
        FROM training_parameters
        WHERE is_active = true
        ORDER BY key
        """
    )
    for param in cur.fetchall():
        key = param["key"]
        if key in merged:
            continue
        spec: Dict[str, Any] = {
            "type": type_by_data_type.get(param["data_type"] or "float", "string"),
            "required": False,
            "from_training_parameter": True,
        }
        unit = param.get("unit")
        if unit:
            spec["unit"] = unit
        label = param.get("name_de")
        if label:
            spec["label_de"] = label
        merged[key] = spec
    return merged
def _catalog_core_field(key: str, spec: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Describe one static module_registry field as an activity_log mapping target."""
    attrs = spec or {}
    return {
        "key": key,
        "target": "activity_log",
        "column": key,
        "data_type": attrs.get("type", "string"),
        "required": bool(attrs.get("required")),
        "unit": attrs.get("unit"),
        "label_de": key,
    }


def get_mappable_activity_field_catalog(cur, profile_id: str) -> Dict[str, Any]:
    """Return the field catalog for configurable import mappings.

    The result has three keys:

    * ``core_fields`` - one entry per field of the module_registry "activity"
      definition, each targeting the same-named activity_log column, sorted
      by key.
    * ``training_parameters`` - every active training_parameters row (global,
      ordered by key) as a plain dict. When values are applied to a session,
      keys outside resolve_activity_attribute_schema(category/type) are
      discarded elsewhere.
    * ``notes`` - a German hint describing that per-session filtering.

    *profile_id* is currently unused; it is reserved for future per-profile
    filtering.
    """
    del profile_id  # reserved, intentionally unused
    module = get_module_definition("activity") or {}
    registry_fields = module.get("fields") or {}
    core_fields: List[Dict[str, Any]] = sorted(
        (_catalog_core_field(key, spec) for key, spec in registry_fields.items()),
        key=lambda field: field["key"],
    )
    cur.execute(
        """
        SELECT id, key, name_de, name_en, category AS param_category,
        data_type, unit, source_field
        FROM training_parameters
        WHERE is_active = true
        ORDER BY key
        """
    )
    training_parameters = list(map(dict, cur.fetchall()))
    return {
        "core_fields": core_fields,
        "training_parameters": training_parameters,
        "notes": (
            "training_parameters listet alle aktiven Keys. Pro Session werden Werte ignoriert, "
            "die für deren training_category/training_type_id nicht im Attribut-Schema vorkommen."
        ),
    }
def new_activity_id() -> str:
    """Return a fresh random (version 4) UUID in canonical 36-character string form."""
    return f"{uuid.uuid4()}"