shinkan-jinkendo/backend/routers/auth.py
Lars 2646bc776a
Some checks failed
Deploy Development / deploy (push) Failing after 12s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Failing after 1s
Test Suite / playwright-tests (push) Has been skipped
feat: implement email verification flow and enhance user experience
- Added EmailVerificationBanner component to notify users about unverified email status and provide a resend verification option.
- Introduced VerifyPage for handling email verification via a token in the URL, including success and error handling.
- Updated LoginPage and AccountSettingsPage to allow users to resend verification emails directly from these pages.
- Enhanced API utility with new functions for verifying emails and resending verification requests.
- Updated routing to include the new verification page and improved link structure for verification links.
2026-04-29 11:37:54 +02:00

430 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Authentication Endpoints for Mitai Jinkendo
Handles login, logout, password reset, and profile authentication.
"""
import os
import secrets
import smtplib
import ssl
from urllib.parse import quote
from typing import Optional
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
from fastapi import APIRouter, HTTPException, Header, Depends
from starlette.requests import Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from db import get_db, get_cursor
from auth import hash_pin, verify_pin, make_token, require_auth
from models import LoginRequest, PasswordResetRequest, PasswordResetConfirm, RegisterRequest
router = APIRouter(prefix="/api/auth", tags=["auth"])
limiter = Limiter(key_func=get_remote_address)
@router.post("/login")
@limiter.limit("5/minute")
async def login(req: LoginRequest, request: Request):
"""Login with email + password."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT * FROM profiles WHERE email=%s", (req.email.lower().strip(),))
prof = cur.fetchone()
if not prof:
raise HTTPException(401, "Ungültige Zugangsdaten")
# Verify password
if not verify_pin(req.password, prof['pin_hash']):
raise HTTPException(401, "Ungültige Zugangsdaten")
# Auto-upgrade from SHA256 to bcrypt
if prof['pin_hash'] and not prof['pin_hash'].startswith('$2'):
new_hash = hash_pin(req.password)
cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, prof['id']))
# Create session
token = make_token()
session_days = prof.get('session_days', 30)
expires = datetime.now() + timedelta(days=session_days)
cur.execute("INSERT INTO sessions (token, profile_id, expires_at, created_at) VALUES (%s,%s,%s,CURRENT_TIMESTAMP)",
(token, prof['id'], expires.isoformat()))
return {
"token": token,
"profile_id": prof['id'],
"email": prof.get("email"),
"name": prof.get("name"),
"role": prof.get("role"),
"tier": prof.get("tier"),
"expires_at": expires.isoformat(),
}
@router.post("/logout")
def logout(x_auth_token: Optional[str]=Header(default=None)):
"""Logout (delete session)."""
if x_auth_token:
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("DELETE FROM sessions WHERE token=%s", (x_auth_token,))
return {"ok": True}
@router.get("/me")
def get_me(session: dict=Depends(require_auth)):
"""Get current user info."""
pid = session['profile_id']
# Import here to avoid circular dependency
from routers.profiles import get_profile
return get_profile(pid, session)
@router.get("/status")
def auth_status():
"""Health check endpoint."""
return {"status": "ok", "service": "mitai-jinkendo", "version": "v9b"}
@router.put("/pin")
def change_pin(req: dict, session: dict=Depends(require_auth)):
"""Change PIN/password for current user."""
pid = session['profile_id']
new_pin = req.get('pin', '')
if len(new_pin) < 4:
raise HTTPException(400, "PIN/Passwort muss mind. 4 Zeichen haben")
new_hash = hash_pin(new_pin)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, pid))
return {"ok": True}
@router.post("/forgot-password")
@limiter.limit("3/minute")
async def password_reset_request(req: PasswordResetRequest, request: Request):
"""Request password reset email."""
email = req.email.lower().strip()
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT id, name FROM profiles WHERE email=%s", (email,))
prof = cur.fetchone()
if not prof:
# Don't reveal if email exists
return {"ok": True, "message": "Falls die E-Mail existiert, wurde ein Reset-Link gesendet."}
# Generate reset token
token = secrets.token_urlsafe(32)
expires = datetime.now() + timedelta(hours=1)
# Store in sessions table (reuse mechanism)
cur.execute("INSERT INTO sessions (token, profile_id, expires_at, created_at) VALUES (%s,%s,%s,CURRENT_TIMESTAMP)",
(f"reset_{token}", prof['id'], expires.isoformat()))
app_base = (os.getenv("APP_URL") or "https://shinkan.jinkendo.de").rstrip("/")
reset_body = f"""Hallo {prof['name']},
Du hast einen Passwort-Reset angefordert.
Reset-Link: {app_base}/reset-password?token={token}
Der Link ist 1 Stunde gültig.
Falls du diese Anfrage nicht gestellt hast, ignoriere diese E-Mail.
Dein Shinkan Jinkendo Team
"""
if not send_email(email, "Passwort zurücksetzen Shinkan Jinkendo", reset_body):
print("[SMTP] Passwort-Reset konnte nicht gesendet werden (SMTP prüfen).")
return {"ok": True, "message": "Falls die E-Mail existiert, wurde ein Reset-Link gesendet."}
@router.post("/reset-password")
def password_reset_confirm(req: PasswordResetConfirm):
"""Confirm password reset with token."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT profile_id FROM sessions WHERE token=%s AND expires_at > CURRENT_TIMESTAMP",
(f"reset_{req.token}",))
sess = cur.fetchone()
if not sess:
raise HTTPException(400, "Ungültiger oder abgelaufener Reset-Link")
pid = sess['profile_id']
new_hash = hash_pin(req.new_password)
cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, pid))
cur.execute("DELETE FROM sessions WHERE token=%s", (f"reset_{req.token}",))
return {"ok": True, "message": "Passwort erfolgreich zurückgesetzt"}
# ── Helpers (Registrierung / E-Mail-Links) ─────────────────────────────────────
def _public_app_base() -> str:
return (os.getenv("APP_URL") or "https://shinkan.jinkendo.de").rstrip("/")
def verification_link(token: str) -> str:
"""Link zur Web-App (`/verify?token=`); die SPA ruft wie bei Mitai die API auf."""
return f"{_public_app_base()}/verify?token={quote(token, safe='')}"
def registration_role(cur, email_lower: str) -> str:
"""
bootstrap: erste Registrierung in leerer DB → admin,
oder E-Mail ∈ ADMIN_BOOTSTRAP_EMAILS (kommasepariert, Groß/Klein egal).
Um alle Self-Regs als Trainer zu haben: AUTO_ADMIN_FIRST_USER=false und keine ADMIN_BOOTSTRAP_EMAILS.
"""
bootstrap = {
x.strip().lower()
for x in os.getenv("ADMIN_BOOTSTRAP_EMAILS", "").split(",")
if x.strip()
}
if email_lower in bootstrap:
return "admin"
if os.getenv("AUTO_ADMIN_FIRST_USER", "true").strip().lower() not in ("1", "true", "yes"):
return "trainer"
cur.execute("SELECT COUNT(*) AS n FROM profiles")
row = cur.fetchone()
try:
n = int(row["n"]) if row is not None else 0
except (KeyError, TypeError, ValueError):
n = 0
return "admin" if n == 0 else "trainer"
# ── Helper: Send Email ────────────────────────────────────────────────────────
def send_email(to_email: str, subject: str, body: str):
"""Send mail via SMTP. Port 465 → SSL; sonst SMTP + optional STARTTLS (587)."""
try:
smtp_host = (os.getenv("SMTP_HOST") or "").strip()
smtp_port_raw = os.getenv("SMTP_PORT") or "587"
try:
smtp_port = int(str(smtp_port_raw).strip())
except ValueError:
smtp_port = 587
smtp_user = (os.getenv("SMTP_USER") or "").strip()
smtp_pass = os.getenv("SMTP_PASS")
smtp_from = os.getenv("SMTP_FROM", "noreply@jinkendo.de")
if not smtp_host or not smtp_user or smtp_pass is None or smtp_pass == "":
print("[SMTP] Nicht konfiguriert — setze SMTP_HOST, SMTP_USER, SMTP_PASS (und ggf. SMTP_PORT)")
return False
msg = MIMEText(body)
msg["Subject"] = subject
msg["From"] = smtp_from
msg["To"] = to_email
use_tls_env = (os.getenv("SMTP_STARTTLS", "true").strip().lower() not in ("0", "false", "no"))
force_ssl = (os.getenv("SMTP_SSL", "").strip().lower() in ("1", "true", "yes")) or smtp_port == 465
ctx = ssl.create_default_context()
if force_ssl:
print(f"[SMTP] Versand (SSL) an …@{to_email.split('@')[-1]} über {smtp_host}:{smtp_port}")
with smtplib.SMTP_SSL(smtp_host, smtp_port, context=ctx, timeout=30) as server:
server.login(smtp_user, smtp_pass)
server.send_message(msg)
else:
print(f"[SMTP] Versand (STARTTLS={use_tls_env}) an …@{to_email.split('@')[-1]} über {smtp_host}:{smtp_port}")
with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
server.ehlo()
if use_tls_env:
server.starttls(context=ctx)
server.ehlo()
server.login(smtp_user, smtp_pass)
server.send_message(msg)
print("[SMTP] Nachricht erfolgreich akzeptiert (Server-Queue)")
return True
except Exception as e:
print(f"[SMTP] Fehler: {e}")
return False
# ── Registration Endpoints ────────────────────────────────────────────────────
@router.post("/register")
@limiter.limit("3/hour")
async def register(req: RegisterRequest, request: Request):
"""Self-registration with email verification."""
email = req.email.lower().strip()
name = req.name.strip()
password = req.password
# Validation
if not email or '@' not in email:
raise HTTPException(400, "Ungültige E-Mail-Adresse")
if len(password) < 8:
raise HTTPException(400, "Passwort muss mindestens 8 Zeichen lang sein")
if not name or len(name) < 2:
raise HTTPException(400, "Name muss mindestens 2 Zeichen lang sein")
with get_db() as conn:
cur = get_cursor(conn)
# Check if email already exists
cur.execute("SELECT id FROM profiles WHERE email=%s", (email,))
if cur.fetchone():
raise HTTPException(400, "E-Mail-Adresse bereits registriert")
# Generate verification token
verification_token = secrets.token_urlsafe(32)
verification_expires = datetime.now(timezone.utc) + timedelta(hours=24)
# Rolle: erster Nutzer oder ADMIN_BOOTSTRAP_EMAILS → admin
role = registration_role(cur, email)
# Create profile (inactive until verified) — profiles.id ist SERIAL (INT), keine String-IDs einfügen.
pin_hash = hash_pin(password)
trial_ends = datetime.now(timezone.utc) + timedelta(days=14) # 14-day trial
cur.execute("""
INSERT INTO profiles (
name, email, pin_hash, auth_type, role, tier,
email_verified, verification_token, verification_expires,
trial_ends_at, created_at
) VALUES (%s, %s, %s, 'email', %s, 'free', FALSE, %s, %s, %s, CURRENT_TIMESTAMP)
""", (name, email, pin_hash, role, verification_token, verification_expires, trial_ends))
verify_url = verification_link(verification_token)
email_body = f"""Hallo {name},
willkommen bei Shinkan Jinkendo!
Bitte bestätige deine E-Mail-Adresse, um die Registrierung abzuschließen:
{verify_url}
Der Link ist 24 Stunden gültig.
Dein Shinkan Jinkendo Team
"""
if not send_email(email, "Shinkan Jinkendo E-Mail bestätigen", email_body):
print("[SMTP] Verifizierungs-Mail konnte nicht gesendet werden — Logs prüfen, SMTP_* in der Laufzeitumgebung.")
return {
"ok": True,
"message": "Registrierung erfolgreich! Bitte prüfe dein E-Mail-Postfach und bestätige deine E-Mail-Adresse."
}
@router.get("/verify/{token}")
async def verify_email(token: str):
"""Verify email address and activate account."""
with get_db() as conn:
cur = get_cursor(conn)
# Find profile with this verification token
cur.execute("""
SELECT id, name, email, email_verified, verification_expires
FROM profiles
WHERE verification_token=%s
""", (token,))
prof = cur.fetchone()
if not prof:
# Token not found - might be already used/verified
# Check if there's a verified profile (token was deleted after verification)
raise HTTPException(400, "Verifikations-Link ungültig oder bereits verwendet. Falls du bereits verifiziert bist, melde dich einfach an.")
if prof['email_verified']:
raise HTTPException(400, "E-Mail-Adresse bereits bestätigt")
# Check if token expired
if prof['verification_expires'] and datetime.now(timezone.utc) > prof['verification_expires']:
raise HTTPException(400, "Verifikations-Link abgelaufen. Bitte registriere dich erneut.")
# Mark as verified and clear token
cur.execute("""
UPDATE profiles
SET email_verified=TRUE, verification_token=NULL, verification_expires=NULL
WHERE id=%s
""", (prof['id'],))
# Create session (auto-login after verification)
session_token = make_token()
expires = datetime.now(timezone.utc) + timedelta(days=30)
cur.execute("""
INSERT INTO sessions (token, profile_id, expires_at, created_at)
VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
""", (session_token, prof['id'], expires))
return {
"ok": True,
"message": "E-Mail-Adresse erfolgreich bestätigt!",
"token": session_token,
"profile": {
"id": prof['id'],
"name": prof['name'],
"email": prof['email']
}
}
@router.post("/resend-verification")
@limiter.limit("3/hour")
async def resend_verification(req: dict, request: Request):
"""Resend verification email for unverified account."""
email = req.get('email', '').strip().lower()
if not email:
raise HTTPException(400, "E-Mail-Adresse erforderlich")
with get_db() as conn:
cur = get_cursor(conn)
# Find profile by email
cur.execute("""
SELECT id, name, email, email_verified, verification_token, verification_expires
FROM profiles
WHERE email=%s
""", (email,))
prof = cur.fetchone()
if not prof:
# Don't leak info about existing emails
return {"ok": True, "message": "Falls ein Account mit dieser E-Mail existiert, wurde eine Bestätigungs-E-Mail versendet."}
if prof['email_verified']:
raise HTTPException(400, "E-Mail-Adresse bereits bestätigt")
# Generate new verification token
verification_token = secrets.token_urlsafe(32)
verification_expires = datetime.now(timezone.utc) + timedelta(hours=24)
cur.execute("""
UPDATE profiles
SET verification_token=%s, verification_expires=%s
WHERE id=%s
""", (verification_token, verification_expires, prof['id']))
verify_url = verification_link(verification_token)
email_body = f"""Hallo {prof['name']},
du hast eine neue Bestätigungs-E-Mail angefordert.
Bitte bestätige deine E-Mail-Adresse mit diesem Link:
{verify_url}
Dieser Link ist 24 Stunden gültig.
Dein Shinkan Jinkendo Team
"""
if not send_email(email, "Shinkan Jinkendo E-Mail bestätigen", email_body):
raise HTTPException(502, "E-Mail konnte nicht versendet werden. SMTP-Einstellungen und Container-Logs prüfen.")
return {"ok": True, "message": "Bestätigungs-E-Mail wurde erneut versendet."}