"""Effektives Medien-Wurzelverzeichnis (MEDIA_ROOT + Superadmin-relativer Pfad). Siehe MEDIA_ASSETS_AND_ARCHIVE_SPEC.md §7.1.""" from __future__ import annotations import os import re import shutil import unicodedata from pathlib import Path from typing import Any, Optional _CLUB_SLUG_TRANS = str.maketrans( { "ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss", "Ä": "ae", "Ö": "oe", "Ü": "ue", } ) def _default_media_root() -> Path: return Path(os.getenv("MEDIA_ROOT", str(Path(__file__).resolve().parent / "media"))) def normalize_local_relative_root(raw: str) -> str: s = (raw or "").strip().replace("\\", "/") s = s.strip("/") if not s: return "" if ".." in s.split("/"): raise ValueError("Pfad darf nicht '..' enthalten") if s.startswith("/"): raise ValueError("Nur relativer Pfad erlaubt") return s def get_effective_media_root(cur: Any) -> Path: """ MEDIA_ROOT aus ENV mit optionalem local_relative_root aus platform_media_storage (id=1). """ base = _default_media_root().resolve() rel = "" try: cur.execute( "SELECT local_relative_root FROM platform_media_storage WHERE id = 1", ) row = cur.fetchone() if row is not None: v = row["local_relative_root"] if isinstance(row, dict) else row[0] rel = normalize_local_relative_root(str(v or "")) except Exception: rel = "" if not rel: return base return (base / rel).resolve() def library_media_kind_dir(mime_type: Optional[str], ext: str) -> str: """ Unterordner unter library/* — an API-Filter media_kind angelehnt (image/video/pdf/other). Bei fehlendem MIME wird anhand der Dateiendung geraten (letzte Instanz: other). """ m = (mime_type or "").strip().lower() x = (ext or "").strip().lower() if x and not x.startswith("."): x = "." + x if m.startswith("image/"): return "image" if m.startswith("video/"): return "video" if m == "application/pdf" or (m and "pdf" in m): return "pdf" if x in ( ".jpg", ".jpeg", ".png", ".gif", ".webp", ".heic", ".heif", ".bmp", ".svg", ".avif", ): return "image" if x in (".mp4", ".mov", ".webm", ".mkv", ".avi", ".m4v"): return "video" if x == ".pdf": return "pdf" return "other" def library_club_path_segment(club_id: int, club_name: Optional[str]) -> str: """ Ein Verzeichnissegment pro Verein: aus Anzeigenamen abgeleitet + „-c{id}“ für Eindeutigkeit (Umbenennung, Kollisionen nach Slugify). Nur [a-z0-9-], keine Pfad-Sonderzeichen. Fehlt der Name / Slug leer → „verein-c{id}“. """ cid = int(club_id) if cid < 1: raise ValueError("club_id muss eine positive Ganzzahl sein") raw = unicodedata.normalize("NFKC", (club_name or "").strip()).translate(_CLUB_SLUG_TRANS).lower() raw = re.sub(r"[^a-z0-9]+", "-", raw) raw = raw.strip("-") if len(raw) > 48: raw = raw[:48].rstrip("-") if not raw: base = f"verein-c{cid}" else: base = f"{raw}-c{cid}" if not re.fullmatch(r"[a-z0-9]+(?:-[a-z0-9]+)*", base): raise ValueError("Ungültiger abgeleiteter Vereins-Ordnername") return base def path_under_media_root(media_root: Path, storage_key: str) -> Optional[Path]: """Gibt absoluten Pfad zurück oder None bei Path-Traversal.""" key = (storage_key or "").strip().replace("\\", "/").lstrip("/") if not key or ".." in key.split("/"): return None p = (media_root / key).resolve() try: p.relative_to(media_root.resolve()) except ValueError: return None return p def library_storage_key( visibility: str, club_id: Optional[int], sha256_hex: str, ext: str, *, uploader_profile_id: Optional[int] = None, mime_type: Optional[str] = None, club_name: Optional[str] = None, ) -> str: """ Relativer Speicherpfad unter MEDIA_ROOT für lokale media_assets. - official → library/official/{kind}/{sha256}{ext} - club (vereinsgeteilt) → library/{vereins-segment}/{kind}/{sha256}{ext} - private → dieselbe Ordnerlogik wie Verein: library/{vereins-segment}/{kind}/{sha}.u{profile}{ext} Dateiname bei private: „{sha}.u{profile_id}{ext}“ (nicht Unterordner u{…}), damit Ordnerstruktur wie bei „Verein“. Vereins-Segment: aus club_name abgeleitet + „-c{club_id}“ — siehe library_club_path_segment. kind ∈ {image, video, pdf, other} — siehe library_media_kind_dir. Kein Ordnername „private“ auf der Platte. Private Dateien unterscheiden sich nur im Dateinamen (.u{Profil} vor Endung). """ vis = (visibility or "private").strip().lower() if vis not in ("private", "club", "official"): raise ValueError(f"Ungültige Sichtbarkeit für Speicherpfad: {visibility!r}") sha = (sha256_hex or "").strip().lower() if len(sha) != 64 or any(c not in "0123456789abcdef" for c in sha): raise ValueError("sha256_hex muss 64 Hex-Zeichen sein") e = (ext or "").strip() if not e: e = ".bin" if not e.startswith("."): e = "." + e if ".." in e or "/" in e or "\\" in e or "\x00" in e: raise ValueError("Ungültige Dateiendung") kind = library_media_kind_dir(mime_type, e) e = e[:16] club_blob = f"{kind}/{sha}{e}" if vis == "official": return f"library/official/{club_blob}" if club_id is None: raise ValueError("Verein (club_id) ist für diese Sichtbarkeit auf der Platte erforderlich") cid = int(club_id) if cid < 1: raise ValueError("club_id muss eine positive Ganzzahl sein") club_seg = library_club_path_segment(cid, club_name) if vis == "club": return f"library/{club_seg}/{club_blob}" if uploader_profile_id is None: raise ValueError("uploader_profile_id ist für private Archiv-Medien erforderlich") up = int(uploader_profile_id) if up < 1: raise ValueError("uploader_profile_id muss positiv sein") priv_name = f"{sha}.u{up}{e}" return f"library/{club_seg}/{kind}/{priv_name}" def relocate_local_media_file(media_root: Path, old_storage_key: str, new_storage_key: str) -> None: """ Physisches Verschieben bei geändertem library_*-Pfad (z. B. nach PATCH visibility/club_id). - Kein Op, wenn Quelle fehlt aber Ziel bereits existiert (idempotent). - Erwartet storage_backend=local; Aufrufer prüft das. """ if (old_storage_key or "").replace("\\", "/").lstrip("/") == (new_storage_key or "").replace( "\\", "/" ).lstrip("/"): return old_p = path_under_media_root(media_root, old_storage_key) new_p = path_under_media_root(media_root, new_storage_key) if old_p is None or new_p is None: raise ValueError("Ungültiger Speicherpfad (Path-Traversal)") if new_p.is_file(): if not old_p.is_file(): return if old_p.resolve() == new_p.resolve(): return raise FileExistsError(f"Zieldatei existiert bereits: {new_storage_key}") if not old_p.is_file(): raise FileNotFoundError(f"Medien-Quelldatei nicht gefunden: {old_storage_key}") new_p.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(old_p), str(new_p))