feat(csv_import): Enhance CSV import functionality with new endpoint and parsing improvements
All checks were successful
Deploy Development / deploy (push) Successful in 52s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s

- Updated version for csv_import to 0.2.0, reflecting new features.
- Implemented a new POST endpoint for universal CSV import, supporting nutrition, weight, and blood pressure modules.
- Added CSV parsing function to yield rows as dictionaries for easier data handling.
- Enhanced error handling and logging for import operations.
- Introduced tests for the new CSV parsing functionality to ensure reliability.
This commit is contained in:
Lars 2026-04-10 06:03:21 +02:00
parent 36417bfdf3
commit 851018b3b9
6 changed files with 637 additions and 4 deletions

View File

@ -1,12 +1,13 @@
"""
CSV bytes text, delimiter sniffing, strukturierte Erstzeilen für Analyse (Issue #21).
"""
from __future__ import annotations
import csv
import io
import re
from typing import Any, List, Tuple
from typing import Any, Dict, Iterator, List, Tuple
_DEFAULT_DELIMS = [",", ";", "\t"]
@ -135,3 +136,22 @@ def get_csv_import_limits(conn_row: dict | None) -> dict[str, int]:
out = {**defaults, **{k: int(v) for k, v in val.items() if k in defaults}}
return out
return defaults
def iter_csv_dict_rows(
    text: str,
    delimiter: str,
    *,
    has_header: bool = True,
) -> Iterator[Dict[str, str]]:
    """Yield every non-empty data row of a CSV text as a ``{header: value}`` dict.

    The first line of ``text`` supplies the keys. Values are stripped of
    surrounding whitespace; cells missing at the end of a short row become
    ``""``. Rows whose named cells are all blank are skipped.

    Args:
        text: Complete, already decoded CSV content.
        delimiter: Single-character field separator.
        has_header: Must be true; header-less files are not supported yet.

    Yields:
        One dict per data row, keyed by the header names.

    Raises:
        ValueError: If ``has_header`` is false. Because this is a generator,
            the error surfaces on the first iteration, not at call time.
    """
    if not has_header:
        raise ValueError("CSV ohne Kopfzeile wird für Import noch nicht unterstützt")
    # Unify line endings so lone "\r" (old Mac exports) also terminates a row.
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    reader = csv.DictReader(io.StringIO(normalized), delimiter=delimiter)
    for row in reader:
        # DictReader collects cells beyond the header under the key None as a
        # *list*; calling .strip() on that list used to raise AttributeError.
        # Those surplus cells have no column name, so they are dropped.
        cleaned = {
            key: (value or "").strip()
            for key, value in row.items()
            if key is not None
        }
        if not any(cleaned.values()):
            continue
        yield cleaned

View File

@ -0,0 +1,377 @@
"""
CSV Zieltabellen: Upsert, Fehlerliste, affected_ids für csv_import_log (Issue #21).
"""
from __future__ import annotations
import datetime as dt
import uuid
from collections import defaultdict
from typing import Any
from csv_parser.core import iter_csv_dict_rows
from csv_parser.module_registry import get_module_definition
from csv_parser.type_converter import build_row_after_mapping
def coerce_date(val: Any) -> dt.date | None:
if val is None:
return None
if isinstance(val, dt.datetime):
return val.date()
if isinstance(val, dt.date):
return val
return None
def _derive_bp_context(hour: int) -> str:
    """Map the hour of a measurement to its context label.

    Hours 5 to 9 give ``"morning_fasted"``, hours 18 to 22 give
    ``"evening"``, everything else gives ``"other"``.
    """
    # (first hour, end hour exclusive, label)
    windows = (
        (5, 10, "morning_fasted"),
        (18, 23, "evening"),
    )
    for first_hour, end_hour, label in windows:
        if first_hour <= hour < end_hour:
            return label
    return "other"
