All checks were successful
Deploy Development / deploy (push) Successful in 44s
Test Suite / pytest-backend (push) Successful in 42s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 35s
Test Suite / playwright-tests (push) Successful in 1m51s
- Updated the capability catalog to reflect a registry-first approach, requiring modules to register rights and quotas upon implementation. - Enhanced the backend to synchronize the rights registry with the database, ensuring only registered capabilities and features are displayed in the admin matrix. - Modified SQL queries in the admin rights router to filter capabilities and features based on module registration. - Updated documentation to clarify the new rights and features registry process, replacing the previous catalog-first method. - Incremented application version to 0.8.201 and updated database schema version to 20260606084 to reflect these changes.
581 lines
19 KiB
Python
581 lines
19 KiB
Python
"""
|
|
Superadmin: Rollen & Rechte — Capability-Grants, Kontingent-Bypass, Vereins-Kontingente.
|
|
|
|
Ein Router für das Rechtesystem (M6). Kein paralleles Exemption-Schema.
|
|
"""
|
|
from typing import Dict, List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from pydantic import BaseModel, Field
|
|
|
|
from auth import require_auth
|
|
from club_quota_bypass import (
|
|
QUOTA_BYPASS_ALL,
|
|
ensure_quota_bypass_capability,
|
|
list_quota_bypass_grants,
|
|
quota_bypass_capability_id_for_feature,
|
|
)
|
|
from capability_enforcement_audit import (
|
|
enforcement_status_for_capability,
|
|
feature_consume_status,
|
|
)
|
|
from club_tenancy import is_superadmin
|
|
from db import get_db, get_cursor, r2d
|
|
|
|
# Router for the superadmin rights endpoints; every route declared in this
# module is mounted under the /api/admin/rights prefix.
router = APIRouter(prefix="/api/admin/rights", tags=["admin_rights"])

# Portal role names reported as "portal_roles" by the capability matrix.
PORTAL_ROLES = ("user", "trainer", "admin", "superadmin")
# Club role codes; club-role capability grants are only accepted for these.
CLUB_ROLES = ("club_admin", "trainer", "division_lead", "content_editor")
|
|
|
|
|
|
def _require_superadmin(session: dict) -> None:
    """Abort the request with HTTP 403 unless the session role is a superadmin.

    Args:
        session: Session mapping supplied by the auth dependency; only its
            ``"role"`` entry is read.

    Raises:
        HTTPException: 403 when ``is_superadmin`` rejects the role.
    """
    role = session.get("role")
    if is_superadmin(role):
        return
    raise HTTPException(status_code=403, detail="Nur Super-Administratoren")
|
|
|
|
|
|
def _resolve_quota_bypass_capability_id(cur, feature_id: Optional[str]) -> str:
    """Map an optional feature id to the capability id of its quota bypass.

    Args:
        cur: Open database cursor used for the feature lookup.
        feature_id: Feature id, or ``None``/blank for the all-features bypass.

    Returns:
        ``QUOTA_BYPASS_ALL`` when no feature id is given, otherwise whatever
        ``ensure_quota_bypass_capability`` returns for that feature.

    Raises:
        HTTPException: 400 when the feature id is not present in ``features``.
    """
    wanted = (feature_id or "").strip()
    if wanted == "":
        return QUOTA_BYPASS_ALL

    cur.execute("SELECT 1 FROM features WHERE id = %s", (wanted,))
    known = cur.fetchone()
    if known:
        return ensure_quota_bypass_capability(cur, wanted)
    raise HTTPException(status_code=400, detail="Unbekanntes Feature")
|
|
|
|
|
|
class PlanLimitItem(BaseModel):
    # One entry of a plan-limit update: the feature it applies to and the
    # value to store for it.
    feature_id: str
    # Optional; per the field description None means "unlimited" and 0 means
    # "disabled".
    limit_value: Optional[int] = Field(
        None,
        description="NULL = unbegrenzt; 0 = deaktiviert (boolean/count)",
    )
|
|
|
|
|
|
class PlanLimitsBody(BaseModel):
    # Request body for updating a plan's limits: the list of entries to upsert.
    limits: List[PlanLimitItem]
|
|
|
|
|
|
class ClubSubscriptionBody(BaseModel):
    # Request body for assigning a plan to a club.
    plan_id: str
    # Defaults to "active"; the pattern restricts it to the four listed values.
    status: str = Field(default="active", pattern="^(active|trial|past_due|cancelled)$")
