""" Superadmin: Kontingent-Bypass über Capability-Grants (portal_role / profile). Kein separates Exemption-Schema — nutzt portal_role_capability_grants und profile_capability_grants mit IDs platform.club_quota.bypass[.feature_id]. """ from typing import Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, Field from auth import require_auth from club_quota_bypass import ( QUOTA_BYPASS_ALL, ensure_quota_bypass_capability, list_quota_bypass_grants, quota_bypass_capability_id_for_feature, ) from club_tenancy import is_superadmin from db import get_db, get_cursor, r2d router = APIRouter(prefix="/api/admin", tags=["admin_capability_grants"]) def _require_superadmin(session: dict) -> None: if not is_superadmin(session.get("role")): raise HTTPException(status_code=403, detail="Nur Super-Administratoren") def _resolve_capability_id(cur, feature_id: Optional[str]) -> str: fid = (feature_id or "").strip() or None if not fid: return QUOTA_BYPASS_ALL cur.execute("SELECT 1 FROM features WHERE id = %s", (fid,)) if not cur.fetchone(): raise HTTPException(status_code=400, detail="Unbekanntes Feature") return ensure_quota_bypass_capability(cur, fid) class PortalGrantBody(BaseModel): portal_role: str = Field(..., min_length=1, max_length=50) feature_id: Optional[str] = Field( None, description="Feature-ID oder leer = alle Vereins-Features (platform.club_quota.bypass)", ) class ProfileGrantBody(BaseModel): feature_id: Optional[str] = Field(None, description="Feature-ID oder leer = alle Features") reason: Optional[str] = Field(None, max_length=500) @router.get("/club-feature-exemptions") def list_club_feature_exemptions(session: dict = Depends(require_auth)): """Übersicht Kontingent-Bypass-Grants (Capability-System).""" _require_superadmin(session) with get_db() as conn: cur = get_cursor(conn) return list_quota_bypass_grants(cur) @router.get("/capability-grants/club-quota-bypass") def list_quota_bypass_capability_grants(session: dict = Depends(require_auth)): """Alias — gleiche Daten wie /club-feature-exemptions.""" return list_club_feature_exemptions(session) @router.post("/club-feature-exemptions/roles", status_code=201) @router.post("/capability-grants/club-quota-bypass/portal-roles", status_code=201) def add_portal_quota_bypass_grant(body: PortalGrantBody, session: dict = Depends(require_auth)): _require_superadmin(session) role = body.portal_role.strip().lower() with get_db() as conn: cur = get_cursor(conn) cap_id = _resolve_capability_id(cur, body.feature_id) cur.execute( """ SELECT 1 FROM portal_role_capability_grants WHERE portal_role = %s AND capability_id = %s LIMIT 1 """, (role, cap_id), ) if cur.fetchone(): raise HTTPException(status_code=409, detail="Grant existiert bereits") cur.execute( """ INSERT INTO portal_role_capability_grants (portal_role, capability_id) VALUES (%s, %s) RETURNING portal_role, capability_id """, (role, cap_id), ) row = cur.fetchone() conn.commit() out = r2d(row) out["capability_id"] = cap_id if body.feature_id: out["feature_id"] = body.feature_id.strip() else: out["feature_id"] = None return out @router.delete("/club-feature-exemptions/roles/{exemption_id}") def delete_legacy_role_exemption(exemption_id: int, session: dict = Depends(require_auth)): """Legacy-Pfad: exemption_id = portal_role_capability_grants nicht unterstützt — 410.""" _require_superadmin(session) raise HTTPException( status_code=410, detail="Bitte DELETE /api/admin/capability-grants/club-quota-bypass/portal-roles nutzen", ) @router.delete("/capability-grants/club-quota-bypass/portal-roles") def delete_portal_quota_bypass_grant( portal_role: str, capability_id: Optional[str] = None, feature_id: Optional[str] = None, session: dict = Depends(require_auth), ): _require_superadmin(session) role = portal_role.strip().lower() cap_id = capability_id if not cap_id: cap_id = QUOTA_BYPASS_ALL if not (feature_id or "").strip() else quota_bypass_capability_id_for_feature( feature_id.strip() ) with get_db() as conn: cur = get_cursor(conn) cur.execute( """ DELETE FROM portal_role_capability_grants WHERE portal_role = %s AND capability_id = %s RETURNING portal_role, capability_id """, (role, cap_id), ) if not cur.fetchone(): raise HTTPException(status_code=404, detail="Grant nicht gefunden") conn.commit() return {"ok": True} @router.post("/club-feature-exemptions/profiles/{profile_id}", status_code=201) @router.post("/capability-grants/club-quota-bypass/profiles/{profile_id}", status_code=201) def add_profile_quota_bypass_grant( profile_id: int, body: ProfileGrantBody, session: dict = Depends(require_auth), ): _require_superadmin(session) admin_pid = int(session["profile_id"]) with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT 1 FROM profiles WHERE id = %s", (profile_id,)) if not cur.fetchone(): raise HTTPException(status_code=404, detail="Profil nicht gefunden") cap_id = _resolve_capability_id(cur, body.feature_id) cur.execute( """ SELECT 1 FROM profile_capability_grants WHERE profile_id = %s AND capability_id = %s LIMIT 1 """, (profile_id, cap_id), ) if cur.fetchone(): raise HTTPException(status_code=409, detail="Grant existiert bereits") cur.execute( """ INSERT INTO profile_capability_grants ( profile_id, capability_id, reason, granted_by_profile_id ) VALUES (%s, %s, %s, %s) RETURNING profile_id, capability_id, reason, granted_by_profile_id, created_at """, (profile_id, cap_id, (body.reason or "").strip() or None, admin_pid), ) row = cur.fetchone() conn.commit() return r2d(row) @router.delete("/club-feature-exemptions/profiles/{exemption_id}") def delete_legacy_profile_exemption(exemption_id: int, session: dict = Depends(require_auth)): _require_superadmin(session) raise HTTPException( status_code=410, detail="Bitte DELETE /api/admin/capability-grants/club-quota-bypass/profiles nutzen", ) @router.delete("/capability-grants/club-quota-bypass/profiles") def delete_profile_quota_bypass_grant( profile_id: int, capability_id: Optional[str] = None, feature_id: Optional[str] = None, session: dict = Depends(require_auth), ): _require_superadmin(session) cap_id = capability_id if not cap_id: cap_id = QUOTA_BYPASS_ALL if not (feature_id or "").strip() else quota_bypass_capability_id_for_feature( feature_id.strip() ) with get_db() as conn: cur = get_cursor(conn) cur.execute( """ DELETE FROM profile_capability_grants WHERE profile_id = %s AND capability_id = %s RETURNING profile_id, capability_id """, (profile_id, cap_id), ) if not cur.fetchone(): raise HTTPException(status_code=404, detail="Grant nicht gefunden") conn.commit() return {"ok": True}