mitai-jinkendo/backend/routers/workflow_questions.py
Lars ca562b7130
All checks were successful
Deploy Development / deploy (push) Successful in 49s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 14s
feat: Phase 1 - Fragenergänzung + Strukturierter Container
Backend:
- question_augmenter.py (290 Zeilen): Hybrid-Modell für Fragenergänzungen
  * merge_question_augmentations(): Knotengebundene Fragen überschreiben Prompt-Defaults
  * augment_prompt_with_questions(): Markdown-formatierte Fragenergänzung
  * parse_question_augmentations_from_jsonb(): JSONB → QuestionAugmentation[]
- result_container_parser.py (250 Zeilen): Markdown-Sektionen-Parsing
  * parse_result_container(): Extrahiert Analysekern, Entscheidungsanteil, Begründungsanker
  * validate_decision_signal(): Normalisierung gegen answer_spectrum
  * Fallback-Parsing bei unstrukturierten Antworten
- routers/workflow_questions.py (236 Zeilen): CRUD für workflow_question_catalog
  * GET /api/workflow/questions (mit active_only Filter)
  * POST/PUT/DELETE (Admin only, Soft Delete)
- prompt_executor.py: Integration in execute_base_prompt()
  * Fragenergänzung vor LLM-Call (wenn node_questions oder catalog vorhanden)
  * Result-Container-Parsing nach LLM-Response
- main.py: Router-Registrierung (workflow_questions)

Tests:
- test_phase1_question_augmenter.py (8 Tests): Hybrid-Modell, Formatierung, JSONB-Parsing
- test_phase1_result_container_parser.py (17 Tests): Sektion-Extraktion, Decision-Parsing, Validierung

Alle 25 Unit-Tests bestanden.

version: 0.9j (backend)
module:  workflow 0.2.0

Konzept: .claude/task/Workflow_engine_prompting_engine/konzept_workflow_engine_konsolidated.md (Phase 1)
2026-04-03 18:02:25 +02:00

236 lines
7.1 KiB
Python

