mitai-jinkendo/backend/csv_parser/module_registry.py
Lars 26ab11eb7b
Some checks failed
Deploy Development / deploy (push) Successful in 55s
Build Test / pytest-backend-csv (push) Failing after 1m4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat(csv-import): Enhance CSV import functionality with new modules and tests
- Added support for new CSV import modules: sleep and vitals_baseline, expanding the import capabilities.
- Implemented backend logic for handling CSV imports related to sleep and vitals baseline, including error handling and data processing.
- Updated frontend components to include new modules in the CSV import interface, improving user experience.
- Introduced unit tests for the new import functionalities to ensure reliability and correctness.
- Enhanced existing CSV analysis features to accommodate the new modules, ensuring consistent behavior across the application.
2026-04-10 07:30:48 +02:00

129 lines
5.2 KiB
Python

"""
Ziel-Module für CSV-Import: Tabellen-Felder, Pflichtfelder, Duplikat-Strategie (Issue #21).
Hinweis: blood_pressure nutzt in der DB measured_at; Logik-Felder measured_date + measured_time
werden im Executor zu measured_at zusammengefügt (Phase Import-Executor).
Activity: date kann aus start_time (ISO-Datetime) abgeleitet werden, wenn nur start_time gesetzt ist.
"""
from __future__ import annotations
from typing import Any, Dict, cast
# Registry of CSV import target modules, keyed by module name.
#
# Each entry carries:
#   "table"              - name of the target table for the module.
#   "fields"             - mappable target fields; every field spec has a
#                          "type" and a "required" flag, plus optional limits
#                          ("min", "max", "max_length").
#   "duplicate_key" /
#   "duplicate_strategy" - duplicate-handling settings for the module.
#
# NOTE(review): only "fields" (names and "required" flags) is consumed by the
# validators in this file. How types, limits and the duplicate settings are
# enforced is not visible here - presumably in the import executor; confirm.
MODULE_DEFINITIONS: Dict[str, Dict[str, Any]] = {
    "nutrition": {
        "table": "nutrition_log",
        "fields": {
            "date": {"type": "date", "required": True},
            "kcal": {"type": "float", "required": False},
            "protein_g": {"type": "float", "required": False, "min": 0},
            "fat_g": {"type": "float", "required": False, "min": 0},
            "carbs_g": {"type": "float", "required": False, "min": 0},
        },
        "duplicate_key": ["profile_id", "date"],
        "duplicate_strategy": "update",
    },
    "activity": {
        "table": "activity_log",
        "fields": {
            # "date" is optional here: per the module docstring it can be
            # derived from "start_time" when only "start_time" is set.
            "date": {"type": "date", "required": False},
            "start_time": {"type": "datetime", "required": False},
            "end_time": {"type": "datetime", "required": False},
            "activity_type": {"type": "string", "required": True},
            "duration_min": {"type": "float", "required": False, "min": 0},
            "kcal_active": {"type": "float", "required": False},
            "kcal_resting": {"type": "float", "required": False},
            "distance_km": {"type": "float", "required": False},
            "hr_avg": {"type": "float", "required": False, "min": 30, "max": 220},
            "hr_max": {"type": "float", "required": False, "min": 30, "max": 220},
        },
        # Names the datetime field from which "date" may be derived.
        "derive_date_from_datetime_field": "start_time",
        "duplicate_key": ["profile_id", "date", "start_time"],
        "duplicate_strategy": "update",
    },
    "sleep": {
        "table": "sleep_log",
        # No mappable fields: validate_field_mappings() only accepts mappings
        # in which every column is marked as ignored for such a module.
        "fields": {},
        "import_mode": "apple_sleep_aggregate",
    },
    "vitals_baseline": {
        "table": "vitals_baseline",
        "fields": {
            "date": {"type": "date", "required": True},
            "resting_hr": {"type": "int", "required": False},
            "hrv": {"type": "int", "required": False},
            "vo2_max": {"type": "float", "required": False},
            "spo2": {"type": "int", "required": False},
            "respiratory_rate": {"type": "float", "required": False},
        },
        "duplicate_key": ["profile_id", "date"],
        "duplicate_strategy": "update",
    },
    "blood_pressure": {
        "table": "blood_pressure_log",
        "fields": {
            # Logical fields; per the module docstring the DB column is
            # "measured_at", built from these two fields in the executor.
            "measured_date": {"type": "date", "required": True},
            "measured_time": {"type": "time", "required": True},
            "systolic": {"type": "int", "required": True},
            "diastolic": {"type": "int", "required": True},
            "pulse": {"type": "int", "required": False},
        },
        "logical_to_db": "blood_pressure_composite_measured_at",
        # Refers to the combined DB column, not to a key of "fields".
        "duplicate_key": ["profile_id", "measured_at"],
        "duplicate_strategy": "update",
    },
    "weight": {
        "table": "weight_log",
        "fields": {
            "date": {"type": "date", "required": True},
            "weight": {"type": "float", "required": True, "min": 20, "max": 400},
            "note": {"type": "string", "required": False, "max_length": 2000},
        },
        "duplicate_key": ["profile_id", "date"],
        "duplicate_strategy": "update",
    },
}
def get_module_definition(module: str) -> Dict[str, Any] | None:
    """Look up the registry entry for *module*.

    Returns the definition dict stored in ``MODULE_DEFINITIONS``, or ``None``
    when no module with that name is registered.
    """
    try:
        return MODULE_DEFINITIONS[module]
    except KeyError:
        return None
