- Updated the activity-module CSV import to merge active training parameters with the static fields, improving field-mapping accuracy.
- Enhanced the validation functions to include dynamic field definitions derived from active training parameters, improving data integrity during imports.
- Refactored related functions to streamline the handling of CSV templates and field mappings, improving maintainability and clarity.
- Added utility functions that resolve activity_log column patches and upsert session metrics from CSV rows.
992 lines
33 KiB
Python
992 lines
33 KiB
Python
"""
CSV → target tables: upsert, error list, and affected_ids for csv_import_log (Issue #21).

Entry point is ``run_universal_csv_import``. Per-module importers write to
nutrition_log, weight_log, blood_pressure_log, vitals_baseline and activity_log;
the sleep module is delegated to ``csv_parser.sleep_apple_import``.
"""
|
|
|
|
from __future__ import annotations

import datetime as dt
import logging
import math
import uuid
from collections import defaultdict
from typing import Any

from csv_parser.core import iter_csv_dict_rows
from csv_parser.import_errors import enrich_row_error
from csv_parser.import_row_processing import (
    aggregate_mapped_rows,
    resolve_import_row_processing,
    validate_import_row_processing,
)
from csv_parser.module_registry import get_module_definition
from csv_parser.type_converter import build_row_after_mapping
|
|
|
|
# Module-level logger for this importer, named after the module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _resolve_training_type_for_activity(cur, activity_type: str, profile_id: str):
    """Look up the training type for ``activity_type`` with the import's own cursor.

    The router module is imported lazily. The caller's cursor is reused instead
    of opening a nested ``get_db`` connection, which could deadlock the pool.

    Returns whatever ``get_training_type_for_activity_with_cursor`` returns; the
    activity importer unpacks it as
    ``(training_type_id, training_category, training_subcategory)``.
    """
    from routers.activity import (
        get_training_type_for_activity_with_cursor as lookup_training_type,
    )

    return lookup_training_type(cur, activity_type, profile_id)
|
|
|
|
|
|
def coerce_date(val: Any) -> dt.date | None:
    """Return ``val`` as a ``date`` if it already is a date or a datetime.

    A ``datetime`` is truncated to its date part. Every other value, including
    ``None`` and strings, yields ``None``; no parsing happens here.
    """
    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(val, dt.datetime):
        return val.date()
    return val if isinstance(val, dt.date) else None
|
|
|
|
|
|
def _derive_bp_context(hour: int) -> str:
|
|
if 5 <= hour < 10:
|
|
return "morning_fasted"
|
|
if 18 <= hour < 23:
|
|
return "evening"
|
|
return "other"
|
|
|
|
|
|
def run_universal_csv_import(
    cur,
    profile_id: str,
    module: str,
    text: str,
    filename: str,
    mapping: dict[str, Any],
) -> dict[str, Any]:
    """
    Import CSV ``text`` for ``module`` using ``cur`` inside an existing transaction.

    Args:
        cur: Cursor of the caller's open transaction.
        profile_id: Owner of the imported rows.
        module: Module key; must be known to ``get_module_definition``.
        text: Raw CSV content.
        filename: Original file name (not used by this function).
        mapping: Import template with ``field_mappings``, ``type_conversions``,
            ``delimiter`` and ``has_header``. The sleep module ignores it.

    Returns:
        Statistics (``rows_total``, ``rows_imported``, ``rows_updated``,
        ``rows_skipped``, ``rows_errors``, ``new_entries``), at most 50
        ``error_details`` and ``affected_ids`` grouped by table name.

    Raises:
        ValueError: Unknown or unsupported module, or ``field_mappings`` is not
            an object (dict). Importers may raise further ValueErrors.
    """
    if not get_module_definition(module):
        raise ValueError(f"Unbekanntes Modul: {module}")

    error_details: list[dict[str, Any]] = []
    affected_ids: dict[str, list[str]] = defaultdict(list)

    if module == "sleep":
        # The Apple sleep export has a dedicated parser; no field mapping is used.
        from csv_parser.sleep_apple_import import import_apple_sleep_nights

        result = import_apple_sleep_nights(cur, profile_id, text)
        error_details.extend(result.get("error_details") or [])
        # Only touch the defaultdict when there are ids, so an empty import does
        # not report an empty "sleep_log" entry.
        for sid in result.get("affected_ids") or []:
            affected_ids["sleep_log"].append(sid)
        return {
            "rows_total": result["rows_total"],
            "rows_imported": result["inserted"],
            "rows_updated": result["updated"],
            "rows_skipped": result["skipped"],
            "rows_errors": len(error_details),
            "error_details": error_details[:50],
            "new_entries": result.get("new_entries", result["inserted"]),
            "affected_ids": dict(affected_ids),
        }

    mapping = mapping or {}
    fm = mapping.get("field_mappings") or {}
    if not isinstance(fm, dict):
        # A string (or list, ...) would only fail later with an obscure error.
        raise ValueError("field_mappings muss ein Objekt sein")
    tc = mapping.get("type_conversions")
    if not isinstance(tc, dict):
        # Invalid type conversions are ignored rather than rejected.
        tc = None

    delim = mapping.get("delimiter") or ","
    has_header = bool(mapping.get("has_header", True))

    # Positional arguments shared by every table importer.
    common = (cur, profile_id, text, delim, has_header, fm, tc)
    if module == "nutrition":
        stats = _import_nutrition(*common, mapping, error_details, affected_ids)
    elif module == "weight":
        stats = _import_weight(*common, mapping, error_details, affected_ids)
    elif module == "vitals_baseline":
        stats = _import_vitals_baseline(*common, mapping, error_details, affected_ids)
    elif module == "blood_pressure":
        stats = _import_blood_pressure(*common, error_details, affected_ids)
    elif module == "activity":
        stats = _import_activity(*common, error_details, affected_ids)
    else:
        raise ValueError(f"Modul '{module}' wird für Universal-Import noch nicht unterstützt")

    rows_total = stats.pop("rows_total")
    return {
        "rows_total": rows_total,
        "rows_imported": stats.get("inserted", 0),
        "rows_updated": stats.get("updated", 0),
        "rows_skipped": stats.get("skipped", 0),
        "rows_errors": len(error_details),
        "error_details": error_details[:50],
        "new_entries": stats.get("new_entries", stats.get("inserted", 0)),
        "affected_ids": dict(affected_ids),
    }
