"""
Reference values — Layer 1 (read paths + structured rows)

Structured retrieval for profile reference values and the active type catalog.
Mutations (INSERT/UPDATE/DELETE) stay in routers/reference_values.py with validation.

Dates are normalized to ISO strings; Decimals to float — suitable for JSON/API layers.
"""
from __future__ import annotations

from datetime import date, datetime
from decimal import Decimal
from typing import Any, Optional

from db import get_cursor, get_db, r2d

# Columns that make up one reference-value entry in snapshot payloads /
# placeholder JSON: the measurement columns plus the type's key and label.
# Ranking/sorting helpers (rn, type_sort_order, value_data_type) are excluded.
_REFERENCE_ENTRY_KEYS = frozenset(
    {
        "id",
        "profile_id",
        "reference_value_type_id",
        "effective_date",
        "value_numeric",
        "value_text",
        "unit",
        "source",
        "confidence",
        "method",
        "notes",
        "extra",
        "created_at",
        "updated_at",
        "type_key",
        "type_label",
    }
)

# Keys whose values are converted via ``isoformat()`` when present.
_TEMPORAL_KEYS = ("effective_date", "created_at", "updated_at")

# Upper bound applied to ``limit_per_type`` in the "recent" snapshot.
_MAX_RECENT_PER_TYPE = 50

# One query serves the summary, current and recent read paths: filter by
# profile / active type / optional inclusive date window, rank each type's
# entries newest-first, keep the first N ranks. The date predicates are no-ops
# when their parameter is NULL.
_RANKED_ENTRIES_SQL = """
    WITH filtered AS (
        SELECT
            v.id, v.profile_id, v.reference_value_type_id, v.effective_date,
            v.value_numeric, v.value_text, v.unit, v.source, v.confidence,
            v.method, v.notes, v.extra, v.created_at, v.updated_at,
            rt.key AS type_key, rt.label AS type_label,
            rt.sort_order AS type_sort_order, rt.value_data_type
        FROM profile_reference_values v
        JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
        WHERE v.profile_id = %s
          AND rt.active = TRUE
          AND (%s::date IS NULL OR v.effective_date >= %s::date)
          AND (%s::date IS NULL OR v.effective_date <= %s::date)
    ),
    ranked AS (
        SELECT
            f.*,
            ROW_NUMBER() OVER (
                PARTITION BY f.reference_value_type_id
                ORDER BY f.effective_date DESC, f.created_at DESC
            ) AS rn
        FROM filtered f
    )
    SELECT * FROM ranked
    WHERE rn <= %s
    ORDER BY type_sort_order ASC, type_key ASC, rn ASC
"""


def normalize_reference_row(d: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Normalize a DB row dict for JSON (dates → ISO strings, Decimal → float).

    Returns a shallow copy; the input dict is not modified. A falsy input
    (``None`` or an empty dict) is returned unchanged, so callers that may pass
    ``None`` get ``None`` back despite the return annotation.

    Only ``effective_date`` / ``created_at`` / ``updated_at`` are ISO-formatted
    and only ``value_numeric`` is converted from ``Decimal``; every other key is
    copied through as-is.
    """
    if not d:
        return d
    out = dict(d)
    for k in _TEMPORAL_KEYS:
        v = out.get(k)
        if v is not None and hasattr(v, "isoformat"):
            out[k] = v.isoformat()
    vn = out.get("value_numeric")
    if isinstance(vn, Decimal):
        out["value_numeric"] = float(vn)
    return out


def fetch_reference_type_by_key(cur, key: str, require_active: bool = True) -> Optional[dict[str, Any]]:
    """Return the single type row for ``key``, or ``None`` if there is none.

    Runs on the caller's cursor so it can be used inside router transactions.
    With ``require_active`` (the default) inactive types are treated as missing.
    """
    q = (
        "SELECT id, key, label, description, default_unit, active, category, "
        "value_data_type, validation_rules, metadata "
        "FROM reference_value_types WHERE key = %s "
    )
    if require_active:
        q += "AND active = TRUE "
    cur.execute(q, (key,))
    row = cur.fetchone()
    return r2d(row) if row else None


def list_active_reference_value_types_data() -> list[dict[str, Any]]:
    """Return all active ``reference_value_types`` rows in catalog sort order."""
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT id, key, label, description, default_unit, sort_order,
                   active, category, value_data_type, validation_rules,
                   metadata, created_at
            FROM reference_value_types
            WHERE active = TRUE
            ORDER BY sort_order ASC, id ASC
            """
        )
        return [normalize_reference_row(r2d(r)) for r in cur.fetchall()]


