diff --git a/backend/csv_parser/executor.py b/backend/csv_parser/executor.py index b80d01c..b018526 100644 --- a/backend/csv_parser/executor.py +++ b/backend/csv_parser/executor.py @@ -221,9 +221,13 @@ def _import_nutrition( validate_import_row_processing("nutrition", spec, fm) except ValueError as e: raise ValueError(str(e)) from e - merged_rows = aggregate_mapped_rows(mapped_rows, spec) + merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec) + error_details.extend(agg_notes) else: merged_rows = list(mapped_rows) + agg_notes = [] + + skipped_groups = sum(n.get("rows_in_group", 0) for n in (agg_notes or []) if n.get("error") == "mehrere_zeilen_pro_schluessel") inserted = 0 updated = 0 @@ -285,7 +289,7 @@ def _import_nutrition( "rows_total": rows_total, "inserted": inserted, "updated": updated, - "skipped": 0, + "skipped": skipped_groups, "new_entries": new_entries, } @@ -329,9 +333,13 @@ def _import_weight( validate_import_row_processing("weight", spec, fm) except ValueError as e: raise ValueError(str(e)) from e - merged_rows = aggregate_mapped_rows(mapped_rows, spec) + merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec) + error_details.extend(agg_notes) else: merged_rows = list(mapped_rows) + agg_notes = [] + + skipped_groups = sum(n.get("rows_in_group", 0) for n in (agg_notes or []) if n.get("error") == "mehrere_zeilen_pro_schluessel") inserted = 0 updated = 0 @@ -384,7 +392,7 @@ def _import_weight( "rows_total": rows_total, "inserted": inserted, "updated": updated, - "skipped": 0, + "skipped": skipped_groups, "new_entries": new_entries, } @@ -555,13 +563,17 @@ def _import_vitals_baseline( validate_import_row_processing("vitals_baseline", spec, fm) except ValueError as e: raise ValueError(str(e)) from e - merged_rows = aggregate_mapped_rows(mapped_rows, spec) + merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec) + error_details.extend(agg_notes) else: merged_rows = list(mapped_rows) + agg_notes = [] + + skipped_merge = sum(n.get("rows_in_group", 0) for n in (agg_notes or []) if n.get("error") == "mehrere_zeilen_pro_schluessel") inserted = 0 updated = 0 - skipped = skipped_prefilter + skipped = skipped_prefilter + skipped_merge for merged in merged_rows: d = coerce_date(merged.get("date")) if d is None: diff --git a/backend/csv_parser/import_row_processing.py b/backend/csv_parser/import_row_processing.py index f2bc19f..b33dbad 100644 --- a/backend/csv_parser/import_row_processing.py +++ b/backend/csv_parser/import_row_processing.py @@ -1,8 +1,9 @@ """ Zeilenaggregation nach CSV-Mapping (group_by + aggregates), vor dem DB-Upsert. -Spezifikation in der Vorlage (import_row_processing JSONB) oder Modul-Default -(import_row_processing_default in module_registry). +Spezifikation in der Vorlage (import_row_processing JSONB). Optional: Modul-Default +(import_row_processing_default in module_registry) nur als **Legacy-Fallback**, wenn +die Vorlage nichts speichert — mittelfristig sollen Vorlagen explizit sein. """ from __future__ import annotations @@ -14,6 +15,8 @@ from typing import Any, Mapping from csv_parser.module_registry import get_module_definition ALLOWED_AGGREGATES = frozenset({"sum", "mean", "min", "max", "median", "first", "last"}) +# Mehr als eine CSV-Zeile pro group_by-Schlüssel +ALLOWED_MULTI_ROW_POLICIES = frozenset({"aggregate", "reject", "first_row", "last_row"}) def resolve_import_row_processing(module: str, mapping_row: Mapping[str, Any]) -> dict[str, Any] | None: @@ -66,6 +69,17 @@ def validate_import_row_processing( f"Erlaubt: {', '.join(sorted(ALLOWED_AGGREGATES))}" ) + mrp = spec.get("multi_row_policy") + if mrp is not None and str(mrp) not in ALLOWED_MULTI_ROW_POLICIES: + raise ValueError( + f"multi_row_policy: ungültiger Wert '{mrp}'. " + f"Erlaubt: {', '.join(sorted(ALLOWED_MULTI_ROW_POLICIES))}" + ) + + dedupe = spec.get("dedupe_identical_rows") + if dedupe is not None and not isinstance(dedupe, bool): + raise ValueError("dedupe_identical_rows muss ein Boolean sein") + def _sort_key_for_group(v: Any) -> Any: if isinstance(v, dt.datetime): @@ -110,18 +124,46 @@ def _apply_aggregate(op: str, values: list[Any]) -> Any: raise ValueError(f"Unbekannte Aggregations-Operation: {op}") +def _row_identity_signature(r: dict[str, Any]) -> tuple[Any, ...]: + return tuple(sorted((k, _sort_key_for_group(r.get(k))) for k in sorted(r.keys()))) + + +def _dedupe_identical_mapped_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Exakt gleiche gemappte Zeilen (alle Keys/Werte) — erste behalten.""" + seen: set[tuple[Any, ...]] = set() + out: list[dict[str, Any]] = [] + for r in rows: + sig = _row_identity_signature(r) + if sig in seen: + continue + seen.add(sig) + out.append(r) + return out + + def aggregate_mapped_rows( rows: list[dict[str, Any]], spec: Mapping[str, Any], -) -> list[dict[str, Any]]: +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: """ Gruppiert gemappte Zeilen-Dicts nach group_by und wendet aggregates an. Felder, die weder in group_by noch in aggregates vorkommen: Wert aus der ersten Zeile der Gruppe. + + Rückgabe: (merged_rows, strukturelle Fehler / Hinweise, z. B. abgelehnte Schlüsselgruppen). """ + errors: list[dict[str, Any]] = [] + rows = list(rows) + if spec.get("dedupe_identical_rows"): + rows = _dedupe_identical_mapped_rows(rows) + group_by = spec.get("group_by") or [] aggregates = spec.get("aggregates") or {} + policy = str(spec.get("multi_row_policy") or "aggregate") + if policy not in ALLOWED_MULTI_ROW_POLICIES: + policy = "aggregate" + if not group_by: - return rows + return rows, errors buckets: dict[tuple[Any, ...], list[dict[str, Any]]] = {} order: list[tuple[Any, ...]] = [] @@ -132,9 +174,28 @@ def aggregate_mapped_rows( order.append(key) buckets[key].append(r) + gb_label = ", ".join(group_by) out: list[dict[str, Any]] = [] for key in order: group_rows = buckets[key] + if len(group_rows) > 1: + if policy == "reject": + errors.append( + { + "error": "mehrere_zeilen_pro_schluessel", + "message": ( + f"{len(group_rows)} CSV-Zeilen mit gleichem Schlüssel ({gb_label}); " + "laut Vorlage abgelehnt (multi_row_policy=reject)." + ), + "rows_in_group": len(group_rows), + } + ) + continue + if policy == "first_row": + group_rows = [group_rows[0]] + elif policy == "last_row": + group_rows = [group_rows[-1]] + first = group_rows[0] merged: dict[str, Any] = {} for g in group_by: @@ -149,4 +210,4 @@ def aggregate_mapped_rows( continue merged[k] = v out.append(merged) - return out + return out, errors diff --git a/backend/csv_parser/module_registry.py b/backend/csv_parser/module_registry.py index 4ac8ec9..20a20e7 100644 --- a/backend/csv_parser/module_registry.py +++ b/backend/csv_parser/module_registry.py @@ -23,7 +23,7 @@ MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = { }, "duplicate_key": ["profile_id", "date"], "duplicate_strategy": "update", - # Mehrere CSV-Zeilen pro Tag (z. B. pro Lebensmittel) → ein nutrition_log-Eintrag + # Legacy-Fallback wenn die Vorlage kein import_row_processing speichert — Vorlagen mittelfristig explizit. "import_row_processing_default": { "group_by": ["date"], "aggregates": { @@ -69,6 +69,7 @@ MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = { }, "duplicate_key": ["profile_id", "date"], "duplicate_strategy": "update", + # Legacy-Fallback — Vorlagen mittelfristig explizit setzen. "import_row_processing_default": { "group_by": ["date"], "aggregates": { @@ -102,7 +103,7 @@ MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = { }, "duplicate_key": ["profile_id", "date"], "duplicate_strategy": "update", - # Mehrere CSV-Zeilen pro Tag → ein Eintrag (letzte Zeile im Export zählt) + # Legacy-Fallback — Vorlagen mittelfristig explizit setzen. "import_row_processing_default": { "group_by": ["date"], "aggregates": { diff --git a/backend/tests/test_import_row_processing.py b/backend/tests/test_import_row_processing.py index 92684ad..54f3673 100644 --- a/backend/tests/test_import_row_processing.py +++ b/backend/tests/test_import_row_processing.py @@ -38,12 +38,79 @@ def test_aggregate_mapped_rows_sums_same_group(): {"date": d, "kcal": 300.0, "protein_g": 15}, ] spec = {"group_by": ["date"], "aggregates": {"kcal": "sum", "protein_g": "sum"}} - out = aggregate_mapped_rows(rows, spec) + out, err = aggregate_mapped_rows(rows, spec) + assert err == [] assert len(out) == 1 assert out[0]["kcal"] == 800.0 assert out[0]["protein_g"] == 35 +def test_aggregate_mapped_rows_reject_second_group(): + d = dt.date(2024, 1, 15) + rows = [ + {"date": d, "kcal": 100.0}, + {"date": d, "kcal": 200.0}, + ] + spec = { + "group_by": ["date"], + "aggregates": {"kcal": "sum"}, + "multi_row_policy": "reject", + } + out, err = aggregate_mapped_rows(rows, spec) + assert out == [] + assert len(err) == 1 + assert err[0].get("error") == "mehrere_zeilen_pro_schluessel" + assert err[0].get("rows_in_group") == 2 + + +def test_aggregate_mapped_rows_first_row_no_merge(): + d = dt.date(2024, 1, 15) + rows = [ + {"date": d, "kcal": 100.0}, + {"date": d, "kcal": 999.0}, + ] + spec = { + "group_by": ["date"], + "aggregates": {"kcal": "sum"}, + "multi_row_policy": "first_row", + } + out, err = aggregate_mapped_rows(rows, spec) + assert err == [] + assert len(out) == 1 + assert out[0]["kcal"] == 100.0 + + +def test_dedupe_identical_rows_before_group(): + d = dt.date(2024, 1, 15) + rows = [ + {"date": d, "kcal": 50.0}, + {"date": d, "kcal": 50.0}, + {"date": d, "kcal": 50.0}, + ] + spec = { + "group_by": ["date"], + "aggregates": {"kcal": "sum"}, + "dedupe_identical_rows": True, + } + out, err = aggregate_mapped_rows(rows, spec) + assert err == [] + assert len(out) == 1 + assert out[0]["kcal"] == 50.0 + + +def test_validate_multi_row_policy(): + with pytest.raises(ValueError, match="multi_row_policy"): + validate_import_row_processing( + "nutrition", + { + "group_by": ["date"], + "aggregates": {"kcal": "sum"}, + "multi_row_policy": "nope", + }, + {"D": "date", "K": "kcal"}, + ) + + def test_resolve_explicit_overrides_default(): m = { "import_row_processing": {"group_by": ["date"], "aggregates": {"kcal": "mean"}}, diff --git a/frontend/src/pages/AdminCsvTemplateEditorPage.jsx b/frontend/src/pages/AdminCsvTemplateEditorPage.jsx index 7f4b302..8dfdd17 100644 --- a/frontend/src/pages/AdminCsvTemplateEditorPage.jsx +++ b/frontend/src/pages/AdminCsvTemplateEditorPage.jsx @@ -39,38 +39,65 @@ const ROW_AGG_OPS = [ const NUMERIC_ROW_AGG = new Set(['sum', 'mean', 'min', 'max', 'median']) +/** Wenn mehrere CSV-Zeilen denselben group_by-Schlüssel haben */ +const MULTI_ROW_POLICY_OPTIONS = [ + { value: 'aggregate', label: 'Zusammenführen (Funktion unten auf alle übrigen Felder)' }, + { value: 'reject', label: 'Abweisen — Gruppe wird nicht importiert (Fehlerhinweis im Import)' }, + { value: 'first_row', label: 'Nur erste Zeile — keine Berechnung über die Duplikat-Zeilen' }, + { value: 'last_row', label: 'Nur letzte Zeile — keine Berechnung über die Duplikat-Zeilen' }, +] + function parseStoredImportRowProcessing(irp) { - if (!irp || typeof irp !== 'object' || Object.keys(irp).length === 0) { + const safe = irp && typeof irp === 'object' ? irp : null + const dedupeFromStore = !!(safe && safe.dedupe_identical_rows) + const mrpRaw = safe && safe.multi_row_policy != null ? String(safe.multi_row_policy) : null + const multiRowPolicy = + mrpRaw && MULTI_ROW_POLICY_OPTIONS.some((o) => o.value === mrpRaw) ? mrpRaw : 'aggregate' + + if (!safe || Object.keys(safe).length === 0) { return { useCustom: false, irregular: false, groupBy: [], mode: '', + multiRowPolicy: 'aggregate', + dedupeIdentical: false, } } - const gb = irp.group_by - const agg = irp.aggregates + const gb = safe.group_by + const agg = safe.aggregates if (!Array.isArray(gb) || gb.length === 0 || agg == null || typeof agg !== 'object') { return { useCustom: true, irregular: true, groupBy: Array.isArray(gb) ? [...gb] : [], mode: '', + multiRowPolicy, + dedupeIdentical: dedupeFromStore, } } const ops = [...new Set(Object.values(agg).map((x) => String(x)))] if (ops.length !== 1) { - return { useCustom: true, irregular: true, groupBy: [...gb], mode: '' } + return { + useCustom: true, + irregular: true, + groupBy: [...gb], + mode: '', + multiRowPolicy, + dedupeIdentical: dedupeFromStore, + } } return { useCustom: true, irregular: false, groupBy: [...gb], mode: ops[0], + multiRowPolicy, + dedupeIdentical: dedupeFromStore, } } -function buildImportRowProcessingSimple(modFields, fm, groupBy, mode) { +function buildImportRowProcessingSimple(modFields, fm, groupBy, mode, multiRowPolicy, dedupeIdentical) { const targets = new Set( Object.values(fm).filter((v) => v && v !== '-' && v !== '_skip'), ) @@ -86,7 +113,13 @@ function buildImportRowProcessingSimple(modFields, fm, groupBy, mode) { aggregates[t] = mode } } - return { group_by: groupBy, aggregates } + const out = { + group_by: groupBy, + aggregates, + multi_row_policy: multiRowPolicy || 'aggregate', + } + if (dedupeIdentical) out.dedupe_identical_rows = true + return out } /** Erlaubt Eingaben wie 1,03 oder 1.03 während des Tippens; Finale normalisiert bei Blur/Speichern. */ @@ -255,6 +288,8 @@ export default function AdminCsvTemplateEditorPage() { const [rowAggMode, setRowAggMode] = useState('') const [rowAggIrregular, setRowAggIrregular] = useState(false) const [rowAggJsonText, setRowAggJsonText] = useState('{}') + const [rowAggMultiRowPolicy, setRowAggMultiRowPolicy] = useState('aggregate') + const [rowAggDedupeIdentical, setRowAggDedupeIdentical] = useState(false) const modMeta = useMemo(() => modules.find((m) => m.id === module), [modules, module]) const aggregateSleepImport = modMeta?.import_mode === 'apple_sleep_aggregate' @@ -295,6 +330,8 @@ export default function AdminCsvTemplateEditorPage() { setRowAggGroupBy([]) setRowAggMode('') setRowAggJsonText('{}') + setRowAggMultiRowPolicy('aggregate') + setRowAggDedupeIdentical(false) }, [module, isNew]) useEffect(() => { @@ -325,6 +362,8 @@ export default function AdminCsvTemplateEditorPage() { setRowAggIrregular(rp.irregular) setRowAggGroupBy(rp.groupBy) setRowAggMode(rp.mode) + setRowAggMultiRowPolicy(rp.multiRowPolicy) + setRowAggDedupeIdentical(rp.dedupeIdentical) setRowAggJsonText(JSON.stringify(t.import_row_processing || {}, null, 2)) }) .catch((e) => { @@ -541,6 +580,8 @@ export default function AdminCsvTemplateEditorPage() { setRowAggGroupBy([]) setRowAggMode('') setRowAggJsonText('{}') + setRowAggMultiRowPolicy('aggregate') + setRowAggDedupeIdentical(false) } catch (e) { setError(e.message || 'Analyse fehlgeschlagen') } finally { @@ -642,6 +683,8 @@ export default function AdminCsvTemplateEditorPage() { fieldMappings, rowAggGroupBy, rowAggMode, + rowAggMultiRowPolicy, + rowAggDedupeIdentical, ) } } @@ -975,10 +1018,10 @@ export default function AdminCsvTemplateEditorPage() { ) : ( <>

