shinkan-jinkendo/backend/routers/media_assets.py
Lars 084ac38c2e
All checks were successful
Deploy Development / deploy (push) Successful in 34s
Test Suite / pytest-backend (push) Successful in 24s
Test Suite / lint-backend (push) Successful in 1s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 24s
feat: add support for media asset tags and conditional database handling
- Introduced a function to check for the presence of the 'tags' column in the media_assets table, ensuring compatibility with database migrations.
- Updated media asset listing and patching functions to conditionally include tags based on the presence of the 'tags' column.
- Enhanced error handling for operations involving tags, providing clear feedback when the required database migration has not been performed.
- Improved search functionality to account for the presence of tags, optimizing queries based on available database schema.
2026-05-07 16:20:51 +02:00

857 lines
32 KiB
Python

"""Medien-Archiv (Liste, Datei) und Lifecycle — MEDIA_ASSETS_AND_ARCHIVE_SPEC."""
from __future__ import annotations
from typing import Any, Literal, Optional, Union
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from pydantic import BaseModel, Field, model_validator
from club_tenancy import (
assert_valid_governance_visibility,
club_ids_for_profile_with_roles,
is_platform_admin,
is_superadmin,
library_content_visible_to_profile,
)
from db import get_db, get_cursor, r2d
from media_lifecycle import (
LC_ACTIVE,
LC_TRASH_HIDDEN,
LC_TRASH_SOFT,
assert_can_edit_media_asset_metadata,
assert_can_manage_media_asset_lifecycle,
assert_can_trash_soft,
fetch_media_asset_row,
purge_media_asset,
reactivate_media_asset_from_trash,
superadmin_force_lifecycle_state,
superadmin_hard_delete_media_asset,
transition_recover_from_hidden,
transition_to_trash_hidden,
transition_to_trash_soft,
)
from media_storage import get_effective_media_root, path_under_media_root
from tenant_context import TenantContext, get_tenant_context, get_tenant_context_flexible
# Router for the media-archive endpoints defined in this module; all routes are
# registered under the /api/media-assets prefix and tagged "media-assets".
router = APIRouter(prefix="/api/media-assets", tags=["media-assets"])
class MediaLifecycleBody(BaseModel):
    """Request body for a lifecycle action on a single media asset.

    ``target_lifecycle`` is required for ``superadmin_force_lifecycle`` and
    must be omitted for every other action.
    """

    action: Literal[
        "trash_soft",
        "trash_hidden",
        "recover",
        "purge",
        "reactivate",
        "superadmin_force_lifecycle",
        "superadmin_hard_delete",
    ]
    target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None

    @model_validator(mode="after")
    def _target_lifecycle_rules(self):
        """Reject bodies whose action and target_lifecycle do not fit together."""
        forcing = self.action == "superadmin_force_lifecycle"
        if forcing and not self.target_lifecycle:
            raise ValueError("target_lifecycle ist für diese Aktion erforderlich")
        if not forcing and self.target_lifecycle is not None:
            raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle")
        return self
class MediaAssetPatch(BaseModel):
    """Partial metadata update for one media asset; every field is optional.

    The PATCH handler dumps this model with ``exclude_unset=True``, so only
    fields that were actually sent are considered.
    """

    # Free-text copyright notice, at most 8000 characters.
    copyright_notice: Optional[str] = Field(None, max_length=8000)
    # Display/download file name, at most 300 characters.
    original_filename: Optional[str] = Field(None, max_length=300)
    # One of "private", "club" or "official" (enforced by the regex pattern).
    visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
    # Club the asset belongs to.
    club_id: Optional[int] = None
    # Tags; the handler passes them through _normalize_media_tags before storing.
    tags: Optional[list[str]] = None
class MediaBulkLifecycleBody(BaseModel):
    """Request body applying one lifecycle action to 1-200 media assets.

    ``target_lifecycle`` is required for ``superadmin_force_lifecycle`` and
    must be omitted for every other action.
    """

    media_asset_ids: list[int] = Field(..., min_length=1, max_length=200)
    action: Literal[
        "trash_soft",
        "trash_hidden",
        "recover",
        "purge",
        "reactivate",
        "superadmin_force_lifecycle",
        "superadmin_hard_delete",
    ]
    target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None

    @model_validator(mode="after")
    def _bulk_target(self):
        """Reject bodies whose action and target_lifecycle do not fit together."""
        if self.action == "superadmin_force_lifecycle":
            if not self.target_lifecycle:
                raise ValueError("target_lifecycle ist für diese Aktion erforderlich")
        elif self.target_lifecycle is not None:
            raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle")
        return self
