mitai-jinkendo/backend/routers/photos.py
Lars 0035d08149
All checks were successful
Deploy Development / deploy (push) Successful in 1m4s
Build Test / pytest-backend (push) Successful in 5s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
feat: enhance photo upload and management features
- Added `taken_at` timestamp to the photos table for improved photo metadata.
- Updated the photo upload API to support optional EXIF data extraction and file last modified timestamp.
- Enhanced the photo upload process to allow skipping EXIF data, defaulting to today's date if no other date is provided.
- Improved the photo display in various components to utilize a unified caption format.
- Refactored photo sorting and grouping logic for better organization in the UI.
2026-04-19 10:13:22 +02:00

191 lines
6.2 KiB
Python

"""
Photo Management Endpoints for Mitai Jinkendo
Handles progress photo uploads and retrieval.
"""
import os
import uuid
import logging
from datetime import date as date_cls
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, UploadFile, File, Form, Header, HTTPException, Depends
from fastapi.responses import FileResponse
import aiofiles
from db import get_db, get_cursor, r2d
from photo_exif import extract_taken_at_from_image_bytes, taken_at_from_file_last_modified_ms
from auth import require_auth, require_auth_flexible, check_feature_access, increment_feature_usage
from routers.profiles import get_pid
from feature_logger import log_feature_usage
router = APIRouter(prefix="/api/photos", tags=["photos"])
logger = logging.getLogger(__name__)
PHOTOS_DIR = Path(os.getenv("PHOTOS_DIR", "./photos"))
PHOTOS_DIR.mkdir(parents=True, exist_ok=True)
def resolve_photo_path(stored: Optional[str]) -> Optional[Path]:
"""
Map DB `path` to an existing file. Supports legacy rows (absolute path or nested)
and new rows (filename only under PHOTOS_DIR).
"""
if not stored or not str(stored).strip():
return None
raw = str(stored).strip()
p = Path(raw)
if p.is_absolute() and p.exists():
return p
cand = PHOTOS_DIR / raw
if cand.exists():
return cand
cand2 = PHOTOS_DIR / Path(raw).name
if cand2.exists():
return cand2
return None
@router.post("")
async def upload_photo(
file: UploadFile = File(...),
date: str = Form(""),
skip_exif: str = Form(""),
file_last_modified: str = Form(""),
x_profile_id: Optional[str] = Header(default=None),
session: dict = Depends(require_auth),
):
"""
Upload progress photo.
Reihenfolge (wenn nicht ``skip_exif``): EXIF → Datei-Zeitstempel (Browser ``File.lastModified``)
→ Formularfeld ``date`` → heute. Bei ``skip_exif``: nur Formular / heute (weder EXIF noch Datei-Zeit).
"""
pid = get_pid(x_profile_id)
# Phase 4: Check feature access and ENFORCE
access = check_feature_access(pid, 'photos')
log_feature_usage(pid, 'photos', access, 'upload')
if not access['allowed']:
logger.warning(
f"[FEATURE-LIMIT] User {pid} blocked: "
f"photos {access['reason']} (used: {access['used']}, limit: {access['limit']})"
)
raise HTTPException(
status_code=403,
detail=f"Limit erreicht: Du hast das Kontingent für Fotos überschritten ({access['used']}/{access['limit']}). "
f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
)
raw = await file.read()
ignore_exif = skip_exif.strip().lower() in ("1", "true", "yes", "on")
taken_at = None
exif_used = False
file_mtime_used = False
if not ignore_exif:
taken_at = extract_taken_at_from_image_bytes(raw)
exif_used = taken_at is not None
if not taken_at:
taken_at = taken_at_from_file_last_modified_ms(file_last_modified)
file_mtime_used = taken_at is not None
if taken_at:
photo_date = taken_at.date()
elif date and date.strip():
photo_date = date_cls.fromisoformat(date.strip()[:10])
else:
photo_date = date_cls.today()
fid = str(uuid.uuid4())
ext = Path(file.filename).suffix or ".jpg"
fname = f"{fid}{ext}"
path = PHOTOS_DIR / fname
async with aiofiles.open(path, "wb") as f:
await f.write(raw)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
INSERT INTO photos (id, profile_id, date, path, taken_at, created)
VALUES (%s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
""",
(fid, pid, photo_date, fname, taken_at),
)
# Phase 2: Increment usage counter
increment_feature_usage(pid, "photos")
return {
"id": fid,
"date": photo_date.isoformat(),
"taken_at": taken_at.isoformat() if taken_at else None,
"exif_used": exif_used,
"file_mtime_used": file_mtime_used,
}
@router.get("/{fid}")
def get_photo(fid: str, session: dict=Depends(require_auth_flexible)):
"""Get photo by ID. Auth via header or query param ssetoken (for <img> tags)."""
profile_id = str(session["profile_id"])
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT path, profile_id FROM photos WHERE id=%s", (fid,))
row = cur.fetchone()
if not row:
raise HTTPException(404, "Photo not found")
if str(row["profile_id"]) != profile_id:
raise HTTPException(404, "Photo not found")
photo_path = resolve_photo_path(row["path"])
if not photo_path:
raise HTTPException(404, "Photo file not found")
return FileResponse(photo_path)
@router.get("")
def list_photos(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Get all photos for current profile."""
pid = get_pid(x_profile_id)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
SELECT * FROM photos WHERE profile_id=%s
ORDER BY COALESCE(taken_at, date::timestamptz, created) DESC NULLS LAST
LIMIT 500
""",
(pid,),
)
return [r2d(r) for r in cur.fetchall()]
@router.delete("/{fid}")
def delete_photo(
fid: str,
x_profile_id: Optional[str] = Header(default=None),
session: dict = Depends(require_auth),
):
"""Delete a photo (DB row + Datei auf der Platte)."""
pid = get_pid(x_profile_id)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT path FROM photos WHERE id=%s AND profile_id=%s",
(fid, pid),
)
row = cur.fetchone()
if not row:
raise HTTPException(404, "Photo not found")
photo_path = resolve_photo_path(row["path"])
cur.execute("DELETE FROM photos WHERE id=%s AND profile_id=%s", (fid, pid))
if photo_path and photo_path.exists():
try:
photo_path.unlink()
except OSError as e:
logger.warning("Could not delete photo file %s: %s", photo_path, e)
return {"ok": True, "id": fid}