- Introduced `get_profile_reference_values_current_snapshot` and `get_profile_reference_values_recent_snapshot` functions to retrieve current and recent reference values for profiles. - Updated the placeholder resolver to include new placeholders for current and recent reference values. - Added new API endpoints for fetching current and recent reference values snapshots. - Enhanced the frontend API utility to support the new snapshot endpoints. - Improved unit tests to validate the new data layer functions and their behavior.
373 lines
13 KiB
Python
373 lines
13 KiB
Python
"""
|
|
Reference values — Layer 1 (read paths + structured rows)
|
|
|
|
Structured retrieval for profile reference values and the active type catalog.
|
|
Mutations (INSERT/UPDATE/DELETE) stay in routers/reference_values.py with validation.
|
|
|
|
Dates are normalized to ISO strings; Decimals to float — suitable for JSON/API layers.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import date
|
|
from decimal import Decimal
|
|
from typing import Any, Optional
|
|
|
|
from db import get_cursor, get_db, r2d
|
|
|
|
# Columns of a single measurement entry that are kept in snapshot payloads /
# placeholder JSON. Besides the entry's own columns only the type's key and
# label are included; other type metadata (sort order, data type) is left out.
_REFERENCE_ENTRY_KEYS = frozenset(
    (
        # entry identity
        "id",
        "profile_id",
        "reference_value_type_id",
        # measurement
        "effective_date",
        "value_numeric",
        "value_text",
        "unit",
        # provenance / annotations
        "source",
        "confidence",
        "method",
        "notes",
        "extra",
        # bookkeeping timestamps
        "created_at",
        "updated_at",
        # joined from the type catalog
        "type_key",
        "type_label",
    )
)
|
|
|
|
|
|
def normalize_reference_row(d: Optional[dict[str, Any]]) -> Optional[dict[str, Any]]:
    """Normalize a DB row dict for JSON (dates → ISO strings, Decimal → float).

    Args:
        d: Row dict, or ``None``.

    Returns:
        A shallow copy of ``d`` in which ``effective_date``, ``created_at`` and
        ``updated_at`` are replaced by their ``isoformat()`` string (when the
        value offers that method) and a ``Decimal`` ``value_numeric`` is
        replaced by a ``float``. A falsy input (``None`` or an empty dict) is
        handed back unchanged, which is why the return type is optional.
    """
    if not d:
        return d

    out = dict(d)  # never mutate the caller's row

    for k in ("effective_date", "created_at", "updated_at"):
        v = out.get(k)
        # Duck-typed so date, datetime and time values are all covered;
        # strings that are already ISO-formatted pass through untouched.
        if v is not None and hasattr(v, "isoformat"):
            out[k] = v.isoformat()

    vn = out.get("value_numeric")
    if isinstance(vn, Decimal):
        out["value_numeric"] = float(vn)

    return out
|
|
|
|
|
|
def fetch_reference_type_by_key(cur, key: str, require_active: bool = True) -> Optional[dict[str, Any]]:
    """Look up one ``reference_value_types`` row by its key.

    Runs on the cursor it is given, so it can take part in a router's
    transaction instead of opening its own connection.

    Args:
        cur: DB cursor to execute on.
        key: Value matched against the ``key`` column.
        require_active: When true, only rows with ``active = TRUE`` match.

    Returns:
        The row converted by ``r2d``, or ``None`` when nothing matched.
    """
    sql_parts = [
        "SELECT id, key, label, description, default_unit, active, category, "
        "value_data_type, validation_rules, metadata "
        "FROM reference_value_types WHERE key = %s "
    ]
    if require_active:
        sql_parts.append("AND active = TRUE ")

    cur.execute("".join(sql_parts), (key,))
    found = cur.fetchone()
    if not found:
        return None
    return r2d(found)
|
|
|
|
|
|
def list_active_reference_value_types_data() -> list[dict[str, Any]]:
    """Return every active ``reference_value_types`` row in catalog order.

    Rows are ordered by ``sort_order`` and then ``id``; each one is converted
    with ``r2d`` and passed through ``normalize_reference_row``.
    """
    catalog_sql = """
        SELECT
            id, key, label, description, default_unit, sort_order, active,
            category, value_data_type, validation_rules, metadata, created_at
        FROM reference_value_types
        WHERE active = TRUE
        ORDER BY sort_order ASC, id ASC
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(catalog_sql)
        catalog: list[dict[str, Any]] = []
        for record in cursor.fetchall():
            catalog.append(normalize_reference_row(r2d(record)))
        return catalog