"""
Workflow Questions Router (Phase 1)
CRUD für workflow_question_catalog Tabelle.
Endpunkte:
- GET /api/workflow/questions - Liste aller Fragen
- GET /api/workflow/questions/{id} - Einzelne Frage
- POST /api/workflow/questions - Neue Frage (Admin only)
- PUT /api/workflow/questions/{id} - Frage aktualisieren (Admin only)
- DELETE /api/workflow/questions/{id} - Frage löschen (Admin only)
"""
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Optional
from auth import require_auth, require_admin
from db import get_db, get_cursor, r2d
from workflow_models import QuestionCatalogEntry
from pydantic import BaseModel
router = APIRouter()
class QuestionCatalogCreate(BaseModel):
"""Request-Modell für neue Frage"""
question_type: str
label: str
question_template: str
answer_spectrum: List[str]
normalization_rules: Optional[dict] = None
class QuestionCatalogUpdate(BaseModel):
"""Request-Modell für Frage-Update"""
label: Optional[str] = None
question_template: Optional[str] = None
answer_spectrum: Optional[List[str]] = None
normalization_rules: Optional[dict] = None
active: Optional[bool] = None
@router.get("/api/workflow/questions")
def list_questions(
active_only: bool = True,
session: dict = Depends(require_auth)
):
"""
Liste alle Fragen aus dem Katalog.
Query-Parameter:
- active_only: Nur aktive Fragen (default: true)
"""
with get_db() as conn:
cur = get_cursor(conn)
if active_only:
cur.execute(
"""SELECT id, question_type, label, question_template, answer_spectrum,
normalization_rules, active, created_at::text as created_at
FROM workflow_question_catalog
WHERE active = true
ORDER BY question_type"""
)
else:
cur.execute(
"""SELECT id, question_type, label, question_template, answer_spectrum,
normalization_rules, active, created_at::text as created_at
FROM workflow_question_catalog
ORDER BY question_type"""
)
rows = cur.fetchall()
return [r2d(row) for row in rows]
@router.get("/api/workflow/questions/{question_id}")
def get_question(
question_id: str,
session: dict = Depends(require_auth)
):
"""Einzelne Frage abrufen"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""SELECT id, question_type, label, question_template, answer_spectrum,
normalization_rules, active, created_at::text as created_at
FROM workflow_question_catalog
WHERE id = %s""",
(question_id,)
)
row = cur.fetchone()
if not row:
raise HTTPException(404, f"Frage nicht gefunden: {question_id}")
return r2d(row)
@router.post("/api/workflow/questions")
def create_question(
data: QuestionCatalogCreate,
session: dict = Depends(require_admin)
):
"""
Neue Frage erstellen (Admin only).
Validierungen:
- question_type muss eindeutig sein
- answer_spectrum muss mindestens 2 Werte enthalten
"""
# Validierung
if len(data.answer_spectrum) < 2:
raise HTTPException(400, "answer_spectrum muss mindestens 2 Werte enthalten")
with get_db() as conn:
cur = get_cursor(conn)
# Prüfe ob question_type bereits existiert
cur.execute(
"SELECT id FROM workflow_question_catalog WHERE question_type = %s",
(data.question_type,)
)
if cur.fetchone():
raise HTTPException(400, f"question_type '{data.question_type}' existiert bereits")
# Erstelle Frage
cur.execute(
"""INSERT INTO workflow_question_catalog
(question_type, label, question_template, answer_spectrum, normalization_rules, active)
VALUES (%s, %s, %s, %s, %s, true)
RETURNING id, created_at::text as created_at""",
(
data.question_type,
data.label,
data.question_template,
json.dumps(data.answer_spectrum),
json.dumps(data.normalization_rules) if data.normalization_rules else None
)
)
result = cur.fetchone()
conn.commit()
return {
"id": result[0],
"question_type": data.question_type,
"label": data.label,
"question_template": data.question_template,
"answer_spectrum": data.answer_spectrum,
"normalization_rules": data.normalization_rules,
"active": True,
"created_at": result[1]
}
@router.put("/api/workflow/questions/{question_id}")
def update_question(
question_id: str,
data: QuestionCatalogUpdate,
session: dict = Depends(require_admin)
):
"""Frage aktualisieren (Admin only)"""
import json
# Prüfe ob Frage existiert
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT id FROM workflow_question_catalog WHERE id = %s", (question_id,))
if not cur.fetchone():
raise HTTPException(404, f"Frage nicht gefunden: {question_id}")
# Build UPDATE statement dynamically
updates = []
params = []
if data.label is not None:
updates.append("label = %s")
params.append(data.label)
if data.question_template is not None:
updates.append("question_template = %s")
params.append(data.question_template)
if data.answer_spectrum is not None:
if len(data.answer_spectrum) < 2:
raise HTTPException(400, "answer_spectrum muss mindestens 2 Werte enthalten")
updates.append("answer_spectrum = %s")
params.append(json.dumps(data.answer_spectrum))
if data.normalization_rules is not None:
updates.append("normalization_rules = %s")
params.append(json.dumps(data.normalization_rules))
if data.active is not None:
updates.append("active = %s")
params.append(data.active)
if not updates:
raise HTTPException(400, "Keine Aktualisierungen angegeben")
params.append(question_id)
update_sql = f"UPDATE workflow_question_catalog SET {', '.join(updates)} WHERE id = %s"
cur.execute(update_sql, tuple(params))
conn.commit()
# Return updated question
return get_question(question_id, session)
@router.delete("/api/workflow/questions/{question_id}")
def delete_question(
question_id: str,
session: dict = Depends(require_admin)
):
"""
Frage löschen (Admin only).
Setzt active=false statt physischem Löschen (Soft Delete).
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"UPDATE workflow_question_catalog SET active = false WHERE id = %s RETURNING id",
(question_id,)
)
result = cur.fetchone()
if not result:
raise HTTPException(404, f"Frage nicht gefunden: {question_id}")
conn.commit()
return {"status": "deleted", "id": question_id}