|
|
|
|
|
|
class PortalCapabilityGrantBody(BaseModel):
    # Request body for granting a capability to a portal role.
    # Both fields are required and must be non-empty.
    portal_role: str = Field(..., min_length=1, max_length=50)
    capability_id: str = Field(..., min_length=1)
|
|
|
|
|
|
class ClubRoleCapabilityGrantBody(BaseModel):
    # Request body for granting a capability to a club role.
    # Both fields are required and must be non-empty.
    role_code: str = Field(..., min_length=1, max_length=50)
    capability_id: str = Field(..., min_length=1)
|
|
|
|
|
|
class QuotaBypassPortalBody(BaseModel):
    # Request body for granting a quota bypass to a portal role.
    portal_role: str = Field(..., min_length=1, max_length=50)
    # Optional; per the field description an empty value selects the bypass
    # for all club features.
    feature_id: Optional[str] = Field(
        None,
        description="Feature-ID oder leer = alle Vereins-Features (platform.club_quota.bypass)",
    )
|
|
|
|
|
|
class QuotaBypassProfileBody(BaseModel):
    # Request body for granting a quota bypass to a single profile.
    # Optional; per the field description an empty value selects all features.
    feature_id: Optional[str] = Field(None, description="Feature-ID oder leer = alle Features")
    # Optional free-text justification, capped at 500 characters.
    reason: Optional[str] = Field(None, max_length=500)
|
|
|
|
|
|
# ── Capability-Matrix (Rollen → Fähigkeiten) ─────────────────────────────────
|
|
|
|
@router.get("/capability-matrix")
def get_capability_matrix(session: dict = Depends(require_auth)):
    # Superadmin only. Returns the role/capability matrix: the active
    # capabilities that carry a module, plus all portal-role and club-role
    # grants. Each capability is annotated with its enforcement status and,
    # when it links to a feature, that feature's consume status.
    _require_superadmin(session)

    capabilities_sql = """
        SELECT id, name, domain, min_account_state, linked_feature_id, module
        FROM capabilities
        WHERE active = true AND module IS NOT NULL
        ORDER BY module, domain, id
    """
    portal_grants_sql = """
        SELECT portal_role, capability_id
        FROM portal_role_capability_grants
        ORDER BY portal_role, capability_id
    """
    club_grants_sql = """
        SELECT role_code, capability_id
        FROM club_role_capability_grants
        ORDER BY role_code, capability_id
    """

    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute(capabilities_sql)
        capabilities = []
        for record in cur.fetchall():
            entry = r2d(record)
            entry["enforcement"] = enforcement_status_for_capability(entry.get("id"))
            linked_feature = entry.get("linked_feature_id")
            if linked_feature:
                entry["feature_consume"] = feature_consume_status(entry["linked_feature_id"])
            capabilities.append(entry)

        cur.execute(portal_grants_sql)
        portal_grants = [r2d(record) for record in cur.fetchall()]

        cur.execute(club_grants_sql)
        club_role_grants = [r2d(record) for record in cur.fetchall()]

    hint = (
        "Nur vom Modul registrierte Rechte (capabilities.module). "
        "Legacy-Katalog-Seed ohne module erscheint nicht."
    )
    return {
        "portal_roles": list(PORTAL_ROLES),
        "club_roles": list(CLUB_ROLES),
        "capabilities": capabilities,
        "portal_grants": portal_grants,
        "club_role_grants": club_role_grants,
        "registry_only": True,
        "hint": hint,
    }
