shinkan-jinkendo/backend/tenant_context.py
Lars 30dc30c7aa
Some checks failed
Deploy Development / deploy (push) Successful in 43s
Test Suite / pytest-backend (push) Failing after 0s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 14s
Test Suite / k6 /health Baseline (push) Failing after 4m0s
Test Suite / playwright-tests (push) Failing after 3m41s
Enhance Tenant Context and Access Control Features
- Introduced `email_verified` and `account_state` attributes in the `TenantContext` to improve user state management.
- Updated the `resolve_tenant_context` function to dynamically fetch `email_verified` status from the database and determine `account_state` based on user roles and memberships.
- Implemented `assert_min_account_state` checks across various endpoints to enforce access control based on user account status.
- Incremented version to 1.1.0 in version.py to reflect these enhancements in tenant context management and access control.
2026-06-06 21:10:52 +02:00

326 lines
11 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Request-weiter Mandanten-Kontext (ACCESS_LAYER_AND_GOVERNANCE_PLAN.md, Stufe B).
Zielt auf einheitliche Auflösung aus Session + Header X-Active-Club-Id + Profilfeld active_club_id.
Router können Depends(get_tenant_context) nutzen oder resolve_tenant_context mit bereits geladenen Mitgliedschaften (ein DB-Block).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from fastapi import Depends, Header, HTTPException
from auth import require_auth, require_auth_flexible
from account_lifecycle import resolve_account_state
from club_tenancy import is_platform_admin, memberships_with_roles
from db import get_db, get_cursor
def _club_exists(cur, club_id: int) -> bool:
cur.execute("SELECT 1 FROM clubs WHERE id = %s", (club_id,))
return cur.fetchone() is not None
def memberships_for_tenant_resolution(
memberships: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""
Nur Zeilen mit aktivem Vereinszugang (cm.status = 'active').
Wird genutzt, wenn /profiles/me alle Mitgliedschaften inkl. inaktiver liefert.
"""
out: List[Dict[str, Any]] = []
for r in memberships:
st_raw = r.get("membership_status")
st = str(st_raw if st_raw is not None else "active").strip().lower()
if st == "active":
out.append(r)
return out
def parse_active_club_header(raw: Optional[str]) -> Optional[int]:
"""Parst X-Active-Club-Id; leer → None. Ungültig → HTTP 400."""
if raw is None:
return None
s = str(raw).strip()
if not s:
return None
try:
v = int(s)
except ValueError:
raise HTTPException(status_code=400, detail="X-Active-Club-Id ungültig")
if v < 1:
raise HTTPException(status_code=400, detail="X-Active-Club-Id ungültig")
return v
def library_content_visibility_sql(
*,
alias: str,
profile_id: int,
role: str,
effective_club_id: Optional[int],
) -> tuple[str, List[Any]]:
"""
WHERE-Baustein für Bibliothekslisten (Übungen, Vorlagen, Rahmenprogramme, …):
- official immer
- private: eigene (Norm) bzw. alle (Plattform-Admin/Superadmin)
- club: nur mit **aktivem** Vereinszugang; Existenz einer nur **inactive** Mitgliedschaft schließt
aus — auch bei Plattform-Rolle. Ist man **kein** Mitglied des Vereins, behalten Plattform-Admins
Zugriff (Audit).
Für Nicht-Plattform: club-Zweig nur mit effective_club_id (Mandantenfilter).
"""
plat = is_platform_admin(role)
parts: List[str] = [f"{alias}.visibility = 'official'"]
params: List[Any] = []
if plat:
parts.append(f"({alias}.visibility = 'private')")
else:
parts.append(f"({alias}.visibility = 'private' AND {alias}.created_by = %s)")
params.append(profile_id)
club_ok_plat = (
f"({alias}.visibility = 'club' AND {alias}.club_id IS NOT NULL AND ("
f"EXISTS (SELECT 1 FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = {alias}.club_id "
f"AND cm.status = 'active') OR NOT EXISTS (SELECT 1 FROM club_members cm2 WHERE cm2.profile_id = %s "
f"AND cm2.club_id = {alias}.club_id)))"
)
if plat:
parts.append(club_ok_plat)
params.extend([profile_id, profile_id])
elif effective_club_id is not None:
parts.append(
f"""(
{alias}.visibility = 'club'
AND {alias}.club_id IS NOT NULL
AND {alias}.club_id = %s
AND EXISTS (
SELECT 1 FROM club_members cm
WHERE cm.profile_id = %s AND cm.club_id = {alias}.club_id AND cm.status = 'active'
)
)"""
)
params.extend([effective_club_id, profile_id])
return "(" + " OR ".join(parts) + ")", params
def club_library_visibility_sql(
*,
alias: str,
profile_id: int,
effective_club_id: Optional[int],
) -> tuple[str, List[Any]]:
"""
Nur Inhalte des aktiven Vereins (visibility=club, club_id=active).
Für Skill-Vergleiche im Vereinskontext — ohne official/private anderer Mandanten.
"""
if effective_club_id is None:
return "(1=0)", []
return (
f"""(
{alias}.visibility = 'club'
AND {alias}.club_id = %s
AND EXISTS (
SELECT 1 FROM club_members cm
WHERE cm.profile_id = %s
AND cm.club_id = {alias}.club_id
AND cm.status = 'active'
)
)""",
[effective_club_id, profile_id],
)
@dataclass
class TenantContext:
profile_id: int
global_role: str
# Header > gespeichertes Profil > Fallback; Plattform-Admin ohne Header: Profil-Verein wenn existent
effective_club_id: Optional[int]
club_ids: frozenset[int]
memberships: List[Dict[str, Any]]
email_verified: bool = True
account_state: str = "active_member"
def resolve_tenant_context(
cur,
*,
profile_id: int,
global_role: str,
header_raw: Optional[str],
memberships: Optional[List[Dict[str, Any]]] = None,
stored_active_club_id: Optional[int] = None,
invalid_header_policy: str = "reject",
email_verified: Optional[bool] = None,
) -> TenantContext:
"""
Mitgliedschaften: wenn nicht übergeben, lädt ``active_only=True`` aus der DB.
Übergabe z.B. von ``/profiles/me``: Liste darf auch **deaktivierte** Vereinszugänge
(`membership_status` = inactive) enthalten — für ``club_ids`` und Mandantenwahl werden nur aktive verwendet.
Auflösung effective_club_id:
- Plattform-Admin: Header setzt beliebigen existierenden Verein; ohne Header → gespeichertes
active_club_id falls der Verein existiert, sonst None.
- Sonst: gültiger Header zwingend Mitgliedschaft — bei ``reject`` sonst 403, bei ``ignore`` wie ohne Header.
Ohne gültigen Header: gespeichertes active_club_id wenn Mitglied; sonst einziger Verein;
bei mehreren ohne gültige Vorgabe → min(club_ids) (Fallback).
"""
role_lc = (global_role or "").lower()
header_cid = parse_active_club_header(header_raw)
if memberships is None:
membership_rows = memberships_with_roles(cur, profile_id, active_only=True)
else:
membership_rows = memberships_for_tenant_resolution(memberships)
club_ids = frozenset(int(r["id"]) for r in membership_rows if r.get("id") is not None)
if email_verified is None:
cur.execute(
"SELECT COALESCE(email_verified, false) AS email_verified FROM profiles WHERE id = %s",
(profile_id,),
)
prof_row = cur.fetchone()
email_verified = bool(prof_row.get("email_verified")) if prof_row else False
else:
email_verified = bool(email_verified)
account_state = resolve_account_state(
email_verified=email_verified,
global_role=role_lc,
has_active_membership=len(club_ids) > 0,
)
if is_platform_admin(role_lc):
if header_cid is not None:
if not _club_exists(cur, header_cid):
raise HTTPException(status_code=400, detail="Verein nicht gefunden")
effective = header_cid
elif (
stored_active_club_id is not None
and _club_exists(cur, stored_active_club_id)
):
effective = stored_active_club_id
else:
effective = None
return TenantContext(
profile_id=profile_id,
global_role=role_lc,
effective_club_id=effective,
club_ids=club_ids,
memberships=membership_rows,
email_verified=email_verified,
account_state=account_state,
)
chosen_header = header_cid
if chosen_header is not None and chosen_header not in club_ids:
if invalid_header_policy == "reject":
raise HTTPException(
status_code=403,
detail="Keine Mitgliedschaft im gewählten Verein",
)
chosen_header = None
if chosen_header is not None:
effective = chosen_header
elif stored_active_club_id is not None and stored_active_club_id in club_ids:
effective = stored_active_club_id
elif len(club_ids) == 1:
effective = next(iter(club_ids))
elif len(club_ids) == 0:
effective = None
else:
effective = min(club_ids)
return TenantContext(
profile_id=profile_id,
global_role=role_lc,
effective_club_id=effective,
club_ids=club_ids,
memberships=membership_rows,
email_verified=email_verified,
account_state=account_state,
)
def get_tenant_context(
session: dict = Depends(require_auth),
x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"),
) -> TenantContext:
"""FastAPI-Dependency: öffnet eine DB-Verbindung und liefert TenantContext."""
pid = int(session["profile_id"])
role = session.get("role") or ""
stored: Optional[int] = None
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT active_club_id FROM profiles WHERE id = %s", (pid,))
row = cur.fetchone()
if row is not None:
ac = row.get("active_club_id")
if ac is not None:
stored = int(ac)
return resolve_tenant_context(
cur,
profile_id=pid,
global_role=role,
header_raw=x_active_club_id,
memberships=None,
stored_active_club_id=stored,
)
def get_tenant_context_flexible(
session: dict = Depends(require_auth_flexible),
x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"),
) -> TenantContext:
"""
Wie get_tenant_context, aber Auth per Header oder Query ?ssetoken (für <img>/<video> ohne Custom-Header).
"""
pid = int(session["profile_id"])
role = session.get("role") or ""
stored: Optional[int] = None
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT active_club_id FROM profiles WHERE id = %s", (pid,))
row = cur.fetchone()
if row is not None:
ac = row.get("active_club_id")
if ac is not None:
stored = int(ac)
return resolve_tenant_context(
cur,
profile_id=pid,
global_role=role,
header_raw=x_active_club_id,
memberships=None,
stored_active_club_id=stored,
)
def tenant_context_from_session_only(
cur,
session: dict,
header_raw: Optional[str],
*,
memberships: Optional[List[Dict[str, Any]]] = None,
stored_active_club_id: Optional[int] = None,
invalid_header_policy: str = "reject",
) -> TenantContext:
"""Variante ohne FastAPI (Tests / gemeinsamer Cursor mit anderen Queries)."""
pid = int(session["profile_id"])
role = session.get("role") or ""
return resolve_tenant_context(
cur,
profile_id=pid,
global_role=role,
header_raw=header_raw,
memberships=memberships,
stored_active_club_id=stored_active_club_id,
invalid_header_policy=invalid_header_policy,
)