mitai-jinkendo/backend/csv_parser/executor.py
Lars 08eae86ddc
All checks were successful
Deploy Development / deploy (push) Successful in 56s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Refactor activity import logic and enhance CSV handling
- Replaced the deprecated `resolve_activity_log_column_patch_from_csv` function with `activity_csv_registry_updates_from_mapped` to streamline updates from CSV mappings.
- Updated the `_import_activity` function to utilize the new registry updates, improving data integrity during activity imports.
- Enhanced the activity module registry by adding German labels for various fields, improving localization support.
- Refactored the session metrics handling to ensure only relevant fields are processed, enhancing the overall robustness of CSV imports.
2026-04-15 10:35:48 +02:00

988 lines
33 KiB
Python

"""
CSV → Zieltabellen: Upsert, Fehlerliste, affected_ids für csv_import_log (Issue #21).
"""
from __future__ import annotations

import datetime as dt
import logging
import math
import uuid
from collections import defaultdict
from typing import Any

from csv_parser.core import iter_csv_dict_rows
from csv_parser.import_errors import enrich_row_error
from csv_parser.import_row_processing import (
    aggregate_mapped_rows,
    resolve_import_row_processing,
    validate_import_row_processing,
)
from csv_parser.module_registry import get_module_definition
from csv_parser.type_converter import build_row_after_mapping
logger = logging.getLogger(__name__)
def _resolve_training_type_for_activity(cur, activity_type: str, profile_id: str):
    """Look up the training type for *activity_type* using the import's own cursor.

    The router helper is imported lazily (at call time), and it is handed the
    very cursor the CSV import is running on. No nested ``get_db``
    connection is opened, which would risk a pool deadlock.
    The caller unpacks the result as
    ``(training_type_id, training_category, training_subcategory)``.
    """
    from routers.activity import get_training_type_for_activity_with_cursor as _lookup

    return _lookup(cur, activity_type, profile_id)
def coerce_date(val: Any) -> dt.date | None:
    """Return *val* as a plain ``date`` when it already is one, else ``None``.

    ``datetime`` values are truncated to their calendar date. Every other
    input, including strings, yields ``None``; no parsing happens here.
    """
    if isinstance(val, dt.datetime):
        return val.date()
    return val if isinstance(val, dt.date) else None
def _derive_bp_context(hour: int) -> str:
if 5 <= hour < 10:
return "morning_fasted"
if 18 <= hour < 23:
return "evening"
return "other"
def run_universal_csv_import(
    cur,
    profile_id: str,
    module: str,
    text: str,
    filename: str,
    mapping: dict[str, Any],
) -> dict[str, Any]:
    """Run the universal CSV import for *module* on the caller's cursor.

    *cur* is used inside an already-open transaction; this function does not
    commit.

    ``sleep`` is delegated to the Apple sleep importer, which ignores
    *mapping*. All other supported modules read these keys from *mapping*:

    - ``field_mappings``: must be an object.
    - ``type_conversions``: ignored unless it is an object.
    - ``delimiter``: default ``","``.
    - ``has_header``: default ``True``.

    *filename* is currently not used.

    Returns:
        Statistics (``rows_total``, ``rows_imported``, ``rows_updated``,
        ``rows_skipped``, ``new_entries``) plus the following:

        - ``rows_errors``: counts all errors.
        - ``error_details``: the first 50 entries.
        - ``affected_ids``: maps table name to the ids touched.

    Raises:
        ValueError: the module is unknown or not supported, ``field_mappings``
            is not an object, or a module importer raises a validation error.
    """
    mod = get_module_definition(module)
    if not mod:
        raise ValueError(f"Unbekanntes Modul: {module}")
    error_details: list[dict[str, Any]] = []
    affected_ids: dict[str, list[str]] = defaultdict(list)

    if module == "sleep":
        from csv_parser.sleep_apple_import import import_apple_sleep_nights

        # ValueErrors from the sleep importer propagate unchanged to the caller.
        r = import_apple_sleep_nights(cur, profile_id, text)
        error_details.extend(r.get("error_details") or [])
        sleep_ids = r.get("affected_ids") or []
        if sleep_ids:
            # Only touch the defaultdict when there are ids, so an empty
            # import does not report an empty "sleep_log" entry.
            affected_ids["sleep_log"].extend(sleep_ids)
        return {
            "rows_total": r["rows_total"],
            "rows_imported": r["inserted"],
            "rows_updated": r["updated"],
            "rows_skipped": r["skipped"],
            "rows_errors": len(error_details),
            "error_details": error_details[:50],
            "new_entries": r.get("new_entries", r["inserted"]),
            "affected_ids": dict(affected_ids),
        }

    fm = mapping.get("field_mappings") or {}
    # Anything but an object (str, list, number) would only fail later inside
    # the row mapper with an unhelpful error.
    if not isinstance(fm, dict):
        raise ValueError("field_mappings muss ein Objekt sein")
    tc = mapping.get("type_conversions")
    if not isinstance(tc, dict):
        tc = None
    delim = mapping.get("delimiter") or ","
    has_header = bool(mapping.get("has_header", True))
    base_args = (cur, profile_id, text, delim, has_header, fm, tc)

    # Importers that additionally take *mapping* (row-processing / aggregation).
    importers_with_mapping = {
        "nutrition": _import_nutrition,
        "weight": _import_weight,
        "vitals_baseline": _import_vitals_baseline,
    }
    # Importers that work row by row without aggregation settings.
    importers_without_mapping = {
        "blood_pressure": _import_blood_pressure,
        "activity": _import_activity,
    }
    if module in importers_with_mapping:
        stats = importers_with_mapping[module](
            *base_args, mapping, error_details, affected_ids
        )
    elif module in importers_without_mapping:
        stats = importers_without_mapping[module](*base_args, error_details, affected_ids)
    else:
        raise ValueError(f"Modul '{module}' wird für Universal-Import noch nicht unterstützt")

    rows_total = stats.pop("rows_total")
    return {
        "rows_total": rows_total,
        "rows_imported": stats.get("inserted", 0),
        "rows_updated": stats.get("updated", 0),
        "rows_skipped": stats.get("skipped", 0),
        "rows_errors": len(error_details),
        "error_details": error_details[:50],
        "new_entries": stats.get("new_entries", stats.get("inserted", 0)),
        "affected_ids": dict(affected_ids),
    }