def run_universal_csv_import(
    cur,
    profile_id: str,
    module: str,
    text: str,
    filename: str,
    mapping: dict[str, Any],
) -> dict[str, Any]:
    """Import a decoded CSV text into the target table of ``module``.

    Works on ``cur`` inside a transaction that the caller has already
    opened. Returns the statistics together with ``affected_ids`` and
    ``error_details``.

    Args:
        cur: Database cursor of the surrounding transaction.
        profile_id: Profile the rows are written for.
        module: ``"nutrition"``, ``"weight"`` or ``"blood_pressure"``.
        text: Decoded CSV content.
        filename: Name of the uploaded file (not used inside this function).
        mapping: Stored mapping; the keys ``field_mappings``,
            ``type_conversions``, ``delimiter`` and ``has_header`` are read.

    Returns:
        Dict with ``rows_total``, ``rows_imported``, ``rows_updated``,
        ``rows_skipped``, ``rows_errors`` (count of all errors),
        ``error_details`` (first 50 only), ``new_entries`` and
        ``affected_ids``.

    Raises:
        ValueError: Unknown module, ``field_mappings`` given as a string, or
            a module without a universal importer.
    """
    if not get_module_definition(module):
        raise ValueError(f"Unbekanntes Modul: {module}")

    field_map = mapping.get("field_mappings") or {}
    if isinstance(field_map, str):
        raise ValueError("field_mappings muss ein Objekt sein")

    # Anything that is not a dict (including None) means "no conversions".
    conversions = mapping.get("type_conversions")
    if not isinstance(conversions, dict):
        conversions = None

    delimiter = mapping.get("delimiter") or ","
    header_flag = bool(mapping.get("has_header", True))

    error_details: list[dict[str, Any]] = []
    affected_ids: dict[str, list[str]] = defaultdict(list)

    # All three importers share one signature, so only the callee differs.
    if module == "nutrition":
        importer = _import_nutrition
    elif module == "weight":
        importer = _import_weight
    elif module == "blood_pressure":
        importer = _import_blood_pressure
    else:
        raise ValueError(f"Modul '{module}' wird für Universal-Import noch nicht unterstützt")

    stats = importer(
        cur,
        profile_id,
        text,
        delimiter,
        header_flag,
        field_map,
        conversions,
        error_details,
        affected_ids,
    )
    rows_total = stats.pop("rows_total")
    imported = stats.get("inserted", 0)

    return {
        "rows_total": rows_total,
        "rows_imported": imported,
        "rows_updated": stats.get("updated", 0),
        "rows_skipped": stats.get("skipped", 0),
        "rows_errors": len(error_details),
        "error_details": error_details[:50],
        "new_entries": stats.get("new_entries", imported),
        "affected_ids": dict(affected_ids),
    }
