shinkan-jinkendo/backend/routers/admin_rights.py
Lars a9a6153ed5
Some checks failed
Deploy Development / deploy (push) Successful in 45s
Test Suite / pytest-backend (push) Failing after 43s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m15s
Implement Club Feature Enforcement Logic and Update Versioning
- Introduced a new environment variable `CLUB_FEATURE_ENFORCE` to control club feature access, allowing values of 1, true, or yes for activation.
- Updated the backend logic to check for club feature enforcement, raising HTTP exceptions when access is denied without an active club context.
- Enhanced the admin rights router with a new endpoint to check the enforcement status of club features.
- Incremented application version to 0.8.202 to reflect these changes.
2026-06-07 15:47:49 +02:00

604 lines
20 KiB
Python

"""
Superadmin: Rollen & Rechte — Capability-Grants, Kontingent-Bypass, Vereins-Kontingente.
Ein Router für das Rechtesystem (M6). Kein paralleles Exemption-Schema.
"""
import os
from typing import Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from auth import require_auth
from club_quota_bypass import (
QUOTA_BYPASS_ALL,
ensure_quota_bypass_capability,
list_quota_bypass_grants,
quota_bypass_capability_id_for_feature,
)
from capabilities import capability_enforcement_enabled
from capability_enforcement_audit import (
enforcement_status_for_capability,
feature_consume_status,
)
from club_features import club_feature_enforcement_enabled
from club_tenancy import is_superadmin
from db import get_db, get_cursor, r2d
# Router for the superadmin rights endpoints; every route in this module is
# registered under the /api/admin/rights prefix.
router = APIRouter(prefix="/api/admin/rights", tags=["admin_rights"])
# Portal-level role codes reported by the capability matrix endpoint.
PORTAL_ROLES = ("user", "trainer", "admin", "superadmin")
# Club-level role codes; reported by the capability matrix and used to
# validate the role in add_club_role_capability_grant.
CLUB_ROLES = ("club_admin", "trainer", "division_lead", "content_editor")
def _require_superadmin(session: dict) -> None:
    """Reject the request unless the session belongs to a superadmin.

    Args:
        session: Auth session mapping; only its ``role`` entry is read.

    Raises:
        HTTPException: 403 when ``is_superadmin`` rejects the session role.
    """
    if is_superadmin(session.get("role")):
        return
    raise HTTPException(status_code=403, detail="Nur Super-Administratoren")
def _resolve_quota_bypass_capability_id(cur, feature_id: Optional[str]) -> str:
    """Map an optional feature id to the matching quota-bypass capability id.

    Args:
        cur: Open database cursor used for the feature lookup.
        feature_id: Feature id, or ``None``/blank for the global bypass.

    Returns:
        ``QUOTA_BYPASS_ALL`` when no feature id is given, otherwise the value
        returned by ``ensure_quota_bypass_capability`` for that feature.

    Raises:
        HTTPException: 400 when the feature id is not in ``features``.
    """
    normalized = (feature_id or "").strip()
    if normalized == "":
        return QUOTA_BYPASS_ALL
    cur.execute("SELECT 1 FROM features WHERE id = %s", (normalized,))
    feature_exists = cur.fetchone()
    if feature_exists:
        return ensure_quota_bypass_capability(cur, normalized)
    raise HTTPException(status_code=400, detail="Unbekanntes Feature")
class PlanLimitItem(BaseModel):
    """Single feature limit entry of a plan-limits update request."""

    feature_id: str
    # Per the field description: NULL means unlimited, 0 means disabled.
    limit_value: Optional[int] = Field(
        default=None,
        description="NULL = unbegrenzt; 0 = deaktiviert (boolean/count)",
    )
class PlanLimitsBody(BaseModel):
    """Request body carrying the limit entries to upsert for one plan."""

    # Required list; no default is provided.
    limits: List[PlanLimitItem] = Field(...)
