All checks were successful
Deploy Development / deploy (push) Successful in 34s
Test Suite / pytest-backend (push) Successful in 25s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 23s
- Integrated a new password reset mechanism for user accounts, allowing admins to send reset links via email. - Updated the management password reset functionality to differentiate between direct password setting and email link requests. - Added validation to ensure at least one active club admin remains when modifying club member roles. - Improved the user interface for password management in the admin panel, providing clearer feedback and options for password resets.
470 lines
18 KiB
Python
470 lines
18 KiB
Python
"""
|
|
Profile Management Endpoints for Mitai Jinkendo
|
|
|
|
Handles profile CRUD operations for both admin and current user.
|
|
"""
|
|
import secrets
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, HTTPException, Header, Depends
|
|
|
|
from psycopg2.extras import Json
|
|
|
|
from pydantic import BaseModel, Field, field_validator
|
|
|
|
from db import get_db, get_cursor, r2d
|
|
from auth import require_auth, hash_pin
|
|
from club_tenancy import (
|
|
assert_club_member,
|
|
club_ids_for_profile_with_roles,
|
|
is_platform_admin,
|
|
is_superadmin,
|
|
memberships_with_roles,
|
|
)
|
|
from tenant_context import resolve_tenant_context, TenantContext, get_tenant_context
|
|
from models import ProfileCreate, ProfileUpdate
|
|
|
|
# Router for every profile endpoint in this module; all routes live under /api.
router = APIRouter(prefix="/api", tags=["profiles"])

# Portal-wide roles that a profile update is allowed to assign.
_ALLOWED_PORTAL_ROLES = frozenset(("user", "trainer", "admin", "superadmin"))

# Portal roles whose accounts a club admin may not request a reset link for.
_CLUB_BLOCKED_PORTAL_TARGETS = frozenset(("admin", "superadmin"))
|
|
|
|
|
|
class ManagementPasswordResetBody(BaseModel):
    """Request body of the management password-reset endpoint.

    ``new_password`` is optional. Without it the endpoint sends an e-mail
    reset link (as in the "forgot password" flow); only super-admins may
    supply a value to set the password directly.
    """

    new_password: Optional[str] = Field(None, min_length=8, max_length=128)

    @field_validator("new_password", mode="before")
    @classmethod
    def _empty_pw_none(cls, v):
        """Map ``None`` and blank/whitespace-only strings to ``None``; pass anything else through."""
        is_blank_text = isinstance(v, str) and not v.strip()
        return None if v is None or is_blank_text else v
|
|
|
|
|
|
def _target_portal_role_lower(cur, target_pid: int) -> str:
    """Return the target profile's portal role, trimmed and lower-cased.

    Falls back to ``"user"`` when no row is found or the stored role is
    NULL/blank.
    """
    cur.execute("SELECT lower(trim(COALESCE(role, ''))) AS r FROM profiles WHERE id = %s", (target_pid,))
    record = cur.fetchone()
    if not record:
        return "user"
    return record.get("r") or "user"
