- Updated the CSV import logic to merge active training parameters with static fields for the activity module, improving field mapping accuracy. - Enhanced validation functions to incorporate dynamic field definitions based on active training parameters, ensuring better data integrity during imports. - Refactored related functions to streamline the process of handling CSV templates and field mappings, improving maintainability and clarity. - Added new utility functions for resolving activity log column patches and upserting session metrics from CSV, enhancing the overall import functionality.
218 lines
7.7 KiB
Python
218 lines
7.7 KiB
Python
"""
|
|
Zeilenaggregation nach CSV-Mapping (group_by + aggregates), vor dem DB-Upsert.
|
|
|
|
Spezifikation in der Vorlage (import_row_processing JSONB). Optional: Modul-Default
|
|
(import_row_processing_default in module_registry) nur als **Legacy-Fallback**, wenn
|
|
die Vorlage nichts speichert — mittelfristig sollen Vorlagen explizit sein.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime as dt
|
|
import statistics
|
|
from typing import Any, Mapping
|
|
|
|
from csv_parser.module_registry import get_module_definition
|
|
|
|
# Names of the aggregate operations accepted for a field in "aggregates".
ALLOWED_AGGREGATES = frozenset(
    ("sum", "mean", "min", "max", "median", "first", "last")
)

# Policies for the case of more than one CSV row per group_by key.
ALLOWED_MULTI_ROW_POLICIES = frozenset(
    ("aggregate", "reject", "first_row", "last_row")
)
|
|
|
|
|
|
def resolve_import_row_processing(module: str, mapping_row: Mapping[str, Any]) -> dict[str, Any] | None:
    """Return the effective row-processing spec for an import, or ``None``.

    An explicit, non-empty ``import_row_processing`` dict on the template
    (``mapping_row``) takes precedence. Otherwise the module definition's
    ``import_row_processing_default`` is used if it is a non-empty dict.
    An empty dict counts as "not set" in both places.

    Args:
        module: Module name passed to ``get_module_definition``.
        mapping_row: The stored CSV mapping/template row.

    Returns:
        A shallow copy of the chosen spec, or ``None`` when neither source
        provides one (including when the module is unknown).
    """
    candidate = mapping_row.get("import_row_processing")
    if not (isinstance(candidate, dict) and candidate):
        # Template stores nothing usable: fall back to the module default.
        definition = get_module_definition(module)
        candidate = definition.get("import_row_processing_default") if definition else None
        if not (isinstance(candidate, dict) and candidate):
            return None
    # Copy so callers cannot mutate the template / registry object.
    return dict(candidate)
|
|
|
|
|
|
def validate_import_row_processing(
    module: str,
    spec: Mapping[str, Any],
    field_mappings: Mapping[str, Any],
    cur=None,
) -> None:
    """Validate a row-processing spec against the module and the CSV mapping.

    Args:
        module: Module name; must be known to ``get_module_definition``.
        spec: The ``import_row_processing`` configuration to check
            (``group_by``, ``aggregates``, ``multi_row_policy``,
            ``dedupe_identical_rows``).
        field_mappings: Mapping whose values are the target field names;
            falsy values, ``"-"`` and ``"_skip"`` are treated as unmapped.
        cur: Optional database cursor. For the ``"activity"`` module the keys
            of active training parameters are added to the allowed fields.
            Rows are read as ``row["key"]`` — assumes a mapping-style cursor;
            TODO confirm against the caller.

    Raises:
        ValueError: If the module is unknown or any part of the spec is
            invalid.
    """
    definition = get_module_definition(module)
    if not definition:
        raise ValueError(f"Unbekanntes Modul: {module}")

    # Static module fields, optionally extended by dynamic activity fields.
    known_fields = set(definition.get("fields") or [])
    if module == "activity" and cur is not None:
        cur.execute("SELECT key FROM training_parameters WHERE is_active = true")
        for record in cur.fetchall():
            known_fields.add(str(record["key"]))

    # Target fields that actually have a CSV column assigned.
    mapped_targets = set()
    for target in field_mappings.values():
        if not target or target in ("-", "_skip"):
            continue
        mapped_targets.add(str(target))

    group_fields = spec.get("group_by") or []
    if not isinstance(group_fields, list) or any(not isinstance(name, str) for name in group_fields):
        raise ValueError("import_row_processing.group_by muss eine Liste von Feldnamen sein")
    aggregate_ops = spec.get("aggregates") or {}
    if not isinstance(aggregate_ops, dict):
        raise ValueError("import_row_processing.aggregates muss ein Objekt sein")

    for name in group_fields:
        if name not in known_fields:
            raise ValueError(f"group_by: unbekanntes Feld '{name}' für Modul '{module}'")
        if name in mapped_targets:
            continue
        # A grouping key without a CSV column can never be populated.
        raise ValueError(
            f"group_by: Zielfeld '{name}' ist keiner CSV-Spalte zugeordnet — Aggregation nicht möglich."
        )

    for field, op in aggregate_ops.items():
        if field not in known_fields:
            raise ValueError(f"aggregates: unbekanntes Feld '{field}' für Modul '{module}'")
        if str(op) in ALLOWED_AGGREGATES:
            continue
        raise ValueError(
            f"aggregates['{field}']: ungültige Operation '{op}'. "
            f"Erlaubt: {', '.join(sorted(ALLOWED_AGGREGATES))}"
        )

    policy = spec.get("multi_row_policy")
    if policy is not None and str(policy) not in ALLOWED_MULTI_ROW_POLICIES:
        raise ValueError(
            f"multi_row_policy: ungültiger Wert '{policy}'. "
            f"Erlaubt: {', '.join(sorted(ALLOWED_MULTI_ROW_POLICIES))}"
        )

    dedupe_flag = spec.get("dedupe_identical_rows")
    if not (dedupe_flag is None or isinstance(dedupe_flag, bool)):
        raise ValueError("dedupe_identical_rows muss ein Boolean sein")
|
|
|
|
|
|
def _sort_key_for_group(v: Any) -> Any:
|
|
if isinstance(v, dt.datetime):
|
|
return v.isoformat()
|
|
if isinstance(v, dt.date):
|
|
return v.isoformat()
|
|
if isinstance(v, dt.time):
|
|
return v.isoformat()
|
|
return v
|
|
|
|
|
|
def _apply_aggregate(op: str, values: list[Any]) -> Any:
|
|
nums: list[float] = []
|
|
for x in values:
|
|
if x is None or x == "":
|
|
continue
|
|
try:
|
|
nums.append(float(x))
|
|
except (TypeError, ValueError):
|
|
continue
|
|
|
|
if op == "sum":
|
|
return sum(nums) if nums else None
|
|
if op == "mean":
|
|
return statistics.mean(nums) if nums else None
|
|
if op == "median":
|
|
return float(statistics.median(nums)) if nums else None
|
|
if op == "min":
|
|
return min(nums) if nums else None
|
|
if op == "max":
|
|
return max(nums) if nums else None
|
|
if op == "first":
|
|
for x in values:
|
|
if x is not None and x != "":
|
|
return x
|
|
return None
|
|
if op == "last":
|
|
for x in reversed(values):
|
|
if x is not None and x != "":
|
|
return x
|
|
return None
|
|
raise ValueError(f"Unbekannte Aggregations-Operation: {op}")
|
|
|
|
|
|
def _row_identity_signature(r: dict[str, Any]) -> tuple[Any, ...]:
    """Build a tuple of ``(key, normalised value)`` pairs identifying a row.

    Values are normalised with ``_sort_key_for_group`` and the pairs are
    ordered by key, so the signature does not depend on the insertion order
    of the row's keys.
    """
    pairs = [(name, _sort_key_for_group(r.get(name))) for name in sorted(r.keys())]
    # NOTE(review): this second sort looks redundant because the keys are
    # already sorted and unique — confirm before removing it.
    pairs.sort()
    return tuple(pairs)
|
|
|
|
|
|
def _dedupe_identical_mapped_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop mapped rows that are exact duplicates (all keys and values).

    The first occurrence of each distinct row is kept and the relative order
    of the remaining rows is preserved. Row identity is determined by
    ``_row_identity_signature``.
    """
    known_signatures: set[tuple[Any, ...]] = set()
    unique_rows: list[dict[str, Any]] = []
    for row in rows:
        signature = _row_identity_signature(row)
        if signature not in known_signatures:
            known_signatures.add(signature)
            unique_rows.append(row)
    return unique_rows