def _import_nutrition(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Sum nutrition rows per day and upsert one ``nutrition_log`` row per date.

    Every CSV row is mapped, its ``kcal``/``protein_g``/``fat_g``/``carbs_g``
    values are added to the totals of its date, and afterwards each date is
    written once: an existing row for (profile, date) is overwritten with the
    new totals, otherwise a row with a fresh UUID is inserted.

    Args:
        cur: Cursor of the surrounding transaction; rows are read with
            ``.get("id")``, so it is expected to return dict-like rows.
        profile_id: Profile the entries belong to.
        text: Decoded CSV content.
        delim: Field delimiter.
        has_header: Whether the first line holds the column names.
        fm: Field mapping passed through to ``build_row_after_mapping``.
        tc: Optional type conversions passed through likewise.
        error_details: Output list; one entry per row without a usable date.
        affected_ids: Output dict; ids are appended under ``"nutrition_log"``.

    Returns:
        ``rows_total`` counts CSV rows; ``inserted``, ``updated``,
        ``skipped`` and ``new_entries`` count days.
    """
    import math  # function scope: only needed for the finiteness check below

    nutrient_keys = ("kcal", "protein_g", "fat_g", "carbs_g")
    agg: dict[str, dict[str, float]] = defaultdict(
        lambda: {"kcal": 0.0, "protein_g": 0.0, "fat_g": 0.0, "carbs_g": 0.0}
    )
    rows_total = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc)
        d = coerce_date(mapped.get("date"))
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt oder ungültig"})
            continue
        day_totals = agg[d.isoformat()]
        for key in nutrient_keys:
            v = mapped.get(key)
            if v is None:
                continue
            try:
                number = float(v)
            except (TypeError, ValueError):
                # Best effort as before: an unparsable cell contributes
                # nothing but does not reject the row.
                continue
            # float() accepts "nan"/"inf"; a single such cell would turn the
            # whole daily total into NaN/inf, so it is ignored like any other
            # unusable value.
            if math.isfinite(number):
                day_totals[key] += number

    inserted = 0
    updated = 0
    skipped = 0
    new_entries = 0
    for iso, vals in agg.items():
        kcal = round(vals["kcal"], 1)
        fat = round(vals["fat_g"], 1)
        carbs = round(vals["carbs_g"], 1)
        prot = round(vals["protein_g"], 1)
        if kcal == 0 and fat == 0 and carbs == 0 and prot == 0:
            # Days without any nutritional value are not written; report them
            # instead of dropping them silently.
            skipped += 1
            continue
        cur.execute(
            "SELECT id FROM nutrition_log WHERE profile_id=%s AND date=%s",
            (profile_id, iso),
        )
        existing = cur.fetchone()
        if existing:
            cur.execute(
                """
                UPDATE nutrition_log
                SET kcal=%s, protein_g=%s, fat_g=%s, carbs_g=%s, source='csv'
                WHERE profile_id=%s AND date=%s
                RETURNING id
                """,
                (kcal, prot, fat, carbs, profile_id, iso),
            )
            row = cur.fetchone()
            updated += 1
            if row and row.get("id"):
                affected_ids["nutrition_log"].append(str(row["id"]))
        else:
            eid = str(uuid.uuid4())
            cur.execute(
                """
                INSERT INTO nutrition_log (id, profile_id, date, kcal, protein_g, fat_g, carbs_g, source, created)
                VALUES (%s,%s,%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)
                """,
                (eid, profile_id, iso, kcal, prot, fat, carbs),
            )
            inserted += 1
            new_entries += 1
            affected_ids["nutrition_log"].append(eid)
    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "new_entries": new_entries,
    }
def _import_weight(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert one ``weight_log`` row per CSV row, keyed by (profile, date).

    Rows without a usable date or weight are recorded in ``error_details``
    and skipped. For an existing (profile, date) entry the weight is
    overwritten and the note is replaced only when the CSV supplies one
    (``COALESCE``); otherwise a new row with a fresh UUID is inserted.

    Args:
        cur: Cursor of the surrounding transaction; rows are read with
            ``.get("id")``, so it is expected to return dict-like rows.
        profile_id: Profile the entries belong to.
        text: Decoded CSV content.
        delim: Field delimiter.
        has_header: Whether the first line holds the column names.
        fm: Field mapping passed through to ``build_row_after_mapping``.
        tc: Optional type conversions passed through likewise.
        error_details: Output list; one entry per rejected row. ``row`` is
            the running count of non-empty data rows.
        affected_ids: Output dict; ids are appended under ``"weight_log"``.

    Returns:
        Counters ``rows_total``, ``inserted``, ``updated``, ``skipped``
        (always 0) and ``new_entries``.
    """
    import math  # function scope: only needed for the finiteness check below

    rows_total = 0
    inserted = 0
    updated = 0
    new_entries = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc)
        d = coerce_date(mapped.get("date"))
        w = mapped.get("weight")
        note = mapped.get("note")
        if d is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if w is None:
            error_details.append({"row": rows_total, "error": "Gewicht fehlt"})
            continue
        try:
            w = float(w)
            # float() happily parses "nan" and "inf"; neither is a weight.
            if not math.isfinite(w):
                raise ValueError("non-finite weight")
        except (TypeError, ValueError, OverflowError):
            error_details.append({"row": rows_total, "error": "Gewicht ungültig"})
            continue
        iso = d.isoformat()
        cur.execute(
            "SELECT id FROM weight_log WHERE profile_id=%s AND date=%s",
            (profile_id, iso),
        )
        existing = cur.fetchone()
        if existing:
            cur.execute(
                """
                UPDATE weight_log SET weight=%s, note=COALESCE(%s, note), source='csv'
                WHERE profile_id=%s AND date=%s
                RETURNING id
                """,
                (w, note, profile_id, iso),
            )
            row = cur.fetchone()
            updated += 1
            if row and row.get("id"):
                affected_ids["weight_log"].append(str(row["id"]))
        else:
            eid = str(uuid.uuid4())
            cur.execute(
                """
                INSERT INTO weight_log (id, profile_id, date, weight, note, source, created)
                VALUES (%s,%s,%s,%s,%s,'csv',CURRENT_TIMESTAMP)
                """,
                (eid, profile_id, iso, w, note),
            )
            inserted += 1
            new_entries += 1
            affected_ids["weight_log"].append(eid)
    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": 0,
        "new_entries": new_entries,
    }
