mitai-jinkendo/backend/auth.py
Lars d826524789
All checks were successful
Deploy Development / deploy (push) Successful in 54s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
refactor: extract auth functions to auth.py
Phase 1.2 - Authentication-Logik isolieren

NEUE DATEI:
- backend/auth.py: Auth-Funktionen mit Dokumentation
  * hash_pin() - bcrypt + SHA256 legacy support
  * verify_pin() - Password verification
  * make_token() - Session token generation
  * get_session() - Token validation
  * require_auth() - FastAPI dependency
  * require_auth_flexible() - Auth via header OR query
  * require_admin() - Admin-only dependency

ÄNDERUNGEN:
- backend/main.py:
  * Import from auth.py
  * Removed 48 lines of auth code
  * hashlib, secrets nicht mehr benötigt

KEINE funktionalen Änderungen.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 09:51:25 +01:00

117 lines
3.3 KiB
Python

"""
Authentication and Authorization for Mitai Jinkendo
Provides password hashing, session management, and auth dependencies
for FastAPI endpoints.
"""
import hashlib
import secrets
from typing import Optional
from fastapi import Header, Query, HTTPException
import bcrypt
from db import get_db, get_cursor
def hash_pin(pin: str) -> str:
"""Hash password with bcrypt. Falls back gracefully from legacy SHA256."""
return bcrypt.hashpw(pin.encode(), bcrypt.gensalt()).decode()
def verify_pin(pin: str, stored_hash: str) -> bool:
"""Verify password - supports both bcrypt and legacy SHA256."""
if not stored_hash:
return False
# Detect bcrypt hash (starts with $2b$ or $2a$)
if stored_hash.startswith('$2'):
try:
return bcrypt.checkpw(pin.encode(), stored_hash.encode())
except Exception:
return False
# Legacy SHA256 support (auto-upgrade to bcrypt on next login)
return stored_hash == hashlib.sha256(pin.encode()).hexdigest()
def make_token() -> str:
"""Generate a secure random token for sessions."""
return secrets.token_urlsafe(32)
def get_session(token: str):
"""
Get session data for a given token.
Returns session dict with profile info, or None if invalid/expired.
"""
if not token:
return None
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT s.*, p.role, p.name, p.ai_enabled, p.ai_limit_day, p.export_enabled "
"FROM sessions s JOIN profiles p ON s.profile_id=p.id "
"WHERE s.token=%s AND s.expires_at > CURRENT_TIMESTAMP",
(token,)
)
return cur.fetchone()
def require_auth(x_auth_token: Optional[str] = Header(default=None)):
"""
FastAPI dependency - requires valid authentication.
Usage:
@app.get("/api/endpoint")
def endpoint(session: dict = Depends(require_auth)):
profile_id = session['profile_id']
...
Raises:
HTTPException 401 if not authenticated
"""
session = get_session(x_auth_token)
if not session:
raise HTTPException(401, "Nicht eingeloggt")
return session
def require_auth_flexible(x_auth_token: Optional[str] = Header(default=None), token: Optional[str] = Query(default=None)):
"""
FastAPI dependency - auth via header OR query parameter.
Used for endpoints accessed by <img> tags that can't send headers.
Usage:
@app.get("/api/photos/{id}")
def get_photo(id: str, session: dict = Depends(require_auth_flexible)):
...
Raises:
HTTPException 401 if not authenticated
"""
session = get_session(x_auth_token or token)
if not session:
raise HTTPException(401, "Nicht eingeloggt")
return session
def require_admin(x_auth_token: Optional[str] = Header(default=None)):
"""
FastAPI dependency - requires admin authentication.
Usage:
@app.put("/api/admin/endpoint")
def admin_endpoint(session: dict = Depends(require_admin)):
...
Raises:
HTTPException 401 if not authenticated
HTTPException 403 if not admin
"""
session = get_session(x_auth_token)
if not session:
raise HTTPException(401, "Nicht eingeloggt")
if session['role'] != 'admin':
raise HTTPException(403, "Nur für Admins")
return session