class MediaBulkPatchBody(BaseModel):
    """Metadata patch applied to 1-200 media assets in one request.

    The bulk handler ignores fields that are unset or ``None``; the exception
    is ``tags``, which is kept even when it is ``None``.
    """

    # IDs of the assets to patch (between 1 and 200 entries).
    media_asset_ids: list[int] = Field(..., min_length=1, max_length=200)
    # Free-text copyright notice, at most 8000 characters.
    copyright_notice: Optional[str] = Field(None, max_length=8000)
    # Display/download file name, at most 300 characters.
    original_filename: Optional[str] = Field(None, max_length=300)
    # One of "private", "club" or "official" (enforced by the regex pattern).
    visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
    # Club the assets belong to.
    club_id: Optional[int] = None
    # Tags; the handler passes them through _normalize_media_tags before storing.
    tags: Optional[list[str]] = None
# Accepted values of the ``lifecycle`` list filter (checked in _lifecycle_where_sql).
_LIFECYCLE_LIST_FILTERS = frozenset({"active", "trash_soft", "trash_hidden", "all"})
# Accepted values of the ``media_kind`` list filter (checked in list_media_assets).
_MEDIA_KIND_FILTERS = frozenset({"all", "image", "video", "pdf", "other"})
# Limits applied by _normalize_media_tags: number of tags kept per call ...
_MAX_TAGS = 40
# ... and number of characters kept per tag.
_MAX_TAG_LEN = 48
# Process-wide memo for the schema probe below. Only a positive result is
# remembered; None means "not confirmed yet, probe again".
_MEDIA_ASSETS_TAGS_COLUMN: Optional[bool] = None


def _media_assets_tags_column_present(cur: Any) -> bool:
    """Return True when ``public.media_assets`` has a ``tags`` column (migration 046).

    Guards against 500 errors when the code is newer than the database schema.

    A positive answer is cached for the lifetime of the process. A negative
    answer is deliberately NOT cached: otherwise tags would stay disabled
    after the migration has been applied until the process is restarted.

    Args:
        cur: Open DB cursor used for the ``information_schema`` lookup.

    Returns:
        True if the column exists, otherwise False.
    """
    global _MEDIA_ASSETS_TAGS_COLUMN
    if _MEDIA_ASSETS_TAGS_COLUMN:
        return True
    cur.execute(
        """
SELECT 1
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'media_assets'
AND column_name = 'tags'
LIMIT 1
"""
    )
    present = cur.fetchone() is not None
    if present:
        # Cache the positive result only, so a later migration is picked up.
        _MEDIA_ASSETS_TAGS_COLUMN = True
    return present
def _normalize_media_tags(raw: Union[list[str], None]) -> list[str]:
    """Clean a tag list for storage in the PostgreSQL ``text[]`` column.

    Each entry is stringified, trimmed and cut to ``_MAX_TAG_LEN`` characters.
    Empty entries and case-insensitive duplicates are dropped (first spelling
    wins), and at most ``_MAX_TAGS`` tags are kept.

    Args:
        raw: Tags as received from the client, or None.

    Returns:
        The cleaned tag list; empty when ``raw`` is None.

    Raises:
        HTTPException: 400 when ``raw`` is neither None nor a list.
    """
    if raw is None:
        return []
    if not isinstance(raw, list):
        raise HTTPException(status_code=400, detail="tags muss eine Liste von Strings sein")
    out: list[str] = []
    seen: set[str] = set()
    for item in raw:
        # Cutting at the length limit can expose inner whitespace at the cut,
        # so trim once more; otherwise "foo " and "foo" count as different tags.
        tag = str(item).strip()[:_MAX_TAG_LEN].rstrip()
        if not tag:
            continue
        key = tag.lower()
        if key in seen:
            continue
        seen.add(key)
        out.append(tag)
        if len(out) >= _MAX_TAGS:
            break
    return out
