shinkan-jinkendo/backend/routers/media_assets.py
Lars 0a1816e38b
All checks were successful
Deploy Development / deploy (push) Successful in 36s
Test Suite / pytest-backend (push) Successful in 25s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 23s
feat: enhance media library and lifecycle management
- Updated media library to include lifecycle filtering options (active, trash_soft, trash_hidden) and copyright management capabilities.
- Implemented new API endpoints for listing media assets with lifecycle states and patching copyright notices.
- Enhanced frontend components to support navigation to the media library and integration of media management features in the ExerciseFormPage.
- Incremented version to 0.8.48, reflecting the latest improvements in media handling and governance.
2026-05-07 14:10:26 +02:00

256 lines
9.7 KiB
Python

"""Medien-Archiv (Liste, Datei) und Lifecycle — MEDIA_ASSETS_AND_ARCHIVE_SPEC."""
from __future__ import annotations
from typing import Any, Literal, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from pydantic import BaseModel, Field
from club_tenancy import is_platform_admin, library_content_visible_to_profile
from db import get_db, get_cursor, r2d
from media_lifecycle import fetch_media_asset_row
from media_storage import get_effective_media_root, path_under_media_root
from tenant_context import TenantContext, get_tenant_context, get_tenant_context_flexible
router = APIRouter(prefix="/api/media-assets", tags=["media-assets"])
class MediaLifecycleBody(BaseModel):
    # Request body of POST /{asset_id}/lifecycle.
    # `action` names the lifecycle transition to apply; the closed set below is
    # exactly what post_media_asset_lifecycle dispatches on.
    action: Literal[
        "trash_soft",
        "trash_hidden",
        "recover",
        "purge",
        "reactivate",
    ]
class MediaAssetPatch(BaseModel):
    # Request body of PATCH /{asset_id}; only fields the client actually sent are applied.
    # copyright_notice: free text, at most 8000 characters; defaults to None when omitted.
    copyright_notice: Optional[str] = Field(default=None, max_length=8000)
# Values accepted by the ``lifecycle`` list filter.
_LIFECYCLE_LIST_FILTERS = frozenset({"active", "trash_soft", "trash_hidden", "all"})


def _lifecycle_where_sql(lifecycle: str) -> str:
    """Map a lifecycle filter name to a WHERE fragment on ``ma.lifecycle_state``.

    The value is normalised first: a missing/empty value falls back to
    ``"active"``, surrounding whitespace is stripped and case is ignored.

    Args:
        lifecycle: One of ``active``, ``trash_soft``, ``trash_hidden`` or ``all``.

    Returns:
        A constant SQL condition referencing the ``ma`` table alias.

    Raises:
        HTTPException: 400 when the normalised value is not a known filter.
    """
    wanted = (lifecycle or "active").strip().lower()
    if wanted not in _LIFECYCLE_LIST_FILTERS:
        raise HTTPException(status_code=400, detail="Ungültiger lifecycle-Filter")

    # Only fixed literals are ever returned, so callers may interpolate the
    # fragment into a statement without passing user input through.
    fragments = {
        "active": "ma.lifecycle_state = 'active'",
        "trash_soft": "ma.lifecycle_state = 'trash_soft'",
        "trash_hidden": "ma.lifecycle_state = 'trash_hidden'",
        "all": "ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')",
    }
    return fragments[wanted]
def _fetch_asset_file_row(cur: Any, asset_id: int) -> Optional[dict]:
    """Load the columns needed to authorise and serve one asset's file.

    Args:
        cur: Database cursor used to run the query.
        asset_id: Primary key of the ``media_assets`` row.

    Returns:
        The row converted with ``r2d``, or ``None`` when no row matches.
    """
    cur.execute(
        """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
                  storage_key, mime_type, original_filename
           FROM media_assets WHERE id = %s""",
        (asset_id,),
    )
    record = cur.fetchone()
    if not record:
        return None
    return r2d(record)
def _assert_can_view_archive_asset(cur: Any, tenant: TenantContext, asset: dict) -> None:
    """Raise 403 unless the tenant's profile is allowed to see ``asset``.

    The decision itself is delegated to ``library_content_visible_to_profile``;
    this helper only normalises the asset's visibility (trimmed, lower-cased,
    empty when missing) and turns a negative answer into an HTTP error.

    Args:
        cur: Database cursor handed through to the visibility check.
        tenant: Caller context supplying ``profile_id`` and ``global_role``.
        asset: Asset row with ``visibility``, ``club_id`` and
            ``uploaded_by_profile_id`` keys.

    Raises:
        HTTPException: 403 when the asset is not visible to the caller.
    """
    visibility = (asset.get("visibility") or "").strip().lower()
    may_view = library_content_visible_to_profile(
        cur,
        tenant.profile_id,
        visibility,
        asset.get("club_id"),
        asset.get("uploaded_by_profile_id"),
        tenant.global_role,
    )
    if may_view:
        return
    raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Medium")