|
|
|
|
|
|
def _assert_can_management_password_help(cur, tenant: TenantContext, target_pid: int, *, via_email: bool) -> None:
    """Authorise the caller to help another account with its password.

    Applies to both variants: sending an e-mail reset link and setting a
    password directly. Super-admins and platform admins pass globally. Any
    other caller needs at least one club returned by
    ``club_ids_for_profile_with_roles(..., "club_admin")`` in which the target
    is an active member; for the e-mail variant the target must additionally
    not hold a portal admin/super-admin role.

    Raises:
        HTTPException: 400 when the target is the caller's own profile,
            403 when the caller is not permitted to act on the target.
    """
    caller_pid = int(tenant.profile_id)
    if caller_pid == target_pid:
        raise HTTPException(status_code=400, detail="Eigenes Passwort unter Einstellungen ändern")

    global_role = tenant.global_role or ""
    # Portal-level roles skip every club-scoped check below.
    if is_superadmin(global_role) or is_platform_admin(global_role):
        return

    admin_club_ids = club_ids_for_profile_with_roles(cur, caller_pid, "club_admin")
    if not admin_club_ids:
        raise HTTPException(status_code=403, detail="Keine Berechtigung")

    # Is the target an active member of any club the caller administers?
    cur.execute(
        """
        SELECT 1 FROM club_members
        WHERE profile_id = %s AND club_id = ANY(%s) AND status = 'active'
        LIMIT 1
        """,
        (target_pid, list(admin_club_ids)),
    )
    shared_membership = cur.fetchone()
    if not shared_membership:
        raise HTTPException(
            status_code=403,
            detail="Nur für Nutzer, die in mindestens einem deiner Vereine (als Admin) aktiv sind",
        )

    if not via_email:
        return

    # Accounts with a portal admin role are off limits for club admins.
    if _target_portal_role_lower(cur, target_pid) in _CLUB_BLOCKED_PORTAL_TARGETS:
        raise HTTPException(
            status_code=403,
            detail="Für Konten mit Portal-Administrator- oder Super-Administrator-Rolle ist das nur für Super-Admins möglich",
        )
|
|
|
|
|
|
# ── Current User Profile ──────────────────────────────────────────────────────
|
|
@router.get("/profiles/me")
def get_current_profile(
    session=Depends(require_auth),
    x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"),
):
    """Return the caller's profile together with its club memberships.

    The response carries ``clubs`` (memberships with roles) and
    ``effective_club_id``, the club resolved for this request from the
    ``X-Active-Club-Id`` header and the stored ``active_club_id`` (the original
    note states the header takes precedence). ``pin_hash`` is removed.

    Raises:
        HTTPException: 404 when the session's profile no longer exists.
    """
    profile_id = session["profile_id"]
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM profiles WHERE id=%s", (profile_id,))
        found = cur.fetchone()
        if not found:
            raise HTTPException(404, "Profil nicht gefunden")

        payload = r2d(found)
        payload.pop("pin_hash", None)

        memberships = memberships_with_roles(cur, profile_id)
        payload["clubs"] = memberships

        # The stored club id may be NULL or an empty string; both mean "none".
        stored_raw = payload.get("active_club_id")
        has_stored_club = stored_raw is not None and stored_raw != ""
        stored_club_id = int(stored_raw) if has_stored_club else None

        resolved = resolve_tenant_context(
            cur,
            profile_id=int(profile_id),
            global_role=session.get("role") or "",
            header_raw=x_active_club_id,
            memberships=memberships,
            stored_active_club_id=stored_club_id,
            invalid_header_policy="ignore",
        )
        payload["effective_club_id"] = resolved.effective_club_id
        return payload
|
|
|
|
|
|
# ── Admin Profile Management ──────────────────────────────────────────────────
|
|
@router.get("/profiles")
def list_profiles(session=Depends(require_auth)):
    """List every profile (platform admins only).

    Rows are returned in creation order and ``pin_hash`` is stripped from each
    one before it is returned.

    Raises:
        HTTPException: 403 when the caller is not a platform admin.
    """
    role = (session.get("role") or "").lower()
    if not is_platform_admin(role):
        raise HTTPException(status_code=403, detail="Nur Plattform-Administratoren dürfen alle Profile einsehen")

    with get_db() as conn:
        cur = get_cursor(conn)
        # Sort on created_at, the timestamp column the profile INSERT in this
        # module writes (the previous "created" column is not written anywhere
        # in this file). The id tiebreaker keeps the order deterministic when
        # several rows share a timestamp.
        cur.execute("SELECT * FROM profiles ORDER BY created_at, id")
        rows = cur.fetchall()

    profiles = []
    for row in rows:
        doc = r2d(row)
        doc.pop("pin_hash", None)
        profiles.append(doc)
    return profiles
