mitai-jinkendo/backend/csv_parser/template_validator.py
Lars 574af61349
All checks were successful
Deploy Development / deploy (push) Successful in 47s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Enhance CSV import and validation for activity module
- Updated the CSV import logic to merge active training parameters with static fields for the activity module, improving field mapping accuracy.
- Enhanced validation functions to incorporate dynamic field definitions based on active training parameters, ensuring better data integrity during imports.
- Refactored related functions to streamline the process of handling CSV templates and field mappings, improving maintainability and clarity.
- Added new utility functions for resolving activity log column patches and upserting session metrics from CSV, enhancing the overall import functionality.
2026-04-15 08:12:58 +02:00

242 lines
8.5 KiB
Python

"""
Formatprüfung für CSV-Import-Vorlagen (field_mappings, type_conversions).
Liefert strukturierte Fehler/Warnungen für Admin-UI und Speicher-Guards.
"""
from __future__ import annotations
from typing import Any, Mapping
from csv_parser.core import normalize_header_for_signature
from csv_parser.import_row_processing import validate_import_row_processing as validate_import_row_processing_spec
from csv_parser.module_registry import (
get_module_definition,
validate_field_mappings,
validate_required_field_targets,
)
from data_layer.activity_persistence_orchestrator import merge_activity_csv_module_fields
# Conversion ``type`` values that validate_csv_template accepts without
# emitting an ``unusual_conversion_type`` warning.
ALLOWED_SPEC_TYPES = frozenset(
    (
        "date",
        "datetime",
        "duration",
        "float",
        "int",
        "number",
        "string",
        "time",
    )
)
def _issue(
severity: str,
code: str,
message: str,
*,
hint: str | None = None,
field: str | None = None,
csv_columns: list[str] | None = None,
) -> dict[str, Any]:
out: dict[str, Any] = {
"severity": severity,
"code": code,
"message": message,
}
if hint:
out["hint"] = hint
if field:
out["field"] = field
if csv_columns:
out["csv_columns"] = csv_columns
return out
def validate_csv_template(
    module: str,
    field_mappings: Mapping[str, Any] | None,
    type_conversions: Mapping[str, Any] | None = None,
    import_row_processing: Mapping[str, Any] | None = None,
    column_signature: list[str] | None = None,
    *,
    cur=None,
) -> dict[str, Any]:
    """
    Validate a CSV import template without requiring a file upload.

    Args:
        module: Module name looked up via ``get_module_definition``.
        field_mappings: Maps CSV column headers to target field names.
        type_conversions: Maps target field names to conversion spec objects;
            this function reads their ``type`` and ``source_unit`` entries.
        import_row_processing: Optional spec; when truthy it is checked by
            ``validate_import_row_processing``.
        column_signature: Optional list of column headers; compared against
            the mapping keys after header normalization.
        cur: Optional handle forwarded to the registry validators
            (presumably a database cursor -- confirm against callers). When it
            is given and ``module == "activity"``, the field definitions are
            extended via ``merge_activity_csv_module_fields``.

    Returns:
        ``{"valid": bool, "errors": [...], "warnings": [...]}`` where
        ``valid`` is True exactly when no error was collected. An unknown
        module returns immediately with a single ``unknown_module`` error.
    """
    errors: list[dict[str, Any]] = []
    warnings: list[dict[str, Any]] = []
    fm = dict(field_mappings or {})
    tc: dict[str, Any] = dict(type_conversions or {})

    mod = get_module_definition(module)
    if not mod:
        errors.append(
            _issue(
                "error",
                "unknown_module",
                f"Unbekanntes Modul «{module}».",
                hint="Nur registrierte Module in module_registry sind erlaubt.",
            )
        )
        return {"valid": False, "errors": errors, "warnings": warnings}

    field_defs = dict(mod.get("fields") or {})
    if module == "activity" and cur is not None:
        field_defs = merge_activity_csv_module_fields(cur, field_defs)

    try:
        validate_field_mappings(module, fm, cur=cur)
    except ValueError as e:
        errors.append(
            _issue(
                "error",
                "invalid_field_mapping",
                str(e),
                hint="Jede Zuordnung muss auf ein bekanntes Zielfeld des Moduls zeigen (oder „–“ / ignorieren).",
            )
        )

    try:
        validate_required_field_targets(module, fm)
    except ValueError as e:
        errors.append(
            _issue(
                "error",
                "missing_required_target",
                str(e),
                hint="Pflichtfelder des Moduls müssen mindestens einer CSV-Spalte zugeordnet sein.",
            )
        )

    if import_row_processing:
        try:
            validate_import_row_processing_spec(module, import_row_processing, fm, cur=cur)
        except ValueError as e:
            errors.append(
                _issue(
                    "error",
                    "invalid_import_row_processing",
                    str(e),
                    hint="import_row_processing: group_by und aggregates prüfen (siehe Doku Issue #21).",
                )
            )

    # The helpers append in place; the call order fixes the order of findings.
    _check_type_conversions(tc, field_defs, errors, warnings)
    _check_duplicate_targets(fm, warnings)
    _check_energy_units(fm, tc, warnings)
    if column_signature:
        _check_signature_overlap(column_signature, fm, warnings)

    return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}


