"""Medien-Archiv (Liste, Datei) und Lifecycle — MEDIA_ASSETS_AND_ARCHIVE_SPEC.""" from __future__ import annotations from typing import Any, Literal, Optional from fastapi import APIRouter, Depends, HTTPException, Query, Request from pydantic import BaseModel, Field, model_validator from club_tenancy import ( assert_valid_governance_visibility, club_ids_for_profile_with_roles, is_platform_admin, is_superadmin, library_content_visible_to_profile, ) from db import get_db, get_cursor, r2d from media_lifecycle import ( LC_ACTIVE, LC_TRASH_HIDDEN, LC_TRASH_SOFT, assert_can_edit_media_asset_metadata, assert_can_manage_media_asset_lifecycle, assert_can_trash_soft, fetch_media_asset_row, purge_media_asset, reactivate_media_asset_from_trash, superadmin_force_lifecycle_state, superadmin_hard_delete_media_asset, transition_recover_from_hidden, transition_to_trash_hidden, transition_to_trash_soft, ) from media_storage import get_effective_media_root, path_under_media_root from tenant_context import TenantContext, get_tenant_context, get_tenant_context_flexible router = APIRouter(prefix="/api/media-assets", tags=["media-assets"]) class MediaLifecycleBody(BaseModel): action: Literal[ "trash_soft", "trash_hidden", "recover", "purge", "reactivate", "superadmin_force_lifecycle", "superadmin_hard_delete", ] target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None @model_validator(mode="after") def _target_lifecycle_rules(self): if self.action == "superadmin_force_lifecycle": if not self.target_lifecycle: raise ValueError("target_lifecycle ist für diese Aktion erforderlich") elif self.target_lifecycle is not None: raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle") return self class MediaAssetPatch(BaseModel): copyright_notice: Optional[str] = Field(None, max_length=8000) original_filename: Optional[str] = Field(None, max_length=300) visibility: Optional[str] = Field(None, pattern="^(private|club|official)$") club_id: Optional[int] = None class MediaBulkLifecycleBody(BaseModel): media_asset_ids: list[int] = Field(..., min_length=1, max_length=200) action: Literal[ "trash_soft", "trash_hidden", "recover", "purge", "reactivate", "superadmin_force_lifecycle", "superadmin_hard_delete", ] target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None @model_validator(mode="after") def _bulk_target(self): if self.action == "superadmin_force_lifecycle" and not self.target_lifecycle: raise ValueError("target_lifecycle ist für diese Aktion erforderlich") if self.action != "superadmin_force_lifecycle" and self.target_lifecycle is not None: raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle") return self class MediaBulkPatchBody(BaseModel): media_asset_ids: list[int] = Field(..., min_length=1, max_length=200) copyright_notice: Optional[str] = Field(None, max_length=8000) original_filename: Optional[str] = Field(None, max_length=300) visibility: Optional[str] = Field(None, pattern="^(private|club|official)$") club_id: Optional[int] = None _LIFECYCLE_LIST_FILTERS = frozenset({"active", "trash_soft", "trash_hidden", "all"}) def _effective_media_patch_fields(patch_fields: dict, asset: dict) -> dict: """Nach visibility-Wechsel club_id konsistent setzen (official/private → NULL).""" eff = dict(patch_fields) if eff.get("visibility") is not None: v = str(eff["visibility"]).strip().lower() if v in ("official", "private"): eff["club_id"] = None elif v == "club" and "club_id" not in eff: eff["club_id"] = asset.get("club_id") return eff def _lifecycle_where_sql(lifecycle: str) -> str: lc = (lifecycle or "active").strip().lower() if lc not in _LIFECYCLE_LIST_FILTERS: raise HTTPException(status_code=400, detail="Ungültiger lifecycle-Filter") if lc == "active": return "ma.lifecycle_state = 'active'" if lc == "trash_soft": return "ma.lifecycle_state = 'trash_soft'" if lc == "trash_hidden": return "ma.lifecycle_state = 'trash_hidden'" return "ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')" def _fetch_asset_file_row(cur: Any, asset_id: int) -> Optional[dict]: cur.execute( """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state, storage_key, mime_type, original_filename FROM media_assets WHERE id = %s""", (asset_id,), ) row = cur.fetchone() return r2d(row) if row else None def _assert_can_view_archive_asset(cur: Any, tenant: TenantContext, asset: dict) -> None: if not library_content_visible_to_profile( cur, tenant.profile_id, (asset.get("visibility") or "").strip().lower(), asset.get("club_id"), asset.get("uploaded_by_profile_id"), tenant.global_role, ): raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Medium") def _item_permissions(row: dict, tenant: TenantContext, admin_club_ids: set[int]) -> dict: """Berechnete UI-/Policy-Flags pro Zeile (ohne zusätzliche DB).""" role_raw = tenant.global_role role = (role_raw or "").strip().lower() pid = int(tenant.profile_id) sup = is_superadmin(role_raw) plat = is_platform_admin(role_raw) vis = (row.get("visibility") or "private").strip().lower() uid = row.get("uploaded_by_profile_id") cid = row.get("club_id") lc = (row.get("lifecycle_state") or "active").strip().lower() is_owner = uid is not None and int(uid) == pid club_mgr = plat or (cid is not None and int(cid) in admin_club_ids) edit_metadata = ( sup or (vis == "official" and plat) or (vis == "club" and club_mgr) or (vis == "private" and is_owner) ) trash_soft = lc == "active" and ( sup or plat or (vis == "private" and is_owner) or (vis == "club" and club_mgr) ) if vis == "official" and not (sup or plat): trash_soft = False can_manage_adv = ( sup or plat or (vis == "private" and is_owner) or (vis == "club" and club_mgr) ) trash_hidden = lc in ("active", "trash_soft") and can_manage_adv recover_from_hidden = lc == "trash_hidden" and can_manage_adv reactivate = lc in ("trash_soft", "trash_hidden") and can_manage_adv purge = lc == "trash_hidden" and sup return { "edit_metadata": edit_metadata, "change_visibility": edit_metadata, "trash_soft": trash_soft, "trash_hidden": trash_hidden, "recover": recover_from_hidden, "reactivate": reactivate, "purge": purge, "superadmin_lifecycle": sup, "superadmin_hard_delete": sup, } def _apply_lifecycle_action( cur: Any, conn: Any, asset_id: int, body: MediaLifecycleBody, tenant: TenantContext, ) -> dict: asset = fetch_media_asset_row(cur, asset_id) if not asset: raise HTTPException(status_code=404, detail="Medium nicht gefunden") action = body.action role_raw = tenant.global_role if action == "superadmin_hard_delete": if not is_superadmin(role_raw): raise HTTPException(status_code=403, detail="Nur Superadmin") ok = superadmin_hard_delete_media_asset(cur, conn, asset_id) if not ok: raise HTTPException(status_code=404, detail="Medium nicht gefunden") return {"ok": True, "hard_deleted": asset_id} if action == "superadmin_force_lifecycle": if not is_superadmin(role_raw): raise HTTPException(status_code=403, detail="Nur Superadmin") tl = body.target_lifecycle or "active" mp = {"active": LC_ACTIVE, "trash_soft": LC_TRASH_SOFT, "trash_hidden": LC_TRASH_HIDDEN} return superadmin_force_lifecycle_state(cur, conn, asset_id, mp[tl]) if action == "purge": if not is_superadmin(role_raw): raise HTTPException(status_code=403, detail="Endgültiges Löschen nur als Superadmin") state = (asset.get("lifecycle_state") or "").strip().lower() if state != LC_TRASH_HIDDEN: raise HTTPException( status_code=400, detail="Nur ausgeblendete Medien (Stufe 2) dürfen mit dieser Aktion entfernt werden", ) if not purge_media_asset(cur, conn, asset_id): raise HTTPException(status_code=400, detail="Löschen nicht möglich") return {"ok": True, "purged": asset_id} if action == "trash_soft": assert_can_trash_soft(cur, tenant, asset) return transition_to_trash_soft(cur, conn, asset_id) if action == "trash_hidden": assert_can_manage_media_asset_lifecycle(cur, tenant, asset) return transition_to_trash_hidden(cur, conn, asset_id) if action == "recover": assert_can_manage_media_asset_lifecycle(cur, tenant, asset) return transition_recover_from_hidden(cur, conn, asset_id) if action == "reactivate": assert_can_manage_media_asset_lifecycle(cur, tenant, asset) return reactivate_media_asset_from_trash(cur, conn, asset_id) raise HTTPException(status_code=500, detail="Interner Fehler: lifecycle action") @router.get("") def list_media_assets( tenant: TenantContext = Depends(get_tenant_context), q: Optional[str] = Query(None, max_length=120), lifecycle: str = Query( "active", description="active | trash_soft | trash_hidden | all (nicht purgierte Zustände)", ), limit: int = Query(30, ge=1, le=100), offset: int = Query(0, ge=0), ): lc_where = _lifecycle_where_sql(lifecycle) role = tenant.global_role or "" is_adm = is_platform_admin(role) profile_id = tenant.profile_id needle = (q or "").strip() params: list[Any] = [is_adm, profile_id, profile_id] search_sql = "" if needle: like = f"%{needle}%" params.extend([like, like]) search_sql = " AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s)" params.extend([limit, offset]) with get_db() as conn: cur = get_cursor(conn) admin_club_ids = club_ids_for_profile_with_roles(cur, profile_id, "club_admin") cur.execute( f"""SELECT ma.id, ma.mime_type, ma.byte_size, ma.original_filename, ma.visibility, ma.club_id, ma.uploaded_by_profile_id, ma.lifecycle_state, ma.created_at, ma.sha256, ma.copyright_notice, ma.storage_key, pr.name AS uploader_name, pr.email AS uploader_email, cl.name AS club_name FROM media_assets ma LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id LEFT JOIN clubs cl ON cl.id = ma.club_id WHERE {lc_where} AND ( %s OR lower(trim(ma.visibility)) = 'official' OR ( lower(trim(ma.visibility)) = 'private' AND ma.uploaded_by_profile_id = %s ) OR ( lower(trim(ma.visibility)) = 'club' AND EXISTS ( SELECT 1 FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = ma.club_id AND cm.status = 'active' ) ) ) {search_sql} ORDER BY ma.updated_at DESC NULLS LAST, ma.created_at DESC LIMIT %s OFFSET %s""", params, ) rows = [r2d(r) for r in cur.fetchall()] show_uploader = is_superadmin(role) or is_platform_admin(role) or bool(admin_club_ids) show_club = is_superadmin(role) or is_platform_admin(role) for r in rows: r["permissions"] = _item_permissions(r, tenant, admin_club_ids) if not show_uploader: r["uploader_name"] = None r["uploader_email"] = None if not show_club: r["club_name"] = None viewer = { "show_uploader_meta": show_uploader, "show_club_meta": show_club, "is_superadmin": is_superadmin(role), "is_platform_admin": is_platform_admin(role), } return { "items": rows, "limit": limit, "offset": offset, "lifecycle": lifecycle.strip().lower(), "viewer": viewer, } @router.api_route("/{asset_id}/file", methods=["GET", "HEAD"]) def download_media_asset_file( request: Request, asset_id: int, tenant: TenantContext = Depends(get_tenant_context_flexible), ): """Direktzugriff auf Archiv-Datei (Thumbnail/Vorschau); Auth wie Übungs-Medien (?ssetoken).""" from routers.exercises import _binary_media_response with get_db() as conn: cur = get_cursor(conn) asset = _fetch_asset_file_row(cur, asset_id) if not asset: raise HTTPException(status_code=404, detail="Medium nicht gefunden") lc = (asset.get("lifecycle_state") or "").strip().lower() if lc == "active": _assert_can_view_archive_asset(cur, tenant, asset) elif lc in ("trash_soft", "trash_hidden"): assert_can_manage_media_asset_lifecycle(cur, tenant, asset) else: raise HTTPException(status_code=404, detail="Medium nicht verfügbar") sk = asset.get("storage_key") if not sk: raise HTTPException(status_code=404, detail="Keine Datei hinterlegt") media_root = get_effective_media_root(cur) abs_p = path_under_media_root(media_root, str(sk)) if not abs_p or not abs_p.is_file(): raise HTTPException(status_code=404, detail="Datei nicht gefunden") mime = asset.get("mime_type") or "application/octet-stream" fname = asset.get("original_filename") or abs_p.name return _binary_media_response(abs_p, mime, str(fname) if fname else None, request) @router.post("/{asset_id}/lifecycle") def post_media_asset_lifecycle( asset_id: int, body: MediaLifecycleBody, tenant: TenantContext = Depends(get_tenant_context), ): with get_db() as conn: cur = get_cursor(conn) return _apply_lifecycle_action(cur, conn, asset_id, body, tenant) @router.post("/bulk-lifecycle") def bulk_media_lifecycle( body: MediaBulkLifecycleBody, tenant: TenantContext = Depends(get_tenant_context), ): inner = MediaLifecycleBody(action=body.action, target_lifecycle=body.target_lifecycle) updated: list[int] = [] failed: list[dict] = [] with get_db() as conn: cur = get_cursor(conn) for aid in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)): try: _apply_lifecycle_action(cur, conn, aid, inner, tenant) updated.append(aid) except HTTPException as he: msg = he.detail if isinstance(he.detail, str) else str(he.detail) failed.append({"id": aid, "detail": msg}) return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)} @router.post("/bulk-patch") def bulk_media_patch( body: MediaBulkPatchBody, tenant: TenantContext = Depends(get_tenant_context), ): raw = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True) patch_fields = {k: v for k, v in raw.items() if k != "media_asset_ids" and v is not None} if not patch_fields: raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren") updated: list[int] = [] failed: list[dict] = [] profile_id = tenant.profile_id role = tenant.global_role with get_db() as conn: cur = get_cursor(conn) for asset_id in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)): try: cur.execute( """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state, copyright_notice, original_filename FROM media_assets WHERE id = %s""", (asset_id,), ) row = cur.fetchone() if not row: failed.append({"id": asset_id, "detail": "Medium nicht gefunden"}) continue asset = r2d(row) assert_can_edit_media_asset_metadata(cur, tenant, asset) eff = _effective_media_patch_fields(patch_fields, asset) next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower() next_cid = eff["club_id"] if "club_id" in eff else asset.get("club_id") if "visibility" in patch_fields or "club_id" in patch_fields: assert_valid_governance_visibility( cur, profile_id, role, next_vis, int(next_cid) if next_cid is not None else None, ) sets: list[str] = [] vals: list[Any] = [] if "copyright_notice" in patch_fields: sets.append("copyright_notice = %s") vals.append(patch_fields["copyright_notice"]) if "original_filename" in patch_fields: sets.append("original_filename = %s") vals.append(patch_fields["original_filename"]) if "visibility" in patch_fields or "club_id" in patch_fields: sets.append("visibility = %s") vals.append(str(eff.get("visibility", asset["visibility"])).strip()) sets.append("club_id = %s") vals.append(eff.get("club_id")) if not sets: failed.append({"id": asset_id, "detail": "Nichts zu aktualisieren"}) continue sets.append("updated_at = NOW()") vals.append(asset_id) cur.execute( f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s", tuple(vals), ) conn.commit() updated.append(asset_id) except HTTPException as he: msg = he.detail if isinstance(he.detail, str) else str(he.detail) failed.append({"id": asset_id, "detail": msg}) return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)} @router.patch("/{asset_id}") def patch_media_asset( asset_id: int, body: MediaAssetPatch, tenant: TenantContext = Depends(get_tenant_context), ): profile_id = tenant.profile_id role = tenant.global_role data = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True) if not data: raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren") with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state, copyright_notice, original_filename FROM media_assets WHERE id = %s""", (asset_id,), ) row = cur.fetchone() if not row: raise HTTPException(status_code=404, detail="Medium nicht gefunden") asset = r2d(row) assert_can_edit_media_asset_metadata(cur, tenant, asset) eff = _effective_media_patch_fields(data, asset) next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower() next_cid = eff["club_id"] if "club_id" in eff else asset.get("club_id") if "visibility" in data or "club_id" in data: assert_valid_governance_visibility( cur, profile_id, role, next_vis, int(next_cid) if next_cid is not None else None, ) sets: list[str] = [] vals: list[Any] = [] if "copyright_notice" in data: sets.append("copyright_notice = %s") vals.append(data["copyright_notice"]) if "original_filename" in data: sets.append("original_filename = %s") vals.append(data["original_filename"]) if "visibility" in data or "club_id" in data: sets.append("visibility = %s") vals.append(str(eff.get("visibility", asset["visibility"])).strip()) sets.append("club_id = %s") vals.append(eff.get("club_id")) if sets: sets.append("updated_at = NOW()") vals.append(asset_id) cur.execute( f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s", tuple(vals), ) conn.commit() cur.execute( """SELECT id, mime_type, byte_size, original_filename, visibility, club_id, uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice FROM media_assets WHERE id = %s""", (asset_id,), ) out = r2d(cur.fetchone()) return out