|
|
|
|
|
|
@router.post("/capability-grants/portal-roles", status_code=201)
def add_portal_capability_grant(body: PortalCapabilityGrantBody, session: dict = Depends(require_auth)):
    # Superadmin only. Grants an active capability to a portal role.
    # Responds 400 when the capability is unknown or is neither in the
    # platform/quota_bypass domains nor prefixed "platform.", and 409 when the
    # grant already exists.
    _require_superadmin(session)
    role = body.portal_role.strip().lower()
    cap_id = body.capability_id.strip()

    insert_sql = """
        INSERT INTO portal_role_capability_grants (portal_role, capability_id)
        VALUES (%s, %s)
        ON CONFLICT DO NOTHING
        RETURNING portal_role, capability_id
    """

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT domain FROM capabilities WHERE id = %s AND active = true", (cap_id,))
        cap = cur.fetchone()
        if not cap:
            raise HTTPException(status_code=400, detail="Unbekannte Capability")

        domain = (cap.get("domain") or "").lower()
        grantable = domain in ("platform", "quota_bypass") or cap_id.startswith("platform.")
        if not grantable:
            raise HTTPException(
                status_code=400,
                detail="Portal-Grants nur für domain=platform oder quota_bypass",
            )

        cur.execute(insert_sql, (role, cap_id))
        inserted = cur.fetchone()
        # No returned row means ON CONFLICT skipped the insert.
        if not inserted:
            raise HTTPException(status_code=409, detail="Grant existiert bereits")
        conn.commit()
        return r2d(inserted)
|
|
|
|
|
|
@router.delete("/capability-grants/portal-roles")
def delete_portal_capability_grant(
    portal_role: str = Query(...),
    capability_id: str = Query(...),
    session: dict = Depends(require_auth),
):
    # Superadmin only. Removes one portal-role capability grant; responds 404
    # when no matching grant exists.
    _require_superadmin(session)
    key = (portal_role.strip().lower(), capability_id.strip())

    delete_sql = """
        DELETE FROM portal_role_capability_grants
        WHERE portal_role = %s AND capability_id = %s
        RETURNING portal_role, capability_id
    """

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(delete_sql, key)
        removed = cur.fetchone()
        if not removed:
            raise HTTPException(status_code=404, detail="Grant nicht gefunden")
        conn.commit()
        return {"ok": True}
|
|
|
|
|
|
@router.post("/capability-grants/club-roles", status_code=201)
def add_club_role_capability_grant(
    body: ClubRoleCapabilityGrantBody,
    session: dict = Depends(require_auth),
):
    # Superadmin only. Grants an active capability to one of CLUB_ROLES.
    # Responds 400 for an unknown club role or a capability that is missing,
    # inactive, or in the platform/quota_bypass domains; 409 when the grant
    # already exists.
    _require_superadmin(session)
    role = body.role_code.strip().lower()
    cap_id = body.capability_id.strip()

    known_role = role in CLUB_ROLES
    if not known_role:
        raise HTTPException(status_code=400, detail="Unbekannte Vereinsrolle")

    lookup_sql = """
        SELECT domain FROM capabilities
        WHERE id = %s AND active = true AND domain NOT IN ('platform', 'quota_bypass')
    """
    insert_sql = """
        INSERT INTO club_role_capability_grants (role_code, capability_id)
        VALUES (%s, %s)
        ON CONFLICT DO NOTHING
        RETURNING role_code, capability_id
    """

    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute(lookup_sql, (cap_id,))
        eligible = cur.fetchone()
        if not eligible:
            raise HTTPException(status_code=400, detail="Capability nicht für Vereinsrollen")

        cur.execute(insert_sql, (role, cap_id))
        inserted = cur.fetchone()
        # No returned row means ON CONFLICT skipped the insert.
        if not inserted:
            raise HTTPException(status_code=409, detail="Grant existiert bereits")
        conn.commit()
        return r2d(inserted)
|
|
|
|
|
|
@router.delete("/capability-grants/club-roles/by-capability")
def clear_club_capability_grants(
    capability_id: str = Query(...),
    session: dict = Depends(require_auth),
):
    """Alle Rollen-Grants einer Capability entfernen → wieder offen für alle Mitglieder."""
    # The docstring above is kept verbatim because FastAPI publishes it as the
    # operation description. In English: remove every club-role grant of one
    # capability, which leaves it open to all members again.
    # Superadmin only; succeeds even when no grant rows were deleted.
    _require_superadmin(session)
    cap_id = capability_id.strip()

    delete_sql = """
        DELETE FROM club_role_capability_grants
        WHERE capability_id = %s
    """

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(delete_sql, (cap_id,))
        conn.commit()
        return {"ok": True, "capability_id": cap_id}