def _import_nutrition(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    mapping: dict[str, Any],
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert daily nutrition totals, one ``nutrition_log`` row per profile and date.

    The import runs in two passes:

    1. Map each CSV row. Rows without a usable date are recorded in
       *error_details*.
    2. Optionally aggregate the rows per the row-processing spec derived
       from *mapping*. Then update the existing entry for each date or
       insert a new one, both with ``source='csv'``.

    The kcal and macro values are rounded to one decimal. Empty,
    unparsable or non-finite values count as 0. An existing day is
    overwritten with the CSV values. Days whose four values are all 0 are
    not written and are counted as skipped.

    Returns:
        Stats dict ``rows_total``/``inserted``/``updated``/``skipped``/``new_entries``.
        ``skipped`` counts rows dropped by aggregation plus empty days.

    Raises:
        ValueError: from ``validate_import_row_processing`` for an invalid spec.
    """

    def _macro(x: Any) -> float:
        """Best-effort float. Empty, unparsable or non-finite values become 0.0."""
        if x is None or x == "":
            return 0.0
        try:
            value = float(x)
        except (TypeError, ValueError, OverflowError):
            return 0.0
        return value if math.isfinite(value) else 0.0

    spec = resolve_import_row_processing("nutrition", mapping)
    mapped_rows: list[dict[str, Any]] = []
    rows_total = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="nutrition")
        d = coerce_date(mapped.get("date"))
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt oder ungültig"})
            continue
        mapped["date"] = d
        mapped_rows.append(mapped)

    if spec:
        validate_import_row_processing("nutrition", spec, fm)
        merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec)
        error_details.extend(agg_notes)
    else:
        merged_rows, agg_notes = list(mapped_rows), []
    # Rows of groups that could not be merged (several rows per key) are skipped.
    skipped = sum(
        n.get("rows_in_group", 0)
        for n in (agg_notes or [])
        if n.get("error") == "mehrere_zeilen_pro_schluessel"
    )

    inserted = 0
    updated = 0
    new_entries = 0
    for merged in merged_rows:
        d = coerce_date(merged.get("date"))
        if d is None:
            continue
        iso = d.isoformat()
        kcal = round(_macro(merged.get("kcal")), 1)
        fat = round(_macro(merged.get("fat_g")), 1)
        carbs = round(_macro(merged.get("carbs_g")), 1)
        prot = round(_macro(merged.get("protein_g")), 1)
        if kcal == 0 and fat == 0 and carbs == 0 and prot == 0:
            # Nothing worth storing; count it so the import stats add up.
            skipped += 1
            continue
        cur.execute(
            "SELECT id FROM nutrition_log WHERE profile_id=%s AND date=%s",
            (profile_id, iso),
        )
        existing = cur.fetchone()
        if existing:
            cur.execute(
                """
                UPDATE nutrition_log
                SET kcal=%s, protein_g=%s, fat_g=%s, carbs_g=%s, source='csv'
                WHERE profile_id=%s AND date=%s
                RETURNING id
                """,
                (kcal, prot, fat, carbs, profile_id, iso),
            )
            row = cur.fetchone()
            updated += 1
            if row and row.get("id"):
                affected_ids["nutrition_log"].append(str(row["id"]))
        else:
            eid = str(uuid.uuid4())
            cur.execute(
                """
                INSERT INTO nutrition_log (id, profile_id, date, kcal, protein_g, fat_g, carbs_g, source, created)
                VALUES (%s,%s,%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)
                """,
                (eid, profile_id, iso, kcal, prot, fat, carbs),
            )
            inserted += 1
            new_entries += 1
            affected_ids["nutrition_log"].append(eid)
    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "new_entries": new_entries,
    }
def _import_weight(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    mapping: dict[str, Any],
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert daily body weight, one ``weight_log`` row per profile and date.

    The import runs in two passes:

    1. Map each CSV row. Rows are rejected into *error_details* when the
       date is missing, or the weight is missing, unparsable or non-finite.
    2. Optionally aggregate the rows per the row-processing spec derived
       from *mapping*. Then update the existing entry for the date, keeping
       the old note when the CSV provides none, or insert a new one.

    Returns:
        Stats dict ``rows_total``/``inserted``/``updated``/``skipped``/``new_entries``.
        ``skipped`` counts rows dropped by aggregation.

    Raises:
        ValueError: from ``validate_import_row_processing`` for an invalid spec.
    """

    def _finite_weight(raw: Any) -> float | None:
        """Parse *raw* as a finite float, or return None (NaN/inf are rejected)."""
        try:
            value = float(raw)
        except (TypeError, ValueError, OverflowError):
            return None
        return value if math.isfinite(value) else None

    spec = resolve_import_row_processing("weight", mapping)
    mapped_rows: list[dict[str, Any]] = []
    rows_total = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="weight")
        d = coerce_date(mapped.get("date"))
        w = mapped.get("weight")
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if w is None:
            error_details.append({"row": rows_total, "error": "Gewicht fehlt"})
            continue
        if _finite_weight(w) is None:
            error_details.append({"row": rows_total, "error": "Gewicht ungültig"})
            continue
        mapped["date"] = d
        mapped_rows.append(mapped)

    if spec:
        validate_import_row_processing("weight", spec, fm)
        merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec)
        error_details.extend(agg_notes)
    else:
        merged_rows, agg_notes = list(mapped_rows), []
    # Rows of groups that could not be merged (several rows per key) are skipped.
    skipped_groups = sum(
        n.get("rows_in_group", 0)
        for n in (agg_notes or [])
        if n.get("error") == "mehrere_zeilen_pro_schluessel"
    )

    inserted = 0
    updated = 0
    new_entries = 0
    for merged in merged_rows:
        d = coerce_date(merged.get("date"))
        if d is None:
            continue
        # Re-check after aggregation: merged values may differ from the raw rows.
        w = _finite_weight(merged.get("weight"))
        if w is None:
            continue
        note = merged.get("note")
        iso = d.isoformat()
        cur.execute(
            "SELECT id FROM weight_log WHERE profile_id=%s AND date=%s",
            (profile_id, iso),
        )
        existing = cur.fetchone()
        if existing:
            cur.execute(
                """
                UPDATE weight_log SET weight=%s, note=COALESCE(%s, note), source='csv'
                WHERE profile_id=%s AND date=%s
                RETURNING id
                """,
                (w, note, profile_id, iso),
            )
            row = cur.fetchone()
            updated += 1
            if row and row.get("id"):
                affected_ids["weight_log"].append(str(row["id"]))
        else:
            eid = str(uuid.uuid4())
            cur.execute(
                """
                INSERT INTO weight_log (id, profile_id, date, weight, note, source, created)
                VALUES (%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)
                """,
                (eid, profile_id, iso, w, note),
            )
            inserted += 1
            new_entries += 1
            affected_ids["weight_log"].append(eid)
    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped_groups,
        "new_entries": new_entries,
    }
