All checks were successful
Deploy Development / deploy (push) Successful in 36s
Test Suite / pytest-backend (push) Successful in 33s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 6s
Test Suite / playwright-tests (push) Successful in 47s
Co-authored-by: Cursor <cursoragent@cursor.com>
230 lines
7.3 KiB
Python
230 lines
7.3 KiB
Python
"""Effektives Medien-Wurzelverzeichnis (MEDIA_ROOT + Superadmin-relativer Pfad). Siehe MEDIA_ASSETS_AND_ARCHIVE_SPEC.md §7.1."""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import shutil
|
|
import unicodedata
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
_CLUB_SLUG_TRANS = str.maketrans(
|
|
{
|
|
"ä": "ae",
|
|
"ö": "oe",
|
|
"ü": "ue",
|
|
"ß": "ss",
|
|
"Ä": "ae",
|
|
"Ö": "oe",
|
|
"Ü": "ue",
|
|
}
|
|
)
|
|
|
|
|
|
def _default_media_root() -> Path:
|
|
return Path(os.getenv("MEDIA_ROOT", str(Path(__file__).resolve().parent / "media")))
|
|
|
|
|
|
def normalize_local_relative_root(raw: str) -> str:
|
|
s = (raw or "").strip().replace("\\", "/")
|
|
s = s.strip("/")
|
|
if not s:
|
|
return ""
|
|
if ".." in s.split("/"):
|
|
raise ValueError("Pfad darf nicht '..' enthalten")
|
|
if s.startswith("/"):
|
|
raise ValueError("Nur relativer Pfad erlaubt")
|
|
return s
|
|
|
|
|
|
def get_effective_media_root(cur: Any) -> Path:
|
|
"""
|
|
MEDIA_ROOT aus ENV mit optionalem local_relative_root aus platform_media_storage (id=1).
|
|
"""
|
|
base = _default_media_root().resolve()
|
|
rel = ""
|
|
try:
|
|
cur.execute(
|
|
"SELECT local_relative_root FROM platform_media_storage WHERE id = 1",
|
|
)
|
|
row = cur.fetchone()
|
|
if row is not None:
|
|
v = row["local_relative_root"] if isinstance(row, dict) else row[0]
|
|
rel = normalize_local_relative_root(str(v or ""))
|
|
except Exception:
|
|
rel = ""
|
|
if not rel:
|
|
return base
|
|
return (base / rel).resolve()
|
|
|
|
|
|
def library_media_kind_dir(mime_type: Optional[str], ext: str) -> str:
|
|
"""
|
|
Unterordner unter library/* — an API-Filter media_kind angelehnt (image/video/pdf/other).
|
|
|
|
Bei fehlendem MIME wird anhand der Dateiendung geraten (letzte Instanz: other).
|
|
"""
|
|
m = (mime_type or "").strip().lower()
|
|
x = (ext or "").strip().lower()
|
|
if x and not x.startswith("."):
|
|
x = "." + x
|
|
|
|
if m.startswith("image/"):
|
|
return "image"
|
|
if m.startswith("video/"):
|
|
return "video"
|
|
if m == "application/pdf" or (m and "pdf" in m):
|
|
return "pdf"
|
|
|
|
if x in (
|
|
".jpg",
|
|
".jpeg",
|
|
".png",
|
|
".gif",
|
|
".webp",
|
|
".heic",
|
|
".heif",
|
|
".bmp",
|
|
".svg",
|
|
".avif",
|
|
):
|
|
return "image"
|
|
if x in (".mp4", ".mov", ".webm", ".mkv", ".avi", ".m4v"):
|
|
return "video"
|
|
if x == ".pdf":
|
|
return "pdf"
|
|
return "other"
|
|
|
|
|
|
def library_club_path_segment(club_id: int, club_name: Optional[str]) -> str:
|
|
"""
|
|
Ein Verzeichnissegment pro Verein: aus Anzeigenamen abgeleitet + „-c{id}“ für Eindeutigkeit
|
|
(Umbenennung, Kollisionen nach Slugify). Nur [a-z0-9-], keine Pfad-Sonderzeichen.
|
|
Fehlt der Name / Slug leer → „verein-c{id}“.
|
|
"""
|
|
cid = int(club_id)
|
|
if cid < 1:
|
|
raise ValueError("club_id muss eine positive Ganzzahl sein")
|
|
|
|
raw = unicodedata.normalize("NFKC", (club_name or "").strip()).translate(_CLUB_SLUG_TRANS).lower()
|
|
raw = re.sub(r"[^a-z0-9]+", "-", raw)
|
|
raw = raw.strip("-")
|
|
if len(raw) > 48:
|
|
raw = raw[:48].rstrip("-")
|
|
|
|
if not raw:
|
|
base = f"verein-c{cid}"
|
|
else:
|
|
base = f"{raw}-c{cid}"
|
|
|
|
if not re.fullmatch(r"[a-z0-9]+(?:-[a-z0-9]+)*", base):
|
|
raise ValueError("Ungültiger abgeleiteter Vereins-Ordnername")
|
|
return base
|
|
|
|
|
|
def path_under_media_root(media_root: Path, storage_key: str) -> Optional[Path]:
|
|
"""Gibt absoluten Pfad zurück oder None bei Path-Traversal."""
|
|
key = (storage_key or "").strip().replace("\\", "/").lstrip("/")
|
|
if not key or ".." in key.split("/"):
|
|
return None
|
|
p = (media_root / key).resolve()
|
|
try:
|
|
p.relative_to(media_root.resolve())
|
|
except ValueError:
|
|
return None
|
|
return p
|
|
|
|
|
|
def library_storage_key(
|
|
visibility: str,
|
|
club_id: Optional[int],
|
|
sha256_hex: str,
|
|
ext: str,
|
|
*,
|
|
uploader_profile_id: Optional[int] = None,
|
|
mime_type: Optional[str] = None,
|
|
club_name: Optional[str] = None,
|
|
) -> str:
|
|
"""
|
|
Relativer Speicherpfad unter MEDIA_ROOT für lokale media_assets.
|
|
|
|
- official → library/official/{kind}/{sha256}{ext}
|
|
- club (vereinsgeteilt) → library/{vereins-segment}/{kind}/{sha256}{ext}
|
|
- private → dieselbe Ordnerlogik wie Verein: library/{vereins-segment}/{kind}/{sha}.u{profile}{ext}
|
|
|
|
Dateiname bei private: „{sha}.u{profile_id}{ext}“ (nicht Unterordner u{…}), damit Ordnerstruktur wie bei „Verein“.
|
|
|
|
Vereins-Segment: aus club_name abgeleitet + „-c{club_id}“ — siehe library_club_path_segment.
|
|
|
|
kind ∈ {image, video, pdf, other} — siehe library_media_kind_dir.
|
|
|
|
Kein Ordnername „private“ auf der Platte. Private Dateien unterscheiden sich nur im Dateinamen (.u{Profil} vor Endung).
|
|
"""
|
|
vis = (visibility or "private").strip().lower()
|
|
if vis not in ("private", "club", "official"):
|
|
raise ValueError(f"Ungültige Sichtbarkeit für Speicherpfad: {visibility!r}")
|
|
|
|
sha = (sha256_hex or "").strip().lower()
|
|
if len(sha) != 64 or any(c not in "0123456789abcdef" for c in sha):
|
|
raise ValueError("sha256_hex muss 64 Hex-Zeichen sein")
|
|
|
|
e = (ext or "").strip()
|
|
if not e:
|
|
e = ".bin"
|
|
if not e.startswith("."):
|
|
e = "." + e
|
|
if ".." in e or "/" in e or "\\" in e or "\x00" in e:
|
|
raise ValueError("Ungültige Dateiendung")
|
|
|
|
kind = library_media_kind_dir(mime_type, e)
|
|
e = e[:16]
|
|
club_blob = f"{kind}/{sha}{e}"
|
|
if vis == "official":
|
|
return f"library/official/{club_blob}"
|
|
if club_id is None:
|
|
raise ValueError("Verein (club_id) ist für diese Sichtbarkeit auf der Platte erforderlich")
|
|
cid = int(club_id)
|
|
if cid < 1:
|
|
raise ValueError("club_id muss eine positive Ganzzahl sein")
|
|
club_seg = library_club_path_segment(cid, club_name)
|
|
if vis == "club":
|
|
return f"library/{club_seg}/{club_blob}"
|
|
if uploader_profile_id is None:
|
|
raise ValueError("uploader_profile_id ist für private Archiv-Medien erforderlich")
|
|
up = int(uploader_profile_id)
|
|
if up < 1:
|
|
raise ValueError("uploader_profile_id muss positiv sein")
|
|
priv_name = f"{sha}.u{up}{e}"
|
|
return f"library/{club_seg}/{kind}/{priv_name}"
|
|
|
|
|
|
def relocate_local_media_file(media_root: Path, old_storage_key: str, new_storage_key: str) -> None:
|
|
"""
|
|
Physisches Verschieben bei geändertem library_*-Pfad (z. B. nach PATCH visibility/club_id).
|
|
|
|
- Kein Op, wenn Quelle fehlt aber Ziel bereits existiert (idempotent).
|
|
- Erwartet storage_backend=local; Aufrufer prüft das.
|
|
"""
|
|
if (old_storage_key or "").replace("\\", "/").lstrip("/") == (new_storage_key or "").replace(
|
|
"\\", "/"
|
|
).lstrip("/"):
|
|
return
|
|
|
|
old_p = path_under_media_root(media_root, old_storage_key)
|
|
new_p = path_under_media_root(media_root, new_storage_key)
|
|
if old_p is None or new_p is None:
|
|
raise ValueError("Ungültiger Speicherpfad (Path-Traversal)")
|
|
|
|
if new_p.is_file():
|
|
if not old_p.is_file():
|
|
return
|
|
if old_p.resolve() == new_p.resolve():
|
|
return
|
|
raise FileExistsError(f"Zieldatei existiert bereits: {new_storage_key}")
|
|
|
|
if not old_p.is_file():
|
|
raise FileNotFoundError(f"Medien-Quelldatei nicht gefunden: {old_storage_key}")
|
|
|
|
new_p.parent.mkdir(parents=True, exist_ok=True)
|
|
shutil.move(str(old_p), str(new_p))
|