All checks were successful
Deploy Development / deploy (push) Successful in 36s
Test Suite / pytest-backend (push) Successful in 24s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 35s
- Updated APP_VERSION to 0.8.51 and MODULE_VERSIONS for media_assets to 1.5.1. - Added a new function to check for the presence of the 'training_unit_exercises' table, enhancing conditional handling in media asset queries. - Updated changelog to reflect the latest changes, including improved handling of media links based on the existence of the training unit exercises table.
876 lines
33 KiB
Python
876 lines
33 KiB
Python
"""Medien-Archiv (Liste, Datei) und Lifecycle — MEDIA_ASSETS_AND_ARCHIVE_SPEC."""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, Literal, Optional, Union
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Request
|
|
from pydantic import BaseModel, Field, model_validator
|
|
|
|
from club_tenancy import (
|
|
assert_valid_governance_visibility,
|
|
club_ids_for_profile_with_roles,
|
|
is_platform_admin,
|
|
is_superadmin,
|
|
library_content_visible_to_profile,
|
|
)
|
|
from db import get_db, get_cursor, r2d
|
|
from media_lifecycle import (
|
|
LC_ACTIVE,
|
|
LC_TRASH_HIDDEN,
|
|
LC_TRASH_SOFT,
|
|
assert_can_edit_media_asset_metadata,
|
|
assert_can_manage_media_asset_lifecycle,
|
|
assert_can_trash_soft,
|
|
fetch_media_asset_row,
|
|
purge_media_asset,
|
|
reactivate_media_asset_from_trash,
|
|
superadmin_force_lifecycle_state,
|
|
superadmin_hard_delete_media_asset,
|
|
transition_recover_from_hidden,
|
|
transition_to_trash_hidden,
|
|
transition_to_trash_soft,
|
|
)
|
|
from media_storage import get_effective_media_root, path_under_media_root
|
|
from tenant_context import TenantContext, get_tenant_context, get_tenant_context_flexible
|
|
|
|
router = APIRouter(prefix="/api/media-assets", tags=["media-assets"])
|
|
|
|
|
|
class MediaLifecycleBody(BaseModel):
|
|
action: Literal[
|
|
"trash_soft",
|
|
"trash_hidden",
|
|
"recover",
|
|
"purge",
|
|
"reactivate",
|
|
"superadmin_force_lifecycle",
|
|
"superadmin_hard_delete",
|
|
]
|
|
target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None
|
|
|
|
@model_validator(mode="after")
|
|
def _target_lifecycle_rules(self):
|
|
if self.action == "superadmin_force_lifecycle":
|
|
if not self.target_lifecycle:
|
|
raise ValueError("target_lifecycle ist für diese Aktion erforderlich")
|
|
elif self.target_lifecycle is not None:
|
|
raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle")
|
|
return self
|
|
|
|
|
|
class MediaAssetPatch(BaseModel):
|
|
copyright_notice: Optional[str] = Field(None, max_length=8000)
|
|
original_filename: Optional[str] = Field(None, max_length=300)
|
|
visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
|
|
club_id: Optional[int] = None
|
|
tags: Optional[list[str]] = None
|
|
|
|
|
|
class MediaBulkLifecycleBody(BaseModel):
|
|
media_asset_ids: list[int] = Field(..., min_length=1, max_length=200)
|
|
action: Literal[
|
|
"trash_soft",
|
|
"trash_hidden",
|
|
"recover",
|
|
"purge",
|
|
"reactivate",
|
|
"superadmin_force_lifecycle",
|
|
"superadmin_hard_delete",
|
|
]
|
|
target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None
|
|
|
|
@model_validator(mode="after")
|
|
def _bulk_target(self):
|
|
if self.action == "superadmin_force_lifecycle" and not self.target_lifecycle:
|
|
raise ValueError("target_lifecycle ist für diese Aktion erforderlich")
|
|
if self.action != "superadmin_force_lifecycle" and self.target_lifecycle is not None:
|
|
raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle")
|
|
return self
|
|
|
|
|
|
class MediaBulkPatchBody(BaseModel):
|
|
media_asset_ids: list[int] = Field(..., min_length=1, max_length=200)
|
|
copyright_notice: Optional[str] = Field(None, max_length=8000)
|
|
original_filename: Optional[str] = Field(None, max_length=300)
|
|
visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
|
|
club_id: Optional[int] = None
|
|
tags: Optional[list[str]] = None
|
|
|
|
|
|
_LIFECYCLE_LIST_FILTERS = frozenset({"active", "trash_soft", "trash_hidden", "all"})
|
|
_MEDIA_KIND_FILTERS = frozenset({"all", "image", "video", "pdf", "other"})
|
|
_MAX_TAGS = 40
|
|
_MAX_TAG_LEN = 48
|
|
|
|
_MEDIA_ASSETS_TAGS_COLUMN: Optional[bool] = None
|
|
_TRAINING_UNIT_EXERCISES_TABLE: Optional[bool] = None
|
|
|
|
|
|
def _media_assets_tags_column_present(cur: Any) -> bool:
|
|
"""True nach Migration 046; verhindert 500 wenn Code neuer ist als das Schema."""
|
|
global _MEDIA_ASSETS_TAGS_COLUMN
|
|
if _MEDIA_ASSETS_TAGS_COLUMN is not None:
|
|
return _MEDIA_ASSETS_TAGS_COLUMN
|
|
cur.execute(
|
|
"""
|
|
SELECT 1
|
|
FROM information_schema.columns
|
|
WHERE table_schema = 'public'
|
|
AND table_name = 'media_assets'
|
|
AND column_name = 'tags'
|
|
LIMIT 1
|
|
"""
|
|
)
|
|
_MEDIA_ASSETS_TAGS_COLUMN = cur.fetchone() is not None
|
|
return _MEDIA_ASSETS_TAGS_COLUMN
|
|
|
|
|
|
def _training_unit_exercises_table_present(cur: Any) -> bool:
|
|
"""True wenn planning-Migration (training_unit_exercises) im Schema liegt — sonst keine Einheiten-Links."""
|
|
global _TRAINING_UNIT_EXERCISES_TABLE
|
|
if _TRAINING_UNIT_EXERCISES_TABLE is not None:
|
|
return _TRAINING_UNIT_EXERCISES_TABLE
|
|
cur.execute(
|
|
"""
|
|
SELECT 1
|
|
FROM information_schema.tables
|
|
WHERE table_schema = 'public'
|
|
AND table_name = 'training_unit_exercises'
|
|
LIMIT 1
|
|
"""
|
|
)
|
|
_TRAINING_UNIT_EXERCISES_TABLE = cur.fetchone() is not None
|
|
return _TRAINING_UNIT_EXERCISES_TABLE
|
|
|
|
|
|
def _normalize_media_tags(raw: Union[list[str], None]) -> list[str]:
|
|
"""PostgreSQL text[]; Einträge gekürzt und ohne Dubletten (case-insensitive)."""
|
|
if raw is None:
|
|
return []
|
|
if not isinstance(raw, list):
|
|
raise HTTPException(status_code=400, detail="tags muss eine Liste von Strings sein")
|
|
out: list[str] = []
|
|
seen: set[str] = set()
|
|
for item in raw:
|
|
s = str(item).strip()
|
|
if not s:
|
|
continue
|
|
s = s[:_MAX_TAG_LEN]
|
|
key = s.lower()
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
out.append(s)
|
|
if len(out) >= _MAX_TAGS:
|
|
break
|
|
return out
|
|
|
|
|
|
def _usage_for_media_assets(cur: Any, asset_ids: list[int]) -> dict[int, dict[str, Any]]:
|
|
"""Übungen und Trainingseinheiten, die dieses Archiv-Medium nutzen."""
|
|
if not asset_ids:
|
|
return {}
|
|
cur.execute(
|
|
"""SELECT em.media_asset_id, e.id AS exercise_id, e.title AS exercise_title
|
|
FROM exercise_media em
|
|
JOIN exercises e ON e.id = em.exercise_id
|
|
WHERE em.media_asset_id = ANY(%s)
|
|
ORDER BY em.media_asset_id, e.title NULLS LAST, e.id""",
|
|
(asset_ids,),
|
|
)
|
|
ex_by_asset: dict[int, list[dict]] = {int(aid): [] for aid in asset_ids}
|
|
seen_ex: dict[int, set[int]] = {int(aid): set() for aid in asset_ids}
|
|
exercise_ids: set[int] = set()
|
|
for r in cur.fetchall():
|
|
d = r2d(r)
|
|
aid = int(d["media_asset_id"])
|
|
eid = int(d["exercise_id"])
|
|
if aid not in seen_ex or eid in seen_ex[aid]:
|
|
continue
|
|
seen_ex[aid].add(eid)
|
|
exercise_ids.add(eid)
|
|
title = (d.get("exercise_title") or "").strip() or f"Übung #{eid}"
|
|
ex_by_asset[aid].append({"id": eid, "title": title})
|
|
exercise_to_units: dict[int, list[dict]] = {}
|
|
if exercise_ids and _training_unit_exercises_table_present(cur):
|
|
cur.execute(
|
|
"""SELECT tue.exercise_id, tu.id AS unit_id, tu.planned_date,
|
|
COALESCE(tg.name, '') AS group_name
|
|
FROM training_unit_exercises tue
|
|
JOIN training_units tu ON tu.id = tue.training_unit_id
|
|
LEFT JOIN training_groups tg ON tg.id = tu.group_id
|
|
WHERE tue.exercise_id = ANY(%s)""",
|
|
(list(exercise_ids),),
|
|
)
|
|
for row in cur.fetchall():
|
|
d = r2d(row)
|
|
eid = int(d["exercise_id"])
|
|
uid = int(d["unit_id"])
|
|
pd = d.get("planned_date")
|
|
planned: Optional[str]
|
|
if pd is None:
|
|
planned = None
|
|
elif hasattr(pd, "isoformat"):
|
|
planned = str(pd.isoformat())
|
|
else:
|
|
planned = str(pd)
|
|
exercise_to_units.setdefault(eid, []).append(
|
|
{
|
|
"id": uid,
|
|
"planned_date": planned,
|
|
"group_name": d.get("group_name") or "",
|
|
}
|
|
)
|
|
for eid, units in exercise_to_units.items():
|
|
seen_u: set[int] = set()
|
|
uniq: list[dict] = []
|
|
for u in units:
|
|
if u["id"] not in seen_u:
|
|
seen_u.add(u["id"])
|
|
uniq.append(u)
|
|
exercise_to_units[eid] = uniq
|
|
out: dict[int, dict[str, Any]] = {}
|
|
for aid in asset_ids:
|
|
aid_i = int(aid)
|
|
exs = ex_by_asset.get(aid_i, [])
|
|
seen_u2: set[int] = set()
|
|
units_agg: list[dict] = []
|
|
for ex in exs:
|
|
for u in exercise_to_units.get(ex["id"], []):
|
|
if u["id"] not in seen_u2:
|
|
seen_u2.add(u["id"])
|
|
units_agg.append(u)
|
|
out[aid_i] = {"exercises": exs, "training_units": units_agg}
|
|
return out
|
|
|
|
|
|
def _fetch_filter_uploaders(cur: Any, is_adm: bool, profile_id: int) -> list[dict]:
|
|
cur.execute(
|
|
"""SELECT DISTINCT ma.uploaded_by_profile_id AS id, pr.name, pr.email
|
|
FROM media_assets ma
|
|
LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id
|
|
WHERE ma.uploaded_by_profile_id IS NOT NULL
|
|
AND ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')
|
|
AND (
|
|
%s
|
|
OR lower(trim(ma.visibility)) = 'official'
|
|
OR (
|
|
lower(trim(ma.visibility)) = 'private'
|
|
AND ma.uploaded_by_profile_id = %s
|
|
)
|
|
OR (
|
|
lower(trim(ma.visibility)) = 'club'
|
|
AND EXISTS (
|
|
SELECT 1 FROM club_members cm
|
|
WHERE cm.profile_id = %s
|
|
AND cm.club_id = ma.club_id
|
|
AND cm.status = 'active'
|
|
)
|
|
)
|
|
)
|
|
ORDER BY pr.name NULLS LAST, ma.uploaded_by_profile_id
|
|
LIMIT 400""",
|
|
(is_adm, profile_id, profile_id),
|
|
)
|
|
rows = [r2d(x) for x in cur.fetchall()]
|
|
out: list[dict] = []
|
|
for r in rows:
|
|
pid = r.get("id")
|
|
if pid is None:
|
|
continue
|
|
label = (r.get("name") or "").strip()
|
|
if not label:
|
|
label = (r.get("email") or "").strip()
|
|
if not label:
|
|
label = f"Profil #{pid}"
|
|
out.append({"id": int(pid), "label": label})
|
|
return out
|
|
|
|
|
|
def _effective_media_patch_fields(patch_fields: dict, asset: dict) -> dict:
|
|
"""Nach visibility-Wechsel club_id konsistent setzen (official/private → NULL)."""
|
|
eff = dict(patch_fields)
|
|
if eff.get("visibility") is not None:
|
|
v = str(eff["visibility"]).strip().lower()
|
|
if v in ("official", "private"):
|
|
eff["club_id"] = None
|
|
elif v == "club" and "club_id" not in eff:
|
|
eff["club_id"] = asset.get("club_id")
|
|
return eff
|
|
|
|
|
|
def _lifecycle_where_sql(lifecycle: str) -> str:
|
|
lc = (lifecycle or "active").strip().lower()
|
|
if lc not in _LIFECYCLE_LIST_FILTERS:
|
|
raise HTTPException(status_code=400, detail="Ungültiger lifecycle-Filter")
|
|
if lc == "active":
|
|
return "ma.lifecycle_state = 'active'"
|
|
if lc == "trash_soft":
|
|
return "ma.lifecycle_state = 'trash_soft'"
|
|
if lc == "trash_hidden":
|
|
return "ma.lifecycle_state = 'trash_hidden'"
|
|
return "ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')"
|
|
|
|
|
|
def _fetch_asset_file_row(cur: Any, asset_id: int) -> Optional[dict]:
|
|
cur.execute(
|
|
"""SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
|
|
storage_key, mime_type, original_filename
|
|
FROM media_assets WHERE id = %s""",
|
|
(asset_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
return r2d(row) if row else None
|
|
|
|
|
|
def _assert_can_view_archive_asset(cur: Any, tenant: TenantContext, asset: dict) -> None:
|
|
if not library_content_visible_to_profile(
|
|
cur,
|
|
tenant.profile_id,
|
|
(asset.get("visibility") or "").strip().lower(),
|
|
asset.get("club_id"),
|
|
asset.get("uploaded_by_profile_id"),
|
|
tenant.global_role,
|
|
):
|
|
raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Medium")
|
|
|
|
|
|
def _item_permissions(row: dict, tenant: TenantContext, admin_club_ids: set[int]) -> dict:
|
|
"""Berechnete UI-/Policy-Flags pro Zeile (ohne zusätzliche DB)."""
|
|
role_raw = tenant.global_role
|
|
role = (role_raw or "").strip().lower()
|
|
pid = int(tenant.profile_id)
|
|
sup = is_superadmin(role_raw)
|
|
plat = is_platform_admin(role_raw)
|
|
vis = (row.get("visibility") or "private").strip().lower()
|
|
uid = row.get("uploaded_by_profile_id")
|
|
cid = row.get("club_id")
|
|
lc = (row.get("lifecycle_state") or "active").strip().lower()
|
|
is_owner = uid is not None and int(uid) == pid
|
|
club_mgr = plat or (cid is not None and int(cid) in admin_club_ids)
|
|
|
|
edit_metadata = (
|
|
sup
|
|
or (vis == "official" and plat)
|
|
or (vis == "club" and club_mgr)
|
|
or (vis == "private" and is_owner)
|
|
)
|
|
|
|
trash_soft = lc == "active" and (
|
|
sup or plat or (vis == "private" and is_owner) or (vis == "club" and club_mgr)
|
|
)
|
|
if vis == "official" and not (sup or plat):
|
|
trash_soft = False
|
|
|
|
can_manage_adv = (
|
|
sup
|
|
or plat
|
|
or (vis == "private" and is_owner)
|
|
or (vis == "club" and club_mgr)
|
|
)
|
|
|
|
trash_hidden = lc in ("active", "trash_soft") and can_manage_adv
|
|
recover_from_hidden = lc == "trash_hidden" and can_manage_adv
|
|
reactivate = lc in ("trash_soft", "trash_hidden") and can_manage_adv
|
|
purge = lc == "trash_hidden" and sup
|
|
|
|
return {
|
|
"edit_metadata": edit_metadata,
|
|
"change_visibility": edit_metadata,
|
|
"trash_soft": trash_soft,
|
|
"trash_hidden": trash_hidden,
|
|
"recover": recover_from_hidden,
|
|
"reactivate": reactivate,
|
|
"purge": purge,
|
|
"superadmin_lifecycle": sup,
|
|
"superadmin_hard_delete": sup,
|
|
}
|
|
|
|
|
|
def _apply_lifecycle_action(
|
|
cur: Any,
|
|
conn: Any,
|
|
asset_id: int,
|
|
body: MediaLifecycleBody,
|
|
tenant: TenantContext,
|
|
) -> dict:
|
|
asset = fetch_media_asset_row(cur, asset_id)
|
|
if not asset:
|
|
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
|
|
|
|
action = body.action
|
|
role_raw = tenant.global_role
|
|
|
|
if action == "superadmin_hard_delete":
|
|
if not is_superadmin(role_raw):
|
|
raise HTTPException(status_code=403, detail="Nur Superadmin")
|
|
ok = superadmin_hard_delete_media_asset(cur, conn, asset_id)
|
|
if not ok:
|
|
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
|
|
return {"ok": True, "hard_deleted": asset_id}
|
|
|
|
if action == "superadmin_force_lifecycle":
|
|
if not is_superadmin(role_raw):
|
|
raise HTTPException(status_code=403, detail="Nur Superadmin")
|
|
tl = body.target_lifecycle or "active"
|
|
mp = {"active": LC_ACTIVE, "trash_soft": LC_TRASH_SOFT, "trash_hidden": LC_TRASH_HIDDEN}
|
|
return superadmin_force_lifecycle_state(cur, conn, asset_id, mp[tl])
|
|
|
|
if action == "purge":
|
|
if not is_superadmin(role_raw):
|
|
raise HTTPException(status_code=403, detail="Endgültiges Löschen nur als Superadmin")
|
|
state = (asset.get("lifecycle_state") or "").strip().lower()
|
|
if state != LC_TRASH_HIDDEN:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Nur ausgeblendete Medien (Stufe 2) dürfen mit dieser Aktion entfernt werden",
|
|
)
|
|
if not purge_media_asset(cur, conn, asset_id):
|
|
raise HTTPException(status_code=400, detail="Löschen nicht möglich")
|
|
return {"ok": True, "purged": asset_id}
|
|
|
|
if action == "trash_soft":
|
|
assert_can_trash_soft(cur, tenant, asset)
|
|
return transition_to_trash_soft(cur, conn, asset_id)
|
|
if action == "trash_hidden":
|
|
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
|
|
return transition_to_trash_hidden(cur, conn, asset_id)
|
|
if action == "recover":
|
|
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
|
|
return transition_recover_from_hidden(cur, conn, asset_id)
|
|
if action == "reactivate":
|
|
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
|
|
return reactivate_media_asset_from_trash(cur, conn, asset_id)
|
|
|
|
raise HTTPException(status_code=500, detail="Interner Fehler: lifecycle action")
|
|
|
|
|
|
@router.get("")
|
|
def list_media_assets(
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
q: Optional[str] = Query(None, max_length=120),
|
|
lifecycle: str = Query(
|
|
"active",
|
|
description="active | trash_soft | trash_hidden | all (nicht purgierte Zustände)",
|
|
),
|
|
media_kind: str = Query(
|
|
"all",
|
|
description="all | image | video | pdf | other",
|
|
),
|
|
club_id: Optional[int] = Query(
|
|
None,
|
|
ge=1,
|
|
description="Nur Superadmin: nach Verein filtern",
|
|
),
|
|
uploaded_by: Optional[int] = Query(
|
|
None,
|
|
ge=1,
|
|
description="Nach Uploader (Profil-ID), wenn erlaubt",
|
|
),
|
|
include_filter_meta: bool = Query(
|
|
False,
|
|
description="Uploader-Liste für Filter (nur wenn Metadaten sichtbar)",
|
|
),
|
|
limit: int = Query(30, ge=1, le=100),
|
|
offset: int = Query(0, ge=0),
|
|
):
|
|
lc_where = _lifecycle_where_sql(lifecycle)
|
|
mk = (media_kind or "all").strip().lower()
|
|
if mk not in _MEDIA_KIND_FILTERS:
|
|
raise HTTPException(status_code=400, detail="Ungültiger media_kind")
|
|
role = tenant.global_role or ""
|
|
is_adm = is_platform_admin(role)
|
|
sup = is_superadmin(role)
|
|
profile_id = tenant.profile_id
|
|
needle = (q or "").strip()
|
|
|
|
if club_id is not None and not sup:
|
|
raise HTTPException(status_code=403, detail="Vereinsfilter nur für Superadmin")
|
|
|
|
media_kind_sql = ""
|
|
if mk == "image":
|
|
media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'image/%'"
|
|
elif mk == "video":
|
|
media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'video/%'"
|
|
elif mk == "pdf":
|
|
media_kind_sql = (
|
|
" AND (lower(COALESCE(ma.mime_type, '')) = 'application/pdf'"
|
|
" OR lower(COALESCE(ma.mime_type, '')) LIKE '%pdf%')"
|
|
)
|
|
elif mk == "other":
|
|
media_kind_sql = (
|
|
" AND COALESCE(ma.mime_type, '') <> ''"
|
|
" AND lower(ma.mime_type) NOT LIKE 'image/%'"
|
|
" AND lower(ma.mime_type) NOT LIKE 'video/%'"
|
|
" AND lower(ma.mime_type) <> 'application/pdf'"
|
|
" AND lower(ma.mime_type) NOT LIKE '%pdf%'"
|
|
)
|
|
|
|
club_sql = ""
|
|
club_sql_params: list[Any] = []
|
|
if club_id is not None:
|
|
club_sql = " AND ma.club_id = %s"
|
|
club_sql_params.append(club_id)
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
admin_club_ids = club_ids_for_profile_with_roles(cur, profile_id, "club_admin")
|
|
show_uploader = sup or is_adm or bool(admin_club_ids)
|
|
if uploaded_by is not None and not show_uploader:
|
|
raise HTTPException(status_code=403, detail="Uploader-Filter nicht erlaubt")
|
|
|
|
has_tags_col = _media_assets_tags_column_present(cur)
|
|
tags_select = "ma.tags," if has_tags_col else "ARRAY[]::text[] AS tags,"
|
|
|
|
uploaded_sql = ""
|
|
uploaded_params: list[Any] = []
|
|
if uploaded_by is not None:
|
|
uploaded_sql = " AND ma.uploaded_by_profile_id = %s"
|
|
uploaded_params.append(uploaded_by)
|
|
|
|
search_sql = ""
|
|
search_params: list[Any] = []
|
|
if needle:
|
|
like = f"%{needle}%"
|
|
if has_tags_col:
|
|
search_params = [like, like, like, like]
|
|
search_sql = (
|
|
" AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s"
|
|
" OR COALESCE(ma.copyright_notice, '') ILIKE %s"
|
|
" OR EXISTS (SELECT 1 FROM unnest(ma.tags) AS t WHERE t::text ILIKE %s))"
|
|
)
|
|
else:
|
|
search_params = [like, like, like]
|
|
search_sql = (
|
|
" AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s"
|
|
" OR COALESCE(ma.copyright_notice, '') ILIKE %s)"
|
|
)
|
|
|
|
params: list[Any] = (
|
|
[is_adm, profile_id, profile_id]
|
|
+ club_sql_params
|
|
+ uploaded_params
|
|
+ search_params
|
|
+ [limit, offset]
|
|
)
|
|
|
|
cur.execute(
|
|
f"""SELECT ma.id, ma.mime_type, ma.byte_size, ma.original_filename, ma.visibility, ma.club_id,
|
|
ma.uploaded_by_profile_id, ma.lifecycle_state, ma.created_at, ma.sha256,
|
|
ma.copyright_notice, ma.storage_key, {tags_select}
|
|
pr.name AS uploader_name,
|
|
pr.email AS uploader_email,
|
|
cl.name AS club_name
|
|
FROM media_assets ma
|
|
LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id
|
|
LEFT JOIN clubs cl ON cl.id = ma.club_id
|
|
WHERE {lc_where}
|
|
AND (
|
|
%s
|
|
OR lower(trim(ma.visibility)) = 'official'
|
|
OR (
|
|
lower(trim(ma.visibility)) = 'private'
|
|
AND ma.uploaded_by_profile_id = %s
|
|
)
|
|
OR (
|
|
lower(trim(ma.visibility)) = 'club'
|
|
AND EXISTS (
|
|
SELECT 1 FROM club_members cm
|
|
WHERE cm.profile_id = %s
|
|
AND cm.club_id = ma.club_id
|
|
AND cm.status = 'active'
|
|
)
|
|
)
|
|
)
|
|
{club_sql}
|
|
{uploaded_sql}
|
|
{media_kind_sql}
|
|
{search_sql}
|
|
ORDER BY ma.updated_at DESC NULLS LAST, ma.created_at DESC
|
|
LIMIT %s OFFSET %s""",
|
|
params,
|
|
)
|
|
rows = [r2d(r) for r in cur.fetchall()]
|
|
show_club = sup or is_adm
|
|
asset_ids = [int(r["id"]) for r in rows]
|
|
usage_map = _usage_for_media_assets(cur, asset_ids)
|
|
for r in rows:
|
|
r["permissions"] = _item_permissions(r, tenant, admin_club_ids)
|
|
tid = int(r["id"])
|
|
r["usage"] = usage_map.get(tid, {"exercises": [], "training_units": []})
|
|
tags_val = r.get("tags")
|
|
if tags_val is None:
|
|
r["tags"] = []
|
|
elif not isinstance(tags_val, list):
|
|
r["tags"] = list(tags_val) if tags_val else []
|
|
if not show_uploader:
|
|
r["uploader_name"] = None
|
|
r["uploader_email"] = None
|
|
if not show_club:
|
|
r["club_name"] = None
|
|
viewer = {
|
|
"show_uploader_meta": show_uploader,
|
|
"show_club_meta": show_club,
|
|
"is_superadmin": sup,
|
|
"is_platform_admin": is_adm,
|
|
}
|
|
filter_meta = None
|
|
if include_filter_meta and show_uploader:
|
|
filter_meta = {"uploaders": _fetch_filter_uploaders(cur, is_adm, profile_id)}
|
|
|
|
return {
|
|
"items": rows,
|
|
"limit": limit,
|
|
"offset": offset,
|
|
"lifecycle": lifecycle.strip().lower(),
|
|
"media_kind": mk,
|
|
"viewer": viewer,
|
|
"filter_meta": filter_meta,
|
|
}
|
|
|
|
|
|
@router.api_route("/{asset_id}/file", methods=["GET", "HEAD"])
|
|
def download_media_asset_file(
|
|
request: Request,
|
|
asset_id: int,
|
|
tenant: TenantContext = Depends(get_tenant_context_flexible),
|
|
):
|
|
"""Direktzugriff auf Archiv-Datei (Thumbnail/Vorschau); Auth wie Übungs-Medien (?ssetoken)."""
|
|
from routers.exercises import _binary_media_response
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
asset = _fetch_asset_file_row(cur, asset_id)
|
|
if not asset:
|
|
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
|
|
lc = (asset.get("lifecycle_state") or "").strip().lower()
|
|
if lc == "active":
|
|
_assert_can_view_archive_asset(cur, tenant, asset)
|
|
elif lc in ("trash_soft", "trash_hidden"):
|
|
assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
|
|
else:
|
|
raise HTTPException(status_code=404, detail="Medium nicht verfügbar")
|
|
|
|
sk = asset.get("storage_key")
|
|
if not sk:
|
|
raise HTTPException(status_code=404, detail="Keine Datei hinterlegt")
|
|
|
|
media_root = get_effective_media_root(cur)
|
|
abs_p = path_under_media_root(media_root, str(sk))
|
|
if not abs_p or not abs_p.is_file():
|
|
raise HTTPException(status_code=404, detail="Datei nicht gefunden")
|
|
|
|
mime = asset.get("mime_type") or "application/octet-stream"
|
|
fname = asset.get("original_filename") or abs_p.name
|
|
return _binary_media_response(abs_p, mime, str(fname) if fname else None, request)
|
|
|
|
|
|
@router.post("/{asset_id}/lifecycle")
|
|
def post_media_asset_lifecycle(
|
|
asset_id: int,
|
|
body: MediaLifecycleBody,
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
):
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
return _apply_lifecycle_action(cur, conn, asset_id, body, tenant)
|
|
|
|
|
|
@router.post("/bulk-lifecycle")
|
|
def bulk_media_lifecycle(
|
|
body: MediaBulkLifecycleBody,
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
):
|
|
inner = MediaLifecycleBody(action=body.action, target_lifecycle=body.target_lifecycle)
|
|
updated: list[int] = []
|
|
failed: list[dict] = []
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
for aid in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)):
|
|
try:
|
|
_apply_lifecycle_action(cur, conn, aid, inner, tenant)
|
|
updated.append(aid)
|
|
except HTTPException as he:
|
|
msg = he.detail if isinstance(he.detail, str) else str(he.detail)
|
|
failed.append({"id": aid, "detail": msg})
|
|
return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)}
|
|
|
|
|
|
@router.post("/bulk-patch")
|
|
def bulk_media_patch(
|
|
body: MediaBulkPatchBody,
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
):
|
|
raw = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
|
|
patch_fields = {
|
|
k: v
|
|
for k, v in raw.items()
|
|
if k != "media_asset_ids" and (k == "tags" or v is not None)
|
|
}
|
|
if not patch_fields:
|
|
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
|
|
|
|
updated: list[int] = []
|
|
failed: list[dict] = []
|
|
profile_id = tenant.profile_id
|
|
role = tenant.global_role
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
for asset_id in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)):
|
|
try:
|
|
cur.execute(
|
|
"""SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
|
|
copyright_notice, original_filename
|
|
FROM media_assets WHERE id = %s""",
|
|
(asset_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
failed.append({"id": asset_id, "detail": "Medium nicht gefunden"})
|
|
continue
|
|
asset = r2d(row)
|
|
assert_can_edit_media_asset_metadata(cur, tenant, asset)
|
|
|
|
if "tags" in patch_fields and not _media_assets_tags_column_present(cur):
|
|
failed.append(
|
|
{
|
|
"id": asset_id,
|
|
"detail": "Schlagwörter (tags) erfordern DB-Migration 046.",
|
|
}
|
|
)
|
|
continue
|
|
|
|
eff = _effective_media_patch_fields(patch_fields, asset)
|
|
next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower()
|
|
next_cid = eff["club_id"] if "club_id" in eff else asset.get("club_id")
|
|
if "visibility" in patch_fields or "club_id" in patch_fields:
|
|
assert_valid_governance_visibility(
|
|
cur,
|
|
profile_id,
|
|
role,
|
|
next_vis,
|
|
int(next_cid) if next_cid is not None else None,
|
|
)
|
|
|
|
sets: list[str] = []
|
|
vals: list[Any] = []
|
|
if "copyright_notice" in patch_fields:
|
|
sets.append("copyright_notice = %s")
|
|
vals.append(patch_fields["copyright_notice"])
|
|
if "original_filename" in patch_fields:
|
|
sets.append("original_filename = %s")
|
|
vals.append(patch_fields["original_filename"])
|
|
if "tags" in patch_fields:
|
|
sets.append("tags = %s")
|
|
vals.append(_normalize_media_tags(patch_fields["tags"]))
|
|
if "visibility" in patch_fields or "club_id" in patch_fields:
|
|
sets.append("visibility = %s")
|
|
vals.append(str(eff.get("visibility", asset["visibility"])).strip())
|
|
sets.append("club_id = %s")
|
|
vals.append(eff.get("club_id"))
|
|
if not sets:
|
|
failed.append({"id": asset_id, "detail": "Nichts zu aktualisieren"})
|
|
continue
|
|
sets.append("updated_at = NOW()")
|
|
vals.append(asset_id)
|
|
cur.execute(
|
|
f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s",
|
|
tuple(vals),
|
|
)
|
|
conn.commit()
|
|
updated.append(asset_id)
|
|
except HTTPException as he:
|
|
msg = he.detail if isinstance(he.detail, str) else str(he.detail)
|
|
failed.append({"id": asset_id, "detail": msg})
|
|
return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)}
|
|
|
|
|
|
@router.patch("/{asset_id}")
|
|
def patch_media_asset(
|
|
asset_id: int,
|
|
body: MediaAssetPatch,
|
|
tenant: TenantContext = Depends(get_tenant_context),
|
|
):
|
|
profile_id = tenant.profile_id
|
|
role = tenant.global_role
|
|
data = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
|
|
if not data:
|
|
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute(
|
|
"""SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
|
|
copyright_notice, original_filename
|
|
FROM media_assets WHERE id = %s""",
|
|
(asset_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
|
|
asset = r2d(row)
|
|
assert_can_edit_media_asset_metadata(cur, tenant, asset)
|
|
|
|
if "tags" in data and not _media_assets_tags_column_present(cur):
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail="Schlagwörter (tags) erfordern die Datenbank-Migration 046. Bitte Migration ausführen.",
|
|
)
|
|
|
|
eff = _effective_media_patch_fields(data, asset)
|
|
next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower()
|
|
next_cid = eff["club_id"] if "club_id" in eff else asset.get("club_id")
|
|
if "visibility" in data or "club_id" in data:
|
|
assert_valid_governance_visibility(
|
|
cur,
|
|
profile_id,
|
|
role,
|
|
next_vis,
|
|
int(next_cid) if next_cid is not None else None,
|
|
)
|
|
|
|
sets: list[str] = []
|
|
vals: list[Any] = []
|
|
if "copyright_notice" in data:
|
|
sets.append("copyright_notice = %s")
|
|
vals.append(data["copyright_notice"])
|
|
if "original_filename" in data:
|
|
sets.append("original_filename = %s")
|
|
vals.append(data["original_filename"])
|
|
if "tags" in data:
|
|
sets.append("tags = %s")
|
|
vals.append(_normalize_media_tags(data["tags"]))
|
|
if "visibility" in data or "club_id" in data:
|
|
sets.append("visibility = %s")
|
|
vals.append(str(eff.get("visibility", asset["visibility"])).strip())
|
|
sets.append("club_id = %s")
|
|
vals.append(eff.get("club_id"))
|
|
if sets:
|
|
sets.append("updated_at = NOW()")
|
|
vals.append(asset_id)
|
|
cur.execute(
|
|
f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s",
|
|
tuple(vals),
|
|
)
|
|
conn.commit()
|
|
has_tags = _media_assets_tags_column_present(cur)
|
|
if has_tags:
|
|
cur.execute(
|
|
"""SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
|
|
uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice, tags
|
|
FROM media_assets WHERE id = %s""",
|
|
(asset_id,),
|
|
)
|
|
else:
|
|
cur.execute(
|
|
"""SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
|
|
uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice
|
|
FROM media_assets WHERE id = %s""",
|
|
(asset_id,),
|
|
)
|
|
out = r2d(cur.fetchone())
|
|
if not has_tags:
|
|
out["tags"] = []
|
|
return out
|