All checks were successful
Deploy Development / deploy (push) Successful in 36s
Test Suite / pytest-backend (push) Successful in 25s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 28s
Co-authored-by: Cursor <cursoragent@cursor.com>
1236 lines
46 KiB
Python
1236 lines
46 KiB
Python
"""Medien-Archiv (Liste, Datei) und Lifecycle — MEDIA_ASSETS_AND_ARCHIVE_SPEC."""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, Literal, Optional, Union
|
|
|
|
import hashlib
|
|
from pathlib import Path
|
|
|
|
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile
|
|
from pydantic import BaseModel, Field, model_validator
|
|
|
|
from club_tenancy import (
|
|
assert_club_member,
|
|
assert_valid_governance_visibility,
|
|
club_ids_for_profile_with_roles,
|
|
is_platform_admin,
|
|
is_superadmin,
|
|
library_content_visible_to_profile,
|
|
)
|
|
from db import get_db, get_cursor, r2d
|
|
from media_lifecycle import (
|
|
LC_ACTIVE,
|
|
LC_TRASH_HIDDEN,
|
|
LC_TRASH_SOFT,
|
|
assert_can_edit_media_asset_metadata,
|
|
assert_can_manage_media_asset_lifecycle,
|
|
assert_can_trash_soft,
|
|
fetch_media_asset_row,
|
|
purge_media_asset,
|
|
reactivate_media_asset_from_trash,
|
|
superadmin_force_lifecycle_state,
|
|
superadmin_hard_delete_media_asset,
|
|
transition_recover_from_hidden,
|
|
transition_to_trash_hidden,
|
|
transition_to_trash_soft,
|
|
)
|
|
from media_storage import get_effective_media_root, library_storage_key, path_under_media_root, relocate_local_media_file
|
|
from tenant_context import TenantContext, get_tenant_context, get_tenant_context_flexible
|
|
|
|
from routers.exercises import _upload_limit_bytes, resolve_upload_mime_type
|
|
|
|
router = APIRouter(prefix="/api/media-assets", tags=["media-assets"])
|
|
|
|
|
|
class MediaLifecycleBody(BaseModel):
|
|
action: Literal[
|
|
"trash_soft",
|
|
"trash_hidden",
|
|
"recover",
|
|
"purge",
|
|
"reactivate",
|
|
"superadmin_force_lifecycle",
|
|
"superadmin_hard_delete",
|
|
]
|
|
target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None
|
|
|
|
@model_validator(mode="after")
|
|
def _target_lifecycle_rules(self):
|
|
if self.action == "superadmin_force_lifecycle":
|
|
if not self.target_lifecycle:
|
|
raise ValueError("target_lifecycle ist für diese Aktion erforderlich")
|
|
elif self.target_lifecycle is not None:
|
|
raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle")
|
|
return self
|
|
|
|
|
|
class MediaAssetPatch(BaseModel):
|
|
copyright_notice: Optional[str] = Field(None, max_length=8000)
|
|
original_filename: Optional[str] = Field(None, max_length=300)
|
|
visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
|
|
club_id: Optional[int] = None
|
|
tags: Optional[list[str]] = None
|
|
|
|
|
|
class MediaBulkLifecycleBody(BaseModel):
|
|
media_asset_ids: list[int] = Field(..., min_length=1, max_length=200)
|
|
action: Literal[
|
|
"trash_soft",
|
|
"trash_hidden",
|
|
"recover",
|
|
"purge",
|
|
"reactivate",
|
|
"superadmin_force_lifecycle",
|
|
"superadmin_hard_delete",
|
|
]
|
|
target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None
|
|
|
|
@model_validator(mode="after")
|
|
def _bulk_target(self):
|
|
if self.action == "superadmin_force_lifecycle" and not self.target_lifecycle:
|
|
raise ValueError("target_lifecycle ist für diese Aktion erforderlich")
|
|
if self.action != "superadmin_force_lifecycle" and self.target_lifecycle is not None:
|
|
raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle")
|
|
return self
|
|
|
|
|
|
class MediaBulkPatchBody(BaseModel):
|
|
media_asset_ids: list[int] = Field(..., min_length=1, max_length=200)
|
|
copyright_notice: Optional[str] = Field(None, max_length=8000)
|
|
original_filename: Optional[str] = Field(None, max_length=300)
|
|
visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
|
|
club_id: Optional[int] = None
|
|
tags: Optional[list[str]] = None
|
|
|
|
|
|
_LIFECYCLE_LIST_FILTERS = frozenset({"active", "trash_soft", "trash_hidden", "all"})
|
|
_MEDIA_KIND_FILTERS = frozenset({"all", "image", "video", "pdf", "other"})
|
|
_MAX_TAGS = 40
|
|
_MAX_TAG_LEN = 48
|
|
|
|
_MEDIA_ASSETS_TAGS_COLUMN: Optional[bool] = None
|
|
_TRAINING_UNIT_EXERCISES_TABLE: Optional[bool] = None
|
|
|
|
|
|
def _media_assets_tags_column_present(cur: Any) -> bool:
|
|
"""True nach Migration 046; verhindert 500 wenn Code neuer ist als das Schema."""
|
|
global _MEDIA_ASSETS_TAGS_COLUMN
|
|
if _MEDIA_ASSETS_TAGS_COLUMN is not None:
|
|
return _MEDIA_ASSETS_TAGS_COLUMN
|
|
cur.execute(
|
|
"""
|
|
SELECT 1
|
|
FROM information_schema.columns
|
|
WHERE table_schema = 'public'
|
|
AND table_name = 'media_assets'
|
|
AND column_name = 'tags'
|
|
LIMIT 1
|
|
"""
|
|
)
|
|
_MEDIA_ASSETS_TAGS_COLUMN = cur.fetchone() is not None
|
|
return _MEDIA_ASSETS_TAGS_COLUMN
|
|
|
|
|
|
def _training_unit_exercises_table_present(cur: Any) -> bool:
|
|
"""True wenn planning-Migration (training_unit_exercises) im Schema liegt — sonst keine Einheiten-Links."""
|
|
global _TRAINING_UNIT_EXERCISES_TABLE
|
|
if _TRAINING_UNIT_EXERCISES_TABLE is not None:
|
|
return _TRAINING_UNIT_EXERCISES_TABLE
|
|
cur.execute(
|
|
"""
|
|
SELECT 1
|
|
FROM information_schema.tables
|
|
WHERE table_schema = 'public'
|
|
AND table_name = 'training_unit_exercises'
|
|
LIMIT 1
|
|
"""
|
|
)
|
|
_TRAINING_UNIT_EXERCISES_TABLE = cur.fetchone() is not None
|
|
return _TRAINING_UNIT_EXERCISES_TABLE
|
|
|
|
|
|
def _normalize_media_tags(raw: Union[list[str], None]) -> list[str]:
|
|
"""PostgreSQL text[]; Einträge gekürzt und ohne Dubletten (case-insensitive)."""
|
|
if raw is None:
|
|
return []
|
|
if not isinstance(raw, list):
|
|
raise HTTPException(status_code=400, detail="tags muss eine Liste von Strings sein")
|
|
out: list[str] = []
|
|
seen: set[str] = set()
|
|
for item in raw:
|
|
s = str(item).strip()
|
|
if not s:
|
|
continue
|
|
s = s[:_MAX_TAG_LEN]
|
|
key = s.lower()
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
out.append(s)
|
|
if len(out) >= _MAX_TAGS:
|
|
break
|
|
return out
|
|
|
|
|
|
def _usage_for_media_assets(cur: Any, asset_ids: list[int]) -> dict[int, dict[str, Any]]:
|
|
"""Übungen und Trainingseinheiten, die dieses Archiv-Medium nutzen."""
|
|
if not asset_ids:
|
|
return {}
|
|
cur.execute(
|
|
"""SELECT em.media_asset_id, e.id AS exercise_id, e.title AS exercise_title
|
|
FROM exercise_media em
|
|
JOIN exercises e ON e.id = em.exercise_id
|
|
WHERE em.media_asset_id = ANY(%s)
|
|
ORDER BY em.media_asset_id, e.title NULLS LAST, e.id""",
|
|
(asset_ids,),
|
|
)
|
|
ex_by_asset: dict[int, list[dict]] = {int(aid): [] for aid in asset_ids}
|
|
seen_ex: dict[int, set[int]] = {int(aid): set() for aid in asset_ids}
|
|
exercise_ids: set[int] = set()
|
|
for r in cur.fetchall():
|
|
d = r2d(r)
|
|
aid = int(d["media_asset_id"])
|
|
eid = int(d["exercise_id"])
|
|
if aid not in seen_ex or eid in seen_ex[aid]:
|
|
continue
|
|
seen_ex[aid].add(eid)
|
|
exercise_ids.add(eid)
|
|
title = (d.get("exercise_title") or "").strip() or f"Übung #{eid}"
|
|
ex_by_asset[aid].append({"id": eid, "title": title})
|
|
exercise_to_units: dict[int, list[dict]] = {}
|
|
if exercise_ids and _training_unit_exercises_table_present(cur):
|
|
cur.execute(
|
|
"""SELECT tue.exercise_id, tu.id AS unit_id, tu.planned_date,
|
|
COALESCE(tg.name, '') AS group_name
|
|
FROM training_unit_exercises tue
|
|
JOIN training_units tu ON tu.id = tue.training_unit_id
|
|
LEFT JOIN training_groups tg ON tg.id = tu.group_id
|
|
WHERE tue.exercise_id = ANY(%s)""",
|
|
(list(exercise_ids),),
|
|
)
|
|
for row in cur.fetchall():
|
|
d = r2d(row)
|
|
eid = int(d["exercise_id"])
|
|
uid = int(d["unit_id"])
|
|
pd = d.get("planned_date")
|
|
planned: Optional[str]
|
|
if pd is None:
|
|
planned = None
|
|
elif hasattr(pd, "isoformat"):
|
|
planned = str(pd.isoformat())
|
|
else:
|
|
planned = str(pd)
|
|
exercise_to_units.setdefault(eid, []).append(
|
|
{
|
|
"id": uid,
|
|
"planned_date": planned,
|
|
"group_name": d.get("group_name") or "",
|
|
}
|
|
)
|
|
for eid, units in exercise_to_units.items():
|
|
seen_u: set[int] = set()
|
|
uniq: list[dict] = []
|
|
for u in units:
|
|
if u["id"] not in seen_u:
|
|
seen_u.add(u["id"])
|
|
uniq.append(u)
|
|
exercise_to_units[eid] = uniq
|
|
out: dict[int, dict[str, Any]] = {}
|
|
for aid in asset_ids:
|
|
aid_i = int(aid)
|
|
exs = ex_by_asset.get(aid_i, [])
|
|
seen_u2: set[int] = set()
|
|
units_agg: list[dict] = []
|
|
for ex in exs:
|
|
for u in exercise_to_units.get(ex["id"], []):
|
|
if u["id"] not in seen_u2:
|
|
seen_u2.add(u["id"])
|
|
units_agg.append(u)
|
|
out[aid_i] = {"exercises": exs, "training_units": units_agg}
|
|
return out
|
|
|
|
|
|
def _fetch_filter_uploaders(cur: Any, is_adm: bool, profile_id: int) -> list[dict]:
|
|
cur.execute(
|
|
"""SELECT DISTINCT ma.uploaded_by_profile_id AS id, pr.name, pr.email
|
|
FROM media_assets ma
|
|
LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id
|
|
WHERE ma.uploaded_by_profile_id IS NOT NULL
|
|
AND ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')
|
|
AND (
|
|
%s
|
|
OR lower(trim(ma.visibility)) = 'official'
|
|
OR (
|
|
lower(trim(ma.visibility)) = 'private'
|
|
AND ma.uploaded_by_profile_id = %s
|
|
)
|
|
OR (
|
|
lower(trim(ma.visibility)) = 'club'
|
|
AND EXISTS (
|
|
SELECT 1 FROM club_members cm
|
|
WHERE cm.profile_id = %s
|
|
AND cm.club_id = ma.club_id
|
|
AND cm.status = 'active'
|
|
)
|
|
)
|
|
)
|
|
ORDER BY pr.name NULLS LAST, ma.uploaded_by_profile_id
|
|
LIMIT 400""",
|
|
(is_adm, profile_id, profile_id),
|
|
)
|
|
rows = [r2d(x) for x in cur.fetchall()]
|
|
out: list[dict] = []
|
|
for r in rows:
|
|
pid = r.get("id")
|
|
if pid is None:
|
|
continue
|
|
label = (r.get("name") or "").strip()
|
|
if not label:
|
|
label = (r.get("email") or "").strip()
|
|
if not label:
|
|
label = f"Profil #{pid}"
|
|
out.append({"id": int(pid), "label": label})
|
|
return out
|
|
|
|
|
|
def _effective_media_patch_fields(patch_fields: dict, asset: dict) -> dict:
|
|
"""
|
|
Nach visibility-Wechsel club_id konsistent setzen.
|
|
|
|
official → club_id NULL. private/club: club_id aus Patch oder bisheriger Zeile beibehalten
|
|
(private nutzt club_id für Vereinsordner auf der Platte).
|
|
"""
|
|
eff = dict(patch_fields)
|
|
if eff.get("visibility") is not None:
|
|
v = str(eff["visibility"]).strip().lower()
|
|
if v == "official":
|
|
eff["club_id"] = None
|
|
elif v == "club" and "club_id" not in eff:
|
|
eff["club_id"] = asset.get("club_id")
|
|
elif v == "private" and "club_id" not in eff:
|
|
eff["club_id"] = asset.get("club_id")
|
|
return eff
|
|
|
|
|
|
def _club_display_name_for_storage(cur: Any, club_id: int) -> str:
|
|
"""Anzeigename für library_club_path_segment; leer wenn Verein fehlt."""
|
|
cur.execute("SELECT name FROM clubs WHERE id = %s", (int(club_id),))
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return ""
|
|
return str(r2d(row).get("name") or "")
|
|
|
|
|
|
def _relocate_asset_file_if_governance_changed(
|
|
cur: Any,
|
|
media_root: Path,
|
|
asset_id: int,
|
|
asset: dict,
|
|
next_vis: str,
|
|
next_club_id: Optional[int],
|
|
) -> Optional[str]:
|
|
"""
|
|
Passt bei local-Assets den Dateipfad an, wenn sich Sichtbarkeit/Verein ändert.
|
|
Aktualisiert exercise_media.file_path. Gibt neuen storage_key oder None zurück.
|
|
"""
|
|
if (asset.get("storage_backend") or "local") != "local":
|
|
return None
|
|
old_key = (asset.get("storage_key") or "").strip()
|
|
sha = (asset.get("sha256") or "").strip().lower()
|
|
if not old_key or len(sha) != 64:
|
|
return None
|
|
ext = Path(old_key.replace("\\", "/")).suffix or ".bin"
|
|
try:
|
|
up = asset.get("uploaded_by_profile_id")
|
|
up_i = int(up) if up is not None else None
|
|
club_nm: Optional[str] = None
|
|
if next_vis in ("club", "private") and next_club_id is not None:
|
|
club_nm = _club_display_name_for_storage(cur, next_club_id)
|
|
new_key = library_storage_key(
|
|
next_vis,
|
|
next_club_id if next_vis != "official" else None,
|
|
sha,
|
|
ext,
|
|
uploader_profile_id=up_i if next_vis == "private" else None,
|
|
mime_type=str(asset.get("mime_type") or "") or None,
|
|
club_name=club_nm,
|
|
)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e)) from e
|
|
old_norm = old_key.replace("\\", "/").lstrip("/")
|
|
if new_key == old_norm:
|
|
return None
|
|
try:
|
|
relocate_local_media_file(media_root, old_key, new_key)
|
|
except FileNotFoundError as e:
|
|
raise HTTPException(status_code=500, detail=f"Mediendatei fehlt auf der Platte: {e}") from e
|
|
except FileExistsError as e:
|
|
raise HTTPException(status_code=409, detail=str(e)) from e
|
|
except (ValueError, OSError) as e:
|
|
raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
db_path = f"/media/{new_key}"
|
|
cur.execute(
|
|
"UPDATE exercise_media SET file_path = %s WHERE media_asset_id = %s",
|
|
(db_path, asset_id),
|
|
)
|
|
return new_key
|
|
|
|
|
|
def _lifecycle_where_sql(lifecycle: str) -> str:
|
|
lc = (lifecycle or "active").strip().lower()
|
|
if lc not in _LIFECYCLE_LIST_FILTERS:
|
|
raise HTTPException(status_code=400, detail="Ungültiger lifecycle-Filter")
|
|
if lc == "active":
|
|
return "ma.lifecycle_state = 'active'"
|
|
if lc == "trash_soft":
|
|
return "ma.lifecycle_state = 'trash_soft'"
|
|
if lc == "trash_hidden":
|
|
return "ma.lifecycle_state = 'trash_hidden'"
|
|
return "ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')"
|
|
|
|
|
|
def _fetch_asset_file_row(cur: Any, asset_id: int) -> Optional[dict]:
|
|
cur.execute(
|
|
"""SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
|
|
storage_key, mime_type, original_filename
|
|
FROM media_assets WHERE id = %s""",
|
|
(asset_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return r2d(row) if row else None
|
|
|
|
|
|
def _assert_can_view_archive_asset(cur: Any, tenant: TenantContext, asset: dict) -> None:
|
|
if not library_content_visible_to_profile(
|
|
cur,
|
|
tenant.profile_id,
|
|
(asset.get("visibility") or "").strip().lower(),
|
|
asset.get("club_id"),
|
|
asset.get("uploaded_by_profile_id"),
|
|
tenant.global_role,
|
|
):
|
|
raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Medium")
|
|
|
|
|
|
def _item_permissions(row: dict, tenant: TenantContext, admin_club_ids: set[int]) -> dict:
|
|
"""Berechnete UI-/Policy-Flags pro Zeile (ohne zusätzliche DB)."""
|
|
role_raw = tenant.global_role
|
|
role = (role_raw or "").strip().lower()
|
|
pid = int(tenant.profile_id)
|
|
sup = is_superadmin(role_raw)
|
|
plat = is_platform_admin(role_raw)
|
|
vis = (row.get("visibility") or "private").strip().lower()
|
|
uid = row.get("uploaded_by_profile_id")
|
|
cid = row.get("club_id")
|
|
lc = (row.get("lifecycle_state") or "active").strip().lower()
|
|
is_owner = uid is not None and int(uid) == pid
|
|
club_mgr = plat or (cid is not None and int(cid) in admin_club_ids)
|
|
|
|
if vis == "official":
|
|
edit_metadata = sup
|
|
trash_soft = lc == "active" and sup
|
|
can_manage_adv = sup
|
|
else:
|
|
edit_metadata = (
|
|
sup
|
|
or (vis == "club" and club_mgr)
|
|
or (vis == "private" and is_owner)
|
|
)
|
|
trash_soft = lc == "active" and (
|
|
sup or plat or (vis == "private" and is_owner) or (vis == "club" and club_mgr)
|
|
)
|
|
can_manage_adv = (
|
|
sup
|
|
or plat
|
|
or (vis == "private" and is_owner)
|
|
or (vis == "club" and club_mgr)
|
|
)
|
|
|
|
trash_hidden = lc in ("active", "trash_soft") and can_manage_adv
|
|
recover_from_hidden = lc == "trash_hidden" and can_manage_adv
|
|
reactivate = lc in ("trash_soft", "trash_hidden") and can_manage_adv
|
|
purge = lc == "trash_hidden" and sup
|
|
|
|
return {
|
|
"edit_metadata": edit_metadata,
|
|
"change_visibility": edit_metadata,
|
|
"trash_soft": trash_soft,
|
|
"trash_hidden": trash_hidden,
|
|
"recover": recover_from_hidden,
|
|
"reactivate": reactivate,
|
|
"purge": purge,
|
|
"superadmin_lifecycle": sup,
|
|
"superadmin_hard_delete": sup,
|
|
}
|
|
|
|
|
|
def _apply_lifecycle_action(
|
|
cur: Any,
|
|
conn: Any,
|
|
asset_id: int,
|
|
body: MediaLifecycleBody,
|
|
tenant: TenantContext,
|
|
) -> dict:
|
|
asset = fetch_media_asset_row(cur, asset_id)
|
|
if not asset:
|
|
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
|
|
|
|
action = body.action
|
|
role_raw = tenant.global_role
|
|
|
|
if action == "superadmin_hard_delete":
|
|
if not is_superadmin(role_raw):
|
|
raise HTTPException(status_code=403, detail="Nur Superadmin")
|
|
ok = superadmin_hard_delete_media_asset(cur, conn, asset_id)
|
|
if not ok:
|
|
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
|
|
return {"ok": True, "hard_deleted": asset_id}
|
|
|
|
if action == "superadmin_force_lifecycle":
|
|
if not is_superadmin(role_raw):
|
|
raise HTTPException(status_code=403, detail="Nur Superadmin")
|
|
tl = body.target_lifecycle or "active"
|
|
mp = {"active": LC_ACTIVE, "trash_soft": LC_TRASH_SOFT, "trash_hidden": LC_TRASH_HIDDEN}
|
|
return superadmin_force_lifecycle_state(cur, conn, asset_id, mp[tl])
|
|
|
|
if action == "purge":
|
|
if not is_superadmin(role_raw):
|
|
raise HTTPException(status_code=403, detail="Endgültiges Löschen nur als Superadmin")
|
|
state = (asset.get("lifecycle_state") or "").strip().lower()
|
|
if state != LC_TRASH_HIDDEN:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Nur ausgeblendete Medien (Stufe 2) dürfen mit dieser Aktion entfernt werden",
|
|
)
|
|
if not purge_media_asset(cur, conn, asset_id):
|
|
raise HTTPException(status_code=400, detail="Löschen nicht möglich")
|
|
return {"ok": True, "purged": asset_id}
|
|
|
|
if action == "trash_soft":
|
|
assert_can_trash_soft(cur, tenant, asset)
|
|
return transition_to_trash_soft(cur, conn, asset_id)
|
|
if action == "trash_hidden":
|
|
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
|
|
return transition_to_trash_hidden(cur, conn, asset_id)
|
|
if action == "recover":
|
|
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
|
|
return transition_recover_from_hidden(cur, conn, asset_id)
|
|
if action == "reactivate":
|
|
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
|
|
return reactivate_media_asset_from_trash(cur, conn, asset_id)
|
|
|
|
raise HTTPException(status_code=500, detail="Interner Fehler: lifecycle action")
|
|
|
|
|
|
_MAX_BULK_LIBRARY_FILES = 25
|
|
|
|
|
|
def _ingest_library_media_file(
|
|
cur: Any,
|
|
tenant: TenantContext,
|
|
raw: bytes,
|
|
filename: Optional[str],
|
|
content_type: Optional[str],
|
|
visibility: str,
|
|
club_id_form: Optional[int],
|
|
) -> dict:
|
|
"""Neues Archiv-Medium oder aktiver Dedupe-Treffer (sha256 + Sichtbarkeit + Verein). Kein exercise_media."""
|
|
profile_id = tenant.profile_id
|
|
role = tenant.global_role or ""
|
|
vis = (visibility or "private").strip().lower()
|
|
if vis not in ("private", "club", "official"):
|
|
raise HTTPException(status_code=400, detail="Ungültige Sichtbarkeit")
|
|
|
|
next_cid: Optional[int] = None
|
|
if vis == "club":
|
|
if club_id_form is None:
|
|
raise HTTPException(status_code=400, detail="Verein erforderlich für Sichtbarkeit „Verein“")
|
|
next_cid = int(club_id_form)
|
|
elif vis == "private":
|
|
if club_id_form is not None:
|
|
next_cid = int(club_id_form)
|
|
elif is_platform_admin(role):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=(
|
|
"Private Archiv-Uploads als Plattform-Admin: bitte den Zielverein wählen und "
|
|
"club_id im Formular setzen (nicht vom allgemeinen Kontext ableiten)."
|
|
),
|
|
)
|
|
else:
|
|
cid = tenant.effective_club_id
|
|
next_cid = int(cid) if cid is not None else None
|
|
if next_cid is None:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=(
|
|
"Private Medien werden pro Verein abgelegt. Bitte aktiven Verein setzen "
|
|
"(Header X-Active-Club-Id) oder club_id im Formular übergeben — auch für Plattform-Admins."
|
|
),
|
|
)
|
|
if not is_platform_admin(role):
|
|
assert_club_member(cur, profile_id, next_cid)
|
|
|
|
assert_valid_governance_visibility(cur, profile_id, role, vis, next_cid if vis == "club" else None)
|
|
|
|
max_b = _upload_limit_bytes(tenant)
|
|
if len(raw) > max_b:
|
|
raise HTTPException(
|
|
status_code=413,
|
|
detail=f"Datei zu groß (max. {max_b // (1024 * 1024)} MB)",
|
|
)
|
|
|
|
try:
|
|
mime = resolve_upload_mime_type(raw, content_type, filename)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e)) from e
|
|
|
|
full_sha = hashlib.sha256(raw).hexdigest()
|
|
if vis == "private":
|
|
cur.execute(
|
|
"""SELECT id, storage_key, byte_size, lifecycle_state, original_filename FROM media_assets
|
|
WHERE sha256 = %s AND lower(trim(visibility)) = %s
|
|
AND (club_id IS NOT DISTINCT FROM %s)
|
|
AND (uploaded_by_profile_id IS NOT DISTINCT FROM %s)
|
|
LIMIT 1""",
|
|
(full_sha, vis, next_cid, profile_id),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"""SELECT id, storage_key, byte_size, lifecycle_state, original_filename FROM media_assets
|
|
WHERE sha256 = %s AND lower(trim(visibility)) = %s
|
|
AND (club_id IS NOT DISTINCT FROM %s)
|
|
LIMIT 1""",
|
|
(full_sha, vis, next_cid),
|
|
)
|
|
existing_asset = cur.fetchone()
|
|
|
|
if existing_asset:
|
|
ea = r2d(existing_asset)
|
|
lc = (ea.get("lifecycle_state") or "").strip().lower()
|
|
if lc == "active":
|
|
return {
|
|
"status": "duplicate",
|
|
"media_asset_id": int(ea["id"]),
|
|
"original_filename": ea.get("original_filename"),
|
|
}
|
|
if lc in ("trash_soft", "trash_hidden"):
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail={
|
|
"code": "MEDIA_ASSET_IN_TRASH",
|
|
"message": (
|
|
"Diese Datei ist inhaltsgleich (SHA-256) mit einem Archiv-Medium im Papierkorb."
|
|
),
|
|
"media_asset_id": ea["id"],
|
|
"lifecycle_state": lc,
|
|
},
|
|
)
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail="Es existiert bereits ein Archiv-Eintrag zu dieser Datei in einem nicht nutzbaren Zustand.",
|
|
)
|
|
|
|
ext = Path(filename or "").suffix[:12] if filename else ""
|
|
if not ext and mime == "image/jpeg":
|
|
ext = ".jpg"
|
|
elif not ext and mime == "image/png":
|
|
ext = ".png"
|
|
elif not ext and mime in ("image/heic", "image/heif"):
|
|
ext = ".heic"
|
|
elif not ext and mime == "video/mp4":
|
|
ext = ".mp4"
|
|
elif not ext and mime == "video/quicktime":
|
|
ext = ".mov"
|
|
|
|
club_nm = ""
|
|
if next_cid is not None:
|
|
club_nm = _club_display_name_for_storage(cur, next_cid)
|
|
|
|
media_root = get_effective_media_root(cur)
|
|
try:
|
|
storage_key = library_storage_key(
|
|
vis,
|
|
next_cid,
|
|
full_sha,
|
|
ext,
|
|
uploader_profile_id=profile_id if vis == "private" else None,
|
|
mime_type=mime,
|
|
club_name=club_nm,
|
|
)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e)) from e
|
|
dest_path = path_under_media_root(media_root, storage_key)
|
|
if dest_path is None:
|
|
raise HTTPException(status_code=500, detail="Ungültiger Speicherpfad")
|
|
dest_path.parent.mkdir(parents=True, exist_ok=True)
|
|
if not dest_path.is_file():
|
|
dest_path.write_bytes(raw)
|
|
|
|
cur.execute(
|
|
"""INSERT INTO media_assets (
|
|
mime_type, byte_size, sha256, original_filename, visibility, club_id,
|
|
uploaded_by_profile_id, copyright_notice, storage_backend, storage_key, lifecycle_state
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, NULL, 'local', %s, 'active')
|
|
RETURNING id""",
|
|
(
|
|
mime,
|
|
len(raw),
|
|
full_sha,
|
|
filename or storage_key,
|
|
vis,
|
|
next_cid,
|
|
profile_id,
|
|
storage_key,
|
|
),
|
|
)
|
|
ar = cur.fetchone()
|
|
aid = int(r2d(ar)["id"])
|
|
return {"status": "created", "media_asset_id": aid, "original_filename": filename or storage_key}
|
|
|
|
|
|
@router.post("/bulk-upload")
|
|
async def bulk_upload_media_assets(
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
files: list[UploadFile] = File(..., description="Mehrere Dateien (jpeg, png, gif, mp4, pdf)"),
|
|
visibility: str = Form("private"),
|
|
club_id: Optional[int] = Form(None),
|
|
):
|
|
"""Mehrere Dateien ins Archiv; Dedupe wie Übungs-Upload. Pro Datei eigene DB-Transaktion."""
|
|
if not files:
|
|
raise HTTPException(status_code=400, detail="Keine Dateien übermittelt")
|
|
if len(files) > _MAX_BULK_LIBRARY_FILES:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Maximal {_MAX_BULK_LIBRARY_FILES} Dateien pro Anfrage",
|
|
)
|
|
|
|
results: list[dict[str, Any]] = []
|
|
created = duplicate = failed = 0
|
|
|
|
for uf in files:
|
|
fn = uf.filename or "ohne_name"
|
|
try:
|
|
raw = await uf.read()
|
|
if not raw:
|
|
results.append({"filename": fn, "ok": False, "detail": "Leere Datei"})
|
|
failed += 1
|
|
continue
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
r = _ingest_library_media_file(
|
|
cur,
|
|
tenant,
|
|
raw,
|
|
uf.filename,
|
|
uf.content_type,
|
|
visibility,
|
|
club_id,
|
|
)
|
|
results.append({"filename": fn, "ok": True, **r})
|
|
if r["status"] == "created":
|
|
created += 1
|
|
else:
|
|
duplicate += 1
|
|
except HTTPException as e:
|
|
detail = e.detail
|
|
if isinstance(detail, dict):
|
|
detail_s = detail.get("message") or detail.get("code") or str(detail)
|
|
else:
|
|
detail_s = str(detail)
|
|
results.append(
|
|
{
|
|
"filename": fn,
|
|
"ok": False,
|
|
"status_code": e.status_code,
|
|
"detail": detail_s,
|
|
},
|
|
)
|
|
failed += 1
|
|
except Exception as e:
|
|
results.append({"filename": fn, "ok": False, "detail": str(e)})
|
|
failed += 1
|
|
|
|
return {
|
|
"results": results,
|
|
"created_count": created,
|
|
"duplicate_count": duplicate,
|
|
"failed_count": failed,
|
|
}
|
|
|
|
|
|
@router.get("")
|
|
def list_media_assets(
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
q: Optional[str] = Query(None, max_length=120),
|
|
lifecycle: str = Query(
|
|
"active",
|
|
description="active | trash_soft | trash_hidden | all (nicht purgierte Zustände)",
|
|
),
|
|
media_kind: str = Query(
|
|
"all",
|
|
description="all | image | video | pdf | other",
|
|
),
|
|
club_id: Optional[int] = Query(
|
|
None,
|
|
ge=1,
|
|
description="Nur Superadmin: nach Verein filtern",
|
|
),
|
|
uploaded_by: Optional[int] = Query(
|
|
None,
|
|
ge=1,
|
|
description="Nach Uploader (Profil-ID), wenn erlaubt",
|
|
),
|
|
include_filter_meta: bool = Query(
|
|
False,
|
|
description="Uploader-Liste für Filter (nur wenn Metadaten sichtbar)",
|
|
),
|
|
limit: int = Query(30, ge=1, le=100),
|
|
offset: int = Query(0, ge=0),
|
|
):
|
|
lc_where = _lifecycle_where_sql(lifecycle)
|
|
mk = (media_kind or "all").strip().lower()
|
|
if mk not in _MEDIA_KIND_FILTERS:
|
|
raise HTTPException(status_code=400, detail="Ungültiger media_kind")
|
|
role = tenant.global_role or ""
|
|
is_adm = is_platform_admin(role)
|
|
sup = is_superadmin(role)
|
|
profile_id = tenant.profile_id
|
|
needle = (q or "").strip()
|
|
|
|
if club_id is not None and not sup:
|
|
raise HTTPException(status_code=403, detail="Vereinsfilter nur für Superadmin")
|
|
|
|
media_kind_sql = ""
|
|
if mk == "image":
|
|
# %% für psycopg2: sonst wird % als Platzhalter-Syntax interpretiert
|
|
media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'image/%%'"
|
|
elif mk == "video":
|
|
media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'video/%%'"
|
|
elif mk == "pdf":
|
|
media_kind_sql = (
|
|
" AND (lower(COALESCE(ma.mime_type, '')) = 'application/pdf'"
|
|
" OR lower(COALESCE(ma.mime_type, '')) LIKE '%%pdf%%')"
|
|
)
|
|
elif mk == "other":
|
|
media_kind_sql = (
|
|
" AND COALESCE(ma.mime_type, '') <> ''"
|
|
" AND lower(ma.mime_type) NOT LIKE 'image/%%'"
|
|
" AND lower(ma.mime_type) NOT LIKE 'video/%%'"
|
|
" AND lower(ma.mime_type) <> 'application/pdf'"
|
|
" AND lower(ma.mime_type) NOT LIKE '%%pdf%%'"
|
|
)
|
|
|
|
club_sql = ""
|
|
club_sql_params: list[Any] = []
|
|
if club_id is not None:
|
|
club_sql = " AND ma.club_id = %s"
|
|
club_sql_params.append(club_id)
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
admin_club_ids = club_ids_for_profile_with_roles(cur, profile_id, "club_admin")
|
|
show_uploader = sup or is_adm or bool(admin_club_ids)
|
|
if uploaded_by is not None and not show_uploader:
|
|
raise HTTPException(status_code=403, detail="Uploader-Filter nicht erlaubt")
|
|
|
|
has_tags_col = _media_assets_tags_column_present(cur)
|
|
tags_select = "ma.tags," if has_tags_col else "ARRAY[]::text[] AS tags,"
|
|
|
|
uploaded_sql = ""
|
|
uploaded_params: list[Any] = []
|
|
if uploaded_by is not None:
|
|
uploaded_sql = " AND ma.uploaded_by_profile_id = %s"
|
|
uploaded_params.append(uploaded_by)
|
|
|
|
search_sql = ""
|
|
search_params: list[Any] = []
|
|
if needle:
|
|
like = f"%{needle}%"
|
|
if has_tags_col:
|
|
search_params = [like, like, like, like]
|
|
search_sql = (
|
|
" AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s"
|
|
" OR COALESCE(ma.copyright_notice, '') ILIKE %s"
|
|
" OR EXISTS (SELECT 1 FROM unnest(ma.tags) AS t WHERE t::text ILIKE %s))"
|
|
)
|
|
else:
|
|
search_params = [like, like, like]
|
|
search_sql = (
|
|
" AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s"
|
|
" OR COALESCE(ma.copyright_notice, '') ILIKE %s)"
|
|
)
|
|
|
|
params: list[Any] = (
|
|
[is_adm, profile_id, profile_id]
|
|
+ club_sql_params
|
|
+ uploaded_params
|
|
+ search_params
|
|
+ [limit, offset]
|
|
)
|
|
|
|
cur.execute(
|
|
f"""SELECT ma.id, ma.mime_type, ma.byte_size, ma.original_filename, ma.visibility, ma.club_id,
|
|
ma.uploaded_by_profile_id, ma.lifecycle_state, ma.created_at, ma.sha256,
|
|
ma.copyright_notice, ma.storage_key, {tags_select}
|
|
pr.name AS uploader_name,
|
|
pr.email AS uploader_email,
|
|
cl.name AS club_name
|
|
FROM media_assets ma
|
|
LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id
|
|
LEFT JOIN clubs cl ON cl.id = ma.club_id
|
|
WHERE {lc_where}
|
|
AND (
|
|
%s
|
|
OR lower(trim(ma.visibility)) = 'official'
|
|
OR (
|
|
lower(trim(ma.visibility)) = 'private'
|
|
AND ma.uploaded_by_profile_id = %s
|
|
)
|
|
OR (
|
|
lower(trim(ma.visibility)) = 'club'
|
|
AND EXISTS (
|
|
SELECT 1 FROM club_members cm
|
|
WHERE cm.profile_id = %s
|
|
AND cm.club_id = ma.club_id
|
|
AND cm.status = 'active'
|
|
)
|
|
)
|
|
)
|
|
{club_sql}
|
|
{uploaded_sql}
|
|
{media_kind_sql}
|
|
{search_sql}
|
|
ORDER BY ma.updated_at DESC NULLS LAST, ma.created_at DESC
|
|
LIMIT %s OFFSET %s""",
|
|
params,
|
|
)
|
|
rows = [r2d(r) for r in cur.fetchall()]
|
|
show_club = sup or is_adm
|
|
asset_ids = [int(r["id"]) for r in rows]
|
|
usage_map = _usage_for_media_assets(cur, asset_ids)
|
|
for r in rows:
|
|
r["permissions"] = _item_permissions(r, tenant, admin_club_ids)
|
|
tid = int(r["id"])
|
|
r["usage"] = usage_map.get(tid, {"exercises": [], "training_units": []})
|
|
tags_val = r.get("tags")
|
|
if tags_val is None:
|
|
r["tags"] = []
|
|
elif not isinstance(tags_val, list):
|
|
r["tags"] = list(tags_val) if tags_val else []
|
|
if not show_uploader:
|
|
r["uploader_name"] = None
|
|
r["uploader_email"] = None
|
|
if not show_club:
|
|
r["club_name"] = None
|
|
viewer = {
|
|
"show_uploader_meta": show_uploader,
|
|
"show_club_meta": show_club,
|
|
"is_superadmin": sup,
|
|
"is_platform_admin": is_adm,
|
|
}
|
|
filter_meta = None
|
|
if include_filter_meta and show_uploader:
|
|
filter_meta = {"uploaders": _fetch_filter_uploaders(cur, is_adm, profile_id)}
|
|
|
|
return {
|
|
"items": rows,
|
|
"limit": limit,
|
|
"offset": offset,
|
|
"lifecycle": lifecycle.strip().lower(),
|
|
"media_kind": mk,
|
|
"viewer": viewer,
|
|
"filter_meta": filter_meta,
|
|
}
|
|
|
|
|
|
@router.api_route("/{asset_id}/file", methods=["GET", "HEAD"])
|
|
def download_media_asset_file(
|
|
request: Request,
|
|
asset_id: int,
|
|
tenant: TenantContext = Depends(get_tenant_context_flexible),
|
|
):
|
|
"""Direktzugriff auf Archiv-Datei (Thumbnail/Vorschau); Auth wie Übungs-Medien (?ssetoken)."""
|
|
from routers.exercises import _binary_media_response
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
asset = _fetch_asset_file_row(cur, asset_id)
|
|
if not asset:
|
|
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
|
|
lc = (asset.get("lifecycle_state") or "").strip().lower()
|
|
if lc == "active":
|
|
_assert_can_view_archive_asset(cur, tenant, asset)
|
|
elif lc in ("trash_soft", "trash_hidden"):
|
|
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
|
|
else:
|
|
raise HTTPException(status_code=404, detail="Medium nicht verfügbar")
|
|
|
|
sk = asset.get("storage_key")
|
|
if not sk:
|
|
raise HTTPException(status_code=404, detail="Keine Datei hinterlegt")
|
|
|
|
media_root = get_effective_media_root(cur)
|
|
abs_p = path_under_media_root(media_root, str(sk))
|
|
if not abs_p or not abs_p.is_file():
|
|
raise HTTPException(status_code=404, detail="Datei nicht gefunden")
|
|
|
|
mime = asset.get("mime_type") or "application/octet-stream"
|
|
fname = asset.get("original_filename") or abs_p.name
|
|
return _binary_media_response(abs_p, mime, str(fname) if fname else None, request)
|
|
|
|
|
|
@router.post("/{asset_id}/lifecycle")
|
|
def post_media_asset_lifecycle(
|
|
asset_id: int,
|
|
body: MediaLifecycleBody,
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
):
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
return _apply_lifecycle_action(cur, conn, asset_id, body, tenant)
|
|
|
|
|
|
@router.post("/bulk-lifecycle")
|
|
def bulk_media_lifecycle(
|
|
body: MediaBulkLifecycleBody,
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
):
|
|
inner = MediaLifecycleBody(action=body.action, target_lifecycle=body.target_lifecycle)
|
|
updated: list[int] = []
|
|
failed: list[dict] = []
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
for aid in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)):
|
|
try:
|
|
_apply_lifecycle_action(cur, conn, aid, inner, tenant)
|
|
updated.append(aid)
|
|
except HTTPException as he:
|
|
msg = he.detail if isinstance(he.detail, str) else str(he.detail)
|
|
failed.append({"id": aid, "detail": msg})
|
|
return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)}
|
|
|
|
|
|
@router.post("/bulk-patch")
|
|
def bulk_media_patch(
|
|
body: MediaBulkPatchBody,
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
):
|
|
raw = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
|
|
patch_fields = {
|
|
k: v
|
|
for k, v in raw.items()
|
|
if k != "media_asset_ids" and (k == "tags" or v is not None)
|
|
}
|
|
if not patch_fields:
|
|
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
|
|
|
|
updated: list[int] = []
|
|
failed: list[dict] = []
|
|
profile_id = tenant.profile_id
|
|
role = tenant.global_role
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
for asset_id in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)):
|
|
try:
|
|
cur.execute(
|
|
"""SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
|
|
copyright_notice, original_filename, sha256, storage_key, storage_backend, mime_type
|
|
FROM media_assets WHERE id = %s""",
|
|
(asset_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
failed.append({"id": asset_id, "detail": "Medium nicht gefunden"})
|
|
continue
|
|
asset = r2d(row)
|
|
assert_can_edit_media_asset_metadata(cur, tenant, asset)
|
|
|
|
if "tags" in patch_fields and not _media_assets_tags_column_present(cur):
|
|
failed.append(
|
|
{
|
|
"id": asset_id,
|
|
"detail": "Schlagwörter (tags) erfordern DB-Migration 046.",
|
|
}
|
|
)
|
|
continue
|
|
|
|
eff = _effective_media_patch_fields(patch_fields, asset)
|
|
next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower()
|
|
next_cid = eff.get("club_id", asset.get("club_id"))
|
|
if "visibility" in patch_fields or "club_id" in patch_fields:
|
|
assert_valid_governance_visibility(
|
|
cur,
|
|
profile_id,
|
|
role,
|
|
next_vis,
|
|
int(next_cid) if next_cid is not None else None,
|
|
)
|
|
if next_vis in ("private", "club") and next_cid is None:
|
|
failed.append(
|
|
{
|
|
"id": asset_id,
|
|
"detail": (
|
|
"Für private oder Vereins-Medien wird club_id benötigt (Vereinsordner)."
|
|
),
|
|
}
|
|
)
|
|
continue
|
|
|
|
new_sk: Optional[str] = None
|
|
if "visibility" in patch_fields or "club_id" in patch_fields:
|
|
next_club_param: Optional[int] = None
|
|
if next_vis in ("club", "private"):
|
|
next_club_param = int(next_cid) if next_cid is not None else None
|
|
media_root = get_effective_media_root(cur)
|
|
new_sk = _relocate_asset_file_if_governance_changed(
|
|
cur, media_root, asset_id, asset, next_vis, next_club_param
|
|
)
|
|
|
|
sets: list[str] = []
|
|
vals: list[Any] = []
|
|
if "copyright_notice" in patch_fields:
|
|
sets.append("copyright_notice = %s")
|
|
vals.append(patch_fields["copyright_notice"])
|
|
if "original_filename" in patch_fields:
|
|
sets.append("original_filename = %s")
|
|
vals.append(patch_fields["original_filename"])
|
|
if "tags" in patch_fields:
|
|
sets.append("tags = %s")
|
|
vals.append(_normalize_media_tags(patch_fields["tags"]))
|
|
if "visibility" in patch_fields or "club_id" in patch_fields:
|
|
sets.append("visibility = %s")
|
|
vals.append(str(eff.get("visibility", asset["visibility"])).strip())
|
|
sets.append("club_id = %s")
|
|
vals.append(next_cid)
|
|
if new_sk:
|
|
sets.append("storage_key = %s")
|
|
vals.append(new_sk)
|
|
if not sets:
|
|
failed.append({"id": asset_id, "detail": "Nichts zu aktualisieren"})
|
|
continue
|
|
sets.append("updated_at = NOW()")
|
|
vals.append(asset_id)
|
|
cur.execute(
|
|
f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s",
|
|
tuple(vals),
|
|
)
|
|
conn.commit()
|
|
updated.append(asset_id)
|
|
except HTTPException as he:
|
|
msg = he.detail if isinstance(he.detail, str) else str(he.detail)
|
|
failed.append({"id": asset_id, "detail": msg})
|
|
return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)}
|
|
|
|
|
|
@router.patch("/{asset_id}")
|
|
def patch_media_asset(
|
|
asset_id: int,
|
|
body: MediaAssetPatch,
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
):
|
|
profile_id = tenant.profile_id
|
|
role = tenant.global_role
|
|
data = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
|
|
if not data:
|
|
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute(
|
|
"""SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
|
|
copyright_notice, original_filename, sha256, storage_key, storage_backend, mime_type
|
|
FROM media_assets WHERE id = %s""",
|
|
(asset_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
|
|
asset = r2d(row)
|
|
assert_can_edit_media_asset_metadata(cur, tenant, asset)
|
|
|
|
if "tags" in data and not _media_assets_tags_column_present(cur):
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Schlagwörter (tags) erfordern die Datenbank-Migration 046. Bitte Migration ausführen.",
|
|
)
|
|
|
|
eff = _effective_media_patch_fields(data, asset)
|
|
next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower()
|
|
next_cid = eff.get("club_id", asset.get("club_id"))
|
|
if "visibility" in data or "club_id" in data:
|
|
assert_valid_governance_visibility(
|
|
cur,
|
|
profile_id,
|
|
role,
|
|
next_vis,
|
|
int(next_cid) if next_cid is not None else None,
|
|
)
|
|
if next_vis in ("private", "club") and next_cid is None:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=(
|
|
"Für private oder Vereins-Medien wird club_id benötigt (Vereinsordner). "
|
|
"Bitte im PATCH setzen, z. B. bei Wechsel von „offiziell“ zu „privat“."
|
|
),
|
|
)
|
|
|
|
new_sk: Optional[str] = None
|
|
if "visibility" in data or "club_id" in data:
|
|
next_club_param: Optional[int] = None
|
|
if next_vis in ("club", "private"):
|
|
next_club_param = int(next_cid) if next_cid is not None else None
|
|
media_root = get_effective_media_root(cur)
|
|
new_sk = _relocate_asset_file_if_governance_changed(
|
|
cur, media_root, asset_id, asset, next_vis, next_club_param
|
|
)
|
|
|
|
sets: list[str] = []
|
|
vals: list[Any] = []
|
|
if "copyright_notice" in data:
|
|
sets.append("copyright_notice = %s")
|
|
vals.append(data["copyright_notice"])
|
|
if "original_filename" in data:
|
|
sets.append("original_filename = %s")
|
|
vals.append(data["original_filename"])
|
|
if "tags" in data:
|
|
sets.append("tags = %s")
|
|
vals.append(_normalize_media_tags(data["tags"]))
|
|
if "visibility" in data or "club_id" in data:
|
|
sets.append("visibility = %s")
|
|
vals.append(str(eff.get("visibility", asset["visibility"])).strip())
|
|
sets.append("club_id = %s")
|
|
vals.append(next_cid)
|
|
if new_sk:
|
|
sets.append("storage_key = %s")
|
|
vals.append(new_sk)
|
|
if sets:
|
|
sets.append("updated_at = NOW()")
|
|
vals.append(asset_id)
|
|
cur.execute(
|
|
f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s",
|
|
tuple(vals),
|
|
)
|
|
conn.commit()
|
|
has_tags = _media_assets_tags_column_present(cur)
|
|
if has_tags:
|
|
cur.execute(
|
|
"""SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
|
|
uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice, tags
|
|
FROM media_assets WHERE id = %s""",
|
|
(asset_id,),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"""SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
|
|
uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice
|
|
FROM media_assets WHERE id = %s""",
|
|
(asset_id,),
|
|
)
|
|
out = r2d(cur.fetchone())
|
|
if not has_tags:
|
|
out["tags"] = []
|
|
return out
|