diff --git a/backend/csv_parser/executor.py b/backend/csv_parser/executor.py index 54114cc..3bfcd85 100644 --- a/backend/csv_parser/executor.py +++ b/backend/csv_parser/executor.py @@ -12,6 +12,11 @@ from typing import Any import logging from csv_parser.core import iter_csv_dict_rows +from csv_parser.import_row_processing import ( + aggregate_mapped_rows, + resolve_import_row_processing, + validate_import_row_processing, +) from csv_parser.module_registry import get_module_definition from csv_parser.type_converter import build_row_after_mapping @@ -111,6 +116,7 @@ def run_universal_csv_import( bool(has_header), fm, tc, + mapping, error_details, affected_ids, ) @@ -191,12 +197,12 @@ def _import_nutrition( has_header: bool, fm: dict, tc: dict | None, + mapping: dict[str, Any], error_details: list, affected_ids: dict, ) -> dict[str, int]: - agg: dict[str, dict[str, float]] = defaultdict( - lambda: {"kcal": 0.0, "protein_g": 0.0, "fat_g": 0.0, "carbs_g": 0.0} - ) + spec = resolve_import_row_processing("nutrition", mapping) + mapped_rows: list[dict[str, Any]] = [] rows_total = 0 for csv_row in iter_csv_dict_rows(text, delim, has_header=has_header): rows_total += 1 @@ -205,23 +211,39 @@ def _import_nutrition( if d is None: error_details.append({"row": rows_total, "error": "Datum fehlt oder ungültig"}) continue - iso = d.isoformat() - for key in ("kcal", "protein_g", "fat_g", "carbs_g"): - v = mapped.get(key) - if v is not None: - try: - agg[iso][key] += float(v) - except (TypeError, ValueError): - pass + mapped["date"] = d + mapped_rows.append(mapped) + + if spec: + try: + validate_import_row_processing("nutrition", spec, fm) + except ValueError as e: + raise ValueError(str(e)) from e + merged_rows = aggregate_mapped_rows(mapped_rows, spec) + else: + merged_rows = list(mapped_rows) inserted = 0 updated = 0 new_entries = 0 - for iso, vals in agg.items(): - kcal = round(vals["kcal"], 1) - fat = round(vals["fat_g"], 1) - carbs = round(vals["carbs_g"], 1) - prot = round(vals["protein_g"], 1) + for merged in merged_rows: + d = coerce_date(merged.get("date")) + if d is None: + continue + iso = d.isoformat() + + def _sf_macro(x: Any) -> float: + if x is None or x == "": + return 0.0 + try: + return float(x) + except (TypeError, ValueError): + return 0.0 + + kcal = round(_sf_macro(merged.get("kcal")), 1) + fat = round(_sf_macro(merged.get("fat_g")), 1) + carbs = round(_sf_macro(merged.get("carbs_g")), 1) + prot = round(_sf_macro(merged.get("protein_g")), 1) if kcal == 0 and fat == 0 and carbs == 0 and prot == 0: continue diff --git a/backend/csv_parser/import_row_processing.py b/backend/csv_parser/import_row_processing.py new file mode 100644 index 0000000..f2bc19f --- /dev/null +++ b/backend/csv_parser/import_row_processing.py @@ -0,0 +1,152 @@ +""" +Zeilenaggregation nach CSV-Mapping (group_by + aggregates), vor dem DB-Upsert. + +Spezifikation in der Vorlage (import_row_processing JSONB) oder Modul-Default +(import_row_processing_default in module_registry). +""" + +from __future__ import annotations + +import datetime as dt +import statistics +from typing import Any, Mapping + +from csv_parser.module_registry import get_module_definition + +ALLOWED_AGGREGATES = frozenset({"sum", "mean", "min", "max", "median", "first", "last"}) + + +def resolve_import_row_processing(module: str, mapping_row: Mapping[str, Any]) -> dict[str, Any] | None: + """Explizite Vorlage hat Vorrang; sonst Modul-Default; leeres Dict zählt wie „nicht gesetzt“.""" + raw = mapping_row.get("import_row_processing") + if isinstance(raw, dict) and raw: + return dict(raw) + mod = get_module_definition(module) + if not mod: + return None + default = mod.get("import_row_processing_default") + if isinstance(default, dict) and default: + return dict(default) + return None + + +def validate_import_row_processing( + module: str, + spec: Mapping[str, Any], + field_mappings: Mapping[str, Any], +) -> None: + """Wirft ValueError bei ungültiger Konfiguration.""" + mod = get_module_definition(module) + if not mod: + raise ValueError(f"Unbekanntes Modul: {module}") + allowed = set(mod.get("fields") or []) + fm_targets = {str(v) for v in field_mappings.values() if v and v not in ("-", "_skip")} + + group_by = spec.get("group_by") or [] + if not isinstance(group_by, list) or not all(isinstance(x, str) for x in group_by): + raise ValueError("import_row_processing.group_by muss eine Liste von Feldnamen sein") + aggregates = spec.get("aggregates") or {} + if not isinstance(aggregates, dict): + raise ValueError("import_row_processing.aggregates muss ein Objekt sein") + + for g in group_by: + if g not in allowed: + raise ValueError(f"group_by: unbekanntes Feld '{g}' für Modul '{module}'") + if g not in fm_targets: + raise ValueError( + f"group_by: Zielfeld '{g}' ist keiner CSV-Spalte zugeordnet — Aggregation nicht möglich." + ) + + for field, op in aggregates.items(): + if field not in allowed: + raise ValueError(f"aggregates: unbekanntes Feld '{field}' für Modul '{module}'") + if str(op) not in ALLOWED_AGGREGATES: + raise ValueError( + f"aggregates['{field}']: ungültige Operation '{op}'. " + f"Erlaubt: {', '.join(sorted(ALLOWED_AGGREGATES))}" + ) + + +def _sort_key_for_group(v: Any) -> Any: + if isinstance(v, dt.datetime): + return v.isoformat() + if isinstance(v, dt.date): + return v.isoformat() + if isinstance(v, dt.time): + return v.isoformat() + return v + + +def _apply_aggregate(op: str, values: list[Any]) -> Any: + nums: list[float] = [] + for x in values: + if x is None or x == "": + continue + try: + nums.append(float(x)) + except (TypeError, ValueError): + continue + + if op == "sum": + return sum(nums) if nums else None + if op == "mean": + return statistics.mean(nums) if nums else None + if op == "median": + return float(statistics.median(nums)) if nums else None + if op == "min": + return min(nums) if nums else None + if op == "max": + return max(nums) if nums else None + if op == "first": + for x in values: + if x is not None and x != "": + return x + return None + if op == "last": + for x in reversed(values): + if x is not None and x != "": + return x + return None + raise ValueError(f"Unbekannte Aggregations-Operation: {op}") + + +def aggregate_mapped_rows( + rows: list[dict[str, Any]], + spec: Mapping[str, Any], +) -> list[dict[str, Any]]: + """ + Gruppiert gemappte Zeilen-Dicts nach group_by und wendet aggregates an. + Felder, die weder in group_by noch in aggregates vorkommen: Wert aus der ersten Zeile der Gruppe. + """ + group_by = spec.get("group_by") or [] + aggregates = spec.get("aggregates") or {} + if not group_by: + return rows + + buckets: dict[tuple[Any, ...], list[dict[str, Any]]] = {} + order: list[tuple[Any, ...]] = [] + for r in rows: + key = tuple(_sort_key_for_group(r.get(g)) for g in group_by) + if key not in buckets: + buckets[key] = [] + order.append(key) + buckets[key].append(r) + + out: list[dict[str, Any]] = [] + for key in order: + group_rows = buckets[key] + first = group_rows[0] + merged: dict[str, Any] = {} + for g in group_by: + merged[g] = first.get(g) + for field, op in aggregates.items(): + merged[field] = _apply_aggregate(str(op), [row.get(field) for row in group_rows]) + for row in group_rows: + for k, v in row.items(): + if k in merged: + continue + if k in group_by or k in aggregates: + continue + merged[k] = v + out.append(merged) + return out diff --git a/backend/csv_parser/module_registry.py b/backend/csv_parser/module_registry.py index 4a6ca20..8d22c0e 100644 --- a/backend/csv_parser/module_registry.py +++ b/backend/csv_parser/module_registry.py @@ -23,6 +23,16 @@ MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = { }, "duplicate_key": ["profile_id", "date"], "duplicate_strategy": "update", + # Mehrere CSV-Zeilen pro Tag (z. B. pro Lebensmittel) → ein nutrition_log-Eintrag + "import_row_processing_default": { + "group_by": ["date"], + "aggregates": { + "kcal": "sum", + "protein_g": "sum", + "fat_g": "sum", + "carbs_g": "sum", + }, + }, }, "activity": { "table": "activity_log", diff --git a/backend/migrations/047_csv_import_row_processing.sql b/backend/migrations/047_csv_import_row_processing.sql new file mode 100644 index 0000000..364f342 --- /dev/null +++ b/backend/migrations/047_csv_import_row_processing.sql @@ -0,0 +1,7 @@ +-- Migration 047: CSV-Vorlagen — optionale Zeilenaggregation (group_by + aggregates) vor DB-Schreiben + +ALTER TABLE csv_field_mappings +ADD COLUMN IF NOT EXISTS import_row_processing JSONB; + +COMMENT ON COLUMN csv_field_mappings.import_row_processing IS +'Optional: { "group_by": ["date"], "aggregates": { "kcal": "sum" } } — siehe csv_parser/import_row_processing.py'; diff --git a/backend/routers/admin_csv_templates.py b/backend/routers/admin_csv_templates.py index 14ab754..bafc20a 100644 --- a/backend/routers/admin_csv_templates.py +++ b/backend/routers/admin_csv_templates.py @@ -20,6 +20,9 @@ from csv_parser.core import ( parse_csv_sample, ) from csv_parser.mapping_suggest import build_type_conversions_for_mapping, suggest_field_mappings +from csv_parser.import_row_processing import ( + validate_import_row_processing as validate_import_row_processing_spec, +) from csv_parser.module_registry import ( get_module_definition, validate_field_mappings, @@ -39,6 +42,7 @@ class CsvSystemTemplateCreate(BaseModel): has_header: bool = True field_mappings: dict = Field(default_factory=dict) type_conversions: Optional[dict] = None + import_row_processing: Optional[dict] = None class CsvSystemTemplateUpdate(BaseModel): @@ -50,6 +54,7 @@ class CsvSystemTemplateUpdate(BaseModel): has_header: Optional[bool] = None field_mappings: Optional[dict] = None type_conversions: Optional[dict] = None + import_row_processing: Optional[dict] = None class CsvImportLimitsBody(BaseModel): @@ -69,6 +74,7 @@ def _row_full(m: dict) -> dict: "has_header": m["has_header"], "field_mappings": m["field_mappings"], "type_conversions": m.get("type_conversions"), + "import_row_processing": m.get("import_row_processing"), "usage_count": m.get("usage_count"), "success_rate": m.get("success_rate"), "last_used_at": m.get("last_used_at"), @@ -273,6 +279,12 @@ def create_system_template(body: CsvSystemTemplateCreate, session: dict = Depend except ValueError as e: raise HTTPException(400, str(e)) + if body.import_row_processing: + try: + validate_import_row_processing_spec(body.module, body.import_row_processing, body.field_mappings) + except ValueError as e: + raise HTTPException(400, str(e)) + with get_db() as conn: cur = get_cursor(conn) cur.execute( @@ -280,9 +292,9 @@ def create_system_template(body: CsvSystemTemplateCreate, session: dict = Depend INSERT INTO csv_field_mappings ( profile_id, is_system, module, mapping_name, description, column_signature, delimiter, encoding, has_header, - field_mappings, type_conversions + field_mappings, type_conversions, import_row_processing ) VALUES ( - NULL, true, %s, %s, %s, %s, %s, %s, %s, %s, %s + NULL, true, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) RETURNING id """, ( @@ -295,6 +307,7 @@ def create_system_template(body: CsvSystemTemplateCreate, session: dict = Depend body.has_header, Json(body.field_mappings), Json(body.type_conversions) if body.type_conversions is not None else None, + Json(body.import_row_processing) if body.import_row_processing is not None else None, ), ) new_id = cur.fetchone()["id"] @@ -356,6 +369,19 @@ def update_system_template( fields_sql.append("type_conversions = %s") tc = patch["type_conversions"] vals.append(Json(tc) if tc is not None else None) + if "import_row_processing" in patch: + irp = patch["import_row_processing"] + if irp: + try: + validate_import_row_processing_spec( + existing["module"], + irp, + patch.get("field_mappings", existing["field_mappings"]), + ) + except ValueError as e: + raise HTTPException(400, str(e)) + fields_sql.append("import_row_processing = %s") + vals.append(Json(irp) if irp is not None else None) fields_sql.append("updated_at = CURRENT_TIMESTAMP") vals.append(template_id) diff --git a/backend/routers/csv_import.py b/backend/routers/csv_import.py index 6033c7d..338db2e 100644 --- a/backend/routers/csv_import.py +++ b/backend/routers/csv_import.py @@ -173,10 +173,11 @@ def copy_csv_mapping( INSERT INTO csv_field_mappings ( profile_id, is_system, module, mapping_name, description, column_signature, delimiter, encoding, has_header, - field_mappings, type_conversions, usage_count, success_rate + field_mappings, type_conversions, import_row_processing, + usage_count, success_rate ) VALUES ( %s::uuid, false, %s, %s, %s, - %s, %s, %s, %s, %s, %s, 0, 1.0 + %s, %s, %s, %s, %s, %s, %s, 0, 1.0 ) RETURNING id """, ( @@ -190,6 +191,9 @@ def copy_csv_mapping( src["has_header"], Json(src["field_mappings"]), Json(src["type_conversions"]) if src.get("type_conversions") is not None else None, + Json(src["import_row_processing"]) + if src.get("import_row_processing") is not None + else None, ), ) new_id = cur.fetchone()["id"] diff --git a/backend/tests/test_csv_import_executor.py b/backend/tests/test_csv_import_executor.py index b70252a..28d7327 100644 --- a/backend/tests/test_csv_import_executor.py +++ b/backend/tests/test_csv_import_executor.py @@ -176,3 +176,42 @@ def test_run_universal_import_activity_garmin_time_plus_date_columns(monkeypatch for _sql, params in cur.executes if params ) + + +def test_run_universal_import_nutrition_two_rows_same_day_aggregates_to_one_row(): + """Modul-Default: mehrere CSV-Zeilen pro Tag → Summe, ein nutrition_log-Eintrag.""" + text = ( + "Date,Kalorien,Protein,Fett,KH\n" + "2024-01-15,500,10,20,30\n" + "2024-01-15,300,5,10,15\n" + ) + mapping = { + "delimiter": ",", + "has_header": True, + "field_mappings": { + "Date": "date", + "Kalorien": "kcal", + "Protein": "protein_g", + "Fett": "fat_g", + "KH": "carbs_g", + }, + "type_conversions": { + "date": {"type": "date", "format": "yyyy-mm-dd", "flexible": True}, + "kcal": {"type": "float", "decimal_separator": ".", "flexible": True}, + "protein_g": {"type": "float", "decimal_separator": ".", "flexible": True}, + "fat_g": {"type": "float", "decimal_separator": ".", "flexible": True}, + "carbs_g": {"type": "float", "decimal_separator": ".", "flexible": True}, + }, + } + cur = _SeqCursor([None]) + out = run_universal_csv_import(cur, PID, "nutrition", text, "n.csv", mapping) + assert out["rows_total"] == 2 + assert out["rows_imported"] == 1 + insert_sqls = [q for q in cur.executes if "INSERT INTO nutrition_log" in q[0]] + assert len(insert_sqls) == 1 + params = insert_sqls[0][1] + # (eid, profile_id, iso, kcal, prot, fat, carbs) + assert params[3] == 800.0 + assert params[4] == 15.0 + assert params[5] == 30.0 + assert params[6] == 45.0 diff --git a/backend/tests/test_import_row_processing.py b/backend/tests/test_import_row_processing.py new file mode 100644 index 0000000..8f32ba2 --- /dev/null +++ b/backend/tests/test_import_row_processing.py @@ -0,0 +1,67 @@ +"""Tests für CSV-Zeilenaggregation (import_row_processing).""" + +from __future__ import annotations + +import datetime as dt + +import pytest + +from csv_parser.import_row_processing import ( + aggregate_mapped_rows, + resolve_import_row_processing, + validate_import_row_processing, +) + + +def test_validate_rejects_unknown_aggregate(): + with pytest.raises(ValueError, match="ungültige Operation"): + validate_import_row_processing( + "nutrition", + {"group_by": ["date"], "aggregates": {"kcal": "bogus"}}, + {"Kal": "date", "E": "kcal"}, + ) + + +def test_validate_group_by_must_be_mapped(): + with pytest.raises(ValueError, match="keiner CSV-Spalte zugeordnet"): + validate_import_row_processing( + "nutrition", + {"group_by": ["date"], "aggregates": {"kcal": "sum"}}, + {"Kal": "kcal"}, # date nicht gemappt + ) + + +def test_aggregate_mapped_rows_sums_same_group(): + d = dt.date(2024, 1, 15) + rows = [ + {"date": d, "kcal": 500.0, "protein_g": 20}, + {"date": d, "kcal": 300.0, "protein_g": 15}, + ] + spec = {"group_by": ["date"], "aggregates": {"kcal": "sum", "protein_g": "sum"}} + out = aggregate_mapped_rows(rows, spec) + assert len(out) == 1 + assert out[0]["kcal"] == 800.0 + assert out[0]["protein_g"] == 35 + + +def test_resolve_explicit_overrides_default(): + m = { + "import_row_processing": {"group_by": ["date"], "aggregates": {"kcal": "mean"}}, + } + spec = resolve_import_row_processing("nutrition", m) + assert spec is not None + assert spec["aggregates"]["kcal"] == "mean" + + +def test_resolve_empty_dict_falls_back_to_module_default(): + m: dict = {"import_row_processing": {}} + spec = resolve_import_row_processing("nutrition", m) + assert spec is not None + assert spec["group_by"] == ["date"] + assert spec["aggregates"]["kcal"] == "sum" + + +def test_resolve_none_uses_nutrition_default(): + spec = resolve_import_row_processing("nutrition", {}) + assert spec is not None + assert "date" in (spec.get("group_by") or []) diff --git a/backend/workflow_executor.py b/backend/workflow_executor.py index b14fc0d..e238479 100644 --- a/backend/workflow_executor.py +++ b/backend/workflow_executor.py @@ -15,7 +15,7 @@ from datetime import datetime import uuid import logging import json -from jinja2 import Template, TemplateError +from jinja2 import Environment, ChainableUndefined, TemplateError from workflow_models import ( WorkflowGraph, NodeExecutionState, ExecutionResult, @@ -618,6 +618,11 @@ def execute_end_node( "reasoning_anchors": node_state.reasoning_anchors or "", "status": node_state.status.value if node_state.status else "unknown", } + # Backward-compatible shortcut: + # allow {{node_id.relevanz}} in addition to {{node_id.decision_signals.relevanz}} + if node_state.decision_signals: + for signal_key, signal_value in node_state.decision_signals.items(): + node_context[signal_key] = signal_value # Add normalized signals as {{node_id.signal_ID}} # NOTE: question_type now IS the ID (not the type!) @@ -660,7 +665,10 @@ def execute_end_node( # Render template try: - jinja_template = Template(node.template) + # ChainableUndefined keeps missing nested attributes renderable + # so Jinja's default filter can handle them. + jinja_env = Environment(undefined=ChainableUndefined) + jinja_template = jinja_env.from_string(node.template) final_output = jinja_template.render(template_context) logger.info(f"End node {node.id}: Template rendered successfully ({len(final_output)} chars)") except TemplateError as te: