mitai-jinkendo/backend/csv_parser/module_registry.py
Lars 5b96bd4f75
All checks were successful
Deploy Development / deploy (push) Successful in 55s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
feat(csv-import): Add blood pressure and activity row diagnosis functionality
- Introduced `diagnose_blood_pressure_row` and `diagnose_activity_row` functions to validate and analyze blood pressure and activity data from CSV imports.
- Updated the CSV import logic to handle combined datetime columns for blood pressure and activity, improving data integrity during import.
- Enhanced type conversion specifications to include `start_time` for blood pressure and activity, ensuring accurate data mapping.
- Added tests to validate the new diagnosis functions and their integration with existing import processes, ensuring robustness and reliability.
- Updated frontend messages to provide clearer guidance on blood pressure and activity data handling during CSV imports.
2026-04-10 16:43:00 +02:00

162 lines
6.6 KiB
Python

"""
Ziel-Module für CSV-Import: Tabellen-Felder, Pflichtfelder, Duplikat-Strategie (Issue #21).
Hinweis: blood_pressure nutzt in der DB measured_at; Logik-Felder measured_date + measured_time
werden im Executor zu measured_at zusammengefügt (Phase Import-Executor).
Activity: date kann aus start_time (ISO-Datetime) abgeleitet werden, wenn nur start_time gesetzt ist.
"""
from __future__ import annotations
from typing import Any, Dict, cast
# Registry of CSV import target modules, keyed by module name.
# Each entry names the target table and the logical fields a CSV column may be
# mapped to. validate_field_mappings() and validate_required_field_targets()
# in this file read the "fields" dict (field names and their "required" flag).
# The remaining keys ("type", "min", "max", "unit", "max_length",
# "duplicate_key", "duplicate_strategy", "import_row_processing_default", ...)
# are not interpreted in this file — presumably consumed by the import
# executor; verify against the callers.
MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = {
    "nutrition": {
        "table": "nutrition_log",
        "fields": {
            "date": {"type": "date", "required": True},
            "kcal": {"type": "float", "required": False, "unit": "kcal"},
            "protein_g": {"type": "float", "required": False, "min": 0, "unit": "g"},
            "fat_g": {"type": "float", "required": False, "min": 0, "unit": "g"},
            "carbs_g": {"type": "float", "required": False, "min": 0, "unit": "g"},
        },
        "duplicate_key": ["profile_id", "date"],
        "duplicate_strategy": "update",
        # Legacy fallback used when the template does not store import_row_processing —
        # templates should set it explicitly in the medium term.
        "import_row_processing_default": {
            "group_by": ["date"],
            "aggregates": {
                "kcal": "sum",
                "protein_g": "sum",
                "fat_g": "sum",
                "carbs_g": "sum",
            },
        },
    },
    "activity": {
        "table": "activity_log",
        "fields": {
            # "date" is optional here; per the module docstring it can be derived
            # from start_time when only start_time is set.
            "date": {"type": "date", "required": False},
            "start_time": {"type": "datetime", "required": False},
            "end_time": {"type": "datetime", "required": False},
            "activity_type": {"type": "string", "required": True},
            "duration_min": {"type": "float", "required": False, "min": 0},
            "kcal_active": {"type": "float", "required": False, "unit": "kcal"},
            "kcal_resting": {"type": "float", "required": False, "unit": "kcal"},
            "distance_km": {"type": "float", "required": False, "unit": "km"},
            "hr_avg": {"type": "float", "required": False, "min": 30, "max": 220},
            "hr_max": {"type": "float", "required": False, "min": 30, "max": 220},
        },
        "derive_date_from_datetime_field": "start_time",
        "duplicate_key": ["profile_id", "date", "start_time"],
        "duplicate_strategy": "update",
    },
    "sleep": {
        "table": "sleep_log",
        # No mappable fields: validate_field_mappings() rejects any column that is
        # mapped to a target for a module whose "fields" dict is empty.
        "fields": {},
        "import_mode": "apple_sleep_aggregate",
    },
    "vitals_baseline": {
        "table": "vitals_baseline",
        "fields": {
            "date": {"type": "date", "required": True},
            "resting_hr": {"type": "int", "required": False},
            "hrv": {"type": "int", "required": False},
            "vo2_max": {"type": "float", "required": False},
            "spo2": {"type": "int", "required": False},
            "respiratory_rate": {"type": "float", "required": False},
        },
        "duplicate_key": ["profile_id", "date"],
        "duplicate_strategy": "update",
        # Legacy fallback — templates should set this explicitly in the medium term.
        "import_row_processing_default": {
            "group_by": ["date"],
            "aggregates": {
                "resting_hr": "mean",
                "hrv": "mean",
                "vo2_max": "mean",
                "spo2": "mean",
                "respiratory_rate": "mean",
            },
        },
    },
    "blood_pressure": {
        "table": "blood_pressure_log",
        "fields": {
            # measured_date and measured_time are logical fields; per the module
            # docstring the executor merges them into the DB column measured_at.
            "measured_date": {"type": "date", "required": True},
            "measured_time": {"type": "time", "required": True},
            # Apple Health: a single column „Start“ / „Datum/Uhrzeit“ (datetime); the executor splits it.
            # validate_required_field_targets() treats a mapped start_time as
            # satisfying both measured_date and measured_time.
            "start_time": {"type": "datetime", "required": False},
            "systolic": {"type": "int", "required": True},
            "diastolic": {"type": "int", "required": True},
            "pulse": {"type": "int", "required": False},
        },
        "logical_to_db": "blood_pressure_composite_measured_at",
        "duplicate_key": ["profile_id", "measured_at"],
        "duplicate_strategy": "update",
    },
    "weight": {
        "table": "weight_log",
        "fields": {
            "date": {"type": "date", "required": True},
            "weight": {"type": "float", "required": True, "min": 20, "max": 400, "unit": "kg"},
            "note": {"type": "string", "required": False, "max_length": 2000},
        },
        "duplicate_key": ["profile_id", "date"],
        "duplicate_strategy": "update",
        # Legacy fallback — templates should set this explicitly in the medium term.
        "import_row_processing_default": {
            "group_by": ["date"],
            "aggregates": {
                "weight": "last",
                "note": "last",
            },
        },
    },
}
def get_module_definition(module: str) -> Dict[str, Any] | None:
    """Return the registry entry for *module*, or ``None`` if it is not registered."""
    try:
        return MODULE_DEFINITIONS[module]
    except KeyError:
        return None
