shinkan-jinkendo/backend/club_tenancy.py
Lars abee6171df
Some checks failed
Deploy Development / deploy (push) Successful in 37s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 6s
Test Suite / playwright-tests (push) Failing after 34s
feat: enhance access layer governance and visibility checks
- Added new documentation references for access layer governance in CLAUDE.md, including multi-tenancy and endpoint audit guidelines.
- Updated ACCESS_LAYER_AND_GOVERNANCE_PLAN.md to include cursor and heuristic checks for access layer compliance.
- Enhanced ACCESS_LAYER_ENDPOINT_AUDIT.md to clarify endpoint visibility and governance requirements, including exemptions for certain routers.
- Introduced library_content_visible_to_profile function in club_tenancy.py to streamline visibility checks for library content.
- Updated exercise progression graphs router to utilize the new visibility function, improving access control.
- Bumped application version to 0.8.27 and updated changelog to reflect these changes.
2026-05-05 22:09:25 +02:00

173 lines
5.6 KiB
Python

"""
Vereins-Mandanten: Mitgliedschaften, aktiver Vereinskontext, einfache Berechtigungen.
Siehe .claude/docs/technical/MULTI_TENANCY_RBAC_ARCHITECTURE.md
"""
from typing import Any, Dict, List, Optional, Set
from fastapi import HTTPException
def is_platform_admin(role: Optional[str]) -> bool:
    """Return True when the global role names a platform-level administrator.

    The comparison is case-insensitive. ``None`` and the empty string are
    never treated as an admin role.
    """
    if not role:
        return False
    normalized = role.lower()
    return normalized == "admin" or normalized == "superadmin"
def club_ids_for_profile(cur, profile_id: int) -> Set[int]:
    """Collect the ids of all clubs in which the profile has an active membership.

    Args:
        cur: Database cursor; fetched rows are read by key (``row["club_id"]``).
        profile_id: Profile whose memberships are looked up.

    Returns:
        The set of club ids; empty when there is no active membership.
    """
    query = """
        SELECT club_id FROM club_members
        WHERE profile_id = %s AND status = 'active'
        """
    cur.execute(query, (profile_id,))
    active_club_ids: Set[int] = set()
    for membership in cur.fetchall():
        active_club_ids.add(membership["club_id"])
    return active_club_ids
def assert_club_member(cur, profile_id: int, club_id: int) -> None:
    """Ensure the profile holds an active membership in the given club.

    Args:
        cur: Database cursor used for the lookup.
        profile_id: Profile to check.
        club_id: Club the profile must belong to.

    Raises:
        HTTPException: 403 when no active membership row is found.
    """
    membership_query = """
        SELECT 1 FROM club_members
        WHERE profile_id = %s AND club_id = %s AND status = 'active'
        LIMIT 1
        """
    cur.execute(membership_query, (profile_id, club_id))
    membership_row = cur.fetchone()
    if membership_row:
        return
    raise HTTPException(status_code=403, detail="Keine Mitgliedschaft in diesem Verein")
def has_club_role(cur, profile_id: int, club_id: int, *role_codes: str) -> bool:
    """Tell whether an active club member holds at least one of the given roles.

    Args:
        cur: Database cursor used for the lookup.
        profile_id: Profile to check.
        club_id: Club in which the role must be held.
        *role_codes: Accepted role codes; any single match is sufficient.

    Returns:
        True if a matching role row exists. False otherwise, including when
        no role codes are passed (no query is issued in that case).
    """
    if len(role_codes) == 0:
        return False
    # Only "%s" markers are interpolated into the SQL text; the role codes
    # themselves are passed as bound parameters.
    placeholders = ",".join("%s" for _ in role_codes)
    role_query = """
        SELECT 1
        FROM club_members cm
        INNER JOIN club_member_roles r ON r.club_member_id = cm.id
        WHERE cm.profile_id = %s AND cm.club_id = %s AND cm.status = 'active'
        AND r.role_code IN ({ph})
        LIMIT 1
        """.format(ph=placeholders)
    bound_params = (profile_id, club_id) + tuple(role_codes)
    cur.execute(role_query, bound_params)
    match = cur.fetchone()
    return match is not None
def can_manage_club_org(cur, profile_id: int, club_id: int, global_role: Optional[str]) -> bool:
    """Divisions/groups/structure: allowed for club admins and platform admins.

    The platform-admin check runs first, so no database query is issued for
    platform admins.
    """
    return is_platform_admin(global_role) or has_club_role(
        cur, profile_id, club_id, "club_admin"
    )
def can_plan_in_club(cur, profile_id: int, club_id: int, global_role: Optional[str]) -> bool:
    """Creating/planning training groups: planning roles in the club, or platform admin.

    The platform-admin check runs first, so no database query is issued for
    platform admins.
    """
    planning_roles = ("club_admin", "trainer", "content_editor", "division_lead")
    return is_platform_admin(global_role) or has_club_role(
        cur, profile_id, club_id, *planning_roles
    )
