From ba474b0a57b31c40418e9c05661a0f2d79eeab9c Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 13 Apr 2026 11:23:16 +0200 Subject: [PATCH] feat: Implement Server-Sent Events (SSE) for long-running workflows Backend: - workflow_executor.py: Add progress_callback parameter, emit events for execution_started, node_complete, execution_complete, execution_failed - prompt_executor.py: Thread progress_callback through execute chain - routers/prompts.py: New /execute-stream endpoint with asyncio Queue for SSE Frontend: - utils/api.js: New executeUnifiedPromptStream() function with EventSource - pages/Analysis.jsx: Use SSE with live progress display (X/Y Nodes) Fixes: - No more gateway timeouts for complex workflows (10+ nodes) - Live progress feedback for users - Unlimited workflow complexity Co-Authored-By: Claude Sonnet 4.5 --- backend/prompt_executor.py | 16 ++-- backend/routers/prompts.py | 162 ++++++++++++++++++++++++++++++++ backend/workflow_executor.py | 44 ++++++++- frontend/src/pages/Analysis.jsx | 28 +++++- frontend/src/utils/api.js | 66 +++++++++++++ 5 files changed, 304 insertions(+), 12 deletions(-) diff --git a/backend/prompt_executor.py b/backend/prompt_executor.py index eddf4c6..d434069 100644 --- a/backend/prompt_executor.py +++ b/backend/prompt_executor.py @@ -167,7 +167,8 @@ async def execute_prompt( prompt_slug: str, variables: Dict[str, Any], openrouter_call_func, - enable_debug: bool = False + enable_debug: bool = False, + progress_callback = None # NEW: Optional callback für SSE Progress-Updates ) -> Dict[str, Any]: """ Execute a single prompt (base or pipeline type). @@ -217,7 +218,7 @@ async def execute_prompt( elif prompt_type == 'workflow': # Workflow prompt: graph-based execution (Phase 0: Foundation) - return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog) + return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog, progress_callback) else: raise HTTPException(400, f"Unknown prompt type: {prompt_type}") @@ -469,7 +470,8 @@ async def execute_prompt_with_data( modules: Optional[Dict[str, bool]] = None, timeframes: Optional[Dict[str, int]] = None, openrouter_call_func = None, - enable_debug: bool = False + enable_debug: bool = False, + progress_callback = None # NEW: Optional callback für SSE Progress-Updates ) -> Dict[str, Any]: """ Execute prompt with data loaded from database. @@ -605,7 +607,7 @@ async def execute_prompt_with_data( variables['goals_data'] = [] # Execute prompt - return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug) + return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug, progress_callback) async def execute_workflow_prompt( @@ -613,7 +615,8 @@ async def execute_workflow_prompt( variables: Dict[str, Any], openrouter_call_func, enable_debug: bool = False, - catalog: Optional[Dict] = None + catalog: Optional[Dict] = None, + progress_callback = None # NEW: Optional callback für SSE Progress-Updates ) -> Dict[str, Any]: """ Execute a workflow-type prompt (graph-based execution). @@ -652,7 +655,8 @@ async def execute_workflow_prompt( profile_id=variables.get('profile_id', 'unknown'), # From context variables=variables, openrouter_call_func=openrouter_call_func, - enable_debug=enable_debug + enable_debug=enable_debug, + progress_callback=progress_callback # NEW: Progress-Callbacks durchreichen ) # Convert ExecutionResult to dict for API response diff --git a/backend/routers/prompts.py b/backend/routers/prompts.py index acd1ed1..ff518db 100644 --- a/backend/routers/prompts.py +++ b/backend/routers/prompts.py @@ -1445,6 +1445,168 @@ from prompt_executor import execute_prompt_with_data from models import UnifiedPromptCreate, UnifiedPromptUpdate +@router.post("/execute-stream") +async def execute_unified_prompt_stream( + prompt_slug: str = Query(..., description="Slug of prompt to execute"), + token: Optional[str] = Query(None, description="Auth token (temporary solution for SSE)"), + modules: Optional[dict] = None, + timeframes: Optional[dict] = None, + debug: bool = Query(False, description="Include debug information (node_states, etc.)"), + save: bool = Query(False, description="Save result to ai_insights") +): + """ + Execute a unified prompt with Server-Sent Events (SSE) streaming. + + Returns live progress updates during workflow execution: + - execution_started: Workflow has begun + - node_complete: Each node completes + - execution_complete: Final result ready + - execution_failed: Error occurred + + Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts. + """ + # Manual auth: verify token and get profile_id + if not token: + raise HTTPException(401, "Missing auth token") + + with get_db() as conn: + cur = get_cursor(conn) + cur.execute("SELECT profile_id FROM sessions WHERE token = %s", (token,)) + row = cur.fetchone() + if not row: + raise HTTPException(401, "Invalid or expired token") + profile_id = row['profile_id'] + + # Use default modules/timeframes if not provided + if not modules: + modules = { + 'körper': True, + 'ernährung': True, + 'training': True, + 'schlaf': True, + 'vitalwerte': True + } + + if not timeframes: + timeframes = { + 'körper': 30, + 'ernährung': 30, + 'training': 14, + 'schlaf': 14, + 'vitalwerte': 7 + } + + # Wrapper function for OpenRouter calls + async def workflow_llm_call(prompt: str, model: str = None) -> str: + return await call_openrouter(prompt) + + # SSE Event Generator + async def event_stream(): + """Generate Server-Sent Events during workflow execution.""" + import asyncio + from asyncio import Queue + + # Event queue for progress updates + event_queue = Queue() + + # Flag to track execution completion + execution_complete = False + + # Define progress callback for streaming updates + async def progress_callback(event_type: str, data: dict): + """Queue SSE event for streaming to client.""" + event_data = { + "type": event_type, + **data + } + await event_queue.put(event_data) + + # Start workflow execution in background task + async def execute_workflow_async(): + nonlocal execution_complete + try: + # Execute workflow with progress callbacks + result = await execute_prompt_with_data( + prompt_slug=prompt_slug, + profile_id=profile_id, + modules=modules, + timeframes=timeframes, + openrouter_call_func=workflow_llm_call, + enable_debug=debug or save, + progress_callback=progress_callback + ) + + # Save to ai_insights if requested (same logic as /execute) + if result['type'] == 'pipeline': + final_output = result.get('output', {}) + if isinstance(final_output, dict) and len(final_output) == 1: + content = list(final_output.values())[0] + else: + content = json.dumps(final_output, ensure_ascii=False) + elif result['type'] == 'workflow': + content = _workflow_user_facing_content(result.get('aggregated_result')) + else: + content = result.get('output', '') + if isinstance(content, dict): + content = json.dumps(content, ensure_ascii=False) + + # Save to database (minimal metadata for now) + with get_db() as conn: + cur = get_cursor(conn) + cur.execute( + """INSERT INTO ai_insights (profile_id, scope, content, metadata, created_at) + VALUES (%s, %s, %s, %s, NOW())""", + (profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']})) + ) + conn.commit() + + except Exception as e: + # Queue error event + await event_queue.put({ + "type": "execution_failed", + "error": str(e) + }) + finally: + execution_complete = True + + # Start workflow execution in background + import asyncio + execution_task = asyncio.create_task(execute_workflow_async()) + + # Stream events from queue + try: + while not execution_complete or not event_queue.empty(): + try: + # Wait for event with timeout + event = await asyncio.wait_for(event_queue.get(), timeout=0.5) + yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" + except asyncio.TimeoutError: + # Send keepalive ping + yield f": keepalive\n\n" + continue + + # Wait for execution task to complete + await execution_task + + except Exception as e: + # Send final error event + error_event = { + "type": "execution_failed", + "error": str(e) + } + yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n" + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" # Disable nginx buffering + } + ) + + @router.post("/execute") async def execute_unified_prompt( prompt_slug: str = Query(..., description="Slug of prompt to execute"), diff --git a/backend/workflow_executor.py b/backend/workflow_executor.py index ad25033..3bb6f67 100644 --- a/backend/workflow_executor.py +++ b/backend/workflow_executor.py @@ -42,7 +42,8 @@ async def execute_workflow( profile_id: str = None, variables: Dict[str, Any] = None, openrouter_call_func = None, # Callback für LLM-Calls: async (prompt, model) -> str - enable_debug: bool = False + enable_debug: bool = False, + progress_callback = None # NEW: Optional callback für Progress-Updates: async (event_type, data) -> None ) -> ExecutionResult: """ Führt einen Workflow aus (mit conditional branching und path consolidation). @@ -76,6 +77,13 @@ async def execute_workflow( logger.info(f"Starting workflow execution: {execution_id}") + # NEW: Progress-Callback für Start + if progress_callback: + await progress_callback("execution_started", { + "execution_id": execution_id, + "started_at": started_at + }) + try: # 1. Lade Workflow-Definition if graph_data: @@ -161,6 +169,18 @@ async def execute_workflow( node_states.append(node_state) context["node_results"][node_id] = node_state + # NEW: Progress-Callback aufrufen (für SSE Streaming) + if progress_callback: + await progress_callback("node_complete", { + "node_id": node_id, + "node_type": node.type, + "node_label": node.label, + "status": node_state.status.value, + "total_nodes": len(graph.nodes), + "completed_nodes": len([ns for ns in node_states if ns.status in [NodeStatus.COMPLETED, NodeStatus.SKIPPED]]), + "error": node_state.error if node_state.status == NodeStatus.FAILED else None + }) + # Füge Nachfolger zur Queue hinzu outgoing_edges = [e for e in graph.edges if e.from_node == node_id] for edge in outgoing_edges: @@ -185,6 +205,19 @@ async def execute_workflow( logger.info(f"Workflow execution completed: {execution_id}") + # NEW: Progress-Callback für erfolgreiche Fertigstellung + if progress_callback: + await progress_callback("execution_complete", { + "execution_id": execution_id, + "status": "completed", + "aggregated_result": aggregated, + "total_nodes": len(node_states), + "completed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.COMPLETED]), + "skipped_nodes": len([ns for ns in node_states if ns.status == NodeStatus.SKIPPED]), + "failed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.FAILED]), + "completed_at": completed_at + }) + return ExecutionResult( execution_id=execution_id, workflow_id=workflow_id or "N/A", # Placeholder when graph_data is used directly @@ -198,6 +231,15 @@ async def execute_workflow( except Exception as e: logger.error(f"Workflow execution failed: {e}", exc_info=True) + # NEW: Progress-Callback für Fehler + if progress_callback: + await progress_callback("execution_failed", { + "execution_id": execution_id, + "status": "failed", + "error": str(e), + "completed_at": datetime.utcnow().isoformat() + }) + # Speichere Failed State completed_at = datetime.utcnow().isoformat() save_execution_state( diff --git a/frontend/src/pages/Analysis.jsx b/frontend/src/pages/Analysis.jsx index 53dac0b..8f03c81 100644 --- a/frontend/src/pages/Analysis.jsx +++ b/frontend/src/pages/Analysis.jsx @@ -338,6 +338,8 @@ export default function Analysis() { /** Kategorie-Schlüssel aus `buildPipelineGroups` (Navigation); Detail = alle Pipelines dieser Kategorie */ const [activeCategoryKey, setActiveCategoryKey] = useState(null) const [historyScopePick, setHistoryScopePick] = useState(null) + // NEW: Progress tracking for SSE workflows + const [progress, setProgress] = useState(null) // { total_nodes, completed_nodes, current_node_label } const loadAll = async () => { const [p, i] = await Promise.all([ @@ -377,10 +379,21 @@ export default function Analysis() { }, [newResult?.scope, prompts]) const runPrompt = async (slug) => { - setLoading(slug); setError(null); setNewResult(null) + setLoading(slug); setError(null); setNewResult(null); setProgress(null) try { - // Use new unified executor with save=true - const result = await api.executeUnifiedPrompt(slug, null, null, false, true) + // Use SSE-based executor for long-running workflows + const result = await api.executeUnifiedPromptStream(slug, null, null, false, true, (event) => { + // Progress callback: update UI in real-time + if (event.type === 'execution_started') { + setProgress({ total_nodes: 0, completed_nodes: 0, current_node_label: 'Starte...' }) + } else if (event.type === 'node_complete') { + setProgress({ + total_nodes: event.total_nodes || 0, + completed_nodes: event.completed_nodes || 0, + current_node_label: event.node_label || `Node ${event.node_id}` + }) + } + }) // Transform result to match old format for InsightCard let content = '' @@ -434,7 +447,10 @@ export default function Analysis() { setTab('run') } catch(e) { setError('Fehler: ' + e.message) - } finally { setLoading(null) } + } finally { + setLoading(null) + setProgress(null) // Clear progress + } } const deleteInsight = async (id) => { @@ -618,7 +634,9 @@ export default function Analysis() { disabled={!!loading||!canUseAI||(aiUsage && !aiUsage.allowed)} > {loading===p.slug - ? <>
Läuft… + ? (progress + ? <>
{progress.completed_nodes}/{progress.total_nodes} Nodes + : <>
Läuft…) : (aiUsage && !aiUsage.allowed) ? '🔒 Limit' : <> Starten} diff --git a/frontend/src/utils/api.js b/frontend/src/utils/api.js index ea37736..205acc2 100644 --- a/frontend/src/utils/api.js +++ b/frontend/src/utils/api.js @@ -402,6 +402,72 @@ export const api = { return req('/prompts/execute?' + params, json(body)) }, + // NEW: SSE-based execution for long-running workflows + executeUnifiedPromptStream: (slug, modules=null, timeframes=null, debug=false, save=false, onProgress=null) => { + const params = new URLSearchParams({ prompt_slug: slug }) + if (debug) params.append('debug', 'true') + if (save) params.append('save', 'true') + + // TODO: Security improvement - use session cookie instead of token in URL + // For now, send token as query param since EventSource doesn't support custom headers + const token = localStorage.getItem('token') + if (token) params.append('token', token) + + if (modules) { + Object.entries(modules).forEach(([k, v]) => params.append(`modules[${k}]`, v)) + } + if (timeframes) { + Object.entries(timeframes).forEach(([k, v]) => params.append(`timeframes[${k}]`, v)) + } + + // Return a Promise that resolves with final result + return new Promise((resolve, reject) => { + const url = `${BASE_URL}/prompts/execute-stream?${params}` + + const eventSource = new EventSource(url) + + let finalResult = null + + eventSource.onmessage = (event) => { + try { + const data = JSON.parse(event.data) + + // Call progress callback if provided + if (onProgress) { + onProgress(data) + } + + // Check for final result + if (data.type === 'execution_complete') { + // Transform SSE result to match regular execute format + finalResult = { + type: 'workflow', + execution_id: data.execution_id, + status: data.status, + aggregated_result: data.aggregated_result, + debug: { + node_states: [] // TODO: collect from progress events if needed + } + } + eventSource.close() + resolve(finalResult) + } else if (data.type === 'execution_failed') { + eventSource.close() + reject(new Error(data.error || 'Workflow execution failed')) + } + } catch (e) { + console.error('Error parsing SSE event:', e) + } + } + + eventSource.onerror = (error) => { + console.error('SSE connection error:', error) + eventSource.close() + reject(new Error('Connection to server lost')) + } + }) + }, + // Workflow Execution (Part 2: Frontend Execute Integration) executeWorkflow: (slug, variables=null, debug=true, save=false) => { const params = new URLSearchParams({ prompt_slug: slug })