class ClubSubscriptionBody(BaseModel):
    """Request body for assigning a plan and status to a club."""

    plan_id: str
    # Restricted by the pattern to the four listed states; defaults to active.
    status: str = Field(
        default="active",
        pattern="^(active|trial|past_due|cancelled)$",
    )
class PortalCapabilityGrantBody(BaseModel):
    """Request body for granting a capability to a portal role."""

    # Both fields are required and must be non-empty.
    portal_role: str = Field(default=..., min_length=1, max_length=50)
    capability_id: str = Field(default=..., min_length=1)
class ClubRoleCapabilityGrantBody(BaseModel):
    """Request body for granting a capability to a club role."""

    # Both fields are required and must be non-empty.
    role_code: str = Field(default=..., min_length=1, max_length=50)
    capability_id: str = Field(default=..., min_length=1)
class QuotaBypassPortalBody(BaseModel):
    """Request body for granting a quota bypass to a portal role."""

    portal_role: str = Field(default=..., min_length=1, max_length=50)
    # Optional: a missing/blank feature id selects the global bypass.
    feature_id: Optional[str] = Field(
        default=None,
        description="Feature-ID oder leer = alle Vereins-Features (platform.club_quota.bypass)",
    )
class QuotaBypassProfileBody(BaseModel):
    """Request body for granting a quota bypass to a single profile."""

    # Optional: a missing/blank feature id selects the global bypass.
    feature_id: Optional[str] = Field(
        default=None,
        description="Feature-ID oder leer = alle Features",
    )
    # Optional free-text justification, capped at 500 characters.
    reason: Optional[str] = Field(default=None, max_length=500)
# ── Enforcement-Diagnose (Superadmin) ────────────────────────────────────────
@router.get("/enforcement-status")
def get_enforcement_status(session: dict = Depends(require_auth)):
    """Check whether the hard-block env settings reach the running process.

    Returns the evaluated enforcement flags next to the raw values of the
    ``CLUB_FEATURE_ENFORCE`` and ``CAPABILITY_ENFORCE`` environment variables
    (``"0"`` when unset), plus a list of operator hints.

    Raises:
        HTTPException: 403 if the session is not a superadmin.
    """
    _require_superadmin(session)
    club_raw = os.getenv("CLUB_FEATURE_ENFORCE", "0")
    capability_raw = os.getenv("CAPABILITY_ENFORCE", "0")
    hints = [
        "Nach .env-Änderung: docker compose up -d backend (Container neu erstellen).",
        "Superadmin hat Quota-Bypass — KI-Limit-Test als Trainer, nicht als Superadmin.",
        "Ohne aktiven Verein (X-Active-Club-Id) blockiert Enforce ebenfalls.",
    ]
    return {
        "club_feature_enforce_active": club_feature_enforcement_enabled(),
        "club_feature_enforce_raw": club_raw,
        "capability_enforce_active": capability_enforcement_enabled(),
        "capability_enforce_raw": capability_raw,
        "hints": hints,
    }
# ── Capability-Matrix (Rollen → Fähigkeiten) ─────────────────────────────────
@router.get("/capability-matrix")
def get_capability_matrix(session: dict = Depends(require_auth)):
    """Return the role/capability matrix for the rights administration.

    Loads every active capability that has a module set, annotates it with
    its enforcement status (and the feature-consume status when a feature is
    linked), and lists the existing portal-role and club-role grants.

    Raises:
        HTTPException: 403 if the session is not a superadmin.
    """
    _require_superadmin(session)
    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute(
            """
            SELECT id, name, domain, min_account_state, linked_feature_id, module
            FROM capabilities
            WHERE active = true AND module IS NOT NULL
            ORDER BY module, domain, id
            """
        )
        capabilities = []
        for record in cur.fetchall():
            entry = r2d(record)
            entry["enforcement"] = enforcement_status_for_capability(entry.get("id"))
            linked_feature = entry.get("linked_feature_id")
            if linked_feature:
                entry["feature_consume"] = feature_consume_status(linked_feature)
            capabilities.append(entry)

        cur.execute(
            """
            SELECT portal_role, capability_id
            FROM portal_role_capability_grants
            ORDER BY portal_role, capability_id
            """
        )
        portal_grants = list(map(r2d, cur.fetchall()))

        cur.execute(
            """
            SELECT role_code, capability_id
            FROM club_role_capability_grants
            ORDER BY role_code, capability_id
            """
        )
        club_role_grants = list(map(r2d, cur.fetchall()))

    hint = (
        "Nur vom Modul registrierte Rechte (capabilities.module). "
        "Legacy-Katalog-Seed ohne module erscheint nicht."
    )
    return {
        "portal_roles": list(PORTAL_ROLES),
        "club_roles": list(CLUB_ROLES),
        "capabilities": capabilities,
        "portal_grants": portal_grants,
        "club_role_grants": club_role_grants,
        "registry_only": True,
        "hint": hint,
    }
