First Version EAV Importer. feat: Enhance activity detail retrieval with EAV metrics and refactor activity import logic
All checks were successful
Deploy Development / deploy (push) Successful in 53s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s

- Updated the `get_activity_detail` function to include session metrics in the activity detail output, allowing for enriched data representation.
- Refactored the activity import logic to streamline the process of inserting and updating activity records, utilizing new helper functions for better maintainability.
- Improved the handling of duplicate activity entries by implementing a more robust identification mechanism.
- Enhanced the metadata for activity detail registration to reflect the inclusion of EAV metrics and updated source tables.
This commit is contained in:
Lars 2026-04-15 07:25:39 +02:00
parent c6e8371d5a
commit 934b915357
8 changed files with 550 additions and 361 deletions

View File

@ -23,14 +23,6 @@ from csv_parser.type_converter import build_row_after_mapping
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
try:
from evaluation_helper import evaluate_and_save_activity as _evaluate_and_save_activity
_EVALUATION_AVAILABLE = True
except Exception: # pragma: no cover
_evaluate_and_save_activity = None
_EVALUATION_AVAILABLE = False
def _resolve_training_type_for_activity(cur, activity_type: str, profile_id: str): def _resolve_training_type_for_activity(cur, activity_type: str, profile_id: str):
"""Lazy import — gleicher DB-Cursor wie der Import (kein verschachteltes get_db / Pool-Deadlock).""" """Lazy import — gleicher DB-Cursor wie der Import (kein verschachteltes get_db / Pool-Deadlock)."""
@ -814,6 +806,15 @@ def _import_activity(
error_details: list, error_details: list,
affected_ids: dict, affected_ids: dict,
) -> dict[str, int]: ) -> dict[str, int]:
from data_layer.activity_time_normalize import normalize_activity_start
from data_layer.activity_persistence_orchestrator import (
find_activity_duplicate_id,
insert_activity_csv_minimal,
new_activity_id,
run_activity_post_write_hooks_import,
update_activity_columns,
)
rows_total = 0 rows_total = 0
inserted = 0 inserted = 0
updated = 0 updated = 0
@ -885,6 +886,7 @@ def _import_activity(
wtype = str(activity_type).strip() wtype = str(activity_type).strip()
iso = date_d.isoformat() iso = date_d.isoformat()
_, workout_start_t = normalize_activity_start(start_key)
# Pro Zeile: bei SQL-Fehler sonst „current transaction is aborted“ bis Xact-Ende. # Pro Zeile: bei SQL-Fehler sonst „current transaction is aborted“ bis Xact-Ende.
cur.execute("SAVEPOINT csv_activity_row") cur.execute("SAVEPOINT csv_activity_row")
@ -892,113 +894,71 @@ def _import_activity(
training_type_id, training_category, training_subcategory = _resolve_training_type_for_activity( training_type_id, training_category, training_subcategory = _resolve_training_type_for_activity(
cur, wtype, profile_id cur, wtype, profile_id
) )
cur.execute( existing_id = find_activity_duplicate_id(cur, profile_id, iso, workout_start_t)
"""
SELECT id FROM activity_log
WHERE profile_id = %s AND date = %s AND start_time = %s
""",
(profile_id, iso, start_key),
)
existing = cur.fetchone()
if existing: if existing_id:
eid = existing["id"] update_activity_columns(
cur.execute( cur,
""" profile_id,
UPDATE activity_log existing_id,
SET end_time = %s, {
activity_type = %s, "start_time": workout_start_t,
duration_min = %s, "end_time": end_str or None,
kcal_active = %s, "activity_type": wtype,
kcal_resting = %s,
hr_avg = %s,
hr_max = %s,
distance_km = %s,
training_type_id = %s,
training_category = %s,
training_subcategory = %s,
source = 'csv'
WHERE id = %s
RETURNING id
""",
(
end_str or None,
wtype,
duration_min,
kcal_a,
kcal_r,
hr_a,
hr_m,
dist,
training_type_id,
training_category,
training_subcategory,
eid,
),
)
row = cur.fetchone()
updated += 1
if row and row.get("id"):
affected_ids["activity_log"].append(str(row["id"]))
aid = eid
else:
eid = str(uuid.uuid4())
cur.execute(
"""
INSERT INTO activity_log (
id, profile_id, date, start_time, end_time, activity_type, duration_min,
kcal_active, kcal_resting, hr_avg, hr_max, distance_km,
source, training_type_id, training_category, training_subcategory, created
)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'csv',%s,%s,%s,CURRENT_TIMESTAMP)
RETURNING id
""",
(
eid,
profile_id,
iso,
start_key,
end_str or None,
wtype,
duration_min,
kcal_a,
kcal_r,
hr_a,
hr_m,
dist,
training_type_id,
training_category,
training_subcategory,
),
)
row = cur.fetchone()
inserted += 1
new_entries += 1
if row and row.get("id"):
affected_ids["activity_log"].append(str(row["id"]))
aid = eid
if _EVALUATION_AVAILABLE and training_type_id and _evaluate_and_save_activity:
try:
activity_dict = {
"id": aid,
"profile_id": profile_id,
"date": iso,
"training_type_id": training_type_id,
"duration_min": duration_min, "duration_min": duration_min,
"kcal_active": kcal_a,
"kcal_resting": kcal_r,
"hr_avg": hr_a, "hr_avg": hr_a,
"hr_max": hr_m, "hr_max": hr_m,
"distance_km": dist, "distance_km": dist,
"kcal_active": kcal_a, "training_type_id": training_type_id,
"kcal_resting": kcal_r, "training_category": training_category,
"rpe": None, "training_subcategory": training_subcategory,
"pace_min_per_km": None, "source": "csv",
"cadence": None, },
"elevation_gain": None, )
} updated += 1
_evaluate_and_save_activity(cur, aid, activity_dict, training_type_id, profile_id) affected_ids["activity_log"].append(str(existing_id))
except Exception as eval_err: aid = existing_id
logger.warning("[csv activity] Auto-Eval fehlgeschlagen: %s", eval_err) else:
eid = new_activity_id()
insert_activity_csv_minimal(
cur,
profile_id,
eid,
date_iso=iso,
start_time=workout_start_t,
end_time=end_str or None,
activity_type=wtype,
duration_min=duration_min,
kcal_active=kcal_a,
kcal_resting=kcal_r,
hr_avg=hr_a,
hr_max=hr_m,
distance_km=dist,
training_type_id=training_type_id,
training_category=training_category,
training_subcategory=training_subcategory,
source="csv",
)
inserted += 1
new_entries += 1
affected_ids["activity_log"].append(str(eid))
aid = eid
run_activity_post_write_hooks_import(
cur,
profile_id,
str(aid),
workout_date=iso,
training_type_id=training_type_id,
duration_min=duration_min,
hr_avg=hr_a,
hr_max=hr_m,
distance_km=dist,
kcal_active=kcal_a,
kcal_resting=kcal_r,
)
cur.execute("RELEASE SAVEPOINT csv_activity_row") cur.execute("RELEASE SAVEPOINT csv_activity_row")
except Exception as e: except Exception as e:
try: try:

View File

@ -124,7 +124,8 @@ def get_activity_detail_data(
"duration_min": int, "duration_min": int,
"kcal_active": int, "kcal_active": int,
"hr_avg": int | None, "hr_avg": int | None,
"training_category": str | None "training_category": str | None,
"session_metrics": list | None, # EAV (enrich_sessions_with_metrics)
}, },
... ...
], ],
@ -143,6 +144,7 @@ def get_activity_detail_data(
cur.execute( cur.execute(
"""SELECT """SELECT
id,
date, date,
activity_type, activity_type,
duration_min, duration_min,
@ -153,7 +155,7 @@ def get_activity_detail_data(
WHERE profile_id=%s AND date >= %s WHERE profile_id=%s AND date >= %s
ORDER BY date DESC ORDER BY date DESC
LIMIT %s""", LIMIT %s""",
(profile_id, cutoff, limit) (profile_id, cutoff, limit),
) )
rows = cur.fetchall() rows = cur.fetchall()
@ -162,19 +164,24 @@ def get_activity_detail_data(
"activities": [], "activities": [],
"total_count": 0, "total_count": 0,
"confidence": "insufficient", "confidence": "insufficient",
"days_analyzed": days "days_analyzed": days,
} }
activities = [] activities = []
for row in rows: for row in rows:
activities.append({ activities.append(
"date": row['date'], {
"activity_type": row['activity_type'], "id": str(row["id"]),
"duration_min": safe_int(row['duration_min']), "date": row["date"],
"kcal_active": safe_int(row['kcal_active']), "activity_type": row["activity_type"],
"hr_avg": safe_int(row['hr_avg']) if row.get('hr_avg') else None, "duration_min": safe_int(row["duration_min"]),
"training_category": row.get('training_category') "kcal_active": safe_int(row["kcal_active"]),
}) "hr_avg": safe_int(row["hr_avg"]) if row.get("hr_avg") else None,
"training_category": row.get("training_category"),
}
)
enrich_sessions_with_metrics(cur, activities)
confidence = calculate_confidence(len(activities), days, "general") confidence = calculate_confidence(len(activities), days, "general")
@ -182,7 +189,7 @@ def get_activity_detail_data(
"activities": activities, "activities": activities,
"total_count": len(activities), "total_count": len(activities),
"confidence": confidence, "confidence": confidence,
"days_analyzed": days "days_analyzed": days,
} }

