shinkan-jinkendo/backend/routers/media_assets.py
Lars f9e6e61244
All checks were successful
Deploy Development / deploy (push) Successful in 34s
Test Suite / pytest-backend (push) Successful in 24s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 6s
Test Suite / playwright-tests (push) Successful in 29s
feat: implement bulk upload functionality for media assets
- Added a new API endpoint for bulk uploading media assets, allowing users to upload multiple files in a single request.
- Implemented validation for file types and sizes during the upload process, ensuring compliance with allowed formats and limits.
- Enhanced the MediaLibraryPage component to support bulk file selection and visibility options, improving user experience.
- Updated CSS styles for the upload interface to enhance layout and accessibility.
- Added tests to verify the functionality of the new bulk upload feature and its integration with existing media asset management.
2026-05-07 21:36:35 +02:00

1066 lines
40 KiB
Python

"""Medien-Archiv (Liste, Datei) und Lifecycle — MEDIA_ASSETS_AND_ARCHIVE_SPEC."""
from __future__ import annotations
from typing import Any, Literal, Optional, Union
import hashlib
from pathlib import Path
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile
from pydantic import BaseModel, Field, model_validator
from club_tenancy import (
assert_valid_governance_visibility,
club_ids_for_profile_with_roles,
is_platform_admin,
is_superadmin,
library_content_visible_to_profile,
)
from db import get_db, get_cursor, r2d
from media_lifecycle import (
LC_ACTIVE,
LC_TRASH_HIDDEN,
LC_TRASH_SOFT,
assert_can_edit_media_asset_metadata,
assert_can_manage_media_asset_lifecycle,
assert_can_trash_soft,
fetch_media_asset_row,
purge_media_asset,
reactivate_media_asset_from_trash,
superadmin_force_lifecycle_state,
superadmin_hard_delete_media_asset,
transition_recover_from_hidden,
transition_to_trash_hidden,
transition_to_trash_soft,
)
from media_storage import get_effective_media_root, path_under_media_root
from tenant_context import TenantContext, get_tenant_context, get_tenant_context_flexible
from routers.exercises import ALLOWED_UPLOAD_MIMES, _upload_limit_bytes
router = APIRouter(prefix="/api/media-assets", tags=["media-assets"])
class MediaLifecycleBody(BaseModel):
action: Literal[
"trash_soft",
"trash_hidden",
"recover",
"purge",
"reactivate",
"superadmin_force_lifecycle",
"superadmin_hard_delete",
]
target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None
@model_validator(mode="after")
def _target_lifecycle_rules(self):
if self.action == "superadmin_force_lifecycle":
if not self.target_lifecycle:
raise ValueError("target_lifecycle ist für diese Aktion erforderlich")
elif self.target_lifecycle is not None:
raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle")
return self
class MediaAssetPatch(BaseModel):
copyright_notice: Optional[str] = Field(None, max_length=8000)
original_filename: Optional[str] = Field(None, max_length=300)
visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
club_id: Optional[int] = None
tags: Optional[list[str]] = None
class MediaBulkLifecycleBody(BaseModel):
media_asset_ids: list[int] = Field(..., min_length=1, max_length=200)
action: Literal[
"trash_soft",
"trash_hidden",
"recover",
"purge",
"reactivate",
"superadmin_force_lifecycle",
"superadmin_hard_delete",
]
target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None
@model_validator(mode="after")
def _bulk_target(self):
if self.action == "superadmin_force_lifecycle" and not self.target_lifecycle:
raise ValueError("target_lifecycle ist für diese Aktion erforderlich")
if self.action != "superadmin_force_lifecycle" and self.target_lifecycle is not None:
raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle")
return self
class MediaBulkPatchBody(BaseModel):
media_asset_ids: list[int] = Field(..., min_length=1, max_length=200)
copyright_notice: Optional[str] = Field(None, max_length=8000)
original_filename: Optional[str] = Field(None, max_length=300)
visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
club_id: Optional[int] = None
tags: Optional[list[str]] = None
_LIFECYCLE_LIST_FILTERS = frozenset({"active", "trash_soft", "trash_hidden", "all"})
_MEDIA_KIND_FILTERS = frozenset({"all", "image", "video", "pdf", "other"})
_MAX_TAGS = 40
_MAX_TAG_LEN = 48
_MEDIA_ASSETS_TAGS_COLUMN: Optional[bool] = None
_TRAINING_UNIT_EXERCISES_TABLE: Optional[bool] = None
def _media_assets_tags_column_present(cur: Any) -> bool:
"""True nach Migration 046; verhindert 500 wenn Code neuer ist als das Schema."""
global _MEDIA_ASSETS_TAGS_COLUMN
if _MEDIA_ASSETS_TAGS_COLUMN is not None:
return _MEDIA_ASSETS_TAGS_COLUMN
cur.execute(
"""
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'media_assets'
AND column_name = 'tags'
LIMIT 1
"""
)
_MEDIA_ASSETS_TAGS_COLUMN = cur.fetchone() is not None
return _MEDIA_ASSETS_TAGS_COLUMN
def _training_unit_exercises_table_present(cur: Any) -> bool:
"""True wenn planning-Migration (training_unit_exercises) im Schema liegt — sonst keine Einheiten-Links."""
global _TRAINING_UNIT_EXERCISES_TABLE
if _TRAINING_UNIT_EXERCISES_TABLE is not None:
return _TRAINING_UNIT_EXERCISES_TABLE
cur.execute(
"""
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'training_unit_exercises'
LIMIT 1
"""
)
_TRAINING_UNIT_EXERCISES_TABLE = cur.fetchone() is not None
return _TRAINING_UNIT_EXERCISES_TABLE
def _normalize_media_tags(raw: Union[list[str], None]) -> list[str]:
"""PostgreSQL text[]; Einträge gekürzt und ohne Dubletten (case-insensitive)."""
if raw is None:
return []
if not isinstance(raw, list):
raise HTTPException(status_code=400, detail="tags muss eine Liste von Strings sein")
out: list[str] = []
seen: set[str] = set()
for item in raw:
s = str(item).strip()
if not s:
continue
s = s[:_MAX_TAG_LEN]
key = s.lower()
if key in seen:
continue
seen.add(key)
out.append(s)
if len(out) >= _MAX_TAGS:
break
return out
def _usage_for_media_assets(cur: Any, asset_ids: list[int]) -> dict[int, dict[str, Any]]:
"""Übungen und Trainingseinheiten, die dieses Archiv-Medium nutzen."""
if not asset_ids:
return {}
cur.execute(
"""SELECT em.media_asset_id, e.id AS exercise_id, e.title AS exercise_title
FROM exercise_media em
JOIN exercises e ON e.id = em.exercise_id
WHERE em.media_asset_id = ANY(%s)
ORDER BY em.media_asset_id, e.title NULLS LAST, e.id""",
(asset_ids,),
)
ex_by_asset: dict[int, list[dict]] = {int(aid): [] for aid in asset_ids}
seen_ex: dict[int, set[int]] = {int(aid): set() for aid in asset_ids}
exercise_ids: set[int] = set()
for r in cur.fetchall():
d = r2d(r)
aid = int(d["media_asset_id"])
eid = int(d["exercise_id"])
if aid not in seen_ex or eid in seen_ex[aid]:
continue
seen_ex[aid].add(eid)
exercise_ids.add(eid)
title = (d.get("exercise_title") or "").strip() or f"Übung #{eid}"
ex_by_asset[aid].append({"id": eid, "title": title})
exercise_to_units: dict[int, list[dict]] = {}
if exercise_ids and _training_unit_exercises_table_present(cur):
cur.execute(
"""SELECT tue.exercise_id, tu.id AS unit_id, tu.planned_date,
COALESCE(tg.name, '') AS group_name
FROM training_unit_exercises tue
JOIN training_units tu ON tu.id = tue.training_unit_id
LEFT JOIN training_groups tg ON tg.id = tu.group_id
WHERE tue.exercise_id = ANY(%s)""",
(list(exercise_ids),),
)
for row in cur.fetchall():
d = r2d(row)
eid = int(d["exercise_id"])
uid = int(d["unit_id"])
pd = d.get("planned_date")
planned: Optional[str]
if pd is None:
planned = None
elif hasattr(pd, "isoformat"):
planned = str(pd.isoformat())
else:
planned = str(pd)
exercise_to_units.setdefault(eid, []).append(
{
"id": uid,
"planned_date": planned,
"group_name": d.get("group_name") or "",
}
)
for eid, units in exercise_to_units.items():
seen_u: set[int] = set()
uniq: list[dict] = []
for u in units:
if u["id"] not in seen_u:
seen_u.add(u["id"])
uniq.append(u)
exercise_to_units[eid] = uniq
out: dict[int, dict[str, Any]] = {}
for aid in asset_ids:
aid_i = int(aid)
exs = ex_by_asset.get(aid_i, [])
seen_u2: set[int] = set()
units_agg: list[dict] = []
for ex in exs:
for u in exercise_to_units.get(ex["id"], []):
if u["id"] not in seen_u2:
seen_u2.add(u["id"])
units_agg.append(u)
out[aid_i] = {"exercises": exs, "training_units": units_agg}
return out
def _fetch_filter_uploaders(cur: Any, is_adm: bool, profile_id: int) -> list[dict]:
cur.execute(
"""SELECT DISTINCT ma.uploaded_by_profile_id AS id, pr.name, pr.email
FROM media_assets ma
LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id
WHERE ma.uploaded_by_profile_id IS NOT NULL
AND ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')
AND (
%s
OR lower(trim(ma.visibility)) = 'official'
OR (
lower(trim(ma.visibility)) = 'private'
AND ma.uploaded_by_profile_id = %s
)
OR (
lower(trim(ma.visibility)) = 'club'
AND EXISTS (
SELECT 1 FROM club_members cm
WHERE cm.profile_id = %s
AND cm.club_id = ma.club_id
AND cm.status = 'active'
)
)
)
ORDER BY pr.name NULLS LAST, ma.uploaded_by_profile_id
LIMIT 400""",
(is_adm, profile_id, profile_id),
)
rows = [r2d(x) for x in cur.fetchall()]
out: list[dict] = []
for r in rows:
pid = r.get("id")
if pid is None:
continue
label = (r.get("name") or "").strip()
if not label:
label = (r.get("email") or "").strip()
if not label:
label = f"Profil #{pid}"
out.append({"id": int(pid), "label": label})
return out
def _effective_media_patch_fields(patch_fields: dict, asset: dict) -> dict:
"""Nach visibility-Wechsel club_id konsistent setzen (official/private → NULL)."""
eff = dict(patch_fields)
if eff.get("visibility") is not None:
v = str(eff["visibility"]).strip().lower()
if v in ("official", "private"):
eff["club_id"] = None
elif v == "club" and "club_id" not in eff:
eff["club_id"] = asset.get("club_id")
return eff
def _lifecycle_where_sql(lifecycle: str) -> str:
lc = (lifecycle or "active").strip().lower()
if lc not in _LIFECYCLE_LIST_FILTERS:
raise HTTPException(status_code=400, detail="Ungültiger lifecycle-Filter")
if lc == "active":
return "ma.lifecycle_state = 'active'"
if lc == "trash_soft":
return "ma.lifecycle_state = 'trash_soft'"
if lc == "trash_hidden":
return "ma.lifecycle_state = 'trash_hidden'"
return "ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')"
def _fetch_asset_file_row(cur: Any, asset_id: int) -> Optional[dict]:
cur.execute(
"""SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
storage_key, mime_type, original_filename
FROM media_assets WHERE id = %s""",
(asset_id,),
)
row = cur.fetchone()
return r2d(row) if row else None
def _assert_can_view_archive_asset(cur: Any, tenant: TenantContext, asset: dict) -> None:
if not library_content_visible_to_profile(
cur,
tenant.profile_id,
(asset.get("visibility") or "").strip().lower(),
asset.get("club_id"),
asset.get("uploaded_by_profile_id"),
tenant.global_role,
):
raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Medium")
def _item_permissions(row: dict, tenant: TenantContext, admin_club_ids: set[int]) -> dict:
"""Berechnete UI-/Policy-Flags pro Zeile (ohne zusätzliche DB)."""
role_raw = tenant.global_role
role = (role_raw or "").strip().lower()
pid = int(tenant.profile_id)
sup = is_superadmin(role_raw)
plat = is_platform_admin(role_raw)
vis = (row.get("visibility") or "private").strip().lower()
uid = row.get("uploaded_by_profile_id")
cid = row.get("club_id")
lc = (row.get("lifecycle_state") or "active").strip().lower()
is_owner = uid is not None and int(uid) == pid
club_mgr = plat or (cid is not None and int(cid) in admin_club_ids)
edit_metadata = (
sup
or (vis == "official" and plat)
or (vis == "club" and club_mgr)
or (vis == "private" and is_owner)
)
trash_soft = lc == "active" and (
sup or plat or (vis == "private" and is_owner) or (vis == "club" and club_mgr)
)
if vis == "official" and not (sup or plat):
trash_soft = False
can_manage_adv = (
sup
or plat
or (vis == "private" and is_owner)
or (vis == "club" and club_mgr)
)
trash_hidden = lc in ("active", "trash_soft") and can_manage_adv
recover_from_hidden = lc == "trash_hidden" and can_manage_adv
reactivate = lc in ("trash_soft", "trash_hidden") and can_manage_adv
purge = lc == "trash_hidden" and sup
return {
"edit_metadata": edit_metadata,
"change_visibility": edit_metadata,
"trash_soft": trash_soft,
"trash_hidden": trash_hidden,
"recover": recover_from_hidden,
"reactivate": reactivate,
"purge": purge,
"superadmin_lifecycle": sup,
"superadmin_hard_delete": sup,
}
def _apply_lifecycle_action(
cur: Any,
conn: Any,
asset_id: int,
body: MediaLifecycleBody,
tenant: TenantContext,
) -> dict:
asset = fetch_media_asset_row(cur, asset_id)
if not asset:
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
action = body.action
role_raw = tenant.global_role
if action == "superadmin_hard_delete":
if not is_superadmin(role_raw):
raise HTTPException(status_code=403, detail="Nur Superadmin")
ok = superadmin_hard_delete_media_asset(cur, conn, asset_id)
if not ok:
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
return {"ok": True, "hard_deleted": asset_id}
if action == "superadmin_force_lifecycle":
if not is_superadmin(role_raw):
raise HTTPException(status_code=403, detail="Nur Superadmin")
tl = body.target_lifecycle or "active"
mp = {"active": LC_ACTIVE, "trash_soft": LC_TRASH_SOFT, "trash_hidden": LC_TRASH_HIDDEN}
return superadmin_force_lifecycle_state(cur, conn, asset_id, mp[tl])
if action == "purge":
if not is_superadmin(role_raw):
raise HTTPException(status_code=403, detail="Endgültiges Löschen nur als Superadmin")
state = (asset.get("lifecycle_state") or "").strip().lower()
if state != LC_TRASH_HIDDEN:
raise HTTPException(
status_code=400,
detail="Nur ausgeblendete Medien (Stufe 2) dürfen mit dieser Aktion entfernt werden",
)
if not purge_media_asset(cur, conn, asset_id):
raise HTTPException(status_code=400, detail="Löschen nicht möglich")
return {"ok": True, "purged": asset_id}
if action == "trash_soft":
assert_can_trash_soft(cur, tenant, asset)
return transition_to_trash_soft(cur, conn, asset_id)
if action == "trash_hidden":
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
return transition_to_trash_hidden(cur, conn, asset_id)
if action == "recover":
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
return transition_recover_from_hidden(cur, conn, asset_id)
if action == "reactivate":
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
return reactivate_media_asset_from_trash(cur, conn, asset_id)
raise HTTPException(status_code=500, detail="Interner Fehler: lifecycle action")
_MAX_BULK_LIBRARY_FILES = 25
def _ingest_library_media_file(
cur: Any,
tenant: TenantContext,
raw: bytes,
filename: Optional[str],
content_type: Optional[str],
visibility: str,
club_id_form: Optional[int],
) -> dict:
"""Neues Archiv-Medium oder aktiver Dedupe-Treffer (sha256 + Sichtbarkeit + Verein). Kein exercise_media."""
profile_id = tenant.profile_id
role = tenant.global_role or ""
vis = (visibility or "private").strip().lower()
if vis not in ("private", "club", "official"):
raise HTTPException(status_code=400, detail="Ungültige Sichtbarkeit")
next_cid: Optional[int] = None
if vis == "club":
if club_id_form is None:
raise HTTPException(status_code=400, detail="Verein erforderlich für Sichtbarkeit „Verein“")
next_cid = int(club_id_form)
assert_valid_governance_visibility(cur, profile_id, role, vis, next_cid)
max_b = _upload_limit_bytes(tenant)
if len(raw) > max_b:
raise HTTPException(
status_code=413,
detail=f"Datei zu groß (max. {max_b // (1024 * 1024)} MB)",
)
mime = (content_type or "").split(";")[0].strip().lower()
if mime not in ALLOWED_UPLOAD_MIMES:
raise HTTPException(
status_code=400,
detail=f"Dateityp nicht erlaubt: {mime or 'unbekannt'}",
)
full_sha = hashlib.sha256(raw).hexdigest()
cur.execute(
"""SELECT id, storage_key, byte_size, lifecycle_state, original_filename FROM media_assets
WHERE sha256 = %s AND lower(trim(visibility)) = %s
AND (club_id IS NOT DISTINCT FROM %s)
LIMIT 1""",
(full_sha, vis, next_cid),
)
existing_asset = cur.fetchone()
if existing_asset:
ea = r2d(existing_asset)
lc = (ea.get("lifecycle_state") or "").strip().lower()
if lc == "active":
return {
"status": "duplicate",
"media_asset_id": int(ea["id"]),
"original_filename": ea.get("original_filename"),
}
if lc in ("trash_soft", "trash_hidden"):
raise HTTPException(
status_code=409,
detail={
"code": "MEDIA_ASSET_IN_TRASH",
"message": (
"Diese Datei ist inhaltsgleich (SHA-256) mit einem Archiv-Medium im Papierkorb."
),
"media_asset_id": ea["id"],
"lifecycle_state": lc,
},
)
raise HTTPException(
status_code=409,
detail="Es existiert bereits ein Archiv-Eintrag zu dieser Datei in einem nicht nutzbaren Zustand.",
)
ext = Path(filename or "").suffix[:12] if filename else ""
if not ext and mime == "image/jpeg":
ext = ".jpg"
elif not ext and mime == "image/png":
ext = ".png"
media_root = get_effective_media_root(cur)
storage_key = f"exercises/{full_sha}{ext}"
dest_path = path_under_media_root(media_root, storage_key)
if dest_path is None:
raise HTTPException(status_code=500, detail="Ungültiger Speicherpfad")
dest_path.parent.mkdir(parents=True, exist_ok=True)
if not dest_path.is_file():
dest_path.write_bytes(raw)
cur.execute(
"""INSERT INTO media_assets (
mime_type, byte_size, sha256, original_filename, visibility, club_id,
uploaded_by_profile_id, copyright_notice, storage_backend, storage_key, lifecycle_state
) VALUES (%s, %s, %s, %s, %s, %s, %s, NULL, 'local', %s, 'active')
RETURNING id""",
(
mime,
len(raw),
full_sha,
filename or storage_key,
vis,
next_cid,
profile_id,
storage_key,
),
)
ar = cur.fetchone()
aid = int(r2d(ar)["id"])
return {"status": "created", "media_asset_id": aid, "original_filename": filename or storage_key}
@router.post("/bulk-upload")
async def bulk_upload_media_assets(
tenant: TenantContext = Depends(get_tenant_context),
files: list[UploadFile] = File(..., description="Mehrere Dateien (jpeg, png, gif, mp4, pdf)"),
visibility: str = Form("private"),
club_id: Optional[int] = Form(None),
):
"""Mehrere Dateien ins Archiv; Dedupe wie Übungs-Upload. Pro Datei eigene DB-Transaktion."""
if not files:
raise HTTPException(status_code=400, detail="Keine Dateien übermittelt")
if len(files) > _MAX_BULK_LIBRARY_FILES:
raise HTTPException(
status_code=400,
detail=f"Maximal {_MAX_BULK_LIBRARY_FILES} Dateien pro Anfrage",
)
results: list[dict[str, Any]] = []
created = duplicate = failed = 0
for uf in files:
fn = uf.filename or "ohne_name"
try:
raw = await uf.read()
if not raw:
results.append({"filename": fn, "ok": False, "detail": "Leere Datei"})
failed += 1
continue
with get_db() as conn:
cur = get_cursor(conn)
r = _ingest_library_media_file(
cur,
tenant,
raw,
uf.filename,
uf.content_type,
visibility,
club_id,
)
results.append({"filename": fn, "ok": True, **r})
if r["status"] == "created":
created += 1
else:
duplicate += 1
except HTTPException as e:
detail = e.detail
if isinstance(detail, dict):
detail_s = detail.get("message") or detail.get("code") or str(detail)
else:
detail_s = str(detail)
results.append(
{
"filename": fn,
"ok": False,
"status_code": e.status_code,
"detail": detail_s,
},
)
failed += 1
except Exception as e:
results.append({"filename": fn, "ok": False, "detail": str(e)})
failed += 1
return {
"results": results,
"created_count": created,
"duplicate_count": duplicate,
"failed_count": failed,
}
@router.get("")
def list_media_assets(
tenant: TenantContext = Depends(get_tenant_context),
q: Optional[str] = Query(None, max_length=120),
lifecycle: str = Query(
"active",
description="active | trash_soft | trash_hidden | all (nicht purgierte Zustände)",
),
media_kind: str = Query(
"all",
description="all | image | video | pdf | other",
),
club_id: Optional[int] = Query(
None,
ge=1,
description="Nur Superadmin: nach Verein filtern",
),
uploaded_by: Optional[int] = Query(
None,
ge=1,
description="Nach Uploader (Profil-ID), wenn erlaubt",
),
include_filter_meta: bool = Query(
False,
description="Uploader-Liste für Filter (nur wenn Metadaten sichtbar)",
),
limit: int = Query(30, ge=1, le=100),
offset: int = Query(0, ge=0),
):
lc_where = _lifecycle_where_sql(lifecycle)
mk = (media_kind or "all").strip().lower()
if mk not in _MEDIA_KIND_FILTERS:
raise HTTPException(status_code=400, detail="Ungültiger media_kind")
role = tenant.global_role or ""
is_adm = is_platform_admin(role)
sup = is_superadmin(role)
profile_id = tenant.profile_id
needle = (q or "").strip()
if club_id is not None and not sup:
raise HTTPException(status_code=403, detail="Vereinsfilter nur für Superadmin")
media_kind_sql = ""
if mk == "image":
# %% für psycopg2: sonst wird % als Platzhalter-Syntax interpretiert
media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'image/%%'"
elif mk == "video":
media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'video/%%'"
elif mk == "pdf":
media_kind_sql = (
" AND (lower(COALESCE(ma.mime_type, '')) = 'application/pdf'"
" OR lower(COALESCE(ma.mime_type, '')) LIKE '%%pdf%%')"
)
elif mk == "other":
media_kind_sql = (
" AND COALESCE(ma.mime_type, '') <> ''"
" AND lower(ma.mime_type) NOT LIKE 'image/%%'"
" AND lower(ma.mime_type) NOT LIKE 'video/%%'"
" AND lower(ma.mime_type) <> 'application/pdf'"
" AND lower(ma.mime_type) NOT LIKE '%%pdf%%'"
)
club_sql = ""
club_sql_params: list[Any] = []
if club_id is not None:
club_sql = " AND ma.club_id = %s"
club_sql_params.append(club_id)
with get_db() as conn:
cur = get_cursor(conn)
admin_club_ids = club_ids_for_profile_with_roles(cur, profile_id, "club_admin")
show_uploader = sup or is_adm or bool(admin_club_ids)
if uploaded_by is not None and not show_uploader:
raise HTTPException(status_code=403, detail="Uploader-Filter nicht erlaubt")
has_tags_col = _media_assets_tags_column_present(cur)
tags_select = "ma.tags," if has_tags_col else "ARRAY[]::text[] AS tags,"
uploaded_sql = ""
uploaded_params: list[Any] = []
if uploaded_by is not None:
uploaded_sql = " AND ma.uploaded_by_profile_id = %s"
uploaded_params.append(uploaded_by)
search_sql = ""
search_params: list[Any] = []
if needle:
like = f"%{needle}%"
if has_tags_col:
search_params = [like, like, like, like]
search_sql = (
" AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s"
" OR COALESCE(ma.copyright_notice, '') ILIKE %s"
" OR EXISTS (SELECT 1 FROM unnest(ma.tags) AS t WHERE t::text ILIKE %s))"
)
else:
search_params = [like, like, like]
search_sql = (
" AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s"
" OR COALESCE(ma.copyright_notice, '') ILIKE %s)"
)
params: list[Any] = (
[is_adm, profile_id, profile_id]
+ club_sql_params
+ uploaded_params
+ search_params
+ [limit, offset]
)
cur.execute(
f"""SELECT ma.id, ma.mime_type, ma.byte_size, ma.original_filename, ma.visibility, ma.club_id,
ma.uploaded_by_profile_id, ma.lifecycle_state, ma.created_at, ma.sha256,
ma.copyright_notice, ma.storage_key, {tags_select}
pr.name AS uploader_name,
pr.email AS uploader_email,
cl.name AS club_name
FROM media_assets ma
LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id
LEFT JOIN clubs cl ON cl.id = ma.club_id
WHERE {lc_where}
AND (
%s
OR lower(trim(ma.visibility)) = 'official'
OR (
lower(trim(ma.visibility)) = 'private'
AND ma.uploaded_by_profile_id = %s
)
OR (
lower(trim(ma.visibility)) = 'club'
AND EXISTS (
SELECT 1 FROM club_members cm
WHERE cm.profile_id = %s
AND cm.club_id = ma.club_id
AND cm.status = 'active'
)
)
)
{club_sql}
{uploaded_sql}
{media_kind_sql}
{search_sql}
ORDER BY ma.updated_at DESC NULLS LAST, ma.created_at DESC
LIMIT %s OFFSET %s""",
params,
)
rows = [r2d(r) for r in cur.fetchall()]
show_club = sup or is_adm
asset_ids = [int(r["id"]) for r in rows]
usage_map = _usage_for_media_assets(cur, asset_ids)
for r in rows:
r["permissions"] = _item_permissions(r, tenant, admin_club_ids)
tid = int(r["id"])
r["usage"] = usage_map.get(tid, {"exercises": [], "training_units": []})
tags_val = r.get("tags")
if tags_val is None:
r["tags"] = []
elif not isinstance(tags_val, list):
r["tags"] = list(tags_val) if tags_val else []
if not show_uploader:
r["uploader_name"] = None
r["uploader_email"] = None
if not show_club:
r["club_name"] = None
viewer = {
"show_uploader_meta": show_uploader,
"show_club_meta": show_club,
"is_superadmin": sup,
"is_platform_admin": is_adm,
}
filter_meta = None
if include_filter_meta and show_uploader:
filter_meta = {"uploaders": _fetch_filter_uploaders(cur, is_adm, profile_id)}
return {
"items": rows,
"limit": limit,
"offset": offset,
"lifecycle": lifecycle.strip().lower(),
"media_kind": mk,
"viewer": viewer,
"filter_meta": filter_meta,
}
@router.api_route("/{asset_id}/file", methods=["GET", "HEAD"])
def download_media_asset_file(
request: Request,
asset_id: int,
tenant: TenantContext = Depends(get_tenant_context_flexible),
):
"""Direktzugriff auf Archiv-Datei (Thumbnail/Vorschau); Auth wie Übungs-Medien (?ssetoken)."""
from routers.exercises import _binary_media_response
with get_db() as conn:
cur = get_cursor(conn)
asset = _fetch_asset_file_row(cur, asset_id)
if not asset:
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
lc = (asset.get("lifecycle_state") or "").strip().lower()
if lc == "active":
_assert_can_view_archive_asset(cur, tenant, asset)
elif lc in ("trash_soft", "trash_hidden"):
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
else:
raise HTTPException(status_code=404, detail="Medium nicht verfügbar")
sk = asset.get("storage_key")
if not sk:
raise HTTPException(status_code=404, detail="Keine Datei hinterlegt")
media_root = get_effective_media_root(cur)
abs_p = path_under_media_root(media_root, str(sk))
if not abs_p or not abs_p.is_file():
raise HTTPException(status_code=404, detail="Datei nicht gefunden")
mime = asset.get("mime_type") or "application/octet-stream"
fname = asset.get("original_filename") or abs_p.name
return _binary_media_response(abs_p, mime, str(fname) if fname else None, request)
@router.post("/{asset_id}/lifecycle")
def post_media_asset_lifecycle(
asset_id: int,
body: MediaLifecycleBody,
tenant: TenantContext = Depends(get_tenant_context),
):
with get_db() as conn:
cur = get_cursor(conn)
return _apply_lifecycle_action(cur, conn, asset_id, body, tenant)
@router.post("/bulk-lifecycle")
def bulk_media_lifecycle(
body: MediaBulkLifecycleBody,
tenant: TenantContext = Depends(get_tenant_context),
):
inner = MediaLifecycleBody(action=body.action, target_lifecycle=body.target_lifecycle)
updated: list[int] = []
failed: list[dict] = []
with get_db() as conn:
cur = get_cursor(conn)
for aid in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)):
try:
_apply_lifecycle_action(cur, conn, aid, inner, tenant)
updated.append(aid)
except HTTPException as he:
msg = he.detail if isinstance(he.detail, str) else str(he.detail)
failed.append({"id": aid, "detail": msg})
return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)}
@router.post("/bulk-patch")
def bulk_media_patch(
body: MediaBulkPatchBody,
tenant: TenantContext = Depends(get_tenant_context),
):
raw = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
patch_fields = {
k: v
for k, v in raw.items()
if k != "media_asset_ids" and (k == "tags" or v is not None)
}
if not patch_fields:
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
updated: list[int] = []
failed: list[dict] = []
profile_id = tenant.profile_id
role = tenant.global_role
with get_db() as conn:
cur = get_cursor(conn)
for asset_id in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)):
try:
cur.execute(
"""SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
copyright_notice, original_filename
FROM media_assets WHERE id = %s""",
(asset_id,),
)
row = cur.fetchone()
if not row:
failed.append({"id": asset_id, "detail": "Medium nicht gefunden"})
continue
asset = r2d(row)
assert_can_edit_media_asset_metadata(cur, tenant, asset)
if "tags" in patch_fields and not _media_assets_tags_column_present(cur):
failed.append(
{
"id": asset_id,
"detail": "Schlagwörter (tags) erfordern DB-Migration 046.",
}
)
continue
eff = _effective_media_patch_fields(patch_fields, asset)
next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower()
next_cid = eff["club_id"] if "club_id" in eff else asset.get("club_id")
if "visibility" in patch_fields or "club_id" in patch_fields:
assert_valid_governance_visibility(
cur,
profile_id,
role,
next_vis,
int(next_cid) if next_cid is not None else None,
)
sets: list[str] = []
vals: list[Any] = []
if "copyright_notice" in patch_fields:
sets.append("copyright_notice = %s")
vals.append(patch_fields["copyright_notice"])
if "original_filename" in patch_fields:
sets.append("original_filename = %s")
vals.append(patch_fields["original_filename"])
if "tags" in patch_fields:
sets.append("tags = %s")
vals.append(_normalize_media_tags(patch_fields["tags"]))
if "visibility" in patch_fields or "club_id" in patch_fields:
sets.append("visibility = %s")
vals.append(str(eff.get("visibility", asset["visibility"])).strip())
sets.append("club_id = %s")
vals.append(eff.get("club_id"))
if not sets:
failed.append({"id": asset_id, "detail": "Nichts zu aktualisieren"})
continue
sets.append("updated_at = NOW()")
vals.append(asset_id)
cur.execute(
f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s",
tuple(vals),
)
conn.commit()
updated.append(asset_id)
except HTTPException as he:
msg = he.detail if isinstance(he.detail, str) else str(he.detail)
failed.append({"id": asset_id, "detail": msg})
return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)}
@router.patch("/{asset_id}")
def patch_media_asset(
asset_id: int,
body: MediaAssetPatch,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
role = tenant.global_role
data = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
if not data:
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
copyright_notice, original_filename
FROM media_assets WHERE id = %s""",
(asset_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
asset = r2d(row)
assert_can_edit_media_asset_metadata(cur, tenant, asset)
if "tags" in data and not _media_assets_tags_column_present(cur):
raise HTTPException(
status_code=503,
detail="Schlagwörter (tags) erfordern die Datenbank-Migration 046. Bitte Migration ausführen.",
)
eff = _effective_media_patch_fields(data, asset)
next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower()
next_cid = eff["club_id"] if "club_id" in eff else asset.get("club_id")
if "visibility" in data or "club_id" in data:
assert_valid_governance_visibility(
cur,
profile_id,
role,
next_vis,
int(next_cid) if next_cid is not None else None,
)
sets: list[str] = []
vals: list[Any] = []
if "copyright_notice" in data:
sets.append("copyright_notice = %s")
vals.append(data["copyright_notice"])
if "original_filename" in data:
sets.append("original_filename = %s")
vals.append(data["original_filename"])
if "tags" in data:
sets.append("tags = %s")
vals.append(_normalize_media_tags(data["tags"]))
if "visibility" in data or "club_id" in data:
sets.append("visibility = %s")
vals.append(str(eff.get("visibility", asset["visibility"])).strip())
sets.append("club_id = %s")
vals.append(eff.get("club_id"))
if sets:
sets.append("updated_at = NOW()")
vals.append(asset_id)
cur.execute(
f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s",
tuple(vals),
)
conn.commit()
has_tags = _media_assets_tags_column_present(cur)
if has_tags:
cur.execute(
"""SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice, tags
FROM media_assets WHERE id = %s""",
(asset_id,),
)
else:
cur.execute(
"""SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice
FROM media_assets WHERE id = %s""",
(asset_id,),
)
out = r2d(cur.fetchone())
if not has_tags:
out["tags"] = []
return out