|
|
|
|
|
|
def list_profile_reference_values_for_type(
    profile_id: str, type_key: str
) -> Optional[list[dict[str, Any]]]:
    """Return a profile's full history for one reference type, newest first.

    Args:
        profile_id: Profile whose entries are read.
        type_key: Key of the reference value type.

    Returns:
        Normalized entry dicts ordered by ``effective_date`` and then
        ``created_at``, both descending. ``None`` when ``type_key`` does not
        resolve to an active type.
    """
    history_sql = """
        SELECT
            v.id,
            v.profile_id,
            v.reference_value_type_id,
            v.effective_date,
            v.value_numeric,
            v.value_text,
            v.unit,
            v.source,
            v.confidence,
            v.method,
            v.notes,
            v.extra,
            v.created_at,
            v.updated_at,
            rt.key AS type_key,
            rt.label AS type_label
        FROM profile_reference_values v
        JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
        WHERE v.profile_id = %s AND rt.key = %s
        ORDER BY v.effective_date DESC, v.created_at DESC
    """
    with get_db() as conn:
        cursor = get_cursor(conn)

        # Resolve the type first so an unknown/inactive key yields None
        # rather than an empty history.
        ref_type = fetch_reference_type_by_key(cursor, type_key, require_active=True)
        if not ref_type:
            return None

        # The key stored on the resolved type row is used for the filter.
        cursor.execute(history_sql, (profile_id, ref_type["key"]))
        history: list[dict[str, Any]] = []
        for record in cursor.fetchall():
            history.append(normalize_reference_row(r2d(record)))
        return history