@router.post("/capability-grants/portal-roles", status_code=201)
def add_portal_capability_grant(
    body: PortalCapabilityGrantBody,
    session: dict = Depends(require_auth),
):
    """Grant a platform or quota-bypass capability to a portal role.

    The role is normalised (stripped, lower-cased) and must be one of
    ``PORTAL_ROLES``, mirroring the ``CLUB_ROLES`` check of the club-role
    endpoint so that a mistyped role cannot create a grant that never applies.

    Returns:
        The inserted ``portal_role`` / ``capability_id`` pair.

    Raises:
        HTTPException: 403 if the session is not a superadmin; 400 for an
            unknown role, an unknown/inactive capability, or a capability
            outside the platform/quota_bypass domains; 409 if the grant
            already exists.
    """
    _require_superadmin(session)
    role = body.portal_role.strip().lower()
    cap_id = body.capability_id.strip()
    # Reject unknown roles before touching the database.
    if role not in PORTAL_ROLES:
        raise HTTPException(status_code=400, detail="Unbekannte Portalrolle")
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT domain FROM capabilities WHERE id = %s AND active = true", (cap_id,))
        cap = cur.fetchone()
        if not cap:
            raise HTTPException(status_code=400, detail="Unbekannte Capability")
        domain = (cap.get("domain") or "").lower()
        # Ids prefixed with "platform." are accepted regardless of domain.
        if domain not in ("platform", "quota_bypass") and not cap_id.startswith("platform."):
            raise HTTPException(
                status_code=400,
                detail="Portal-Grants nur für domain=platform oder quota_bypass",
            )
        cur.execute(
            """
            INSERT INTO portal_role_capability_grants (portal_role, capability_id)
            VALUES (%s, %s)
            ON CONFLICT DO NOTHING
            RETURNING portal_role, capability_id
            """,
            (role, cap_id),
        )
        row = cur.fetchone()
        # No returned row means the insert was skipped by ON CONFLICT.
        if not row:
            raise HTTPException(status_code=409, detail="Grant existiert bereits")
        conn.commit()
        return r2d(row)