|
|
|
|
|
|
def aggregate_mapped_rows(
    rows: list[dict[str, Any]],
    spec: Mapping[str, Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Group mapped row dicts by ``group_by`` and apply ``aggregates``.

    Processing steps:

    1. If ``spec["dedupe_identical_rows"]`` is truthy, exact duplicate rows
       are removed first.
    2. Without a ``group_by`` the (possibly deduplicated) rows are returned
       as they are.
    3. Rows are bucketed by their ``group_by`` values; groups keep the order
       of first appearance.
    4. For groups with more than one row, ``multi_row_policy`` decides:
       ``reject`` reports an error entry and omits the group, ``first_row`` /
       ``last_row`` keep only that row, ``aggregate`` (the default, also used
       for unknown values) keeps all rows.
    5. Each remaining group yields one merged row: group_by fields from the
       first row, aggregated fields via ``_apply_aggregate``, and every other
       field from the first row of the group that contains that key.

    Returns:
        ``(merged_rows, errors)`` where ``errors`` holds structural problems
        such as rejected key groups.
    """
    problems: list[dict[str, Any]] = []
    pending = list(rows)
    if spec.get("dedupe_identical_rows"):
        pending = _dedupe_identical_mapped_rows(pending)

    group_fields = spec.get("group_by") or []
    aggregate_ops = spec.get("aggregates") or {}
    multi_row_policy = str(spec.get("multi_row_policy") or "aggregate")
    if multi_row_policy not in ALLOWED_MULTI_ROW_POLICIES:
        multi_row_policy = "aggregate"

    if not group_fields:
        return pending, problems

    # dicts preserve insertion order, so groups come out in first-seen order.
    grouped: dict[tuple[Any, ...], list[dict[str, Any]]] = {}
    for row in pending:
        group_key = tuple(_sort_key_for_group(row.get(name)) for name in group_fields)
        grouped.setdefault(group_key, []).append(row)

    key_label = ", ".join(group_fields)
    merged_rows: list[dict[str, Any]] = []
    for members in grouped.values():
        if len(members) > 1:
            if multi_row_policy == "reject":
                problems.append(
                    {
                        "error": "mehrere_zeilen_pro_schluessel",
                        "message": (
                            f"{len(members)} CSV-Zeilen mit gleichem Schlüssel ({key_label}); "
                            "laut Vorlage abgelehnt (multi_row_policy=reject)."
                        ),
                        "rows_in_group": len(members),
                    }
                )
                continue
            if multi_row_policy == "first_row":
                members = members[:1]
            elif multi_row_policy == "last_row":
                members = members[-1:]

        head = members[0]
        combined: dict[str, Any] = {name: head.get(name) for name in group_fields}
        for field, op in aggregate_ops.items():
            combined[field] = _apply_aggregate(str(op), [member.get(field) for member in members])
        # Remaining fields: the first row that carries the key wins. Keys from
        # group_by / aggregates are already present and therefore skipped.
        for member in members:
            for column, value in member.items():
                if column not in combined:
                    combined[column] = value
        merged_rows.append(combined)

    return merged_rows, problems
|