def _usage_for_media_assets(cur: Any, asset_ids: list[int]) -> dict[int, dict[str, Any]]:
    """Report which exercises and training units use each archive asset.

    Args:
        cur: Open DB cursor.
        asset_ids: IDs of the media assets to look up.

    Returns:
        Mapping ``asset_id -> {"exercises": [...], "training_units": [...]}``.
        Exercises are ``{"id", "title"}`` dicts; units are
        ``{"id", "planned_date", "group_name"}`` dicts. Both lists are free of
        duplicates. An empty ``asset_ids`` yields an empty mapping.
    """
    if not asset_ids:
        return {}

    cur.execute(
        """SELECT em.media_asset_id, e.id AS exercise_id, e.title AS exercise_title
FROM exercise_media em
JOIN exercises e ON e.id = em.exercise_id
WHERE em.media_asset_id = ANY(%s)
ORDER BY em.media_asset_id, e.title NULLS LAST, e.id""",
        (asset_ids,),
    )
    exercises_per_asset: dict[int, list[dict]] = {}
    linked_ids_per_asset: dict[int, set[int]] = {}
    for raw_id in asset_ids:
        exercises_per_asset[int(raw_id)] = []
        linked_ids_per_asset[int(raw_id)] = set()
    all_exercise_ids: set[int] = set()
    for record in cur.fetchall():
        rec = r2d(record)
        asset_key = int(rec["media_asset_id"])
        exercise_key = int(rec["exercise_id"])
        linked = linked_ids_per_asset.get(asset_key)
        # Skip rows for assets that were not requested and repeated (asset, exercise) pairs.
        if linked is None or exercise_key in linked:
            continue
        linked.add(exercise_key)
        all_exercise_ids.add(exercise_key)
        label = (rec.get("exercise_title") or "").strip() or f"Übung #{exercise_key}"
        exercises_per_asset[asset_key].append({"id": exercise_key, "title": label})

    units_per_exercise: dict[int, list[dict]] = {}
    if all_exercise_ids:
        cur.execute(
            """SELECT tue.exercise_id, tu.id AS unit_id, tu.planned_date,
COALESCE(tg.name, '') AS group_name
FROM training_unit_exercises tue
JOIN training_units tu ON tu.id = tue.training_unit_id
LEFT JOIN training_groups tg ON tg.id = tu.group_id
WHERE tue.exercise_id = ANY(%s)""",
            (list(all_exercise_ids),),
        )
        unit_ids_per_exercise: dict[int, set[int]] = {}
        for record in cur.fetchall():
            rec = r2d(record)
            exercise_key = int(rec["exercise_id"])
            unit_key = int(rec["unit_id"])
            when = rec.get("planned_date")
            planned_text: Optional[str]
            if when is None:
                planned_text = None
            elif hasattr(when, "isoformat"):
                planned_text = str(when.isoformat())
            else:
                planned_text = str(when)
            units = units_per_exercise.setdefault(exercise_key, [])
            already = unit_ids_per_exercise.setdefault(exercise_key, set())
            # Keep only the first row per (exercise, unit).
            if unit_key in already:
                continue
            already.add(unit_key)
            units.append(
                {
                    "id": unit_key,
                    "planned_date": planned_text,
                    "group_name": rec.get("group_name") or "",
                }
            )

    result: dict[int, dict[str, Any]] = {}
    for raw_id in asset_ids:
        asset_key = int(raw_id)
        linked_exercises = exercises_per_asset.get(asset_key, [])
        merged_units: list[dict] = []
        merged_unit_ids: set[int] = set()
        for exercise in linked_exercises:
            for unit in units_per_exercise.get(exercise["id"], []):
                # A unit reached through several exercises is listed once per asset.
                if unit["id"] in merged_unit_ids:
                    continue
                merged_unit_ids.add(unit["id"])
                merged_units.append(unit)
        result[asset_key] = {"exercises": linked_exercises, "training_units": merged_units}
    return result
def _fetch_filter_uploaders(cur: Any, is_adm: bool, profile_id: int) -> list[dict]:
    """List the uploaders of assets visible to the viewer, for the uploader filter.

    The visibility rule in the query matches the asset list: platform admins
    see everything, everyone sees ``official`` assets, owners see their own
    ``private`` assets, and active club members see their club's assets.

    Args:
        cur: Open DB cursor.
        is_adm: Whether the viewer is a platform admin.
        profile_id: Profile ID of the viewer.

    Returns:
        Up to 400 ``{"id", "label"}`` dicts. The label is the profile name,
        else the e-mail address, else ``"Profil #<id>"``.
    """
    cur.execute(
        """SELECT DISTINCT ma.uploaded_by_profile_id AS id, pr.name, pr.email
FROM media_assets ma
LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id
WHERE ma.uploaded_by_profile_id IS NOT NULL
AND ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')
AND (
%s
OR lower(trim(ma.visibility)) = 'official'
OR (
lower(trim(ma.visibility)) = 'private'
AND ma.uploaded_by_profile_id = %s
)
OR (
lower(trim(ma.visibility)) = 'club'
AND EXISTS (
SELECT 1 FROM club_members cm
WHERE cm.profile_id = %s
AND cm.club_id = ma.club_id
AND cm.status = 'active'
)
)
)
ORDER BY pr.name NULLS LAST, ma.uploaded_by_profile_id
LIMIT 400""",
        (is_adm, profile_id, profile_id),
    )
    uploaders: list[dict] = []
    for rec in [r2d(raw) for raw in cur.fetchall()]:
        uploader_id = rec.get("id")
        if uploader_id is None:
            continue
        # Prefer the name, fall back to the e-mail, then to a generic placeholder.
        caption = (
            (rec.get("name") or "").strip()
            or (rec.get("email") or "").strip()
            or f"Profil #{uploader_id}"
        )
        uploaders.append({"id": int(uploader_id), "label": caption})
    return uploaders
def _effective_media_patch_fields(patch_fields: dict, asset: dict) -> dict:
    """Return a copy of the patch with ``club_id`` aligned to a visibility change.

    * visibility ``official`` / ``private``: ``club_id`` is forced to None.
    * visibility ``club`` without an explicit ``club_id``: the asset's current
      ``club_id`` is carried over.
    * no (or None) visibility in the patch: the copy is returned unchanged.

    The input dict is never modified.
    """
    merged = {**patch_fields}
    requested = merged.get("visibility")
    if requested is None:
        return merged

    level = str(requested).strip().lower()
    if level in ("official", "private"):
        merged["club_id"] = None
    elif level == "club":
        if "club_id" not in merged:
            merged["club_id"] = asset.get("club_id")
    return merged