@router.delete("/capability-grants/portal-roles")
def delete_portal_capability_grant(
    portal_role: str = Query(...),
    capability_id: str = Query(...),
    session: dict = Depends(require_auth),
):
    """Remove one capability grant from a portal role.

    Raises:
        HTTPException: 403 if the session is not a superadmin; 404 if no
            matching grant row was deleted.
    """
    _require_superadmin(session)
    grant_key = (portal_role.strip().lower(), capability_id.strip())
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            DELETE FROM portal_role_capability_grants
            WHERE portal_role = %s AND capability_id = %s
            RETURNING portal_role, capability_id
            """,
            grant_key,
        )
        deleted = cur.fetchone()
        if not deleted:
            raise HTTPException(status_code=404, detail="Grant nicht gefunden")
        conn.commit()
    return {"ok": True}
@router.post("/capability-grants/club-roles", status_code=201)
def add_club_role_capability_grant(
    body: ClubRoleCapabilityGrantBody,
    session: dict = Depends(require_auth),
):
    """Grant a club-scoped capability to a club role.

    Returns:
        The inserted ``role_code`` / ``capability_id`` pair.

    Raises:
        HTTPException: 403 if the session is not a superadmin; 400 for a role
            outside ``CLUB_ROLES`` or a capability that is unknown, inactive,
            or in the platform/quota_bypass domains; 409 if the grant
            already exists.
    """
    _require_superadmin(session)
    normalized_role = body.role_code.strip().lower()
    capability = body.capability_id.strip()
    if normalized_role not in CLUB_ROLES:
        raise HTTPException(status_code=400, detail="Unbekannte Vereinsrolle")
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT domain FROM capabilities
            WHERE id = %s AND active = true AND domain NOT IN ('platform', 'quota_bypass')
            """,
            (capability,),
        )
        eligible = cur.fetchone()
        if not eligible:
            raise HTTPException(status_code=400, detail="Capability nicht für Vereinsrollen")
        cur.execute(
            """
            INSERT INTO club_role_capability_grants (role_code, capability_id)
            VALUES (%s, %s)
            ON CONFLICT DO NOTHING
            RETURNING role_code, capability_id
            """,
            (normalized_role, capability),
        )
        inserted = cur.fetchone()
        # No returned row means the insert was skipped by ON CONFLICT.
        if not inserted:
            raise HTTPException(status_code=409, detail="Grant existiert bereits")
        conn.commit()
        return r2d(inserted)
@router.delete("/capability-grants/club-roles/by-capability")
def clear_club_capability_grants(
    capability_id: str = Query(...),
    session: dict = Depends(require_auth),
):
    """Remove all role grants of a capability, making it open to all members again.

    Returns:
        ``{"ok": True, "capability_id": <stripped id>}``, whether or not any
        rows were deleted.

    Raises:
        HTTPException: 403 if the session is not a superadmin.
    """
    _require_superadmin(session)
    target_capability = capability_id.strip()
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            """
            DELETE FROM club_role_capability_grants
            WHERE capability_id = %s
            """,
            (target_capability,),
        )
        conn.commit()
    response = {"ok": True, "capability_id": target_capability}
    return response
