"""CSV → target tables: upsert, error list, affected_ids for csv_import_log (Issue #21)."""

from __future__ import annotations

import datetime as dt
import uuid
from collections import defaultdict
from typing import Any
import logging

from csv_parser.core import iter_csv_dict_rows
from csv_parser.import_row_processing import (
    aggregate_mapped_rows,
    resolve_import_row_processing,
    validate_import_row_processing,
)
from csv_parser.module_registry import get_module_definition
from csv_parser.type_converter import build_row_after_mapping

logger = logging.getLogger(__name__)

try:
    from evaluation_helper import evaluate_and_save_activity as _evaluate_and_save_activity

    _EVALUATION_AVAILABLE = True
except Exception:  # pragma: no cover
    # The evaluation helper is optional; without it activities are imported
    # but not auto-evaluated.
    _evaluate_and_save_activity = None
    _EVALUATION_AVAILABLE = False


def _resolve_training_type_for_activity(cur, activity_type: str, profile_id: str):
    """Look up the training type for an activity using the import's own cursor.

    Lazy import — same DB cursor as the import (no nested get_db / pool deadlock).
    """
    from routers.activity import get_training_type_for_activity_with_cursor

    return get_training_type_for_activity_with_cursor(cur, activity_type, profile_id)


def coerce_date(val: Any) -> dt.date | None:
    """Return *val* as a ``date``, or ``None`` if it is not a date/datetime.

    ``datetime`` is checked first because it is a subclass of ``date``.
    Strings and other types are not parsed here.
    """
    if val is None:
        return None
    if isinstance(val, dt.datetime):
        return val.date()
    if isinstance(val, dt.date):
        return val
    return None


def _derive_bp_context(hour: int) -> str:
    """Map the hour of a measurement to the context label stored with it."""
    if 5 <= hour < 10:
        return "morning_fasted"
    if 18 <= hour < 23:
        return "evening"
    return "other"


def _rollback_to_savepoint(cur, name: str) -> None:
    """Best-effort ``ROLLBACK TO SAVEPOINT``; a failure is logged, never raised.

    *name* is interpolated into the statement and must be a trusted,
    hard-coded identifier (never user input).
    """
    try:
        cur.execute(f"ROLLBACK TO SAVEPOINT {name}")
    except Exception as rollback_err:
        logger.debug("ROLLBACK TO SAVEPOINT %s failed: %s", name, rollback_err)


def _aggregate_rows(
    module: str,
    spec: Any,
    fm: dict,
    mapped_rows: list[dict[str, Any]],
    error_details: list,
) -> tuple[list[dict[str, Any]], int]:
    """Apply the optional row-processing spec to the mapped rows.

    Returns ``(merged_rows, skipped)``. Without a spec the rows are passed
    through unchanged and nothing is skipped. With a spec, the aggregation
    notes are appended to *error_details* and ``skipped`` is the sum of
    ``rows_in_group`` over all notes whose error is
    ``"mehrere_zeilen_pro_schluessel"``.

    Raises:
        ValueError: if the spec does not validate against the field mappings.
    """
    if not spec:
        return list(mapped_rows), 0
    try:
        validate_import_row_processing(module, spec, fm)
    except ValueError as e:
        # Re-raise as a plain ValueError that carries only the message.
        raise ValueError(str(e)) from e
    merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec)
    error_details.extend(agg_notes)
    skipped = sum(
        n.get("rows_in_group", 0)
        for n in (agg_notes or [])
        if n.get("error") == "mehrere_zeilen_pro_schluessel"
    )
    return merged_rows, skipped