def diagnose_blood_pressure_row(mapped_typed: dict[str, Any]) -> dict[str, Any]:
    """Report whether date/time are recognised after template, alias and Apple start column.

    The explicit ``measured_date``/``measured_time`` values take precedence.
    A combined ``start_time`` fills only the parts that are still missing. It
    may be a ``datetime``, or a non-blank string parsed with
    ``dateutil``; parse errors are ignored.
    ``systolic_ok`` is True only when both systolic and diastolic convert
    with ``int()``.
    """
    date_val = coerce_date(mapped_typed.get("measured_date"))
    time_val = mapped_typed.get("measured_time")
    start = mapped_typed.get("start_time")

    fallback: dt.datetime | None = None
    if isinstance(start, dt.datetime):
        fallback = start
    elif isinstance(start, str) and start.strip() and (date_val is None or time_val is None):
        try:
            from dateutil import parser as du_parser

            fallback = du_parser.parse(start.strip())
        except (ValueError, TypeError, OverflowError):
            fallback = None
    if fallback is not None:
        if date_val is None:
            date_val = fallback.date()
        if time_val is None:
            time_val = fallback.time()

    try:
        int(mapped_typed.get("systolic"))
        int(mapped_typed.get("diastolic"))
    except (TypeError, ValueError):
        values_ok = False
    else:
        values_ok = True

    return {
        "measured_date_iso": date_val.isoformat() if date_val else None,
        "has_measured_time": time_val is not None,
        "start_time_raw_type": None if start is None else type(start).__name__,
        "systolic_ok": values_ok,
        "would_reach_insert_check": date_val is not None and time_val is not None,
    }
