"""Medien-Archiv (Liste, Datei) und Lifecycle — MEDIA_ASSETS_AND_ARCHIVE_SPEC.""" from __future__ import annotations from typing import Any, Literal, Optional, Union import hashlib from pathlib import Path from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile from pydantic import BaseModel, Field, model_validator from club_tenancy import ( assert_club_member, assert_valid_governance_visibility, club_ids_for_profile_with_roles, is_platform_admin, is_superadmin, library_content_visible_to_profile, ) from db import get_db, get_cursor, r2d from media_lifecycle import ( LC_ACTIVE, LC_TRASH_HIDDEN, LC_TRASH_SOFT, assert_can_edit_media_asset_metadata, assert_can_manage_media_asset_lifecycle, assert_can_trash_soft, fetch_media_asset_row, purge_media_asset, reactivate_media_asset_from_trash, superadmin_force_lifecycle_state, superadmin_hard_delete_media_asset, transition_recover_from_hidden, transition_to_trash_hidden, transition_to_trash_soft, ) from media_storage import get_effective_media_root, library_storage_key, path_under_media_root, relocate_local_media_file from tenant_context import TenantContext, get_tenant_context, get_tenant_context_flexible from routers.exercises import _upload_limit_bytes, resolve_upload_mime_type router = APIRouter(prefix="/api/media-assets", tags=["media-assets"]) class MediaLifecycleBody(BaseModel): action: Literal[ "trash_soft", "trash_hidden", "recover", "purge", "reactivate", "superadmin_force_lifecycle", "superadmin_hard_delete", ] target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None @model_validator(mode="after") def _target_lifecycle_rules(self): if self.action == "superadmin_force_lifecycle": if not self.target_lifecycle: raise ValueError("target_lifecycle ist für diese Aktion erforderlich") elif self.target_lifecycle is not None: raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle") return self class MediaAssetPatch(BaseModel): copyright_notice: Optional[str] = Field(None, max_length=8000) original_filename: Optional[str] = Field(None, max_length=300) visibility: Optional[str] = Field(None, pattern="^(private|club|official)$") club_id: Optional[int] = None tags: Optional[list[str]] = None class MediaBulkLifecycleBody(BaseModel): media_asset_ids: list[int] = Field(..., min_length=1, max_length=200) action: Literal[ "trash_soft", "trash_hidden", "recover", "purge", "reactivate", "superadmin_force_lifecycle", "superadmin_hard_delete", ] target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None @model_validator(mode="after") def _bulk_target(self): if self.action == "superadmin_force_lifecycle" and not self.target_lifecycle: raise ValueError("target_lifecycle ist für diese Aktion erforderlich") if self.action != "superadmin_force_lifecycle" and self.target_lifecycle is not None: raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle") return self class MediaBulkPatchBody(BaseModel): media_asset_ids: list[int] = Field(..., min_length=1, max_length=200) copyright_notice: Optional[str] = Field(None, max_length=8000) original_filename: Optional[str] = Field(None, max_length=300) visibility: Optional[str] = Field(None, pattern="^(private|club|official)$") club_id: Optional[int] = None tags: Optional[list[str]] = None _LIFECYCLE_LIST_FILTERS = frozenset({"active", "trash_soft", "trash_hidden", "all"}) _MEDIA_KIND_FILTERS = frozenset({"all", "image", "video", "pdf", "other"}) _MAX_TAGS = 40 _MAX_TAG_LEN = 48 _MEDIA_ASSETS_TAGS_COLUMN: Optional[bool] = None _TRAINING_UNIT_EXERCISES_TABLE: Optional[bool] = None def _media_assets_tags_column_present(cur: Any) -> bool: """True nach Migration 046; verhindert 500 wenn Code neuer ist als das Schema.""" global _MEDIA_ASSETS_TAGS_COLUMN if _MEDIA_ASSETS_TAGS_COLUMN is not None: return _MEDIA_ASSETS_TAGS_COLUMN cur.execute( """ SELECT 1 FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'media_assets' AND column_name = 'tags' LIMIT 1 """ ) _MEDIA_ASSETS_TAGS_COLUMN = cur.fetchone() is not None return _MEDIA_ASSETS_TAGS_COLUMN def _training_unit_exercises_table_present(cur: Any) -> bool: """True wenn planning-Migration (training_unit_exercises) im Schema liegt — sonst keine Einheiten-Links.""" global _TRAINING_UNIT_EXERCISES_TABLE if _TRAINING_UNIT_EXERCISES_TABLE is not None: return _TRAINING_UNIT_EXERCISES_TABLE cur.execute( """ SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'training_unit_exercises' LIMIT 1 """ ) _TRAINING_UNIT_EXERCISES_TABLE = cur.fetchone() is not None return _TRAINING_UNIT_EXERCISES_TABLE def _normalize_media_tags(raw: Union[list[str], None]) -> list[str]: """PostgreSQL text[]; Einträge gekürzt und ohne Dubletten (case-insensitive).""" if raw is None: return [] if not isinstance(raw, list): raise HTTPException(status_code=400, detail="tags muss eine Liste von Strings sein") out: list[str] = [] seen: set[str] = set() for item in raw: s = str(item).strip() if not s: continue s = s[:_MAX_TAG_LEN] key = s.lower() if key in seen: continue seen.add(key) out.append(s) if len(out) >= _MAX_TAGS: break return out def _usage_for_media_assets(cur: Any, asset_ids: list[int]) -> dict[int, dict[str, Any]]: """Übungen und Trainingseinheiten, die dieses Archiv-Medium nutzen.""" if not asset_ids: return {} cur.execute( """SELECT em.media_asset_id, e.id AS exercise_id, e.title AS exercise_title FROM exercise_media em JOIN exercises e ON e.id = em.exercise_id WHERE em.media_asset_id = ANY(%s) ORDER BY em.media_asset_id, e.title NULLS LAST, e.id""", (asset_ids,), ) ex_by_asset: dict[int, list[dict]] = {int(aid): [] for aid in asset_ids} seen_ex: dict[int, set[int]] = {int(aid): set() for aid in asset_ids} exercise_ids: set[int] = set() for r in cur.fetchall(): d = r2d(r) aid = int(d["media_asset_id"]) eid = int(d["exercise_id"]) if aid not in seen_ex or eid in seen_ex[aid]: continue seen_ex[aid].add(eid) exercise_ids.add(eid) title = (d.get("exercise_title") or "").strip() or f"Übung #{eid}" ex_by_asset[aid].append({"id": eid, "title": title}) exercise_to_units: dict[int, list[dict]] = {} if exercise_ids and _training_unit_exercises_table_present(cur): cur.execute( """SELECT tue.exercise_id, tu.id AS unit_id, tu.planned_date, COALESCE(tg.name, '') AS group_name FROM training_unit_exercises tue JOIN training_units tu ON tu.id = tue.training_unit_id LEFT JOIN training_groups tg ON tg.id = tu.group_id WHERE tue.exercise_id = ANY(%s)""", (list(exercise_ids),), ) for row in cur.fetchall(): d = r2d(row) eid = int(d["exercise_id"]) uid = int(d["unit_id"]) pd = d.get("planned_date") planned: Optional[str] if pd is None: planned = None elif hasattr(pd, "isoformat"): planned = str(pd.isoformat()) else: planned = str(pd) exercise_to_units.setdefault(eid, []).append( { "id": uid, "planned_date": planned, "group_name": d.get("group_name") or "", } ) for eid, units in exercise_to_units.items(): seen_u: set[int] = set() uniq: list[dict] = [] for u in units: if u["id"] not in seen_u: seen_u.add(u["id"]) uniq.append(u) exercise_to_units[eid] = uniq out: dict[int, dict[str, Any]] = {} for aid in asset_ids: aid_i = int(aid) exs = ex_by_asset.get(aid_i, []) seen_u2: set[int] = set() units_agg: list[dict] = [] for ex in exs: for u in exercise_to_units.get(ex["id"], []): if u["id"] not in seen_u2: seen_u2.add(u["id"]) units_agg.append(u) out[aid_i] = {"exercises": exs, "training_units": units_agg} return out def _fetch_filter_uploaders(cur: Any, is_adm: bool, profile_id: int) -> list[dict]: cur.execute( """SELECT DISTINCT ma.uploaded_by_profile_id AS id, pr.name, pr.email FROM media_assets ma LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id WHERE ma.uploaded_by_profile_id IS NOT NULL AND ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden') AND ( %s OR lower(trim(ma.visibility)) = 'official' OR ( lower(trim(ma.visibility)) = 'private' AND ma.uploaded_by_profile_id = %s ) OR ( lower(trim(ma.visibility)) = 'club' AND EXISTS ( SELECT 1 FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = ma.club_id AND cm.status = 'active' ) ) ) ORDER BY pr.name NULLS LAST, ma.uploaded_by_profile_id LIMIT 400""", (is_adm, profile_id, profile_id), ) rows = [r2d(x) for x in cur.fetchall()] out: list[dict] = [] for r in rows: pid = r.get("id") if pid is None: continue label = (r.get("name") or "").strip() if not label: label = (r.get("email") or "").strip() if not label: label = f"Profil #{pid}" out.append({"id": int(pid), "label": label}) return out def _effective_media_patch_fields(patch_fields: dict, asset: dict) -> dict: """ Nach visibility-Wechsel club_id konsistent setzen. official → club_id NULL. private/club: club_id aus Patch oder bisheriger Zeile beibehalten (private nutzt club_id für Vereinsordner auf der Platte). """ eff = dict(patch_fields) if eff.get("visibility") is not None: v = str(eff["visibility"]).strip().lower() if v == "official": eff["club_id"] = None elif v == "club" and "club_id" not in eff: eff["club_id"] = asset.get("club_id") elif v == "private" and "club_id" not in eff: eff["club_id"] = asset.get("club_id") return eff def _club_display_name_for_storage(cur: Any, club_id: int) -> str: """Anzeigename für library_club_path_segment; leer wenn Verein fehlt.""" cur.execute("SELECT name FROM clubs WHERE id = %s", (int(club_id),)) row = cur.fetchone() if not row: return "" return str(r2d(row).get("name") or "") def _relocate_asset_file_if_governance_changed( cur: Any, media_root: Path, asset_id: int, asset: dict, next_vis: str, next_club_id: Optional[int], ) -> Optional[str]: """ Passt bei local-Assets den Dateipfad an, wenn sich Sichtbarkeit/Verein ändert. Aktualisiert exercise_media.file_path. Gibt neuen storage_key oder None zurück. """ if (asset.get("storage_backend") or "local") != "local": return None old_key = (asset.get("storage_key") or "").strip() sha = (asset.get("sha256") or "").strip().lower() if not old_key or len(sha) != 64: return None ext = Path(old_key.replace("\\", "/")).suffix or ".bin" try: up = asset.get("uploaded_by_profile_id") up_i = int(up) if up is not None else None club_nm: Optional[str] = None if next_vis in ("club", "private") and next_club_id is not None: club_nm = _club_display_name_for_storage(cur, next_club_id) new_key = library_storage_key( next_vis, next_club_id if next_vis != "official" else None, sha, ext, uploader_profile_id=up_i if next_vis == "private" else None, mime_type=str(asset.get("mime_type") or "") or None, club_name=club_nm, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) from e old_norm = old_key.replace("\\", "/").lstrip("/") if new_key == old_norm: return None try: relocate_local_media_file(media_root, old_key, new_key) except FileNotFoundError as e: raise HTTPException(status_code=500, detail=f"Mediendatei fehlt auf der Platte: {e}") from e except FileExistsError as e: raise HTTPException(status_code=409, detail=str(e)) from e except (ValueError, OSError) as e: raise HTTPException(status_code=500, detail=str(e)) from e db_path = f"/media/{new_key}" cur.execute( "UPDATE exercise_media SET file_path = %s WHERE media_asset_id = %s", (db_path, asset_id), ) return new_key def _list_active_visibility_clause(is_plat: bool, profile_id: int) -> tuple[str, list[Any]]: """Sichtbare aktive Einträge: official; private (eigen oder Plattform-Admin); Verein wie Bibliotheks-SQL.""" parts = ["lower(trim(ma.visibility)) = 'official'"] vals: list[Any] = [] if is_plat: parts.append("lower(trim(ma.visibility)) = 'private'") else: parts.append("(lower(trim(ma.visibility)) = 'private' AND ma.uploaded_by_profile_id = %s)") vals.append(profile_id) club_plat = ( "(lower(trim(ma.visibility)) = 'club' AND ma.club_id IS NOT NULL AND (" "EXISTS (SELECT 1 FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = ma.club_id " "AND cm.status = 'active') OR NOT EXISTS (SELECT 1 FROM club_members cm2 " "WHERE cm2.profile_id = %s AND cm2.club_id = ma.club_id)))" ) if is_plat: parts.append(club_plat) vals.extend([profile_id, profile_id]) else: parts.append( """( lower(trim(ma.visibility)) = 'club' AND EXISTS ( SELECT 1 FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = ma.club_id AND cm.status = 'active' ) )""" ) vals.append(profile_id) sql = "(" + " OR ".join(parts) + ")" return sql, vals def _list_trash_visibility_clause( is_plat: bool, is_sup: bool, profile_id: int, admin_club_ids: set[int], ) -> tuple[str, list[Any]]: """ Papierkorb nur für eigene private Medien; Vereins-Admins zusätzlich Vereins-Papierkorb ihres Vereins. Official/Plattform: Superadmin oder Plattform-Admin sieht alles im Papierkorb. """ if is_plat or is_sup: return "(TRUE)", [] parts: list[str] = [] vals: list[Any] = [] parts.append( "(lower(trim(ma.visibility)) = 'private' AND ma.uploaded_by_profile_id = %s)", ) vals.append(profile_id) if admin_club_ids: parts.append( "(lower(trim(ma.visibility)) = 'club' AND ma.club_id = ANY(%s))", ) vals.append(list(admin_club_ids)) return "(" + " OR ".join(parts) + ")", vals def _list_main_visibility_where( lifecycle: str, is_plat: bool, is_sup: bool, profile_id: int, admin_club_ids: set[int], ) -> tuple[str, list[Any]]: """ Kombiniert lifecycle mit Leserechten. Papierkorb-Stufen für normale Nutzer stark eingeschränkt. """ lc = (lifecycle or "active").strip().lower() if lc not in _LIFECYCLE_LIST_FILTERS: raise HTTPException(status_code=400, detail="Ungültiger lifecycle-Filter") active_sql, active_params = _list_active_visibility_clause(is_plat, profile_id) trash_sql, trash_params = _list_trash_visibility_clause( is_plat, is_sup, profile_id, admin_club_ids ) active_block = f"(ma.lifecycle_state = 'active' AND {active_sql})" trash_block = ( f"(ma.lifecycle_state IN ('trash_soft', 'trash_hidden') AND {trash_sql})" ) if lc == "active": return active_block, active_params if lc == "trash_soft": return ( f"(ma.lifecycle_state = 'trash_soft' AND {trash_sql})", trash_params, ) if lc == "trash_hidden": return ( f"(ma.lifecycle_state = 'trash_hidden' AND {trash_sql})", trash_params, ) # all combined = f"(({active_block}) OR ({trash_block}))" return combined, active_params + trash_params def _fetch_asset_file_row(cur: Any, asset_id: int) -> Optional[dict]: cur.execute( """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state, storage_key, mime_type, original_filename FROM media_assets WHERE id = %s""", (asset_id,), ) row = cur.fetchone() return r2d(row) if row else None def _assert_can_view_archive_asset(cur: Any, tenant: TenantContext, asset: dict) -> None: if not library_content_visible_to_profile( cur, tenant.profile_id, (asset.get("visibility") or "").strip().lower(), asset.get("club_id"), asset.get("uploaded_by_profile_id"), tenant.global_role, ): raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Medium") def _item_permissions(row: dict, tenant: TenantContext, admin_club_ids: set[int]) -> dict: """Berechnete UI-/Policy-Flags pro Zeile (ohne zusätzliche DB).""" role_raw = tenant.global_role role = (role_raw or "").strip().lower() pid = int(tenant.profile_id) sup = is_superadmin(role_raw) plat = is_platform_admin(role_raw) vis = (row.get("visibility") or "private").strip().lower() uid = row.get("uploaded_by_profile_id") cid = row.get("club_id") lc = (row.get("lifecycle_state") or "active").strip().lower() is_owner = uid is not None and int(uid) == pid club_mgr = plat or (cid is not None and int(cid) in admin_club_ids) if vis == "official": edit_metadata = sup trash_soft = lc == "active" and sup can_manage_adv = sup else: edit_metadata = ( sup or (vis == "club" and club_mgr) or (vis == "private" and is_owner) ) trash_soft = lc == "active" and ( sup or plat or (vis == "private" and is_owner) or (vis == "club" and club_mgr) ) can_manage_adv = ( sup or plat or (vis == "private" and is_owner) or (vis == "club" and club_mgr) ) trash_hidden = lc in ("active", "trash_soft") and can_manage_adv recover_from_hidden = lc == "trash_hidden" and can_manage_adv reactivate = lc in ("trash_soft", "trash_hidden") and can_manage_adv purge = lc == "trash_hidden" and sup return { "edit_metadata": edit_metadata, "change_visibility": edit_metadata, "trash_soft": trash_soft, "trash_hidden": trash_hidden, "recover": recover_from_hidden, "reactivate": reactivate, "purge": purge, "superadmin_lifecycle": sup, "superadmin_hard_delete": sup, } def _apply_lifecycle_action( cur: Any, conn: Any, asset_id: int, body: MediaLifecycleBody, tenant: TenantContext, ) -> dict: asset = fetch_media_asset_row(cur, asset_id) if not asset: raise HTTPException(status_code=404, detail="Medium nicht gefunden") action = body.action role_raw = tenant.global_role if action == "superadmin_hard_delete": if not is_superadmin(role_raw): raise HTTPException(status_code=403, detail="Nur Superadmin") ok = superadmin_hard_delete_media_asset(cur, conn, asset_id) if not ok: raise HTTPException(status_code=404, detail="Medium nicht gefunden") return {"ok": True, "hard_deleted": asset_id} if action == "superadmin_force_lifecycle": if not is_superadmin(role_raw): raise HTTPException(status_code=403, detail="Nur Superadmin") tl = body.target_lifecycle or "active" mp = {"active": LC_ACTIVE, "trash_soft": LC_TRASH_SOFT, "trash_hidden": LC_TRASH_HIDDEN} return superadmin_force_lifecycle_state(cur, conn, asset_id, mp[tl]) if action == "purge": if not is_superadmin(role_raw): raise HTTPException(status_code=403, detail="Endgültiges Löschen nur als Superadmin") state = (asset.get("lifecycle_state") or "").strip().lower() if state != LC_TRASH_HIDDEN: raise HTTPException( status_code=400, detail="Nur ausgeblendete Medien (Stufe 2) dürfen mit dieser Aktion entfernt werden", ) if not purge_media_asset(cur, conn, asset_id): raise HTTPException(status_code=400, detail="Löschen nicht möglich") return {"ok": True, "purged": asset_id} if action == "trash_soft": assert_can_trash_soft(cur, tenant, asset) return transition_to_trash_soft(cur, conn, asset_id) if action == "trash_hidden": assert_can_manage_media_asset_lifecycle(cur, tenant, asset) return transition_to_trash_hidden(cur, conn, asset_id) if action == "recover": assert_can_manage_media_asset_lifecycle(cur, tenant, asset) return transition_recover_from_hidden(cur, conn, asset_id) if action == "reactivate": assert_can_manage_media_asset_lifecycle(cur, tenant, asset) return reactivate_media_asset_from_trash(cur, conn, asset_id) raise HTTPException(status_code=500, detail="Interner Fehler: lifecycle action") _MAX_BULK_LIBRARY_FILES = 25 def _ingest_library_media_file( cur: Any, tenant: TenantContext, raw: bytes, filename: Optional[str], content_type: Optional[str], visibility: str, club_id_form: Optional[int], ) -> dict: """Neues Archiv-Medium oder aktiver Dedupe-Treffer (sha256 + Sichtbarkeit + Verein). Kein exercise_media.""" profile_id = tenant.profile_id role = tenant.global_role or "" vis = (visibility or "private").strip().lower() if vis not in ("private", "club", "official"): raise HTTPException(status_code=400, detail="Ungültige Sichtbarkeit") next_cid: Optional[int] = None if vis == "club": if club_id_form is None: raise HTTPException(status_code=400, detail="Verein erforderlich für Sichtbarkeit „Verein“") next_cid = int(club_id_form) elif vis == "private": if club_id_form is not None: next_cid = int(club_id_form) elif is_platform_admin(role): raise HTTPException( status_code=400, detail=( "Private Archiv-Uploads als Plattform-Admin: bitte den Zielverein wählen und " "club_id im Formular setzen (nicht vom allgemeinen Kontext ableiten)." ), ) else: cid = tenant.effective_club_id next_cid = int(cid) if cid is not None else None if next_cid is None: raise HTTPException( status_code=400, detail=( "Private Medien werden pro Verein abgelegt. Bitte aktiven Verein setzen " "(Header X-Active-Club-Id) oder club_id im Formular übergeben — auch für Plattform-Admins." ), ) if not is_platform_admin(role): assert_club_member(cur, profile_id, next_cid) assert_valid_governance_visibility(cur, profile_id, role, vis, next_cid if vis == "club" else None) max_b = _upload_limit_bytes(tenant) if len(raw) > max_b: raise HTTPException( status_code=413, detail=f"Datei zu groß (max. {max_b // (1024 * 1024)} MB)", ) try: mime = resolve_upload_mime_type(raw, content_type, filename) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) from e full_sha = hashlib.sha256(raw).hexdigest() if vis == "private": cur.execute( """SELECT id, storage_key, byte_size, lifecycle_state, original_filename FROM media_assets WHERE sha256 = %s AND lower(trim(visibility)) = %s AND (club_id IS NOT DISTINCT FROM %s) AND (uploaded_by_profile_id IS NOT DISTINCT FROM %s) LIMIT 1""", (full_sha, vis, next_cid, profile_id), ) else: cur.execute( """SELECT id, storage_key, byte_size, lifecycle_state, original_filename FROM media_assets WHERE sha256 = %s AND lower(trim(visibility)) = %s AND (club_id IS NOT DISTINCT FROM %s) LIMIT 1""", (full_sha, vis, next_cid), ) existing_asset = cur.fetchone() if existing_asset: ea = r2d(existing_asset) lc = (ea.get("lifecycle_state") or "").strip().lower() if lc == "active": return { "status": "duplicate", "media_asset_id": int(ea["id"]), "original_filename": ea.get("original_filename"), } if lc in ("trash_soft", "trash_hidden"): raise HTTPException( status_code=409, detail={ "code": "MEDIA_ASSET_IN_TRASH", "message": ( "Diese Datei ist inhaltsgleich (SHA-256) mit einem Archiv-Medium im Papierkorb." ), "media_asset_id": ea["id"], "lifecycle_state": lc, }, ) raise HTTPException( status_code=409, detail="Es existiert bereits ein Archiv-Eintrag zu dieser Datei in einem nicht nutzbaren Zustand.", ) ext = Path(filename or "").suffix[:12] if filename else "" if not ext and mime == "image/jpeg": ext = ".jpg" elif not ext and mime == "image/png": ext = ".png" elif not ext and mime in ("image/heic", "image/heif"): ext = ".heic" elif not ext and mime == "video/mp4": ext = ".mp4" elif not ext and mime == "video/quicktime": ext = ".mov" club_nm = "" if next_cid is not None: club_nm = _club_display_name_for_storage(cur, next_cid) media_root = get_effective_media_root(cur) try: storage_key = library_storage_key( vis, next_cid, full_sha, ext, uploader_profile_id=profile_id if vis == "private" else None, mime_type=mime, club_name=club_nm, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) from e dest_path = path_under_media_root(media_root, storage_key) if dest_path is None: raise HTTPException(status_code=500, detail="Ungültiger Speicherpfad") dest_path.parent.mkdir(parents=True, exist_ok=True) if not dest_path.is_file(): dest_path.write_bytes(raw) cur.execute( """INSERT INTO media_assets ( mime_type, byte_size, sha256, original_filename, visibility, club_id, uploaded_by_profile_id, copyright_notice, storage_backend, storage_key, lifecycle_state ) VALUES (%s, %s, %s, %s, %s, %s, %s, NULL, 'local', %s, 'active') RETURNING id""", ( mime, len(raw), full_sha, filename or storage_key, vis, next_cid, profile_id, storage_key, ), ) ar = cur.fetchone() aid = int(r2d(ar)["id"]) return {"status": "created", "media_asset_id": aid, "original_filename": filename or storage_key} @router.post("/bulk-upload") async def bulk_upload_media_assets( tenant: TenantContext = Depends(get_tenant_context), files: list[UploadFile] = File(..., description="Mehrere Dateien (jpeg, png, gif, mp4, pdf)"), visibility: str = Form("private"), club_id: Optional[int] = Form(None), ): """Mehrere Dateien ins Archiv; Dedupe wie Übungs-Upload. Pro Datei eigene DB-Transaktion.""" if not files: raise HTTPException(status_code=400, detail="Keine Dateien übermittelt") if len(files) > _MAX_BULK_LIBRARY_FILES: raise HTTPException( status_code=400, detail=f"Maximal {_MAX_BULK_LIBRARY_FILES} Dateien pro Anfrage", ) results: list[dict[str, Any]] = [] created = duplicate = failed = 0 for uf in files: fn = uf.filename or "ohne_name" try: raw = await uf.read() if not raw: results.append({"filename": fn, "ok": False, "detail": "Leere Datei"}) failed += 1 continue with get_db() as conn: cur = get_cursor(conn) r = _ingest_library_media_file( cur, tenant, raw, uf.filename, uf.content_type, visibility, club_id, ) results.append({"filename": fn, "ok": True, **r}) if r["status"] == "created": created += 1 else: duplicate += 1 except HTTPException as e: detail = e.detail if isinstance(detail, dict): detail_s = detail.get("message") or detail.get("code") or str(detail) else: detail_s = str(detail) results.append( { "filename": fn, "ok": False, "status_code": e.status_code, "detail": detail_s, }, ) failed += 1 except Exception as e: results.append({"filename": fn, "ok": False, "detail": str(e)}) failed += 1 return { "results": results, "created_count": created, "duplicate_count": duplicate, "failed_count": failed, } @router.get("") def list_media_assets( tenant: TenantContext = Depends(get_tenant_context), q: Optional[str] = Query(None, max_length=120), lifecycle: str = Query( "active", description="active | trash_soft | trash_hidden | all (nicht purgierte Zustände)", ), media_kind: str = Query( "all", description="all | image | video | pdf | other", ), club_id: Optional[int] = Query( None, ge=1, description="Nur Superadmin: nach Verein filtern", ), uploaded_by: Optional[int] = Query( None, ge=1, description="Nach Uploader (Profil-ID), wenn erlaubt", ), include_filter_meta: bool = Query( False, description="Uploader-Liste für Filter (nur wenn Metadaten sichtbar)", ), limit: int = Query(30, ge=1, le=100), offset: int = Query(0, ge=0), ): mk = (media_kind or "all").strip().lower() if mk not in _MEDIA_KIND_FILTERS: raise HTTPException(status_code=400, detail="Ungültiger media_kind") role = tenant.global_role or "" is_adm = is_platform_admin(role) sup = is_superadmin(role) profile_id = tenant.profile_id needle = (q or "").strip() if club_id is not None and not sup: raise HTTPException(status_code=403, detail="Vereinsfilter nur für Superadmin") media_kind_sql = "" if mk == "image": # %% für psycopg2: sonst wird % als Platzhalter-Syntax interpretiert media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'image/%%'" elif mk == "video": media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'video/%%'" elif mk == "pdf": media_kind_sql = ( " AND (lower(COALESCE(ma.mime_type, '')) = 'application/pdf'" " OR lower(COALESCE(ma.mime_type, '')) LIKE '%%pdf%%')" ) elif mk == "other": media_kind_sql = ( " AND COALESCE(ma.mime_type, '') <> ''" " AND lower(ma.mime_type) NOT LIKE 'image/%%'" " AND lower(ma.mime_type) NOT LIKE 'video/%%'" " AND lower(ma.mime_type) <> 'application/pdf'" " AND lower(ma.mime_type) NOT LIKE '%%pdf%%'" ) club_sql = "" club_sql_params: list[Any] = [] if club_id is not None: club_sql = " AND ma.club_id = %s" club_sql_params.append(club_id) with get_db() as conn: cur = get_cursor(conn) admin_club_ids = club_ids_for_profile_with_roles(cur, profile_id, "club_admin") vis_main_sql, vis_params = _list_main_visibility_where( lifecycle, is_adm, sup, profile_id, admin_club_ids ) show_uploader = sup or is_adm or bool(admin_club_ids) if uploaded_by is not None and not show_uploader: raise HTTPException(status_code=403, detail="Uploader-Filter nicht erlaubt") has_tags_col = _media_assets_tags_column_present(cur) tags_select = "ma.tags," if has_tags_col else "ARRAY[]::text[] AS tags," uploaded_sql = "" uploaded_params: list[Any] = [] if uploaded_by is not None: uploaded_sql = " AND ma.uploaded_by_profile_id = %s" uploaded_params.append(uploaded_by) search_sql = "" search_params: list[Any] = [] if needle: like = f"%{needle}%" if has_tags_col: search_params = [like, like, like, like] search_sql = ( " AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s" " OR COALESCE(ma.copyright_notice, '') ILIKE %s" " OR EXISTS (SELECT 1 FROM unnest(ma.tags) AS t WHERE t::text ILIKE %s))" ) else: search_params = [like, like, like] search_sql = ( " AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s" " OR COALESCE(ma.copyright_notice, '') ILIKE %s)" ) params: list[Any] = ( vis_params + club_sql_params + uploaded_params + search_params + [limit, offset] ) cur.execute( f"""SELECT ma.id, ma.mime_type, ma.byte_size, ma.original_filename, ma.visibility, ma.club_id, ma.uploaded_by_profile_id, ma.lifecycle_state, ma.created_at, ma.sha256, ma.copyright_notice, ma.storage_key, {tags_select} pr.name AS uploader_name, pr.email AS uploader_email, cl.name AS club_name FROM media_assets ma LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id LEFT JOIN clubs cl ON cl.id = ma.club_id WHERE {vis_main_sql} {club_sql} {uploaded_sql} {media_kind_sql} {search_sql} ORDER BY ma.updated_at DESC NULLS LAST, ma.created_at DESC LIMIT %s OFFSET %s""", params, ) rows = [r2d(r) for r in cur.fetchall()] show_club = sup or is_adm or bool(admin_club_ids) asset_ids = [int(r["id"]) for r in rows] usage_map = _usage_for_media_assets(cur, asset_ids) for r in rows: r["permissions"] = _item_permissions(r, tenant, admin_club_ids) tid = int(r["id"]) r["usage"] = usage_map.get(tid, {"exercises": [], "training_units": []}) tags_val = r.get("tags") if tags_val is None: r["tags"] = [] elif not isinstance(tags_val, list): r["tags"] = list(tags_val) if tags_val else [] if not show_uploader: r["uploader_name"] = None r["uploader_email"] = None if not show_club: r["club_name"] = None viewer = { "show_uploader_meta": show_uploader, "show_club_meta": show_club, "is_superadmin": sup, "is_platform_admin": is_adm, } filter_meta = None if include_filter_meta and show_uploader: filter_meta = {"uploaders": _fetch_filter_uploaders(cur, is_adm, profile_id)} return { "items": rows, "limit": limit, "offset": offset, "lifecycle": lifecycle.strip().lower(), "media_kind": mk, "viewer": viewer, "filter_meta": filter_meta, } @router.api_route("/{asset_id}/file", methods=["GET", "HEAD"]) def download_media_asset_file( request: Request, asset_id: int, tenant: TenantContext = Depends(get_tenant_context_flexible), ): """Direktzugriff auf Archiv-Datei (Thumbnail/Vorschau); Auth wie Übungs-Medien (?ssetoken).""" from routers.exercises import _binary_media_response with get_db() as conn: cur = get_cursor(conn) asset = _fetch_asset_file_row(cur, asset_id) if not asset: raise HTTPException(status_code=404, detail="Medium nicht gefunden") lc = (asset.get("lifecycle_state") or "").strip().lower() if lc == "active": _assert_can_view_archive_asset(cur, tenant, asset) elif lc in ("trash_soft", "trash_hidden"): assert_can_manage_media_asset_lifecycle(cur, tenant, asset) else: raise HTTPException(status_code=404, detail="Medium nicht verfügbar") sk = asset.get("storage_key") if not sk: raise HTTPException(status_code=404, detail="Keine Datei hinterlegt") media_root = get_effective_media_root(cur) abs_p = path_under_media_root(media_root, str(sk)) if not abs_p or not abs_p.is_file(): raise HTTPException(status_code=404, detail="Datei nicht gefunden") mime = asset.get("mime_type") or "application/octet-stream" fname = asset.get("original_filename") or abs_p.name return _binary_media_response(abs_p, mime, str(fname) if fname else None, request) @router.post("/{asset_id}/lifecycle") def post_media_asset_lifecycle( asset_id: int, body: MediaLifecycleBody, tenant: TenantContext = Depends(get_tenant_context), ): with get_db() as conn: cur = get_cursor(conn) return _apply_lifecycle_action(cur, conn, asset_id, body, tenant) @router.post("/bulk-lifecycle") def bulk_media_lifecycle( body: MediaBulkLifecycleBody, tenant: TenantContext = Depends(get_tenant_context), ): inner = MediaLifecycleBody(action=body.action, target_lifecycle=body.target_lifecycle) updated: list[int] = [] failed: list[dict] = [] with get_db() as conn: cur = get_cursor(conn) for aid in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)): try: _apply_lifecycle_action(cur, conn, aid, inner, tenant) updated.append(aid) except HTTPException as he: msg = he.detail if isinstance(he.detail, str) else str(he.detail) failed.append({"id": aid, "detail": msg}) return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)} @router.post("/bulk-patch") def bulk_media_patch( body: MediaBulkPatchBody, tenant: TenantContext = Depends(get_tenant_context), ): raw = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True) patch_fields = { k: v for k, v in raw.items() if k != "media_asset_ids" and (k == "tags" or v is not None) } if not patch_fields: raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren") updated: list[int] = [] failed: list[dict] = [] profile_id = tenant.profile_id role = tenant.global_role with get_db() as conn: cur = get_cursor(conn) for asset_id in sorted(set(int(x) for x in body.media_asset_ids if x and int(x) > 0)): try: cur.execute( """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state, copyright_notice, original_filename, sha256, storage_key, storage_backend, mime_type FROM media_assets WHERE id = %s""", (asset_id,), ) row = cur.fetchone() if not row: failed.append({"id": asset_id, "detail": "Medium nicht gefunden"}) continue asset = r2d(row) assert_can_edit_media_asset_metadata(cur, tenant, asset) if "tags" in patch_fields and not _media_assets_tags_column_present(cur): failed.append( { "id": asset_id, "detail": "Schlagwörter (tags) erfordern DB-Migration 046.", } ) continue eff = _effective_media_patch_fields(patch_fields, asset) next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower() next_cid = eff.get("club_id", asset.get("club_id")) if "visibility" in patch_fields or "club_id" in patch_fields: assert_valid_governance_visibility( cur, profile_id, role, next_vis, int(next_cid) if next_cid is not None else None, ) if next_vis in ("private", "club") and next_cid is None: failed.append( { "id": asset_id, "detail": ( "Für private oder Vereins-Medien wird club_id benötigt (Vereinsordner)." ), } ) continue if next_vis in ("club", "official"): effective_copyright = ( patch_fields.get("copyright_notice") or asset.get("copyright_notice") or "" ) if not str(effective_copyright).strip(): failed.append( { "id": asset_id, "detail": ( "Fur Vereins- oder offizielle Medien ist eine " "Urheberrechtsangabe (copyright_notice) Pflicht." ), } ) continue new_sk: Optional[str] = None if "visibility" in patch_fields or "club_id" in patch_fields: next_club_param: Optional[int] = None if next_vis in ("club", "private"): next_club_param = int(next_cid) if next_cid is not None else None media_root = get_effective_media_root(cur) new_sk = _relocate_asset_file_if_governance_changed( cur, media_root, asset_id, asset, next_vis, next_club_param ) sets: list[str] = [] vals: list[Any] = [] if "copyright_notice" in patch_fields: sets.append("copyright_notice = %s") vals.append(patch_fields["copyright_notice"]) if "original_filename" in patch_fields: sets.append("original_filename = %s") vals.append(patch_fields["original_filename"]) if "tags" in patch_fields: sets.append("tags = %s") vals.append(_normalize_media_tags(patch_fields["tags"])) if "visibility" in patch_fields or "club_id" in patch_fields: sets.append("visibility = %s") vals.append(str(eff.get("visibility", asset["visibility"])).strip()) sets.append("club_id = %s") vals.append(next_cid) if new_sk: sets.append("storage_key = %s") vals.append(new_sk) if not sets: failed.append({"id": asset_id, "detail": "Nichts zu aktualisieren"}) continue sets.append("updated_at = NOW()") vals.append(asset_id) cur.execute( f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s", tuple(vals), ) conn.commit() updated.append(asset_id) except HTTPException as he: msg = he.detail if isinstance(he.detail, str) else str(he.detail) failed.append({"id": asset_id, "detail": msg}) return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)} @router.patch("/{asset_id}") def patch_media_asset( asset_id: int, body: MediaAssetPatch, tenant: TenantContext = Depends(get_tenant_context), ): profile_id = tenant.profile_id role = tenant.global_role data = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True) if not data: raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren") with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state, copyright_notice, original_filename, sha256, storage_key, storage_backend, mime_type FROM media_assets WHERE id = %s""", (asset_id,), ) row = cur.fetchone() if not row: raise HTTPException(status_code=404, detail="Medium nicht gefunden") asset = r2d(row) assert_can_edit_media_asset_metadata(cur, tenant, asset) if "tags" in data and not _media_assets_tags_column_present(cur): raise HTTPException( status_code=503, detail="Schlagwörter (tags) erfordern die Datenbank-Migration 046. Bitte Migration ausführen.", ) eff = _effective_media_patch_fields(data, asset) next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower() next_cid = eff.get("club_id", asset.get("club_id")) if "visibility" in data or "club_id" in data: assert_valid_governance_visibility( cur, profile_id, role, next_vis, int(next_cid) if next_cid is not None else None, ) if next_vis in ("private", "club") and next_cid is None: raise HTTPException( status_code=400, detail=( "Für private oder Vereins-Medien wird club_id benötigt (Vereinsordner). " "Bitte im PATCH setzen, z. B. bei Wechsel von „offiziell“ zu „privat“." ), ) if next_vis in ("club", "official"): effective_copyright = ( data.get("copyright_notice") or asset.get("copyright_notice") or "" ) if not str(effective_copyright).strip(): raise HTTPException( status_code=400, detail=( "Fur Vereins- oder offizielle Medien ist eine Urheberrechtsangabe " "(copyright_notice) Pflicht. Bitte vor oder zusammen mit der " "Freigabe angeben." ), ) new_sk: Optional[str] = None if "visibility" in data or "club_id" in data: next_club_param: Optional[int] = None if next_vis in ("club", "private"): next_club_param = int(next_cid) if next_cid is not None else None media_root = get_effective_media_root(cur) new_sk = _relocate_asset_file_if_governance_changed( cur, media_root, asset_id, asset, next_vis, next_club_param ) sets: list[str] = [] vals: list[Any] = [] if "copyright_notice" in data: sets.append("copyright_notice = %s") vals.append(data["copyright_notice"]) if "original_filename" in data: sets.append("original_filename = %s") vals.append(data["original_filename"]) if "tags" in data: sets.append("tags = %s") vals.append(_normalize_media_tags(data["tags"])) if "visibility" in data or "club_id" in data: sets.append("visibility = %s") vals.append(str(eff.get("visibility", asset["visibility"])).strip()) sets.append("club_id = %s") vals.append(next_cid) if new_sk: sets.append("storage_key = %s") vals.append(new_sk) if sets: sets.append("updated_at = NOW()") vals.append(asset_id) cur.execute( f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s", tuple(vals), ) conn.commit() has_tags = _media_assets_tags_column_present(cur) if has_tags: cur.execute( """SELECT id, mime_type, byte_size, original_filename, visibility, club_id, uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice, tags FROM media_assets WHERE id = %s""", (asset_id,), ) else: cur.execute( """SELECT id, mime_type, byte_size, original_filename, visibility, club_id, uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice FROM media_assets WHERE id = %s""", (asset_id,), ) out = r2d(cur.fetchone()) if not has_tags: out["tags"] = [] return out