mitai-jinkendo/backend/csv_parser/template_validator.py
Lars 0629f88b37
All checks were successful
Deploy Development / deploy (push) Successful in 55s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
feat(csv-templates): Add CSV template validation endpoint and enhance error handling
- Introduced a new endpoint for validating CSV templates without saving, allowing users to check field mappings and type conversions.
- Updated the `create_system_template` and `update_system_template` functions to include validation reports in responses.
- Enhanced error handling in CSV import processes by integrating `enrich_row_error` for more informative error messages.
- Improved the AdminCsvTemplateEditorPage to support format checking and display validation results, enhancing user experience.
- Incremented version numbers for `csv_import` and `admin_csv_templates` to reflect these updates.
2026-04-11 06:47:27 +02:00

224 lines
7.8 KiB
Python

"""
Formatprüfung für CSV-Import-Vorlagen (field_mappings, type_conversions).
Liefert strukturierte Fehler/Warnungen für Admin-UI und Speicher-Guards.
"""
from __future__ import annotations
from typing import Any, Mapping
from csv_parser.import_row_processing import validate_import_row_processing as validate_import_row_processing_spec
from csv_parser.module_registry import (
get_module_definition,
validate_field_mappings,
validate_required_field_targets,
)
# Conversion type names that validate_csv_template accepts without emitting
# an "unusual_conversion_type" warning.
ALLOWED_SPEC_TYPES = frozenset(
    (
        "string",
        "float",
        "number",
        "int",
        "date",
        "time",
        "datetime",
        "duration",
    )
)
def _issue(
severity: str,
code: str,
message: str,
*,
hint: str | None = None,
field: str | None = None,
csv_columns: list[str] | None = None,
) -> dict[str, Any]:
out: dict[str, Any] = {
"severity": severity,
"code": code,
"message": message,
}
if hint:
out["hint"] = hint
if field:
out["field"] = field
if csv_columns:
out["csv_columns"] = csv_columns
return out
def _type_conversion_issues(
    field_defs: Mapping[str, Any],
    tc: Mapping[str, Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Check every ``type_conversions`` entry; return ``(errors, warnings)``."""
    errors: list[dict[str, Any]] = []
    warnings: list[dict[str, Any]] = []
    for db_field, spec in tc.items():
        if db_field not in field_defs:
            errors.append(
                _issue(
                    "error",
                    "unknown_type_conversion_field",
                    f"type_conversions enthält unbekanntes Zielfeld «{db_field}».",
                    hint="Nur Felder aus der Moduldefinition sind erlaubt.",
                    field=db_field,
                )
            )
            continue
        if not isinstance(spec, Mapping):
            errors.append(
                _issue(
                    "error",
                    "type_conversion_not_object",
                    f"type_conversions[\"{db_field}\"] muss ein JSON-Objekt sein.",
                    field=db_field,
                )
            )
            continue
        stype = spec.get("type", "string")
        # The isinstance guard matters: a non-string "type" such as a list or
        # dict is unhashable, and testing it against the frozenset would raise
        # TypeError instead of producing a structured warning.
        if not isinstance(stype, str) or stype not in ALLOWED_SPEC_TYPES:
            warnings.append(
                _issue(
                    "warning",
                    "unusual_conversion_type",
                    f"Ungewöhnlicher Typ «{stype}» für «{db_field}» (erwartet u. a. string, float, date, datetime).",
                    field=db_field,
                )
            )
        expected = (field_defs.get(db_field) or {}).get("type")
        if expected == "date" and stype not in ("date", "datetime"):
            warnings.append(
                _issue(
                    "warning",
                    "date_field_conversion",
                    f"Zielfeld «{db_field}» ist ein Datum; der Konvertierungstyp ist «{stype}».",
                    hint="Meist «date» oder «datetime» mit passendem format.",
                    field=db_field,
                )
            )
        if expected == "float" and stype == "int" and db_field in ("hr_avg", "hr_max"):
            warnings.append(
                _issue(
                    "warning",
                    "hr_as_int",
                    "Herzfrequenz als «int» konvertiert; Nachkommastellen aus Apple-Export gehen verloren.",
                    hint="Optional «float» mit flexible: true verwenden.",
                    field=db_field,
                )
            )
    return errors, warnings


def _duplicate_target_issues(fm: Mapping[str, Any]) -> list[dict[str, Any]]:
    """Warn for every target field that more than one CSV column maps to."""
    columns_by_target: dict[str, list[str]] = {}
    for csv_col, dbf in fm.items():
        # These target values are not counted as a real target field.
        if dbf in (None, "", "-", "_skip"):
            continue
        columns_by_target.setdefault(str(dbf), []).append(str(csv_col))
    return [
        _issue(
            "warning",
            "duplicate_target_columns",
            f"Mehrere Spalten mappen auf «{dbf}»: {', '.join(cols)}.",
            hint="Beim Import gewinnt die letzte Spalte in der CSV-Kopfzeilen-Reihenfolge.",
            field=dbf,
            csv_columns=cols,
        )
        for dbf, cols in columns_by_target.items()
        if len(cols) > 1
    ]


def _kilojoule_issues(
    fm: Mapping[str, Any],
    tc: Mapping[str, Any],
) -> list[dict[str, Any]]:
    """Warn when a kJ-looking column feeds a kcal field without ``source_unit: kj``."""
    found: list[dict[str, Any]] = []
    for csv_col, dbf in fm.items():
        if dbf not in ("kcal_active", "kcal_resting"):
            continue
        col_lower = str(csv_col).lower()
        if "kj" not in col_lower and "kilojoule" not in col_lower:
            continue
        conversion = tc.get(dbf)
        source_unit = (
            conversion.get("source_unit") if isinstance(conversion, Mapping) else None
        )
        if str(source_unit or "").strip().lower() != "kj":
            found.append(
                _issue(
                    "warning",
                    "energy_kj_without_source_unit",
                    f"Spalte «{csv_col}» deutet auf Kilojoule, Zielfeld «{dbf}» speichert kcal.",
                    hint='In type_conversions für dieses Feld "source_unit": "kj" setzen (Faktor 1/4.184).',
                    field=str(dbf),
                    csv_columns=[str(csv_col)],
                )
            )
    return found


def _signature_issues(
    fm: Mapping[str, Any],
    column_signature: list[str] | None,
) -> list[dict[str, Any]]:
    """Warn when ``column_signature`` shares no column with the mapping keys."""
    if not column_signature:
        return []
    # Compare whitespace-trimmed names; blank signature entries are ignored.
    sig_norm = {str(c).strip() for c in column_signature if str(c).strip()}
    mapped_cols = {str(k).strip() for k in fm}
    if sig_norm and sig_norm.isdisjoint(mapped_cols):
        return [
            _issue(
                "warning",
                "signature_vs_mappings_mismatch",
                "column_signature und die Schlüssel in field_mappings haben keine gemeinsame Spalte.",
                hint="Signatur dient dem Ranking; für den Import müssen die Kopfzeilen der Datei zu den Keys in field_mappings passen (oder Aliase greifen).",
            )
        ]
    return []


def validate_csv_template(
    module: str,
    field_mappings: Mapping[str, Any] | None,
    type_conversions: Mapping[str, Any] | None = None,
    import_row_processing: Mapping[str, Any] | None = None,
    column_signature: list[str] | None = None,
) -> dict[str, Any]:
    """Validate a CSV import template without a file upload.

    Checks run in this order, and issues are reported in the same order:

    1. The module must be known to ``get_module_definition``; otherwise a
       single ``unknown_module`` error is returned immediately.
    2. ``validate_field_mappings`` and ``validate_required_field_targets``;
       a ``ValueError`` from either becomes an error issue.
    3. ``import_row_processing`` (only when truthy) via
       ``validate_import_row_processing``; a ``ValueError`` becomes an error.
    4. Each ``type_conversions`` entry: unknown target field or non-object
       spec is an error; unusual or mismatching types are warnings.
    5. Warnings for several CSV columns mapped to the same target field.
    6. Warnings for kJ-looking columns mapped to kcal fields without
       ``"source_unit": "kj"``.
    7. A warning when ``column_signature`` and the mapping keys are disjoint.

    Args:
        module: Name of the import module to validate against.
        field_mappings: CSV column name -> target field; ``None`` is treated
            as an empty mapping.
        type_conversions: Target field -> conversion spec object; optional.
        import_row_processing: Optional row-processing spec.
        column_signature: Optional list of CSV header names.

    Returns:
        ``{"valid": bool, "errors": [...], "warnings": [...]}`` where
        ``valid`` is true exactly when ``errors`` is empty.

    Only ``ValueError`` from the registry/processing validators is converted
    into issues; any other exception they raise propagates to the caller.
    """
    errors: list[dict[str, Any]] = []
    warnings: list[dict[str, Any]] = []
    fm = dict(field_mappings or {})
    tc: dict[str, Any] = dict(type_conversions or {})

    mod = get_module_definition(module)
    if not mod:
        errors.append(
            _issue(
                "error",
                "unknown_module",
                f"Unbekanntes Modul «{module}».",
                hint="Nur registrierte Module in module_registry sind erlaubt.",
            )
        )
        # Nothing else can be checked without a module definition.
        return {"valid": False, "errors": errors, "warnings": warnings}

    try:
        validate_field_mappings(module, fm)
    except ValueError as e:
        errors.append(
            _issue(
                "error",
                "invalid_field_mapping",
                str(e),
                hint="Jede Zuordnung muss auf ein bekanntes Zielfeld des Moduls zeigen (oder „–“ / ignorieren).",
            )
        )

    try:
        validate_required_field_targets(module, fm)
    except ValueError as e:
        errors.append(
            _issue(
                "error",
                "missing_required_target",
                str(e),
                hint="Pflichtfelder des Moduls müssen mindestens einer CSV-Spalte zugeordnet sein.",
            )
        )

    if import_row_processing:
        try:
            validate_import_row_processing_spec(module, import_row_processing, fm)
        except ValueError as e:
            errors.append(
                _issue(
                    "error",
                    "invalid_import_row_processing",
                    str(e),
                    hint="import_row_processing: group_by und aggregates prüfen (siehe Doku Issue #21).",
                )
            )

    tc_errors, tc_warnings = _type_conversion_issues(mod.get("fields") or {}, tc)
    errors.extend(tc_errors)
    warnings.extend(tc_warnings)

    warnings.extend(_duplicate_target_issues(fm))
    warnings.extend(_kilojoule_issues(fm, tc))
    # Signature vs. mapped columns is advisory only.
    warnings.extend(_signature_issues(fm, column_signature))

    return {"valid": not errors, "errors": errors, "warnings": warnings}