shinkan-jinkendo/backend/routers/content_reports.py
Lars 56fc6d853d
All checks were successful
Deploy Development / deploy (push) Successful in 37s
Test Suite / pytest-backend (push) Successful in 34s
Test Suite / lint-backend (push) Successful in 1s
Test Suite / build-frontend (push) Successful in 11s
Test Suite / playwright-tests (push) Successful in 59s
fix(P-13): repair 3 failing CI pytest tests for content_reports
- Add early 403 in set_legal_hold_from_report for plain admin (before DB
  call), fixing test_legal_hold_from_report_requires_superadmin
- Update test_list_inbox_requires_platform_admin to mock DB COUNT query
  (returns cnt=0) so it exercises the club_admin code path correctly
- Extend test_patch_report_under_review mock row with target_type,
  target_id, resolution_note fields now required by the audit-log path

version: 0.8.94
module:  content_reports 1.5.1

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 22:25:23 +02:00

932 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
P-13: Content report backend.

Reporting procedure for potentially unlawful content (DSA basis, KRIT-03).

Architecture:
- Domain data lives in the 'content_reports' table (migration 052)
- Integrated into the existing admin inbox (GET /api/me/inbox/content-reports)
- No separate admin queue; InboxPage.jsx shows both case types
- P-11 hook: a legal hold can be set directly from a report

Permissions:
- Submit a report: optional auth (without auth only allowed for 'official' content)
- List/detail: platform admin (all reports) or club admin (only reports on media of their own club)
- Status handling: platform admin (all reports) or club admin (only reports on media of their own club)
- Legal hold from a report: superadmin, or club admin for non-official media of their own club;
  plain platform admins (non-superadmin) are rejected

E-mail notifications:
- After receipt: confirmation to the reporter
- After receipt: notification to all platform admins (best effort, no error if SMTP is not configured)

Audit log (migration 053):
- For media_asset reports: entry in media_asset_audit_log (event_type='content_report_filed')
"""
from __future__ import annotations
import os
import re
from typing import Optional
from fastapi import APIRouter, Depends, Header, HTTPException
from pydantic import BaseModel, Field, field_validator
from club_tenancy import is_platform_admin, is_superadmin
from db import get_db, get_cursor, r2d
from media_legal_hold import (
LEGAL_HOLD_REASON_CODES,
assert_superadmin_for_legal_hold,
set_legal_hold,
)
from media_rights import write_audit_log_entry
from tenant_context import TenantContext, get_tenant_context
# Router for every endpoint in this module; routes are registered under the
# "/api" prefix and tagged "content_reports".
router = APIRouter(prefix="/api", tags=["content_reports"])
# Erlaubte Meldungsgruende
REPORT_REASONS = frozenset({
"copyright",
"image_rights",
"privacy",
"minors",
"illegal_content",
"youth_protection",
"offensive_content",
"other",
})
REASON_LABELS_DE = {
"copyright": "Urheberrecht",
"image_rights": "Bildrechte",
"privacy": "Datenschutz / Persönlichkeitsrecht",
"minors": "Minderjährige",
"illegal_content": "Rechtswidriger Inhalt",
"youth_protection": "Jugendschutz",
"offensive_content": "Beleidigender Inhalt",
"other": "Sonstiges",
}
# Gruende mit hoher Prioritaet (illegal, Minderjaehrigenschutz)
HIGH_PRIORITY_REASONS = frozenset({
"minors",
"illegal_content",
"youth_protection",
})
# Mapping: report_reason -> legal_hold reason_code
_REASON_TO_HOLD_CODE = {
"copyright": "copyright_complaint",
"image_rights": "rights_dispute",
"privacy": "privacy_complaint",
"minors": "youth_protection",
"illegal_content": "illegal_content",
"youth_protection": "youth_protection",
"offensive_content": "illegal_content",
"other": "other",
}
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# ─── Pydantic-Modelle ────────────────────────────────────────────────────────
# Request body for submitting a content report.
# Documented with comments instead of a class docstring so the generated
# schema description stays untouched.
class ContentReportCreate(BaseModel):
    # Kind of reported object: "media_asset" (default) or "exercise".
    target_type: str = Field(default="media_asset")
    # Primary key of the reported object (>= 1).
    target_id: int = Field(..., ge=1)
    # One of REPORT_REASONS.
    report_reason: str
    # Free-text explanation, 10 to 8000 characters.
    report_description: str = Field(..., min_length=10, max_length=8000)
    reporter_name: str = Field(..., min_length=1, max_length=200)
    reporter_email: str = Field(..., min_length=5, max_length=200)
    # Good-faith declaration; the submit endpoint rejects a false value.
    good_faith_confirmed: bool

    @field_validator("target_type")
    @classmethod
    def validate_target_type(cls, v: str) -> str:
        """Accept only the two reportable target kinds."""
        if v == "media_asset" or v == "exercise":
            return v
        raise ValueError("target_type muss 'media_asset' oder 'exercise' sein")

    @field_validator("report_reason")
    @classmethod
    def validate_reason(cls, v: str) -> str:
        """Accept only reasons listed in REPORT_REASONS."""
        if v in REPORT_REASONS:
            return v
        raise ValueError(f"Ungueltiger report_reason. Erlaubt: {sorted(REPORT_REASONS)}")

    @field_validator("reporter_email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        """Check the address shape and return it trimmed and lower-cased."""
        candidate = v.strip()
        if _EMAIL_RE.match(candidate) is None:
            raise ValueError("Ungueltige E-Mail-Adresse")
        return candidate.lower()

    @field_validator("reporter_name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Return the trimmed name; a whitespace-only name is rejected."""
        trimmed = v.strip()
        if trimmed:
            return trimmed
        raise ValueError("Name darf nicht leer sein")
