"""
Capability resolution (CAPABILITY_CATALOG.v1.md, M3 C1).

Phase 2: probe_capability — JSON log only, no blocking (CAPABILITY_ENFORCE=0).
Phase 3+: CAPABILITY_ENFORCE=1 — HTTP 403 when a capability is missing.
"""
from __future__ import annotations

import os
from typing import Any, Dict, List, Optional, TYPE_CHECKING

from fastapi import HTTPException

from account_lifecycle import account_state_satisfies
from club_tenancy import is_platform_admin
from db import get_db, get_cursor

if TYPE_CHECKING:
    from tenant_context import TenantContext


def capability_enforcement_enabled() -> bool:
    """Return True when the CAPABILITY_ENFORCE env var is "1", "true" or "yes".

    The value is stripped and compared case-insensitively; an unset variable
    counts as "0" (enforcement off).
    """
    flag = os.getenv("CAPABILITY_ENFORCE", "0").strip().lower()
    return flag in ("1", "true", "yes")


def club_roles_in_club(tenant: "TenantContext", club_id: Optional[int]) -> List[str]:
    """Return the roles the tenant holds in the club with the given id.

    Args:
        tenant: Tenant context; its ``memberships`` entries are read via
            ``.get("id")`` and ``.get("roles")``.
        club_id: Club to look up. ``None`` yields an empty list.

    Returns:
        A new list with the roles of the first matching membership, or an
        empty list when there is no membership for ``club_id``.
    """
    if club_id is None:
        return []
    for membership in tenant.memberships or []:
        # Both sides are coerced to int so that differently typed ids still match.
        if int(membership.get("id") or 0) == int(club_id):
            roles = membership.get("roles") or []
            # Array-like role containers exposing tolist() are converted to a plain list.
            if hasattr(roles, "tolist"):
                roles = roles.tolist()
            return list(roles)
    return []


def _portal_role_grant_exists(cur, portal_role: str, capability_id: str) -> bool:
    """Return True if portal_role_capability_grants has a row for role + capability."""
    cur.execute(
        """
        SELECT 1 FROM portal_role_capability_grants
        WHERE portal_role = %s AND capability_id = %s
        LIMIT 1
        """,
        (portal_role, capability_id),
    )
    return bool(cur.fetchone())


def _profile_grant_exists(cur, profile_id: Any, capability_id: str) -> bool:
    """Return True if profile_capability_grants has a row for profile + capability."""
    cur.execute(
        """
        SELECT 1 FROM profile_capability_grants
        WHERE profile_id = %s AND capability_id = %s
        LIMIT 1
        """,
        (profile_id, capability_id),
    )
    return bool(cur.fetchone())


def check_capability(
    cur,
    tenant: "TenantContext",
    capability_id: str,
    *,
    club_id: Optional[int] = None,
) -> Dict[str, Any]:
    """Check one capability for a tenant and an optional club context.

    Checks run in this order; the first one that decides wins:

    1. The capability row must exist and be active (``capability_not_found``).
    2. The tenant's account state must satisfy ``min_account_state``
       (``account_state_insufficient``).
    3. Domain ``quota_bypass``: allowed only through a portal-role grant or a
       profile grant (``quota_bypass_portal_grant`` /
       ``quota_bypass_profile_grant`` / ``quota_bypass_denied``).
    4. Domain ``platform`` or an id starting with ``platform.``: requires a
       platform admin (``portal_role_required``) plus a portal-role or profile
       grant (``portal_capability_denied`` / ``portal_granted``).
    5. Platform admins bypass all remaining club checks
       (``platform_admin_bypass``).
    6. With ``min_account_state`` "active_member" a club context is required
       (``no_club_context``) and the tenant must belong to that club
       (``not_club_member``).
    7. If club-role grants exist for the capability, the tenant needs at least
       one of those roles in the club (``club_role_denied``); otherwise the
       capability is open (``granted``).

    Args:
        cur: DB cursor whose rows support ``.get()`` and ``[...]`` by column name.
        tenant: Tenant context of the caller.
        capability_id: Id of the capability to check.
        club_id: Explicit club context; defaults to ``tenant.effective_club_id``.

    Returns:
        Dict with the keys ``allowed``, ``reason``, ``account_state``,
        ``club_roles`` and ``linked_feature_id``.
    """
    # A tenant object without an account_state attribute is treated as "active_member".
    account_state = getattr(tenant, "account_state", "active_member")
    eff_club = club_id if club_id is not None else tenant.effective_club_id
    club_roles = club_roles_in_club(tenant, eff_club) if eff_club is not None else []

    def verdict(allowed: bool, reason: str, linked_feature_id: Any) -> Dict[str, Any]:
        # Single place that fixes the shape of the result dict.
        return {
            "allowed": allowed,
            "reason": reason,
            "account_state": account_state,
            "club_roles": club_roles,
            "linked_feature_id": linked_feature_id,
        }

    cur.execute(
        """
        SELECT id, min_account_state, linked_feature_id, active, domain
        FROM capabilities WHERE id = %s
        """,
        (capability_id,),
    )
    cap = cur.fetchone()
    if not cap or not cap.get("active"):
        # Inactive capabilities are reported exactly like unknown ones.
        return verdict(False, "capability_not_found", None)

    linked_feature_id = cap.get("linked_feature_id")
    min_state = cap.get("min_account_state") or "active_member"
    if not account_state_satisfies(account_state, min_state):
        return verdict(False, "account_state_insufficient", linked_feature_id)

    domain = (cap.get("domain") or "").strip().lower()
    role_lc = (tenant.global_role or "").lower()

    # Quota bypass (configurable via portal_role / profile grants, no platform admin required)
    if domain == "quota_bypass":
        if _portal_role_grant_exists(cur, role_lc, capability_id):
            return verdict(True, "quota_bypass_portal_grant", linked_feature_id)
        if _profile_grant_exists(cur, tenant.profile_id, capability_id):
            return verdict(True, "quota_bypass_profile_grant", linked_feature_id)
        return verdict(False, "quota_bypass_denied", linked_feature_id)

    # Platform capabilities
    if domain == "platform" or capability_id.startswith("platform."):
        if not is_platform_admin(role_lc):
            return verdict(False, "portal_role_required", linked_feature_id)
        # The profile grant is only consulted when the portal role has no grant.
        if not _portal_role_grant_exists(
            cur, role_lc, capability_id
        ) and not _profile_grant_exists(cur, tenant.profile_id, capability_id):
            return verdict(False, "portal_capability_denied", linked_feature_id)
        return verdict(True, "portal_granted", linked_feature_id)

    # Platform admin bypass for tenant functions (audit required, see catalog §9)
    # NOTE(review): the raw global_role is passed here while the platform branch
    # above passes the lower-cased value — confirm is_platform_admin normalises case.
    if is_platform_admin(tenant.global_role):
        return verdict(True, "platform_admin_bypass", linked_feature_id)

    # Club capabilities: active membership in the target club
    if min_state == "active_member":
        if eff_club is None:
            return verdict(False, "no_club_context", linked_feature_id)
        # NOTE(review): plain membership test without the int coercion used in
        # club_roles_in_club — confirm club_ids and club_id share one type.
        if eff_club not in tenant.club_ids:
            return verdict(False, "not_club_member", linked_feature_id)

    cur.execute(
        """
        SELECT role_code FROM club_role_capability_grants
        WHERE capability_id = %s
        """,
        (capability_id,),
    )
    required_roles = [row["role_code"] for row in cur.fetchall()]
    if required_roles and not any(role in required_roles for role in club_roles):
        return verdict(False, "club_role_denied", linked_feature_id)

    # No role grants configured means the capability is open: having passed the
    # checks above (membership for active members) is sufficient.
    return verdict(True, "granted", linked_feature_id)


