mitai-jinkendo/backend/csv_parser/mapping_suggest.py
Lars 574af61349
All checks were successful
Deploy Development / deploy (push) Successful in 47s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Enhance CSV import and validation for activity module
- Updated the CSV import logic to merge active training parameters with static fields for the activity module, improving field mapping accuracy.
- Enhanced validation functions to incorporate dynamic field definitions based on active training parameters, ensuring better data integrity during imports.
- Refactored related functions to streamline the process of handling CSV templates and field mappings, improving maintainability and clarity.
- Added new utility functions for resolving activity log column patches and upserting session metrics from CSV, enhancing the overall import functionality.
2026-04-15 08:12:58 +02:00

269 lines
10 KiB
Python

"""
Heuristische Vorschläge für CSV field_mappings / type_conversions (Admin-Editor, Issue #21).
"""
from __future__ import annotations
from copy import deepcopy
from typing import Any, Mapping
from csv_parser.core import normalize_header_for_signature
from csv_parser.module_registry import get_module_definition
# Normalisierte Header-Fragmente → DB-Feld (Substring- oder exakter Norm-Vergleich)
_MODULE_HEADER_ALIASES: dict[str, dict[str, frozenset[str]]] = {
"nutrition": {
"date": frozenset(
{"datum", "date", "tag", "day", "zeit", "timestamp", "uhrzeit", "monat", "jahr"}
),
"kcal": frozenset({"kcal", "kalorie", "calorie", "energie", "energy", "kj", "joule"}),
"protein_g": frozenset({"protein", "eiwei", "eiweiss"}),
"fat_g": frozenset({"fett", "fat", "lipid"}),
"carbs_g": frozenset({"kh", "carb", "kohlenhydr", "carbs", "sugar", "zucker"}),
},
"weight": {
"date": frozenset({"datum", "date", "tag", "day", "zeit"}),
"weight": frozenset({"gewicht", "weight", "masse", "kg", "kilo"}),
"note": frozenset({"notiz", "note", "comment", "kommentar"}),
},
"blood_pressure": {
"measured_date": frozenset({"datum", "date", "tag", "day", "messdatum"}),
"measured_time": frozenset({"zeit", "time", "uhr", "uhrzeit"}),
"systolic": frozenset({"systol", "sys", "sbp", "oberdruck"}),
"diastolic": frozenset({"diastol", "dia", "dbp", "unterdruck"}),
"pulse": frozenset({"puls", "pulse", "hr", "herz", "bpm"}),
},
"activity": {
"date": frozenset({"datum", "date", "tag", "day"}),
"start_time": frozenset({"start", "beginn", "von"}),
"end_time": frozenset({"end", "ende", "bis", "stop"}),
"activity_type": frozenset({"workout", "training", "typ", "type", "art", "aktiv"}),
"duration_min": frozenset({"dauer", "duration", "min"}),
"distance_km": frozenset({"strecke", "distance", "km", "distanz"}),
"kcal_active": frozenset({"kcal", "kalorie", "energie", "active"}),
"kcal_resting": frozenset({"ruhe", "resting"}),
"hr_avg": frozenset({"puls", "heart", "hr", "bpm", "herzfrequenz", "durchschn"}),
"hr_max": frozenset({"max", "peak"}),
},
"vitals_baseline": {
"date": frozenset({"datum", "date", "tag", "start", "zeit"}),
"resting_hr": frozenset({"ruhepuls", "resting", "rhr"}),
"hrv": frozenset({"hrv", "variabilit", "vfc"}),
"vo2_max": frozenset({"vo2"}),
"spo2": frozenset({"sauerstoff", "spo2", "oxygen"}),
"respiratory_rate": frozenset({"atem", "respiratory"}),
},
}
_DEFAULT_TYPE_CONVERSIONS: dict[str, dict[str, dict[str, Any]]] = {
"nutrition": {
"date": {"type": "date", "format": "dd.mm.yyyy HH:MM", "extract": "date_only", "flexible": True},
"kcal": {"type": "float", "decimal_separator": "auto", "flexible": True},
"protein_g": {"type": "float", "decimal_separator": "auto", "flexible": True},
"fat_g": {"type": "float", "decimal_separator": "auto", "flexible": True},
"carbs_g": {"type": "float", "decimal_separator": "auto", "flexible": True},
},
"weight": {
"date": {"type": "date", "format": "dd.mm.yyyy", "flexible": True},
"weight": {"type": "float", "decimal_separator": "auto", "flexible": True},
"note": {"type": "string"},
},
"blood_pressure": {
"measured_date": {"type": "date", "format": "dd.mm.yyyy", "flexible": True},
"measured_time": {"type": "time", "format": "HH:MM", "flexible": True},
"start_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True},
"systolic": {"type": "int", "flexible": True},
"diastolic": {"type": "int", "flexible": True},
"pulse": {"type": "int", "flexible": True},
},
"activity": {
"date": {"type": "date", "format": "yyyy-mm-dd", "flexible": True},
"start_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True},
"end_time": {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True},
"activity_type": {"type": "string"},
"duration_min": {"type": "duration", "format": "HH:MM:SS", "target_unit": "minutes", "flexible": True},
"distance_km": {"type": "float", "decimal_separator": "auto", "flexible": True},
"kcal_active": {"type": "float", "decimal_separator": "auto", "flexible": True},
"kcal_resting": {"type": "float", "decimal_separator": "auto", "flexible": True},
"hr_avg": {"type": "int", "flexible": True},
"hr_max": {"type": "int", "flexible": True},
},
"vitals_baseline": {
"date": {
"type": "datetime",
"format": "yyyy-mm-dd HH:MM:SS",
"extract": "date_only",
"flexible": True,
},
"resting_hr": {"type": "int", "flexible": True},
"hrv": {"type": "int", "flexible": True},
"vo2_max": {"type": "float", "decimal_separator": "auto", "flexible": True},
"spo2": {"type": "int", "flexible": True},
"respiratory_rate": {"type": "float", "decimal_separator": "auto", "flexible": True},
},
}
def _norm_key(header: str) -> str:
    """Return the normalized comparison key for a CSV header.

    Delegates to ``normalize_header_for_signature`` so that header matching
    in this module uses the same normalization as the CSV signature logic.
    """
    normalized = normalize_header_for_signature(header)
    return normalized