def _lifecycle_where_sql(lifecycle: str) -> str:
    """Translate the ``lifecycle`` list filter into a SQL condition on ``ma``.

    Blank input is treated as ``active``; matching ignores case and
    surrounding whitespace.

    Raises:
        HTTPException: 400 when the value is not in ``_LIFECYCLE_LIST_FILTERS``.
    """
    requested = (lifecycle or "active").strip().lower()
    if requested not in _LIFECYCLE_LIST_FILTERS:
        raise HTTPException(status_code=400, detail="Ungültiger lifecycle-Filter")
    single_state = {
        "active": "ma.lifecycle_state = 'active'",
        "trash_soft": "ma.lifecycle_state = 'trash_soft'",
        "trash_hidden": "ma.lifecycle_state = 'trash_hidden'",
    }
    # Any other accepted value (i.e. "all") selects every non-purged state.
    return single_state.get(
        requested,
        "ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')",
    )
def _fetch_asset_file_row(cur: Any, asset_id: int) -> Optional[dict]:
    """Load the columns needed to authorise and serve an asset's file.

    Returns:
        The row as a dict, or None when no asset has this ID.
    """
    cur.execute(
        """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
storage_key, mime_type, original_filename
FROM media_assets WHERE id = %s""",
        (asset_id,),
    )
    found = cur.fetchone()
    if not found:
        return None
    return r2d(found)
def _assert_can_view_archive_asset(cur: Any, tenant: TenantContext, asset: dict) -> None:
    """Ensure the caller may see ``asset``; the decision is delegated to
    ``library_content_visible_to_profile``.

    Raises:
        HTTPException: 403 when the asset is not visible to the caller.
    """
    visibility = (asset.get("visibility") or "").strip().lower()
    allowed = library_content_visible_to_profile(
        cur,
        tenant.profile_id,
        visibility,
        asset.get("club_id"),
        asset.get("uploaded_by_profile_id"),
        tenant.global_role,
    )
    if allowed:
        return
    raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Medium")
def _item_permissions(row: dict, tenant: TenantContext, admin_club_ids: set[int]) -> dict:
    """Compute the per-row UI/policy flags without touching the database.

    Args:
        row: Asset row; reads ``visibility``, ``uploaded_by_profile_id``,
            ``club_id`` and ``lifecycle_state``.
        tenant: Caller context (``global_role`` and ``profile_id`` are used).
        admin_club_ids: Clubs in which the caller holds the club-admin role.

    Returns:
        Dict of booleans: ``edit_metadata``, ``change_visibility``,
        ``trash_soft``, ``trash_hidden``, ``recover``, ``reactivate``,
        ``purge``, ``superadmin_lifecycle``, ``superadmin_hard_delete``.
    """
    role_raw = tenant.global_role
    pid = int(tenant.profile_id)
    sup = is_superadmin(role_raw)
    plat = is_platform_admin(role_raw)

    # Missing values fall back to the most restrictive visibility / the live state.
    vis = (row.get("visibility") or "private").strip().lower()
    uid = row.get("uploaded_by_profile_id")
    cid = row.get("club_id")
    lc = (row.get("lifecycle_state") or "active").strip().lower()

    is_owner = uid is not None and int(uid) == pid
    club_mgr = plat or (cid is not None and int(cid) in admin_club_ids)

    edit_metadata = (
        sup
        or (vis == "official" and plat)
        or (vis == "club" and club_mgr)
        or (vis == "private" and is_owner)
    )
    # Who may move an asset between lifecycle states. "official" assets are
    # covered only through the sup/plat terms, so no extra exclusion is needed.
    can_manage_adv = (
        sup
        or plat
        or (vis == "private" and is_owner)
        or (vis == "club" and club_mgr)
    )

    return {
        "edit_metadata": edit_metadata,
        "change_visibility": edit_metadata,
        "trash_soft": lc == "active" and can_manage_adv,
        "trash_hidden": lc in ("active", "trash_soft") and can_manage_adv,
        "recover": lc == "trash_hidden" and can_manage_adv,
        "reactivate": lc in ("trash_soft", "trash_hidden") and can_manage_adv,
        # Final removal is reserved for superadmins and only from the hidden stage.
        "purge": lc == "trash_hidden" and sup,
        "superadmin_lifecycle": sup,
        "superadmin_hard_delete": sup,
    }
