mitai-jinkendo/backend/csv_parser/executor.py
Lars 851018b3b9
All checks were successful
Deploy Development / deploy (push) Successful in 52s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s
feat(csv_import): Enhance CSV import functionality with new endpoint and parsing improvements
- Updated version for csv_import to 0.2.0, reflecting new features.
- Implemented a new POST endpoint for universal CSV import, supporting nutrition, weight, and blood pressure modules.
- Added CSV parsing function to yield rows as dictionaries for easier data handling.
- Enhanced error handling and logging for import operations.
- Introduced tests for the new CSV parsing functionality to ensure reliability.
2026-04-10 06:03:21 +02:00

378 lines
11 KiB
Python

"""
CSV → Zieltabellen: Upsert, Fehlerliste, affected_ids für csv_import_log (Issue #21).
"""
from __future__ import annotations
import datetime as dt
import uuid
from collections import defaultdict
from typing import Any
from csv_parser.core import iter_csv_dict_rows
from csv_parser.module_registry import get_module_definition
from csv_parser.type_converter import build_row_after_mapping
def coerce_date(val: Any) -> dt.date | None:
if val is None:
return None
if isinstance(val, dt.datetime):
return val.date()
if isinstance(val, dt.date):
return val
return None
def _derive_bp_context(hour: int) -> str:
    """Map the hour of a measurement to a context label.

    Args:
        hour: Hour of day of the measurement.

    Returns:
        ``"morning_fasted"`` for 5 <= hour < 10, ``"evening"`` for
        18 <= hour < 23, and ``"other"`` for anything else.
    """
    # (inclusive start, exclusive end, label)
    windows = (
        (5, 10, "morning_fasted"),
        (18, 23, "evening"),
    )
    for start, end, label in windows:
        if start <= hour < end:
            return label
    return "other"
def run_universal_csv_import(
    cur,
    profile_id: str,
    module: str,
    text: str,
    filename: str,
    mapping: dict[str, Any],
) -> dict[str, Any]:
    """
    Import CSV text into the target table of one module and report statistics.

    Works on ``cur`` inside a transaction owned by the caller; this function
    neither commits nor rolls back.

    Args:
        cur: Database cursor handed through to the module importer.
        profile_id: Profile the imported rows belong to.
        module: Module name; ``"nutrition"``, ``"weight"`` and
            ``"blood_pressure"`` are supported.
        text: Complete CSV content.
        filename: Name of the uploaded file. Not used by this function.
        mapping: Import configuration. Keys read here: ``field_mappings``
            (dict, defaults to ``{}``), ``type_conversions`` (dict; any other
            type is treated as absent), ``delimiter`` (defaults to ``","``)
            and ``has_header`` (defaults to ``True``).

    Returns:
        A dict with ``rows_total``, ``rows_imported``, ``rows_updated``,
        ``rows_skipped``, ``rows_errors`` (count of all row errors),
        ``error_details`` (at most the first 50 errors), ``new_entries`` and
        ``affected_ids`` (table name -> list of row ids).

    Raises:
        ValueError: If the module is unknown to the registry, if
            ``field_mappings`` is not a dict, or if the module has no
            importer here.
    """
    mod = get_module_definition(module)
    if not mod:
        raise ValueError(f"Unbekanntes Modul: {module}")

    fm = mapping.get("field_mappings") or {}
    # Reject every non-dict value (lists, numbers, strings), not just strings,
    # so malformed mappings fail here instead of somewhere inside row mapping.
    if not isinstance(fm, dict):
        raise ValueError("field_mappings muss ein Objekt sein")
    tc = mapping.get("type_conversions")
    if not isinstance(tc, dict):
        tc = None
    delim = mapping.get("delimiter") or ","
    has_header = bool(mapping.get("has_header", True))

    # All importers share one signature, so dispatch through a table.
    importers = {
        "nutrition": _import_nutrition,
        "weight": _import_weight,
        "blood_pressure": _import_blood_pressure,
    }
    importer = importers.get(module)
    if importer is None:
        raise ValueError(f"Modul '{module}' wird für Universal-Import noch nicht unterstützt")

    error_details: list[dict[str, Any]] = []
    affected_ids: dict[str, list[str]] = defaultdict(list)
    stats = importer(
        cur,
        profile_id,
        text,
        delim,
        has_header,
        fm,
        tc,
        error_details,
        affected_ids,
    )
    rows_total = stats.pop("rows_total")
    inserted = stats.get("inserted", 0)

    return {
        "rows_total": rows_total,
        "rows_imported": inserted,
        "rows_updated": stats.get("updated", 0),
        "rows_skipped": stats.get("skipped", 0),
        # The count covers every error; the detail list is capped at 50.
        "rows_errors": len(error_details),
        "error_details": error_details[:50],
        "new_entries": stats.get("new_entries", inserted),
        "affected_ids": dict(affected_ids),
    }
