mitai-jinkendo/backend/result_container_parser.py
Lars ca562b7130
All checks were successful
Deploy Development / deploy (push) Successful in 49s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 14s
feat: Phase 1 - Fragenergänzung + Strukturierter Container
Backend:
- question_augmenter.py (290 Zeilen): Hybrid-Modell für Fragenergänzungen
  * merge_question_augmentations(): Knotengebundene Fragen überschreiben Prompt-Defaults
  * augment_prompt_with_questions(): Markdown-formatierte Fragenergänzung
  * parse_question_augmentations_from_jsonb(): JSONB → QuestionAugmentation[]
- result_container_parser.py (250 Zeilen): Markdown-Sektionen-Parsing
  * parse_result_container(): Extrahiert Analysekern, Entscheidungsanteil, Begründungsanker
  * validate_decision_signal(): Normalisierung gegen answer_spectrum
  * Fallback-Parsing bei unstrukturierten Antworten
- routers/workflow_questions.py (236 Zeilen): CRUD für workflow_question_catalog
  * GET /api/workflow/questions (mit active_only Filter)
  * POST/PUT/DELETE (Admin only, Soft Delete)
- prompt_executor.py: Integration in execute_base_prompt()
  * Fragenergänzung vor LLM-Call (wenn node_questions oder catalog vorhanden)
  * Result-Container-Parsing nach LLM-Response
- main.py: Router-Registrierung (workflow_questions)

Tests:
- test_phase1_question_augmenter.py (8 Tests): Hybrid-Modell, Formatierung, JSONB-Parsing
- test_phase1_result_container_parser.py (17 Tests): Sektion-Extraktion, Decision-Parsing, Validierung

Alle 25 Unit-Tests bestanden.

version: 0.9j (backend)
module:  workflow 0.2.0

Konzept: .claude/task/Workflow_engine_prompting_engine/konzept_workflow_engine_konsolidated.md (Phase 1)
2026-04-03 18:02:25 +02:00

272 lines
7.4 KiB
Python

