mitai-jinkendo/backend/csv_parser/import_row_processing.py
Lars c0fcdea1fe
All checks were successful
Deploy Development / deploy (push) Successful in 48s
Build Test / pytest-backend (push) Successful in 3s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
refactor(csv-import): Enhance nutrition data processing and template rendering
- Updated the nutrition import logic to utilize a new row processing specification, improving data aggregation and validation.
- Refactored the template rendering process in the workflow executor to use Jinja2's Environment with ChainableUndefined for better handling of missing attributes.
- Added backward-compatible shortcuts for accessing decision signals in node contexts, enhancing flexibility in template usage.
- Introduced import row processing options in CSV templates, allowing for more customizable data handling during imports.
2026-04-10 11:56:43 +02:00

153 lines
5.1 KiB
Python

"""
Zeilenaggregation nach CSV-Mapping (group_by + aggregates), vor dem DB-Upsert.
Spezifikation in der Vorlage (import_row_processing JSONB) oder Modul-Default
(import_row_processing_default in module_registry).
"""
from __future__ import annotations
import datetime as dt
import statistics
from typing import Any, Mapping
from csv_parser.module_registry import get_module_definition
# Aggregation operation names accepted in import_row_processing.aggregates.
# validate_import_row_processing rejects anything outside this set, and
# _apply_aggregate implements each of these operations.
ALLOWED_AGGREGATES = frozenset({"sum", "mean", "min", "max", "median", "first", "last"})
def resolve_import_row_processing(module: str, mapping_row: Mapping[str, Any]) -> dict[str, Any] | None:
"""Explizite Vorlage hat Vorrang; sonst Modul-Default; leeres Dict zählt wie „nicht gesetzt“."""
raw = mapping_row.get("import_row_processing")
if isinstance(raw, dict) and raw:
return dict(raw)
mod = get_module_definition(module)
if not mod:
return None
default = mod.get("import_row_processing_default")
if isinstance(default, dict) and default:
return dict(default)
return None
def validate_import_row_processing(
    module: str,
    spec: Mapping[str, Any],
    field_mappings: Mapping[str, Any],
) -> None:
    """Check a row-processing spec against the module definition and mapping.

    Args:
        module: Module name looked up via ``get_module_definition``.
        spec: Spec with optional ``group_by`` (list of field names) and
            ``aggregates`` (field name -> operation name).
        field_mappings: CSV-column mapping; its values are the target fields.

    Raises:
        ValueError: If the module is unknown, ``group_by`` is not a list of
            strings, ``aggregates`` is not a dict, a referenced field is not
            among the module's fields, a ``group_by`` field has no CSV column
            mapped to it, or an operation is not in ``ALLOWED_AGGREGATES``.
    """
    definition = get_module_definition(module)
    if not definition:
        raise ValueError(f"Unbekanntes Modul: {module}")

    known_fields = set(definition.get("fields") or [])

    # Target fields that actually receive a CSV column; empty targets and the
    # "-" / "_skip" placeholders do not count as mapped.
    mapped_targets = set()
    for target in field_mappings.values():
        if target and target not in ("-", "_skip"):
            mapped_targets.add(str(target))

    group_fields = spec.get("group_by") or []
    if not isinstance(group_fields, list) or any(not isinstance(name, str) for name in group_fields):
        raise ValueError("import_row_processing.group_by muss eine Liste von Feldnamen sein")

    agg_ops = spec.get("aggregates") or {}
    if not isinstance(agg_ops, dict):
        raise ValueError("import_row_processing.aggregates muss ein Objekt sein")

    for name in group_fields:
        if name not in known_fields:
            raise ValueError(f"group_by: unbekanntes Feld '{name}' für Modul '{module}'")
        if name not in mapped_targets:
            raise ValueError(
                f"group_by: Zielfeld '{name}' ist keiner CSV-Spalte zugeordnet — Aggregation nicht möglich."
            )

    for name, op in agg_ops.items():
        if name not in known_fields:
            raise ValueError(f"aggregates: unbekanntes Feld '{name}' für Modul '{module}'")
        if str(op) not in ALLOWED_AGGREGATES:
            raise ValueError(
                f"aggregates['{name}']: ungültige Operation '{op}'. "
                f"Erlaubt: {', '.join(sorted(ALLOWED_AGGREGATES))}"
            )
