All checks were successful
Deploy Development / deploy (push) Successful in 37s
Test Suite / pytest-backend (push) Successful in 31s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 27s
- Added compliance implementation report detailing the status of various packages (P-03, P-04, P-05, P-07, P-23, P-24) and their technical changes, tests, and notes. - Introduced a new workspace configuration file for the project to streamline development setup.
460 lines
17 KiB
Python
460 lines
17 KiB
Python
"""
|
||
Authentication Endpoints for Mitai Jinkendo

Handles login, logout, password reset, self-registration, e-mail verification, and profile authentication.
|
||
"""
|
||
import os
|
||
import secrets
|
||
import smtplib
|
||
import ssl
|
||
from urllib.parse import quote
|
||
from typing import Optional
|
||
from datetime import datetime, timedelta, timezone
|
||
from email.mime.text import MIMEText
|
||
|
||
from fastapi import APIRouter, HTTPException, Header, Depends
|
||
from starlette.requests import Request
|
||
from slowapi import Limiter
|
||
from slowapi.util import get_remote_address
|
||
|
||
from db import get_db, get_cursor
|
||
from auth import hash_pin, verify_pin, make_token, require_auth
|
||
from models import LoginRequest, PasswordResetRequest, PasswordResetConfirm, RegisterRequest
|
||
from password_reset_mail import (
|
||
insert_password_reset_session,
|
||
password_reset_email_body,
|
||
revoke_pending_password_resets_for_profile,
|
||
)
|
||
|
||
# Router collecting all authentication endpoints; routes below are mounted under /api/auth.
router = APIRouter(prefix="/api/auth", tags=["auth"])
# Rate limiter keyed by the client's remote address; applied through the @limiter.limit decorators.
limiter = Limiter(key_func=get_remote_address)
|
||
|
||
|
||
@router.post("/login")
@limiter.limit("30/minute")
async def login(req: LoginRequest, request: Request):
    """Log in with e-mail + password and open a new session.

    The profile is looked up by the lower-cased, stripped e-mail, the password
    is checked with ``verify_pin`` and, on success, a row is inserted into
    ``sessions``.

    Args:
        req: Login payload; ``email`` and ``password`` are read.
        request: Incoming request. Not used in the body; it is part of the
            signature for the ``@limiter.limit`` decorator.

    Returns:
        Dict with the session ``token``, ``profile_id``, ``email``, ``name``,
        ``role``, ``tier`` and the ISO-formatted ``expires_at``.

    Raises:
        HTTPException: 401 with the same message for an unknown e-mail and for
            a wrong password.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM profiles WHERE email=%s", (req.email.lower().strip(),))
        prof = cur.fetchone()
        if not prof:
            raise HTTPException(401, "Ungültige Zugangsdaten")

        # Verify password
        if not verify_pin(req.password, prof['pin_hash']):
            raise HTTPException(401, "Ungültige Zugangsdaten")

        # Auto-upgrade from SHA256 to bcrypt: hashes not starting with "$2"
        # are re-hashed with the password that was just verified.
        if prof['pin_hash'] and not prof['pin_hash'].startswith('$2'):
            new_hash = hash_pin(req.password)
            cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, prof['id']))

        # Create session
        token = make_token()
        # The column may be absent or hold NULL; `.get(key, 30)` would hand
        # None to timedelta in the NULL case, so fall back explicitly.
        session_days = prof.get('session_days') or 30
        # NOTE(review): datetime.now() is naive local time, whereas verify_email
        # stores an aware UTC expiry — confirm which one `sessions` expects.
        expires = datetime.now() + timedelta(days=session_days)
        cur.execute("INSERT INTO sessions (token, profile_id, expires_at, created_at) VALUES (%s,%s,%s,CURRENT_TIMESTAMP)",
                    (token, prof['id'], expires.isoformat()))

        return {
            "token": token,
            "profile_id": prof['id'],
            "email": prof.get("email"),
            "name": prof.get("name"),
            "role": prof.get("role"),
            "tier": prof.get("tier"),
            "expires_at": expires.isoformat(),
        }
|
||
|
||
|
||
@router.post("/logout")
def logout(x_auth_token: Optional[str]=Header(default=None)):
    """Log out by deleting the session that belongs to the given token.

    A missing or empty token is not an error; the call answers ``{"ok": True}``
    either way.
    """
    if not x_auth_token:
        return {"ok": True}

    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute("DELETE FROM sessions WHERE token=%s", (x_auth_token,))
    return {"ok": True}
|
||
|
||
|
||
@router.get("/me")
def get_me(session: dict=Depends(require_auth)):
    """Return the profile document of the authenticated user."""
    profile_id = session['profile_id']
    # Imported lazily to avoid a circular dependency with routers.profiles
    from routers.profiles import profile_document

    document = profile_document(profile_id)
    return document
|
||
|
||
|
||
@router.get("/status")
def auth_status():
    """Health check endpoint: report a static status payload."""
    payload = dict(status="ok", service="mitai-jinkendo", version="v9b")
    return payload
|
||
|
||
|
||
@router.put("/pin")
def change_pin(req: dict, session: dict=Depends(require_auth)):
    """Change the PIN/password of the currently authenticated user.

    Args:
        req: Raw JSON body; the new password is read from the ``pin`` key.
        session: Session dict supplied by ``require_auth``; ``profile_id`` is
            read from it.

    Returns:
        ``{"ok": True}`` once the new hash has been written.

    Raises:
        HTTPException: 400 when ``pin`` is missing, not a string, or shorter
            than 8 characters.
    """
    pid = session['profile_id']
    new_pin = req.get('pin', '')
    # The body is an untyped dict, so `pin` may be null, a number or a list;
    # reject those with a 400 instead of failing in len() / hash_pin().
    if not isinstance(new_pin, str) or len(new_pin) < 8:
        raise HTTPException(400, "Passwort muss mind. 8 Zeichen haben")

    new_hash = hash_pin(new_pin)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, pid))

    return {"ok": True}
|
||
|
||
|
||
@router.post("/forgot-password")
@limiter.limit("3/minute")
async def password_reset_request(req: PasswordResetRequest, request: Request):
    """Request a password-reset e-mail.

    The response is the same whether or not the address belongs to a profile,
    so the endpoint does not reveal which e-mails are registered. A failed
    mail delivery is only logged.
    """
    address = req.email.lower().strip()
    # One neutral answer for every outcome (known address, unknown address, SMTP failure)
    neutral_reply = {"ok": True, "message": "Falls die E-Mail existiert, wurde ein Reset-Link gesendet."}

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT id, name FROM profiles WHERE email=%s", (address,))
        profile = cur.fetchone()
        if not profile:
            # Don't reveal if email exists
            return neutral_reply
        # Earlier pending resets are revoked before the new reset session is inserted
        revoke_pending_password_resets_for_profile(cur, profile["id"])
        raw_token = insert_password_reset_session(cur, profile["id"])

    mail_text = password_reset_email_body(
        recipient_name=profile.get("name"),
        token=raw_token,
        intro="Du hast einen Passwort-Reset angefordert.",
    )
    delivered = send_email(address, "Passwort zurücksetzen – Shinkan Jinkendo", mail_text)
    if not delivered:
        print("[SMTP] Passwort-Reset konnte nicht gesendet werden (SMTP prüfen).")

    return neutral_reply
|
||
|
||
|
||
@router.post("/reset-password")
def password_reset_confirm(req: PasswordResetConfirm):
    """Confirm a password reset with the token from the reset e-mail.

    The reset is stored as a row in ``sessions`` whose token is the raw token
    prefixed with ``reset_``. On success the password hash is replaced and the
    reset row is deleted so the link cannot be used again.

    Args:
        req: Payload; ``token`` and ``new_password`` are read.

    Returns:
        Dict with ``ok`` and a confirmation ``message``.

    Raises:
        HTTPException: 400 when the new password is shorter than 8 characters,
            or when no unexpired reset session matches the token.
    """
    # Same minimum length that register and change_pin enforce
    if len(req.new_password or "") < 8:
        raise HTTPException(400, "Passwort muss mindestens 8 Zeichen lang sein")

    reset_key = f"reset_{req.token}"
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT profile_id FROM sessions WHERE token=%s AND expires_at > CURRENT_TIMESTAMP",
                    (reset_key,))
        sess = cur.fetchone()
        if not sess:
            raise HTTPException(400, "Ungültiger oder abgelaufener Reset-Link")

        pid = sess['profile_id']
        new_hash = hash_pin(req.new_password)
        cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, pid))
        # Single use: remove the reset session
        cur.execute("DELETE FROM sessions WHERE token=%s", (reset_key,))

    return {"ok": True, "message": "Passwort erfolgreich zurückgesetzt"}
|
||
|
||
# ── Helpers (registration / e-mail links) ──────────────────────────────────────
|
||
|
||
def _public_app_base() -> str:
|
||
return (os.getenv("APP_URL") or "https://shinkan.jinkendo.de").rstrip("/")
|
||
|
||
|
||
def _verification_link_expired(expires_at) -> bool:
|
||
"""
|
||
True, wenn Ablaufzeit in der Vergangenheit liegt.
|
||
PG liefert TIMESTAMP oft als naive datetime — Vergleich mit timezone-aware UTC
|
||
würde sonst TypeError → 500.
|
||
"""
|
||
if expires_at is None:
|
||
return False
|
||
deadline = expires_at.replace(tzinfo=timezone.utc) if expires_at.tzinfo is None else expires_at.astimezone(timezone.utc)
|
||
return datetime.now(timezone.utc) > deadline
|
||
|
||
|
||
def verification_link(token: str) -> str:
    """Build the web-app link (`/verify?token=`); the SPA then calls the API itself, as in Mitai."""
    base = _public_app_base()
    # safe='' percent-encodes every reserved character, including "/"
    encoded_token = quote(token, safe='')
    return f"{base}/verify?token={encoded_token}"
|
||
|
||
|
||
def registration_role(cur, email_lower: str) -> str:
    """
    Decide the role for a new self-registration.

    Bootstrap rules: the first registration in an empty database becomes
    ``superadmin``, as does any e-mail listed in ADMIN_BOOTSTRAP_EMAILS
    (comma-separated, case-insensitive).

    ``superadmin`` covers all portal rights (incl. deleting / managing other
    super admins). To register everyone as trainer, set
    AUTO_ADMIN_FIRST_USER=false and leave ADMIN_BOOTSTRAP_EMAILS empty.

    Args:
        cur: Database cursor; used for ``SELECT COUNT(*) AS n FROM profiles``
            and expected to return a row addressable by ``row["n"]``.
        email_lower: E-mail of the registrant, already lower-cased.

    Returns:
        ``"superadmin"`` or ``"trainer"``.
    """
    bootstrap_emails = {
        entry.strip().lower()
        for entry in os.getenv("ADMIN_BOOTSTRAP_EMAILS", "").split(",")
        if entry.strip()
    }
    if email_lower in bootstrap_emails:
        return "superadmin"

    auto_admin = os.getenv("AUTO_ADMIN_FIRST_USER", "true").strip().lower()
    if auto_admin not in ("1", "true", "yes"):
        return "trainer"

    cur.execute("SELECT COUNT(*) AS n FROM profiles")
    row = cur.fetchone()
    try:
        profile_count = int(row["n"])
    except (KeyError, TypeError, ValueError):
        # Fail closed: if the count cannot be read (no row, non-dict row,
        # unexpected value) do not treat the database as empty, because that
        # would hand out superadmin to every registrant.
        return "trainer"
    return "superadmin" if profile_count == 0 else "trainer"
|
||
|
||
|
||
# ── Helper: Send Email ────────────────────────────────────────────────────────
|
||
def send_email(to_email: str, subject: str, body: str):
    """Send a plain-text mail via SMTP and report whether it was accepted.

    Configuration comes from the environment: SMTP_HOST, SMTP_USER and
    SMTP_PASS are mandatory; SMTP_PORT defaults to 587 (also when it is not a
    number); SMTP_FROM defaults to ``noreply@jinkendo.de``. Port 465 or
    SMTP_SSL=1/true/yes uses implicit SSL; otherwise a plain SMTP connection
    is opened and upgraded with STARTTLS unless SMTP_STARTTLS is 0/false/no.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: Plain-text message body.

    Returns:
        True when the server accepted the message, False when SMTP is not
        configured or any error occurred (the error is printed, not raised).
    """
    try:
        smtp_host = (os.getenv("SMTP_HOST") or "").strip()
        smtp_port_raw = os.getenv("SMTP_PORT") or "587"
        try:
            smtp_port = int(str(smtp_port_raw).strip())
        except ValueError:
            smtp_port = 587
        smtp_user = (os.getenv("SMTP_USER") or "").strip()
        smtp_pass = os.getenv("SMTP_PASS")
        # `os.getenv(name, default)` only covers an unset variable; an empty
        # SMTP_FROM would otherwise yield an empty From header.
        smtp_from = (os.getenv("SMTP_FROM") or "").strip() or "noreply@jinkendo.de"

        if not (smtp_host and smtp_user and smtp_pass):
            print("[SMTP] Nicht konfiguriert — setze SMTP_HOST, SMTP_USER, SMTP_PASS (und ggf. SMTP_PORT)")
            return False

        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = smtp_from
        msg["To"] = to_email

        use_tls_env = os.getenv("SMTP_STARTTLS", "true").strip().lower() not in ("0", "false", "no")
        force_ssl = os.getenv("SMTP_SSL", "").strip().lower() in ("1", "true", "yes") or smtp_port == 465

        ctx = ssl.create_default_context()
        # Only the recipient's domain is logged, not the full address
        recipient_domain = to_email.split('@')[-1]

        if force_ssl:
            print(f"[SMTP] Versand (SSL) an …@{recipient_domain} über {smtp_host}:{smtp_port}")
            with smtplib.SMTP_SSL(smtp_host, smtp_port, context=ctx, timeout=30) as server:
                server.login(smtp_user, smtp_pass)
                server.send_message(msg)
        else:
            print(f"[SMTP] Versand (STARTTLS={use_tls_env}) an …@{recipient_domain} über {smtp_host}:{smtp_port}")
            with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
                server.ehlo()
                if use_tls_env:
                    server.starttls(context=ctx)
                    # Re-identify after the TLS upgrade
                    server.ehlo()
                server.login(smtp_user, smtp_pass)
                server.send_message(msg)

        print("[SMTP] Nachricht erfolgreich akzeptiert (Server-Queue)")
        return True
    except Exception as e:
        # Deliberate best-effort boundary: callers only branch on True/False
        print(f"[SMTP] Fehler: {e}")
        return False
|
||
|
||
|
||
# ── Registration Endpoints ────────────────────────────────────────────────────
|
||
@router.post("/register")
@limiter.limit("3/hour")
async def register(req: RegisterRequest, request: Request):
    """Self-registration with e-mail verification.

    Validates the input, creates an unverified profile with a verification
    token valid for 24 hours and a 14-day trial, optionally records a pending
    club membership request, and sends the verification mail. A failed mail
    delivery is only logged; the response is the same.

    Raises:
        HTTPException: 400 for an invalid e-mail, a password shorter than
            8 characters, a name shorter than 2 characters, or an e-mail that
            is already registered.
    """
    email = req.email.lower().strip()
    display_name = req.name.strip()
    password = req.password

    # Validation (an empty string also fails the '@' / length checks)
    if '@' not in email:
        raise HTTPException(400, "Ungültige E-Mail-Adresse")
    if len(password) < 8:
        raise HTTPException(400, "Passwort muss mindestens 8 Zeichen lang sein")
    if len(display_name) < 2:
        raise HTTPException(400, "Name muss mindestens 2 Zeichen lang sein")

    with get_db() as conn:
        cur = get_cursor(conn)

        # Check if email already exists
        cur.execute("SELECT id FROM profiles WHERE email=%s", (email,))
        existing = cur.fetchone()
        if existing:
            raise HTTPException(400, "E-Mail-Adresse bereits registriert")

        # Generate verification token
        verification_token = secrets.token_urlsafe(32)
        verification_expires = datetime.now(timezone.utc) + timedelta(hours=24)

        # Role: first user or ADMIN_BOOTSTRAP_EMAILS → superadmin
        role = registration_role(cur, email)

        # Create profile (inactive until verified) — profiles.id is SERIAL (INT); do not insert string IDs.
        pin_hash = hash_pin(password)
        trial_ends = datetime.now(timezone.utc) + timedelta(days=14)  # 14-day trial

        profile_values = (
            display_name, email, pin_hash, role,
            verification_token, verification_expires, trial_ends,
        )
        cur.execute("""
            INSERT INTO profiles (
                name, email, pin_hash, auth_type, role, tier,
                email_verified, verification_token, verification_expires,
                trial_ends_at, created_at
            ) VALUES (%s, %s, %s, 'email', %s, 'free', FALSE, %s, %s, %s, CURRENT_TIMESTAMP)
            RETURNING id
        """, profile_values)
        new_profile_id = cur.fetchone()["id"]

        # Optional club request: only stored for an active club and only if
        # no pending request for this profile/club pair exists yet.
        if req.requested_club_id is not None:
            club_id = int(req.requested_club_id)
            cur.execute(
                "SELECT id FROM clubs WHERE id = %s AND status = 'active'",
                (club_id,),
            )
            active_club = cur.fetchone()
            if active_club:
                cur.execute(
                    """
                    SELECT id FROM club_membership_requests
                    WHERE profile_id = %s AND club_id = %s AND status = 'pending'
                    LIMIT 1
                    """,
                    (new_profile_id, club_id),
                )
                pending_request = cur.fetchone()
                if not pending_request:
                    cur.execute(
                        """
                        INSERT INTO club_membership_requests (profile_id, club_id, status, message)
                        VALUES (%s, %s, 'pending', NULL)
                        """,
                        (new_profile_id, club_id),
                    )

    verify_url = verification_link(verification_token)

    email_body = f"""Hallo {display_name},