|
|
|
|
|
|
def _nutrition_macro(value: Any) -> float:
    """Parse one nutrition macro; missing, unparseable or non-finite input counts as 0.0."""
    if value is None or value == "":
        return 0.0
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    # NaN would otherwise pass the "all macros are zero" check and be stored.
    return number if math.isfinite(number) else 0.0


def _import_nutrition(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    mapping: dict[str, Any],
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert daily nutrition totals into nutrition_log (one row per profile and date).

    Rows without a valid date are reported in ``error_details``. If the mapping
    defines row processing, rows are passed through ``aggregate_mapped_rows``
    first and its notes are appended to ``error_details``. Days whose kcal,
    fat, carbs and protein are all zero are not written and count as skipped.

    Returns ``rows_total``, ``inserted``, ``updated``, ``skipped`` and
    ``new_entries`` (equal to ``inserted``).
    """
    spec = resolve_import_row_processing("nutrition", mapping)
    mapped_rows: list[dict[str, Any]] = []
    rows_total = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="nutrition")
        d = coerce_date(mapped.get("date"))
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt oder ungültig"})
            continue
        mapped["date"] = d
        mapped_rows.append(mapped)

    if spec:
        # May raise ValueError when the spec does not fit the field mapping.
        validate_import_row_processing("nutrition", spec, fm)
        merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec)
        error_details.extend(agg_notes)
    else:
        merged_rows, agg_notes = list(mapped_rows), []

    # Rows reported under "mehrere_zeilen_pro_schluessel" count as skipped.
    skipped = sum(
        n.get("rows_in_group", 0)
        for n in (agg_notes or [])
        if n.get("error") == "mehrere_zeilen_pro_schluessel"
    )

    inserted = 0
    updated = 0
    for merged in merged_rows:
        d = coerce_date(merged.get("date"))
        if d is None:
            skipped += 1
            continue
        iso = d.isoformat()

        kcal = round(_nutrition_macro(merged.get("kcal")), 1)
        fat = round(_nutrition_macro(merged.get("fat_g")), 1)
        carbs = round(_nutrition_macro(merged.get("carbs_g")), 1)
        prot = round(_nutrition_macro(merged.get("protein_g")), 1)
        if kcal == 0 and fat == 0 and carbs == 0 and prot == 0:
            # Nothing to store; count it so the row still shows up in the stats.
            skipped += 1
            continue

        cur.execute(
            "SELECT id FROM nutrition_log WHERE profile_id=%s AND date=%s",
            (profile_id, iso),
        )
        if cur.fetchone():
            cur.execute(
                """
                UPDATE nutrition_log
                SET kcal=%s, protein_g=%s, fat_g=%s, carbs_g=%s, source='csv'
                WHERE profile_id=%s AND date=%s
                RETURNING id
                """,
                (kcal, prot, fat, carbs, profile_id, iso),
            )
            row = cur.fetchone()
            updated += 1
            if row and row.get("id"):
                affected_ids["nutrition_log"].append(str(row["id"]))
        else:
            eid = str(uuid.uuid4())
            cur.execute(
                """
                INSERT INTO nutrition_log (id, profile_id, date, kcal, protein_g, fat_g, carbs_g, source, created)
                VALUES (%s,%s,%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)
                """,
                (eid, profile_id, iso, kcal, prot, fat, carbs),
            )
            inserted += 1
            affected_ids["nutrition_log"].append(eid)

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "new_entries": inserted,
    }
|
|
|
|
|
|
def _weight_value(raw: Any) -> float | None:
    """Return ``raw`` as a finite float, or ``None`` if it is missing or unusable."""
    if raw is None:
        return None
    try:
        value = float(raw)
    except (TypeError, ValueError, OverflowError):
        return None
    # float() accepts "nan"/"inf"; such values must not reach weight_log.
    return value if math.isfinite(value) else None


def _import_weight(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    mapping: dict[str, Any],
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert weight_log rows (one per profile and date) from CSV.

    Rows with a missing date, missing weight or invalid (including non-finite)
    weight are reported in ``error_details``. If the mapping defines row
    processing, rows are passed through ``aggregate_mapped_rows`` first. On
    update an empty ``note`` keeps the stored note (``COALESCE``).

    Returns ``rows_total``, ``inserted``, ``updated``, ``skipped`` and
    ``new_entries`` (equal to ``inserted``).
    """
    spec = resolve_import_row_processing("weight", mapping)
    mapped_rows: list[dict[str, Any]] = []
    rows_total = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="weight")
        d = coerce_date(mapped.get("date"))
        w = mapped.get("weight")
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if w is None:
            error_details.append({"row": rows_total, "error": "Gewicht fehlt"})
            continue
        if _weight_value(w) is None:
            error_details.append({"row": rows_total, "error": "Gewicht ungültig"})
            continue
        mapped["date"] = d
        mapped_rows.append(mapped)

    if spec:
        # May raise ValueError when the spec does not fit the field mapping.
        validate_import_row_processing("weight", spec, fm)
        merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec)
        error_details.extend(agg_notes)
    else:
        merged_rows, agg_notes = list(mapped_rows), []

    # Rows reported under "mehrere_zeilen_pro_schluessel" count as skipped.
    skipped = sum(
        n.get("rows_in_group", 0)
        for n in (agg_notes or [])
        if n.get("error") == "mehrere_zeilen_pro_schluessel"
    )

    inserted = 0
    updated = 0
    for merged in merged_rows:
        d = coerce_date(merged.get("date"))
        w = _weight_value(merged.get("weight"))
        note = merged.get("note")
        if d is None or w is None:
            # Aggregation may yield rows without a usable date/weight; count them.
            skipped += 1
            continue
        iso = d.isoformat()
        cur.execute(
            "SELECT id FROM weight_log WHERE profile_id=%s AND date=%s",
            (profile_id, iso),
        )
        if cur.fetchone():
            cur.execute(
                """
                UPDATE weight_log SET weight=%s, note=COALESCE(%s, note), source='csv'
                WHERE profile_id=%s AND date=%s
                RETURNING id
                """,
                (w, note, profile_id, iso),
            )
            row = cur.fetchone()
            updated += 1
            if row and row.get("id"):
                affected_ids["weight_log"].append(str(row["id"]))
        else:
            eid = str(uuid.uuid4())
            cur.execute(
                """
                INSERT INTO weight_log (id, profile_id, date, weight, note, source, created)
                VALUES (%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)
                """,
                (eid, profile_id, iso, w, note),
            )
            inserted += 1
            affected_ids["weight_log"].append(eid)

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "new_entries": inserted,
    }