def _sort_key_for_group(v: Any) -> Any:
    """Normalize a group-by value for use inside a grouping key.

    ``datetime``, ``date`` and ``time`` objects are replaced by their
    ``isoformat()`` string; every other value is returned unchanged.
    """
    # datetime is a subclass of date, so (date, time) covers all three types.
    temporal_types = (dt.date, dt.time)
    return v.isoformat() if isinstance(v, temporal_types) else v
def _apply_aggregate(op: str, values: list[Any]) -> Any:
    """Reduce the values of one field within a group to a single value.

    ``None`` and ``""`` are treated as missing and ignored by every operation.

    * ``first`` / ``last`` return the first / last non-missing value as-is,
      without any numeric conversion.
    * ``sum``, ``mean``, ``median``, ``min``, ``max`` convert each value with
      ``float()``; values that raise ``TypeError`` or ``ValueError`` are
      skipped. The result is a float.

    Returns:
        The aggregated value, or ``None`` when no usable value is present.

    Raises:
        ValueError: If *op* is not one of the supported operations.
    """
    # Positional picks never need numbers, so return before the float
    # coercion: this avoids pointless work and keeps values that cannot be
    # converted to float (e.g. very large ints) from raising here.
    if op in ("first", "last"):
        candidates = values if op == "first" else reversed(values)
        return next((x for x in candidates if x is not None and x != ""), None)

    reducers = {
        "sum": sum,
        "mean": statistics.mean,
        "median": lambda xs: float(statistics.median(xs)),
        "min": min,
        "max": max,
    }
    reducer = reducers.get(op)
    if reducer is None:
        raise ValueError(f"Unbekannte Aggregations-Operation: {op}")

    nums: list[float] = []
    for x in values:
        if x is None or x == "":
            continue
        try:
            nums.append(float(x))
        except (TypeError, ValueError):
            # Non-numeric cell: ignore it rather than abort the aggregation.
            continue

    return reducer(nums) if nums else None
def aggregate_mapped_rows(
    rows: list[dict[str, Any]],
    spec: Mapping[str, Any],
) -> list[dict[str, Any]]:
    """Group mapped row dicts by ``group_by`` and apply ``aggregates``.

    Each group yields one merged dict: the ``group_by`` fields are taken from
    the group's first row, every field listed in ``aggregates`` is reduced
    with its operation, and any other field takes the value from the first
    row of the group that contains it. Groups appear in the order their key
    is first seen.

    If the spec has no ``group_by`` entries, *rows* is returned unchanged
    (the same list object).
    """
    group_fields = spec.get("group_by") or []
    agg_ops = spec.get("aggregates") or {}
    if not group_fields:
        return rows

    # Dicts keep insertion order, so iterating the groups later reproduces
    # the order in which each key first appeared.
    grouped: dict[tuple[Any, ...], list[dict[str, Any]]] = {}
    for row in rows:
        group_key = tuple(_sort_key_for_group(row.get(name)) for name in group_fields)
        grouped.setdefault(group_key, []).append(row)

    result: list[dict[str, Any]] = []
    for members in grouped.values():
        head = members[0]
        combined: dict[str, Any] = {}
        for name in group_fields:
            combined[name] = head.get(name)
        for name, op in agg_ops.items():
            combined[name] = _apply_aggregate(str(op), [member.get(name) for member in members])
        # Pass-through fields: keep the first value seen; group and aggregate
        # fields are already present and therefore never overwritten.
        for member in members:
            for name, value in member.items():
                combined.setdefault(name, value)
        result.append(combined)
    return result