def list_modules() -> list[str]:
    """Return the names of all registered modules in ascending sort order."""
    names = list(MODULE_DEFINITIONS)
    names.sort()
    return names
def validate_field_mappings(module: str, field_mappings: dict) -> None:
    """Check that every mapped target field exists for *module*.

    Mapping values of ``""``, ``None``, ``"-"`` and ``"_skip"`` mean the
    column is ignored and are never rejected.

    Args:
        module: Name of the import module to validate against.
        field_mappings: Mapping of CSV column to target field name.

    Raises:
        ValueError: If the module is unknown, if a target field is not
            defined for the module, or if a module without any defined
            fields receives a mapping that is not an "ignore" marker.
    """
    definition = get_module_definition(module)
    if not definition:
        raise ValueError(f"Unbekanntes Modul: {module}")

    known_targets = set(cast(dict, definition["fields"]))
    ignore_markers = ("", None, "-", "_skip")

    if not known_targets:
        # Module without mappable fields: only "ignore" markers are allowed.
        if any(target not in ignore_markers for target in field_mappings.values()):
            raise ValueError(
                f"Modul '{module}' nutzt einen Aggregat-Import ohne Spalten-Mapping; "
                f"alle Spalten müssen „ignorieren“ sein."
            )
        return

    for target in field_mappings.values():
        if target in ignore_markers or target in known_targets:
            continue
        raise ValueError(f"Ungültiges Zielfeld '{target}' für Modul '{module}'")
def validate_required_field_targets(module: str, field_mappings: dict) -> None:
    """Ensure each required target field of *module* is mapped at least once.

    Falsy mapping values as well as ``"-"`` and ``"_skip"`` do not count as
    an assignment.

    Args:
        module: Name of the import module to validate against.
        field_mappings: Mapping of CSV column to target field name.

    Raises:
        ValueError: If the module is unknown, or if a field flagged as
            ``required`` is not the target of any column.
    """
    definition = get_module_definition(module)
    if not definition:
        raise ValueError(f"Unbekanntes Modul: {module}")
    field_specs = cast(dict, definition["fields"])

    # Collect every target that a column is actually assigned to.
    assigned = set()
    for target in field_mappings.values():
        if not target or target in ("-", "_skip"):
            continue
        assigned.add(target)

    for name, spec in field_specs.items():
        if not spec.get("required"):
            continue
        if name in assigned:
            continue
        raise ValueError(f"Pflicht-Zielfeld nicht zugeordnet: {name}")