|
|
|
|
|
|
def diagnose_blood_pressure_row(mapped_typed: dict[str, Any]) -> dict[str, Any]:
    """Report whether date/time of a blood-pressure row would be recognized.

    Mirrors the fallback used by the importer: explicit ``measured_date`` and
    ``measured_time`` win, and gaps are filled from a combined ``start_time``
    (a datetime, or a string parsed with dateutil). No database access.
    """
    measured_date = coerce_date(mapped_typed.get("measured_date"))
    measured_time = mapped_typed.get("measured_time")
    combined = mapped_typed.get("start_time")

    fallback = None
    if isinstance(combined, dt.datetime):
        fallback = combined
    elif (
        isinstance(combined, str)
        and combined.strip()
        and (measured_date is None or measured_time is None)
    ):
        from dateutil import parser as du_parser

        try:
            fallback = du_parser.parse(combined.strip())
        except (ValueError, TypeError, OverflowError):
            fallback = None

    if fallback is not None:
        if measured_date is None:
            measured_date = fallback.date()
        if measured_time is None:
            measured_time = fallback.time()

    try:
        int(mapped_typed.get("systolic"))
        int(mapped_typed.get("diastolic"))
    except (TypeError, ValueError):
        pressures_ok = False
    else:
        pressures_ok = True

    return {
        "measured_date_iso": measured_date.isoformat() if measured_date else None,
        "has_measured_time": measured_time is not None,
        "start_time_raw_type": None if combined is None else type(combined).__name__,
        "systolic_ok": pressures_ok,
        "would_reach_insert_check": measured_date is not None and measured_time is not None,
    }
