From 0ad3ddd627bfe5cb8286b6cf4b84eaf1a2787104 Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 18 Apr 2026 09:11:07 +0200 Subject: [PATCH] fix: update progress callback and event types for workflow execution - Changed progress callback from "execution_complete" to "workflow_graph_finished" to provide intermediate updates. - Updated documentation to clarify the distinction between "workflow_graph_finished" and "execution_complete". - Adjusted frontend API handling to accommodate new event structure and ensure proper result serialization. --- backend/routers/prompts.py | 17 ++++++++++++++++- backend/workflow_executor.py | 6 ++++-- frontend/src/utils/api.js | 21 ++++++++++----------- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/backend/routers/prompts.py b/backend/routers/prompts.py index e50e47a..92e6987 100644 --- a/backend/routers/prompts.py +++ b/backend/routers/prompts.py @@ -275,7 +275,8 @@ async def execute_unified_prompt_stream( Returns live progress updates during workflow execution: - execution_started: Workflow has begun - node_complete: Each node completes - - execution_complete: Final result ready + - workflow_graph_finished: Workflow-Graph fertig (Zwischen-Info, kein Endergebnis) + - execution_complete: Endergebnis (wie POST /execute, Feld result) - execution_failed: Error occurred Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts. @@ -364,6 +365,20 @@ async def execute_unified_prompt_stream( ) conn.commit() + # Pflicht für alle Prompt-Typen: Pipeline/Base rufen keinen progress_callback + # mit Abschluss auf — ohne dieses Event endet SSE ohne resolve → „Connection to server lost“. + try: + sse_payload = json.loads(json.dumps(result, default=str)) + except (TypeError, ValueError): + sse_payload = { + "type": result.get("type", "unknown"), + "error": "result_not_serializable", + } + await event_queue.put({ + "type": "execution_complete", + "result": sse_payload, + }) + except Exception as e: # Queue error event await event_queue.put({ diff --git a/backend/workflow_executor.py b/backend/workflow_executor.py index 1f12fba..be94f1e 100644 --- a/backend/workflow_executor.py +++ b/backend/workflow_executor.py @@ -213,9 +213,11 @@ async def execute_workflow( logger.info(f"Workflow execution completed: {execution_id}") - # NEW: Progress-Callback für erfolgreiche Fertigstellung + # Fortschritt: kein type=execution_complete — das sendet /execute-stream einmalig + # mit vollem execute_prompt_with_data-Result (Pipeline/Base/Workflow), sonst schließt + # der Client nach Workflow vorzeitig ohne debug/node_states oder Pipeline bricht ab. if progress_callback: - await progress_callback("execution_complete", { + await progress_callback("workflow_graph_finished", { "execution_id": execution_id, "status": "completed", "aggregated_result": aggregated, diff --git a/frontend/src/utils/api.js b/frontend/src/utils/api.js index b680124..d716e9a 100644 --- a/frontend/src/utils/api.js +++ b/frontend/src/utils/api.js @@ -512,18 +512,17 @@ export const api = { onProgress(data) } - // Check for final result + // Check for final result (/execute-stream liefert volles POST-/execute-Payload unter result) if (data.type === 'execution_complete') { - // Transform SSE result to match regular execute format - finalResult = { - type: 'workflow', - execution_id: data.execution_id, - status: data.status, - aggregated_result: data.aggregated_result, - debug: { - node_states: [] // TODO: collect from progress events if needed - } - } + finalResult = data.result + ? data.result + : { + type: 'workflow', + execution_id: data.execution_id, + status: data.status, + aggregated_result: data.aggregated_result, + debug: { node_states: [] }, + } eventSource.close() resolve(finalResult) } else if (data.type === 'execution_failed') {