shinkan-jinkendo/backend/routers/content_reports.py
Lars 60709df615
All checks were successful
Deploy Development / deploy (push) Successful in 37s
Test Suite / pytest-backend (push) Successful in 35s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 11s
Test Suite / playwright-tests (push) Successful in 58s
feat: Implement Content Reporting Backend
- Added new API endpoints for content reporting, including submission, retrieval, and status updates.
- Created database migration for `content_reports` table to store report data.
- Integrated content reports into the existing admin inbox for better management.
- Implemented validation for report submissions, including required fields and email format.
- Added tests for content reporting functionality, covering various scenarios and edge cases.
- Updated frontend API utility to include new content report methods.
- Bumped app version to 0.8.87 and updated relevant page versions.
2026-05-11 17:54:53 +02:00

585 lines
21 KiB
Python

"""
P-13: Content-Melde-Backend
Meldeverfahren fuer moeglicherweise rechtswidrige Inhalte (DSA-Grundlage, KRIT-03).
Architektur:
- Fachliche Daten in 'content_reports' Tabelle (Migration 052)
- Integration in bestehende Admin-Inbox (GET /api/me/inbox/content-reports)
- Keine separate Admin-Queue; die InboxPage.jsx zeigt beide Vorgangstypen
- P-11-Anschluss: Superadmin kann aus Meldung heraus Legal Hold setzen
Berechtigungen:
- Meldung einreichen: optional Auth (ohne Auth nur fuer official-Medien erlaubt)
- Liste/Detail: Plattform-Admin (admin/superadmin)
- Status-Bearbeitung: Plattform-Admin
- Legal Hold aus Meldung: ausschliesslich Superadmin
"""
from __future__ import annotations
import re
from typing import Optional
from fastapi import APIRouter, Depends, Header, HTTPException
from pydantic import BaseModel, Field, field_validator
from club_tenancy import is_platform_admin, is_superadmin
from db import get_db, get_cursor, r2d
from media_legal_hold import (
LEGAL_HOLD_REASON_CODES,
assert_superadmin_for_legal_hold,
set_legal_hold,
)
from tenant_context import TenantContext, get_tenant_context
router = APIRouter(prefix="/api", tags=["content_reports"])
# Erlaubte Meldungsgruende
REPORT_REASONS = frozenset({
"copyright",
"image_rights",
"privacy",
"minors",
"illegal_content",
"youth_protection",
"offensive_content",
"other",
})
# Gruende mit hoher Prioritaet (illegal, Minderjaehrigenschutz)
HIGH_PRIORITY_REASONS = frozenset({
"minors",
"illegal_content",
"youth_protection",
})
# Mapping: report_reason -> legal_hold reason_code
_REASON_TO_HOLD_CODE = {
"copyright": "copyright_complaint",
"image_rights": "rights_dispute",
"privacy": "privacy_complaint",
"minors": "youth_protection",
"illegal_content": "illegal_content",
"youth_protection": "youth_protection",
"offensive_content": "illegal_content",
"other": "other",
}
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# ─── Pydantic-Modelle ────────────────────────────────────────────────────────
class ContentReportCreate(BaseModel):
target_type: str = Field(default="media_asset")
target_id: int = Field(..., ge=1)
report_reason: str
report_description: str = Field(..., min_length=10, max_length=8000)
reporter_name: str = Field(..., min_length=1, max_length=200)
reporter_email: str = Field(..., min_length=5, max_length=200)
good_faith_confirmed: bool
@field_validator("target_type")
@classmethod
def validate_target_type(cls, v: str) -> str:
if v not in ("media_asset", "exercise"):
raise ValueError("target_type muss 'media_asset' oder 'exercise' sein")
return v
@field_validator("report_reason")
@classmethod
def validate_reason(cls, v: str) -> str:
if v not in REPORT_REASONS:
raise ValueError(f"Ungueltiger report_reason. Erlaubt: {sorted(REPORT_REASONS)}")
return v
@field_validator("reporter_email")
@classmethod
def validate_email(cls, v: str) -> str:
if not _EMAIL_RE.match(v.strip()):
raise ValueError("Ungueltige E-Mail-Adresse")
return v.strip().lower()
@field_validator("reporter_name")
@classmethod
def validate_name(cls, v: str) -> str:
v = v.strip()
if not v:
raise ValueError("Name darf nicht leer sein")
return v
class ContentReportPatch(BaseModel):
status: Optional[str] = None
resolution_note: Optional[str] = Field(None, max_length=4000)
assigned_to_profile_id: Optional[int] = None
@field_validator("status")
@classmethod
def validate_status(cls, v: Optional[str]) -> Optional[str]:
if v is None:
return v
allowed = {"submitted", "under_review", "resolved_no_action",
"resolved_legal_hold", "rejected_invalid"}
if v not in allowed:
raise ValueError(f"Ungueltiger status. Erlaubt: {sorted(allowed)}")
return v
class ContentReportLegalHoldBody(BaseModel):
reason_note: str = Field(..., min_length=5, max_length=2000)
# ─── Hilfsfunktionen ────────────────────────────────────────────────────────
def _assert_platform_admin(role: Optional[str]) -> None:
if not is_platform_admin(role):
raise HTTPException(status_code=403, detail="Nur Plattform-Admins koennen Meldungen bearbeiten")
def _is_media_asset_visible_anonymous(cur, asset_id: int) -> bool:
"""True wenn das Medium als 'official' und aktiv gilt (fuer anonyme Meldung zugaenglich)."""
cur.execute(
"""
SELECT 1 FROM media_assets
WHERE id = %s
AND visibility = 'official'
AND lifecycle_state = 'active'
AND (legal_hold_active = FALSE OR legal_hold_active IS NULL)
""",
(asset_id,),
)
return cur.fetchone() is not None
def _is_media_asset_visible_to_profile(cur, asset_id: int, profile_id: int, global_role: Optional[str]) -> bool:
"""Prueft ob ein eingeloggter Nutzer das Medium sehen darf."""
if is_platform_admin(global_role):
cur.execute("SELECT 1 FROM media_assets WHERE id = %s", (asset_id,))
return cur.fetchone() is not None
# Sichtbarkeit pruefen: official (immer), club (Mitglied), private (Eigentuemerrechte sind breiter)
cur.execute(
"""
SELECT ma.id, ma.visibility, ma.club_id, ma.uploaded_by_profile_id
FROM media_assets ma
WHERE ma.id = %s
AND ma.lifecycle_state = 'active'
AND (ma.legal_hold_active = FALSE OR ma.legal_hold_active IS NULL)
""",
(asset_id,),
)
row = cur.fetchone()
if not row:
return False
asset = r2d(row)
visibility = asset.get("visibility")
if visibility == "official":
return True
if visibility == "club":
club_id = asset.get("club_id")
if not club_id:
return False
cur.execute(
"""
SELECT 1 FROM club_members
WHERE profile_id = %s AND club_id = %s AND status = 'active'
""",
(profile_id, club_id),
)
return cur.fetchone() is not None
if visibility == "private":
# Private Medien: Eigentuemerrechte oder gleicher Verein mit club_admin
if asset.get("uploaded_by_profile_id") == profile_id:
return True
club_id = asset.get("club_id")
if club_id:
cur.execute(
"""
SELECT 1 FROM club_members cm
INNER JOIN club_member_roles r ON r.club_member_id = cm.id
WHERE cm.profile_id = %s AND cm.club_id = %s AND cm.status = 'active'
AND r.role_code = 'club_admin'
""",
(profile_id, club_id),
)
return cur.fetchone() is not None
return False
def _is_exercise_visible_to_profile(cur, exercise_id: int, profile_id: Optional[int], global_role: Optional[str]) -> bool:
"""Vereinfachte Sichtbarkeitspruefung fuer Uebungen (fuer Meldungen)."""
if profile_id and is_platform_admin(global_role):
cur.execute("SELECT 1 FROM exercises WHERE id = %s AND status != 'deleted'", (exercise_id,))
return cur.fetchone() is not None
if profile_id is None:
cur.execute(
"SELECT 1 FROM exercises WHERE id = %s AND visibility = 'official' AND status = 'active'",
(exercise_id,),
)
return cur.fetchone() is not None
cur.execute(
"""
SELECT e.visibility, e.club_id, e.created_by_profile_id
FROM exercises e WHERE e.id = %s AND e.status != 'deleted'
""",
(exercise_id,),
)
row = cur.fetchone()
if not row:
return False
ex = r2d(row)
if ex.get("visibility") == "official":
return True
if ex.get("visibility") == "club":
club_id = ex.get("club_id")
if not club_id:
return False
cur.execute(
"SELECT 1 FROM club_members WHERE profile_id = %s AND club_id = %s AND status = 'active'",
(profile_id, club_id),
)
return cur.fetchone() is not None
if ex.get("visibility") == "private":
return ex.get("created_by_profile_id") == profile_id
return False
def _resolve_optional_auth(cur, x_auth_token: Optional[str]) -> tuple[Optional[int], Optional[str]]:
"""Gibt (profile_id, global_role) zurueck wenn Token gueltig, sonst (None, None)."""
if not x_auth_token:
return None, None
cur.execute(
"""
SELECT p.id AS profile_id, p.role
FROM sessions s
INNER JOIN profiles p ON p.id = s.profile_id
WHERE s.token = %s
AND (s.expires_at IS NULL OR s.expires_at > NOW())
""",
(x_auth_token,),
)
row = cur.fetchone()
if not row:
return None, None
d = r2d(row)
return d.get("profile_id"), d.get("role")
def _report_row_to_dict(row) -> dict:
return r2d(row)
# ─── Endpoints ──────────────────────────────────────────────────────────────
@router.post("/content-reports", status_code=201)
def submit_content_report(
body: ContentReportCreate,
x_auth_token: Optional[str] = Header(default=None),
):
"""
Meldung einreichen (optionale Authentifizierung).
- Ohne Login: nur fuer 'official'-Medien erlaubt (öffentlich erreichbare Inhalte)
- Mit Login: fuer alle Inhalte, die der Nutzer laut Sichtbarkeitslogik sehen darf
- Gutglaubenserklärung (good_faith_confirmed=true) ist Pflicht
"""
if not body.good_faith_confirmed:
raise HTTPException(
status_code=400,
detail="Die Gutglaubenserklärung (good_faith_confirmed) muss bestätigt werden.",
)
priority = "high" if body.report_reason in HIGH_PRIORITY_REASONS else "normal"
with get_db() as conn:
cur = get_cursor(conn)
# Optionale Auth aufloesen
profile_id, global_role = _resolve_optional_auth(cur, x_auth_token)
# Sichtbarkeit pruefen
if body.target_type == "media_asset":
if profile_id:
visible = _is_media_asset_visible_to_profile(cur, body.target_id, profile_id, global_role)
else:
visible = _is_media_asset_visible_anonymous(cur, body.target_id)
if not visible:
raise HTTPException(
status_code=404,
detail="Das gemeldete Medium existiert nicht oder ist nicht zugänglich.",
)
elif body.target_type == "exercise":
visible = _is_exercise_visible_to_profile(cur, body.target_id, profile_id, global_role)
if not visible:
raise HTTPException(
status_code=404,
detail="Die gemeldete Übung existiert nicht oder ist nicht zugänglich.",
)
cur.execute(
"""
INSERT INTO content_reports (
target_type, target_id, report_reason, report_description,
reporter_name, reporter_email, reporter_profile_id,
good_faith_confirmed, priority, status,
submitted_at, created_at, updated_at
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'submitted', NOW(), NOW(), NOW())
RETURNING id, status, priority, submitted_at
""",
(
body.target_type,
body.target_id,
body.report_reason,
body.report_description.strip(),
body.reporter_name,
body.reporter_email,
profile_id,
body.good_faith_confirmed,
priority,
),
)
row = cur.fetchone()
result = r2d(row)
conn.commit()
return {
"id": result["id"],
"status": result["status"],
"priority": result["priority"],
"submitted_at": str(result["submitted_at"]),
"message": "Ihre Meldung wurde entgegengenommen. Ein Administrator wird sie prüfen.",
}
@router.get("/me/inbox/content-reports")
def list_inbox_content_reports(
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Alle Meldungen fuer die Admin-Inbox.
Sichtbar fuer Plattform-Admins (admin + superadmin).
In der InboxPage neben den Join-Requests angezeigt.
"""
_assert_platform_admin(tenant.global_role)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
SELECT
cr.id,
cr.target_type,
cr.target_id,
cr.report_reason,
cr.report_description,
cr.reporter_name,
cr.reporter_email,
cr.reporter_profile_id,
cr.priority,
cr.status,
cr.assigned_to_profile_id,
cr.reviewed_by_profile_id,
cr.reviewed_at,
cr.resolution_note,
cr.submitted_at,
cr.created_at,
cr.updated_at,
-- Ziel-Medium-Infos (wenn target_type = media_asset)
ma.original_filename AS target_filename,
ma.visibility AS target_visibility,
ma.media_kind AS target_media_kind,
ma.legal_hold_active AS target_legal_hold_active,
-- Ziel-Uebungs-Infos (wenn target_type = exercise)
ex.name AS target_exercise_name,
-- Reviewer-Name
rev.name AS reviewed_by_name,
-- Zugewiesen-Name
asgn.name AS assigned_to_name
FROM content_reports cr
LEFT JOIN media_assets ma ON ma.id = cr.target_id AND cr.target_type = 'media_asset'
LEFT JOIN exercises ex ON ex.id = cr.target_id AND cr.target_type = 'exercise'
LEFT JOIN profiles rev ON rev.id = cr.reviewed_by_profile_id
LEFT JOIN profiles asgn ON asgn.id = cr.assigned_to_profile_id
ORDER BY
CASE WHEN cr.status = 'submitted' THEN 0
WHEN cr.status = 'under_review' THEN 1
ELSE 2 END ASC,
cr.priority DESC,
cr.created_at ASC
LIMIT 500
""",
)
return [_report_row_to_dict(row) for row in cur.fetchall()]
@router.get("/content-reports/{report_id}")
def get_content_report(
report_id: int,
tenant: TenantContext = Depends(get_tenant_context),
):
"""Detail-Ansicht einer Meldung fuer Plattform-Admins."""
_assert_platform_admin(tenant.global_role)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
SELECT
cr.*,
ma.original_filename AS target_filename,
ma.visibility AS target_visibility,
ma.media_kind AS target_media_kind,
ma.legal_hold_active AS target_legal_hold_active,
ma.legal_hold_reason_code AS target_legal_hold_reason_code,
ex.name AS target_exercise_name,
ex.visibility AS target_exercise_visibility,
rev.name AS reviewed_by_name,
asgn.name AS assigned_to_name,
rep.name AS reporter_profile_name
FROM content_reports cr
LEFT JOIN media_assets ma ON ma.id = cr.target_id AND cr.target_type = 'media_asset'
LEFT JOIN exercises ex ON ex.id = cr.target_id AND cr.target_type = 'exercise'
LEFT JOIN profiles rev ON rev.id = cr.reviewed_by_profile_id
LEFT JOIN profiles asgn ON asgn.id = cr.assigned_to_profile_id
LEFT JOIN profiles rep ON rep.id = cr.reporter_profile_id
WHERE cr.id = %s
""",
(report_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Meldung nicht gefunden")
return _report_row_to_dict(row)
@router.patch("/content-reports/{report_id}")
def patch_content_report(
report_id: int,
body: ContentReportPatch,
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Status und Bearbeitungsnotiz einer Meldung aktualisieren.
Abschluss ohne Massnahme (resolved_no_action, rejected_invalid) erfordert resolution_note.
"""
_assert_platform_admin(tenant.global_role)
pid = tenant.profile_id
if body.status in ("resolved_no_action", "rejected_invalid") and not (body.resolution_note or "").strip():
raise HTTPException(
status_code=400,
detail="Bei Abschluss ohne Massnahme ist eine Begründung (resolution_note) Pflicht.",
)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT id, status FROM content_reports WHERE id = %s", (report_id,))
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Meldung nicht gefunden")
updates = ["updated_at = NOW()"]
params = []
if body.status is not None:
updates.append("status = %s")
params.append(body.status)
if body.status in ("resolved_no_action", "resolved_legal_hold", "rejected_invalid"):
updates.append("reviewed_by_profile_id = %s")
updates.append("reviewed_at = NOW()")
params.append(pid)
if body.resolution_note is not None:
updates.append("resolution_note = %s")
params.append(body.resolution_note.strip() or None)
if body.assigned_to_profile_id is not None:
updates.append("assigned_to_profile_id = %s")
params.append(body.assigned_to_profile_id)
if len(updates) == 1:
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren angegeben")
params.append(report_id)
cur.execute(
f"UPDATE content_reports SET {', '.join(updates)} WHERE id = %s RETURNING id, status",
params,
)
updated = r2d(cur.fetchone())
conn.commit()
return {"ok": True, "id": updated["id"], "status": updated["status"]}
@router.post("/content-reports/{report_id}/legal-hold")
def set_legal_hold_from_report(
report_id: int,
body: ContentReportLegalHoldBody,
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Legal Hold (P-11) aus einer Meldung heraus setzen.
Nur Superadmin. Nur fuer Meldungen mit target_type='media_asset'.
Der Reason-Code wird automatisch aus dem report_reason der Meldung abgeleitet.
Nach dem Setzen wird der Report-Status auf 'resolved_legal_hold' gesetzt.
"""
assert_superadmin_for_legal_hold(tenant.global_role)
pid = tenant.profile_id
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT id, target_type, target_id, report_reason, status FROM content_reports WHERE id = %s",
(report_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Meldung nicht gefunden")
report = r2d(row)
if report["target_type"] != "media_asset":
raise HTTPException(
status_code=400,
detail="Legal Hold ist nur fuer Meldungen mit target_type='media_asset' verfügbar.",
)
asset_id = int(report["target_id"])
reason_code = _REASON_TO_HOLD_CODE.get(report["report_reason"], "other")
# P-11-Service aufrufen
set_legal_hold(
cur=cur,
conn=conn,
asset_id=asset_id,
acting_profile_id=pid,
reason_code=reason_code,
reason_note=f"[P-13 Meldung #{report_id}] {body.reason_note.strip()}",
)
# set_legal_hold committed bereits — neue Transaktion fuer Report-Update
cur.execute(
"""
UPDATE content_reports
SET status = 'resolved_legal_hold',
reviewed_by_profile_id = %s,
reviewed_at = NOW(),
resolution_note = %s,
updated_at = NOW()
WHERE id = %s
RETURNING id, status
""",
(pid, f"Legal Hold gesetzt auf Asset #{asset_id} (reason_code: {reason_code})", report_id),
)
updated = r2d(cur.fetchone())
conn.commit()
return {
"ok": True,
"report_id": updated["id"],
"report_status": updated["status"],
"asset_id": asset_id,
"legal_hold_reason_code": reason_code,
}