All checks were successful
Deploy Development / deploy (push) Successful in 34s
Test Suite / pytest-backend (push) Successful in 23s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 24s
- Added functions to determine production environment and OpenAPI exposure settings, improving API documentation control. - Updated FastAPI initialization to conditionally set OpenAPI and documentation URLs based on environment variables. - Refactored health check response to limit detail exposure in production environments, enhancing security. - Streamlined profile management by removing legacy ID retrieval and ensuring session-based profile access for security improvements.
342 lines
13 KiB
Python
342 lines
13 KiB
Python
"""
|
|
Profile Management Endpoints for Mitai Jinkendo
|
|
|
|
Handles profile CRUD operations for both admin and current user.
|
|
"""
|
|
import secrets
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, HTTPException, Header, Depends
|
|
|
|
from psycopg2.extras import Json
|
|
|
|
from db import get_db, get_cursor, r2d
|
|
from auth import require_auth, hash_pin
|
|
from club_tenancy import assert_club_member, memberships_with_roles, is_platform_admin
|
|
from tenant_context import resolve_tenant_context, TenantContext, get_tenant_context
|
|
from models import ProfileCreate, ProfileUpdate
|
|
|
|
router = APIRouter(prefix="/api", tags=["profiles"])
|
|
|
|
_ALLOWED_PORTAL_ROLES = frozenset({"user", "trainer", "admin", "superadmin"})
|
|
|
|
|
|
# ── Current User Profile ──────────────────────────────────────────────────────
|
|
@router.get("/profiles/me")
|
|
def get_current_profile(
|
|
session=Depends(require_auth),
|
|
x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"),
|
|
):
|
|
"""Profil inkl. Vereinsmitgliedschaften; effective_club_id = aufgelöster Request-Kontext (Header vor Profilfeld)."""
|
|
profile_id = session["profile_id"]
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT * FROM profiles WHERE id=%s", (profile_id,))
|
|
row = cur.fetchone()
|
|
if not row:
|
|
raise HTTPException(404, "Profil nicht gefunden")
|
|
data = r2d(row)
|
|
data.pop("pin_hash", None)
|
|
clubs = memberships_with_roles(cur, profile_id)
|
|
data["clubs"] = clubs
|
|
ac_raw = data.get("active_club_id")
|
|
stored_ac = int(ac_raw) if ac_raw is not None and ac_raw != "" else None
|
|
tenant = resolve_tenant_context(
|
|
cur,
|
|
profile_id=int(profile_id),
|
|
global_role=session.get("role") or "",
|
|
header_raw=x_active_club_id,
|
|
memberships=clubs,
|
|
stored_active_club_id=stored_ac,
|
|
invalid_header_policy="ignore",
|
|
)
|
|
data["effective_club_id"] = tenant.effective_club_id
|
|
return data
|
|
|
|
|
|
# ── Admin Profile Management ──────────────────────────────────────────────────
|
|
@router.get("/profiles")
|
|
def list_profiles(session=Depends(require_auth)):
|
|
"""Liste aller Profile (nur Plattform-Admin)."""
|
|
role = (session.get("role") or "").lower()
|
|
if not is_platform_admin(role):
|
|
raise HTTPException(status_code=403, detail="Nur Plattform-Administratoren dürfen alle Profile einsehen")
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT * FROM profiles ORDER BY created")
|
|
rows = cur.fetchall()
|
|
out = []
|
|
for r in rows:
|
|
d = r2d(r)
|
|
d.pop("pin_hash", None)
|
|
out.append(d)
|
|
return out
|
|
|
|
|
|
@router.post("/profiles")
|
|
def create_profile(p: ProfileCreate, session=Depends(require_auth)):
|
|
"""
|
|
Neues Profil anlegen — nur Plattform-Admin.
|
|
|
|
Nutzt gleiches Kernschema wie Registrierung (SERIAL id); PIN wird zufällig gesetzt — Nutzer
|
|
setzt das Passwort über „Passwort vergessen“ oder ein späteres Admin-Tool.
|
|
"""
|
|
role = session.get("role") or ""
|
|
if not is_platform_admin(role):
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail="Nur Plattform-Administratoren dürfen Profile anlegen",
|
|
)
|
|
|
|
email_norm = str(p.email).strip().lower()
|
|
name_s = (p.name or "").strip()
|
|
|
|
pin_hash = hash_pin(secrets.token_urlsafe(24))
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute(
|
|
"SELECT id FROM profiles WHERE email IS NOT NULL AND lower(trim(email)) = %s",
|
|
(email_norm,),
|
|
)
|
|
if cur.fetchone():
|
|
raise HTTPException(status_code=409, detail="E-Mail bereits registriert")
|
|
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO profiles (
|
|
name, email, pin_hash, auth_type, role, tier,
|
|
email_verified, created_at, updated_at
|
|
)
|
|
VALUES (%s, %s, %s, 'email', 'user', 'free', TRUE,
|
|
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
RETURNING id
|
|
""",
|
|
(name_s, email_norm, pin_hash),
|
|
)
|
|
row = cur.fetchone()
|
|
new_id = row["id"] if row else None
|
|
if new_id is None:
|
|
raise HTTPException(status_code=500, detail="Profil konnte nicht angelegt werden")
|
|
|
|
return profile_document(str(new_id))
|
|
|
|
|
|
def profile_document(pid: str) -> dict:
|
|
"""Profil ohne PIN — für Routen und interne Aufrufe (z. B. /auth/me)."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
|
|
row = cur.fetchone()
|
|
if not row:
|
|
raise HTTPException(404, "Profil nicht gefunden")
|
|
d = r2d(row)
|
|
d.pop("pin_hash", None)
|
|
return d
|
|
|
|
|
|
@router.get("/profiles/{pid}")
|
|
def get_profile(pid: str, session=Depends(require_auth)):
|
|
"""Profil nach ID — nur eigenes Profil oder Plattform-Admin (wie PUT)."""
|
|
viewer = session["profile_id"]
|
|
role = session.get("role") or ""
|
|
if str(viewer) != str(pid) and not is_platform_admin(role):
|
|
raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Profil")
|
|
return profile_document(pid)
|
|
def _run_profile_update(pid: str, p: ProfileUpdate, tenant: TenantContext) -> dict:
|
|
"""Gemeinsame PUT-Logik für /profiles/{id} und Legacy /profile."""
|
|
sess_pid = tenant.profile_id
|
|
role = tenant.global_role
|
|
if str(sess_pid) != str(pid) and role not in ("admin", "superadmin"):
|
|
raise HTTPException(403, "Keine Berechtigung für dieses Profil")
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
|
|
row = cur.fetchone()
|
|
if not row:
|
|
raise HTTPException(404, "Profil nicht gefunden")
|
|
rowd = r2d(row)
|
|
cur_email_norm = (rowd.get("email") or "").strip().lower()
|
|
|
|
patch = p.model_dump(exclude_unset=True)
|
|
data = {}
|
|
|
|
if "role" in patch or "tier" in patch:
|
|
if not is_platform_admin(role):
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail="Nur Portal-Admins dürfen Rolle oder Tier ändern",
|
|
)
|
|
|
|
if "role" in patch:
|
|
new_role = (patch["role"] or "").strip().lower()
|
|
if new_role not in _ALLOWED_PORTAL_ROLES:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Ungültige Portal-Rolle. Erlaubt: {', '.join(sorted(_ALLOWED_PORTAL_ROLES))}",
|
|
)
|
|
if new_role == "superadmin" and role != "superadmin":
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail="Nur Super-Admins dürfen die Rolle Super-Admin vergeben",
|
|
)
|
|
old_r = (rowd.get("role") or "user").strip().lower()
|
|
cur.execute(
|
|
"""
|
|
SELECT COUNT(*)::int AS c FROM profiles
|
|
WHERE lower(trim(role)) IN ('admin','superadmin')
|
|
"""
|
|
)
|
|
admin_cnt = int(cur.fetchone()["c"])
|
|
if old_r in ("admin", "superadmin") and new_role not in ("admin", "superadmin"):
|
|
if admin_cnt <= 1:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Der letzte Portal-Administrator kann nicht zurückgestuft werden",
|
|
)
|
|
data["role"] = new_role
|
|
del patch["role"]
|
|
|
|
if "tier" in patch:
|
|
tv = patch["tier"]
|
|
if tv is None:
|
|
data["tier"] = "free"
|
|
else:
|
|
ts = str(tv).strip()
|
|
data["tier"] = (ts or "free")[:50]
|
|
del patch["tier"]
|
|
|
|
if "email" in patch:
|
|
ev = patch["email"]
|
|
if ev is None or (isinstance(ev, str) and ev.strip() == ""):
|
|
if rowd.get("email") is not None:
|
|
data["email"] = None
|
|
data["email_verified"] = False
|
|
data["verification_token"] = None
|
|
data["verification_expires"] = None
|
|
else:
|
|
email_norm = ev.strip().lower()
|
|
if "@" not in email_norm or len(email_norm) < 5:
|
|
raise HTTPException(400, "Ungültige E-Mail-Adresse")
|
|
cur.execute(
|
|
"""
|
|
SELECT id FROM profiles
|
|
WHERE email IS NOT NULL AND lower(trim(email)) = %s AND id <> %s
|
|
""",
|
|
(email_norm, pid),
|
|
)
|
|
if cur.fetchone():
|
|
raise HTTPException(409, "E-Mail wird bereits verwendet")
|
|
data["email"] = email_norm
|
|
if email_norm != cur_email_norm:
|
|
data["email_verified"] = False
|
|
data["verification_token"] = None
|
|
data["verification_expires"] = None
|
|
|
|
if "active_club_id" in patch:
|
|
ac = patch["active_club_id"]
|
|
if ac is None:
|
|
data["active_club_id"] = None
|
|
else:
|
|
try:
|
|
cid = int(ac)
|
|
except (TypeError, ValueError):
|
|
raise HTTPException(400, "active_club_id ungültig")
|
|
assert_club_member(cur, int(pid), cid)
|
|
data["active_club_id"] = cid
|
|
|
|
if "exercise_list_prefs" in patch:
|
|
ep = patch.pop("exercise_list_prefs")
|
|
if ep is None:
|
|
data["exercise_list_prefs"] = Json({})
|
|
elif isinstance(ep, dict):
|
|
data["exercise_list_prefs"] = Json(ep)
|
|
else:
|
|
raise HTTPException(400, "exercise_list_prefs muss ein JSON-Objekt sein")
|
|
|
|
nullable_keys = {"goal_weight", "goal_bf_pct", "dob"}
|
|
for k, v in patch.items():
|
|
if k == "email":
|
|
continue
|
|
if k == "active_club_id":
|
|
continue
|
|
if v is None and k in nullable_keys:
|
|
data[k] = None
|
|
elif v is not None:
|
|
data[k] = v
|
|
|
|
if not data:
|
|
return profile_document(pid)
|
|
|
|
data["updated_at"] = datetime.now()
|
|
cols = ", ".join(f"{k}=%s" for k in data)
|
|
vals = list(data.values()) + [pid]
|
|
cur.execute(f"UPDATE profiles SET {cols} WHERE id=%s", vals)
|
|
return profile_document(pid)
|
|
|
|
|
|
@router.put("/profiles/{pid}")
|
|
def update_profile(pid: str, p: ProfileUpdate, tenant: TenantContext = Depends(get_tenant_context)):
|
|
"""Update profile — nur eigenes Profil oder Admin; TenantContext validiert X-Active-Club-Id."""
|
|
return _run_profile_update(pid, p, tenant)
|
|
|
|
|
|
@router.delete("/profiles/{pid}")
|
|
def delete_profile(pid: str, session=Depends(require_auth)):
|
|
"""Profil löschen — nur Plattform-Admin; letztes Profil wird geschützt."""
|
|
role = session.get("role") or ""
|
|
if not is_platform_admin(role):
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail="Nur Plattform-Administratoren dürfen Profile löschen",
|
|
)
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT COUNT(*) as count FROM profiles")
|
|
count = cur.fetchone()['count']
|
|
if count <= 1: raise HTTPException(400, "Letztes Profil kann nicht gelöscht werden")
|
|
# Mitai-Überbleibsel: nur löschen, wenn die Tabelle im Schema existiert (Shinkan-DB ohne diese Tabellen).
|
|
_optional_mitai_tables = (
|
|
"weight_log",
|
|
"circumference_log",
|
|
"caliper_log",
|
|
"nutrition_log",
|
|
"activity_log",
|
|
"ai_insights",
|
|
)
|
|
for table in _optional_mitai_tables:
|
|
cur.execute(
|
|
"""
|
|
SELECT EXISTS (
|
|
SELECT 1 FROM information_schema.tables
|
|
WHERE table_schema = 'public' AND table_name = %s
|
|
) AS t_exists
|
|
""",
|
|
(table,),
|
|
)
|
|
ex = cur.fetchone()
|
|
if ex and next(iter(ex.values())):
|
|
cur.execute(f"DELETE FROM {table} WHERE profile_id=%s", (pid,))
|
|
cur.execute("DELETE FROM profiles WHERE id=%s", (pid,))
|
|
return {"ok": True}
|
|
|
|
|
|
# ── Current User Profile ──────────────────────────────────────────────────────
|
|
@router.get("/profile")
|
|
def get_active_profile(session: dict = Depends(require_auth)):
|
|
"""Legacy-Alias für das eingeloggte Profil — immer Session, kein X-Profile-Id (SECURITY: kein IDOR)."""
|
|
pid = str(session["profile_id"])
|
|
return profile_document(pid)
|
|
|
|
|
|
@router.put("/profile")
|
|
def update_active_profile(
|
|
p: ProfileUpdate,
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
):
|
|
"""Profil des eingeloggten Nutzers aktualisieren — dieselbe Quelle wie GET /profile."""
|
|
pid = str(tenant.profile_id)
|
|
return _run_profile_update(pid, p, tenant)
|