mitai-jinkendo/backend/data_layer/reference_values.py
Lars 42ae796448
All checks were successful
Deploy Development / deploy (push) Successful in 1m3s
Build Test / pytest-backend (push) Successful in 5s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
feat: add reference values snapshot endpoints and data layer functions
- Introduced `get_profile_reference_values_current_snapshot` and `get_profile_reference_values_recent_snapshot` functions to retrieve current and recent reference values for profiles.
- Updated the placeholder resolver to include new placeholders for current and recent reference values.
- Added new API endpoints for fetching current and recent reference values snapshots.
- Enhanced the frontend API utility to support the new snapshot endpoints.
- Improved unit tests to validate the new data layer functions and their behavior.
2026-04-19 10:52:31 +02:00

373 lines
13 KiB
Python

"""
Reference values — Layer 1 (read paths + structured rows)
Structured retrieval for profile reference values and the active type catalog.
Mutations (INSERT/UPDATE/DELETE) stay in routers/reference_values.py with validation.
Dates are normalized to ISO strings; Decimals to float — suitable for JSON/API layers.
"""
from __future__ import annotations
from datetime import date
from decimal import Decimal
from typing import Any, Optional
from db import get_cursor, get_db, r2d
# Columns of a single measurement entry (without type metadata beyond key/label),
# used as a whitelist when building snapshot payloads / placeholder JSON.
# Ranking and catalog helper columns selected by the snapshot queries
# (rn, type_sort_order, value_data_type) are intentionally not listed here.
_REFERENCE_ENTRY_KEYS = frozenset(
    {
        # Columns of profile_reference_values
        "id",
        "profile_id",
        "reference_value_type_id",
        "effective_date",
        "value_numeric",
        "value_text",
        "unit",
        "source",
        "confidence",
        "method",
        "notes",
        "extra",
        "created_at",
        "updated_at",
        # Aliases joined in from reference_value_types (rt.key / rt.label)
        "type_key",
        "type_label",
    }
)
def normalize_reference_row(d: Optional[dict[str, Any]]) -> Optional[dict[str, Any]]:
    """Return a JSON-friendly shallow copy of a DB row dict.

    ``effective_date``, ``created_at`` and ``updated_at`` are replaced by their
    ``isoformat()`` string when the value offers that method; a ``Decimal``
    ``value_numeric`` is converted to ``float``. All other keys are copied
    unchanged and the input dict is never mutated.

    Args:
        d: Row dict, or ``None``.

    Returns:
        The normalized copy. Falsy input (``None`` or an empty dict) is handed
        back as-is, which is why the return type is optional.
    """
    if not d:
        # Nothing to normalize; callers get None / {} back unchanged.
        return d

    out = dict(d)
    for k in ("effective_date", "created_at", "updated_at"):
        v = out.get(k)
        # None has no isoformat(), so missing/NULL columns are skipped here;
        # values that are already strings are left untouched as well.
        if hasattr(v, "isoformat"):
            out[k] = v.isoformat()

    vn = out.get("value_numeric")
    if isinstance(vn, Decimal):
        # Decimal is not JSON-serializable by default.
        out["value_numeric"] = float(vn)
    return out
def fetch_reference_type_by_key(cur, key: str, require_active: bool = True) -> Optional[dict[str, Any]]:
    """Look up one ``reference_value_types`` row by its key on the given cursor.

    The caller supplies the cursor, so the lookup can run inside a transaction
    the caller already has open. With ``require_active`` set, only rows with
    ``active = TRUE`` match. Returns the row as a dict, or ``None`` if nothing
    matched.
    """
    sql_parts = [
        "SELECT id, key, label, description, default_unit, active, category, ",
        "value_data_type, validation_rules, metadata ",
        "FROM reference_value_types WHERE key = %s ",
    ]
    if require_active:
        sql_parts.append("AND active = TRUE ")

    cur.execute("".join(sql_parts), (key,))
    found = cur.fetchone()
    if not found:
        return None
    return r2d(found)
