"""
Request-weiter Mandanten-Kontext (ACCESS_LAYER_AND_GOVERNANCE_PLAN.md, Stufe B).
Zielt auf einheitliche Auflösung aus Session + Header X-Active-Club-Id + Profilfeld active_club_id.
Router können Depends(get_tenant_context) nutzen oder resolve_tenant_context mit bereits geladenen Mitgliedschaften (ein DB-Block).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from fastapi import Depends, Header, HTTPException
from auth import require_auth, require_auth_flexible
from club_tenancy import is_platform_admin, memberships_with_roles
from db import get_db, get_cursor
def _club_exists(cur, club_id: int) -> bool:
    """Return True when a row with id ``club_id`` is found in table ``clubs``.

    Runs a single parameterized ``SELECT 1`` on the given cursor and checks
    whether ``fetchone()`` yields a row.
    """
    cur.execute("SELECT 1 FROM clubs WHERE id = %s", (club_id,))
    found_row = cur.fetchone()
    return found_row is not None
def memberships_for_tenant_resolution(
    memberships: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """
    Keep only the membership rows that grant active club access.

    A row counts as active when its ``membership_status`` value, converted to
    a string, stripped and lower-cased, equals ``"active"``. A missing or
    ``None`` status is treated as active. Useful when the caller's list (for
    example the one built for /profiles/me) also contains inactive rows.

    Returns a new list holding the same row objects in their original order.
    """

    def _grants_access(row: Dict[str, Any]) -> bool:
        status = row.get("membership_status")
        if status is None:
            # No explicit status: treated the same as "active".
            return True
        return str(status).strip().lower() == "active"

    return [row for row in memberships if _grants_access(row)]
def parse_active_club_header(raw: Optional[str]) -> Optional[int]:
    """Parse the X-Active-Club-Id header value.

    Returns ``None`` when the header is absent or blank after stripping,
    otherwise the parsed club id.

    Raises:
        HTTPException: status 400 when the value is not an integer or is
            smaller than 1.
    """
    text = str(raw).strip() if raw is not None else ""
    if not text:
        return None
    try:
        club_id = int(text)
    except ValueError:
        raise HTTPException(status_code=400, detail="X-Active-Club-Id ungültig")
    if club_id >= 1:
        return club_id
    # Zero and negative numbers are not valid club ids.
    raise HTTPException(status_code=400, detail="X-Active-Club-Id ungültig")
def library_content_visibility_sql(
    *,
    alias: str,
    profile_id: int,
    role: str,
    effective_club_id: Optional[int],
) -> tuple[str, List[Any]]:
    """
    Build the WHERE fragment for library listings (exercises, templates, ...).

    The fragment is an OR of up to three visibility branches:
    - ``official``: always included.
    - ``private``: rows created by ``profile_id``; every private row when
      ``is_platform_admin(role)`` is true.
    - ``club``:
        * platform admin: rows of clubs where the profile has an **active**
          membership, or where it has no membership row at all (audit
          access). A membership that exists but is not active excludes the
          club.
        * everyone else: only when ``effective_club_id`` is set, restricted
          to that club and requiring an active membership in it.

    ``alias`` is interpolated into the SQL text as-is; all other values are
    passed as ``%s`` parameters.

    Returns:
        ``(sql, params)`` where ``sql`` is wrapped in parentheses and
        ``params`` lists the values in placeholder order.
    """
    clauses: List[str] = [f"{alias}.visibility = 'official'"]
    args: List[Any] = []

    if is_platform_admin(role):
        # Platform admins see every private row.
        clauses.append(f"({alias}.visibility = 'private')")
        # Club rows: active member, or not a member at all.
        clauses.append(
            f"({alias}.visibility = 'club' AND {alias}.club_id IS NOT NULL AND ("
            f"EXISTS (SELECT 1 FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = {alias}.club_id "
            f"AND cm.status = 'active') OR NOT EXISTS (SELECT 1 FROM club_members cm2 WHERE cm2.profile_id = %s "
            f"AND cm2.club_id = {alias}.club_id)))"
        )
        args.extend([profile_id, profile_id])
    else:
        # Regular users only see their own private rows.
        clauses.append(f"({alias}.visibility = 'private' AND {alias}.created_by = %s)")
        args.append(profile_id)
        if effective_club_id is not None:
            # The SQL text is kept flush-left so the emitted string stays unchanged.
            clauses.append(
                f"""(
{alias}.visibility = 'club'
AND {alias}.club_id IS NOT NULL
AND {alias}.club_id = %s
AND EXISTS (
SELECT 1 FROM club_members cm
WHERE cm.profile_id = %s AND cm.club_id = {alias}.club_id AND cm.status = 'active'
)
)"""
            )
            args.extend([effective_club_id, profile_id])

    return "(" + " OR ".join(clauses) + ")", args
def club_library_visibility_sql(
    *,
    alias: str,
    profile_id: int,
    effective_club_id: Optional[int],
) -> tuple[str, List[Any]]:
    """
    Build a WHERE fragment matching only content of the active club.

    Matches rows with ``visibility = 'club'`` whose ``club_id`` equals
    ``effective_club_id`` and for which ``profile_id`` has an active
    ``club_members`` row. Official and private content is not included.
    Without an active club the fragment is ``(1=0)`` and matches nothing.

    ``alias`` is interpolated into the SQL text as-is; the ids are passed as
    ``%s`` parameters.

    Returns:
        ``(sql, params)`` with ``params`` in placeholder order.
    """
    if effective_club_id is not None:
        # The SQL text is kept flush-left so the emitted string stays unchanged.
        clause = f"""(
{alias}.visibility = 'club'
AND {alias}.club_id = %s
AND EXISTS (
SELECT 1 FROM club_members cm
WHERE cm.profile_id = %s
AND cm.club_id = {alias}.club_id
AND cm.status = 'active'
)
)"""
        return clause, [effective_club_id, profile_id]
    # No active tenant: always-false predicate, no parameters.
    return "(1=0)", []
@dataclass
class TenantContext:
    """Resolved tenant (club) context for a single request."""

    # Id of the profile the context was resolved for.
    profile_id: int
    # Global role of the profile (resolve_tenant_context stores it lower-cased).
    global_role: str
    # Header > stored profile value > fallback; platform admin without header: the profile's club if it exists
    effective_club_id: Optional[int]
    # Club ids taken from the membership rows below.
    club_ids: frozenset[int]
    # Membership rows used for the resolution.
    memberships: List[Dict[str, Any]]
def resolve_tenant_context(
    cur,
    *,
    profile_id: int,
    global_role: str,
    header_raw: Optional[str],
    memberships: Optional[List[Dict[str, Any]]] = None,
    stored_active_club_id: Optional[int] = None,
    invalid_header_policy: str = "reject",
) -> TenantContext:
    """
    Resolve the tenant context from header, stored profile value and memberships.

    Memberships: when ``memberships`` is ``None`` they are loaded through
    ``memberships_with_roles(..., active_only=True)``. A list passed in (for
    example from ``/profiles/me``) may contain deactivated rows; it is
    filtered with ``memberships_for_tenant_resolution`` so only active rows
    feed ``club_ids`` and the tenant choice.

    Resolution of ``effective_club_id``:
    - Platform admin: a header may select any existing club (HTTP 400 if the
      club does not exist). Without a header the stored active club is used
      if it exists, otherwise ``None``.
    - Everyone else: a header club requires membership. If it is missing,
      policy ``"reject"`` raises HTTP 403 and any other policy value ignores
      the header. Without a usable header: the stored active club if the
      profile is a member, otherwise the smallest club id among the
      memberships (which is the only club when there is just one), or
      ``None`` when there are no memberships.

    Raises:
        HTTPException: 400 for an unparsable header (raised by
            ``parse_active_club_header``) or an unknown club for platform
            admins; 403 for a header club without membership under policy
            ``"reject"``.
    """
    role_lc = (global_role or "").lower()
    # Parse first so that a malformed header fails before memberships are loaded.
    requested = parse_active_club_header(header_raw)

    if memberships is not None:
        rows = memberships_for_tenant_resolution(memberships)
    else:
        rows = memberships_with_roles(cur, profile_id, active_only=True)
    club_ids = frozenset(int(row["id"]) for row in rows if row.get("id") is not None)

    effective: Optional[int] = None
    if is_platform_admin(role_lc):
        if requested is not None:
            if not _club_exists(cur, requested):
                raise HTTPException(status_code=400, detail="Verein nicht gefunden")
            effective = requested
        elif stored_active_club_id is not None and _club_exists(cur, stored_active_club_id):
            effective = stored_active_club_id
    else:
        if requested is not None and requested not in club_ids:
            if invalid_header_policy == "reject":
                raise HTTPException(
                    status_code=403,
                    detail="Keine Mitgliedschaft im gewählten Verein",
                )
            # Any other policy: continue as if no header had been sent.
            requested = None

        if requested is not None:
            effective = requested
        elif stored_active_club_id is not None and stored_active_club_id in club_ids:
            effective = stored_active_club_id
        elif club_ids:
            # One club: that club. Several clubs: smallest id as fallback.
            effective = min(club_ids)

    return TenantContext(
        profile_id=profile_id,
        global_role=role_lc,
        effective_club_id=effective,
        club_ids=club_ids,
        memberships=rows,
    )
def get_tenant_context(
    session: dict = Depends(require_auth),
    x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"),
) -> TenantContext:
    """FastAPI dependency: opens a DB connection and returns the TenantContext.

    Reads ``profile_id`` and ``role`` from the session, loads the profile's
    ``active_club_id`` and passes everything to ``resolve_tenant_context``
    on the same cursor. Memberships are loaded by that function.
    """
    profile_id = int(session["profile_id"])
    global_role = session.get("role") or ""
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT active_club_id FROM profiles WHERE id = %s", (profile_id,))
        profile_row = cur.fetchone()
        raw_active = None if profile_row is None else profile_row.get("active_club_id")
        stored_club_id: Optional[int] = None if raw_active is None else int(raw_active)
        # Resolve while the connection is still open; the cursor is reused.
        return resolve_tenant_context(
            cur,
            profile_id=profile_id,
            global_role=global_role,
            header_raw=x_active_club_id,
            memberships=None,
            stored_active_club_id=stored_club_id,
        )
def get_tenant_context_flexible(
session: dict = Depends(require_auth_flexible),
x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"),
) -> TenantContext:
"""
Wie get_tenant_context, aber Auth per Header oder Query ?ssetoken (für
/