@router.delete("/capability-grants/club-roles")
def delete_club_role_capability_grant(
    role_code: str = Query(...),
    capability_id: str = Query(...),
    session: dict = Depends(require_auth),
):
    """Remove one capability grant from a club role.

    Raises:
        HTTPException: 403 if the session is not a superadmin; 404 if no
            matching grant row was deleted.
    """
    _require_superadmin(session)
    grant_key = (role_code.strip().lower(), capability_id.strip())
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            DELETE FROM club_role_capability_grants
            WHERE role_code = %s AND capability_id = %s
            RETURNING role_code, capability_id
            """,
            grant_key,
        )
        deleted = cur.fetchone()
        if not deleted:
            raise HTTPException(status_code=404, detail="Grant nicht gefunden")
        conn.commit()
    return {"ok": True}
# ── Kontingent-Bypass (Capability-Grants) ───────────────────────────────────
@router.get("/quota-bypass")
def list_quota_bypass(session: dict = Depends(require_auth)):
    """Return the quota-bypass grants as produced by ``list_quota_bypass_grants``.

    Raises:
        HTTPException: 403 if the session is not a superadmin.
    """
    _require_superadmin(session)
    with get_db() as conn:
        cursor = get_cursor(conn)
        grants = list_quota_bypass_grants(cursor)
        return grants
@router.post("/quota-bypass/portal-roles", status_code=201)
def add_quota_bypass_portal_grant(
    body: QuotaBypassPortalBody,
    session: dict = Depends(require_auth),
):
    """Grant a quota bypass (global or per feature) to a portal role.

    The grant is inserted with ``ON CONFLICT DO NOTHING ... RETURNING``, the
    same pattern ``add_portal_capability_grant`` uses on this table. This
    replaces a separate SELECT-then-INSERT, which left a window in which two
    concurrent requests could both pass the existence check.

    Returns:
        The inserted ``portal_role`` / ``capability_id`` pair plus the
        normalised ``feature_id`` (``None`` for the global bypass).

    Raises:
        HTTPException: 403 if the session is not a superadmin; 400 for an
            unknown feature; 409 if the grant already exists.
    """
    _require_superadmin(session)
    role = body.portal_role.strip().lower()
    with get_db() as conn:
        cur = get_cursor(conn)
        cap_id = _resolve_quota_bypass_capability_id(cur, body.feature_id)
        cur.execute(
            """
            INSERT INTO portal_role_capability_grants (portal_role, capability_id)
            VALUES (%s, %s)
            ON CONFLICT DO NOTHING
            RETURNING portal_role, capability_id
            """,
            (role, cap_id),
        )
        row = cur.fetchone()
        # No returned row means the insert was skipped by ON CONFLICT.
        if not row:
            raise HTTPException(status_code=409, detail="Grant existiert bereits")
        conn.commit()
        out = r2d(row)
    out["capability_id"] = cap_id
    out["feature_id"] = (body.feature_id or "").strip() or None
    return out
@router.delete("/quota-bypass/portal-roles")
def delete_quota_bypass_portal_grant(
    portal_role: str = Query(...),
    capability_id: Optional[str] = Query(None),
    feature_id: Optional[str] = Query(None),
    session: dict = Depends(require_auth),
):
    """Remove a quota-bypass grant from a portal role.

    The capability is taken from ``capability_id`` when given; otherwise it
    is derived from ``feature_id`` (blank or missing means the global bypass).
    ``capability_id`` is stripped like in the other delete endpoints, so a
    padded value still matches and a whitespace-only value falls back to the
    ``feature_id`` lookup.

    Raises:
        HTTPException: 403 if the session is not a superadmin; 404 if no
            matching grant row was deleted.
    """
    _require_superadmin(session)
    role = portal_role.strip().lower()
    cap_id = (capability_id or "").strip()
    if not cap_id:
        feature = (feature_id or "").strip()
        cap_id = (
            quota_bypass_capability_id_for_feature(feature)
            if feature
            else QUOTA_BYPASS_ALL
        )
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            DELETE FROM portal_role_capability_grants
            WHERE portal_role = %s AND capability_id = %s
            RETURNING portal_role, capability_id
            """,
            (role, cap_id),
        )
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail="Grant nicht gefunden")
        conn.commit()
    return {"ok": True}
@router.post("/quota-bypass/profiles/{profile_id}", status_code=201)
def add_quota_bypass_profile_grant(
    profile_id: int,
    body: QuotaBypassProfileBody,
    session: dict = Depends(require_auth),
):
    """Grant a quota bypass (global or per feature) to a single profile.

    The existence pre-check is kept, and the insert additionally uses
    ``ON CONFLICT DO NOTHING`` so that a duplicate created by a concurrent
    request between the check and the insert is reported as 409 rather than
    surfacing as a database error.
    NOTE(review): this assumes a unique constraint on
    (profile_id, capability_id); without one the clause is a no-op and the
    pre-check remains the only guard. Confirm against the schema.

    Returns:
        The inserted grant row (profile, capability, reason, granting
        profile, creation time).

    Raises:
        HTTPException: 403 if the session is not a superadmin; 404 if the
            profile does not exist; 400 for an unknown feature; 409 if the
            grant already exists.
    """
    _require_superadmin(session)
    admin_pid = int(session["profile_id"])
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT 1 FROM profiles WHERE id = %s", (profile_id,))
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail="Profil nicht gefunden")
        cap_id = _resolve_quota_bypass_capability_id(cur, body.feature_id)
        cur.execute(
            """
            SELECT 1 FROM profile_capability_grants
            WHERE profile_id = %s AND capability_id = %s
            LIMIT 1
            """,
            (profile_id, cap_id),
        )
        if cur.fetchone():
            raise HTTPException(status_code=409, detail="Grant existiert bereits")
        cur.execute(
            """
            INSERT INTO profile_capability_grants (
                profile_id, capability_id, reason, granted_by_profile_id
            )
            VALUES (%s, %s, %s, %s)
            ON CONFLICT DO NOTHING
            RETURNING profile_id, capability_id, reason, granted_by_profile_id, created_at
            """,
            # A blank reason is stored as NULL.
            (profile_id, cap_id, (body.reason or "").strip() or None, admin_pid),
        )
        row = cur.fetchone()
        # No returned row means a concurrent request inserted the grant first.
        if not row:
            raise HTTPException(status_code=409, detail="Grant existiert bereits")
        conn.commit()
        return r2d(row)