def list_active_reference_value_types_data() -> list[dict[str, Any]]:
    """Return every active ``reference_value_types`` row in catalog order.

    Rows are ordered by ``sort_order`` and then ``id``; each one is converted
    to a dict and passed through ``normalize_reference_row``.
    """
    catalog_sql = """
        SELECT
            id, key, label, description, default_unit, sort_order, active,
            category, value_data_type, validation_rules, metadata, created_at
        FROM reference_value_types
        WHERE active = TRUE
        ORDER BY sort_order ASC, id ASC
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(catalog_sql)
        catalog: list[dict[str, Any]] = []
        for record in cursor.fetchall():
            catalog.append(normalize_reference_row(r2d(record)))
        return catalog
def list_profile_reference_values_for_type(
    profile_id: str, type_key: str
) -> Optional[list[dict[str, Any]]]:
    """
    Full history of one reference type for a profile, newest entry first.

    Yields ``None`` (not an empty list) when ``type_key`` does not match an
    active type, so callers can tell "unknown type" from "no entries yet".
    """
    history_sql = """
        SELECT
            v.id,
            v.profile_id,
            v.reference_value_type_id,
            v.effective_date,
            v.value_numeric,
            v.value_text,
            v.unit,
            v.source,
            v.confidence,
            v.method,
            v.notes,
            v.extra,
            v.created_at,
            v.updated_at,
            rt.key AS type_key,
            rt.label AS type_label
        FROM profile_reference_values v
        JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
        WHERE v.profile_id = %s AND rt.key = %s
        ORDER BY v.effective_date DESC, v.created_at DESC
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        type_row = fetch_reference_type_by_key(cursor, type_key, require_active=True)
        if not type_row:
            return None

        # Query with the key as stored in the catalog row, not the raw argument.
        cursor.execute(history_sql, (profile_id, type_row["key"]))
        history: list[dict[str, Any]] = []
        for record in cursor.fetchall():
            history.append(normalize_reference_row(r2d(record)))
        return history
