shinkan-jinkendo/backend/capabilities.py
Lars 8404a42b6c
Some checks failed
Deploy Development / deploy (push) Successful in 43s
Test Suite / pytest-backend (push) Failing after 2s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 42s
Test Suite / playwright-tests (push) Successful in 1m19s
Implement Club Feature Quota Bypass and Update Versioning
- Added support for club feature quota bypass based on portal roles and profile grants in the capabilities check.
- Introduced new functions to handle quota bypass logic in club feature access and consumption.
- Updated the FeatureUsageBadge component to reflect platform exemptions for features.
- Incremented application version to 0.8.195 and database schema version to 20260606083 to reflect these changes.
- Enhanced backend routers to include new logic for consuming club features during AI-related actions.
2026-06-07 07:43:35 +02:00

285 lines
9.3 KiB
Python

"""
Capability-Auflösung (CAPABILITY_CATALOG.v1.md, M3 C1).
Phase 2: probe_capability — JSON-Log, kein Block (CAPABILITY_ENFORCE=0).
Phase 3+: CAPABILITY_ENFORCE=1 — HTTP 403 bei fehlender Capability.
"""
from __future__ import annotations
import os
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from fastapi import HTTPException
from account_lifecycle import account_state_satisfies
from club_tenancy import is_platform_admin
from db import get_db, get_cursor
if TYPE_CHECKING:
from tenant_context import TenantContext
def capability_enforcement_enabled() -> bool:
return os.getenv("CAPABILITY_ENFORCE", "0").strip() == "1"
def club_roles_in_club(tenant: "TenantContext", club_id: Optional[int]) -> List[str]:
if club_id is None:
return []
for m in tenant.memberships or []:
if int(m.get("id") or 0) == int(club_id):
roles = m.get("roles") or []
if hasattr(roles, "tolist"):
roles = roles.tolist()
return list(roles)
return []
def check_capability(
cur,
tenant: "TenantContext",
capability_id: str,
*,
club_id: Optional[int] = None,
) -> Dict[str, Any]:
"""
Prüft eine Capability für Tenant + optionalen Vereinskontext.
Returns: allowed, reason, account_state, club_roles, linked_feature_id
"""
account_state = getattr(tenant, "account_state", "active_member")
eff_club = club_id if club_id is not None else tenant.effective_club_id
club_roles = club_roles_in_club(tenant, eff_club) if eff_club is not None else []
cur.execute(
"""
SELECT id, min_account_state, linked_feature_id, active, domain
FROM capabilities
WHERE id = %s
""",
(capability_id,),
)
cap = cur.fetchone()
if not cap or not cap.get("active"):
return {
"allowed": False,
"reason": "capability_not_found",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": None,
}
min_state = cap.get("min_account_state") or "active_member"
if not account_state_satisfies(account_state, min_state):
return {
"allowed": False,
"reason": "account_state_insufficient",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
domain = (cap.get("domain") or "").strip().lower()
# Kontingent-Bypass (konfigurierbar per portal_role / profile grants, ohne Plattform-Admin-Pflicht)
if domain == "quota_bypass":
role_lc = (tenant.global_role or "").lower()
cur.execute(
"""
SELECT 1 FROM portal_role_capability_grants
WHERE portal_role = %s AND capability_id = %s
LIMIT 1
""",
(role_lc, capability_id),
)
if cur.fetchone():
return {
"allowed": True,
"reason": "quota_bypass_portal_grant",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
cur.execute(
"""
SELECT 1 FROM profile_capability_grants
WHERE profile_id = %s AND capability_id = %s
LIMIT 1
""",
(tenant.profile_id, capability_id),
)
if cur.fetchone():
return {
"allowed": True,
"reason": "quota_bypass_profile_grant",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
return {
"allowed": False,
"reason": "quota_bypass_denied",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
# Plattform-Capabilities
if domain == "platform" or capability_id.startswith("platform."):
role_lc = (tenant.global_role or "").lower()
if not is_platform_admin(role_lc):
return {
"allowed": False,
"reason": "portal_role_required",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
cur.execute(
"""
SELECT 1 FROM portal_role_capability_grants
WHERE portal_role = %s AND capability_id = %s
LIMIT 1
""",
(role_lc, capability_id),
)
if not cur.fetchone():
cur.execute(
"""
SELECT 1 FROM profile_capability_grants
WHERE profile_id = %s AND capability_id = %s
LIMIT 1
""",
(tenant.profile_id, capability_id),
)
if not cur.fetchone():
return {
"allowed": False,
"reason": "portal_capability_denied",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
return {
"allowed": True,
"reason": "portal_granted",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
# Plattform-Admin-Bypass für Mandanten-Funktionen (Audit-Pflicht, s. Katalog §9)
if is_platform_admin(tenant.global_role):
return {
"allowed": True,
"reason": "platform_admin_bypass",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
# Vereins-Capabilities: aktive Mitgliedschaft im Zielverein
if min_state == "active_member":
if eff_club is None:
return {
"allowed": False,
"reason": "no_club_context",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
if eff_club not in tenant.club_ids:
return {
"allowed": False,
"reason": "not_club_member",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
cur.execute(
"""
SELECT role_code FROM club_role_capability_grants
WHERE capability_id = %s
""",
(capability_id,),
)
required_roles = [r["role_code"] for r in cur.fetchall()]
if required_roles:
if not any(r in required_roles for r in club_roles):
return {
"allowed": False,
"reason": "club_role_denied",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
elif min_state == "active_member" and eff_club is not None:
# Offene Capability für alle aktiven Mitglieder — Mitgliedschaft reicht
pass
return {
"allowed": True,
"reason": "granted",
"account_state": account_state,
"club_roles": club_roles,
"linked_feature_id": cap.get("linked_feature_id"),
}
def resolve_capabilities_map(
cur,
tenant: "TenantContext",
*,
club_id: Optional[int] = None,
) -> Dict[str, bool]:
"""Alle aktiven Capabilities → bool (für späteres /me/entitlements)."""
cur.execute("SELECT id FROM capabilities WHERE active = true ORDER BY id")
ids = [r["id"] for r in cur.fetchall()]
out: Dict[str, bool] = {}
for cid in ids:
res = check_capability(cur, tenant, cid, club_id=club_id)
out[cid] = bool(res.get("allowed"))
return out
def probe_capability(
tenant: "TenantContext",
capability_id: str,
*,
action: str,
club_id: Optional[int] = None,
endpoint: Optional[str] = None,
conn=None,
) -> Dict[str, Any]:
"""Phase 2: Capability prüfen + JSON-Log; blockiert nur bei CAPABILITY_ENFORCE=1."""
from capability_logger import log_capability_check
def _run(c):
cur = get_cursor(c)
result = check_capability(cur, tenant, capability_id, club_id=club_id)
log_capability_check(
club_id=club_id if club_id is not None else tenant.effective_club_id,
profile_id=tenant.profile_id,
capability_id=capability_id,
action=action,
result=result,
endpoint=endpoint,
phase="enforce" if capability_enforcement_enabled() else "probe",
)
if capability_enforcement_enabled() and not result.get("allowed"):
raise HTTPException(
status_code=403,
detail=(
f"Keine Berechtigung für {capability_id} "
f"({result.get('reason', 'denied')})."
),
)
return result
if conn is not None:
return _run(conn)
with get_db() as c:
return _run(c)