""" Persönliche Referenzwerte (profilorientiert) Typkatalog system-seeded; Nutzer pflegt historische Werte pro aktivem Profil. """ from __future__ import annotations from datetime import datetime from decimal import Decimal from typing import Any, Optional from fastapi import APIRouter, Depends, Header, HTTPException, Query from pydantic import BaseModel, Field, model_validator from psycopg2.extras import Json from auth import require_auth from db import get_db, get_cursor, r2d from routers.profiles import get_pid router = APIRouter(prefix="/api", tags=["reference-values"]) def _row_to_api(d: dict[str, Any]) -> dict[str, Any]: if not d: return d out = dict(d) ed = out.get("effective_date") if ed is not None and hasattr(ed, "isoformat"): out["effective_date"] = ed.isoformat() ca = out.get("created_at") if ca is not None and hasattr(ca, "isoformat"): out["created_at"] = ca.isoformat() ua = out.get("updated_at") if ua is not None and hasattr(ua, "isoformat"): out["updated_at"] = ua.isoformat() vn = out.get("value_numeric") if vn is not None and isinstance(vn, Decimal): out["value_numeric"] = float(vn) conf = out.get("confidence") if conf is not None and isinstance(conf, Decimal): out["confidence"] = float(conf) return out def _get_type_by_key(cur, key: str, require_active: bool = True) -> dict: q = ( "SELECT id, key, label, default_unit, active FROM reference_value_types WHERE key = %s " ) if require_active: q += "AND active = TRUE " cur.execute(q, (key,)) row = cur.fetchone() if not row: raise HTTPException(404, "Referenztyp nicht gefunden") return r2d(row) class ProfileReferenceValueCreate(BaseModel): reference_value_type_key: str = Field(..., min_length=1, max_length=64) effective_date: str value_numeric: Optional[float] = None value_text: Optional[str] = None unit: Optional[str] = Field(None, max_length=32) source: Optional[str] = None confidence: Optional[float] = Field(None, ge=0, le=100) method: Optional[str] = None notes: Optional[str] = None extra: 
Optional[dict] = None @model_validator(mode="after") def _value_present(self): has_num = self.value_numeric is not None has_txt = self.value_text is not None and str(self.value_text).strip() != "" if not has_num and not has_txt: raise ValueError("Mindestens value_numeric oder value_text erforderlich") return self class ProfileReferenceValueUpdate(BaseModel): effective_date: Optional[str] = None value_numeric: Optional[float] = None value_text: Optional[str] = None unit: Optional[str] = Field(None, max_length=32) source: Optional[str] = None confidence: Optional[float] = Field(None, ge=0, le=100) method: Optional[str] = None notes: Optional[str] = None extra: Optional[dict] = None @router.get("/reference-value-types") def list_reference_value_types(session: dict = Depends(require_auth)): """Alle aktiven Referenztyp-Definitionen (für dynamische UI).""" with get_db() as conn: cur = get_cursor(conn) cur.execute( """ SELECT id, key, label, description, default_unit, sort_order, active, metadata, created_at FROM reference_value_types WHERE active = TRUE ORDER BY sort_order ASC, id ASC """ ) return [_row_to_api(r2d(r)) for r in cur.fetchall()] @router.get("/profile-reference-values") def list_profile_reference_values( type_key: str = Query(..., description="Schlüssel aus reference_value_types.key"), x_profile_id: Optional[str] = Header(default=None), session: dict = Depends(require_auth), ): """Historische Einträge eines Typs für das aktive Profil (neueste zuerst).""" pid = get_pid(x_profile_id) with get_db() as conn: cur = get_cursor(conn) t = _get_type_by_key(cur, type_key, require_active=True) cur.execute( """ SELECT v.id, v.profile_id, v.reference_value_type_id, v.effective_date, v.value_numeric, v.value_text, v.unit, v.source, v.confidence, v.method, v.notes, v.extra, v.created_at, v.updated_at, rt.key AS type_key, rt.label AS type_label FROM profile_reference_values v JOIN reference_value_types rt ON rt.id = v.reference_value_type_id WHERE v.profile_id = %s AND 
rt.key = %s ORDER BY v.effective_date DESC, v.created_at DESC """, (pid, t["key"]), ) return [_row_to_api(r2d(r)) for r in cur.fetchall()] @router.post("/profile-reference-values") def create_profile_reference_value( body: ProfileReferenceValueCreate, x_profile_id: Optional[str] = Header(default=None), session: dict = Depends(require_auth), ): pid = get_pid(x_profile_id) try: datetime.strptime(body.effective_date, "%Y-%m-%d") except ValueError: raise HTTPException(400, "Ungültiges Datum. Format: YYYY-MM-DD") with get_db() as conn: cur = get_cursor(conn) t = _get_type_by_key(cur, body.reference_value_type_key.strip(), require_active=True) unit = (body.unit or "").strip() or (t.get("default_unit") or "").strip() if not unit: raise HTTPException(400, "Bitte eine Einheit angeben (kein Standard für diesen Typ definiert).") vnum = body.value_numeric vtxt = (body.value_text.strip() if body.value_text is not None else None) or None if vnum is None and not vtxt: raise HTTPException(400, "Mindestens value_numeric oder value_text erforderlich.") extra = body.extra if body.extra is not None else {} cur.execute( """ INSERT INTO profile_reference_values ( profile_id, reference_value_type_id, effective_date, value_numeric, value_text, unit, source, confidence, method, notes, extra ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id """, ( pid, t["id"], body.effective_date, vnum, vtxt, unit, body.source, body.confidence, body.method, body.notes, Json(extra), ), ) new_id = cur.fetchone()["id"] cur.execute( """ SELECT v.id, v.profile_id, v.reference_value_type_id, v.effective_date, v.value_numeric, v.value_text, v.unit, v.source, v.confidence, v.method, v.notes, v.extra, v.created_at, v.updated_at, rt.key AS type_key, rt.label AS type_label FROM profile_reference_values v JOIN reference_value_types rt ON rt.id = v.reference_value_type_id WHERE v.id = %s AND v.profile_id = %s """, (new_id, pid), ) return _row_to_api(r2d(cur.fetchone())) 
@router.put("/profile-reference-values/{entry_id}")
def update_profile_reference_value(
    entry_id: int,
    body: ProfileReferenceValueUpdate,
    x_profile_id: Optional[str] = Header(default=None),
    session: dict = Depends(require_auth),
):
    """Partially update one reference value of the active profile.

    Only fields present in the request body are changed. The merged result
    must still carry a numeric value or a non-blank text value, and a unit.

    Args:
        entry_id: Id of the ``profile_reference_values`` row to update.
        body: Fields to change; unset fields keep their stored value.
        x_profile_id: Profile header passed to ``get_pid``.
        session: Auth session injected by ``require_auth``.

    Returns:
        The updated row, including ``type_key`` and ``type_label``.

    Raises:
        HTTPException: 400 when the body is empty, ``effective_date`` is
            null/blank or not in YYYY-MM-DD format, the merged entry has no
            value, or the unit would end up empty; 404 when the entry does
            not exist for this profile.
    """
    pid = get_pid(x_profile_id)
    patch = body.model_dump(exclude_unset=True)
    if not patch:
        raise HTTPException(400, "Keine Felder zum Aktualisieren")

    if "effective_date" in patch:
        raw_date = patch["effective_date"]
        # An explicit null/blank date must be rejected here; the create model
        # requires effective_date, and letting it through would only surface
        # as a database error.
        if not raw_date:
            raise HTTPException(400, "Ungültiges Datum. Format: YYYY-MM-DD")
        try:
            datetime.strptime(raw_date, "%Y-%m-%d")
        except ValueError:
            raise HTTPException(400, "Ungültiges Datum. Format: YYYY-MM-DD") from None

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT v.*, rt.default_unit
            FROM profile_reference_values v
            JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
            WHERE v.id = %s AND v.profile_id = %s
            """,
            (entry_id, pid),
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(404, "Eintrag nicht gefunden")
        cur_row = r2d(row)

        # Merge patch over the stored row for the always-written columns.
        new_ed = patch.get("effective_date", cur_row["effective_date"])
        if hasattr(new_ed, "isoformat"):
            new_ed = new_ed.isoformat()

        new_num = patch["value_numeric"] if "value_numeric" in patch else cur_row.get("value_numeric")
        if isinstance(new_num, Decimal):
            new_num = float(new_num)

        new_txt_raw = patch["value_text"] if "value_text" in patch else cur_row.get("value_text")
        # Blank text is normalised to None so it is stored as NULL.
        new_txt = (new_txt_raw.strip() if isinstance(new_txt_raw, str) else new_txt_raw) or None

        if new_num is None and not new_txt:
            raise HTTPException(400, "Mindestens value_numeric oder value_text erforderlich.")

        if "unit" in patch:
            # `or ""` keeps an explicit null from being stringified to "None";
            # null and blank both fall back to the type default, then to the
            # currently stored unit.
            requested_unit = (patch["unit"] or "").strip()
            if requested_unit:
                nu = requested_unit
            else:
                default_u = (cur_row.get("default_unit") or "").strip()
                nu = default_u or cur_row["unit"]
        else:
            nu = cur_row["unit"]
        if not nu:
            raise HTTPException(400, "Einheit darf nicht leer sein.")

        updates: dict[str, Any] = {
            "effective_date": new_ed,
            "value_numeric": new_num,
            "value_text": new_txt,
            "unit": nu,
        }
        # Optional columns are written only when the client sent them, so an
        # explicit null clears the column while an omitted field is untouched.
        for column in ("source", "confidence", "method", "notes"):
            if column in patch:
                updates[column] = patch[column]
        if "extra" in patch:
            updates["extra"] = Json(patch["extra"] if patch["extra"] is not None else {})

        # Column names come from the fixed keys above, never from user input.
        set_parts = [f"{k} = %s" for k in updates]
        vals = list(updates.values()) + [entry_id, pid]
        cur.execute(
            f"""
            UPDATE profile_reference_values
            SET {", ".join(set_parts)}, updated_at = NOW()
            WHERE id = %s AND profile_id = %s
            """,
            tuple(vals),
        )

        # Read the row back with its type key/label for the response.
        cur.execute(
            """
            SELECT v.id, v.profile_id, v.reference_value_type_id, v.effective_date,
                   v.value_numeric, v.value_text, v.unit, v.source, v.confidence,
                   v.method, v.notes, v.extra, v.created_at, v.updated_at,
                   rt.key AS type_key, rt.label AS type_label
            FROM profile_reference_values v
            JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
            WHERE v.id = %s AND v.profile_id = %s
            """,
            (entry_id, pid),
        )
        return _row_to_api(r2d(cur.fetchone()))


@router.delete("/profile-reference-values/{entry_id}")
def delete_profile_reference_value(
    entry_id: int,
    x_profile_id: Optional[str] = Header(default=None),
    session: dict = Depends(require_auth),
):
    """Delete one reference value of the active profile.

    Returns:
        ``{"ok": True}`` when a row was deleted.

    Raises:
        HTTPException: 404 when no row with *entry_id* exists for this profile.
    """
    pid = get_pid(x_profile_id)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "DELETE FROM profile_reference_values WHERE id = %s AND profile_id = %s RETURNING id",
            (entry_id, pid),
        )
        if not cur.fetchone():
            raise HTTPException(404, "Eintrag nicht gefunden")
        return {"ok": True}