Some checks failed
Deploy Development / deploy (push) Successful in 44s
Test Suite / pytest-backend (push) Successful in 42s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Failing after 1m14s
- Introduced endpoints for managing club creation requests, including fetching, creating, and withdrawing requests. - Updated the onboarding page to allow users to submit new club creation requests and view their existing requests. - Enhanced the admin interface with navigation and routing for club creation requests management. - Incremented version to 0.8.191 to reflect these new features and updates in the application.
179 lines
5.0 KiB
Python
179 lines
5.0 KiB
Python
"""
|
|
API-Gates für Onboarding (Phase A — MEMBERSHIP_RBAC_DECISIONS_2026-06.md §1.1).
|
|
|
|
Blockiert Domänen-APIs für unverified / verified_pending_club vor dem Router.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
from typing import Optional, Tuple
|
|
|
|
from account_lifecycle import resolve_account_state
|
|
from club_tenancy import memberships_with_roles
|
|
|
|
# Public: reachable without a session.
# Entries are compared as raw string prefixes of the normalized request path.
PUBLIC_API_PREFIXES = (
    # Authentication endpoints
    "/api/auth/login",
    "/api/auth/register",
    "/api/auth/forgot-password",
    "/api/auth/reset-password",
    "/api/auth/verify/",
    # Other public endpoints
    "/api/legal-documents/",
    "/api/clubs/public-directory",
    "/api/version",
    "/api/health/",
    "/health",
)
|
|
|
|
# Requires a session, but is allowed regardless of account_state
# (logout, reading the own profile, ...).
AUTH_INFRA_PREFIXES = (
    "/api/auth/logout",
    "/api/auth/me",
    "/api/auth/status",
    "/api/auth/pin",
    "/api/auth/resend-verification",
    "/api/profiles/me",
    "/api/me/entitlements",
)
|
|
|
|
# Additionally allowed for verified_pending_club accounts (applying to a club).
PENDING_CLUB_PREFIXES = (
    "/api/me/club-join-requests",
    "/api/me/club-creation-requests",
)
|
|
|
|
# Matches exactly "/api/profiles/<digits>" (no trailing segment) and captures
# the numeric id as group 1.
_PROFILE_MUTATION_RE = re.compile(r"^/api/profiles/(\d+)$")
|
|
|
|
|
|
def api_onboarding_gate_enabled() -> bool:
    """Return whether the API onboarding gate is enforced.

    The switch is the ``ACCOUNT_GATE_API_ENFORCE`` environment variable. When
    it is unset the gate is on; set it to ``0`` to turn the gate off.

    The value is compared case-insensitively with surrounding whitespace
    ignored. ``1``, ``true`` and ``yes`` enable the gate, matching how
    ``SKIP_DB_MIGRATE`` is parsed in this module. Any other value, including
    the empty string, disables it.
    """
    # Previously only the literal "1" enabled the gate, so a deployment that
    # set the variable to "true" silently ran without enforcement.
    raw_value = os.getenv("ACCOUNT_GATE_API_ENFORCE", "1")
    return raw_value.strip().lower() in ("1", "true", "yes")
|
|
|
|
|
|
def _middleware_db_lookup_enabled() -> bool:
    """Return whether the middleware may perform its DB-backed session lookup.

    The lookup is meant to run only against a real database, not in a pytest
    TestClient without Postgres. It is switched off when either

    * ``SKIP_DB_MIGRATE`` is ``1``, ``true`` or ``yes`` (case-insensitive,
      surrounding whitespace ignored), or
    * ``PYTEST_CURRENT_TEST`` is set to a non-empty value.
    """
    skip_flag = os.getenv("SKIP_DB_MIGRATE", "").strip().lower()
    under_pytest = bool(os.getenv("PYTEST_CURRENT_TEST"))
    return not (skip_flag in ("1", "true", "yes") or under_pytest)
|
|
|
|
|
|
def normalize_api_path(path: str) -> str:
    """Return *path* in the canonical form used for gate comparisons.

    Steps, in order:

    1. ``None`` / empty input is treated as an empty string.
    2. Everything from the first ``?`` onwards is dropped.
    3. Surrounding whitespace is stripped.
    4. A leading ``/`` is added when missing.
    5. A single trailing ``/`` is removed, unless the path is just ``/``.
    """
    without_query = (path or "").partition("?")[0]
    trimmed = without_query.strip()
    rooted = trimmed if trimmed.startswith("/") else "/" + trimmed
    # Only one trailing slash is removed; the bare root "/" is kept as is.
    if rooted != "/" and rooted.endswith("/"):
        return rooted[:-1]
    return rooted
|
|
|
|
|
|
def is_public_api_path(path: str) -> bool:
    """Return True when the normalized *path* starts with a public prefix.

    The comparison is a raw string-prefix test against every entry of
    ``PUBLIC_API_PREFIXES``; no segment boundary is required.

    NOTE(review): ``normalize_api_path`` removes one trailing slash, so a
    request for exactly ``/api/health/`` is compared as ``/api/health`` and
    does not match the ``/api/health/`` entry. Confirm this is intended.
    """
    normalized = normalize_api_path(path)
    # str.startswith accepts a tuple of candidates and also covers equality.
    return normalized.startswith(tuple(PUBLIC_API_PREFIXES))
|
|
|
|
|
|
def _path_allowed_for_state(path: str, method: str, account_state: str, profile_id: int) -> bool:
    """Decide whether *path* may be called in the given *account_state*.

    Rules, evaluated in this order:

    1. Paths equal to or below an ``AUTH_INFRA_PREFIXES`` entry are allowed.
    2. ``PUT`` / ``PATCH`` on ``/api/profiles/<id>`` is allowed when ``<id>``
       equals *profile_id*.
    3. ``unverified`` accounts are denied everything else.
    4. ``verified_pending_club`` accounts are additionally allowed paths equal
       to or below a ``PENDING_CLUB_PREFIXES`` entry, and denied otherwise.
    5. Any other state is allowed.

    A missing *method* is treated as ``GET``.
    """
    route = normalize_api_path(path)
    verb = (method or "GET").upper()

    def _covered_by(prefixes) -> bool:
        # Exact hit, or a sub-path with a "/" boundary right after the prefix.
        return any(route == base or route.startswith(base + "/") for base in prefixes)

    if _covered_by(AUTH_INFRA_PREFIXES):
        return True

    profile_route = _PROFILE_MUTATION_RE.match(route)
    if profile_route and verb in ("PUT", "PATCH"):
        # Users may only mutate their own profile.
        if int(profile_route.group(1)) == int(profile_id):
            return True

    if account_state == "unverified":
        return False
    if account_state == "verified_pending_club":
        return _covered_by(PENDING_CLUB_PREFIXES)
    return True
|
|
|
|
|
|
def resolve_account_state_for_token(cur, session_row: dict) -> str:
    """Derive the account state for the profile behind a session row.

    Reads ``profile_id`` and ``role`` from *session_row*, looks up the
    profile's ``email_verified`` flag through *cur*, fetches the active
    memberships, and passes all three facts to ``resolve_account_state``.

    A missing profile row counts as "email not verified".
    Assumes *cur* returns rows that support ``.get`` (dict-like) -- TODO confirm.
    """
    owner_id = int(session_row["profile_id"])
    global_role = (session_row.get("role") or "").lower()

    cur.execute(
        "SELECT COALESCE(email_verified, false) AS email_verified FROM profiles WHERE id = %s",
        (owner_id,),
    )
    profile_row = cur.fetchone()
    if profile_row:
        verified = bool(profile_row.get("email_verified"))
    else:
        verified = False

    active_memberships = memberships_with_roles(cur, owner_id, active_only=True)
    membership_count = len(active_memberships)

    return resolve_account_state(
        email_verified=verified,
        global_role=global_role,
        has_active_membership=membership_count > 0,
    )
|
|
|
|
|
|
def check_api_onboarding_gate(
    *,
    path: str,
    method: str,
    profile_id: int,
    account_state: str,
) -> Tuple[bool, Optional[str]]:
    """Apply the onboarding gate to one request.

    Returns:
        ``(allowed, reason)``. *reason* is ``None`` when the request is
        allowed and ``"account_state_<state>"`` when it is denied.

    The request is allowed when the gate is disabled, when *account_state* is
    ``active_member`` or ``platform_admin``, or when the path is permitted for
    the state by ``_path_allowed_for_state``.
    """
    gate_off = not api_onboarding_gate_enabled()
    # Short-circuit order matters: the path rules are consulted only for
    # accounts that are still onboarding while the gate is on.
    if (
        gate_off
        or account_state in ("active_member", "platform_admin")
        or _path_allowed_for_state(path, method, account_state, profile_id)
    ):
        return True, None

    return False, f"account_state_{account_state}"
|
|
|
|
|
|
def evaluate_request_gate(token: Optional[str], path: str, method: str) -> Tuple[bool, Optional[str], Optional[str]]:
    """Run the full gate check for a request, including the session lookup.

    Returns:
        ``(allowed, reason, account_state)``. *account_state* is returned for
        logging and is ``None`` whenever the request passes without a lookup.

    The request passes without a lookup when the gate or the middleware DB
    lookup is disabled, when the path is not under ``/api/``, when it is a
    public path, when no token was supplied, or when the token has no session.
    """
    lookup_active = api_onboarding_gate_enabled() and _middleware_db_lookup_enabled()
    if not lookup_active:
        return True, None, None

    route = normalize_api_path(path)
    if not route.startswith("/api/") or is_public_api_path(route) or not token:
        return True, None, None

    # Imported inside the function, as in the original; presumably to avoid
    # an import cycle or import-time DB setup -- TODO confirm.
    from auth import get_session
    from db import get_db, get_cursor

    session_row = get_session(token)
    if not session_row:
        return True, None, None

    owner_id = int(session_row["profile_id"])
    with get_db() as conn:
        cursor = get_cursor(conn)
        state = resolve_account_state_for_token(cursor, session_row)

    # The path rules are evaluated after the connection context has exited.
    verdict, why = check_api_onboarding_gate(
        path=route,
        method=method,
        profile_id=owner_id,
        account_state=state,
    )
    return verdict, why, state
|