shinkan-jinkendo/backend/media_storage.py
Lars 01636b5baf
All checks were successful
Deploy Development / deploy (push) Successful in 34s
Test Suite / pytest-backend (push) Successful in 24s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 8s
Test Suite / playwright-tests (push) Successful in 24s
Medien: Vereinsordner aus Namen statt library/club/c{id}
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 10:06:05 +02:00

227 lines
7.1 KiB
Python

"""Effektives Medien-Wurzelverzeichnis (MEDIA_ROOT + Superadmin-relativer Pfad). Siehe MEDIA_ASSETS_AND_ARCHIVE_SPEC.md §7.1."""
from __future__ import annotations
import os
import re
import shutil
import unicodedata
from pathlib import Path
from typing import Any, Optional
_CLUB_SLUG_TRANS = str.maketrans(
{
"ä": "ae",
"ö": "oe",
"ü": "ue",
"ß": "ss",
"Ä": "ae",
"Ö": "oe",
"Ü": "ue",
}
)
def _default_media_root() -> Path:
return Path(os.getenv("MEDIA_ROOT", str(Path(__file__).resolve().parent / "media")))
def normalize_local_relative_root(raw: str) -> str:
s = (raw or "").strip().replace("\\", "/")
s = s.strip("/")
if not s:
return ""
if ".." in s.split("/"):
raise ValueError("Pfad darf nicht '..' enthalten")
if s.startswith("/"):
raise ValueError("Nur relativer Pfad erlaubt")
return s
def get_effective_media_root(cur: Any) -> Path:
"""
MEDIA_ROOT aus ENV mit optionalem local_relative_root aus platform_media_storage (id=1).
"""
base = _default_media_root().resolve()
rel = ""
try:
cur.execute(
"SELECT local_relative_root FROM platform_media_storage WHERE id = 1",
)
row = cur.fetchone()
if row is not None:
v = row["local_relative_root"] if isinstance(row, dict) else row[0]
rel = normalize_local_relative_root(str(v or ""))
except Exception:
rel = ""
if not rel:
return base
return (base / rel).resolve()
def library_media_kind_dir(mime_type: Optional[str], ext: str) -> str:
"""
Unterordner unter library/* — an API-Filter media_kind angelehnt (image/video/pdf/other).
Bei fehlendem MIME wird anhand der Dateiendung geraten (letzte Instanz: other).
"""
m = (mime_type or "").strip().lower()
x = (ext or "").strip().lower()
if x and not x.startswith("."):
x = "." + x
if m.startswith("image/"):
return "image"
if m.startswith("video/"):
return "video"
if m == "application/pdf" or (m and "pdf" in m):
return "pdf"
if x in (
".jpg",
".jpeg",
".png",
".gif",
".webp",
".heic",
".heif",
".bmp",
".svg",
".avif",
):
return "image"
if x in (".mp4", ".mov", ".webm", ".mkv", ".avi", ".m4v"):
return "video"
if x == ".pdf":
return "pdf"
return "other"
def library_club_path_segment(club_id: int, club_name: Optional[str]) -> str:
"""
Ein Verzeichnissegment pro Verein: aus Anzeigenamen abgeleitet + „-c{id}“ für Eindeutigkeit
(Umbenennung, Kollisionen nach Slugify). Nur [a-z0-9-], keine Pfad-Sonderzeichen.
Fehlt der Name / Slug leer → „verein-c{id}“.
"""
cid = int(club_id)
if cid < 1:
raise ValueError("club_id muss eine positive Ganzzahl sein")
raw = unicodedata.normalize("NFKC", (club_name or "").strip()).translate(_CLUB_SLUG_TRANS).lower()
raw = re.sub(r"[^a-z0-9]+", "-", raw)
raw = raw.strip("-")
if len(raw) > 48:
raw = raw[:48].rstrip("-")
if not raw:
base = f"verein-c{cid}"
else:
base = f"{raw}-c{cid}"
if not re.fullmatch(r"[a-z0-9]+(?:-[a-z0-9]+)*", base):
raise ValueError("Ungültiger abgeleiteter Vereins-Ordnername")
return base
def path_under_media_root(media_root: Path, storage_key: str) -> Optional[Path]:
"""Gibt absoluten Pfad zurück oder None bei Path-Traversal."""
key = (storage_key or "").strip().replace("\\", "/").lstrip("/")
if not key or ".." in key.split("/"):
return None
p = (media_root / key).resolve()
try:
p.relative_to(media_root.resolve())
except ValueError:
return None
return p
def library_storage_key(
visibility: str,
club_id: Optional[int],
sha256_hex: str,
ext: str,
*,
uploader_profile_id: Optional[int] = None,
mime_type: Optional[str] = None,
club_name: Optional[str] = None,
) -> str:
"""
Relativer Speicherpfad unter MEDIA_ROOT für lokale media_assets.
- official → library/official/{kind}/{sha256}{ext}
- club (vereinsgeteilt) → library/{vereins-segment}/{kind}/{sha256}{ext}
- private (nur Hochlader) → library/{vereins-segment}/u{profile}/{kind}/{sha256}{ext}
Vereins-Segment: aus club_name abgeleitet + „-c{club_id}“ — siehe library_club_path_segment.
kind ∈ {image, video, pdf, other} — siehe library_media_kind_dir.
Kein Ordnername „private“ auf der Platte: Zuordnung erfolgt über Uploader unter dem Verein.
"""
vis = (visibility or "private").strip().lower()
if vis not in ("private", "club", "official"):
raise ValueError(f"Ungültige Sichtbarkeit für Speicherpfad: {visibility!r}")
sha = (sha256_hex or "").strip().lower()
if len(sha) != 64 or any(c not in "0123456789abcdef" for c in sha):
raise ValueError("sha256_hex muss 64 Hex-Zeichen sein")
e = (ext or "").strip()
if not e:
e = ".bin"
if not e.startswith("."):
e = "." + e
if ".." in e or "/" in e or "\\" in e or "\x00" in e:
raise ValueError("Ungültige Dateiendung")
kind = library_media_kind_dir(mime_type, e)
e = e[:16]
blob = f"{kind}/{sha}{e}"
if vis == "official":
return f"library/official/{blob}"
if club_id is None:
raise ValueError("Verein (club_id) ist für diese Sichtbarkeit auf der Platte erforderlich")
cid = int(club_id)
if cid < 1:
raise ValueError("club_id muss eine positive Ganzzahl sein")
club_seg = library_club_path_segment(cid, club_name)
if vis == "club":
return f"library/{club_seg}/{blob}"
if uploader_profile_id is None:
raise ValueError("uploader_profile_id ist für private Medien erforderlich (Ordner u{…} unter dem Verein)")
up = int(uploader_profile_id)
if up < 1:
raise ValueError("uploader_profile_id muss positiv sein")
return f"library/{club_seg}/u{up}/{blob}"
def relocate_local_media_file(media_root: Path, old_storage_key: str, new_storage_key: str) -> None:
"""
Physisches Verschieben bei geändertem library_*-Pfad (z. B. nach PATCH visibility/club_id).
- Kein Op, wenn Quelle fehlt aber Ziel bereits existiert (idempotent).
- Erwartet storage_backend=local; Aufrufer prüft das.
"""
if (old_storage_key or "").replace("\\", "/").lstrip("/") == (new_storage_key or "").replace(
"\\", "/"
).lstrip("/"):
return
old_p = path_under_media_root(media_root, old_storage_key)
new_p = path_under_media_root(media_root, new_storage_key)
if old_p is None or new_p is None:
raise ValueError("Ungültiger Speicherpfad (Path-Traversal)")
if new_p.is_file():
if not old_p.is_file():
return
if old_p.resolve() == new_p.resolve():
return
raise FileExistsError(f"Zieldatei existiert bereits: {new_storage_key}")
if not old_p.is_file():
raise FileNotFoundError(f"Medien-Quelldatei nicht gefunden: {old_storage_key}")
new_p.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(old_p), str(new_p))