|
|
|
|
|
|
@router.delete("/capability-grants/club-roles")
def delete_club_role_capability_grant(
    role_code: str = Query(...),
    capability_id: str = Query(...),
    session: dict = Depends(require_auth),
):
    # Superadmin only. Removes one club-role capability grant; responds 404
    # when no matching grant exists.
    _require_superadmin(session)
    key = (role_code.strip().lower(), capability_id.strip())

    delete_sql = """
        DELETE FROM club_role_capability_grants
        WHERE role_code = %s AND capability_id = %s
        RETURNING role_code, capability_id
    """

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(delete_sql, key)
        removed = cur.fetchone()
        if not removed:
            raise HTTPException(status_code=404, detail="Grant nicht gefunden")
        conn.commit()
        return {"ok": True}
|
|
|
|
|
|
# ── Kontingent-Bypass (Capability-Grants) ───────────────────────────────────
|
|
|
|
@router.get("/quota-bypass")
def list_quota_bypass(session: dict = Depends(require_auth)):
    # Superadmin only. Returns whatever list_quota_bypass_grants reports for a
    # fresh cursor.
    _require_superadmin(session)
    with get_db() as conn:
        grants = list_quota_bypass_grants(get_cursor(conn))
        return grants
|
|
|
|
|
|
@router.post("/quota-bypass/portal-roles", status_code=201)
def add_quota_bypass_portal_grant(body: QuotaBypassPortalBody, session: dict = Depends(require_auth)):
    # Superadmin only. Grants a portal role the quota-bypass capability for
    # one feature, or the all-features bypass when body.feature_id is empty.
    #
    # Responds 400 for an unknown feature and 409 when the grant already
    # exists. Returns the inserted row plus the resolved "capability_id" and
    # the normalized "feature_id" (None when empty).
    #
    # Written as comments rather than a docstring because FastAPI would
    # publish a docstring as the operation description.
    _require_superadmin(session)
    role = body.portal_role.strip().lower()

    with get_db() as conn:
        cur = get_cursor(conn)
        cap_id = _resolve_quota_bypass_capability_id(cur, body.feature_id)
        cur.execute(
            """
            SELECT 1 FROM portal_role_capability_grants
            WHERE portal_role = %s AND capability_id = %s
            LIMIT 1
            """,
            (role, cap_id),
        )
        if cur.fetchone():
            raise HTTPException(status_code=409, detail="Grant existiert bereits")

        # The check above and this insert are not atomic. ON CONFLICT DO
        # NOTHING makes a concurrent identical request come back with no row
        # (reported as 409 below) instead of failing on the insert, assuming
        # the table enforces uniqueness on this pair. The capability-grant
        # endpoint already handles this table the same way.
        cur.execute(
            """
            INSERT INTO portal_role_capability_grants (portal_role, capability_id)
            VALUES (%s, %s)
            ON CONFLICT DO NOTHING
            RETURNING portal_role, capability_id
            """,
            (role, cap_id),
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(status_code=409, detail="Grant existiert bereits")
        conn.commit()

        out = r2d(row)
        out["capability_id"] = cap_id
        out["feature_id"] = (body.feature_id or "").strip() or None
        return out
|
|
|
|
|
|
@router.delete("/quota-bypass/portal-roles")
def delete_quota_bypass_portal_grant(
    portal_role: str = Query(...),
    capability_id: Optional[str] = Query(None),
    feature_id: Optional[str] = Query(None),
    session: dict = Depends(require_auth),
):
    # Superadmin only. Removes a quota-bypass grant from a portal role.
    #
    # The grant is addressed by capability_id when given. Otherwise it is
    # derived from feature_id, and when that is empty too the all-features
    # bypass (QUOTA_BYPASS_ALL) is targeted. Responds 404 when no matching
    # grant exists.
    _require_superadmin(session)
    role = portal_role.strip().lower()

    # Strip like the other delete endpoints do: a padded id would never match
    # a stored grant, and a whitespace-only id must count as "not given".
    cap_id = (capability_id or "").strip()
    if not cap_id:
        feature = (feature_id or "").strip()
        cap_id = quota_bypass_capability_id_for_feature(feature) if feature else QUOTA_BYPASS_ALL

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            DELETE FROM portal_role_capability_grants
            WHERE portal_role = %s AND capability_id = %s
            RETURNING portal_role, capability_id
            """,
            (role, cap_id),
        )
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail="Grant nicht gefunden")
        conn.commit()
        return {"ok": True}
|
|
|
|
|
|
@router.post("/quota-bypass/profiles/{profile_id}", status_code=201)
def add_quota_bypass_profile_grant(
    profile_id: int,
    body: QuotaBypassProfileBody,
    session: dict = Depends(require_auth),
):
    # Superadmin only. Grants a single profile the quota-bypass capability for
    # one feature, or the all-features bypass when body.feature_id is empty.
    # The acting superadmin's profile id is stored as granted_by_profile_id.
    #
    # Responds 404 for an unknown profile, 400 for an unknown feature and 409
    # when the grant already exists.
    _require_superadmin(session)
    admin_pid = int(session["profile_id"])

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT 1 FROM profiles WHERE id = %s", (profile_id,))
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail="Profil nicht gefunden")

        cap_id = _resolve_quota_bypass_capability_id(cur, body.feature_id)
        cur.execute(
            """
            SELECT 1 FROM profile_capability_grants
            WHERE profile_id = %s AND capability_id = %s
            LIMIT 1
            """,
            (profile_id, cap_id),
        )
        if cur.fetchone():
            raise HTTPException(status_code=409, detail="Grant existiert bereits")

        # The check above and this insert are not atomic. ON CONFLICT DO
        # NOTHING makes a concurrent identical request come back with no row
        # (reported as 409 below) instead of failing on the insert, assuming
        # the table enforces uniqueness on this pair.
        cur.execute(
            """
            INSERT INTO profile_capability_grants (
                profile_id, capability_id, reason, granted_by_profile_id
            )
            VALUES (%s, %s, %s, %s)
            ON CONFLICT DO NOTHING
            RETURNING profile_id, capability_id, reason, granted_by_profile_id, created_at
            """,
            (profile_id, cap_id, (body.reason or "").strip() or None, admin_pid),
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(status_code=409, detail="Grant existiert bereits")
        conn.commit()
        return r2d(row)
|
|
|
|
|
|
@router.delete("/quota-bypass/profiles")
def delete_quota_bypass_profile_grant(
    profile_id: int = Query(...),
    capability_id: Optional[str] = Query(None),
    feature_id: Optional[str] = Query(None),
    session: dict = Depends(require_auth),
):
    # Superadmin only. Removes a quota-bypass grant from a single profile.
    #
    # The grant is addressed by capability_id when given. Otherwise it is
    # derived from feature_id, and when that is empty too the all-features
    # bypass (QUOTA_BYPASS_ALL) is targeted. Responds 404 when no matching
    # grant exists.
    _require_superadmin(session)

    # Strip like the other delete endpoints do: a padded id would never match
    # a stored grant, and a whitespace-only id must count as "not given".
    cap_id = (capability_id or "").strip()
    if not cap_id:
        feature = (feature_id or "").strip()
        cap_id = quota_bypass_capability_id_for_feature(feature) if feature else QUOTA_BYPASS_ALL

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            DELETE FROM profile_capability_grants
            WHERE profile_id = %s AND capability_id = %s
            RETURNING profile_id, capability_id
            """,
            (profile_id, cap_id),
        )
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail="Grant nicht gefunden")
        conn.commit()
        return {"ok": True}
