""" API-Gates für Onboarding (Phase A — MEMBERSHIP_RBAC_DECISIONS_2026-06.md §1.1). Blockiert Domänen-APIs für unverified / verified_pending_club vor dem Router. """ from __future__ import annotations import os import re from typing import Optional, Tuple from account_lifecycle import resolve_account_state from club_tenancy import memberships_with_roles # Öffentlich ohne Session PUBLIC_API_PREFIXES = ( "/api/auth/login", "/api/auth/register", "/api/auth/forgot-password", "/api/auth/reset-password", "/api/auth/verify/", "/api/legal-documents/", "/api/clubs/public-directory", "/api/version", "/api/health/", "/health", ) # Mit Session, unabhängig vom account_state (Logout, Profil lesen, …) AUTH_INFRA_PREFIXES = ( "/api/auth/logout", "/api/auth/me", "/api/auth/status", "/api/auth/pin", "/api/auth/resend-verification", "/api/profiles/me", "/api/me/entitlements", ) # Zusätzlich für verified_pending_club (Verein bewerben) PENDING_CLUB_PREFIXES = ( "/api/me/club-join-requests", "/api/me/club-creation-requests", ) _PROFILE_MUTATION_RE = re.compile(r"^/api/profiles/(\d+)$") def api_onboarding_gate_enabled() -> bool: """Produktions-Gate aktiv (ACCOUNT_GATE_API_ENFORCE=0 zum Abschalten).""" return os.getenv("ACCOUNT_GATE_API_ENFORCE", "1").strip() == "1" def _middleware_db_lookup_enabled() -> bool: """ Middleware-Session-Lookup nur mit echter DB (nicht in pytest TestClient ohne Postgres). """ if os.getenv("SKIP_DB_MIGRATE", "").strip().lower() in ("1", "true", "yes"): return False if os.getenv("PYTEST_CURRENT_TEST"): return False return True def normalize_api_path(path: str) -> str: p = (path or "").split("?", 1)[0].strip() if not p.startswith("/"): p = "/" + p if len(p) > 1 and p.endswith("/"): p = p[:-1] return p def is_public_api_path(path: str) -> bool: p = normalize_api_path(path) return any(p == pref or p.startswith(pref) for pref in PUBLIC_API_PREFIXES) def _path_allowed_for_state(path: str, method: str, account_state: str, profile_id: int) -> bool: p = normalize_api_path(path) m = (method or "GET").upper() for pref in AUTH_INFRA_PREFIXES: if p == pref or p.startswith(pref + "/"): return True match = _PROFILE_MUTATION_RE.match(p) if match and m in ("PUT", "PATCH") and int(match.group(1)) == int(profile_id): return True if account_state == "unverified": return False if account_state == "verified_pending_club": for pref in PENDING_CLUB_PREFIXES: if p == pref or p.startswith(pref + "/"): return True return False return True def resolve_account_state_for_token(cur, session_row: dict) -> str: profile_id = int(session_row["profile_id"]) role = (session_row.get("role") or "").lower() cur.execute( "SELECT COALESCE(email_verified, false) AS email_verified FROM profiles WHERE id = %s", (profile_id,), ) prof = cur.fetchone() email_verified = bool(prof.get("email_verified")) if prof else False memberships = memberships_with_roles(cur, profile_id, active_only=True) has_active = len(memberships) > 0 return resolve_account_state( email_verified=email_verified, global_role=role, has_active_membership=has_active, ) def check_api_onboarding_gate( *, path: str, method: str, profile_id: int, account_state: str, ) -> Tuple[bool, Optional[str]]: """ Returns (allowed, reason). active_member / platform_admin → immer erlaubt (Domain). """ if not api_onboarding_gate_enabled(): return True, None if account_state in ("active_member", "platform_admin"): return True, None if _path_allowed_for_state(path, method, account_state, profile_id): return True, None return False, f"account_state_{account_state}" def evaluate_request_gate(token: Optional[str], path: str, method: str) -> Tuple[bool, Optional[str], Optional[str]]: """ Vollständige Prüfung inkl. Session-Lookup. Returns: allowed, reason, account_state (für Logging) """ if not api_onboarding_gate_enabled() or not _middleware_db_lookup_enabled(): return True, None, None p = normalize_api_path(path) if not p.startswith("/api/"): return True, None, None if is_public_api_path(p): return True, None, None if not token: return True, None, None from auth import get_session from db import get_db, get_cursor session = get_session(token) if not session: return True, None, None profile_id = int(session["profile_id"]) with get_db() as conn: cur = get_cursor(conn) account_state = resolve_account_state_for_token(cur, session) allowed, reason = check_api_onboarding_gate( path=p, method=method, profile_id=profile_id, account_state=account_state, ) return allowed, reason, account_state