def run_universal_csv_import(
    cur,
    profile_id: str,
    module: str,
    text: str,
    filename: str,
    mapping: dict[str, Any],
) -> dict[str, Any]:
    """Import CSV *text* into the target table(s) of *module*.

    Uses ``cur`` inside an already open transaction; this function neither
    commits nor rolls back the transaction itself.

    Args:
        cur: DB cursor whose rows support ``row["id"]`` / ``row.get("id")``.
        profile_id: Owner of the imported rows.
        module: One of ``sleep``, ``nutrition``, ``weight``, ``blood_pressure``,
            ``activity``, ``vitals_baseline``.
        text: Raw CSV content.
        filename: Not used by this function.
        mapping: Import template; the keys read here are ``field_mappings``,
            ``type_conversions``, ``delimiter`` and ``has_header``.

    Returns:
        Statistics (``rows_total``, ``rows_imported``, ``rows_updated``,
        ``rows_skipped``, ``rows_errors``, ``new_entries``), the first 50
        ``error_details`` and ``affected_ids`` keyed by table name.

    Raises:
        ValueError: unknown or unsupported module, or invalid mapping.
    """
    mod = get_module_definition(module)
    if not mod:
        raise ValueError(f"Unbekanntes Modul: {module}")

    rows_total = 0
    error_details: list[dict[str, Any]] = []
    affected_ids: dict[str, list[str]] = defaultdict(list)

    if module == "sleep":
        # Sleep has its own importer that works on the raw text and does not
        # use the field mapping.
        from csv_parser.sleep_apple_import import import_apple_sleep_nights

        try:
            r = import_apple_sleep_nights(cur, profile_id, text)
        except ValueError as e:
            raise ValueError(str(e)) from e
        error_details.extend(r.get("error_details") or [])
        for sid in r.get("affected_ids") or []:
            affected_ids["sleep_log"].append(sid)
        return {
            "rows_total": r["rows_total"],
            "rows_imported": r["inserted"],
            "rows_updated": r["updated"],
            "rows_skipped": r["skipped"],
            "rows_errors": len(error_details),
            "error_details": error_details[:50],
            "new_entries": r.get("new_entries", r["inserted"]),
            "affected_ids": dict(affected_ids),
        }

    fm = mapping.get("field_mappings") or {}
    if isinstance(fm, str):
        raise ValueError("field_mappings muss ein Objekt sein")
    tc = mapping.get("type_conversions")
    if tc is not None and not isinstance(tc, dict):
        # Anything that is not a dict is treated as "no conversions".
        tc = None
    delim = mapping.get("delimiter") or ","
    has_header = mapping.get("has_header", True)

    if module == "nutrition":
        stats = _import_nutrition(
            cur,
            profile_id,
            text,
            delim,
            bool(has_header),
            fm,
            tc,
            mapping,
            error_details,
            affected_ids,
        )
        rows_total = stats.pop("rows_total")
    elif module == "weight":
        stats = _import_weight(
            cur,
            profile_id,
            text,
            delim,
            bool(has_header),
            fm,
            tc,
            mapping,
            error_details,
            affected_ids,
        )
        rows_total = stats.pop("rows_total")
    elif module == "blood_pressure":
        stats = _import_blood_pressure(
            cur,
            profile_id,
            text,
            delim,
            bool(has_header),
            fm,
            tc,
            error_details,
            affected_ids,
        )
        rows_total = stats.pop("rows_total")
    elif module == "activity":
        stats = _import_activity(
            cur,
            profile_id,
            text,
            delim,
            bool(has_header),
            fm,
            tc,
            error_details,
            affected_ids,
        )
        rows_total = stats.pop("rows_total")
    elif module == "vitals_baseline":
        stats = _import_vitals_baseline(
            cur,
            profile_id,
            text,
            delim,
            bool(has_header),
            fm,
            tc,
            mapping,
            error_details,
            affected_ids,
        )
        rows_total = stats.pop("rows_total")
    else:
        raise ValueError(f"Modul '{module}' wird für Universal-Import noch nicht unterstützt")

    out = {
        "rows_total": rows_total,
        "rows_imported": stats.get("inserted", 0),
        "rows_updated": stats.get("updated", 0),
        "rows_skipped": stats.get("skipped", 0),
        "rows_errors": len(error_details),
        "error_details": error_details[:50],
        "new_entries": stats.get("new_entries", stats.get("inserted", 0)),
        "affected_ids": dict(affected_ids),
    }
    return out


