diff --git a/backend/auth.py b/backend/auth.py index 918b5eb..f255879 100644 --- a/backend/auth.py +++ b/backend/auth.py @@ -13,6 +13,8 @@ import bcrypt from db import get_db, get_cursor +print("[AUTH.PY] Module loaded - require_auth_flexible will be defined") + def hash_pin(pin: str) -> str: """Hash password with bcrypt. Falls back gracefully from legacy SHA256.""" @@ -76,21 +78,24 @@ def require_auth(x_auth_token: Optional[str] = Header(default=None)): return session -def require_auth_flexible(x_auth_token: Optional[str] = Header(default=None), token: Optional[str] = Query(default=None)): +def require_auth_flexible(x_auth_token: Optional[str] = Header(default=None), ssetoken: Optional[str] = Query(default=None)): """ FastAPI dependency - auth via header OR query parameter. - Used for endpoints accessed by tags that can't send headers. + Used for endpoints accessed by tags and SSE connections that can't send headers. + Query parameter is 'ssetoken' to avoid conflicts with endpoint 'token' parameters. Usage: @app.get("/api/photos/{id}") def get_photo(id: str, session: dict = Depends(require_auth_flexible)): ... + Call with: ?ssetoken=XXX or Header: X-Auth-Token: XXX + Raises: HTTPException 401 if not authenticated """ - session = get_session(x_auth_token or token) + session = get_session(x_auth_token or ssetoken) if not session: raise HTTPException(401, "Nicht eingeloggt") return session diff --git a/backend/routers/prompts.py b/backend/routers/prompts.py index 9c8b09c..92e6987 100644 --- a/backend/routers/prompts.py +++ b/backend/routers/prompts.py @@ -12,7 +12,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query, Header from fastapi.responses import StreamingResponse from db import get_db, get_cursor, r2d -from auth import require_auth, require_admin +from auth import require_auth, require_admin, require_auth_flexible from models import ( PromptCreate, PromptUpdate, PromptGenerateRequest, PipelineConfigCreate, PipelineConfigUpdate @@ -254,6 +254,178 @@ def import_prompts( } +# ══════════════════════════════════════════════════════════════════════════════ +# UNIFIED PROMPT SYSTEM (Issue #28 Phase 2) +# ══════════════════════════════════════════════════════════════════════════════ + +from prompt_executor import execute_prompt_with_data +from models import UnifiedPromptCreate, UnifiedPromptUpdate + + +@router.get("/execute-stream") +async def execute_unified_prompt_stream( + prompt_slug: str = Query(..., description="Slug of prompt to execute"), + debug: bool = Query(False, description="Include debug information (node_states, etc.)"), + save: bool = Query(False, description="Save result to ai_insights"), + session: dict = Depends(require_auth_flexible) +): + """ + Execute a unified prompt with Server-Sent Events (SSE) streaming. + + Returns live progress updates during workflow execution: + - execution_started: Workflow has begun + - node_complete: Each node completes + - workflow_graph_finished: Workflow-Graph fertig (Zwischen-Info, kein Endergebnis) + - execution_complete: Endergebnis (wie POST /execute, Feld result) + - execution_failed: Error occurred + + Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts. + """ + profile_id = session['profile_id'] + + # Use default modules/timeframes (SSE doesn't support complex params) + modules = { + 'körper': True, + 'ernährung': True, + 'training': True, + 'schlaf': True, + 'vitalwerte': True + } + + timeframes = { + 'körper': 30, + 'ernährung': 30, + 'training': 14, + 'schlaf': 14, + 'vitalwerte': 7 + } + + # Wrapper function for OpenRouter calls + async def workflow_llm_call(prompt: str, model: str = None) -> str: + return await call_openrouter(prompt) + + # SSE Event Generator + async def event_stream(): + """Generate Server-Sent Events during workflow execution.""" + import asyncio + from asyncio import Queue + + # Event queue for progress updates + event_queue = Queue() + + # Flag to track execution completion + execution_complete = False + + # Define progress callback for streaming updates + async def progress_callback(event_type: str, data: dict): + """Queue SSE event for streaming to client.""" + event_data = { + "type": event_type, + **data + } + await event_queue.put(event_data) + + # Start workflow execution in background task + async def execute_workflow_async(): + nonlocal execution_complete + try: + # Execute workflow with progress callbacks + result = await execute_prompt_with_data( + prompt_slug=prompt_slug, + profile_id=profile_id, + modules=modules, + timeframes=timeframes, + openrouter_call_func=workflow_llm_call, + enable_debug=debug or save, + progress_callback=progress_callback + ) + + # Save to ai_insights if requested (same logic as /execute) + if save: + if result['type'] == 'pipeline': + final_output = result.get('output', {}) + if isinstance(final_output, dict) and len(final_output) == 1: + content = list(final_output.values())[0] + else: + content = json.dumps(final_output, ensure_ascii=False) + elif result['type'] == 'workflow': + content = _workflow_user_facing_content(result.get('aggregated_result')) + else: + content = result.get('output', '') + if isinstance(content, dict): + content = json.dumps(content, ensure_ascii=False) + + # Save to database (minimal metadata for now) + with get_db() as conn: + cur = get_cursor(conn) + cur.execute( + """INSERT INTO ai_insights (profile_id, scope, content, metadata, created) + VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""", + (profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']})) + ) + conn.commit() + + # Pflicht für alle Prompt-Typen: Pipeline/Base rufen keinen progress_callback + # mit Abschluss auf — ohne dieses Event endet SSE ohne resolve → „Connection to server lost“. + try: + sse_payload = json.loads(json.dumps(result, default=str)) + except (TypeError, ValueError): + sse_payload = { + "type": result.get("type", "unknown"), + "error": "result_not_serializable", + } + await event_queue.put({ + "type": "execution_complete", + "result": sse_payload, + }) + + except Exception as e: + # Queue error event + await event_queue.put({ + "type": "execution_failed", + "error": str(e) + }) + finally: + execution_complete = True + + # Start workflow execution in background + import asyncio + execution_task = asyncio.create_task(execute_workflow_async()) + + # Stream events from queue + try: + while not execution_complete or not event_queue.empty(): + try: + # Wait for event with timeout + event = await asyncio.wait_for(event_queue.get(), timeout=0.5) + yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" + except asyncio.TimeoutError: + # Send keepalive ping + yield f": keepalive\n\n" + continue + + # Wait for execution task to complete + await execution_task + + except Exception as e: + # Send final error event + error_event = { + "type": "execution_failed", + "error": str(e) + } + yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n" + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" # Disable nginx buffering + } + ) + + @router.get("/{prompt_id}") def get_prompt(prompt_id: str, session: dict=Depends(require_auth)): """Get single AI prompt by ID (UUID).""" @@ -1437,177 +1609,6 @@ def reset_prompt_to_default(prompt_id: str, session: dict=Depends(require_admin) return {"ok": True} -# ══════════════════════════════════════════════════════════════════════════════ -# UNIFIED PROMPT SYSTEM (Issue #28 Phase 2) -# ══════════════════════════════════════════════════════════════════════════════ - -from prompt_executor import execute_prompt_with_data -from models import UnifiedPromptCreate, UnifiedPromptUpdate - - -@router.post("/execute-stream") -async def execute_unified_prompt_stream( - prompt_slug: str = Query(..., description="Slug of prompt to execute"), - token: Optional[str] = Query(None, description="Auth token (temporary solution for SSE)"), - modules: Optional[dict] = None, - timeframes: Optional[dict] = None, - debug: bool = Query(False, description="Include debug information (node_states, etc.)"), - save: bool = Query(False, description="Save result to ai_insights") -): - """ - Execute a unified prompt with Server-Sent Events (SSE) streaming. - - Returns live progress updates during workflow execution: - - execution_started: Workflow has begun - - node_complete: Each node completes - - execution_complete: Final result ready - - execution_failed: Error occurred - - Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts. - """ - # Manual auth: verify token and get profile_id - if not token: - raise HTTPException(401, "Missing auth token") - - with get_db() as conn: - cur = get_cursor(conn) - cur.execute("SELECT profile_id FROM sessions WHERE token = %s", (token,)) - row = cur.fetchone() - if not row: - raise HTTPException(401, "Invalid or expired token") - profile_id = row['profile_id'] - - # Use default modules/timeframes if not provided - if not modules: - modules = { - 'körper': True, - 'ernährung': True, - 'training': True, - 'schlaf': True, - 'vitalwerte': True - } - - if not timeframes: - timeframes = { - 'körper': 30, - 'ernährung': 30, - 'training': 14, - 'schlaf': 14, - 'vitalwerte': 7 - } - - # Wrapper function for OpenRouter calls - async def workflow_llm_call(prompt: str, model: str = None) -> str: - return await call_openrouter(prompt) - - # SSE Event Generator - async def event_stream(): - """Generate Server-Sent Events during workflow execution.""" - import asyncio - from asyncio import Queue - - # Event queue for progress updates - event_queue = Queue() - - # Flag to track execution completion - execution_complete = False - - # Define progress callback for streaming updates - async def progress_callback(event_type: str, data: dict): - """Queue SSE event for streaming to client.""" - event_data = { - "type": event_type, - **data - } - await event_queue.put(event_data) - - # Start workflow execution in background task - async def execute_workflow_async(): - nonlocal execution_complete - try: - # Execute workflow with progress callbacks - result = await execute_prompt_with_data( - prompt_slug=prompt_slug, - profile_id=profile_id, - modules=modules, - timeframes=timeframes, - openrouter_call_func=workflow_llm_call, - enable_debug=debug or save, - progress_callback=progress_callback - ) - - # Save to ai_insights if requested (same logic as /execute) - if save: - if result['type'] == 'pipeline': - final_output = result.get('output', {}) - if isinstance(final_output, dict) and len(final_output) == 1: - content = list(final_output.values())[0] - else: - content = json.dumps(final_output, ensure_ascii=False) - elif result['type'] == 'workflow': - content = _workflow_user_facing_content(result.get('aggregated_result')) - else: - content = result.get('output', '') - if isinstance(content, dict): - content = json.dumps(content, ensure_ascii=False) - - # Save to database (minimal metadata for now) - with get_db() as conn: - cur = get_cursor(conn) - cur.execute( - """INSERT INTO ai_insights (profile_id, scope, content, metadata, created) - VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""", - (profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']})) - ) - conn.commit() - - except Exception as e: - # Queue error event - await event_queue.put({ - "type": "execution_failed", - "error": str(e) - }) - finally: - execution_complete = True - - # Start workflow execution in background - import asyncio - execution_task = asyncio.create_task(execute_workflow_async()) - - # Stream events from queue - try: - while not execution_complete or not event_queue.empty(): - try: - # Wait for event with timeout - event = await asyncio.wait_for(event_queue.get(), timeout=0.5) - yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n" - except asyncio.TimeoutError: - # Send keepalive ping - yield f": keepalive\n\n" - continue - - # Wait for execution task to complete - await execution_task - - except Exception as e: - # Send final error event - error_event = { - "type": "execution_failed", - "error": str(e) - } - yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n" - - return StreamingResponse( - event_stream(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "X-Accel-Buffering": "no" # Disable nginx buffering - } - ) - - @router.post("/execute") async def execute_unified_prompt( prompt_slug: str = Query(..., description="Slug of prompt to execute"), diff --git a/backend/workflow_executor.py b/backend/workflow_executor.py index 1f12fba..be94f1e 100644 --- a/backend/workflow_executor.py +++ b/backend/workflow_executor.py @@ -213,9 +213,11 @@ async def execute_workflow( logger.info(f"Workflow execution completed: {execution_id}") - # NEW: Progress-Callback für erfolgreiche Fertigstellung + # Fortschritt: kein type=execution_complete — das sendet /execute-stream einmalig + # mit vollem execute_prompt_with_data-Result (Pipeline/Base/Workflow), sonst schließt + # der Client nach Workflow vorzeitig ohne debug/node_states oder Pipeline bricht ab. if progress_callback: - await progress_callback("execution_complete", { + await progress_callback("workflow_graph_finished", { "execution_id": execution_id, "status": "completed", "aggregated_result": aggregated, diff --git a/frontend/nginx.conf b/frontend/nginx.conf index 7196b3c..d3eedef 100644 --- a/frontend/nginx.conf +++ b/frontend/nginx.conf @@ -8,6 +8,9 @@ server { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; client_max_body_size 20M; + proxy_read_timeout 300s; + proxy_connect_timeout 60s; + proxy_send_timeout 60s; } location / { diff --git a/frontend/src/utils/api.js b/frontend/src/utils/api.js index c52e032..d716e9a 100644 --- a/frontend/src/utils/api.js +++ b/frontend/src/utils/api.js @@ -484,8 +484,9 @@ export const api = { // TODO: Security improvement - use session cookie instead of token in URL // For now, send token as query param since EventSource doesn't support custom headers - const token = localStorage.getItem('token') - if (token) params.append('token', token) + // Using 'ssetoken' to avoid conflicts with endpoint 'token' parameters + const token = getToken() + if (token) params.append('ssetoken', token) if (modules) { Object.entries(modules).forEach(([k, v]) => params.append(`modules[${k}]`, v)) @@ -496,7 +497,7 @@ export const api = { // Return a Promise that resolves with final result return new Promise((resolve, reject) => { - const url = `${BASE_URL}/prompts/execute-stream?${params}` + const url = `${BASE}/prompts/execute-stream?${params}` const eventSource = new EventSource(url) @@ -511,18 +512,17 @@ export const api = { onProgress(data) } - // Check for final result + // Check for final result (/execute-stream liefert volles POST-/execute-Payload unter result) if (data.type === 'execution_complete') { - // Transform SSE result to match regular execute format - finalResult = { - type: 'workflow', - execution_id: data.execution_id, - status: data.status, - aggregated_result: data.aggregated_result, - debug: { - node_states: [] // TODO: collect from progress events if needed - } - } + finalResult = data.result + ? data.result + : { + type: 'workflow', + execution_id: data.execution_id, + status: data.status, + aggregated_result: data.aggregated_result, + debug: { node_states: [] }, + } eventSource.close() resolve(finalResult) } else if (data.type === 'execution_failed') {