# Commit notes:
# - Updated the aggregate_mapped_rows function to support multiple row policies, allowing for
#   flexible handling of duplicate keys during CSV imports.
# - Introduced deduplication of identical rows before aggregation, improving data integrity.
# - Enhanced validation for multi_row_policy and dedupe_identical_rows in
#   import_row_processing specifications.
# - Updated the AdminCsvTemplateEditorPage to include options for multi-row policies and
#   deduplication settings, improving user experience in template management.
# - Added comprehensive tests to validate new aggregation behaviors and ensure correct error
#   handling for multiple rows.
"""
|
|
Zeilenaggregation nach CSV-Mapping (group_by + aggregates), vor dem DB-Upsert.
|
|
|
|
Spezifikation in der Vorlage (import_row_processing JSONB). Optional: Modul-Default
|
|
(import_row_processing_default in module_registry) nur als **Legacy-Fallback**, wenn
|
|
die Vorlage nichts speichert — mittelfristig sollen Vorlagen explizit sein.
|
|
"""
|
|
|
|
from __future__ import annotations

import datetime as dt
import math
import statistics
from typing import Any, Mapping

from csv_parser.module_registry import get_module_definition
|
|
|
|
# Operation names accepted as values of import_row_processing.aggregates.
ALLOWED_AGGREGATES = frozenset(("first", "last", "max", "mean", "median", "min", "sum"))

# Accepted values of multi_row_policy, i.e. how to handle more than one CSV row
# sharing the same group_by key.
ALLOWED_MULTI_ROW_POLICIES = frozenset(("aggregate", "first_row", "last_row", "reject"))
|
|
|
|
|
|
def resolve_import_row_processing(module: str, mapping_row: Mapping[str, Any]) -> dict[str, Any] | None:
    """
    Return the effective import_row_processing spec for a template row.

    A non-empty dict stored under ``mapping_row["import_row_processing"]``
    wins. Otherwise the module's ``import_row_processing_default`` (looked up
    via ``get_module_definition``) is used, if it is a non-empty dict. Empty
    dicts count as "not set".

    Returns:
        A shallow copy of the chosen spec, or ``None`` if neither source
        provides one.
    """
    explicit = mapping_row.get("import_row_processing")
    if isinstance(explicit, dict) and explicit:
        return dict(explicit)

    # Legacy fallback: only consulted when the template itself stores nothing.
    module_def = get_module_definition(module)
    fallback = module_def.get("import_row_processing_default") if module_def else None
    if isinstance(fallback, dict) and fallback:
        return dict(fallback)
    return None
|
|
|
|
|
|
def validate_import_row_processing(
    module: str,
    spec: Mapping[str, Any],
    field_mappings: Mapping[str, Any],
) -> None:
    """
    Validate an import_row_processing spec against a module and its mapping.

    Checks run in this order and the first failing one raises:

    * ``get_module_definition(module)`` returns a truthy module definition;
    * ``group_by`` (missing/empty counts as ``[]``) is a list of strings and
      ``aggregates`` (missing/empty counts as ``{}``) is a dict;
    * each ``group_by`` field is listed in the module's ``fields`` and is the
      target of some entry in ``field_mappings`` (falsy targets and the
      placeholders ``"-"`` / ``"_skip"`` do not count as mapped);
    * each ``aggregates`` key is a module field and its operation, compared
      via ``str()``, is in ``ALLOWED_AGGREGATES``;
    * ``multi_row_policy``, if not ``None``, is in
      ``ALLOWED_MULTI_ROW_POLICIES`` (compared via ``str()``);
    * ``dedupe_identical_rows``, if not ``None``, is a ``bool``.

    Args:
        module: Module name looked up in the module registry.
        spec: The row-processing spec to check.
        field_mappings: Template column mapping; only its values (the target
            field names) are inspected.

    Raises:
        ValueError: Describing the first invalid setting found.
    """
    module_def = get_module_definition(module)
    if not module_def:
        raise ValueError(f"Unbekanntes Modul: {module}")

    known_fields = set(module_def.get("fields") or [])
    mapped_targets = set()
    for target in field_mappings.values():
        if target and target not in ("-", "_skip"):
            mapped_targets.add(str(target))

    group_by = spec.get("group_by") or []
    if not (isinstance(group_by, list) and all(isinstance(name, str) for name in group_by)):
        raise ValueError("import_row_processing.group_by muss eine Liste von Feldnamen sein")

    aggregates = spec.get("aggregates") or {}
    if not isinstance(aggregates, dict):
        raise ValueError("import_row_processing.aggregates muss ein Objekt sein")

    for key_field in group_by:
        if key_field not in known_fields:
            raise ValueError(f"group_by: unbekanntes Feld '{key_field}' für Modul '{module}'")
        # Grouping needs the key value from the CSV, so the field must be mapped.
        if key_field not in mapped_targets:
            raise ValueError(
                f"group_by: Zielfeld '{key_field}' ist keiner CSV-Spalte zugeordnet — Aggregation nicht möglich."
            )

    for agg_field, operation in aggregates.items():
        if agg_field not in known_fields:
            raise ValueError(f"aggregates: unbekanntes Feld '{agg_field}' für Modul '{module}'")
        if str(operation) not in ALLOWED_AGGREGATES:
            raise ValueError(
                f"aggregates['{agg_field}']: ungültige Operation '{operation}'. "
                f"Erlaubt: {', '.join(sorted(ALLOWED_AGGREGATES))}"
            )

    policy = spec.get("multi_row_policy")
    if policy is not None and str(policy) not in ALLOWED_MULTI_ROW_POLICIES:
        raise ValueError(
            f"multi_row_policy: ungültiger Wert '{policy}'. "
            f"Erlaubt: {', '.join(sorted(ALLOWED_MULTI_ROW_POLICIES))}"
        )

    dedupe_flag = spec.get("dedupe_identical_rows")
    if not (dedupe_flag is None or isinstance(dedupe_flag, bool)):
        raise ValueError("dedupe_identical_rows muss ein Boolean sein")