# Request body for updating a report; every field is optional and only the
# fields that are provided (not None) are written.
class ContentReportPatch(BaseModel):
    status: Optional[str] = None
    resolution_note: Optional[str] = Field(None, max_length=4000)
    assigned_to_profile_id: Optional[int] = None

    @field_validator("status")
    @classmethod
    def validate_status(cls, v: Optional[str]) -> Optional[str]:
        """Let None pass through; otherwise require one of the known workflow states."""
        permitted = {
            "submitted",
            "under_review",
            "resolved_no_action",
            "resolved_legal_hold",
            "rejected_invalid",
        }
        if v is not None and v not in permitted:
            raise ValueError(f"Ungueltiger status. Erlaubt: {sorted(permitted)}")
        return v
# Request body for setting a legal hold from a report: a required free-text
# note of 5 to 2000 characters.
class ContentReportLegalHoldBody(BaseModel):
    reason_note: str = Field(..., min_length=5, max_length=2000)
# ─── Hilfsfunktionen ────────────────────────────────────────────────────────
def _assert_can_manage_report(cur, role: Optional[str], pid: int, report: dict) -> None:
    """Raise unless the caller may view or edit the given report.

    Platform admins may manage every report. Any other caller must be an
    active ``club_admin`` of the club that owns the reported media asset;
    reports on exercises are reserved for platform admins.

    Args:
        cur: Open database cursor.
        role: Global role of the caller.
        pid: Profile id of the caller.
        report: Report row as a dict; ``target_type`` and ``target_id`` are read.

    Raises:
        HTTPException: 403 when access is denied, 404 when the reported
            media asset does not exist.
    """
    if is_platform_admin(role):
        return

    if report.get("target_type") != "media_asset":
        raise HTTPException(status_code=403, detail="Nur Plattform-Admins können Meldungen zu Übungen bearbeiten")

    asset_id = int(report["target_id"])
    cur.execute("SELECT club_id FROM media_assets WHERE id = %s", (asset_id,))
    asset_row = cur.fetchone()
    if not asset_row:
        raise HTTPException(status_code=404, detail="Medium nicht gefunden")

    owning_club = r2d(asset_row).get("club_id")
    if not owning_club:
        raise HTTPException(status_code=403, detail="Kein Zugriff Medium ohne Vereinszugehörigkeit")

    # The caller must hold the club_admin role in exactly the owning club.
    membership_sql = """
        SELECT 1 FROM club_members cm
        INNER JOIN club_member_roles r ON r.club_member_id = cm.id
        WHERE cm.profile_id = %s AND cm.club_id = %s
        AND cm.status = 'active' AND r.role_code = 'club_admin'
        """
    cur.execute(membership_sql, (pid, owning_club))
    is_club_admin = cur.fetchone() is not None
    if not is_club_admin:
        raise HTTPException(status_code=403, detail="Kein Zugriff auf diese Meldung")