"""
Result Container Parser (Phase 1)
Parsed strukturierte LLM-Antworten (Markdown-Sektionen).
Konzept-Basis: konzept_workflow_engine_konsolidated.md (Sektion 8.2)
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Sektion 4.1)
Erwartet Format (Markdown-Sektionen):
```
## Analyse
[Hauptinhalt]
## Entscheidungsfragen
- Relevanz: ja
- Priorität: hoch
## Begründung
[Optional: Plausibilisierung]
```
Output:
```python
{
"analysis_core": str,
"decision_signals": Dict[str, str], # {"relevanz": "ja", "prioritaet": "hoch"}
"reasoning_anchors": Optional[str]
}
```
"""
import re
from typing import Dict, Optional, List, Tuple
def parse_result_container(llm_output: str) -> Dict:
"""
Parsed strukturierte LLM-Antwort in Ergebniscontainer.
Extrahiert drei Bereiche:
1. **Analysekern** (## Analyse)
2. **Entscheidungsanteil** (## Entscheidungsfragen)
3. **Begründungsanker** (## Begründung, optional)
Args:
llm_output: Vollständiger LLM-Output als String
Returns:
Dict mit drei Bereichen:
{
"analysis_core": str,
"decision_signals": Dict[str, str],
"reasoning_anchors": Optional[str],
"parsing_status": str # "complete", "partial", "failed"
}
Raises:
ValueError: Bei komplett fehlgeschlagenem Parsing
"""
# Extrahiere Sektionen
analysis_core = extract_section(llm_output, "Analyse")
decision_section = extract_section(llm_output, "Entscheidungsfragen")
reasoning_anchors = extract_section(llm_output, "Begründung")
# Parse Entscheidungsfragen
decision_signals = {}
if decision_section:
decision_signals = parse_decision_questions(decision_section)
# Determine parsing status
if analysis_core and decision_signals:
parsing_status = "complete"
elif analysis_core or decision_signals:
parsing_status = "partial"
else:
parsing_status = "failed"
# Fallback: Wenn keine Strukturierung erkannt, verwende gesamten Output als Analysekern
if parsing_status == "failed":
analysis_core = llm_output
parsing_status = "fallback"
return {
"analysis_core": analysis_core or "",
"decision_signals": decision_signals,
"reasoning_anchors": reasoning_anchors,
"parsing_status": parsing_status
}
def extract_section(text: str, section_name: str) -> Optional[str]:
"""
Extrahiert eine Markdown-Sektion aus dem Text.
Sucht nach:
- `## Section_Name` (Überschrift)
- Alles bis zur nächsten `##` Überschrift
Args:
text: Volltext
section_name: Name der Sektion (z.B. "Analyse", "Entscheidungsfragen")
Returns:
Sektionsinhalt (ohne Überschrift) oder None wenn nicht gefunden
"""
# Pattern: ## Section_Name (mit optionalem Whitespace)
# Captured content bis zur nächsten ## oder Ende
pattern = rf'##\s*{re.escape(section_name)}\s*\n(.*?)(?=\n##|\Z)'
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
if match:
content = match.group(1).strip()
return content if content else None
return None
def parse_decision_questions(section_text: str) -> Dict[str, str]:
"""
Parsed Entscheidungsfragen aus Sektion.
Erwartet Format:
```
- Relevanz: ja
- Priorität: hoch
- Selektion: nein
```
Alternativen:
```
Relevanz: ja
Priorität: hoch
```
oder:
```
- **Relevanz**: ja
- **Priorität**: hoch
```
Args:
section_text: Text der Entscheidungsfragen-Sektion
Returns:
Dict mit {frage_typ: antwort}
Beispiel: {"relevanz": "ja", "prioritaet": "hoch"}
"""
signals = {}
# Pattern: Matcht verschiedene Formatvariationen
# Gruppe 1: Fragetyp (z.B. "Relevanz", "**Priorität**")
# Gruppe 2: Antwort (z.B. "ja", "hoch")
patterns = [
r'-\s*\*?\*?(\w+)\*?\*?\s*:\s*\[?([^\]\n]+)\]?', # "- **Relevanz**: [ja]"
r'^\s*\*?\*?(\w+)\*?\*?\s*:\s*\[?([^\]\n]+)\]?', # "Relevanz: ja" (ohne -)
]
for pattern in patterns:
matches = re.finditer(pattern, section_text, re.MULTILINE | re.IGNORECASE)
for match in matches:
question_type = match.group(1).strip().lower()
answer = match.group(2).strip()
# Entferne Klammern und Whitespace
answer = answer.strip('[]()').strip()
signals[question_type] = answer
return signals
def validate_decision_signal(
signal_value: str,
answer_spectrum: List[str]
) -> Tuple[str, str]:
"""
Validiert ein Entscheidungssignal gegen Antwortspektrum.
Gibt zurück:
- (normalized_value, status)
Status:
- "valid": Antwort exakt im Spektrum
- "normalized": Antwort wurde normalisiert (z.B. "Ja""ja")
- "invalid": Antwort außerhalb des Spektrums
Args:
signal_value: Rohe Antwort vom LLM
answer_spectrum: Erlaubte Werte (z.B. ["ja", "nein", "unklar"])
Returns:
Tuple (normalisierter_wert, status)
"""
# Exakte Übereinstimmung (case-sensitive)
if signal_value in answer_spectrum:
return signal_value, "valid"
# Case-insensitive Matching
signal_lower = signal_value.lower()
for allowed in answer_spectrum:
if signal_lower == allowed.lower():
return allowed, "normalized"
# Keine Übereinstimmung
return signal_value, "invalid"
def extract_analysis_core_fallback(llm_output: str) -> str:
"""
Fallback-Methode: Extrahiert Analysekern wenn keine Strukturierung erkannt.
Versucht intelligent zu identifizieren was der Hauptinhalt ist:
1. Wenn Text vor "## Entscheidungsfragen" existiert: Verwende das
2. Sonst: Verwende gesamten Text
Args:
llm_output: Vollständiger LLM-Output
Returns:
Best-Guess für Analysekern
"""
# Suche nach "## Entscheidungsfragen" Marker
pattern = r'##\s*Entscheidungsfragen'
match = re.search(pattern, llm_output, re.IGNORECASE)
if match:
# Text vor dem Marker ist wahrscheinlich die Analyse
return llm_output[:match.start()].strip()
# Fallback: Gesamter Text
return llm_output.strip()
def parse_result_container_robust(
llm_output: str,
expected_questions: Optional[List[str]] = None
) -> Dict:
"""
Robuste Variante des Parsings mit zusätzlicher Validierung.
Args:
llm_output: Vollständiger LLM-Output
expected_questions: Optionale Liste erwarteter Fragetypen (z.B. ["relevanz", "prioritaet"])
Returns:
Ergebniscontainer mit zusätzlichem "warnings"-Feld
"""
result = parse_result_container(llm_output)
warnings = []
# Prüfe ob erwartete Fragen vorhanden
if expected_questions:
found_questions = set(result['decision_signals'].keys())
expected_set = set(expected_questions)
missing = expected_set - found_questions
if missing:
warnings.append(f"Fehlende Entscheidungsfragen: {', '.join(missing)}")
unexpected = found_questions - expected_set
if unexpected:
warnings.append(f"Unerwartete Entscheidungsfragen: {', '.join(unexpected)}")
# Prüfe Parsing-Status
if result['parsing_status'] == "partial":
warnings.append("Parsing unvollständig: Einige Sektionen fehlen")
elif result['parsing_status'] == "fallback":
warnings.append("Keine Strukturierung erkannt, Fallback-Parsing verwendet")
result['warnings'] = warnings
return result