|
|
|
|
|
|
@router.post("/profiles")
def create_profile(p: ProfileCreate, session=Depends(require_auth)):
    """Create a new profile (platform admins only).

    Uses the same core schema as self-registration (SERIAL id). A random PIN
    is hashed and stored, so the new user has to set a real password through
    "forgot password" or a later admin tool.

    Raises:
        HTTPException: 403 when the caller is not a platform admin, 409 when
            the e-mail address is already registered, 500 when the INSERT
            returns no id.
    """
    if not is_platform_admin(session.get("role") or ""):
        raise HTTPException(
            status_code=403,
            detail="Nur Plattform-Administratoren dürfen Profile anlegen",
        )

    normalized_email = str(p.email).strip().lower()
    display_name = (p.name or "").strip()
    # Random placeholder credential; its plaintext is never kept or returned.
    placeholder_hash = hash_pin(secrets.token_urlsafe(24))

    with get_db() as conn:
        cur = get_cursor(conn)

        # Case- and whitespace-insensitive duplicate check on the address.
        cur.execute(
            "SELECT id FROM profiles WHERE email IS NOT NULL AND lower(trim(email)) = %s",
            (normalized_email,),
        )
        duplicate = cur.fetchone()
        if duplicate:
            raise HTTPException(status_code=409, detail="E-Mail bereits registriert")

        cur.execute(
            """
            INSERT INTO profiles (
                name, email, pin_hash, auth_type, role, tier,
                email_verified, created_at, updated_at
            )
            VALUES (%s, %s, %s, 'email', 'user', 'free', TRUE,
                    CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
            RETURNING id
            """,
            (display_name, normalized_email, placeholder_hash),
        )
        inserted = cur.fetchone()
        if not inserted or inserted["id"] is None:
            raise HTTPException(status_code=500, detail="Profil konnte nicht angelegt werden")
        created_id = inserted["id"]

    # Re-read through the shared loader once the connection block has ended.
    return profile_document(str(created_id))
|
|
|
|
|
|
def profile_document(pid: str) -> dict:
    """Load one profile as a dict with ``pin_hash`` removed.

    Used by the routes in this module and by internal callers (e.g. /auth/me).

    Raises:
        HTTPException: 404 when no profile has the given id.
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        found = cursor.fetchone()
        if not found:
            raise HTTPException(404, "Profil nicht gefunden")
        document = r2d(found)
        # Never expose the credential hash.
        document.pop("pin_hash", None)
        return document
|
|
|
|
|
|
@router.post("/profiles/{pid}/management-password-reset")
def management_password_reset(
    pid: str,
    body: ManagementPasswordResetBody,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Help another account obtain a new password.

    Default: issue an e-mail with a reset link, as in the "forgot password"
    flow; on that path this function itself does not modify the stored PIN
    hash. Exception: super-admins may send ``new_password`` to set the
    password directly.

    Returns:
        ``{"ok": True, "mode": "direct"}`` or
        ``{"ok": True, "mode": "email", "email_sent": <bool>}``.

    Raises:
        HTTPException: 400 for a non-numeric id or a target without e-mail
            address, 404 for an unknown profile, 403 when the caller lacks
            permission (see ``_assert_can_management_password_help``).
    """
    # Function-level imports — presumably to avoid an import cycle; confirm
    # before moving them to module level.
    from routers.auth import send_email
    from password_reset_mail import issue_password_reset_via_email

    try:
        target_id = int(pid)
    except ValueError:
        raise HTTPException(status_code=400, detail="Ungültige Profil-ID")

    set_directly = body.new_password is not None

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT id, email, name FROM profiles WHERE id = %s", (target_id,))
        profile_row = cur.fetchone()
        if not profile_row:
            raise HTTPException(status_code=404, detail="Profil nicht gefunden")

        if not set_directly:
            # Standard path: mail a reset link to the address on file.
            _assert_can_management_password_help(cur, tenant, target_id, via_email=True)
            address = (profile_row.get("email") or "").strip()
            if not address:
                raise HTTPException(
                    status_code=400,
                    detail="Für dieses Profil ist keine E-Mail-Adresse hinterlegt; ein Reset-Link kann nicht versendet werden.",
                )
            was_sent = issue_password_reset_via_email(
                cur,
                send_email,
                profile_id=target_id,
                email=address.lower(),
                name=profile_row.get("name"),
                intro="Ein Administrator hat für dein Konto einen sicheren Link zum Setzen eines neuen Passworts angefordert.",
            )
            return {"ok": True, "mode": "email", "email_sent": bool(was_sent)}

        # Exceptional path: only super-admins may overwrite the hash directly.
        if not is_superadmin(tenant.global_role or ""):
            raise HTTPException(
                status_code=403,
                detail="Direktes Setzen eines Passworts ist nur Super-Admins vorbehalten. Bitte E-Mail-Link verwenden.",
            )
        _assert_can_management_password_help(cur, tenant, target_id, via_email=False)
        cur.execute(
            "UPDATE profiles SET pin_hash = %s, updated_at = NOW() WHERE id = %s",
            (hash_pin(body.new_password), target_id),
        )
        return {"ok": True, "mode": "direct"}
