Some checks failed
Deploy Development / deploy (push) Successful in 46s
Test Suite / pytest-backend (push) Failing after 2s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 50s
DB 047: legal_documents (versioniert, draft/published/archived) +
legal_document_audit (Änderungslog); Partial-Unique-Index garantiert
max. ein published-Dokument pro document_type.
Backend: GET /api/legal-documents/{type}/published (kein Auth);
Superadmin-CRUD + Publish/Archive + Audit unter /api/admin/legal-documents.
Frontend: LegalPage lädt aus API mit Platzhalter-Fallback;
AdminLegalDocumentsPage (/admin/legal-documents) mit Tab-Navigation,
Versionsliste, Entwurf-Editor, Publish/Archive-Workflow, Änderungslog.
AdminPageNav: Link „Rechtstexte“ ergänzt.
version: 0.8.71 (backend + frontend)
module: legal_documents 1.0.0
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
409 lines
14 KiB
Python
409 lines
14 KiB
Python
"""
|
||
Rechtstexte (Impressum, Datenschutz, Nutzungsbedingungen, Medienrichtlinie).
|
||
|
||
Öffentlich (kein Auth):
|
||
GET /api/legal-documents/{document_type}/published
|
||
|
||
Superadmin only:
|
||
GET /api/admin/legal-documents – Liste aller Versionen
|
||
POST /api/admin/legal-documents – Neuen Entwurf anlegen
|
||
GET /api/admin/legal-documents/{id} – Einzelnes Dokument
|
||
PUT /api/admin/legal-documents/{id} – Entwurf bearbeiten
|
||
POST /api/admin/legal-documents/{id}/publish – Veröffentlichen
|
||
POST /api/admin/legal-documents/{id}/archive – Archivieren
|
||
GET /api/admin/legal-documents/{id}/audit – Änderungslog
|
||
"""
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
from fastapi import APIRouter, Depends, HTTPException
|
||
from pydantic import BaseModel
|
||
|
||
from auth import require_auth
|
||
from club_tenancy import is_superadmin
|
||
from db import get_db, get_cursor, r2d
|
||
|
||
# Router on which every endpoint of this module is registered.
router = APIRouter(tags=["legal_documents"])

# Document types accepted by the public and the admin endpoints.
VALID_TYPES = {
    "impressum",
    "privacy_policy",
    "terms_of_use",
    "media_policy",
}
|
||
|
||
|
||
def _require_superadmin(session: dict):
    """Return *session* unchanged when it belongs to a superadmin.

    The role is read from ``session["role"]``. A missing or empty role is
    treated as the empty string; any other value is lower-cased before it is
    handed to ``is_superadmin``.

    Raises:
        HTTPException: 403 when ``is_superadmin`` rejects the role.
    """
    raw_role = session.get("role")
    normalized_role = raw_role.lower() if raw_role else ""
    if is_superadmin(normalized_role):
        return session
    raise HTTPException(status_code=403, detail="Nur Superadmins")
|
||
|
||
|
||
class LegalDocumentCreate(BaseModel):
    """Request body for creating a new draft of a legal document."""

    # Checked against VALID_TYPES by the create endpoint.
    document_type: str
    title: str
    # List of section objects; serialised to JSON before it is stored.
    content_sections: List[Dict[str, Any]] = []
    # Optional note that is also written to the audit log.
    change_note: Optional[str] = None
|
||
|
||
|
||
class LegalDocumentUpdate(BaseModel):
    """Request body for editing a draft.

    Every field is optional; a field left at ``None`` keeps the value that
    is currently stored for the document.
    """

    title: Optional[str] = None
    content_sections: Optional[List[Dict[str, Any]]] = None
    change_note: Optional[str] = None
|
||
|
||
|
||
class PublishRequest(BaseModel):
    """Optional request body of the publish endpoint."""

    # Note recorded in the audit entry of the publication.
    change_note: Optional[str] = None