|
|
|
|
|
|
def build_summary_tiles_from_ranked_rows(raw_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Group ranked SQL rows into one summary tile per reference type.

    Each input row is expected to still carry ``rn`` (1 = latest,
    2 = previous), ``type_sort_order`` and ``value_data_type``; those helper
    columns are stripped from the entry stored on the tile.

    Args:
        raw_rows: Row dicts; every row must contain ``type_key``.

    Returns:
        Tiles with ``type_key``, ``type_label``, ``value_data_type``,
        ``latest`` and ``previous``, sorted by ``(type_sort_order, type_key)``.
        Rows with any other ``rn`` create the tile but fill neither slot.
    """
    helper_columns = frozenset({"rn", "type_sort_order", "value_data_type"})
    slot_for_rank = {1: "latest", 2: "previous"}
    tiles_by_type: dict[str, dict[str, Any]] = {}

    for record in raw_rows:
        rank = int(record.get("rn") or 0)
        type_key = record["type_key"]

        tile = tiles_by_type.get(type_key)
        if tile is None:
            # The first row seen for a type decides the tile's metadata.
            data_type = record.get("value_data_type") or "decimal"
            tile = {
                "type_key": type_key,
                "type_label": record.get("type_label") or type_key,
                "value_data_type": data_type.strip().lower(),
                "sort_key": (record.get("type_sort_order") or 0, type_key),
                "latest": None,
                "previous": None,
            }
            tiles_by_type[type_key] = tile

        stripped = {}
        for column, value in record.items():
            if column not in helper_columns:
                stripped[column] = value
        api_entry = normalize_reference_row(stripped)

        slot = slot_for_rank.get(rank)
        if slot is not None:
            tile[slot] = api_entry

    ordered = list(tiles_by_type.values())
    ordered.sort(key=lambda tile: tile["sort_key"])
    for tile in ordered:
        # sort_key is internal only and must not leak into the payload.
        tile.pop("sort_key", None)
    return ordered
|
|
|
|
|
|
def get_profile_reference_values_summary(profile_id: str) -> dict[str, Any]:
    """Return the latest and previous entry per active type as summary tiles.

    The query ranks a profile's entries per type (newest first) and keeps
    ranks 1 and 2; ``build_summary_tiles_from_ranked_rows`` turns those rows
    into tiles sorted like the catalog.

    Returns:
        ``{"tiles": [...]}``.
    """
    summary_sql = """
        WITH ranked AS (
            SELECT
                v.id,
                v.profile_id,
                v.reference_value_type_id,
                v.effective_date,
                v.value_numeric,
                v.value_text,
                v.unit,
                v.source,
                v.confidence,
                v.method,
                v.notes,
                v.extra,
                v.created_at,
                v.updated_at,
                rt.key AS type_key,
                rt.label AS type_label,
                rt.sort_order AS type_sort_order,
                rt.value_data_type,
                ROW_NUMBER() OVER (
                    PARTITION BY v.reference_value_type_id
                    ORDER BY v.effective_date DESC, v.created_at DESC
                ) AS rn
            FROM profile_reference_values v
            JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
            WHERE v.profile_id = %s AND rt.active = TRUE
        )
        SELECT * FROM ranked WHERE rn <= 2
        ORDER BY type_sort_order ASC, type_key ASC, rn ASC
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(summary_sql, (profile_id,))
        ranked_rows = []
        for record in cursor.fetchall():
            ranked_rows.append(r2d(record))

    return {"tiles": build_summary_tiles_from_ranked_rows(ranked_rows)}
|
|
|
|
|
|
def _entry_dict_from_ranked_row(d: dict[str, Any]) -> dict[str, Any]:
    """Extract the entry fields (incl. type_key/type_label) for the AI context.

    Args:
        d: Ranked row dict; keys outside ``_REFERENCE_ENTRY_KEYS`` (for
            example ``rn`` or ``type_sort_order``) are dropped.

    Returns:
        The remaining fields passed through ``normalize_reference_row``, in
        the same key order as ``d``.
    """
    # Iterate the row, not the frozenset: set iteration order for strings
    # depends on per-process hash randomization, which would make the key
    # order of the serialized payload differ from run to run.
    picked = {k: v for k, v in d.items() if k in _REFERENCE_ENTRY_KEYS}
    return normalize_reference_row(picked)
|
|
|
|
|
|
def get_profile_reference_values_current_snapshot(profile_id: str) -> dict[str, Any]:
    """
    Layer 1: all **current** reference values (newest entry per active type),
    in catalog sort order.

    Returns:
        A dict with ``schema``, ``count`` and ``items``. Each item holds
        ``type_key``, ``type_label``, ``value_data_type``,
        ``type_sort_order`` and ``latest`` (the full entry).
    """
    current_sql = """
        WITH ranked AS (
            SELECT
                v.id,
                v.profile_id,
                v.reference_value_type_id,
                v.effective_date,
                v.value_numeric,
                v.value_text,
                v.unit,
                v.source,
                v.confidence,
                v.method,
                v.notes,
                v.extra,
                v.created_at,
                v.updated_at,
                rt.key AS type_key,
                rt.label AS type_label,
                rt.sort_order AS type_sort_order,
                rt.value_data_type,
                ROW_NUMBER() OVER (
                    PARTITION BY v.reference_value_type_id
                    ORDER BY v.effective_date DESC, v.created_at DESC
                ) AS rn
            FROM profile_reference_values v
            JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
            WHERE v.profile_id = %s AND rt.active = TRUE
        )
        SELECT * FROM ranked WHERE rn = 1
        ORDER BY type_sort_order ASC, type_key ASC
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(current_sql, (profile_id,))
        newest_rows = []
        for record in cursor.fetchall():
            newest_rows.append(r2d(record))

    def as_item(record: dict[str, Any]) -> dict[str, Any]:
        # Drop the ranking helper column, then split type metadata from the entry.
        record.pop("rn", None)
        data_type = (record.get("value_data_type") or "decimal").strip().lower()
        newest = _entry_dict_from_ranked_row(record)
        sort_order = int(record.get("type_sort_order") or 0)
        return {
            "type_key": record.get("type_key"),
            "type_label": record.get("type_label"),
            "value_data_type": data_type,
            "type_sort_order": sort_order,
            "latest": newest,
        }

    items: list[dict[str, Any]] = [as_item(record) for record in newest_rows]

    snapshot: dict[str, Any] = {"schema": "profile_reference_values_current_v1"}
    snapshot["count"] = len(items)
    snapshot["items"] = items
    return snapshot
|
|
|
|
|
|
def _coerce_optional_date(value: Optional[date | str]) -> Optional[date]:
    """Turn a date filter argument into a ``date``, or ``None`` for "no bound"."""
    if isinstance(value, str):
        text = value.strip()
        # A blank string means "no bound", not a parse error.
        return date.fromisoformat(text) if text else None
    if isinstance(value, date):
        return value
    # None and any unsupported type disable the bound.
    return None


def get_profile_reference_values_recent_snapshot(
    profile_id: str,
    *,
    limit_per_type: int = 5,
    date_from: Optional[date | str] = None,
    date_to: Optional[date | str] = None,
) -> dict[str, Any]:
    """
    Layer 1: the **last N** entries per reference type (newest first),
    optionally filtered by ``effective_date``.

    Args:
        profile_id: Profile whose entries are read.
        limit_per_type: Entries kept per type; clamped to the range 1..50.
        date_from: Inclusive lower bound, as ``date`` or ISO ``YYYY-MM-DD``
            string. ``None``, a blank string or an unsupported type disables
            the bound.
        date_to: Inclusive upper bound, same rules as ``date_from``.

    Returns:
        A dict with ``schema``, the effective ``limit_per_type``, the applied
        ``date_from`` / ``date_to`` as ISO strings (or ``None``),
        ``ordered_type_keys`` (type keys in catalog order) and
        ``by_type_key`` (type key → list of entries, newest first).

    Raises:
        ValueError: If a non-blank date string is not valid ISO format; this
            is raised before any database access.
    """
    lim = max(1, min(int(limit_per_type), 50))
    df = _coerce_optional_date(date_from)
    dt = _coerce_optional_date(date_to)

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            WITH filtered AS (
                SELECT
                    v.id,
                    v.profile_id,
                    v.reference_value_type_id,
                    v.effective_date,
                    v.value_numeric,
                    v.value_text,
                    v.unit,
                    v.source,
                    v.confidence,
                    v.method,
                    v.notes,
                    v.extra,
                    v.created_at,
                    v.updated_at,
                    rt.key AS type_key,
                    rt.label AS type_label,
                    rt.sort_order AS type_sort_order,
                    rt.value_data_type
                FROM profile_reference_values v
                JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
                WHERE v.profile_id = %s
                    AND rt.active = TRUE
                    AND (%s::date IS NULL OR v.effective_date >= %s::date)
                    AND (%s::date IS NULL OR v.effective_date <= %s::date)
            ),
            ranked AS (
                SELECT
                    f.*,
                    ROW_NUMBER() OVER (
                        PARTITION BY f.reference_value_type_id
                        ORDER BY f.effective_date DESC, f.created_at DESC
                    ) AS rn
                FROM filtered f
            )
            SELECT * FROM ranked WHERE rn <= %s
            ORDER BY type_sort_order ASC, type_key ASC, rn ASC
            """,
            # Each bound is passed twice: once for the IS NULL test and once
            # for the comparison itself.
            (profile_id, df, df, dt, dt, lim),
        )
        raw_rows = [r2d(r) for r in cur.fetchall()]

    # Rows arrive in catalog order, and dicts keep insertion order, so the
    # grouping dict itself records the order in which types first appear.
    by_type: dict[str, list[dict[str, Any]]] = {}
    for row in raw_rows:
        row.pop("rn", None)
        tk = row.get("type_key") or ""
        by_type.setdefault(tk, []).append(_entry_dict_from_ranked_row(row))

    return {
        "schema": "profile_reference_values_recent_v1",
        "limit_per_type": lim,
        "date_from": df.isoformat() if isinstance(df, date) else None,
        "date_to": dt.isoformat() if isinstance(dt, date) else None,
        "ordered_type_keys": list(by_type),
        "by_type_key": by_type,
    }
|