ROOT ARCHITECTURAL CHANGE:
Multiple questions with same type are now supported!
Problem:
- question_augmenter used q.type as the LLM key.
- If two questions both had type="unsicherheit":
  - The LLM saw the same key twice: "- unsicherheit: [ja/nein]".
  - It could only answer one of them.
  - The resulting signals were ambiguous.
Solution:
- Use question.id as LLM key (unique by design)
- Keep type for normalization logic
- Map id → type internally
Backend question_augmenter.py:
- format_question_list() now uses q.id as key
- Format: "- **q21**: [ja/nein] # Question text"
- Question text as comment for LLM context
Backend workflow_executor.py:
- Removed type→id mapping (no longer needed)
- decision_signals now keyed by id (from LLM)
- Build id→type catalog for normalization
- NormalizedSignal.question_type now stores the question id (not the question type, despite the field name!)
- End Node template: signal_{id} directly available
Flow:
1. Questions sent to LLM: "- q21: [ja/nein] # Ist Protein unsicher?"
2. LLM answers: "- q21: nein"
3. Normalization: id→type lookup for spectrum/rules
4. Template: {{ node_4.signal_q21 }} = "nein"
Example (TWO unsicherheit questions):
Questions:
- q21: type=unsicherheit, question="Ist Protein unsicher?"
- q22: type=unsicherheit, question="Ist Energie unsicher?"
LLM Prompt:
```
## Entscheidungsfragen
- **q21**: [ja/nein] # Ist Protein unsicher?
- **q22**: [ja/nein] # Ist Energie unsicher?
```
LLM Response:
```
- q21: nein
- q22: ja
```
Template:
```
{{ node_4.signal_q21 }} → "nein"
{{ node_4.signal_q22 }} → "ja"
```
BREAKING CHANGE:
- Old workflows whose decision_signals are keyed by type will break.
- Workflows need to be re-executed after the update.
Issue fixed: It was not possible to have multiple questions with the same type.
Version: 0.9p (workflow module)
Part 3: End Node Template Engine - ARCHITECTURAL FIX
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1011 lines
37 KiB
Python
1011 lines
37 KiB
Python
"""
|
|
Workflow Executor (Phase 4)
|
|
|
|
Executes workflows with conditional branching and path consolidation.

Phase 2: Sequential execution
Phase 3: Conditional branching, logic nodes, fallback strategies
Phase 4: Join nodes, path consolidation

Concept basis: konzept_workflow_engine_konsolidated.md
Requirements analysis: anforderungsanalyse_umsetzungsplan.md (Phase 2-4)
|
|
"""
|
|
from typing import Dict, Any, List, Optional, Set
|
|
from datetime import datetime
|
|
import uuid
|
|
import logging
|
|
import json
|
|
from jinja2 import Template, TemplateError
|
|
|
|
from workflow_models import (
|
|
WorkflowGraph, NodeExecutionState, ExecutionResult,
|
|
NodeStatus, NormalizedSignal, FallbackStrategy, SignalStatus,
|
|
EndNodeOutputMode
|
|
)
|
|
from workflow_engine import parse_workflow_graph, get_execution_order
|
|
from question_augmenter import (
|
|
augment_prompt_with_questions,
|
|
parse_question_augmentations_from_jsonb
|
|
)
|
|
from result_container_parser import parse_result_container
|
|
from normalization_engine import normalize_all_signals, normalize_signal_value, load_question_catalog
|
|
from logic_evaluator import evaluate_logic_expression, resolve_signal_reference
|
|
from join_evaluator import evaluate_join_node as evaluate_join_node_core
|
|
from db import get_db, get_cursor
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def execute_workflow(
    workflow_id: Optional[str] = None,
    graph_data: Optional[Dict] = None,  # Phase 5: passed directly from ai_prompts.graph_data
    profile_id: str = None,
    variables: Dict[str, Any] = None,
    openrouter_call_func = None,  # LLM callback: async (prompt, model) -> str
    enable_debug: bool = False
) -> ExecutionResult:
    """
    Execute a workflow (with conditional branching and path consolidation).

    Phase 2: Linear execution.
    Phase 3: Conditional branching based on logic nodes.
    Phase 4: Join nodes and path consolidation.
    Phase 5: Accepts graph_data directly (from ai_prompts, not workflow_definitions).

    The graph is traversed breadth-first from the start node. A node is executed
    when at least one of its incoming edges is active; otherwise it is recorded
    as SKIPPED and its successors are not enqueued from it.

    Errors raised inside the execution are caught: the failed state is persisted
    (best effort) and an ExecutionResult with status "failed" is returned.

    Args:
        workflow_id: UUID of the workflow (legacy, workflow_definitions table)
        graph_data: Workflow graph as dict (takes precedence over workflow_id)
        profile_id: UUID of the profile
        variables: Placeholder values (e.g. {"name": "Lars", ...})
        openrouter_call_func: async (prompt, model) -> str
        enable_debug: Debug mode (forwarded to execute_node)

    Returns:
        ExecutionResult containing all node_states

    Example (Phase 5):
        >>> result = await execute_workflow(
        ...     graph_data={"nodes": [...], "edges": [...]},
        ...     profile_id="test-profile",
        ...     variables={"name": "Lars"},
        ...     openrouter_call_func=my_llm_func
        ... )
    """
    from collections import deque  # local import: O(1) pops from the left end of the queue

    execution_id = str(uuid.uuid4())
    started_at = datetime.utcnow().isoformat()

    # Created before the try block so the failure handler can always report the
    # states collected so far.
    node_states: List[NodeExecutionState] = []

    logger.info(f"Starting workflow execution: {execution_id}")

    try:
        # 1. Load the workflow definition
        if graph_data:
            # Phase 5: graph comes directly from ai_prompts.graph_data (already a dict)
            graph_dict = graph_data
            logger.debug(f"Using provided graph_data")
        elif workflow_id:
            # Phase 0-4: graph from the workflow_definitions table (legacy)
            with get_db() as conn:
                cur = get_cursor(conn)
                cur.execute(
                    "SELECT graph FROM workflow_definitions WHERE id = %s AND active = true",
                    (workflow_id,)
                )
                row = cur.fetchone()
                if not row:
                    raise ValueError(f"Workflow not found: {workflow_id}")

                graph_dict = row['graph']  # PostgreSQL JSONB returns dict
                logger.debug(f"Loaded graph from workflow_definitions: {workflow_id}")
        else:
            raise ValueError("Entweder workflow_id oder graph_data muss übergeben werden")

        # 2. Parse the graph
        graph = parse_workflow_graph(graph_dict)
        logger.debug(f"Parsed graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges")

        # 3. Load the question catalog
        with get_db() as conn:
            catalog = load_question_catalog(conn)
            logger.debug(f"Loaded catalog: {len(catalog)} question types")

        # 4. Execute nodes with conditional branching (Phase 3)
        context = {
            "variables": variables,
            "profile_id": profile_id,
            "node_results": {},  # Phase 3: full NodeExecutionState per executed node
            "active_edges": {},  # Phase 3: edge id -> bool, written by logic nodes
            "graph": graph  # Phase 4: the end node reads question texts from it
        }

        # Phase 3: BFS traversal with edge activation
        start_node = next((n for n in graph.nodes if n.type == "start"), None)
        if not start_node:
            raise ValueError("No start node found in workflow")

        visited: Set[str] = set()
        queue = deque([start_node.id])

        while queue:
            node_id = queue.popleft()

            if node_id in visited:
                continue

            visited.add(node_id)

            # A bare next() would raise StopIteration here, which the handler
            # below turns into an empty error message; fail with a clear one.
            node = next((n for n in graph.nodes if n.id == node_id), None)
            if node is None:
                raise ValueError(f"Edge references unknown node: {node_id}")

            # A node only runs if at least one incoming edge is active
            if not _has_active_incoming_edge(node, graph, context):
                logger.info(f"Skipping node {node_id}: no active incoming edges")
                node_states.append(NodeExecutionState(
                    node_id=node_id,
                    status=NodeStatus.SKIPPED,
                    error="no_active_incoming_edges",
                    started_at=datetime.utcnow().isoformat(),
                    completed_at=datetime.utcnow().isoformat()
                ))
                continue

            logger.info(f"Executing node: {node_id} (type: {node.type})")

            node_state = await execute_node(
                node=node,
                context=context,
                catalog=catalog,
                graph=graph,  # Phase 3: needed for logic nodes
                openrouter_call_func=openrouter_call_func,
                enable_debug=enable_debug
            )

            node_states.append(node_state)
            context["node_results"][node_id] = node_state

            # Enqueue successors reachable through active edges
            # (default: active; logic nodes set individual edges to False)
            for edge in graph.edges:
                if edge.from_node == node_id and context["active_edges"].get(edge.id, True):
                    queue.append(edge.to_node)

        # 5. Aggregate results
        aggregated = aggregate_results(node_states, graph)

        # 6. Persist the execution state
        completed_at = datetime.utcnow().isoformat()
        save_execution_state(
            execution_id=execution_id,
            workflow_id=workflow_id,
            profile_id=profile_id,
            node_states=node_states,
            status="completed",
            started_at=started_at,
            completed_at=completed_at
        )

        logger.info(f"Workflow execution completed: {execution_id}")

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id or "N/A",  # placeholder when graph_data is used directly
            status="completed",
            node_states=node_states,
            aggregated_result=aggregated,
            started_at=started_at,
            completed_at=completed_at
        )

    except Exception as e:
        logger.error(f"Workflow execution failed: {e}", exc_info=True)

        # Persist the failed state. This is best effort: if persistence itself is
        # what failed, the caller must still receive the failed result instead of
        # a second exception.
        completed_at = datetime.utcnow().isoformat()
        try:
            save_execution_state(
                execution_id=execution_id,
                workflow_id=workflow_id,  # can be None when graph_data is used
                profile_id=profile_id,
                node_states=node_states,
                status="failed",
                started_at=started_at,
                completed_at=completed_at,
                error=str(e)
            )
        except Exception:
            logger.exception(f"Could not persist failed execution state: {execution_id}")

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id or "N/A",  # ExecutionResult requires a string, use placeholder
            status="failed",
            node_states=node_states,
            aggregated_result={},
            started_at=started_at,
            completed_at=completed_at,
            error=str(e)
        )
