refactor: extract auth functions to auth.py
All checks were successful
Deploy Development / deploy (push) Successful in 54s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s

Phase 1.2 - Authentication-Logik isolieren

NEUE DATEI:
- backend/auth.py: Auth-Funktionen mit Dokumentation
  * hash_pin() - bcrypt + SHA256 legacy support
  * verify_pin() - Password verification
  * make_token() - Session token generation
  * get_session() - Token validation
  * require_auth() - FastAPI dependency
  * require_auth_flexible() - Auth via header OR query
  * require_admin() - Admin-only dependency

ÄNDERUNGEN:
- backend/main.py:
  * Import from auth.py
  * Removed 48 lines of auth code
  * hashlib, secrets nicht mehr benötigt

KEINE funktionalen Änderungen.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Lars 2026-03-19 09:51:25 +01:00
parent 548d733048
commit d826524789
2 changed files with 118 additions and 47 deletions

116
backend/auth.py Normal file
View File

@ -0,0 +1,116 @@
"""
Authentication and Authorization for Mitai Jinkendo
Provides password hashing, session management, and auth dependencies
for FastAPI endpoints.
"""
import hashlib
import secrets
from typing import Optional
from fastapi import Header, Query, HTTPException
import bcrypt
from db import get_db, get_cursor
def hash_pin(pin: str) -> str:
"""Hash password with bcrypt. Falls back gracefully from legacy SHA256."""
return bcrypt.hashpw(pin.encode(), bcrypt.gensalt()).decode()
def verify_pin(pin: str, stored_hash: str) -> bool:
"""Verify password - supports both bcrypt and legacy SHA256."""
if not stored_hash:
return False
# Detect bcrypt hash (starts with $2b$ or $2a$)
if stored_hash.startswith('$2'):
try:
return bcrypt.checkpw(pin.encode(), stored_hash.encode())
except Exception:
return False
# Legacy SHA256 support (auto-upgrade to bcrypt on next login)
return stored_hash == hashlib.sha256(pin.encode()).hexdigest()
def make_token() -> str:
"""Generate a secure random token for sessions."""
return secrets.token_urlsafe(32)
def get_session(token: str):
"""
Get session data for a given token.
Returns session dict with profile info, or None if invalid/expired.
"""
if not token:
return None
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT s.*, p.role, p.name, p.ai_enabled, p.ai_limit_day, p.export_enabled "
"FROM sessions s JOIN profiles p ON s.profile_id=p.id "
"WHERE s.token=%s AND s.expires_at > CURRENT_TIMESTAMP",
(token,)
)
return cur.fetchone()
def require_auth(x_auth_token: Optional[str] = Header(default=None)):
"""
FastAPI dependency - requires valid authentication.
Usage:
@app.get("/api/endpoint")
def endpoint(session: dict = Depends(require_auth)):
profile_id = session['profile_id']
...
Raises:
HTTPException 401 if not authenticated
"""
session = get_session(x_auth_token)
if not session:
raise HTTPException(401, "Nicht eingeloggt")
return session
def require_auth_flexible(x_auth_token: Optional[str] = Header(default=None), token: Optional[str] = Query(default=None)):
"""
FastAPI dependency - auth via header OR query parameter.
Used for endpoints accessed by <img> tags that can't send headers.
Usage:
@app.get("/api/photos/{id}")
def get_photo(id: str, session: dict = Depends(require_auth_flexible)):
...
Raises:
HTTPException 401 if not authenticated
"""
session = get_session(x_auth_token or token)
if not session:
raise HTTPException(401, "Nicht eingeloggt")
return session
def require_admin(x_auth_token: Optional[str] = Header(default=None)):
"""
FastAPI dependency - requires admin authentication.
Usage:
@app.put("/api/admin/endpoint")
def admin_endpoint(session: dict = Depends(require_admin)):
...
Raises:
HTTPException 401 if not authenticated
HTTPException 403 if not admin
"""
session = get_session(x_auth_token)
if not session:
raise HTTPException(401, "Nicht eingeloggt")
if session['role'] != 'admin':
raise HTTPException(403, "Nur für Admins")
return session

View File

@ -16,6 +16,7 @@ from slowapi.errors import RateLimitExceeded
from starlette.requests import Request
from db import get_db, get_cursor, r2d, init_db
from auth import hash_pin, verify_pin, make_token, get_session, require_auth, require_auth_flexible, require_admin
DATA_DIR = Path(os.getenv("DATA_DIR", "./data"))
PHOTOS_DIR = Path(os.getenv("PHOTOS_DIR", "./photos"))
@ -114,55 +115,9 @@ class NutritionDay(BaseModel):
fat_g: Optional[float]=None; carbs_g: Optional[float]=None
# ── Profiles ──────────────────────────────────────────────────────────────────
import hashlib, secrets
from datetime import timedelta
def hash_pin(pin: str) -> str:
"""Hash password with bcrypt. Falls back gracefully from legacy SHA256."""
return bcrypt.hashpw(pin.encode(), bcrypt.gensalt()).decode()
def verify_pin(pin: str, stored_hash: str) -> bool:
"""Verify password - supports both bcrypt and legacy SHA256."""
if not stored_hash:
return False
# Detect bcrypt hash (starts with $2b$ or $2a$)
if stored_hash.startswith('$2'):
return bcrypt.checkpw(pin.encode(), stored_hash.encode())
# Legacy SHA256 fallback - auto-upgrade on successful login
import hashlib
return hashlib.sha256(pin.encode()).hexdigest() == stored_hash
def make_token() -> str:
return secrets.token_urlsafe(32)
def get_session(token: str):
if not token: return None
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT s.*, p.role, p.name, p.ai_enabled, p.ai_limit_day, p.export_enabled "
"FROM sessions s JOIN profiles p ON s.profile_id=p.id "
"WHERE s.token=%s AND s.expires_at > CURRENT_TIMESTAMP", (token,)
)
row = cur.fetchone()
return r2d(row)
def require_auth(x_auth_token: Optional[str]=Header(default=None)):
session = get_session(x_auth_token)
if not session: raise HTTPException(401, "Nicht eingeloggt")
return session
def require_auth_flexible(x_auth_token: Optional[str]=Header(default=None), token: Optional[str]=Query(default=None)):
"""Auth via header OR query parameter (for <img> tags)."""
session = get_session(x_auth_token or token)
if not session: raise HTTPException(401, "Nicht eingeloggt")
return session
def require_admin(x_auth_token: Optional[str]=Header(default=None)):
session = get_session(x_auth_token)
if not session: raise HTTPException(401, "Nicht eingeloggt")
if session['role'] != 'admin': raise HTTPException(403, "Nur für Admins")
return session
# Auth functions moved to auth.py
@app.get("/api/profiles")
def list_profiles(session=Depends(require_auth)):