Problem: Parser converted question IDs to lowercase ('qAnalyst' → 'qanalyst'),
causing normalization to fail because id_catalog lookup is case-sensitive.
Impact: All workflow question signals were lost - normalized_signals stayed empty,
so template placeholders like {{node_2.signal_qAnalyst}} remained unresolved.
Solution: Removed .lower() call in parse_decision_questions() to preserve
original case from AI response.
Root cause: Line 162 in result_container_parser.py
Fixes: Question augmentation signals not appearing in workflow end nodes
273 lines
7.5 KiB
Python
273 lines
7.5 KiB
Python
"""
|
|
Result Container Parser (Phase 1)
|
|
|
|
Parsed strukturierte LLM-Antworten (Markdown-Sektionen).
|
|
|
|
Konzept-Basis: konzept_workflow_engine_konsolidated.md (Sektion 8.2)
|
|
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Sektion 4.1)
|
|
|
|
Erwartet Format (Markdown-Sektionen):
|
|
```
|
|
## Analyse
|
|
[Hauptinhalt]
|
|
|
|
## Entscheidungsfragen
|
|
- Relevanz: ja
|
|
- Priorität: hoch
|
|
|
|
## Begründung
|
|
[Optional: Plausibilisierung]
|
|
```
|
|
|
|
Output:
|
|
```python
|
|
{
|
|
"analysis_core": str,
|
|
"decision_signals": Dict[str, str], # {"relevanz": "ja", "prioritaet": "hoch"}
|
|
"reasoning_anchors": Optional[str]
|
|
}
|
|
```
|
|
"""
|
|
import re
|
|
from typing import Dict, Optional, List, Tuple
|
|
|
|
|
|
def parse_result_container(llm_output: str) -> Dict:
|
|
"""
|
|
Parsed strukturierte LLM-Antwort in Ergebniscontainer.
|
|
|
|
Extrahiert drei Bereiche:
|
|
1. **Analysekern** (## Analyse)
|
|
2. **Entscheidungsanteil** (## Entscheidungsfragen)
|
|
3. **Begründungsanker** (## Begründung, optional)
|
|
|
|
Args:
|
|
llm_output: Vollständiger LLM-Output als String
|
|
|
|
Returns:
|
|
Dict mit drei Bereichen:
|
|
{
|
|
"analysis_core": str,
|
|
"decision_signals": Dict[str, str],
|
|
"reasoning_anchors": Optional[str],
|
|
"parsing_status": str # "complete", "partial", "failed"
|
|
}
|
|
|
|
Raises:
|
|
ValueError: Bei komplett fehlgeschlagenem Parsing
|
|
"""
|
|
# Extrahiere Sektionen
|
|
analysis_core = extract_section(llm_output, "Analyse")
|
|
decision_section = extract_section(llm_output, "Entscheidungsfragen")
|
|
reasoning_anchors = extract_section(llm_output, "Begründung")
|
|
|
|
# Parse Entscheidungsfragen
|
|
decision_signals = {}
|
|
if decision_section:
|
|
decision_signals = parse_decision_questions(decision_section)
|
|
|
|
# Determine parsing status
|
|
if analysis_core and decision_signals:
|
|
parsing_status = "complete"
|
|
elif analysis_core or decision_signals:
|
|
parsing_status = "partial"
|
|
else:
|
|
parsing_status = "failed"
|
|
|
|
# Fallback: Wenn keine Strukturierung erkannt, verwende gesamten Output als Analysekern
|
|
if parsing_status == "failed":
|
|
analysis_core = llm_output
|
|
parsing_status = "fallback"
|
|
|
|
return {
|
|
"analysis_core": analysis_core or "",
|
|
"decision_signals": decision_signals,
|
|
"reasoning_anchors": reasoning_anchors,
|
|
"parsing_status": parsing_status
|
|
}
|
|
|
|
|
|
def extract_section(text: str, section_name: str) -> Optional[str]:
|
|
"""
|
|
Extrahiert eine Markdown-Sektion aus dem Text.
|
|
|
|
Sucht nach:
|
|
- `## Section_Name` (Überschrift)
|
|
- Alles bis zur nächsten `##` Überschrift
|
|
|
|
Args:
|
|
text: Volltext
|
|
section_name: Name der Sektion (z.B. "Analyse", "Entscheidungsfragen")
|
|
|
|
Returns:
|
|
Sektionsinhalt (ohne Überschrift) oder None wenn nicht gefunden
|
|
"""
|
|
# Pattern: ## Section_Name (mit optionalem Whitespace)
|
|
# Captured content bis zur nächsten ## oder Ende
|
|
pattern = rf'##\s*{re.escape(section_name)}\s*\n(.*?)(?=\n##|\Z)'
|
|
|
|
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
|
|
|
|
if match:
|
|
content = match.group(1).strip()
|
|
return content if content else None
|
|
|
|
return None
|
|
|
|
|
|
def parse_decision_questions(section_text: str) -> Dict[str, str]:
|
|
"""
|
|
Parsed Entscheidungsfragen aus Sektion.
|
|
|
|
Erwartet Format:
|
|
```
|
|
- Relevanz: ja
|
|
- Priorität: hoch
|
|
- Selektion: nein
|
|
```
|
|
|
|
Alternativen:
|
|
```
|
|
Relevanz: ja
|
|
Priorität: hoch
|
|
```
|
|
|
|
oder:
|
|
```
|
|
- **Relevanz**: ja
|
|
- **Priorität**: hoch
|
|
```
|
|
|
|
Args:
|
|
section_text: Text der Entscheidungsfragen-Sektion
|
|
|
|
Returns:
|
|
Dict mit {frage_typ: antwort}
|
|
Beispiel: {"relevanz": "ja", "prioritaet": "hoch"}
|
|
"""
|
|
signals = {}
|
|
|
|
# Pattern: Matcht verschiedene Formatvariationen
|
|
# Gruppe 1: Fragetyp (z.B. "Relevanz", "**Priorität**")
|
|
# Gruppe 2: Antwort (z.B. "ja", "hoch")
|
|
patterns = [
|
|
r'-\s*\*?\*?(\w+)\*?\*?\s*:\s*\[?([^\]\n]+)\]?', # "- **Relevanz**: [ja]"
|
|
r'^\s*\*?\*?(\w+)\*?\*?\s*:\s*\[?([^\]\n]+)\]?', # "Relevanz: ja" (ohne -)
|
|
]
|
|
|
|
for pattern in patterns:
|
|
matches = re.finditer(pattern, section_text, re.MULTILINE | re.IGNORECASE)
|
|
for match in matches:
|
|
# Preserve original case for question IDs (e.g., "qAnalyst" not "qanalyst")
|
|
question_type = match.group(1).strip()
|
|
answer = match.group(2).strip()
|
|
|
|
# Entferne Klammern und Whitespace
|
|
answer = answer.strip('[]()').strip()
|
|
|
|
signals[question_type] = answer
|
|
|
|
return signals
|
|
|
|
|
|
def validate_decision_signal(
|
|
signal_value: str,
|
|
answer_spectrum: List[str]
|
|
) -> Tuple[str, str]:
|
|
"""
|
|
Validiert ein Entscheidungssignal gegen Antwortspektrum.
|
|
|
|
Gibt zurück:
|
|
- (normalized_value, status)
|
|
|
|
Status:
|
|
- "valid": Antwort exakt im Spektrum
|
|
- "normalized": Antwort wurde normalisiert (z.B. "Ja" → "ja")
|
|
- "invalid": Antwort außerhalb des Spektrums
|
|
|
|
Args:
|
|
signal_value: Rohe Antwort vom LLM
|
|
answer_spectrum: Erlaubte Werte (z.B. ["ja", "nein", "unklar"])
|
|
|
|
Returns:
|
|
Tuple (normalisierter_wert, status)
|
|
"""
|
|
# Exakte Übereinstimmung (case-sensitive)
|
|
if signal_value in answer_spectrum:
|
|
return signal_value, "valid"
|
|
|
|
# Case-insensitive Matching
|
|
signal_lower = signal_value.lower()
|
|
for allowed in answer_spectrum:
|
|
if signal_lower == allowed.lower():
|
|
return allowed, "normalized"
|
|
|
|
# Keine Übereinstimmung
|
|
return signal_value, "invalid"
|
|
|
|
|
|
def extract_analysis_core_fallback(llm_output: str) -> str:
|
|
"""
|
|
Fallback-Methode: Extrahiert Analysekern wenn keine Strukturierung erkannt.
|
|
|
|
Versucht intelligent zu identifizieren was der Hauptinhalt ist:
|
|
1. Wenn Text vor "## Entscheidungsfragen" existiert: Verwende das
|
|
2. Sonst: Verwende gesamten Text
|
|
|
|
Args:
|
|
llm_output: Vollständiger LLM-Output
|
|
|
|
Returns:
|
|
Best-Guess für Analysekern
|
|
"""
|
|
# Suche nach "## Entscheidungsfragen" Marker
|
|
pattern = r'##\s*Entscheidungsfragen'
|
|
match = re.search(pattern, llm_output, re.IGNORECASE)
|
|
|
|
if match:
|
|
# Text vor dem Marker ist wahrscheinlich die Analyse
|
|
return llm_output[:match.start()].strip()
|
|
|
|
# Fallback: Gesamter Text
|
|
return llm_output.strip()
|
|
|
|
|
|
def parse_result_container_robust(
|
|
llm_output: str,
|
|
expected_questions: Optional[List[str]] = None
|
|
) -> Dict:
|
|
"""
|
|
Robuste Variante des Parsings mit zusätzlicher Validierung.
|
|
|
|
Args:
|
|
llm_output: Vollständiger LLM-Output
|
|
expected_questions: Optionale Liste erwarteter Fragetypen (z.B. ["relevanz", "prioritaet"])
|
|
|
|
Returns:
|
|
Ergebniscontainer mit zusätzlichem "warnings"-Feld
|
|
"""
|
|
result = parse_result_container(llm_output)
|
|
warnings = []
|
|
|
|
# Prüfe ob erwartete Fragen vorhanden
|
|
if expected_questions:
|
|
found_questions = set(result['decision_signals'].keys())
|
|
expected_set = set(expected_questions)
|
|
|
|
missing = expected_set - found_questions
|
|
if missing:
|
|
warnings.append(f"Fehlende Entscheidungsfragen: {', '.join(missing)}")
|
|
|
|
unexpected = found_questions - expected_set
|
|
if unexpected:
|
|
warnings.append(f"Unerwartete Entscheidungsfragen: {', '.join(unexpected)}")
|
|
|
|
# Prüfe Parsing-Status
|
|
if result['parsing_status'] == "partial":
|
|
warnings.append("Parsing unvollständig: Einige Sektionen fehlen")
|
|
elif result['parsing_status'] == "fallback":
|
|
warnings.append("Keine Strukturierung erkannt, Fallback-Parsing verwendet")
|
|
|
|
result['warnings'] = warnings
|
|
return result
|