shinkan-jinkendo/backend/routers/exercise_enrichment_admin.py
Lars 46fae3da33
All checks were successful
Deploy Development / deploy (push) Successful in 51s
Test Suite / pytest-backend (push) Successful in 44s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m16s
Test Suite / pytest-backend (pull_request) Successful in 36s
Test Suite / lint-backend (pull_request) Successful in 0s
Test Suite / build-frontend (pull_request) Successful in 13s
Test Suite / k6 /health Baseline (pull_request) Successful in 33s
Test Suite / playwright-tests (pull_request) Successful in 1m23s
Enhance Exercise Enrichment Admin Functionality and Update Documentation
- Implemented a maximum of 3 exercises per preview request to prevent Gateway-504 errors, improving the stability of the exercise enrichment process.
- Adjusted batch sizes for applying exercises and previewing to optimize performance and resource management.
- Updated the frontend to reflect changes in preview handling, including user notifications about chunk sizes and potential timeouts.
- Incremented version to 0.8.180 and updated changelog to document these enhancements and fixes.
2026-05-23 07:46:35 +02:00

449 lines
15 KiB
Python

"""
Superadmin API: Batch-Anreicherung von Übungen per KI (Skills, Kurzfassung, Anleitung).
# ACCESS_LAYER exempt: Plattform-weites Superadmin-Werkzeug ohne TenantContext.
Siehe ACCESS_LAYER_ENDPOINT_AUDIT.md.
"""
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Literal, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field, field_validator
from auth import require_auth
from club_tenancy import is_superadmin
from db import get_cursor, get_db, r2d
from exercise_enrichment import (
DEFAULT_SET_STATUS,
MAX_BATCH_EXERCISES,
MAX_PREVIEW_BATCH_EXERCISES,
SKILL_MERGE_MODES,
SkillMergeMode,
apply_exercise_enrichment,
estimate_llm_calls,
preview_exercise_enrichment,
)
_PREVIEW_MAX_WORKERS = 3
router = APIRouter(tags=["admin_exercise_enrichment"])
_VALID_STATUS_FILTER = frozenset({"draft", "in_review", "approved", "archived"})
_VALID_VISIBILITY = frozenset({"private", "club", "official", "all"})
_MAX_CANDIDATES_LIMIT = 100
_MAX_ANALYZE_IDS = 10_000
def _require_superadmin(session: dict) -> dict:
role = (session.get("role") or "").strip().lower()
if not is_superadmin(role):
raise HTTPException(status_code=403, detail="Nur Superadmins")
return session
def _normalize_id_list(raw: Optional[List[int]], *, max_items: Optional[int] = None) -> List[int]:
if not raw:
return []
seen: set[int] = set()
out: List[int] = []
for x in raw:
try:
xi = int(x)
except (TypeError, ValueError):
continue
if xi < 1 or xi in seen:
continue
seen.add(xi)
out.append(xi)
if max_items is not None and len(out) >= max_items:
break
return out
def _build_candidates_where(
*,
status: str,
visibility: Optional[str],
focus_area_id: Optional[int],
without_skills: bool,
with_ai_suggested_skills: bool,
) -> tuple[list[str], list[Any]]:
st = (status or "draft").strip().lower()
if st not in _VALID_STATUS_FILTER:
raise HTTPException(status_code=400, detail="Ungültiger Status-Filter")
where: List[str] = ["e.status = %s"]
params: List[Any] = [st]
vis_raw = (visibility or "private").strip().lower()
if vis_raw and vis_raw != "all":
if vis_raw not in _VALID_VISIBILITY - {"all"}:
raise HTTPException(status_code=400, detail="Ungültiger Visibility-Filter")
where.append("e.visibility = %s")
params.append(vis_raw)
if focus_area_id is not None:
where.append(
"EXISTS (SELECT 1 FROM exercise_focus_areas efa "
"WHERE efa.exercise_id = e.id AND efa.focus_area_id = %s)"
)
params.append(int(focus_area_id))
if without_skills:
where.append(
"NOT EXISTS (SELECT 1 FROM exercise_skills es WHERE es.exercise_id = e.id)"
)
if with_ai_suggested_skills:
where.append(
"EXISTS (SELECT 1 FROM exercise_skills es "
"WHERE es.exercise_id = e.id AND es.ai_suggested = true)"
)
return where, params
class EnrichmentModes(BaseModel):
skills: bool = True
summary: bool = False
instructions: bool = False
class EnrichmentPreviewBody(BaseModel):
exercise_ids: List[int] = Field(..., min_length=1, max_length=MAX_PREVIEW_BATCH_EXERCISES)
modes: EnrichmentModes = Field(default_factory=EnrichmentModes)
merge_mode: SkillMergeMode = "replace_all"
@field_validator("merge_mode")
@classmethod
def _merge_mode_ok(cls, v: str) -> str:
if v not in SKILL_MERGE_MODES:
raise ValueError("merge_mode: additive, replace_ai_only oder replace_all")
return v
class EnrichmentAnalyzeBody(BaseModel):
exercise_ids: List[int] = Field(..., min_length=1, max_length=_MAX_ANALYZE_IDS)
modes: EnrichmentModes = Field(default_factory=EnrichmentModes)
class EnrichmentApplyItem(BaseModel):
exercise_id: int = Field(..., ge=1)
merged_skills: List[Dict[str, Any]] = Field(default_factory=list)
summary: Optional[str] = None
instruction_fields: Optional[Dict[str, Any]] = None
class EnrichmentApplyBody(BaseModel):
items: List[EnrichmentApplyItem] = Field(..., min_length=1, max_length=MAX_BATCH_EXERCISES)
modes: EnrichmentModes = Field(default_factory=EnrichmentModes)
merge_mode: SkillMergeMode = "replace_all"
set_status: Optional[str] = DEFAULT_SET_STATUS
@field_validator("merge_mode")
@classmethod
def _merge_mode_ok(cls, v: str) -> str:
if v not in SKILL_MERGE_MODES:
raise ValueError("merge_mode: additive, replace_ai_only oder replace_all")
return v
@field_validator("set_status")
@classmethod
def _status_ok(cls, v: Optional[str]) -> Optional[str]:
if v is None:
return None
s = str(v).strip().lower()
if s == "approved":
raise ValueError("Automatisches Freigeben (approved) ist nicht erlaubt")
if s not in _VALID_STATUS_FILTER:
raise ValueError("Ungültiger Ziel-Status")
return s
@router.get("/api/admin/exercise-enrichment/candidates")
def list_enrichment_candidates(
session: dict = Depends(require_auth),
status: str = Query(default="draft"),
visibility: Optional[str] = Query(default="private"),
focus_area_id: Optional[int] = Query(default=None, ge=1),
without_skills: bool = Query(default=False),
with_ai_suggested_skills: bool = Query(default=False),
search: Optional[str] = Query(default=None, max_length=200),
limit: int = Query(default=25, ge=1, le=_MAX_CANDIDATES_LIMIT),
offset: int = Query(default=0, ge=0),
):
"""Paginierte Kandidatenliste für Superadmin-Anreicherung."""
_require_superadmin(session)
where, params = _build_candidates_where(
status=status,
visibility=visibility,
focus_area_id=focus_area_id,
without_skills=without_skills,
with_ai_suggested_skills=with_ai_suggested_skills,
)
qtext = (search or "").strip()
if qtext:
where.append("e.search_vector @@ plainto_tsquery('german', %s)")
params.append(qtext)
where_sql = " AND ".join(where)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(f"SELECT COUNT(*) AS c FROM exercises e WHERE {where_sql}", tuple(params))
count_row = cur.fetchone()
total = int(count_row["c"] if isinstance(count_row, dict) else count_row[0])
cur.execute(
f"""
SELECT e.id, e.title, e.status, e.visibility, e.summary, e.updated_at,
(
SELECT fa.name FROM exercise_focus_areas efa
JOIN focus_areas fa ON fa.id = efa.focus_area_id
WHERE efa.exercise_id = e.id
ORDER BY efa.is_primary DESC NULLS LAST, fa.name ASC
LIMIT 1
) AS primary_focus_name,
(
SELECT COUNT(*)::int FROM exercise_skills es WHERE es.exercise_id = e.id
) AS skill_count,
(
SELECT COUNT(*)::int FROM exercise_skills es
WHERE es.exercise_id = e.id AND es.ai_suggested = true
) AS ai_suggested_skill_count
FROM exercises e
WHERE {where_sql}
ORDER BY e.updated_at DESC, e.id DESC
LIMIT %s OFFSET %s
""",
tuple(params + [limit, offset]),
)
rows = [r2d(r) for r in cur.fetchall()]
return {
"items": rows,
"total": total,
"limit": limit,
"offset": offset,
}
@router.get("/api/admin/exercise-enrichment/candidate-ids")
def list_enrichment_candidate_ids(
session: dict = Depends(require_auth),
status: str = Query(default="draft"),
visibility: Optional[str] = Query(default="private"),
focus_area_id: Optional[int] = Query(default=None, ge=1),
without_skills: bool = Query(default=False),
with_ai_suggested_skills: bool = Query(default=False),
search: Optional[str] = Query(default=None, max_length=200),
):
"""Alle IDs zum aktuellen Filter (für „Alle auswählen“) — ohne Pagination-Obergrenze."""
_require_superadmin(session)
where, params = _build_candidates_where(
status=status,
visibility=visibility,
focus_area_id=focus_area_id,
without_skills=without_skills,
with_ai_suggested_skills=with_ai_suggested_skills,
)
qtext = (search or "").strip()
if qtext:
where.append("e.search_vector @@ plainto_tsquery('german', %s)")
params.append(qtext)
where_sql = " AND ".join(where)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
f"SELECT e.id FROM exercises e WHERE {where_sql} ORDER BY e.updated_at DESC, e.id DESC",
tuple(params),
)
ids = [int(r["id"] if isinstance(r, dict) else r[0]) for r in cur.fetchall()]
return {"ids": ids, "total": len(ids)}
@router.post("/api/admin/exercise-enrichment/analyze")
def analyze_enrichment(body: EnrichmentAnalyzeBody, session: dict = Depends(require_auth)):
"""Kosten-/Umfangsanalyse vor dem Batch-Lauf (explizite Nutzerbestätigung)."""
_require_superadmin(session)
ids = _normalize_id_list(body.exercise_ids, max_items=_MAX_ANALYZE_IDS)
if not ids:
raise HTTPException(status_code=400, detail="Keine gültigen Übungs-IDs")
if not body.modes.skills and not body.modes.summary and not body.modes.instructions:
raise HTTPException(
status_code=400,
detail="Mindestens ein Modus (skills, summary oder instructions) aktivieren",
)
est = estimate_llm_calls(
exercise_count=len(ids),
want_skills=body.modes.skills,
want_summary=body.modes.summary,
want_instructions=body.modes.instructions,
)
return {
"exercise_count": len(ids),
"estimated_llm_calls": est,
"modes": body.modes.model_dump(),
"warning": (
f"Batch mit {len(ids)} Übungen und ca. {est['total']} LLM-Aufrufen — Kosten beachten."
if len(ids) >= 25 or est["total"] >= 50
else None
),
}
def _preview_one_exercise(
ex_id: int,
*,
want_skills: bool,
want_summary: bool,
want_instructions: bool,
merge_mode: SkillMergeMode,
) -> Dict[str, Any]:
"""Einzel-Preview mit eigener DB-Connection (Thread-Pool)."""
try:
with get_db() as conn:
cur = get_cursor(conn)
row = preview_exercise_enrichment(
cur,
ex_id,
want_skills=want_skills,
want_summary=want_summary,
want_instructions=want_instructions,
merge_mode=merge_mode,
)
return row
except HTTPException as he:
d = he.detail
return {"exercise_id": ex_id, "ok": False, "error": d if isinstance(d, str) else str(d)}
except Exception as exc: # pragma: no cover
return {"exercise_id": ex_id, "ok": False, "error": str(exc)}
@router.post("/api/admin/exercise-enrichment/preview")
def preview_enrichment(body: EnrichmentPreviewBody, session: dict = Depends(require_auth)):
"""Dry-Run: KI-Vorschläge laden, nichts speichern (max. 3 Übungen/Request, parallel)."""
_require_superadmin(session)
ids = _normalize_id_list(body.exercise_ids, max_items=MAX_PREVIEW_BATCH_EXERCISES)
if not ids:
raise HTTPException(status_code=400, detail="Keine gültigen Übungs-IDs")
modes = body.modes
if not modes.skills and not modes.summary and not modes.instructions:
raise HTTPException(
status_code=400,
detail="Mindestens ein Modus (skills, summary oder instructions) aktivieren",
)
results: List[Dict[str, Any]] = []
errors: List[Dict[str, Any]] = []
workers = min(_PREVIEW_MAX_WORKERS, len(ids))
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = [
pool.submit(
_preview_one_exercise,
ex_id,
want_skills=modes.skills,
want_summary=modes.summary,
want_instructions=modes.instructions,
merge_mode=body.merge_mode,
)
for ex_id in ids
]
for fut in as_completed(futures):
row = fut.result()
if row.get("ok"):
results.append(row)
else:
errors.append(row)
results.sort(key=lambda r: int(r.get("exercise_id") or 0))
errors.sort(key=lambda r: int(r.get("exercise_id") or 0))
est = estimate_llm_calls(
exercise_count=len(ids),
want_skills=modes.skills,
want_summary=modes.summary,
want_instructions=modes.instructions,
)
return {
"results": results,
"errors": errors,
"processed": len(ids),
"ok_count": len(results),
"error_count": len(errors),
"estimated_llm_calls": est,
"merge_mode": body.merge_mode,
"modes": modes.model_dump(),
}
@router.post("/api/admin/exercise-enrichment/apply")
def apply_enrichment(body: EnrichmentApplyBody, session: dict = Depends(require_auth)):
"""Vorschläge anwenden und optional Status setzen (Default: in_review)."""
_require_superadmin(session)
if not body.items:
raise HTTPException(status_code=400, detail="Keine Items")
modes = body.modes
if not modes.skills and not modes.summary and not modes.instructions:
raise HTTPException(status_code=400, detail="Kein Anwendungsmodus aktiv")
applied: List[Dict[str, Any]] = []
failed: List[Dict[str, Any]] = []
with get_db() as conn:
cur = get_cursor(conn)
for item in body.items:
ex_id = int(item.exercise_id)
try:
row = apply_exercise_enrichment(
cur,
ex_id,
merged_skills=item.merged_skills,
merge_mode=body.merge_mode,
set_status=body.set_status,
apply_skills=modes.skills,
summary_text=item.summary,
apply_summary=modes.summary,
instruction_fields=item.instruction_fields,
apply_instructions=modes.instructions,
)
if row.get("ok"):
applied.append(row)
else:
failed.append(row)
except HTTPException as he:
d = he.detail
failed.append({"exercise_id": ex_id, "ok": False, "error": d if isinstance(d, str) else str(d)})
except Exception as exc: # pragma: no cover
failed.append({"exercise_id": ex_id, "ok": False, "error": str(exc)})
conn.commit()
return {
"applied": applied,
"failed": failed,
"applied_count": len(applied),
"failed_count": len(failed),
"set_status": body.set_status,
"merge_mode": body.merge_mode,
"modes": modes.model_dump(),
}