shinkan-jinkendo/backend/club_tenancy.py
Lars 0275f76432
Some checks failed
Deploy Development / deploy (push) Successful in 40s
Test Suite / pytest-backend (push) Successful in 41s
Test Suite / lint-backend (push) Successful in 1s
Test Suite / build-frontend (push) Successful in 11s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Has been cancelled
Implement RBAC for library content management in club tenancy
- Introduced new functions for managing edit, delete, and governance transition permissions for library content, aligning with role-based access control (RBAC) principles.
- Updated existing routers to utilize these new functions, ensuring consistent permission checks across training frameworks, modules, and progression graphs.
- Enhanced visibility and governance handling for training plan templates and library content, improving overall content management and user experience.
- Incremented app version to 0.8.142 and updated changelog to reflect these changes.
2026-05-16 10:53:00 +02:00

407 lines
14 KiB
Python

"""
Vereins-Mandanten: Mitgliedschaften, aktiver Vereinskontext, einfache Berechtigungen.
Siehe .claude/docs/technical/MULTI_TENANCY_RBAC_ARCHITECTURE.md
"""
from typing import Any, Dict, List, Mapping, Optional, Set, Union
from fastapi import HTTPException
def is_platform_admin(role: Optional[str]) -> bool:
return (role or "").lower() in ("admin", "superadmin")
def is_superadmin(role: Optional[str]) -> bool:
return (role or "").lower() == "superadmin"
def club_ids_for_profile(cur, profile_id: int) -> Set[int]:
cur.execute(
"""
SELECT club_id FROM club_members
WHERE profile_id = %s AND status = 'active'
""",
(profile_id,),
)
return {r["club_id"] for r in cur.fetchall()}
def assert_club_member(cur, profile_id: int, club_id: int) -> None:
cur.execute(
"""
SELECT 1 FROM club_members
WHERE profile_id = %s AND club_id = %s AND status = 'active'
LIMIT 1
""",
(profile_id, club_id),
)
if not cur.fetchone():
raise HTTPException(status_code=403, detail="Keine Mitgliedschaft in diesem Verein")
def has_club_role(cur, profile_id: int, club_id: int, *role_codes: str) -> bool:
if not role_codes:
return False
cur.execute(
"""
SELECT 1
FROM club_members cm
INNER JOIN club_member_roles r ON r.club_member_id = cm.id
WHERE cm.profile_id = %s AND cm.club_id = %s AND cm.status = 'active'
AND r.role_code IN ({ph})
LIMIT 1
""".format(ph=",".join(["%s"] * len(role_codes))),
(profile_id, club_id, *role_codes),
)
return cur.fetchone() is not None
def club_admin_shares_club_with_creator(
cur, club_admin_profile_id: int, creator_profile_id: int
) -> bool:
"""
True, wenn club_admin_profile_id in mindestens einem Verein die Rolle club_admin hat und
creator_profile_id dort ebenfalls aktives Mitglied ist (z. B. Löschen fremder privater Übungen).
"""
if club_admin_profile_id == creator_profile_id:
return False
cur.execute(
"""
SELECT 1
FROM club_members cm_admin
INNER JOIN club_member_roles r
ON r.club_member_id = cm_admin.id AND r.role_code = 'club_admin'
INNER JOIN club_members cm_creator
ON cm_creator.club_id = cm_admin.club_id
AND cm_creator.profile_id = %s
AND cm_creator.status = 'active'
WHERE cm_admin.profile_id = %s AND cm_admin.status = 'active'
LIMIT 1
""",
(creator_profile_id, club_admin_profile_id),
)
return cur.fetchone() is not None
def can_manage_club_org(cur, profile_id: int, club_id: int, global_role: Optional[str]) -> bool:
"""Sparten/Gruppen/Struktur: Vereinsadmin oder Plattform-Admin."""
if is_platform_admin(global_role):
return True
return has_club_role(cur, profile_id, club_id, "club_admin")
def can_plan_in_club(cur, profile_id: int, club_id: int, global_role: Optional[str]) -> bool:
"""Trainingsgruppe anlegen u.Ä.; Vereins-rollentrainer, Content-Editor, Spartenleitung …
Hinweis: ``content_editor`` ist derzeit zusammen mit ``trainer``/``division_lead`` in diesem
gemeinsamen Strang gebündelt — u.a. Vereinsübungen bearbeiten (s. exercises) und
Trainingsgruppen unter ``clubs``. Es gibt noch keine eigene Nur-Content-Guard pro Endpunkt.
"""
if is_platform_admin(global_role):
return True
return has_club_role(
cur, profile_id, club_id, "club_admin", "trainer", "content_editor", "division_lead"
)
def memberships_with_roles(cur, profile_id: int, active_only: bool = True) -> List[Dict[str, Any]]:
status_filter = "AND cm.status = 'active'" if active_only else ""
cur.execute(
f"""
SELECT c.id, c.name, c.abbreviation, c.status,
cm.status AS membership_status,
COALESCE(
ARRAY_AGG(DISTINCT r.role_code) FILTER (WHERE r.role_code IS NOT NULL),
ARRAY[]::varchar[]
) AS roles
FROM club_members cm
INNER JOIN clubs c ON c.id = cm.club_id
LEFT JOIN club_member_roles r ON r.club_member_id = cm.id
WHERE cm.profile_id = %s {status_filter}
GROUP BY c.id, c.name, c.abbreviation, c.status, cm.status
ORDER BY c.name
""",
(profile_id,),
)
out: List[Dict[str, Any]] = []
for row in cur.fetchall():
d = dict(row)
roles = d.get("roles") or []
if hasattr(roles, "tolist"):
roles = roles.tolist()
d["roles"] = list(roles)
out.append(d)
return out
def club_ids_for_profile_with_roles(cur, profile_id: int, *role_codes: str) -> Set[int]:
"""Vereins-IDs, in denen das Profil mindestens eine der Rollen hat."""
if not role_codes:
return set()
ph = ",".join(["%s"] * len(role_codes))
cur.execute(
f"""
SELECT DISTINCT cm.club_id
FROM club_members cm
INNER JOIN club_member_roles r ON r.club_member_id = cm.id
WHERE cm.profile_id = %s AND cm.status = 'active'
AND r.role_code IN ({ph})
""",
(profile_id, *role_codes),
)
return {int(r["club_id"]) for r in cur.fetchall() if r.get("club_id") is not None}
_GOVERNANCE_VISIBILITY = frozenset({"private", "club", "official"})
def _library_governance_triplet(
row: Mapping[str, Any],
) -> tuple[str, Optional[int], Optional[int]]:
"""visibility, club_id, created_by als normalisierte Werte für Bibliotheks-/Planungsartefakte."""
vis = str(row.get("visibility") or "private").strip().lower()
if vis not in _GOVERNANCE_VISIBILITY:
vis = "private"
cid_raw = row.get("club_id")
try:
ex_cid = int(cid_raw) if cid_raw is not None else None
except (TypeError, ValueError):
ex_cid = None
cr_raw = row.get("created_by")
try:
creator = int(cr_raw) if cr_raw is not None else None
except (TypeError, ValueError):
creator = None
return vis, ex_cid, creator
def assert_library_content_editable(
cur,
profile_id: int,
role: Optional[str],
row: Union[Dict[str, Any], Mapping[str, Any]],
) -> None:
"""Inhalt bearbeiten: wie Übungen — Ersteller, Plattform-Admin oder Planungsberechtigter im Verein."""
pid = int(profile_id)
ex_vis, ex_cid, creator = _library_governance_triplet(row)
if creator is not None and creator == pid:
return
if is_platform_admin(role):
return
if ex_vis == "club" and ex_cid is not None and can_plan_in_club(cur, pid, ex_cid, role):
return
raise HTTPException(status_code=403, detail="Keine Berechtigung zum Bearbeiten dieses Inhalts")
def assert_library_content_deletable(
cur,
profile_id: int,
role: Optional[str],
row: Union[Dict[str, Any], Mapping[str, Any]],
) -> None:
"""Löschen: wie Übungen — privat Eigentümer/Vereins-Admin-Kontext, Verein nur Vereinsadmin, offiziell nur Plattform."""
pid = int(profile_id)
if is_platform_admin(role):
return
vis, cid, creator = _library_governance_triplet(row)
try:
creator_int = int(creator) if creator is not None else None
except (TypeError, ValueError):
creator_int = None
if vis == "official":
raise HTTPException(
status_code=403,
detail="Offizielle Inhalte dürfen nur von Plattform-Admins gelöscht werden.",
)
if vis == "club":
try:
ex_club = int(cid) if cid is not None else None
except (TypeError, ValueError):
ex_club = None
if ex_club is None:
raise HTTPException(status_code=400, detail="Vereinsinhalt ohne gültige Vereinszuordnung")
if not has_club_role(cur, pid, ex_club, "club_admin"):
raise HTTPException(
status_code=403,
detail="Nur Vereins-Admins dürfen Vereins-Inhalte löschen.",
)
return
if creator_int is not None and creator_int == pid:
return
if creator_int is not None and club_admin_shares_club_with_creator(cur, pid, creator_int):
return
raise HTTPException(status_code=403, detail="Keine Berechtigung zum Löschen dieses Inhalts")
def assert_library_content_governance_transition(
cur,
profile_id: int,
role: Optional[str],
prev_row: Union[Dict[str, Any], Mapping[str, Any]],
next_visibility: str,
next_club_id: Optional[int],
) -> None:
"""
Zusätzliche Regeln beim Ändern von visibility/club_id (Zielzustand vor assert_valid_governance_visibility prüfen).
- Abwahl „official“: nur Plattform-Admin.
- private → club: nur Ersteller (oder Plattform-Admin).
- club → private: Ersteller, Vereinsadmin im bisherigen Verein oder Plattform-Admin.
- club → club mit Wechsel club_id: Vereinsadmin im alten oder neuen Verein oder Plattform-Admin.
"""
nv = str(next_visibility or "").strip().lower()
if nv not in _GOVERNANCE_VISIBILITY:
raise HTTPException(status_code=400, detail="Ungültige visibility")
old_vis, old_cid, creator = _library_governance_triplet(prev_row)
new_cid: Optional[int]
try:
new_cid = int(next_club_id) if next_club_id is not None else None
except (TypeError, ValueError):
new_cid = None
pid = int(profile_id)
try:
creator_int = int(creator) if creator is not None else None
except (TypeError, ValueError):
creator_int = None
if old_vis == nv and (nv != "club" or old_cid == new_cid):
return
if old_vis == "official" and nv != "official":
if not is_platform_admin(role):
raise HTTPException(
status_code=403,
detail="Nur Plattform-Admins dürfen offizielle Inhalte auf Verein oder privat setzen.",
)
if nv == "official":
return
if old_vis == "private" and nv == "club":
if creator_int is not None and creator_int != pid and not is_platform_admin(role):
raise HTTPException(
status_code=403,
detail="Nur der Ersteller darf private Inhalte für den Verein freigeben.",
)
return
if old_vis == "club" and nv == "private":
if is_platform_admin(role):
return
if creator_int is not None and creator_int == pid:
return
if old_cid is not None and has_club_role(cur, pid, old_cid, "club_admin"):
return
raise HTTPException(
status_code=403,
detail="Nur Ersteller, Vereins-Admins oder Plattform-Admins dürfen Vereins-Inhalte auf privat setzen.",
)
if old_vis == "club" and nv == "club" and old_cid != new_cid:
if is_platform_admin(role):
return
ok_old = old_cid is not None and has_club_role(cur, pid, old_cid, "club_admin")
ok_new = new_cid is not None and has_club_role(cur, pid, new_cid, "club_admin")
if ok_old or ok_new:
return
raise HTTPException(
status_code=403,
detail="Nur Vereins-Admins oder Plattform-Admins dürfen die Vereinszuordnung ändern.",
)
def assert_valid_governance_visibility(
cur,
profile_id: int,
role: Optional[str],
visibility: str,
club_id: Optional[int],
) -> None:
"""Pflicht club_id bei visibility=club; Mitgliedschaft außer Plattform-Admin; official nur Superadmin."""
if visibility not in _GOVERNANCE_VISIBILITY:
raise HTTPException(status_code=400, detail="Ungültige visibility")
if visibility == "official" and not is_superadmin(role):
raise HTTPException(
status_code=403,
detail="Nur Superadmins dürfen offizielle Inhalte setzen",
)
if visibility == "club":
if club_id is None:
raise HTTPException(status_code=400, detail="club_id ist bei visibility=club erforderlich")
if is_platform_admin(role):
cur.execute("SELECT 1 FROM clubs WHERE id = %s", (club_id,))
if not cur.fetchone():
raise HTTPException(status_code=400, detail="Verein nicht gefunden")
else:
assert_club_member(cur, profile_id, club_id)
def library_content_visible_to_profile(
cur,
profile_id: int,
visibility: str,
content_club_id: Optional[int],
created_by: Optional[int],
global_role: Optional[str],
) -> bool:
"""Leserechte wie Übungen für alle Objekte mit visibility/club_id/created_by (Bibliothek & Co.)."""
return exercise_visible_to_profile(
cur, profile_id, visibility, content_club_id, created_by, global_role
)
def exercise_visible_to_profile(
cur,
profile_id: int,
visibility: str,
exercise_club_id: Optional[int],
created_by: Optional[int],
global_role: Optional[str],
) -> bool:
"""
Leserechte einer Übung (und analoger Bibliotheksobjekte).
Vereinsbezogene Inhalte (visibility club): aktiv nur mit **aktiver** Mitgliedschaft in diesem Verein.
Mitgliedschaft mit status **inactive** sperrt — auch für Plattform-/Super-Admins — solange eine
Mitgliedschaft existiert.
Ist man kein Mitglied dieses Vereins, behalten Plattform-Admins den bisherigen „Audit“-Zugang
zum Vereinskontext ohne eigene Mitgliedschaft.
"""
vis = (visibility or "").strip().lower()
plat = is_platform_admin(global_role)
pid = int(profile_id)
if vis == "official":
return True
if created_by is not None and int(created_by) == pid:
return True
if vis == "private":
return plat
if vis != "club":
return False
if exercise_club_id is None:
return False
try:
ecid = int(exercise_club_id)
except (TypeError, ValueError):
return False
cur.execute(
"""
SELECT cm.status
FROM club_members cm
WHERE cm.profile_id = %s AND cm.club_id = %s
LIMIT 1
""",
(pid, ecid),
)
row = cur.fetchone()
if row is None:
return plat
st_raw = row["status"] if isinstance(row, dict) else row[0]
return str(st_raw or "active").strip().lower() == "active"