def _import_nutrition(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Sum nutrition rows per day and write one ``nutrition_log`` row per date.

    All CSV rows are first aggregated by ISO date; afterwards each day is
    either updated (an entry for profile and date exists) or inserted with a
    fresh UUID. An update overwrites the stored values with the CSV totals.

    Args:
        cur: Database cursor; fetched rows are read via ``.get("id")``
            (presumably a dict-style cursor — confirm against the caller).
        profile_id: Owner of the entries.
        text: CSV content.
        delim: Column delimiter passed to the CSV reader.
        has_header: Whether the first CSV line is a header.
        fm: Field mappings passed to ``build_row_after_mapping``.
        tc: Optional type conversions passed to ``build_row_after_mapping``.
        error_details: Output list; rows without a usable date are appended.
        affected_ids: Output mapping; ids are appended under
            ``"nutrition_log"``.

    Returns:
        Counters ``rows_total``, ``inserted``, ``updated``, ``skipped``
        (always 0) and ``new_entries``.
    """
    nutrient_keys = ("kcal", "protein_g", "fat_g", "carbs_g")
    select_sql = "SELECT id FROM nutrition_log WHERE profile_id=%s AND date=%s"
    update_sql = (
        "\n"
        "UPDATE nutrition_log\n"
        "SET kcal=%s, protein_g=%s, fat_g=%s, carbs_g=%s, source='csv'\n"
        "WHERE profile_id=%s AND date=%s\n"
        "RETURNING id\n"
    )
    insert_sql = (
        "\n"
        "INSERT INTO nutrition_log (id, profile_id, date, kcal, protein_g, fat_g, carbs_g, source, created)\n"
        "VALUES (%s,%s,%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)\n"
    )

    # Pass 1: accumulate the nutrient totals of every day.
    totals: dict[str, dict[str, float]] = defaultdict(
        lambda: dict.fromkeys(nutrient_keys, 0.0)
    )
    rows_total = 0
    for rows_total, csv_row in enumerate(
        iter_csv_dict_rows(text, delim, has_header=has_header), start=1
    ):
        mapped = build_row_after_mapping(csv_row, fm, tc)
        day = coerce_date(mapped.get("date"))
        if day is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt oder ungültig"})
            continue
        iso = day.isoformat()
        for field in nutrient_keys:
            raw = mapped.get(field)
            if raw is None:
                continue
            bucket = totals[iso]
            try:
                bucket[field] += float(raw)
            except (TypeError, ValueError):
                # Non-numeric cells are ignored without an error entry.
                continue

    # Pass 2: upsert one row per day.
    inserted = 0
    updated = 0
    for iso, day_totals in totals.items():
        kcal, prot, fat, carbs = (round(day_totals[k], 1) for k in nutrient_keys)
        if not any((kcal, prot, fat, carbs)):
            # Days that sum to nothing are not written.
            continue

        cur.execute(select_sql, (profile_id, iso))
        if cur.fetchone():
            cur.execute(update_sql, (kcal, prot, fat, carbs, profile_id, iso))
            hit = cur.fetchone()
            updated += 1
            if hit and hit.get("id"):
                affected_ids["nutrition_log"].append(str(hit["id"]))
            continue

        new_id = str(uuid.uuid4())
        cur.execute(insert_sql, (new_id, profile_id, iso, kcal, prot, fat, carbs))
        inserted += 1
        affected_ids["nutrition_log"].append(new_id)

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": 0,
        "new_entries": inserted,
    }
def _import_weight(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert one ``weight_log`` entry per CSV row, keyed by profile and date.

    Rows are processed in file order. A row whose date already exists for the
    profile updates that entry (the note is only replaced when the CSV
    supplies one); otherwise a new entry with a fresh UUID is inserted.

    Args:
        cur: Database cursor; fetched rows are read via ``.get("id")``
            (presumably a dict-style cursor — confirm against the caller).
        profile_id: Owner of the entries.
        text: CSV content.
        delim: Column delimiter passed to the CSV reader.
        has_header: Whether the first CSV line is a header.
        fm: Field mappings passed to ``build_row_after_mapping``.
        tc: Optional type conversions passed to ``build_row_after_mapping``.
        error_details: Output list; rejected rows are appended with their
            1-based row number.
        affected_ids: Output mapping; ids are appended under ``"weight_log"``.

    Returns:
        Counters ``rows_total``, ``inserted``, ``updated``, ``skipped``
        (always 0) and ``new_entries``.
    """
    select_sql = "SELECT id FROM weight_log WHERE profile_id=%s AND date=%s"
    update_sql = (
        "\n"
        "UPDATE weight_log SET weight=%s, note=COALESCE(%s, note), source='csv'\n"
        "WHERE profile_id=%s AND date=%s\n"
        "RETURNING id\n"
    )
    insert_sql = (
        "\n"
        "INSERT INTO weight_log (id, profile_id, date, weight, note, source, created)\n"
        "VALUES (%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)\n"
    )

    inserted = 0
    updated = 0
    rows_total = 0
    for rows_total, csv_row in enumerate(
        iter_csv_dict_rows(text, delim, has_header=has_header), start=1
    ):
        mapped = build_row_after_mapping(csv_row, fm, tc)
        day = coerce_date(mapped.get("date"))
        raw_weight = mapped.get("weight")
        note = mapped.get("note")

        # Validation, in the order the errors are reported.
        if day is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if raw_weight is None:
            error_details.append({"row": rows_total, "error": "Gewicht fehlt"})
            continue
        try:
            weight = float(raw_weight)
        except (TypeError, ValueError):
            error_details.append({"row": rows_total, "error": "Gewicht ungültig"})
            continue

        iso = day.isoformat()
        cur.execute(select_sql, (profile_id, iso))
        if cur.fetchone():
            cur.execute(update_sql, (weight, note, profile_id, iso))
            hit = cur.fetchone()
            updated += 1
            if hit and hit.get("id"):
                affected_ids["weight_log"].append(str(hit["id"]))
            continue

        new_id = str(uuid.uuid4())
        cur.execute(insert_sql, (new_id, profile_id, iso, weight, note))
        inserted += 1
        affected_ids["weight_log"].append(new_id)

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": 0,
        "new_entries": inserted,
    }
