mitai-jinkendo/backend/data_layer/reference_values.py
Lars 932bceb1e1
All checks were successful
Deploy Development / deploy (push) Successful in 47s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 23s
feat: Update reference values and introduce pilot visualization module
- Bumped version of reference_values module to 1.3.0.
- Added new imports and functionality for reference values in the backend, enhancing data retrieval.
- Introduced a new PilotVizPage in the frontend for visualizing data, linked from the SettingsPage for easy access.
- Updated routing in App.jsx to include the new pilot visualization route.
2026-04-07 10:15:13 +02:00

180 lines
6.3 KiB
Python

"""
Reference values — Layer 1 (read paths + structured rows)
Structured retrieval for profile reference values and the active type catalog.
Mutations (INSERT/UPDATE/DELETE) stay in routers/reference_values.py with validation.
The effective_date, created_at and updated_at values are normalized to ISO strings and a Decimal value_numeric to float — suitable for JSON/API layers.
"""
from __future__ import annotations
from decimal import Decimal
from typing import Any, Optional
from db import get_cursor, get_db, r2d
def normalize_reference_row(d: Optional[dict[str, Any]]) -> Optional[dict[str, Any]]:
    """Return a JSON-friendly shallow copy of a DB row dict.

    The ``effective_date``, ``created_at`` and ``updated_at`` entries are
    replaced by their ``isoformat()`` string when the value offers that
    method, and a ``Decimal`` ``value_numeric`` is converted to ``float``.
    Every other entry is copied unchanged; the input dict is not mutated.

    Args:
        d: Row dict, or ``None``.

    Returns:
        The normalized copy. A falsy input (``None`` or an empty dict) is
        handed back as-is, which is why the return type is optional.
    """
    if not d:
        return d
    out = dict(d)
    for k in ("effective_date", "created_at", "updated_at"):
        v = out.get(k)
        # Duck-typed on purpose: covers date and datetime values alike and
        # leaves values that are already strings untouched.
        if v is not None and hasattr(v, "isoformat"):
            out[k] = v.isoformat()
    vn = out.get("value_numeric")
    # isinstance() is already False for None, so no separate None check.
    if isinstance(vn, Decimal):
        out["value_numeric"] = float(vn)
    return out
def fetch_reference_type_by_key(cur, key: str, require_active: bool = True) -> Optional[dict[str, Any]]:
    """Look up one reference_value_types row by key on a caller-supplied cursor.

    No connection is opened here; the statement runs on ``cur`` so it can take
    part in whatever transaction the caller (e.g. a router) already has open.

    Args:
        cur: Cursor to execute the lookup on.
        key: Type key to match.
        require_active: When true, only rows with ``active = TRUE`` match.

    Returns:
        ``r2d(row)`` for the fetched row, or ``None`` when nothing matched.
    """
    sql_parts = [
        "SELECT id, key, label, description, default_unit, active, category, ",
        "value_data_type, validation_rules, metadata ",
        "FROM reference_value_types WHERE key = %s ",
    ]
    if require_active:
        sql_parts.append("AND active = TRUE ")
    cur.execute("".join(sql_parts), (key,))
    found = cur.fetchone()
    if not found:
        return None
    return r2d(found)
def list_active_reference_value_types_data() -> list[dict[str, Any]]:
    """Load the catalog of active reference value types.

    Returns:
        One normalized dict (see ``normalize_reference_row``) per row of
        ``reference_value_types`` with ``active = TRUE``, ordered by
        ``sort_order`` and then ``id``.
    """
    catalog_sql = """
        SELECT
            id, key, label, description, default_unit, sort_order, active,
            category, value_data_type, validation_rules, metadata, created_at
        FROM reference_value_types
        WHERE active = TRUE
        ORDER BY sort_order ASC, id ASC
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(catalog_sql)
        catalog: list[dict[str, Any]] = []
        for db_row in cur.fetchall():
            catalog.append(normalize_reference_row(r2d(db_row)))
        return catalog
def list_profile_reference_values_for_type(
    profile_id: str, type_key: str
) -> Optional[list[dict[str, Any]]]:
    """Return the stored history of one reference value type for a profile.

    Entries come back newest first: ordered by ``effective_date``, then
    ``created_at``, then ``id`` (all descending). The trailing ``id`` makes
    the ordering total, so entries that share both timestamps (possible, for
    instance, when several rows are written in one batch) keep a stable order
    from one call to the next.

    Args:
        profile_id: Profile whose entries are listed.
        type_key: Key of the reference value type; it must resolve to an
            active type.

    Returns:
        ``None`` when ``type_key`` does not resolve to an active type.
        Otherwise a list of normalized row dicts (see
        ``normalize_reference_row``), each also carrying ``type_key`` and
        ``type_label`` from the type table; the list is empty when the
        profile has no entries of that type.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        # Resolve the type first so an unknown/inactive key is reported as
        # None rather than as an empty history.
        t = fetch_reference_type_by_key(cur, type_key, require_active=True)
        if not t:
            return None
        cur.execute(
            """
            SELECT
                v.id,
                v.profile_id,
                v.reference_value_type_id,
                v.effective_date,
                v.value_numeric,
                v.value_text,
                v.unit,
                v.source,
                v.confidence,
                v.method,
                v.notes,
                v.extra,
                v.created_at,
                v.updated_at,
                rt.key AS type_key,
                rt.label AS type_label
            FROM profile_reference_values v
            JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
            WHERE v.profile_id = %s AND rt.key = %s
            ORDER BY v.effective_date DESC, v.created_at DESC, v.id DESC
            """,
            (profile_id, t["key"]),
        )
        return [normalize_reference_row(r2d(r)) for r in cur.fetchall()]