def _assert_can_set_legal_hold_from_report(cur, role: Optional[str], pid: int, asset_id: int) -> None:
    """Raise unless the caller may put the given media asset on legal hold.

    Superadmins always pass. Any other caller must be an active
    ``club_admin`` of the asset's club, and the asset must not have
    visibility ``'official'``.

    Raises:
        HTTPException: 404 when the asset does not exist, 403 when the
            caller lacks the required rights.
    """
    if is_superadmin(role):
        return

    cur.execute("SELECT club_id, visibility FROM media_assets WHERE id = %s", (asset_id,))
    found = cur.fetchone()
    if not found:
        raise HTTPException(status_code=404, detail="Medium nicht gefunden")

    asset = r2d(found)
    is_official = asset.get("visibility") == "official"
    if is_official:
        raise HTTPException(
            status_code=403,
            detail="Legal Hold auf offiziellen Medien erfordert Superadmin-Rechte",
        )

    owning_club = asset.get("club_id")
    if not owning_club:
        raise HTTPException(status_code=403, detail="Legal Hold erfordert Superadmin-Rechte für Medien ohne Vereinszugehörigkeit")

    membership_sql = """
        SELECT 1 FROM club_members cm
        INNER JOIN club_member_roles r ON r.club_member_id = cm.id
        WHERE cm.profile_id = %s AND cm.club_id = %s
        AND cm.status = 'active' AND r.role_code = 'club_admin'
        """
    cur.execute(membership_sql, (pid, owning_club))
    if cur.fetchone() is not None:
        return
    raise HTTPException(status_code=403, detail="Kein Zugriff Superadmin oder Vereinsadmin des Vereins erforderlich")
def _is_media_asset_visible_anonymous(cur, asset_id: int) -> bool:
"""True wenn das Medium als 'official' und aktiv gilt (fuer anonyme Meldung zugaenglich)."""
cur.execute(
"""
SELECT 1 FROM media_assets
WHERE id = %s
AND visibility = 'official'
AND lifecycle_state = 'active'
AND (legal_hold_active = FALSE OR legal_hold_active IS NULL)
""",
(asset_id,),
)
return cur.fetchone() is not None
def _is_media_asset_visible_to_profile(cur, asset_id: int, profile_id: int, global_role: Optional[str]) -> bool:
    """Decide whether a logged-in user may see (and therefore report) a media asset.

    - Platform admins: any asset that exists.
    - Everyone else: the asset must be active and not on legal hold, and then
      * 'official' is visible to all,
      * 'club' is visible to active members of the owning club,
      * 'private' is visible to the uploader and to active club admins of
        the owning club.
    """
    if is_platform_admin(global_role):
        cur.execute("SELECT 1 FROM media_assets WHERE id = %s", (asset_id,))
        return cur.fetchone() is not None

    cur.execute(
        """
        SELECT ma.id, ma.visibility, ma.club_id, ma.uploaded_by_profile_id
        FROM media_assets ma
        WHERE ma.id = %s
        AND ma.lifecycle_state = 'active'
        AND (ma.legal_hold_active = FALSE OR ma.legal_hold_active IS NULL)
        """,
        (asset_id,),
    )
    found = cur.fetchone()
    if not found:
        return False

    asset = r2d(found)
    visibility = asset.get("visibility")
    if visibility == "official":
        return True
    if visibility not in ("club", "private"):
        return False

    # A private asset is always visible to the profile that uploaded it.
    if visibility == "private" and asset.get("uploaded_by_profile_id") == profile_id:
        return True

    # Everything below depends on a relationship to the owning club.
    owning_club = asset.get("club_id")
    if not owning_club:
        return False

    if visibility == "club":
        # Any active member of the club qualifies.
        membership_sql = """
            SELECT 1 FROM club_members
            WHERE profile_id = %s AND club_id = %s AND status = 'active'
            """
    else:
        # Private asset of someone else: only active club admins qualify.
        membership_sql = """
            SELECT 1 FROM club_members cm
            INNER JOIN club_member_roles r ON r.club_member_id = cm.id
            WHERE cm.profile_id = %s AND cm.club_id = %s AND cm.status = 'active'
            AND r.role_code = 'club_admin'
            """
    cur.execute(membership_sql, (profile_id, owning_club))
    return cur.fetchone() is not None
def _is_exercise_visible_to_profile(cur, exercise_id: int, profile_id: Optional[int], global_role: Optional[str]) -> bool:
    """Simplified visibility check for exercises (used for reports only).

    - Anonymous (profile_id is None): only 'official' exercises with status 'active'.
    - Platform admins: any exercise whose status is not 'deleted'.
    - Other logged-in users: non-deleted exercises that are 'official',
      'club' (active member of the owning club) or 'private' (creator only).
    """
    if profile_id is None:
        cur.execute(
            "SELECT 1 FROM exercises WHERE id = %s AND visibility = 'official' AND status = 'active'",
            (exercise_id,),
        )
        return cur.fetchone() is not None

    if profile_id and is_platform_admin(global_role):
        cur.execute("SELECT 1 FROM exercises WHERE id = %s AND status != 'deleted'", (exercise_id,))
        return cur.fetchone() is not None

    cur.execute(
        """
        SELECT e.visibility, e.club_id, e.created_by_profile_id
        FROM exercises e WHERE e.id = %s AND e.status != 'deleted'
        """,
        (exercise_id,),
    )
    found = cur.fetchone()
    if not found:
        return False

    exercise = r2d(found)
    visibility = exercise.get("visibility")
    if visibility == "official":
        return True
    if visibility == "private":
        return exercise.get("created_by_profile_id") == profile_id
    if visibility != "club":
        return False

    owning_club = exercise.get("club_id")
    if not owning_club:
        return False
    cur.execute(
        "SELECT 1 FROM club_members WHERE profile_id = %s AND club_id = %s AND status = 'active'",
        (profile_id, owning_club),
    )
    return cur.fetchone() is not None