|
|
|
|
|
|
@router.get("/profiles/{pid}")
def get_profile(pid: str, session=Depends(require_auth)):
    """Fetch a profile by id — own profile or platform admin only.

    Raises:
        HTTPException: 403 when the caller is neither the profile owner nor a
            platform admin; 404 (from ``profile_document``) when it is missing.
    """
    is_own_profile = str(session["profile_id"]) == str(pid)
    if not is_own_profile and not is_platform_admin(session.get("role") or ""):
        raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Profil")
    return profile_document(pid)
|
|
def _run_profile_update(pid: str, p: ProfileUpdate, tenant: TenantContext) -> dict:
    """Shared PUT logic for /profiles/{id} and the legacy /profile route.

    Only fields that were explicitly sent (``exclude_unset=True``) are
    considered. ``role``, ``tier``, ``email``, ``active_club_id`` and
    ``exercise_list_prefs`` get dedicated validation; every remaining field is
    copied as-is when it is not ``None`` (or when it is one of the nullable
    keys). When nothing ends up changing, no UPDATE is issued.

    Returns:
        The profile document (without ``pin_hash``) as produced by
        ``profile_document``.

    Raises:
        HTTPException: 403 for missing permission, 404 for an unknown
            profile, 400 for invalid values or for demoting the last portal
            administrator, 409 for an e-mail address that is already in use.
    """
    sess_pid = tenant.profile_id
    role = tenant.global_role
    # Callers may edit their own profile; anyone else needs a portal admin role.
    # NOTE(review): this compares the raw role string, whereas other endpoints
    # in this module go through is_platform_admin() — confirm both agree.
    if str(sess_pid) != str(pid) and role not in ("admin", "superadmin"):
        raise HTTPException(403, "Keine Berechtigung für dieses Profil")

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        row = cur.fetchone()
        if not row:
            raise HTTPException(404, "Profil nicht gefunden")
        rowd = r2d(row)
        # Normalised current address, used below to detect a real change.
        cur_email_norm = (rowd.get("email") or "").strip().lower()

        # patch: fields the client actually sent; data: columns to write.
        patch = p.model_dump(exclude_unset=True)
        data = {}

        # Role and tier are portal-level settings, not self-service.
        if "role" in patch or "tier" in patch:
            if not is_platform_admin(role):
                raise HTTPException(
                    status_code=403,
                    detail="Nur Portal-Admins dürfen Rolle oder Tier ändern",
                )

        if "role" in patch:
            new_role = (patch["role"] or "").strip().lower()
            if new_role not in _ALLOWED_PORTAL_ROLES:
                raise HTTPException(
                    status_code=400,
                    detail=f"Ungültige Portal-Rolle. Erlaubt: {', '.join(sorted(_ALLOWED_PORTAL_ROLES))}",
                )
            # Only a super-admin may grant the super-admin role.
            # NOTE(review): removing super-admin from an account is not
            # restricted here beyond the last-admin rule below — confirm intended.
            if new_role == "superadmin" and role != "superadmin":
                raise HTTPException(
                    status_code=403,
                    detail="Nur Super-Admins dürfen die Rolle Super-Admin vergeben",
                )
            old_r = (rowd.get("role") or "user").strip().lower()
            # Count all portal admins (admin + superadmin) for the guard below.
            cur.execute(
                """
                SELECT COUNT(*)::int AS c FROM profiles
                WHERE lower(trim(role)) IN ('admin','superadmin')
                """
            )
            admin_cnt = int(cur.fetchone()["c"])
            # Demoting an admin is refused when no other admin would remain.
            if old_r in ("admin", "superadmin") and new_role not in ("admin", "superadmin"):
                if admin_cnt <= 1:
                    raise HTTPException(
                        status_code=400,
                        detail="Der letzte Portal-Administrator kann nicht zurückgestuft werden",
                    )
            data["role"] = new_role
            # Handled; keep it out of the generic copy loop further down.
            del patch["role"]

        if "tier" in patch:
            tv = patch["tier"]
            if tv is None:
                data["tier"] = "free"
            else:
                ts = str(tv).strip()
                # Blank falls back to "free"; the value is cut to 50 characters.
                data["tier"] = (ts or "free")[:50]
            del patch["tier"]

        if "email" in patch:
            ev = patch["email"]
            if ev is None or (isinstance(ev, str) and ev.strip() == ""):
                # Clearing the address: only write when there is one to clear,
                # and reset the verification state along with it.
                if rowd.get("email") is not None:
                    data["email"] = None
                    data["email_verified"] = False
                    data["verification_token"] = None
                    data["verification_expires"] = None
            else:
                email_norm = ev.strip().lower()
                # Minimal plausibility check only.
                if "@" not in email_norm or len(email_norm) < 5:
                    raise HTTPException(400, "Ungültige E-Mail-Adresse")
                # The address must not belong to any other profile.
                cur.execute(
                    """
                    SELECT id FROM profiles
                    WHERE email IS NOT NULL AND lower(trim(email)) = %s AND id <> %s
                    """,
                    (email_norm, pid),
                )
                if cur.fetchone():
                    raise HTTPException(409, "E-Mail wird bereits verwendet")
                data["email"] = email_norm
                # A changed address has to be verified again.
                if email_norm != cur_email_norm:
                    data["email_verified"] = False
                    data["verification_token"] = None
                    data["verification_expires"] = None

        if "active_club_id" in patch:
            ac = patch["active_club_id"]
            if ac is None:
                data["active_club_id"] = None
            else:
                try:
                    cid = int(ac)
                except (TypeError, ValueError):
                    raise HTTPException(400, "active_club_id ungültig")
                # Presumably raises when the profile is not a member of that
                # club — confirm against club_tenancy.assert_club_member.
                assert_club_member(cur, int(pid), cid)
                data["active_club_id"] = cid

        if "exercise_list_prefs" in patch:
            ep = patch.pop("exercise_list_prefs")
            if ep is None:
                # None is stored as an empty JSON object.
                data["exercise_list_prefs"] = Json({})
            elif isinstance(ep, dict):
                data["exercise_list_prefs"] = Json(ep)
            else:
                raise HTTPException(400, "exercise_list_prefs muss ein JSON-Objekt sein")

        # Generic fields: None is written only for these nullable columns;
        # for every other field None means "leave unchanged".
        nullable_keys = {"goal_weight", "goal_bf_pct", "dob"}
        for k, v in patch.items():
            # email and active_club_id were handled above but are still in patch.
            if k == "email":
                continue
            if k == "active_club_id":
                continue
            if v is None and k in nullable_keys:
                data[k] = None
            elif v is not None:
                data[k] = v

        # Nothing to write: return the current document without an UPDATE.
        if not data:
            return profile_document(pid)

        # NOTE(review): client-side naive timestamp, while the password-reset
        # path uses the database's NOW() — confirm the column type tolerates both.
        data["updated_at"] = datetime.now()
        # Column names are the keys collected in `data` above (fixed names plus
        # ProfileUpdate field names); values are always bound as parameters.
        cols = ", ".join(f"{k}=%s" for k in data)
        vals = list(data.values()) + [pid]
        cur.execute(f"UPDATE profiles SET {cols} WHERE id=%s", vals)
    # Re-read after the connection block has ended (presumably after commit —
    # confirm get_db semantics).
    return profile_document(pid)