def _apply_lifecycle_action(
    cur: Any,
    conn: Any,
    asset_id: int,
    body: MediaLifecycleBody,
    tenant: TenantContext,
) -> dict:
    """Authorise and run one lifecycle action on one media asset.

    Returns:
        The result dict of the executed transition, or ``{"ok": True, ...}``
        for purge and hard delete.

    Raises:
        HTTPException: 404 if the asset does not exist, 403 for
            superadmin-only actions requested by others, 400 when purge is
            not applicable. The ``assert_can_*`` helpers are called for the
            remaining actions.
    """
    asset = fetch_media_asset_row(cur, asset_id)
    if not asset:
        raise HTTPException(status_code=404, detail="Medium nicht gefunden")

    requested = body.action
    caller_role = tenant.global_role

    if requested == "superadmin_hard_delete":
        if not is_superadmin(caller_role):
            raise HTTPException(status_code=403, detail="Nur Superadmin")
        if not superadmin_hard_delete_media_asset(cur, conn, asset_id):
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")
        return {"ok": True, "hard_deleted": asset_id}

    if requested == "superadmin_force_lifecycle":
        if not is_superadmin(caller_role):
            raise HTTPException(status_code=403, detail="Nur Superadmin")
        state_by_name = {
            "active": LC_ACTIVE,
            "trash_soft": LC_TRASH_SOFT,
            "trash_hidden": LC_TRASH_HIDDEN,
        }
        wanted = body.target_lifecycle or "active"
        return superadmin_force_lifecycle_state(cur, conn, asset_id, state_by_name[wanted])

    if requested == "purge":
        if not is_superadmin(caller_role):
            raise HTTPException(status_code=403, detail="Endgültiges Löschen nur als Superadmin")
        current = (asset.get("lifecycle_state") or "").strip().lower()
        if current != LC_TRASH_HIDDEN:
            raise HTTPException(
                status_code=400,
                detail="Nur ausgeblendete Medien (Stufe 2) dürfen mit dieser Aktion entfernt werden",
            )
        if not purge_media_asset(cur, conn, asset_id):
            raise HTTPException(status_code=400, detail="Löschen nicht möglich")
        return {"ok": True, "purged": asset_id}

    if requested == "trash_soft":
        assert_can_trash_soft(cur, tenant, asset)
        return transition_to_trash_soft(cur, conn, asset_id)

    # These three transitions share the same "manage lifecycle" permission check.
    managed_transitions = {
        "trash_hidden": transition_to_trash_hidden,
        "recover": transition_recover_from_hidden,
        "reactivate": reactivate_media_asset_from_trash,
    }
    if requested in managed_transitions:
        assert_can_manage_media_asset_lifecycle(cur, tenant, asset)
        return managed_transitions[requested](cur, conn, asset_id)

    raise HTTPException(status_code=500, detail="Interner Fehler: lifecycle action")