|
|
|
|
|
|
def _sort_key_for_group(v: Any) -> Any:
    """
    Normalise a value for use inside a grouping / identity key.

    ``date``, ``datetime`` and ``time`` values are replaced by their
    ``isoformat()`` string; any other value is returned unchanged.
    """
    # dt.datetime subclasses dt.date, so this tuple covers all three types.
    return v.isoformat() if isinstance(v, (dt.date, dt.time)) else v
|
|
|
|
|
|
def _apply_aggregate(op: str, values: list[Any]) -> Any:
    """
    Aggregate the values of one field across the rows of a group.

    Args:
        op: Operation name: ``sum``, ``mean``, ``median``, ``min``, ``max``,
            ``first`` or ``last``.
        values: The field's values in group row order; may contain ``None``,
            ``""`` or non-numeric entries.

    Returns:
        ``first`` / ``last``: the first / last value that is neither ``None``
        nor ``""``, returned unchanged; ``None`` if there is none.
        Numeric operations: a float computed over the values that ``float()``
        can convert (``None``, ``""`` and unconvertible values are skipped);
        ``None`` if no numeric value remains.

    Raises:
        ValueError: If ``op`` is not a known operation.
    """

    def _present(x: Any) -> bool:
        return x is not None and x != ""

    # first/last work on the raw values; no numeric conversion is needed, so
    # values float() cannot handle (e.g. huge ints) cannot break them.
    if op == "first":
        return next((x for x in values if _present(x)), None)
    if op == "last":
        return next((x for x in reversed(values) if _present(x)), None)

    def _precise_sum(xs: list[float]) -> float:
        # fsum avoids the rounding drift of repeated float addition
        # (ten times 0.1 sums to 0.9999999999999999, fsum gives 1.0).
        try:
            return math.fsum(xs)
        except (OverflowError, ValueError):
            # fsum raises on intermediate overflow and on inf + -inf, where
            # plain float addition yields inf / nan instead.
            return sum(xs)

    numeric_ops = {
        "sum": _precise_sum,
        "mean": statistics.mean,
        "median": lambda xs: float(statistics.median(xs)),
        "min": min,
        "max": max,
    }
    reduce_fn = numeric_ops.get(op)
    if reduce_fn is None:
        raise ValueError(f"Unbekannte Aggregations-Operation: {op}")

    nums: list[float] = []
    for x in values:
        if not _present(x):
            continue
        try:
            nums.append(float(x))
        except (TypeError, ValueError, OverflowError):
            # Non-numeric values (or ints too large for a float) are ignored
            # by numeric aggregates, just like empty cells.
            continue
    return reduce_fn(nums) if nums else None
|
|
|
|
|
|
def _row_identity_signature(r: dict[str, Any]) -> tuple[Any, ...]:
    """
    Build a signature of a mapped row for exact-duplicate detection.

    The signature is a tuple of ``(key, normalised_value)`` pairs ordered by
    key, with each value passed through ``_sort_key_for_group``. Rows with the
    same keys and (normalised) values therefore get equal signatures
    regardless of dict insertion order. The result is hashable as long as the
    row values are.
    """
    pairs = [(key, _sort_key_for_group(value)) for key, value in r.items()]
    # Dict keys are unique, so sorting the pairs only ever compares the keys.
    pairs.sort()
    return tuple(pairs)
