diff --git a/backend/routers/prompts.py b/backend/routers/prompts.py index 1d6a436..6c7f8a5 100644 --- a/backend/routers/prompts.py +++ b/backend/routers/prompts.py @@ -254,6 +254,156 @@ def import_prompts( } +from models import UnifiedPromptCreate, UnifiedPromptUpdate + + +@router.get("/execute-stream") +async def execute_unified_prompt_stream( + prompt_slug: str = Query(..., description="Slug of prompt to execute"), + debug: bool = Query(False, description="Include debug information (node_states, etc.)"), + save: bool = Query(False, description="Save result to ai_insights"), + session: dict = Depends(require_auth_flexible) +): + """ + Execute a unified prompt with Server-Sent Events (SSE) streaming. + + Returns live progress updates during workflow execution: + - execution_started: Workflow has begun + - node_complete: Each node completes + - execution_complete: Final result ready + - execution_failed: Error occurred + + Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts. + """ + profile_id = session['profile_id'] + + # Use default modules/timeframes (SSE doesn't support complex params) + modules = { + 'körper': True, + 'ernährung': True, + 'training': True, + 'schlaf': True, + 'vitalwerte': True + } + + timeframes = { + 'körper': 30, + 'ernährung': 30, + 'training': 14, + 'schlaf': 14, + 'vitalwerte': 7 + } + + # Wrapper function for OpenRouter calls + async def workflow_llm_call(prompt: str, model: str = None) -> str: + return await call_openrouter(prompt) + + # SSE Event Generator + async def event_stream(): + """Generate Server-Sent Events during workflow execution.""" + import asyncio + from asyncio import Queue + + # Event queue for progress updates + event_queue = Queue() + + # Flag to track execution completion + execution_complete = False + + # Define progress callback for streaming updates + async def progress_callback(event_type: str, data: dict): + """Queue SSE event for streaming to client.""" + event_data = { + "type": event_type, + **data + } + await event_queue.put(event_data) + + # Start workflow execution in background task + async def execute_workflow_async(): + nonlocal execution_complete + try: + # Execute workflow with progress callbacks + result = await execute_prompt_with_data( + prompt_slug=prompt_slug, + profile_id=profile_id, + modules=modules, + timeframes=timeframes, + openrouter_call_func=workflow_llm_call, + enable_debug=debug or save, + progress_callback=progress_callback + ) + + # Save to ai_insights if requested (same logic as /execute) + if save: + if result['type'] == 'pipeline': + final_output = result.get('output', {}) + if isinstance(final_output, dict) and len(final_output) == 1: + content = list(final_output.values())[0] + else: + content = json.dumps(final_output, ensure_ascii=False) + elif result['type'] == 'workflow': + content = _workflow_user_facing_content(result.get('aggregated_result')) + else: + content = result.get('output', '') + if isinstance(content, dict): + content = json.dumps(content, ensure_ascii=False) + + # Save to database (minimal metadata for now) + with get_db() as conn: + cur = get_cursor(conn) + cur.execute( + """INSERT INTO ai_insights (profile_id, scope, content, metadata, created) + VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""", + (profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']})) + ) + conn.commit() + + except Exception as e: + # Queue error event + await event_queue.put({ + "type": "execution_failed", + "error": str(e) + }) + finally: + execution_complete = True + + # Start workflow execution in background + import asyncio + execution_task = asyncio.create_task(execute_workflow_async()) + + # Stream events from queue + try: + while not execution_complete or not event_queue.empty(): + try: + # Wait for event with timeout + event = await asyncio.wait_for(event_queue.get(), timeout=0.5) + yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" + except asyncio.TimeoutError: + # Send keepalive ping + yield f": keepalive\n\n" + continue + + # Wait for execution task to complete + await execution_task + + except Exception as e: + # Send final error event + error_event = { + "type": "execution_failed", + "error": str(e) + } + yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n" + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" # Disable nginx buffering + } + # NOTE: /execute-stream MUST be defined BEFORE /{prompt_id} to avoid route conflicts # FastAPI matches routes in order, so specific routes must come before catch-all patterns @@ -1445,155 +1595,6 @@ def reset_prompt_to_default(prompt_id: str, session: dict=Depends(require_admin) # ══════════════════════════════════════════════════════════════════════════════ from prompt_executor import execute_prompt_with_data -from models import UnifiedPromptCreate, UnifiedPromptUpdate - - -@router.get("/execute-stream") -async def execute_unified_prompt_stream( - prompt_slug: str = Query(..., description="Slug of prompt to execute"), - debug: bool = Query(False, description="Include debug information (node_states, etc.)"), - save: bool = Query(False, description="Save result to ai_insights"), - session: dict = Depends(require_auth_flexible) -): - """ - Execute a unified prompt with Server-Sent Events (SSE) streaming. - - Returns live progress updates during workflow execution: - - execution_started: Workflow has begun - - node_complete: Each node completes - - execution_complete: Final result ready - - execution_failed: Error occurred - - Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts. - """ - profile_id = session['profile_id'] - - # Use default modules/timeframes (SSE doesn't support complex params) - modules = { - 'körper': True, - 'ernährung': True, - 'training': True, - 'schlaf': True, - 'vitalwerte': True - } - - timeframes = { - 'körper': 30, - 'ernährung': 30, - 'training': 14, - 'schlaf': 14, - 'vitalwerte': 7 - } - - # Wrapper function for OpenRouter calls - async def workflow_llm_call(prompt: str, model: str = None) -> str: - return await call_openrouter(prompt) - - # SSE Event Generator - async def event_stream(): - """Generate Server-Sent Events during workflow execution.""" - import asyncio - from asyncio import Queue - - # Event queue for progress updates - event_queue = Queue() - - # Flag to track execution completion - execution_complete = False - - # Define progress callback for streaming updates - async def progress_callback(event_type: str, data: dict): - """Queue SSE event for streaming to client.""" - event_data = { - "type": event_type, - **data - } - await event_queue.put(event_data) - - # Start workflow execution in background task - async def execute_workflow_async(): - nonlocal execution_complete - try: - # Execute workflow with progress callbacks - result = await execute_prompt_with_data( - prompt_slug=prompt_slug, - profile_id=profile_id, - modules=modules, - timeframes=timeframes, - openrouter_call_func=workflow_llm_call, - enable_debug=debug or save, - progress_callback=progress_callback - ) - - # Save to ai_insights if requested (same logic as /execute) - if save: - if result['type'] == 'pipeline': - final_output = result.get('output', {}) - if isinstance(final_output, dict) and len(final_output) == 1: - content = list(final_output.values())[0] - else: - content = json.dumps(final_output, ensure_ascii=False) - elif result['type'] == 'workflow': - content = _workflow_user_facing_content(result.get('aggregated_result')) - else: - content = result.get('output', '') - if isinstance(content, dict): - content = json.dumps(content, ensure_ascii=False) - - # Save to database (minimal metadata for now) - with get_db() as conn: - cur = get_cursor(conn) - cur.execute( - """INSERT INTO ai_insights (profile_id, scope, content, metadata, created) - VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""", - (profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']})) - ) - conn.commit() - - except Exception as e: - # Queue error event - await event_queue.put({ - "type": "execution_failed", - "error": str(e) - }) - finally: - execution_complete = True - - # Start workflow execution in background - import asyncio - execution_task = asyncio.create_task(execute_workflow_async()) - - # Stream events from queue - try: - while not execution_complete or not event_queue.empty(): - try: - # Wait for event with timeout - event = await asyncio.wait_for(event_queue.get(), timeout=0.5) - yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" - except asyncio.TimeoutError: - # Send keepalive ping - yield f": keepalive\n\n" - continue - - # Wait for execution task to complete - await execution_task - - except Exception as e: - # Send final error event - error_event = { - "type": "execution_failed", - "error": str(e) - } - yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n" - - return StreamingResponse( - event_stream(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "X-Accel-Buffering": "no" # Disable nginx buffering - } )