def _check_type_conversions(
    tc: Mapping[str, Any],
    field_defs: Mapping[str, Any],
    errors: list[dict[str, Any]],
    warnings: list[dict[str, Any]],
) -> None:
    """Check every type_conversions entry against the module's field definitions."""
    for db_field, spec in tc.items():
        if db_field not in field_defs:
            errors.append(
                _issue(
                    "error",
                    "unknown_type_conversion_field",
                    f"type_conversions enthält unbekanntes Zielfeld «{db_field}».",
                    hint="Nur Felder aus der Moduldefinition sind erlaubt.",
                    field=db_field,
                )
            )
            continue
        if not isinstance(spec, Mapping):
            errors.append(
                _issue(
                    "error",
                    "type_conversion_not_object",
                    f"type_conversions[\"{db_field}\"] muss ein JSON-Objekt sein.",
                    field=db_field,
                )
            )
            continue

        stype = spec.get("type", "string")
        # The isinstance guard must come first: a non-string ``type`` (e.g. a
        # list or dict) is unhashable, and the frozenset membership test
        # would raise TypeError instead of yielding a finding.
        if not isinstance(stype, str) or stype not in ALLOWED_SPEC_TYPES:
            warnings.append(
                _issue(
                    "warning",
                    "unusual_conversion_type",
                    f"Ungewöhnlicher Typ «{stype}» für «{db_field}» (erwartet u. a. string, float, date, datetime).",
                    field=db_field,
                )
            )

        finfo = field_defs.get(db_field) or {}
        expected = finfo.get("type")
        if expected == "date" and stype not in ("date", "datetime"):
            warnings.append(
                _issue(
                    "warning",
                    "date_field_conversion",
                    f"Zielfeld «{db_field}» ist ein Datum; der Konvertierungstyp ist «{stype}».",
                    hint="Meist «date» oder «datetime» mit passendem format.",
                    field=db_field,
                )
            )
        if expected == "float" and stype == "int" and db_field in ("hr_avg", "hr_max"):
            warnings.append(
                _issue(
                    "warning",
                    "hr_as_int",
                    "Herzfrequenz als «int» konvertiert; Nachkommastellen aus Apple-Export gehen verloren.",
                    hint="Optional «float» mit flexible: true verwenden.",
                    field=db_field,
                )
            )


def _check_duplicate_targets(
    fm: Mapping[str, Any],
    warnings: list[dict[str, Any]],
) -> None:
    """Warn when several CSV columns are mapped to the same target field."""
    by_target: dict[str, list[str]] = {}
    for csv_col, dbf in fm.items():
        # These values are treated as "not mapped" and are not counted.
        if dbf in (None, "", "-", "_skip"):
            continue
        by_target.setdefault(str(dbf), []).append(str(csv_col))

    for dbf, cols in by_target.items():
        if len(cols) > 1:
            warnings.append(
                _issue(
                    "warning",
                    "duplicate_target_columns",
                    f"Mehrere Spalten mappen auf «{dbf}»: {', '.join(cols)}.",
                    hint="Beim Import gewinnt die letzte Spalte in der CSV-Kopfzeilen-Reihenfolge.",
                    field=dbf,
                    csv_columns=cols,
                )
            )


def _check_energy_units(
    fm: Mapping[str, Any],
    tc: Mapping[str, Any],
    warnings: list[dict[str, Any]],
) -> None:
    """Warn when a kilojoule-looking column feeds a kcal field without source_unit "kj"."""
    for csv_col, dbf in fm.items():
        if dbf not in ("kcal_active", "kcal_resting"):
            continue
        col_l = str(csv_col).lower()
        if "kj" not in col_l and "kilojoule" not in col_l:
            continue
        sub = tc.get(dbf)
        su = sub.get("source_unit") if isinstance(sub, Mapping) else None
        if str(su or "").strip().lower() != "kj":
            warnings.append(
                _issue(
                    "warning",
                    "energy_kj_without_source_unit",
                    f"Spalte «{csv_col}» deutet auf Kilojoule, Zielfeld «{dbf}» speichert kcal.",
                    hint='In type_conversions für dieses Feld "source_unit": "kj" setzen (Faktor 1/4.184).',
                    field=str(dbf),
                    csv_columns=[str(csv_col)],
                )
            )


def _check_signature_overlap(
    column_signature: list[str],
    fm: Mapping[str, Any],
    warnings: list[dict[str, Any]],
) -> None:
    """Warn when column_signature and the mapping keys share no normalized header."""
    # Both sides are normalized with the same function: the signature may
    # already be in normal form while field_mappings keys are raw headers.
    sig_forms = {
        normalize_header_for_signature(str(c))
        for c in column_signature
        if str(c).strip()
    }
    sig_forms.discard("")
    mapped_forms = {
        normalize_header_for_signature(str(k))
        for k in fm
        if str(k).strip()
    }
    mapped_forms.discard("")
    if sig_forms and mapped_forms and not sig_forms.intersection(mapped_forms):
        warnings.append(
            _issue(
                "warning",
                "signature_vs_mappings_mismatch",
                "column_signature und field_mappings (Schlüssel) haben nach Normalisierung keine gemeinsame Spalte.",
                hint="Prüfen Sie, ob die gespeicherte Signatur zur CSV passt; Zuordnungen nutzen rohe Kopfzeilen, die Signatur oft die gleiche Normalform wie in der Analyse.",
            )
        )