@router.get("")
def list_media_assets(
    tenant: TenantContext = Depends(get_tenant_context),
    q: Optional[str] = Query(None, max_length=120),
    lifecycle: str = Query(
        "active",
        description="active | trash_soft | trash_hidden | all (nicht purgierte Zustände)",
    ),
    media_kind: str = Query(
        "all",
        description="all | image | video | pdf | other",
    ),
    club_id: Optional[int] = Query(
        None,
        ge=1,
        description="Nur Superadmin: nach Verein filtern",
    ),
    uploaded_by: Optional[int] = Query(
        None,
        ge=1,
        description="Nach Uploader (Profil-ID), wenn erlaubt",
    ),
    include_filter_meta: bool = Query(
        False,
        description="Uploader-Liste für Filter (nur wenn Metadaten sichtbar)",
    ),
    limit: int = Query(30, ge=1, le=100),
    offset: int = Query(0, ge=0),
):
    """List the media assets visible to the caller, with filters and paging.

    Visibility: platform admins see everything; everyone sees ``official``
    assets; owners see their ``private`` assets; active club members see their
    club's assets. Each item also carries ``permissions`` and ``usage``.

    The search term ``q`` is matched literally (case-insensitive substring)
    against file name, storage key, copyright notice and, once the ``tags``
    column exists, the tags.

    Raises:
        HTTPException: 400 for an unknown ``lifecycle`` / ``media_kind``;
            403 when ``club_id`` is used by a non-superadmin or
            ``uploaded_by`` by a caller who may not see uploader metadata.
    """
    lc_where = _lifecycle_where_sql(lifecycle)
    # Echo the value the filter was actually built from (blank input means "active").
    lifecycle_norm = (lifecycle or "active").strip().lower()
    mk = (media_kind or "all").strip().lower()
    if mk not in _MEDIA_KIND_FILTERS:
        raise HTTPException(status_code=400, detail="Ungültiger media_kind")
    role = tenant.global_role or ""
    is_adm = is_platform_admin(role)
    sup = is_superadmin(role)
    profile_id = tenant.profile_id
    needle = (q or "").strip()
    if club_id is not None and not sup:
        raise HTTPException(status_code=403, detail="Vereinsfilter nur für Superadmin")

    media_kind_sql = ""
    if mk == "image":
        media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'image/%'"
    elif mk == "video":
        media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'video/%'"
    elif mk == "pdf":
        media_kind_sql = (
            " AND (lower(COALESCE(ma.mime_type, '')) = 'application/pdf'"
            " OR lower(COALESCE(ma.mime_type, '')) LIKE '%pdf%')"
        )
    elif mk == "other":
        media_kind_sql = (
            " AND COALESCE(ma.mime_type, '') <> ''"
            " AND lower(ma.mime_type) NOT LIKE 'image/%'"
            " AND lower(ma.mime_type) NOT LIKE 'video/%'"
            " AND lower(ma.mime_type) <> 'application/pdf'"
            " AND lower(ma.mime_type) NOT LIKE '%pdf%'"
        )

    club_sql = ""
    club_sql_params: list[Any] = []
    if club_id is not None:
        club_sql = " AND ma.club_id = %s"
        club_sql_params.append(club_id)

    with get_db() as conn:
        cur = get_cursor(conn)
        admin_club_ids = club_ids_for_profile_with_roles(cur, profile_id, "club_admin")
        show_uploader = sup or is_adm or bool(admin_club_ids)
        if uploaded_by is not None and not show_uploader:
            raise HTTPException(status_code=403, detail="Uploader-Filter nicht erlaubt")

        has_tags_col = _media_assets_tags_column_present(cur)
        # Without the column, select an empty array so the row shape stays the same.
        tags_select = "ma.tags," if has_tags_col else "ARRAY[]::text[] AS tags,"

        uploaded_sql = ""
        uploaded_params: list[Any] = []
        if uploaded_by is not None:
            uploaded_sql = " AND ma.uploaded_by_profile_id = %s"
            uploaded_params.append(uploaded_by)

        search_sql = ""
        search_params: list[Any] = []
        if needle:
            # Escape LIKE metacharacters so user input is matched literally;
            # backslash is PostgreSQL's default LIKE/ILIKE escape character.
            escaped = needle.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
            like = f"%{escaped}%"
            search_clauses = [
                "ma.original_filename ILIKE %s",
                "ma.storage_key ILIKE %s",
                "COALESCE(ma.copyright_notice, '') ILIKE %s",
            ]
            if has_tags_col:
                search_clauses.append(
                    "EXISTS (SELECT 1 FROM unnest(ma.tags) AS t WHERE t::text ILIKE %s)"
                )
            search_sql = " AND (" + " OR ".join(search_clauses) + ")"
            search_params = [like] * len(search_clauses)

        # Parameter order must follow the placeholder order in the statement below.
        params: list[Any] = (
            [is_adm, profile_id, profile_id]
            + club_sql_params
            + uploaded_params
            + search_params
            + [limit, offset]
        )
        cur.execute(
            f"""SELECT ma.id, ma.mime_type, ma.byte_size, ma.original_filename, ma.visibility, ma.club_id,
ma.uploaded_by_profile_id, ma.lifecycle_state, ma.created_at, ma.sha256,
ma.copyright_notice, ma.storage_key, {tags_select}
pr.name AS uploader_name,
pr.email AS uploader_email,
cl.name AS club_name
FROM media_assets ma
LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id
LEFT JOIN clubs cl ON cl.id = ma.club_id
WHERE {lc_where}
AND (
%s
OR lower(trim(ma.visibility)) = 'official'
OR (
lower(trim(ma.visibility)) = 'private'
AND ma.uploaded_by_profile_id = %s
)
OR (
lower(trim(ma.visibility)) = 'club'
AND EXISTS (
SELECT 1 FROM club_members cm
WHERE cm.profile_id = %s
AND cm.club_id = ma.club_id
AND cm.status = 'active'
)
)
)
{club_sql}
{uploaded_sql}
{media_kind_sql}
{search_sql}
ORDER BY ma.updated_at DESC NULLS LAST, ma.created_at DESC
LIMIT %s OFFSET %s""",
            params,
        )
        rows = [r2d(r) for r in cur.fetchall()]
        show_club = sup or is_adm
        asset_ids = [int(r["id"]) for r in rows]
        usage_map = _usage_for_media_assets(cur, asset_ids)
        for r in rows:
            r["permissions"] = _item_permissions(r, tenant, admin_club_ids)
            tid = int(r["id"])
            r["usage"] = usage_map.get(tid, {"exercises": [], "training_units": []})
            # Always hand out tags as a plain list.
            tags_val = r.get("tags")
            if tags_val is None:
                r["tags"] = []
            elif not isinstance(tags_val, list):
                r["tags"] = list(tags_val) if tags_val else []
            # Blank out metadata the viewer is not allowed to see.
            if not show_uploader:
                r["uploader_name"] = None
                r["uploader_email"] = None
            if not show_club:
                r["club_name"] = None

        viewer = {
            "show_uploader_meta": show_uploader,
            "show_club_meta": show_club,
            "is_superadmin": sup,
            "is_platform_admin": is_adm,
        }
        filter_meta = None
        if include_filter_meta and show_uploader:
            filter_meta = {"uploaders": _fetch_filter_uploaders(cur, is_adm, profile_id)}
        return {
            "items": rows,
            "limit": limit,
            "offset": offset,
            "lifecycle": lifecycle_norm,
            "media_kind": mk,
            "viewer": viewer,
            "filter_meta": filter_meta,
        }
