diff --git a/backend/auth.py b/backend/auth.py new file mode 100644 index 0000000..9c7ece4 --- /dev/null +++ b/backend/auth.py @@ -0,0 +1,116 @@ +""" +Authentication and Authorization for Mitai Jinkendo + +Provides password hashing, session management, and auth dependencies +for FastAPI endpoints. +""" +import hashlib +import secrets +from typing import Optional +from fastapi import Header, Query, HTTPException +import bcrypt + +from db import get_db, get_cursor + + +def hash_pin(pin: str) -> str: + """Hash password with bcrypt. Falls back gracefully from legacy SHA256.""" + return bcrypt.hashpw(pin.encode(), bcrypt.gensalt()).decode() + + +def verify_pin(pin: str, stored_hash: str) -> bool: + """Verify password - supports both bcrypt and legacy SHA256.""" + if not stored_hash: + return False + # Detect bcrypt hash (starts with $2b$ or $2a$) + if stored_hash.startswith('$2'): + try: + return bcrypt.checkpw(pin.encode(), stored_hash.encode()) + except Exception: + return False + # Legacy SHA256 support (auto-upgrade to bcrypt on next login) + return stored_hash == hashlib.sha256(pin.encode()).hexdigest() + + +def make_token() -> str: + """Generate a secure random token for sessions.""" + return secrets.token_urlsafe(32) + + +def get_session(token: str): + """ + Get session data for a given token. + + Returns session dict with profile info, or None if invalid/expired. + """ + if not token: + return None + with get_db() as conn: + cur = get_cursor(conn) + cur.execute( + "SELECT s.*, p.role, p.name, p.ai_enabled, p.ai_limit_day, p.export_enabled " + "FROM sessions s JOIN profiles p ON s.profile_id=p.id " + "WHERE s.token=%s AND s.expires_at > CURRENT_TIMESTAMP", + (token,) + ) + return cur.fetchone() + + +def require_auth(x_auth_token: Optional[str] = Header(default=None)): + """ + FastAPI dependency - requires valid authentication. + + Usage: + @app.get("/api/endpoint") + def endpoint(session: dict = Depends(require_auth)): + profile_id = session['profile_id'] + ... + + Raises: + HTTPException 401 if not authenticated + """ + session = get_session(x_auth_token) + if not session: + raise HTTPException(401, "Nicht eingeloggt") + return session + + +def require_auth_flexible(x_auth_token: Optional[str] = Header(default=None), token: Optional[str] = Query(default=None)): + """ + FastAPI dependency - auth via header OR query parameter. + + Used for endpoints accessed by tags that can't send headers. + + Usage: + @app.get("/api/photos/{id}") + def get_photo(id: str, session: dict = Depends(require_auth_flexible)): + ... + + Raises: + HTTPException 401 if not authenticated + """ + session = get_session(x_auth_token or token) + if not session: + raise HTTPException(401, "Nicht eingeloggt") + return session + + +def require_admin(x_auth_token: Optional[str] = Header(default=None)): + """ + FastAPI dependency - requires admin authentication. + + Usage: + @app.put("/api/admin/endpoint") + def admin_endpoint(session: dict = Depends(require_admin)): + ... + + Raises: + HTTPException 401 if not authenticated + HTTPException 403 if not admin + """ + session = get_session(x_auth_token) + if not session: + raise HTTPException(401, "Nicht eingeloggt") + if session['role'] != 'admin': + raise HTTPException(403, "Nur für Admins") + return session diff --git a/backend/main.py b/backend/main.py index 49f85b1..e8a0f78 100644 --- a/backend/main.py +++ b/backend/main.py @@ -16,6 +16,7 @@ from slowapi.errors import RateLimitExceeded from starlette.requests import Request from db import get_db, get_cursor, r2d, init_db +from auth import hash_pin, verify_pin, make_token, get_session, require_auth, require_auth_flexible, require_admin DATA_DIR = Path(os.getenv("DATA_DIR", "./data")) PHOTOS_DIR = Path(os.getenv("PHOTOS_DIR", "./photos")) @@ -114,55 +115,9 @@ class NutritionDay(BaseModel): fat_g: Optional[float]=None; carbs_g: Optional[float]=None # ── Profiles ────────────────────────────────────────────────────────────────── -import hashlib, secrets from datetime import timedelta -def hash_pin(pin: str) -> str: - """Hash password with bcrypt. Falls back gracefully from legacy SHA256.""" - return bcrypt.hashpw(pin.encode(), bcrypt.gensalt()).decode() - -def verify_pin(pin: str, stored_hash: str) -> bool: - """Verify password - supports both bcrypt and legacy SHA256.""" - if not stored_hash: - return False - # Detect bcrypt hash (starts with $2b$ or $2a$) - if stored_hash.startswith('$2'): - return bcrypt.checkpw(pin.encode(), stored_hash.encode()) - # Legacy SHA256 fallback - auto-upgrade on successful login - import hashlib - return hashlib.sha256(pin.encode()).hexdigest() == stored_hash - -def make_token() -> str: - return secrets.token_urlsafe(32) - -def get_session(token: str): - if not token: return None - with get_db() as conn: - cur = get_cursor(conn) - cur.execute( - "SELECT s.*, p.role, p.name, p.ai_enabled, p.ai_limit_day, p.export_enabled " - "FROM sessions s JOIN profiles p ON s.profile_id=p.id " - "WHERE s.token=%s AND s.expires_at > CURRENT_TIMESTAMP", (token,) - ) - row = cur.fetchone() - return r2d(row) - -def require_auth(x_auth_token: Optional[str]=Header(default=None)): - session = get_session(x_auth_token) - if not session: raise HTTPException(401, "Nicht eingeloggt") - return session - -def require_auth_flexible(x_auth_token: Optional[str]=Header(default=None), token: Optional[str]=Query(default=None)): - """Auth via header OR query parameter (for tags).""" - session = get_session(x_auth_token or token) - if not session: raise HTTPException(401, "Nicht eingeloggt") - return session - -def require_admin(x_auth_token: Optional[str]=Header(default=None)): - session = get_session(x_auth_token) - if not session: raise HTTPException(401, "Nicht eingeloggt") - if session['role'] != 'admin': raise HTTPException(403, "Nur für Admins") - return session +# Auth functions moved to auth.py @app.get("/api/profiles") def list_profiles(session=Depends(require_auth)):