def _resolve_optional_auth(cur, x_auth_token: Optional[str]) -> tuple[Optional[int], Optional[str]]:
"""Gibt (profile_id, global_role) zurueck wenn Token gueltig, sonst (None, None)."""
if not x_auth_token:
return None, None
cur.execute(
"""
SELECT p.id AS profile_id, p.role
FROM sessions s
INNER JOIN profiles p ON p.id = s.profile_id
WHERE s.token = %s
AND (s.expires_at IS NULL OR s.expires_at > NOW())
""",
(x_auth_token,),
)
row = cur.fetchone()
if not row:
return None, None
d = r2d(row)
return d.get("profile_id"), d.get("role")
def _get_platform_admin_emails(cur) -> list[str]:
"""Gibt E-Mail-Adressen aller aktiven Plattform-Admins zurueck."""
cur.execute(
"""
SELECT email FROM profiles
WHERE role IN ('admin', 'superadmin')
AND email IS NOT NULL AND email <> ''
"""
)
return [r2d(row)["email"] for row in cur.fetchall()]
def _send_email(to: str, subject: str, body: str) -> None:
"""SMTP-Versand (best-effort, ignoriert Fehler und nicht konfiguriertes SMTP)."""
try:
import smtplib, ssl
from email.mime.text import MIMEText
smtp_host = (os.getenv("SMTP_HOST") or "").strip()
smtp_user = (os.getenv("SMTP_USER") or "").strip()
smtp_pass = os.getenv("SMTP_PASS") or ""
if not smtp_host or not smtp_user or not smtp_pass:
return
smtp_port = int(os.getenv("SMTP_PORT") or "587")
smtp_from = os.getenv("SMTP_FROM", "noreply@jinkendo.de")
force_ssl = (os.getenv("SMTP_SSL", "").strip().lower() in ("1", "true", "yes")) or smtp_port == 465
use_tls = os.getenv("SMTP_STARTTLS", "true").strip().lower() not in ("0", "false", "no")
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = smtp_from
msg["To"] = to
ctx = ssl.create_default_context()
if force_ssl:
with smtplib.SMTP_SSL(smtp_host, smtp_port, context=ctx, timeout=30) as srv:
srv.login(smtp_user, smtp_pass)
srv.send_message(msg)
else:
with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as srv:
srv.ehlo()
if use_tls:
srv.starttls(context=ctx)
srv.ehlo()
srv.login(smtp_user, smtp_pass)
srv.send_message(msg)
except Exception as exc:
print(f"[SMTP content_reports] Fehler: {exc}")
def _notify_report_submitted(
    *,
    report_id: int,
    reporter_name: str,
    reporter_email: str,
    reason: str,
    priority: str,
    target_type: str,
    target_id: int,
    target_label: str,
    admin_emails: list[str],
) -> None:
    """Send the confirmation mail to the reporter and notify all given admins (best effort).

    ``target_type`` and ``target_id`` are accepted but not used in the mail
    text; the human-readable ``target_label`` is used instead.
    """
    base_url = (os.getenv("APP_URL") or "https://shinkan.jinkendo.de").rstrip("/")
    reason_text = REASON_LABELS_DE.get(reason, reason)
    if priority == "high":
        urgency = "DRINGEND"
    else:
        urgency = "normal"

    # Confirmation mail to the reporter.
    reporter_lines = [
        f"Hallo {reporter_name},",
        "",
        f"Ihre Meldung (#{report_id}) wurde entgegengenommen.",
        "",
        f"Meldegrund: {reason_text}",
        f"Gemeldeter Inhalt: {target_label}",
        "",
        "Ein Administrator wird Ihre Meldung zeitnah prüfen. Sie müssen nichts weiter tun.",
        "",
        "Shinkan Jinkendo",
        f"{base_url}",
    ]
    _send_email(
        reporter_email,
        f"Meldung #{report_id} eingegangen Shinkan Jinkendo",
        "\n".join(reporter_lines),
    )

    # Notification to the admins; the trailing empty entry keeps the final newline.
    admin_lines = [
        f"Neue Inhaltsmeldung #{report_id} [{urgency}]",
        "",
        f"Meldegrund: {reason_text}",
        f"Gemeldeter Inhalt: {target_label}",
        f"Gemeldet von: {reporter_name} <{reporter_email}>",
        "",
        f"Posteingang öffnen: {base_url}/inbox",
        "",
    ]
    admin_text = "\n".join(admin_lines)
    admin_subject = f"Inhaltsmeldung #{report_id} [{urgency}] Shinkan Jinkendo"
    for recipient in admin_emails:
        _send_email(recipient, admin_subject, admin_text)