@router.api_route("/{asset_id}/file", methods=["GET", "HEAD"])
def download_media_asset_file(
    request: Request,
    asset_id: int,
    tenant: TenantContext = Depends(get_tenant_context_flexible),
):
    """Serve an archive file directly (thumbnail/preview); auth as for exercise media (?ssetoken).

    Active assets require view permission; trashed assets require
    lifecycle-management permission. Every other state answers 404, as do a
    missing asset, a missing storage key and a missing file on disk.
    """
    from routers.exercises import _binary_media_response

    with get_db() as conn:
        cur = get_cursor(conn)
        record = _fetch_asset_file_row(cur, asset_id)
        if not record:
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")

        state = (record.get("lifecycle_state") or "").strip().lower()
        if state not in ("active", "trash_soft", "trash_hidden"):
            raise HTTPException(status_code=404, detail="Medium nicht verfügbar")
        if state == "active":
            _assert_can_view_archive_asset(cur, tenant, record)
        else:
            assert_can_manage_media_asset_lifecycle(cur, tenant, record)

        storage_key = record.get("storage_key")
        if not storage_key:
            raise HTTPException(status_code=404, detail="Keine Datei hinterlegt")

        root = get_effective_media_root(cur)
        file_path = path_under_media_root(root, str(storage_key))
        if not file_path or not file_path.is_file():
            raise HTTPException(status_code=404, detail="Datei nicht gefunden")

        content_type = record.get("mime_type") or "application/octet-stream"
        # Fall back to the on-disk name when no original file name is stored.
        download_name = record.get("original_filename") or file_path.name
        shown_name = str(download_name) if download_name else None
        return _binary_media_response(file_path, content_type, shown_name, request)
