shinkan-jinkendo/backend/routers/auth.py
Lars 0f08e8df58
Some checks failed
Deploy Development / deploy (push) Successful in 36s
Test Suite / lint-backend (push) Successful in 1s
Test Suite / build-frontend (push) Successful in 6s
Test Suite / playwright-tests (push) Failing after 50s
feat: enhance club management features and member requests
- Updated the backend to include a new `requested_club_id` field in the registration request model.
- Replaced the club memberships router with a new club join requests router for better management of membership applications.
- Added API endpoints for listing public clubs and managing club join requests, improving user experience during registration and membership processes.
- Enhanced the ClubsPage in the frontend to support member management and join requests, including new modals for adding members and handling requests.
- Updated API utility functions to accommodate new endpoints for club join requests and public club listings.
2026-05-05 16:40:49 +02:00

468 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Authentication Endpoints for Mitai Jinkendo
Handles login, logout, password reset, and profile authentication.
"""
import os
import secrets
import smtplib
import ssl
from urllib.parse import quote
from typing import Optional
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
from fastapi import APIRouter, HTTPException, Header, Depends
from starlette.requests import Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from db import get_db, get_cursor
from auth import hash_pin, verify_pin, make_token, require_auth
from models import LoginRequest, PasswordResetRequest, PasswordResetConfirm, RegisterRequest
router = APIRouter(prefix="/api/auth", tags=["auth"])
limiter = Limiter(key_func=get_remote_address)
@router.post("/login")
@limiter.limit("5/minute")
async def login(req: LoginRequest, request: Request):
"""Login with email + password."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT * FROM profiles WHERE email=%s", (req.email.lower().strip(),))
prof = cur.fetchone()
if not prof:
raise HTTPException(401, "Ungültige Zugangsdaten")
# Verify password
if not verify_pin(req.password, prof['pin_hash']):
raise HTTPException(401, "Ungültige Zugangsdaten")
# Auto-upgrade from SHA256 to bcrypt
if prof['pin_hash'] and not prof['pin_hash'].startswith('$2'):
new_hash = hash_pin(req.password)
cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, prof['id']))
# Create session
token = make_token()
session_days = prof.get('session_days', 30)
expires = datetime.now() + timedelta(days=session_days)
cur.execute("INSERT INTO sessions (token, profile_id, expires_at, created_at) VALUES (%s,%s,%s,CURRENT_TIMESTAMP)",
(token, prof['id'], expires.isoformat()))
return {
"token": token,
"profile_id": prof['id'],
"email": prof.get("email"),
"name": prof.get("name"),
"role": prof.get("role"),
"tier": prof.get("tier"),
"expires_at": expires.isoformat(),
}
@router.post("/logout")
def logout(x_auth_token: Optional[str]=Header(default=None)):
"""Logout (delete session)."""
if x_auth_token:
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("DELETE FROM sessions WHERE token=%s", (x_auth_token,))
return {"ok": True}
@router.get("/me")
def get_me(session: dict=Depends(require_auth)):
"""Get current user info."""
pid = session['profile_id']
# Import here to avoid circular dependency
from routers.profiles import get_profile
return get_profile(pid, session)
@router.get("/status")
def auth_status():
"""Health check endpoint."""
return {"status": "ok", "service": "mitai-jinkendo", "version": "v9b"}
@router.put("/pin")
def change_pin(req: dict, session: dict=Depends(require_auth)):
"""Change PIN/password for current user."""
pid = session['profile_id']
new_pin = req.get('pin', '')
if len(new_pin) < 4:
raise HTTPException(400, "PIN/Passwort muss mind. 4 Zeichen haben")
new_hash = hash_pin(new_pin)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, pid))
return {"ok": True}
@router.post("/forgot-password")
@limiter.limit("3/minute")
async def password_reset_request(req: PasswordResetRequest, request: Request):
"""Request password reset email."""
email = req.email.lower().strip()
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT id, name FROM profiles WHERE email=%s", (email,))
prof = cur.fetchone()
if not prof:
# Don't reveal if email exists
return {"ok": True, "message": "Falls die E-Mail existiert, wurde ein Reset-Link gesendet."}
# Generate reset token
token = secrets.token_urlsafe(32)
expires = datetime.now() + timedelta(hours=1)
# Store in sessions table (reuse mechanism)
cur.execute("INSERT INTO sessions (token, profile_id, expires_at, created_at) VALUES (%s,%s,%s,CURRENT_TIMESTAMP)",
(f"reset_{token}", prof['id'], expires.isoformat()))
app_base = (os.getenv("APP_URL") or "https://shinkan.jinkendo.de").rstrip("/")
reset_body = f"""Hallo {prof['name']},
Du hast einen Passwort-Reset angefordert.
Reset-Link: {app_base}/reset-password?token={token}
Der Link ist 1 Stunde gültig.
Falls du diese Anfrage nicht gestellt hast, ignoriere diese E-Mail.
Dein Shinkan Jinkendo Team
"""
if not send_email(email, "Passwort zurücksetzen Shinkan Jinkendo", reset_body):
print("[SMTP] Passwort-Reset konnte nicht gesendet werden (SMTP prüfen).")
return {"ok": True, "message": "Falls die E-Mail existiert, wurde ein Reset-Link gesendet."}
@router.post("/reset-password")
def password_reset_confirm(req: PasswordResetConfirm):
"""Confirm password reset with token."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT profile_id FROM sessions WHERE token=%s AND expires_at > CURRENT_TIMESTAMP",
(f"reset_{req.token}",))
sess = cur.fetchone()
if not sess:
raise HTTPException(400, "Ungültiger oder abgelaufener Reset-Link")
pid = sess['profile_id']
new_hash = hash_pin(req.new_password)
cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, pid))
cur.execute("DELETE FROM sessions WHERE token=%s", (f"reset_{req.token}",))
return {"ok": True, "message": "Passwort erfolgreich zurückgesetzt"}
# ── Helpers (Registrierung / E-Mail-Links) ─────────────────────────────────────
def _public_app_base() -> str:
return (os.getenv("APP_URL") or "https://shinkan.jinkendo.de").rstrip("/")
def _verification_link_expired(expires_at) -> bool:
"""
True, wenn Ablaufzeit in der Vergangenheit liegt.
PG liefert TIMESTAMP oft als naive datetime — Vergleich mit timezone-aware UTC
würde sonst TypeError → 500.
"""
if expires_at is None:
return False
deadline = expires_at.replace(tzinfo=timezone.utc) if expires_at.tzinfo is None else expires_at.astimezone(timezone.utc)
return datetime.now(timezone.utc) > deadline
def verification_link(token: str) -> str:
"""Link zur Web-App (`/verify?token=`); die SPA ruft wie bei Mitai die API auf."""
return f"{_public_app_base()}/verify?token={quote(token, safe='')}"
def registration_role(cur, email_lower: str) -> str:
"""
bootstrap: erste Registrierung in leerer DB → admin,
oder E-Mail ∈ ADMIN_BOOTSTRAP_EMAILS (kommasepariert, Groß/Klein egal).
Um alle Self-Regs als Trainer zu haben: AUTO_ADMIN_FIRST_USER=false und keine ADMIN_BOOTSTRAP_EMAILS.
"""
bootstrap = {
x.strip().lower()
for x in os.getenv("ADMIN_BOOTSTRAP_EMAILS", "").split(",")
if x.strip()
}
if email_lower in bootstrap:
return "admin"
if os.getenv("AUTO_ADMIN_FIRST_USER", "true").strip().lower() not in ("1", "true", "yes"):
return "trainer"
cur.execute("SELECT COUNT(*) AS n FROM profiles")
row = cur.fetchone()
try:
n = int(row["n"]) if row is not None else 0
except (KeyError, TypeError, ValueError):
n = 0
return "admin" if n == 0 else "trainer"
# ── Helper: Send Email ────────────────────────────────────────────────────────
def send_email(to_email: str, subject: str, body: str):
"""Send mail via SMTP. Port 465 → SSL; sonst SMTP + optional STARTTLS (587)."""
try:
smtp_host = (os.getenv("SMTP_HOST") or "").strip()
smtp_port_raw = os.getenv("SMTP_PORT") or "587"
try:
smtp_port = int(str(smtp_port_raw).strip())
except ValueError:
smtp_port = 587
smtp_user = (os.getenv("SMTP_USER") or "").strip()
smtp_pass = os.getenv("SMTP_PASS")
smtp_from = os.getenv("SMTP_FROM", "noreply@jinkendo.de")
if not smtp_host or not smtp_user or smtp_pass is None or smtp_pass == "":
print("[SMTP] Nicht konfiguriert — setze SMTP_HOST, SMTP_USER, SMTP_PASS (und ggf. SMTP_PORT)")
return False
msg = MIMEText(body)
msg["Subject"] = subject
msg["From"] = smtp_from
msg["To"] = to_email
use_tls_env = (os.getenv("SMTP_STARTTLS", "true").strip().lower() not in ("0", "false", "no"))
force_ssl = (os.getenv("SMTP_SSL", "").strip().lower() in ("1", "true", "yes")) or smtp_port == 465
ctx = ssl.create_default_context()
if force_ssl:
print(f"[SMTP] Versand (SSL) an …@{to_email.split('@')[-1]} über {smtp_host}:{smtp_port}")
with smtplib.SMTP_SSL(smtp_host, smtp_port, context=ctx, timeout=30) as server:
server.login(smtp_user, smtp_pass)
server.send_message(msg)
else:
print(f"[SMTP] Versand (STARTTLS={use_tls_env}) an …@{to_email.split('@')[-1]} über {smtp_host}:{smtp_port}")
with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
server.ehlo()
if use_tls_env:
server.starttls(context=ctx)
server.ehlo()
server.login(smtp_user, smtp_pass)
server.send_message(msg)
print("[SMTP] Nachricht erfolgreich akzeptiert (Server-Queue)")
return True
except Exception as e:
print(f"[SMTP] Fehler: {e}")
return False
# ── Registration Endpoints ────────────────────────────────────────────────────
@router.post("/register")
@limiter.limit("3/hour")
async def register(req: RegisterRequest, request: Request):
"""Self-registration with email verification."""
email = req.email.lower().strip()
name = req.name.strip()
password = req.password
# Validation
if not email or '@' not in email:
raise HTTPException(400, "Ungültige E-Mail-Adresse")
if len(password) < 8:
raise HTTPException(400, "Passwort muss mindestens 8 Zeichen lang sein")
if not name or len(name) < 2:
raise HTTPException(400, "Name muss mindestens 2 Zeichen lang sein")
with get_db() as conn:
cur = get_cursor(conn)
# Check if email already exists
cur.execute("SELECT id FROM profiles WHERE email=%s", (email,))
if cur.fetchone():
raise HTTPException(400, "E-Mail-Adresse bereits registriert")
# Generate verification token
verification_token = secrets.token_urlsafe(32)
verification_expires = datetime.now(timezone.utc) + timedelta(hours=24)
# Rolle: erster Nutzer oder ADMIN_BOOTSTRAP_EMAILS → admin
role = registration_role(cur, email)
# Create profile (inactive until verified) — profiles.id ist SERIAL (INT), keine String-IDs einfügen.
pin_hash = hash_pin(password)
trial_ends = datetime.now(timezone.utc) + timedelta(days=14) # 14-day trial
cur.execute("""
INSERT INTO profiles (
name, email, pin_hash, auth_type, role, tier,
email_verified, verification_token, verification_expires,
trial_ends_at, created_at
) VALUES (%s, %s, %s, 'email', %s, 'free', FALSE, %s, %s, %s, CURRENT_TIMESTAMP)
RETURNING id
""", (name, email, pin_hash, role, verification_token, verification_expires, trial_ends))
new_profile_id = cur.fetchone()["id"]
req_club = req.requested_club_id
if req_club is not None:
cur.execute(
"SELECT id FROM clubs WHERE id = %s AND status = 'active'",
(int(req_club),),
)
if cur.fetchone():
cur.execute(
"""
SELECT id FROM club_membership_requests
WHERE profile_id = %s AND club_id = %s AND status = 'pending'
LIMIT 1
""",
(new_profile_id, int(req_club)),
)
if not cur.fetchone():
cur.execute(
"""
INSERT INTO club_membership_requests (profile_id, club_id, status, message)
VALUES (%s, %s, 'pending', NULL)
""",
(new_profile_id, int(req_club)),
)
verify_url = verification_link(verification_token)
email_body = f"""Hallo {name},
willkommen bei Shinkan Jinkendo!
Bitte bestätige deine E-Mail-Adresse, um die Registrierung abzuschließen:
{verify_url}
Der Link ist 24 Stunden gültig.
Dein Shinkan Jinkendo Team
"""
if not send_email(email, "Shinkan Jinkendo E-Mail bestätigen", email_body):
print("[SMTP] Verifizierungs-Mail konnte nicht gesendet werden — Logs prüfen, SMTP_* in der Laufzeitumgebung.")
return {
"ok": True,
"message": "Registrierung erfolgreich! Bitte prüfe dein E-Mail-Postfach und bestätige deine E-Mail-Adresse."
}
@router.get("/verify/{token}")
async def verify_email(token: str):
"""Verify email address and activate account."""
with get_db() as conn:
cur = get_cursor(conn)
# Find profile with this verification token
cur.execute("""
SELECT id, name, email, email_verified, verification_expires
FROM profiles
WHERE verification_token=%s
""", (token,))
prof = cur.fetchone()
if not prof:
# Token not found - might be already used/verified
# Check if there's a verified profile (token was deleted after verification)
raise HTTPException(400, "Verifikations-Link ungültig oder bereits verwendet. Falls du bereits verifiziert bist, melde dich einfach an.")
if prof['email_verified']:
raise HTTPException(400, "E-Mail-Adresse bereits bestätigt")
# Check if token expired
if _verification_link_expired(prof["verification_expires"]):
raise HTTPException(400, "Verifikations-Link abgelaufen. Bitte registriere dich erneut.")
# Mark as verified and clear token
cur.execute("""
UPDATE profiles
SET email_verified=TRUE, verification_token=NULL, verification_expires=NULL
WHERE id=%s
""", (prof['id'],))
# Create session (auto-login after verification)
session_token = make_token()
expires = datetime.now(timezone.utc) + timedelta(days=30)
cur.execute("""
INSERT INTO sessions (token, profile_id, expires_at, created_at)
VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
""", (session_token, prof['id'], expires))
return {
"ok": True,
"message": "E-Mail-Adresse erfolgreich bestätigt!",
"token": session_token,
"profile": {
"id": prof['id'],
"name": prof['name'],
"email": prof['email']
}
}
@router.post("/resend-verification")
@limiter.limit("3/hour")
async def resend_verification(req: dict, request: Request):
"""Resend verification email for unverified account."""
email = req.get('email', '').strip().lower()
if not email:
raise HTTPException(400, "E-Mail-Adresse erforderlich")
with get_db() as conn:
cur = get_cursor(conn)
# Find profile by email
cur.execute("""
SELECT id, name, email, email_verified, verification_token, verification_expires
FROM profiles
WHERE email=%s
""", (email,))
prof = cur.fetchone()
if not prof:
# Don't leak info about existing emails
return {"ok": True, "message": "Falls ein Account mit dieser E-Mail existiert, wurde eine Bestätigungs-E-Mail versendet."}
if prof['email_verified']:
raise HTTPException(400, "E-Mail-Adresse bereits bestätigt")
# Generate new verification token
verification_token = secrets.token_urlsafe(32)
verification_expires = datetime.now(timezone.utc) + timedelta(hours=24)
cur.execute("""
UPDATE profiles
SET verification_token=%s, verification_expires=%s
WHERE id=%s
""", (verification_token, verification_expires, prof['id']))
verify_url = verification_link(verification_token)
email_body = f"""Hallo {prof['name']},
du hast eine neue Bestätigungs-E-Mail angefordert.
Bitte bestätige deine E-Mail-Adresse mit diesem Link:
{verify_url}
Dieser Link ist 24 Stunden gültig.
Dein Shinkan Jinkendo Team
"""
if not send_email(email, "Shinkan Jinkendo E-Mail bestätigen", email_body):
raise HTTPException(502, "E-Mail konnte nicht versendet werden. SMTP-Einstellungen und Container-Logs prüfen.")
return {"ok": True, "message": "Bestätigungs-E-Mail wurde erneut versendet."}