""" Vereins-Mandanten: Mitgliedschaften, aktiver Vereinskontext, einfache Berechtigungen. Siehe .claude/docs/technical/MULTI_TENANCY_RBAC_ARCHITECTURE.md """ from typing import Any, Dict, List, Mapping, Optional, Set, Union from fastapi import HTTPException def is_platform_admin(role: Optional[str]) -> bool: return (role or "").lower() in ("admin", "superadmin") def is_superadmin(role: Optional[str]) -> bool: return (role or "").lower() == "superadmin" def club_ids_for_profile(cur, profile_id: int) -> Set[int]: cur.execute( """ SELECT club_id FROM club_members WHERE profile_id = %s AND status = 'active' """, (profile_id,), ) return {r["club_id"] for r in cur.fetchall()} def assert_club_member(cur, profile_id: int, club_id: int) -> None: cur.execute( """ SELECT 1 FROM club_members WHERE profile_id = %s AND club_id = %s AND status = 'active' LIMIT 1 """, (profile_id, club_id), ) if not cur.fetchone(): raise HTTPException(status_code=403, detail="Keine Mitgliedschaft in diesem Verein") def has_club_role(cur, profile_id: int, club_id: int, *role_codes: str) -> bool: if not role_codes: return False cur.execute( """ SELECT 1 FROM club_members cm INNER JOIN club_member_roles r ON r.club_member_id = cm.id WHERE cm.profile_id = %s AND cm.club_id = %s AND cm.status = 'active' AND r.role_code IN ({ph}) LIMIT 1 """.format(ph=",".join(["%s"] * len(role_codes))), (profile_id, club_id, *role_codes), ) return cur.fetchone() is not None def club_admin_shares_club_with_creator( cur, club_admin_profile_id: int, creator_profile_id: int ) -> bool: """ True, wenn club_admin_profile_id in mindestens einem Verein die Rolle club_admin hat und creator_profile_id dort ebenfalls aktives Mitglied ist (z. B. Löschen fremder privater Übungen). """ if club_admin_profile_id == creator_profile_id: return False cur.execute( """ SELECT 1 FROM club_members cm_admin INNER JOIN club_member_roles r ON r.club_member_id = cm_admin.id AND r.role_code = 'club_admin' INNER JOIN club_members cm_creator ON cm_creator.club_id = cm_admin.club_id AND cm_creator.profile_id = %s AND cm_creator.status = 'active' WHERE cm_admin.profile_id = %s AND cm_admin.status = 'active' LIMIT 1 """, (creator_profile_id, club_admin_profile_id), ) return cur.fetchone() is not None def can_manage_club_org(cur, profile_id: int, club_id: int, global_role: Optional[str]) -> bool: """Sparten/Gruppen/Struktur: Vereinsadmin oder Plattform-Admin.""" if is_platform_admin(global_role): return True return has_club_role(cur, profile_id, club_id, "club_admin") def can_plan_in_club(cur, profile_id: int, club_id: int, global_role: Optional[str]) -> bool: """Trainingsgruppe anlegen u.Ä.; Vereins-rollentrainer, Content-Editor, Spartenleitung … Hinweis: ``content_editor`` ist derzeit zusammen mit ``trainer``/``division_lead`` in diesem gemeinsamen Strang gebündelt — u.a. Vereinsübungen bearbeiten (s. exercises) und Trainingsgruppen unter ``clubs``. Es gibt noch keine eigene Nur-Content-Guard pro Endpunkt. """ if is_platform_admin(global_role): return True return has_club_role( cur, profile_id, club_id, "club_admin", "trainer", "content_editor", "division_lead" ) def memberships_with_roles(cur, profile_id: int, active_only: bool = True) -> List[Dict[str, Any]]: status_filter = "AND cm.status = 'active'" if active_only else "" cur.execute( f""" SELECT c.id, c.name, c.abbreviation, c.status, cm.status AS membership_status, COALESCE( ARRAY_AGG(DISTINCT r.role_code) FILTER (WHERE r.role_code IS NOT NULL), ARRAY[]::varchar[] ) AS roles FROM club_members cm INNER JOIN clubs c ON c.id = cm.club_id LEFT JOIN club_member_roles r ON r.club_member_id = cm.id WHERE cm.profile_id = %s {status_filter} GROUP BY c.id, c.name, c.abbreviation, c.status, cm.status ORDER BY c.name """, (profile_id,), ) out: List[Dict[str, Any]] = [] for row in cur.fetchall(): d = dict(row) roles = d.get("roles") or [] if hasattr(roles, "tolist"): roles = roles.tolist() d["roles"] = list(roles) out.append(d) return out def club_ids_for_profile_with_roles(cur, profile_id: int, *role_codes: str) -> Set[int]: """Vereins-IDs, in denen das Profil mindestens eine der Rollen hat.""" if not role_codes: return set() ph = ",".join(["%s"] * len(role_codes)) cur.execute( f""" SELECT DISTINCT cm.club_id FROM club_members cm INNER JOIN club_member_roles r ON r.club_member_id = cm.id WHERE cm.profile_id = %s AND cm.status = 'active' AND r.role_code IN ({ph}) """, (profile_id, *role_codes), ) return {int(r["club_id"]) for r in cur.fetchall() if r.get("club_id") is not None} _GOVERNANCE_VISIBILITY = frozenset({"private", "club", "official"}) def _library_governance_triplet( row: Mapping[str, Any], ) -> tuple[str, Optional[int], Optional[int]]: """visibility, club_id, created_by als normalisierte Werte für Bibliotheks-/Planungsartefakte.""" vis = str(row.get("visibility") or "private").strip().lower() if vis not in _GOVERNANCE_VISIBILITY: vis = "private" cid_raw = row.get("club_id") try: ex_cid = int(cid_raw) if cid_raw is not None else None except (TypeError, ValueError): ex_cid = None cr_raw = row.get("created_by") try: creator = int(cr_raw) if cr_raw is not None else None except (TypeError, ValueError): creator = None return vis, ex_cid, creator def assert_library_content_editable( cur, profile_id: int, role: Optional[str], row: Union[Dict[str, Any], Mapping[str, Any]], ) -> None: """Inhalt bearbeiten: wie Übungen — Ersteller, Plattform-Admin oder Planungsberechtigter im Verein.""" pid = int(profile_id) ex_vis, ex_cid, creator = _library_governance_triplet(row) if creator is not None and creator == pid: return if is_platform_admin(role): return if ex_vis == "club" and ex_cid is not None and can_plan_in_club(cur, pid, ex_cid, role): return raise HTTPException(status_code=403, detail="Keine Berechtigung zum Bearbeiten dieses Inhalts") def assert_library_content_deletable( cur, profile_id: int, role: Optional[str], row: Union[Dict[str, Any], Mapping[str, Any]], ) -> None: """Löschen: wie Übungen — privat Eigentümer/Vereins-Admin-Kontext, Verein nur Vereinsadmin, offiziell nur Plattform.""" pid = int(profile_id) if is_platform_admin(role): return vis, cid, creator = _library_governance_triplet(row) try: creator_int = int(creator) if creator is not None else None except (TypeError, ValueError): creator_int = None if vis == "official": raise HTTPException( status_code=403, detail="Offizielle Inhalte dürfen nur von Plattform-Admins gelöscht werden.", ) if vis == "club": try: ex_club = int(cid) if cid is not None else None except (TypeError, ValueError): ex_club = None if ex_club is None: raise HTTPException(status_code=400, detail="Vereinsinhalt ohne gültige Vereinszuordnung") if not has_club_role(cur, pid, ex_club, "club_admin"): raise HTTPException( status_code=403, detail="Nur Vereins-Admins dürfen Vereins-Inhalte löschen.", ) return if creator_int is not None and creator_int == pid: return if creator_int is not None and club_admin_shares_club_with_creator(cur, pid, creator_int): return raise HTTPException(status_code=403, detail="Keine Berechtigung zum Löschen dieses Inhalts") def assert_library_content_governance_transition( cur, profile_id: int, role: Optional[str], prev_row: Union[Dict[str, Any], Mapping[str, Any]], next_visibility: str, next_club_id: Optional[int], ) -> None: """ Zusätzliche Regeln beim Ändern von visibility/club_id (Zielzustand vor assert_valid_governance_visibility prüfen). - Abwahl „official“: nur Plattform-Admin. - private → club: nur Ersteller (oder Plattform-Admin). - club → private: Ersteller, Vereinsadmin im bisherigen Verein oder Plattform-Admin. - club → club mit Wechsel club_id: Vereinsadmin im alten oder neuen Verein oder Plattform-Admin. """ nv = str(next_visibility or "").strip().lower() if nv not in _GOVERNANCE_VISIBILITY: raise HTTPException(status_code=400, detail="Ungültige visibility") old_vis, old_cid, creator = _library_governance_triplet(prev_row) new_cid: Optional[int] try: new_cid = int(next_club_id) if next_club_id is not None else None except (TypeError, ValueError): new_cid = None pid = int(profile_id) try: creator_int = int(creator) if creator is not None else None except (TypeError, ValueError): creator_int = None if old_vis == nv and (nv != "club" or old_cid == new_cid): return if old_vis == "official" and nv != "official": if not is_platform_admin(role): raise HTTPException( status_code=403, detail="Nur Plattform-Admins dürfen offizielle Inhalte auf Verein oder privat setzen.", ) if nv == "official": return if old_vis == "private" and nv == "club": if creator_int is not None and creator_int != pid and not is_platform_admin(role): raise HTTPException( status_code=403, detail="Nur der Ersteller darf private Inhalte für den Verein freigeben.", ) return if old_vis == "club" and nv == "private": if is_platform_admin(role): return if creator_int is not None and creator_int == pid: return if old_cid is not None and has_club_role(cur, pid, old_cid, "club_admin"): return raise HTTPException( status_code=403, detail="Nur Ersteller, Vereins-Admins oder Plattform-Admins dürfen Vereins-Inhalte auf privat setzen.", ) if old_vis == "club" and nv == "club" and old_cid != new_cid: if is_platform_admin(role): return ok_old = old_cid is not None and has_club_role(cur, pid, old_cid, "club_admin") ok_new = new_cid is not None and has_club_role(cur, pid, new_cid, "club_admin") if ok_old or ok_new: return raise HTTPException( status_code=403, detail="Nur Vereins-Admins oder Plattform-Admins dürfen die Vereinszuordnung ändern.", ) def assert_valid_governance_visibility( cur, profile_id: int, role: Optional[str], visibility: str, club_id: Optional[int], ) -> None: """Pflicht club_id bei visibility=club; Mitgliedschaft außer Plattform-Admin; official nur Superadmin.""" if visibility not in _GOVERNANCE_VISIBILITY: raise HTTPException(status_code=400, detail="Ungültige visibility") if visibility == "official" and not is_superadmin(role): raise HTTPException( status_code=403, detail="Nur Superadmins dürfen offizielle Inhalte setzen", ) if visibility == "club": if club_id is None: raise HTTPException(status_code=400, detail="club_id ist bei visibility=club erforderlich") if is_platform_admin(role): cur.execute("SELECT 1 FROM clubs WHERE id = %s", (club_id,)) if not cur.fetchone(): raise HTTPException(status_code=400, detail="Verein nicht gefunden") else: assert_club_member(cur, profile_id, club_id) def library_content_visible_to_profile( cur, profile_id: int, visibility: str, content_club_id: Optional[int], created_by: Optional[int], global_role: Optional[str], ) -> bool: """Leserechte wie Übungen für alle Objekte mit visibility/club_id/created_by (Bibliothek & Co.).""" return exercise_visible_to_profile( cur, profile_id, visibility, content_club_id, created_by, global_role ) def exercise_visible_to_profile( cur, profile_id: int, visibility: str, exercise_club_id: Optional[int], created_by: Optional[int], global_role: Optional[str], ) -> bool: """ Leserechte einer Übung (und analoger Bibliotheksobjekte). Vereinsbezogene Inhalte (visibility club): aktiv nur mit **aktiver** Mitgliedschaft in diesem Verein. Mitgliedschaft mit status **inactive** sperrt — auch für Plattform-/Super-Admins — solange eine Mitgliedschaft existiert. Ist man kein Mitglied dieses Vereins, behalten Plattform-Admins den bisherigen „Audit“-Zugang zum Vereinskontext ohne eigene Mitgliedschaft. """ vis = (visibility or "").strip().lower() plat = is_platform_admin(global_role) pid = int(profile_id) if vis == "official": return True if created_by is not None and int(created_by) == pid: return True if vis == "private": return plat if vis != "club": return False if exercise_club_id is None: return False try: ecid = int(exercise_club_id) except (TypeError, ValueError): return False cur.execute( """ SELECT cm.status FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = %s LIMIT 1 """, (pid, ecid), ) row = cur.fetchone() if row is None: return plat st_raw = row["status"] if isinstance(row, dict) else row[0] return str(st_raw or "active").strip().lower() == "active"