Some checks failed
Deploy Development / deploy (push) Successful in 35s
Test Suite / pytest-backend (push) Successful in 7s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 6s
Test Suite / playwright-tests (push) Failing after 37s
- Bumped application version to 0.8.36 in both backend and frontend files. - Updated the ProfileCreate model to require name and email fields, ensuring schema compliance. - Implemented a new POST /api/profiles endpoint restricted to platform admins, utilizing a random PIN for user setup. - Added integration tests for profile creation, including checks for unauthorized access and duplicate email handling. - Enhanced changelog to reflect the new version and changes made in this release.
325 lines
13 KiB
Python
325 lines
13 KiB
Python
"""
|
||
Profile Management Endpoints for Mitai Jinkendo
|
||
|
||
Handles profile CRUD operations for both admin and current user.
|
||
"""
|
||
import secrets
|
||
from typing import Optional
|
||
from datetime import datetime
|
||
|
||
from fastapi import APIRouter, HTTPException, Header, Depends
|
||
|
||
from db import get_db, get_cursor, r2d
|
||
from auth import require_auth, hash_pin
|
||
from club_tenancy import assert_club_member, memberships_with_roles, is_platform_admin
|
||
from tenant_context import resolve_tenant_context, TenantContext, get_tenant_context
|
||
from models import ProfileCreate, ProfileUpdate
|
||
|
||
router = APIRouter(prefix="/api", tags=["profiles"])
|
||
|
||
_ALLOWED_PORTAL_ROLES = frozenset({"user", "trainer", "admin", "superadmin"})
|
||
|
||
|
||
# ── Helper ────────────────────────────────────────────────────────────────────
|
||
def get_pid(x_profile_id: Optional[str] = Header(default=None)) -> str:
|
||
"""Get profile_id - from header for legacy endpoints."""
|
||
if x_profile_id:
|
||
return x_profile_id
|
||
with get_db() as conn:
|
||
cur = get_cursor(conn)
|
||
cur.execute("SELECT id FROM profiles ORDER BY created LIMIT 1")
|
||
row = cur.fetchone()
|
||
if row: return row['id']
|
||
raise HTTPException(400, "Kein Profil gefunden")
|
||
|
||
|
||
# ── Current User Profile ──────────────────────────────────────────────────────
|
||
@router.get("/profiles/me")
|
||
def get_current_profile(
|
||
session=Depends(require_auth),
|
||
x_active_club_id: Optional[str] = Header(default=None, alias="X-Active-Club-Id"),
|
||
):
|
||
"""Profil inkl. Vereinsmitgliedschaften; effective_club_id = aufgelöster Request-Kontext (Header vor Profilfeld)."""
|
||
profile_id = session["profile_id"]
|
||
with get_db() as conn:
|
||
cur = get_cursor(conn)
|
||
cur.execute("SELECT * FROM profiles WHERE id=%s", (profile_id,))
|
||
row = cur.fetchone()
|
||
if not row:
|
||
raise HTTPException(404, "Profil nicht gefunden")
|
||
data = r2d(row)
|
||
data.pop("pin_hash", None)
|
||
clubs = memberships_with_roles(cur, profile_id)
|
||
data["clubs"] = clubs
|
||
ac_raw = data.get("active_club_id")
|
||
stored_ac = int(ac_raw) if ac_raw is not None and ac_raw != "" else None
|
||
tenant = resolve_tenant_context(
|
||
cur,
|
||
profile_id=int(profile_id),
|
||
global_role=session.get("role") or "",
|
||
header_raw=x_active_club_id,
|
||
memberships=clubs,
|
||
stored_active_club_id=stored_ac,
|
||
invalid_header_policy="ignore",
|
||
)
|
||
data["effective_club_id"] = tenant.effective_club_id
|
||
return data
|
||
|
||
|
||
# ── Admin Profile Management ──────────────────────────────────────────────────
|
||
@router.get("/profiles")
|
||
def list_profiles(session=Depends(require_auth)):
|
||
"""Liste aller Profile (nur Plattform-Admin)."""
|
||
role = (session.get("role") or "").lower()
|
||
if not is_platform_admin(role):
|
||
raise HTTPException(status_code=403, detail="Nur Plattform-Administratoren dürfen alle Profile einsehen")
|
||
with get_db() as conn:
|
||
cur = get_cursor(conn)
|
||
cur.execute("SELECT * FROM profiles ORDER BY created")
|
||
rows = cur.fetchall()
|
||
out = []
|
||
for r in rows:
|
||
d = r2d(r)
|
||
d.pop("pin_hash", None)
|
||
out.append(d)
|
||
return out
|
||
|
||
|
||
@router.post("/profiles")
|
||
def create_profile(p: ProfileCreate, session=Depends(require_auth)):
|
||
"""
|
||
Neues Profil anlegen — nur Plattform-Admin.
|
||
|
||
Nutzt gleiches Kernschema wie Registrierung (SERIAL id); PIN wird zufällig gesetzt — Nutzer
|
||
setzt das Passwort über „Passwort vergessen“ oder ein späteres Admin-Tool.
|
||
"""
|
||
role = session.get("role") or ""
|
||
if not is_platform_admin(role):
|
||
raise HTTPException(
|
||
status_code=403,
|
||
detail="Nur Plattform-Administratoren dürfen Profile anlegen",
|
||
)
|
||
|
||
email_norm = str(p.email).strip().lower()
|
||
name_s = (p.name or "").strip()
|
||
|
||
pin_hash = hash_pin(secrets.token_urlsafe(24))
|
||
|
||
with get_db() as conn:
|
||
cur = get_cursor(conn)
|
||
cur.execute(
|
||
"SELECT id FROM profiles WHERE email IS NOT NULL AND lower(trim(email)) = %s",
|
||
(email_norm,),
|
||
)
|
||
if cur.fetchone():
|
||
raise HTTPException(status_code=409, detail="E-Mail bereits registriert")
|
||
|
||
cur.execute(
|
||
"""
|
||
INSERT INTO profiles (
|
||
name, email, pin_hash, auth_type, role, tier,
|
||
email_verified, created_at, updated_at
|
||
)
|
||
VALUES (%s, %s, %s, 'email', 'user', 'free', TRUE,
|
||
CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
||
RETURNING id
|
||
""",
|
||
(name_s, email_norm, pin_hash),
|
||
)
|
||
row = cur.fetchone()
|
||
new_id = row["id"] if row else None
|
||
if new_id is None:
|
||
raise HTTPException(status_code=500, detail="Profil konnte nicht angelegt werden")
|
||
|
||
return profile_document(str(new_id))
|
||
|
||
|
||
def profile_document(pid: str) -> dict:
|
||
"""Profil ohne PIN — für Routen und interne Aufrufe (z. B. /auth/me)."""
|
||
with get_db() as conn:
|
||
cur = get_cursor(conn)
|
||
cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
|
||
row = cur.fetchone()
|
||
if not row:
|
||
raise HTTPException(404, "Profil nicht gefunden")
|
||
d = r2d(row)
|
||
d.pop("pin_hash", None)
|
||
return d
|
||
|
||
|
||
@router.get("/profiles/{pid}")
|
||
def get_profile(pid: str, session=Depends(require_auth)):
|
||
"""Profil nach ID — nur eigenes Profil oder Plattform-Admin (wie PUT)."""
|
||
viewer = session["profile_id"]
|
||
role = session.get("role") or ""
|
||
if str(viewer) != str(pid) and not is_platform_admin(role):
|
||
raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Profil")
|
||
return profile_document(pid)
|
||
def _run_profile_update(pid: str, p: ProfileUpdate, tenant: TenantContext) -> dict:
|
||
"""Gemeinsame PUT-Logik für /profiles/{id} und Legacy /profile."""
|
||
sess_pid = tenant.profile_id
|
||
role = tenant.global_role
|
||
if str(sess_pid) != str(pid) and role not in ("admin", "superadmin"):
|
||
raise HTTPException(403, "Keine Berechtigung für dieses Profil")
|
||
|
||
with get_db() as conn:
|
||
cur = get_cursor(conn)
|
||
cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
|
||
row = cur.fetchone()
|
||
if not row:
|
||
raise HTTPException(404, "Profil nicht gefunden")
|
||
rowd = r2d(row)
|
||
cur_email_norm = (rowd.get("email") or "").strip().lower()
|
||
|
||
patch = p.model_dump(exclude_unset=True)
|
||
data = {}
|
||
|
||
if "role" in patch or "tier" in patch:
|
||
if not is_platform_admin(role):
|
||
raise HTTPException(
|
||
status_code=403,
|
||
detail="Nur Portal-Admins dürfen Rolle oder Tier ändern",
|
||
)
|
||
|
||
if "role" in patch:
|
||
new_role = (patch["role"] or "").strip().lower()
|
||
if new_role not in _ALLOWED_PORTAL_ROLES:
|
||
raise HTTPException(
|
||
status_code=400,
|
||
detail=f"Ungültige Portal-Rolle. Erlaubt: {', '.join(sorted(_ALLOWED_PORTAL_ROLES))}",
|
||
)
|
||
if new_role == "superadmin" and role != "superadmin":
|
||
raise HTTPException(
|
||
status_code=403,
|
||
detail="Nur Super-Admins dürfen die Rolle Super-Admin vergeben",
|
||
)
|
||
old_r = (rowd.get("role") or "user").strip().lower()
|
||
cur.execute(
|
||
"""
|
||
SELECT COUNT(*)::int AS c FROM profiles
|
||
WHERE lower(trim(role)) IN ('admin','superadmin')
|
||
"""
|
||
)
|
||
admin_cnt = int(cur.fetchone()["c"])
|
||
if old_r in ("admin", "superadmin") and new_role not in ("admin", "superadmin"):
|
||
if admin_cnt <= 1:
|
||
raise HTTPException(
|
||
status_code=400,
|
||
detail="Der letzte Portal-Administrator kann nicht zurückgestuft werden",
|
||
)
|
||
data["role"] = new_role
|
||
del patch["role"]
|
||
|
||
if "tier" in patch:
|
||
tv = patch["tier"]
|
||
if tv is None:
|
||
data["tier"] = "free"
|
||
else:
|
||
ts = str(tv).strip()
|
||
data["tier"] = (ts or "free")[:50]
|
||
del patch["tier"]
|
||
|
||
if "email" in patch:
|
||
ev = patch["email"]
|
||
if ev is None or (isinstance(ev, str) and ev.strip() == ""):
|
||
if rowd.get("email") is not None:
|
||
data["email"] = None
|
||
data["email_verified"] = False
|
||
data["verification_token"] = None
|
||
data["verification_expires"] = None
|
||
else:
|
||
email_norm = ev.strip().lower()
|
||
if "@" not in email_norm or len(email_norm) < 5:
|
||
raise HTTPException(400, "Ungültige E-Mail-Adresse")
|
||
cur.execute(
|
||
"""
|
||
SELECT id FROM profiles
|
||
WHERE email IS NOT NULL AND lower(trim(email)) = %s AND id <> %s
|
||
""",
|
||
(email_norm, pid),
|
||
)
|
||
if cur.fetchone():
|
||
raise HTTPException(409, "E-Mail wird bereits verwendet")
|
||
data["email"] = email_norm
|
||
if email_norm != cur_email_norm:
|
||
data["email_verified"] = False
|
||
data["verification_token"] = None
|
||
data["verification_expires"] = None
|
||
|
||
if "active_club_id" in patch:
|
||
ac = patch["active_club_id"]
|
||
if ac is None:
|
||
data["active_club_id"] = None
|
||
else:
|
||
try:
|
||
cid = int(ac)
|
||
except (TypeError, ValueError):
|
||
raise HTTPException(400, "active_club_id ungültig")
|
||
assert_club_member(cur, int(pid), cid)
|
||
data["active_club_id"] = cid
|
||
|
||
nullable_keys = {"goal_weight", "goal_bf_pct", "dob"}
|
||
for k, v in patch.items():
|
||
if k == "email":
|
||
continue
|
||
if k == "active_club_id":
|
||
continue
|
||
if v is None and k in nullable_keys:
|
||
data[k] = None
|
||
elif v is not None:
|
||
data[k] = v
|
||
|
||
if not data:
|
||
return profile_document(pid)
|
||
|
||
data["updated_at"] = datetime.now()
|
||
cols = ", ".join(f"{k}=%s" for k in data)
|
||
vals = list(data.values()) + [pid]
|
||
cur.execute(f"UPDATE profiles SET {cols} WHERE id=%s", vals)
|
||
return profile_document(pid)
|
||
|
||
|
||
@router.put("/profiles/{pid}")
|
||
def update_profile(pid: str, p: ProfileUpdate, tenant: TenantContext = Depends(get_tenant_context)):
|
||
"""Update profile — nur eigenes Profil oder Admin; TenantContext validiert X-Active-Club-Id."""
|
||
return _run_profile_update(pid, p, tenant)
|
||
|
||
|
||
@router.delete("/profiles/{pid}")
|
||
def delete_profile(pid: str, session=Depends(require_auth)):
|
||
"""Profil löschen — nur Plattform-Admin; letztes Profil wird geschützt."""
|
||
role = session.get("role") or ""
|
||
if not is_platform_admin(role):
|
||
raise HTTPException(
|
||
status_code=403,
|
||
detail="Nur Plattform-Administratoren dürfen Profile löschen",
|
||
)
|
||
with get_db() as conn:
|
||
cur = get_cursor(conn)
|
||
cur.execute("SELECT COUNT(*) as count FROM profiles")
|
||
count = cur.fetchone()['count']
|
||
if count <= 1: raise HTTPException(400, "Letztes Profil kann nicht gelöscht werden")
|
||
for table in ['weight_log','circumference_log','caliper_log','nutrition_log','activity_log','ai_insights']:
|
||
cur.execute(f"DELETE FROM {table} WHERE profile_id=%s", (pid,))
|
||
cur.execute("DELETE FROM profiles WHERE id=%s", (pid,))
|
||
return {"ok": True}
|
||
|
||
|
||
# ── Current User Profile ──────────────────────────────────────────────────────
|
||
@router.get("/profile")
|
||
def get_active_profile(x_profile_id: Optional[str] = Header(default=None), session: dict = Depends(require_auth)):
|
||
"""Legacy endpoint – returns active profile."""
|
||
pid = get_pid(x_profile_id)
|
||
return profile_document(pid)
|
||
|
||
|
||
@router.put("/profile")
|
||
def update_active_profile(
|
||
p: ProfileUpdate,
|
||
x_profile_id: Optional[str] = Header(default=None),
|
||
tenant: TenantContext = Depends(get_tenant_context),
|
||
):
|
||
"""Update current user's profile."""
|
||
pid = get_pid(x_profile_id)
|
||
return _run_profile_update(pid, p, tenant)
|