def diagnose_activity_row(mapped_typed: dict[str, Any]) -> dict[str, Any]:
    """Explain, without DB access, whether an activity row would pass the import gate.

    This mirrors how ``_import_activity`` resolves the date and start key:

    - A ``datetime`` start gives both the start key and, if needed, the date.
    - A date-only start is taken as the date and assumed to be at midnight.
    - A ``time`` start needs the ``date`` column.
    - A string start is combined with the date when it is a bare clock time.
      Otherwise the string is used as-is, and the date is derived from its
      first 10 characters.

    ``fail_hint`` follows the importer's check order: the activity type is
    checked first, then a time without a date, then a missing date or start.
    """
    activity_type = mapped_typed.get("activity_type")
    start_raw = mapped_typed.get("start_time")
    date_d = coerce_date(mapped_typed.get("date"))
    start_key: str | None = None
    fail_hint: str | None = None
    if isinstance(start_raw, dt.datetime):
        start_key = start_raw.strftime("%Y-%m-%d %H:%M:%S")
        if date_d is None:
            date_d = start_raw.date()
    elif isinstance(start_raw, dt.date):
        # Same as the importer: a date-only start overrides the date column.
        date_d = start_raw
        start_key = f"{start_raw.isoformat()} 00:00:00"
    elif isinstance(start_raw, dt.time):
        if date_d is None:
            fail_hint = "startzeit_ohne_datum"
        else:
            start_key = f"{date_d.isoformat()} {start_raw.strftime('%H:%M:%S')}"
    elif isinstance(start_raw, str) and start_raw.strip():
        s = start_raw.strip()
        if date_d is not None and _looks_like_time_only(s):
            start_key = f"{date_d.isoformat()} {s}"
        else:
            start_key = s
            if date_d is None and len(start_key) >= 10:
                for fmt in ("%Y-%m-%d", "%d.%m.%Y"):
                    try:
                        date_d = dt.datetime.strptime(start_key[:10], fmt).date()
                        break
                    except ValueError:
                        continue
    has_type = bool(activity_type and str(activity_type).strip())
    ok = has_type and date_d is not None and bool(start_key)
    if not has_type:
        # The importer rejects a missing type before looking at the start time.
        fail_hint = "trainingsart_fehlt"
    elif fail_hint is None and not ok:
        fail_hint = "datum_start_fehlt"
    return {
        "activity_type_preview": (str(activity_type).strip()[:80] if activity_type else None),
        "date_iso": date_d.isoformat() if date_d else None,
        "start_key_preview": (start_key[:80] if start_key else None),
        "would_pass_row_gate": ok,
        "fail_hint": fail_hint,
    }