@router.delete("/quota-bypass/profiles")
def delete_quota_bypass_profile_grant(
    profile_id: int = Query(...),
    capability_id: Optional[str] = Query(None),
    feature_id: Optional[str] = Query(None),
    session: dict = Depends(require_auth),
):
    """Remove a quota-bypass grant from a single profile.

    The capability is taken from ``capability_id`` when given; otherwise it
    is derived from ``feature_id`` (blank or missing means the global bypass).
    ``capability_id`` is stripped like in the other delete endpoints, so a
    padded value still matches and a whitespace-only value falls back to the
    ``feature_id`` lookup.

    Raises:
        HTTPException: 403 if the session is not a superadmin; 404 if no
            matching grant row was deleted.
    """
    _require_superadmin(session)
    cap_id = (capability_id or "").strip()
    if not cap_id:
        feature = (feature_id or "").strip()
        cap_id = (
            quota_bypass_capability_id_for_feature(feature)
            if feature
            else QUOTA_BYPASS_ALL
        )
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            DELETE FROM profile_capability_grants
            WHERE profile_id = %s AND capability_id = %s
            RETURNING profile_id, capability_id
            """,
            (profile_id, cap_id),
        )
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail="Grant nicht gefunden")
        conn.commit()
    return {"ok": True}
# ── Vereins-Kontingente (Pläne & Zuordnung) ─────────────────────────────────
@router.get("/club-plans/matrix")
def get_club_plans_matrix(session: dict = Depends(require_auth)):
    """Return active club plans, club-scoped features, and the limit matrix.

    Returns:
        A dict with ``plans``, ``features``, and ``limits``; ``limits`` maps
        plan id -> feature id -> limit value for the active plans.

    Raises:
        HTTPException: 403 if the session is not a superadmin.
    """
    _require_superadmin(session)
    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute(
            """
            SELECT id, name, description, sort_order, active
            FROM club_plans
            WHERE active = true
            ORDER BY sort_order, id
            """
        )
        plans = list(map(r2d, cur.fetchall()))

        cur.execute(
            """
            SELECT id, name, description, category, limit_type, reset_period, default_limit, module
            FROM features
            WHERE app = 'shinkan' AND active = true AND enforcement_subject = 'club'
            AND module IS NOT NULL
            ORDER BY module, category, id
            """
        )
        features = list(map(r2d, cur.fetchall()))

        cur.execute(
            """
            SELECT plan_id, feature_id, limit_value
            FROM club_plan_limits
            WHERE plan_id IN (SELECT id FROM club_plans WHERE active = true)
            """
        )
        # Nest the flat limit rows by plan, then by feature.
        limits: Dict[str, Dict[str, Optional[int]]] = {}
        for limit_row in cur.fetchall():
            per_plan = limits.setdefault(limit_row["plan_id"], {})
            per_plan[limit_row["feature_id"]] = limit_row.get("limit_value")

    return {"plans": plans, "features": features, "limits": limits}
@router.put("/club-plans/{plan_id}/limits")
def update_club_plan_limits(
    plan_id: str,
    body: PlanLimitsBody,
    session: dict = Depends(require_auth),
):
    """Upsert the feature limits of an active club plan.

    Each entry is validated against the ``features`` table and then inserted
    or updated; the commit is issued once after all entries were processed.

    Returns:
        ``{"ok": True, "plan_id": ..., "updated": <number of entries sent>}``.

    Raises:
        HTTPException: 403 if the session is not a superadmin; 404 if the
            plan is missing or inactive; 400 for an unknown feature id.
    """
    _require_superadmin(session)
    plan_id = plan_id.strip()
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT 1 FROM club_plans WHERE id = %s AND active = true", (plan_id,))
        plan_found = cur.fetchone()
        if not plan_found:
            raise HTTPException(status_code=404, detail="Plan nicht gefunden")
        for entry in body.limits:
            feature = entry.feature_id.strip()
            cur.execute(
                "SELECT 1 FROM features WHERE id = %s AND app = 'shinkan'",
                (feature,),
            )
            feature_found = cur.fetchone()
            if not feature_found:
                raise HTTPException(status_code=400, detail=f"Unbekanntes Feature: {feature}")
            upsert_params = (plan_id, feature, entry.limit_value)
            cur.execute(
                """
                INSERT INTO club_plan_limits (plan_id, feature_id, limit_value)
                VALUES (%s, %s, %s)
                ON CONFLICT (plan_id, feature_id)
                DO UPDATE SET limit_value = EXCLUDED.limit_value
                """,
                upsert_params,
            )
        conn.commit()
    processed = len(body.limits)
    return {"ok": True, "plan_id": plan_id, "updated": processed}
@router.get("/club-subscriptions")
def list_club_subscriptions(session: dict = Depends(require_auth)):
    """List every club together with its subscription.

    Clubs without a subscription plan are reported with plan ``free`` and
    status ``active``.

    Raises:
        HTTPException: 403 if the session is not a superadmin.
    """
    _require_superadmin(session)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT c.id AS club_id, c.name AS club_name,
            cs.plan_id, cs.status, cs.started_at, cs.ends_at
            FROM clubs c
            LEFT JOIN club_subscriptions cs ON cs.club_id = c.id
            ORDER BY lower(c.name), c.id
            """
        )
        subscriptions = []
        for record in cur.fetchall():
            entry = r2d(record)
            has_plan = bool(entry.get("plan_id"))
            if not has_plan:
                # Fill in the fallback plan for clubs without one.
                entry["plan_id"] = "free"
                entry["status"] = "active"
            subscriptions.append(entry)
    return subscriptions
