diff --git a/backend/prompt_executor.py b/backend/prompt_executor.py index eddf4c6..d434069 100644 --- a/backend/prompt_executor.py +++ b/backend/prompt_executor.py @@ -167,7 +167,8 @@ async def execute_prompt( prompt_slug: str, variables: Dict[str, Any], openrouter_call_func, - enable_debug: bool = False + enable_debug: bool = False, + progress_callback = None # NEW: Optional callback für SSE Progress-Updates ) -> Dict[str, Any]: """ Execute a single prompt (base or pipeline type). @@ -217,7 +218,7 @@ async def execute_prompt( elif prompt_type == 'workflow': # Workflow prompt: graph-based execution (Phase 0: Foundation) - return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog) + return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog, progress_callback) else: raise HTTPException(400, f"Unknown prompt type: {prompt_type}") @@ -469,7 +470,8 @@ async def execute_prompt_with_data( modules: Optional[Dict[str, bool]] = None, timeframes: Optional[Dict[str, int]] = None, openrouter_call_func = None, - enable_debug: bool = False + enable_debug: bool = False, + progress_callback = None # NEW: Optional callback für SSE Progress-Updates ) -> Dict[str, Any]: """ Execute prompt with data loaded from database. @@ -605,7 +607,7 @@ async def execute_prompt_with_data( variables['goals_data'] = [] # Execute prompt - return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug) + return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug, progress_callback) async def execute_workflow_prompt( @@ -613,7 +615,8 @@ async def execute_workflow_prompt( variables: Dict[str, Any], openrouter_call_func, enable_debug: bool = False, - catalog: Optional[Dict] = None + catalog: Optional[Dict] = None, + progress_callback = None # NEW: Optional callback für SSE Progress-Updates ) -> Dict[str, Any]: """ Execute a workflow-type prompt (graph-based execution). @@ -652,7 +655,8 @@ async def execute_workflow_prompt( profile_id=variables.get('profile_id', 'unknown'), # From context variables=variables, openrouter_call_func=openrouter_call_func, - enable_debug=enable_debug + enable_debug=enable_debug, + progress_callback=progress_callback # NEW: Progress-Callbacks durchreichen ) # Convert ExecutionResult to dict for API response diff --git a/backend/routers/prompts.py b/backend/routers/prompts.py index cc438db..9c8b09c 100644 --- a/backend/routers/prompts.py +++ b/backend/routers/prompts.py @@ -1445,13 +1445,176 @@ from prompt_executor import execute_prompt_with_data from models import UnifiedPromptCreate, UnifiedPromptUpdate -@router.post("/execute") -async def execute_unified_prompt( - prompt_slug: str, +@router.post("/execute-stream") +async def execute_unified_prompt_stream( + prompt_slug: str = Query(..., description="Slug of prompt to execute"), + token: Optional[str] = Query(None, description="Auth token (temporary solution for SSE)"), modules: Optional[dict] = None, timeframes: Optional[dict] = None, - debug: bool = False, - save: bool = False, + debug: bool = Query(False, description="Include debug information (node_states, etc.)"), + save: bool = Query(False, description="Save result to ai_insights") +): + """ + Execute a unified prompt with Server-Sent Events (SSE) streaming. + + Returns live progress updates during workflow execution: + - execution_started: Workflow has begun + - node_complete: Each node completes + - execution_complete: Final result ready + - execution_failed: Error occurred + + Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts. + """ + # Manual auth: verify token and get profile_id + if not token: + raise HTTPException(401, "Missing auth token") + + with get_db() as conn: + cur = get_cursor(conn) + cur.execute("SELECT profile_id FROM sessions WHERE token = %s", (token,)) + row = cur.fetchone() + if not row: + raise HTTPException(401, "Invalid or expired token") + profile_id = row['profile_id'] + + # Use default modules/timeframes if not provided + if not modules: + modules = { + 'körper': True, + 'ernährung': True, + 'training': True, + 'schlaf': True, + 'vitalwerte': True + } + + if not timeframes: + timeframes = { + 'körper': 30, + 'ernährung': 30, + 'training': 14, + 'schlaf': 14, + 'vitalwerte': 7 + } + + # Wrapper function for OpenRouter calls + async def workflow_llm_call(prompt: str, model: str = None) -> str: + return await call_openrouter(prompt) + + # SSE Event Generator + async def event_stream(): + """Generate Server-Sent Events during workflow execution.""" + import asyncio + from asyncio import Queue + + # Event queue for progress updates + event_queue = Queue() + + # Flag to track execution completion + execution_complete = False + + # Define progress callback for streaming updates + async def progress_callback(event_type: str, data: dict): + """Queue SSE event for streaming to client.""" + event_data = { + "type": event_type, + **data + } + await event_queue.put(event_data) + + # Start workflow execution in background task + async def execute_workflow_async(): + nonlocal execution_complete + try: + # Execute workflow with progress callbacks + result = await execute_prompt_with_data( + prompt_slug=prompt_slug, + profile_id=profile_id, + modules=modules, + timeframes=timeframes, + openrouter_call_func=workflow_llm_call, + enable_debug=debug or save, + progress_callback=progress_callback + ) + + # Save to ai_insights if requested (same logic as /execute) + if save: + if result['type'] == 'pipeline': + final_output = result.get('output', {}) + if isinstance(final_output, dict) and len(final_output) == 1: + content = list(final_output.values())[0] + else: + content = json.dumps(final_output, ensure_ascii=False) + elif result['type'] == 'workflow': + content = _workflow_user_facing_content(result.get('aggregated_result')) + else: + content = result.get('output', '') + if isinstance(content, dict): + content = json.dumps(content, ensure_ascii=False) + + # Save to database (minimal metadata for now) + with get_db() as conn: + cur = get_cursor(conn) + cur.execute( + """INSERT INTO ai_insights (profile_id, scope, content, metadata, created) + VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""", + (profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']})) + ) + conn.commit() + + except Exception as e: + # Queue error event + await event_queue.put({ + "type": "execution_failed", + "error": str(e) + }) + finally: + execution_complete = True + + # Start workflow execution in background + import asyncio + execution_task = asyncio.create_task(execute_workflow_async()) + + # Stream events from queue + try: + while not execution_complete or not event_queue.empty(): + try: + # Wait for event with timeout + event = await asyncio.wait_for(event_queue.get(), timeout=0.5) + yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" + except asyncio.TimeoutError: + # Send keepalive ping + yield f": keepalive\n\n" + continue + + # Wait for execution task to complete + await execution_task + + except Exception as e: + # Send final error event + error_event = { + "type": "execution_failed", + "error": str(e) + } + yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n" + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" # Disable nginx buffering + } + ) + + +@router.post("/execute") +async def execute_unified_prompt( + prompt_slug: str = Query(..., description="Slug of prompt to execute"), + modules: Optional[dict] = None, + timeframes: Optional[dict] = None, + debug: bool = Query(False, description="Include debug information (node_states, etc.)"), + save: bool = Query(False, description="Save result to ai_insights"), session: dict = Depends(require_auth) ): """ diff --git a/backend/workflow_executor.py b/backend/workflow_executor.py index e5a7152..6e36ad4 100644 --- a/backend/workflow_executor.py +++ b/backend/workflow_executor.py @@ -42,7 +42,8 @@ async def execute_workflow( profile_id: str = None, variables: Dict[str, Any] = None, openrouter_call_func = None, # Callback für LLM-Calls: async (prompt, model) -> str - enable_debug: bool = False + enable_debug: bool = False, + progress_callback = None # NEW: Optional callback für Progress-Updates: async (event_type, data) -> None ) -> ExecutionResult: """ Führt einen Workflow aus (mit conditional branching und path consolidation). @@ -76,6 +77,13 @@ async def execute_workflow( logger.info(f"Starting workflow execution: {execution_id}") + # NEW: Progress-Callback für Start + if progress_callback: + await progress_callback("execution_started", { + "execution_id": execution_id, + "started_at": started_at + }) + try: # 1. Lade Workflow-Definition if graph_data: @@ -161,6 +169,20 @@ async def execute_workflow( node_states.append(node_state) context["node_results"][node_id] = node_state + # NEW: Progress-Callback aufrufen (für SSE Streaming) + if progress_callback: + # Create a meaningful label for the node + node_label = node.prompt_slug if hasattr(node, 'prompt_slug') and node.prompt_slug else f"{node.type.value}-{node_id[:8]}" + await progress_callback("node_complete", { + "node_id": node_id, + "node_type": node.type, + "node_label": node_label, + "status": node_state.status.value, + "total_nodes": len(graph.nodes), + "completed_nodes": len([ns for ns in node_states if ns.status in [NodeStatus.EXECUTED, NodeStatus.SKIPPED]]), + "error": node_state.error if node_state.status == NodeStatus.FAILED else None + }) + # Füge Nachfolger zur Queue hinzu outgoing_edges = [e for e in graph.edges if e.from_node == node_id] for edge in outgoing_edges: @@ -185,6 +207,19 @@ async def execute_workflow( logger.info(f"Workflow execution completed: {execution_id}") + # NEW: Progress-Callback für erfolgreiche Fertigstellung + if progress_callback: + await progress_callback("execution_complete", { + "execution_id": execution_id, + "status": "completed", + "aggregated_result": aggregated, + "total_nodes": len(node_states), + "completed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.EXECUTED]), + "skipped_nodes": len([ns for ns in node_states if ns.status == NodeStatus.SKIPPED]), + "failed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.FAILED]), + "completed_at": completed_at + }) + return ExecutionResult( execution_id=execution_id, workflow_id=workflow_id or "N/A", # Placeholder when graph_data is used directly @@ -198,6 +233,15 @@ async def execute_workflow( except Exception as e: logger.error(f"Workflow execution failed: {e}", exc_info=True) + # NEW: Progress-Callback für Fehler + if progress_callback: + await progress_callback("execution_failed", { + "execution_id": execution_id, + "status": "failed", + "error": str(e), + "completed_at": datetime.utcnow().isoformat() + }) + # Speichere Failed State completed_at = datetime.utcnow().isoformat() save_execution_state( @@ -399,10 +443,49 @@ def execute_logic_node( try: if not node.condition: - raise ValueError(f"Logic node {node.id} has no condition") + error_msg = f"Logic node {node.id} has no condition defined" + logger.error(error_msg) + return NodeExecutionState( + node_id=node.id, + status=NodeStatus.FAILED, + error=error_msg, + started_at=started_at, + completed_at=datetime.utcnow().isoformat() + ) + + # Handle both formats (thanks to Union[LogicExpression, Condition] type): + # 1. Direct LogicExpression (UI format): node.condition is LogicExpression + # 2. Wrapped in Condition (legacy): node.condition is Condition with .expression + from workflow_models import LogicExpression, Condition + + expression = None + + if isinstance(node.condition, LogicExpression): + # UI format: direct LogicExpression + expression = node.condition + elif isinstance(node.condition, Condition): + # Legacy format: wrapped in Condition + expression = node.condition.expression + else: + # Fallback: try to detect format manually + if hasattr(node.condition, 'operator') and hasattr(node.condition, 'operands'): + expression = node.condition # Looks like LogicExpression + elif hasattr(node.condition, 'expression'): + expression = node.condition.expression # Looks like Condition + + if expression is None: + error_msg = f"Logic node {node.id} has no valid condition/expression defined" + logger.error(error_msg) + return NodeExecutionState( + node_id=node.id, + status=NodeStatus.FAILED, + error=error_msg, + started_at=started_at, + completed_at=datetime.utcnow().isoformat() + ) # 1. Evaluiere Bedingung - result, error = evaluate_logic_expression(node.condition.expression, context) + result, error = evaluate_logic_expression(expression, context) if error: # Fehler bei Evaluation → Fallback anwenden @@ -777,17 +860,31 @@ def _get_edges_by_label(node_id: str, label: str, graph: WorkflowGraph) -> List[ """ Findet alle ausgehenden Edges mit bestimmtem Label. + Unterstützt beide Formate: + - Legacy: e.label == label (z.B. "then", "else") + - UI: e.source_handle == label (z.B. "true", "false") + Args: node_id: Node-ID - label: Edge-Label (z.B. "then", "else", "uncertainty") + label: Edge-Label oder sourceHandle (z.B. "then"/"true", "else"/"false") graph: WorkflowGraph Returns: Liste von Edge-IDs """ + # Map label to sourceHandle equivalents + label_to_handle = { + "then": "true", + "else": "false" + } + handle_equivalent = label_to_handle.get(label, label) + matching_edges = [ e.id for e in graph.edges - if e.from_node == node_id and e.label == label + if e.from_node == node_id and ( + e.label == label or # Legacy format + (hasattr(e, 'source_handle') and e.source_handle == handle_equivalent) # UI format + ) ] return matching_edges diff --git a/backend/workflow_models.py b/backend/workflow_models.py index 5b92126..0467285 100644 --- a/backend/workflow_models.py +++ b/backend/workflow_models.py @@ -6,7 +6,7 @@ Data validation schemas for Workflow-Graph, Knoten, Kanten, Bedingungen. Konzept-Basis: konzept_workflow_engine_konsolidated.md Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md """ -from typing import Optional, List, Dict, Any +from typing import Optional, List, Dict, Any, Union from pydantic import BaseModel, Field from enum import Enum @@ -148,18 +148,27 @@ class LogicExpression(BaseModel): } """ operator: LogicOperator = Field(..., description="Logischer Operator (and, or, not) oder Vergleichsoperator") - operands: Optional[List[Any]] = Field(None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)") + operands: Optional[List['LogicExpression']] = Field(None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)") # Bei einfachem Vergleich: ref: Optional[str] = Field(None, description="Signal-Referenz (nur bei Vergleichsoperatoren)") value: Optional[Any] = Field(None, description="Vergleichswert (nur bei Vergleichsoperatoren)") +# Enable forward reference resolution for recursive model +LogicExpression.model_rebuild() + class Condition(BaseModel): """ Bedingung für einen Logik-Knoten. Unterstützt if/else-if/else-Logik. + + Note: Uses extra='forbid' to ensure proper Union resolution with LogicExpression. + If unknown fields are present (like 'operator', 'operands'), deserialization fails + and Pydantic tries LogicExpression instead. """ + model_config = {'extra': 'forbid'} + type: str = Field(default="if", description="Bedingungstyp: if, else-if, else") expression: Optional[LogicExpression] = Field(None, description="Logischer Ausdruck (null bei 'else')") then_path: Optional[str] = Field(None, description="Edge-ID für 'then'-Pfad") @@ -195,7 +204,8 @@ class WorkflowNode(BaseModel): question_augmentations: Optional[List[QuestionAugmentation]] = Field(None, description="Fragenergänzungen (knotengebunden, überschreiben Prompt-Defaults)") # LOGIC-Knoten - condition: Optional[Condition] = Field(None, description="Bedingung für Pfad-Routing") + # Support both formats: direct LogicExpression (UI) or wrapped in Condition (legacy) + condition: Optional[Union[LogicExpression, Condition]] = Field(None, description="Bedingung für Pfad-Routing") fallback: Optional[FallbackConfig] = Field(None, description="Fallback-Konfiguration") # JOIN-Knoten (Phase 4) @@ -220,6 +230,10 @@ class WorkflowEdge(BaseModel): to_node: str = Field(..., alias="to", description="Ziel-Knoten-ID") label: Optional[str] = Field(None, description="Label für visuelle Darstellung (z.B. 'then', 'else')") + # UI-Format fields (React Flow) + source_handle: Optional[str] = Field(None, alias="sourceHandle", description="Source handle ID (UI format: 'true', 'false', 'out')") + target_handle: Optional[str] = Field(None, alias="targetHandle", description="Target handle ID (UI format: 'in', 'path_1', etc.)") + class WorkflowGraph(BaseModel): """ diff --git a/frontend/src/pages/Analysis.jsx b/frontend/src/pages/Analysis.jsx index 53dac0b..8f03c81 100644 --- a/frontend/src/pages/Analysis.jsx +++ b/frontend/src/pages/Analysis.jsx @@ -338,6 +338,8 @@ export default function Analysis() { /** Kategorie-Schlüssel aus `buildPipelineGroups` (Navigation); Detail = alle Pipelines dieser Kategorie */ const [activeCategoryKey, setActiveCategoryKey] = useState(null) const [historyScopePick, setHistoryScopePick] = useState(null) + // NEW: Progress tracking for SSE workflows + const [progress, setProgress] = useState(null) // { total_nodes, completed_nodes, current_node_label } const loadAll = async () => { const [p, i] = await Promise.all([ @@ -377,10 +379,21 @@ export default function Analysis() { }, [newResult?.scope, prompts]) const runPrompt = async (slug) => { - setLoading(slug); setError(null); setNewResult(null) + setLoading(slug); setError(null); setNewResult(null); setProgress(null) try { - // Use new unified executor with save=true - const result = await api.executeUnifiedPrompt(slug, null, null, false, true) + // Use SSE-based executor for long-running workflows + const result = await api.executeUnifiedPromptStream(slug, null, null, false, true, (event) => { + // Progress callback: update UI in real-time + if (event.type === 'execution_started') { + setProgress({ total_nodes: 0, completed_nodes: 0, current_node_label: 'Starte...' }) + } else if (event.type === 'node_complete') { + setProgress({ + total_nodes: event.total_nodes || 0, + completed_nodes: event.completed_nodes || 0, + current_node_label: event.node_label || `Node ${event.node_id}` + }) + } + }) // Transform result to match old format for InsightCard let content = '' @@ -434,7 +447,10 @@ export default function Analysis() { setTab('run') } catch(e) { setError('Fehler: ' + e.message) - } finally { setLoading(null) } + } finally { + setLoading(null) + setProgress(null) // Clear progress + } } const deleteInsight = async (id) => { @@ -618,7 +634,9 @@ export default function Analysis() { disabled={!!loading||!canUseAI||(aiUsage && !aiUsage.allowed)} > {loading===p.slug - ? <>