def _import_blood_pressure(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert blood-pressure readings keyed by ``(profile_id, measured_at)``.

    Each CSV row is handled on its own; there is no aggregation. The reading
    time comes from ``measured_date``/``measured_time``. Missing parts are
    filled from a combined ``start_time``, which may be a ``datetime`` or a
    string parsed with ``dateutil``. String times such as ``"7:30"`` or
    ``"07.30.15"`` are parsed by hand.

    A row is recorded in *error_details* and skipped in these cases:

    - The date or time is missing or invalid.
    - The systolic or diastolic value is missing or invalid.
    - A present pulse value is invalid. A blank pulse means "no pulse".

    Otherwise an existing reading at the same timestamp is updated, or a new
    one is inserted. ``context`` is derived from the hour of the reading.

    Returns:
        Stats dict ``rows_total``/``inserted``/``updated``/``skipped`` (always 0)
        and ``new_entries`` (equal to ``inserted``).
    """
    rows_total = 0
    inserted = 0
    updated = 0
    skipped = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="blood_pressure")
        md = coerce_date(mapped.get("measured_date"))
        mt = mapped.get("measured_time")
        st_combined = mapped.get("start_time")
        if isinstance(st_combined, dt.datetime):
            if md is None:
                md = st_combined.date()
            if mt is None:
                mt = st_combined.time()
        elif isinstance(st_combined, str) and st_combined.strip() and (md is None or mt is None):
            try:
                from dateutil import parser as du_parser

                dtp = du_parser.parse(st_combined.strip())
                if md is None:
                    md = dtp.date()
                if mt is None:
                    mt = dtp.time()
            except (ValueError, TypeError, OverflowError):
                # Unparsable start: the date/time checks below report the row.
                pass
        if md is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if mt is None:
            error_details.append({"row": rows_total, "error": "Zeit fehlt"})
            continue
        if isinstance(mt, str):
            # Accept "HH:MM", "HH:MM:SS" and the dotted variants "HH.MM[.SS]".
            parts = mt.replace(".", ":").split(":")
            try:
                if len(parts) < 2:
                    raise ValueError()
                mt = dt.time(int(parts[0]), int(parts[1]), int(parts[2]) if len(parts) > 2 else 0)
            except (TypeError, ValueError, OverflowError):
                error_details.append({"row": rows_total, "error": "Zeit ungültig"})
                continue
        if not isinstance(mt, dt.time):
            error_details.append({"row": rows_total, "error": "Zeitformat wird nicht unterstützt"})
            continue
        try:
            sys_i = int(mapped.get("systolic"))
            dia_i = int(mapped.get("diastolic"))
        except (TypeError, ValueError, OverflowError):
            error_details.append({"row": rows_total, "error": "Blutdruckwerte fehlen oder ungültig"})
            continue
        pulse = mapped.get("pulse")
        if pulse is None or (isinstance(pulse, str) and not pulse.strip()):
            pulse_i = None
        else:
            # Previously an unguarded int() here aborted the whole import on a
            # single bad cell; now only this row is rejected.
            try:
                pulse_i = int(pulse)
            except (TypeError, ValueError, OverflowError):
                error_details.append({"row": rows_total, "error": "Puls ungültig"})
                continue
        measured_at = dt.datetime.combine(md, mt)
        context = _derive_bp_context(mt.hour)
        cur.execute(
            """
            SELECT id FROM blood_pressure_log
            WHERE profile_id = %s AND measured_at = %s
            """,
            (profile_id, measured_at),
        )
        existing_bp = cur.fetchone()
        if existing_bp:
            cur.execute(
                """
                UPDATE blood_pressure_log SET
                    systolic = %s, diastolic = %s, pulse = %s,
                    context = %s, source = 'csv'
                WHERE profile_id = %s AND measured_at = %s
                RETURNING id
                """,
                (sys_i, dia_i, pulse_i, context, profile_id, measured_at),
            )
            row = cur.fetchone()
            updated += 1
        else:
            cur.execute(
                """
                INSERT INTO blood_pressure_log (
                    profile_id, measured_at,
                    systolic, diastolic, pulse,
                    context, source
                ) VALUES (%s, %s, %s, %s, %s, %s, 'csv')
                RETURNING id
                """,
                (profile_id, measured_at, sys_i, dia_i, pulse_i, context),
            )
            row = cur.fetchone()
            inserted += 1
        if row and row.get("id"):
            affected_ids["blood_pressure_log"].append(str(row["id"]))
    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "new_entries": inserted,
    }
def _v_safe_int(value: Any) -> int | None:
if value is None or value == "":
return None
try:
if isinstance(value, float):
return int(value)
s = str(value).strip()
if "." in s:
return int(float(s))
return int(s)
except (ValueError, TypeError):
return None
def _v_safe_float(value: Any) -> float | None:
if value is None or value == "":
return None
try:
return float(value)
except (ValueError, TypeError):
return None
def diagnose_vitals_row(mapped_typed: dict[str, Any]) -> dict[str, Any]:
    """Explain a vitals-baseline row after type conversion (no DB access).

    This runs the same prefilter as the importer: a row passes only with a
    usable date and at least one parsable baseline metric. The raw date
    value (repr and type) is included to debug conversion problems.
    """
    date_raw = mapped_typed.get("date")
    coerced = coerce_date(date_raw)
    metrics = {
        "resting_hr": _v_safe_int(mapped_typed.get("resting_hr")),
        "hrv": _v_safe_int(mapped_typed.get("hrv")),
        "vo2_max": _v_safe_float(mapped_typed.get("vo2_max")),
        "spo2": _v_safe_int(mapped_typed.get("spo2")),
        "respiratory_rate": _v_safe_float(mapped_typed.get("respiratory_rate")),
    }
    has_metric = any(v is not None for v in metrics.values())
    if coerced is None:
        reason = "datum_fehlt"
    elif not has_metric:
        reason = "keine_baseline_metrik"
    else:
        reason = None
    return {
        "date_coerced_iso": coerced.isoformat() if coerced else None,
        "date_after_convert_repr": repr(date_raw),
        "date_after_convert_type": type(date_raw).__name__,
        "metrics": metrics,
        "would_pass_prefilter": coerced is not None and has_metric,
        "prefilter_fail_reason": reason,
    }
def _import_vitals_baseline(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    mapping: dict[str, Any],
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert daily vital baselines into ``vitals_baseline``.

    The import runs in two passes:

    1. Map each CSV row. A row without a date is an error. A row without any
       parsable metric (resting HR, HRV, VO2max, SpO2, respiratory rate) is
       skipped.
    2. Optionally aggregate the rows per the row-processing spec. Then upsert
       one row per ``(profile_id, date)``, each in its own SAVEPOINT. A
       failing row is then rolled back and reported without aborting the
       surrounding transaction.

    On conflict, only non-NULL CSV values overwrite stored ones. Rows whose
    ``source`` is ``'manual'`` are left untouched and counted as skipped.

    Returns:
        Stats dict ``rows_total``/``inserted``/``updated``/``skipped``/``new_entries``.

    Raises:
        ValueError: from ``validate_import_row_processing`` for an invalid spec.
    """
    spec = resolve_import_row_processing("vitals_baseline", mapping)
    mapped_rows: list[dict[str, Any]] = []
    # First CSV row number seen per date. Upsert errors in pass 2 report it;
    # rows_total is already the final count by then and would point at the
    # last row for every error.
    first_row_by_date: dict[dt.date, int] = {}
    rows_total = 0
    skipped_prefilter = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="vitals_baseline")
        d = coerce_date(mapped.get("date"))
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        rhr = _v_safe_int(mapped.get("resting_hr"))
        hrv = _v_safe_int(mapped.get("hrv"))
        vo2 = _v_safe_float(mapped.get("vo2_max"))
        spo2 = _v_safe_int(mapped.get("spo2"))
        resp = _v_safe_float(mapped.get("respiratory_rate"))
        if not any(x is not None for x in (rhr, hrv, vo2, spo2, resp)):
            skipped_prefilter += 1
            continue
        mapped["date"] = d
        first_row_by_date.setdefault(d, rows_total)
        mapped_rows.append(mapped)

    if spec:
        validate_import_row_processing("vitals_baseline", spec, fm)
        merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec)
        error_details.extend(agg_notes)
    else:
        merged_rows, agg_notes = list(mapped_rows), []
    # Rows of groups that could not be merged (several rows per key) are skipped.
    skipped_merge = sum(
        n.get("rows_in_group", 0)
        for n in (agg_notes or [])
        if n.get("error") == "mehrere_zeilen_pro_schluessel"
    )

    inserted = 0
    updated = 0
    skipped = skipped_prefilter + skipped_merge
    for merged in merged_rows:
        d = coerce_date(merged.get("date"))
        if d is None:
            continue
        rhr = _v_safe_int(merged.get("resting_hr"))
        hrv = _v_safe_int(merged.get("hrv"))
        vo2 = _v_safe_float(merged.get("vo2_max"))
        spo2 = _v_safe_int(merged.get("spo2"))
        resp = _v_safe_float(merged.get("respiratory_rate"))
        if not any(x is not None for x in (rhr, hrv, vo2, spo2, resp)):
            skipped += 1
            continue
        iso = d.isoformat()
        try:
            # Without a SAVEPOINT, the first failed INSERT would leave the
            # transaction "aborted": every later query and the final commit()
            # would fail, resulting in a generic 500.
            cur.execute("SAVEPOINT vitals_csv_row")
            cur.execute(
                """
                INSERT INTO vitals_baseline (
                    profile_id, date,
                    resting_hr, hrv, vo2_max, spo2, respiratory_rate,
                    source
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'csv')
                ON CONFLICT (profile_id, date)
                DO UPDATE SET
                    resting_hr = COALESCE(EXCLUDED.resting_hr, vitals_baseline.resting_hr),
                    hrv = COALESCE(EXCLUDED.hrv, vitals_baseline.hrv),
                    vo2_max = COALESCE(EXCLUDED.vo2_max, vitals_baseline.vo2_max),
                    spo2 = COALESCE(EXCLUDED.spo2, vitals_baseline.spo2),
                    respiratory_rate = COALESCE(EXCLUDED.respiratory_rate, vitals_baseline.respiratory_rate),
                    updated_at = NOW()
                WHERE vitals_baseline.source != 'manual'
                RETURNING (xmax = 0) AS inserted, id
                """,
                (profile_id, iso, rhr, hrv, vo2, spo2, resp),
            )
            result = cur.fetchone()
            if result is None:
                # Conflict with a manually entered row: the WHERE clause
                # prevented the update, so nothing was returned.
                skipped += 1
            else:
                if result.get("inserted"):
                    inserted += 1
                else:
                    updated += 1
                if result.get("id"):
                    affected_ids["vitals_baseline"].append(str(result["id"]))
            cur.execute("RELEASE SAVEPOINT vitals_csv_row")
        except Exception as e:
            try:
                cur.execute("ROLLBACK TO SAVEPOINT vitals_csv_row")
            except Exception:
                # Best effort; the original error is still reported below.
                pass
            err = enrich_row_error(str(e), module="vitals_baseline")
            error_details.append(
                {
                    "row": first_row_by_date.get(d),
                    "date": iso,
                    "context": "vitals_baseline upsert",
                    **err,
                },
            )
    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "new_entries": inserted,
    }
