feat(csv-import): Enhance row aggregation and validation features
All checks were successful
Deploy Development / deploy (push) Successful in 47s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s

- Updated the aggregate_mapped_rows function to support multiple row policies, allowing for flexible handling of duplicate keys during CSV imports.
- Introduced deduplication of identical rows before aggregation, improving data integrity.
- Enhanced validation for multi_row_policy and dedupe_identical_rows in import_row_processing specifications.
- Updated the AdminCsvTemplateEditorPage to include options for multi-row policies and deduplication settings, improving user experience in template management.
- Added comprehensive tests to validate new aggregation behaviors and ensure correct error handling for multiple rows.
This commit is contained in:
Lars 2026-04-10 15:36:12 +02:00
parent ad7aa2d255
commit b7cd710c32
5 changed files with 327 additions and 41 deletions

View File

@ -221,9 +221,13 @@ def _import_nutrition(
validate_import_row_processing("nutrition", spec, fm)
except ValueError as e:
raise ValueError(str(e)) from e
merged_rows = aggregate_mapped_rows(mapped_rows, spec)
merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec)
error_details.extend(agg_notes)
else:
merged_rows = list(mapped_rows)
agg_notes = []
skipped_groups = sum(n.get("rows_in_group", 0) for n in (agg_notes or []) if n.get("error") == "mehrere_zeilen_pro_schluessel")
inserted = 0
updated = 0
@ -285,7 +289,7 @@ def _import_nutrition(
"rows_total": rows_total,
"inserted": inserted,
"updated": updated,
"skipped": 0,
"skipped": skipped_groups,
"new_entries": new_entries,
}
@ -329,9 +333,13 @@ def _import_weight(
validate_import_row_processing("weight", spec, fm)
except ValueError as e:
raise ValueError(str(e)) from e
merged_rows = aggregate_mapped_rows(mapped_rows, spec)
merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec)
error_details.extend(agg_notes)
else:
merged_rows = list(mapped_rows)
agg_notes = []
skipped_groups = sum(n.get("rows_in_group", 0) for n in (agg_notes or []) if n.get("error") == "mehrere_zeilen_pro_schluessel")
inserted = 0
updated = 0
@ -384,7 +392,7 @@ def _import_weight(
"rows_total": rows_total,
"inserted": inserted,
"updated": updated,
"skipped": 0,
"skipped": skipped_groups,
"new_entries": new_entries,
}
@ -555,13 +563,17 @@ def _import_vitals_baseline(
validate_import_row_processing("vitals_baseline", spec, fm)
except ValueError as e:
raise ValueError(str(e)) from e
merged_rows = aggregate_mapped_rows(mapped_rows, spec)
merged_rows, agg_notes = aggregate_mapped_rows(mapped_rows, spec)
error_details.extend(agg_notes)
else:
merged_rows = list(mapped_rows)
agg_notes = []
skipped_merge = sum(n.get("rows_in_group", 0) for n in (agg_notes or []) if n.get("error") == "mehrere_zeilen_pro_schluessel")
inserted = 0
updated = 0
skipped = skipped_prefilter
skipped = skipped_prefilter + skipped_merge
for merged in merged_rows:
d = coerce_date(merged.get("date"))
if d is None:

View File

@ -1,8 +1,9 @@
"""
Zeilenaggregation nach CSV-Mapping (group_by + aggregates), vor dem DB-Upsert.
Spezifikation in der Vorlage (import_row_processing JSONB) oder Modul-Default
(import_row_processing_default in module_registry).
Spezifikation in der Vorlage (import_row_processing JSONB). Optional: Modul-Default
(import_row_processing_default in module_registry) nur als **Legacy-Fallback**, wenn
die Vorlage nichts speichert mittelfristig sollen Vorlagen explizit sein.
"""
from __future__ import annotations
@ -14,6 +15,8 @@ from typing import Any, Mapping
from csv_parser.module_registry import get_module_definition
ALLOWED_AGGREGATES = frozenset({"sum", "mean", "min", "max", "median", "first", "last"})
# Mehr als eine CSV-Zeile pro group_by-Schlüssel
ALLOWED_MULTI_ROW_POLICIES = frozenset({"aggregate", "reject", "first_row", "last_row"})
def resolve_import_row_processing(module: str, mapping_row: Mapping[str, Any]) -> dict[str, Any] | None:
@ -66,6 +69,17 @@ def validate_import_row_processing(
f"Erlaubt: {', '.join(sorted(ALLOWED_AGGREGATES))}"
)
mrp = spec.get("multi_row_policy")
if mrp is not None and str(mrp) not in ALLOWED_MULTI_ROW_POLICIES:
raise ValueError(
f"multi_row_policy: ungültiger Wert '{mrp}'. "
f"Erlaubt: {', '.join(sorted(ALLOWED_MULTI_ROW_POLICIES))}"
)
dedupe = spec.get("dedupe_identical_rows")
if dedupe is not None and not isinstance(dedupe, bool):
raise ValueError("dedupe_identical_rows muss ein Boolean sein")
def _sort_key_for_group(v: Any) -> Any:
if isinstance(v, dt.datetime):
@ -110,18 +124,46 @@ def _apply_aggregate(op: str, values: list[Any]) -> Any:
raise ValueError(f"Unbekannte Aggregations-Operation: {op}")
def _row_identity_signature(r: dict[str, Any]) -> tuple[Any, ...]:
return tuple(sorted((k, _sort_key_for_group(r.get(k))) for k in sorted(r.keys())))
def _dedupe_identical_mapped_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Exakt gleiche gemappte Zeilen (alle Keys/Werte) — erste behalten."""
seen: set[tuple[Any, ...]] = set()
out: list[dict[str, Any]] = []
for r in rows:
sig = _row_identity_signature(r)
if sig in seen:
continue
seen.add(sig)
out.append(r)
return out
def aggregate_mapped_rows(
rows: list[dict[str, Any]],
spec: Mapping[str, Any],
) -> list[dict[str, Any]]:
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
"""
Gruppiert gemappte Zeilen-Dicts nach group_by und wendet aggregates an.
Felder, die weder in group_by noch in aggregates vorkommen: Wert aus der ersten Zeile der Gruppe.
Rückgabe: (merged_rows, strukturelle Fehler / Hinweise, z. B. abgelehnte Schlüsselgruppen).
"""
errors: list[dict[str, Any]] = []
rows = list(rows)
if spec.get("dedupe_identical_rows"):
rows = _dedupe_identical_mapped_rows(rows)
group_by = spec.get("group_by") or []
aggregates = spec.get("aggregates") or {}
policy = str(spec.get("multi_row_policy") or "aggregate")
if policy not in ALLOWED_MULTI_ROW_POLICIES:
policy = "aggregate"
if not group_by:
return rows
return rows, errors
buckets: dict[tuple[Any, ...], list[dict[str, Any]]] = {}
order: list[tuple[Any, ...]] = []
@ -132,9 +174,28 @@ def aggregate_mapped_rows(
order.append(key)
buckets[key].append(r)
gb_label = ", ".join(group_by)
out: list[dict[str, Any]] = []
for key in order:
group_rows = buckets[key]
if len(group_rows) > 1:
if policy == "reject":
errors.append(
{
"error": "mehrere_zeilen_pro_schluessel",
"message": (
f"{len(group_rows)} CSV-Zeilen mit gleichem Schlüssel ({gb_label}); "
"laut Vorlage abgelehnt (multi_row_policy=reject)."
),
"rows_in_group": len(group_rows),
}
)
continue
if policy == "first_row":
group_rows = [group_rows[0]]
elif policy == "last_row":
group_rows = [group_rows[-1]]
first = group_rows[0]
merged: dict[str, Any] = {}
for g in group_by:
@ -149,4 +210,4 @@ def aggregate_mapped_rows(
continue
merged[k] = v
out.append(merged)
return out
return out, errors

View File

@ -23,7 +23,7 @@ MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = {
},
"duplicate_key": ["profile_id", "date"],
"duplicate_strategy": "update",
# Mehrere CSV-Zeilen pro Tag (z. B. pro Lebensmittel) → ein nutrition_log-Eintrag
# Legacy-Fallback wenn die Vorlage kein import_row_processing speichert — Vorlagen mittelfristig explizit.
"import_row_processing_default": {
"group_by": ["date"],
"aggregates": {
@ -69,6 +69,7 @@ MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = {
},
"duplicate_key": ["profile_id", "date"],
"duplicate_strategy": "update",
# Legacy-Fallback — Vorlagen mittelfristig explizit setzen.
"import_row_processing_default": {
"group_by": ["date"],
"aggregates": {
@ -102,7 +103,7 @@ MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = {
},
"duplicate_key": ["profile_id", "date"],
"duplicate_strategy": "update",
# Mehrere CSV-Zeilen pro Tag → ein Eintrag (letzte Zeile im Export zählt)
# Legacy-Fallback — Vorlagen mittelfristig explizit setzen.
"import_row_processing_default": {
"group_by": ["date"],
"aggregates": {

View File

@ -38,12 +38,79 @@ def test_aggregate_mapped_rows_sums_same_group():
{"date": d, "kcal": 300.0, "protein_g": 15},
]
spec = {"group_by": ["date"], "aggregates": {"kcal": "sum", "protein_g": "sum"}}
out = aggregate_mapped_rows(rows, spec)
out, err = aggregate_mapped_rows(rows, spec)
assert err == []
assert len(out) == 1
assert out[0]["kcal"] == 800.0
assert out[0]["protein_g"] == 35
def test_aggregate_mapped_rows_reject_second_group():
d = dt.date(2024, 1, 15)
rows = [
{"date": d, "kcal": 100.0},
{"date": d, "kcal": 200.0},
]
spec = {
"group_by": ["date"],
"aggregates": {"kcal": "sum"},
"multi_row_policy": "reject",
}
out, err = aggregate_mapped_rows(rows, spec)
assert out == []
assert len(err) == 1
assert err[0].get("error") == "mehrere_zeilen_pro_schluessel"
assert err[0].get("rows_in_group") == 2
def test_aggregate_mapped_rows_first_row_no_merge():
d = dt.date(2024, 1, 15)
rows = [
{"date": d, "kcal": 100.0},
{"date": d, "kcal": 999.0},
]
spec = {
"group_by": ["date"],
"aggregates": {"kcal": "sum"},
"multi_row_policy": "first_row",
}
out, err = aggregate_mapped_rows(rows, spec)
assert err == []
assert len(out) == 1
assert out[0]["kcal"] == 100.0
def test_dedupe_identical_rows_before_group():
d = dt.date(2024, 1, 15)
rows = [
{"date": d, "kcal": 50.0},
{"date": d, "kcal": 50.0},
{"date": d, "kcal": 50.0},
]
spec = {
"group_by": ["date"],
"aggregates": {"kcal": "sum"},
"dedupe_identical_rows": True,
}
out, err = aggregate_mapped_rows(rows, spec)
assert err == []
assert len(out) == 1
assert out[0]["kcal"] == 50.0
def test_validate_multi_row_policy():
with pytest.raises(ValueError, match="multi_row_policy"):
validate_import_row_processing(
"nutrition",
{
"group_by": ["date"],
"aggregates": {"kcal": "sum"},
"multi_row_policy": "nope",
},
{"D": "date", "K": "kcal"},
)
def test_resolve_explicit_overrides_default():
m = {
"import_row_processing": {"group_by": ["date"], "aggregates": {"kcal": "mean"}},

View File

@ -39,38 +39,65 @@ const ROW_AGG_OPS = [
const NUMERIC_ROW_AGG = new Set(['sum', 'mean', 'min', 'max', 'median'])
/** Wenn mehrere CSV-Zeilen denselben group_by-Schlüssel haben */
const MULTI_ROW_POLICY_OPTIONS = [
{ value: 'aggregate', label: 'Zusammenführen (Funktion unten auf alle übrigen Felder)' },
{ value: 'reject', label: 'Abweisen — Gruppe wird nicht importiert (Fehlerhinweis im Import)' },
{ value: 'first_row', label: 'Nur erste Zeile — keine Berechnung über die Duplikat-Zeilen' },
{ value: 'last_row', label: 'Nur letzte Zeile — keine Berechnung über die Duplikat-Zeilen' },
]
function parseStoredImportRowProcessing(irp) {
if (!irp || typeof irp !== 'object' || Object.keys(irp).length === 0) {
const safe = irp && typeof irp === 'object' ? irp : null
const dedupeFromStore = !!(safe && safe.dedupe_identical_rows)
const mrpRaw = safe && safe.multi_row_policy != null ? String(safe.multi_row_policy) : null
const multiRowPolicy =
mrpRaw && MULTI_ROW_POLICY_OPTIONS.some((o) => o.value === mrpRaw) ? mrpRaw : 'aggregate'
if (!safe || Object.keys(safe).length === 0) {
return {
useCustom: false,
irregular: false,
groupBy: [],
mode: '',
multiRowPolicy: 'aggregate',
dedupeIdentical: false,
}
}
const gb = irp.group_by
const agg = irp.aggregates
const gb = safe.group_by
const agg = safe.aggregates
if (!Array.isArray(gb) || gb.length === 0 || agg == null || typeof agg !== 'object') {
return {
useCustom: true,
irregular: true,
groupBy: Array.isArray(gb) ? [...gb] : [],
mode: '',
multiRowPolicy,
dedupeIdentical: dedupeFromStore,
}
}
const ops = [...new Set(Object.values(agg).map((x) => String(x)))]
if (ops.length !== 1) {
return { useCustom: true, irregular: true, groupBy: [...gb], mode: '' }
return {
useCustom: true,
irregular: true,
groupBy: [...gb],
mode: '',
multiRowPolicy,
dedupeIdentical: dedupeFromStore,
}
}
return {
useCustom: true,
irregular: false,
groupBy: [...gb],
mode: ops[0],
multiRowPolicy,
dedupeIdentical: dedupeFromStore,
}
}
function buildImportRowProcessingSimple(modFields, fm, groupBy, mode) {
function buildImportRowProcessingSimple(modFields, fm, groupBy, mode, multiRowPolicy, dedupeIdentical) {
const targets = new Set(
Object.values(fm).filter((v) => v && v !== '-' && v !== '_skip'),
)
@ -86,7 +113,13 @@ function buildImportRowProcessingSimple(modFields, fm, groupBy, mode) {
aggregates[t] = mode
}
}
return { group_by: groupBy, aggregates }
const out = {
group_by: groupBy,
aggregates,
multi_row_policy: multiRowPolicy || 'aggregate',
}
if (dedupeIdentical) out.dedupe_identical_rows = true
return out
}
/** Erlaubt Eingaben wie 1,03 oder 1.03 während des Tippens; Finale normalisiert bei Blur/Speichern. */
@ -255,6 +288,8 @@ export default function AdminCsvTemplateEditorPage() {
const [rowAggMode, setRowAggMode] = useState('')
const [rowAggIrregular, setRowAggIrregular] = useState(false)
const [rowAggJsonText, setRowAggJsonText] = useState('{}')
const [rowAggMultiRowPolicy, setRowAggMultiRowPolicy] = useState('aggregate')
const [rowAggDedupeIdentical, setRowAggDedupeIdentical] = useState(false)
const modMeta = useMemo(() => modules.find((m) => m.id === module), [modules, module])
const aggregateSleepImport = modMeta?.import_mode === 'apple_sleep_aggregate'
@ -295,6 +330,8 @@ export default function AdminCsvTemplateEditorPage() {
setRowAggGroupBy([])
setRowAggMode('')
setRowAggJsonText('{}')
setRowAggMultiRowPolicy('aggregate')
setRowAggDedupeIdentical(false)
}, [module, isNew])
useEffect(() => {
@ -325,6 +362,8 @@ export default function AdminCsvTemplateEditorPage() {
setRowAggIrregular(rp.irregular)
setRowAggGroupBy(rp.groupBy)
setRowAggMode(rp.mode)
setRowAggMultiRowPolicy(rp.multiRowPolicy)
setRowAggDedupeIdentical(rp.dedupeIdentical)
setRowAggJsonText(JSON.stringify(t.import_row_processing || {}, null, 2))
})
.catch((e) => {
@ -541,6 +580,8 @@ export default function AdminCsvTemplateEditorPage() {
setRowAggGroupBy([])
setRowAggMode('')
setRowAggJsonText('{}')
setRowAggMultiRowPolicy('aggregate')
setRowAggDedupeIdentical(false)
} catch (e) {
setError(e.message || 'Analyse fehlgeschlagen')
} finally {
@ -642,6 +683,8 @@ export default function AdminCsvTemplateEditorPage() {
fieldMappings,
rowAggGroupBy,
rowAggMode,
rowAggMultiRowPolicy,
rowAggDedupeIdentical,
)
}
}
@ -975,10 +1018,10 @@ export default function AdminCsvTemplateEditorPage() {
) : (
<>
<p style={{ fontSize: 13, color: 'var(--text2)', marginTop: 8, lineHeight: 1.55 }}>
Mehrere CSV-Zeilen mit denselben Werten in den gewählten <strong>Schlüsselfeldern</strong> werden zu einer
importierten Zeile zusammengefasst. Für alle übrigen zugewiesenen Zielfelder gilt{' '}
<strong>eine gemeinsame</strong> Funktion. Textfelder werden bei Summe/Mittelwert usw. automatisch
ausgelassen; mit Erster/Letzter Wert sind sie enthalten.
<strong>Schlüsselfelder</strong> bestimmen, wann CSV-Zeilen dieselbe Gruppe sind. Was bei{' '}
<strong>mehr als einer Zeile pro Gruppe</strong> passiert, steuern Sie unten (
<em>Zusammenführen / Abweisen / nur erste oder letzte Zeile</em>). Optional können{' '}
<strong>völlig identische</strong> gemappte Zeilen vorher entfernt werden.
</p>
<label
style={{
@ -1002,18 +1045,23 @@ export default function AdminCsvTemplateEditorPage() {
setRowAggGroupBy([])
setRowAggMode('')
setRowAggJsonText('{}')
setRowAggMultiRowPolicy('aggregate')
setRowAggDedupeIdentical(false)
}
}}
style={{ marginTop: 3 }}
/>
<span>
<strong>Eigene Aggregation in dieser Vorlage speichern.</strong> Wenn deaktiviert, gilt der{' '}
<strong>Modul-Standard</strong> (siehe unten) bzw. kein Aggregat, wenn das Modul keinen definiert.
<strong>Eigene Zeilenlogik in dieser Vorlage speichern.</strong> Wenn deaktiviert, nutzt der Import den{' '}
<strong>Legacy-Fallback im Server-Code</strong> (nur solange die Vorlage kein JSON speichert
mittelfristig sollen alle Vorlagen explizit sein).
</span>
</label>
{modMeta.import_row_processing_default && (
<details style={{ marginTop: 12, fontSize: 13, color: 'var(--text2)' }}>
<summary style={{ cursor: 'pointer' }}>Modul-Standard (Referenz, wenn Haken oben aus ist)</summary>
<summary style={{ cursor: 'pointer' }}>
Legacy-Fallback im Code (Referenz, wenn der Haken oben aus ist)
</summary>
<pre
style={{
marginTop: 8,
@ -1042,6 +1090,8 @@ export default function AdminCsvTemplateEditorPage() {
setRowAggIrregular(p.irregular)
setRowAggGroupBy(p.groupBy)
setRowAggMode(p.mode)
setRowAggMultiRowPolicy(p.multiRowPolicy)
setRowAggDedupeIdentical(p.dedupeIdentical)
setRowAggJsonText(JSON.stringify(d, null, 2))
}}
>
@ -1052,21 +1102,10 @@ export default function AdminCsvTemplateEditorPage() {
<>
<p style={{ fontSize: 13, color: 'var(--text2)', marginTop: 14, lineHeight: 1.55 }}>
Diese Vorlage nutzt <strong>unterschiedliche</strong> Aggregations-Funktionen pro Feld. JSON
anpassen oder vereinheitlichen (pro-Feld-Auswahl folgt in einer späteren Ausbaustufe).
anpassen oder vereinheitlichen (pro-Feld-Auswahl später). Optional:{' '}
<code>multi_row_policy</code> (<code>aggregate</code> | <code>reject</code> |{' '}
<code>first_row</code> | <code>last_row</code>), <code>dedupe_identical_rows</code>: true.
</p>
<textarea
className="form-input"
style={{
width: '100%',
minHeight: 160,
marginTop: 8,
fontFamily: 'monospace',
fontSize: 12,
textAlign: 'left',
}}
value={rowAggJsonText}
onChange={(e) => setRowAggJsonText(e.target.value)}
/>
</>
) : (
<>
@ -1141,8 +1180,114 @@ export default function AdminCsvTemplateEditorPage() {
</option>
))}
</select>
<label className="form-label" style={{ marginTop: 16, display: 'block' }}>
Mehrere Zeilen pro Schlüssel
</label>
<select
className="form-input"
style={{
width: '100%',
maxWidth: 520,
marginTop: 8,
textAlign: 'left',
minHeight: 46,
}}
value={rowAggMultiRowPolicy}
onChange={(e) => {
setRowAggIrregular(false)
setRowAggMultiRowPolicy(e.target.value)
}}
>
{MULTI_ROW_POLICY_OPTIONS.map((o) => (
<option key={o.value} value={o.value}>
{o.label}
</option>
))}
</select>
<label
style={{
display: 'flex',
alignItems: 'flex-start',
gap: 10,
marginTop: 14,
cursor: 'pointer',
fontSize: 14,
color: 'var(--text1)',
}}
>
<input
type="checkbox"
checked={rowAggDedupeIdentical}
onChange={(e) => {
setRowAggIrregular(false)
setRowAggDedupeIdentical(e.target.checked)
}}
style={{ marginTop: 3 }}
/>
<span>
<strong>Identische Zeilen vorher entfernen</strong> (alle gemappten Felder gleich nur die erste
Zeile jeder Kopie bleibt).
</span>
</label>
</>
)}
<div className="form-label" style={{ marginTop: 18 }}>
import_row_processing (JSON)
</div>
<p style={{ fontSize: 13, color: 'var(--text2)', marginTop: 6, lineHeight: 1.55 }}>
{rowAggIrregular ? (
<>
<strong>Bearbeitbar</strong> wie bei <code>type_conversions</code>; beim Speichern prüft das
Backend die Struktur.
</>
) : (
<>
<strong>Nur Lesen</strong> Vorschau aus den Feldern oben; dasselbe JSON wird beim Speichern
geschrieben.
</>
)}
</p>
<textarea
className="form-input"
readOnly={!rowAggIrregular}
style={{
width: '100%',
minHeight: rowAggIrregular ? 200 : 160,
marginTop: 8,
fontFamily: 'monospace',
fontSize: 12,
textAlign: 'left',
opacity: rowAggIrregular ? 1 : 0.95,
background: rowAggIrregular ? undefined : 'var(--surface2)',
}}
value={
rowAggIrregular
? rowAggJsonText
: rowAggGroupBy.length && rowAggMode
? JSON.stringify(
buildImportRowProcessingSimple(
modMeta?.fields,
fieldMappings,
rowAggGroupBy,
rowAggMode,
rowAggMultiRowPolicy,
rowAggDedupeIdentical,
),
null,
2,
)
: JSON.stringify(
{
_hinweis:
'Mindestens ein Schlüsselfeld und eine gemeinsame Funktion wählen — dann erscheint hier das finale JSON.',
},
null,
2,
)
}
onChange={rowAggIrregular ? (e) => setRowAggJsonText(e.target.value) : undefined}
spellCheck={false}
/>
</>
)}
</>