feat: Phase 1 - Fragenergänzung + Strukturierter Container
All checks were successful
Deploy Development / deploy (push) Successful in 49s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 14s

Backend:
- question_augmenter.py (290 Zeilen): Hybrid-Modell für Fragenergänzungen
  * merge_question_augmentations(): Knotengebundene Fragen überschreiben Prompt-Defaults
  * augment_prompt_with_questions(): Markdown-formatierte Fragenergänzung
  * parse_question_augmentations_from_jsonb(): JSONB → QuestionAugmentation[]
- result_container_parser.py (250 Zeilen): Markdown-Sektionen-Parsing
  * parse_result_container(): Extrahiert Analysekern, Entscheidungsanteil, Begründungsanker
  * validate_decision_signal(): Normalisierung gegen answer_spectrum
  * Fallback-Parsing bei unstrukturierten Antworten
- routers/workflow_questions.py (236 Zeilen): CRUD für workflow_question_catalog
  * GET /api/workflow/questions (mit active_only Filter)
  * POST/PUT/DELETE (Admin only, Soft Delete)
- prompt_executor.py: Integration in execute_base_prompt()
  * Fragenergänzung vor LLM-Call (wenn node_questions oder catalog vorhanden)
  * Result-Container-Parsing nach LLM-Response
- main.py: Router-Registrierung (workflow_questions)

Tests:
- test_phase1_question_augmenter.py (8 Tests): Hybrid-Modell, Formatierung, JSONB-Parsing
- test_phase1_result_container_parser.py (17 Tests): Sektion-Extraktion, Decision-Parsing, Validierung

Alle 25 Unit-Tests bestanden.

version: 0.9j (backend)
module:  workflow 0.2.0

Konzept: .claude/task/Workflow_engine_prompting_engine/konzept_workflow_engine_konsolidated.md (Phase 1)
This commit is contained in:
Lars 2026-04-03 18:02:25 +02:00
parent b5be6e21a5
commit ca562b7130
8 changed files with 1237 additions and 9 deletions

View File

@ -26,6 +26,7 @@ from routers import evaluation # v9d/v9e Training Type Profiles (#15)
from routers import goals, focus_areas # v9e/v9g Goal System v2.0 (Dynamic Focus Areas)
from routers import goal_types, goal_progress, training_phases, fitness_tests # v9h Goal System (Split routers)
from routers import charts # Phase 0c Multi-Layer Architecture
from routers import workflow_questions # Phase 1 Workflow Engine - Question Catalog
# ── App Configuration ─────────────────────────────────────────────────────────
DATA_DIR = Path(os.getenv("DATA_DIR", "./data"))
@ -110,6 +111,9 @@ app.include_router(focus_areas.router) # /api/focus-areas/* (v9g Focus
# Phase 0c Multi-Layer Architecture
app.include_router(charts.router) # /api/charts/* (Phase 0c Charts API)
# Phase 1 Workflow Engine
app.include_router(workflow_questions.router) # /api/workflow/questions/* (Phase 1 Question Catalog)
# ── Health Check ──────────────────────────────────────────────────────────────
@app.get("/")
def root():

View File