def build_summary_tiles_from_ranked_rows(raw_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Fold ranked SQL rows (``rn`` 1 or 2 per type) into one summary tile per type.

    Each tile holds ``type_key``, ``type_label``, ``value_data_type`` plus the
    normalized ``latest`` (rn = 1) and ``previous`` (rn = 2) entries. The helper
    columns ``rn``, ``type_sort_order`` and ``value_data_type`` are removed from
    the entries themselves. Tiles come back sorted by (type_sort_order, type_key).
    """
    helper_columns = frozenset({"rn", "type_sort_order", "value_data_type"})
    tiles_by_type: dict[str, dict[str, Any]] = {}

    for record in raw_rows:
        rank = int(record.get("rn") or 0)
        type_key = record["type_key"]

        tile = tiles_by_type.get(type_key)
        if tile is None:
            # First row seen for this type decides the tile-level metadata.
            data_type = record.get("value_data_type") or "decimal"
            tile = {
                "type_key": type_key,
                "type_label": record.get("type_label") or type_key,
                "value_data_type": data_type.strip().lower(),
                "sort_key": (record.get("type_sort_order") or 0, type_key),
                "latest": None,
                "previous": None,
            }
            tiles_by_type[type_key] = tile

        stripped = {
            column: value
            for column, value in record.items()
            if column not in helper_columns
        }
        payload = normalize_reference_row(stripped)
        if rank == 1:
            tile["latest"] = payload
        elif rank == 2:
            tile["previous"] = payload

    ordered = sorted(tiles_by_type.values(), key=lambda tile: tile["sort_key"])
    for tile in ordered:
        # sort_key is internal bookkeeping and must not leak into the payload.
        tile.pop("sort_key", None)
    return ordered
def get_profile_reference_values_summary(profile_id: str) -> dict[str, Any]:
    """Return ``{"tiles": [...]}`` with the latest and previous entry per active type.

    The query ranks each type's entries by ``effective_date`` then
    ``created_at`` (both descending) and keeps ranks 1 and 2; the rows are then
    folded into tiles by ``build_summary_tiles_from_ranked_rows``.
    """
    top_two_sql = """
        WITH ranked AS (
            SELECT
                v.id,
                v.profile_id,
                v.reference_value_type_id,
                v.effective_date,
                v.value_numeric,
                v.value_text,
                v.unit,
                v.source,
                v.confidence,
                v.method,
                v.notes,
                v.extra,
                v.created_at,
                v.updated_at,
                rt.key AS type_key,
                rt.label AS type_label,
                rt.sort_order AS type_sort_order,
                rt.value_data_type,
                ROW_NUMBER() OVER (
                    PARTITION BY v.reference_value_type_id
                    ORDER BY v.effective_date DESC, v.created_at DESC
                ) AS rn
            FROM profile_reference_values v
            JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
            WHERE v.profile_id = %s AND rt.active = TRUE
        )
        SELECT * FROM ranked WHERE rn <= 2
        ORDER BY type_sort_order ASC, type_key ASC, rn ASC
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(top_two_sql, (profile_id,))
        ranked_rows = [r2d(record) for record in cursor.fetchall()]

    return {"tiles": build_summary_tiles_from_ranked_rows(ranked_rows)}
def _entry_dict_from_ranked_row(d: dict[str, Any]) -> dict[str, Any]:
    """Extract the entry fields (incl. ``type_key``/``type_label``) for AI context.

    Keeps only the columns listed in ``_REFERENCE_ENTRY_KEYS`` and normalizes
    the result with ``normalize_reference_row``.

    The row is iterated rather than the key set: iterating a frozenset of
    strings yields an order that depends on string hash randomization, which
    would make the key order of the serialized payload differ between
    processes. Following the row keeps the output order equal to the row's own
    key order.
    """
    out = {k: v for k, v in d.items() if k in _REFERENCE_ENTRY_KEYS}
    return normalize_reference_row(out)
def get_profile_reference_values_current_snapshot(profile_id: str) -> dict[str, Any]:
    """
    Layer 1: all **current** reference values (newest entry per active type), in catalog order.

    Structure: ``items`` is a list of dicts with ``type_key``, ``type_label``,
    ``value_data_type``, ``type_sort_order`` and ``latest`` (the full entry);
    ``count`` is the number of items and ``schema`` names the payload version.
    """
    latest_per_type_sql = """
        WITH ranked AS (
            SELECT
                v.id,
                v.profile_id,
                v.reference_value_type_id,
                v.effective_date,
                v.value_numeric,
                v.value_text,
                v.unit,
                v.source,
                v.confidence,
                v.method,
                v.notes,
                v.extra,
                v.created_at,
                v.updated_at,
                rt.key AS type_key,
                rt.label AS type_label,
                rt.sort_order AS type_sort_order,
                rt.value_data_type,
                ROW_NUMBER() OVER (
                    PARTITION BY v.reference_value_type_id
                    ORDER BY v.effective_date DESC, v.created_at DESC
                ) AS rn
            FROM profile_reference_values v
            JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
            WHERE v.profile_id = %s AND rt.active = TRUE
        )
        SELECT * FROM ranked WHERE rn = 1
        ORDER BY type_sort_order ASC, type_key ASC
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(latest_per_type_sql, (profile_id,))
        ranked_rows = [r2d(record) for record in cursor.fetchall()]

    snapshot_items: list[dict[str, Any]] = []
    for ranked in ranked_rows:
        ranked.pop("rn", None)
        raw_data_type = ranked.get("value_data_type") or "decimal"
        data_type = raw_data_type.strip().lower()
        newest_entry = _entry_dict_from_ranked_row(ranked)
        item = {
            "type_key": ranked.get("type_key"),
            "type_label": ranked.get("type_label"),
            "value_data_type": data_type,
            "type_sort_order": int(ranked.get("type_sort_order") or 0),
            "latest": newest_entry,
        }
        snapshot_items.append(item)

    snapshot: dict[str, Any] = {
        "schema": "profile_reference_values_current_v1",
        "count": len(snapshot_items),
        "items": snapshot_items,
    }
    return snapshot
def _coerce_snapshot_filter_date(value: Any) -> Optional[date]:
    """Turn one optional date bound (date, ISO string or anything else) into a plain date or None."""
    if isinstance(value, str):
        text = value.strip()
        # Blank strings mean "no bound"; malformed ones raise ValueError.
        return date.fromisoformat(text) if text else None
    if isinstance(value, date):
        # datetime is a date subclass. The query casts the bound to ::date, so
        # drop any time part here too; otherwise the bound echoed in the result
        # would carry a timestamp the filter never used.
        return date(value.year, value.month, value.day)
    return None


def get_profile_reference_values_recent_snapshot(
    profile_id: str,
    *,
    limit_per_type: int = 5,
    date_from: Optional[date | str] = None,
    date_to: Optional[date | str] = None,
) -> dict[str, Any]:
    """
    Layer 1: the **last N** entries per reference type (newest first), optionally
    filtered by ``effective_date``.

    Args:
        profile_id: Profile whose values are read.
        limit_per_type: Entries to keep per type; clamped to the range 1..50.
        date_from: Inclusive lower bound, as ``date`` or ISO ``YYYY-MM-DD`` string.
            ``None``, blank strings and unsupported types mean "no bound".
        date_to: Inclusive upper bound, same rules as ``date_from``.

    Returns:
        Dict with ``schema``, the effective ``limit_per_type``, the applied
        ``date_from`` / ``date_to`` as ISO strings (or ``None``),
        ``ordered_type_keys`` (type keys in catalog order) and ``by_type_key``
        (type key -> list of normalized entries, newest first).

    Raises:
        ValueError: If a non-blank date string is not a valid ISO date, or
            ``limit_per_type`` cannot be converted to ``int``.
    """
    lim = max(1, min(int(limit_per_type), 50))
    df = _coerce_snapshot_filter_date(date_from)
    dt = _coerce_snapshot_filter_date(date_to)

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            WITH filtered AS (
                SELECT
                    v.id,
                    v.profile_id,
                    v.reference_value_type_id,
                    v.effective_date,
                    v.value_numeric,
                    v.value_text,
                    v.unit,
                    v.source,
                    v.confidence,
                    v.method,
                    v.notes,
                    v.extra,
                    v.created_at,
                    v.updated_at,
                    rt.key AS type_key,
                    rt.label AS type_label,
                    rt.sort_order AS type_sort_order,
                    rt.value_data_type
                FROM profile_reference_values v
                JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
                WHERE v.profile_id = %s
                    AND rt.active = TRUE
                    AND (%s::date IS NULL OR v.effective_date >= %s::date)
                    AND (%s::date IS NULL OR v.effective_date <= %s::date)
            ),
            ranked AS (
                SELECT
                    f.*,
                    ROW_NUMBER() OVER (
                        PARTITION BY f.reference_value_type_id
                        ORDER BY f.effective_date DESC, f.created_at DESC
                    ) AS rn
                FROM filtered f
            )
            SELECT * FROM ranked WHERE rn <= %s
            ORDER BY type_sort_order ASC, type_key ASC, rn ASC
            """,
            # Each bound is passed twice: once for the IS NULL test, once for the comparison.
            (profile_id, df, df, dt, dt, lim),
        )
        raw_rows = [r2d(r) for r in cur.fetchall()]

    # Rows arrive in catalog order, and dicts keep insertion order, so the keys
    # of by_type double as the ordered list of type keys. Helper columns such
    # as rn are dropped by _entry_dict_from_ranked_row's whitelist.
    by_type: dict[str, list[dict[str, Any]]] = {}
    for row in raw_rows:
        tk = row.get("type_key") or ""
        by_type.setdefault(tk, []).append(_entry_dict_from_ranked_row(row))

    return {
        "schema": "profile_reference_values_recent_v1",
        "limit_per_type": lim,
        "date_from": df.isoformat() if df is not None else None,
        "date_to": dt.isoformat() if dt is not None else None,
        "ordered_type_keys": list(by_type),
        "by_type_key": by_type,
    }