Phase 1.2 - Authentication-Logik isolieren NEUE DATEI: - backend/auth.py: Auth-Funktionen mit Dokumentation * hash_pin() - bcrypt + SHA256 legacy support * verify_pin() - Password verification * make_token() - Session token generation * get_session() - Token validation * require_auth() - FastAPI dependency * require_auth_flexible() - Auth via header OR query * require_admin() - Admin-only dependency ÄNDERUNGEN: - backend/main.py: * Import from auth.py * Removed 48 lines of auth code * hashlib, secrets nicht mehr benötigt KEINE funktionalen Änderungen. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
117 lines
3.3 KiB
Python
117 lines
3.3 KiB
Python
"""
|
|
Authentication and Authorization for Mitai Jinkendo
|
|
|
|
Provides password hashing, session management, and auth dependencies
|
|
for FastAPI endpoints.
|
|
"""
|
|
import hashlib
|
|
import secrets
|
|
from typing import Optional
|
|
from fastapi import Header, Query, HTTPException
|
|
import bcrypt
|
|
|
|
from db import get_db, get_cursor
|
|
|
|
|
|
def hash_pin(pin: str) -> str:
    """Return a freshly salted bcrypt hash of ``pin`` as text.

    Every call draws a new salt via ``bcrypt.gensalt()``, so hashing the
    same PIN twice gives different strings. This function only ever
    produces bcrypt hashes; recognising older SHA-256 hashes is handled by
    ``verify_pin``.

    Args:
        pin: Plaintext PIN/password to hash.

    Returns:
        The bcrypt hash decoded to ``str``.
    """
    pin_bytes = pin.encode()
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(pin_bytes, salt)
    return hashed.decode()
|
|
|
|
|
|
def verify_pin(pin: str, stored_hash: str) -> bool:
    """Check a plaintext PIN/password against a stored hash.

    Two stored formats are recognised:

    * bcrypt - any value starting with ``$2`` is checked with
      ``bcrypt.checkpw``.
    * legacy - anything else is compared against the hex SHA-256 digest
      of ``pin``.

    This function only verifies. It does not re-hash legacy entries;
    NOTE(review): the upgrade of SHA-256 hashes to bcrypt is presumably
    done by the login code that calls this - confirm there.

    Args:
        pin: Plaintext PIN/password supplied by the user.
        stored_hash: Hash string loaded from storage; may be empty/None.

    Returns:
        True if ``pin`` matches ``stored_hash``, otherwise False. An empty
        stored hash or a bcrypt value that cannot be checked yields False.
    """
    if not stored_hash:
        return False

    # bcrypt hashes carry a "$2<variant>$" prefix (e.g. $2a$, $2b$).
    if stored_hash.startswith('$2'):
        try:
            return bcrypt.checkpw(pin.encode(), stored_hash.encode())
        except Exception:
            # Deliberately best-effort: a malformed stored hash is treated
            # as a failed login rather than a server error.
            return False

    # Legacy SHA-256 path. Compare in constant time so response timing
    # does not reveal how many leading characters of the digest matched.
    # Bytes are used because compare_digest rejects non-ASCII str input.
    legacy_digest = hashlib.sha256(pin.encode()).hexdigest()
    return secrets.compare_digest(stored_hash.encode(), legacy_digest.encode())
|
|
|
|
|
|
def make_token() -> str:
    """Create a new random session token.

    Returns:
        A URL-safe text token built from 32 random bytes using the
        ``secrets`` module.
    """
    session_token = secrets.token_urlsafe(nbytes=32)
    return session_token
|
|
|
|
|
|
def get_session(token: str):
    """Look up the active session that belongs to ``token``.

    The session row is joined with its profile, so the result carries all
    ``sessions`` columns plus the profile's role, name, ai_enabled,
    ai_limit_day and export_enabled. Only sessions whose ``expires_at`` is
    still in the future are matched.

    Args:
        token: Session token; an empty or None value short-circuits
            without touching the database.

    Returns:
        The row from ``fetchone()`` (callers index it by column name), or
        None when the token is missing, unknown or expired.
    """
    if not token:
        return None

    session_query = (
        "SELECT s.*, p.role, p.name, p.ai_enabled, p.ai_limit_day, p.export_enabled "
        "FROM sessions s JOIN profiles p ON s.profile_id=p.id "
        "WHERE s.token=%s AND s.expires_at > CURRENT_TIMESTAMP"
    )
    with get_db() as connection:
        cursor = get_cursor(connection)
        # Token is passed as a bound parameter, never interpolated.
        cursor.execute(session_query, (token,))
        return cursor.fetchone()
|
|
|
|
|
|
def require_auth(x_auth_token: Optional[str] = Header(default=None)):
    """FastAPI dependency that only lets authenticated requests through.

    The token is read from the request header bound to ``x_auth_token``
    and resolved with ``get_session``.

    Usage:
        @app.get("/api/endpoint")
        def endpoint(session: dict = Depends(require_auth)):
            profile_id = session['profile_id']
            ...

    Returns:
        The session row for the supplied token.

    Raises:
        HTTPException: 401 when the header is missing or the token does
            not resolve to a valid session.
    """
    active_session = get_session(x_auth_token)
    if active_session:
        return active_session
    raise HTTPException(401, "Nicht eingeloggt")
|
|
|
|
|
|
def require_auth_flexible(
    x_auth_token: Optional[str] = Header(default=None),
    token: Optional[str] = Query(default=None),
):
    """FastAPI dependency accepting the token from a header or the query string.

    Intended for endpoints fetched by ``<img>`` tags, which cannot attach
    custom headers. The header value wins when both are supplied; the
    ``token`` query parameter is only used if the header is absent/empty.

    Usage:
        @app.get("/api/photos/{id}")
        def get_photo(id: str, session: dict = Depends(require_auth_flexible)):
            ...

    Returns:
        The session row for the supplied token.

    Raises:
        HTTPException: 401 when neither source yields a valid session.
    """
    # Header takes precedence; fall back to the query parameter.
    supplied_token = x_auth_token if x_auth_token else token
    active_session = get_session(supplied_token)
    if active_session:
        return active_session
    raise HTTPException(401, "Nicht eingeloggt")
|
|
|
|
|
|
def require_admin(x_auth_token: Optional[str] = Header(default=None)):
    """FastAPI dependency that restricts an endpoint to admin profiles.

    The request must carry a valid session token in the header bound to
    ``x_auth_token`` and the session's ``role`` must be ``'admin'``.

    Usage:
        @app.put("/api/admin/endpoint")
        def admin_endpoint(session: dict = Depends(require_admin)):
            ...

    Returns:
        The session row of the authenticated admin.

    Raises:
        HTTPException: 401 when no valid session exists for the token.
        HTTPException: 403 when the session's role is not admin.
    """
    active_session = get_session(x_auth_token)
    if not active_session:
        raise HTTPException(401, "Nicht eingeloggt")

    # Authentication succeeded; now enforce the role requirement.
    is_admin = active_session['role'] == 'admin'
    if not is_admin:
        raise HTTPException(403, "Nur für Admins")
    return active_session
|