|
|
|
|
|
|
# ── Vereins-Kontingente (Pläne & Zuordnung) ─────────────────────────────────
|
|
|
|
@router.get("/club-plans/matrix")
def get_club_plans_matrix(session: dict = Depends(require_auth)):
    """Aktive Vereinspläne, club-scoped Features und Limit-Matrix."""
    # The docstring above is kept verbatim because FastAPI publishes it as the
    # operation description. In English: active club plans, club-scoped
    # features and the limit matrix.
    # Superadmin only. "limits" maps plan id -> feature id -> limit value.
    _require_superadmin(session)

    plans_sql = """
        SELECT id, name, description, sort_order, active
        FROM club_plans
        WHERE active = true
        ORDER BY sort_order, id
    """
    features_sql = """
        SELECT id, name, description, category, limit_type, reset_period, default_limit, module
        FROM features
        WHERE app = 'shinkan' AND active = true AND enforcement_subject = 'club'
          AND module IS NOT NULL
        ORDER BY module, category, id
    """
    limits_sql = """
        SELECT plan_id, feature_id, limit_value
        FROM club_plan_limits
        WHERE plan_id IN (SELECT id FROM club_plans WHERE active = true)
    """

    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute(plans_sql)
        plans = [r2d(record) for record in cur.fetchall()]

        cur.execute(features_sql)
        features = [r2d(record) for record in cur.fetchall()]

        cur.execute(limits_sql)
        limits: Dict[str, Dict[str, Optional[int]]] = {}
        for record in cur.fetchall():
            per_plan = limits.setdefault(record["plan_id"], {})
            per_plan[record["feature_id"]] = record.get("limit_value")

    return {"plans": plans, "features": features, "limits": limits}