willkommen bei Shinkan Jinkendo!

Bitte bestätige deine E-Mail-Adresse, um die Registrierung abzuschließen:

{verify_url}

Der Link ist 24 Stunden gültig.

Dein Shinkan Jinkendo Team
"""

    mail_sent = send_email(email, "Shinkan Jinkendo – E-Mail bestätigen", email_body)
    if not mail_sent:
        print("[SMTP] Verifizierungs-Mail konnte nicht gesendet werden — Logs prüfen, SMTP_* in der Laufzeitumgebung.")

    return {
        "ok": True,
        "message": "Registrierung erfolgreich! Bitte prüfe dein E-Mail-Postfach und bestätige deine E-Mail-Adresse."
    }
|
||
|
||
|
||
@router.get("/verify/{token}")
async def verify_email(token: str):
    """Verify an e-mail address, activate the account and log the user in.

    Looks up the profile by its verification token, marks it verified, clears
    the token and creates a session valid for 30 days.

    Args:
        token: Verification token from the e-mail link.

    Returns:
        Dict with ``ok``, a ``message``, the new session ``token`` and a
        ``profile`` sub-dict (``id``, ``name``, ``email``).

    Raises:
        HTTPException: 400 when the token is unknown, the address is already
            verified, or the token has expired.
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # Find profile with this verification token
        cur.execute("""
            SELECT id, name, email, email_verified, verification_expires
            FROM profiles
            WHERE verification_token=%s
        """, (token,))
        prof = cur.fetchone()

        if not prof:
            # Token not found — it may already have been used, since the token
            # is cleared on successful verification.
            raise HTTPException(400, "Verifikations-Link ungültig oder bereits verwendet. Falls du bereits verifiziert bist, melde dich einfach an.")

        if prof['email_verified']:
            raise HTTPException(400, "E-Mail-Adresse bereits bestätigt")

        # Check if token expired. Registering again is not possible for an
        # existing e-mail (register rejects it), so point the user to
        # requesting a fresh confirmation mail instead.
        if _verification_link_expired(prof["verification_expires"]):
            raise HTTPException(400, "Verifikations-Link abgelaufen. Bitte fordere eine neue Bestätigungs-E-Mail an.")

        # Mark as verified and clear token
        cur.execute("""
            UPDATE profiles
            SET email_verified=TRUE, verification_token=NULL, verification_expires=NULL
            WHERE id=%s
        """, (prof['id'],))

        # Create session (auto-login after verification)
        session_token = make_token()
        expires = datetime.now(timezone.utc) + timedelta(days=30)
        cur.execute("""
            INSERT INTO sessions (token, profile_id, expires_at, created_at)
            VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
        """, (session_token, prof['id'], expires))

    return {
        "ok": True,
        "message": "E-Mail-Adresse erfolgreich bestätigt!",
        "token": session_token,
        "profile": {
            "id": prof['id'],
            "name": prof['name'],
            "email": prof['email']
        }
    }