|
|
|
|
|
|
async def execute_node(
    node,
    context: Dict[str, Any],
    catalog: Dict[str, Dict],
    graph: WorkflowGraph,  # Phase 3: needed for logic and join nodes
    openrouter_call_func,
    enable_debug: bool = False
) -> NodeExecutionState:
    """
    Execute a single node.

    Any exception raised while executing the node is caught and reported as a
    NodeExecutionState with status FAILED.

    Args:
        node: WorkflowNode (from graph.nodes)
        context: Execution context (variables, profile_id, node_results, active_edges)
        catalog: Question catalog keyed by question type
        graph: WorkflowGraph (for logic and join nodes)
        openrouter_call_func: LLM callback: async (prompt, model) -> str
        enable_debug: Debug mode (currently not used inside this function)

    Returns:
        NodeExecutionState

    Node Types:
        - start: No-op
        - end: Output generation (delegates to execute_end_node)
        - analysis: Load prompt → augment → LLM → parse → normalize
        - logic: Evaluate condition → activate/deactivate edges (Phase 3)
        - join: Consolidate paths → merge results (Phase 4)
    """
    started_at = datetime.utcnow().isoformat()

    try:
        # Start node: no-op
        if node.type == "start":
            logger.debug(f"Node {node.id}: No-op (start)")
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        # End node: output generation (Phase 4)
        if node.type == "end":
            return execute_end_node(node, context)

        # Logic nodes (Phase 3)
        if node.type == "logic":
            return execute_logic_node(node, context, graph)

        # Join nodes (Phase 4)
        if node.type == "join":
            return execute_join_node(node, context, graph)

        # Analysis nodes
        if node.type == "analysis":
            # Fail early with a clear message instead of a "'NoneType' object is
            # not callable" TypeError after the prompt was already loaded.
            if openrouter_call_func is None:
                raise ValueError(f"Node {node.id}: no LLM callback (openrouter_call_func) provided")

            # 1. Load prompt
            prompt_template = await load_prompt_template(node.prompt_slug, context)
            logger.debug(f"Node {node.id}: Loaded prompt '{node.prompt_slug}'")

            # 2. Parse question_augmentations
            questions = []
            if node.question_augmentations:
                # Convert models to plain dicts (JSONB-like format) for the parser
                questions_jsonb = [q.model_dump() if hasattr(q, 'model_dump') else q for q in node.question_augmentations]
                questions = parse_question_augmentations_from_jsonb(questions_jsonb)
                logger.debug(f"Node {node.id}: {len(questions)} question augmentations")

            # 3. Augment prompt
            if questions:
                augmented_prompt = augment_prompt_with_questions(
                    base_prompt=prompt_template,
                    questions=questions
                )
            else:
                augmented_prompt = prompt_template

            # 4. LLM call
            logger.debug(f"Node {node.id}: Calling LLM")
            llm_response = await openrouter_call_func(
                augmented_prompt,
                "anthropic/claude-sonnet-4"  # Default model
            )

            # 5. Parse result container
            parsed = parse_result_container(llm_response)
            logger.debug(f"Node {node.id}: Parsed response (status: {parsed['parsing_status']})")

            # 6. Normalize signals
            # NOTE: decision_signals are keyed by question.id (not by type).
            # The answer spectrum comes from the node's own question; the
            # normalization rules come from the type-based catalog.
            normalized_signals = []
            if parsed["decision_signals"]:
                # Build lookup: question id → {type, answer_spectrum}
                id_catalog = {}
                for q in questions:
                    q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
                    id_catalog[q_dict['id']] = {
                        "type": q_dict['type'],  # type selects the normalization rules
                        "answer_spectrum": q_dict['answer_spectrum'],
                    }

                for signal_id, signal_value in parsed["decision_signals"].items():
                    q_config = id_catalog.get(signal_id)
                    if q_config is None:
                        # The LLM answered under a key that matches no question
                        # of this node; the signal cannot be normalized. Make
                        # that visible instead of dropping it silently.
                        logger.warning(
                            f"Node {node.id}: Ignoring signal '{signal_id}' - no question with this id on the node"
                        )
                        continue

                    # Type-based catalog entry supplies normalization rules (if any)
                    type_catalog_entry = catalog.get(q_config['type'], {})

                    # Normalize against the question-specific spectrum
                    normalized = normalize_signal_value(
                        raw_value=signal_value,
                        answer_spectrum=q_config['answer_spectrum'],
                        normalization_rules=type_catalog_entry.get('normalization_rules')
                    )

                    normalized_signals.append(NormalizedSignal(
                        question_type=signal_id,  # stores the ID (used for template access)
                        raw_value=signal_value,
                        normalized_value=normalized.get('normalized_value'),
                        status=normalized.get('status'),
                        confidence=normalized.get('confidence'),
                        metadata=normalized.get('metadata')
                    ))
                    logger.debug(f"Node {node.id}: Normalized signal '{signal_id}' = '{signal_value}' → '{normalized.get('normalized_value')}'")

                logger.info(f"Node {node.id}: Normalized {len(normalized_signals)} signals")

            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                analysis_core=parsed["analysis_core"],
                decision_signals=parsed["decision_signals"],
                normalized_signals=normalized_signals,
                reasoning_anchors=parsed.get("reasoning_anchors"),
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        # Unknown node type: none of the handled types matched
        raise ValueError(f"Node type '{node.type}' not implemented yet (Phase 4+)")

    except Exception as e:
        logger.error(f"Node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )
|
|
|
|
|
|
def execute_logic_node(
    node,
    context: Dict[str, Any],
    graph: WorkflowGraph
) -> NodeExecutionState:
    """
    Run a logic node (Phase 3): evaluate its condition and switch its edges.

    Args:
        node: WorkflowNode of type "logic"
        context: Execution context; its "active_edges" mapping is updated
        graph: WorkflowGraph

    Returns:
        NodeExecutionState whose analysis_core is a JSON string with the keys
        "evaluation_result", "error" and "activated_edges". Status is FAILED
        if the node has no condition or any step raises.

    Logic:
        1. Evaluate node.condition.expression
        2. On success pick the edges labelled "then" (truthy result) or "else"
        3. On an evaluation error let _apply_fallback pick the edges
        4. Mark every outgoing edge as active (picked) or inactive (not picked)
    """
    begin_ts = datetime.utcnow().isoformat()

    try:
        if not node.condition:
            raise ValueError(f"Logic node {node.id} has no condition")

        # Step 1: evaluate the condition
        result, error = evaluate_logic_expression(node.condition.expression, context)

        if not error:
            # Steps 2: successful evaluation picks the branch by its label
            logger.info(f"Logic node {node.id}: evaluated to {result}")
            branch_label = "then" if result else "else"
            chosen_edge_ids = _get_edges_by_label(node.id, branch_label, graph)
        else:
            # Step 3: evaluation failed, the fallback strategy decides
            logger.warning(f"Logic node {node.id}: evaluation error: {error}")
            chosen_edge_ids = _apply_fallback(node, graph, context, error)

        # Step 4: record the activation flag of every outgoing edge
        leaving = [candidate for candidate in graph.edges if candidate.from_node == node.id]
        for candidate in leaving:
            is_chosen = candidate.id in chosen_edge_ids
            context["active_edges"][candidate.id] = is_chosen

        logger.debug(f"Logic node {node.id}: activated edges: {chosen_edge_ids}")

        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            started_at=begin_ts,
            completed_at=datetime.utcnow().isoformat(),
            analysis_core=json.dumps({
                "evaluation_result": None if error else result,
                "error": error,
                "activated_edges": chosen_edge_ids
            })
        )

    except Exception as exc:
        logger.error(f"Logic node execution failed ({node.id}): {exc}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(exc),
            started_at=begin_ts,
            completed_at=datetime.utcnow().isoformat()
        )