@router.post("/{asset_id}/lifecycle")
def post_media_asset_lifecycle(
    asset_id: int,
    body: MediaLifecycleBody,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Run one lifecycle action on a single media asset and return its result."""
    with get_db() as conn:
        cursor = get_cursor(conn)
        outcome = _apply_lifecycle_action(cursor, conn, asset_id, body, tenant)
        return outcome
@router.post("/bulk-lifecycle")
def bulk_media_lifecycle(
    body: MediaBulkLifecycleBody,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Apply one lifecycle action to several assets and report per-asset results.

    IDs are de-duplicated and processed in ascending order. An HTTP error on
    one asset is recorded under ``failed`` and does not stop the others.
    """
    single_action = MediaLifecycleBody(
        action=body.action,
        target_lifecycle=body.target_lifecycle,
    )
    succeeded: list[int] = []
    problems: list[dict] = []
    with get_db() as conn:
        cur = get_cursor(conn)
        unique_ids = sorted({int(x) for x in body.media_asset_ids if x and int(x) > 0})
        for media_id in unique_ids:
            try:
                _apply_lifecycle_action(cur, conn, media_id, single_action, tenant)
            except HTTPException as exc:
                reason = exc.detail if isinstance(exc.detail, str) else str(exc.detail)
                problems.append({"id": media_id, "detail": reason})
            else:
                succeeded.append(media_id)
    return {
        "updated": succeeded,
        "failed": problems,
        "updated_count": len(succeeded),
        "failed_count": len(problems),
    }
@router.post("/bulk-patch")
def bulk_media_patch(
    body: MediaBulkPatchBody,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Apply the same metadata patch to several assets and report per-asset results.

    Unset and ``None`` fields are ignored, except ``tags`` where ``None``
    clears the tags. IDs are de-duplicated and processed in ascending order,
    and each successful update is committed on its own. A missing asset or an
    HTTP error is recorded under ``failed`` and does not stop the others.

    Raises:
        HTTPException: 400 when the body contains no field to update.
    """
    raw = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
    patch_fields = {
        k: v
        for k, v in raw.items()
        if k != "media_asset_ids" and (k == "tags" or v is not None)
    }
    if not patch_fields:
        raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
    updated: list[int] = []
    failed: list[dict] = []
    profile_id = tenant.profile_id
    role = tenant.global_role
    # visibility and club_id are validated and written together.
    touches_governance = "visibility" in patch_fields or "club_id" in patch_fields
    with get_db() as conn:
        cur = get_cursor(conn)
        # Same answer for every asset, so probe the schema once, not per iteration.
        tags_blocked = "tags" in patch_fields and not _media_assets_tags_column_present(cur)
        for asset_id in sorted({int(x) for x in body.media_asset_ids if x and int(x) > 0}):
            try:
                cur.execute(
                    """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
copyright_notice, original_filename
FROM media_assets WHERE id = %s""",
                    (asset_id,),
                )
                row = cur.fetchone()
                if not row:
                    failed.append({"id": asset_id, "detail": "Medium nicht gefunden"})
                    continue
                asset = r2d(row)
                assert_can_edit_media_asset_metadata(cur, tenant, asset)
                if tags_blocked:
                    failed.append(
                        {
                            "id": asset_id,
                            "detail": "Schlagwörter (tags) erfordern DB-Migration 046.",
                        }
                    )
                    continue
                eff = _effective_media_patch_fields(patch_fields, asset)
                next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower()
                next_cid = eff["club_id"] if "club_id" in eff else asset.get("club_id")
                if touches_governance:
                    assert_valid_governance_visibility(
                        cur,
                        profile_id,
                        role,
                        next_vis,
                        int(next_cid) if next_cid is not None else None,
                    )
                sets: list[str] = []
                vals: list[Any] = []
                if "copyright_notice" in patch_fields:
                    sets.append("copyright_notice = %s")
                    vals.append(patch_fields["copyright_notice"])
                if "original_filename" in patch_fields:
                    sets.append("original_filename = %s")
                    vals.append(patch_fields["original_filename"])
                if "tags" in patch_fields:
                    sets.append("tags = %s")
                    vals.append(_normalize_media_tags(patch_fields["tags"]))
                if touches_governance:
                    sets.append("visibility = %s")
                    vals.append(str(eff.get("visibility", asset["visibility"])).strip())
                    sets.append("club_id = %s")
                    vals.append(eff.get("club_id"))
                if not sets:
                    failed.append({"id": asset_id, "detail": "Nichts zu aktualisieren"})
                    continue
                sets.append("updated_at = NOW()")
                vals.append(asset_id)
                # Only fixed column fragments are interpolated; all values are bound parameters.
                cur.execute(
                    f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s",
                    tuple(vals),
                )
                conn.commit()
                updated.append(asset_id)
            except HTTPException as he:
                msg = he.detail if isinstance(he.detail, str) else str(he.detail)
                failed.append({"id": asset_id, "detail": msg})
    return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)}
@router.patch("/{asset_id}")
def patch_media_asset(
    asset_id: int,
    body: MediaAssetPatch,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Update metadata of one media asset and return the refreshed row.

    Only fields present in the request are written. ``visibility`` and
    ``club_id`` are validated and stored together. An explicit
    ``"visibility": null`` is treated as "not provided".

    Returns:
        The updated asset row as a dict; ``tags`` is always a list.

    Raises:
        HTTPException: 400 when nothing is left to update, 404 when the asset
            does not exist, 503 when tags are sent but the ``tags`` column
            (migration 046) is missing. Permission and governance errors come
            from the ``assert_*`` helpers.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    data = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
    # A null visibility must not reach the UPDATE: str(None) would be stored as
    # the literal "None" and club_id would be reset to NULL alongside it.
    if "visibility" in data and data["visibility"] is None:
        del data["visibility"]
    if not data:
        raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
copyright_notice, original_filename
FROM media_assets WHERE id = %s""",
            (asset_id,),
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")
        asset = r2d(row)
        assert_can_edit_media_asset_metadata(cur, tenant, asset)
        if "tags" in data and not _media_assets_tags_column_present(cur):
            raise HTTPException(
                status_code=503,
                detail="Schlagwörter (tags) erfordern die Datenbank-Migration 046. Bitte Migration ausführen.",
            )
        eff = _effective_media_patch_fields(data, asset)
        next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower()
        next_cid = eff["club_id"] if "club_id" in eff else asset.get("club_id")
        touches_governance = "visibility" in data or "club_id" in data
        if touches_governance:
            assert_valid_governance_visibility(
                cur,
                profile_id,
                role,
                next_vis,
                int(next_cid) if next_cid is not None else None,
            )
        sets: list[str] = []
        vals: list[Any] = []
        if "copyright_notice" in data:
            sets.append("copyright_notice = %s")
            vals.append(data["copyright_notice"])
        if "original_filename" in data:
            sets.append("original_filename = %s")
            vals.append(data["original_filename"])
        if "tags" in data:
            sets.append("tags = %s")
            vals.append(_normalize_media_tags(data["tags"]))
        if touches_governance:
            sets.append("visibility = %s")
            vals.append(str(eff.get("visibility", asset["visibility"])).strip())
            sets.append("club_id = %s")
            vals.append(eff.get("club_id"))
        if sets:
            sets.append("updated_at = NOW()")
            vals.append(asset_id)
            # Only fixed column fragments are interpolated; all values are bound parameters.
            cur.execute(
                f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s",
                tuple(vals),
            )
            conn.commit()
        has_tags = _media_assets_tags_column_present(cur)
        if has_tags:
            cur.execute(
                """SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice, tags
FROM media_assets WHERE id = %s""",
                (asset_id,),
            )
        else:
            cur.execute(
                """SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice
FROM media_assets WHERE id = %s""",
                (asset_id,),
            )
        fresh = cur.fetchone()
        if not fresh:
            # The row disappeared between the update and the re-read.
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")
        out = r2d(fresh)
        # Covers both the missing column and a NULL value, as in the list endpoint.
        if out.get("tags") is None:
            out["tags"] = []
        return out