|
||
|
||
|
||
@router.post("/resend-verification")
@limiter.limit("3/hour")
async def resend_verification(req: dict, request: Request):
    """Resend the verification e-mail for an account that is not yet verified.

    A fresh token valid for 24 hours replaces the stored one before the mail
    is sent.

    Args:
        req: Raw JSON body; the address is read from the ``email`` key.
        request: Incoming request. Not used in the body; it is part of the
            signature for the ``@limiter.limit`` decorator.

    Returns:
        Dict with ``ok`` and a ``message``. An unknown address gets a neutral
        success answer.

    Raises:
        HTTPException: 400 when ``email`` is missing, empty or not a string,
            or when the address is already verified; 502 when the mail could
            not be sent.
    """
    raw_email = req.get('email', '')
    # The body is an untyped dict, so `email` may be null or a number;
    # answer 400 instead of failing on .strip().
    if not isinstance(raw_email, str):
        raise HTTPException(400, "E-Mail-Adresse erforderlich")
    email = raw_email.strip().lower()

    if not email:
        raise HTTPException(400, "E-Mail-Adresse erforderlich")

    with get_db() as conn:
        cur = get_cursor(conn)

        # Find profile by email
        cur.execute("""
            SELECT id, name, email, email_verified, verification_token, verification_expires
            FROM profiles
            WHERE email=%s
        """, (email,))
        prof = cur.fetchone()

        if not prof:
            # Don't leak info about existing emails
            return {"ok": True, "message": "Falls ein Account mit dieser E-Mail existiert, wurde eine Bestätigungs-E-Mail versendet."}

        # NOTE(review): this 400 (and the distinct success/502 answers below)
        # still distinguish known from unknown addresses — confirm whether
        # that is acceptable.
        if prof['email_verified']:
            raise HTTPException(400, "E-Mail-Adresse bereits bestätigt")

        # Generate new verification token
        verification_token = secrets.token_urlsafe(32)
        verification_expires = datetime.now(timezone.utc) + timedelta(hours=24)

        cur.execute("""
            UPDATE profiles
            SET verification_token=%s, verification_expires=%s
            WHERE id=%s
        """, (verification_token, verification_expires, prof['id']))

    # NOTE(review): the mail is sent after the database block, so the new
    # token is stored even if delivery fails — confirm this is intended.
    verify_url = verification_link(verification_token)

    email_body = f"""Hallo {prof['name']},

du hast eine neue Bestätigungs-E-Mail angefordert.

Bitte bestätige deine E-Mail-Adresse mit diesem Link:

{verify_url}

Dieser Link ist 24 Stunden gültig.

Dein Shinkan Jinkendo Team
"""

    if not send_email(email, "Shinkan Jinkendo – E-Mail bestätigen", email_body):
        raise HTTPException(502, "E-Mail konnte nicht versendet werden. SMTP-Einstellungen und Container-Logs prüfen.")

    return {"ok": True, "message": "Bestätigungs-E-Mail wurde erneut versendet."}
|