diff --git a/backend/csv_parser/core.py b/backend/csv_parser/core.py index dd4f225..53def94 100644 --- a/backend/csv_parser/core.py +++ b/backend/csv_parser/core.py @@ -1,12 +1,13 @@ """ CSV bytes → text, delimiter sniffing, strukturierte Erstzeilen für Analyse (Issue #21). """ + from __future__ import annotations import csv import io import re -from typing import Any, List, Tuple +from typing import Any, Dict, Iterator, List, Tuple _DEFAULT_DELIMS = [",", ";", "\t"] @@ -135,3 +136,22 @@ def get_csv_import_limits(conn_row: dict | None) -> dict[str, int]: out = {**defaults, **{k: int(v) for k, v in val.items() if k in defaults}} return out return defaults + + +def iter_csv_dict_rows( + text: str, + delimiter: str, + *, + has_header: bool = True, +) -> Iterator[Dict[str, str]]: + """Vollständige Datei zeilenweise als Dict (Header = Keys).""" + if not has_header: + raise ValueError("CSV ohne Kopfzeile wird für Import noch nicht unterstützt") + normalized = text.replace("\r\n", "\n").replace("\r", "\n") + reader = csv.DictReader(io.StringIO(normalized), delimiter=delimiter) + for row in reader: + if row is None: + continue + if not any(v and str(v).strip() for v in row.values()): + continue + yield {k: (v or "").strip() for k, v in row.items()} diff --git a/backend/csv_parser/executor.py b/backend/csv_parser/executor.py new file mode 100644 index 0000000..53ddb0c --- /dev/null +++ b/backend/csv_parser/executor.py @@ -0,0 +1,377 @@ +""" +CSV → Zieltabellen: Upsert, Fehlerliste, affected_ids für csv_import_log (Issue #21). +""" + +from __future__ import annotations + +import datetime as dt +import uuid +from collections import defaultdict +from typing import Any + +from csv_parser.core import iter_csv_dict_rows +from csv_parser.module_registry import get_module_definition +from csv_parser.type_converter import build_row_after_mapping + + +def coerce_date(val: Any) -> dt.date | None: + if val is None: + return None + if isinstance(val, dt.datetime): + return val.date() + if isinstance(val, dt.date): + return val + return None + + +def _derive_bp_context(hour: int) -> str: + if 5 <= hour < 10: + return "morning_fasted" + if 18 <= hour < 23: + return "evening" + return "other" + + +def run_universal_csv_import( + cur, + profile_id: str, + module: str, + text: str, + filename: str, + mapping: dict[str, Any], +) -> dict[str, Any]: + """ + Nutzt cur innerhalb einer bestehenden Transaktion. + Gibt Statistik + affected_ids (+ error_details) zurück. + """ + mod = get_module_definition(module) + if not mod: + raise ValueError(f"Unbekanntes Modul: {module}") + + fm = mapping.get("field_mappings") or {} + if isinstance(fm, str): + raise ValueError("field_mappings muss ein Objekt sein") + tc = mapping.get("type_conversions") + if tc is not None and not isinstance(tc, dict): + tc = None + + delim = mapping.get("delimiter") or "," + has_header = mapping.get("has_header", True) + + rows_total = 0 + error_details: list[dict[str, Any]] = [] + affected_ids: dict[str, list[str]] = defaultdict(list) + + if module == "nutrition": + stats = _import_nutrition( + cur, + profile_id, + text, + delim, + bool(has_header), + fm, + tc, + error_details, + affected_ids, + ) + rows_total = stats.pop("rows_total") + elif module == "weight": + stats = _import_weight( + cur, + profile_id, + text, + delim, + bool(has_header), + fm, + tc, + error_details, + affected_ids, + ) + rows_total = stats.pop("rows_total") + elif module == "blood_pressure": + stats = _import_blood_pressure( + cur, + profile_id, + text, + delim, + bool(has_header), + fm, + tc, + error_details, + affected_ids, + ) + rows_total = stats.pop("rows_total") + else: + raise ValueError(f"Modul '{module}' wird für Universal-Import noch nicht unterstützt") + + out = { + "rows_total": rows_total, + "rows_imported": stats.get("inserted", 0), + "rows_updated": stats.get("updated", 0), + "rows_skipped": stats.get("skipped", 0), + "rows_errors": len(error_details), + "error_details": error_details[:50], + "new_entries": stats.get("new_entries", stats.get("inserted", 0)), + "affected_ids": dict(affected_ids), + } + return out + + +def _import_nutrition( + cur, + profile_id: str, + text: str, + delim: str, + has_header: bool, + fm: dict, + tc: dict | None, + error_details: list, + affected_ids: dict, +) -> dict[str, int]: + agg: dict[str, dict[str, float]] = defaultdict( + lambda: {"kcal": 0.0, "protein_g": 0.0, "fat_g": 0.0, "carbs_g": 0.0} + ) + rows_total = 0 + for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header): + rows_total += 1 + mapped = build_row_after_mapping(csv_row, fm, tc) + d = coerce_date(mapped.get("date")) + if d is None: + error_details.append({"row": rows_total, "error": "Datum fehlt oder ungültig"}) + continue + iso = d.isoformat() + for key in ("kcal", "protein_g", "fat_g", "carbs_g"): + v = mapped.get(key) + if v is not None: + try: + agg[iso][key] += float(v) + except (TypeError, ValueError): + pass + + inserted = 0 + updated = 0 + new_entries = 0 + for iso, vals in agg.items(): + kcal = round(vals["kcal"], 1) + fat = round(vals["fat_g"], 1) + carbs = round(vals["carbs_g"], 1) + prot = round(vals["protein_g"], 1) + if kcal == 0 and fat == 0 and carbs == 0 and prot == 0: + continue + + cur.execute( + "SELECT id FROM nutrition_log WHERE profile_id=%s AND date=%s", + (profile_id, iso), + ) + existing = cur.fetchone() + if existing: + cur.execute( + """ + UPDATE nutrition_log + SET kcal=%s, protein_g=%s, fat_g=%s, carbs_g=%s, source='csv' + WHERE profile_id=%s AND date=%s + RETURNING id + """, + (kcal, prot, fat, carbs, profile_id, iso), + ) + row = cur.fetchone() + updated += 1 + if row and row.get("id"): + affected_ids["nutrition_log"].append(str(row["id"])) + else: + eid = str(uuid.uuid4()) + cur.execute( + """ + INSERT INTO nutrition_log (id, profile_id, date, kcal, protein_g, fat_g, carbs_g, source, created) + VALUES (%s,%s,%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP) + """, + (eid, profile_id, iso, kcal, prot, fat, carbs), + ) + inserted += 1 + new_entries += 1 + affected_ids["nutrition_log"].append(eid) + + return { + "rows_total": rows_total, + "inserted": inserted, + "updated": updated, + "skipped": 0, + "new_entries": new_entries, + } + + +def _import_weight( + cur, + profile_id: str, + text: str, + delim: str, + has_header: bool, + fm: dict, + tc: dict | None, + error_details: list, + affected_ids: dict, +) -> dict[str, int]: + rows_total = 0 + inserted = 0 + updated = 0 + new_entries = 0 + for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header): + rows_total += 1 + mapped = build_row_after_mapping(csv_row, fm, tc) + d = coerce_date(mapped.get("date")) + w = mapped.get("weight") + note = mapped.get("note") + if d is None: + error_details.append({"row": rows_total, "error": "Datum fehlt"}) + continue + if w is None: + error_details.append({"row": rows_total, "error": "Gewicht fehlt"}) + continue + try: + w = float(w) + except (TypeError, ValueError): + error_details.append({"row": rows_total, "error": "Gewicht ungültig"}) + continue + iso = d.isoformat() + cur.execute( + "SELECT id FROM weight_log WHERE profile_id=%s AND date=%s", + (profile_id, iso), + ) + existing = cur.fetchone() + if existing: + cur.execute( + """ + UPDATE weight_log SET weight=%s, note=COALESCE(%s, note), source='csv' + WHERE profile_id=%s AND date=%s + RETURNING id + """, + (w, note, profile_id, iso), + ) + row = cur.fetchone() + updated += 1 + if row and row.get("id"): + affected_ids["weight_log"].append(str(row["id"])) + else: + eid = str(uuid.uuid4()) + cur.execute( + """ + INSERT INTO weight_log (id, profile_id, date, weight, note, source, created) + VALUES (%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP) + """, + (eid, profile_id, iso, w, note), + ) + inserted += 1 + new_entries += 1 + affected_ids["weight_log"].append(eid) + + return { + "rows_total": rows_total, + "inserted": inserted, + "updated": updated, + "skipped": 0, + "new_entries": new_entries, + } + + +def _import_blood_pressure( + cur, + profile_id: str, + text: str, + delim: str, + has_header: bool, + fm: dict, + tc: dict | None, + error_details: list, + affected_ids: dict, +) -> dict[str, int]: + rows_total = 0 + inserted = 0 + updated = 0 + skipped = 0 + for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header): + rows_total += 1 + mapped = build_row_after_mapping(csv_row, fm, tc) + md = coerce_date(mapped.get("measured_date")) + mt = mapped.get("measured_time") + if md is None: + error_details.append({"row": rows_total, "error": "Datum fehlt"}) + continue + if mt is None: + error_details.append({"row": rows_total, "error": "Zeit fehlt"}) + continue + if isinstance(mt, str): + try: + parts = mt.replace(".", ":").split(":") + if len(parts) >= 2: + mt = dt.time(int(parts[0]), int(parts[1]), int(parts[2]) if len(parts) > 2 else 0) + else: + raise ValueError() + except Exception: + error_details.append({"row": rows_total, "error": "Zeit ungültig"}) + continue + if not isinstance(mt, dt.time): + error_details.append({"row": rows_total, "error": "Zeitformat wird nicht unterstützt"}) + continue + + systolic = mapped.get("systolic") + diastolic = mapped.get("diastolic") + pulse = mapped.get("pulse") + try: + sys_i = int(systolic) + dia_i = int(diastolic) + except (TypeError, ValueError): + error_details.append({"row": rows_total, "error": "Blutdruckwerte fehlen oder ungültig"}) + continue + pulse_i = int(pulse) if pulse is not None else None + + measured_at = dt.datetime.combine(md, mt) + hour = mt.hour + context = _derive_bp_context(hour) + + cur.execute( + """ + SELECT id FROM blood_pressure_log + WHERE profile_id = %s AND measured_at = %s + """, + (profile_id, measured_at), + ) + existing_bp = cur.fetchone() + if existing_bp: + cur.execute( + """ + UPDATE blood_pressure_log SET + systolic = %s, diastolic = %s, pulse = %s, + context = %s, source = 'csv' + WHERE profile_id = %s AND measured_at = %s + RETURNING id + """, + (sys_i, dia_i, pulse_i, context, profile_id, measured_at), + ) + row = cur.fetchone() + updated += 1 + if row and row.get("id"): + affected_ids["blood_pressure_log"].append(str(row["id"])) + else: + cur.execute( + """ + INSERT INTO blood_pressure_log ( + profile_id, measured_at, + systolic, diastolic, pulse, + context, source + ) VALUES (%s, %s, %s, %s, %s, %s, 'csv') + RETURNING id + """, + (profile_id, measured_at, sys_i, dia_i, pulse_i, context), + ) + row = cur.fetchone() + inserted += 1 + if row and row.get("id"): + affected_ids["blood_pressure_log"].append(str(row["id"])) + + return { + "rows_total": rows_total, + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "new_entries": inserted, + } diff --git a/backend/routers/csv_import.py b/backend/routers/csv_import.py index 93b498f..916236f 100644 --- a/backend/routers/csv_import.py +++ b/backend/routers/csv_import.py @@ -6,13 +6,16 @@ from __future__ import annotations import logging from typing import Any, Optional -from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile +from fastapi import APIRouter, Depends, File, Form, Header, HTTPException, UploadFile from pydantic import BaseModel from psycopg2.extras import Json -from auth import require_auth +from auth import require_auth, check_feature_access, increment_feature_usage +from feature_logger import log_feature_usage from db import get_db, get_cursor, r2d +from routers.profiles import get_pid +from csv_parser.executor import run_universal_csv_import from csv_parser.core import ( decode_raw_bytes, column_signature, @@ -252,3 +255,211 @@ async def analyze_csv( "detected_mappings": ranked[:5], "available_fields": available_fields, } + + +def _fetch_mapping_row(cur, mapping_id: int, profile_id: str, module: str) -> dict: + cur.execute( + """ + SELECT * FROM csv_field_mappings WHERE id = %s + """, + (mapping_id,), + ) + m = r2d(cur.fetchone()) + if not m: + raise HTTPException(404, "Mapping nicht gefunden") + if m.get("module") != module: + raise HTTPException(400, "Mapping gehört zu einem anderen Modul") + if not m.get("is_system"): + if str(m.get("profile_id") or "") != profile_id: + raise HTTPException(403, "Kein Zugriff auf dieses Mapping") + return m + + +def _check_module_feature_access(pid: str, module: str) -> None: + if module == "nutrition": + access = check_feature_access(pid, "nutrition_entries") + log_feature_usage(pid, "nutrition_entries", access, "csv_universal_import") + if not access["allowed"]: + raise HTTPException( + 403, + f"Limit erreicht (Ernährungseinträge): {access.get('used')}/{access.get('limit')}", + ) + elif module == "weight": + access = check_feature_access(pid, "weight_entries") + log_feature_usage(pid, "weight_entries", access, "csv_universal_import") + if not access["allowed"]: + raise HTTPException( + 403, + f"Limit erreicht (Gewichtseinträge): {access.get('used')}/{access.get('limit')}", + ) + + +@router.post("/import") +async def csv_import_execute( + file: UploadFile = File(...), + module: str = Form(...), + mapping_id: int = Form(...), + x_profile_id: Optional[str] = Header(default=None), + session: dict = Depends(require_auth), +): + """ + Universal-CSV-Import mit gespeichertem Mapping (Issue #21). + Unterstützt: nutrition, weight, blood_pressure. activity: noch nicht. + """ + if module == "activity": + raise HTTPException( + 501, + "Aktivitäts-CSV über den Universal-Importer ist noch nicht freigeschaltet " + "(Training-Type-Mapping). Bitte weiterhin /api/activity/import nutzen.", + ) + if not get_module_definition(module): + raise HTTPException(400, f"Unbekanntes oder nicht unterstütztes Modul: {module}") + + pid = get_pid(x_profile_id) + + access_di = check_feature_access(pid, "data_import") + log_feature_usage(pid, "data_import", access_di, "csv_universal_import") + if not access_di["allowed"]: + raise HTTPException( + 403, + "Limit erreicht (Daten importieren): " + f"{access_di.get('used')}/{access_di.get('limit')}", + ) + + _check_module_feature_access(pid, module) + + raw = await file.read() + limits = _load_import_limits() + max_bytes = limits.get("max_file_bytes", 52_428_800) + if len(raw) > max_bytes: + raise HTTPException( + 413, + f"Datei zu groß (max. {max_bytes} Bytes laut Systemkonfiguration)", + ) + text = decode_raw_bytes(raw) + if not text.strip(): + raise HTTPException(400, "Leere Datei") + max_rows = limits.get("max_rows_per_file", 50_000) + if text.count("\n") > max_rows + 5: + raise HTTPException( + 413, + f"Zu viele Zeilen (>{max_rows}) laut Systemkonfiguration", + ) + + log_id: int | None = None + err_response: HTTPException | None = None + result: dict | None = None + + try: + with get_db() as conn: + cur = get_cursor(conn) + m = _fetch_mapping_row(cur, mapping_id, pid, module) + + cur.execute( + """ + INSERT INTO csv_import_log ( + profile_id, mapping_id, module, filename, + rows_total, rows_imported, rows_updated, rows_skipped, rows_errors, + status, error_details, affected_ids + ) VALUES ( + %s::uuid, %s, %s, %s, + 0, 0, 0, 0, 0, + 'running', NULL, NULL + ) RETURNING id + """, + (pid, mapping_id, module, file.filename or "upload.csv"), + ) + log_id = cur.fetchone()["id"] + + cur.execute("SAVEPOINT csv_import_exec") + try: + result = run_universal_csv_import( + cur, + pid, + module, + text, + file.filename or "upload.csv", + m, + ) + except Exception as exec_err: + cur.execute("ROLLBACK TO SAVEPOINT csv_import_exec") + cur.execute( + """ + UPDATE csv_import_log SET + finished_at = CURRENT_TIMESTAMP, + status = 'failed', + error_details = %s + WHERE id = %s + """, + (Json([{"error": str(exec_err)}]), log_id), + ) + err_response = HTTPException(500, f"Import fehlgeschlagen: {exec_err}") + else: + cur.execute("RELEASE SAVEPOINT csv_import_exec") + cur.execute( + """ + UPDATE csv_import_log SET + finished_at = CURRENT_TIMESTAMP, + status = 'success', + rows_total = %s, + rows_imported = %s, + rows_updated = %s, + rows_skipped = %s, + rows_errors = %s, + error_details = %s, + affected_ids = %s + WHERE id = %s + """, + ( + result["rows_total"], + result["rows_imported"], + result["rows_updated"], + result["rows_skipped"], + result["rows_errors"], + Json(result["error_details"]), + Json(result["affected_ids"]), + log_id, + ), + ) + + cur.execute( + """ + UPDATE csv_field_mappings SET + usage_count = usage_count + 1, + last_used_at = CURRENT_TIMESTAMP, + updated_at = CURRENT_TIMESTAMP + WHERE id = %s + """, + (mapping_id,), + ) + + except HTTPException: + raise + + if err_response: + raise err_response + + assert result is not None + + increment_feature_usage(pid, "data_import") + + ne = result.get("new_entries", result["rows_imported"]) + if module == "nutrition": + for _ in range(ne): + increment_feature_usage(pid, "nutrition_entries") + elif module == "weight": + for _ in range(ne): + increment_feature_usage(pid, "weight_entries") + + return { + "success": True, + "import_log_id": log_id, + "stats": { + "total_rows": result["rows_total"], + "imported": result["rows_imported"], + "updated": result["rows_updated"], + "skipped": result["rows_skipped"], + "errors": result["rows_errors"], + }, + "error_details": result["error_details"], + } diff --git a/backend/tests/test_csv_parser_core.py b/backend/tests/test_csv_parser_core.py index f721909..dd4ba1b 100644 --- a/backend/tests/test_csv_parser_core.py +++ b/backend/tests/test_csv_parser_core.py @@ -9,6 +9,7 @@ from csv_parser.core import ( column_signature, headers_signature_match_score, get_csv_import_limits, + iter_csv_dict_rows, ) from csv_parser.type_converter import convert_value, build_row_after_mapping @@ -57,6 +58,12 @@ def test_convert_date_and_kcal_factor(): assert abs(k - 8000 * 0.239) < 0.01 +def test_iter_csv_dict_rows_full_file(): + text = "a;b\n1;2\n3;4\n" + rows = list(iter_csv_dict_rows(text, ";", has_header=True)) + assert rows == [{"a": "1", "b": "2"}, {"a": "3", "b": "4"}] + + def test_build_row_after_mapping(): csv_row = {"Datum": "01.01.2024", "kj": "4200"} fm = {"Datum": "date", "kj": "kcal"} diff --git a/backend/version.py b/backend/version.py index d4095f0..526d353 100644 --- a/backend/version.py +++ b/backend/version.py @@ -31,7 +31,7 @@ MODULE_VERSIONS = { "membership": "2.1.0", "workflow": "0.6.0", # Phase 4: End Node Template Engine "app_dashboard": "1.11.0", # Entitlements: DB-Override widget→features (AND), sonst Katalog - "csv_import": "0.1.0", # Issue #21: Analyse, Mappings, Limits + "csv_import": "0.2.0", # Issue #21: + POST /csv/import (nutrition, weight, blood_pressure) "admin_csv_templates": "0.1.0", # Issue #21: System-Templates + Import-Limits (Admin) } @@ -44,6 +44,8 @@ CHANGELOG = [ "csv_parser: core (Decode/Delimiter/Sample), module_registry, type_converter, permissions", "API /api/csv: modules, limits, mappings, analyze, copy", "API /api/admin/csv-templates: CRUD System-Templates, import-limits (system_config)", + "Issue #21: POST /api/csv/import + executor (nutrition Aggregat/Tag, weight, Blutdruck); activity 501", + "v9c_cleanup_features.sql: FK-sichere csv_import→data_import Reihenfolge", ], }, { diff --git a/frontend/src/utils/api.js b/frontend/src/utils/api.js index e12ef87..6bb3eee 100644 --- a/frontend/src/utils/api.js +++ b/frontend/src/utils/api.js @@ -489,6 +489,22 @@ export const api = { req(module ? `/csv/mappings?module=${encodeURIComponent(module)}` : '/csv/mappings'), copyCsvMapping: (mappingId, body = null) => req(`/csv/mappings/${mappingId}/copy`, body ? json(body) : { method: 'POST' }), + importCsv: async (file, module, mappingId) => { + const fd = new FormData() + fd.append('file', file) + fd.append('module', module) + fd.append('mapping_id', String(mappingId)) + const res = await fetch(BASE + '/csv/import', { method: 'POST', headers: hdrs(), body: fd }) + if (!res.ok) { + const errText = await res.text() + let parsed = null + try { + parsed = JSON.parse(errText) + } catch { /* ignore */ } + throw new Error(formatFastApiDetail(parsed?.detail, errText.trim() || `HTTP ${res.status}`)) + } + return res.json() + }, analyzeCsv: async (file, module, delimiter = null) => { const fd = new FormData() fd.append('file', file)