feat: Implement Server-Sent Events (SSE) for long-running workflows
Some checks failed
Deploy Development / deploy (push) Successful in 54s
Build Test / pytest-backend (push) Failing after 1s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s

Backend:
- workflow_executor.py: Add progress_callback parameter, emit events for execution_started, node_complete, execution_complete, execution_failed
- prompt_executor.py: Thread progress_callback through execute chain
- routers/prompts.py: New /execute-stream endpoint with asyncio Queue for SSE

Frontend:
- utils/api.js: New executeUnifiedPromptStream() function with EventSource
- pages/Analysis.jsx: Use SSE with live progress display (X/Y Nodes)

Fixes:
- No more gateway timeouts for complex workflows (10+ nodes)
- Live progress feedback for users
- Unlimited workflow complexity

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Lars 2026-04-13 11:23:16 +02:00
parent 790e6df8ef
commit ba474b0a57
5 changed files with 304 additions and 12 deletions

View File

@ -167,7 +167,8 @@ async def execute_prompt(
prompt_slug: str,
variables: Dict[str, Any],
openrouter_call_func,
enable_debug: bool = False
enable_debug: bool = False,
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
) -> Dict[str, Any]:
"""
Execute a single prompt (base or pipeline type).
@ -217,7 +218,7 @@ async def execute_prompt(
elif prompt_type == 'workflow':
# Workflow prompt: graph-based execution (Phase 0: Foundation)
return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog)
return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog, progress_callback)
else:
raise HTTPException(400, f"Unknown prompt type: {prompt_type}")
@ -469,7 +470,8 @@ async def execute_prompt_with_data(
modules: Optional[Dict[str, bool]] = None,
timeframes: Optional[Dict[str, int]] = None,
openrouter_call_func = None,
enable_debug: bool = False
enable_debug: bool = False,
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
) -> Dict[str, Any]:
"""
Execute prompt with data loaded from database.
@ -605,7 +607,7 @@ async def execute_prompt_with_data(
variables['goals_data'] = []
# Execute prompt
return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug)
return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug, progress_callback)
async def execute_workflow_prompt(
@ -613,7 +615,8 @@ async def execute_workflow_prompt(
variables: Dict[str, Any],
openrouter_call_func,
enable_debug: bool = False,
catalog: Optional[Dict] = None
catalog: Optional[Dict] = None,
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
) -> Dict[str, Any]:
"""
Execute a workflow-type prompt (graph-based execution).
@ -652,7 +655,8 @@ async def execute_workflow_prompt(
profile_id=variables.get('profile_id', 'unknown'), # From context
variables=variables,
openrouter_call_func=openrouter_call_func,
enable_debug=enable_debug
enable_debug=enable_debug,
progress_callback=progress_callback # NEW: Progress-Callbacks durchreichen
)
# Convert ExecutionResult to dict for API response

View File

@ -1445,6 +1445,168 @@ from prompt_executor import execute_prompt_with_data
from models import UnifiedPromptCreate, UnifiedPromptUpdate
@router.post("/execute-stream")
async def execute_unified_prompt_stream(
prompt_slug: str = Query(..., description="Slug of prompt to execute"),
token: Optional[str] = Query(None, description="Auth token (temporary solution for SSE)"),
modules: Optional[dict] = None,
timeframes: Optional[dict] = None,
debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
save: bool = Query(False, description="Save result to ai_insights")
):
"""
Execute a unified prompt with Server-Sent Events (SSE) streaming.
Returns live progress updates during workflow execution:
- execution_started: Workflow has begun
- node_complete: Each node completes
- execution_complete: Final result ready
- execution_failed: Error occurred
Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts.
"""
# Manual auth: verify token and get profile_id
if not token:
raise HTTPException(401, "Missing auth token")
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT profile_id FROM sessions WHERE token = %s", (token,))
row = cur.fetchone()
if not row:
raise HTTPException(401, "Invalid or expired token")
profile_id = row['profile_id']
# Use default modules/timeframes if not provided
if not modules:
modules = {
'körper': True,
'ernährung': True,
'training': True,
'schlaf': True,
'vitalwerte': True
}
if not timeframes:
timeframes = {
'körper': 30,
'ernährung': 30,
'training': 14,
'schlaf': 14,
'vitalwerte': 7
}
# Wrapper function for OpenRouter calls
async def workflow_llm_call(prompt: str, model: str = None) -> str:
return await call_openrouter(prompt)
# SSE Event Generator
async def event_stream():
"""Generate Server-Sent Events during workflow execution."""
import asyncio
from asyncio import Queue
# Event queue for progress updates
event_queue = Queue()
# Flag to track execution completion
execution_complete = False
# Define progress callback for streaming updates
async def progress_callback(event_type: str, data: dict):
"""Queue SSE event for streaming to client."""
event_data = {
"type": event_type,
**data
}
await event_queue.put(event_data)
# Start workflow execution in background task
async def execute_workflow_async():
nonlocal execution_complete
try:
# Execute workflow with progress callbacks
result = await execute_prompt_with_data(
prompt_slug=prompt_slug,
profile_id=profile_id,
modules=modules,
timeframes=timeframes,
openrouter_call_func=workflow_llm_call,
enable_debug=debug or save,
progress_callback=progress_callback
)
# Save to ai_insights if requested (same logic as /execute)
if result['type'] == 'pipeline':
final_output = result.get('output', {})
if isinstance(final_output, dict) and len(final_output) == 1:
content = list(final_output.values())[0]
else:
content = json.dumps(final_output, ensure_ascii=False)
elif result['type'] == 'workflow':
content = _workflow_user_facing_content(result.get('aggregated_result'))
else:
content = result.get('output', '')
if isinstance(content, dict):
content = json.dumps(content, ensure_ascii=False)
# Save to database (minimal metadata for now)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""INSERT INTO ai_insights (profile_id, scope, content, metadata, created_at)
VALUES (%s, %s, %s, %s, NOW())""",
(profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']}))
)
conn.commit()
except Exception as e:
# Queue error event
await event_queue.put({
"type": "execution_failed",
"error": str(e)
})
finally:
execution_complete = True
# Start workflow execution in background
import asyncio
execution_task = asyncio.create_task(execute_workflow_async())
# Stream events from queue
try:
while not execution_complete or not event_queue.empty():
try:
# Wait for event with timeout
event = await asyncio.wait_for(event_queue.get(), timeout=0.5)
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
except asyncio.TimeoutError:
# Send keepalive ping
yield f": keepalive\n\n"
continue
# Wait for execution task to complete
await execution_task
except Exception as e:
# Send final error event
error_event = {
"type": "execution_failed",
"error": str(e)
}
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Disable nginx buffering
}
)
@router.post("/execute")
async def execute_unified_prompt(
prompt_slug: str = Query(..., description="Slug of prompt to execute"),

View File

@ -42,7 +42,8 @@ async def execute_workflow(
profile_id: str = None,
variables: Dict[str, Any] = None,
openrouter_call_func = None, # Callback für LLM-Calls: async (prompt, model) -> str
enable_debug: bool = False
enable_debug: bool = False,
progress_callback = None # NEW: Optional callback für Progress-Updates: async (event_type, data) -> None
) -> ExecutionResult:
"""
Führt einen Workflow aus (mit conditional branching und path consolidation).
@ -76,6 +77,13 @@ async def execute_workflow(
logger.info(f"Starting workflow execution: {execution_id}")
# NEW: Progress-Callback für Start
if progress_callback:
await progress_callback("execution_started", {
"execution_id": execution_id,
"started_at": started_at
})
try:
# 1. Lade Workflow-Definition
if graph_data:
@ -161,6 +169,18 @@ async def execute_workflow(
node_states.append(node_state)
context["node_results"][node_id] = node_state
# NEW: Progress-Callback aufrufen (für SSE Streaming)
if progress_callback:
await progress_callback("node_complete", {
"node_id": node_id,
"node_type": node.type,
"node_label": node.label,
"status": node_state.status.value,
"total_nodes": len(graph.nodes),
"completed_nodes": len([ns for ns in node_states if ns.status in [NodeStatus.COMPLETED, NodeStatus.SKIPPED]]),
"error": node_state.error if node_state.status == NodeStatus.FAILED else None
})
# Füge Nachfolger zur Queue hinzu
outgoing_edges = [e for e in graph.edges if e.from_node == node_id]
for edge in outgoing_edges:
@ -185,6 +205,19 @@ async def execute_workflow(
logger.info(f"Workflow execution completed: {execution_id}")
# NEW: Progress-Callback für erfolgreiche Fertigstellung
if progress_callback:
await progress_callback("execution_complete", {
"execution_id": execution_id,
"status": "completed",
"aggregated_result": aggregated,
"total_nodes": len(node_states),
"completed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.COMPLETED]),
"skipped_nodes": len([ns for ns in node_states if ns.status == NodeStatus.SKIPPED]),
"failed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.FAILED]),
"completed_at": completed_at
})
return ExecutionResult(
execution_id=execution_id,
workflow_id=workflow_id or "N/A", # Placeholder when graph_data is used directly
@ -198,6 +231,15 @@ async def execute_workflow(
except Exception as e:
logger.error(f"Workflow execution failed: {e}", exc_info=True)
# NEW: Progress-Callback für Fehler
if progress_callback:
await progress_callback("execution_failed", {
"execution_id": execution_id,
"status": "failed",
"error": str(e),
"completed_at": datetime.utcnow().isoformat()
})
# Speichere Failed State
completed_at = datetime.utcnow().isoformat()
save_execution_state(

View File

@ -338,6 +338,8 @@ export default function Analysis() {
/** Kategorie-Schlüssel aus `buildPipelineGroups` (Navigation); Detail = alle Pipelines dieser Kategorie */
const [activeCategoryKey, setActiveCategoryKey] = useState(null)
const [historyScopePick, setHistoryScopePick] = useState(null)
// NEW: Progress tracking for SSE workflows
const [progress, setProgress] = useState(null) // { total_nodes, completed_nodes, current_node_label }
const loadAll = async () => {
const [p, i] = await Promise.all([
@ -377,10 +379,21 @@ export default function Analysis() {
}, [newResult?.scope, prompts])
const runPrompt = async (slug) => {
setLoading(slug); setError(null); setNewResult(null)
setLoading(slug); setError(null); setNewResult(null); setProgress(null)
try {
// Use new unified executor with save=true
const result = await api.executeUnifiedPrompt(slug, null, null, false, true)
// Use SSE-based executor for long-running workflows
const result = await api.executeUnifiedPromptStream(slug, null, null, false, true, (event) => {
// Progress callback: update UI in real-time
if (event.type === 'execution_started') {
setProgress({ total_nodes: 0, completed_nodes: 0, current_node_label: 'Starte...' })
} else if (event.type === 'node_complete') {
setProgress({
total_nodes: event.total_nodes || 0,
completed_nodes: event.completed_nodes || 0,
current_node_label: event.node_label || `Node ${event.node_id}`
})
}
})
// Transform result to match old format for InsightCard
let content = ''
@ -434,7 +447,10 @@ export default function Analysis() {
setTab('run')
} catch(e) {
setError('Fehler: ' + e.message)
} finally { setLoading(null) }
} finally {
setLoading(null)
setProgress(null) // Clear progress
}
}
const deleteInsight = async (id) => {
@ -618,7 +634,9 @@ export default function Analysis() {
disabled={!!loading||!canUseAI||(aiUsage && !aiUsage.allowed)}
>
{loading===p.slug
? <><div className="spinner" style={{width:13,height:13}}/> Läuft</>
? (progress
? <><div className="spinner" style={{width:13,height:13}}/> {progress.completed_nodes}/{progress.total_nodes} Nodes</>
: <><div className="spinner" style={{width:13,height:13}}/> Läuft</>)
: (aiUsage && !aiUsage.allowed) ? '🔒 Limit'
: <><Brain size={13}/> Starten</>}
</button>

View File

@ -402,6 +402,72 @@ export const api = {
return req('/prompts/execute?' + params, json(body))
},
// NEW: SSE-based execution for long-running workflows
executeUnifiedPromptStream: (slug, modules=null, timeframes=null, debug=false, save=false, onProgress=null) => {
const params = new URLSearchParams({ prompt_slug: slug })
if (debug) params.append('debug', 'true')
if (save) params.append('save', 'true')
// TODO: Security improvement - use session cookie instead of token in URL
// For now, send token as query param since EventSource doesn't support custom headers
const token = localStorage.getItem('token')
if (token) params.append('token', token)
if (modules) {
Object.entries(modules).forEach(([k, v]) => params.append(`modules[${k}]`, v))
}
if (timeframes) {
Object.entries(timeframes).forEach(([k, v]) => params.append(`timeframes[${k}]`, v))
}
// Return a Promise that resolves with final result
return new Promise((resolve, reject) => {
const url = `${BASE_URL}/prompts/execute-stream?${params}`
const eventSource = new EventSource(url)
let finalResult = null
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data)
// Call progress callback if provided
if (onProgress) {
onProgress(data)
}
// Check for final result
if (data.type === 'execution_complete') {
// Transform SSE result to match regular execute format
finalResult = {
type: 'workflow',
execution_id: data.execution_id,
status: data.status,
aggregated_result: data.aggregated_result,
debug: {
node_states: [] // TODO: collect from progress events if needed
}
}
eventSource.close()
resolve(finalResult)
} else if (data.type === 'execution_failed') {
eventSource.close()
reject(new Error(data.error || 'Workflow execution failed'))
}
} catch (e) {
console.error('Error parsing SSE event:', e)
}
}
eventSource.onerror = (error) => {
console.error('SSE connection error:', error)
eventSource.close()
reject(new Error('Connection to server lost'))
}
})
},
// Workflow Execution (Part 2: Frontend Execute Integration)
executeWorkflow: (slug, variables=null, debug=true, save=false) => {
const params = new URLSearchParams({ prompt_slug: slug })