def _match_seed_to_db_field(header: str, seed_fm: Mapping[str, str]) -> str | None:
"""Findet Ziel-Feld, wenn Seed-Key zu diesem Header passt (exakt oder normalisiert)."""
if header in seed_fm:
v = seed_fm[header]
if v and v not in ("-", "_skip"):
return v
nh = _norm_key(header)
if nh in seed_fm:
v = seed_fm[nh]
if v and v not in ("-", "_skip"):
return v
for sk, sv in seed_fm.items():
if not sv or sv in ("-", "_skip"):
continue
if _norm_key(str(sk)) == nh:
return sv
return None
def _alias_suggest(
    norm: str,
    module: str,
    used: set[str],
    *,
    field_order: list[str] | None = None,
) -> str | None:
    """Suggest a DB field for a normalized CSV header, or ``None``.

    Candidate fields are taken from ``field_order`` (or, if that is ``None``,
    from the keys of the module definition's ``"fields"``), skipping every
    field already present in ``used``. Matching runs in two passes:

    1. Exact name pass: the lowered header equals the field name, either
       verbatim or with underscores removed on both sides.
    2. Alias pass: any alias token of the field (from
       ``_MODULE_HEADER_ALIASES``) occurs as a substring of the lowered
       header. The first field in candidate order with a hit wins.

    The exact pass is completed for *all* candidates before any alias is
    tried. Otherwise a loose alias of an earlier field shadows an exact name
    match of a later one, e.g. header ``hr_max`` contains the ``hr_avg``
    alias ``"hr"`` and header ``kcal_resting`` contains the ``kcal_active``
    alias ``"kcal"``.

    Args:
        norm: Header text as produced by the caller's normalization.
        module: Import module name used to look up definition and aliases.
        used: DB fields that are already assigned and must not be suggested.
        field_order: Optional explicit candidate order.

    Returns:
        The suggested DB field name, or ``None`` when the module is unknown
        or no candidate matches.
    """
    mod = get_module_definition(module)
    if not mod:
        return None
    aliases = _MODULE_HEADER_ALIASES.get(module, {})
    order = field_order if field_order is not None else list(mod["fields"].keys())
    candidates = [db_field for db_field in order if db_field not in used]

    # Loop-invariant forms of the header.
    nlow = norm.lower()
    compact = nlow.replace("_", "")

    # Pass 1: an exact field-name match beats every alias heuristic.
    for db_field in candidates:
        if nlow == db_field or compact == db_field.replace("_", ""):
            return db_field

    # Pass 2: substring alias match. Tokens in the alias table are lowercase,
    # so testing against the lowered header covers every case; tokens shorter
    # than two characters are ignored as too unspecific.
    for db_field in candidates:
        tokens = aliases.get(db_field, frozenset())
        if any(len(tok) >= 2 and tok in nlow for tok in tokens):
            return db_field
    return None