def _report_row_to_dict(row) -> dict:
    """Convert one content-report result row into a dict by delegating to ``r2d``."""
    return r2d(row)
# ─── Endpoints ──────────────────────────────────────────────────────────────
@router.post("/content-reports", status_code=201)
def submit_content_report(
    body: ContentReportCreate,
    x_auth_token: Optional[str] = Header(default=None),
):
    """
    Submit a content report (authentication is optional).

    - Without login: only 'official' content can be reported (publicly reachable content)
    - With login: any content the user may see according to the visibility rules
    - The good-faith declaration (good_faith_confirmed=true) is mandatory
    - Writes an entry to the media audit log (when target_type=media_asset)
    - Sends a confirmation e-mail to the reporter and notifies platform admins (best effort)
    """
    if not body.good_faith_confirmed:
        raise HTTPException(
            status_code=400,
            detail="Die Gutglaubenserklärung (good_faith_confirmed) muss bestätigt werden.",
        )

    if body.report_reason in HIGH_PRIORITY_REASONS:
        priority = "high"
    else:
        priority = "normal"

    targets_media = body.target_type == "media_asset"
    targets_exercise = body.target_type == "exercise"
    admin_emails: list[str] = []

    with get_db() as conn:
        cur = get_cursor(conn)

        # Resolve the optional login; an unknown token is treated as anonymous.
        profile_id, global_role = _resolve_optional_auth(cur, x_auth_token)

        # The reporter must be able to see the target; otherwise answer 404.
        if targets_media:
            if profile_id:
                reachable = _is_media_asset_visible_to_profile(cur, body.target_id, profile_id, global_role)
            else:
                reachable = _is_media_asset_visible_anonymous(cur, body.target_id)
            if not reachable:
                raise HTTPException(
                    status_code=404,
                    detail="Das gemeldete Medium existiert nicht oder ist nicht zugänglich.",
                )
        elif targets_exercise:
            if not _is_exercise_visible_to_profile(cur, body.target_id, profile_id, global_role):
                raise HTTPException(
                    status_code=404,
                    detail="Die gemeldete Übung existiert nicht oder ist nicht zugänglich.",
                )

        insert_values = (
            body.target_type,
            body.target_id,
            body.report_reason,
            body.report_description.strip(),
            body.reporter_name,
            body.reporter_email,
            profile_id,
            body.good_faith_confirmed,
            priority,
        )
        cur.execute(
            """
            INSERT INTO content_reports (
                target_type, target_id, report_reason, report_description,
                reporter_name, reporter_email, reporter_profile_id,
                good_faith_confirmed, priority, status,
                submitted_at, created_at, updated_at
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'submitted', NOW(), NOW(), NOW())
            RETURNING id, status, priority, submitted_at
            """,
            insert_values,
        )
        result = r2d(cur.fetchone())
        report_id = int(result["id"])

        # Human-readable target label for the e-mails: prefer the file name /
        # exercise title and fall back to "<kind> #<id>".
        kind_label = "Medium" if targets_media else "Übung"
        target_label_for_email = f"{kind_label} #{body.target_id}"
        if targets_media:
            name_sql = "SELECT original_filename FROM media_assets WHERE id = %s"
            name_column = "original_filename"
        elif targets_exercise:
            name_sql = "SELECT title FROM exercises WHERE id = %s"
            name_column = "title"
        else:
            name_sql = None
            name_column = None
        if name_sql is not None:
            cur.execute(name_sql, (body.target_id,))
            name_row = cur.fetchone()
            display_name = (r2d(name_row).get(name_column) or "") if name_row else ""
            if display_name:
                target_label_for_email = f"{display_name} ({kind_label} #{body.target_id})"

        # Audit-log entry (media_asset reports only).
        if targets_media:
            audit_after = {
                "content_report_id": report_id,
                "report_reason": REASON_LABELS_DE.get(body.report_reason, body.report_reason),
                "priority": "hoch" if priority == "high" else "normal",
            }
            write_audit_log_entry(
                cur,
                body.target_id,
                profile_id,
                "content_report_filed",
                {},
                audit_after,
            )

        # Fetch the admin addresses while the connection is still open.
        admin_emails = _get_platform_admin_emails(cur)
        conn.commit()

    # Mails go out after the commit (best effort, cannot roll the report back).
    _notify_report_submitted(
        report_id=report_id,
        reporter_name=body.reporter_name,
        reporter_email=body.reporter_email,
        reason=body.report_reason,
        priority=priority,
        target_type=body.target_type,
        target_id=body.target_id,
        target_label=target_label_for_email,
        admin_emails=admin_emails,
    )

    response = {
        "id": report_id,
        "status": result["status"],
        "priority": result["priority"],
        "submitted_at": str(result["submitted_at"]),
        "message": "Ihre Meldung wurde entgegengenommen. Ein Administrator wird sie prüfen.",
    }
    return response