def _import_nutrition(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    mapping: dict[str, Any],
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert one ``nutrition_log`` row per (profile, date).

    Rows without a usable date are reported in *error_details*. Merged rows
    whose kcal, fat, carbs and protein are all zero are dropped without being
    counted. Returns the counters ``rows_total``, ``inserted``, ``updated``,
    ``skipped`` and ``new_entries``.
    """

    def _sf_macro(x: Any) -> float:
        # Missing or unparsable macro values count as 0.0.
        if x is None or x == "":
            return 0.0
        try:
            return float(x)
        except (TypeError, ValueError):
            return 0.0

    spec = resolve_import_row_processing("nutrition", mapping)
    mapped_rows: list[dict[str, Any]] = []
    rows_total = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="nutrition")
        d = coerce_date(mapped.get("date"))
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt oder ungültig"})
            continue
        mapped["date"] = d
        mapped_rows.append(mapped)

    merged_rows, skipped_groups = _aggregate_rows(
        "nutrition", spec, fm, mapped_rows, error_details
    )

    inserted = 0
    updated = 0
    new_entries = 0
    for merged in merged_rows:
        d = coerce_date(merged.get("date"))
        if d is None:
            continue
        iso = d.isoformat()
        kcal = round(_sf_macro(merged.get("kcal")), 1)
        fat = round(_sf_macro(merged.get("fat_g")), 1)
        carbs = round(_sf_macro(merged.get("carbs_g")), 1)
        prot = round(_sf_macro(merged.get("protein_g")), 1)
        if kcal == 0 and fat == 0 and carbs == 0 and prot == 0:
            continue

        cur.execute(
            "SELECT id FROM nutrition_log WHERE profile_id=%s AND date=%s",
            (profile_id, iso),
        )
        existing = cur.fetchone()
        if existing:
            cur.execute(
                """
                UPDATE nutrition_log
                SET kcal=%s, protein_g=%s, fat_g=%s, carbs_g=%s, source='csv'
                WHERE profile_id=%s AND date=%s
                RETURNING id
                """,
                (kcal, prot, fat, carbs, profile_id, iso),
            )
            row = cur.fetchone()
            updated += 1
            if row and row.get("id"):
                affected_ids["nutrition_log"].append(str(row["id"]))
        else:
            eid = str(uuid.uuid4())
            cur.execute(
                """
                INSERT INTO nutrition_log
                    (id, profile_id, date, kcal, protein_g, fat_g, carbs_g, source, created)
                VALUES (%s,%s,%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)
                """,
                (eid, profile_id, iso, kcal, prot, fat, carbs),
            )
            inserted += 1
            new_entries += 1
            affected_ids["nutrition_log"].append(eid)

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped_groups,
        "new_entries": new_entries,
    }


def _import_weight(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    mapping: dict[str, Any],
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert one ``weight_log`` row per (profile, date).

    Rows with a missing date or a missing/non-numeric weight are reported in
    *error_details*. On update an existing note is kept when the CSV provides
    none (``COALESCE``). Returns the counters ``rows_total``, ``inserted``,
    ``updated``, ``skipped`` and ``new_entries``.
    """
    spec = resolve_import_row_processing("weight", mapping)
    mapped_rows: list[dict[str, Any]] = []
    rows_total = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="weight")
        d = coerce_date(mapped.get("date"))
        w = mapped.get("weight")
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if w is None:
            error_details.append({"row": rows_total, "error": "Gewicht fehlt"})
            continue
        try:
            float(w)
        except (TypeError, ValueError):
            error_details.append({"row": rows_total, "error": "Gewicht ungültig"})
            continue
        mapped["date"] = d
        mapped_rows.append(mapped)

    merged_rows, skipped_groups = _aggregate_rows(
        "weight", spec, fm, mapped_rows, error_details
    )

    inserted = 0
    updated = 0
    new_entries = 0
    for merged in merged_rows:
        d = coerce_date(merged.get("date"))
        w = merged.get("weight")
        note = merged.get("note")
        if d is None:
            continue
        if w is None:
            continue
        try:
            w = float(w)
        except (TypeError, ValueError):
            continue
        iso = d.isoformat()

        cur.execute(
            "SELECT id FROM weight_log WHERE profile_id=%s AND date=%s",
            (profile_id, iso),
        )
        existing = cur.fetchone()
        if existing:
            cur.execute(
                """
                UPDATE weight_log
                SET weight=%s, note=COALESCE(%s, note), source='csv'
                WHERE profile_id=%s AND date=%s
                RETURNING id
                """,
                (w, note, profile_id, iso),
            )
            row = cur.fetchone()
            updated += 1
            if row and row.get("id"):
                affected_ids["weight_log"].append(str(row["id"]))
        else:
            eid = str(uuid.uuid4())
            cur.execute(
                """
                INSERT INTO weight_log (id, profile_id, date, weight, note, source, created)
                VALUES (%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)
                """,
                (eid, profile_id, iso, w, note),
            )
            inserted += 1
            new_entries += 1
            affected_ids["weight_log"].append(eid)

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped_groups,
        "new_entries": new_entries,
    }