def _import_blood_pressure(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert one ``blood_pressure_log`` entry per CSV row.

    Each row needs a date, a time and integer systolic/diastolic values; the
    pulse is optional. Date and time are combined into ``measured_at``, which
    together with the profile identifies an existing entry to update.
    The context label is derived from the hour of the measurement.

    A row that fails validation is recorded in ``error_details`` and skipped;
    it never aborts the import. This includes an unparsable pulse value.

    Args:
        cur: Database cursor; fetched rows are read via ``.get("id")``
            (presumably a dict-style cursor — confirm against the caller).
        profile_id: Owner of the entries.
        text: CSV content.
        delim: Column delimiter passed to the CSV reader.
        has_header: Whether the first CSV line is a header.
        fm: Field mappings passed to ``build_row_after_mapping``.
        tc: Optional type conversions passed to ``build_row_after_mapping``.
        error_details: Output list; rejected rows are appended with their
            1-based row number.
        affected_ids: Output mapping; ids are appended under
            ``"blood_pressure_log"``.

    Returns:
        Counters ``rows_total``, ``inserted``, ``updated``, ``skipped``
        (always 0 here) and ``new_entries`` (same as ``inserted``).
    """
    select_sql = (
        "\n"
        "SELECT id FROM blood_pressure_log\n"
        "WHERE profile_id = %s AND measured_at = %s\n"
    )
    update_sql = (
        "\n"
        "UPDATE blood_pressure_log SET\n"
        "systolic = %s, diastolic = %s, pulse = %s,\n"
        "context = %s, source = 'csv'\n"
        "WHERE profile_id = %s AND measured_at = %s\n"
        "RETURNING id\n"
    )
    insert_sql = (
        "\n"
        "INSERT INTO blood_pressure_log (\n"
        "profile_id, measured_at,\n"
        "systolic, diastolic, pulse,\n"
        "context, source\n"
        ") VALUES (%s, %s, %s, %s, %s, %s, 'csv')\n"
        "RETURNING id\n"
    )

    rows_total = 0
    inserted = 0
    updated = 0
    skipped = 0  # never incremented; reported so the result has the usual keys
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc)
        md = coerce_date(mapped.get("measured_date"))
        mt = mapped.get("measured_time")
        if md is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if mt is None:
            error_details.append({"row": rows_total, "error": "Zeit fehlt"})
            continue

        if isinstance(mt, str):
            # Accept "HH:MM", "HH:MM:SS" and the dotted variants ("HH.MM").
            try:
                parts = mt.replace(".", ":").split(":")
                if len(parts) < 2:
                    raise ValueError()
                mt = dt.time(
                    int(parts[0]),
                    int(parts[1]),
                    int(parts[2]) if len(parts) > 2 else 0,
                )
            except (ValueError, OverflowError):
                # ValueError: non-numeric or out-of-range component;
                # OverflowError: absurdly large number passed to dt.time.
                error_details.append({"row": rows_total, "error": "Zeit ungültig"})
                continue
        if not isinstance(mt, dt.time):
            error_details.append({"row": rows_total, "error": "Zeitformat wird nicht unterstützt"})
            continue

        try:
            sys_i = int(mapped.get("systolic"))
            dia_i = int(mapped.get("diastolic"))
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers int(float("inf")).
            error_details.append({"row": rows_total, "error": "Blutdruckwerte fehlen oder ungültig"})
            continue

        pulse = mapped.get("pulse")
        pulse_i = None
        if pulse is not None:
            # The pulse is optional, but a value that is present must be
            # numeric. Report it per row instead of letting the exception
            # abort the whole import.
            try:
                pulse_i = int(pulse)
            except (TypeError, ValueError, OverflowError):
                error_details.append({"row": rows_total, "error": "Puls ungültig"})
                continue

        measured_at = dt.datetime.combine(md, mt)
        context = _derive_bp_context(mt.hour)

        cur.execute(select_sql, (profile_id, measured_at))
        existing_bp = cur.fetchone()
        if existing_bp:
            cur.execute(
                update_sql,
                (sys_i, dia_i, pulse_i, context, profile_id, measured_at),
            )
            row = cur.fetchone()
            updated += 1
        else:
            cur.execute(
                insert_sql,
                (profile_id, measured_at, sys_i, dia_i, pulse_i, context),
            )
            row = cur.fetchone()
            inserted += 1
        # Both statements use RETURNING id; the id is generated by the
        # database on insert.
        if row and row.get("id"):
            affected_ids["blood_pressure_log"].append(str(row["id"]))

    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "new_entries": inserted,
    }