diff --git a/backend/workflow_executor.py b/backend/workflow_executor.py index e30127a..56d76b6 100644 --- a/backend/workflow_executor.py +++ b/backend/workflow_executor.py @@ -114,7 +114,8 @@ async def execute_workflow( "variables": variables, "profile_id": profile_id, "node_results": {}, # Phase 3: Store full NodeExecutionState - "active_edges": {} # Phase 3: Track edge activation + "active_edges": {}, # Phase 3: Track edge activation + "graph": graph # Phase 4: Needed for End Node to access questions } # Phase 3: BFS traversal mit edge activation @@ -168,7 +169,7 @@ async def execute_workflow( queue.append(edge.to_node) # 6. Aggregiere Ergebnisse - aggregated = aggregate_results(node_states) + aggregated = aggregate_results(node_states, graph) # 7. Speichere Execution State completed_at = datetime.utcnow().isoformat() @@ -590,18 +591,37 @@ def execute_end_node( if not node.template: raise ValueError(f"End node {node.id} has output_mode=TEMPLATE but no template defined") - # Build template context: {{node_id}} → {analysis_core, decision_signals, status} + # Build template context: {{node_id}} → {analysis_core, signal_X, question_X, status} template_context = {} + graph = context.get("graph") # WorkflowGraph object + for node_id, node_state in context.get("node_results", {}).items(): - template_context[node_id] = { + node_context = { "analysis_core": node_state.analysis_core or "", "decision_signals": node_state.decision_signals or {}, "reasoning_anchors": node_state.reasoning_anchors or "", "status": node_state.status.value if node_state.status else "unknown", - # Add individual signal access: {{node_id.signal_name}} - **node_state.decision_signals # Flatten signals into node context } + # Add normalized signals as {{node_id.signal_X}} + if node_state.normalized_signals: + for signal in node_state.normalized_signals: + # Convert NormalizedSignal object to dict if needed + signal_dict = signal.model_dump() if hasattr(signal, 'model_dump') else signal + signal_key = f"signal_{signal_dict['question_type']}" + node_context[signal_key] = signal_dict['normalized_value'] or signal_dict['raw_value'] + + # Add question texts as {{node_id.question_X}} + if graph: + workflow_node = next((n for n in graph.nodes if n.id == node_id), None) + if workflow_node and workflow_node.question_augmentations: + for q in workflow_node.question_augmentations: + q_dict = q.model_dump() if hasattr(q, 'model_dump') else q + question_key = f"question_{q_dict['type']}" + node_context[question_key] = q_dict.get('question', '') + + template_context[node_id] = node_context + logger.debug(f"End node {node.id}: Built template context for {len(template_context)} nodes") # Render template @@ -818,16 +838,20 @@ async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str -def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]: +def aggregate_results(node_states: List[NodeExecutionState], graph) -> Dict[str, Any]: """ Aggregiert Ergebnisse aller Knoten. + Phase 4: Wenn End Node existiert, verwende NUR dessen Output. + Sonst (backward compatible): Kombiniere alle analysis_core Werte. + Args: node_states: Liste aller NodeExecutionState + graph: WorkflowGraph object (to identify End Nodes) Returns: { - "combined_analysis": "## node_1\n...\n\n## node_2\n...", + "analysis_core": "Final output (from End Node or combined)", "all_signals": [{question_type, normalized_value, status}, ...], "total_nodes": 3, "executed_nodes": 3, @@ -837,26 +861,45 @@ def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]: Beispiel: >>> states = [ ... NodeExecutionState(node_id="n1", status=NodeStatus.EXECUTED, analysis_core="Test 1"), - ... NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2") + ... NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2"), + ... NodeExecutionState(node_id="end", status=NodeStatus.EXECUTED, analysis_core="Final") ... ] - >>> result = aggregate_results(states) - >>> "## n1" in result["combined_analysis"] - True - >>> result["executed_nodes"] - 2 + >>> result = aggregate_results(states, graph) + >>> result["analysis_core"] + "Final" """ + # Find End Node in graph + end_node_ids = [n.id for n in graph.nodes if n.type == "end"] + combined_analysis = [] all_signals = [] + final_output = None for state in node_states: + # Check if this is an End Node + is_end_node = state.node_id in end_node_ids + if state.status == NodeStatus.EXECUTED and state.analysis_core: - combined_analysis.append(f"## {state.node_id}\n{state.analysis_core}") + if is_end_node: + # End Node output is the final output (don't add to combined_analysis) + final_output = state.analysis_core + else: + # Regular node - add to combined analysis + combined_analysis.append(f"## {state.node_id}\n{state.analysis_core}") if state.normalized_signals: all_signals.extend([s.model_dump() for s in state.normalized_signals]) + # Phase 4: If End Node exists, use its output ONLY + # Backward compatible: If no End Node, use combined_analysis + if final_output: + primary_output = final_output + else: + primary_output = "\n\n".join(combined_analysis) + return { - "combined_analysis": "\n\n".join(combined_analysis), + "analysis_core": primary_output, + "combined_analysis": "\n\n".join(combined_analysis), # Keep for debugging (excludes End Node) "all_signals": all_signals, "total_nodes": len(node_states), "executed_nodes": sum(1 for s in node_states if s.status == NodeStatus.EXECUTED), diff --git a/frontend/src/components/workflow/panels/WorkflowResultViewer.jsx b/frontend/src/components/workflow/panels/WorkflowResultViewer.jsx index 85d60af..6a596a5 100644 --- a/frontend/src/components/workflow/panels/WorkflowResultViewer.jsx +++ b/frontend/src/components/workflow/panels/WorkflowResultViewer.jsx @@ -149,7 +149,7 @@ export function WorkflowResultViewer({ result, onClose }) { overflowY: 'auto' }} > - {aggregated.analysis_core || aggregated.combined_analysis || '(Kein Output)'} + {aggregated.analysis_core || '(Kein Output)'}