shinkan-jinkendo/backend/account_onboarding_gate.py
Lars 8ee8f52e0f
Some checks failed
Deploy Development / deploy (push) Successful in 44s
Test Suite / pytest-backend (push) Successful in 42s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Failing after 1m14s
Add Club Creation Request Management Features
- Introduced endpoints for managing club creation requests, including fetching, creating, and withdrawing requests.
- Updated the onboarding page to allow users to submit new club creation requests and view their existing requests.
- Enhanced the admin interface with navigation and routing for club creation requests management.
- Incremented version to 0.8.191 to reflect these new features and updates in the application.
2026-06-07 07:09:39 +02:00

179 lines
5.0 KiB
Python

"""
API-Gates für Onboarding (Phase A — MEMBERSHIP_RBAC_DECISIONS_2026-06.md §1.1).
Blockiert Domänen-APIs für unverified / verified_pending_club vor dem Router.
"""
from __future__ import annotations
import os
import re
from typing import Optional, Tuple
from account_lifecycle import resolve_account_state
from club_tenancy import memberships_with_roles
# Öffentlich ohne Session
PUBLIC_API_PREFIXES = (
"/api/auth/login",
"/api/auth/register",
"/api/auth/forgot-password",
"/api/auth/reset-password",
"/api/auth/verify/",
"/api/legal-documents/",
"/api/clubs/public-directory",
"/api/version",
"/api/health/",
"/health",
)
# Mit Session, unabhängig vom account_state (Logout, Profil lesen, …)
AUTH_INFRA_PREFIXES = (
"/api/auth/logout",
"/api/auth/me",
"/api/auth/status",
"/api/auth/pin",
"/api/auth/resend-verification",
"/api/profiles/me",
"/api/me/entitlements",
)
# Zusätzlich für verified_pending_club (Verein bewerben)
PENDING_CLUB_PREFIXES = (
"/api/me/club-join-requests",
"/api/me/club-creation-requests",
)
_PROFILE_MUTATION_RE = re.compile(r"^/api/profiles/(\d+)$")
def api_onboarding_gate_enabled() -> bool:
"""Produktions-Gate aktiv (ACCOUNT_GATE_API_ENFORCE=0 zum Abschalten)."""
return os.getenv("ACCOUNT_GATE_API_ENFORCE", "1").strip() == "1"
def _middleware_db_lookup_enabled() -> bool:
"""
Middleware-Session-Lookup nur mit echter DB (nicht in pytest TestClient ohne Postgres).
"""
if os.getenv("SKIP_DB_MIGRATE", "").strip().lower() in ("1", "true", "yes"):
return False
if os.getenv("PYTEST_CURRENT_TEST"):
return False
return True
def normalize_api_path(path: str) -> str:
p = (path or "").split("?", 1)[0].strip()
if not p.startswith("/"):
p = "/" + p
if len(p) > 1 and p.endswith("/"):
p = p[:-1]
return p
def is_public_api_path(path: str) -> bool:
p = normalize_api_path(path)
return any(p == pref or p.startswith(pref) for pref in PUBLIC_API_PREFIXES)
def _path_allowed_for_state(path: str, method: str, account_state: str, profile_id: int) -> bool:
p = normalize_api_path(path)
m = (method or "GET").upper()
for pref in AUTH_INFRA_PREFIXES:
if p == pref or p.startswith(pref + "/"):
return True
match = _PROFILE_MUTATION_RE.match(p)
if match and m in ("PUT", "PATCH") and int(match.group(1)) == int(profile_id):
return True
if account_state == "unverified":
return False
if account_state == "verified_pending_club":
for pref in PENDING_CLUB_PREFIXES:
if p == pref or p.startswith(pref + "/"):
return True
return False
return True
def resolve_account_state_for_token(cur, session_row: dict) -> str:
profile_id = int(session_row["profile_id"])
role = (session_row.get("role") or "").lower()
cur.execute(
"SELECT COALESCE(email_verified, false) AS email_verified FROM profiles WHERE id = %s",
(profile_id,),
)
prof = cur.fetchone()
email_verified = bool(prof.get("email_verified")) if prof else False
memberships = memberships_with_roles(cur, profile_id, active_only=True)
has_active = len(memberships) > 0
return resolve_account_state(
email_verified=email_verified,
global_role=role,
has_active_membership=has_active,
)
def check_api_onboarding_gate(
*,
path: str,
method: str,
profile_id: int,
account_state: str,
) -> Tuple[bool, Optional[str]]:
"""
Returns (allowed, reason).
active_member / platform_admin → immer erlaubt (Domain).
"""
if not api_onboarding_gate_enabled():
return True, None
if account_state in ("active_member", "platform_admin"):
return True, None
if _path_allowed_for_state(path, method, account_state, profile_id):
return True, None
return False, f"account_state_{account_state}"
def evaluate_request_gate(token: Optional[str], path: str, method: str) -> Tuple[bool, Optional[str], Optional[str]]:
"""
Vollständige Prüfung inkl. Session-Lookup.
Returns: allowed, reason, account_state (für Logging)
"""
if not api_onboarding_gate_enabled() or not _middleware_db_lookup_enabled():
return True, None, None
p = normalize_api_path(path)
if not p.startswith("/api/"):
return True, None, None
if is_public_api_path(p):
return True, None, None
if not token:
return True, None, None
from auth import get_session
from db import get_db, get_cursor
session = get_session(token)
if not session:
return True, None, None
profile_id = int(session["profile_id"])
with get_db() as conn:
cur = get_cursor(conn)
account_state = resolve_account_state_for_token(cur, session)
allowed, reason = check_api_onboarding_gate(
path=p,
method=method,
profile_id=profile_id,
account_state=account_state,
)
return allowed, reason, account_state