# Column list and joins shared by both inbox queries (platform admin and club admin).
_INBOX_REPORT_SELECT = """
    SELECT
        cr.id,
        cr.target_type,
        cr.target_id,
        cr.report_reason,
        cr.report_description,
        cr.reporter_name,
        cr.reporter_email,
        cr.reporter_profile_id,
        cr.priority,
        cr.status,
        cr.assigned_to_profile_id,
        cr.reviewed_by_profile_id,
        cr.reviewed_at,
        cr.resolution_note,
        cr.submitted_at,
        cr.created_at,
        cr.updated_at,
        ma.original_filename AS target_filename,
        ma.visibility AS target_visibility,
        ma.mime_type AS target_mime_type,
        ma.legal_hold_active AS target_legal_hold_active,
        ex.title AS target_exercise_name,
        rev.name AS reviewed_by_name,
        asgn.name AS assigned_to_name
    FROM content_reports cr
    LEFT JOIN media_assets ma ON ma.id = cr.target_id AND cr.target_type = 'media_asset'
    LEFT JOIN exercises ex ON ex.id = cr.target_id AND cr.target_type = 'exercise'
    LEFT JOIN profiles rev ON rev.id = cr.reviewed_by_profile_id
    LEFT JOIN profiles asgn ON asgn.id = cr.assigned_to_profile_id
"""

# Open reports first (submitted, then under_review), then high priority before
# normal, then oldest first. The submit endpoint stores priority as the strings
# 'high' / 'normal'; ordering the raw value DESC compares them as text and
# ranks 'normal' ahead of 'high', so the priority is ranked explicitly.
_INBOX_REPORT_ORDER = """
    ORDER BY
        CASE WHEN cr.status = 'submitted' THEN 0
             WHEN cr.status = 'under_review' THEN 1
             ELSE 2 END ASC,
        CASE WHEN cr.priority = 'high' THEN 0 ELSE 1 END ASC,
        cr.created_at ASC
"""

# Restricts the inbox to media reports of clubs where the caller is an active club admin.
_INBOX_REPORT_CLUB_FILTER = """
    WHERE cr.target_type = 'media_asset'
    AND ma.club_id IN (
        SELECT cm.club_id
        FROM club_members cm
        INNER JOIN club_member_roles r ON r.club_member_id = cm.id
        WHERE cm.profile_id = %s AND cm.status = 'active' AND r.role_code = 'club_admin'
    )
"""


@router.get("/me/inbox/content-reports")
def list_inbox_content_reports(
    tenant: TenantContext = Depends(get_tenant_context),
):
    """
    Reports for the inbox.

    - Platform admins (admin/superadmin): all reports (at most 500)
    - Club admins: only reports about media of their own clubs (at most 200)

    Within each list, open reports come first, high-priority reports precede
    normal ones, and older reports precede newer ones.

    Raises:
        HTTPException: 403 when the caller is neither a platform admin nor an
            active club admin of any club.
    """
    pid = tenant.profile_id
    is_padmin = is_platform_admin(tenant.global_role)

    with get_db() as conn:
        cur = get_cursor(conn)
        if is_padmin:
            # Platform admin: every report, no filter and no query parameters.
            cur.execute(_INBOX_REPORT_SELECT + _INBOX_REPORT_ORDER + "LIMIT 500")
        else:
            # Check first whether the caller holds a club_admin role at all.
            cur.execute(
                """
                SELECT COUNT(*) AS cnt
                FROM club_members cm
                INNER JOIN club_member_roles r ON r.club_member_id = cm.id
                WHERE cm.profile_id = %s AND cm.status = 'active' AND r.role_code = 'club_admin'
                """,
                (pid,),
            )
            cnt_row = r2d(cur.fetchone())
            if int(cnt_row.get("cnt", 0)) == 0:
                raise HTTPException(status_code=403, detail="Kein Zugriff auf Inhaltsmeldungen")

            cur.execute(
                _INBOX_REPORT_SELECT + _INBOX_REPORT_CLUB_FILTER + _INBOX_REPORT_ORDER + "LIMIT 200",
                (pid,),
            )
        return [_report_row_to_dict(row) for row in cur.fetchall()]