def _import_blood_pressure(
    cur,
    profile_id: str,
    text: str,
    delim: str,
    has_header: bool,
    fm: dict,
    tc: dict | None,
    error_details: list,
    affected_ids: dict,
) -> dict[str, int]:
    """Upsert one ``blood_pressure_log`` row per CSV row, keyed by (profile, measured_at).

    ``measured_at`` is built from the mapped ``measured_date`` and
    ``measured_time``; string times are accepted as ``H:M`` or ``H:M:S``
    with ``:`` or ``.`` as separator. The context label is derived from the
    hour of the measurement. Rows with unusable date, time, systolic,
    diastolic or pulse values are recorded in ``error_details`` and skipped.

    Args:
        cur: Cursor of the surrounding transaction; rows are read with
            ``.get("id")``, so it is expected to return dict-like rows.
        profile_id: Profile the entries belong to.
        text: Decoded CSV content.
        delim: Field delimiter.
        has_header: Whether the first line holds the column names.
        fm: Field mapping passed through to ``build_row_after_mapping``.
        tc: Optional type conversions passed through likewise.
        error_details: Output list; one entry per rejected row.
        affected_ids: Output dict; ids are appended under
            ``"blood_pressure_log"``.

    Returns:
        Counters ``rows_total``, ``inserted``, ``updated``, ``skipped``
        (always 0) and ``new_entries`` (equal to ``inserted``).
    """
    # int()/time() raise OverflowError for infinite or oversized numbers.
    conversion_errors = (TypeError, ValueError, OverflowError)

    rows_total = 0
    inserted = 0
    updated = 0
    for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header):
        rows_total += 1
        mapped = build_row_after_mapping(csv_row, fm, tc)
        md = coerce_date(mapped.get("measured_date"))
        mt = mapped.get("measured_time")
        if md is None:
            error_details.append({"row": rows_total, "error": "Datum fehlt"})
            continue
        if mt is None:
            error_details.append({"row": rows_total, "error": "Zeit fehlt"})
            continue
        if isinstance(mt, str):
            parts = mt.replace(".", ":").split(":")
            try:
                if len(parts) < 2:
                    raise ValueError("time needs at least hours and minutes")
                mt = dt.time(
                    int(parts[0]),
                    int(parts[1]),
                    int(parts[2]) if len(parts) > 2 else 0,
                )
            except conversion_errors:
                error_details.append({"row": rows_total, "error": "Zeit ungültig"})
                continue
        if not isinstance(mt, dt.time):
            error_details.append({"row": rows_total, "error": "Zeitformat wird nicht unterstützt"})
            continue

        try:
            sys_i = int(mapped.get("systolic"))
            dia_i = int(mapped.get("diastolic"))
        except conversion_errors:
            error_details.append({"row": rows_total, "error": "Blutdruckwerte fehlen oder ungültig"})
            continue

        # Pulse is optional. The conversion used to run unguarded, so a single
        # bad cell raised out of the loop and aborted the whole import.
        pulse = mapped.get("pulse")
        pulse_i = None
        pulse_is_blank = isinstance(pulse, str) and not pulse.strip()
        if pulse is not None and not pulse_is_blank:
            try:
                pulse_i = int(pulse)
            except conversion_errors:
                error_details.append({"row": rows_total, "error": "Puls ungültig"})
                continue

        measured_at = dt.datetime.combine(md, mt)
        context = _derive_bp_context(mt.hour)
        cur.execute(
            """
            SELECT id FROM blood_pressure_log
            WHERE profile_id = %s AND measured_at = %s
            """,
            (profile_id, measured_at),
        )
        existing_bp = cur.fetchone()
        if existing_bp:
            cur.execute(
                """
                UPDATE blood_pressure_log SET
                systolic = %s, diastolic = %s, pulse = %s,
                context = %s, source = 'csv'
                WHERE profile_id = %s AND measured_at = %s
                RETURNING id
                """,
                (sys_i, dia_i, pulse_i, context, profile_id, measured_at),
            )
            row = cur.fetchone()
            updated += 1
            if row and row.get("id"):
                affected_ids["blood_pressure_log"].append(str(row["id"]))
        else:
            cur.execute(
                """
                INSERT INTO blood_pressure_log (
                profile_id, measured_at,
                systolic, diastolic, pulse,
                context, source
                ) VALUES (%s, %s, %s, %s, %s, %s, 'csv')
                RETURNING id
                """,
                (profile_id, measured_at, sys_i, dia_i, pulse_i, context),
            )
            row = cur.fetchone()
            inserted += 1
            if row and row.get("id"):
                affected_ids["blood_pressure_log"].append(str(row["id"]))
    return {
        "rows_total": rows_total,
        "inserted": inserted,
        "updated": updated,
        "skipped": 0,
        "new_entries": inserted,
    }