def build_summary_tiles_from_ranked_rows(raw_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Group ranked SQL rows into one summary tile per reference value type.

    Each input row is expected to carry ``type_key`` plus the helper columns
    ``rn``, ``type_sort_order`` and ``value_data_type``. The helper columns
    are removed from the per-entry payload; ``value_data_type`` is kept on
    the tile itself (lower-cased, defaulting to ``"decimal"``).

    Args:
        raw_rows: Row dicts; ``rn`` 1 fills the tile's ``latest`` slot and
            ``rn`` 2 its ``previous`` slot. Rows with any other ``rn`` still
            create the tile but fill no slot.

    Returns:
        Tiles sorted by ``(type_sort_order or 0, type_key)``, each with the
        keys ``type_key``, ``type_label``, ``value_data_type``, ``latest``
        and ``previous``.
    """
    helper_columns = frozenset({"rn", "type_sort_order", "value_data_type"})
    slot_for_rank = {1: "latest", 2: "previous"}
    tiles_by_type: dict[str, dict[str, Any]] = {}

    for record in raw_rows:
        rank = int(record.get("rn") or 0)
        type_key = record["type_key"]

        tile = tiles_by_type.get(type_key)
        if tile is None:
            data_type = record.get("value_data_type") or "decimal"
            tile = {
                "type_key": type_key,
                "type_label": record.get("type_label") or type_key,
                "value_data_type": data_type.strip().lower(),
                # Temporary ordering key; removed again before returning.
                "sort_key": (record.get("type_sort_order") or 0, type_key),
                "latest": None,
                "previous": None,
            }
            tiles_by_type[type_key] = tile

        payload = normalize_reference_row(
            {col: val for col, val in record.items() if col not in helper_columns}
        )
        slot = slot_for_rank.get(rank)
        if slot is not None:
            tile[slot] = payload

    ordered = sorted(tiles_by_type.values(), key=lambda tile: tile["sort_key"])
    for tile in ordered:
        tile.pop("sort_key", None)
    return ordered
def get_profile_reference_values_summary(profile_id: str) -> dict[str, Any]:
    """Return the latest and previous entry per active type for a profile.

    Entries are ranked per type by ``effective_date``, ``created_at`` and
    finally ``id`` (all descending). The ``id`` tie-breaker makes the ranking
    total, so two entries that share both timestamps cannot swap their
    "latest"/"previous" slots between requests.

    Args:
        profile_id: Profile whose reference values are summarized.

    Returns:
        ``{"tiles": [...]}`` where the tiles are built by
        ``build_summary_tiles_from_ranked_rows`` from the two top-ranked
        entries of every active type that has at least one entry.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            WITH ranked AS (
                SELECT
                    v.id,
                    v.profile_id,
                    v.reference_value_type_id,
                    v.effective_date,
                    v.value_numeric,
                    v.value_text,
                    v.unit,
                    v.source,
                    v.confidence,
                    v.method,
                    v.notes,
                    v.extra,
                    v.created_at,
                    v.updated_at,
                    rt.key AS type_key,
                    rt.label AS type_label,
                    rt.sort_order AS type_sort_order,
                    rt.value_data_type,
                    ROW_NUMBER() OVER (
                        PARTITION BY v.reference_value_type_id
                        ORDER BY v.effective_date DESC, v.created_at DESC, v.id DESC
                    ) AS rn
                FROM profile_reference_values v
                JOIN reference_value_types rt ON rt.id = v.reference_value_type_id
                WHERE v.profile_id = %s AND rt.active = TRUE
            )
            SELECT * FROM ranked WHERE rn <= 2
            ORDER BY type_sort_order ASC, type_key ASC, rn ASC
            """,
            (profile_id,),
        )
        raw_rows = [r2d(r) for r in cur.fetchall()]
    # Rows are fully materialized above, so tile building needs no open
    # connection.
    tiles = build_summary_tiles_from_ranked_rows(raw_rows)
    return {"tiles": tiles}