mitai-jinkendo/backend/csv_parser/import_row_processing.py
Lars 574af61349
All checks were successful
Deploy Development / deploy (push) Successful in 47s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Enhance CSV import and validation for activity module
- Updated the CSV import logic to merge active training parameters with static fields for the activity module, improving field mapping accuracy.
- Enhanced validation functions to incorporate dynamic field definitions based on active training parameters, ensuring better data integrity during imports.
- Refactored related functions to streamline the process of handling CSV templates and field mappings, improving maintainability and clarity.
- Added new utility functions for resolving activity log column patches and upserting session metrics from CSV, enhancing the overall import functionality.
2026-04-15 08:12:58 +02:00

218 lines
7.7 KiB
Python

"""
Row aggregation according to the CSV mapping (group_by + aggregates), applied before the DB upsert.
The specification lives in the template (import_row_processing JSONB). Optional: a module default
(import_row_processing_default in module_registry) serves only as a **legacy fallback** when
the template stores nothing — in the medium term, templates should be explicit.
"""
from __future__ import annotations

import datetime as dt
import math
import statistics
from typing import Any, Mapping

from csv_parser.module_registry import get_module_definition
# Aggregate operations that a template may assign to a field.
ALLOWED_AGGREGATES = frozenset({"sum", "mean", "min", "max", "median", "first", "last"})
# Policies for the case that more than one CSV row shares the same group_by key.
ALLOWED_MULTI_ROW_POLICIES = frozenset({"aggregate", "reject", "first_row", "last_row"})
def resolve_import_row_processing(
    module: str,
    mapping_row: Mapping[str, Any],
) -> dict[str, Any] | None:
    """Return the row-processing spec that applies to an import.

    A non-empty dict stored under ``mapping_row["import_row_processing"]``
    takes precedence. Otherwise the ``import_row_processing_default`` entry of
    the module definition is used. An empty dict counts as "not set".

    Returns:
        A shallow copy of the selected spec, or ``None`` when neither source
        provides a non-empty dict or the module definition lookup is falsy.
    """
    template_spec = mapping_row.get("import_row_processing")
    if isinstance(template_spec, dict) and template_spec:
        return dict(template_spec)

    definition = get_module_definition(module)
    fallback = definition.get("import_row_processing_default") if definition else None
    if isinstance(fallback, dict) and fallback:
        return dict(fallback)
    return None
def validate_import_row_processing(
    module: str,
    spec: Mapping[str, Any],
    field_mappings: Mapping[str, Any],
    cur=None,
) -> None:
    """Check a row-processing spec against the module definition and field mapping.

    Args:
        module: Module name passed to ``get_module_definition``.
        spec: The spec to validate; the keys read are ``group_by``,
            ``aggregates``, ``multi_row_policy`` and ``dedupe_identical_rows``.
        field_mappings: Mapping whose values are target field names; falsy
            values and the markers ``"-"`` / ``"_skip"`` are ignored.
        cur: Optional DB cursor. For module ``"activity"`` it is used to add
            the keys of active training parameters to the set of known fields.
            Rows are accessed as ``row["key"]`` — assumes a dict-style cursor.

    Raises:
        ValueError: On the first violation found.
    """
    definition = get_module_definition(module)
    if not definition:
        raise ValueError(f"Unbekanntes Modul: {module}")

    known_fields = set(definition.get("fields") or [])
    if cur is not None and module == "activity":
        # Active training parameters extend the static field list of the module.
        cur.execute("SELECT key FROM training_parameters WHERE is_active = true")
        for record in cur.fetchall():
            known_fields.add(str(record["key"]))

    # Target fields that actually have a CSV column assigned.
    mapped_targets = set()
    for target in field_mappings.values():
        if not target or target in ("-", "_skip"):
            continue
        mapped_targets.add(str(target))

    group_by = spec.get("group_by") or []
    if not (isinstance(group_by, list) and all(isinstance(x, str) for x in group_by)):
        raise ValueError("import_row_processing.group_by muss eine Liste von Feldnamen sein")

    aggregates = spec.get("aggregates") or {}
    if not isinstance(aggregates, dict):
        raise ValueError("import_row_processing.aggregates muss ein Objekt sein")

    for name in group_by:
        if name not in known_fields:
            raise ValueError(f"group_by: unbekanntes Feld '{name}' für Modul '{module}'")
        if name not in mapped_targets:
            # A grouping field without a CSV column would have no values to group on.
            raise ValueError(
                f"group_by: Zielfeld '{name}' ist keiner CSV-Spalte zugeordnet — Aggregation nicht möglich."
            )

    for target_field, operation in aggregates.items():
        if target_field not in known_fields:
            raise ValueError(f"aggregates: unbekanntes Feld '{target_field}' für Modul '{module}'")
        if str(operation) not in ALLOWED_AGGREGATES:
            raise ValueError(
                f"aggregates['{target_field}']: ungültige Operation '{operation}'. "
                f"Erlaubt: {', '.join(sorted(ALLOWED_AGGREGATES))}"
            )

    policy = spec.get("multi_row_policy")
    if policy is not None and str(policy) not in ALLOWED_MULTI_ROW_POLICIES:
        raise ValueError(
            f"multi_row_policy: ungültiger Wert '{policy}'. "
            f"Erlaubt: {', '.join(sorted(ALLOWED_MULTI_ROW_POLICIES))}"
        )

    dedupe_flag = spec.get("dedupe_identical_rows")
    if not (dedupe_flag is None or isinstance(dedupe_flag, bool)):
        raise ValueError("dedupe_identical_rows muss ein Boolean sein")