|
|
|
|
|
|
def diagnose_activity_row(mapped_typed: dict[str, Any]) -> dict[str, Any]:
    """Explain whether an activity row would pass the importer's row gate (no DB access).

    Resolves the workout date and start key the same way ``_import_activity``
    does, including a date-only ``start_time`` (treated as a 00:00:00 start).

    Returns previews of the activity type, date and start key, the gate
    verdict, and ``fail_hint``: "startzeit_ohne_datum", "trainingsart_fehlt",
    "datum_start_fehlt" or ``None``.
    """
    activity_type = mapped_typed.get("activity_type")
    start_raw = mapped_typed.get("start_time")
    date_d = coerce_date(mapped_typed.get("date"))
    start_key: str | None = None
    fail_hint: str | None = None

    # datetime subclasses date, so it must be tested before the date branch.
    if isinstance(start_raw, dt.datetime):
        start_key = start_raw.strftime("%Y-%m-%d %H:%M:%S")
        if date_d is None:
            date_d = start_raw.date()
    elif isinstance(start_raw, dt.date):
        # Same as the importer: a date-only start means the workout starts at midnight.
        date_d = start_raw
        start_key = f"{start_raw.isoformat()} 00:00:00"
    elif isinstance(start_raw, dt.time):
        if date_d is None:
            fail_hint = "startzeit_ohne_datum"
        else:
            start_key = f"{date_d.isoformat()} {start_raw.strftime('%H:%M:%S')}"
    elif isinstance(start_raw, str) and start_raw.strip():
        s = start_raw.strip()
        if date_d is not None and _looks_like_time_only(s):
            start_key = f"{date_d.isoformat()} {s}"
        else:
            start_key = s
            if date_d is None and len(start_key) >= 10:
                # Try to read the date from the first ten characters of the key.
                for fmt in ("%Y-%m-%d", "%d.%m.%Y"):
                    try:
                        date_d = dt.datetime.strptime(start_key[:10], fmt).date()
                        break
                    except ValueError:
                        continue

    has_type = bool(activity_type and str(activity_type).strip())
    ok = has_type and date_d is not None and bool(start_key)
    if fail_hint is None and not has_type:
        fail_hint = "trainingsart_fehlt"
    elif fail_hint is None and not ok:
        fail_hint = "datum_start_fehlt"

    return {
        "activity_type_preview": (str(activity_type).strip()[:80] if activity_type else None),
        "date_iso": date_d.isoformat() if date_d else None,
        "start_key_preview": (start_key[:80] if start_key else None),
        "would_pass_row_gate": ok,
        "fail_hint": fail_hint,
    }