def list_modules() -> list[str]:
    """Return the names of all registered modules in ascending sorted order."""
    module_names = sorted(MODULE_DEFINITIONS)
    return module_names
def validate_field_mappings(module: str, field_mappings: dict) -> None:
    """Check that every mapped target field exists in the given module.

    Mapping values of ``""``, ``None``, ``"-"`` or ``"_skip"`` mean "ignore this
    column" and are always accepted.

    Raises:
        ValueError: if the module is unknown, if a column is mapped to a target
            although the module defines no fields (aggregate import), or if a
            target is not one of the module's fields.
    """
    definition = get_module_definition(module)
    if not definition:
        raise ValueError(f"Unbekanntes Modul: {module}")

    known_targets = set(cast(dict, definition["fields"]))
    ignore_markers = ("", None, "-", "_skip")

    for target in field_mappings.values():
        if target in ignore_markers:
            continue
        if not known_targets:
            # Module without fields: nothing may be mapped at all.
            raise ValueError(
                f"Modul '{module}' nutzt einen Aggregat-Import ohne Spalten-Mapping; "
                f"alle Spalten müssen „ignorieren“ sein."
            )
        if target not in known_targets:
            raise ValueError(f"Ungültiges Zielfeld '{target}' für Modul '{module}'")
def validate_required_field_targets(module: str, field_mappings: dict) -> None:
    """Ensure every field marked as required is the target of at least one column.

    Falsy mapping values and the markers ``"-"`` / ``"_skip"`` do not count as
    mapped targets.

    Raises:
        ValueError: if the module is unknown or a required field has no column
            mapped to it.
    """
    definition = get_module_definition(module)
    if not definition:
        raise ValueError(f"Unbekanntes Modul: {module}")
    specs = cast(dict, definition["fields"])

    mapped: set = set()
    for target in field_mappings.values():
        if not target or target in ("-", "_skip"):
            continue
        mapped.add(target)

    # For blood pressure, a mapped combined start_time column counts as
    # providing both the date and the time part.
    if module == "blood_pressure" and "start_time" in mapped:
        mapped.update(("measured_date", "measured_time"))

    for name, spec in specs.items():
        if not spec.get("required"):
            continue
        if name not in mapped:
            raise ValueError(f"Pflicht-Zielfeld nicht zugeordnet: {name}")