def memberships_with_roles(cur, profile_id: int, active_only: bool = True) -> List[Dict[str, Any]]:
    """List the profile's club memberships together with the roles held in each club.

    Args:
        cur: Database cursor; each fetched row must be convertible with ``dict(row)``.
        profile_id: Profile whose memberships are listed.
        active_only: When True (default), only memberships with status
            ``'active'`` are returned.

    Returns:
        One dict per membership, ordered by club name, holding the selected
        club columns, ``membership_status`` and ``roles`` as a plain list
        (empty when the member has no roles).
    """
    if active_only:
        status_filter = "AND cm.status = 'active'"
    else:
        status_filter = ""
    # status_filter is one of two fixed literals, never caller-supplied text.
    membership_query = f"""
        SELECT c.id, c.name, c.abbreviation, c.status,
        cm.status AS membership_status,
        COALESCE(
        ARRAY_AGG(DISTINCT r.role_code) FILTER (WHERE r.role_code IS NOT NULL),
        ARRAY[]::varchar[]
        ) AS roles
        FROM club_members cm
        INNER JOIN clubs c ON c.id = cm.club_id
        LEFT JOIN club_member_roles r ON r.club_member_id = cm.id
        WHERE cm.profile_id = %s {status_filter}
        GROUP BY c.id, c.name, c.abbreviation, c.status, cm.status
        ORDER BY c.name
        """
    cur.execute(membership_query, (profile_id,))

    def _with_plain_roles(row) -> Dict[str, Any]:
        """Copy the row into a dict whose ``roles`` value is a plain list."""
        record = dict(row)
        role_values = record.get("roles") or []
        # Some array-like values expose tolist(); unwrap those before copying.
        if hasattr(role_values, "tolist"):
            role_values = role_values.tolist()
        record["roles"] = list(role_values)
        return record

    return [_with_plain_roles(row) for row in cur.fetchall()]
# Visibility values accepted by assert_valid_governance_visibility; any other
# value is rejected there with HTTP 400.
_GOVERNANCE_VISIBILITY = frozenset({"private", "club", "official"})
def assert_valid_governance_visibility(
    cur,
    profile_id: int,
    role: Optional[str],
    visibility: str,
    club_id: Optional[int],
) -> None:
    """Validate a visibility/club_id combination for the acting profile.

    Rules enforced:
      * ``visibility`` must be one of the known governance values.
      * ``official`` may only be set by platform admins.
      * ``club`` requires a ``club_id``; platform admins only need the club to
        exist, everyone else must be an active member of it.

    Raises:
        HTTPException: 400 for an unknown visibility, a missing ``club_id`` or
            a club that does not exist; 403 when the caller lacks the
            required admin role or club membership.
    """
    if visibility not in _GOVERNANCE_VISIBILITY:
        raise HTTPException(status_code=400, detail="Ungültige visibility")

    if visibility == "official":
        if not is_platform_admin(role):
            raise HTTPException(
                status_code=403,
                detail="Nur Plattform-Admins dürfen offizielle Inhalte setzen",
            )
        return

    if visibility != "club":
        # No further constraints are checked for the remaining visibility.
        return

    if club_id is None:
        raise HTTPException(status_code=400, detail="club_id ist bei visibility=club erforderlich")

    if not is_platform_admin(role):
        # Regular users must belong to the club they publish into.
        assert_club_member(cur, profile_id, club_id)
        return

    # Platform admins bypass the membership check, but the club must exist.
    cur.execute("SELECT 1 FROM clubs WHERE id = %s", (club_id,))
    club_row = cur.fetchone()
    if not club_row:
        raise HTTPException(status_code=400, detail="Verein nicht gefunden")
def library_content_visible_to_profile(
    cur,
    profile_id: int,
    visibility: str,
    content_club_id: Optional[int],
    created_by: Optional[int],
    global_role: Optional[str],
) -> bool:
    """Read permission for any object carrying visibility/club_id/created_by (library etc.).

    Applies the same rules as exercises by delegating to
    ``exercise_visible_to_profile``.
    """
    return exercise_visible_to_profile(
        cur,
        profile_id,
        visibility,
        exercise_club_id=content_club_id,
        created_by=created_by,
        global_role=global_role,
    )
def exercise_visible_to_profile(
    cur,
    profile_id: int,
    visibility: str,
    exercise_club_id: Optional[int],
    created_by: Optional[int],
    global_role: Optional[str],
) -> bool:
    """Decide whether the profile may read an exercise.

    Checks, in order:
      1. Platform admins and ``official`` content: always visible.
      2. The creator always sees their own content.
      3. ``club`` content: visible to active members of the owning club.
      4. Everything else (``private``, unknown visibility, or club content
         without a club id): not visible.

    Args:
        cur: Database cursor, only used for the club-membership lookup.
        profile_id: Profile requesting access.
        visibility: Visibility value stored on the content.
        exercise_club_id: Owning club of the content, if any.
        created_by: Profile id of the creator, if known.
        global_role: Platform-wide role of the requesting profile.
    """
    if is_platform_admin(global_role) or visibility == "official":
        return True
    if created_by is not None and created_by == profile_id:
        return True
    if visibility != "club" or exercise_club_id is None:
        return False

    membership_query = """
        SELECT 1 FROM club_members
        WHERE profile_id = %s AND club_id = %s AND status = 'active'
        LIMIT 1
        """
    cur.execute(membership_query, (profile_id, exercise_club_id))
    membership_row = cur.fetchone()
    return membership_row is not None