|
|
|
|
|
|
def _import_blood_pressure(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Insert or update blood_pressure_log rows (one per profile and measured_at).

    Date and time come from ``measured_date``/``measured_time``; gaps are filled
    from a combined ``start_time`` (a datetime, or a string parsed with
    dateutil). ``context`` is derived from the hour of the reading. Rows with
    a missing or invalid date, time or systolic/diastolic value are reported in
    ``error_details``. An unreadable optional pulse is stored as NULL.

    NOTE(review): unlike the activity and vitals importers there is no per-row
    SAVEPOINT here, so a SQL error aborts the caller's transaction.

    Returns ``rows_total``, ``inserted``, ``updated``, ``skipped`` (always 0) and
    ``new_entries`` (equal to ``inserted``).
    """
    rows_total = 0
    inserted = 0
    updated = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="blood_pressure")
        md = coerce_date(mapped.get("measured_date"))
        mt = mapped.get("measured_time")
        st_combined = mapped.get("start_time")
        if isinstance(st_combined, dt.datetime):
            if md is None:
                md = st_combined.date()
            if mt is None:
                mt = st_combined.time()
        elif isinstance(st_combined, str) and st_combined.strip() and (md is None or mt is None):
            try:
                from dateutil import parser as du_parser

                dtp = du_parser.parse(st_combined.strip())
                if md is None:
                    md = dtp.date()
                if mt is None:
                    mt = dtp.time()
            except (ValueError, TypeError, OverflowError):
                # Unparseable start_time: fall through to the missing date/time errors.
                pass

        if md is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if mt is None:
            error_details.append({"row": rows_total, "error": "Zeit fehlt"})
            continue
        if isinstance(mt, dt.datetime):
            # A datetime-typed time column: only its clock time is relevant here.
            mt = mt.time()
        elif isinstance(mt, str):
            # Accept "HH:MM", "HH:MM:SS" and "HH.MM" style strings.
            parts = mt.replace(".", ":").split(":")
            try:
                if len(parts) < 2:
                    raise ValueError(mt)
                mt = dt.time(int(parts[0]), int(parts[1]), int(parts[2]) if len(parts) > 2 else 0)
            except ValueError:
                error_details.append({"row": rows_total, "error": "Zeit ungültig"})
                continue
        if not isinstance(mt, dt.time):
            error_details.append({"row": rows_total, "error": "Zeitformat wird nicht unterstützt"})
            continue

        try:
            sys_i = int(mapped.get("systolic"))
            dia_i = int(mapped.get("diastolic"))
        except (TypeError, ValueError, OverflowError):
            error_details.append({"row": rows_total, "error": "Blutdruckwerte fehlen oder ungültig"})
            continue
        pulse = mapped.get("pulse")
        try:
            pulse_i = int(pulse) if pulse is not None else None
        except (TypeError, ValueError, OverflowError):
            # Pulse is optional: an unreadable value must not abort the whole import.
            pulse_i = None

        measured_at = dt.datetime.combine(md, mt)
        context = _derive_bp_context(mt.hour)

        cur.execute(
            """
            SELECT id FROM blood_pressure_log
            WHERE profile_id = %s AND measured_at = %s
            """,
            (profile_id, measured_at),
        )
        if cur.fetchone():
            cur.execute(
                """
                UPDATE blood_pressure_log SET
                systolic = %s, diastolic = %s, pulse = %s,
                context = %s, source = 'csv'
                WHERE profile_id = %s AND measured_at = %s
                RETURNING id
                """,
                (sys_i, dia_i, pulse_i, context, profile_id, measured_at),
            )
            updated += 1
        else:
            cur.execute(
                """
                INSERT INTO blood_pressure_log (
                profile_id, measured_at,
                systolic, diastolic, pulse,
                context, source
                ) VALUES (%s, %s, %s, %s, %s, %s, 'csv')
                RETURNING id
                """,
                (profile_id, measured_at, sys_i, dia_i, pulse_i, context),
            )
            inserted += 1
        row = cur.fetchone()
        if row and row.get("id"):
            affected_ids["blood_pressure_log"].append(str(row["id"]))

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": 0,
        "new_entries": inserted,
    }
|
|
|
|
|
|
def _v_safe_int(value: Any) -> int | None:
|
|
if value is None or value == "":
|
|
return None
|
|
try:
|
|
if isinstance(value, float):
|
|
return int(value)
|
|
s = str(value).strip()
|
|
if "." in s:
|
|
return int(float(s))
|
|
return int(s)
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
def _v_safe_float(value: Any) -> float | None:
|
|
if value is None or value == "":
|
|
return None
|
|
try:
|
|
return float(value)
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
def diagnose_vitals_row(mapped_typed: dict[str, Any]) -> dict[str, Any]:
    """Explain a vitals-baseline row after type conversion (no database access).

    Reports the coerced date, the parsed metrics, and whether the row would pass
    the importer's prefilter (a date plus at least one metric).
    """
    raw_date = mapped_typed.get("date")
    day = coerce_date(raw_date)
    metrics = {
        "resting_hr": _v_safe_int(mapped_typed.get("resting_hr")),
        "hrv": _v_safe_int(mapped_typed.get("hrv")),
        "vo2_max": _v_safe_float(mapped_typed.get("vo2_max")),
        "spo2": _v_safe_int(mapped_typed.get("spo2")),
        "respiratory_rate": _v_safe_float(mapped_typed.get("respiratory_rate")),
    }
    has_metric = any(v is not None for v in metrics.values())

    if day is None:
        reason = "datum_fehlt"
    elif not has_metric:
        reason = "keine_baseline_metrik"
    else:
        reason = None

    return {
        "date_coerced_iso": day.isoformat() if day else None,
        "date_after_convert_repr": repr(raw_date),
        "date_after_convert_type": type(raw_date).__name__,
        "metrics": metrics,
        "would_pass_prefilter": day is not None and has_metric,
        "prefilter_fail_reason": reason,
    }
|
|
|
|
|
|
def _import_vitals_baseline(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    mapping: dict[str, Any],
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert vitals_baseline rows (one per profile and date) from CSV.

    Rows need a date and at least one metric (resting_hr, hrv, vo2_max, spo2,
    respiratory_rate); rows without a metric count as skipped. On conflict,
    metrics missing from the CSV keep their stored value (``COALESCE``), and
    rows whose source is 'manual' are not updated (counted as skipped). Each
    upsert runs in its own SAVEPOINT so a failing row is reported in
    ``error_details`` instead of aborting the caller's transaction.

    Returns ``rows_total``, ``inserted``, ``updated``, ``skipped`` and
    ``new_entries`` (equal to ``inserted``).
    """

    def _metrics(row: dict[str, Any]) -> tuple:
        """Parse (resting_hr, hrv, vo2_max, spo2, respiratory_rate) from a mapped row."""
        return (
            _v_safe_int(row.get("resting_hr")),
            _v_safe_int(row.get("hrv")),
            _v_safe_float(row.get("vo2_max")),
            _v_safe_int(row.get("spo2")),
            _v_safe_float(row.get("respiratory_rate")),
        )

    spec = resolve_import_row_processing("vitals_baseline", mapping)
    mapped_rows: list[dict[str, Any]] = []
    # First CSV row number per date, so upsert errors point at a real source row
    # (after aggregation a merged row has no single source row).
    first_row_by_date: dict[dt.date, int] = {}
    rows_total = 0
    skipped_prefilter = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="vitals_baseline")
        d = coerce_date(mapped.get("date"))
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if all(m is None for m in _metrics(mapped)):
            skipped_prefilter += 1
            continue
        mapped["date"] = d
        mapped_rows.append(mapped)
        first_row_by_date.setdefault(d, rows_total)

    if spec:
        # May raise ValueError when the spec does not fit the field mapping.
        validate_import_row_processing("vitals_baseline", spec, fm)
        merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec)
        error_details.extend(agg_notes)
    else:
        merged_rows, agg_notes = list(mapped_rows), []

    skipped_merge = sum(
        n.get("rows_in_group", 0)
        for n in (agg_notes or [])
        if n.get("error") == "mehrere_zeilen_pro_schluessel"
    )

    inserted = 0
    updated = 0
    skipped = skipped_prefilter + skipped_merge
    for merged in merged_rows:
        d = coerce_date(merged.get("date"))
        if d is None:
            continue
        rhr, hrv, vo2, spo2, resp = _metrics(merged)
        if all(m is None for m in (rhr, hrv, vo2, spo2, resp)):
            skipped += 1
            continue
        iso = d.isoformat()
        try:
            # Without a SAVEPOINT the first failing INSERT puts the transaction
            # into the "aborted" state; every following query and the commit
            # would fail, ending in a generic 500.
            cur.execute("SAVEPOINT vitals_csv_row")
            cur.execute(
                """
                INSERT INTO vitals_baseline (
                profile_id, date,
                resting_hr, hrv, vo2_max, spo2, respiratory_rate,
                source
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, 'csv')
                ON CONFLICT (profile_id, date)
                DO UPDATE SET
                resting_hr = COALESCE(EXCLUDED.resting_hr, vitals_baseline.resting_hr),
                hrv = COALESCE(EXCLUDED.hrv, vitals_baseline.hrv),
                vo2_max = COALESCE(EXCLUDED.vo2_max, vitals_baseline.vo2_max),
                spo2 = COALESCE(EXCLUDED.spo2, vitals_baseline.spo2),
                respiratory_rate = COALESCE(EXCLUDED.respiratory_rate, vitals_baseline.respiratory_rate),
                updated_at = NOW()
                WHERE vitals_baseline.source != 'manual'
                RETURNING (xmax = 0) AS inserted, id
                """,
                (profile_id, iso, rhr, hrv, vo2, spo2, resp),
            )
            # (xmax = 0) is true for a freshly inserted tuple (PostgreSQL idiom).
            # No returned row means the conflict WHERE blocked the update.
            result = cur.fetchone()
            if result is None:
                skipped += 1
            else:
                if result.get("inserted"):
                    inserted += 1
                else:
                    updated += 1
                if result.get("id"):
                    affected_ids["vitals_baseline"].append(str(result["id"]))
            cur.execute("RELEASE SAVEPOINT vitals_csv_row")
        except Exception as e:
            try:
                cur.execute("ROLLBACK TO SAVEPOINT vitals_csv_row")
            except Exception:
                # Keep reporting the row error, but leave a trace: the
                # transaction is probably unusable from here on.
                logger.warning("ROLLBACK TO SAVEPOINT vitals_csv_row failed", exc_info=True)
            err = enrich_row_error(str(e), module="vitals_baseline")
            error_details.append(
                {
                    "row": first_row_by_date.get(d, rows_total),
                    "date": iso,
                    "context": "vitals_baseline upsert",
                    **err,
                },
            )

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "new_entries": inserted,
    }