|
|
|
|
|
|
@router.put("/profiles/{pid}")
def update_profile(
    pid: str,
    p: ProfileUpdate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Update a profile — own profile or admin only.

    The ``TenantContext`` dependency validates ``X-Active-Club-Id``; the
    actual work is done by ``_run_profile_update``.
    """
    updated_document = _run_profile_update(pid, p, tenant)
    return updated_document
|
|
|
|
|
|
@router.delete("/profiles/{pid}")
def delete_profile(pid: str, session=Depends(require_auth)):
    """Delete a profile (platform admins only).

    Two safeguards apply: the last remaining profile cannot be deleted, and —
    matching the rule the profile-update path enforces for role changes — the
    last portal administrator (role ``admin`` or ``superadmin``) cannot be
    deleted either.

    Rows in the optional legacy Mitai log tables are removed first, but only
    for tables that exist in the ``public`` schema.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 403 when the caller is not a platform admin, 400 when
            the deletion would remove the last profile or the last portal
            administrator.
    """
    role = session.get("role") or ""
    if not is_platform_admin(role):
        raise HTTPException(
            status_code=403,
            detail="Nur Plattform-Administratoren dürfen Profile löschen",
        )

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT COUNT(*) as count FROM profiles")
        count = cur.fetchone()["count"]
        if count <= 1:
            raise HTTPException(400, "Letztes Profil kann nicht gelöscht werden")

        # Deleting the only remaining portal administrator would leave the
        # portal without anyone able to manage it; the update path already
        # refuses the equivalent demotion, so refuse the deletion as well.
        cur.execute(
            "SELECT lower(trim(COALESCE(role, ''))) AS r FROM profiles WHERE id = %s",
            (pid,),
        )
        target = cur.fetchone()
        if target and target["r"] in ("admin", "superadmin"):
            cur.execute(
                """
                SELECT COUNT(*)::int AS c FROM profiles
                WHERE lower(trim(role)) IN ('admin','superadmin')
                """
            )
            if int(cur.fetchone()["c"]) <= 1:
                raise HTTPException(
                    status_code=400,
                    detail="Der letzte Portal-Administrator kann nicht gelöscht werden",
                )

        # Mitai leftovers: only delete when the table exists in the schema
        # (the Shinkan DB does not have these tables).
        _optional_mitai_tables = (
            "weight_log",
            "circumference_log",
            "caliper_log",
            "nutrition_log",
            "activity_log",
            "ai_insights",
        )
        for table in _optional_mitai_tables:
            cur.execute(
                """
                SELECT EXISTS (
                    SELECT 1 FROM information_schema.tables
                    WHERE table_schema = 'public' AND table_name = %s
                ) AS t_exists
                """,
                (table,),
            )
            ex = cur.fetchone()
            if ex and ex["t_exists"]:
                # Table names come from the fixed tuple above, never from the request.
                cur.execute(f"DELETE FROM {table} WHERE profile_id=%s", (pid,))

        cur.execute("DELETE FROM profiles WHERE id=%s", (pid,))
    return {"ok": True}
|
|
|
|
|
|
# ── Legacy /profile Aliases (Current User) ────────────────────────────────────
|
|
@router.get("/profile")
def get_active_profile(session: dict = Depends(require_auth)):
    """Legacy alias that returns the logged-in user's profile.

    The profile id always comes from the session, never from an
    ``X-Profile-Id`` header (SECURITY: rules out IDOR).
    """
    return profile_document(str(session["profile_id"]))
|
|
|
|
|
|
@router.put("/profile")
def update_active_profile(
    p: ProfileUpdate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Update the logged-in user's profile — same identity source as GET /profile."""
    own_pid = str(tenant.profile_id)
    return _run_profile_update(own_pid, p, tenant)
|