""" Request-weiter Mandanten-Kontext (ACCESS_LAYER_AND_GOVERNANCE_PLAN.md, Stufe B). Zielt auf einheitliche Auflösung aus Session + Header X-Active-Club-Id + Profilfeld active_club_id. Router können Depends(get_tenant_context) nutzen oder resolve_tenant_context mit bereits geladenen Mitgliedschaften (ein DB-Block). """ from __future__ import annotations from dataclasses import dataclass from typing import Any, Dict, List, Optional from fastapi import Depends, Header, HTTPException from auth import require_auth, require_auth_flexible from club_tenancy import is_platform_admin, memberships_with_roles from db import get_db, get_cursor def _club_exists(cur, club_id: int) -> bool: cur.execute("SELECT 1 FROM clubs WHERE id = %s", (club_id,)) return cur.fetchone() is not None def parse_active_club_header(raw: Optional[str]) -> Optional[int]: """Parst X-Active-Club-Id; leer → None. Ungültig → HTTP 400.""" if raw is None: return None s = str(raw).strip() if not s: return None try: v = int(s) except ValueError: raise HTTPException(status_code=400, detail="X-Active-Club-Id ungültig") if v < 1: raise HTTPException(status_code=400, detail="X-Active-Club-Id ungültig") return v def library_content_visibility_sql( *, alias: str, profile_id: int, role: str, effective_club_id: Optional[int], ) -> tuple[str, List[Any]]: """ WHERE-Baustein für Bibliothekslisten (Übungen, Vorlagen, Rahmenprogramme): official, eigene private, club nur im aktiven Vereinskontext (effective_club_id). Plattform-Admin: keine Einschränkung (TRUE). Ohne effective_club_id: kein club-Zweig (nur official + private). """ if is_platform_admin(role): return "TRUE", [] parts: List[str] = [ f"{alias}.visibility = 'official'", f"({alias}.visibility = 'private' AND {alias}.created_by = %s)", ] params: List[Any] = [profile_id] if effective_club_id is not None: parts.append( f"""( {alias}.visibility = 'club' AND {alias}.club_id IS NOT NULL AND {alias}.club_id = %s AND EXISTS ( SELECT 1 FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = {alias}.club_id AND cm.status = 'active' ) )""" ) params.extend([effective_club_id, profile_id]) return "(" + " OR ".join(parts) + ")", params @dataclass class TenantContext: profile_id: int global_role: str # Header > gespeichertes Profil > Fallback; Plattform-Admin ohne Header: Profil-Verein wenn existent effective_club_id: Optional[int] club_ids: frozenset[int] memberships: List[Dict[str, Any]] def resolve_tenant_context( cur, *, profile_id: int, global_role: str, header_raw: Optional[str], memberships: Optional[List[Dict[str, Any]]] = None, stored_active_club_id: Optional[int] = None, invalid_header_policy: str = "reject", ) -> TenantContext: """ Mitgliedschaften: wenn nicht übergeben, wird aus der DB geladen (aktive Mitgliedschaften). Auflösung effective_club_id: - Plattform-Admin: Header setzt beliebigen existierenden Verein; ohne Header → gespeichertes active_club_id falls der Verein existiert, sonst None. - Sonst: gültiger Header zwingend Mitgliedschaft — bei ``reject`` sonst 403, bei ``ignore`` wie ohne Header. Ohne gültigen Header: gespeichertes active_club_id wenn Mitglied; sonst einziger Verein; bei mehreren ohne gültige Vorgabe → min(club_ids) (Fallback). """ role_lc = (global_role or "").lower() header_cid = parse_active_club_header(header_raw) if memberships is None: memberships = memberships_with_roles(cur, profile_id, active_only=True) club_ids = frozenset(int(r["id"]) for r in memberships if r.get("id") is not None) if is_platform_admin(role_lc): if header_cid is not None: if not _club_exists(cur, header_cid): raise HTTPException(status_code=400, detail="Verein nicht gefunden") effective = header_cid elif ( stored_active_club_id is not None and _club_exists(cur, stored_active_club_id) ): effective = stored_active_club_id else: effective = None return TenantContext( profile_id=profile_id, global_role=role_lc, effective_club_id=effective, club_ids=club_ids, memberships=memberships, ) chosen_header = header_cid if chosen_header is not None and chosen_header not in club_ids: if invalid_header_policy == "reject": raise HTTPException( status_code=403, detail="Keine Mitgliedschaft im gewählten Verein", ) chosen_header = None if chosen_header is not None: effective = chosen_header elif stored_active_club_id is not None and stored_active_club_id in club_ids: effective = stored_active_club_id elif len(club_ids) == 1: effective = next(iter(club_ids)) elif len(club_ids) == 0: effective = None else: effective = min(club_ids) return TenantContext( profile_id=profile_id, global_role=role_lc, effective_club_id=effective, club_ids=club_ids, memberships=memberships, ) def get_tenant_context( session: dict = Depends(require_auth), x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"), ) -> TenantContext: """FastAPI-Dependency: öffnet eine DB-Verbindung und liefert TenantContext.""" pid = int(session["profile_id"]) role = session.get("role") or "" stored: Optional[int] = None with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT active_club_id FROM profiles WHERE id = %s", (pid,)) row = cur.fetchone() if row is not None: ac = row.get("active_club_id") if ac is not None: stored = int(ac) return resolve_tenant_context( cur, profile_id=pid, global_role=role, header_raw=x_active_club_id, memberships=None, stored_active_club_id=stored, ) def get_tenant_context_flexible( session: dict = Depends(require_auth_flexible), x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"), ) -> TenantContext: """ Wie get_tenant_context, aber Auth per Header oder Query ?ssetoken (für /