@ -205,18 +205,54 @@ async def execute_base_prompt(
variables: Dict[str, Any],
openrouter_call_func,
enable_debug: bool = False,
catalog: Optional[Dict] = None
catalog: Optional[Dict] = None,
node_questions: Optional[list] = None # Phase 1: Knotengebundene Fragen
) -> Dict[str, Any]:
"""Execute a base-type prompt (single template)."""
"""
Execute a base-type prompt (single template).
Phase 1: Unterstützt Fragenergänzungen (Hybridmodell)
- node_questions: Knotengebundene Fragen (Priorität 1)
- prompt.question_augmentations: Prompt-Defaults (Priorität 2)
"""
from question_augmenter import (
parse_question_augmentations_from_jsonb,
merge_question_augmentations,
augment_prompt_with_questions
)
from result_container_parser import parse_result_container_robust
template = prompt.get('template')
if not template:
raise HTTPException(400, f"Base prompt missing template: {prompt['slug']}")
debug_info = {} if enable_debug else None
# Phase 1: Load question augmentations (Hybridmodell)
prompt_default_questions = None
if prompt.get('question_augmentations'):
try:
from workflow_models import QuestionAugmentation
prompt_default_questions = parse_question_augmentations_from_jsonb(
prompt['question_augmentations']
)
except Exception as e:
if enable_debug:
debug_info['question_augmentations_error'] = str(e)
# Merge question augmentations (Vorrangregel: Knoten > Prompt)
questions = merge_question_augmentations(node_questions, prompt_default_questions)
# Resolve placeholders (with optional catalog for |d modifier)
prompt_text = resolve_placeholders(template, variables, debug_info, catalog)
# Phase 1: Augment prompt with questions (if any)
if questions:
prompt_text = augment_prompt_with_questions(prompt_text, questions)
if enable_debug:
debug_info['question_augmentations_count'] = len(questions)
debug_info['question_types'] = [q.type for q in questions]
if enable_debug:
debug_info['template'] = template
debug_info['final_prompt'] = prompt_text[:500] + ('...' if len(prompt_text) > 500 else '')
@ -229,12 +265,24 @@ async def execute_base_prompt(
debug_info['ai_response_length'] = len(response)
debug_info['ai_response_preview'] = response[:200] + ('...' if len(response) > 200 else '')
# Validate JSON if required
output_format = prompt.get('output_format', 'text')
if output_format == 'json':
output = validate_json_output(response, prompt.get('output_schema'), debug_info if enable_debug else None)
# Phase 1: Parse structured result if questions were used
if questions:
expected_question_types = [q.type for q in questions]
container = parse_result_container_robust(response, expected_question_types)
if enable_debug:
debug_info['parsing_status'] = container['parsing_status']
debug_info['parsing_warnings'] = container.get('warnings', [])
output = container
output_format = 'structured_container' # New format type
else:
output = response
# Legacy behavior: Validate JSON if required
output_format = prompt.get('output_format', 'text')
if output_format == 'json':
output = validate_json_output(response, prompt.get('output_schema'), debug_info if enable_debug else None)
else:
output = response
result = {
"type": "base",

View File

@ -0,0 +1,289 @@
"""
Question Augmenter (Phase 1)
Generiert Fragenergänzungs-Suffix für Analyseprompts.
Konzept-Basis: konzept_workflow_engine_konsolidated.md (Sektion 6.4, 8.3)
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Sektion 4.2, 6)
Hybridmodell (Sektion 6):
- Primär: Knotengebundene Fragenergänzungen (am Workflow-Knoten definiert)
- Sekundär: Prompt-gebundene Standardfragen (optional in ai_prompts.question_augmentations)
- Vorrangregel: Knotenspezifische überschreiben Prompt-Defaults
Output-Format (User-Entscheidung 2):
- Markdown-Sektionen mit klaren Delimitern (statt verpflichtendem JSON-Mode)
"""
from typing import List, Dict, Optional, Any
from workflow_models import QuestionAugmentation
from db import get_db, get_cursor, r2d
def generate_question_suffix(questions: List[QuestionAugmentation]) -> str:
"""
Generiert Fragenergänzungs-Suffix für einen Analyseprompt.
Format (Markdown-Sektionen):
```
## Analyse
[Hauptinhalt deines Prompts hier]
## Entscheidungsfragen
Beantworte folgende Fragen präzise:
- Relevanz: [ja/nein/unklar]
- Priorität: [hoch/mittel/niedrig/unklar]
## Begründung
[Optional: Kurze Plausibilisierung deiner Antworten (1-2 Sätze)]
```
Args:
questions: Liste von QuestionAugmentation-Objekten
Returns:
Fragenergänzungs-Suffix als Markdown-String
Raises:
ValueError: Bei leerer Fragenliste
"""
if not questions:
raise ValueError("Fragenliste darf nicht leer sein")
# Build question list
question_lines = []
for q in questions:
# Format: "- Fragentyp: [erlaubte Werte]"
spectrum_str = "/".join(q.answer_spectrum)
question_lines.append(f"- {q.type.capitalize()}: [{spectrum_str}]")
question_block = "\n".join(question_lines)
suffix = f"""
---
**WICHTIG: Strukturiere deine Antwort wie folgt:**
## Analyse
[Deine Hauptanalyse hier - beantworte die ursprüngliche Frage ausführlich]
## Entscheidungsfragen
Beantworte folgende Fragen **präzise** mit den vorgegebenen Werten:
{question_block}
## Begründung
[Optional: Kurze Plausibilisierung deiner Entscheidungsfragen-Antworten (1-2 Sätze)]
**Hinweise:**
- Antworte bei Entscheidungsfragen NUR mit den vorgegebenen Werten
- Bei Unsicherheit wähle "unklar"
- Die Begründung ist optional, aber hilfreich für die Nachvollziehbarkeit
"""
return suffix
def load_question_augmentations_from_db(
question_types: Optional[List[str]] = None
) -> List[QuestionAugmentation]:
"""
Lädt Fragenergänzungen aus der Datenbank (workflow_question_catalog).
Args:
question_types: Optionale Liste von Fragetypen zum Filtern
(z.B. ["relevanz", "prioritaet"])
Wenn None: Alle aktiven Fragen werden geladen
Returns:
Liste von QuestionAugmentation-Objekten
Raises:
HTTPException: Bei Datenbankfehlern
"""
with get_db() as conn:
cur = get_cursor(conn)
if question_types:
placeholders = ", ".join(["%s"] * len(question_types))
cur.execute(
f"""SELECT id, question_type, label, question_template, answer_spectrum
FROM workflow_question_catalog
WHERE active = true AND question_type IN ({placeholders})
ORDER BY question_type""",
tuple(question_types)
)
else:
cur.execute(
"""SELECT id, question_type, label, question_template, answer_spectrum
FROM workflow_question_catalog
WHERE active = true
ORDER BY question_type"""
)
rows = cur.fetchall()
questions = []
for row in rows:
db_row = r2d(row)
questions.append(
QuestionAugmentation(
id=db_row['question_type'], # Verwende Typ als ID (z.B. "relevanz")
type=db_row['question_type'],
question=db_row['question_template'],
answer_spectrum=db_row['answer_spectrum']
)
)
return questions
def merge_question_augmentations(
node_questions: Optional[List[QuestionAugmentation]],
prompt_default_questions: Optional[List[QuestionAugmentation]]
) -> List[QuestionAugmentation]:
"""
Merged Fragenergänzungen nach Hybridmodell-Vorrangregel.
Vorrangregel (Sektion 6.2 der Anforderungsanalyse):
1. Knotenspezifische Fragenergänzungen überschreiben Prompt-Defaults
2. Wenn Knoten keine Fragen definiert: Prompt-Defaults werden verwendet (falls vorhanden)
3. Wenn weder Knoten noch Prompt Fragen definieren: Leere Liste
Args:
node_questions: Fragenergänzungen am Workflow-Knoten (primär)
prompt_default_questions: Fragenergänzungen am Prompt (sekundär)
Returns:
Gemergete Liste von QuestionAugmentation-Objekten
"""
# Vorrangregel 1: Knotenspezifische Fragen haben absolute Priorität
if node_questions:
return node_questions
# Vorrangregel 2: Wenn Knoten keine Fragen hat, verwende Prompt-Defaults
if prompt_default_questions:
return prompt_default_questions
# Vorrangregel 3: Wenn weder Knoten noch Prompt Fragen haben: Leere Liste
return []
def parse_question_augmentations_from_jsonb(
jsonb_data: Optional[Dict[str, Any]]
) -> List[QuestionAugmentation]:
"""
Parsed QuestionAugmentation-Objekte aus JSONB-Daten (z.B. ai_prompts.question_augmentations).
Format:
[
{
"id": "q1",
"type": "relevanz",
"question": "Ist eine vertiefte Analyse relevant?",
"answer_spectrum": ["ja", "nein", "unklar"]
},
...
]
Args:
jsonb_data: JSONB-Array als Python-Dict/List
Returns:
Liste von QuestionAugmentation-Objekten
Raises:
ValueError: Bei ungültigem Format
"""
if not jsonb_data:
return []
if not isinstance(jsonb_data, list):
raise ValueError("question_augmentations muss ein Array sein")
questions = []
for item in jsonb_data:
try:
questions.append(QuestionAugmentation(**item))
except Exception as e:
raise ValueError(f"Ungültiges QuestionAugmentation-Format: {e}")
return questions
def get_question_augmentation_instruction() -> str:
"""
Gibt generische Instruktion für strukturierte Antwort zurück.
Diese Instruktion wird IMMER zum Prompt hinzugefügt, wenn Fragenergänzungen aktiv sind.
Returns:
Instruktions-String als Markdown
"""
return """
---
**WICHTIG: Strukturiere deine Antwort in folgenden Markdown-Sektionen:**
1. **## Analyse** - Deine Hauptanalyse (beantworte die ursprüngliche Frage)
2. **## Entscheidungsfragen** - Beantworte die unten stehenden Fragen PRÄZISE mit den vorgegebenen Werten
3. **## Begründung** - Optional: Kurze Plausibilisierung deiner Entscheidungsfragen (1-2 Sätze)
"""
def format_question_list(questions: List[QuestionAugmentation]) -> str:
"""
Formatiert Fragenliste als Markdown-Liste.
Format:
```
- Relevanz: [ja/nein/unklar]
- Priorität: [hoch/mittel/niedrig/unklar]
```
Args:
questions: Liste von QuestionAugmentation-Objekten
Returns:
Formatierte Markdown-Liste
"""
lines = []
for q in questions:
spectrum_str = "/".join(q.answer_spectrum)
lines.append(f"- **{q.type.capitalize()}**: [{spectrum_str}]")
return "\n".join(lines)
def augment_prompt_with_questions(
base_prompt: str,
questions: List[QuestionAugmentation]
) -> str:
"""
Fügt Fragenergänzungen zu einem Basis-Prompt hinzu.
Args:
base_prompt: Original-Prompt-Text
questions: Liste von Fragenergänzungen
Returns:
Erweiterter Prompt mit Fragenergänzungen
Raises:
ValueError: Bei leerer Fragenliste
"""
if not questions:
raise ValueError("Keine Fragenergänzungen vorhanden")
instruction = get_question_augmentation_instruction()
question_list = format_question_list(questions)
augmented_prompt = f"""{base_prompt}
{instruction}
**Entscheidungsfragen:**
{question_list}
"""
return augmented_prompt

View File

@ -0,0 +1,271 @@
"""
Result Container Parser (Phase 1)
Parsed strukturierte LLM-Antworten (Markdown-Sektionen).
Konzept-Basis: konzept_workflow_engine_konsolidated.md (Sektion 8.2)
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Sektion 4.1)
Erwartet Format (Markdown-Sektionen):
```
## Analyse
[Hauptinhalt]
## Entscheidungsfragen
- Relevanz: ja
- Priorität: hoch
## Begründung
[Optional: Plausibilisierung]
```
Output:
```python
{
"analysis_core": str,
"decision_signals": Dict[str, str], # {"relevanz": "ja", "prioritaet": "hoch"}
"reasoning_anchors": Optional[str]
}
```
"""
import re
from typing import Dict, Optional, List, Tuple
def parse_result_container(llm_output: str) -> Dict:
"""
Parsed strukturierte LLM-Antwort in Ergebniscontainer.
Extrahiert drei Bereiche:
1. **Analysekern** (## Analyse)
2. **Entscheidungsanteil** (## Entscheidungsfragen)
3. **Begründungsanker** (## Begründung, optional)
Args:
llm_output: Vollständiger LLM-Output als String
Returns:
Dict mit drei Bereichen:
{
"analysis_core": str,
"decision_signals": Dict[str, str],
"reasoning_anchors": Optional[str],
"parsing_status": str # "complete", "partial", "failed"
}
Raises:
ValueError: Bei komplett fehlgeschlagenem Parsing
"""
# Extrahiere Sektionen
analysis_core = extract_section(llm_output, "Analyse")
decision_section = extract_section(llm_output, "Entscheidungsfragen")
reasoning_anchors = extract_section(llm_output, "Begründung")
# Parse Entscheidungsfragen
decision_signals = {}
if decision_section:
decision_signals = parse_decision_questions(decision_section)
# Determine parsing status
if analysis_core and decision_signals:
parsing_status = "complete"
elif analysis_core or decision_signals:
parsing_status = "partial"
else:
parsing_status = "failed"
# Fallback: Wenn keine Strukturierung erkannt, verwende gesamten Output als Analysekern
if parsing_status == "failed":
analysis_core = llm_output
parsing_status = "fallback"
return {
"analysis_core": analysis_core or "",
"decision_signals": decision_signals,
"reasoning_anchors": reasoning_anchors,
"parsing_status": parsing_status
}
def extract_section(text: str, section_name: str) -> Optional[str]:
"""
Extrahiert eine Markdown-Sektion aus dem Text.
Sucht nach:
- `## Section_Name` (Überschrift)
- Alles bis zur nächsten `##` Überschrift
Args:
text: Volltext
section_name: Name der Sektion (z.B. "Analyse", "Entscheidungsfragen")
Returns:
Sektionsinhalt (ohne Überschrift) oder None wenn nicht gefunden
"""
# Pattern: ## Section_Name (mit optionalem Whitespace)
# Captured content bis zur nächsten ## oder Ende
pattern = rf'##\s*{re.escape(section_name)}\s*\n(.*?)(?=\n##|\Z)'
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
if match:
content = match.group(1).strip()
return content if content else None
return None
def parse_decision_questions(section_text: str) -> Dict[str, str]:
"""
Parsed Entscheidungsfragen aus Sektion.
Erwartet Format:
```
- Relevanz: ja
- Priorität: hoch
- Selektion: nein
```
Alternativen:
```
Relevanz: ja
Priorität: hoch
```
oder:
```
- **Relevanz**: ja
- **Priorität**: hoch
```
Args:
section_text: Text der Entscheidungsfragen-Sektion
Returns:
Dict mit {frage_typ: antwort}
Beispiel: {"relevanz": "ja", "prioritaet": "hoch"}
"""
signals = {}
# Pattern: Matcht verschiedene Formatvariationen
# Gruppe 1: Fragetyp (z.B. "Relevanz", "**Priorität**")
# Gruppe 2: Antwort (z.B. "ja", "hoch")
patterns = [
r'-\s*\*?\*?(\w+)\*?\*?\s*:\s*\[?([^\]\n]+)\]?', # "- **Relevanz**: [ja]"
r'^\s*\*?\*?(\w+)\*?\*?\s*:\s*\[?([^\]\n]+)\]?', # "Relevanz: ja" (ohne -)
]
for pattern in patterns:
matches = re.finditer(pattern, section_text, re.MULTILINE | re.IGNORECASE)
for match in matches:
question_type = match.group(1).strip().lower()
answer = match.group(2).strip()
# Entferne Klammern und Whitespace
answer = answer.strip('[]()').strip()
signals[question_type] = answer
return signals
def validate_decision_signal(
signal_value: str,
answer_spectrum: List[str]
) -> Tuple[str, str]:
"""
Validiert ein Entscheidungssignal gegen Antwortspektrum.
Gibt zurück:
- (normalized_value, status)
Status:
- "valid": Antwort exakt im Spektrum
- "normalized": Antwort wurde normalisiert (z.B. "Ja" "ja")
- "invalid": Antwort außerhalb des Spektrums
Args:
signal_value: Rohe Antwort vom LLM
answer_spectrum: Erlaubte Werte (z.B. ["ja", "nein", "unklar"])
Returns:
Tuple (normalisierter_wert, status)
"""
# Exakte Übereinstimmung (case-sensitive)
if signal_value in answer_spectrum:
return signal_value, "valid"
# Case-insensitive Matching
signal_lower = signal_value.lower()
for allowed in answer_spectrum:
if signal_lower == allowed.lower():
return allowed, "normalized"
# Keine Übereinstimmung
return signal_value, "invalid"
def extract_analysis_core_fallback(llm_output: str) -> str:
"""
Fallback-Methode: Extrahiert Analysekern wenn keine Strukturierung erkannt.
Versucht intelligent zu identifizieren was der Hauptinhalt ist:
1. Wenn Text vor "## Entscheidungsfragen" existiert: Verwende das
2. Sonst: Verwende gesamten Text
Args:
llm_output: Vollständiger LLM-Output
Returns:
Best-Guess für Analysekern
"""
# Suche nach "## Entscheidungsfragen" Marker
pattern = r'##\s*Entscheidungsfragen'
match = re.search(pattern, llm_output, re.IGNORECASE)
if match:
# Text vor dem Marker ist wahrscheinlich die Analyse
return llm_output[:match.start()].strip()
# Fallback: Gesamter Text
return llm_output.strip()
def parse_result_container_robust(
llm_output: str,
expected_questions: Optional[List[str]] = None
) -> Dict:
"""
Robuste Variante des Parsings mit zusätzlicher Validierung.
Args:
llm_output: Vollständiger LLM-Output
expected_questions: Optionale Liste erwarteter Fragetypen (z.B. ["relevanz", "prioritaet"])
Returns:
Ergebniscontainer mit zusätzlichem "warnings"-Feld
"""
result = parse_result_container(llm_output)
warnings = []
# Prüfe ob erwartete Fragen vorhanden
if expected_questions:
found_questions = set(result['decision_signals'].keys())
expected_set = set(expected_questions)
missing = expected_set - found_questions
if missing:
warnings.append(f"Fehlende Entscheidungsfragen: {', '.join(missing)}")
unexpected = found_questions - expected_set
if unexpected:
warnings.append(f"Unerwartete Entscheidungsfragen: {', '.join(unexpected)}")
# Prüfe Parsing-Status
if result['parsing_status'] == "partial":
warnings.append("Parsing unvollständig: Einige Sektionen fehlen")
elif result['parsing_status'] == "fallback":
warnings.append("Keine Strukturierung erkannt, Fallback-Parsing verwendet")
result['warnings'] = warnings
return result

View File

@ -0,0 +1,235 @@
"""
Workflow Questions Router (Phase 1)
CRUD für workflow_question_catalog Tabelle.
Endpunkte:
- GET /api/workflow/questions - Liste aller Fragen
- GET /api/workflow/questions/{id} - Einzelne Frage
- POST /api/workflow/questions - Neue Frage (Admin only)
- PUT /api/workflow/questions/{id} - Frage aktualisieren (Admin only)
- DELETE /api/workflow/questions/{id} - Frage löschen (Admin only)
"""
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Optional
from auth import require_auth, require_admin
from db import get_db, get_cursor, r2d
from workflow_models import QuestionCatalogEntry
from pydantic import BaseModel
router = APIRouter()
class QuestionCatalogCreate(BaseModel):
"""Request-Modell für neue Frage"""
question_type: str
label: str
question_template: str
answer_spectrum: List[str]
normalization_rules: Optional[dict] = None
class QuestionCatalogUpdate(BaseModel):
"""Request-Modell für Frage-Update"""
label: Optional[str] = None
question_template: Optional[str] = None
answer_spectrum: Optional[List[str]] = None
normalization_rules: Optional[dict] = None
active: Optional[bool] = None
@router.get("/api/workflow/questions")
def list_questions(
active_only: bool = True,
session: dict = Depends(require_auth)
):
"""
Liste alle Fragen aus dem Katalog.
Query-Parameter:
- active_only: Nur aktive Fragen (default: true)
"""
with get_db() as conn:
cur = get_cursor(conn)
if active_only:
cur.execute(
"""SELECT id, question_type, label, question_template, answer_spectrum,
normalization_rules, active, created_at::text as created_at
FROM workflow_question_catalog
WHERE active = true
ORDER BY question_type"""
)
else:
cur.execute(
"""SELECT id, question_type, label, question_template, answer_spectrum,
normalization_rules, active, created_at::text as created_at
FROM workflow_question_catalog
ORDER BY question_type"""
)
rows = cur.fetchall()
return [r2d(row) for row in rows]
@router.get("/api/workflow/questions/{question_id}")
def get_question(
question_id: str,
session: dict = Depends(require_auth)
):
"""Einzelne Frage abrufen"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""SELECT id, question_type, label, question_template, answer_spectrum,
normalization_rules, active, created_at::text as created_at
FROM workflow_question_catalog
WHERE id = %s""",
(question_id,)
)
row = cur.fetchone()
if not row:
raise HTTPException(404, f"Frage nicht gefunden: {question_id}")
return r2d(row)
@router.post("/api/workflow/questions")
def create_question(
data: QuestionCatalogCreate,
session: dict = Depends(require_admin)
):
"""
Neue Frage erstellen (Admin only).
Validierungen:
- question_type muss eindeutig sein
- answer_spectrum muss mindestens 2 Werte enthalten
"""
# Validierung
if len(data.answer_spectrum) < 2:
raise HTTPException(400, "answer_spectrum muss mindestens 2 Werte enthalten")
with get_db() as conn:
cur = get_cursor(conn)
# Prüfe ob question_type bereits existiert
cur.execute(
"SELECT id FROM workflow_question_catalog WHERE question_type = %s",
(data.question_type,)
)
if cur.fetchone():
raise HTTPException(400, f"question_type '{data.question_type}' existiert bereits")
# Erstelle Frage
cur.execute(
"""INSERT INTO workflow_question_catalog
(question_type, label, question_template, answer_spectrum, normalization_rules, active)
VALUES (%s, %s, %s, %s, %s, true)
RETURNING id, created_at::text as created_at""",
(
data.question_type,
data.label,
data.question_template,
json.dumps(data.answer_spectrum),
json.dumps(data.normalization_rules) if data.normalization_rules else None
)
)
result = cur.fetchone()
conn.commit()
return {
"id": result[0],
"question_type": data.question_type,
"label": data.label,
"question_template": data.question_template,
"answer_spectrum": data.answer_spectrum,
"normalization_rules": data.normalization_rules,
"active": True,
"created_at": result[1]
}
@router.put("/api/workflow/questions/{question_id}")
def update_question(
question_id: str,
data: QuestionCatalogUpdate,
session: dict = Depends(require_admin)
):
"""Frage aktualisieren (Admin only)"""
import json
# Prüfe ob Frage existiert
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT id FROM workflow_question_catalog WHERE id = %s", (question_id,))
if not cur.fetchone():
raise HTTPException(404, f"Frage nicht gefunden: {question_id}")
# Build UPDATE statement dynamically
updates = []
params = []
if data.label is not None:
updates.append("label = %s")
params.append(data.label)
if data.question_template is not None:
updates.append("question_template = %s")
params.append(data.question_template)
if data.answer_spectrum is not None:
if len(data.answer_spectrum) < 2:
raise HTTPException(400, "answer_spectrum muss mindestens 2 Werte enthalten")
updates.append("answer_spectrum = %s")
params.append(json.dumps(data.answer_spectrum))
if data.normalization_rules is not None:
updates.append("normalization_rules = %s")
params.append(json.dumps(data.normalization_rules))
if data.active is not None:
updates.append("active = %s")
params.append(data.active)
if not updates:
raise HTTPException(400, "Keine Aktualisierungen angegeben")
params.append(question_id)
update_sql = f"UPDATE workflow_question_catalog SET {', '.join(updates)} WHERE id = %s"
cur.execute(update_sql, tuple(params))
conn.commit()
# Return updated question
return get_question(question_id, session)
@router.delete("/api/workflow/questions/{question_id}")
def delete_question(
question_id: str,
session: dict = Depends(require_admin)
):
"""
Frage löschen (Admin only).
Setzt active=false statt physischem Löschen (Soft Delete).
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"UPDATE workflow_question_catalog SET active = false WHERE id = %s RETURNING id",
(question_id,)
)
result = cur.fetchone()
if not result:
raise HTTPException(404, f"Frage nicht gefunden: {question_id}")
conn.commit()
return {"status": "deleted", "id": question_id}

View File

@ -7,7 +7,7 @@ Semantic Versioning: MAJOR.MINOR.PATCH
- PATCH: Bugfix, kleine Änderung, Refactor
"""
APP_VERSION = "0.9i"
APP_VERSION = "0.9j"
BUILD_DATE = "2026-04-03"
DB_SCHEMA_VERSION = "20260403" # Migration 034
@ -27,10 +27,22 @@ MODULE_VERSIONS = {
"exportdata": "1.1.0",
"importdata": "1.0.0",
"membership": "2.1.0",
"workflow": "0.1.0", # Phase 0: Foundation
"workflow": "0.2.0", # Phase 1: Fragenergänzung + Strukturierter Container
}
CHANGELOG = [
{
"version": "0.9j",
"date": "2026-04-03",
"changes": [
"Phase 1: Fragenergänzung + Strukturierter Container",
"question_augmenter.py: Hybrid-Modell (Knotengebundene Fragen überschreiben Prompt-Defaults)",
"result_container_parser.py: Markdown-Sektionen (Analysekern, Entscheidungsanteil, Begründungsanker)",
"Integration in execute_base_prompt(): Fragenergänzung vor LLM-Call, Parsing nach LLM-Response",
"API-Router workflow_questions.py: CRUD für workflow_question_catalog",
"Unit-Tests Phase 1: 25 Tests (question_augmenter + result_container_parser)",
]
},
{
"version": "0.9i",
"date": "2026-04-03",

View File

@ -0,0 +1,135 @@
"""
Unit Tests für question_augmenter.py (Phase 1)
Run with: PYTHONPATH=./backend pytest tests/backend/test_phase1_question_augmenter.py -v
"""
import pytest
from workflow_models import QuestionAugmentation
from question_augmenter import (
augment_prompt_with_questions,
merge_question_augmentations,
format_question_list,
parse_question_augmentations_from_jsonb
)
def test_format_question_list():
"""Test: Formatierung der Fragenliste"""
questions = [
QuestionAugmentation(
id="q1",
type="relevanz",
question="Ist relevant?",
answer_spectrum=["ja", "nein", "unklar"]
),
QuestionAugmentation(
id="q2",
type="prioritaet",
question="Wie hoch?",
answer_spectrum=["hoch", "mittel", "niedrig", "unklar"]
)
]
result = format_question_list(questions)
assert "Relevanz" in result
assert "[ja/nein/unklar]" in result
assert "Prioritaet" in result # Lowercase wird capitalized
assert "[hoch/mittel/niedrig/unklar]" in result
def test_augment_prompt_with_questions():
"""Test: Prompt-Erweiterung mit Fragenergänzungen"""
base_prompt = "Analysiere die Körperdaten."
questions = [
QuestionAugmentation(
id="q1",
type="relevanz",
question="Ist relevant?",
answer_spectrum=["ja", "nein", "unklar"]
)
]
augmented = augment_prompt_with_questions(base_prompt, questions)
assert "Analysiere die Körperdaten." in augmented
assert "## Analyse" in augmented
assert "## Entscheidungsfragen" in augmented
assert "Relevanz" in augmented
assert "[ja/nein/unklar]" in augmented
def test_merge_question_augmentations_node_priority():
"""Test: Knotengebundene Fragen haben Vorrang (Hybridmodell)"""
node_questions = [
QuestionAugmentation(id="q1", type="relevanz", question="Q1", answer_spectrum=["ja", "nein"])
]
prompt_questions = [
QuestionAugmentation(id="q2", type="prioritaet", question="Q2", answer_spectrum=["hoch", "niedrig"])
]
result = merge_question_augmentations(node_questions, prompt_questions)
# Knotengebundene haben Vorrang
assert len(result) == 1
assert result[0].type == "relevanz"
def test_merge_question_augmentations_prompt_fallback():
"""Test: Prompt-Defaults werden verwendet wenn Knoten leer"""
node_questions = None
prompt_questions = [
QuestionAugmentation(id="q2", type="prioritaet", question="Q2", answer_spectrum=["hoch", "niedrig"])
]
result = merge_question_augmentations(node_questions, prompt_questions)
# Prompt-Defaults werden verwendet
assert len(result) == 1
assert result[0].type == "prioritaet"
def test_merge_question_augmentations_empty():
"""Test: Leere Liste wenn weder Knoten noch Prompt Fragen haben"""
result = merge_question_augmentations(None, None)
assert result == []
def test_parse_question_augmentations_from_jsonb():
"""Test: Parsing aus JSONB-Format"""
jsonb_data = [
{
"id": "q1",
"type": "relevanz",
"question": "Ist relevant?",
"answer_spectrum": ["ja", "nein", "unklar"]
},
{
"id": "q2",
"type": "prioritaet",
"question": "Wie hoch?",
"answer_spectrum": ["hoch", "mittel", "niedrig"]
}
]
result = parse_question_augmentations_from_jsonb(jsonb_data)
assert len(result) == 2
assert result[0].type == "relevanz"
assert result[1].type == "prioritaet"
def test_parse_question_augmentations_empty_jsonb():
"""Test: Leere Liste bei None JSONB"""
result = parse_question_augmentations_from_jsonb(None)
assert result == []
def test_parse_question_augmentations_invalid_jsonb():
"""Test: ValueError bei ungültigem JSONB"""
with pytest.raises(ValueError, match="muss ein Array sein"):
parse_question_augmentations_from_jsonb({"invalid": "format"})
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -0,0 +1,234 @@
"""
Unit Tests für result_container_parser.py (Phase 1)
Run with: PYTHONPATH=./backend pytest tests/backend/test_phase1_result_container_parser.py -v
"""
import pytest
from result_container_parser import (
parse_result_container,
extract_section,
parse_decision_questions,
validate_decision_signal,
parse_result_container_robust
)
def test_extract_section_basic():
"""Test: Einfache Sektion extrahieren"""
text = """
## Analyse
Das ist der Analysekern.
Mehrere Zeilen.
## Entscheidungsfragen
- Relevanz: ja
"""
result = extract_section(text, "Analyse")
assert result == "Das ist der Analysekern.\nMehrere Zeilen."
def test_extract_section_not_found():
"""Test: Nicht vorhandene Sektion"""
text = "## Analyse\nInhalt"
result = extract_section(text, "Begründung")
assert result is None
def test_extract_section_empty():
"""Test: Leere Sektion (nur Whitespace am Ende)"""
text = "## Analyse\n\n"
result = extract_section(text, "Analyse")
assert result is None
def test_parse_decision_questions_basic():
"""Test: Standard-Format parsen"""
section = """
- Relevanz: ja
- Priorität: hoch
- Selektion: nein
"""
result = parse_decision_questions(section)
assert result == {
"relevanz": "ja",
"priorität": "hoch",
"selektion": "nein"
}
def test_parse_decision_questions_bold():
"""Test: Format mit **bold** Markup"""
section = """
- **Relevanz**: ja
- **Priorität**: hoch
"""
result = parse_decision_questions(section)
assert result == {
"relevanz": "ja",
"priorität": "hoch"
}
def test_parse_decision_questions_without_dash():
"""Test: Format ohne führendes Minus"""
section = """
Relevanz: ja
Priorität: hoch
"""
result = parse_decision_questions(section)
assert result == {
"relevanz": "ja",
"priorität": "hoch"
}
def test_parse_decision_questions_brackets():
"""Test: Format mit [Klammern]"""
section = """
- Relevanz: [ja]
- Priorität: [hoch]
"""
result = parse_decision_questions(section)
assert result == {
"relevanz": "ja",
"priorität": "hoch"
}
def test_validate_decision_signal_exact_match():
"""Test: Exakte Übereinstimmung"""
value, status = validate_decision_signal("ja", ["ja", "nein", "unklar"])
assert value == "ja"
assert status == "valid"
def test_validate_decision_signal_normalized():
"""Test: Case-insensitive Normalisierung"""
value, status = validate_decision_signal("JA", ["ja", "nein", "unklar"])
assert value == "ja"
assert status == "normalized"
def test_validate_decision_signal_invalid():
"""Test: Ungültige Antwort"""
value, status = validate_decision_signal("vielleicht", ["ja", "nein", "unklar"])
assert value == "vielleicht"
assert status == "invalid"
def test_parse_result_container_complete():
"""Test: Vollständiger Container mit allen Sektionen"""
llm_output = """
## Analyse
Der Nutzer zeigt eine positive Gewichtsentwicklung.
Kaloriendefizit wird eingehalten.
## Entscheidungsfragen
- Relevanz: ja
- Priorität: hoch
## Begründung
Die Gewichtsabnahme ist im Zielbereich von 0.5-1% pro Woche.
"""
result = parse_result_container(llm_output)
assert result["parsing_status"] == "complete"
assert "Gewichtsentwicklung" in result["analysis_core"]
assert result["decision_signals"]["relevanz"] == "ja"
assert result["decision_signals"]["priorität"] == "hoch"
assert "Zielbereich" in result["reasoning_anchors"]
def test_parse_result_container_partial():
"""Test: Container ohne Begründung (partial)"""
llm_output = """
## Analyse
Analyse-Inhalt
## Entscheidungsfragen
- Relevanz: ja
"""
result = parse_result_container(llm_output)
assert result["parsing_status"] == "complete"
assert result["analysis_core"] == "Analyse-Inhalt"
assert result["decision_signals"]["relevanz"] == "ja"
assert result["reasoning_anchors"] is None
def test_parse_result_container_no_structure():
"""Test: Unstrukturierte Antwort (Fallback)"""
llm_output = "Einfache Textantwort ohne Strukturierung."
result = parse_result_container(llm_output)
assert result["parsing_status"] == "fallback"
assert result["analysis_core"] == "Einfache Textantwort ohne Strukturierung."
assert result["decision_signals"] == {}
assert result["reasoning_anchors"] is None
def test_parse_result_container_only_questions():
"""Test: Nur Entscheidungsfragen, keine Analyse (partial)"""
llm_output = """
## Entscheidungsfragen
- Relevanz: nein
- Priorität: niedrig
"""
result = parse_result_container(llm_output)
assert result["parsing_status"] == "partial"
assert result["analysis_core"] == ""
assert result["decision_signals"]["relevanz"] == "nein"
def test_parse_result_container_robust_with_warnings():
"""Test: Robuste Variante mit erwarteten Fragen"""
llm_output = """
## Analyse
Inhalt
## Entscheidungsfragen
- Relevanz: ja
"""
expected_questions = ["relevanz", "prioritaet", "selektion"]
result = parse_result_container_robust(llm_output, expected_questions)
assert "warnings" in result
assert any("Fehlende Entscheidungsfragen" in w for w in result["warnings"])
assert any("prioritaet" in w for w in result["warnings"])
def test_parse_result_container_case_insensitive():
"""Test: Case-insensitive Sektion-Matching"""
llm_output = """
## ANALYSE
Großgeschrieben
## entscheidungsfragen
- Relevanz: ja
"""
result = parse_result_container(llm_output)
assert result["parsing_status"] == "complete"
assert result["analysis_core"] == "Großgeschrieben"
assert result["decision_signals"]["relevanz"] == "ja"
def test_parse_decision_questions_mixed_formats():
"""Test: Gemischte Formate in einer Sektion"""
section = """
- **Relevanz**: ja
Priorität: hoch
- Selektion: [nein]
"""
result = parse_decision_questions(section)
assert result["relevanz"] == "ja"
assert result["priorität"] == "hoch"
assert result["selektion"] == "nein"
if __name__ == "__main__":
pytest.main([__file__, "-v"])