fix: update progress callback and event types for workflow execution
- Changed progress callback from "execution_complete" to "workflow_graph_finished" to provide intermediate updates. - Updated documentation to clarify the distinction between "workflow_graph_finished" and "execution_complete". - Adjusted frontend API handling to accommodate new event structure and ensure proper result serialization.
This commit is contained in:
parent
a002781ef9
commit
0ad3ddd627
|
|
@ -275,7 +275,8 @@ async def execute_unified_prompt_stream(
|
||||||
Returns live progress updates during workflow execution:
|
Returns live progress updates during workflow execution:
|
||||||
- execution_started: Workflow has begun
|
- execution_started: Workflow has begun
|
||||||
- node_complete: Each node completes
|
- node_complete: Each node completes
|
||||||
- execution_complete: Final result ready
|
- workflow_graph_finished: Workflow-Graph fertig (Zwischen-Info, kein Endergebnis)
|
||||||
|
- execution_complete: Endergebnis (wie POST /execute, Feld result)
|
||||||
- execution_failed: Error occurred
|
- execution_failed: Error occurred
|
||||||
|
|
||||||
Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts.
|
Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts.
|
||||||
|
|
@ -364,6 +365,20 @@ async def execute_unified_prompt_stream(
|
||||||
)
|
)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
|
# Pflicht für alle Prompt-Typen: Pipeline/Base rufen keinen progress_callback
|
||||||
|
# mit Abschluss auf — ohne dieses Event endet SSE ohne resolve → „Connection to server lost“.
|
||||||
|
try:
|
||||||
|
sse_payload = json.loads(json.dumps(result, default=str))
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
sse_payload = {
|
||||||
|
"type": result.get("type", "unknown"),
|
||||||
|
"error": "result_not_serializable",
|
||||||
|
}
|
||||||
|
await event_queue.put({
|
||||||
|
"type": "execution_complete",
|
||||||
|
"result": sse_payload,
|
||||||
|
})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Queue error event
|
# Queue error event
|
||||||
await event_queue.put({
|
await event_queue.put({
|
||||||
|
|
|
||||||
|
|
@ -213,9 +213,11 @@ async def execute_workflow(
|
||||||
|
|
||||||
logger.info(f"Workflow execution completed: {execution_id}")
|
logger.info(f"Workflow execution completed: {execution_id}")
|
||||||
|
|
||||||
# NEW: Progress-Callback für erfolgreiche Fertigstellung
|
# Fortschritt: kein type=execution_complete — das sendet /execute-stream einmalig
|
||||||
|
# mit vollem execute_prompt_with_data-Result (Pipeline/Base/Workflow), sonst schließt
|
||||||
|
# der Client nach Workflow vorzeitig ohne debug/node_states oder Pipeline bricht ab.
|
||||||
if progress_callback:
|
if progress_callback:
|
||||||
await progress_callback("execution_complete", {
|
await progress_callback("workflow_graph_finished", {
|
||||||
"execution_id": execution_id,
|
"execution_id": execution_id,
|
||||||
"status": "completed",
|
"status": "completed",
|
||||||
"aggregated_result": aggregated,
|
"aggregated_result": aggregated,
|
||||||
|
|
|
||||||
|
|
@ -512,18 +512,17 @@ export const api = {
|
||||||
onProgress(data)
|
onProgress(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for final result
|
// Check for final result (/execute-stream liefert volles POST-/execute-Payload unter result)
|
||||||
if (data.type === 'execution_complete') {
|
if (data.type === 'execution_complete') {
|
||||||
// Transform SSE result to match regular execute format
|
finalResult = data.result
|
||||||
finalResult = {
|
? data.result
|
||||||
type: 'workflow',
|
: {
|
||||||
execution_id: data.execution_id,
|
type: 'workflow',
|
||||||
status: data.status,
|
execution_id: data.execution_id,
|
||||||
aggregated_result: data.aggregated_result,
|
status: data.status,
|
||||||
debug: {
|
aggregated_result: data.aggregated_result,
|
||||||
node_states: [] // TODO: collect from progress events if needed
|
debug: { node_states: [] },
|
||||||
}
|
}
|
||||||
}
|
|
||||||
eventSource.close()
|
eventSource.close()
|
||||||
resolve(finalResult)
|
resolve(finalResult)
|
||||||
} else if (data.type === 'execution_failed') {
|
} else if (data.type === 'execution_failed') {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user