"""Medien-Archiv (Liste, Datei) und Lifecycle — MEDIA_ASSETS_AND_ARCHIVE_SPEC.""" from __future__ import annotations from typing import Any, Literal, Optional, Union from fastapi import APIRouter, Depends, HTTPException, Query, Request from pydantic import BaseModel, Field, model_validator from club_tenancy import ( assert_valid_governance_visibility, club_ids_for_profile_with_roles, is_platform_admin, is_superadmin, library_content_visible_to_profile, ) from db import get_db, get_cursor, r2d from media_lifecycle import ( LC_ACTIVE, LC_TRASH_HIDDEN, LC_TRASH_SOFT, assert_can_edit_media_asset_metadata, assert_can_manage_media_asset_lifecycle, assert_can_trash_soft, fetch_media_asset_row, purge_media_asset, reactivate_media_asset_from_trash, superadmin_force_lifecycle_state, superadmin_hard_delete_media_asset, transition_recover_from_hidden, transition_to_trash_hidden, transition_to_trash_soft, ) from media_storage import get_effective_media_root, path_under_media_root from tenant_context import TenantContext, get_tenant_context, get_tenant_context_flexible router = APIRouter(prefix="/api/media-assets", tags=["media-assets"]) class MediaLifecycleBody(BaseModel): action: Literal[ "trash_soft", "trash_hidden", "recover", "purge", "reactivate", "superadmin_force_lifecycle", "superadmin_hard_delete", ] target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None @model_validator(mode="after") def _target_lifecycle_rules(self): if self.action == "superadmin_force_lifecycle": if not self.target_lifecycle: raise ValueError("target_lifecycle ist für diese Aktion erforderlich") elif self.target_lifecycle is not None: raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle") return self class MediaAssetPatch(BaseModel): copyright_notice: Optional[str] = Field(None, max_length=8000) original_filename: Optional[str] = Field(None, max_length=300) visibility: Optional[str] = Field(None, pattern="^(private|club|official)$") club_id: Optional[int] = None tags: Optional[list[str]] = None class MediaBulkLifecycleBody(BaseModel): media_asset_ids: list[int] = Field(..., min_length=1, max_length=200) action: Literal[ "trash_soft", "trash_hidden", "recover", "purge", "reactivate", "superadmin_force_lifecycle", "superadmin_hard_delete", ] target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None @model_validator(mode="after") def _bulk_target(self): if self.action == "superadmin_force_lifecycle" and not self.target_lifecycle: raise ValueError("target_lifecycle ist für diese Aktion erforderlich") if self.action != "superadmin_force_lifecycle" and self.target_lifecycle is not None: raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle") return self class MediaBulkPatchBody(BaseModel): media_asset_ids: list[int] = Field(..., min_length=1, max_length=200) copyright_notice: Optional[str] = Field(None, max_length=8000) original_filename: Optional[str] = Field(None, max_length=300) visibility: Optional[str] = Field(None, pattern="^(private|club|official)$") club_id: Optional[int] = None tags: Optional[list[str]] = None _LIFECYCLE_LIST_FILTERS = frozenset({"active", "trash_soft", "trash_hidden", "all"}) _MEDIA_KIND_FILTERS = frozenset({"all", "image", "video", "pdf", "other"}) _MAX_TAGS = 40 _MAX_TAG_LEN = 48 _MEDIA_ASSETS_TAGS_COLUMN: Optional[bool] = None _TRAINING_UNIT_EXERCISES_TABLE: Optional[bool] = None def _media_assets_tags_column_present(cur: Any) -> bool: """True nach Migration 046; verhindert 500 wenn Code neuer ist als das Schema.""" global _MEDIA_ASSETS_TAGS_COLUMN if _MEDIA_ASSETS_TAGS_COLUMN is not None: return _MEDIA_ASSETS_TAGS_COLUMN cur.execute( """ SELECT 1 FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'media_assets' AND column_name = 'tags' LIMIT 1 """ ) _MEDIA_ASSETS_TAGS_COLUMN = cur.fetchone() is not None return _MEDIA_ASSETS_TAGS_COLUMN def _training_unit_exercises_table_present(cur: Any) -> bool: """True wenn planning-Migration (training_unit_exercises) im Schema liegt — sonst keine Einheiten-Links.""" global _TRAINING_UNIT_EXERCISES_TABLE if _TRAINING_UNIT_EXERCISES_TABLE is not None: return _TRAINING_UNIT_EXERCISES_TABLE cur.execute( """ SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'training_unit_exercises' LIMIT 1 """ ) _TRAINING_UNIT_EXERCISES_TABLE = cur.fetchone() is not None return _TRAINING_UNIT_EXERCISES_TABLE def _normalize_media_tags(raw: Union[list[str], None]) -> list[str]: """PostgreSQL text[]; Einträge gekürzt und ohne Dubletten (case-insensitive).""" if raw is None: return [] if not isinstance(raw, list): raise HTTPException(status_code=400, detail="tags muss eine Liste von Strings sein") out: list[str] = [] seen: set[str] = set() for item in raw: s = str(item).strip() if not s: continue s = s[:_MAX_TAG_LEN] key = s.lower() if key in seen: continue seen.add(key) out.append(s) if len(out) >= _MAX_TAGS: break return out def _usage_for_media_assets(cur: Any, asset_ids: list[int]) -> dict[int, dict[str, Any]]: """Übungen und Trainingseinheiten, die dieses Archiv-Medium nutzen.""" if not asset_ids: return {} cur.execute( """SELECT em.media_asset_id, e.id AS exercise_id, e.title AS exercise_title FROM exercise_media em JOIN exercises e ON e.id = em.exercise_id WHERE em.media_asset_id = ANY(%s) ORDER BY em.media_asset_id, e.title NULLS LAST, e.id""", (asset_ids,), ) ex_by_asset: dict[int, list[dict]] = {int(aid): [] for aid in asset_ids} seen_ex: dict[int, set[int]] = {int(aid): set() for aid in asset_ids} exercise_ids: set[int] = set() for r in cur.fetchall(): d = r2d(r) aid = int(d["media_asset_id"]) eid = int(d["exercise_id"]) if aid not in seen_ex or eid in seen_ex[aid]: continue seen_ex[aid].add(eid) exercise_ids.add(eid) title = (d.get("exercise_title") or "").strip() or f"Übung #{eid}" ex_by_asset[aid].append({"id": eid, "title": title}) exercise_to_units: dict[int, list[dict]] = {} if exercise_ids and _training_unit_exercises_table_present(cur): cur.execute( """SELECT tue.exercise_id, tu.id AS unit_id, tu.planned_date, COALESCE(tg.name, '') AS group_name FROM training_unit_exercises tue JOIN training_units tu ON tu.id = tue.training_unit_id LEFT JOIN training_groups tg ON tg.id = tu.group_id WHERE tue.exercise_id = ANY(%s)""", (list(exercise_ids),), ) for row in cur.fetchall(): d = r2d(row) eid = int(d["exercise_id"]) uid = int(d["unit_id"]) pd = d.get("planned_date") planned: Optional[str] if pd is None: planned = None elif hasattr(pd, "isoformat"): planned = str(pd.isoformat()) else: planned = str(pd) exercise_to_units.setdefault(eid, []).append( { "id": uid, "planned_date": planned, "group_name": d.get("group_name") or "", } ) for eid, units in exercise_to_units.items(): seen_u: set[int] = set() uniq: list[dict] = [] for u in units: if u["id"] not in seen_u: seen_u.add(u["id"]) uniq.append(u) exercise_to_units[eid] = uniq out: dict[int, dict[str, Any]] = {} for aid in asset_ids: aid_i = int(aid) exs = ex_by_asset.get(aid_i, []) seen_u2: set[int] = set() units_agg: list[dict] = [] for ex in exs: for u in exercise_to_units.get(ex["id"], []): if u["id"] not in seen_u2: seen_u2.add(u["id"]) units_agg.append(u) out[aid_i] = {"exercises": exs, "training_units": units_agg} return out def _fetch_filter_uploaders(cur: Any, is_adm: bool, profile_id: int) -> list[dict]: cur.execute( """SELECT DISTINCT ma.uploaded_by_profile_id AS id, pr.name, pr.email FROM media_assets ma LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id WHERE ma.uploaded_by_profile_id IS NOT NULL AND ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden') AND ( %s OR lower(trim(ma.visibility)) = 'official' OR ( lower(trim(ma.visibility)) = 'private' AND ma.uploaded_by_profile_id = %s ) OR ( lower(trim(ma.visibility)) = 'club' AND EXISTS ( SELECT 1 FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = ma.club_id AND cm.status = 'active' ) ) ) ORDER BY pr.name NULLS LAST, ma.uploaded_by_profile_id LIMIT 400""", (is_adm, profile_id, profile_id), ) rows = [r2d(x) for x in cur.fetchall()] out: list[dict] = [] for r in rows: pid = r.get("id") if pid is None: continue label = (r.get("name") or "").strip() if not label: label = (r.get("email") or "").strip() if not label: label = f"Profil #{pid}" out.append({"id": int(pid), "label": label}) return out def _effective_media_patch_fields(patch_fields: dict, asset: dict) -> dict: """Nach visibility-Wechsel club_id konsistent setzen (official/private → NULL).""" eff = dict(patch_fields) if eff.get("visibility") is not None: v = str(eff["visibility"]).strip().lower() if v in ("official", "private"): eff["club_id"] = None elif v == "club" and "club_id" not in eff: eff["club_id"] = asset.get("club_id") return eff def _lifecycle_where_sql(lifecycle: str) -> str: lc = (lifecycle or "active").strip().lower() if lc not in _LIFECYCLE_LIST_FILTERS: raise HTTPException(status_code=400, detail="Ungültiger lifecycle-Filter") if lc == "active": return "ma.lifecycle_state = 'active'" if lc == "trash_soft": return "ma.lifecycle_state = 'trash_soft'" if lc == "trash_hidden": return "ma.lifecycle_state = 'trash_hidden'" return "ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')" def _fetch_asset_file_row(cur: Any, asset_id: int) -> Optional[dict]: cur.execute( """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state, storage_key, mime_type, original_filename FROM media_assets WHERE id = %s""", (asset_id,), ) row = cur.fetchone() return r2d(row) if row else None def _assert_can_view_archive_asset(cur: Any, tenant: TenantContext, asset: dict) -> None: if not library_content_visible_to_profile( cur, tenant.profile_id, (asset.get("visibility") or "").strip().lower(), asset.get("club_id"), asset.get("uploaded_by_profile_id"), tenant.global_role, ): raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Medium") def _item_permissions(row: dict, tenant: TenantContext, admin_club_ids: set[int]) -> dict: """Berechnete UI-/Policy-Flags pro Zeile (ohne zusätzliche DB).""" role_raw = tenant.global_role role = (role_raw or "").strip().lower() pid = int(tenant.profile_id) sup = is_superadmin(role_raw) plat = is_platform_admin(role_raw) vis = (row.get("visibility") or "private").strip().lower() uid = row.get("uploaded_by_profile_id") cid = row.get("club_id") lc = (row.get("lifecycle_state") or "active").strip().lower() is_owner = uid is not None and int(uid) == pid club_mgr = plat or (cid is not None and int(cid) in admin_club_ids) edit_metadata = ( sup or (vis == "official" and plat) or (vis == "club" and club_mgr) or (vis == "private" and is_owner) ) trash_soft = lc == "active" and ( sup or plat or (vis == "private" and is_owner) or (vis == "club" and club_mgr) ) if vis == "official" and not (sup or plat): trash_soft = False can_manage_adv = ( sup or plat or (vis == "private" and is_owner) or (vis == "club" and club_mgr) ) trash_hidden = lc in ("active", "trash_soft") and can_manage_adv recover_from_hidden = lc == "trash_hidden" and can_manage_adv reactivate = lc in ("trash_soft", "trash_hidden") and can_manage_adv purge = lc == "trash_hidden" and sup return { "edit_metadata": edit_metadata, "change_visibility": edit_metadata, "trash_soft": trash_soft, "trash_hidden": trash_hidden, "recover": recover_from_hidden, "reactivate": reactivate, "purge": purge, "superadmin_lifecycle": sup, "superadmin_hard_delete": sup, } def _apply_lifecycle_action( cur: Any, conn: Any, asset_id: int, body: MediaLifecycleBody, tenant: TenantContext, ) -> dict: asset = fetch_media_asset_row(cur, asset_id) if not asset: raise HTTPException(status_code=404, detail="Medium nicht gefunden") action = body.action role_raw = tenant.global_role if action == "superadmin_hard_delete": if not is_superadmin(role_raw): raise HTTPException(status_code=403, detail="Nur Superadmin") ok = superadmin_hard_delete_media_asset(cur, conn, asset_id) if not ok: raise HTTPException(status_code=404, detail="Medium nicht gefunden") return {"ok": True, "hard_deleted": asset_id} if action == "superadmin_force_lifecycle": if not is_superadmin(role_raw): raise HTTPException(status_code=403, detail="Nur Superadmin") tl = body.target_lifecycle or "active" mp = {"active": LC_ACTIVE, "trash_soft": LC_TRASH_SOFT, "trash_hidden": LC_TRASH_HIDDEN} return superadmin_force_lifecycle_state(cur, conn, asset_id, mp[tl]) if action == "purge": if not is_superadmin(role_raw): raise HTTPException(status_code=403, detail="Endgültiges Löschen nur als Superadmin") state = (asset.get("lifecycle_state") or "").strip().lower() if state != LC_TRASH_HIDDEN: raise HTTPException( status_code=400, detail="Nur ausgeblendete Medien (Stufe 2) dürfen mit dieser Aktion entfernt werden", ) if not purge_media_asset(cur, conn, asset_id): raise HTTPException(status_code=400, detail="Löschen nicht möglich") return {"ok": True, "purged": asset_id} if action == "trash_soft": assert_can_trash_soft(cur, tenant, asset) return transition_to_trash_soft(cur, conn, asset_id) if action == "trash_hidden": assert_can_manage_media_asset_lifecycle(cur, tenant, asset) return transition_to_trash_hidden(cur, conn, asset_id) if action == "recover": assert_can_manage_media_asset_lifecycle(cur, tenant, asset) return transition_recover_from_hidden(cur, conn, asset_id) if action == "reactivate": assert_can_manage_media_asset_lifecycle(cur, tenant, asset) return reactivate_media_asset_from_trash(cur, conn, asset_id) raise HTTPException(status_code=500, detail="Interner Fehler: lifecycle action") @router.get("") def list_media_assets( tenant: TenantContext = Depends(get_tenant_context), q: Optional[str] = Query(None, max_length=120), lifecycle: str = Query( "active", description="active | trash_soft | trash_hidden | all (nicht purgierte Zustände)", ), media_kind: str = Query( "all", description="all | image | video | pdf | other", ), club_id: Optional[int] = Query( None, ge=1, description="Nur Superadmin: nach Verein filtern", ), uploaded_by: Optional[int] = Query( None, ge=1, description="Nach Uploader (Profil-ID), wenn erlaubt", ), include_filter_meta: bool = Query( False, description="Uploader-Liste für Filter (nur wenn Metadaten sichtbar)", ), limit: int = Query(30, ge=1, le=100), offset: int = Query(0, ge=0), ): lc_where = _lifecycle_where_sql(lifecycle) mk = (media_kind or "all").strip().lower() if mk not in _MEDIA_KIND_FILTERS: raise HTTPException(status_code=400, detail="Ungültiger media_kind") role = tenant.global_role or "" is_adm = is_platform_admin(role) sup = is_superadmin(role) profile_id = tenant.profile_id needle = (q or "").strip() if club_id is not None and not sup: raise HTTPException(status_code=403, detail="Vereinsfilter nur für Superadmin") media_kind_sql = "" if mk == "image": media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'image/%'" elif mk == "video": media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'video/%'" elif mk == "pdf": media_kind_sql = ( " AND (lower(COALESCE(ma.mime_type, '')) = 'application/pdf'" " OR lower(COALESCE(ma.mime_type, '')) LIKE '%pdf%')" ) elif mk == "other": media_kind_sql = ( " AND COALESCE(ma.mime_type, '') <> ''" " AND lower(ma.mime_type) NOT LIKE 'image/%'" " AND lower(ma.mime_type) NOT LIKE 'video/%'" " AND lower(ma.mime_type) <> 'application/pdf'" " AND lower(ma.mime_type) NOT LIKE '%pdf%'" ) club_sql = "" club_sql_params: list[Any] = [] if club_id is not None: club_sql = " AND ma.club_id = %s" club_sql_params.append(club_id) with get_db() as conn: cur = get_cursor(conn) admin_club_ids = club_ids_for_profile_with_roles(cur, profile_id, "club_admin") show_uploader = sup or is_adm or bool(admin_club_ids) if uploaded_by is not None and not show_uploader: raise HTTPException(status_code=403, detail="Uploader-Filter nicht erlaubt") has_tags_col = _media_assets_tags_column_present(cur) tags_select = "ma.tags," if has_tags_col else "ARRAY[]::text[] AS tags," uploaded_sql = "" uploaded_params: list[Any] = [] if uploaded_by is not None: uploaded_sql = " AND ma.uploaded_by_profile_id = %s" uploaded_params.append(uploaded_by) search_sql = "" search_params: list[Any] = [] if needle: like = f"%{needle}%" if has_tags_col: search_params = [like, like, like, like] search_sql = ( " AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s" " OR COALESCE(ma.copyright_notice, '') ILIKE %s" " OR EXISTS (SELECT 1 FROM unnest(ma.tags) AS t WHERE t::text ILIKE %s))" ) else: search_params = [like, like, like] search_sql = ( " AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s" " OR COALESCE(ma.copyright_notice, '') ILIKE %s)" ) params: list[Any] = ( [is_adm, profile_id, profile_id] + club_sql_params + uploaded_params + search_params + [limit, offset] ) cur.execute( f"""SELECT ma.id, ma.mime_type, ma.byte_size, ma.original_filename, ma.visibility, ma.club_id, ma.uploaded_by_profile_id, ma.lifecycle_state, ma.created_at, ma.sha256, ma.copyright_notice, ma.storage_key, {tags_select} pr.name AS uploader_name, pr.email AS uploader_email, cl.name AS club_name FROM media_assets ma LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id LEFT JOIN clubs cl ON cl.id = ma.club_id WHERE {lc_where} AND ( %s OR lower(trim(ma.visibility)) = 'official' OR ( lower(trim(ma.visibility)) = 'private' AND ma.uploaded_by_profile_id = %s ) OR ( lower(trim(ma.visibility)) = 'club' AND EXISTS ( SELECT 1 FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = ma.club_id AND cm.status = 'active' ) ) ) {club_sql} {uploaded_sql} {media_kind_sql} {search_sql} ORDER BY ma.updated_at DESC NULLS LAST, ma.created_at DESC LIMIT %s OFFSET %s""", params, ) rows = [r2d(r) for r in cur.fetchall()] show_club = sup or is_adm asset_ids = [int(r["id"]) for r in rows] usage_map = _usage_for_media_assets(cur, asset_ids) for r in rows: r["permissions"] = _item_permissions(r, tenant, admin_club_ids) tid = int(r["id"]) r["usage"] = usage_map.get(tid, {"exercises": [], "training_units": []}) tags_val = r.get("tags") if tags_val is None: r["tags"] = [] elif not isinstance(tags_val, list): r["tags"] = list(tags_val) if tags_val else [] if not show_uploader: r["uploader_name"] = None r["uploader_email"] = None if not show_club: r["club_name"] = None viewer = { "show_uploader_meta": show_uploader, "show_club_meta": show_club, "is_superadmin": sup, "is_platform_admin": is_adm, } filter_meta = None if include_filter_meta and show_uploader: filter_meta = {"uploaders": _fetch_filter_uploaders(cur, is_adm, profile_id)} return { "items": rows, "limit": limit, "offset": offset, "lifecycle": lifecycle.strip().lower(), "media_kind": mk, "viewer": viewer, "filter_meta": filter_meta, } @router.api_route("/{asset_id}/file", methods=["GET", "HEAD"]) def download_media_asset_file( request: Request, asset_id: int, tenant: TenantContext = Depends(get_tenant_context_flexible), ): """Direktzugriff auf Archiv-Datei (Thumbnail/Vorschau); Auth wie Übungs-Medien (?ssetoken).""" from routers.exercises import _binary_media_response with get_db() as conn: cur = get_cursor(conn) asset = _fetch_asset_file_row(cur, asset_id) if not asset: raise HTTPException(status_code=404, detail="Medium nicht gefunden") lc = (asset.get("lifecycle_state") or "").strip().lower() if lc == "active": _assert_can_view_archive_asset(cur, tenant, asset) elif lc in ("trash_soft", "trash_hidden"): assert_can_manage_media_asset_lifecycle(cur, tenant, asset) else: raise HTTPException(status_code=404, detail="Medium nicht verfügbar") sk = asset.get("storage_key") if not sk: raise HTTPException(status_code=404, detail="Keine Datei hinterlegt") media_root = get_effective_media_root(cur) abs_p = path_under_media_root(media_root, str(sk)) if not abs_p or not abs_p.is_file(): raise HTTPException(status_code=404, detail="Datei nicht gefunden") mime = asset.get("mime_type") or "application/octet-stream" fname = asset.get("original_filename") or abs_p.name return _binary_media_response(abs_p, mime, str(fname) if fname else None, request) @router.post("/{asset_id}/lifecycle") def post_media_asset_lifecycle( asset_id: int, body: MediaLifecycleBody, tenant: TenantContext = Depends(get_tenant_context), ): with get_db() as conn: cur = get_cursor(conn) return _apply_lifecycle_action(cur, conn, asset_id, body, tenant) @router.post("/bulk-lifecycle") def bulk_media_lifecycle( body: MediaBulkLifecycleBody, tenant: TenantContext = Depends(get_tenant_context), ): inner = MediaLifecycleBody(action=body.action, target_lifecycle=body.target_lifecycle) updated: list[int] = [] failed: list[dict] = [] with get_db() as conn: cur = get_cursor(conn) for aid in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)): try: _apply_lifecycle_action(cur, conn, aid, inner, tenant) updated.append(aid) except HTTPException as he: msg = he.detail if isinstance(he.detail, str) else str(he.detail) failed.append({"id": aid, "detail": msg}) return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)} @router.post("/bulk-patch") def bulk_media_patch( body: MediaBulkPatchBody, tenant: TenantContext = Depends(get_tenant_context), ): raw = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True) patch_fields = { k: v for k, v in raw.items() if k != "media_asset_ids" and (k == "tags" or v is not None) } if not patch_fields: raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren") updated: list[int] = [] failed: list[dict] = [] profile_id = tenant.profile_id role = tenant.global_role with get_db() as conn: cur = get_cursor(conn) for asset_id in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)): try: cur.execute( """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state, copyright_notice, original_filename FROM media_assets WHERE id = %s""", (asset_id,), ) row = cur.fetchone() if not row: failed.append({"id": asset_id, "detail": "Medium nicht gefunden"}) continue asset = r2d(row) assert_can_edit_media_asset_metadata(cur, tenant, asset) if "tags" in patch_fields and not _media_assets_tags_column_present(cur): failed.append( { "id": asset_id, "detail": "Schlagwörter (tags) erfordern DB-Migration 046.", } ) continue eff = _effective_media_patch_fields(patch_fields, asset) next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower() next_cid = eff["club_id"] if "club_id" in eff else asset.get("club_id") if "visibility" in patch_fields or "club_id" in patch_fields: assert_valid_governance_visibility( cur, profile_id, role, next_vis, int(next_cid) if next_cid is not None else None, ) sets: list[str] = [] vals: list[Any] = [] if "copyright_notice" in patch_fields: sets.append("copyright_notice = %s") vals.append(patch_fields["copyright_notice"]) if "original_filename" in patch_fields: sets.append("original_filename = %s") vals.append(patch_fields["original_filename"]) if "tags" in patch_fields: sets.append("tags = %s") vals.append(_normalize_media_tags(patch_fields["tags"])) if "visibility" in patch_fields or "club_id" in patch_fields: sets.append("visibility = %s") vals.append(str(eff.get("visibility", asset["visibility"])).strip()) sets.append("club_id = %s") vals.append(eff.get("club_id")) if not sets: failed.append({"id": asset_id, "detail": "Nichts zu aktualisieren"}) continue sets.append("updated_at = NOW()") vals.append(asset_id) cur.execute( f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s", tuple(vals), ) conn.commit() updated.append(asset_id) except HTTPException as he: msg = he.detail if isinstance(he.detail, str) else str(he.detail) failed.append({"id": asset_id, "detail": msg}) return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)} @router.patch("/{asset_id}") def patch_media_asset( asset_id: int, body: MediaAssetPatch, tenant: TenantContext = Depends(get_tenant_context), ): profile_id = tenant.profile_id role = tenant.global_role data = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True) if not data: raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren") with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state, copyright_notice, original_filename FROM media_assets WHERE id = %s""", (asset_id,), ) row = cur.fetchone() if not row: raise HTTPException(status_code=404, detail="Medium nicht gefunden") asset = r2d(row) assert_can_edit_media_asset_metadata(cur, tenant, asset) if "tags" in data and not _media_assets_tags_column_present(cur): raise HTTPException( status_code=503, detail="Schlagwörter (tags) erfordern die Datenbank-Migration 046. Bitte Migration ausführen.", ) eff = _effective_media_patch_fields(data, asset) next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower() next_cid = eff["club_id"] if "club_id" in eff else asset.get("club_id") if "visibility" in data or "club_id" in data: assert_valid_governance_visibility( cur, profile_id, role, next_vis, int(next_cid) if next_cid is not None else None, ) sets: list[str] = [] vals: list[Any] = [] if "copyright_notice" in data: sets.append("copyright_notice = %s") vals.append(data["copyright_notice"]) if "original_filename" in data: sets.append("original_filename = %s") vals.append(data["original_filename"]) if "tags" in data: sets.append("tags = %s") vals.append(_normalize_media_tags(data["tags"])) if "visibility" in data or "club_id" in data: sets.append("visibility = %s") vals.append(str(eff.get("visibility", asset["visibility"])).strip()) sets.append("club_id = %s") vals.append(eff.get("club_id")) if sets: sets.append("updated_at = NOW()") vals.append(asset_id) cur.execute( f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s", tuple(vals), ) conn.commit() has_tags = _media_assets_tags_column_present(cur) if has_tags: cur.execute( """SELECT id, mime_type, byte_size, original_filename, visibility, club_id, uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice, tags FROM media_assets WHERE id = %s""", (asset_id,), ) else: cur.execute( """SELECT id, mime_type, byte_size, original_filename, visibility, club_id, uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice FROM media_assets WHERE id = %s""", (asset_id,), ) out = r2d(cur.fetchone()) if not has_tags: out["tags"] = [] return out