fix: move /execute-stream route BEFORE /{prompt_id} catch-all
- /execute-stream now at line 260 (was 1448)
- /{prompt_id} now at line 410 (was 257)
- FastAPI will now match /execute-stream correctly
- Fixes 'Connection to server lost' error in analysis page
This commit is contained in:
parent
35ba2d7fdb
commit
09d1b6f967
|
|
@ -254,6 +254,156 @@ def import_prompts(
|
|||
}
|
||||
|
||||
|
||||
from models import UnifiedPromptCreate, UnifiedPromptUpdate
|
||||
|
||||
|
||||
@router.get("/execute-stream")
|
||||
async def execute_unified_prompt_stream(
|
||||
prompt_slug: str = Query(..., description="Slug of prompt to execute"),
|
||||
debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
|
||||
save: bool = Query(False, description="Save result to ai_insights"),
|
||||
session: dict = Depends(require_auth_flexible)
|
||||
):
|
||||
"""
|
||||
Execute a unified prompt with Server-Sent Events (SSE) streaming.
|
||||
|
||||
Returns live progress updates during workflow execution:
|
||||
- execution_started: Workflow has begun
|
||||
- node_complete: Each node completes
|
||||
- execution_complete: Final result ready
|
||||
- execution_failed: Error occurred
|
||||
|
||||
Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts.
|
||||
"""
|
||||
profile_id = session['profile_id']
|
||||
|
||||
# Use default modules/timeframes (SSE doesn't support complex params)
|
||||
modules = {
|
||||
'körper': True,
|
||||
'ernährung': True,
|
||||
'training': True,
|
||||
'schlaf': True,
|
||||
'vitalwerte': True
|
||||
}
|
||||
|
||||
timeframes = {
|
||||
'körper': 30,
|
||||
'ernährung': 30,
|
||||
'training': 14,
|
||||
'schlaf': 14,
|
||||
'vitalwerte': 7
|
||||
}
|
||||
|
||||
# Wrapper function for OpenRouter calls
|
||||
async def workflow_llm_call(prompt: str, model: str = None) -> str:
|
||||
return await call_openrouter(prompt)
|
||||
|
||||
# SSE Event Generator
|
||||
async def event_stream():
|
||||
"""Generate Server-Sent Events during workflow execution."""
|
||||
import asyncio
|
||||
from asyncio import Queue
|
||||
|
||||
# Event queue for progress updates
|
||||
event_queue = Queue()
|
||||
|
||||
# Flag to track execution completion
|
||||
execution_complete = False
|
||||
|
||||
# Define progress callback for streaming updates
|
||||
async def progress_callback(event_type: str, data: dict):
|
||||
"""Queue SSE event for streaming to client."""
|
||||
event_data = {
|
||||
"type": event_type,
|
||||
**data
|
||||
}
|
||||
await event_queue.put(event_data)
|
||||
|
||||
# Start workflow execution in background task
|
||||
async def execute_workflow_async():
|
||||
nonlocal execution_complete
|
||||
try:
|
||||
# Execute workflow with progress callbacks
|
||||
result = await execute_prompt_with_data(
|
||||
prompt_slug=prompt_slug,
|
||||
profile_id=profile_id,
|
||||
modules=modules,
|
||||
timeframes=timeframes,
|
||||
openrouter_call_func=workflow_llm_call,
|
||||
enable_debug=debug or save,
|
||||
progress_callback=progress_callback
|
||||
)
|
||||
|
||||
# Save to ai_insights if requested (same logic as /execute)
|
||||
if save:
|
||||
if result['type'] == 'pipeline':
|
||||
final_output = result.get('output', {})
|
||||
if isinstance(final_output, dict) and len(final_output) == 1:
|
||||
content = list(final_output.values())[0]
|
||||
else:
|
||||
content = json.dumps(final_output, ensure_ascii=False)
|
||||
elif result['type'] == 'workflow':
|
||||
content = _workflow_user_facing_content(result.get('aggregated_result'))
|
||||
else:
|
||||
content = result.get('output', '')
|
||||
if isinstance(content, dict):
|
||||
content = json.dumps(content, ensure_ascii=False)
|
||||
|
||||
# Save to database (minimal metadata for now)
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute(
|
||||
"""INSERT INTO ai_insights (profile_id, scope, content, metadata, created)
|
||||
VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""",
|
||||
(profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']}))
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
except Exception as e:
|
||||
# Queue error event
|
||||
await event_queue.put({
|
||||
"type": "execution_failed",
|
||||
"error": str(e)
|
||||
})
|
||||
finally:
|
||||
execution_complete = True
|
||||
|
||||
# Start workflow execution in background
|
||||
import asyncio
|
||||
execution_task = asyncio.create_task(execute_workflow_async())
|
||||
|
||||
# Stream events from queue
|
||||
try:
|
||||
while not execution_complete or not event_queue.empty():
|
||||
try:
|
||||
# Wait for event with timeout
|
||||
event = await asyncio.wait_for(event_queue.get(), timeout=0.5)
|
||||
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
# Send keepalive ping
|
||||
yield f": keepalive\n\n"
|
||||
continue
|
||||
|
||||
# Wait for execution task to complete
|
||||
await execution_task
|
||||
|
||||
except Exception as e:
|
||||
# Send final error event
|
||||
error_event = {
|
||||
"type": "execution_failed",
|
||||
"error": str(e)
|
||||
}
|
||||
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
|
||||
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no" # Disable nginx buffering
|
||||
}
|
||||
|
||||
# NOTE: /execute-stream MUST be defined BEFORE /{prompt_id} to avoid route conflicts
|
||||
# FastAPI matches routes in order, so specific routes must come before catch-all patterns
|
||||
|
||||
|
|
@ -1445,155 +1595,6 @@ def reset_prompt_to_default(prompt_id: str, session: dict=Depends(require_admin)
|
|||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
from prompt_executor import execute_prompt_with_data
|
||||
from models import UnifiedPromptCreate, UnifiedPromptUpdate
|
||||
|
||||
|
||||
@router.get("/execute-stream")
|
||||
async def execute_unified_prompt_stream(
|
||||
prompt_slug: str = Query(..., description="Slug of prompt to execute"),
|
||||
debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
|
||||
save: bool = Query(False, description="Save result to ai_insights"),
|
||||
session: dict = Depends(require_auth_flexible)
|
||||
):
|
||||
"""
|
||||
Execute a unified prompt with Server-Sent Events (SSE) streaming.
|
||||
|
||||
Returns live progress updates during workflow execution:
|
||||
- execution_started: Workflow has begun
|
||||
- node_complete: Each node completes
|
||||
- execution_complete: Final result ready
|
||||
- execution_failed: Error occurred
|
||||
|
||||
Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts.
|
||||
"""
|
||||
profile_id = session['profile_id']
|
||||
|
||||
# Use default modules/timeframes (SSE doesn't support complex params)
|
||||
modules = {
|
||||
'körper': True,
|
||||
'ernährung': True,
|
||||
'training': True,
|
||||
'schlaf': True,
|
||||
'vitalwerte': True
|
||||
}
|
||||
|
||||
timeframes = {
|
||||
'körper': 30,
|
||||
'ernährung': 30,
|
||||
'training': 14,
|
||||
'schlaf': 14,
|
||||
'vitalwerte': 7
|
||||
}
|
||||
|
||||
# Wrapper function for OpenRouter calls
|
||||
async def workflow_llm_call(prompt: str, model: str = None) -> str:
|
||||
return await call_openrouter(prompt)
|
||||
|
||||
# SSE Event Generator
|
||||
async def event_stream():
|
||||
"""Generate Server-Sent Events during workflow execution."""
|
||||
import asyncio
|
||||
from asyncio import Queue
|
||||
|
||||
# Event queue for progress updates
|
||||
event_queue = Queue()
|
||||
|
||||
# Flag to track execution completion
|
||||
execution_complete = False
|
||||
|
||||
# Define progress callback for streaming updates
|
||||
async def progress_callback(event_type: str, data: dict):
|
||||
"""Queue SSE event for streaming to client."""
|
||||
event_data = {
|
||||
"type": event_type,
|
||||
**data
|
||||
}
|
||||
await event_queue.put(event_data)
|
||||
|
||||
# Start workflow execution in background task
|
||||
async def execute_workflow_async():
|
||||
nonlocal execution_complete
|
||||
try:
|
||||
# Execute workflow with progress callbacks
|
||||
result = await execute_prompt_with_data(
|
||||
prompt_slug=prompt_slug,
|
||||
profile_id=profile_id,
|
||||
modules=modules,
|
||||
timeframes=timeframes,
|
||||
openrouter_call_func=workflow_llm_call,
|
||||
enable_debug=debug or save,
|
||||
progress_callback=progress_callback
|
||||
)
|
||||
|
||||
# Save to ai_insights if requested (same logic as /execute)
|
||||
if save:
|
||||
if result['type'] == 'pipeline':
|
||||
final_output = result.get('output', {})
|
||||
if isinstance(final_output, dict) and len(final_output) == 1:
|
||||
content = list(final_output.values())[0]
|
||||
else:
|
||||
content = json.dumps(final_output, ensure_ascii=False)
|
||||
elif result['type'] == 'workflow':
|
||||
content = _workflow_user_facing_content(result.get('aggregated_result'))
|
||||
else:
|
||||
content = result.get('output', '')
|
||||
if isinstance(content, dict):
|
||||
content = json.dumps(content, ensure_ascii=False)
|
||||
|
||||
# Save to database (minimal metadata for now)
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute(
|
||||
"""INSERT INTO ai_insights (profile_id, scope, content, metadata, created)
|
||||
VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""",
|
||||
(profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']}))
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
except Exception as e:
|
||||
# Queue error event
|
||||
await event_queue.put({
|
||||
"type": "execution_failed",
|
||||
"error": str(e)
|
||||
})
|
||||
finally:
|
||||
execution_complete = True
|
||||
|
||||
# Start workflow execution in background
|
||||
import asyncio
|
||||
execution_task = asyncio.create_task(execute_workflow_async())
|
||||
|
||||
# Stream events from queue
|
||||
try:
|
||||
while not execution_complete or not event_queue.empty():
|
||||
try:
|
||||
# Wait for event with timeout
|
||||
event = await asyncio.wait_for(event_queue.get(), timeout=0.5)
|
||||
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
# Send keepalive ping
|
||||
yield f": keepalive\n\n"
|
||||
continue
|
||||
|
||||
# Wait for execution task to complete
|
||||
await execution_task
|
||||
|
||||
except Exception as e:
|
||||
# Send final error event
|
||||
error_event = {
|
||||
"type": "execution_failed",
|
||||
"error": str(e)
|
||||
}
|
||||
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
|
||||
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no" # Disable nginx buffering
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user