- Mehrere CSV-Zeilen mit denselben Werten in den gewählten Schlüsselfeldern werden zu einer - importierten Zeile zusammengefasst. Für alle übrigen zugewiesenen Zielfelder gilt{' '} - eine gemeinsame Funktion. Textfelder werden bei Summe/Mittelwert usw. automatisch - ausgelassen; mit „Erster/Letzter Wert“ sind sie enthalten. + Schlüsselfelder bestimmen, wann CSV-Zeilen dieselbe „Gruppe“ sind. Was bei{' '} + mehr als einer Zeile pro Gruppe passiert, steuern Sie unten ( + Zusammenführen / Abweisen / nur erste oder letzte Zeile). Optional können{' '} + völlig identische gemappte Zeilen vorher entfernt werden.

{modMeta.import_row_processing_default && (
- Modul-Standard (Referenz, wenn Haken oben aus ist) + + Legacy-Fallback im Code (Referenz, wenn der Haken oben aus ist) +
@@ -1052,21 +1102,10 @@ export default function AdminCsvTemplateEditorPage() {
                 <>
                   

Diese Vorlage nutzt unterschiedliche Aggregations-Funktionen pro Feld. JSON - anpassen oder vereinheitlichen (pro-Feld-Auswahl folgt in einer späteren Ausbaustufe). + anpassen oder vereinheitlichen (pro-Feld-Auswahl später). Optional:{' '} + multi_row_policy (aggregate | reject |{' '} + first_row | last_row), dedupe_identical_rows: true.

-