def resolve_capabilities_map(
    cur,
    tenant: "TenantContext",
    *,
    club_id: Optional[int] = None,
) -> Dict[str, bool]:
    """Map every active capability id to whether the tenant is allowed to use it.

    Intended for a later /me/entitlements endpoint.

    Args:
        cur: DB cursor; it is reused for every per-capability check.
        tenant: Tenant context of the caller.
        club_id: Optional club context passed through to ``check_capability``.

    Returns:
        Dict of capability id to bool, built in ascending id order.
    """
    cur.execute("SELECT id FROM capabilities WHERE active = true ORDER BY id")
    # Materialise the ids first: check_capability runs further queries on this cursor.
    capability_ids = [row["id"] for row in cur.fetchall()]
    return {
        cid: bool(check_capability(cur, tenant, cid, club_id=club_id).get("allowed"))
        for cid in capability_ids
    }


def probe_capability(
    tenant: "TenantContext",
    capability_id: str,
    *,
    action: str,
    club_id: Optional[int] = None,
    endpoint: Optional[str] = None,
    conn=None,
) -> Dict[str, Any]:
    """Check a capability, log the outcome, and block only when enforcement is on.

    Args:
        tenant: Tenant context of the caller.
        capability_id: Id of the capability to check.
        action: Action label forwarded to the capability log.
        club_id: Optional club context; the log falls back to
            ``tenant.effective_club_id`` when it is None.
        endpoint: Optional endpoint label forwarded to the capability log.
        conn: Existing DB connection to use; when None a connection is
            obtained from ``get_db()`` for the duration of the check.

    Returns:
        The result dict of ``check_capability``.

    Raises:
        HTTPException: Status 403 when CAPABILITY_ENFORCE is enabled and the
            capability is not allowed.
    """
    # Function-local import as in the original (presumably avoids an import cycle — confirm).
    from capability_logger import log_capability_check

    def _run(db_conn):
        cur = get_cursor(db_conn)
        result = check_capability(cur, tenant, capability_id, club_id=club_id)
        # Read the flag once so the logged phase always matches the decision below.
        enforce = capability_enforcement_enabled()
        log_capability_check(
            club_id=club_id if club_id is not None else tenant.effective_club_id,
            profile_id=tenant.profile_id,
            capability_id=capability_id,
            action=action,
            result=result,
            endpoint=endpoint,
            phase="enforce" if enforce else "probe",
        )
        if enforce and not result.get("allowed"):
            raise HTTPException(
                status_code=403,
                detail=(
                    f"Keine Berechtigung für {capability_id} "
                    f"({result.get('reason', 'denied')})."
                ),
            )
        return result

    if conn is not None:
        return _run(conn)
    with get_db() as db_conn:
        return _run(db_conn)