@router.get("")
def list_media_assets(
    tenant: TenantContext = Depends(get_tenant_context),
    q: Optional[str] = Query(None, max_length=120),
    lifecycle: str = Query(
        "active",
        description="active | trash_soft | trash_hidden | all (nicht purgierte Zustände)",
    ),
    limit: int = Query(30, ge=1, le=100),
    offset: int = Query(0, ge=0),
):
    """
    Searchable media archive; visibility rules as for the exercise library.
    Defaults to lifecycle=active (archive picker); a manager UI may request the trash views.
    """
    # Validates the filter (400 on unknown values) and yields a constant SQL fragment.
    lc_where = _lifecycle_where_sql(lifecycle)
    # Echo the filter that was actually applied. _lifecycle_where_sql treats an
    # empty value as "active", so the response must apply the same fallback
    # instead of reporting "".
    applied_lifecycle = (lifecycle or "active").strip().lower()

    role = tenant.global_role or ""
    is_adm = is_platform_admin(role)
    profile_id = tenant.profile_id

    # Placeholder order: admin flag, uploader check, club-membership check,
    # then the optional search pattern (twice), then LIMIT / OFFSET.
    params: list[Any] = [is_adm, profile_id, profile_id]
    search_sql = ""
    needle = (q or "").strip()
    if needle:
        # Escape LIKE metacharacters so the search text is matched literally;
        # backslash is PostgreSQL's default LIKE escape character.
        escaped = (
            needle.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        like = f"%{escaped}%"
        params.extend([like, like])
        search_sql = " AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s)"
    params.extend([limit, offset])

    # NOTE(review): trash listings use the same visibility predicate as active
    # ones, while the file endpoint requires lifecycle-management rights for
    # trashed assets — confirm that this difference is intended.
    with get_db() as conn:
        cur = get_cursor(conn)
        # ma.id is appended to ORDER BY as a unique tiebreaker so that
        # LIMIT/OFFSET paging is deterministic when timestamps collide.
        cur.execute(
            f"""SELECT ma.id, ma.mime_type, ma.byte_size, ma.original_filename, ma.visibility, ma.club_id,
                       ma.uploaded_by_profile_id, ma.lifecycle_state, ma.created_at, ma.sha256,
                       ma.copyright_notice
                FROM media_assets ma
                WHERE {lc_where}
                  AND (
                    %s
                    OR lower(trim(ma.visibility)) = 'official'
                    OR (
                      lower(trim(ma.visibility)) = 'private'
                      AND ma.uploaded_by_profile_id = %s
                    )
                    OR (
                      lower(trim(ma.visibility)) = 'club'
                      AND EXISTS (
                        SELECT 1 FROM club_members cm
                        WHERE cm.profile_id = %s
                          AND cm.club_id = ma.club_id
                          AND cm.status = 'active'
                      )
                    )
                  )
                  {search_sql}
                ORDER BY ma.updated_at DESC NULLS LAST, ma.created_at DESC, ma.id DESC
                LIMIT %s OFFSET %s""",
            params,
        )
        rows = [r2d(r) for r in cur.fetchall()]
    return {
        "items": rows,
        "limit": limit,
        "offset": offset,
        "lifecycle": applied_lifecycle,
    }
@router.api_route("/{asset_id}/file", methods=["GET", "HEAD"])
def download_media_asset_file(
    request: Request,
    asset_id: int,
    tenant: TenantContext = Depends(get_tenant_context_flexible),
):
    """Direct access to an archive file (thumbnail/preview); auth as for exercise media (?ssetoken)."""
    from routers.exercises import _binary_media_response

    with get_db() as conn:
        cur = get_cursor(conn)
        asset = _fetch_asset_file_row(cur, asset_id)
        if not asset:
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")

        # Any state other than active or one of the two trash stages is
        # reported as "not available".
        state = (asset.get("lifecycle_state") or "").strip().lower()
        if state not in ("active", "trash_soft", "trash_hidden"):
            raise HTTPException(status_code=404, detail="Medium nicht verfügbar")

        if state == "active":
            # Active assets: ordinary library visibility decides.
            _assert_can_view_archive_asset(cur, tenant, asset)
        else:
            # Trashed assets: only callers allowed to manage the lifecycle may read the file.
            from media_lifecycle import assert_can_manage_media_asset_lifecycle

            assert_can_manage_media_asset_lifecycle(cur, tenant, asset)

        storage_key = asset.get("storage_key")
        if not storage_key:
            raise HTTPException(status_code=404, detail="Keine Datei hinterlegt")

        # Resolve the key against the configured media root; a falsy result or
        # a path that is not a regular file is treated as "file not found".
        root = get_effective_media_root(cur)
        file_path = path_under_media_root(root, str(storage_key))
        if not file_path or not file_path.is_file():
            raise HTTPException(status_code=404, detail="Datei nicht gefunden")

        content_type = asset.get("mime_type") or "application/octet-stream"
        # Prefer the uploaded name; fall back to the on-disk file name.
        download_name = asset.get("original_filename") or file_path.name
        if download_name:
            shown_name = str(download_name)
        else:
            shown_name = None
        return _binary_media_response(file_path, content_type, shown_name, request)
@router.post("/{asset_id}/lifecycle")
def post_media_asset_lifecycle(
    asset_id: int,
    body: MediaLifecycleBody,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Trash transitions — media_lifecycle."""
    from media_lifecycle import (
        assert_can_manage_media_asset_lifecycle,
        purge_media_asset,
        transition_recover_from_hidden,
        transition_to_trash_hidden,
        transition_to_trash_soft,
    )

    with get_db() as conn:
        cur = get_cursor(conn)
        asset = fetch_media_asset_row(cur, asset_id)
        if not asset:
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")
        assert_can_manage_media_asset_lifecycle(cur, tenant, asset)

        requested = body.action

        # Transitions that need no extra precondition in this handler all share
        # the (cur, conn, asset_id) call shape and return their result directly.
        simple_transitions = {
            "trash_soft": transition_to_trash_soft,
            "trash_hidden": transition_to_trash_hidden,
            "recover": transition_recover_from_hidden,
        }
        handler = simple_transitions.get(requested)
        if handler is not None:
            return handler(cur, conn, asset_id)

        if requested == "reactivate":
            from media_lifecycle import reactivate_media_asset_from_trash

            return reactivate_media_asset_from_trash(cur, conn, asset_id)

        if requested != "purge":
            # Defensive fallback for an action value that none of the branches handles.
            raise HTTPException(status_code=500, detail="Interner Fehler: lifecycle action")

        # Purge is only permitted from the trash_hidden state.
        current_state = (asset.get("lifecycle_state") or "").strip().lower()
        if current_state != "trash_hidden":
            raise HTTPException(
                status_code=400,
                detail="Nur ausgeblendete Medien (Stufe 2) dürfen endgültig gelöscht werden",
            )
        if not purge_media_asset(cur, conn, asset_id):
            raise HTTPException(status_code=400, detail="Löschen nicht möglich")
        return {"ok": True, "purged": asset_id}
@router.patch("/{asset_id}")
def patch_media_asset(
    asset_id: int,
    body: MediaAssetPatch,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Update asset metadata (e.g. copyright) — same permission as lifecycle management."""
    from media_lifecycle import assert_can_manage_media_asset_lifecycle

    # exclude_unset keeps only fields the client actually sent, so an explicit
    # null is still applied (it clears the notice) while an absent field is not.
    data = body.dict(exclude_unset=True)
    if not data:
        raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
                      copyright_notice, original_filename
               FROM media_assets WHERE id = %s""",
            (asset_id,),
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")
        asset = r2d(row)
        assert_can_manage_media_asset_lifecycle(cur, tenant, asset)

        if "copyright_notice" in data:
            cur.execute(
                "UPDATE media_assets SET copyright_notice = %s, updated_at = NOW() WHERE id = %s",
                (data["copyright_notice"], asset_id),
            )
            conn.commit()

        # Re-read the row so the response reflects what is stored now.
        cur.execute(
            """SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
                      uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice
               FROM media_assets WHERE id = %s""",
            (asset_id,),
        )
        updated = cur.fetchone()
        if not updated:
            # The row can disappear between the permission check and this read
            # (for example if the asset was removed concurrently); answer 404
            # rather than handing None to r2d.
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")
        out = r2d(updated)
    return out