View File

@ -6,13 +6,16 @@ from __future__ import annotations
import logging
from typing import Any, Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from fastapi import APIRouter, Depends, File, Form, Header, HTTPException, UploadFile
from pydantic import BaseModel
from psycopg2.extras import Json
from auth import require_auth
from auth import require_auth, check_feature_access, increment_feature_usage
from feature_logger import log_feature_usage
from db import get_db, get_cursor, r2d
from routers.profiles import get_pid
from csv_parser.executor import run_universal_csv_import
from csv_parser.core import (
decode_raw_bytes,
column_signature,
@ -252,3 +255,211 @@ async def analyze_csv(
"detected_mappings": ranked[:5],
"available_fields": available_fields,
}
def _fetch_mapping_row(cur, mapping_id: int, profile_id: str, module: str) -> dict:
    """Load a CSV field mapping and check that the caller may use it.

    Raises:
        HTTPException: 404 if no mapping has this id, 400 if the mapping
            belongs to a different module, 403 if it is neither a system
            mapping nor owned by ``profile_id``.
    """
    cur.execute(
        """
        SELECT * FROM csv_field_mappings WHERE id = %s
        """,
        (mapping_id,),
    )
    mapping = r2d(cur.fetchone())
    if not mapping:
        raise HTTPException(404, "Mapping nicht gefunden")
    if mapping.get("module") != module:
        raise HTTPException(400, "Mapping gehört zu einem anderen Modul")

    # System mappings are usable by every profile; all others only by their owner.
    owner = str(mapping.get("profile_id") or "")
    if not mapping.get("is_system") and owner != profile_id:
        raise HTTPException(403, "Kein Zugriff auf dieses Mapping")
    return mapping
def _check_module_feature_access(pid: str, module: str) -> None:
    """Enforce the per-module entry limit for nutrition and weight imports.

    The access check is logged in either case. Modules other than
    ``"nutrition"`` and ``"weight"`` have no limit here and pass through.

    Raises:
        HTTPException: 403 when the feature check reports ``allowed`` false.
    """
    # module -> (feature key, label used in the error message)
    gated = {
        "nutrition": ("nutrition_entries", "Ernährungseinträge"),
        "weight": ("weight_entries", "Gewichtseinträge"),
    }
    gate = gated.get(module)
    if gate is None:
        return

    feature, label = gate
    access = check_feature_access(pid, feature)
    log_feature_usage(pid, feature, access, "csv_universal_import")
    if access["allowed"]:
        return
    raise HTTPException(
        403,
        f"Limit erreicht ({label}): {access.get('used')}/{access.get('limit')}",
    )
@router.post("/import")
async def csv_import_execute(
    file: UploadFile = File(...),
    module: str = Form(...),
    mapping_id: int = Form(...),
    x_profile_id: Optional[str] = Header(default=None),
    session: dict = Depends(require_auth),
):
    """
    Universal CSV import using a stored mapping (Issue #21).

    Supported modules: nutrition, weight, blood_pressure. activity: not yet.

    Flow: feature/limit checks, size and row limits, then one database
    transaction that writes a ``csv_import_log`` row, runs the import inside
    a savepoint and finally marks the log row as ``success`` or ``failed``.

    Raises:
        HTTPException: 501 for ``activity``, 400 for an unknown module or an
            empty file, 403 when a feature limit is reached or the mapping is
            not accessible, 404 for an unknown mapping, 413 when the file or
            row limit is exceeded, 500 when the import itself fails.
    """
    if module == "activity":
        raise HTTPException(
            501,
            "Aktivitäts-CSV über den Universal-Importer ist noch nicht freigeschaltet "
            "(Training-Type-Mapping). Bitte weiterhin /api/activity/import nutzen.",
        )
    if not get_module_definition(module):
        raise HTTPException(400, f"Unbekanntes oder nicht unterstütztes Modul: {module}")

    pid = get_pid(x_profile_id)
    access_di = check_feature_access(pid, "data_import")
    log_feature_usage(pid, "data_import", access_di, "csv_universal_import")
    if not access_di["allowed"]:
        raise HTTPException(
            403,
            "Limit erreicht (Daten importieren): "
            f"{access_di.get('used')}/{access_di.get('limit')}",
        )
    _check_module_feature_access(pid, module)

    limits = _load_import_limits()
    max_bytes = limits.get("max_file_bytes", 52_428_800)
    # Read one byte more than allowed: enough to detect an oversized upload
    # without pulling an arbitrarily large file into memory first.
    raw = await file.read(max_bytes + 1)
    if len(raw) > max_bytes:
        raise HTTPException(
            413,
            f"Datei zu groß (max. {max_bytes} Bytes laut Systemkonfiguration)",
        )
    text = decode_raw_bytes(raw)
    if not text.strip():
        raise HTTPException(400, "Leere Datei")
    max_rows = limits.get("max_rows_per_file", 50_000)
    # Cheap upper bound on the row count; the +5 leaves slack for the header
    # and trailing blank lines.
    if text.count("\n") > max_rows + 5:
        raise HTTPException(
            413,
            f"Zu viele Zeilen (>{max_rows}) laut Systemkonfiguration",
        )

    filename = file.filename or "upload.csv"
    log_id: int | None = None
    err_response: HTTPException | None = None
    result: dict | None = None
    with get_db() as conn:
        cur = get_cursor(conn)
        m = _fetch_mapping_row(cur, mapping_id, pid, module)
        cur.execute(
            """
            INSERT INTO csv_import_log (
            profile_id, mapping_id, module, filename,
            rows_total, rows_imported, rows_updated, rows_skipped, rows_errors,
            status, error_details, affected_ids
            ) VALUES (
            %s::uuid, %s, %s, %s,
            0, 0, 0, 0, 0,
            'running', NULL, NULL
            ) RETURNING id
            """,
            (pid, mapping_id, module, filename),
        )
        log_id = cur.fetchone()["id"]
        # The savepoint lets a failed import be undone while the log row
        # written above stays part of the transaction.
        cur.execute("SAVEPOINT csv_import_exec")
        try:
            result = run_universal_csv_import(
                cur,
                pid,
                module,
                text,
                filename,
                m,
            )
        except Exception as exec_err:
            # Keep the traceback on the server; the client only gets the message.
            logging.getLogger(__name__).exception(
                "CSV import failed (module=%s, mapping_id=%s, log_id=%s)",
                module,
                mapping_id,
                log_id,
            )
            cur.execute("ROLLBACK TO SAVEPOINT csv_import_exec")
            cur.execute(
                """
                UPDATE csv_import_log SET
                finished_at = CURRENT_TIMESTAMP,
                status = 'failed',
                error_details = %s
                WHERE id = %s
                """,
                (Json([{"error": str(exec_err)}]), log_id),
            )
            # Raised only after the with-block, presumably so that get_db()
            # exits normally and the 'failed' log row is kept — confirm
            # against get_db's commit/rollback behaviour.
            err_response = HTTPException(500, f"Import fehlgeschlagen: {exec_err}")
        else:
            cur.execute("RELEASE SAVEPOINT csv_import_exec")
            cur.execute(
                """
                UPDATE csv_import_log SET
                finished_at = CURRENT_TIMESTAMP,
                status = 'success',
                rows_total = %s,
                rows_imported = %s,
                rows_updated = %s,
                rows_skipped = %s,
                rows_errors = %s,
                error_details = %s,
                affected_ids = %s
                WHERE id = %s
                """,
                (
                    result["rows_total"],
                    result["rows_imported"],
                    result["rows_updated"],
                    result["rows_skipped"],
                    result["rows_errors"],
                    Json(result["error_details"]),
                    Json(result["affected_ids"]),
                    log_id,
                ),
            )
            cur.execute(
                """
                UPDATE csv_field_mappings SET
                usage_count = usage_count + 1,
                last_used_at = CURRENT_TIMESTAMP,
                updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
                """,
                (mapping_id,),
            )

    if err_response is not None:
        raise err_response
    # Invariant: without an error response the import has produced a result.
    assert result is not None

    increment_feature_usage(pid, "data_import")
    ne = result.get("new_entries", result["rows_imported"])
    # Usage is counted once per newly created entry.
    if module == "nutrition":
        for _ in range(ne):
            increment_feature_usage(pid, "nutrition_entries")
    elif module == "weight":
        for _ in range(ne):
            increment_feature_usage(pid, "weight_entries")

    return {
        "success": True,
        "import_log_id": log_id,
        "stats": {
            "total_rows": result["rows_total"],
            "imported": result["rows_imported"],
            "updated": result["rows_updated"],
            "skipped": result["rows_skipped"],
            "errors": result["rows_errors"],
        },
        "error_details": result["error_details"],
    }

View File

@ -9,6 +9,7 @@ from csv_parser.core import (
column_signature,
headers_signature_match_score,
get_csv_import_limits,
iter_csv_dict_rows,
)
from csv_parser.type_converter import convert_value, build_row_after_mapping
@ -57,6 +58,12 @@ def test_convert_date_and_kcal_factor():
assert abs(k - 8000 * 0.239) < 0.01
def test_iter_csv_dict_rows_full_file():
    """Every data row of a semicolon-separated file comes back as a header-keyed dict."""
    csv_text = "a;b\n1;2\n3;4\n"
    expected = [{"a": "1", "b": "2"}, {"a": "3", "b": "4"}]
    assert list(iter_csv_dict_rows(csv_text, ";", has_header=True)) == expected
def test_build_row_after_mapping():
csv_row = {"Datum": "01.01.2024", "kj": "4200"}
fm = {"Datum": "date", "kj": "kcal"}

View File

@ -31,7 +31,7 @@ MODULE_VERSIONS = {
"membership": "2.1.0",
"workflow": "0.6.0", # Phase 4: End Node Template Engine
"app_dashboard": "1.11.0", # Entitlements: DB-Override widget→features (AND), sonst Katalog
"csv_import": "0.1.0", # Issue #21: Analyse, Mappings, Limits
"csv_import": "0.2.0", # Issue #21: + POST /csv/import (nutrition, weight, blood_pressure)
"admin_csv_templates": "0.1.0", # Issue #21: System-Templates + Import-Limits (Admin)
}
@ -44,6 +44,8 @@ CHANGELOG = [
"csv_parser: core (Decode/Delimiter/Sample), module_registry, type_converter, permissions",
"API /api/csv: modules, limits, mappings, analyze, copy",
"API /api/admin/csv-templates: CRUD System-Templates, import-limits (system_config)",
"Issue #21: POST /api/csv/import + executor (nutrition Aggregat/Tag, weight, Blutdruck); activity 501",
"v9c_cleanup_features.sql: FK-sichere csv_import→data_import Reihenfolge",
],
},
{

View File

@ -489,6 +489,22 @@ export const api = {
req(module ? `/csv/mappings?module=${encodeURIComponent(module)}` : '/csv/mappings'),
copyCsvMapping: (mappingId, body = null) =>
req(`/csv/mappings/${mappingId}/copy`, body ? json(body) : { method: 'POST' }),
importCsv: async (file, module, mappingId) => {
const fd = new FormData()
fd.append('file', file)
fd.append('module', module)
fd.append('mapping_id', String(mappingId))
const res = await fetch(BASE + '/csv/import', { method: 'POST', headers: hdrs(), body: fd })
if (!res.ok) {
const errText = await res.text()
let parsed = null
try {
parsed = JSON.parse(errText)
} catch { /* ignore */ }
throw new Error(formatFastApiDetail(parsed?.detail, errText.trim() || `HTTP ${res.status}`))
}
return res.json()
},
analyzeCsv: async (file, module, delimiter = null) => {
const fd = new FormData()
fd.append('file', file)