|
|
|
|
|
|
def _sf_act(val: Any) -> float | None:
|
|
try:
|
|
return round(float(val), 1) if val is not None else None
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _activity_hr_bpm(val: Any) -> float | None:
    """Return a plausible heart rate (20-280 bpm, rounded by ``_sf_act``) or ``None``.

    Larger values are usually a mis-mapped column (e.g. step count) and would
    overflow the NUMERIC column. The check is written as a positive range test,
    so NaN (which fails every comparison) is rejected as well.
    """
    bpm = _sf_act(val)
    if bpm is None or not (20 <= bpm <= 280):
        return None
    return bpm
|
|
|
|
|
|
def _looks_like_time_only(s: str) -> bool:
|
|
t = s.strip()
|
|
if not t or " " in t:
|
|
return False
|
|
parts = t.split(":")
|
|
if len(parts) < 2 or len(parts) > 3:
|
|
return False
|
|
try:
|
|
for p in parts:
|
|
int(p)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def _activity_row_start(mapped: dict[str, Any]) -> tuple[dt.date | None, str | None, str | None]:
    """Resolve ``(workout date, start key, error message)`` for one mapped activity row.

    ``start_time`` may be a datetime, a date (midnight start), a time (combined
    with the ``date`` column) or a string (a bare time is combined with the
    ``date`` column; otherwise the date is read from its first ten characters).
    The error message is ``None`` when both date and start key were resolved.
    """
    start_raw = mapped.get("start_time")
    date_d = coerce_date(mapped.get("date"))
    start_key: str | None = None
    # datetime subclasses date, so it must be tested before the date branch.
    if isinstance(start_raw, dt.datetime):
        start_key = start_raw.strftime("%Y-%m-%d %H:%M:%S")
        if date_d is None:
            date_d = start_raw.date()
    elif isinstance(start_raw, dt.date):
        date_d = start_raw
        start_key = f"{start_raw.isoformat()} 00:00:00"
    elif isinstance(start_raw, dt.time):
        if date_d is None:
            return None, None, "Startzeit (Uhrzeit) ohne Datumsspalte"
        start_key = f"{date_d.isoformat()} {start_raw.strftime('%H:%M:%S')}"
    elif isinstance(start_raw, str) and start_raw.strip():
        s = start_raw.strip()
        if date_d is not None and _looks_like_time_only(s):
            start_key = f"{date_d.isoformat()} {s}"
        else:
            start_key = s
            if date_d is None and len(start_key) >= 10:
                for fmt in ("%Y-%m-%d", "%d.%m.%Y"):
                    try:
                        date_d = dt.datetime.strptime(start_key[:10], fmt).date()
                        break
                    except ValueError:
                        continue
    if date_d is None or not start_key:
        return date_d, start_key, "Datum/Startzeit fehlt oder ungültig"
    return date_d, start_key, None