|
||
|
||
|
||
# ─── Public endpoint ────────────────────────────────────────────────────────
|
||
|
||
@router.get("/api/legal-documents/{document_type}/published")
def get_published_legal_document(document_type: str):
    """Return the currently published document of a type, or ``None``.

    The endpoint declares no authentication dependency.

    Args:
        document_type: One of the identifiers in ``VALID_TYPES``.

    Returns:
        The published row converted with ``r2d``, or ``None`` when no
        document of this type has the status ``published``.

    Raises:
        HTTPException: 404 when ``document_type`` is not in ``VALID_TYPES``.
    """
    if document_type not in VALID_TYPES:
        raise HTTPException(status_code=404, detail="Unbekannter Dokumententyp")

    published_query = """
        SELECT id, document_type, version, title, content_sections,
               status, change_note, published_at, updated_at
        FROM legal_documents
        WHERE document_type = %s AND status = 'published'
        LIMIT 1
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(published_query, (document_type,))
        published_row = cursor.fetchone()

    return r2d(published_row) if published_row else None
|
||
|
||
|
||
# ─── Superadmin endpoints ────────────────────────────────────────────────────
|
||
|
||
@router.get("/api/admin/legal-documents")
def list_legal_documents(
    document_type: Optional[str] = None,
    session: dict = Depends(require_auth),
):
    """List all versions of all legal documents, optionally filtered by type.

    Args:
        document_type: When given and non-empty, only versions of this type
            are returned. The value is not checked against ``VALID_TYPES``;
            an unknown type simply yields an empty list.
        session: Authenticated session; must belong to a superadmin.

    Returns:
        A list of rows converted with ``r2d`` (without ``content_sections``),
        ordered by document type and then by descending version.

    Raises:
        HTTPException: 403 when the session is not a superadmin session.
    """
    _require_superadmin(session)

    # The SELECT used to be duplicated in both branches; it is now assembled
    # from fixed fragments so the column list cannot drift apart. Only
    # constant SQL text is concatenated, the filter value stays a bound
    # parameter.
    select_sql = """
        SELECT d.id, d.document_type, d.version, d.title, d.status,
               d.change_note, d.created_at, d.updated_at, d.published_at,
               p.name AS created_by_name
        FROM legal_documents d
        LEFT JOIN profiles p ON p.id = d.created_by_profile_id
    """
    filter_sql = " WHERE d.document_type = %s"
    order_sql = " ORDER BY d.document_type, d.version DESC"

    with get_db() as conn:
        cur = get_cursor(conn)
        if document_type:
            cur.execute(select_sql + filter_sql + order_sql, (document_type,))
        else:
            cur.execute(select_sql + order_sql)
        rows = cur.fetchall()

    return [r2d(r) for r in rows]
|
||
|
||
|
||
@router.post("/api/admin/legal-documents", status_code=201)
def create_legal_document(
    body: LegalDocumentCreate,
    session: dict = Depends(require_auth),
):
    """Create a new draft whose version is the highest existing version + 1.

    The draft and its ``created`` audit entry are written on the same
    connection and committed together.

    Args:
        body: Type, title, sections and optional change note of the draft.
        session: Authenticated session; must belong to a superadmin.

    Returns:
        The inserted row converted with ``r2d``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            422 when ``body.document_type`` is not in ``VALID_TYPES``.
    """
    _require_superadmin(session)

    if body.document_type not in VALID_TYPES:
        raise HTTPException(status_code=422, detail="Ungültiger document_type")

    import json as _json

    profile_id = session["profile_id"]
    sections_json = _json.dumps(body.content_sections, ensure_ascii=False)

    with get_db() as conn:
        cur = get_cursor(conn)

        # The aggregate is aliased so it can be read by name rather than by
        # its position in the row.
        # NOTE(review): two concurrent requests can compute the same
        # next_version; confirm that the schema rejects duplicates.
        cur.execute(
            "SELECT COALESCE(MAX(version), 0) AS max_version "
            "FROM legal_documents WHERE document_type = %s",
            (body.document_type,),
        )
        next_version = cur.fetchone()["max_version"] + 1

        cur.execute(
            """
            INSERT INTO legal_documents
                (document_type, version, title, content_sections, status, change_note, created_by_profile_id)
            VALUES (%s, %s, %s, %s::jsonb, 'draft', %s, %s)
            RETURNING id, document_type, version, title, content_sections,
                      status, change_note, created_at, updated_at
            """,
            (body.document_type, next_version, body.title, sections_json, body.change_note, profile_id),
        )
        new_row = cur.fetchone()

        cur.execute(
            """
            INSERT INTO legal_document_audit (legal_document_id, action, changed_by_profile_id, change_note)
            VALUES (%s, 'created', %s, %s)
            """,
            (new_row["id"], profile_id, body.change_note),
        )
        conn.commit()

    return r2d(new_row)
|
||
|
||
|
||
@router.get("/api/admin/legal-documents/{doc_id}")
def get_legal_document(doc_id: int, session: dict = Depends(require_auth)):
    """Return a single document version including its ``content_sections``.

    The result also carries the names of the creating and the publishing
    profile (``created_by_name`` / ``published_by_name``), taken from left
    joins on ``profiles``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when no document with ``doc_id`` exists.
    """
    _require_superadmin(session)

    detail_query = """
        SELECT d.id, d.document_type, d.version, d.title, d.content_sections,
               d.status, d.change_note, d.created_at, d.updated_at,
               d.published_at, d.created_by_profile_id,
               p.name AS created_by_name,
               pb.name AS published_by_name
        FROM legal_documents d
        LEFT JOIN profiles p ON p.id = d.created_by_profile_id
        LEFT JOIN profiles pb ON pb.id = d.published_by_profile_id
        WHERE d.id = %s
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(detail_query, (doc_id,))
        found = cursor.fetchone()

    if found:
        return r2d(found)
    raise HTTPException(status_code=404, detail="Dokument nicht gefunden")