|
|
|
|
|
|
@router.put("/club-plans/{plan_id}/limits")
def update_club_plan_limits(
    plan_id: str,
    body: PlanLimitsBody,
    session: dict = Depends(require_auth),
):
    # Superadmin only. Upserts the given feature limits for one active plan.
    # Responds 404 when the plan is missing or inactive and 400 at the first
    # unknown feature id; the commit only happens after every entry has been
    # processed.
    _require_superadmin(session)
    plan_id = plan_id.strip()

    feature_check_sql = "SELECT 1 FROM features WHERE id = %s AND app = 'shinkan'"
    upsert_sql = """
        INSERT INTO club_plan_limits (plan_id, feature_id, limit_value)
        VALUES (%s, %s, %s)
        ON CONFLICT (plan_id, feature_id)
        DO UPDATE SET limit_value = EXCLUDED.limit_value
    """

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT 1 FROM club_plans WHERE id = %s AND active = true", (plan_id,))
        plan_found = cur.fetchone()
        if not plan_found:
            raise HTTPException(status_code=404, detail="Plan nicht gefunden")

        for entry in body.limits:
            fid = entry.feature_id.strip()
            cur.execute(feature_check_sql, (fid,))
            feature_found = cur.fetchone()
            if not feature_found:
                raise HTTPException(status_code=400, detail=f"Unbekanntes Feature: {fid}")

            cur.execute(upsert_sql, (plan_id, fid, entry.limit_value))
        conn.commit()

    updated_count = len(body.limits)
    return {"ok": True, "plan_id": plan_id, "updated": updated_count}
|
|
|
|
|
|
@router.get("/club-subscriptions")
def list_club_subscriptions(session: dict = Depends(require_auth)):
    # Superadmin only. Lists every club with its subscription, ordered by
    # lower-cased name and then id. Clubs without a plan_id are reported as
    # plan "free" with status "active".
    _require_superadmin(session)

    listing_sql = """
        SELECT c.id AS club_id, c.name AS club_name,
               cs.plan_id, cs.status, cs.started_at, cs.ends_at
        FROM clubs c
        LEFT JOIN club_subscriptions cs ON cs.club_id = c.id
        ORDER BY lower(c.name), c.id
    """

    def _with_default_plan(record):
        # Fill in the defaults for clubs that have no plan assigned.
        entry = r2d(record)
        if not entry.get("plan_id"):
            entry["plan_id"] = "free"
            entry["status"] = "active"
        return entry

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(listing_sql)
        return [_with_default_plan(record) for record in cur.fetchall()]
|
|
|
|
|
|
@router.put("/clubs/{club_id}/subscription")
def update_club_subscription(
    club_id: int,
    body: ClubSubscriptionBody,
    session: dict = Depends(require_auth),
):
    # Superadmin only. Assigns a plan and status to a club, inserting the
    # subscription row or updating the existing one. Responds 404 for an
    # unknown club and 400 for a missing or inactive plan.
    _require_superadmin(session)
    plan_id = body.plan_id.strip()
    status = body.status.strip().lower()

    upsert_sql = """
        INSERT INTO club_subscriptions (club_id, plan_id, status)
        VALUES (%s, %s, %s)
        ON CONFLICT (club_id)
        DO UPDATE SET plan_id = EXCLUDED.plan_id, status = EXCLUDED.status, updated_at = NOW()
        RETURNING club_id, plan_id, status
    """

    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute("SELECT 1 FROM clubs WHERE id = %s", (club_id,))
        club_found = cur.fetchone()
        if not club_found:
            raise HTTPException(status_code=404, detail="Verein nicht gefunden")

        cur.execute("SELECT 1 FROM club_plans WHERE id = %s AND active = true", (plan_id,))
        plan_found = cur.fetchone()
        if not plan_found:
            raise HTTPException(status_code=400, detail="Unbekannter Plan")

        cur.execute(upsert_sql, (club_id, plan_id, status))
        saved = cur.fetchone()
        conn.commit()
        return r2d(saved)
|