|
|
|
|
|
|
def execute_join_node(
    node,
    context: Dict[str, Any],
    graph: WorkflowGraph
) -> NodeExecutionState:
    """
    Run a join node (Phase 4): consolidate the results of its incoming paths.

    Args:
        node: WorkflowNode of type "join"
        context: Execution context with node_results
        graph: WorkflowGraph

    Returns:
        NodeExecutionState. When the join evaluator reports "not ready" the
        status is FAILED and carries the evaluator's error and metadata.
        Otherwise the status is EXECUTED, analysis_core holds the consolidated
        core serialized as indented JSON, and normalized_signals holds the
        values of the evaluator's consolidated_signals dict.

    Logic:
        1. Evaluate the join via join_evaluator (evaluate_join_node_core)
        2. Stop with FAILED if the result is not ready
        3. Serialize the consolidated analysis core
        4. Log how many paths were consolidated
        5. Convert the signals dict into a list and build the state

    Note:
        Which strategies (wait_all / wait_any / best_effort) count as "ready"
        is decided inside join_evaluator, not in this function.
    """
    begin_ts = datetime.utcnow().isoformat()

    try:
        logger.info(f"Executing join node: {node.id}")

        outcome = evaluate_join_node_core(node, graph, context)

        if not outcome.ready:
            # Join strategy not satisfied
            logger.warning(f"Join node {node.id}: Not ready - {outcome.error}")
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.FAILED,
                error=outcome.error,
                started_at=begin_ts,
                completed_at=datetime.utcnow().isoformat(),
                metadata=outcome.metadata
            )

        # Consolidated core as pretty-printed JSON, non-ASCII characters kept as-is
        core_as_json = json.dumps(
            outcome.consolidated_analysis_core,
            ensure_ascii=False,
            indent=2
        )

        paths_done = outcome.metadata.get("executed_paths", 0)
        paths_total = outcome.metadata.get("total_paths", 0)
        logger.info(
            f"Join node {node.id}: Consolidated {paths_done}/{paths_total} paths"
        )

        # NodeExecutionState takes a list; the evaluator hands back a dict
        signal_list = [*outcome.consolidated_signals.values()]

        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            analysis_core=core_as_json,
            normalized_signals=signal_list,
            metadata=outcome.metadata,
            started_at=begin_ts,
            completed_at=datetime.utcnow().isoformat()
        )

    except Exception as exc:
        logger.error(f"Join node execution failed ({node.id}): {exc}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(exc),
            started_at=begin_ts,
            completed_at=datetime.utcnow().isoformat()
        )