|
|
|
|
|
|
def _dedupe_identical_mapped_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Drop mapped rows that exactly repeat an earlier row.

    Rows count as identical when ``_row_identity_signature`` yields the same
    signature (same keys, same normalised values). The first occurrence is
    kept, kept rows stay in their original order, and the row dicts are
    returned as-is (not copied).
    """
    # Dicts preserve insertion order and setdefault never replaces an existing
    # entry, so each signature maps to its first row.
    first_by_signature: dict[tuple[Any, ...], dict[str, Any]] = {}
    for row in rows:
        first_by_signature.setdefault(_row_identity_signature(row), row)
    return list(first_by_signature.values())
|
|
|
|
|
|
def _merge_group_rows(
    group_rows: list[dict[str, Any]],
    group_by: list[str],
    aggregates: Mapping[str, Any],
) -> dict[str, Any]:
    """Collapse the rows of one group_by key into a single row dict."""
    first_row = group_rows[0]
    # Key fields first; all rows of the group share these (normalised) values.
    merged: dict[str, Any] = {g: first_row.get(g) for g in group_by}
    for field, op in aggregates.items():
        # Never let an aggregate overwrite a key field: e.g. "sum" would
        # multiply the key and numeric ops turn "00123" into 123.0, breaking
        # the key the merged row is identified by.
        if field in merged:
            continue
        merged[field] = _apply_aggregate(str(op), [row.get(field) for row in group_rows])
    # Remaining fields: value from the first row of the group that has the field.
    for row in group_rows:
        for k, v in row.items():
            merged.setdefault(k, v)
    return merged


def aggregate_mapped_rows(
    rows: list[dict[str, Any]],
    spec: Mapping[str, Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """
    Group mapped row dicts by ``group_by`` and apply ``aggregates``.

    Processing steps:

    1. If ``spec["dedupe_identical_rows"]`` is truthy, rows that exactly
       repeat an earlier row are dropped first.
    2. Without ``group_by`` the (possibly deduplicated) rows are returned
       unchanged.
    3. Rows are bucketed by their ``group_by`` values (normalised via
       ``_sort_key_for_group``); buckets keep the order in which their key
       first appeared.
    4. Buckets with more than one row follow ``multi_row_policy``:
       ``reject`` drops the bucket and records an error, ``first_row`` /
       ``last_row`` keep only that row, and ``aggregate`` (also used when the
       policy is missing or unknown) keeps all rows.
    5. Each bucket becomes one row: ``group_by`` fields come from the first
       remaining row, every field in ``aggregates`` gets its aggregated value,
       and any other field takes its value from the first row of the bucket
       that contains it. A field listed in both ``group_by`` and
       ``aggregates`` keeps its key value.

    Args:
        rows: Mapped rows (field name -> value). Neither the list nor the row
            dicts are modified.
        spec: Row-processing spec (``group_by``, ``aggregates``,
            ``multi_row_policy``, ``dedupe_identical_rows``).

    Returns:
        ``(merged_rows, errors)``; ``errors`` lists structural problems such
        as key groups rejected by ``multi_row_policy=reject``.
    """
    errors: list[dict[str, Any]] = []
    rows = list(rows)
    if spec.get("dedupe_identical_rows"):
        rows = _dedupe_identical_mapped_rows(rows)

    group_by = spec.get("group_by") or []
    aggregates = spec.get("aggregates") or {}
    policy = str(spec.get("multi_row_policy") or "aggregate")
    if policy not in ALLOWED_MULTI_ROW_POLICIES:
        # Unknown policies fall back to "aggregate" instead of failing.
        policy = "aggregate"

    if not group_by:
        return rows, errors

    # Dicts preserve insertion order, so buckets iterate in first-seen key order.
    buckets: dict[tuple[Any, ...], list[dict[str, Any]]] = {}
    for r in rows:
        key = tuple(_sort_key_for_group(r.get(g)) for g in group_by)
        buckets.setdefault(key, []).append(r)

    gb_label = ", ".join(group_by)
    out: list[dict[str, Any]] = []
    for group_rows in buckets.values():
        if len(group_rows) > 1:
            if policy == "reject":
                errors.append(
                    {
                        "error": "mehrere_zeilen_pro_schluessel",
                        "message": (
                            f"{len(group_rows)} CSV-Zeilen mit gleichem Schlüssel ({gb_label}); "
                            "laut Vorlage abgelehnt (multi_row_policy=reject)."
                        ),
                        "rows_in_group": len(group_rows),
                    }
                )
                continue
            if policy == "first_row":
                group_rows = group_rows[:1]
            elif policy == "last_row":
                group_rows = group_rows[-1:]
        out.append(_merge_group_rows(group_rows, group_by, aggregates))
    return out, errors
|