View File

@ -0,0 +1,281 @@
"""
Zentrale Persistenz für activity_log + EAV-Nebenwirkungen (Eval, SpaltenEAV).
Alle Schreibpfade (REST, Universal-CSV, Legacy-Upload) laufen hier zusammen.
Feld-Katalog für CSV-Mappings: get_mappable_activity_field_catalog()
"""
from __future__ import annotations
import logging
import uuid
from typing import Any, Dict, List, Optional
from models import ActivityEntry
from csv_parser.module_registry import get_module_definition
from data_layer.activity_session_metrics import sync_column_backed_session_metrics
logger = logging.getLogger(__name__)
try:
from evaluation_helper import evaluate_and_save_activity as _evaluate_and_save_activity
_EVALUATION_AVAILABLE = True
except Exception: # pragma: no cover
_evaluate_and_save_activity = None
_EVALUATION_AVAILABLE = False
def find_activity_duplicate_id(
cur,
profile_id: str,
date_iso: str,
start_time: Optional[Any],
) -> Optional[str]:
cur.execute(
"""
SELECT id FROM activity_log
WHERE profile_id = %s AND date = %s::date
AND start_time IS NOT DISTINCT FROM %s::time
""",
(profile_id, date_iso, start_time),
)
row = cur.fetchone()
return str(row["id"]) if row else None
def insert_activity_from_entry(cur, profile_id: str, eid: str, e: ActivityEntry) -> None:
"""INSERT activity_log aus ActivityEntry (manueller API-Pfad)."""
d = e.model_dump()
cur.execute(
"""INSERT INTO activity_log (id,profile_id,date,start_time,end_time,activity_type,duration_min,kcal_active,kcal_resting,
hr_avg,hr_max,hr_min,distance_km,pace_min_per_km,cadence,avg_power,elevation_gain,
temperature_celsius,humidity_percent,avg_hr_percent,kcal_per_km,rpe,source,notes,
training_type_id,training_category,training_subcategory,created)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP)""",
(
eid,
profile_id,
d["date"],
d["start_time"],
d["end_time"],
d["activity_type"],
d["duration_min"],
d["kcal_active"],
d["kcal_resting"],
d["hr_avg"],
d["hr_max"],
d.get("hr_min"),
d["distance_km"],
d.get("pace_min_per_km"),
d.get("cadence"),
d.get("avg_power"),
d.get("elevation_gain"),
d.get("temperature_celsius"),
d.get("humidity_percent"),
d.get("avg_hr_percent"),
d.get("kcal_per_km"),
d["rpe"],
d["source"],
d["notes"],
d.get("training_type_id"),
d.get("training_category"),
d.get("training_subcategory"),
),
)
def update_activity_from_entry(cur, profile_id: str, eid: str, e: ActivityEntry) -> None:
"""Volles UPDATE aus ActivityEntry (REST PUT)."""
d = e.model_dump()
cur.execute(
f"UPDATE activity_log SET {', '.join(f'{k}=%s' for k in d)} WHERE id=%s AND profile_id=%s",
list(d.values()) + [eid, profile_id],
)
def update_activity_columns(
cur,
profile_id: str,
eid: str,
updates: Dict[str, Any],
) -> None:
"""Teil-UPDATE nur für übergebene Spalten (Importe)."""
if not updates:
return
cols = [f"{k} = %s" for k in updates]
vals = list(updates.values()) + [eid, profile_id]
cur.execute(
f"UPDATE activity_log SET {', '.join(cols)} WHERE id = %s AND profile_id = %s",
vals,
)
def insert_activity_csv_minimal(
cur,
profile_id: str,
eid: str,
*,
date_iso: str,
start_time: Any,
end_time: Any,
activity_type: str,
duration_min: Any,
kcal_active: Any,
kcal_resting: Any,
hr_avg: Any,
hr_max: Any,
distance_km: Any,
training_type_id: Any,
training_category: Any,
training_subcategory: Any,
source: str,
) -> None:
"""INSERT minimale activity_log-Zeile (Universal-CSV)."""
cur.execute(
"""
INSERT INTO activity_log (
id, profile_id, date, start_time, end_time, activity_type, duration_min,
kcal_active, kcal_resting, hr_avg, hr_max, distance_km,
source, training_type_id, training_category, training_subcategory, created
)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP)
""",
(
eid,
profile_id,
date_iso,
start_time,
end_time,
activity_type,
duration_min,
kcal_active,
kcal_resting,
hr_avg,
hr_max,
distance_km,
source,
training_type_id,
training_category,
training_subcategory,
),
)
def run_activity_post_write_hooks(cur, profile_id: str, eid: str) -> None:
"""Auto-Eval (falls aktiv) + EAV-Spiegel aus activity_log-Spalten."""
if _EVALUATION_AVAILABLE and _evaluate_and_save_activity:
cur.execute(
"""
SELECT id, profile_id, date, training_type_id, duration_min,
hr_avg, hr_max, distance_km, kcal_active, kcal_resting,
rpe, pace_min_per_km, cadence, elevation_gain
FROM activity_log
WHERE id = %s
""",
(eid,),
)
activity_row = cur.fetchone()
if activity_row:
activity_dict = dict(activity_row)
training_type_id = activity_dict.get("training_type_id")
if training_type_id:
try:
_evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, profile_id)
except Exception as eval_error:
logger.error("[AUTO-EVAL] activity %s: %s", eid, eval_error)
sync_column_backed_session_metrics(cur, str(profile_id), str(eid))
def run_activity_post_write_hooks_import(
cur,
profile_id: str,
eid: str,
*,
workout_date: str,
training_type_id: Optional[int],
duration_min: Any,
hr_avg: Any,
hr_max: Any,
distance_km: Any,
kcal_active: Any,
kcal_resting: Any,
) -> None:
"""Eval + EAV nach Legacy-Import mit vorgebautem Kontext-Dict."""
if _EVALUATION_AVAILABLE and training_type_id and _evaluate_and_save_activity:
try:
activity_dict = {
"id": eid,
"profile_id": profile_id,
"date": workout_date,
"training_type_id": training_type_id,
"duration_min": duration_min,
"hr_avg": hr_avg,
"hr_max": hr_max,
"distance_km": distance_km,
"kcal_active": kcal_active,
"kcal_resting": kcal_resting,
"rpe": None,
"pace_min_per_km": None,
"cadence": None,
"elevation_gain": None,
}
_evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, profile_id)
except Exception as eval_err:
logger.warning("[activity import] Auto-Eval fehlgeschlagen: %s", eval_err)
sync_column_backed_session_metrics(cur, str(profile_id), str(eid))
def get_mappable_activity_field_catalog(cur, profile_id: str) -> Dict[str, Any]:
"""
Felder für konfigurierbare Import-Mappings.
core_fields: module_registry activity activity_log.
training_parameters: alle aktiven Parameter (global); bei Anwendung auf eine Session
werden Keys verworfen, die nicht in resolve_activity_attribute_schema(Kategorie/Typ) liegen.
profile_id: reserviert für künftige Profil-Filter.
"""
_ = profile_id
mod = get_module_definition("activity") or {}
fields = mod.get("fields") or {}
core_fields: List[Dict[str, Any]] = []
for key, spec in fields.items():
s = spec or {}
core_fields.append(
{
"key": key,
"target": "activity_log",
"column": key,
"data_type": s.get("type", "string"),
"required": bool(s.get("required")),
"unit": s.get("unit"),
"label_de": key,
}
)
core_fields.sort(key=lambda x: x["key"])
cur.execute(
"""
SELECT id, key, name_de, name_en, category AS param_category,
data_type, unit, source_field
FROM training_parameters
WHERE is_active = true
ORDER BY key
"""
)
parameters = [dict(r) for r in cur.fetchall()]
return {
"core_fields": core_fields,
"training_parameters": parameters,
"notes": (
"training_parameters listet alle aktiven Keys. Pro Session werden Werte ignoriert, "
"die für deren training_category/training_type_id nicht im Attribut-Schema vorkommen."
),
}
def new_activity_id() -> str:
return str(uuid.uuid4())