|
|
|
|
|
|
def execute_end_node(
    node,
    context: Dict[str, Any]
) -> NodeExecutionState:
    """
    Execute an end node (Phase 4 - template engine).

    Args:
        node: WorkflowNode of type "end"
        context: Execution context with node_results (and "graph" for question texts)

    Returns:
        NodeExecutionState with the final output in analysis_core, or a FAILED
        state if the template is missing, fails to render, or the output mode
        is unknown.

    Output Modes:
        - AUTO (default when node.output_mode is unset): concatenates the
          analysis_core of every EXECUTED node that has one, each under a
          "## <node_id>" heading
        - TEMPLATE: renders node.template as a Jinja2 template

    Template Context (one entry per node in node_results):
        - {{node_id.analysis_core}}: Analysis text of the node
        - {{node_id.decision_signals}}: Dict of raw decision signals
        - {{node_id.reasoning_anchors}}: Reasoning anchors of the node
        - {{node_id.status}}: Node execution status
        - {{node_id.signal_<question_id>}}: Normalized signal value (raw value
          if the normalized value is empty)
        - {{node_id.question_<question_id>}}: Question text
        - Conditional rendering: {% if node_id %}...{% endif %}

    Example Template:
        ```
        # Final Analysis

        ## Body Composition
        {{body_analysis.analysis_core}}

        {% if training_analysis %}
        ## Training Recommendation
        {{training_analysis.analysis_core}}
        {% endif %}

        ## Decision Factors
        - {{body_analysis.question_q21}}: {{body_analysis.signal_q21}}
        - {{body_analysis.question_q22}}: {{body_analysis.signal_q22}}
        ```
    """
    started_at = datetime.utcnow().isoformat()

    try:
        logger.info(f"Executing end node: {node.id}")

        # Determine output mode (default: AUTO for backward compatibility)
        output_mode = node.output_mode or EndNodeOutputMode.AUTO

        if output_mode == EndNodeOutputMode.AUTO:
            logger.debug(f"End node {node.id}: Using AUTO output mode")

            combined_analysis = [
                f"## {node_id}\n{node_state.analysis_core}"
                for node_id, node_state in context.get("node_results", {}).items()
                if node_state.status == NodeStatus.EXECUTED and node_state.analysis_core
            ]

            final_output = "\n\n".join(combined_analysis) if combined_analysis else "[No analysis generated]"

        elif output_mode == EndNodeOutputMode.TEMPLATE:
            logger.debug(f"End node {node.id}: Using TEMPLATE output mode")

            if not node.template:
                raise ValueError(f"End node {node.id} has output_mode=TEMPLATE but no template defined")

            template_context = _build_end_node_template_context(context)
            logger.debug(f"End node {node.id}: Built template context for {len(template_context)} nodes")

            # Diagnostic dump of the available keys. Kept at DEBUG level because
            # it contains signal values and question texts.
            if logger.isEnabledFor(logging.DEBUG):
                for node_id, node_ctx in template_context.items():
                    logger.debug(f"End node template context[{node_id}]: {list(node_ctx.keys())}")
                    signal_keys = [k for k in node_ctx if k.startswith('signal_')]
                    question_keys = [k for k in node_ctx if k.startswith('question_')]
                    if signal_keys:
                        logger.debug(f"  Signals: {signal_keys} → values: {[(k, node_ctx[k]) for k in signal_keys]}")
                    if question_keys:
                        logger.debug(f"  Questions: {question_keys} → values: {[(k, node_ctx[k]) for k in question_keys]}")

            # Render template
            # NOTE(review): node.template is rendered with a plain jinja2.Template.
            # If templates can be authored by untrusted users, this should use
            # jinja2.sandbox.SandboxedEnvironment instead - confirm the trust model.
            try:
                jinja_template = Template(node.template)
                final_output = jinja_template.render(template_context)
                logger.info(f"End node {node.id}: Template rendered successfully ({len(final_output)} chars)")
            except TemplateError as te:
                error_msg = f"Template rendering failed: {str(te)}"
                logger.error(f"End node {node.id}: {error_msg}")
                return NodeExecutionState(
                    node_id=node.id,
                    status=NodeStatus.FAILED,
                    error=error_msg,
                    started_at=started_at,
                    completed_at=datetime.utcnow().isoformat()
                )

        else:
            raise ValueError(f"Unknown output_mode: {output_mode}")

        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            analysis_core=final_output,
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )

    except Exception as e:
        logger.error(f"End node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )


def _build_end_node_template_context(context: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Build the template render context: one dict of properties per node in node_results."""
    graph = context.get("graph")  # WorkflowGraph object (may be absent)

    # Index the graph nodes once instead of scanning graph.nodes for every
    # result; setdefault keeps the first node if an id occurs more than once.
    nodes_by_id: Dict[str, Any] = {}
    if graph:
        for graph_node in graph.nodes:
            nodes_by_id.setdefault(graph_node.id, graph_node)

    template_context: Dict[str, Dict[str, Any]] = {}

    for node_id, node_state in context.get("node_results", {}).items():
        node_context = {
            "analysis_core": node_state.analysis_core or "",
            "decision_signals": node_state.decision_signals or {},
            "reasoning_anchors": node_state.reasoning_anchors or "",
            "status": node_state.status.value if node_state.status else "unknown",
        }

        # Normalized signals become {{node_id.signal_<id>}}.
        # NOTE: the question_type field holds the question ID (not the type).
        if node_state.normalized_signals:
            for signal in node_state.normalized_signals:
                # Accept both NormalizedSignal objects and plain dicts
                signal_dict = signal.model_dump() if hasattr(signal, 'model_dump') else signal
                q_id = signal_dict['question_type']

                signal_key = f"signal_{q_id}"
                signal_value = signal_dict['normalized_value'] or signal_dict['raw_value']
                node_context[signal_key] = signal_value
                logger.debug(f"Mapped signal: {q_id} → {signal_key} = '{signal_value}'")

        # Question texts become {{node_id.question_<id>}}
        workflow_node = nodes_by_id.get(node_id)
        if workflow_node and workflow_node.question_augmentations:
            for q in workflow_node.question_augmentations:
                q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
                q_id = q_dict.get('id')
                if q_id:
                    node_context[f"question_{q_id}"] = q_dict.get('question', '')

        template_context[node_id] = node_context

    return template_context
|
|
|
|
|
|
def _apply_fallback(
    node,
    graph: WorkflowGraph,
    context: Dict[str, Any],
    error: str
) -> List[str]:
    """
    Pick the edges to activate when a logic node's condition could not be evaluated.

    Args:
        node: WorkflowNode whose evaluation failed
        graph: WorkflowGraph
        context: Execution context (not read by this function)
        error: Evaluation error (not read by this function)

    Returns:
        List of edge IDs that should be activated:
        - no fallback configured, DEFAULT_PATH or unknown strategy: "else" edges
        - CONSERVATIVE_SKIP: no edges
        - UNCERTAINTY_PATH: "uncertainty" edges, or "else" edges if there are none
        - DOCUMENT_ONLY: all outgoing edges of the node
    """
    def else_path() -> List[str]:
        # The conservative default branch shared by several strategies
        return _get_edges_by_label(node.id, "else", graph)

    if not node.fallback:
        logger.warning(f"Node {node.id}: No fallback configured, using DEFAULT_PATH")
        return else_path()

    strategy = node.fallback.strategy

    if strategy == FallbackStrategy.CONSERVATIVE_SKIP:
        # Nothing is activated, so every outgoing path is switched off
        logger.info(f"Node {node.id}: CONSERVATIVE_SKIP - deactivating all paths")
        return []

    if strategy == FallbackStrategy.DEFAULT_PATH:
        logger.info(f"Node {node.id}: DEFAULT_PATH - taking else path")
        return else_path()

    if strategy == FallbackStrategy.UNCERTAINTY_PATH:
        # Prefer an explicit uncertainty branch when the graph has one
        logger.info(f"Node {node.id}: UNCERTAINTY_PATH")
        labelled = _get_edges_by_label(node.id, "uncertainty", graph)
        if not labelled:
            logger.warning(f"Node {node.id}: No uncertainty path found, using else")
            return else_path()
        return labelled

    if strategy == FallbackStrategy.DOCUMENT_ONLY:
        # Behave as if there were no condition: keep every outgoing edge
        logger.info(f"Node {node.id}: DOCUMENT_ONLY - all paths remain active")
        leaving = [candidate for candidate in graph.edges if candidate.from_node == node.id]
        return [candidate.id for candidate in leaving]

    logger.warning(f"Node {node.id}: Unknown fallback strategy {strategy}, using DEFAULT_PATH")
    return else_path()
|
|
|
|
|
|
def _get_edges_by_label(node_id: str, label: str, graph: WorkflowGraph) -> List[str]:
    """
    Collect the IDs of a node's outgoing edges that carry a given label.

    Args:
        node_id: ID of the node the edges must start from
        label: Edge label to match exactly (e.g. "then", "else", "uncertainty")
        graph: WorkflowGraph

    Returns:
        Edge IDs in the order the edges appear in graph.edges (empty if none match)
    """
    found: List[str] = []
    for candidate in graph.edges:
        if candidate.from_node != node_id:
            continue
        if candidate.label == label:
            found.append(candidate.id)
    return found
|
|
|
|
|
|
def _has_active_incoming_edge(node, graph: WorkflowGraph, context: Dict[str, Any]) -> bool:
    """
    Tell whether a node may run, i.e. has at least one active incoming edge.

    Args:
        node: WorkflowNode
        graph: WorkflowGraph
        context: Execution context; "active_edges" maps edge id → bool

    Returns:
        True for start nodes, and for nodes with at least one incoming edge
        that is active. Edges without an entry in active_edges count as active
        (Phase 2 compatibility). False for non-start nodes with no incoming
        edges or with only deactivated ones.
    """
    # Start nodes have no incoming edges and are always runnable
    if node.type == "start":
        return True

    arriving = [candidate for candidate in graph.edges if candidate.to_node == node.id]
    if len(arriving) == 0:
        # Unreachable node
        return False

    flags = context.get("active_edges", {})
    return any(flags.get(candidate.id, True) for candidate in arriving)
|
|
|
|
|
|
async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str:
    """
    Load a prompt template from the DB and resolve its placeholders.

    Placeholder values come from two sources, merged in this order:
      1. the placeholder registry for the profile (get_placeholder_example_values)
      2. context["variables"] - caller-supplied values, which override registry
         values with the same name

    Failures while loading registry values or the placeholder catalog are
    logged as warnings and do not abort resolution.

    Args:
        prompt_slug: Slug of the prompt (e.g. "pipeline_body")
        context: {"variables": {"name": "Lars", ...}, "profile_id": "..."}

    Returns:
        Resolved prompt template

    Raises:
        ValueError: if no active prompt with this slug exists

    Example:
        >>> template = await load_prompt_template("pipeline_body", {"profile_id": "123"})
        >>> "{{name}}" not in template
        True
    """
    # Function-local imports as in the original (presumably to avoid an import
    # cycle - confirm before moving them to module level).
    from placeholder_resolver import get_placeholder_example_values, get_placeholder_catalog
    from prompt_executor import resolve_placeholders

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT template FROM ai_prompts WHERE slug = %s AND active = true",
            (prompt_slug,)
        )
        row = cur.fetchone()
        if not row:
            raise ValueError(f"Prompt not found: {prompt_slug}")

        template = row['template']

    profile_id = context.get("profile_id")

    def _bare_name(key) -> str:
        # Placeholder names may arrive wrapped as "{{name}}"; the resolver is
        # given bare names.
        return str(key).replace('{{', '').replace('}}', '')

    # Build the variables dict with all registered placeholders
    variables: Dict[str, Any] = {}

    try:
        processed_placeholders = get_placeholder_example_values(profile_id)
        variables.update({
            _bare_name(key): value
            for key, value in processed_placeholders.items()
        })
    except Exception as e:
        logger.warning(f"Failed to load placeholders for workflow: {e}")

    # Caller-supplied values were previously ignored although they are
    # documented as placeholder values; explicit values win over the registry.
    caller_variables = context.get("variables") or {}
    variables.update({
        _bare_name(key): value
        for key, value in caller_variables.items()
    })

    # Load catalog for |d modifier support
    try:
        catalog = get_placeholder_catalog(profile_id)
    except Exception as e:
        catalog = None
        logger.warning(f"Failed to load placeholder catalog for workflow: {e}")

    return resolve_placeholders(
        template=template,
        variables=variables,
        catalog=catalog
    )
|
|
|
|
|
|
|
|
|
|
def aggregate_results(node_states: List[NodeExecutionState], graph) -> Dict[str, Any]:
    """
    Aggregate the results of all nodes.

    Phase 4: If an end node produced output, ONLY that output is used as
    analysis_core. Otherwise (backward compatible, also when the end node
    failed or produced empty output) the analysis_core values of all other
    executed nodes are combined. With several end nodes, the last one in
    node_states that produced output wins.

    Args:
        node_states: List of all NodeExecutionState
        graph: WorkflowGraph object (used to identify end nodes)

    Returns:
        {
            "analysis_core": "Final output (from end node or combined)",
            "combined_analysis": "Combined output of all non-end nodes",
            "all_signals": [{question_type, normalized_value, status, ...}, ...],
            "total_nodes": 3,
            "executed_nodes": 3,
            "failed_nodes": 0,
            "skipped_nodes": 0
        }

    Example:
        >>> states = [
        ...     NodeExecutionState(node_id="n1", status=NodeStatus.EXECUTED, analysis_core="Test 1"),
        ...     NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2"),
        ...     NodeExecutionState(node_id="end", status=NodeStatus.EXECUTED, analysis_core="Final")
        ... ]
        >>> result = aggregate_results(states, graph)
        >>> result["analysis_core"]
        "Final"
    """
    # IDs of all end nodes in the graph (set: constant-time membership test)
    end_node_ids = {n.id for n in graph.nodes if n.type == "end"}

    sections = []
    all_signals = []
    final_output = None

    for state in node_states:
        if state.status == NodeStatus.EXECUTED and state.analysis_core:
            if state.node_id in end_node_ids:
                # End node output is the final output (not part of the combined text)
                final_output = state.analysis_core
            else:
                sections.append(f"## {state.node_id}\n{state.analysis_core}")

        if state.normalized_signals:
            # Accept model objects as well as plain dicts, as the end node does
            all_signals.extend(
                s.model_dump() if hasattr(s, 'model_dump') else s
                for s in state.normalized_signals
            )

    combined_text = "\n\n".join(sections)

    # Phase 4: end node output takes precedence; otherwise fall back to the
    # combined analysis
    primary_output = final_output if final_output else combined_text

    return {
        "analysis_core": primary_output,
        "combined_analysis": combined_text,  # kept for debugging (excludes end node)
        "all_signals": all_signals,
        "total_nodes": len(node_states),
        "executed_nodes": sum(1 for s in node_states if s.status == NodeStatus.EXECUTED),
        "failed_nodes": sum(1 for s in node_states if s.status == NodeStatus.FAILED),
        "skipped_nodes": sum(1 for s in node_states if s.status == NodeStatus.SKIPPED)  # Phase 3
    }
|
|
|
|
|
|
def save_execution_state(
    execution_id: str,
    workflow_id: Optional[str],  # None when graph_data is used directly (Phase 5)
    profile_id: str,
    node_states: List[NodeExecutionState],
    status: str,
    started_at: str,
    completed_at: Optional[str] = None,
    error: Optional[str] = None
):
    """
    Persist the execution state in workflow_executions (insert or update by id).

    On an id conflict only status, node_states, execution_log and completed_at
    are overwritten.

    Args:
        execution_id: UUID of the execution
        workflow_id: UUID of the workflow (None when graph_data is used directly)
        profile_id: UUID of the profile
        node_states: List of all NodeExecutionState
        status: 'completed' | 'failed' | 'partial'
        started_at: ISO timestamp
        completed_at: ISO timestamp (optional)
        error: Error message (optional); stored as {"error": ...} in execution_log

    Example:
        >>> save_execution_state(
        ...     execution_id="exec-123",
        ...     workflow_id="wf-456",
        ...     profile_id="prof-789",
        ...     node_states=[],
        ...     status="completed",
        ...     started_at="2026-04-03T12:00:00"
        ... )
    """
    # Dump in JSON mode so that enum members, datetimes and similar values are
    # converted to JSON-compatible types before json.dumps sees them.
    node_states_json = [s.model_dump(mode="json") for s in node_states]
    execution_log = {"error": error} if error else {}

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            INSERT INTO workflow_executions
            (id, workflow_id, profile_id, status, node_states, execution_log, started_at, completed_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE SET
                status = EXCLUDED.status,
                node_states = EXCLUDED.node_states,
                execution_log = EXCLUDED.execution_log,
                completed_at = EXCLUDED.completed_at
        """, (
            execution_id,
            workflow_id,
            profile_id,
            status,
            json.dumps(node_states_json),
            json.dumps(execution_log),
            started_at,
            completed_at
        ))
        conn.commit()

    logger.info(f"Saved execution state: {execution_id} (status: {status})")
|