|
||
|
||
|
||
@router.put("/api/admin/legal-documents/{doc_id}")
def update_legal_document(
    doc_id: int,
    body: LegalDocumentUpdate,
    session: dict = Depends(require_auth),
):
    """Edit a draft; only documents with ``status = 'draft'`` can be edited.

    Fields that are ``None`` in *body* keep their stored value. The update
    and its ``updated`` audit entry are committed together.

    Args:
        doc_id: Primary key of the document version to edit.
        body: New title, sections and/or change note.
        session: Authenticated session; must belong to a superadmin.

    Returns:
        The updated row converted with ``r2d``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when the document does not exist, 409 when it is not (or is
            no longer) a draft.
    """
    _require_superadmin(session)

    import json as _json

    profile_id = session["profile_id"]

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id, status, title, content_sections, change_note FROM legal_documents WHERE id = %s",
            (doc_id,),
        )
        row = cur.fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="Dokument nicht gefunden")

    doc = r2d(row)
    if doc["status"] != "draft":
        raise HTTPException(status_code=409, detail="Nur Entwürfe können bearbeitet werden")

    new_title = body.title if body.title is not None else doc["title"]
    new_sections = body.content_sections if body.content_sections is not None else doc["content_sections"]
    new_note = body.change_note if body.change_note is not None else doc["change_note"]
    sections_json = _json.dumps(new_sections, ensure_ascii=False)

    with get_db() as conn:
        cur = get_cursor(conn)
        # The status was read on a different connection, so it is re-checked
        # inside the UPDATE itself. Otherwise a document that was published
        # or archived in the meantime could still be overwritten.
        cur.execute(
            """
            UPDATE legal_documents
            SET title = %s, content_sections = %s::jsonb, change_note = %s, updated_at = NOW()
            WHERE id = %s AND status = 'draft'
            RETURNING id, document_type, version, title, content_sections,
                      status, change_note, created_at, updated_at
            """,
            (new_title, sections_json, new_note, doc_id),
        )
        updated = cur.fetchone()

        # Only write the audit entry and commit when a row was changed.
        if updated is not None:
            cur.execute(
                """
                INSERT INTO legal_document_audit (legal_document_id, action, changed_by_profile_id, change_note)
                VALUES (%s, 'updated', %s, %s)
                """,
                (doc_id, profile_id, new_note),
            )
            conn.commit()

    if updated is None:
        raise HTTPException(status_code=409, detail="Nur Entwürfe können bearbeitet werden")

    return r2d(updated)
|
||
|
||
|
||
@router.post("/api/admin/legal-documents/{doc_id}/publish")
def publish_legal_document(
    doc_id: int,
    body: PublishRequest = PublishRequest(),
    session: dict = Depends(require_auth),
):
    """Publish a document and archive the previously published one of its type.

    Any other document of the same type that is currently ``published`` is
    set to ``archived`` first. According to the original author's note, a
    partial unique index allows only one published document per type.

    Args:
        doc_id: Primary key of the document version to publish.
        body: Optional change note stored in the audit entry.
        session: Authenticated session; must belong to a superadmin.

    Returns:
        The published row converted with ``r2d``.

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when the document does not exist, 409 when it is already
            published, is archived, or changed status while the request was
            being processed.
    """
    _require_superadmin(session)

    profile_id = session["profile_id"]

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id, document_type, status FROM legal_documents WHERE id = %s",
            (doc_id,),
        )
        row = cur.fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="Dokument nicht gefunden")

    doc = r2d(row)
    if doc["status"] == "published":
        raise HTTPException(status_code=409, detail="Dokument ist bereits veröffentlicht")
    if doc["status"] == "archived":
        raise HTTPException(status_code=409, detail="Archivierte Dokumente können nicht veröffentlicht werden")

    with get_db() as conn:
        cur = get_cursor(conn)
        # Archive the previously published document of the same type.
        # NOTE(review): this implicit archival writes no audit entry for the
        # old document; confirm whether that is intended.
        cur.execute(
            """
            UPDATE legal_documents
            SET status = 'archived', updated_at = NOW()
            WHERE document_type = %s AND status = 'published' AND id != %s
            """,
            (doc["document_type"], doc_id),
        )
        # Publish this document, but only if its status is still the one
        # validated above. The check ran on a different connection, so
        # without this guard a concurrently archived document could be
        # published again.
        cur.execute(
            """
            UPDATE legal_documents
            SET status = 'published', published_at = NOW(),
                published_by_profile_id = %s, updated_at = NOW()
            WHERE id = %s AND status = %s
            RETURNING id, document_type, version, title, content_sections,
                      status, change_note, published_at, updated_at
            """,
            (profile_id, doc_id, doc["status"]),
        )
        updated = cur.fetchone()

        if updated is None:
            # Undo the archival above so the type is not left without a
            # published document.
            conn.rollback()
        else:
            cur.execute(
                """
                INSERT INTO legal_document_audit
                    (legal_document_id, action, changed_by_profile_id, change_note, previous_status)
                VALUES (%s, 'published', %s, %s, %s)
                """,
                (doc_id, profile_id, body.change_note, doc["status"]),
            )
            conn.commit()

    if updated is None:
        raise HTTPException(status_code=409, detail="Dokument wurde zwischenzeitlich geändert")

    return r2d(updated)
