""" Formatprüfung für CSV-Import-Vorlagen (field_mappings, type_conversions). Liefert strukturierte Fehler/Warnungen für Admin-UI und Speicher-Guards. """ from __future__ import annotations from typing import Any, Mapping from csv_parser.core import normalize_header_for_signature from csv_parser.import_row_processing import validate_import_row_processing as validate_import_row_processing_spec from csv_parser.module_registry import ( get_module_definition, validate_field_mappings, validate_required_field_targets, ) ALLOWED_SPEC_TYPES = frozenset( {"string", "float", "number", "int", "date", "time", "datetime", "duration"} ) def _issue( severity: str, code: str, message: str, *, hint: str | None = None, field: str | None = None, csv_columns: list[str] | None = None, ) -> dict[str, Any]: out: dict[str, Any] = { "severity": severity, "code": code, "message": message, } if hint: out["hint"] = hint if field: out["field"] = field if csv_columns: out["csv_columns"] = csv_columns return out def validate_csv_template( module: str, field_mappings: Mapping[str, Any] | None, type_conversions: Mapping[str, Any] | None = None, import_row_processing: Mapping[str, Any] | None = None, column_signature: list[str] | None = None, ) -> dict[str, Any]: """ Prüft eine Vorlage ohne Datei-Upload. Returns: ``{"valid": bool, "errors": [...], "warnings": [...]}`` """ errors: list[dict[str, Any]] = [] warnings: list[dict[str, Any]] = [] fm = dict(field_mappings or {}) tc: dict[str, Any] = dict(type_conversions or {}) if type_conversions else {} mod = get_module_definition(module) if not mod: errors.append( _issue( "error", "unknown_module", f"Unbekanntes Modul «{module}».", hint="Nur registrierte Module in module_registry sind erlaubt.", ) ) return {"valid": False, "errors": errors, "warnings": warnings} try: validate_field_mappings(module, fm) except ValueError as e: errors.append( _issue( "error", "invalid_field_mapping", str(e), hint="Jede Zuordnung muss auf ein bekanntes Zielfeld des Moduls zeigen (oder „–“ / ignorieren).", ) ) try: validate_required_field_targets(module, fm) except ValueError as e: errors.append( _issue( "error", "missing_required_target", str(e), hint="Pflichtfelder des Moduls müssen mindestens einer CSV-Spalte zugeordnet sein.", ) ) if import_row_processing: try: validate_import_row_processing_spec(module, import_row_processing, fm) except ValueError as e: errors.append( _issue( "error", "invalid_import_row_processing", str(e), hint="import_row_processing: group_by und aggregates prüfen (siehe Doku Issue #21).", ) ) field_defs = mod.get("fields") or {} for db_field, spec in tc.items(): if db_field not in field_defs: errors.append( _issue( "error", "unknown_type_conversion_field", f"type_conversions enthält unbekanntes Zielfeld «{db_field}».", hint="Nur Felder aus der Moduldefinition sind erlaubt.", field=db_field, ) ) continue if not isinstance(spec, Mapping): errors.append( _issue( "error", "type_conversion_not_object", f"type_conversions[\"{db_field}\"] muss ein JSON-Objekt sein.", field=db_field, ) ) continue stype = spec.get("type", "string") if stype not in ALLOWED_SPEC_TYPES: warnings.append( _issue( "warning", "unusual_conversion_type", f"Ungewöhnlicher Typ «{stype}» für «{db_field}» (erwartet u. a. string, float, date, datetime).", field=db_field, ) ) finfo = field_defs.get(db_field) or {} expected = finfo.get("type") if expected == "date" and stype not in ("date", "datetime"): warnings.append( _issue( "warning", "date_field_conversion", f"Zielfeld «{db_field}» ist ein Datum; der Konvertierungstyp ist «{stype}».", hint="Meist «date» oder «datetime» mit passendem format.", field=db_field, ) ) if expected == "float" and stype == "int" and db_field in ("hr_avg", "hr_max"): warnings.append( _issue( "warning", "hr_as_int", "Herzfrequenz als «int» konvertiert; Nachkommastellen aus Apple-Export gehen verloren.", hint="Optional «float» mit flexible: true verwenden.", field=db_field, ) ) # Mehrere CSV-Spalten → dasselbe Zielfeld by_target: dict[str, list[str]] = {} for csv_col, dbf in fm.items(): if dbf in (None, "", "-", "_skip"): continue by_target.setdefault(str(dbf), []).append(str(csv_col)) for dbf, cols in by_target.items(): if len(cols) > 1: warnings.append( _issue( "warning", "duplicate_target_columns", f"Mehrere Spalten mappen auf «{dbf}»: {', '.join(cols)}.", hint="Beim Import gewinnt die letzte Spalte in der CSV-Kopfzeilen-Reihenfolge.", field=dbf, csv_columns=cols, ) ) # Kilojoule in kcal-Feldern (häufiger Apple-DE-Fehler) for csv_col, dbf in fm.items(): if dbf not in ("kcal_active", "kcal_resting"): continue col_l = str(csv_col).lower() if "kj" in col_l or "kilojoule" in col_l: sub = tc.get(dbf) su = (sub or {}).get("source_unit") if isinstance(sub, Mapping) else None if str(su or "").strip().lower() != "kj": warnings.append( _issue( "warning", "energy_kj_without_source_unit", f"Spalte «{csv_col}» deutet auf Kilojoule, Zielfeld «{dbf}» speichert kcal.", hint='In type_conversions für dieses Feld "source_unit": "kj" setzen (Faktor 1/4.184).', field=str(dbf), csv_columns=[str(csv_col)], ) ) # Signatur vs. gemappte Spalten: beide Seiten wie beim Import normalisieren # (column_signature kann sortierte Normalform aus Analyse sein, field_mappings rohe Header). if column_signature: sig_forms = { normalize_header_for_signature(str(c)) for c in column_signature if str(c).strip() } sig_forms.discard("") mapped_forms = { normalize_header_for_signature(str(k)) for k in fm.keys() if str(k).strip() } mapped_forms.discard("") if sig_forms and mapped_forms and not sig_forms.intersection(mapped_forms): warnings.append( _issue( "warning", "signature_vs_mappings_mismatch", "column_signature und field_mappings (Schlüssel) haben nach Normalisierung keine gemeinsame Spalte.", hint="Prüfen Sie, ob die gespeicherte Signatur zur CSV passt; Zuordnungen nutzen rohe Kopfzeilen, die Signatur oft die gleiche Normalform wie in der Analyse.", ) ) return {"valid": len(errors) == 0, "errors": errors, "warnings": warnings}