def _resolve_bp_date_time(mapped: dict[str, Any]) -> tuple[dt.date | None, Any]:
    """Determine measurement date and time of a blood-pressure row.

    ``measured_date`` / ``measured_time`` win; a combined ``start_time``
    (datetime, or a string parsed with ``dateutil``) only fills what is still
    missing. The returned time is whatever the mapping produced (it may still
    be a string) or ``None``.
    """
    md = coerce_date(mapped.get("measured_date"))
    mt = mapped.get("measured_time")
    st_combined = mapped.get("start_time")
    if isinstance(st_combined, dt.datetime):
        if md is None:
            md = st_combined.date()
        if mt is None:
            mt = st_combined.time()
    elif isinstance(st_combined, str) and st_combined.strip() and (md is None or mt is None):
        try:
            from dateutil import parser as du_parser

            dtp = du_parser.parse(st_combined.strip())
            if md is None:
                md = dtp.date()
            if mt is None:
                mt = dtp.time()
        except (ValueError, TypeError, OverflowError):
            # Unparsable combined value: leave date/time as they are.
            pass
    return md, mt


def diagnose_blood_pressure_row(mapped_typed: dict[str, Any]) -> dict[str, Any]:
    """Show whether date/time are recognised via template, alias and the start column.

    Works without a database. ``systolic_ok`` is true only if both systolic
    and diastolic convert with ``int()``.
    """
    md, mt = _resolve_bp_date_time(mapped_typed)
    st_combined = mapped_typed.get("start_time")

    sys_v = mapped_typed.get("systolic")
    dia_v = mapped_typed.get("diastolic")
    try:
        int(sys_v)
        int(dia_v)
        ok_bp = True
    except (TypeError, ValueError):
        ok_bp = False

    return {
        "measured_date_iso": md.isoformat() if md else None,
        "has_measured_time": mt is not None,
        "start_time_raw_type": type(st_combined).__name__ if st_combined is not None else None,
        "systolic_ok": ok_bp,
        "would_reach_insert_check": md is not None and mt is not None,
    }


def _resolve_activity_start(
    start_raw: Any, date_d: dt.date | None
) -> tuple[dt.date | None, str | None, bool]:
    """Derive the activity date and the ``start_time`` lookup key.

    Shared by the import and its diagnosis so both accept the same inputs.

    Returns:
        ``(date, start_key, time_without_date)``. ``time_without_date`` is
        true when *start_raw* is a bare ``time`` and no date is available.
    """
    start_key: str | None = None
    time_without_date = False
    if isinstance(start_raw, dt.datetime):
        start_key = start_raw.strftime("%Y-%m-%d %H:%M:%S")
        if date_d is None:
            date_d = start_raw.date()
    elif isinstance(start_raw, dt.date):
        # A date-only start overrides the date column and starts at midnight.
        date_d = start_raw
        start_key = f"{start_raw.isoformat()} 00:00:00"
    elif isinstance(start_raw, dt.time):
        if date_d is None:
            time_without_date = True
        else:
            start_key = f"{date_d.isoformat()} {start_raw.strftime('%H:%M:%S')}"
    elif isinstance(start_raw, str) and start_raw.strip():
        s = start_raw.strip()
        if date_d is not None and _looks_like_time_only(s):
            start_key = f"{date_d.isoformat()} {s}"
        else:
            start_key = s
            if date_d is None and len(start_key) >= 10:
                # Try to read the date from the first ten characters.
                for fmt in ("%Y-%m-%d", "%d.%m.%Y"):
                    try:
                        date_d = dt.datetime.strptime(start_key[:10], fmt).date()
                        break
                    except ValueError:
                        continue
    return date_d, start_key, time_without_date


