diff --git a/backend/csv_parser/executor.py b/backend/csv_parser/executor.py index 10169a0..3e6fa67 100644 --- a/backend/csv_parser/executor.py +++ b/backend/csv_parser/executor.py @@ -814,6 +814,10 @@ def _import_activity( run_activity_post_write_hooks_import, update_activity_columns, ) + from data_layer.activity_session_metrics import ( + resolve_activity_log_column_patch_from_csv, + upsert_session_metrics_from_csv_mapped, + ) rows_total = 0 inserted = 0 @@ -894,29 +898,29 @@ def _import_activity( training_type_id, training_category, training_subcategory = _resolve_training_type_for_activity( cur, wtype, profile_id ) + column_patch = resolve_activity_log_column_patch_from_csv( + cur, mapped, training_category, training_type_id + ) existing_id = find_activity_duplicate_id(cur, profile_id, iso, workout_start_t) if existing_id: - update_activity_columns( - cur, - profile_id, - existing_id, - { - "start_time": workout_start_t, - "end_time": end_str or None, - "activity_type": wtype, - "duration_min": duration_min, - "kcal_active": kcal_a, - "kcal_resting": kcal_r, - "hr_avg": hr_a, - "hr_max": hr_m, - "distance_km": dist, - "training_type_id": training_type_id, - "training_category": training_category, - "training_subcategory": training_subcategory, - "source": "csv", - }, - ) + upd = { + "start_time": workout_start_t, + "end_time": end_str or None, + "activity_type": wtype, + "duration_min": duration_min, + "kcal_active": kcal_a, + "kcal_resting": kcal_r, + "hr_avg": hr_a, + "hr_max": hr_m, + "distance_km": dist, + "training_type_id": training_type_id, + "training_category": training_category, + "training_subcategory": training_subcategory, + "source": "csv", + } + upd.update(column_patch) + update_activity_columns(cur, profile_id, existing_id, upd) updated += 1 affected_ids["activity_log"].append(str(existing_id)) aid = existing_id @@ -945,6 +949,8 @@ def _import_activity( new_entries += 1 affected_ids["activity_log"].append(str(eid)) aid = eid + if column_patch: + update_activity_columns(cur, profile_id, aid, column_patch) run_activity_post_write_hooks_import( cur, @@ -959,6 +965,14 @@ def _import_activity( kcal_active=kcal_a, kcal_resting=kcal_r, ) + upsert_session_metrics_from_csv_mapped( + cur, + profile_id, + str(aid), + mapped, + training_category, + training_type_id, + ) cur.execute("RELEASE SAVEPOINT csv_activity_row") except Exception as e: try: diff --git a/backend/csv_parser/import_row_processing.py b/backend/csv_parser/import_row_processing.py index b33dbad..2aa1c55 100644 --- a/backend/csv_parser/import_row_processing.py +++ b/backend/csv_parser/import_row_processing.py @@ -37,12 +37,16 @@ def validate_import_row_processing( module: str, spec: Mapping[str, Any], field_mappings: Mapping[str, Any], + cur=None, ) -> None: """Wirft ValueError bei ungültiger Konfiguration.""" mod = get_module_definition(module) if not mod: raise ValueError(f"Unbekanntes Modul: {module}") allowed = set(mod.get("fields") or []) + if module == "activity" and cur is not None: + cur.execute("SELECT key FROM training_parameters WHERE is_active = true") + allowed.update(str(r["key"]) for r in cur.fetchall()) fm_targets = {str(v) for v in field_mappings.values() if v and v not in ("-", "_skip")} group_by = spec.get("group_by") or [] diff --git a/backend/csv_parser/mapping_suggest.py b/backend/csv_parser/mapping_suggest.py index 57f42f7..905bf44 100644 --- a/backend/csv_parser/mapping_suggest.py +++ b/backend/csv_parser/mapping_suggest.py @@ -127,13 +127,19 @@ def _match_seed_to_db_field(header: str, seed_fm: Mapping[str, str]) -> str | No return None -def _alias_suggest(norm: str, module: str, used: set[str]) -> str | None: +def _alias_suggest( + norm: str, + module: str, + used: set[str], + *, + field_order: list[str] | None = None, +) -> str | None: aliases = _MODULE_HEADER_ALIASES.get(module, {}) mod = get_module_definition(module) if not mod: return None - field_order = list(mod["fields"].keys()) - for db_field in field_order: + order = field_order if field_order is not None else list(mod["fields"].keys()) + for db_field in order: if db_field in used: continue tokens = aliases.get(db_field, frozenset()) @@ -152,6 +158,8 @@ def suggest_field_mappings( headers: list[str], module: str, seed_fm: Mapping[str, str] | None = None, + *, + effective_fields: Mapping[str, Any] | None = None, ) -> dict[str, str]: """ Mappt jede CSV-Spalte (Roh-Header als Key) auf DB-Feld oder '-'. @@ -164,13 +172,16 @@ def suggest_field_mappings( if not mod: return {h: "-" for h in headers} + fields_map = dict(effective_fields) if effective_fields is not None else dict(mod["fields"]) + field_order = list(fields_map.keys()) + fm: dict[str, str] = {h: "-" for h in headers} used: set[str] = set() if seed_fm: for h in headers: db = _match_seed_to_db_field(h, seed_fm) - if db and db not in used: + if db and db not in used and db in fields_map: fm[h] = db used.add(db) @@ -178,7 +189,7 @@ def suggest_field_mappings( if fm[h] != "-": continue norm = _norm_key(h) - db = _alias_suggest(norm, module, used) + db = _alias_suggest(norm, module, used, field_order=field_order) if db: fm[h] = db used.add(db) @@ -190,6 +201,8 @@ def build_type_conversions_for_mapping( module: str, field_mappings: Mapping[str, str], seed_tc: Mapping[str, Any] | None = None, + *, + effective_fields: Mapping[str, Any] | None = None, ) -> dict[str, Any]: """type_conversions nur für zugewiesene Zielfelder; Seed überschreibt Defaults.""" if module == "sleep": @@ -198,6 +211,7 @@ def build_type_conversions_for_mapping( defaults = _DEFAULT_TYPE_CONVERSIONS.get(module, {}) out: dict[str, Any] = {} targets = {v for v in field_mappings.values() if v and v not in ("-", "_skip")} + field_meta = dict(effective_fields) if effective_fields is not None else None if seed_tc: for k, v in seed_tc.items(): @@ -208,6 +222,20 @@ def build_type_conversions_for_mapping( if t not in out and t in defaults: out[t] = deepcopy(defaults[t]) + for t in sorted(targets): + if t in out: + continue + finfo = (field_meta or {}).get(t) if field_meta else None + if not finfo: + continue + typ = finfo.get("type") + if typ == "int": + out[t] = {"type": "int", "flexible": True} + elif typ == "float": + out[t] = {"type": "float", "decimal_separator": "auto", "flexible": True} + else: + out[t] = {"type": "string"} + _apply_energy_kj_hint_from_headers(module, field_mappings, out) return out diff --git a/backend/csv_parser/module_registry.py b/backend/csv_parser/module_registry.py index 2a07fee..d4e5d2d 100644 --- a/backend/csv_parser/module_registry.py +++ b/backend/csv_parser/module_registry.py @@ -125,13 +125,16 @@ def list_modules() -> list[str]: return sorted(MODULE_DEFINITIONS.keys()) -def validate_field_mappings(module: str, field_mappings: dict) -> None: +def validate_field_mappings(module: str, field_mappings: dict, cur=None) -> None: """Wirft ValueError bei unbekanntem Modul oder unbekanntem DB-Feld.""" mod = get_module_definition(module) if not mod: raise ValueError(f"Unbekanntes Modul: {module}") fields = cast(dict, mod["fields"]) allowed = set(fields.keys()) + if module == "activity" and cur is not None: + cur.execute("SELECT key FROM training_parameters WHERE is_active = true") + allowed.update(str(r["key"]) for r in cur.fetchall()) if not allowed: for _csv_col, db_field in field_mappings.items(): if db_field not in ("", None, "-", "_skip"): diff --git a/backend/csv_parser/template_validator.py b/backend/csv_parser/template_validator.py index bbee6c9..774ab44 100644 --- a/backend/csv_parser/template_validator.py +++ b/backend/csv_parser/template_validator.py @@ -15,6 +15,7 @@ from csv_parser.module_registry import ( validate_field_mappings, validate_required_field_targets, ) +from data_layer.activity_persistence_orchestrator import merge_activity_csv_module_fields ALLOWED_SPEC_TYPES = frozenset( {"string", "float", "number", "int", "date", "time", "datetime", "duration"} @@ -50,6 +51,8 @@ def validate_csv_template( type_conversions: Mapping[str, Any] | None = None, import_row_processing: Mapping[str, Any] | None = None, column_signature: list[str] | None = None, + *, + cur=None, ) -> dict[str, Any]: """ Prüft eine Vorlage ohne Datei-Upload. @@ -74,8 +77,12 @@ def validate_csv_template( ) return {"valid": False, "errors": errors, "warnings": warnings} + field_defs = dict(mod.get("fields") or {}) + if module == "activity" and cur is not None: + field_defs = merge_activity_csv_module_fields(cur, field_defs) + try: - validate_field_mappings(module, fm) + validate_field_mappings(module, fm, cur=cur) except ValueError as e: errors.append( _issue( @@ -100,7 +107,7 @@ def validate_csv_template( if import_row_processing: try: - validate_import_row_processing_spec(module, import_row_processing, fm) + validate_import_row_processing_spec(module, import_row_processing, fm, cur=cur) except ValueError as e: errors.append( _issue( @@ -111,7 +118,6 @@ def validate_csv_template( ) ) - field_defs = mod.get("fields") or {} for db_field, spec in tc.items(): if db_field not in field_defs: errors.append( diff --git a/backend/data_layer/activity_persistence_orchestrator.py b/backend/data_layer/activity_persistence_orchestrator.py index 6e74242..6c4aa82 100644 --- a/backend/data_layer/activity_persistence_orchestrator.py +++ b/backend/data_layer/activity_persistence_orchestrator.py @@ -227,6 +227,51 @@ def run_activity_post_write_hooks_import( sync_column_backed_session_metrics(cur, str(profile_id), str(eid)) +def merge_activity_csv_module_fields( + cur, + static_fields: Dict[str, Any], +) -> Dict[str, Any]: + """ + activity-Modul für CSV: statische Registry-Felder + alle aktiven training_parameters. + + Gleiche Quelle wie get_mappable_activity_field_catalog.training_parameters — erscheint + in Admin-CSV-Ziel-Liste, Validierung und Import-Zeilenaggregation. + """ + out = dict(static_fields) + cur.execute( + """ + SELECT key, data_type, unit, name_de + FROM training_parameters + WHERE is_active = true + ORDER BY key + """ + ) + for row in cur.fetchall(): + k = row["key"] + if k in out: + continue + dt = row["data_type"] or "float" + if dt == "integer": + mtype = "int" + elif dt == "float": + mtype = "float" + elif dt == "boolean": + mtype = "string" + else: + mtype = "string" + spec: Dict[str, Any] = { + "type": mtype, + "required": False, + "from_training_parameter": True, + } + if row.get("unit"): + spec["unit"] = row["unit"] + if row.get("name_de"): + spec["label_de"] = row["name_de"] + out[k] = spec + return out + + def get_mappable_activity_field_catalog(cur, profile_id: str) -> Dict[str, Any]: """ Felder für konfigurierbare Import-Mappings. diff --git a/backend/data_layer/activity_session_metrics.py b/backend/data_layer/activity_session_metrics.py index a2459d2..de54f40 100644 --- a/backend/data_layer/activity_session_metrics.py +++ b/backend/data_layer/activity_session_metrics.py @@ -7,10 +7,27 @@ from __future__ import annotations import logging from decimal import Decimal -from typing import Any, Dict, List, Optional, Sequence +from typing import Any, Dict, List, Mapping, Optional, Sequence logger = logging.getLogger(__name__) +# activity_log-Spalten (ohne Kernfelder aus CSV-Minimal-Insert), die über source_field beschrieben werden können. +ACTIVITY_LOG_PATCHABLE_COLUMNS = frozenset( + { + "hr_min", + "pace_min_per_km", + "cadence", + "avg_power", + "elevation_gain", + "temperature_celsius", + "humidity_percent", + "avg_hr_percent", + "kcal_per_km", + "rpe", + "notes", + } +) + class ActivitySessionMetricsError(Exception): """Raised by Layer 1; routers map to HTTP (404/400).""" @@ -218,6 +235,99 @@ def _coerce_raw_value_for_parameter(data_type: str, raw: Any) -> Any: raise ValueError(data_type) +def resolve_activity_log_column_patch_from_csv( + cur, + mapped: Mapping[str, Any], + training_category: Optional[str], + training_type_id: Optional[int], +) -> Dict[str, Any]: + """ + Zusätzliche activity_log-Updates aus CSV: Parameter mit source_field → Spalte. + """ + schema = resolve_activity_attribute_schema(cur, training_category, training_type_id) + patch: Dict[str, Any] = {} + for spec in schema: + src_col = (spec.get("source_field") or "").strip() + if not src_col or src_col not in ACTIVITY_LOG_PATCHABLE_COLUMNS: + continue + pkey = spec["key"] + if pkey not in mapped: + continue + raw = mapped[pkey] + if raw is None or raw == "": + continue + dt = spec["data_type"] + rules = _validation_rules_dict(spec["validation_rules"]) + try: + coerced = _coerce_raw_value_for_parameter(dt, raw) + _validate_single_value(dt, coerced, rules) + except (ActivitySessionMetricsError, TypeError, ValueError) as ex: + logger.warning( + "CSV activity_log patch skipped %s → %s: %s", + pkey, + src_col, + ex, + ) + continue + patch[src_col] = coerced + return patch + + +def upsert_session_metrics_from_csv_mapped( + cur, + profile_id: str, + activity_log_id: str, + mapped: Mapping[str, Any], + training_category: Optional[str], + training_type_id: Optional[int], +) -> None: + """EAV nur für Schema-Parameter ohne source_field (reine Session-Metriken).""" + cur.execute( + "SELECT profile_id FROM activity_log WHERE id = %s", + (activity_log_id,), + ) + row = cur.fetchone() + if not row or str(row["profile_id"]) != str(profile_id): + return + schema = resolve_activity_attribute_schema(cur, training_category, training_type_id) + for spec in schema: + pkey = spec["key"] + if pkey not in mapped: + continue + raw = mapped[pkey] + if raw is None or raw == "": + continue + src_col = (spec.get("source_field") or "").strip() + if src_col: + continue + tid = spec["training_parameter_id"] + dt = spec["data_type"] + rules = _validation_rules_dict(spec["validation_rules"]) + try: + coerced = _coerce_raw_value_for_parameter(dt, raw) + _validate_single_value(dt, coerced, rules) + except (ActivitySessionMetricsError, TypeError, ValueError) as ex: + logger.warning("CSV EAV skipped %s: %s", pkey, ex) + continue + vn, vi, vt, vb = _row_value_tuple(dt, coerced) + cur.execute( + """ + INSERT INTO activity_session_metrics ( + activity_log_id, training_parameter_id, + value_num, value_int, value_text, value_bool, updated_at + ) VALUES (%s, %s, %s, %s, %s, %s, NOW()) + ON CONFLICT (activity_log_id, training_parameter_id) + DO UPDATE SET + value_num = EXCLUDED.value_num, + value_int = EXCLUDED.value_int, + value_text = EXCLUDED.value_text, + value_bool = EXCLUDED.value_bool, + updated_at = NOW() + """, + (activity_log_id, tid, vn, vi, vt, vb), + ) + + def sync_column_backed_session_metrics(cur, profile_id: str, activity_log_id: str) -> None: """ EAV-Zeilen für alle Schema-Parameter mit gesetztem source_field aus der activity_log-Zeile diff --git a/backend/db.py b/backend/db.py index 9699bae..0b07348 100644 --- a/backend/db.py +++ b/backend/db.py @@ -29,7 +29,9 @@ def init_pool(): user=os.getenv("DB_USER", "mitai"), password=os.getenv("DB_PASSWORD", "") ) - print(f"✓ PostgreSQL connection pool initialized ({os.getenv('DB_HOST', 'postgres')}:{os.getenv('DB_PORT', '5432')})") + print( + f"[OK] PostgreSQL connection pool initialized ({os.getenv('DB_HOST', 'postgres')}:{os.getenv('DB_PORT', '5432')})" + ) @contextmanager @@ -171,7 +173,7 @@ def init_db(): ) as table_exists """) if not cur.fetchone()['table_exists']: - print("⚠️ ai_prompts table doesn't exist yet - skipping pipeline prompt creation") + print("[WARN] ai_prompts table doesn't exist yet - skipping pipeline prompt creation") return # Ensure "pipeline" master prompt exists @@ -189,7 +191,7 @@ def init_db(): ) """) conn.commit() - print("✓ Pipeline master prompt created") + print("[OK] Pipeline master prompt created") except Exception as e: - print(f"⚠️ Could not create pipeline prompt: {e}") + print(f"[WARN] Could not create pipeline prompt: {e}") # Don't fail startup - prompt can be created manually diff --git a/backend/main.py b/backend/main.py index 3a0826b..c6ee964 100644 --- a/backend/main.py +++ b/backend/main.py @@ -67,7 +67,7 @@ async def startup_event(): try: init_db() except Exception as e: - print(f"⚠️ init_db() failed (non-fatal): {e}") + print(f"[WARN] init_db() failed (non-fatal): {e}") # Don't crash on startup - can be created manually # Apply v9c migration if needed @@ -75,7 +75,7 @@ async def startup_event(): from apply_v9c_migration import apply_migration apply_migration() except Exception as e: - print(f"⚠️ v9c migration failed (non-fatal): {e}") + print(f"[WARN] v9c migration failed (non-fatal): {e}") # ── Register Routers ────────────────────────────────────────────────────────── app.include_router(auth.router) # /api/auth/* diff --git a/backend/routers/admin_csv_templates.py b/backend/routers/admin_csv_templates.py index 2b3b7c7..a89c343 100644 --- a/backend/routers/admin_csv_templates.py +++ b/backend/routers/admin_csv_templates.py @@ -25,6 +25,7 @@ from csv_parser.import_row_processing import ( ) from csv_parser.module_registry import get_module_definition from csv_parser.template_validator import validate_csv_template +from data_layer.activity_persistence_orchestrator import merge_activity_csv_module_fields router = APIRouter(prefix="/api/admin/csv-templates", tags=["admin", "csv-import"]) @@ -181,6 +182,8 @@ async def admin_analyze_csv_for_template( sig = column_signature(headers) seed_row: dict | None = None + field_mappings: dict = {} + type_conversions: dict = {} with get_db() as conn: cur = get_cursor(conn) if seed_template_id is not None: @@ -215,15 +218,30 @@ async def admin_analyze_csv_for_template( if best and best_key[0] > 0: seed_row = best - seed_fm = (seed_row or {}).get("field_mappings") or {} - if isinstance(seed_fm, str): - seed_fm = {} - seed_tc = (seed_row or {}).get("type_conversions") - if not isinstance(seed_tc, dict): - seed_tc = {} + mod_def = get_module_definition(module) or {} + eff_fields = dict(mod_def.get("fields") or {}) + if module == "activity": + eff_fields = merge_activity_csv_module_fields(cur, eff_fields) - field_mappings = suggest_field_mappings(headers, module, seed_fm if seed_fm else None) - type_conversions = build_type_conversions_for_mapping(module, field_mappings, seed_tc if seed_tc else None) + seed_fm = (seed_row or {}).get("field_mappings") or {} + if isinstance(seed_fm, str): + seed_fm = {} + seed_tc = (seed_row or {}).get("type_conversions") + if not isinstance(seed_tc, dict): + seed_tc = {} + + field_mappings = suggest_field_mappings( + headers, + module, + seed_fm if seed_fm else None, + effective_fields=eff_fields, + ) + type_conversions = build_type_conversions_for_mapping( + module, + field_mappings, + seed_tc if seed_tc else None, + effective_fields=eff_fields, + ) seed_meta = None if seed_row: @@ -270,13 +288,16 @@ def validate_system_template_dry_run(body: CsvTemplateValidateBody, session: dic """ if not get_module_definition(body.module): raise HTTPException(400, f"Unbekanntes Modul: {body.module}") - return validate_csv_template( - body.module, - body.field_mappings, - body.type_conversions, - body.import_row_processing, - body.column_signature, - ) + with get_db() as conn: + cur = get_cursor(conn) + return validate_csv_template( + body.module, + body.field_mappings, + body.type_conversions, + body.import_row_processing, + body.column_signature, + cur=cur, + ) @router.get("/{template_id}") @@ -297,18 +318,19 @@ def get_system_template(template_id: int, session: dict = Depends(require_admin) def create_system_template(body: CsvSystemTemplateCreate, session: dict = Depends(require_admin)): if not get_module_definition(body.module): raise HTTPException(400, f"Unbekanntes Modul: {body.module}") - report = validate_csv_template( - body.module, - body.field_mappings, - body.type_conversions, - body.import_row_processing, - body.column_signature, - ) - if not report["valid"]: - raise HTTPException(status_code=422, detail=report) - with get_db() as conn: cur = get_cursor(conn) + report = validate_csv_template( + body.module, + body.field_mappings, + body.type_conversions, + body.import_row_processing, + body.column_signature, + cur=cur, + ) + if not report["valid"]: + raise HTTPException(status_code=422, detail=report) + cur.execute( """ INSERT INTO csv_field_mappings ( @@ -367,6 +389,7 @@ def update_system_template( tc_eff, irp_eff, col_eff if isinstance(col_eff, list) else None, + cur=cur, ) if not report["valid"]: raise HTTPException(status_code=422, detail=report) diff --git a/backend/routers/csv_import.py b/backend/routers/csv_import.py index 5f618da..b3ab67a 100644 --- a/backend/routers/csv_import.py +++ b/backend/routers/csv_import.py @@ -34,6 +34,7 @@ from csv_parser.type_converter import build_row_after_mapping, diagnose_row_mapp from csv_parser.field_units import source_unit_choices_for_field from csv_parser.import_errors import enrich_row_error from csv_parser.module_registry import get_module_definition, list_modules, validate_field_mappings +from data_layer.activity_persistence_orchestrator import merge_activity_csv_module_fields from csv_parser.sleep_apple_import import detect_apple_sleep_csv_format router = APIRouter(prefix="/api/csv", tags=["csv-import"]) @@ -66,25 +67,30 @@ def _mapping_to_summary(m: dict) -> dict: def csv_modules(session: dict = Depends(require_auth)): """Unterstützte Import-Module und Felddefinitionen.""" out = [] - for mid in list_modules(): - d = get_module_definition(mid) - if d: - fields_out = {} - for fname, finfo in (d.get("fields") or {}).items(): - fd = dict(finfo) - opts = source_unit_choices_for_field(mid, fname) - if opts: - fd["source_unit_options"] = opts - fields_out[fname] = fd - out.append( - { - "id": mid, - "table": d["table"], - "fields": fields_out, - "import_mode": d.get("import_mode"), - "import_row_processing_default": d.get("import_row_processing_default"), - } - ) + with get_db() as conn: + cur = get_cursor(conn) + for mid in list_modules(): + d = get_module_definition(mid) + if d: + field_src = dict(d.get("fields") or {}) + if mid == "activity": + field_src = merge_activity_csv_module_fields(cur, field_src) + fields_out = {} + for fname, finfo in field_src.items(): + fd = dict(finfo) + opts = source_unit_choices_for_field(mid, fname) + if opts: + fd["source_unit_options"] = opts + fields_out[fname] = fd + out.append( + { + "id": mid, + "table": d["table"], + "fields": fields_out, + "import_mode": d.get("import_mode"), + "import_row_processing_default": d.get("import_row_processing_default"), + } + ) return {"modules": out} @@ -257,6 +263,10 @@ async def analyze_csv( with get_db() as conn: cur = get_cursor(conn) + if module == "activity" and mod_def: + available_fields = merge_activity_csv_module_fields( + cur, dict(mod_def.get("fields") or {}) + ) if module: cur.execute( """