shinkan-jinkendo/backend/routers/media_assets.py
Lars f544975a6c
Some checks failed
Deploy Development / deploy (push) Successful in 40s
Test Suite / pytest-backend (push) Successful in 34s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 10s
Test Suite / playwright-tests (push) Failing after 1m5s
feat(media-journal): add Superadmin media journal endpoint and UI integration
2026-05-11 09:24:39 +02:00

1747 lines
70 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Medien-Archiv (Liste, Datei) und Lifecycle — MEDIA_ASSETS_AND_ARCHIVE_SPEC."""
from __future__ import annotations
from typing import Any, Literal, Optional, Union
import hashlib
from pathlib import Path
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile
from pydantic import BaseModel, Field, model_validator
from club_tenancy import (
assert_club_member,
assert_valid_governance_visibility,
club_ids_for_profile_with_roles,
is_platform_admin,
is_superadmin,
library_content_visible_to_profile,
)
from db import get_db, get_cursor, r2d
from media_lifecycle import (
LC_ACTIVE,
LC_TRASH_HIDDEN,
LC_TRASH_SOFT,
assert_can_edit_media_asset_metadata,
assert_can_manage_media_asset_lifecycle,
assert_can_trash_soft,
fetch_media_asset_row,
purge_media_asset,
reactivate_media_asset_from_trash,
superadmin_force_lifecycle_state,
superadmin_hard_delete_media_asset,
transition_recover_from_hidden,
transition_to_trash_hidden,
transition_to_trash_soft,
)
from media_storage import get_effective_media_root, library_storage_key, path_under_media_root, relocate_local_media_file
from tenant_context import TenantContext, get_tenant_context, get_tenant_context_flexible
from media_rights import (
assert_rights_for_promotion,
update_rights_quick_fields,
validate_rights_declaration,
write_rights_declaration,
check_rights_coverage,
VISIBILITY_LEVELS,
)
from routers.exercises import _upload_limit_bytes, resolve_upload_mime_type
# Router for the media archive; every route registered on it is served under
# the /api/media-assets prefix and tagged "media-assets".
router = APIRouter(prefix="/api/media-assets", tags=["media-assets"])
class MediaLifecycleBody(BaseModel):
    # Request body for a lifecycle action on a single media asset.
    # Documented with comments only: a class docstring would be published as
    # the model description in the generated JSON schema.

    # Requested transition; _apply_lifecycle_action restricts purge and the
    # superadmin_* actions to superadmins.
    action: Literal[
        "trash_soft",
        "trash_hidden",
        "recover",
        "purge",
        "reactivate",
        "superadmin_force_lifecycle",
        "superadmin_hard_delete",
    ]
    # Target state; only meaningful together with superadmin_force_lifecycle.
    target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None

    @model_validator(mode="after")
    def _target_lifecycle_rules(self):
        """Require target_lifecycle for the force action and reject it for all others."""
        forcing = self.action == "superadmin_force_lifecycle"
        if forcing and not self.target_lifecycle:
            raise ValueError("target_lifecycle ist für diese Aktion erforderlich")
        if not forcing and self.target_lifecycle is not None:
            raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle")
        return self
class MediaAssetPatch(BaseModel):
    # Partial update of a single media asset: every field is optional and
    # defaults to None.
    copyright_notice: Optional[str] = Field(None, max_length=8000)
    original_filename: Optional[str] = Field(None, max_length=300)
    # Restricted by the pattern to the three levels private | club | official.
    visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
    club_id: Optional[int] = None
    tags: Optional[list[str]] = None
    # P-06: optional rights declaration (becomes mandatory when the asset is
    # promoted to a higher visibility).
    rights_holder_confirmed: Optional[bool] = None
    contains_identifiable_persons: Optional[bool] = None
    person_consent_confirmed: Optional[bool] = None
    person_consent_context: Optional[str] = Field(None, max_length=2000)
    contains_minors: Optional[bool] = None
    parental_consent_confirmed: Optional[bool] = None
    parental_consent_context: Optional[str] = Field(None, max_length=2000)
    contains_music: Optional[bool] = None
    music_rights_confirmed: Optional[bool] = None
    music_rights_context: Optional[str] = Field(None, max_length=2000)
    contains_third_party_content: Optional[bool] = None
    third_party_rights_confirmed: Optional[bool] = None
    third_party_rights_context: Optional[str] = Field(None, max_length=2000)
# Request body for an explicit (re-)declaration of rights on an existing asset.
# The class docstring below is left verbatim because Pydantic publishes it as
# the model description in the generated JSON schema.
class RightsDeclarationBody(BaseModel):
    """P-06: Explizite Re-Deklaration / Nachdeklaration fuer ein bestehendes Medium."""

    # Visibility level the declaration is made for (private | club | official).
    target_visibility: str = Field(..., pattern="^(private|club|official)$")
    # The rights-holder flag and the four contains_* flags are required;
    # the matching *_confirmed / *_context details are optional.
    rights_holder_confirmed: bool
    contains_identifiable_persons: bool
    person_consent_confirmed: Optional[bool] = None
    person_consent_context: Optional[str] = Field(None, max_length=2000)
    contains_minors: bool
    parental_consent_confirmed: Optional[bool] = None
    parental_consent_context: Optional[str] = Field(None, max_length=2000)
    contains_music: bool
    music_rights_confirmed: Optional[bool] = None
    music_rights_context: Optional[str] = Field(None, max_length=2000)
    contains_third_party_content: bool
    third_party_rights_confirmed: Optional[bool] = None
    third_party_rights_context: Optional[str] = Field(None, max_length=2000)
class MediaBulkLifecycleBody(BaseModel):
    # Request body applying one lifecycle action to a batch of media assets.
    # Documented with comments only: a class docstring would be published as
    # the model description in the generated JSON schema.

    # Between 1 and 200 asset ids per request.
    media_asset_ids: list[int] = Field(..., min_length=1, max_length=200)
    action: Literal[
        "trash_soft",
        "trash_hidden",
        "recover",
        "purge",
        "reactivate",
        "superadmin_force_lifecycle",
        "superadmin_hard_delete",
    ]
    # Target state; only meaningful together with superadmin_force_lifecycle.
    target_lifecycle: Optional[Literal["active", "trash_soft", "trash_hidden"]] = None

    @model_validator(mode="after")
    def _bulk_target(self):
        """Require target_lifecycle for the force action and reject it for all others."""
        if self.action == "superadmin_force_lifecycle":
            if not self.target_lifecycle:
                raise ValueError("target_lifecycle ist für diese Aktion erforderlich")
        elif self.target_lifecycle is not None:
            raise ValueError("target_lifecycle nur bei superadmin_force_lifecycle")
        return self
class MediaBulkPatchBody(BaseModel):
    # Partial update applied to a batch of media assets; apart from the id
    # list every field is optional and defaults to None.
    # Between 1 and 200 asset ids per request.
    media_asset_ids: list[int] = Field(..., min_length=1, max_length=200)
    copyright_notice: Optional[str] = Field(None, max_length=8000)
    original_filename: Optional[str] = Field(None, max_length=300)
    # Restricted by the pattern to the three levels private | club | official.
    visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
    club_id: Optional[int] = None
    # P-06 context texts (used on promotion)
    person_consent_context: Optional[str] = Field(None, max_length=2000)
    parental_consent_context: Optional[str] = Field(None, max_length=2000)
    music_rights_context: Optional[str] = Field(None, max_length=2000)
    third_party_rights_context: Optional[str] = Field(None, max_length=2000)
    tags: Optional[list[str]] = None
    # P-06: rights declaration (applies to all assets of the batch on promotion)
    rights_holder_confirmed: Optional[bool] = None
    contains_identifiable_persons: Optional[bool] = None
    person_consent_confirmed: Optional[bool] = None
    contains_minors: Optional[bool] = None
    parental_consent_confirmed: Optional[bool] = None
    contains_music: Optional[bool] = None
    music_rights_confirmed: Optional[bool] = None
    contains_third_party_content: Optional[bool] = None
    third_party_rights_confirmed: Optional[bool] = None
# Accepted values of the `lifecycle` list filter (checked in
# _list_main_visibility_where); "all" combines active and both trash states.
_LIFECYCLE_LIST_FILTERS = frozenset({"active", "trash_soft", "trash_hidden", "all"})
# Accepted values of the `media_kind` list filter.
_MEDIA_KIND_FILTERS = frozenset({"all", "image", "video", "pdf", "other"})
# Limits applied by _normalize_media_tags: number of tags kept and characters
# kept per tag.
_MAX_TAGS = 40
_MAX_TAG_LEN = 48
# Process-wide caches for the schema probes below; None means "not probed yet".
_MEDIA_ASSETS_TAGS_COLUMN: Optional[bool] = None
_TRAINING_UNIT_EXERCISES_TABLE: Optional[bool] = None
def _media_assets_tags_column_present(cur: Any) -> bool:
    """Tell whether the column media_assets.tags exists (introduced by migration 046).

    Guards against 500 errors when the code is newer than the database schema.
    The information_schema probe runs once; its result is kept in the module
    global _MEDIA_ASSETS_TAGS_COLUMN and reused by every later call.
    """
    global _MEDIA_ASSETS_TAGS_COLUMN
    if _MEDIA_ASSETS_TAGS_COLUMN is None:
        cur.execute(
            """
            SELECT 1
            FROM information_schema.columns
            WHERE table_schema = 'public'
            AND table_name = 'media_assets'
            AND column_name = 'tags'
            LIMIT 1
            """
        )
        _MEDIA_ASSETS_TAGS_COLUMN = cur.fetchone() is not None
    return _MEDIA_ASSETS_TAGS_COLUMN
def _training_unit_exercises_table_present(cur: Any) -> bool:
    """Tell whether the planning table training_unit_exercises exists.

    Without that table no training-unit links can be resolved. The
    information_schema probe runs once; its result is kept in the module
    global _TRAINING_UNIT_EXERCISES_TABLE and reused by every later call.
    """
    global _TRAINING_UNIT_EXERCISES_TABLE
    if _TRAINING_UNIT_EXERCISES_TABLE is None:
        cur.execute(
            """
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_name = 'training_unit_exercises'
            LIMIT 1
            """
        )
        _TRAINING_UNIT_EXERCISES_TABLE = cur.fetchone() is not None
    return _TRAINING_UNIT_EXERCISES_TABLE
def _normalize_media_tags(raw: Union[list[str], None]) -> list[str]:
"""PostgreSQL text[]; Einträge gekürzt und ohne Dubletten (case-insensitive)."""
if raw is None:
return []
if not isinstance(raw, list):
raise HTTPException(status_code=400, detail="tags muss eine Liste von Strings sein")
out: list[str] = []
seen: set[str] = set()
for item in raw:
s = str(item).strip()
if not s:
continue
s = s[:_MAX_TAG_LEN]
key = s.lower()
if key in seen:
continue
seen.add(key)
out.append(s)
if len(out) >= _MAX_TAGS:
break
return out
def _usage_for_media_assets(cur: Any, asset_ids: list[int]) -> dict[int, dict[str, Any]]:
"""Übungen und Trainingseinheiten, die dieses Archiv-Medium nutzen."""
if not asset_ids:
return {}
cur.execute(
"""SELECT em.media_asset_id, e.id AS exercise_id, e.title AS exercise_title
FROM exercise_media em
JOIN exercises e ON e.id = em.exercise_id
WHERE em.media_asset_id = ANY(%s)
ORDER BY em.media_asset_id, e.title NULLS LAST, e.id""",
(asset_ids,),
)
ex_by_asset: dict[int, list[dict]] = {int(aid): [] for aid in asset_ids}
seen_ex: dict[int, set[int]] = {int(aid): set() for aid in asset_ids}
exercise_ids: set[int] = set()
for r in cur.fetchall():
d = r2d(r)
aid = int(d["media_asset_id"])
eid = int(d["exercise_id"])
if aid not in seen_ex or eid in seen_ex[aid]:
continue
seen_ex[aid].add(eid)
exercise_ids.add(eid)
title = (d.get("exercise_title") or "").strip() or f"Übung #{eid}"
ex_by_asset[aid].append({"id": eid, "title": title})
exercise_to_units: dict[int, list[dict]] = {}
if exercise_ids and _training_unit_exercises_table_present(cur):
cur.execute(
"""SELECT tue.exercise_id, tu.id AS unit_id, tu.planned_date,
COALESCE(tg.name, '') AS group_name
FROM training_unit_exercises tue
JOIN training_units tu ON tu.id = tue.training_unit_id
LEFT JOIN training_groups tg ON tg.id = tu.group_id
WHERE tue.exercise_id = ANY(%s)""",
(list(exercise_ids),),
)
for row in cur.fetchall():
d = r2d(row)
eid = int(d["exercise_id"])
uid = int(d["unit_id"])
pd = d.get("planned_date")
planned: Optional[str]
if pd is None:
planned = None
elif hasattr(pd, "isoformat"):
planned = str(pd.isoformat())
else:
planned = str(pd)
exercise_to_units.setdefault(eid, []).append(
{
"id": uid,
"planned_date": planned,
"group_name": d.get("group_name") or "",
}
)
for eid, units in exercise_to_units.items():
seen_u: set[int] = set()
uniq: list[dict] = []
for u in units:
if u["id"] not in seen_u:
seen_u.add(u["id"])
uniq.append(u)
exercise_to_units[eid] = uniq
out: dict[int, dict[str, Any]] = {}
for aid in asset_ids:
aid_i = int(aid)
exs = ex_by_asset.get(aid_i, [])
seen_u2: set[int] = set()
units_agg: list[dict] = []
for ex in exs:
for u in exercise_to_units.get(ex["id"], []):
if u["id"] not in seen_u2:
seen_u2.add(u["id"])
units_agg.append(u)
out[aid_i] = {"exercises": exs, "training_units": units_agg}
return out
def _fetch_filter_uploaders(cur: Any, is_adm: bool, profile_id: int) -> list[dict]:
cur.execute(
"""SELECT DISTINCT ma.uploaded_by_profile_id AS id, pr.name, pr.email
FROM media_assets ma
LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id
WHERE ma.uploaded_by_profile_id IS NOT NULL
AND ma.lifecycle_state IN ('active', 'trash_soft', 'trash_hidden')
AND (
%s
OR lower(trim(ma.visibility)) = 'official'
OR (
lower(trim(ma.visibility)) = 'private'
AND ma.uploaded_by_profile_id = %s
)
OR (
lower(trim(ma.visibility)) = 'club'
AND EXISTS (
SELECT 1 FROM club_members cm
WHERE cm.profile_id = %s
AND cm.club_id = ma.club_id
AND cm.status = 'active'
)
)
)
ORDER BY pr.name NULLS LAST, ma.uploaded_by_profile_id
LIMIT 400""",
(is_adm, profile_id, profile_id),
)
rows = [r2d(x) for x in cur.fetchall()]
out: list[dict] = []
for r in rows:
pid = r.get("id")
if pid is None:
continue
label = (r.get("name") or "").strip()
if not label:
label = (r.get("email") or "").strip()
if not label:
label = f"Profil #{pid}"
out.append({"id": int(pid), "label": label})
return out
def _effective_media_patch_fields(patch_fields: dict, asset: dict) -> dict:
"""
Nach visibility-Wechsel club_id konsistent setzen.
official → club_id NULL. private/club: club_id aus Patch oder bisheriger Zeile beibehalten
(private nutzt club_id für Vereinsordner auf der Platte).
"""
eff = dict(patch_fields)
if eff.get("visibility") is not None:
v = str(eff["visibility"]).strip().lower()
if v == "official":
eff["club_id"] = None
elif v == "club" and "club_id" not in eff:
eff["club_id"] = asset.get("club_id")
elif v == "private" and "club_id" not in eff:
eff["club_id"] = asset.get("club_id")
return eff
def _club_display_name_for_storage(cur: Any, club_id: int) -> str:
"""Anzeigename für library_club_path_segment; leer wenn Verein fehlt."""
cur.execute("SELECT name FROM clubs WHERE id = %s", (int(club_id),))
row = cur.fetchone()
if not row:
return ""
return str(r2d(row).get("name") or "")
def _relocate_asset_file_if_governance_changed(
cur: Any,
media_root: Path,
asset_id: int,
asset: dict,
next_vis: str,
next_club_id: Optional[int],
) -> Optional[str]:
"""
Passt bei local-Assets den Dateipfad an, wenn sich Sichtbarkeit/Verein ändert.
Aktualisiert exercise_media.file_path. Gibt neuen storage_key oder None zurück.
"""
if (asset.get("storage_backend") or "local") != "local":
return None
old_key = (asset.get("storage_key") or "").strip()
sha = (asset.get("sha256") or "").strip().lower()
if not old_key or len(sha) != 64:
return None
ext = Path(old_key.replace("\\", "/")).suffix or ".bin"
try:
up = asset.get("uploaded_by_profile_id")
up_i = int(up) if up is not None else None
club_nm: Optional[str] = None
if next_vis in ("club", "private") and next_club_id is not None:
club_nm = _club_display_name_for_storage(cur, next_club_id)
new_key = library_storage_key(
next_vis,
next_club_id if next_vis != "official" else None,
sha,
ext,
uploader_profile_id=up_i if next_vis == "private" else None,
mime_type=str(asset.get("mime_type") or "") or None,
club_name=club_nm,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
old_norm = old_key.replace("\\", "/").lstrip("/")
if new_key == old_norm:
return None
try:
relocate_local_media_file(media_root, old_key, new_key)
except FileNotFoundError as e:
raise HTTPException(status_code=500, detail=f"Mediendatei fehlt auf der Platte: {e}") from e
except FileExistsError as e:
raise HTTPException(status_code=409, detail=str(e)) from e
except (ValueError, OSError) as e:
raise HTTPException(status_code=500, detail=str(e)) from e
db_path = f"/media/{new_key}"
cur.execute(
"UPDATE exercise_media SET file_path = %s WHERE media_asset_id = %s",
(db_path, asset_id),
)
return new_key
def _list_active_visibility_clause(is_plat: bool, profile_id: int) -> tuple[str, list[Any]]:
"""Sichtbare aktive Einträge: official; private (eigen oder Plattform-Admin); Verein wie Bibliotheks-SQL."""
parts = ["lower(trim(ma.visibility)) = 'official'"]
vals: list[Any] = []
if is_plat:
parts.append("lower(trim(ma.visibility)) = 'private'")
else:
parts.append("(lower(trim(ma.visibility)) = 'private' AND ma.uploaded_by_profile_id = %s)")
vals.append(profile_id)
club_plat = (
"(lower(trim(ma.visibility)) = 'club' AND ma.club_id IS NOT NULL AND ("
"EXISTS (SELECT 1 FROM club_members cm WHERE cm.profile_id = %s AND cm.club_id = ma.club_id "
"AND cm.status = 'active') OR NOT EXISTS (SELECT 1 FROM club_members cm2 "
"WHERE cm2.profile_id = %s AND cm2.club_id = ma.club_id)))"
)
if is_plat:
parts.append(club_plat)
vals.extend([profile_id, profile_id])
else:
parts.append(
"""(
lower(trim(ma.visibility)) = 'club'
AND EXISTS (
SELECT 1 FROM club_members cm
WHERE cm.profile_id = %s
AND cm.club_id = ma.club_id
AND cm.status = 'active'
)
)"""
)
vals.append(profile_id)
sql = "(" + " OR ".join(parts) + ")"
return sql, vals
def _list_trash_visibility_clause(
is_plat: bool,
is_sup: bool,
profile_id: int,
admin_club_ids: set[int],
) -> tuple[str, list[Any]]:
"""
Papierkorb nur für eigene private Medien; Vereins-Admins zusätzlich Vereins-Papierkorb ihres Vereins.
Official/Plattform: Superadmin oder Plattform-Admin sieht alles im Papierkorb.
"""
if is_plat or is_sup:
return "(TRUE)", []
parts: list[str] = []
vals: list[Any] = []
parts.append(
"(lower(trim(ma.visibility)) = 'private' AND ma.uploaded_by_profile_id = %s)",
)
vals.append(profile_id)
if admin_club_ids:
parts.append(
"(lower(trim(ma.visibility)) = 'club' AND ma.club_id = ANY(%s))",
)
vals.append(list(admin_club_ids))
return "(" + " OR ".join(parts) + ")", vals
def _list_main_visibility_where(
    lifecycle: str,
    is_plat: bool,
    is_sup: bool,
    profile_id: int,
    admin_club_ids: set[int],
) -> tuple[str, list[Any]]:
    """Combine the lifecycle filter with the caller's read permissions.

    Returns the WHERE fragment and its parameters for the requested filter
    (active, trash_soft, trash_hidden or all). Trash states use the much
    narrower trash visibility rules. An empty filter is treated as "active".

    Raises:
        HTTPException: 400 for an unknown lifecycle filter.
    """
    state = (lifecycle or "active").strip().lower()
    if state not in _LIFECYCLE_LIST_FILTERS:
        raise HTTPException(status_code=400, detail="Ungültiger lifecycle-Filter")

    active_sql, active_params = _list_active_visibility_clause(is_plat, profile_id)
    trash_sql, trash_params = _list_trash_visibility_clause(
        is_plat, is_sup, profile_id, admin_club_ids
    )
    active_block = f"(ma.lifecycle_state = 'active' AND {active_sql})"

    if state == "active":
        return active_block, active_params
    if state in ("trash_soft", "trash_hidden"):
        # A single trash level: the state name doubles as the SQL literal.
        return f"(ma.lifecycle_state = '{state}' AND {trash_sql})", trash_params

    # "all": active rows and both trash levels, each under its own rules.
    trash_block = f"(ma.lifecycle_state IN ('trash_soft', 'trash_hidden') AND {trash_sql})"
    return f"(({active_block}) OR ({trash_block}))", active_params + trash_params
def _fetch_asset_file_row(cur: Any, asset_id: int) -> Optional[dict]:
cur.execute(
"""SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
storage_key, mime_type, original_filename
FROM media_assets WHERE id = %s""",
(asset_id,),
)
row = cur.fetchone()
return r2d(row) if row else None
def _assert_can_view_archive_asset(cur: Any, tenant: TenantContext, asset: dict) -> None:
    """Raise HTTP 403 unless the library visibility rules let the caller see the asset."""
    level = (asset.get("visibility") or "").strip().lower()
    allowed = library_content_visible_to_profile(
        cur,
        tenant.profile_id,
        level,
        asset.get("club_id"),
        asset.get("uploaded_by_profile_id"),
        tenant.global_role,
    )
    if allowed:
        return
    raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Medium")
def _item_permissions(row: dict, tenant: TenantContext, admin_club_ids: set[int]) -> dict:
    """Compute the per-row UI/policy flags without any additional database access.

    Args:
        row: Asset row; reads visibility, uploaded_by_profile_id, club_id and
            lifecycle_state (missing values default to "private" / "active").
        tenant: Caller context; reads global_role and profile_id.
        admin_club_ids: Clubs in which the caller may manage club assets.

    Returns:
        A dict with the flags edit_metadata, change_visibility, trash_soft,
        trash_hidden, recover, reactivate, purge, superadmin_lifecycle and
        superadmin_hard_delete.
    """
    role_raw = tenant.global_role
    pid = int(tenant.profile_id)
    sup = is_superadmin(role_raw)
    plat = is_platform_admin(role_raw)
    vis = (row.get("visibility") or "private").strip().lower()
    uid = row.get("uploaded_by_profile_id")
    cid = row.get("club_id")
    lc = (row.get("lifecycle_state") or "active").strip().lower()
    is_owner = uid is not None and int(uid) == pid
    club_mgr = plat or (cid is not None and int(cid) in admin_club_ids)

    if vis == "official":
        # Official assets are reserved for superadmins in every respect.
        edit_metadata = sup
        can_manage_adv = sup
    else:
        # Platform admins may manage the lifecycle of any non-official asset,
        # but edit metadata only where they count as club manager.
        edit_metadata = (
            sup
            or (vis == "club" and club_mgr)
            or (vis == "private" and is_owner)
        )
        can_manage_adv = (
            sup
            or plat
            or (vis == "private" and is_owner)
            or (vis == "club" and club_mgr)
        )

    # Every lifecycle flag is "asset is in a matching state" AND "caller may
    # manage it"; only purging is restricted to superadmins.
    return {
        "edit_metadata": edit_metadata,
        "change_visibility": edit_metadata,
        "trash_soft": lc == "active" and can_manage_adv,
        "trash_hidden": lc in ("active", "trash_soft") and can_manage_adv,
        "recover": lc == "trash_hidden" and can_manage_adv,
        "reactivate": lc in ("trash_soft", "trash_hidden") and can_manage_adv,
        "purge": lc == "trash_hidden" and sup,
        "superadmin_lifecycle": sup,
        "superadmin_hard_delete": sup,
    }
def _apply_lifecycle_action(
    cur: Any,
    conn: Any,
    asset_id: int,
    body: MediaLifecycleBody,
    tenant: TenantContext,
) -> dict:
    """Authorise and execute one lifecycle action on a single media asset.

    Hard delete, forced lifecycle and purge are superadmin-only; purge
    additionally requires the asset to be in the trash_hidden state. The
    remaining actions run their permission assertion first and then the
    matching transition.

    Raises:
        HTTPException: 404 for an unknown asset, 403 for a missing superadmin
            role, 400 when purging is not possible, 500 for an unhandled
            action name.
    """
    asset = fetch_media_asset_row(cur, asset_id)
    if not asset:
        raise HTTPException(status_code=404, detail="Medium nicht gefunden")

    requested = body.action
    caller_role = tenant.global_role

    def _require_superadmin(message: str) -> None:
        # Evaluated lazily so the role is only checked for superadmin actions.
        if not is_superadmin(caller_role):
            raise HTTPException(status_code=403, detail=message)

    if requested == "superadmin_hard_delete":
        _require_superadmin("Nur Superadmin")
        if not superadmin_hard_delete_media_asset(cur, conn, asset_id):
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")
        return {"ok": True, "hard_deleted": asset_id}

    if requested == "superadmin_force_lifecycle":
        _require_superadmin("Nur Superadmin")
        state_by_name = {
            "active": LC_ACTIVE,
            "trash_soft": LC_TRASH_SOFT,
            "trash_hidden": LC_TRASH_HIDDEN,
        }
        wanted = body.target_lifecycle or "active"
        return superadmin_force_lifecycle_state(cur, conn, asset_id, state_by_name[wanted])

    if requested == "purge":
        _require_superadmin("Endgültiges Löschen nur als Superadmin")
        current_state = (asset.get("lifecycle_state") or "").strip().lower()
        if current_state != LC_TRASH_HIDDEN:
            raise HTTPException(
                status_code=400,
                detail="Nur ausgeblendete Medien (Stufe 2) dürfen mit dieser Aktion entfernt werden",
            )
        if not purge_media_asset(cur, conn, asset_id):
            raise HTTPException(status_code=400, detail="Löschen nicht möglich")
        return {"ok": True, "purged": asset_id}

    # Remaining actions: (permission assertion, transition) per action name.
    guarded_transitions = {
        "trash_soft": (assert_can_trash_soft, transition_to_trash_soft),
        "trash_hidden": (assert_can_manage_media_asset_lifecycle, transition_to_trash_hidden),
        "recover": (assert_can_manage_media_asset_lifecycle, transition_recover_from_hidden),
        "reactivate": (assert_can_manage_media_asset_lifecycle, reactivate_media_asset_from_trash),
    }
    steps = guarded_transitions.get(requested)
    if steps is None:
        raise HTTPException(status_code=500, detail="Interner Fehler: lifecycle action")
    check_permission, run_transition = steps
    check_permission(cur, tenant, asset)
    return run_transition(cur, conn, asset_id)
# Upper bound on the number of files accepted by one /bulk-upload request.
_MAX_BULK_LIBRARY_FILES = 25
def _ingest_library_media_file(
    cur: Any,
    tenant: TenantContext,
    raw: bytes,
    filename: Optional[str],
    content_type: Optional[str],
    visibility: str,
    club_id_form: Optional[int],
    decl: Optional[dict] = None,
    copyright_notice: Optional[str] = None,
) -> dict:
    """Store one uploaded file as a new archive asset, or report an active duplicate.

    Duplicates are matched on sha256 + visibility + club (for private assets
    additionally on the uploader). No exercise_media row is created here.

    Args:
        cur: Database cursor of the caller's transaction.
        tenant: Caller context (profile id, global role, effective club).
        raw: Complete file content.
        filename: Client file name; used for the extension and as the stored
            original_filename.
        content_type: Client-declared content type, passed to MIME resolution.
        visibility: private | club | official (empty falls back to private).
        club_id_form: Club id from the form, if any.
        decl: P-06 rights declaration (already validated); written for newly
            created assets.
        copyright_notice: Optional notice; blank values are stored as NULL.

    Returns:
        {"status": "created" | "duplicate", "media_asset_id": ..., "original_filename": ...}

    Raises:
        HTTPException: 400 (invalid visibility, missing club, rejected MIME
            type or storage key), 409 (identical file exists in the trash or
            in another unusable state), 413 (file too large), 500 (invalid
            storage path).

    NOTE(review): the previous docstring stated that a new declaration is
    written on a dedupe hit when decl is passed; the code returns on an active
    duplicate before any declaration is written — confirm which is intended.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role or ""
    vis = (visibility or "private").strip().lower()
    if vis not in ("private", "club", "official"):
        raise HTTPException(status_code=400, detail="Ungültige Sichtbarkeit")
    # Resolve the club the asset is filed under (stays None for official).
    next_cid: Optional[int] = None
    if vis == "club":
        if club_id_form is None:
            raise HTTPException(status_code=400, detail="Verein erforderlich für Sichtbarkeit „Verein“")
        next_cid = int(club_id_form)
    elif vis == "private":
        if club_id_form is not None:
            next_cid = int(club_id_form)
        elif is_platform_admin(role):
            # Platform admins must name the target club explicitly.
            raise HTTPException(
                status_code=400,
                detail=(
                    "Private Archiv-Uploads als Plattform-Admin: bitte den Zielverein wählen und "
                    "club_id im Formular setzen (nicht vom allgemeinen Kontext ableiten)."
                ),
            )
        else:
            cid = tenant.effective_club_id
            next_cid = int(cid) if cid is not None else None
        if next_cid is None:
            raise HTTPException(
                status_code=400,
                detail=(
                    "Private Medien werden pro Verein abgelegt. Bitte aktiven Verein setzen "
                    "(Header X-Active-Club-Id) oder club_id im Formular übergeben — auch für Plattform-Admins."
                ),
            )
        if not is_platform_admin(role):
            assert_club_member(cur, profile_id, next_cid)
    # The governance check receives the club only for club visibility.
    assert_valid_governance_visibility(cur, profile_id, role, vis, next_cid if vis == "club" else None)
    max_b = _upload_limit_bytes(tenant)
    if len(raw) > max_b:
        raise HTTPException(
            status_code=413,
            detail=f"Datei zu groß (max. {max_b // (1024 * 1024)} MB)",
        )
    try:
        mime = resolve_upload_mime_type(raw, content_type, filename)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    full_sha = hashlib.sha256(raw).hexdigest()
    # Dedupe lookup: private assets are additionally scoped to the uploader.
    if vis == "private":
        cur.execute(
            """SELECT id, storage_key, byte_size, lifecycle_state, original_filename FROM media_assets
            WHERE sha256 = %s AND lower(trim(visibility)) = %s
            AND (club_id IS NOT DISTINCT FROM %s)
            AND (uploaded_by_profile_id IS NOT DISTINCT FROM %s)
            LIMIT 1""",
            (full_sha, vis, next_cid, profile_id),
        )
    else:
        cur.execute(
            """SELECT id, storage_key, byte_size, lifecycle_state, original_filename FROM media_assets
            WHERE sha256 = %s AND lower(trim(visibility)) = %s
            AND (club_id IS NOT DISTINCT FROM %s)
            LIMIT 1""",
            (full_sha, vis, next_cid),
        )
    existing_asset = cur.fetchone()
    if existing_asset:
        ea = r2d(existing_asset)
        lc = (ea.get("lifecycle_state") or "").strip().lower()
        if lc == "active":
            # Active duplicate: reuse the existing asset, nothing is written.
            return {
                "status": "duplicate",
                "media_asset_id": int(ea["id"]),
                "original_filename": ea.get("original_filename"),
            }
        if lc in ("trash_soft", "trash_hidden"):
            # Structured detail carrying the id and state of the trashed asset.
            raise HTTPException(
                status_code=409,
                detail={
                    "code": "MEDIA_ASSET_IN_TRASH",
                    "message": (
                        "Diese Datei ist inhaltsgleich (SHA-256) mit einem Archiv-Medium im Papierkorb."
                    ),
                    "media_asset_id": ea["id"],
                    "lifecycle_state": lc,
                },
            )
        raise HTTPException(
            status_code=409,
            detail="Es existiert bereits ein Archiv-Eintrag zu dieser Datei in einem nicht nutzbaren Zustand.",
        )
    # Extension from the file name (at most 12 characters); otherwise derived
    # from a few known MIME types.
    ext = Path(filename or "").suffix[:12] if filename else ""
    if not ext and mime == "image/jpeg":
        ext = ".jpg"
    elif not ext and mime == "image/png":
        ext = ".png"
    elif not ext and mime in ("image/heic", "image/heif"):
        ext = ".heic"
    elif not ext and mime == "video/mp4":
        ext = ".mp4"
    elif not ext and mime == "video/quicktime":
        ext = ".mov"
    club_nm = ""
    if next_cid is not None:
        club_nm = _club_display_name_for_storage(cur, next_cid)
    media_root = get_effective_media_root(cur)
    try:
        storage_key = library_storage_key(
            vis,
            next_cid,
            full_sha,
            ext,
            uploader_profile_id=profile_id if vis == "private" else None,
            mime_type=mime,
            club_name=club_nm,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    dest_path = path_under_media_root(media_root, storage_key)
    if dest_path is None:
        raise HTTPException(status_code=500, detail="Ungültiger Speicherpfad")
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    # An already existing file at the target path is left untouched.
    if not dest_path.is_file():
        dest_path.write_bytes(raw)
    clean_cr = (copyright_notice or "").strip() or None
    cur.execute(
        """INSERT INTO media_assets (
        mime_type, byte_size, sha256, original_filename, visibility, club_id,
        uploaded_by_profile_id, copyright_notice, storage_backend, storage_key, lifecycle_state
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, 'local', %s, 'active')
        RETURNING id""",
        (
            mime,
            len(raw),
            full_sha,
            filename or storage_key,
            vis,
            next_cid,
            profile_id,
            clean_cr,
            storage_key,
        ),
    )
    ar = cur.fetchone()
    aid = int(r2d(ar)["id"])
    # P-06: write the declaration and set the quick fields
    # NOTE(review): the source indentation was lost; both calls are assumed to
    # belong under the decl guard — confirm.
    if decl is not None:
        write_rights_declaration(cur, aid, tenant.profile_id, "upload", vis, decl)
        update_rights_quick_fields(cur, aid, vis)
    return {"status": "created", "media_asset_id": aid, "original_filename": filename or storage_key}
# Bulk upload endpoint. The docstring below is left verbatim because FastAPI
# publishes it as the operation description in the OpenAPI schema.
@router.post("/bulk-upload")
async def bulk_upload_media_assets(
    tenant: TenantContext = Depends(get_tenant_context),
    files: list[UploadFile] = File(..., description="Mehrere Dateien (jpeg, png, gif, mp4, pdf)"),
    visibility: str = Form("private"),
    club_id: Optional[int] = Form(None),
    copyright_notice: Optional[str] = Form(None),
    # P-06 rights declaration (applies to all files of the batch)
    rights_holder_confirmed: bool = Form(...),
    contains_identifiable_persons: bool = Form(...),
    person_consent_confirmed: Optional[bool] = Form(None),
    person_consent_context: Optional[str] = Form(None),
    contains_minors: bool = Form(...),
    parental_consent_confirmed: Optional[bool] = Form(None),
    parental_consent_context: Optional[str] = Form(None),
    contains_music: bool = Form(...),
    music_rights_confirmed: Optional[bool] = Form(None),
    music_rights_context: Optional[str] = Form(None),
    contains_third_party_content: bool = Form(...),
    third_party_rights_confirmed: Optional[bool] = Form(None),
    third_party_rights_context: Optional[str] = Form(None),
):
    """Mehrere Dateien ins Archiv; Dedupe wie Übungs-Upload. Pro Datei eigene DB-Transaktion.
    P-06: Eine Rechterklaerung gilt fuer alle Dateien des Batches.
    VORLAEUTIG: Texte und Pflichtfelder noch nicht juristisch geprueft (p06-v1-conservative).
    """
    # Request-level checks fail the whole request; per-file problems are only
    # reported in the result list.
    if not files:
        raise HTTPException(status_code=400, detail="Keine Dateien übermittelt")
    if len(files) > _MAX_BULK_LIBRARY_FILES:
        raise HTTPException(
            status_code=400,
            detail=f"Maximal {_MAX_BULK_LIBRARY_FILES} Dateien pro Anfrage",
        )
    # One declaration dict, shared by every file of the batch.
    decl = {
        "rights_holder_confirmed": rights_holder_confirmed,
        "contains_identifiable_persons": contains_identifiable_persons,
        "person_consent_confirmed": person_consent_confirmed,
        "person_consent_context": person_consent_context,
        "contains_minors": contains_minors,
        "parental_consent_confirmed": parental_consent_confirmed,
        "parental_consent_context": parental_consent_context,
        "contains_music": contains_music,
        "music_rights_confirmed": music_rights_confirmed,
        "music_rights_context": music_rights_context,
        "contains_third_party_content": contains_third_party_content,
        "third_party_rights_confirmed": third_party_rights_confirmed,
        "third_party_rights_context": third_party_rights_context,
    }
    # Validated once, before any file is read.
    target_vis = (visibility or "private").strip().lower()
    validate_rights_declaration(decl, target_vis)
    results: list[dict[str, Any]] = []
    created = duplicate = failed = 0
    for uf in files:
        fn = uf.filename or "ohne_name"
        try:
            raw = await uf.read()
            if not raw:
                results.append({"filename": fn, "ok": False, "detail": "Leere Datei"})
                failed += 1
                continue
            # A separate connection and commit per file, so one failing file
            # does not undo the files already stored.
            with get_db() as conn:
                cur = get_cursor(conn)
                r = _ingest_library_media_file(
                    cur,
                    tenant,
                    raw,
                    uf.filename,
                    uf.content_type,
                    visibility,
                    club_id,
                    decl=decl,
                    copyright_notice=copyright_notice,
                )
                conn.commit()
            results.append({"filename": fn, "ok": True, **r})
            if r["status"] == "created":
                created += 1
            else:
                duplicate += 1
        except HTTPException as e:
            # Flatten structured details (dicts) to a single message string.
            detail = e.detail
            if isinstance(detail, dict):
                detail_s = detail.get("message") or detail.get("code") or str(detail)
            else:
                detail_s = str(detail)
            results.append(
                {
                    "filename": fn,
                    "ok": False,
                    "status_code": e.status_code,
                    "detail": detail_s,
                },
            )
            failed += 1
        except Exception as e:
            # Best effort: any other error fails only this file; its text is
            # returned to the client as-is.
            results.append({"filename": fn, "ok": False, "detail": str(e)})
            failed += 1
    return {
        "results": results,
        "created_count": created,
        "duplicate_count": duplicate,
        "failed_count": failed,
    }
@router.get("")
def list_media_assets(
    tenant: TenantContext = Depends(get_tenant_context),
    q: Optional[str] = Query(None, max_length=120),
    lifecycle: str = Query(
        "active",
        description="active | trash_soft | trash_hidden | all (nicht purgierte Zustände)",
    ),
    media_kind: str = Query(
        "all",
        description="all | image | video | pdf | other",
    ),
    club_id: Optional[int] = Query(
        None,
        ge=1,
        description="Nur Superadmin: nach Verein filtern",
    ),
    uploaded_by: Optional[int] = Query(
        None,
        ge=1,
        description="Nach Uploader (Profil-ID), wenn erlaubt",
    ),
    include_filter_meta: bool = Query(
        False,
        description="Uploader-Liste für Filter (nur wenn Metadaten sichtbar)",
    ),
    limit: int = Query(30, ge=1, le=100),
    offset: int = Query(0, ge=0),
):
    """Return one page of media assets together with per-item permissions and usage.

    The base WHERE clause comes from ``_list_main_visibility_where``; the
    optional club, uploader, media-kind and free-text filters are appended to it.
    The free-text filter matches filename, storage key, copyright notice and
    (when the column exists) tags as a literal, case-insensitive substring.

    Raises:
        HTTPException 400: ``media_kind`` is not one of ``_MEDIA_KIND_FILTERS``.
        HTTPException 403: ``club_id`` is used by a non-superadmin, or
            ``uploaded_by`` is used by a caller who may not see uploader metadata.
    """
    mk = (media_kind or "all").strip().lower()
    if mk not in _MEDIA_KIND_FILTERS:
        raise HTTPException(status_code=400, detail="Ungültiger media_kind")
    role = tenant.global_role or ""
    is_adm = is_platform_admin(role)
    sup = is_superadmin(role)
    profile_id = tenant.profile_id
    needle = (q or "").strip()
    if club_id is not None and not sup:
        raise HTTPException(status_code=403, detail="Vereinsfilter nur für Superadmin")

    media_kind_sql = ""
    if mk == "image":
        # %% for psycopg2: a single % would be parsed as placeholder syntax.
        media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'image/%%'"
    elif mk == "video":
        media_kind_sql = " AND lower(COALESCE(ma.mime_type, '')) LIKE 'video/%%'"
    elif mk == "pdf":
        media_kind_sql = (
            " AND (lower(COALESCE(ma.mime_type, '')) = 'application/pdf'"
            " OR lower(COALESCE(ma.mime_type, '')) LIKE '%%pdf%%')"
        )
    elif mk == "other":
        media_kind_sql = (
            " AND COALESCE(ma.mime_type, '') <> ''"
            " AND lower(ma.mime_type) NOT LIKE 'image/%%'"
            " AND lower(ma.mime_type) NOT LIKE 'video/%%'"
            " AND lower(ma.mime_type) <> 'application/pdf'"
            " AND lower(ma.mime_type) NOT LIKE '%%pdf%%'"
        )

    club_sql = ""
    club_sql_params: list[Any] = []
    if club_id is not None:
        club_sql = " AND ma.club_id = %s"
        club_sql_params.append(club_id)

    with get_db() as conn:
        cur = get_cursor(conn)
        admin_club_ids = club_ids_for_profile_with_roles(cur, profile_id, "club_admin")
        vis_main_sql, vis_params = _list_main_visibility_where(
            lifecycle, is_adm, sup, profile_id, admin_club_ids
        )
        show_uploader = sup or is_adm or bool(admin_club_ids)
        if uploaded_by is not None and not show_uploader:
            raise HTTPException(status_code=403, detail="Uploader-Filter nicht erlaubt")

        has_tags_col = _media_assets_tags_column_present(cur)
        tags_select = "ma.tags," if has_tags_col else "ARRAY[]::text[] AS tags,"

        uploaded_sql = ""
        uploaded_params: list[Any] = []
        if uploaded_by is not None:
            uploaded_sql = " AND ma.uploaded_by_profile_id = %s"
            uploaded_params.append(uploaded_by)

        search_sql = ""
        search_params: list[Any] = []
        if needle:
            # Escape LIKE metacharacters so that "_" and "%" typed by the user
            # (very common in filenames such as "IMG_0001") match literally.
            # Backslash is PostgreSQL's default LIKE escape character.
            escaped = (
                needle.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
            )
            like = f"%{escaped}%"
            if has_tags_col:
                search_params = [like, like, like, like]
                search_sql = (
                    " AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s"
                    " OR COALESCE(ma.copyright_notice, '') ILIKE %s"
                    " OR EXISTS (SELECT 1 FROM unnest(ma.tags) AS t WHERE t::text ILIKE %s))"
                )
            else:
                search_params = [like, like, like]
                search_sql = (
                    " AND (ma.original_filename ILIKE %s OR ma.storage_key ILIKE %s"
                    " OR COALESCE(ma.copyright_notice, '') ILIKE %s)"
                )

        # Parameter order must follow the placeholder order in the statement below.
        params: list[Any] = (
            vis_params
            + club_sql_params
            + uploaded_params
            + search_params
            + [limit, offset]
        )
        cur.execute(
            f"""SELECT ma.id, ma.mime_type, ma.byte_size, ma.original_filename, ma.visibility, ma.club_id,
                       ma.uploaded_by_profile_id, ma.lifecycle_state, ma.created_at, ma.sha256,
                       ma.copyright_notice, ma.storage_key, {tags_select}
                       pr.name AS uploader_name,
                       pr.email AS uploader_email,
                       cl.name AS club_name
                FROM media_assets ma
                LEFT JOIN profiles pr ON pr.id = ma.uploaded_by_profile_id
                LEFT JOIN clubs cl ON cl.id = ma.club_id
                WHERE {vis_main_sql}
                {club_sql}
                {uploaded_sql}
                {media_kind_sql}
                {search_sql}
                ORDER BY ma.updated_at DESC NULLS LAST, ma.created_at DESC
                LIMIT %s OFFSET %s""",
            params,
        )
        rows = [r2d(r) for r in cur.fetchall()]
        show_club = sup or is_adm or bool(admin_club_ids)
        asset_ids = [int(r["id"]) for r in rows]
        usage_map = _usage_for_media_assets(cur, asset_ids)
        for r in rows:
            r["permissions"] = _item_permissions(r, tenant, admin_club_ids)
            tid = int(r["id"])
            r["usage"] = usage_map.get(tid, {"exercises": [], "training_units": []})
            # Always hand out tags as a plain list.
            tags_val = r.get("tags")
            if tags_val is None:
                r["tags"] = []
            elif not isinstance(tags_val, list):
                r["tags"] = list(tags_val) if tags_val else []
            # Blank out metadata the caller is not allowed to see.
            if not show_uploader:
                r["uploader_name"] = None
                r["uploader_email"] = None
            if not show_club:
                r["club_name"] = None

        viewer = {
            "show_uploader_meta": show_uploader,
            "show_club_meta": show_club,
            "is_superadmin": sup,
            "is_platform_admin": is_adm,
        }
        filter_meta = None
        if include_filter_meta and show_uploader:
            filter_meta = {"uploaders": _fetch_filter_uploaders(cur, is_adm, profile_id)}
        return {
            "items": rows,
            "limit": limit,
            "offset": offset,
            "lifecycle": lifecycle.strip().lower(),
            "media_kind": mk,
            "viewer": viewer,
            "filter_meta": filter_meta,
        }
@router.api_route("/{asset_id}/file", methods=["GET", "HEAD"])
def download_media_asset_file(
    request: Request,
    asset_id: int,
    tenant: TenantContext = Depends(get_tenant_context_flexible),
):
    """Direct access to an archive file (thumbnail/preview); auth as for exercise media (?ssetoken)."""
    from routers.exercises import _binary_media_response

    with get_db() as conn:
        cur = get_cursor(conn)
        asset = _fetch_asset_file_row(cur, asset_id)
        if not asset:
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")

        # Only the three non-purged lifecycle states can be served at all.
        state = (asset.get("lifecycle_state") or "").strip().lower()
        if state not in ("active", "trash_soft", "trash_hidden"):
            raise HTTPException(status_code=404, detail="Medium nicht verfügbar")
        if state == "active":
            _assert_can_view_archive_asset(cur, tenant, asset)
        else:
            # Trashed assets go through the lifecycle-management permission check.
            assert_can_manage_media_asset_lifecycle(cur, tenant, asset)

        storage_key = asset.get("storage_key")
        if not storage_key:
            raise HTTPException(status_code=404, detail="Keine Datei hinterlegt")

        file_path = path_under_media_root(get_effective_media_root(cur), str(storage_key))
        if not file_path or not file_path.is_file():
            raise HTTPException(status_code=404, detail="Datei nicht gefunden")

        content_type = asset.get("mime_type") or "application/octet-stream"
        download_name = asset.get("original_filename") or file_path.name
        if download_name:
            download_name = str(download_name)
        else:
            download_name = None
        return _binary_media_response(file_path, content_type, download_name, request)
@router.post("/{asset_id}/lifecycle")
def post_media_asset_lifecycle(
    asset_id: int,
    body: MediaLifecycleBody,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Run one lifecycle action on a single asset via ``_apply_lifecycle_action``."""
    with get_db() as conn:
        cursor = get_cursor(conn)
        outcome = _apply_lifecycle_action(cursor, conn, asset_id, body, tenant)
        return outcome
@router.post("/bulk-lifecycle")
def bulk_media_lifecycle(
    body: MediaBulkLifecycleBody,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Apply the same lifecycle action to several assets.

    IDs are de-duplicated and processed in ascending order. An ``HTTPException``
    for one asset is recorded in ``failed`` and does not stop the others.
    """
    action_body = MediaLifecycleBody(action=body.action, target_lifecycle=body.target_lifecycle)
    succeeded: list[int] = []
    errors: list[dict] = []
    with get_db() as conn:
        cur = get_cursor(conn)
        # Drop falsy / non-positive IDs and duplicates before processing.
        unique_ids = {int(raw_id) for raw_id in body.media_asset_ids if raw_id and int(raw_id) > 0}
        for asset_id in sorted(unique_ids):
            try:
                _apply_lifecycle_action(cur, conn, asset_id, action_body, tenant)
            except HTTPException as exc:
                detail = exc.detail
                if not isinstance(detail, str):
                    detail = str(detail)
                errors.append({"id": asset_id, "detail": detail})
            else:
                succeeded.append(asset_id)
    return {
        "updated": succeeded,
        "failed": errors,
        "updated_count": len(succeeded),
        "failed_count": len(errors),
    }
def _bulk_patch_detail_text(detail: Any) -> str:
    """Flatten an ``HTTPException.detail`` (string or dict) into one display string."""
    if isinstance(detail, dict):
        return str(detail.get("message") or detail.get("code") or detail)
    return str(detail)


@router.post("/bulk-patch")
def bulk_media_patch(
    body: MediaBulkPatchBody,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Apply one metadata patch to several media assets, committing per asset.

    Fields that were not sent, or were sent as ``None`` (except ``tags``), are
    ignored. IDs are de-duplicated and processed in ascending order. Validation
    problems and ``HTTPException``s for one asset are collected in ``failed``;
    the remaining assets are still processed.

    Returns:
        ``{"updated": [...], "failed": [{"id", "detail"}, ...],
        "updated_count": int, "failed_count": int}``

    Raises:
        HTTPException 400: the body contains no field to update.
    """
    raw = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
    patch_fields = {
        k: v
        for k, v in raw.items()
        if k != "media_asset_ids" and (k == "tags" or v is not None)
    }
    if not patch_fields:
        raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")

    updated: list[int] = []
    failed: list[dict] = []
    profile_id = tenant.profile_id
    role = tenant.global_role
    governance_changed = "visibility" in patch_fields or "club_id" in patch_fields
    p06_keys = (
        "rights_holder_confirmed",
        "contains_identifiable_persons",
        "person_consent_confirmed",
        "contains_minors",
        "parental_consent_confirmed",
        "contains_music",
        "music_rights_confirmed",
        "contains_third_party_content",
        "third_party_rights_confirmed",
    )
    asset_ids = sorted({int(x) for x in body.media_asset_ids if x and int(x) > 0})

    with get_db() as conn:
        cur = get_cursor(conn)
        # The schema does not change during the request, so probe the tags
        # column once instead of once per asset.
        tags_unsupported = "tags" in patch_fields and not _media_assets_tags_column_present(cur)
        for asset_id in asset_ids:
            try:
                cur.execute(
                    """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
                              copyright_notice, original_filename, sha256, storage_key, storage_backend, mime_type
                       FROM media_assets WHERE id = %s""",
                    (asset_id,),
                )
                row = cur.fetchone()
                if not row:
                    failed.append({"id": asset_id, "detail": "Medium nicht gefunden"})
                    continue
                asset = r2d(row)
                assert_can_edit_media_asset_metadata(cur, tenant, asset)
                if tags_unsupported:
                    failed.append(
                        {
                            "id": asset_id,
                            "detail": "Schlagwörter (tags) erfordern DB-Migration 046.",
                        }
                    )
                    continue

                eff = _effective_media_patch_fields(patch_fields, asset)
                next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower()
                next_cid = eff.get("club_id", asset.get("club_id"))

                if governance_changed:
                    assert_valid_governance_visibility(
                        cur,
                        profile_id,
                        role,
                        next_vis,
                        int(next_cid) if next_cid is not None else None,
                    )
                    if next_vis in ("private", "club") and next_cid is None:
                        failed.append(
                            {
                                "id": asset_id,
                                "detail": (
                                    "Für private oder Vereins-Medien wird club_id benötigt (Vereinsordner)."
                                ),
                            }
                        )
                        continue
                    if next_vis in ("club", "official"):
                        # Judge the notice as it will be AFTER this patch: a
                        # value sent in the patch (even an empty one) replaces
                        # the stored notice, so it must not fall back to it.
                        if "copyright_notice" in patch_fields:
                            effective_copyright = patch_fields["copyright_notice"] or ""
                        else:
                            effective_copyright = asset.get("copyright_notice") or ""
                        if not str(effective_copyright).strip():
                            failed.append(
                                {
                                    "id": asset_id,
                                    "detail": (
                                        "Für Vereins- oder offizielle Medien ist eine "
                                        "Urheberrechtsangabe (copyright_notice) Pflicht."
                                    ),
                                }
                            )
                            continue

                # P-06: rights check when the visibility level is raised.
                cur_vis = (asset.get("visibility") or "private").strip().lower()
                bulk_p06_decl: Optional[dict] = None
                bulk_p06_action: Optional[str] = None
                if VISIBILITY_LEVELS.get(next_vis, 0) > VISIBILITY_LEVELS.get(cur_vis, 0):
                    coverage = check_rights_coverage(cur, asset_id, next_vis)
                    if coverage != "ok":
                        # A fresh declaration has to be supplied with the patch.
                        p06_fields = {key: patch_fields.get(key) for key in p06_keys}
                        if p06_fields.get("rights_holder_confirmed") is None:
                            code = (
                                "LEGACY_REDECLARATION_REQUIRED"
                                if coverage == "legacy"
                                else "RIGHTS_SCOPE_INSUFFICIENT"
                            )
                            failed.append(
                                {
                                    "id": asset_id,
                                    "detail": f"{code}: Rechterklaerung fuer '{next_vis}' erforderlich.",
                                }
                            )
                            continue
                        try:
                            validate_rights_declaration(p06_fields, next_vis)
                        except HTTPException as p06e:
                            failed.append(
                                {"id": asset_id, "detail": _bulk_patch_detail_text(p06e.detail)}
                            )
                            continue
                        bulk_p06_decl = p06_fields
                        if coverage == "legacy":
                            bulk_p06_action = "legacy_re_declaration"
                        elif next_vis == "club":
                            bulk_p06_action = "promote_club"
                        else:
                            bulk_p06_action = "promote_official"

                new_sk: Optional[str] = None
                if governance_changed:
                    next_club_param: Optional[int] = None
                    if next_vis in ("club", "private"):
                        next_club_param = int(next_cid) if next_cid is not None else None
                    media_root = get_effective_media_root(cur)
                    new_sk = _relocate_asset_file_if_governance_changed(
                        cur, media_root, asset_id, asset, next_vis, next_club_param
                    )

                # Build the UPDATE from the fields that are actually patched.
                sets: list[str] = []
                vals: list[Any] = []
                if "copyright_notice" in patch_fields:
                    sets.append("copyright_notice = %s")
                    vals.append(patch_fields["copyright_notice"])
                if "original_filename" in patch_fields:
                    sets.append("original_filename = %s")
                    vals.append(patch_fields["original_filename"])
                if "tags" in patch_fields:
                    sets.append("tags = %s")
                    vals.append(_normalize_media_tags(patch_fields["tags"]))
                if governance_changed:
                    sets.append("visibility = %s")
                    vals.append(str(eff.get("visibility", asset["visibility"])).strip())
                    sets.append("club_id = %s")
                    vals.append(next_cid)
                if new_sk:
                    sets.append("storage_key = %s")
                    vals.append(new_sk)
                if not sets:
                    failed.append({"id": asset_id, "detail": "Nichts zu aktualisieren"})
                    continue
                sets.append("updated_at = NOW()")
                vals.append(asset_id)
                cur.execute(
                    f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s",
                    tuple(vals),
                )

                # P-06: log the declaration when a new one came with the promotion.
                if bulk_p06_decl is not None and bulk_p06_action is not None:
                    write_rights_declaration(
                        cur, asset_id, profile_id, bulk_p06_action, next_vis, bulk_p06_decl
                    )
                    update_rights_quick_fields(cur, asset_id, next_vis)
                conn.commit()
                updated.append(asset_id)
            except HTTPException as he:
                failed.append({"id": asset_id, "detail": _bulk_patch_detail_text(he.detail)})

    return {"updated": updated, "failed": failed, "updated_count": len(updated), "failed_count": len(failed)}
@router.patch("/{asset_id}")
def patch_media_asset(
    asset_id: int,
    body: MediaAssetPatch,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Update metadata and/or governance (visibility, club) of one media asset.

    Only fields explicitly present in the request body are considered. When the
    visibility level is raised and ``check_rights_coverage`` does not return
    ``"ok"``, a P-06 rights declaration must be sent with the patch; it is
    validated and logged together with the update.

    Returns:
        The asset row as re-read after the commit (``tags`` is ``[]`` when the
        tags column does not exist).

    Raises:
        HTTPException 400: empty patch, missing ``club_id`` for private/club
            media, missing copyright notice for club/official media, or a
            missing rights declaration on promotion.
        HTTPException 404: the asset does not exist.
        HTTPException 503: ``tags`` was sent but the tags column is missing.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    data = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
    if not data:
        raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
    governance_changed = "visibility" in data or "club_id" in data

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state,
                      copyright_notice, original_filename, sha256, storage_key, storage_backend, mime_type
               FROM media_assets WHERE id = %s""",
            (asset_id,),
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")
        asset = r2d(row)
        assert_can_edit_media_asset_metadata(cur, tenant, asset)
        if "tags" in data and not _media_assets_tags_column_present(cur):
            raise HTTPException(
                status_code=503,
                detail="Schlagwörter (tags) erfordern die Datenbank-Migration 046. Bitte Migration ausführen.",
            )

        eff = _effective_media_patch_fields(data, asset)
        next_vis = str(eff.get("visibility", asset["visibility"])).strip().lower()
        next_cid = eff.get("club_id", asset.get("club_id"))

        if governance_changed:
            assert_valid_governance_visibility(
                cur,
                profile_id,
                role,
                next_vis,
                int(next_cid) if next_cid is not None else None,
            )
            if next_vis in ("private", "club") and next_cid is None:
                raise HTTPException(
                    status_code=400,
                    detail=(
                        "Für private oder Vereins-Medien wird club_id benötigt (Vereinsordner). "
                        "Bitte im PATCH setzen, z. B. bei Wechsel von „offiziell“ zu „privat“."
                    ),
                )
            if next_vis in ("club", "official"):
                # Judge the notice as it will be AFTER this patch: a value sent
                # in the patch (including null / empty) replaces the stored
                # notice, so it must not fall back to the stored one.
                if "copyright_notice" in data:
                    effective_copyright = data["copyright_notice"] or ""
                else:
                    effective_copyright = asset.get("copyright_notice") or ""
                if not str(effective_copyright).strip():
                    raise HTTPException(
                        status_code=400,
                        detail=(
                            "Für Vereins- oder offizielle Medien ist eine Urheberrechtsangabe "
                            "(copyright_notice) Pflicht. Bitte vor oder zusammen mit der "
                            "Freigabe angeben."
                        ),
                    )

        # P-06: rights check when the visibility level is raised.
        _p06_pending_decl: Optional[dict] = None
        _p06_action: Optional[str] = None
        cur_vis = (asset.get("visibility") or "private").strip().lower()
        if VISIBILITY_LEVELS.get(next_vis, 0) > VISIBILITY_LEVELS.get(cur_vis, 0):
            coverage = check_rights_coverage(cur, asset_id, next_vis)
            if coverage != "ok":
                # A new declaration has to be supplied in the request body.
                # NOTE(review): the *_context fields that the upload path puts
                # into its declaration are not forwarded here - confirm whether
                # that is intentional.
                p06_fields = {
                    "rights_holder_confirmed": data.get("rights_holder_confirmed"),
                    "contains_identifiable_persons": data.get("contains_identifiable_persons"),
                    "person_consent_confirmed": data.get("person_consent_confirmed"),
                    "contains_minors": data.get("contains_minors"),
                    "parental_consent_confirmed": data.get("parental_consent_confirmed"),
                    "contains_music": data.get("contains_music"),
                    "music_rights_confirmed": data.get("music_rights_confirmed"),
                    "contains_third_party_content": data.get("contains_third_party_content"),
                    "third_party_rights_confirmed": data.get("third_party_rights_confirmed"),
                }
                if p06_fields.get("rights_holder_confirmed") is None:
                    if coverage == "legacy":
                        raise HTTPException(
                            status_code=400,
                            detail={
                                "code": "LEGACY_REDECLARATION_REQUIRED",
                                "message": (
                                    "Dieses Medium wurde vor Einfuehrung der Einwilligungspflicht "
                                    "hochgeladen. Bitte eine vollstaendige Rechterklaerung (P-06-Felder) "
                                    "zusammen mit dem PATCH uebergeben."
                                ),
                                "asset_id": asset_id,
                            },
                        )
                    raise HTTPException(
                        status_code=400,
                        detail={
                            "code": "RIGHTS_SCOPE_INSUFFICIENT",
                            "message": (
                                f"Die vorhandene Erklaerung deckt '{next_vis}' nicht ab. "
                                "Bitte eine neue Erklaerung (P-06-Felder) mitschicken."
                            ),
                            "asset_id": asset_id,
                            "target_visibility": next_vis,
                        },
                    )
                validate_rights_declaration(p06_fields, next_vis)
                _p06_pending_decl = p06_fields
                if coverage == "legacy":
                    _p06_action = "legacy_re_declaration"
                elif next_vis == "club":
                    _p06_action = "promote_club"
                else:
                    _p06_action = "promote_official"

        new_sk: Optional[str] = None
        if governance_changed:
            next_club_param: Optional[int] = None
            if next_vis in ("club", "private"):
                next_club_param = int(next_cid) if next_cid is not None else None
            media_root = get_effective_media_root(cur)
            new_sk = _relocate_asset_file_if_governance_changed(
                cur, media_root, asset_id, asset, next_vis, next_club_param
            )

        # Build the UPDATE from the fields that are actually patched.
        sets: list[str] = []
        vals: list[Any] = []
        if "copyright_notice" in data:
            sets.append("copyright_notice = %s")
            vals.append(data["copyright_notice"])
        if "original_filename" in data:
            sets.append("original_filename = %s")
            vals.append(data["original_filename"])
        if "tags" in data:
            sets.append("tags = %s")
            vals.append(_normalize_media_tags(data["tags"]))
        if governance_changed:
            sets.append("visibility = %s")
            vals.append(str(eff.get("visibility", asset["visibility"])).strip())
            sets.append("club_id = %s")
            vals.append(next_cid)
        if new_sk:
            sets.append("storage_key = %s")
            vals.append(new_sk)
        if sets:
            sets.append("updated_at = NOW()")
            vals.append(asset_id)
            cur.execute(
                f"UPDATE media_assets SET {', '.join(sets)} WHERE id = %s",
                tuple(vals),
            )

        # P-06: log the declaration when a new one came with the promotion.
        if _p06_pending_decl is not None and _p06_action is not None:
            write_rights_declaration(
                cur, asset_id, profile_id, _p06_action, next_vis, _p06_pending_decl
            )
            update_rights_quick_fields(cur, asset_id, next_vis)
        conn.commit()

        # Re-read the row so the response reflects the stored state.
        has_tags = _media_assets_tags_column_present(cur)
        if has_tags:
            cur.execute(
                """SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
                          uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice, tags
                   FROM media_assets WHERE id = %s""",
                (asset_id,),
            )
        else:
            cur.execute(
                """SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
                          uploaded_by_profile_id, lifecycle_state, created_at, sha256, copyright_notice
                   FROM media_assets WHERE id = %s""",
                (asset_id,),
            )
        out = r2d(cur.fetchone())
        if not has_tags:
            out["tags"] = []
        return out
# ---------------------------------------------------------------------------
# P-06: Re-Deklaration / Nachdeklaration
# ---------------------------------------------------------------------------
@router.post("/{asset_id}/rights-declarations")
def create_rights_declaration(
    asset_id: int,
    body: RightsDeclarationBody,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """P-06: explicit rights declaration for an existing media asset.

    Usage:
    - Legacy asset ('legacy_unreviewed'): first declaration after P-06
      (action_type='legacy_re_declaration').
    - Asset whose declaration covers a lower visibility: new declaration for the
      target visibility (action_type='re_declaration').

    The declaration does NOT change the asset's visibility; use PATCH for that.

    Raises:
        HTTPException 404: the asset does not exist.
    """
    profile_id = tenant.profile_id
    decl = body.model_dump() if hasattr(body, "model_dump") else body.dict()
    target_vis = decl.pop("target_visibility")
    with get_db() as conn:
        cur = get_cursor(conn)
        # Select visibility and club_id as well, so the permission helper gets
        # the same governance fields that both PATCH endpoints pass to it.
        # NOTE(review): the helper is defined in media_lifecycle; confirm which
        # row fields it actually reads.
        cur.execute(
            """SELECT id, visibility, club_id, uploaded_by_profile_id, lifecycle_state, rights_status
               FROM media_assets WHERE id = %s""",
            (asset_id,),
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")
        asset = r2d(row)
        assert_can_edit_media_asset_metadata(cur, tenant, asset)
        validate_rights_declaration(decl, target_vis)
        # A missing status is treated like a legacy asset.
        rs = (asset.get("rights_status") or "legacy_unreviewed").strip().lower()
        action_type = "legacy_re_declaration" if rs == "legacy_unreviewed" else "re_declaration"
        decl_id = write_rights_declaration(cur, asset_id, profile_id, action_type, target_vis, decl)
        update_rights_quick_fields(cur, asset_id, target_vis)
        conn.commit()
    return {
        "declaration_id": decl_id,
        "asset_id": asset_id,
        "action_type": action_type,
        "target_visibility": target_vis,
        "rights_status": "declared",
    }
# ---------------------------------------------------------------------------
# P-06: Admin legacy overview
# ---------------------------------------------------------------------------
# Separate router for the admin media-rights endpoints defined below; it has
# its own URL prefix and OpenAPI tags.
admin_rights_router = APIRouter(prefix="/api/admin/media-rights", tags=["admin", "media-rights"])
@admin_rights_router.get("/legacy-summary")
def get_legacy_rights_summary(
    tenant: TenantContext = Depends(get_tenant_context),
):
    """P-06 admin: count active media assets per rights status.

    Assets without a stored ``rights_status`` are counted as
    ``legacy_unreviewed``, matching how ``create_rights_declaration`` interprets
    a missing status, so the buckets add up to ``total_active_assets`` as long
    as only the three known statuses occur.

    Raises:
        HTTPException 403: the caller is not a platform admin.
    """
    role = tenant.global_role
    if not is_platform_admin(role):
        raise HTTPException(status_code=403, detail="Keine Berechtigung (Plattform-Admin erforderlich)")
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT rights_status, COUNT(*) AS cnt
            FROM media_assets
            WHERE lifecycle_state = 'active'
            GROUP BY rights_status
            ORDER BY rights_status
            """
        )
        totals: dict[str, int] = {}
        for raw_row in cur.fetchall():
            rec = r2d(raw_row)
            # NULL / empty status falls into the legacy bucket instead of a
            # separate key that no field of the response reports.
            status = rec["rights_status"] or "legacy_unreviewed"
            totals[status] = totals.get(status, 0) + int(rec["cnt"])
        total_active = sum(totals.values())
        cur.execute(
            "SELECT COUNT(*) AS cnt FROM media_asset_rights_declarations"
        )
        decl_count_row = cur.fetchone()
        total_declarations = int(r2d(decl_count_row)["cnt"]) if decl_count_row else 0
    return {
        "total_active_assets": total_active,
        "legacy_unreviewed": totals.get("legacy_unreviewed", 0),
        "declared": totals.get("declared", 0),
        "blocked": totals.get("blocked", 0),
        "total_declarations_logged": total_declarations,
    }
@admin_rights_router.get("/legacy-assets")
def get_legacy_rights_assets(
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
    visibility: Optional[str] = Query(None, pattern="^(private|club|official)$"),
    tenant: TenantContext = Depends(get_tenant_context),
):
    """P-06 admin: page through active assets whose rights status is not 'declared'.

    That covers 'legacy_unreviewed', 'blocked' and assets with no stored status.
    ``total`` is the number of matching assets across all pages.

    Raises:
        HTTPException 403: the caller is not a platform admin.
    """
    role = tenant.global_role
    if not is_platform_admin(role):
        raise HTTPException(status_code=403, detail="Keine Berechtigung (Plattform-Admin erforderlich)")
    with get_db() as conn:
        cur = get_cursor(conn)
        # IS DISTINCT FROM instead of "!=": a plain inequality evaluates to NULL
        # for rows without a status and would silently drop them from the list.
        conditions = [
            "ma.lifecycle_state = 'active'",
            "ma.rights_status IS DISTINCT FROM 'declared'",
        ]
        params: list[Any] = []
        if visibility:
            conditions.append("ma.visibility = %s")
            params.append(visibility)
        where = " AND ".join(conditions)
        cur.execute(
            f"""
            SELECT ma.id, ma.original_filename, ma.visibility, ma.rights_status,
                   ma.created_at, ma.uploaded_by_profile_id,
                   p.username AS uploader_username
            FROM media_assets ma
            LEFT JOIN profiles p ON p.id = ma.uploaded_by_profile_id
            WHERE {where}
            ORDER BY ma.created_at DESC
            LIMIT %s OFFSET %s
            """,
            (*params, limit, offset),
        )
        assets = [r2d(r) for r in cur.fetchall()]
        # Same filter without paging, for the overall count.
        cur.execute(
            f"""
            SELECT COUNT(*) AS cnt
            FROM media_assets ma
            WHERE {where}
            """,
            tuple(params),
        )
        total_row = cur.fetchone()
        total = int(r2d(total_row)["cnt"]) if total_row else 0
    return {"total": total, "limit": limit, "offset": offset, "assets": assets}
@admin_rights_router.get("/assets/{asset_id}/journal")
def get_media_asset_journal(
    asset_id: int,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """P-06 superadmin: full rights-declaration journal for one media asset.

    Returns the asset's core and rights columns plus all of its declarations,
    newest first, each with the declaring profile's username and e-mail.

    Raises:
        HTTPException 403: the caller is not a superadmin.
        HTTPException 404: the asset does not exist.
    """
    allowed = is_superadmin(tenant.global_role)
    if not allowed:
        raise HTTPException(status_code=403, detail="Keine Berechtigung (Superadmin erforderlich)")

    asset_sql = """
        SELECT id, original_filename, visibility, rights_status,
               rights_declared_for_visibility, rights_declared_at,
               copyright_notice, mime_type, lifecycle_state,
               uploaded_by_profile_id, created_at
        FROM media_assets
        WHERE id = %s
        """
    journal_sql = """
        SELECT d.id, d.declared_at, d.action_type, d.target_visibility,
               d.declaration_version,
               d.rights_holder_confirmed,
               d.contains_identifiable_persons, d.person_consent_confirmed,
               d.person_consent_context,
               d.contains_minors, d.parental_consent_confirmed,
               d.parental_consent_context,
               d.contains_music, d.music_rights_confirmed,
               d.music_rights_context,
               d.contains_third_party_content, d.third_party_rights_confirmed,
               d.third_party_rights_context,
               d.declared_by_profile_id,
               p.username AS declared_by_username,
               p.email AS declared_by_email
        FROM media_asset_rights_declarations d
        LEFT JOIN profiles p ON p.id = d.declared_by_profile_id
        WHERE d.media_asset_id = %s
        ORDER BY d.declared_at DESC
        """
    key = (asset_id,)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(asset_sql, key)
        found = cur.fetchone()
        if not found:
            raise HTTPException(status_code=404, detail="Medium nicht gefunden")
        asset_info = r2d(found)
        cur.execute(journal_sql, key)
        journal = []
        for entry in cur.fetchall():
            journal.append(r2d(entry))
    return {"asset": asset_info, "declarations": journal}