def diagnose_activity_row(mapped_typed: dict[str, Any]) -> dict[str, Any]:
    """Explain whether an activity row would pass the import's row checks (no DB).

    ``fail_hint`` is ``startzeit_ohne_datum``, ``trainingsart_fehlt``,
    ``datum_start_fehlt`` or ``None``.
    """
    activity_type = mapped_typed.get("activity_type")
    date_d, start_key, time_without_date = _resolve_activity_start(
        mapped_typed.get("start_time"),
        coerce_date(mapped_typed.get("date")),
    )
    fail_hint: str | None = "startzeit_ohne_datum" if time_without_date else None

    has_type = bool(activity_type and str(activity_type).strip())
    ok = has_type and date_d is not None and bool(start_key)
    if fail_hint is None and not has_type:
        fail_hint = "trainingsart_fehlt"
    elif fail_hint is None and not ok:
        fail_hint = "datum_start_fehlt"

    return {
        "activity_type_preview": (str(activity_type).strip()[:80] if activity_type else None),
        "date_iso": date_d.isoformat() if date_d else None,
        "start_key_preview": (start_key[:80] if start_key else None),
        "would_pass_row_gate": ok,
        "fail_hint": fail_hint,
    }


def _import_blood_pressure(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert ``blood_pressure_log`` rows keyed by (profile, measured_at).

    Rows with missing/invalid date, time, systolic or diastolic values are
    reported in *error_details*. An unparsable pulse is stored as ``NULL``
    instead of aborting the import. Returns the counters ``rows_total``,
    ``inserted``, ``updated``, ``skipped`` (always 0 here) and ``new_entries``.
    """
    rows_total = 0
    inserted = 0
    updated = 0
    skipped = 0

    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="blood_pressure")

        md, mt = _resolve_bp_date_time(mapped)
        if md is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if mt is None:
            error_details.append({"row": rows_total, "error": "Zeit fehlt"})
            continue

        if isinstance(mt, str):
            # Accept "HH:MM[:SS]" and the dotted variant "HH.MM[.SS]".
            try:
                parts = mt.replace(".", ":").split(":")
                if len(parts) >= 2:
                    mt = dt.time(
                        int(parts[0]),
                        int(parts[1]),
                        int(parts[2]) if len(parts) > 2 else 0,
                    )
                else:
                    raise ValueError()
            except (TypeError, ValueError):
                error_details.append({"row": rows_total, "error": "Zeit ungültig"})
                continue

        if not isinstance(mt, dt.time):
            error_details.append({"row": rows_total, "error": "Zeitformat wird nicht unterstützt"})
            continue

        systolic = mapped.get("systolic")
        diastolic = mapped.get("diastolic")
        pulse = mapped.get("pulse")
        try:
            sys_i = int(systolic)
            dia_i = int(diastolic)
        except (TypeError, ValueError):
            error_details.append(
                {"row": rows_total, "error": "Blutdruckwerte fehlen oder ungültig"}
            )
            continue
        # Pulse is optional: an empty or malformed cell must not raise.
        pulse_i = _v_safe_int(pulse)

        measured_at = dt.datetime.combine(md, mt)
        hour = mt.hour
        context = _derive_bp_context(hour)

        cur.execute(
            """
            SELECT id FROM blood_pressure_log
            WHERE profile_id = %s AND measured_at = %s
            """,
            (profile_id, measured_at),
        )
        existing_bp = cur.fetchone()
        if existing_bp:
            cur.execute(
                """
                UPDATE blood_pressure_log
                SET systolic = %s, diastolic = %s, pulse = %s, context = %s, source = 'csv'
                WHERE profile_id = %s AND measured_at = %s
                RETURNING id
                """,
                (sys_i, dia_i, pulse_i, context, profile_id, measured_at),
            )
            row = cur.fetchone()
            updated += 1
            if row and row.get("id"):
                affected_ids["blood_pressure_log"].append(str(row["id"]))
        else:
            cur.execute(
                """
                INSERT INTO blood_pressure_log (
                    profile_id, measured_at, systolic, diastolic, pulse, context, source
                ) VALUES (%s, %s, %s, %s, %s, %s, 'csv')
                RETURNING id
                """,
                (profile_id, measured_at, sys_i, dia_i, pulse_i, context),
            )
            row = cur.fetchone()
            inserted += 1
            if row and row.get("id"):
                affected_ids["blood_pressure_log"].append(str(row["id"]))

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "new_entries": inserted,
    }


def _v_safe_int(value: Any) -> int | None:
    """Convert *value* to ``int`` (truncating decimals); ``None`` if not possible."""
    if value is None or value == "":
        return None
    try:
        if isinstance(value, float):
            return int(value)
        s = str(value).strip()
        if "." in s:
            return int(float(s))
        return int(s)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")).
        return None


def _v_safe_float(value: Any) -> float | None:
    """Convert *value* to ``float``; ``None`` if missing or not convertible."""
    if value is None or value == "":
        return None
    try:
        return float(value)
    except (ValueError, TypeError, OverflowError):
        return None


def diagnose_vitals_row(mapped_typed: dict[str, Any]) -> dict[str, Any]:
    """Explain a vitals-baseline row after type conversion (no DB access)."""
    d = coerce_date(mapped_typed.get("date"))
    rhr = _v_safe_int(mapped_typed.get("resting_hr"))
    hrv = _v_safe_int(mapped_typed.get("hrv"))
    vo2 = _v_safe_float(mapped_typed.get("vo2_max"))
    spo2 = _v_safe_int(mapped_typed.get("spo2"))
    resp = _v_safe_float(mapped_typed.get("respiratory_rate"))
    has_metric = any(x is not None for x in (rhr, hrv, vo2, spo2, resp))
    date_raw = mapped_typed.get("date")
    return {
        "date_coerced_iso": d.isoformat() if d else None,
        "date_after_convert_repr": repr(date_raw),
        "date_after_convert_type": type(date_raw).__name__,
        "metrics": {
            "resting_hr": rhr,
            "hrv": hrv,
            "vo2_max": vo2,
            "spo2": spo2,
            "respiratory_rate": resp,
        },
        "would_pass_prefilter": d is not None and has_metric,
        "prefilter_fail_reason": (
            "datum_fehlt" if d is None else ("keine_baseline_metrik" if not has_metric else None)
        ),
    }


def _import_vitals_baseline(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    mapping: dict[str, Any],
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert ``vitals_baseline`` rows keyed by (profile, date).

    Rows without a date are errors; rows without any metric are skipped.
    Existing values are only overwritten by non-NULL CSV values, and rows
    whose source is ``'manual'`` are left untouched (counted as skipped).
    Returns the counters ``rows_total``, ``inserted``, ``updated``,
    ``skipped`` and ``new_entries``.
    """
    spec = resolve_import_row_processing("vitals_baseline", mapping)
    mapped_rows: list[dict[str, Any]] = []
    rows_total = 0
    skipped_prefilter = 0

    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="vitals_baseline")
        d = coerce_date(mapped.get("date"))
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        rhr = _v_safe_int(mapped.get("resting_hr"))
        hrv = _v_safe_int(mapped.get("hrv"))
        vo2 = _v_safe_float(mapped.get("vo2_max"))
        spo2 = _v_safe_int(mapped.get("spo2"))
        resp = _v_safe_float(mapped.get("respiratory_rate"))
        if not any(x is not None for x in (rhr, hrv, vo2, spo2, resp)):
            skipped_prefilter += 1
            continue
        mapped["date"] = d
        mapped_rows.append(mapped)

    merged_rows, skipped_merge = _aggregate_rows(
        "vitals_baseline", spec, fm, mapped_rows, error_details
    )

    inserted = 0
    updated = 0
    skipped = skipped_prefilter + skipped_merge

    for merged in merged_rows:
        d = coerce_date(merged.get("date"))
        if d is None:
            continue
        rhr = _v_safe_int(merged.get("resting_hr"))
        hrv = _v_safe_int(merged.get("hrv"))
        vo2 = _v_safe_float(merged.get("vo2_max"))
        spo2 = _v_safe_int(merged.get("spo2"))
        resp = _v_safe_float(merged.get("respiratory_rate"))
        if not any(x is not None for x in (rhr, hrv, vo2, spo2, resp)):
            skipped += 1
            continue
        iso = d.isoformat()
        try:
            # Without a SAVEPOINT the first failed INSERT puts the transaction
            # into the "aborted" state; every following query and the commit()
            # then fail, which surfaces as a generic 500.
            cur.execute("SAVEPOINT vitals_csv_row")
            cur.execute(
                """
                INSERT INTO vitals_baseline (
                    profile_id, date, resting_hr, hrv, vo2_max, spo2,
                    respiratory_rate, source
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'csv')
                ON CONFLICT (profile_id, date) DO UPDATE SET
                    resting_hr = COALESCE(EXCLUDED.resting_hr, vitals_baseline.resting_hr),
                    hrv = COALESCE(EXCLUDED.hrv, vitals_baseline.hrv),
                    vo2_max = COALESCE(EXCLUDED.vo2_max, vitals_baseline.vo2_max),
                    spo2 = COALESCE(EXCLUDED.spo2, vitals_baseline.spo2),
                    respiratory_rate = COALESCE(EXCLUDED.respiratory_rate, vitals_baseline.respiratory_rate),
                    updated_at = NOW()
                WHERE vitals_baseline.source != 'manual'
                RETURNING (xmax = 0) AS inserted, id
                """,
                (profile_id, iso, rhr, hrv, vo2, spo2, resp),
            )
            result = cur.fetchone()
            if result is None:
                # No row returned: the conflict row was excluded by the WHERE.
                skipped += 1
            elif result.get("inserted"):
                inserted += 1
                if result.get("id"):
                    affected_ids["vitals_baseline"].append(str(result["id"]))
            else:
                updated += 1
                if result.get("id"):
                    affected_ids["vitals_baseline"].append(str(result["id"]))
            cur.execute("RELEASE SAVEPOINT vitals_csv_row")
        except Exception as e:
            _rollback_to_savepoint(cur, "vitals_csv_row")
            # "row" is the total CSV row count: per-row numbers are not
            # available after aggregation, so the date identifies the record.
            error_details.append(
                {
                    "row": rows_total,
                    "date": iso,
                    "error": str(e),
                    "context": "vitals_baseline upsert",
                },
            )

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "new_entries": inserted,
    }


