shinkan-jinkendo/backend/routers/legal_documents.py
Lars 8992c300f1
Some checks failed
Deploy Development / deploy (push) Successful in 35s
Test Suite / pytest-backend (push) Successful in 33s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 8s
Test Suite / playwright-tests (push) Failing after 51s
feat(legal): Als-Entwurf-kopieren für Rechtstexte
POST /api/admin/legal-documents/{id}/copy-as-draft übernimmt Titel +
Inhalt des Quelldokuments und legt einen neuen Entwurf mit
nächster Versionsnummer an. Funktioniert für alle Status (draft/published/archived).

UI: Copy-Button (⎘) in jeder Dokumentzeile; nach Kopie wird die
Liste automatisch aktualisiert und der neue Entwurf ist sichtbar.

version: 0.8.72
module:  legal_documents 1.1.0

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-10 12:25:04 +02:00

476 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Rechtstexte (Impressum, Datenschutz, Nutzungsbedingungen, Medienrichtlinie).
Öffentlich (kein Auth):
GET /api/legal-documents/{document_type}/published
Superadmin only:
GET /api/admin/legal-documents Liste aller Versionen
POST /api/admin/legal-documents Neuen Entwurf anlegen
GET /api/admin/legal-documents/{id} Einzelnes Dokument
PUT /api/admin/legal-documents/{id} Entwurf bearbeiten
POST /api/admin/legal-documents/{id}/publish Veröffentlichen
POST /api/admin/legal-documents/{id}/archive Archivieren
POST /api/admin/legal-documents/{id}/copy-as-draft Als neuen Entwurf kopieren
GET /api/admin/legal-documents/{id}/audit Änderungslog
"""
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from auth import require_auth
from club_tenancy import is_superadmin
from db import get_db, get_cursor, r2d
# ACCESS_LAYER exempt: platform-wide legal texts with no club (tenant) scope.
# The public GET endpoint runs without any auth; the admin endpoints use
# require_auth + is_superadmin().
# Registered in backend/scripts/check_access_layer_hints.py EXEMPT_ROUTERS.
router = APIRouter(tags=["legal_documents"])
# Document types accepted by the public endpoint and by draft creation.
VALID_TYPES = {"impressum", "privacy_policy", "terms_of_use", "media_policy"}
def _require_superadmin(session: dict):
    """Return *session* unchanged when its role passes ``is_superadmin``.

    The role is read from ``session["role"]``. A missing or empty value is
    treated as an empty string, and the value is lower-cased before it is
    handed to ``is_superadmin``.

    Raises:
        HTTPException: 403 when ``is_superadmin`` rejects the role.
    """
    normalized_role = (session.get("role") or "").lower()
    if is_superadmin(normalized_role):
        return session
    raise HTTPException(status_code=403, detail="Nur Superadmins")
# Request body for creating a new draft (POST /api/admin/legal-documents).
class LegalDocumentCreate(BaseModel):
    # Checked against VALID_TYPES by the create handler, not by this model.
    document_type: str
    title: str
    # List of JSON objects; the create handler serializes it with json.dumps
    # and stores it in a jsonb column.
    content_sections: List[Dict[str, Any]] = []
    # Optional note; written to the document row and to its audit entry.
    change_note: Optional[str] = None
# Request body for editing a draft (PUT /api/admin/legal-documents/{id}).
# Every field is optional: the update handler keeps the stored value for any
# field that is left as None.
class LegalDocumentUpdate(BaseModel):
    title: Optional[str] = None
    content_sections: Optional[List[Dict[str, Any]]] = None
    change_note: Optional[str] = None
# Optional request body for the publish endpoint.
class PublishRequest(BaseModel):
    # Written to the audit entry of the publish action.
    change_note: Optional[str] = None
# ─── Public endpoint ────────────────────────────────────────────────────────
@router.get("/api/legal-documents/{document_type}/published")
def get_published_legal_document(document_type: str):
    """Return the published document of the given type, or ``None``.

    Public endpoint: no authentication dependency is attached to it.

    Raises:
        HTTPException: 404 when ``document_type`` is not in ``VALID_TYPES``.
    """
    if document_type not in VALID_TYPES:
        raise HTTPException(status_code=404, detail="Unbekannter Dokumententyp")

    published_query = """
        SELECT id, document_type, version, title, content_sections,
        status, change_note, published_at, updated_at
        FROM legal_documents
        WHERE document_type = %s AND status = 'published'
        LIMIT 1
        """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(published_query, (document_type,))
        published = cursor.fetchone()

    # No published version yet: the endpoint answers with null, not with 404.
    return r2d(published) if published else None
# ─── Superadmin endpoints ────────────────────────────────────────────────────
@router.get("/api/admin/legal-documents")
def list_legal_documents(
    document_type: Optional[str] = None,
    session: dict = Depends(require_auth),
):
    """List all versions of all legal documents, optionally for one type.

    Superadmin only. The listing omits ``content_sections`` and adds the
    creator's profile name as ``created_by_name``. Rows are ordered by
    document type, then by version descending.

    Raises:
        HTTPException: 403 when the session is not a superadmin session.
    """
    _require_superadmin(session)
    with get_db() as conn:
        cursor = get_cursor(conn)
        if not document_type:
            # No filter given (None or empty string): return every document.
            cursor.execute(
                """
                SELECT d.id, d.document_type, d.version, d.title, d.status,
                d.change_note, d.created_at, d.updated_at, d.published_at,
                p.name AS created_by_name
                FROM legal_documents d
                LEFT JOIN profiles p ON p.id = d.created_by_profile_id
                ORDER BY d.document_type, d.version DESC
                """
            )
        else:
            cursor.execute(
                """
                SELECT d.id, d.document_type, d.version, d.title, d.status,
                d.change_note, d.created_at, d.updated_at, d.published_at,
                p.name AS created_by_name
                FROM legal_documents d
                LEFT JOIN profiles p ON p.id = d.created_by_profile_id
                WHERE d.document_type = %s
                ORDER BY d.document_type, d.version DESC
                """,
                (document_type,),
            )
        records = cursor.fetchall()
    return [r2d(record) for record in records]
@router.post("/api/admin/legal-documents", status_code=201)
def create_legal_document(
    body: LegalDocumentCreate,
    session: dict = Depends(require_auth),
):
    """Create a new draft whose version is the highest existing version + 1.

    Superadmin only. The draft is inserted with status ``'draft'`` and an
    audit entry with action ``'created'`` is written in the same transaction.

    Returns:
        The inserted row (columns of the ``RETURNING`` clause) passed
        through ``r2d``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            422 when ``body.document_type`` is not in ``VALID_TYPES``.
    """
    import json as _json

    _require_superadmin(session)
    if body.document_type not in VALID_TYPES:
        raise HTTPException(status_code=422, detail="Ungültiger document_type")
    profile_id = session["profile_id"]
    sections_json = _json.dumps(body.content_sections, ensure_ascii=False)
    with get_db() as conn:
        cur = get_cursor(conn)
        # The aggregate is aliased so it can be read by name instead of by
        # its position in the row.
        cur.execute(
            "SELECT COALESCE(MAX(version), 0) AS max_version "
            "FROM legal_documents WHERE document_type = %s",
            (body.document_type,),
        )
        next_version = cur.fetchone()["max_version"] + 1
        cur.execute(
            """
            INSERT INTO legal_documents
            (document_type, version, title, content_sections, status, change_note, created_by_profile_id)
            VALUES (%s, %s, %s, %s::jsonb, 'draft', %s, %s)
            RETURNING id, document_type, version, title, content_sections,
            status, change_note, created_at, updated_at
            """,
            (body.document_type, next_version, body.title, sections_json, body.change_note, profile_id),
        )
        new_row = cur.fetchone()
        # Keyed access: does not depend on "id" being the first RETURNING column.
        new_id = new_row["id"]
        cur.execute(
            """
            INSERT INTO legal_document_audit (legal_document_id, action, changed_by_profile_id, change_note)
            VALUES (%s, 'created', %s, %s)
            """,
            (new_id, profile_id, body.change_note),
        )
        conn.commit()
    return r2d(new_row)
@router.get("/api/admin/legal-documents/{doc_id}")
def get_legal_document(doc_id: int, session: dict = Depends(require_auth)):
    """Return one document with all fields, including ``content_sections``.

    Superadmin only. The names of the creating and publishing profiles are
    joined in as ``created_by_name`` and ``published_by_name``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when no document with ``doc_id`` exists.
    """
    _require_superadmin(session)

    detail_query = """
        SELECT d.id, d.document_type, d.version, d.title, d.content_sections,
        d.status, d.change_note, d.created_at, d.updated_at,
        d.published_at, d.created_by_profile_id,
        p.name AS created_by_name,
        pb.name AS published_by_name
        FROM legal_documents d
        LEFT JOIN profiles p ON p.id = d.created_by_profile_id
        LEFT JOIN profiles pb ON pb.id = d.published_by_profile_id
        WHERE d.id = %s
        """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(detail_query, (doc_id,))
        record = cursor.fetchone()

    if record:
        return r2d(record)
    raise HTTPException(status_code=404, detail="Dokument nicht gefunden")
@router.put("/api/admin/legal-documents/{doc_id}")
def update_legal_document(
    doc_id: int,
    body: LegalDocumentUpdate,
    session: dict = Depends(require_auth),
):
    """Edit a draft; only documents with status ``'draft'`` can be edited.

    Superadmin only. Fields of ``body`` that are ``None`` keep their stored
    value. An audit entry with action ``'updated'`` is written in the same
    transaction as the update.

    Returns:
        The updated row (columns of the ``RETURNING`` clause) passed
        through ``r2d``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when the document does not exist, 409 when it is not a
            draft (at read time or at write time).
    """
    import json as _json

    _require_superadmin(session)
    profile_id = session["profile_id"]
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id, status, title, content_sections, change_note FROM legal_documents WHERE id = %s",
            (doc_id,),
        )
        row = cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Dokument nicht gefunden")
    doc = r2d(row)
    if doc["status"] != "draft":
        raise HTTPException(status_code=409, detail="Nur Entwürfe können bearbeitet werden")

    new_title = body.title if body.title is not None else doc["title"]
    new_sections = body.content_sections if body.content_sections is not None else doc["content_sections"]
    new_note = body.change_note if body.change_note is not None else doc["change_note"]
    sections_json = _json.dumps(new_sections, ensure_ascii=False)

    with get_db() as conn:
        cur = get_cursor(conn)
        # The status was checked on a different connection above, so the
        # draft rule is enforced again inside the UPDATE itself. Without the
        # guard, a document published or archived in between would still be
        # overwritten.
        cur.execute(
            """
            UPDATE legal_documents
            SET title = %s, content_sections = %s::jsonb, change_note = %s, updated_at = NOW()
            WHERE id = %s AND status = 'draft'
            RETURNING id, document_type, version, title, content_sections,
            status, change_note, created_at, updated_at
            """,
            (new_title, sections_json, new_note, doc_id),
        )
        updated = cur.fetchone()
        if not updated:
            # No row matched: the document stopped being a draft (or was
            # removed) after the check. Nothing was written; report the
            # conflict instead of passing None to r2d.
            conn.rollback()
            raise HTTPException(status_code=409, detail="Nur Entwürfe können bearbeitet werden")
        cur.execute(
            """
            INSERT INTO legal_document_audit (legal_document_id, action, changed_by_profile_id, change_note)
            VALUES (%s, 'updated', %s, %s)
            """,
            (doc_id, profile_id, new_note),
        )
        conn.commit()
    return r2d(updated)
@router.post("/api/admin/legal-documents/{doc_id}/publish")
def publish_legal_document(
    doc_id: int,
    body: PublishRequest = PublishRequest(),
    session: dict = Depends(require_auth),
):
    """Publish this document.

    Superadmin only. Any other document of the same type that is currently
    ``'published'`` is set to ``'archived'`` first (the original comment
    states that a partial unique index enforces a single published document
    per type). An audit entry with action ``'published'`` and the previous
    status is written for this document.

    Returns:
        The updated row (columns of the ``RETURNING`` clause) passed
        through ``r2d``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when the document does not exist, 409 when it is already
            published, is archived, or changed status before the write.
    """
    _require_superadmin(session)
    profile_id = session["profile_id"]
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id, document_type, status FROM legal_documents WHERE id = %s",
            (doc_id,),
        )
        row = cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Dokument nicht gefunden")
    doc = r2d(row)
    if doc["status"] == "published":
        raise HTTPException(status_code=409, detail="Dokument ist bereits veröffentlicht")
    if doc["status"] == "archived":
        raise HTTPException(status_code=409, detail="Archivierte Dokumente können nicht veröffentlicht werden")

    with get_db() as conn:
        cur = get_cursor(conn)
        # Archive the previously published document of the same type.
        # NOTE(review): no audit entry is written for that implicit archive.
        cur.execute(
            """
            UPDATE legal_documents
            SET status = 'archived', updated_at = NOW()
            WHERE document_type = %s AND status = 'published' AND id != %s
            """,
            (doc["document_type"], doc_id),
        )
        # Publish this document, but only if it still has the status that
        # was validated above. The check ran on a separate connection, so
        # the document may have been archived or published in between.
        cur.execute(
            """
            UPDATE legal_documents
            SET status = 'published', published_at = NOW(),
            published_by_profile_id = %s, updated_at = NOW()
            WHERE id = %s AND status = %s
            RETURNING id, document_type, version, title, content_sections,
            status, change_note, published_at, updated_at
            """,
            (profile_id, doc_id, doc["status"]),
        )
        updated = cur.fetchone()
        if not updated:
            # Undo the archive of the previous version before reporting the
            # conflict; otherwise the type could end up with no published
            # document at all.
            conn.rollback()
            raise HTTPException(
                status_code=409,
                detail="Dokumentstatus wurde zwischenzeitlich geändert",
            )
        cur.execute(
            """
            INSERT INTO legal_document_audit
            (legal_document_id, action, changed_by_profile_id, change_note, previous_status)
            VALUES (%s, 'published', %s, %s, %s)
            """,
            (doc_id, profile_id, body.change_note, doc["status"]),
        )
        conn.commit()
    return r2d(updated)
@router.post("/api/admin/legal-documents/{doc_id}/archive")
def archive_legal_document(
    doc_id: int,
    session: dict = Depends(require_auth),
):
    """Set a document's status to ``'archived'``.

    Superadmin only. Any document that is not already archived is accepted.
    The status it had before is stored as ``previous_status`` in an audit
    entry with action ``'archived'``.

    Returns:
        The updated row (columns of the ``RETURNING`` clause) passed
        through ``r2d``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when the document does not exist, 409 when it is already
            archived.
    """
    _require_superadmin(session)
    actor_id = session["profile_id"]

    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            "SELECT id, status FROM legal_documents WHERE id = %s",
            (doc_id,),
        )
        found = cursor.fetchone()
    if not found:
        raise HTTPException(status_code=404, detail="Dokument nicht gefunden")

    status_before = r2d(found)["status"]
    if status_before == "archived":
        raise HTTPException(status_code=409, detail="Dokument ist bereits archiviert")

    archive_sql = """
        UPDATE legal_documents
        SET status = 'archived', updated_at = NOW()
        WHERE id = %s
        RETURNING id, document_type, version, title, status, updated_at
        """
    audit_sql = """
        INSERT INTO legal_document_audit
        (legal_document_id, action, changed_by_profile_id, previous_status)
        VALUES (%s, 'archived', %s, %s)
        """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(archive_sql, (doc_id,))
        archived = cursor.fetchone()
        cursor.execute(audit_sql, (doc_id, actor_id, status_before))
        conn.commit()
    return r2d(archived)
@router.post("/api/admin/legal-documents/{doc_id}/copy-as-draft", status_code=201)
def copy_legal_document_as_draft(
    doc_id: int,
    session: dict = Depends(require_auth),
):
    """Copy any document, whatever its status, into a new draft.

    Superadmin only. Title and content are taken over from the source
    document. The copy gets the next version number for its document type,
    status ``'draft'`` and no change note. The audit entry of the new draft
    records the version and id of the source.

    Returns:
        The inserted row (columns of the ``RETURNING`` clause) passed
        through ``r2d``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when the source document does not exist.
    """
    import json as _json

    _require_superadmin(session)
    profile_id = session["profile_id"]
    with get_db() as conn:
        cur = get_cursor(conn)
        # "version" must be selected here: the audit note below refers to it.
        cur.execute(
            "SELECT id, document_type, version, title, content_sections "
            "FROM legal_documents WHERE id = %s",
            (doc_id,),
        )
        row = cur.fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="Dokument nicht gefunden")
    src = r2d(row)
    sections_json = _json.dumps(src["content_sections"], ensure_ascii=False)

    with get_db() as conn:
        cur = get_cursor(conn)
        # The aggregate is aliased so it can be read by name instead of by
        # its position in the row.
        cur.execute(
            "SELECT COALESCE(MAX(version), 0) AS max_version "
            "FROM legal_documents WHERE document_type = %s",
            (src["document_type"],),
        )
        next_version = cur.fetchone()["max_version"] + 1
        cur.execute(
            """
            INSERT INTO legal_documents
            (document_type, version, title, content_sections, status, change_note, created_by_profile_id)
            VALUES (%s, %s, %s, %s::jsonb, 'draft', NULL, %s)
            RETURNING id, document_type, version, title, content_sections,
            status, change_note, created_at, updated_at
            """,
            (src["document_type"], next_version, src["title"], sections_json, profile_id),
        )
        new_row = cur.fetchone()
        new_id = new_row["id"]
        cur.execute(
            """
            INSERT INTO legal_document_audit
            (legal_document_id, action, changed_by_profile_id, change_note)
            VALUES (%s, 'created', %s, %s)
            """,
            (new_id, profile_id, f"Kopie von Version {src['version']} (ID {doc_id})"),
        )
        conn.commit()
    return r2d(new_row)
@router.get("/api/admin/legal-documents/{doc_id}/audit")
def get_legal_document_audit(doc_id: int, session: dict = Depends(require_auth)):
    """Return the audit log of one document, newest entry first.

    Superadmin only. Each entry carries the name of the profile that made
    the change as ``changed_by_name``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when no document with ``doc_id`` exists.
    """
    _require_superadmin(session)

    audit_query = """
        SELECT a.id, a.action, a.change_note, a.previous_status, a.created_at,
        p.name AS changed_by_name
        FROM legal_document_audit a
        LEFT JOIN profiles p ON p.id = a.changed_by_profile_id
        WHERE a.legal_document_id = %s
        ORDER BY a.created_at DESC
        """
    with get_db() as conn:
        cursor = get_cursor(conn)
        # Existence check first, so an unknown id gives 404 rather than [].
        cursor.execute("SELECT id FROM legal_documents WHERE id = %s", (doc_id,))
        document_exists = cursor.fetchone()
        if not document_exists:
            raise HTTPException(status_code=404, detail="Dokument nicht gefunden")
        cursor.execute(audit_query, (doc_id,))
        entries = cursor.fetchall()
    return [r2d(entry) for entry in entries]