def _sf_act(val: Any) -> float | None:
try:
return round(float(val), 1) if val is not None else None
except (TypeError, ValueError):
return None
def _activity_hr_bpm(val: Any) -> float | None:
    """Return a plausible heart rate (20-280 bpm, inclusive) for import, else ``None``.

    Larger values are often a wrong column mapping (e.g. a step count) and
    would overflow the NUMERIC column. The value is parsed and rounded with
    ``_sf_act``.
    """
    bpm = _sf_act(val)
    # A chained comparison is False for NaN. The former `v < 20 or v > 280`
    # check let NaN through as a "plausible" heart rate.
    if bpm is not None and 20 <= bpm <= 280:
        return bpm
    return None
def _looks_like_time_only(s: str) -> bool:
t = s.strip()
if not t or " " in t:
return False
parts = t.split(":")
if len(parts) < 2 or len(parts) > 3:
return False
try:
for p in parts:
int(p)
return True
except ValueError:
return False
def _import_activity(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Import activity rows into ``activity_log``, one CSV row at a time.

    Every row needs an activity type plus a resolvable date and start key.
    The start comes from ``start_time`` (datetime, date, time or string),
    combined with the ``date`` column where needed. Rows without these are
    recorded in *error_details* and skipped. Numeric metrics are parsed
    best-effort, and heart rates outside 20-280 bpm are dropped.

    The remaining work runs inside a per-row SAVEPOINT:

    1. Resolve the training type.
    2. Look up a duplicate via ``find_activity_duplicate_id`` (profile, date,
       normalized start time). Update it, or insert a new activity.
    3. Apply the registry-mapped extra columns.
    4. Run the post-write hooks and upsert the session metrics.

    Any exception in that section rolls back only this row and is reported
    in *error_details*.

    Returns:
        Stats dict ``rows_total``/``inserted``/``updated``/``skipped`` (always 0)
        and ``new_entries``.
    """
    # Lazy imports of the data layer (kept local to this importer).
    from data_layer.activity_time_normalize import normalize_activity_start
    from data_layer.activity_persistence_orchestrator import (
        activity_csv_registry_updates_from_mapped,
        find_activity_duplicate_id,
        insert_activity_csv_minimal,
        new_activity_id,
        run_activity_post_write_hooks_import,
        update_activity_columns,
    )
    from data_layer.activity_session_metrics import upsert_session_metrics_from_csv_mapped
    rows_total = 0
    inserted = 0
    updated = 0
    new_entries = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="activity")
        activity_type = mapped.get("activity_type")
        if not activity_type or not str(activity_type).strip():
            error_details.append({"row": rows_total, "error": "Trainingsart fehlt"})
            continue
        start_raw = mapped.get("start_time")
        date_d = coerce_date(mapped.get("date"))
        start_key: str | None = None
        # datetime is checked before date because datetime is a subclass of date.
        if isinstance(start_raw, dt.datetime):
            start_key = start_raw.strftime("%Y-%m-%d %H:%M:%S")
            if date_d is None:
                date_d = start_raw.date()
        elif isinstance(start_raw, dt.date):
            # Date-only start: it overrides the date column; midnight is assumed.
            date_d = start_raw
            start_key = f"{start_raw.isoformat()} 00:00:00"
        elif isinstance(start_raw, dt.time):
            if date_d is None:
                error_details.append(
                    {"row": rows_total, "error": "Startzeit (Uhrzeit) ohne Datumsspalte"}
                )
                continue
            start_key = f"{date_d.isoformat()} {start_raw.strftime('%H:%M:%S')}"
        elif isinstance(start_raw, str) and start_raw.strip():
            s = start_raw.strip()
            if date_d is not None and _looks_like_time_only(s):
                # Bare clock time: prefix it with the date column.
                start_key = f"{date_d.isoformat()} {s}"
            else:
                start_key = s
                # Without a date column, try to read the date from the start string.
                if date_d is None and len(start_key) >= 10:
                    for fmt in ("%Y-%m-%d", "%d.%m.%Y"):
                        try:
                            date_d = dt.datetime.strptime(start_key[:10], fmt).date()
                            break
                        except ValueError:
                            continue
        if date_d is None or not start_key:
            error_details.append({"row": rows_total, "error": "Datum/Startzeit fehlt oder ungültig"})
            continue
        end_raw = mapped.get("end_time")
        # NOTE(review): an end_time converted to dt.time or dt.date ends up as ""
        # (stored as NULL) — confirm that dropping it is intended.
        if isinstance(end_raw, dt.datetime):
            end_str = end_raw.strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(end_raw, str):
            end_str = end_raw.strip()
        else:
            end_str = ""
        duration_min = mapped.get("duration_min")
        if duration_min is not None:
            try:
                duration_min = round(float(duration_min), 1)
            except (TypeError, ValueError):
                duration_min = None
        kcal_a = _sf_act(mapped.get("kcal_active"))
        kcal_r = _sf_act(mapped.get("kcal_resting"))
        hr_a = _activity_hr_bpm(mapped.get("hr_avg"))
        hr_m = _activity_hr_bpm(mapped.get("hr_max"))
        dist = _sf_act(mapped.get("distance_km"))
        wtype = str(activity_type).strip()
        iso = date_d.isoformat()
        # NOTE(review): this call runs before the SAVEPOINT/try below, so an
        # exception here would abort the whole import, not just this row —
        # confirm normalize_activity_start cannot raise.
        _, workout_start_t = normalize_activity_start(start_key)
        # Per row: otherwise a single SQL error leaves the transaction in
        # "current transaction is aborted" until it ends.
        cur.execute("SAVEPOINT csv_activity_row")
        try:
            training_type_id, training_category, training_subcategory = _resolve_training_type_for_activity(
                cur, wtype, profile_id
            )
            registry_updates = activity_csv_registry_updates_from_mapped(mapped)
            existing_id = find_activity_duplicate_id(cur, profile_id, iso, workout_start_t)
            if existing_id:
                upd = {
                    "start_time": workout_start_t,
                    "end_time": end_str or None,
                    "activity_type": wtype,
                    "duration_min": duration_min,
                    "kcal_active": kcal_a,
                    "kcal_resting": kcal_r,
                    "hr_avg": hr_a,
                    "hr_max": hr_m,
                    "distance_km": dist,
                    "training_type_id": training_type_id,
                    "training_category": training_category,
                    "training_subcategory": training_subcategory,
                    "source": "csv",
                }
                # Registry-mapped columns take precedence over the core fields above.
                upd.update(registry_updates)
                update_activity_columns(cur, profile_id, existing_id, upd)
                updated += 1
                affected_ids["activity_log"].append(str(existing_id))
                aid = existing_id
            else:
                eid = new_activity_id()
                insert_activity_csv_minimal(
                    cur,
                    profile_id,
                    eid,
                    date_iso=iso,
                    start_time=workout_start_t,
                    end_time=end_str or None,
                    activity_type=wtype,
                    duration_min=duration_min,
                    kcal_active=kcal_a,
                    kcal_resting=kcal_r,
                    hr_avg=hr_a,
                    hr_max=hr_m,
                    distance_km=dist,
                    training_type_id=training_type_id,
                    training_category=training_category,
                    training_subcategory=training_subcategory,
                    source="csv",
                )
                inserted += 1
                new_entries += 1
                affected_ids["activity_log"].append(str(eid))
                aid = eid
                # The insert call does not receive the registry columns, so
                # they are written with a follow-up UPDATE.
                if registry_updates:
                    update_activity_columns(cur, profile_id, aid, registry_updates)
            run_activity_post_write_hooks_import(
                cur,
                profile_id,
                str(aid),
                workout_date=iso,
                training_type_id=training_type_id,
                duration_min=duration_min,
                hr_avg=hr_a,
                hr_max=hr_m,
                distance_km=dist,
                kcal_active=kcal_a,
                kcal_resting=kcal_r,
            )
            upsert_session_metrics_from_csv_mapped(
                cur,
                profile_id,
                str(aid),
                mapped,
                training_category,
                training_type_id,
            )
            cur.execute("RELEASE SAVEPOINT csv_activity_row")
        except Exception as e:
            # Undo only this row's writes; counters/affected_ids already
            # incremented for it are NOT reverted.
            try:
                cur.execute("ROLLBACK TO SAVEPOINT csv_activity_row")
            except Exception:
                # Best effort; the row error is still reported below.
                pass
            err = enrich_row_error(str(e), module="activity")
            error_details.append({"row": rows_total, **err})
    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": 0,
        "new_entries": new_entries,
    }