feat: Enhance CSV import and validation for activity module
All checks were successful
Deploy Development / deploy (push) Successful in 47s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s

- Updated the CSV import logic to merge active training parameters with static fields for the activity module, improving field mapping accuracy.
- Enhanced validation functions to incorporate dynamic field definitions based on active training parameters, ensuring better data integrity during imports.
- Refactored related functions to streamline the process of handling CSV templates and field mappings, improving maintainability and clarity.
- Added new utility functions for resolving activity log column patches and upserting session metrics from CSV, enhancing the overall import functionality.
This commit is contained in:
Lars 2026-04-15 08:12:58 +02:00
parent 934b915357
commit 574af61349
11 changed files with 325 additions and 80 deletions

View File

@ -814,6 +814,10 @@ def _import_activity(
run_activity_post_write_hooks_import,
update_activity_columns,
)
from data_layer.activity_session_metrics import (
resolve_activity_log_column_patch_from_csv,
upsert_session_metrics_from_csv_mapped,
)
rows_total = 0
inserted = 0
@ -894,29 +898,29 @@ def _import_activity(
training_type_id, training_category, training_subcategory = _resolve_training_type_for_activity(
cur, wtype, profile_id
)
column_patch = resolve_activity_log_column_patch_from_csv(
cur, mapped, training_category, training_type_id
)
existing_id = find_activity_duplicate_id(cur, profile_id, iso, workout_start_t)
if existing_id:
update_activity_columns(
cur,
profile_id,
existing_id,
{
"start_time": workout_start_t,
"end_time": end_str or None,
"activity_type": wtype,
"duration_min": duration_min,
"kcal_active": kcal_a,
"kcal_resting": kcal_r,
"hr_avg": hr_a,
"hr_max": hr_m,
"distance_km": dist,
"training_type_id": training_type_id,
"training_category": training_category,
"training_subcategory": training_subcategory,
"source": "csv",
},
)
upd = {
"start_time": workout_start_t,
"end_time": end_str or None,
"activity_type": wtype,
"duration_min": duration_min,
"kcal_active": kcal_a,
"kcal_resting": kcal_r,
"hr_avg": hr_a,
"hr_max": hr_m,
"distance_km": dist,
"training_type_id": training_type_id,
"training_category": training_category,
"training_subcategory": training_subcategory,
"source": "csv",
}
upd.update(column_patch)
update_activity_columns(cur, profile_id, existing_id, upd)
updated += 1
affected_ids["activity_log"].append(str(existing_id))
aid = existing_id
@ -945,6 +949,8 @@ def _import_activity(
new_entries += 1
affected_ids["activity_log"].append(str(eid))
aid = eid
if column_patch:
update_activity_columns(cur, profile_id, aid, column_patch)
run_activity_post_write_hooks_import(
cur,
@ -959,6 +965,14 @@ def _import_activity(
kcal_active=kcal_a,
kcal_resting=kcal_r,
)
upsert_session_metrics_from_csv_mapped(
cur,
profile_id,
str(aid),
mapped,
training_category,
training_type_id,
)
cur.execute("RELEASE SAVEPOINT csv_activity_row")
except Exception as e:
try:

View File

@ -37,12 +37,16 @@ def validate_import_row_processing(
module: str,
spec: Mapping[str, Any],
field_mappings: Mapping[str, Any],
cur=None,
) -> None:
"""Wirft ValueError bei ungültiger Konfiguration."""
mod = get_module_definition(module)
if not mod:
raise ValueError(f"Unbekanntes Modul: {module}")
allowed = set(mod.get("fields") or [])
if module == "activity" and cur is not None:
cur.execute("SELECT key FROM training_parameters WHERE is_active = true")
allowed.update(str(r["key"]) for r in cur.fetchall())
fm_targets = {str(v) for v in field_mappings.values() if v and v not in ("-", "_skip")}
group_by = spec.get("group_by") or []

View File

@ -127,13 +127,19 @@ def _match_seed_to_db_field(header: str, seed_fm: Mapping[str, str]) -> str | No
return None
def _alias_suggest(norm: str, module: str, used: set[str]) -> str | None:
def _alias_suggest(
norm: str,
module: str,
used: set[str],
*,
field_order: list[str] | None = None,
) -> str | None:
aliases = _MODULE_HEADER_ALIASES.get(module, {})
mod = get_module_definition(module)
if not mod:
return None
field_order = list(mod["fields"].keys())
for db_field in field_order:
order = field_order if field_order is not None else list(mod["fields"].keys())
for db_field in order:
if db_field in used:
continue
tokens = aliases.get(db_field, frozenset())
@ -152,6 +158,8 @@ def suggest_field_mappings(
headers: list[str],
module: str,
seed_fm: Mapping[str, str] | None = None,
*,
effective_fields: Mapping[str, Any] | None = None,
) -> dict[str, str]:
"""
Mappt jede CSV-Spalte (Roh-Header als Key) auf DB-Feld oder '-'.
@ -164,13 +172,16 @@ def suggest_field_mappings(
if not mod:
return {h: "-" for h in headers}
fields_map = dict(effective_fields) if effective_fields is not None else dict(mod["fields"])
field_order = list(fields_map.keys())
fm: dict[str, str] = {h: "-" for h in headers}
used: set[str] = set()
if seed_fm:
for h in headers:
db = _match_seed_to_db_field(h, seed_fm)
if db and db not in used:
if db and db not in used and db in fields_map:
fm[h] = db
used.add(db)
@ -178,7 +189,7 @@ def suggest_field_mappings(
if fm[h] != "-":
continue
norm = _norm_key(h)
db = _alias_suggest(norm, module, used)
db = _alias_suggest(norm, module, used, field_order=field_order)
if db:
fm[h] = db
used.add(db)
@ -190,6 +201,8 @@ def build_type_conversions_for_mapping(
module: str,
field_mappings: Mapping[str, str],
seed_tc: Mapping[str, Any] | None = None,
*,
effective_fields: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
"""type_conversions nur für zugewiesene Zielfelder; Seed überschreibt Defaults."""
if module == "sleep":
@ -198,6 +211,7 @@ def build_type_conversions_for_mapping(
defaults = _DEFAULT_TYPE_CONVERSIONS.get(module, {})
out: dict[str, Any] = {}
targets = {v for v in field_mappings.values() if v and v not in ("-", "_skip")}
field_meta = dict(effective_fields) if effective_fields is not None else None
if seed_tc:
for k, v in seed_tc.items():
@ -208,6 +222,20 @@ def build_type_conversions_for_mapping(
if t not in out and t in defaults:
out[t] = deepcopy(defaults[t])
for t in sorted(targets):
if t in out:
continue
finfo = (field_meta or {}).get(t) if field_meta else None
if not finfo:
continue
typ = finfo.get("type")
if typ == "int":
out[t] = {"type": "int", "flexible": True}
elif typ == "float":
out[t] = {"type": "float", "decimal_separator": "auto", "flexible": True}
else:
out[t] = {"type": "string"}
_apply_energy_kj_hint_from_headers(module, field_mappings, out)
return out

View File

@ -125,13 +125,16 @@ def list_modules() -> list[str]:
return sorted(MODULE_DEFINITIONS.keys())
def validate_field_mappings(module: str, field_mappings: dict) -> None:
def validate_field_mappings(module: str, field_mappings: dict, cur=None) -> None:
"""Wirft ValueError bei unbekanntem Modul oder unbekanntem DB-Feld."""
mod = get_module_definition(module)
if not mod:
raise ValueError(f"Unbekanntes Modul: {module}")
fields = cast(dict, mod["fields"])
allowed = set(fields.keys())
if module == "activity" and cur is not None:
cur.execute("SELECT key FROM training_parameters WHERE is_active = true")
allowed.update(str(r["key"]) for r in cur.fetchall())
if not allowed:
for _csv_col, db_field in field_mappings.items():
if db_field not in ("", None, "-", "_skip"):

View File

@ -15,6 +15,7 @@ from csv_parser.module_registry import (
validate_field_mappings,
validate_required_field_targets,
)
from data_layer.activity_persistence_orchestrator import merge_activity_csv_module_fields
ALLOWED_SPEC_TYPES = frozenset(
{"string", "float", "number", "int", "date", "time", "datetime", "duration"}
@ -50,6 +51,8 @@ def validate_csv_template(
type_conversions: Mapping[str, Any] | None = None,
import_row_processing: Mapping[str, Any] | None = None,
column_signature: list[str] | None = None,
*,
cur=None,
) -> dict[str, Any]:
"""
Prüft eine Vorlage ohne Datei-Upload.
@ -74,8 +77,12 @@ def validate_csv_template(
)
return {"valid": False, "errors": errors, "warnings": warnings}
field_defs = dict(mod.get("fields") or {})
if module == "activity" and cur is not None:
field_defs = merge_activity_csv_module_fields(cur, field_defs)
try:
validate_field_mappings(module, fm)
validate_field_mappings(module, fm, cur=cur)
except ValueError as e:
errors.append(
_issue(
@ -100,7 +107,7 @@ def validate_csv_template(
if import_row_processing:
try:
validate_import_row_processing_spec(module, import_row_processing, fm)
validate_import_row_processing_spec(module, import_row_processing, fm, cur=cur)
except ValueError as e:
errors.append(
_issue(
@ -111,7 +118,6 @@ def validate_csv_template(
)
)
field_defs = mod.get("fields") or {}
for db_field, spec in tc.items():
if db_field not in field_defs:
errors.append(

View File

@ -227,6 +227,51 @@ def run_activity_post_write_hooks_import(
sync_column_backed_session_metrics(cur, str(profile_id), str(eid))
def merge_activity_csv_module_fields(
cur,
static_fields: Dict[str, Any],
) -> Dict[str, Any]:
"""
activity-Modul für CSV: statische Registry-Felder + alle aktiven training_parameters.
Gleiche Quelle wie get_mappable_activity_field_catalog.training_parameters erscheint
in Admin-CSV-Ziel-Liste, Validierung und Import-Zeilenaggregation.
"""
out = dict(static_fields)
cur.execute(
"""
SELECT key, data_type, unit, name_de
FROM training_parameters
WHERE is_active = true
ORDER BY key
"""
)
for row in cur.fetchall():
k = row["key"]
if k in out:
continue
dt = row["data_type"] or "float"
if dt == "integer":
mtype = "int"
elif dt == "float":
mtype = "float"
elif dt == "boolean":
mtype = "string"
else:
mtype = "string"
spec: Dict[str, Any] = {
"type": mtype,
"required": False,
"from_training_parameter": True,
}
if row.get("unit"):
spec["unit"] = row["unit"]
if row.get("name_de"):
spec["label_de"] = row["name_de"]
out[k] = spec
return out
def get_mappable_activity_field_catalog(cur, profile_id: str) -> Dict[str, Any]:
"""
Felder für konfigurierbare Import-Mappings.

View File

@ -7,10 +7,27 @@ from __future__ import annotations
import logging
from decimal import Decimal
from typing import Any, Dict, List, Optional, Sequence
from typing import Any, Dict, List, Mapping, Optional, Sequence
logger = logging.getLogger(__name__)
# activity_log-Spalten (ohne Kernfelder aus CSV-Minimal-Insert), die über source_field beschrieben werden können.
ACTIVITY_LOG_PATCHABLE_COLUMNS = frozenset(
{
"hr_min",
"pace_min_per_km",
"cadence",
"avg_power",
"elevation_gain",
"temperature_celsius",
"humidity_percent",
"avg_hr_percent",
"kcal_per_km",
"rpe",
"notes",
}
)
class ActivitySessionMetricsError(Exception):
"""Raised by Layer 1; routers map to HTTP (404/400)."""
@ -218,6 +235,99 @@ def _coerce_raw_value_for_parameter(data_type: str, raw: Any) -> Any:
raise ValueError(data_type)
def resolve_activity_log_column_patch_from_csv(
cur,
mapped: Mapping[str, Any],
training_category: Optional[str],
training_type_id: Optional[int],
) -> Dict[str, Any]:
"""
Zusätzliche activity_log-Updates aus CSV: Parameter mit source_field Spalte.
"""
schema = resolve_activity_attribute_schema(cur, training_category, training_type_id)
patch: Dict[str, Any] = {}
for spec in schema:
src_col = (spec.get("source_field") or "").strip()
if not src_col or src_col not in ACTIVITY_LOG_PATCHABLE_COLUMNS:
continue
pkey = spec["key"]
if pkey not in mapped:
continue
raw = mapped[pkey]
if raw is None or raw == "":
continue
dt = spec["data_type"]
rules = _validation_rules_dict(spec["validation_rules"])
try:
coerced = _coerce_raw_value_for_parameter(dt, raw)
_validate_single_value(dt, coerced, rules)
except (ActivitySessionMetricsError, TypeError, ValueError) as ex:
logger.warning(
"CSV activity_log patch skipped %s%s: %s",
pkey,
src_col,
ex,
)
continue
patch[src_col] = coerced
return patch
def upsert_session_metrics_from_csv_mapped(
cur,
profile_id: str,
activity_log_id: str,
mapped: Mapping[str, Any],
training_category: Optional[str],
training_type_id: Optional[int],
) -> None:
"""EAV nur für Schema-Parameter ohne source_field (reine Session-Metriken)."""
cur.execute(
"SELECT profile_id FROM activity_log WHERE id = %s",
(activity_log_id,),
)
row = cur.fetchone()
if not row or str(row["profile_id"]) != str(profile_id):
return
schema = resolve_activity_attribute_schema(cur, training_category, training_type_id)
for spec in schema:
pkey = spec["key"]
if pkey not in mapped:
continue
raw = mapped[pkey]
if raw is None or raw == "":
continue
src_col = (spec.get("source_field") or "").strip()
if src_col:
continue
tid = spec["training_parameter_id"]
dt = spec["data_type"]
rules = _validation_rules_dict(spec["validation_rules"])
try:
coerced = _coerce_raw_value_for_parameter(dt, raw)
_validate_single_value(dt, coerced, rules)
except (ActivitySessionMetricsError, TypeError, ValueError) as ex:
logger.warning("CSV EAV skipped %s: %s", pkey, ex)
continue
vn, vi, vt, vb = _row_value_tuple(dt, coerced)
cur.execute(
"""
INSERT INTO activity_session_metrics (
activity_log_id, training_parameter_id,
value_num, value_int, value_text, value_bool, updated_at
) VALUES (%s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (activity_log_id, training_parameter_id)
DO UPDATE SET
value_num = EXCLUDED.value_num,
value_int = EXCLUDED.value_int,
value_text = EXCLUDED.value_text,
value_bool = EXCLUDED.value_bool,
updated_at = NOW()
""",
(activity_log_id, tid, vn, vi, vt, vb),
)
def sync_column_backed_session_metrics(cur, profile_id: str, activity_log_id: str) -> None:
"""
EAV-Zeilen für alle Schema-Parameter mit gesetztem source_field aus der activity_log-Zeile

View File

@ -29,7 +29,9 @@ def init_pool():
user=os.getenv("DB_USER", "mitai"),
password=os.getenv("DB_PASSWORD", "")
)
print(f"✓ PostgreSQL connection pool initialized ({os.getenv('DB_HOST', 'postgres')}:{os.getenv('DB_PORT', '5432')})")
print(
f"[OK] PostgreSQL connection pool initialized ({os.getenv('DB_HOST', 'postgres')}:{os.getenv('DB_PORT', '5432')})"
)
@contextmanager
@ -171,7 +173,7 @@ def init_db():
) as table_exists
""")
if not cur.fetchone()['table_exists']:
print("⚠️ ai_prompts table doesn't exist yet - skipping pipeline prompt creation")
print("[WARN] ai_prompts table doesn't exist yet - skipping pipeline prompt creation")
return
# Ensure "pipeline" master prompt exists
@ -189,7 +191,7 @@ def init_db():
)
""")
conn.commit()
print(" Pipeline master prompt created")
print("[OK] Pipeline master prompt created")
except Exception as e:
print(f"⚠️ Could not create pipeline prompt: {e}")
print(f"[WARN] Could not create pipeline prompt: {e}")
# Don't fail startup - prompt can be created manually

View File

@ -67,7 +67,7 @@ async def startup_event():
try:
init_db()
except Exception as e:
print(f"⚠️ init_db() failed (non-fatal): {e}")
print(f"[WARN] init_db() failed (non-fatal): {e}")
# Don't crash on startup - can be created manually
# Apply v9c migration if needed
@ -75,7 +75,7 @@ async def startup_event():
from apply_v9c_migration import apply_migration
apply_migration()
except Exception as e:
print(f"⚠️ v9c migration failed (non-fatal): {e}")
print(f"[WARN] v9c migration failed (non-fatal): {e}")
# ── Register Routers ──────────────────────────────────────────────────────────
app.include_router(auth.router) # /api/auth/*

View File

@ -25,6 +25,7 @@ from csv_parser.import_row_processing import (
)
from csv_parser.module_registry import get_module_definition
from csv_parser.template_validator import validate_csv_template
from data_layer.activity_persistence_orchestrator import merge_activity_csv_module_fields
router = APIRouter(prefix="/api/admin/csv-templates", tags=["admin", "csv-import"])
@ -181,6 +182,8 @@ async def admin_analyze_csv_for_template(
sig = column_signature(headers)
seed_row: dict | None = None
field_mappings: dict = {}
type_conversions: dict = {}
with get_db() as conn:
cur = get_cursor(conn)
if seed_template_id is not None:
@ -215,15 +218,30 @@ async def admin_analyze_csv_for_template(
if best and best_key[0] > 0:
seed_row = best
seed_fm = (seed_row or {}).get("field_mappings") or {}
if isinstance(seed_fm, str):
seed_fm = {}
seed_tc = (seed_row or {}).get("type_conversions")
if not isinstance(seed_tc, dict):
seed_tc = {}
mod_def = get_module_definition(module) or {}
eff_fields = dict(mod_def.get("fields") or {})
if module == "activity":
eff_fields = merge_activity_csv_module_fields(cur, eff_fields)
field_mappings = suggest_field_mappings(headers, module, seed_fm if seed_fm else None)
type_conversions = build_type_conversions_for_mapping(module, field_mappings, seed_tc if seed_tc else None)
seed_fm = (seed_row or {}).get("field_mappings") or {}
if isinstance(seed_fm, str):
seed_fm = {}
seed_tc = (seed_row or {}).get("type_conversions")
if not isinstance(seed_tc, dict):
seed_tc = {}
field_mappings = suggest_field_mappings(
headers,
module,
seed_fm if seed_fm else None,
effective_fields=eff_fields,
)
type_conversions = build_type_conversions_for_mapping(
module,
field_mappings,
seed_tc if seed_tc else None,
effective_fields=eff_fields,
)
seed_meta = None
if seed_row:
@ -270,13 +288,16 @@ def validate_system_template_dry_run(body: CsvTemplateValidateBody, session: dic
"""
if not get_module_definition(body.module):
raise HTTPException(400, f"Unbekanntes Modul: {body.module}")
return validate_csv_template(
body.module,
body.field_mappings,
body.type_conversions,
body.import_row_processing,
body.column_signature,
)
with get_db() as conn:
cur = get_cursor(conn)
return validate_csv_template(
body.module,
body.field_mappings,
body.type_conversions,
body.import_row_processing,
body.column_signature,
cur=cur,
)
@router.get("/{template_id}")
@ -297,18 +318,19 @@ def get_system_template(template_id: int, session: dict = Depends(require_admin)
def create_system_template(body: CsvSystemTemplateCreate, session: dict = Depends(require_admin)):
if not get_module_definition(body.module):
raise HTTPException(400, f"Unbekanntes Modul: {body.module}")
report = validate_csv_template(
body.module,
body.field_mappings,
body.type_conversions,
body.import_row_processing,
body.column_signature,
)
if not report["valid"]:
raise HTTPException(status_code=422, detail=report)
with get_db() as conn:
cur = get_cursor(conn)
report = validate_csv_template(
body.module,
body.field_mappings,
body.type_conversions,
body.import_row_processing,
body.column_signature,
cur=cur,
)
if not report["valid"]:
raise HTTPException(status_code=422, detail=report)
cur.execute(
"""
INSERT INTO csv_field_mappings (
@ -367,6 +389,7 @@ def update_system_template(
tc_eff,
irp_eff,
col_eff if isinstance(col_eff, list) else None,
cur=cur,
)
if not report["valid"]:
raise HTTPException(status_code=422, detail=report)

View File

@ -34,6 +34,7 @@ from csv_parser.type_converter import build_row_after_mapping, diagnose_row_mapp
from csv_parser.field_units import source_unit_choices_for_field
from csv_parser.import_errors import enrich_row_error
from csv_parser.module_registry import get_module_definition, list_modules, validate_field_mappings
from data_layer.activity_persistence_orchestrator import merge_activity_csv_module_fields
from csv_parser.sleep_apple_import import detect_apple_sleep_csv_format
router = APIRouter(prefix="/api/csv", tags=["csv-import"])
@ -66,25 +67,30 @@ def _mapping_to_summary(m: dict) -> dict:
def csv_modules(session: dict = Depends(require_auth)):
"""Unterstützte Import-Module und Felddefinitionen."""
out = []
for mid in list_modules():
d = get_module_definition(mid)
if d:
fields_out = {}
for fname, finfo in (d.get("fields") or {}).items():
fd = dict(finfo)
opts = source_unit_choices_for_field(mid, fname)
if opts:
fd["source_unit_options"] = opts
fields_out[fname] = fd
out.append(
{
"id": mid,
"table": d["table"],
"fields": fields_out,
"import_mode": d.get("import_mode"),
"import_row_processing_default": d.get("import_row_processing_default"),
}
)
with get_db() as conn:
cur = get_cursor(conn)
for mid in list_modules():
d = get_module_definition(mid)
if d:
field_src = dict(d.get("fields") or {})
if mid == "activity":
field_src = merge_activity_csv_module_fields(cur, field_src)
fields_out = {}
for fname, finfo in field_src.items():
fd = dict(finfo)
opts = source_unit_choices_for_field(mid, fname)
if opts:
fd["source_unit_options"] = opts
fields_out[fname] = fd
out.append(
{
"id": mid,
"table": d["table"],
"fields": fields_out,
"import_mode": d.get("import_mode"),
"import_row_processing_default": d.get("import_row_processing_default"),
}
)
return {"modules": out}
@ -257,6 +263,10 @@ async def analyze_csv(
with get_db() as conn:
cur = get_cursor(conn)
if module == "activity" and mod_def:
available_fields = merge_activity_csv_module_fields(
cur, dict(mod_def.get("fields") or {})
)
if module:
cur.execute(
"""