shinkan-jinkendo/backend/tenant_context.py
Lars 4b6fd49940
Some checks failed
Deploy Development / deploy (push) Successful in 37s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 6s
Test Suite / playwright-tests (push) Failing after 52s
feat: implement tenant context resolution and update profiles API
- Introduced tenant context resolution in the profiles API, allowing for effective club identification based on user memberships.
- Updated the `GET /profiles/me` endpoint to return `effective_club_id` and removed reliance on the deprecated `X-Active-Club-Id` header.
- Bumped application version to 0.8.22 in both backend and frontend files.
- Enhanced changelog to document the new version and changes made in this release.
2026-05-05 21:42:56 +02:00

168 lines
5.5 KiB
Python

"""
Request-wide tenant context (ACCESS_LAYER_AND_GOVERNANCE_PLAN.md, stage B).
Aims at one uniform resolution from the session, the X-Active-Club-Id header and the profile field active_club_id.
Routers can use Depends(get_tenant_context), or call resolve_tenant_context with memberships that are already loaded (a single DB block).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from fastapi import Depends, Header, HTTPException
from auth import require_auth
from club_tenancy import is_platform_admin, memberships_with_roles
from db import get_db, get_cursor
def _club_exists(cur, club_id: int) -> bool:
    """Return True when the ``clubs`` table has a row with the given id.

    ``cur`` is a DB cursor exposing ``execute`` and ``fetchone``; the query is
    parameterized, so ``club_id`` is never interpolated into the SQL text.
    """
    cur.execute("SELECT 1 FROM clubs WHERE id = %s", (club_id,))
    matching_row = cur.fetchone()
    return matching_row is not None
def parse_active_club_header(raw: Optional[str]) -> Optional[int]:
    """Parse the raw ``X-Active-Club-Id`` header value.

    Returns:
        ``None`` when the header is absent or blank after stripping,
        otherwise the club id as a positive integer.

    Raises:
        HTTPException: status 400 when the value is not an integer or is
            smaller than 1.
    """
    # A missing header and a whitespace-only header are treated the same way.
    text = "" if raw is None else str(raw).strip()
    if not text:
        return None

    try:
        club_id = int(text)
    except ValueError:
        raise HTTPException(status_code=400, detail="X-Active-Club-Id ungültig")

    # Zero and negative values are rejected with the same 400 as non-integers.
    if club_id < 1:
        raise HTTPException(status_code=400, detail="X-Active-Club-Id ungültig")
    return club_id
@dataclass
class TenantContext:
    """Resolved tenant (club) context for one authenticated profile."""

    # Id of the profile the context was resolved for.
    profile_id: int
    # Global role of that profile.
    global_role: str
    # Precedence: header > stored profile value > fallback.
    # For a platform admin without a header this is often None.
    effective_club_id: Optional[int]
    # Ids of the clubs listed in ``memberships``.
    club_ids: frozenset[int]
    # Membership rows as dictionaries.
    memberships: List[Dict[str, Any]]
def resolve_tenant_context(
    cur,
    *,
    profile_id: int,
    global_role: str,
    header_raw: Optional[str],
    memberships: Optional[List[Dict[str, Any]]] = None,
    stored_active_club_id: Optional[int] = None,
    invalid_header_policy: str = "reject",
) -> TenantContext:
    """Resolve the tenant context for one profile.

    Memberships are loaded through ``memberships_with_roles`` (active only)
    when the caller does not pass them in.

    How ``effective_club_id`` is chosen:

    - Platform admin: the header selects any existing club; without a header
      the result is ``None``. The stored profile value is not consulted.
    - Everyone else: a header naming a club the profile is not a member of
      raises 403 when ``invalid_header_policy`` is ``"reject"``; with any
      other policy value it is handled as if no header had been sent.
    - Without a usable header: ``stored_active_club_id`` if it is one of the
      member clubs, otherwise the smallest member club id, or ``None`` when
      there are no memberships.

    Raises:
        HTTPException: 400 for an unparsable header or, for platform admins,
            a header naming a club that does not exist; 403 as described
            above.
    """
    normalized_role = (global_role or "").lower()
    # Parse before loading anything so a malformed header fails first.
    requested_club = parse_active_club_header(header_raw)

    if memberships is None:
        memberships = memberships_with_roles(cur, profile_id, active_only=True)
    member_club_ids = frozenset(
        int(entry["id"]) for entry in memberships if entry.get("id") is not None
    )

    def _context(effective_club_id: Optional[int]) -> TenantContext:
        # Every exit path returns the same context apart from the chosen club.
        return TenantContext(
            profile_id=profile_id,
            global_role=normalized_role,
            effective_club_id=effective_club_id,
            club_ids=member_club_ids,
            memberships=memberships,
        )

    if is_platform_admin(normalized_role):
        # Admins are not limited to their memberships, but the club must exist.
        if requested_club is not None and not _club_exists(cur, requested_club):
            raise HTTPException(status_code=400, detail="Verein nicht gefunden")
        return _context(requested_club)

    if requested_club is not None and requested_club not in member_club_ids:
        if invalid_header_policy == "reject":
            raise HTTPException(
                status_code=403,
                detail="Keine Mitgliedschaft im gewählten Verein",
            )
        # Any other policy: continue as if the header had not been sent.
        requested_club = None

    if requested_club is not None:
        return _context(requested_club)
    if stored_active_club_id is not None and stored_active_club_id in member_club_ids:
        return _context(stored_active_club_id)
    # Single membership -> that club; several -> smallest id; none -> None.
    return _context(min(member_club_ids, default=None))
def get_tenant_context(
    session: dict = Depends(require_auth),
    x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"),
) -> TenantContext:
    """FastAPI dependency: open a DB connection and return the TenantContext.

    Reads ``profile_id`` and ``role`` from the authenticated session, looks up
    the profile's stored ``active_club_id`` and passes everything to
    ``resolve_tenant_context`` without an explicit ``invalid_header_policy``.
    """
    profile_id = int(session["profile_id"])
    global_role = session.get("role") or ""

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT active_club_id FROM profiles WHERE id = %s", (profile_id,))
        profile_row = cur.fetchone()

        # A missing profile row and a NULL column both mean "nothing stored".
        stored_value = None if profile_row is None else profile_row.get("active_club_id")
        stored_club_id: Optional[int] = None if stored_value is None else int(stored_value)

        # Resolve inside the ``with`` block so the same cursor is reused.
        return resolve_tenant_context(
            cur,
            profile_id=profile_id,
            global_role=global_role,
            header_raw=x_active_club_id,
            memberships=None,
            stored_active_club_id=stored_club_id,
        )
def tenant_context_from_session_only(
    cur,
    session: dict,
    header_raw: Optional[str],
    *,
    memberships: Optional[List[Dict[str, Any]]] = None,
    stored_active_club_id: Optional[int] = None,
    invalid_header_policy: str = "reject",
) -> TenantContext:
    """Resolve the tenant context without FastAPI dependency injection.

    Intended for tests and for callers that share one cursor with other
    queries. ``profile_id`` and ``role`` are read from ``session``; all other
    arguments are forwarded unchanged to ``resolve_tenant_context``.
    """
    return resolve_tenant_context(
        cur,
        profile_id=int(session["profile_id"]),
        # A missing or empty role is passed on as an empty string.
        global_role=session.get("role") or "",
        header_raw=header_raw,
        memberships=memberships,
        stored_active_club_id=stored_active_club_id,
        invalid_header_policy=invalid_header_policy,
    )