diff --git a/backend/csv_parser/executor.py b/backend/csv_parser/executor.py index 60c5a80..10169a0 100644 --- a/backend/csv_parser/executor.py +++ b/backend/csv_parser/executor.py @@ -23,14 +23,6 @@ from csv_parser.type_converter import build_row_after_mapping logger = logging.getLogger(__name__) -try: - from evaluation_helper import evaluate_and_save_activity as _evaluate_and_save_activity - - _EVALUATION_AVAILABLE = True -except Exception: # pragma: no cover - _evaluate_and_save_activity = None - _EVALUATION_AVAILABLE = False - def _resolve_training_type_for_activity(cur, activity_type: str, profile_id: str): """Lazy import — gleicher DB-Cursor wie der Import (kein verschachteltes get_db / Pool-Deadlock).""" @@ -814,6 +806,15 @@ def _import_activity( error_details: list, affected_ids: dict, ) -> dict[str, int]: + from data_layer.activity_time_normalize import normalize_activity_start + from data_layer.activity_persistence_orchestrator import ( + find_activity_duplicate_id, + insert_activity_csv_minimal, + new_activity_id, + run_activity_post_write_hooks_import, + update_activity_columns, + ) + rows_total = 0 inserted = 0 updated = 0 @@ -885,6 +886,7 @@ def _import_activity( wtype = str(activity_type).strip() iso = date_d.isoformat() + _, workout_start_t = normalize_activity_start(start_key) # Pro Zeile: bei SQL-Fehler sonst „current transaction is aborted“ bis Xact-Ende. cur.execute("SAVEPOINT csv_activity_row") @@ -892,113 +894,71 @@ def _import_activity( training_type_id, training_category, training_subcategory = _resolve_training_type_for_activity( cur, wtype, profile_id ) - cur.execute( - """ - SELECT id FROM activity_log - WHERE profile_id = %s AND date = %s AND start_time = %s - """, - (profile_id, iso, start_key), - ) - existing = cur.fetchone() + existing_id = find_activity_duplicate_id(cur, profile_id, iso, workout_start_t) - if existing: - eid = existing["id"] - cur.execute( - """ - UPDATE activity_log - SET end_time = %s, - activity_type = %s, - duration_min = %s, - kcal_active = %s, - kcal_resting = %s, - hr_avg = %s, - hr_max = %s, - distance_km = %s, - training_type_id = %s, - training_category = %s, - training_subcategory = %s, - source = 'csv' - WHERE id = %s - RETURNING id - """, - ( - end_str or None, - wtype, - duration_min, - kcal_a, - kcal_r, - hr_a, - hr_m, - dist, - training_type_id, - training_category, - training_subcategory, - eid, - ), - ) - row = cur.fetchone() - updated += 1 - if row and row.get("id"): - affected_ids["activity_log"].append(str(row["id"])) - aid = eid - else: - eid = str(uuid.uuid4()) - cur.execute( - """ - INSERT INTO activity_log ( - id, profile_id, date, start_time, end_time, activity_type, duration_min, - kcal_active, kcal_resting, hr_avg, hr_max, distance_km, - source, training_type_id, training_category, training_subcategory, created - ) - VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'csv',%s,%s,%s,CURRENT_TIMESTAMP) - RETURNING id - """, - ( - eid, - profile_id, - iso, - start_key, - end_str or None, - wtype, - duration_min, - kcal_a, - kcal_r, - hr_a, - hr_m, - dist, - training_type_id, - training_category, - training_subcategory, - ), - ) - row = cur.fetchone() - inserted += 1 - new_entries += 1 - if row and row.get("id"): - affected_ids["activity_log"].append(str(row["id"])) - aid = eid - - if _EVALUATION_AVAILABLE and training_type_id and _evaluate_and_save_activity: - try: - activity_dict = { - "id": aid, - "profile_id": profile_id, - "date": iso, - "training_type_id": training_type_id, + if existing_id: + update_activity_columns( + cur, + profile_id, + existing_id, + { + "start_time": workout_start_t, + "end_time": end_str or None, + "activity_type": wtype, "duration_min": duration_min, + "kcal_active": kcal_a, + "kcal_resting": kcal_r, "hr_avg": hr_a, "hr_max": hr_m, "distance_km": dist, - "kcal_active": kcal_a, - "kcal_resting": kcal_r, - "rpe": None, - "pace_min_per_km": None, - "cadence": None, - "elevation_gain": None, - } - _evaluate_and_save_activity(cur, aid, activity_dict, training_type_id, profile_id) - except Exception as eval_err: - logger.warning("[csv activity] Auto-Eval fehlgeschlagen: %s", eval_err) + "training_type_id": training_type_id, + "training_category": training_category, + "training_subcategory": training_subcategory, + "source": "csv", + }, + ) + updated += 1 + affected_ids["activity_log"].append(str(existing_id)) + aid = existing_id + else: + eid = new_activity_id() + insert_activity_csv_minimal( + cur, + profile_id, + eid, + date_iso=iso, + start_time=workout_start_t, + end_time=end_str or None, + activity_type=wtype, + duration_min=duration_min, + kcal_active=kcal_a, + kcal_resting=kcal_r, + hr_avg=hr_a, + hr_max=hr_m, + distance_km=dist, + training_type_id=training_type_id, + training_category=training_category, + training_subcategory=training_subcategory, + source="csv", + ) + inserted += 1 + new_entries += 1 + affected_ids["activity_log"].append(str(eid)) + aid = eid + + run_activity_post_write_hooks_import( + cur, + profile_id, + str(aid), + workout_date=iso, + training_type_id=training_type_id, + duration_min=duration_min, + hr_avg=hr_a, + hr_max=hr_m, + distance_km=dist, + kcal_active=kcal_a, + kcal_resting=kcal_r, + ) cur.execute("RELEASE SAVEPOINT csv_activity_row") except Exception as e: try: diff --git a/backend/data_layer/activity_metrics.py b/backend/data_layer/activity_metrics.py index a1afd7e..cfdd940 100644 --- a/backend/data_layer/activity_metrics.py +++ b/backend/data_layer/activity_metrics.py @@ -124,7 +124,8 @@ def get_activity_detail_data( "duration_min": int, "kcal_active": int, "hr_avg": int | None, - "training_category": str | None + "training_category": str | None, + "session_metrics": list | None, # EAV (enrich_sessions_with_metrics) }, ... ], @@ -143,6 +144,7 @@ def get_activity_detail_data( cur.execute( """SELECT + id, date, activity_type, duration_min, @@ -153,7 +155,7 @@ def get_activity_detail_data( WHERE profile_id=%s AND date >= %s ORDER BY date DESC LIMIT %s""", - (profile_id, cutoff, limit) + (profile_id, cutoff, limit), ) rows = cur.fetchall() @@ -162,19 +164,24 @@ def get_activity_detail_data( "activities": [], "total_count": 0, "confidence": "insufficient", - "days_analyzed": days + "days_analyzed": days, } activities = [] for row in rows: - activities.append({ - "date": row['date'], - "activity_type": row['activity_type'], - "duration_min": safe_int(row['duration_min']), - "kcal_active": safe_int(row['kcal_active']), - "hr_avg": safe_int(row['hr_avg']) if row.get('hr_avg') else None, - "training_category": row.get('training_category') - }) + activities.append( + { + "id": str(row["id"]), + "date": row["date"], + "activity_type": row["activity_type"], + "duration_min": safe_int(row["duration_min"]), + "kcal_active": safe_int(row["kcal_active"]), + "hr_avg": safe_int(row["hr_avg"]) if row.get("hr_avg") else None, + "training_category": row.get("training_category"), + } + ) + + enrich_sessions_with_metrics(cur, activities) confidence = calculate_confidence(len(activities), days, "general") @@ -182,7 +189,7 @@ def get_activity_detail_data( "activities": activities, "total_count": len(activities), "confidence": confidence, - "days_analyzed": days + "days_analyzed": days, } diff --git a/backend/data_layer/activity_persistence_orchestrator.py b/backend/data_layer/activity_persistence_orchestrator.py new file mode 100644 index 0000000..6e74242 --- /dev/null +++ b/backend/data_layer/activity_persistence_orchestrator.py @@ -0,0 +1,281 @@ +""" +Zentrale Persistenz für activity_log + EAV-Nebenwirkungen (Eval, Spalten→EAV). + +Alle Schreibpfade (REST, Universal-CSV, Legacy-Upload) laufen hier zusammen. + +Feld-Katalog für CSV-Mappings: get_mappable_activity_field_catalog() +""" +from __future__ import annotations + +import logging +import uuid +from typing import Any, Dict, List, Optional + +from models import ActivityEntry + +from csv_parser.module_registry import get_module_definition +from data_layer.activity_session_metrics import sync_column_backed_session_metrics + +logger = logging.getLogger(__name__) + +try: + from evaluation_helper import evaluate_and_save_activity as _evaluate_and_save_activity + + _EVALUATION_AVAILABLE = True +except Exception: # pragma: no cover + _evaluate_and_save_activity = None + _EVALUATION_AVAILABLE = False + + +def find_activity_duplicate_id( + cur, + profile_id: str, + date_iso: str, + start_time: Optional[Any], +) -> Optional[str]: + cur.execute( + """ + SELECT id FROM activity_log + WHERE profile_id = %s AND date = %s::date + AND start_time IS NOT DISTINCT FROM %s::time + """, + (profile_id, date_iso, start_time), + ) + row = cur.fetchone() + return str(row["id"]) if row else None + + +def insert_activity_from_entry(cur, profile_id: str, eid: str, e: ActivityEntry) -> None: + """INSERT activity_log aus ActivityEntry (manueller API-Pfad).""" + d = e.model_dump() + cur.execute( + """INSERT INTO activity_log (id,profile_id,date,start_time,end_time,activity_type,duration_min,kcal_active,kcal_resting, + hr_avg,hr_max,hr_min,distance_km,pace_min_per_km,cadence,avg_power,elevation_gain, + temperature_celsius,humidity_percent,avg_hr_percent,kcal_per_km,rpe,source,notes, + training_type_id,training_category,training_subcategory,created) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP)""", + ( + eid, + profile_id, + d["date"], + d["start_time"], + d["end_time"], + d["activity_type"], + d["duration_min"], + d["kcal_active"], + d["kcal_resting"], + d["hr_avg"], + d["hr_max"], + d.get("hr_min"), + d["distance_km"], + d.get("pace_min_per_km"), + d.get("cadence"), + d.get("avg_power"), + d.get("elevation_gain"), + d.get("temperature_celsius"), + d.get("humidity_percent"), + d.get("avg_hr_percent"), + d.get("kcal_per_km"), + d["rpe"], + d["source"], + d["notes"], + d.get("training_type_id"), + d.get("training_category"), + d.get("training_subcategory"), + ), + ) + + +def update_activity_from_entry(cur, profile_id: str, eid: str, e: ActivityEntry) -> None: + """Volles UPDATE aus ActivityEntry (REST PUT).""" + d = e.model_dump() + cur.execute( + f"UPDATE activity_log SET {', '.join(f'{k}=%s' for k in d)} WHERE id=%s AND profile_id=%s", + list(d.values()) + [eid, profile_id], + ) + + +def update_activity_columns( + cur, + profile_id: str, + eid: str, + updates: Dict[str, Any], +) -> None: + """Teil-UPDATE nur für übergebene Spalten (Importe).""" + if not updates: + return + cols = [f"{k} = %s" for k in updates] + vals = list(updates.values()) + [eid, profile_id] + cur.execute( + f"UPDATE activity_log SET {', '.join(cols)} WHERE id = %s AND profile_id = %s", + vals, + ) + + +def insert_activity_csv_minimal( + cur, + profile_id: str, + eid: str, + *, + date_iso: str, + start_time: Any, + end_time: Any, + activity_type: str, + duration_min: Any, + kcal_active: Any, + kcal_resting: Any, + hr_avg: Any, + hr_max: Any, + distance_km: Any, + training_type_id: Any, + training_category: Any, + training_subcategory: Any, + source: str, +) -> None: + """INSERT minimale activity_log-Zeile (Universal-CSV).""" + cur.execute( + """ + INSERT INTO activity_log ( + id, profile_id, date, start_time, end_time, activity_type, duration_min, + kcal_active, kcal_resting, hr_avg, hr_max, distance_km, + source, training_type_id, training_category, training_subcategory, created + ) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP) + """, + ( + eid, + profile_id, + date_iso, + start_time, + end_time, + activity_type, + duration_min, + kcal_active, + kcal_resting, + hr_avg, + hr_max, + distance_km, + source, + training_type_id, + training_category, + training_subcategory, + ), + ) + + +def run_activity_post_write_hooks(cur, profile_id: str, eid: str) -> None: + """Auto-Eval (falls aktiv) + EAV-Spiegel aus activity_log-Spalten.""" + if _EVALUATION_AVAILABLE and _evaluate_and_save_activity: + cur.execute( + """ + SELECT id, profile_id, date, training_type_id, duration_min, + hr_avg, hr_max, distance_km, kcal_active, kcal_resting, + rpe, pace_min_per_km, cadence, elevation_gain + FROM activity_log + WHERE id = %s + """, + (eid,), + ) + activity_row = cur.fetchone() + if activity_row: + activity_dict = dict(activity_row) + training_type_id = activity_dict.get("training_type_id") + if training_type_id: + try: + _evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, profile_id) + except Exception as eval_error: + logger.error("[AUTO-EVAL] activity %s: %s", eid, eval_error) + sync_column_backed_session_metrics(cur, str(profile_id), str(eid)) + + +def run_activity_post_write_hooks_import( + cur, + profile_id: str, + eid: str, + *, + workout_date: str, + training_type_id: Optional[int], + duration_min: Any, + hr_avg: Any, + hr_max: Any, + distance_km: Any, + kcal_active: Any, + kcal_resting: Any, +) -> None: + """Eval + EAV nach Legacy-Import mit vorgebautem Kontext-Dict.""" + if _EVALUATION_AVAILABLE and training_type_id and _evaluate_and_save_activity: + try: + activity_dict = { + "id": eid, + "profile_id": profile_id, + "date": workout_date, + "training_type_id": training_type_id, + "duration_min": duration_min, + "hr_avg": hr_avg, + "hr_max": hr_max, + "distance_km": distance_km, + "kcal_active": kcal_active, + "kcal_resting": kcal_resting, + "rpe": None, + "pace_min_per_km": None, + "cadence": None, + "elevation_gain": None, + } + _evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, profile_id) + except Exception as eval_err: + logger.warning("[activity import] Auto-Eval fehlgeschlagen: %s", eval_err) + sync_column_backed_session_metrics(cur, str(profile_id), str(eid)) + + +def get_mappable_activity_field_catalog(cur, profile_id: str) -> Dict[str, Any]: + """ + Felder für konfigurierbare Import-Mappings. + + core_fields: module_registry „activity“ → activity_log. + training_parameters: alle aktiven Parameter (global); bei Anwendung auf eine Session + werden Keys verworfen, die nicht in resolve_activity_attribute_schema(Kategorie/Typ) liegen. + + profile_id: reserviert für künftige Profil-Filter. + """ + _ = profile_id + mod = get_module_definition("activity") or {} + fields = mod.get("fields") or {} + core_fields: List[Dict[str, Any]] = [] + for key, spec in fields.items(): + s = spec or {} + core_fields.append( + { + "key": key, + "target": "activity_log", + "column": key, + "data_type": s.get("type", "string"), + "required": bool(s.get("required")), + "unit": s.get("unit"), + "label_de": key, + } + ) + core_fields.sort(key=lambda x: x["key"]) + + cur.execute( + """ + SELECT id, key, name_de, name_en, category AS param_category, + data_type, unit, source_field + FROM training_parameters + WHERE is_active = true + ORDER BY key + """ + ) + parameters = [dict(r) for r in cur.fetchall()] + + return { + "core_fields": core_fields, + "training_parameters": parameters, + "notes": ( + "training_parameters listet alle aktiven Keys. Pro Session werden Werte ignoriert, " + "die für deren training_category/training_type_id nicht im Attribut-Schema vorkommen." + ), + } + + +def new_activity_id() -> str: + return str(uuid.uuid4()) diff --git a/backend/data_layer/activity_time_normalize.py b/backend/data_layer/activity_time_normalize.py new file mode 100644 index 0000000..1b4fd8c --- /dev/null +++ b/backend/data_layer/activity_time_normalize.py @@ -0,0 +1,30 @@ +""" +Einheitliche Startzeit-Normalisierung für Aktivität (CSV, Legacy-Import, Dedupe). + +Anbieter-agnostisch: beliebige ISO-/Export-Strings über dateutil. +""" +from __future__ import annotations + +from datetime import time as dt_time +from typing import Optional + +from dateutil import parser as du_parser + + +def normalize_activity_start(start_raw: str) -> tuple[str, Optional[dt_time]]: + """ + Roh-String „Start“ aus Exporten → (YYYY-MM-DD, TIME ohne μs) für DB Dedupe/INSERT. + + Leerer Input → ("", None). Fallback bei Parse-Fehler: erstes Datum aus ersten 10 Zeichen. + """ + s = (start_raw or "").strip() + if not s: + return "", None + try: + parsed = du_parser.parse(s, dayfirst=False) + t = parsed.time().replace(microsecond=0) + return parsed.date().isoformat(), t + except (ValueError, TypeError, OverflowError): + if len(s) >= 10: + return s[:10], None + return "", None diff --git a/backend/placeholder_registrations/activity_metrics.py b/backend/placeholder_registrations/activity_metrics.py index a61e526..5a01a20 100644 --- a/backend/placeholder_registrations/activity_metrics.py +++ b/backend/placeholder_registrations/activity_metrics.py @@ -127,16 +127,17 @@ def register_activity_group_1(): activity_detail_metadata = PlaceholderMetadata( key="activity_detail", category="Aktivität", - description="Detaillierte Liste der letzten 14 Tage Aktivität", + description="Detaillierte Liste der letzten 14 Tage Aktivität (Kopfzeile + EAV-Metriken)", resolver_module="backend/placeholder_resolver.py", - resolver_function="_format_activity_detail", - data_layer_module=None, - data_layer_function=None, - source_tables=["activity_log", "training_types"], + resolver_function="get_activity_detail", + data_layer_module="backend/data_layer/activity_metrics.py", + data_layer_function="get_activity_detail_data", + source_tables=["activity_log", "activity_session_metrics", "training_parameters"], semantic_contract=( - "Liefert eine strukturierte Liste aller Trainingseinheiten der letzten 14 Tage. " - "Jede Einheit: Datum, Trainingstyp, Dauer (Minuten), optional Notizen. " - "Sortiert chronologisch absteigend (neueste zuerst)." + "Liefert bis zu 50 Einheiten (neueste zuerst) der letzten 14 Tage über " + "get_activity_detail_data: activity_log-Spalten plus " + "enrich_sessions_with_metrics (activity_session_metrics / Profil-EAV). " + "Formatter hängt nicht-leere EAV-Werte als „| EAV: key=value; …“ an." ), business_meaning=( "Detaillierte Trainingshistorie für KI-Prompts, die Muster, Progressionen " @@ -147,7 +148,9 @@ def register_activity_group_1(): time_window="14d", output_type=OutputType.LIST, placeholder_type=PlaceholderType.RAW_DATA, - format_hint="Liste von Strings, eine Zeile pro Einheit: 'YYYY-MM-DD: Typ (Dauer min)'", + format_hint=( + "Pro Zeile: Datum, Typ, Dauer, kcal, optional HF, optional „| EAV: …“ aus Session-Metriken" + ), example_output=( "2026-03-28: Krafttraining (45 min)\\n" "2026-03-27: Laufen (30 min)\\n" @@ -163,19 +166,15 @@ def register_activity_group_1(): legacy_display="Keine Aktivitätsdaten" ), known_limitations=( - "OLD RESOLVER PATTERN: Keine Data Layer Funktion. " - "Formatierung direkt im Resolver. " - "CRITICAL: Keine Qualitätsfilterung - auch ungültige Einheiten (z.B. 0 min) " - "werden gelistet. JOIN mit training_types für Typ-Namen." + "Keine Profil-Qualitätsfilterung in dieser Liste. Max. 20 Zeilen im Prompt-Output " + "(Hard-Limit Resolver). Doppelte Spalten (z.B. duration_min in Kopf und EAV) können " + "in EAV wiederholt erscheinen — KI kann dominante Spalte nutzen." ), - layer_1_decision="NONE - Old resolver pattern (direct SQL in resolver)", - layer_2a_decision="Placeholder Resolver (formatting + SQL query)", - layer_2b_reuse_possible=False, - architecture_alignment=( - "NOT ALIGNED with Phase 0c Multi-Layer Architecture. " - "Should be refactored to use data_layer function." - ), - issue_53_alignment="NOT ALIGNED - no layer separation" + layer_1_decision="activity_metrics.get_activity_detail_data (+ enrich_sessions_with_metrics)", + layer_2a_decision="get_activity_detail (Formatierung)", + layer_2b_reuse_possible=True, + architecture_alignment="Phase 0c Layer 1 + EAV-Anreicherung", + issue_53_alignment="Layer 1" ) activity_detail_metadata.set_evidence("key", EvidenceType.CODE_DERIVED) diff --git a/backend/placeholder_resolver.py b/backend/placeholder_resolver.py index 61a5d43..7c97dab 100644 --- a/backend/placeholder_resolver.py +++ b/backend/placeholder_resolver.py @@ -419,14 +419,21 @@ def get_activity_detail(profile_id: str, days: int = 14) -> str: # Format as readable list (max 20 entries to avoid token bloat) lines = [] - for activity in data['activities'][:20]: - hr_str = f" HF={activity['hr_avg']}" if activity['hr_avg'] else "" + for activity in data["activities"][:20]: + hr_str = f", HF={activity['hr_avg']}" if activity.get("hr_avg") else "" + eav_parts = [] + for m in activity.get("session_metrics") or []: + k, v = m.get("key"), m.get("value") + if k is None or v is None: + continue + eav_parts.append(f"{k}={v}") + eav_str = f" | EAV: {'; '.join(eav_parts)}" if eav_parts else "" lines.append( f"{activity['date']}: {activity['activity_type']} " - f"({activity['duration_min']}min, {activity['kcal_active']}kcal{hr_str})" + f"({activity['duration_min']}min, {activity['kcal_active']}kcal{hr_str}{eav_str})" ) - return '\n'.join(lines) + return "\n".join(lines) def get_trainingstyp_verteilung(profile_id: str, days: int = 14) -> str: diff --git a/backend/routers/activity.py b/backend/routers/activity.py index 46f7991..852fc8e 100644 --- a/backend/routers/activity.py +++ b/backend/routers/activity.py @@ -9,10 +9,9 @@ import uuid import logging import re import calendar -from datetime import date, time as dt_time +from datetime import date from typing import Optional -from dateutil import parser as du_parser from fastapi import APIRouter, HTTPException, UploadFile, File, Header, Depends, Query from db import get_db, get_cursor, r2d @@ -21,7 +20,18 @@ from models import ActivityEntry, ActivityMetricsReplace from routers.profiles import get_pid from feature_logger import log_feature_usage from quality_filter import get_quality_filter_sql -from data_layer.activity_session_metrics import sync_column_backed_session_metrics +from data_layer.activity_persistence_orchestrator import ( + get_mappable_activity_field_catalog, + insert_activity_from_entry, + run_activity_post_write_hooks, + update_activity_from_entry, + find_activity_duplicate_id, + update_activity_columns, + insert_activity_csv_minimal, + run_activity_post_write_hooks_import, + new_activity_id, +) +from data_layer.activity_time_normalize import normalize_activity_start router = APIRouter(prefix="/api/activity", tags=["activity"]) logger = logging.getLogger(__name__) @@ -40,21 +50,6 @@ def _month_date_bounds(ym: str) -> tuple[date, date]: return date(y, mo, 1), date(y, mo, last) -def _normalize_apple_health_start(start_raw: str) -> tuple[str, Optional[dt_time]]: - """ISO/Apple-Export Start → (YYYY-MM-DD, TIME ohne μs) für stabile Dedupe + INSERT.""" - s = (start_raw or "").strip() - if not s: - return "", None - try: - parsed = du_parser.parse(s, dayfirst=False) - t = parsed.time().replace(microsecond=0) - return parsed.date().isoformat(), t - except (ValueError, TypeError, OverflowError): - if len(s) >= 10: - return s[:10], None - return "", None - - _ACTIVITY_DEDUP_WINDOW = """ PARTITION BY al.profile_id, al.date, COALESCE(al.activity_type, ''), @@ -243,69 +238,10 @@ def create_activity(e: ActivityEntry, x_profile_id: Optional[str]=Header(default ) eid = str(uuid.uuid4()) - d = e.model_dump() with get_db() as conn: cur = get_cursor(conn) - cur.execute( - """INSERT INTO activity_log - (id,profile_id,date,start_time,end_time,activity_type,duration_min,kcal_active,kcal_resting, - hr_avg,hr_max,hr_min,distance_km,pace_min_per_km,cadence,avg_power,elevation_gain, - temperature_celsius,humidity_percent,avg_hr_percent,kcal_per_km,rpe,source,notes, - training_type_id,training_category,training_subcategory,created) - VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP)""", - ( - eid, - pid, - d["date"], - d["start_time"], - d["end_time"], - d["activity_type"], - d["duration_min"], - d["kcal_active"], - d["kcal_resting"], - d["hr_avg"], - d["hr_max"], - d.get("hr_min"), - d["distance_km"], - d.get("pace_min_per_km"), - d.get("cadence"), - d.get("avg_power"), - d.get("elevation_gain"), - d.get("temperature_celsius"), - d.get("humidity_percent"), - d.get("avg_hr_percent"), - d.get("kcal_per_km"), - d["rpe"], - d["source"], - d["notes"], - d.get("training_type_id"), - d.get("training_category"), - d.get("training_subcategory"), - ), - ) - - # Phase 1.2: Auto-evaluation after INSERT - if EVALUATION_AVAILABLE: - # Load the activity data to evaluate - cur.execute(""" - SELECT id, profile_id, date, training_type_id, duration_min, - hr_avg, hr_max, distance_km, kcal_active, kcal_resting, - rpe, pace_min_per_km, cadence, elevation_gain - FROM activity_log - WHERE id = %s - """, (eid,)) - activity_row = cur.fetchone() - if activity_row: - activity_dict = dict(activity_row) - training_type_id = activity_dict.get("training_type_id") - if training_type_id: - try: - evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, pid) - logger.info(f"[AUTO-EVAL] Evaluated activity {eid} on INSERT") - except Exception as eval_error: - logger.error(f"[AUTO-EVAL] Failed to evaluate activity {eid}: {eval_error}") - - sync_column_backed_session_metrics(cur, str(pid), eid) + insert_activity_from_entry(cur, pid, eid, e) + run_activity_post_write_hooks(cur, pid, eid) # Phase 2: Increment usage counter (always for new entries) increment_feature_usage(pid, 'activity_entries') @@ -414,38 +350,26 @@ def list_uncategorized_activities( return [r2d(r) for r in cur.fetchall()] +@router.get("/mappable-fields") +def get_activity_mappable_fields(session: dict = Depends(require_auth)): + """ + Vollständiger Katalog für Import-Mappings (activity_log-Kernfelder + alle aktiven training_parameters). + Werte für Keys ohne Schema zur konkreten Session werden beim Import ignoriert. + """ + pid = str(session["profile_id"]) + with get_db() as conn: + cur = get_cursor(conn) + return get_mappable_activity_field_catalog(cur, pid) + + @router.put("/{eid}") def update_activity(eid: str, e: ActivityEntry, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)): """Update existing activity entry.""" pid = get_pid(x_profile_id) with get_db() as conn: - d = e.model_dump() cur = get_cursor(conn) - cur.execute(f"UPDATE activity_log SET {', '.join(f'{k}=%s' for k in d)} WHERE id=%s AND profile_id=%s", - list(d.values())+[eid,pid]) - - # Phase 1.2: Auto-evaluation after UPDATE - if EVALUATION_AVAILABLE: - # Load the updated activity data to evaluate - cur.execute(""" - SELECT id, profile_id, date, training_type_id, duration_min, - hr_avg, hr_max, distance_km, kcal_active, kcal_resting, - rpe, pace_min_per_km, cadence, elevation_gain - FROM activity_log - WHERE id = %s - """, (eid,)) - activity_row = cur.fetchone() - if activity_row: - activity_dict = dict(activity_row) - training_type_id = activity_dict.get("training_type_id") - if training_type_id: - try: - evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, pid) - logger.info(f"[AUTO-EVAL] Re-evaluated activity {eid} on UPDATE") - except Exception as eval_error: - logger.error(f"[AUTO-EVAL] Failed to re-evaluate activity {eid}: {eval_error}") - - sync_column_backed_session_metrics(cur, str(pid), eid) + update_activity_from_entry(cur, pid, eid, e) + run_activity_post_write_hooks(cur, pid, eid) return {"id":eid} @@ -647,7 +571,10 @@ def bulk_categorize_activities( @router.post("/import-csv") async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)): - """Import Apple Health workout CSV with automatic training type mapping.""" + """ + Legacy-Upload (Apple Health Workout-CSV-Spaltennamen). + Persistenz läuft über activity_persistence_orchestrator — gleiche Schicht wie Universal-CSV. + """ pid = get_pid(x_profile_id) raw = await file.read() try: text = raw.decode('utf-8') @@ -663,7 +590,7 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional start = row.get('Start','').strip() if not wtype or not start: continue - workout_date, workout_start_t = _normalize_apple_health_start(start) + workout_date, workout_start_t = normalize_activity_start(start) if not workout_date: continue dur = row.get('Duration','').strip() @@ -682,111 +609,82 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional # Map activity_type to training_type_id using database mappings training_type_id, training_category, training_subcategory = get_training_type_for_activity(wtype, pid) + kcal_a = kj(row.get("Aktive Energie (kJ)", "")) + kcal_r = kj(row.get("Ruheeinträge (kJ)", "")) + hr_av = tf(row.get("Durchschn. Herzfrequenz (count/min)", "")) + hr_mx = tf(row.get("Max. Herzfrequenz (count/min)", "")) + dist_km = tf(row.get("Distanz (km)", "")) try: - # Duplicate detection: normiertes Datum + TIME (Apple-Export kann Start in verschiedenen Formaten liefern) - cur.execute( - """ - SELECT id FROM activity_log - WHERE profile_id = %s AND date = %s::date - AND start_time IS NOT DISTINCT FROM %s::time - """, - (pid, workout_date, workout_start_t), - ) - existing = cur.fetchone() - - if existing: - # Update existing entry (e.g., to add training type mapping) - existing_id = existing['id'] - cur.execute(""" - UPDATE activity_log - SET start_time = %s, - end_time = %s, - activity_type = %s, - duration_min = %s, - kcal_active = %s, - kcal_resting = %s, - hr_avg = %s, - hr_max = %s, - distance_km = %s, - training_type_id = %s, - training_category = %s, - training_subcategory = %s - WHERE id = %s - """, ( - workout_start_t, row.get('End',''), wtype, duration_min, - kj(row.get('Aktive Energie (kJ)','')), - kj(row.get('Ruheeinträge (kJ)','')), - tf(row.get('Durchschn. Herzfrequenz (count/min)','')), - tf(row.get('Max. Herzfrequenz (count/min)','')), - tf(row.get('Distanz (km)','')), - training_type_id, training_category, training_subcategory, - existing_id - )) - skipped += 1 # Count as skipped (not newly inserted) - - # Phase 1.2: Auto-evaluation after CSV import UPDATE - if EVALUATION_AVAILABLE and training_type_id: - try: - # Build activity dict for evaluation - activity_dict = { - "id": existing_id, - "profile_id": pid, - "date": workout_date, - "training_type_id": training_type_id, - "duration_min": duration_min, - "hr_avg": tf(row.get('Durchschn. Herzfrequenz (count/min)','')), - "hr_max": tf(row.get('Max. Herzfrequenz (count/min)','')), - "distance_km": tf(row.get('Distanz (km)','')), - "kcal_active": kj(row.get('Aktive Energie (kJ)','')), - "kcal_resting": kj(row.get('Ruheeinträge (kJ)','')), - "rpe": None, - "pace_min_per_km": None, - "cadence": None, - "elevation_gain": None - } - evaluate_and_save_activity(cur, existing_id, activity_dict, training_type_id, pid) - logger.debug(f"[AUTO-EVAL] Re-evaluated updated activity {existing_id}") - except Exception as eval_error: - logger.warning(f"[AUTO-EVAL] Failed to re-evaluate updated activity {existing_id}: {eval_error}") + existing_id = find_activity_duplicate_id(cur, pid, workout_date, workout_start_t) + if existing_id: + update_activity_columns( + cur, + pid, + str(existing_id), + { + "start_time": workout_start_t, + "end_time": row.get("End", "") or None, + "activity_type": wtype, + "duration_min": duration_min, + "kcal_active": kcal_a, + "kcal_resting": kcal_r, + "hr_avg": hr_av, + "hr_max": hr_mx, + "distance_km": dist_km, + "training_type_id": training_type_id, + "training_category": training_category, + "training_subcategory": training_subcategory, + }, + ) + skipped += 1 + run_activity_post_write_hooks_import( + cur, + pid, + str(existing_id), + workout_date=workout_date, + training_type_id=training_type_id, + duration_min=duration_min, + hr_avg=hr_av, + hr_max=hr_mx, + distance_km=dist_km, + kcal_active=kcal_a, + kcal_resting=kcal_r, + ) else: - # Insert new entry - new_id = str(uuid.uuid4()) - cur.execute("""INSERT INTO activity_log - (id,profile_id,date,start_time,end_time,activity_type,duration_min,kcal_active,kcal_resting, - hr_avg,hr_max,distance_km,source,training_type_id,training_category,training_subcategory,created) - VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'apple_health',%s,%s,%s,CURRENT_TIMESTAMP)""", - (new_id,pid,workout_date,workout_start_t,row.get('End',''),wtype,duration_min, - kj(row.get('Aktive Energie (kJ)','')),kj(row.get('Ruheeinträge (kJ)','')), - tf(row.get('Durchschn. Herzfrequenz (count/min)','')), - tf(row.get('Max. Herzfrequenz (count/min)','')), - tf(row.get('Distanz (km)','')), - training_type_id,training_category,training_subcategory)) - inserted+=1 - - # Phase 1.2: Auto-evaluation after CSV import INSERT - if EVALUATION_AVAILABLE and training_type_id: - try: - # Build activity dict for evaluation - activity_dict = { - "id": new_id, - "profile_id": pid, - "date": workout_date, - "training_type_id": training_type_id, - "duration_min": duration_min, - "hr_avg": tf(row.get('Durchschn. Herzfrequenz (count/min)','')), - "hr_max": tf(row.get('Max. Herzfrequenz (count/min)','')), - "distance_km": tf(row.get('Distanz (km)','')), - "kcal_active": kj(row.get('Aktive Energie (kJ)','')), - "kcal_resting": kj(row.get('Ruheeinträge (kJ)','')), - "rpe": None, - "pace_min_per_km": None, - "cadence": None, - "elevation_gain": None - } - evaluate_and_save_activity(cur, new_id, activity_dict, training_type_id, pid) - logger.debug(f"[AUTO-EVAL] Evaluated imported activity {new_id}") - except Exception as eval_error: - logger.warning(f"[AUTO-EVAL] Failed to evaluate imported activity {new_id}: {eval_error}") + new_id = new_activity_id() + insert_activity_csv_minimal( + cur, + pid, + new_id, + date_iso=workout_date, + start_time=workout_start_t, + end_time=row.get("End", "") or None, + activity_type=wtype, + duration_min=duration_min, + kcal_active=kcal_a, + kcal_resting=kcal_r, + hr_avg=hr_av, + hr_max=hr_mx, + distance_km=dist_km, + training_type_id=training_type_id, + training_category=training_category, + training_subcategory=training_subcategory, + source="apple_health", + ) + inserted += 1 + run_activity_post_write_hooks_import( + cur, + pid, + new_id, + workout_date=workout_date, + training_type_id=training_type_id, + duration_min=duration_min, + hr_avg=hr_av, + hr_max=hr_mx, + distance_km=dist_km, + kcal_active=kcal_a, + kcal_resting=kcal_r, + ) except Exception as e: logger.warning(f"Import row failed: {e}") skipped+=1 diff --git a/backend/tests/test_csv_import_executor.py b/backend/tests/test_csv_import_executor.py index f5b10cd..d997920 100644 --- a/backend/tests/test_csv_import_executor.py +++ b/backend/tests/test_csv_import_executor.py @@ -249,11 +249,18 @@ def test_run_universal_import_activity_garmin_time_plus_date_columns(monkeypatch cur = _SeqCursor([None, {"id": new_id}]) out = run_universal_csv_import(cur, PID, "activity", text, "garmin.csv", mapping) assert out["rows_imported"] == 1 - # Duplicate-Key muss Datum + kombinierte Startzeit enthalten - assert any( - params and "2024-01-20 08:30:00" in str(params) + # Duplicate-Check: Datum + TIME, IS NOT DISTINCT FROM (kein String-Vergleich für start_time) + dup_sqls = [ + (_sql, params) for _sql, params in cur.executes - if params + if params and "IS NOT DISTINCT FROM" in _sql and "activity_log" in _sql + ] + assert dup_sqls, f"erwarteter Duplicate-SELECT fehlt; executes={cur.executes!r}" + assert any( + len(p) >= 3 + and str(p[1]).startswith("2024-01-20") + and (getattr(p[2], "hour", None) == 8 and getattr(p[2], "minute", None) == 30) + for _, p in dup_sqls )