@router.put("/clubs/{club_id}/subscription")
def update_club_subscription(
    club_id: int,
    body: ClubSubscriptionBody,
    session: dict = Depends(require_auth),
):
    """Create or replace the subscription of a club.

    Returns:
        The stored ``club_id`` / ``plan_id`` / ``status`` row.

    Raises:
        HTTPException: 403 if the session is not a superadmin; 404 if the
            club does not exist; 400 if the plan is missing or inactive.
    """
    _require_superadmin(session)
    requested_plan = body.plan_id.strip()
    requested_status = body.status.strip().lower()
    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute("SELECT 1 FROM clubs WHERE id = %s", (club_id,))
        club_found = cur.fetchone()
        if not club_found:
            raise HTTPException(status_code=404, detail="Verein nicht gefunden")

        cur.execute("SELECT 1 FROM club_plans WHERE id = %s AND active = true", (requested_plan,))
        plan_found = cur.fetchone()
        if not plan_found:
            raise HTTPException(status_code=400, detail="Unbekannter Plan")

        # One subscription row per club: insert, or overwrite the existing one.
        cur.execute(
            """
            INSERT INTO club_subscriptions (club_id, plan_id, status)
            VALUES (%s, %s, %s)
            ON CONFLICT (club_id)
            DO UPDATE SET plan_id = EXCLUDED.plan_id, status = EXCLUDED.status, updated_at = NOW()
            RETURNING club_id, plan_id, status
            """,
            (club_id, requested_plan, requested_status),
        )
        stored = cur.fetchone()
        conn.commit()
        return r2d(stored)