def _import_activity(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Insert or update activity_log rows from CSV.

    Existing activities are looked up with ``find_activity_duplicate_id``
    (profile, date, normalized start time). Each row runs inside its own
    SAVEPOINT, so a failing statement only discards that row (reported in
    ``error_details``) instead of aborting the caller's transaction. After the
    write, the import post-write hooks and the CSV session-metrics upsert run
    for the activity.

    Returns ``rows_total``, ``inserted``, ``updated``, ``skipped`` (always 0)
    and ``new_entries``.
    """
    from data_layer.activity_time_normalize import normalize_activity_start
    from data_layer.activity_persistence_orchestrator import (
        find_activity_duplicate_id,
        insert_activity_csv_minimal,
        new_activity_id,
        run_activity_post_write_hooks_import,
        update_activity_columns,
    )
    from data_layer.activity_session_metrics import (
        resolve_activity_log_column_patch_from_csv,
        upsert_session_metrics_from_csv_mapped,
    )

    rows_total = 0
    inserted = 0
    updated = 0

    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc, module="activity")
        activity_type = mapped.get("activity_type")
        if not activity_type or not str(activity_type).strip():
            error_details.append({"row": rows_total, "error": "Trainingsart fehlt"})
            continue

        date_d, start_key, start_error = _activity_row_start(mapped)
        if start_error:
            error_details.append({"row": rows_total, "error": start_error})
            continue

        end_raw = mapped.get("end_time")
        if isinstance(end_raw, dt.datetime):
            end_str = end_raw.strftime("%Y-%m-%d %H:%M:%S")
        elif isinstance(end_raw, str):
            end_str = end_raw.strip()
        else:
            end_str = ""

        # Same parsing as the other numeric metrics: invalid -> None, 1 decimal.
        duration_min = _sf_act(mapped.get("duration_min"))
        kcal_a = _sf_act(mapped.get("kcal_active"))
        kcal_r = _sf_act(mapped.get("kcal_resting"))
        hr_a = _activity_hr_bpm(mapped.get("hr_avg"))
        hr_m = _activity_hr_bpm(mapped.get("hr_max"))
        dist = _sf_act(mapped.get("distance_km"))

        wtype = str(activity_type).strip()
        iso = date_d.isoformat()
        _, workout_start_t = normalize_activity_start(start_key)

        # One SAVEPOINT per row: otherwise a SQL error leaves the transaction
        # "aborted" until its end.
        cur.execute("SAVEPOINT csv_activity_row")
        try:
            training_type_id, training_category, training_subcategory = _resolve_training_type_for_activity(
                cur, wtype, profile_id
            )
            column_patch = resolve_activity_log_column_patch_from_csv(
                cur, mapped, training_category, training_type_id
            )
            existing_id = find_activity_duplicate_id(cur, profile_id, iso, workout_start_t)

            if existing_id:
                upd = {
                    "start_time": workout_start_t,
                    "end_time": end_str or None,
                    "activity_type": wtype,
                    "duration_min": duration_min,
                    "kcal_active": kcal_a,
                    "kcal_resting": kcal_r,
                    "hr_avg": hr_a,
                    "hr_max": hr_m,
                    "distance_km": dist,
                    "training_type_id": training_type_id,
                    "training_category": training_category,
                    "training_subcategory": training_subcategory,
                    "source": "csv",
                }
                upd.update(column_patch)
                update_activity_columns(cur, profile_id, existing_id, upd)
                updated += 1
                affected_ids["activity_log"].append(str(existing_id))
                aid = existing_id
            else:
                eid = new_activity_id()
                insert_activity_csv_minimal(
                    cur,
                    profile_id,
                    eid,
                    date_iso=iso,
                    start_time=workout_start_t,
                    end_time=end_str or None,
                    activity_type=wtype,
                    duration_min=duration_min,
                    kcal_active=kcal_a,
                    kcal_resting=kcal_r,
                    hr_avg=hr_a,
                    hr_max=hr_m,
                    distance_km=dist,
                    training_type_id=training_type_id,
                    training_category=training_category,
                    training_subcategory=training_subcategory,
                    source="csv",
                )
                inserted += 1
                affected_ids["activity_log"].append(str(eid))
                aid = eid
                # The minimal insert does not take the dynamic columns; patch them now.
                if column_patch:
                    update_activity_columns(cur, profile_id, aid, column_patch)

            run_activity_post_write_hooks_import(
                cur,
                profile_id,
                str(aid),
                workout_date=iso,
                training_type_id=training_type_id,
                duration_min=duration_min,
                hr_avg=hr_a,
                hr_max=hr_m,
                distance_km=dist,
                kcal_active=kcal_a,
                kcal_resting=kcal_r,
            )
            upsert_session_metrics_from_csv_mapped(
                cur,
                profile_id,
                str(aid),
                mapped,
                training_category,
                training_type_id,
            )
            cur.execute("RELEASE SAVEPOINT csv_activity_row")
        except Exception as e:
            try:
                cur.execute("ROLLBACK TO SAVEPOINT csv_activity_row")
            except Exception:
                # Keep reporting the row error, but leave a trace: the
                # transaction is probably unusable from here on.
                logger.warning("ROLLBACK TO SAVEPOINT csv_activity_row failed", exc_info=True)
            err = enrich_row_error(str(e), module="activity")
            error_details.append({"row": rows_total, **err})

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": 0,
        "new_entries": inserted,
    }
|