def suggest_field_mappings(
    headers: list[str],
    module: str,
    seed_fm: Mapping[str, str] | None = None,
    *,
    effective_fields: Mapping[str, Any] | None = None,
) -> dict[str, str]:
    """Map every CSV column (raw header as key) to a DB field or ``"-"``.

    A matching seed template is consulted first; columns it leaves unmapped
    are then run through the alias heuristic. Each DB field is assigned to
    at most one column. For the ``"sleep"`` module and for modules without a
    definition every column maps to ``"-"``.

    Args:
        headers: Raw CSV header names.
        module: Import module name.
        seed_fm: Optional seed mapping of header -> DB field.
        effective_fields: Optional field definitions that replace the module
            definition's ``"fields"`` as the set (and order) of valid targets.

    Returns:
        Dict with one entry per distinct header.
    """
    mapping: dict[str, str] = dict.fromkeys(headers, "-")

    if module == "sleep":
        return mapping
    definition = get_module_definition(module)
    if not definition:
        return mapping

    if effective_fields is not None:
        known_fields = dict(effective_fields)
    else:
        known_fields = dict(definition["fields"])
    ordered_fields = list(known_fields)
    taken: set[str] = set()

    # Stage 1: adopt targets from the seed template where they are valid
    # for this module and not yet claimed by another column.
    if seed_fm:
        for column in headers:
            target = _match_seed_to_db_field(column, seed_fm)
            if not target or target in taken or target not in known_fields:
                continue
            mapping[column] = target
            taken.add(target)

    # Stage 2: alias heuristic for every column that is still unmapped.
    for column in headers:
        if mapping[column] != "-":
            continue
        target = _alias_suggest(
            _norm_key(column),
            module,
            taken,
            field_order=ordered_fields,
        )
        if target:
            mapping[column] = target
            taken.add(target)

    return mapping
def build_type_conversions_for_mapping(
    module: str,
    field_mappings: Mapping[str, str],
    seed_tc: Mapping[str, Any] | None = None,
    *,
    effective_fields: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
    """Build ``type_conversions`` for the target fields that are mapped.

    Only fields that appear as a value in ``field_mappings`` (ignoring empty
    values and the skip markers ``"-"`` / ``"_skip"``) receive an entry.
    Per field, the first available source wins:

    1. the dict spec from ``seed_tc``;
    2. the module default from ``_DEFAULT_TYPE_CONVERSIONS``;
    3. a spec derived from ``effective_fields[field]["type"]``
       (``"int"`` / ``"float"``, anything else becomes ``"string"``).

    Afterwards a kJ source-unit hint is applied to energy fields based on
    the CSV headers. The ``"sleep"`` module always yields ``{}``.

    All specs are deep copies, so callers may mutate the result freely.
    Fields not covered by the seed are added in sorted name order; iterating
    the target *set* directly would make the key order of the result depend
    on string hash randomization and thus differ between processes.

    Args:
        module: Import module name.
        field_mappings: CSV header -> DB field mapping.
        seed_tc: Optional seed ``type_conversions``; overrides defaults.
        effective_fields: Optional field metadata used for fallback specs.

    Returns:
        Mapping of DB field -> conversion spec.
    """
    if module == "sleep":
        return {}

    defaults = _DEFAULT_TYPE_CONVERSIONS.get(module, {})
    field_meta = dict(effective_fields) if effective_fields is not None else {}
    targets = {v for v in field_mappings.values() if v and v not in ("-", "_skip")}
    out: dict[str, Any] = {}

    # Seed specs take precedence and keep the seed's own ordering.
    if seed_tc:
        for field, spec in seed_tc.items():
            if field in targets and isinstance(spec, dict):
                out[field] = deepcopy(spec)

    # Remaining targets: module default first, field metadata as fallback.
    for field in sorted(targets):
        if field in out:
            continue
        if field in defaults:
            out[field] = deepcopy(defaults[field])
            continue
        finfo = field_meta.get(field)
        if not finfo:
            continue
        typ = finfo.get("type")
        if typ == "int":
            out[field] = {"type": "int", "flexible": True}
        elif typ == "float":
            out[field] = {"type": "float", "decimal_separator": "auto", "flexible": True}
        else:
            out[field] = {"type": "string"}

    _apply_energy_kj_hint_from_headers(module, field_mappings, out)
    return out
_ENERGY_FIELDS = frozenset({"kcal", "kcal_active", "kcal_resting"})
def _apply_energy_kj_hint_from_headers(
module: str,
field_mappings: Mapping[str, str],
out: dict[str, Any],
) -> None:
"""Wenn Überschrift kJ/Kilojoule nahelegt (nicht kcal), source_unit kj setzen (FDDB & Co.)."""
if module not in ("nutrition", "activity"):
return
for csv_col, db_field in field_mappings.items():
if db_field not in _ENERGY_FIELDS:
continue
spec = out.get(db_field)
if not isinstance(spec, dict):
continue
if spec.get("source_unit"):
continue
norm = normalize_header_for_signature(str(csv_col)).lower()
if "kcal" in norm:
continue
if "kj" in norm or "kilojoule" in norm:
spec2 = deepcopy(spec)
spec2["source_unit"] = "kj"
out[db_field] = spec2