mitai-jinkendo/backend/csv_parser/module_registry.py
Lars e35d167055
All checks were successful
Deploy Development / deploy (push) Successful in 56s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 16s
feat(csv-import): Enhance CSV import processing and validation
- Updated the CSV import logic to support new row processing specifications for weight and vitals baseline, allowing for better data aggregation and validation.
- Implemented handling for multiple rows on the same day, enabling aggregation of values such as averages for vitals and last values for weight.
- Enhanced test coverage for the new import functionalities, ensuring correct behavior during data processing and validation.
- Refactored the module registry to include default import row processing options for better flexibility in handling CSV data.
2026-04-10 15:09:34 +02:00

157 lines
6.2 KiB
Python

"""
Target modules for CSV import: table fields, required fields, duplicate strategy (Issue #21).
Note: blood_pressure uses measured_at in the DB; the logical fields measured_date + measured_time
are combined into measured_at in the executor (import-executor phase).
Activity: date can be derived from start_time (ISO datetime) when only start_time is set.
"""
from __future__ import annotations
from typing import Any, Dict, cast
def _field(kind: str, required: bool = False, **constraints: Any) -> Dict[str, Any]:
    """Return one field spec: ``type``, ``required``, then constraints in call order."""
    return {"type": kind, "required": required, **constraints}


def _daily_rollup(how: str, *columns: str) -> Dict[str, Any]:
    """Return row-processing defaults grouping by ``date`` and applying *how* to each column."""
    return {"group_by": ["date"], "aggregates": dict.fromkeys(columns, how)}


# Registry of CSV import target modules, keyed by module name. Every entry
# names its target table and its mappable fields; the remaining keys
# (duplicate handling, row-processing defaults, import mode, ...) are optional
# and differ per module.
MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = {
    "nutrition": {
        "table": "nutrition_log",
        "fields": {
            "date": _field("date", required=True),
            "kcal": _field("float", unit="kcal"),
            "protein_g": _field("float", min=0, unit="g"),
            "fat_g": _field("float", min=0, unit="g"),
            "carbs_g": _field("float", min=0, unit="g"),
        },
        "duplicate_key": ["profile_id", "date"],
        "duplicate_strategy": "update",
        # Several CSV rows per day (e.g. one per food item) -> a single nutrition_log entry.
        "import_row_processing_default": _daily_rollup(
            "sum", "kcal", "protein_g", "fat_g", "carbs_g"
        ),
    },
    "activity": {
        "table": "activity_log",
        "fields": {
            "date": _field("date"),
            "start_time": _field("datetime"),
            "end_time": _field("datetime"),
            "activity_type": _field("string", required=True),
            "duration_min": _field("float", min=0),
            "kcal_active": _field("float", unit="kcal"),
            "kcal_resting": _field("float", unit="kcal"),
            "distance_km": _field("float", unit="km"),
            "hr_avg": _field("float", min=30, max=220),
            "hr_max": _field("float", min=30, max=220),
        },
        "derive_date_from_datetime_field": "start_time",
        "duplicate_key": ["profile_id", "date", "start_time"],
        "duplicate_strategy": "update",
    },
    # No mappable fields: validate_field_mappings rejects any non-skipped
    # column mapping for a module whose "fields" dict is empty.
    "sleep": {
        "table": "sleep_log",
        "fields": {},
        "import_mode": "apple_sleep_aggregate",
    },
    "vitals_baseline": {
        "table": "vitals_baseline",
        "fields": {
            "date": _field("date", required=True),
            "resting_hr": _field("int"),
            "hrv": _field("int"),
            "vo2_max": _field("float"),
            "spo2": _field("int"),
            "respiratory_rate": _field("float"),
        },
        "duplicate_key": ["profile_id", "date"],
        "duplicate_strategy": "update",
        "import_row_processing_default": _daily_rollup(
            "mean", "resting_hr", "hrv", "vo2_max", "spo2", "respiratory_rate"
        ),
    },
    "blood_pressure": {
        "table": "blood_pressure_log",
        "fields": {
            "measured_date": _field("date", required=True),
            "measured_time": _field("time", required=True),
            "systolic": _field("int", required=True),
            "diastolic": _field("int", required=True),
            "pulse": _field("int"),
        },
        "logical_to_db": "blood_pressure_composite_measured_at",
        "duplicate_key": ["profile_id", "measured_at"],
        "duplicate_strategy": "update",
    },
    "weight": {
        "table": "weight_log",
        "fields": {
            "date": _field("date", required=True),
            "weight": _field("float", required=True, min=20, max=400, unit="kg"),
            "note": _field("string", max_length=2000),
        },
        "duplicate_key": ["profile_id", "date"],
        "duplicate_strategy": "update",
        # Several CSV rows per day -> one entry (the last row in the export wins).
        "import_row_processing_default": _daily_rollup("last", "weight", "note"),
    },
}
def get_module_definition(module: str) -> Dict[str, Any] | None:
    """Return the registry entry for *module*, or ``None`` if it is not registered."""
    if module in MODULE_DEFINITIONS:
        return MODULE_DEFINITIONS[module]
    return None
def list_modules() -> list[str]:
    """Return the names of all registered import modules in ascending order."""
    names = list(MODULE_DEFINITIONS)
    names.sort()
    return names
def validate_field_mappings(module: str, field_mappings: dict) -> None:
    """Check that every mapped target is a field the module declares.

    Args:
        module: Name of the import module to validate against.
        field_mappings: Mapping of CSV column -> target field. The targets
            ``""``, ``None``, ``"-"`` and ``"_skip"`` mean "ignore this column".

    Raises:
        ValueError: If the module is unknown, if a module that declares no
            fields receives any non-ignored mapping, or if a target is not
            one of the module's declared fields.
    """
    definition = get_module_definition(module)
    if not definition:
        raise ValueError(f"Unbekanntes Modul: {module}")

    known_targets = set(cast(dict, definition["fields"]).keys())
    ignore_markers = ("", None, "-", "_skip")

    for target in field_mappings.values():
        if target in ignore_markers:
            continue
        # A module without declared fields accepts no column mapping at all,
        # so the first non-ignored target is already an error.
        if not known_targets:
            raise ValueError(
                f"Modul '{module}' nutzt einen Aggregat-Import ohne Spalten-Mapping; "
                f"alle Spalten müssen „ignorieren“ sein."
            )
        if target not in known_targets:
            raise ValueError(f"Ungültiges Zielfeld '{target}' für Modul '{module}'")
def validate_required_field_targets(module: str, field_mappings: dict) -> None:
    """Ensure every field marked ``required`` is the target of at least one column.

    Args:
        module: Name of the import module to validate against.
        field_mappings: Mapping of CSV column -> target field. Falsy targets
            and the markers ``"-"`` / ``"_skip"`` count as "not mapped".

    Raises:
        ValueError: If the module is unknown, or on the first required field
            (in declaration order) that no column is mapped to.

    NOTE(review): the ``activity`` module marks neither ``date`` nor
    ``start_time`` as required, so a mapping containing neither passes this
    check -- confirm that the import executor rejects such rows.
    """
    definition = get_module_definition(module)
    if not definition:
        raise ValueError(f"Unbekanntes Modul: {module}")

    # Collect every real target, dropping ignored / empty ones.
    mapped = set()
    for target in field_mappings.values():
        if not target or target in ("-", "_skip"):
            continue
        mapped.add(target)

    for name, spec in cast(dict, definition["fields"]).items():
        if not spec.get("required") or name in mapped:
            continue
        raise ValueError(f"Pflicht-Zielfeld nicht zugeordnet: {name}")