shinkan-jinkendo/backend/tenant_context.py
Lars b752883392
All checks were successful
Deploy Development / deploy (push) Successful in 41s
Test Suite / pytest-backend (push) Successful in 23s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 25s
feat: enhance media access and security for exercises
- Updated PostgreSQL binding in docker-compose to restrict access to localhost only.
- Implemented a new API endpoint for secure media file delivery, requiring authentication via token.
- Enhanced governance checks for exercise media access, ensuring only authorized users can retrieve files.
- Updated frontend components to utilize the new media file access method, improving user experience while maintaining security.
- Documented changes in production readiness audit and access layer endpoint audit for clarity on security enhancements.
2026-05-07 10:52:14 +02:00

235 lines
7.7 KiB
Python

"""
Request-weiter Mandanten-Kontext (ACCESS_LAYER_AND_GOVERNANCE_PLAN.md, Stufe B).
Zielt auf einheitliche Auflösung aus Session + Header X-Active-Club-Id + Profilfeld active_club_id.
Router können Depends(get_tenant_context) nutzen oder resolve_tenant_context mit bereits geladenen Mitgliedschaften (ein DB-Block).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from fastapi import Depends, Header, HTTPException
from auth import require_auth, require_auth_flexible
from club_tenancy import is_platform_admin, memberships_with_roles
from db import get_db, get_cursor
def _club_exists(cur, club_id: int) -> bool:
    """Report whether a row with the given id exists in the ``clubs`` table.

    Args:
        cur: Database cursor offering ``execute`` and ``fetchone``.
        club_id: Club id to look up.

    Returns:
        True when the lookup query yields a row, otherwise False.
    """
    cur.execute("SELECT 1 FROM clubs WHERE id = %s", (club_id,))
    found_row = cur.fetchone()
    if found_row is None:
        return False
    return True
def parse_active_club_header(raw: Optional[str]) -> Optional[int]:
    """Parse the ``X-Active-Club-Id`` header value into a positive club id.

    Args:
        raw: Raw header value, or None when the header was not sent.

    Returns:
        None when the header is absent or blank after stripping whitespace,
        otherwise the club id as an int >= 1.

    Raises:
        HTTPException: Status 400 when the value is not a plain ASCII decimal
            number or is smaller than 1.
    """
    if raw is None:
        return None
    text = str(raw).strip()
    if not text:
        return None

    value: Optional[int] = None
    # The header is client-controlled. int() on its own would also accept
    # forms such as "+5", "1_000" or non-ASCII digits, so only plain ASCII
    # decimal digits are let through to the conversion.
    if text.isascii() and text.isdigit():
        try:
            value = int(text)
        except ValueError:
            # Still possible for extremely long digit strings on Python
            # versions that limit int/str conversion length.
            value = None

    # Single raise point outside the except block: no implicit exception
    # chaining onto the ValueError.
    if value is None or value < 1:
        raise HTTPException(status_code=400, detail="X-Active-Club-Id ungültig")
    return value
def library_content_visibility_sql(
    *,
    alias: str,
    profile_id: int,
    role: str,
    effective_club_id: Optional[int],
) -> tuple[str, List[Any]]:
    """Build the WHERE fragment for library listings (exercises, templates, frame programmes).

    Visible rows are: ``official`` content, the caller's own ``private``
    content, and ``club`` content restricted to the active club context.

    Args:
        alias: Table alias that is interpolated into the SQL text.
        profile_id: Profile id bound for the ownership and membership checks.
        role: Global role; a platform admin gets no restriction.
        effective_club_id: Active club id; when None the club branch is omitted
            (only official + private remain).

    Returns:
        A tuple of the SQL fragment and the list of bind parameters in
        placeholder order. For a platform admin this is ``("TRUE", [])``.
    """
    if is_platform_admin(role):
        return "TRUE", []

    official_clause = f"{alias}.visibility = 'official'"
    private_clause = f"({alias}.visibility = 'private' AND {alias}.created_by = %s)"
    clauses: List[str] = [official_clause, private_clause]
    bind_values: List[Any] = [profile_id]

    if effective_club_id is not None:
        # Club content: must belong to the active club AND the profile must
        # hold an active membership in that club.
        club_clause = f"""(
            {alias}.visibility = 'club'
            AND {alias}.club_id IS NOT NULL
            AND {alias}.club_id = %s
            AND EXISTS (
                SELECT 1 FROM club_members cm
                WHERE cm.profile_id = %s AND cm.club_id = {alias}.club_id AND cm.status = 'active'
            )
        )"""
        clauses.append(club_clause)
        bind_values += [effective_club_id, profile_id]

    combined = " OR ".join(clauses)
    return "(" + combined + ")", bind_values
@dataclass
class TenantContext:
    """Resolved tenant (club) context for one request."""

    # Id of the profile the context was resolved for.
    profile_id: int
    # Global role of that profile.
    global_role: str
    # Precedence: header > stored profile value > fallback.
    # For a platform admin without a header this is often None.
    effective_club_id: Optional[int]
    # Ids of the clubs taken from the membership rows.
    club_ids: frozenset[int]
    # Membership rows as loaded or passed in by the caller.
    memberships: List[Dict[str, Any]]
def resolve_tenant_context(
    cur,
    *,
    profile_id: int,
    global_role: str,
    header_raw: Optional[str],
    memberships: Optional[List[Dict[str, Any]]] = None,
    stored_active_club_id: Optional[int] = None,
    invalid_header_policy: str = "reject",
) -> TenantContext:
    """Resolve the tenant context for a profile.

    Memberships are loaded from the database (active memberships only) when
    the caller does not pass them in.

    Resolution of ``effective_club_id``:
      * Platform admin: the header may select any existing club; without a
        header the result is None.
      * Everyone else: a header value requires membership in that club. With
        ``invalid_header_policy == "reject"`` a foreign club yields 403; with
        any other policy value the header is treated as absent.
      * Without a usable header: the stored ``active_club_id`` if the profile
        is a member there; otherwise the only club; with several clubs the
        smallest club id is used as fallback; with no clubs None.

    Args:
        cur: Database cursor used for membership and club lookups.
        profile_id: Profile the context is resolved for.
        global_role: Global role; stored lower-cased in the result.
        header_raw: Raw ``X-Active-Club-Id`` header value.
        memberships: Pre-loaded membership rows, or None to load them here.
        stored_active_club_id: ``active_club_id`` persisted on the profile.
        invalid_header_policy: ``"reject"`` to answer 403 for a non-member
            club in the header; otherwise the header is ignored.

    Returns:
        The resolved ``TenantContext``.

    Raises:
        HTTPException: 400 for an unparsable header or (platform admin) an
            unknown club; 403 for a non-member club under the reject policy.
    """
    normalized_role = (global_role or "").lower()
    requested_club = parse_active_club_header(header_raw)

    if memberships is None:
        memberships = memberships_with_roles(cur, profile_id, active_only=True)
    member_club_ids = frozenset(
        int(entry["id"]) for entry in memberships if entry.get("id") is not None
    )

    def _build(effective_club_id: Optional[int]) -> TenantContext:
        return TenantContext(
            profile_id=profile_id,
            global_role=normalized_role,
            effective_club_id=effective_club_id,
            club_ids=member_club_ids,
            memberships=memberships,
        )

    if is_platform_admin(normalized_role):
        # Admins may address any club, but it has to exist.
        if requested_club is not None and not _club_exists(cur, requested_club):
            raise HTTPException(status_code=400, detail="Verein nicht gefunden")
        return _build(requested_club)

    if requested_club is not None and requested_club not in member_club_ids:
        if invalid_header_policy == "reject":
            raise HTTPException(
                status_code=403,
                detail="Keine Mitgliedschaft im gewählten Verein",
            )
        # Non-reject policy: continue as if no header had been sent.
        requested_club = None

    if requested_club is not None:
        return _build(requested_club)
    if stored_active_club_id is not None and stored_active_club_id in member_club_ids:
        return _build(stored_active_club_id)
    if not member_club_ids:
        return _build(None)
    # One club -> that club; several clubs -> smallest id as fallback.
    return _build(min(member_club_ids))
def _tenant_context_for_session(
    session: dict,
    header_raw: Optional[str],
) -> TenantContext:
    """Shared implementation of the FastAPI tenant-context dependencies.

    Opens a database connection, reads the profile's stored
    ``active_club_id`` and resolves the tenant context on the same cursor.

    Args:
        session: Authenticated session; ``profile_id`` is required, ``role``
            is optional.
        header_raw: Raw ``X-Active-Club-Id`` header value.

    Returns:
        The resolved ``TenantContext``.
    """
    pid = int(session["profile_id"])
    role = session.get("role") or ""
    stored: Optional[int] = None
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT active_club_id FROM profiles WHERE id = %s", (pid,))
        row = cur.fetchone()
        if row is not None:
            ac = row.get("active_club_id")
            if ac is not None:
                stored = int(ac)
        # Resolved inside the ``with`` block because the resolver still
        # queries through this cursor.
        return resolve_tenant_context(
            cur,
            profile_id=pid,
            global_role=role,
            header_raw=header_raw,
            memberships=None,
            stored_active_club_id=stored,
        )


def get_tenant_context(
    session: dict = Depends(require_auth),
    x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"),
) -> TenantContext:
    """FastAPI dependency: open a DB connection and return the TenantContext."""
    return _tenant_context_for_session(session, x_active_club_id)


def get_tenant_context_flexible(
    session: dict = Depends(require_auth_flexible),
    x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"),
) -> TenantContext:
    """Like ``get_tenant_context`` but authenticated via ``require_auth_flexible``.

    According to the original note, this allows auth by header or by the
    query parameter ``?ssetoken`` (for <img>/<video> requests that cannot
    send custom headers).
    """
    return _tenant_context_for_session(session, x_active_club_id)
def tenant_context_from_session_only(
    cur,
    session: dict,
    header_raw: Optional[str],
    *,
    memberships: Optional[List[Dict[str, Any]]] = None,
    stored_active_club_id: Optional[int] = None,
    invalid_header_policy: str = "reject",
) -> TenantContext:
    """Resolve the tenant context without FastAPI (tests / cursor shared with other queries).

    Args:
        cur: Database cursor handed through to the resolver.
        session: Session mapping; ``profile_id`` is required, ``role`` optional.
        header_raw: Raw ``X-Active-Club-Id`` header value.
        memberships: Pre-loaded membership rows, or None.
        stored_active_club_id: ``active_club_id`` persisted on the profile.
        invalid_header_policy: Passed through unchanged to the resolver.

    Returns:
        The ``TenantContext`` produced by ``resolve_tenant_context``.
    """
    resolver_kwargs = {
        "profile_id": int(session["profile_id"]),
        "global_role": session.get("role") or "",
        "header_raw": header_raw,
        "memberships": memberships,
        "stored_active_club_id": stored_active_club_id,
        "invalid_header_policy": invalid_header_policy,
    }
    return resolve_tenant_context(cur, **resolver_kwargs)