def list_profile_reference_values_for_type(
    profile_id: str, type_key: str
) -> Optional[list[dict[str, Any]]]:
    """Return the historical entries of one type for a profile, newest first.

    Returns ``None`` if ``type_key`` does not resolve to an active type, and an
    empty list if the type exists but the profile has no entries for it.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        t = fetch_reference_type_by_key(cur, type_key, require_active=True)
        if not t:
            return None
        cur.execute(
            """
            SELECT
                v.id, v.profile_id, v.reference_value_type_id, v.effective_date,
                v.value_numeric, v.value_text, v.unit, v.source, v.confidence,
                v.method, v.notes, v.extra, v.created_at, v.updated_at,
                rt.key AS type_key, rt.label AS type_label
            FROM profile_reference_values v
            JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
            WHERE v.profile_id = %s AND rt.key = %s
            ORDER BY v.effective_date DESC, v.created_at DESC
            """,
            (profile_id, t["key"]),
        )
        return [normalize_reference_row(r2d(r)) for r in cur.fetchall()]


def _normalized_value_data_type(row: dict[str, Any]) -> str:
    """Return the row's ``value_data_type`` trimmed and lower-cased (default ``"decimal"``)."""
    return (row.get("value_data_type") or "decimal").strip().lower()


def build_summary_tiles_from_ranked_rows(raw_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Build the summary tile payloads from ranked SQL rows (rn 1..2 per type).

    Each input row still carries ``rn``, ``type_sort_order`` and
    ``value_data_type``; these are stripped from the per-entry payload. The
    result has one tile per ``type_key`` with ``latest`` (rn 1) and
    ``previous`` (rn 2, or ``None``), sorted by ``(type_sort_order, type_key)``.
    Rows with any other ``rn`` still create the tile but fill neither slot.
    """
    by_key: dict[str, dict[str, Any]] = {}
    skip_cols = frozenset({"rn", "type_sort_order", "value_data_type"})
    slot_for_rank = {1: "latest", 2: "previous"}

    for row in raw_rows:
        rn = int(row.get("rn") or 0)
        key = row["type_key"]
        if key not in by_key:
            by_key[key] = {
                "type_key": key,
                "type_label": row.get("type_label") or key,
                "value_data_type": _normalized_value_data_type(row),
                # Temporary sort handle; removed before returning.
                "sort_key": (row.get("type_sort_order") or 0, key),
                "latest": None,
                "previous": None,
            }
        entry = {k: v for k, v in row.items() if k not in skip_cols}
        slot = slot_for_rank.get(rn)
        if slot is not None:
            by_key[key][slot] = normalize_reference_row(entry)

    tiles = sorted(by_key.values(), key=lambda t: t["sort_key"])
    for t in tiles:
        t.pop("sort_key", None)
    return tiles


def _fetch_ranked_entry_rows(
    profile_id: str,
    max_rank: int,
    date_from: Optional[date] = None,
    date_to: Optional[date] = None,
) -> list[dict[str, Any]]:
    """Fetch the newest ``max_rank`` entries per active type for one profile.

    Rows come back ordered by type sort order, type key, then rank, and still
    contain ``rn``, ``type_sort_order`` and ``value_data_type``. ``date_from`` /
    ``date_to`` bound ``effective_date`` inclusively; ``None`` disables a bound.
    """
    params = (profile_id, date_from, date_from, date_to, date_to, max_rank)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(_RANKED_ENTRIES_SQL, params)
        return [r2d(r) for r in cur.fetchall()]


def get_profile_reference_values_summary(profile_id: str) -> dict[str, Any]:
    """Return latest + previous entry per active type as ``{"tiles": [...]}``.

    Tiles are sorted like the catalog (type sort order, then type key).
    """
    raw_rows = _fetch_ranked_entry_rows(profile_id, max_rank=2)
    return {"tiles": build_summary_tiles_from_ranked_rows(raw_rows)}


def _entry_dict_from_ranked_row(d: dict[str, Any]) -> dict[str, Any]:
    """Extract the normalized entry fields (incl. ``type_key``/``type_label``) for AI context.

    Keys follow the order of the input row rather than the iteration order of
    ``_REFERENCE_ENTRY_KEYS``: iterating a set of strings is subject to hash
    randomization, which would make the payload's key order differ between
    processes.
    """
    out = {k: v for k, v in d.items() if k in _REFERENCE_ENTRY_KEYS}
    return normalize_reference_row(out)


def get_profile_reference_values_current_snapshot(profile_id: str) -> dict[str, Any]:
    """Layer 1: all **current** reference values of a profile.

    "Current" means the newest entry per active type, in catalog sort order.

    Returns a dict with ``schema``, ``count`` and ``items``; each item holds
    ``type_key``, ``type_label``, ``value_data_type``, ``type_sort_order`` and
    ``latest`` (the full normalized entry).
    """
    raw_rows = _fetch_ranked_entry_rows(profile_id, max_rank=1)
    items: list[dict[str, Any]] = [
        {
            "type_key": row.get("type_key"),
            "type_label": row.get("type_label"),
            "value_data_type": _normalized_value_data_type(row),
            "type_sort_order": int(row.get("type_sort_order") or 0),
            "latest": _entry_dict_from_ranked_row(row),
        }
        for row in raw_rows
    ]
    return {
        "schema": "profile_reference_values_current_v1",
        "count": len(items),
        "items": items,
    }


def _coerce_optional_date(value: Any) -> Optional[date]:
    """Turn a ``date`` / ISO ``YYYY-MM-DD`` string into a ``date``; anything else → ``None``.

    Blank strings and unsupported types yield ``None``. A ``datetime`` (which is
    a ``date`` subclass) is truncated to its date part so that the value echoed
    back to the caller is a plain ISO date.

    Raises:
        ValueError: if a non-blank string is not a valid ISO date.
    """
    if isinstance(value, str):
        text = value.strip()
        return date.fromisoformat(text) if text else None
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    return None


def get_profile_reference_values_recent_snapshot(
    profile_id: str,
    *,
    limit_per_type: int = 5,
    date_from: Optional[date | str] = None,
    date_to: Optional[date | str] = None,
) -> dict[str, Any]:
    """Layer 1: the **last N** entries per reference type (newest first).

    Entries can optionally be filtered by ``effective_date``.

    Args:
        profile_id: Profile whose entries are read.
        limit_per_type: Entries to keep per type; clamped to 1..50.
        date_from: Inclusive lower bound, as ``date`` or ISO ``YYYY-MM-DD`` string.
        date_to: Inclusive upper bound, as ``date`` or ISO ``YYYY-MM-DD`` string.

    Returns:
        A dict with ``schema``, the effective ``limit_per_type``, the effective
        ``date_from`` / ``date_to`` as ISO strings (or ``None``),
        ``ordered_type_keys`` (catalog order) and ``by_type_key`` mapping each
        type key to its normalized entries.

    Raises:
        ValueError: if a date string is not a valid ISO date.
    """
    lim = max(1, min(int(limit_per_type), _MAX_RECENT_PER_TYPE))
    df = _coerce_optional_date(date_from)
    dt = _coerce_optional_date(date_to)

    raw_rows = _fetch_ranked_entry_rows(profile_id, max_rank=lim, date_from=df, date_to=dt)

    # Rows arrive in catalog order, so dict insertion order doubles as the
    # ordered list of type keys.
    by_type: dict[str, list[dict[str, Any]]] = {}
    for row in raw_rows:
        tk = row.get("type_key") or ""
        by_type.setdefault(tk, []).append(_entry_dict_from_ranked_row(row))

    return {
        "schema": "profile_reference_values_recent_v1",
        "limit_per_type": lim,
        "date_from": df.isoformat() if df is not None else None,
        "date_to": dt.isoformat() if dt is not None else None,
        "ordered_type_keys": list(by_type),
        "by_type_key": by_type,
    }