|
||
|
||
|
||
@router.post("/api/admin/legal-documents/{doc_id}/archive")
def archive_legal_document(
    doc_id: int,
    session: dict = Depends(require_auth),
):
    """Archive a document that is not archived yet.

    Any status other than ``archived`` can be archived; no further
    precondition is checked. The status change and its ``archived`` audit
    entry, which records the previous status, are committed together.

    Args:
        doc_id: Primary key of the document version to archive.
        session: Authenticated session; must belong to a superadmin.

    Returns:
        The archived row converted with ``r2d`` (without
        ``content_sections``).

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when the document does not exist, 409 when it is already
            archived or changed status while the request was being
            processed.
    """
    _require_superadmin(session)

    profile_id = session["profile_id"]

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id, status FROM legal_documents WHERE id = %s",
            (doc_id,),
        )
        row = cur.fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="Dokument nicht gefunden")

    doc = r2d(row)
    if doc["status"] == "archived":
        raise HTTPException(status_code=409, detail="Dokument ist bereits archiviert")

    prev_status = doc["status"]

    with get_db() as conn:
        cur = get_cursor(conn)
        # Guard on the status read above, so the audit entry never records a
        # stale previous_status and a concurrent change is reported as a
        # conflict.
        cur.execute(
            """
            UPDATE legal_documents
            SET status = 'archived', updated_at = NOW()
            WHERE id = %s AND status = %s
            RETURNING id, document_type, version, title, status, updated_at
            """,
            (doc_id, prev_status),
        )
        updated = cur.fetchone()

        # Only write the audit entry and commit when a row was changed.
        if updated is not None:
            cur.execute(
                """
                INSERT INTO legal_document_audit
                    (legal_document_id, action, changed_by_profile_id, previous_status)
                VALUES (%s, 'archived', %s, %s)
                """,
                (doc_id, profile_id, prev_status),
            )
            conn.commit()

    if updated is None:
        raise HTTPException(status_code=409, detail="Dokument wurde zwischenzeitlich geändert")

    return r2d(updated)
|
||
|
||
|
||
@router.get("/api/admin/legal-documents/{doc_id}/audit")
def get_legal_document_audit(doc_id: int, session: dict = Depends(require_auth)):
    """Return the change log of one document, newest entry first.

    Each entry carries the action, change note, previous status, creation
    time and the name of the acting profile (``changed_by_name``).

    Raises:
        HTTPException: 403 when the session is not a superadmin session,
            404 when no document with ``doc_id`` exists.
    """
    _require_superadmin(session)

    audit_query = """
        SELECT a.id, a.action, a.change_note, a.previous_status, a.created_at,
               p.name AS changed_by_name
        FROM legal_document_audit a
        LEFT JOIN profiles p ON p.id = a.changed_by_profile_id
        WHERE a.legal_document_id = %s
        ORDER BY a.created_at DESC
    """
    with get_db() as conn:
        cursor = get_cursor(conn)

        # An unknown document is reported as 404 rather than an empty list.
        cursor.execute("SELECT id FROM legal_documents WHERE id = %s", (doc_id,))
        document_exists = bool(cursor.fetchone())
        if not document_exists:
            raise HTTPException(status_code=404, detail="Dokument nicht gefunden")

        cursor.execute(audit_query, (doc_id,))
        entries = cursor.fetchall()

    return list(map(r2d, entries))
|