@router.get("/content-reports/{report_id}")
def get_content_report(
    report_id: int,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Detail view of one report for platform admins and the responsible club admins."""
    detail_sql = """
        SELECT
            cr.*,
            ma.original_filename AS target_filename,
            ma.visibility AS target_visibility,
            ma.mime_type AS target_mime_type,
            ma.legal_hold_active AS target_legal_hold_active,
            ma.legal_hold_reason_code AS target_legal_hold_reason_code,
            ex.title AS target_exercise_name,
            ex.visibility AS target_exercise_visibility,
            rev.name AS reviewed_by_name,
            asgn.name AS assigned_to_name,
            rep.name AS reporter_profile_name
        FROM content_reports cr
        LEFT JOIN media_assets ma ON ma.id = cr.target_id AND cr.target_type = 'media_asset'
        LEFT JOIN exercises ex ON ex.id = cr.target_id AND cr.target_type = 'exercise'
        LEFT JOIN profiles rev ON rev.id = cr.reviewed_by_profile_id
        LEFT JOIN profiles asgn ON asgn.id = cr.assigned_to_profile_id
        LEFT JOIN profiles rep ON rep.id = cr.reporter_profile_id
        WHERE cr.id = %s
        """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(detail_sql, (report_id,))
        found = cur.fetchone()
        if not found:
            raise HTTPException(status_code=404, detail="Meldung nicht gefunden")

        report_dict = _report_row_to_dict(found)
        # The permission check needs the row (target type/id), so it runs after the lookup.
        _assert_can_manage_report(cur, tenant.global_role, tenant.profile_id, report_dict)
        return report_dict
@router.patch("/content-reports/{report_id}")
def patch_content_report(
    report_id: int,
    body: ContentReportPatch,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """
    Update status, resolution note and/or assignee of a report.

    Closing without action (resolved_no_action, rejected_invalid) requires a
    resolution_note in the same request.
    Platform admins: any report. Club admins: only reports on media of their club.

    For media_asset reports an audit-log entry is written when the status
    changes, or when only the resolution note changes.

    Raises:
        HTTPException: 400 when a closing status lacks a note or no field is
            given, 403 when access is denied, 404 when the report does not
            exist (including when it disappears between lookup and update).
    """
    pid = tenant.profile_id
    role = tenant.global_role

    if body.status in ("resolved_no_action", "rejected_invalid") and not (body.resolution_note or "").strip():
        raise HTTPException(
            status_code=400,
            detail="Bei Abschluss ohne Massnahme ist eine Begründung (resolution_note) Pflicht.",
        )

    # German status labels as written to the audit log.
    STATUS_LABELS_DE = {
        "submitted": "Eingegangen",
        "under_review": "In Bearbeitung",
        "resolved_no_action": "Abgeschlossen (kein Handlungsbedarf)",
        "resolved_legal_hold": "Abgeschlossen (Legal Hold)",
        "rejected_invalid": "Abgewiesen (ungültig)",
    }

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id, status, resolution_note, target_type, target_id FROM content_reports WHERE id = %s",
            (report_id,),
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Meldung nicht gefunden")
        old_report = r2d(row)
        _assert_can_manage_report(cur, role, pid, old_report)

        old_status = old_report["status"]
        old_note = (old_report.get("resolution_note") or "").strip()

        # SET clauses and their parameters are appended in lockstep; clauses
        # without a placeholder (NOW(), NULL) add no parameter.
        updates = ["updated_at = NOW()"]
        params = []
        if body.status is not None:
            updates.append("status = %s")
            params.append(body.status)
            if body.status in ("resolved_no_action", "resolved_legal_hold", "rejected_invalid"):
                # Closing states record who reviewed the report and when.
                updates.append("reviewed_by_profile_id = %s")
                updates.append("reviewed_at = NOW()")
                params.append(pid)
            elif body.status == "submitted":
                # Reopening: reset the reviewer fields.
                updates.append("reviewed_by_profile_id = NULL")
                updates.append("reviewed_at = NULL")
        if body.resolution_note is not None:
            updates.append("resolution_note = %s")
            # A blank note is stored as NULL.
            params.append(body.resolution_note.strip() or None)
        if body.assigned_to_profile_id is not None:
            updates.append("assigned_to_profile_id = %s")
            params.append(body.assigned_to_profile_id)

        if len(updates) == 1:
            raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren angegeben")

        params.append(report_id)
        # Only fixed clause strings are joined into the statement; all values are bound parameters.
        cur.execute(
            f"UPDATE content_reports SET {', '.join(updates)} WHERE id = %s RETURNING id, status",
            params,
        )
        updated_row = cur.fetchone()
        if updated_row is None:
            # The row vanished between the SELECT above and this UPDATE
            # (concurrent delete): report it as not found instead of failing.
            raise HTTPException(status_code=404, detail="Meldung nicht gefunden")
        updated = r2d(updated_row)
        new_status = updated["status"]

        if old_report["target_type"] == "media_asset":
            if body.resolution_note is not None:
                new_note = (body.resolution_note or "").strip()
            else:
                new_note = old_note

            if body.status is not None and old_status != new_status:
                # Audit log: status change (the note is attached when present).
                write_audit_log_entry(
                    cur,
                    int(old_report["target_id"]),
                    pid,
                    "content_report_filed",
                    {"status": STATUS_LABELS_DE.get(old_status, old_status)},
                    {
                        "content_report_id": report_id,
                        "status": STATUS_LABELS_DE.get(new_status, new_status),
                        **({"begründung": new_note} if new_note else {}),
                    },
                )
            elif body.resolution_note is not None and new_note != old_note:
                # Audit log: note changed without a status change.
                write_audit_log_entry(
                    cur,
                    int(old_report["target_id"]),
                    pid,
                    "content_report_filed",
                    {"content_report_id": report_id, "begründung": old_note or None},
                    {"content_report_id": report_id, "begründung": new_note or None},
                )

        conn.commit()

    return {"ok": True, "id": updated["id"], "status": updated["status"]}
@router.post("/content-reports/{report_id}/legal-hold")
def set_legal_hold_from_report(
    report_id: int,
    body: ContentReportLegalHoldBody,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """
    Set a legal hold (P-11) directly from a report.

    Superadmin: always. Club admin: only for club media (visibility != 'official').
    Platform admins without superadmin rights are rejected.
    Only available for reports with target_type='media_asset'.
    The reason code is derived automatically from the report's report_reason.
    After the hold is set, the report status becomes 'resolved_legal_hold'.
    """
    acting_profile = tenant.profile_id
    acting_role = tenant.global_role

    # A platform admin who is not a superadmin has no legal-hold rights; reject
    # before touching the database.
    plain_platform_admin = is_platform_admin(acting_role) and not is_superadmin(acting_role)
    if plain_platform_admin:
        raise HTTPException(
            status_code=403,
            detail="Legal Hold erfordert Superadmin oder Vereinsadmin-Rechte",
        )

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id, target_type, target_id, report_reason, status FROM content_reports WHERE id = %s",
            (report_id,),
        )
        found = cur.fetchone()
        if not found:
            raise HTTPException(status_code=404, detail="Meldung nicht gefunden")

        report = r2d(found)
        if report["target_type"] != "media_asset":
            raise HTTPException(
                status_code=400,
                detail="Legal Hold ist nur fuer Meldungen mit target_type='media_asset' verfügbar.",
            )

        asset_id = int(report["target_id"])
        _assert_can_set_legal_hold_from_report(cur, acting_role, acting_profile, asset_id)

        # Unknown report reasons fall back to the generic "other" code.
        reason_code = _REASON_TO_HOLD_CODE.get(report["report_reason"], "other")
        hold_note = f"[P-13 Meldung #{report_id}] {body.reason_note.strip()}"

        # Call the P-11 service.
        set_legal_hold(
            cur=cur,
            conn=conn,
            asset_id=asset_id,
            acting_profile_id=acting_profile,
            reason_code=reason_code,
            reason_note=hold_note,
        )

        # Per the original note, set_legal_hold has already committed; the
        # report update below therefore runs in a new transaction.
        resolution_text = f"Legal Hold gesetzt auf Asset #{asset_id} (reason_code: {reason_code})"
        cur.execute(
            """
            UPDATE content_reports
            SET status = 'resolved_legal_hold',
                reviewed_by_profile_id = %s,
                reviewed_at = NOW(),
                resolution_note = %s,
                updated_at = NOW()
            WHERE id = %s
            RETURNING id, status
            """,
            (acting_profile, resolution_text, report_id),
        )
        updated = r2d(cur.fetchone())
        conn.commit()

    response = {
        "ok": True,
        "report_id": updated["id"],
        "report_status": updated["status"],
        "asset_id": asset_id,
        "legal_hold_reason_code": reason_code,
    }
    return response