def _sf_act(val: Any) -> float | None:
    """Return *val* as a float rounded to one decimal, or ``None`` if not numeric."""
    try:
        return round(float(val), 1) if val is not None else None
    except (TypeError, ValueError, OverflowError):
        return None


def _looks_like_time_only(s: str) -> bool:
    """Return whether *s* consists of two or three colon-separated integers.

    Range checks are not performed; any string containing a space is rejected.
    """
    t = s.strip()
    if not t or " " in t:
        return False
    parts = t.split(":")
    if len(parts) < 2 or len(parts) > 3:
        return False
    try:
        for p in parts:
            int(p)
        return True
    except ValueError:
        return False


def _import_activity(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert ``activity_log`` rows keyed by (profile, date, start_time).

    Rows without activity type, date or start time are reported in
    *error_details*. Each row is written inside its own SAVEPOINT so that a
    failing statement only discards that row instead of leaving the
    surrounding transaction aborted for all following rows. If the optional
    evaluation helper is available, each written activity with a training
    type is evaluated afterwards; evaluation failures are only logged.

    Returns the counters ``rows_total``, ``inserted``, ``updated``,
    ``skipped`` (always 0) and ``new_entries``.
    """
    rows_total = 0
    inserted = 0
    updated = 0
    new_entries = 0

    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="activity")
        activity_type = mapped.get("activity_type")
        if not activity_type or not str(activity_type).strip():
            error_details.append({"row": rows_total, "error": "Trainingsart fehlt"})
            continue

        date_d, start_key, time_without_date = _resolve_activity_start(
            mapped.get("start_time"),
            coerce_date(mapped.get("date")),
        )
        if time_without_date:
            error_details.append(
                {"row": rows_total, "error": "Startzeit (Uhrzeit) ohne Datumsspalte"}
            )
            continue
        if date_d is None or not start_key:
            error_details.append(
                {"row": rows_total, "error": "Datum/Startzeit fehlt oder ungültig"}
            )
            continue

        end_raw = mapped.get("end_time")
        if isinstance(end_raw, dt.datetime):
            end_str = end_raw.strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(end_raw, str):
            end_str = end_raw.strip()
        else:
            end_str = ""

        duration_min = mapped.get("duration_min")
        if duration_min is not None:
            try:
                duration_min = round(float(duration_min), 1)
            except (TypeError, ValueError):
                duration_min = None

        kcal_a = _sf_act(mapped.get("kcal_active"))
        kcal_r = _sf_act(mapped.get("kcal_resting"))
        hr_a = _sf_act(mapped.get("hr_avg"))
        hr_m = _sf_act(mapped.get("hr_max"))
        dist = _sf_act(mapped.get("distance_km"))

        wtype = str(activity_type).strip()
        training_type_id, training_category, training_subcategory = (
            _resolve_training_type_for_activity(cur, wtype, profile_id)
        )
        iso = date_d.isoformat()

        try:
            cur.execute("SAVEPOINT activity_csv_row")
            cur.execute(
                """
                SELECT id FROM activity_log
                WHERE profile_id = %s AND date = %s AND start_time = %s
                """,
                (profile_id, iso, start_key),
            )
            existing = cur.fetchone()
            if existing:
                is_new = False
                eid = existing["id"]
                cur.execute(
                    """
                    UPDATE activity_log SET
                        end_time = %s,
                        activity_type = %s,
                        duration_min = %s,
                        kcal_active = %s,
                        kcal_resting = %s,
                        hr_avg = %s,
                        hr_max = %s,
                        distance_km = %s,
                        training_type_id = %s,
                        training_category = %s,
                        training_subcategory = %s,
                        source = 'csv'
                    WHERE id = %s
                    RETURNING id
                    """,
                    (
                        end_str or None,
                        wtype,
                        duration_min,
                        kcal_a,
                        kcal_r,
                        hr_a,
                        hr_m,
                        dist,
                        training_type_id,
                        training_category,
                        training_subcategory,
                        eid,
                    ),
                )
            else:
                is_new = True
                eid = str(uuid.uuid4())
                cur.execute(
                    """
                    INSERT INTO activity_log (
                        id, profile_id, date, start_time, end_time, activity_type,
                        duration_min, kcal_active, kcal_resting, hr_avg, hr_max,
                        distance_km, source, training_type_id, training_category,
                        training_subcategory, created
                    ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'csv',%s,%s,%s,CURRENT_TIMESTAMP)
                    RETURNING id
                    """,
                    (
                        eid,
                        profile_id,
                        iso,
                        start_key,
                        end_str or None,
                        wtype,
                        duration_min,
                        kcal_a,
                        kcal_r,
                        hr_a,
                        hr_m,
                        dist,
                        training_type_id,
                        training_category,
                        training_subcategory,
                    ),
                )
            row = cur.fetchone()
            cur.execute("RELEASE SAVEPOINT activity_csv_row")
        except Exception as e:
            # Undo only this row; the surrounding transaction stays usable.
            _rollback_to_savepoint(cur, "activity_csv_row")
            error_details.append({"row": rows_total, "error": str(e)})
            continue

        # Count the row only after its statements succeeded.
        if is_new:
            inserted += 1
            new_entries += 1
        else:
            updated += 1
        if row and row.get("id"):
            affected_ids["activity_log"].append(str(row["id"]))
        aid = eid

        if _EVALUATION_AVAILABLE and training_type_id and _evaluate_and_save_activity:
            try:
                # Separate savepoint: a failed evaluation must not discard the
                # imported row or abort the transaction.
                # NOTE(review): assumes the evaluation helper does not commit
                # on the shared cursor — confirm.
                cur.execute("SAVEPOINT activity_csv_eval")
                activity_dict = {
                    "id": aid,
                    "profile_id": profile_id,
                    "date": iso,
                    "training_type_id": training_type_id,
                    "duration_min": duration_min,
                    "hr_avg": hr_a,
                    "hr_max": hr_m,
                    "distance_km": dist,
                    "kcal_active": kcal_a,
                    "kcal_resting": kcal_r,
                    "rpe": None,
                    "pace_min_per_km": None,
                    "cadence": None,
                    "elevation_gain": None,
                }
                _evaluate_and_save_activity(cur, aid, activity_dict, training_type_id, profile_id)
                cur.execute("RELEASE SAVEPOINT activity_csv_eval")
            except Exception as eval_err:
                _rollback_to_savepoint(cur, "activity_csv_eval")
                logger.warning("[csv activity] Auto-Eval fehlgeschlagen: %s", eval_err)

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": 0,
        "new_entries": new_entries,
    }