shinkan-jinkendo/backend/media_storage.py
Lars 4fb77d6927
All checks were successful
Deploy Development / deploy (push) Successful in 35s
Test Suite / pytest-backend (push) Successful in 24s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 23s
feat(media): library/* Speicherpfade nach Sichtbarkeit und Verein
- media_storage.library_storage_key + relocate_local_media_file
- Übungs- und Archiv-Upload nutzen library/private|club/c*|official
- PATCH/Bulk-Patch: bei visibility/club_id Datei umziehen, storage_key + exercise_media.file_path
- pytest: test_library_storage_key; Mocks angepasst
- version 0.8.52 / media_assets 1.7.0 / exercises 2.17.0

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-08 09:47:16 +02:00

134 lines
4.3 KiB
Python

"""Effektives Medien-Wurzelverzeichnis (MEDIA_ROOT + Superadmin-relativer Pfad). Siehe MEDIA_ASSETS_AND_ARCHIVE_SPEC.md §7.1."""
from __future__ import annotations
import os
import re
import shutil
from pathlib import Path
from typing import Any, Optional
def _default_media_root() -> Path:
return Path(os.getenv("MEDIA_ROOT", str(Path(__file__).resolve().parent / "media")))
def normalize_local_relative_root(raw: str) -> str:
s = (raw or "").strip().replace("\\", "/")
s = s.strip("/")
if not s:
return ""
if ".." in s.split("/"):
raise ValueError("Pfad darf nicht '..' enthalten")
if s.startswith("/"):
raise ValueError("Nur relativer Pfad erlaubt")
return s
def get_effective_media_root(cur: Any) -> Path:
"""
MEDIA_ROOT aus ENV mit optionalem local_relative_root aus platform_media_storage (id=1).
"""
base = _default_media_root().resolve()
rel = ""
try:
cur.execute(
"SELECT local_relative_root FROM platform_media_storage WHERE id = 1",
)
row = cur.fetchone()
if row is not None:
v = row["local_relative_root"] if isinstance(row, dict) else row[0]
rel = normalize_local_relative_root(str(v or ""))
except Exception:
rel = ""
if not rel:
return base
return (base / rel).resolve()
def path_under_media_root(media_root: Path, storage_key: str) -> Optional[Path]:
"""Gibt absoluten Pfad zurück oder None bei Path-Traversal."""
key = (storage_key or "").strip().replace("\\", "/").lstrip("/")
if not key or ".." in key.split("/"):
return None
p = (media_root / key).resolve()
try:
p.relative_to(media_root.resolve())
except ValueError:
return None
return p
def library_storage_key(
visibility: str,
club_id: Optional[int],
sha256_hex: str,
ext: str,
) -> str:
"""
Relativer Speicherpfad unter MEDIA_ROOT für lokale media_assets.
Layout (Mandantentrennung, globale Medien getrennt):
- private → library/private/{sha256}{ext}
- club → library/club/c{club_id}/{sha256}{ext}
- official → library/official/{sha256}{ext}
"""
vis = (visibility or "private").strip().lower()
if vis not in ("private", "club", "official"):
raise ValueError(f"Ungültige Sichtbarkeit für Speicherpfad: {visibility!r}")
sha = (sha256_hex or "").strip().lower()
if len(sha) != 64 or any(c not in "0123456789abcdef" for c in sha):
raise ValueError("sha256_hex muss 64 Hex-Zeichen sein")
e = (ext or "").strip()
if not e:
e = ".bin"
if not e.startswith("."):
e = "." + e
e = e[:16]
if ".." in e or "/" in e or "\\" in e or "\x00" in e:
raise ValueError("Ungültige Dateiendung")
blob = f"{sha}{e}"
if vis == "private":
return f"library/private/{blob}"
if vis == "official":
return f"library/official/{blob}"
if club_id is None:
raise ValueError("club_id ist für Sichtbarkeit „Verein“ erforderlich")
cid = int(club_id)
if cid < 1:
raise ValueError("club_id muss eine positive Ganzzahl sein")
return f"library/club/c{cid}/{blob}"
def relocate_local_media_file(media_root: Path, old_storage_key: str, new_storage_key: str) -> None:
"""
Physisches Verschieben bei geändertem library_*-Pfad (z. B. nach PATCH visibility/club_id).
- Kein Op, wenn Quelle fehlt aber Ziel bereits existiert (idempotent).
- Erwartet storage_backend=local; Aufrufer prüft das.
"""
if (old_storage_key or "").replace("\\", "/").lstrip("/") == (new_storage_key or "").replace(
"\\", "/"
).lstrip("/"):
return
old_p = path_under_media_root(media_root, old_storage_key)
new_p = path_under_media_root(media_root, new_storage_key)
if old_p is None or new_p is None:
raise ValueError("Ungültiger Speicherpfad (Path-Traversal)")
if new_p.is_file():
if not old_p.is_file():
return
if old_p.resolve() == new_p.resolve():
return
raise FileExistsError(f"Zieldatei existiert bereits: {new_storage_key}")
if not old_p.is_file():
raise FileNotFoundError(f"Medien-Quelldatei nicht gefunden: {old_storage_key}")
new_p.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(old_p), str(new_p))