""" Authentication Endpoints for Mitai Jinkendo Handles login, logout, password reset, and profile authentication. """ import os import secrets import smtplib from typing import Optional from datetime import datetime, timedelta from email.mime.text import MIMEText from fastapi import APIRouter, HTTPException, Header, Depends from starlette.requests import Request from slowapi import Limiter from slowapi.util import get_remote_address from db import get_db, get_cursor from auth import hash_pin, verify_pin, make_token, require_auth from models import LoginRequest, PasswordResetRequest, PasswordResetConfirm, RegisterRequest router = APIRouter(prefix="/api/auth", tags=["auth"]) limiter = Limiter(key_func=get_remote_address) @router.post("/login") @limiter.limit("5/minute") async def login(req: LoginRequest, request: Request): """Login with email + password.""" with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT * FROM profiles WHERE email=%s", (req.email.lower().strip(),)) prof = cur.fetchone() if not prof: raise HTTPException(401, "Ungültige Zugangsdaten") # Verify password if not verify_pin(req.password, prof['pin_hash']): raise HTTPException(401, "Ungültige Zugangsdaten") # Auto-upgrade from SHA256 to bcrypt if prof['pin_hash'] and not prof['pin_hash'].startswith('$2'): new_hash = hash_pin(req.password) cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, prof['id'])) # Create session token = make_token() session_days = prof.get('session_days', 30) expires = datetime.now() + timedelta(days=session_days) cur.execute("INSERT INTO sessions (token, profile_id, expires_at, created) VALUES (%s,%s,%s,CURRENT_TIMESTAMP)", (token, prof['id'], expires.isoformat())) return { "token": token, "profile_id": prof['id'], "name": prof['name'], "role": prof['role'], "expires_at": expires.isoformat() } @router.post("/logout") def logout(x_auth_token: Optional[str]=Header(default=None)): """Logout (delete session).""" if x_auth_token: with get_db() as conn: cur = get_cursor(conn) cur.execute("DELETE FROM sessions WHERE token=%s", (x_auth_token,)) return {"ok": True} @router.get("/me") def get_me(session: dict=Depends(require_auth)): """Get current user info.""" pid = session['profile_id'] # Import here to avoid circular dependency from routers.profiles import get_profile return get_profile(pid, session) @router.get("/status") def auth_status(): """Health check endpoint.""" return {"status": "ok", "service": "mitai-jinkendo", "version": "v9b"} @router.put("/pin") def change_pin(req: dict, session: dict=Depends(require_auth)): """Change PIN/password for current user.""" pid = session['profile_id'] new_pin = req.get('pin', '') if len(new_pin) < 4: raise HTTPException(400, "PIN/Passwort muss mind. 4 Zeichen haben") new_hash = hash_pin(new_pin) with get_db() as conn: cur = get_cursor(conn) cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, pid)) return {"ok": True} @router.post("/forgot-password") @limiter.limit("3/minute") async def password_reset_request(req: PasswordResetRequest, request: Request): """Request password reset email.""" email = req.email.lower().strip() with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT id, name FROM profiles WHERE email=%s", (email,)) prof = cur.fetchone() if not prof: # Don't reveal if email exists return {"ok": True, "message": "Falls die E-Mail existiert, wurde ein Reset-Link gesendet."} # Generate reset token token = secrets.token_urlsafe(32) expires = datetime.now() + timedelta(hours=1) # Store in sessions table (reuse mechanism) cur.execute("INSERT INTO sessions (token, profile_id, expires_at, created) VALUES (%s,%s,%s,CURRENT_TIMESTAMP)", (f"reset_{token}", prof['id'], expires.isoformat())) # Send email try: smtp_host = os.getenv("SMTP_HOST") smtp_port = int(os.getenv("SMTP_PORT", 587)) smtp_user = os.getenv("SMTP_USER") smtp_pass = os.getenv("SMTP_PASS") smtp_from = os.getenv("SMTP_FROM") app_url = os.getenv("APP_URL", "https://mitai.jinkendo.de") if smtp_host and smtp_user and smtp_pass: msg = MIMEText(f"""Hallo {prof['name']}, Du hast einen Passwort-Reset angefordert. Reset-Link: {app_url}/reset-password?token={token} Der Link ist 1 Stunde gültig. Falls du diese Anfrage nicht gestellt hast, ignoriere diese E-Mail. Dein Mitai Jinkendo Team """) msg['Subject'] = "Passwort zurücksetzen – Mitai Jinkendo" msg['From'] = smtp_from msg['To'] = email with smtplib.SMTP(smtp_host, smtp_port) as server: server.starttls() server.login(smtp_user, smtp_pass) server.send_message(msg) except Exception as e: print(f"Email error: {e}") return {"ok": True, "message": "Falls die E-Mail existiert, wurde ein Reset-Link gesendet."} @router.post("/reset-password") def password_reset_confirm(req: PasswordResetConfirm): """Confirm password reset with token.""" with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT profile_id FROM sessions WHERE token=%s AND expires_at > CURRENT_TIMESTAMP", (f"reset_{req.token}",)) sess = cur.fetchone() if not sess: raise HTTPException(400, "Ungültiger oder abgelaufener Reset-Link") pid = sess['profile_id'] new_hash = hash_pin(req.new_password) cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, pid)) cur.execute("DELETE FROM sessions WHERE token=%s", (f"reset_{req.token}",)) return {"ok": True, "message": "Passwort erfolgreich zurückgesetzt"} # ── Helper: Send Email ──────────────────────────────────────────────────────── def send_email(to_email: str, subject: str, body: str): """Send email via SMTP (reusable helper).""" try: smtp_host = os.getenv("SMTP_HOST") smtp_port = int(os.getenv("SMTP_PORT", 587)) smtp_user = os.getenv("SMTP_USER") smtp_pass = os.getenv("SMTP_PASS") smtp_from = os.getenv("SMTP_FROM", "noreply@jinkendo.de") if not smtp_host or not smtp_user or not smtp_pass: print("SMTP not configured, skipping email") return False msg = MIMEText(body) msg['Subject'] = subject msg['From'] = smtp_from msg['To'] = to_email with smtplib.SMTP(smtp_host, smtp_port) as server: server.starttls() server.login(smtp_user, smtp_pass) server.send_message(msg) return True except Exception as e: print(f"Email error: {e}") return False # ── Registration Endpoints ──────────────────────────────────────────────────── @router.post("/register") @limiter.limit("3/hour") async def register(req: RegisterRequest, request: Request): """Self-registration with email verification.""" email = req.email.lower().strip() name = req.name.strip() password = req.password # Validation if not email or '@' not in email: raise HTTPException(400, "Ungültige E-Mail-Adresse") if len(password) < 8: raise HTTPException(400, "Passwort muss mindestens 8 Zeichen lang sein") if not name or len(name) < 2: raise HTTPException(400, "Name muss mindestens 2 Zeichen lang sein") with get_db() as conn: cur = get_cursor(conn) # Check if email already exists cur.execute("SELECT id FROM profiles WHERE email=%s", (email,)) if cur.fetchone(): raise HTTPException(400, "E-Mail-Adresse bereits registriert") # Generate verification token verification_token = secrets.token_urlsafe(32) verification_expires = datetime.now() + timedelta(hours=24) # Create profile (inactive until verified) profile_id = str(secrets.token_hex(16)) pin_hash = hash_pin(password) cur.execute(""" INSERT INTO profiles ( id, name, email, pin_hash, auth_type, role, tier, email_verified, verification_token, verification_expires, created ) VALUES (%s, %s, %s, %s, 'email', 'user', 'free', FALSE, %s, %s, CURRENT_TIMESTAMP) """, (profile_id, name, email, pin_hash, verification_token, verification_expires)) # Send verification email app_url = os.getenv("APP_URL", "https://mitai.jinkendo.de") verify_url = f"{app_url}/verify?token={verification_token}" email_body = f"""Hallo {name}, willkommen bei Mitai Jinkendo! Bitte bestätige deine E-Mail-Adresse um die Registrierung abzuschließen: {verify_url} Der Link ist 24 Stunden gültig. Dein Mitai Jinkendo Team """ send_email(email, "Willkommen bei Mitai Jinkendo – E-Mail bestätigen", email_body) return { "ok": True, "message": "Registrierung erfolgreich! Bitte prüfe dein E-Mail-Postfach und bestätige deine E-Mail-Adresse." } @router.get("/verify/{token}") async def verify_email(token: str): """Verify email address and activate account.""" with get_db() as conn: cur = get_cursor(conn) # Find profile with this verification token cur.execute(""" SELECT id, name, email, email_verified, verification_expires FROM profiles WHERE verification_token=%s """, (token,)) prof = cur.fetchone() if not prof: raise HTTPException(400, "Ungültiger Verifikations-Link") if prof['email_verified']: raise HTTPException(400, "E-Mail-Adresse bereits bestätigt") # Check if token expired if prof['verification_expires'] and datetime.now() > prof['verification_expires']: raise HTTPException(400, "Verifikations-Link abgelaufen. Bitte registriere dich erneut.") # Mark as verified and clear token cur.execute(""" UPDATE profiles SET email_verified=TRUE, verification_token=NULL, verification_expires=NULL WHERE id=%s """, (prof['id'],)) # Create session (auto-login after verification) session_token = make_token() expires = datetime.now() + timedelta(days=30) cur.execute(""" INSERT INTO sessions (token, profile_id, expires_at, created) VALUES (%s, %s, %s, CURRENT_TIMESTAMP) """, (session_token, prof['id'], expires)) return { "ok": True, "message": "E-Mail-Adresse erfolgreich bestätigt!", "token": session_token, "profile": { "id": prof['id'], "name": prof['name'], "email": prof['email'] } }