def _sort_key_for_group(v: Any) -> Any:
if isinstance(v, dt.datetime):
return v.isoformat()
if isinstance(v, dt.date):
return v.isoformat()
if isinstance(v, dt.time):
return v.isoformat()
return v
def _apply_aggregate(op: str, values: list[Any]) -> Any:
nums: list[float] = []
for x in values:
if x is None or x == "":
continue
try:
nums.append(float(x))
except (TypeError, ValueError):
continue
if op == "sum":
return sum(nums) if nums else None
if op == "mean":
return statistics.mean(nums) if nums else None
if op == "median":
return float(statistics.median(nums)) if nums else None
if op == "min":
return min(nums) if nums else None
if op == "max":
return max(nums) if nums else None
if op == "first":
for x in values:
if x is not None and x != "":
return x
return None
if op == "last":
for x in reversed(values):
if x is not None and x != "":
return x
return None
raise ValueError(f"Unbekannte Aggregations-Operation: {op}")
def _row_identity_signature(r: dict[str, Any]) -> tuple[Any, ...]:
    """Build a tuple that is equal for two rows with the same keys and values.

    The result is a tuple of ``(key, normalized value)`` pairs ordered by key,
    where values pass through ``_sort_key_for_group``.
    """
    # Dict keys are unique, so ordering by key alone fully determines the order.
    ordered_items = sorted(r.items(), key=lambda item: item[0])
    return tuple((name, _sort_key_for_group(value)) for name, value in ordered_items)
def _dedupe_identical_mapped_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop mapped rows that exactly repeat an earlier row (same keys and values).

    The first occurrence is kept and the original row order is preserved.
    """
    # A dict keeps insertion order; setdefault stores only the first row per signature.
    first_by_signature: dict[tuple[Any, ...], dict[str, Any]] = {}
    for row in rows:
        first_by_signature.setdefault(_row_identity_signature(row), row)
    return list(first_by_signature.values())
def aggregate_mapped_rows(
    rows: list[dict[str, Any]],
    spec: Mapping[str, Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Group mapped row dicts by ``group_by`` and apply ``aggregates``.

    If ``spec["dedupe_identical_rows"]`` is truthy, exact duplicate rows are
    removed first. Without ``group_by`` the (possibly deduplicated) rows are
    returned as they are. Otherwise one merged row is produced per distinct
    key, in order of the key's first appearance:

    * ``group_by`` fields take their value from the first row of the group;
    * fields listed in ``aggregates`` are reduced with ``_apply_aggregate``;
    * every other field takes the value from the first row of the group that
      contains that key.

    ``multi_row_policy`` applies to groups with more than one row:
    ``aggregate`` (default, also used for unknown values) merges all rows,
    ``first_row`` / ``last_row`` keep only that row, and ``reject`` drops the
    group and records an error entry instead.

    Returns:
        ``(merged_rows, errors)`` where ``errors`` holds one dict per rejected
        group.
    """
    problems: list[dict[str, Any]] = []
    working = list(rows)
    if spec.get("dedupe_identical_rows"):
        working = _dedupe_identical_mapped_rows(working)

    group_fields = spec.get("group_by") or []
    field_ops = spec.get("aggregates") or {}
    policy = str(spec.get("multi_row_policy") or "aggregate")
    if policy not in ALLOWED_MULTI_ROW_POLICIES:
        policy = "aggregate"

    if not group_fields:
        return working, problems

    # Insertion-ordered dict: groups come out in order of first appearance.
    groups: dict[tuple[Any, ...], list[dict[str, Any]]] = {}
    for row in working:
        signature = tuple(_sort_key_for_group(row.get(name)) for name in group_fields)
        groups.setdefault(signature, []).append(row)

    key_label = ", ".join(group_fields)
    merged_rows: list[dict[str, Any]] = []
    for members in groups.values():
        if len(members) > 1:
            if policy == "reject":
                problems.append(
                    {
                        "error": "mehrere_zeilen_pro_schluessel",
                        "message": (
                            f"{len(members)} CSV-Zeilen mit gleichem Schlüssel ({key_label}); "
                            "laut Vorlage abgelehnt (multi_row_policy=reject)."
                        ),
                        "rows_in_group": len(members),
                    }
                )
                continue
            if policy == "first_row":
                members = members[:1]
            elif policy == "last_row":
                members = members[-1:]

        head = members[0]
        merged: dict[str, Any] = {name: head.get(name) for name in group_fields}
        for target_field, operation in field_ops.items():
            merged[target_field] = _apply_aggregate(
                str(operation), [member.get(target_field) for member in members]
            )
        # Remaining fields: first value seen wins. Grouping and aggregate fields
        # are already present in ``merged`` and therefore never overwritten.
        for member in members:
            for name, value in member.items():
                merged.setdefault(name, value)
        merged_rows.append(merged)

    return merged_rows, problems