View File

@ -0,0 +1,30 @@
"""
Einheitliche Startzeit-Normalisierung für Aktivität (CSV, Legacy-Import, Dedupe).
Anbieter-agnostisch: beliebige ISO-/Export-Strings über dateutil.
"""
from __future__ import annotations
from datetime import time as dt_time
from typing import Optional
from dateutil import parser as du_parser
def normalize_activity_start(start_raw: str) -> tuple[str, Optional[dt_time]]:
"""
Roh-String Start aus Exporten (YYYY-MM-DD, TIME ohne μs) für DB Dedupe/INSERT.
Leerer Input ("", None). Fallback bei Parse-Fehler: erstes Datum aus ersten 10 Zeichen.
"""
s = (start_raw or "").strip()
if not s:
return "", None
try:
parsed = du_parser.parse(s, dayfirst=False)
t = parsed.time().replace(microsecond=0)
return parsed.date().isoformat(), t
except (ValueError, TypeError, OverflowError):
if len(s) >= 10:
return s[:10], None
return "", None

View File

@ -127,16 +127,17 @@ def register_activity_group_1():
activity_detail_metadata = PlaceholderMetadata( activity_detail_metadata = PlaceholderMetadata(
key="activity_detail", key="activity_detail",
category="Aktivität", category="Aktivität",
description="Detaillierte Liste der letzten 14 Tage Aktivität", description="Detaillierte Liste der letzten 14 Tage Aktivität (Kopfzeile + EAV-Metriken)",
resolver_module="backend/placeholder_resolver.py", resolver_module="backend/placeholder_resolver.py",
resolver_function="_format_activity_detail", resolver_function="get_activity_detail",
data_layer_module=None, data_layer_module="backend/data_layer/activity_metrics.py",
data_layer_function=None, data_layer_function="get_activity_detail_data",
source_tables=["activity_log", "training_types"], source_tables=["activity_log", "activity_session_metrics", "training_parameters"],
semantic_contract=( semantic_contract=(
"Liefert eine strukturierte Liste aller Trainingseinheiten der letzten 14 Tage. " "Liefert bis zu 50 Einheiten (neueste zuerst) der letzten 14 Tage über "
"Jede Einheit: Datum, Trainingstyp, Dauer (Minuten), optional Notizen. " "get_activity_detail_data: activity_log-Spalten plus "
"Sortiert chronologisch absteigend (neueste zuerst)." "enrich_sessions_with_metrics (activity_session_metrics / Profil-EAV). "
"Formatter hängt nicht-leere EAV-Werte als „| EAV: key=value; …“ an."
), ),
business_meaning=( business_meaning=(
"Detaillierte Trainingshistorie für KI-Prompts, die Muster, Progressionen " "Detaillierte Trainingshistorie für KI-Prompts, die Muster, Progressionen "
@ -147,7 +148,9 @@ def register_activity_group_1():
time_window="14d", time_window="14d",
output_type=OutputType.LIST, output_type=OutputType.LIST,
placeholder_type=PlaceholderType.RAW_DATA, placeholder_type=PlaceholderType.RAW_DATA,
format_hint="Liste von Strings, eine Zeile pro Einheit: 'YYYY-MM-DD: Typ (Dauer min)'", format_hint=(
"Pro Zeile: Datum, Typ, Dauer, kcal, optional HF, optional „| EAV: …“ aus Session-Metriken"
),
example_output=( example_output=(
"2026-03-28: Krafttraining (45 min)\\n" "2026-03-28: Krafttraining (45 min)\\n"
"2026-03-27: Laufen (30 min)\\n" "2026-03-27: Laufen (30 min)\\n"
@ -163,19 +166,15 @@ def register_activity_group_1():
legacy_display="Keine Aktivitätsdaten" legacy_display="Keine Aktivitätsdaten"
), ),
known_limitations=( known_limitations=(
"OLD RESOLVER PATTERN: Keine Data Layer Funktion. " "Keine Profil-Qualitätsfilterung in dieser Liste. Max. 20 Zeilen im Prompt-Output "
"Formatierung direkt im Resolver. " "(Hard-Limit Resolver). Doppelte Spalten (z.B. duration_min in Kopf und EAV) können "
"CRITICAL: Keine Qualitätsfilterung - auch ungültige Einheiten (z.B. 0 min) " "in EAV wiederholt erscheinen — KI kann dominante Spalte nutzen."
"werden gelistet. JOIN mit training_types für Typ-Namen."
), ),
layer_1_decision="NONE - Old resolver pattern (direct SQL in resolver)", layer_1_decision="activity_metrics.get_activity_detail_data (+ enrich_sessions_with_metrics)",
layer_2a_decision="Placeholder Resolver (formatting + SQL query)", layer_2a_decision="get_activity_detail (Formatierung)",
layer_2b_reuse_possible=False, layer_2b_reuse_possible=True,
architecture_alignment=( architecture_alignment="Phase 0c Layer 1 + EAV-Anreicherung",
"NOT ALIGNED with Phase 0c Multi-Layer Architecture. " issue_53_alignment="Layer 1"
"Should be refactored to use data_layer function."
),
issue_53_alignment="NOT ALIGNED - no layer separation"
) )
activity_detail_metadata.set_evidence("key", EvidenceType.CODE_DERIVED) activity_detail_metadata.set_evidence("key", EvidenceType.CODE_DERIVED)

View File

@ -419,14 +419,21 @@ def get_activity_detail(profile_id: str, days: int = 14) -> str:
# Format as readable list (max 20 entries to avoid token bloat) # Format as readable list (max 20 entries to avoid token bloat)
lines = [] lines = []
for activity in data['activities'][:20]: for activity in data["activities"][:20]:
hr_str = f" HF={activity['hr_avg']}" if activity['hr_avg'] else "" hr_str = f", HF={activity['hr_avg']}" if activity.get("hr_avg") else ""
eav_parts = []
for m in activity.get("session_metrics") or []:
k, v = m.get("key"), m.get("value")
if k is None or v is None:
continue
eav_parts.append(f"{k}={v}")
eav_str = f" | EAV: {'; '.join(eav_parts)}" if eav_parts else ""
lines.append( lines.append(
f"{activity['date']}: {activity['activity_type']} " f"{activity['date']}: {activity['activity_type']} "
f"({activity['duration_min']}min, {activity['kcal_active']}kcal{hr_str})" f"({activity['duration_min']}min, {activity['kcal_active']}kcal{hr_str}{eav_str})"
) )
return '\n'.join(lines) return "\n".join(lines)
def get_trainingstyp_verteilung(profile_id: str, days: int = 14) -> str: def get_trainingstyp_verteilung(profile_id: str, days: int = 14) -> str:

View File

@ -9,10 +9,9 @@ import uuid
import logging import logging
import re import re
import calendar import calendar
from datetime import date, time as dt_time from datetime import date
from typing import Optional from typing import Optional
from dateutil import parser as du_parser
from fastapi import APIRouter, HTTPException, UploadFile, File, Header, Depends, Query from fastapi import APIRouter, HTTPException, UploadFile, File, Header, Depends, Query
from db import get_db, get_cursor, r2d from db import get_db, get_cursor, r2d
@ -21,7 +20,18 @@ from models import ActivityEntry, ActivityMetricsReplace
from routers.profiles import get_pid from routers.profiles import get_pid
from feature_logger import log_feature_usage from feature_logger import log_feature_usage
from quality_filter import get_quality_filter_sql from quality_filter import get_quality_filter_sql
from data_layer.activity_session_metrics import sync_column_backed_session_metrics from data_layer.activity_persistence_orchestrator import (
get_mappable_activity_field_catalog,
insert_activity_from_entry,
run_activity_post_write_hooks,
update_activity_from_entry,
find_activity_duplicate_id,
update_activity_columns,
insert_activity_csv_minimal,
run_activity_post_write_hooks_import,
new_activity_id,
)
from data_layer.activity_time_normalize import normalize_activity_start
router = APIRouter(prefix="/api/activity", tags=["activity"]) router = APIRouter(prefix="/api/activity", tags=["activity"])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -40,21 +50,6 @@ def _month_date_bounds(ym: str) -> tuple[date, date]:
return date(y, mo, 1), date(y, mo, last) return date(y, mo, 1), date(y, mo, last)
def _normalize_apple_health_start(start_raw: str) -> tuple[str, Optional[dt_time]]:
"""ISO/Apple-Export Start → (YYYY-MM-DD, TIME ohne μs) für stabile Dedupe + INSERT."""
s = (start_raw or "").strip()
if not s:
return "", None
try:
parsed = du_parser.parse(s, dayfirst=False)
t = parsed.time().replace(microsecond=0)
return parsed.date().isoformat(), t
except (ValueError, TypeError, OverflowError):
if len(s) >= 10:
return s[:10], None
return "", None
_ACTIVITY_DEDUP_WINDOW = """ _ACTIVITY_DEDUP_WINDOW = """
PARTITION BY al.profile_id, al.date, PARTITION BY al.profile_id, al.date,
COALESCE(al.activity_type, ''), COALESCE(al.activity_type, ''),
@ -243,69 +238,10 @@ def create_activity(e: ActivityEntry, x_profile_id: Optional[str]=Header(default
) )
eid = str(uuid.uuid4()) eid = str(uuid.uuid4())
d = e.model_dump()
with get_db() as conn: with get_db() as conn:
cur = get_cursor(conn) cur = get_cursor(conn)
cur.execute( insert_activity_from_entry(cur, pid, eid, e)
"""INSERT INTO activity_log run_activity_post_write_hooks(cur, pid, eid)
(id,profile_id,date,start_time,end_time,activity_type,duration_min,kcal_active,kcal_resting,
hr_avg,hr_max,hr_min,distance_km,pace_min_per_km,cadence,avg_power,elevation_gain,
temperature_celsius,humidity_percent,avg_hr_percent,kcal_per_km,rpe,source,notes,
training_type_id,training_category,training_subcategory,created)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP)""",
(
eid,
pid,
d["date"],
d["start_time"],
d["end_time"],
d["activity_type"],
d["duration_min"],
d["kcal_active"],
d["kcal_resting"],
d["hr_avg"],
d["hr_max"],
d.get("hr_min"),
d["distance_km"],
d.get("pace_min_per_km"),
d.get("cadence"),
d.get("avg_power"),
d.get("elevation_gain"),
d.get("temperature_celsius"),
d.get("humidity_percent"),
d.get("avg_hr_percent"),
d.get("kcal_per_km"),
d["rpe"],
d["source"],
d["notes"],
d.get("training_type_id"),
d.get("training_category"),
d.get("training_subcategory"),
),
)
# Phase 1.2: Auto-evaluation after INSERT
if EVALUATION_AVAILABLE:
# Load the activity data to evaluate
cur.execute("""
SELECT id, profile_id, date, training_type_id, duration_min,
hr_avg, hr_max, distance_km, kcal_active, kcal_resting,
rpe, pace_min_per_km, cadence, elevation_gain
FROM activity_log
WHERE id = %s
""", (eid,))
activity_row = cur.fetchone()
if activity_row:
activity_dict = dict(activity_row)
training_type_id = activity_dict.get("training_type_id")
if training_type_id:
try:
evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, pid)
logger.info(f"[AUTO-EVAL] Evaluated activity {eid} on INSERT")
except Exception as eval_error:
logger.error(f"[AUTO-EVAL] Failed to evaluate activity {eid}: {eval_error}")
sync_column_backed_session_metrics(cur, str(pid), eid)
# Phase 2: Increment usage counter (always for new entries) # Phase 2: Increment usage counter (always for new entries)
increment_feature_usage(pid, 'activity_entries') increment_feature_usage(pid, 'activity_entries')
@ -414,38 +350,26 @@ def list_uncategorized_activities(
return [r2d(r) for r in cur.fetchall()] return [r2d(r) for r in cur.fetchall()]
@router.get("/mappable-fields")
def get_activity_mappable_fields(session: dict = Depends(require_auth)):
"""
Vollständiger Katalog für Import-Mappings (activity_log-Kernfelder + alle aktiven training_parameters).
Werte für Keys ohne Schema zur konkreten Session werden beim Import ignoriert.
"""
pid = str(session["profile_id"])
with get_db() as conn:
cur = get_cursor(conn)
return get_mappable_activity_field_catalog(cur, pid)
@router.put("/{eid}") @router.put("/{eid}")
def update_activity(eid: str, e: ActivityEntry, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)): def update_activity(eid: str, e: ActivityEntry, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Update existing activity entry.""" """Update existing activity entry."""
pid = get_pid(x_profile_id) pid = get_pid(x_profile_id)
with get_db() as conn: with get_db() as conn:
d = e.model_dump()
cur = get_cursor(conn) cur = get_cursor(conn)
cur.execute(f"UPDATE activity_log SET {', '.join(f'{k}=%s' for k in d)} WHERE id=%s AND profile_id=%s", update_activity_from_entry(cur, pid, eid, e)
list(d.values())+[eid,pid]) run_activity_post_write_hooks(cur, pid, eid)
# Phase 1.2: Auto-evaluation after UPDATE
if EVALUATION_AVAILABLE:
# Load the updated activity data to evaluate
cur.execute("""
SELECT id, profile_id, date, training_type_id, duration_min,
hr_avg, hr_max, distance_km, kcal_active, kcal_resting,
rpe, pace_min_per_km, cadence, elevation_gain
FROM activity_log
WHERE id = %s
""", (eid,))
activity_row = cur.fetchone()
if activity_row:
activity_dict = dict(activity_row)
training_type_id = activity_dict.get("training_type_id")
if training_type_id:
try:
evaluate_and_save_activity(cur, eid, activity_dict, training_type_id, pid)
logger.info(f"[AUTO-EVAL] Re-evaluated activity {eid} on UPDATE")
except Exception as eval_error:
logger.error(f"[AUTO-EVAL] Failed to re-evaluate activity {eid}: {eval_error}")
sync_column_backed_session_metrics(cur, str(pid), eid)
return {"id":eid} return {"id":eid}
@ -647,7 +571,10 @@ def bulk_categorize_activities(
@router.post("/import-csv") @router.post("/import-csv")
async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)): async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Import Apple Health workout CSV with automatic training type mapping.""" """
Legacy-Upload (Apple Health Workout-CSV-Spaltennamen).
Persistenz läuft über activity_persistence_orchestrator gleiche Schicht wie Universal-CSV.
"""
pid = get_pid(x_profile_id) pid = get_pid(x_profile_id)
raw = await file.read() raw = await file.read()
try: text = raw.decode('utf-8') try: text = raw.decode('utf-8')
@ -663,7 +590,7 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional
start = row.get('Start','').strip() start = row.get('Start','').strip()
if not wtype or not start: if not wtype or not start:
continue continue
workout_date, workout_start_t = _normalize_apple_health_start(start) workout_date, workout_start_t = normalize_activity_start(start)
if not workout_date: if not workout_date:
continue continue
dur = row.get('Duration','').strip() dur = row.get('Duration','').strip()
@ -682,111 +609,82 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional
# Map activity_type to training_type_id using database mappings # Map activity_type to training_type_id using database mappings
training_type_id, training_category, training_subcategory = get_training_type_for_activity(wtype, pid) training_type_id, training_category, training_subcategory = get_training_type_for_activity(wtype, pid)
kcal_a = kj(row.get("Aktive Energie (kJ)", ""))
kcal_r = kj(row.get("Ruheeinträge (kJ)", ""))
hr_av = tf(row.get("Durchschn. Herzfrequenz (count/min)", ""))
hr_mx = tf(row.get("Max. Herzfrequenz (count/min)", ""))
dist_km = tf(row.get("Distanz (km)", ""))
try: try:
# Duplicate detection: normiertes Datum + TIME (Apple-Export kann Start in verschiedenen Formaten liefern) existing_id = find_activity_duplicate_id(cur, pid, workout_date, workout_start_t)
cur.execute( if existing_id:
""" update_activity_columns(
SELECT id FROM activity_log cur,
WHERE profile_id = %s AND date = %s::date pid,
AND start_time IS NOT DISTINCT FROM %s::time str(existing_id),
""", {
(pid, workout_date, workout_start_t), "start_time": workout_start_t,
) "end_time": row.get("End", "") or None,
existing = cur.fetchone() "activity_type": wtype,
"duration_min": duration_min,
if existing: "kcal_active": kcal_a,
# Update existing entry (e.g., to add training type mapping) "kcal_resting": kcal_r,
existing_id = existing['id'] "hr_avg": hr_av,
cur.execute(""" "hr_max": hr_mx,
UPDATE activity_log "distance_km": dist_km,
SET start_time = %s, "training_type_id": training_type_id,
end_time = %s, "training_category": training_category,
activity_type = %s, "training_subcategory": training_subcategory,
duration_min = %s, },
kcal_active = %s, )
kcal_resting = %s, skipped += 1
hr_avg = %s, run_activity_post_write_hooks_import(
hr_max = %s, cur,
distance_km = %s, pid,
training_type_id = %s, str(existing_id),
training_category = %s, workout_date=workout_date,
training_subcategory = %s training_type_id=training_type_id,
WHERE id = %s duration_min=duration_min,
""", ( hr_avg=hr_av,
workout_start_t, row.get('End',''), wtype, duration_min, hr_max=hr_mx,
kj(row.get('Aktive Energie (kJ)','')), distance_km=dist_km,
kj(row.get('Ruheeinträge (kJ)','')), kcal_active=kcal_a,
tf(row.get('Durchschn. Herzfrequenz (count/min)','')), kcal_resting=kcal_r,
tf(row.get('Max. Herzfrequenz (count/min)','')), )
tf(row.get('Distanz (km)','')),
training_type_id, training_category, training_subcategory,
existing_id
))
skipped += 1 # Count as skipped (not newly inserted)
# Phase 1.2: Auto-evaluation after CSV import UPDATE
if EVALUATION_AVAILABLE and training_type_id:
try:
# Build activity dict for evaluation
activity_dict = {
"id": existing_id,
"profile_id": pid,
"date": workout_date,
"training_type_id": training_type_id,
"duration_min": duration_min,
"hr_avg": tf(row.get('Durchschn. Herzfrequenz (count/min)','')),
"hr_max": tf(row.get('Max. Herzfrequenz (count/min)','')),
"distance_km": tf(row.get('Distanz (km)','')),
"kcal_active": kj(row.get('Aktive Energie (kJ)','')),
"kcal_resting": kj(row.get('Ruheeinträge (kJ)','')),
"rpe": None,
"pace_min_per_km": None,
"cadence": None,
"elevation_gain": None
}
evaluate_and_save_activity(cur, existing_id, activity_dict, training_type_id, pid)
logger.debug(f"[AUTO-EVAL] Re-evaluated updated activity {existing_id}")
except Exception as eval_error:
logger.warning(f"[AUTO-EVAL] Failed to re-evaluate updated activity {existing_id}: {eval_error}")
else: else:
# Insert new entry new_id = new_activity_id()
new_id = str(uuid.uuid4()) insert_activity_csv_minimal(
cur.execute("""INSERT INTO activity_log cur,
(id,profile_id,date,start_time,end_time,activity_type,duration_min,kcal_active,kcal_resting, pid,
hr_avg,hr_max,distance_km,source,training_type_id,training_category,training_subcategory,created) new_id,
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'apple_health',%s,%s,%s,CURRENT_TIMESTAMP)""", date_iso=workout_date,
(new_id,pid,workout_date,workout_start_t,row.get('End',''),wtype,duration_min, start_time=workout_start_t,
kj(row.get('Aktive Energie (kJ)','')),kj(row.get('Ruheeinträge (kJ)','')), end_time=row.get("End", "") or None,
tf(row.get('Durchschn. Herzfrequenz (count/min)','')), activity_type=wtype,
tf(row.get('Max. Herzfrequenz (count/min)','')), duration_min=duration_min,
tf(row.get('Distanz (km)','')), kcal_active=kcal_a,
training_type_id,training_category,training_subcategory)) kcal_resting=kcal_r,
inserted+=1 hr_avg=hr_av,
hr_max=hr_mx,
# Phase 1.2: Auto-evaluation after CSV import INSERT distance_km=dist_km,
if EVALUATION_AVAILABLE and training_type_id: training_type_id=training_type_id,
try: training_category=training_category,
# Build activity dict for evaluation training_subcategory=training_subcategory,
activity_dict = { source="apple_health",
"id": new_id, )
"profile_id": pid, inserted += 1
"date": workout_date, run_activity_post_write_hooks_import(
"training_type_id": training_type_id, cur,
"duration_min": duration_min, pid,
"hr_avg": tf(row.get('Durchschn. Herzfrequenz (count/min)','')), new_id,
"hr_max": tf(row.get('Max. Herzfrequenz (count/min)','')), workout_date=workout_date,
"distance_km": tf(row.get('Distanz (km)','')), training_type_id=training_type_id,
"kcal_active": kj(row.get('Aktive Energie (kJ)','')), duration_min=duration_min,
"kcal_resting": kj(row.get('Ruheeinträge (kJ)','')), hr_avg=hr_av,
"rpe": None, hr_max=hr_mx,
"pace_min_per_km": None, distance_km=dist_km,
"cadence": None, kcal_active=kcal_a,
"elevation_gain": None kcal_resting=kcal_r,
} )
evaluate_and_save_activity(cur, new_id, activity_dict, training_type_id, pid)
logger.debug(f"[AUTO-EVAL] Evaluated imported activity {new_id}")
except Exception as eval_error:
logger.warning(f"[AUTO-EVAL] Failed to evaluate imported activity {new_id}: {eval_error}")
except Exception as e: except Exception as e:
logger.warning(f"Import row failed: {e}") logger.warning(f"Import row failed: {e}")
skipped+=1 skipped+=1

View File

@ -249,11 +249,18 @@ def test_run_universal_import_activity_garmin_time_plus_date_columns(monkeypatch
cur = _SeqCursor([None, {"id": new_id}]) cur = _SeqCursor([None, {"id": new_id}])
out = run_universal_csv_import(cur, PID, "activity", text, "garmin.csv", mapping) out = run_universal_csv_import(cur, PID, "activity", text, "garmin.csv", mapping)
assert out["rows_imported"] == 1 assert out["rows_imported"] == 1
# Duplicate-Key muss Datum + kombinierte Startzeit enthalten # Duplicate-Check: Datum + TIME, IS NOT DISTINCT FROM (kein String-Vergleich für start_time)
assert any( dup_sqls = [
params and "2024-01-20 08:30:00" in str(params) (_sql, params)
for _sql, params in cur.executes for _sql, params in cur.executes
if params if params and "IS NOT DISTINCT FROM" in _sql and "activity_log" in _sql
]
assert dup_sqls, f"erwarteter Duplicate-SELECT fehlt; executes={cur.executes!r}"
assert any(
len(p) >= 3
and str(p[1]).startswith("2024-01-20")
and (getattr(p[2], "hour", None) == 8 and getattr(p[2], "minute", None) == 30)
for _, p in dup_sqls
) )