Bug-Fixing Analyse Fehler #87
|
|
@ -13,6 +13,8 @@ import bcrypt
|
|||
|
||||
from db import get_db, get_cursor
|
||||
|
||||
print("[AUTH.PY] Module loaded - require_auth_flexible will be defined")
|
||||
|
||||
|
||||
def hash_pin(pin: str) -> str:
|
||||
"""Hash password with bcrypt. Falls back gracefully from legacy SHA256."""
|
||||
|
|
@ -76,21 +78,24 @@ def require_auth(x_auth_token: Optional[str] = Header(default=None)):
|
|||
return session
|
||||
|
||||
|
||||
def require_auth_flexible(x_auth_token: Optional[str] = Header(default=None), token: Optional[str] = Query(default=None)):
|
||||
def require_auth_flexible(x_auth_token: Optional[str] = Header(default=None), ssetoken: Optional[str] = Query(default=None)):
|
||||
"""
|
||||
FastAPI dependency - auth via header OR query parameter.
|
||||
|
||||
Used for endpoints accessed by <img> tags that can't send headers.
|
||||
Used for endpoints accessed by <img> tags and SSE connections that can't send headers.
|
||||
Query parameter is 'ssetoken' to avoid conflicts with endpoint 'token' parameters.
|
||||
|
||||
Usage:
|
||||
@app.get("/api/photos/{id}")
|
||||
def get_photo(id: str, session: dict = Depends(require_auth_flexible)):
|
||||
...
|
||||
|
||||
Call with: ?ssetoken=XXX or Header: X-Auth-Token: XXX
|
||||
|
||||
Raises:
|
||||
HTTPException 401 if not authenticated
|
||||
"""
|
||||
session = get_session(x_auth_token or token)
|
||||
session = get_session(x_auth_token or ssetoken)
|
||||
if not session:
|
||||
raise HTTPException(401, "Nicht eingeloggt")
|
||||
return session
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query, Header
|
|||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from db import get_db, get_cursor, r2d
|
||||
from auth import require_auth, require_admin
|
||||
from auth import require_auth, require_admin, require_auth_flexible
|
||||
from models import (
|
||||
PromptCreate, PromptUpdate, PromptGenerateRequest,
|
||||
PipelineConfigCreate, PipelineConfigUpdate
|
||||
|
|
@ -254,6 +254,178 @@ def import_prompts(
|
|||
}
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# UNIFIED PROMPT SYSTEM (Issue #28 Phase 2)
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
from prompt_executor import execute_prompt_with_data
|
||||
from models import UnifiedPromptCreate, UnifiedPromptUpdate
|
||||
|
||||
|
||||
@router.get("/execute-stream")
|
||||
async def execute_unified_prompt_stream(
|
||||
prompt_slug: str = Query(..., description="Slug of prompt to execute"),
|
||||
debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
|
||||
save: bool = Query(False, description="Save result to ai_insights"),
|
||||
session: dict = Depends(require_auth_flexible)
|
||||
):
|
||||
"""
|
||||
Execute a unified prompt with Server-Sent Events (SSE) streaming.
|
||||
|
||||
Returns live progress updates during workflow execution:
|
||||
- execution_started: Workflow has begun
|
||||
- node_complete: Each node completes
|
||||
- workflow_graph_finished: Workflow-Graph fertig (Zwischen-Info, kein Endergebnis)
|
||||
- execution_complete: Endergebnis (wie POST /execute, Feld result)
|
||||
- execution_failed: Error occurred
|
||||
|
||||
Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts.
|
||||
"""
|
||||
profile_id = session['profile_id']
|
||||
|
||||
# Use default modules/timeframes (SSE doesn't support complex params)
|
||||
modules = {
|
||||
'körper': True,
|
||||
'ernährung': True,
|
||||
'training': True,
|
||||
'schlaf': True,
|
||||
'vitalwerte': True
|
||||
}
|
||||
|
||||
timeframes = {
|
||||
'körper': 30,
|
||||
'ernährung': 30,
|
||||
'training': 14,
|
||||
'schlaf': 14,
|
||||
'vitalwerte': 7
|
||||
}
|
||||
|
||||
# Wrapper function for OpenRouter calls
|
||||
async def workflow_llm_call(prompt: str, model: str = None) -> str:
|
||||
return await call_openrouter(prompt)
|
||||
|
||||
# SSE Event Generator
|
||||
async def event_stream():
|
||||
"""Generate Server-Sent Events during workflow execution."""
|
||||
import asyncio
|
||||
from asyncio import Queue
|
||||
|
||||
# Event queue for progress updates
|
||||
event_queue = Queue()
|
||||
|
||||
# Flag to track execution completion
|
||||
execution_complete = False
|
||||
|
||||
# Define progress callback for streaming updates
|
||||
async def progress_callback(event_type: str, data: dict):
|
||||
"""Queue SSE event for streaming to client."""
|
||||
event_data = {
|
||||
"type": event_type,
|
||||
**data
|
||||
}
|
||||
await event_queue.put(event_data)
|
||||
|
||||
# Start workflow execution in background task
|
||||
async def execute_workflow_async():
|
||||
nonlocal execution_complete
|
||||
try:
|
||||
# Execute workflow with progress callbacks
|
||||
result = await execute_prompt_with_data(
|
||||
prompt_slug=prompt_slug,
|
||||
profile_id=profile_id,
|
||||
modules=modules,
|
||||
timeframes=timeframes,
|
||||
openrouter_call_func=workflow_llm_call,
|
||||
enable_debug=debug or save,
|
||||
progress_callback=progress_callback
|
||||
)
|
||||
|
||||
# Save to ai_insights if requested (same logic as /execute)
|
||||
if save:
|
||||
if result['type'] == 'pipeline':
|
||||
final_output = result.get('output', {})
|
||||
if isinstance(final_output, dict) and len(final_output) == 1:
|
||||
content = list(final_output.values())[0]
|
||||
else:
|
||||
content = json.dumps(final_output, ensure_ascii=False)
|
||||
elif result['type'] == 'workflow':
|
||||
content = _workflow_user_facing_content(result.get('aggregated_result'))
|
||||
else:
|
||||
content = result.get('output', '')
|
||||
if isinstance(content, dict):
|
||||
content = json.dumps(content, ensure_ascii=False)
|
||||
|
||||
# Save to database (minimal metadata for now)
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute(
|
||||
"""INSERT INTO ai_insights (profile_id, scope, content, metadata, created)
|
||||
VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""",
|
||||
(profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']}))
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# Pflicht für alle Prompt-Typen: Pipeline/Base rufen keinen progress_callback
|
||||
# mit Abschluss auf — ohne dieses Event endet SSE ohne resolve → „Connection to server lost“.
|
||||
try:
|
||||
sse_payload = json.loads(json.dumps(result, default=str))
|
||||
except (TypeError, ValueError):
|
||||
sse_payload = {
|
||||
"type": result.get("type", "unknown"),
|
||||
"error": "result_not_serializable",
|
||||
}
|
||||
await event_queue.put({
|
||||
"type": "execution_complete",
|
||||
"result": sse_payload,
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
# Queue error event
|
||||
await event_queue.put({
|
||||
"type": "execution_failed",
|
||||
"error": str(e)
|
||||
})
|
||||
finally:
|
||||
execution_complete = True
|
||||
|
||||
# Start workflow execution in background
|
||||
import asyncio
|
||||
execution_task = asyncio.create_task(execute_workflow_async())
|
||||
|
||||
# Stream events from queue
|
||||
try:
|
||||
while not execution_complete or not event_queue.empty():
|
||||
try:
|
||||
# Wait for event with timeout
|
||||
event = await asyncio.wait_for(event_queue.get(), timeout=0.5)
|
||||
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
# Send keepalive ping
|
||||
yield f": keepalive\n\n"
|
||||
continue
|
||||
|
||||
# Wait for execution task to complete
|
||||
await execution_task
|
||||
|
||||
except Exception as e:
|
||||
# Send final error event
|
||||
error_event = {
|
||||
"type": "execution_failed",
|
||||
"error": str(e)
|
||||
}
|
||||
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
|
||||
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no" # Disable nginx buffering
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.get("/{prompt_id}")
|
||||
def get_prompt(prompt_id: str, session: dict=Depends(require_auth)):
|
||||
"""Get single AI prompt by ID (UUID)."""
|
||||
|
|
@ -1437,177 +1609,6 @@ def reset_prompt_to_default(prompt_id: str, session: dict=Depends(require_admin)
|
|||
return {"ok": True}
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# UNIFIED PROMPT SYSTEM (Issue #28 Phase 2)
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
from prompt_executor import execute_prompt_with_data
|
||||
from models import UnifiedPromptCreate, UnifiedPromptUpdate
|
||||
|
||||
|
||||
@router.post("/execute-stream")
|
||||
async def execute_unified_prompt_stream(
|
||||
prompt_slug: str = Query(..., description="Slug of prompt to execute"),
|
||||
token: Optional[str] = Query(None, description="Auth token (temporary solution for SSE)"),
|
||||
modules: Optional[dict] = None,
|
||||
timeframes: Optional[dict] = None,
|
||||
debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
|
||||
save: bool = Query(False, description="Save result to ai_insights")
|
||||
):
|
||||
"""
|
||||
Execute a unified prompt with Server-Sent Events (SSE) streaming.
|
||||
|
||||
Returns live progress updates during workflow execution:
|
||||
- execution_started: Workflow has begun
|
||||
- node_complete: Each node completes
|
||||
- execution_complete: Final result ready
|
||||
- execution_failed: Error occurred
|
||||
|
||||
Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts.
|
||||
"""
|
||||
# Manual auth: verify token and get profile_id
|
||||
if not token:
|
||||
raise HTTPException(401, "Missing auth token")
|
||||
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute("SELECT profile_id FROM sessions WHERE token = %s", (token,))
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
raise HTTPException(401, "Invalid or expired token")
|
||||
profile_id = row['profile_id']
|
||||
|
||||
# Use default modules/timeframes if not provided
|
||||
if not modules:
|
||||
modules = {
|
||||
'körper': True,
|
||||
'ernährung': True,
|
||||
'training': True,
|
||||
'schlaf': True,
|
||||
'vitalwerte': True
|
||||
}
|
||||
|
||||
if not timeframes:
|
||||
timeframes = {
|
||||
'körper': 30,
|
||||
'ernährung': 30,
|
||||
'training': 14,
|
||||
'schlaf': 14,
|
||||
'vitalwerte': 7
|
||||
}
|
||||
|
||||
# Wrapper function for OpenRouter calls
|
||||
async def workflow_llm_call(prompt: str, model: str = None) -> str:
|
||||
return await call_openrouter(prompt)
|
||||
|
||||
# SSE Event Generator
|
||||
async def event_stream():
|
||||
"""Generate Server-Sent Events during workflow execution."""
|
||||
import asyncio
|
||||
from asyncio import Queue
|
||||
|
||||
# Event queue for progress updates
|
||||
event_queue = Queue()
|
||||
|
||||
# Flag to track execution completion
|
||||
execution_complete = False
|
||||
|
||||
# Define progress callback for streaming updates
|
||||
async def progress_callback(event_type: str, data: dict):
|
||||
"""Queue SSE event for streaming to client."""
|
||||
event_data = {
|
||||
"type": event_type,
|
||||
**data
|
||||
}
|
||||
await event_queue.put(event_data)
|
||||
|
||||
# Start workflow execution in background task
|
||||
async def execute_workflow_async():
|
||||
nonlocal execution_complete
|
||||
try:
|
||||
# Execute workflow with progress callbacks
|
||||
result = await execute_prompt_with_data(
|
||||
prompt_slug=prompt_slug,
|
||||
profile_id=profile_id,
|
||||
modules=modules,
|
||||
timeframes=timeframes,
|
||||
openrouter_call_func=workflow_llm_call,
|
||||
enable_debug=debug or save,
|
||||
progress_callback=progress_callback
|
||||
)
|
||||
|
||||
# Save to ai_insights if requested (same logic as /execute)
|
||||
if save:
|
||||
if result['type'] == 'pipeline':
|
||||
final_output = result.get('output', {})
|
||||
if isinstance(final_output, dict) and len(final_output) == 1:
|
||||
content = list(final_output.values())[0]
|
||||
else:
|
||||
content = json.dumps(final_output, ensure_ascii=False)
|
||||
elif result['type'] == 'workflow':
|
||||
content = _workflow_user_facing_content(result.get('aggregated_result'))
|
||||
else:
|
||||
content = result.get('output', '')
|
||||
if isinstance(content, dict):
|
||||
content = json.dumps(content, ensure_ascii=False)
|
||||
|
||||
# Save to database (minimal metadata for now)
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute(
|
||||
"""INSERT INTO ai_insights (profile_id, scope, content, metadata, created)
|
||||
VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""",
|
||||
(profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']}))
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
except Exception as e:
|
||||
# Queue error event
|
||||
await event_queue.put({
|
||||
"type": "execution_failed",
|
||||
"error": str(e)
|
||||
})
|
||||
finally:
|
||||
execution_complete = True
|
||||
|
||||
# Start workflow execution in background
|
||||
import asyncio
|
||||
execution_task = asyncio.create_task(execute_workflow_async())
|
||||
|
||||
# Stream events from queue
|
||||
try:
|
||||
while not execution_complete or not event_queue.empty():
|
||||
try:
|
||||
# Wait for event with timeout
|
||||
event = await asyncio.wait_for(event_queue.get(), timeout=0.5)
|
||||
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
# Send keepalive ping
|
||||
yield f": keepalive\n\n"
|
||||
continue
|
||||
|
||||
# Wait for execution task to complete
|
||||
await execution_task
|
||||
|
||||
except Exception as e:
|
||||
# Send final error event
|
||||
error_event = {
|
||||
"type": "execution_failed",
|
||||
"error": str(e)
|
||||
}
|
||||
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
|
||||
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no" # Disable nginx buffering
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.post("/execute")
|
||||
async def execute_unified_prompt(
|
||||
prompt_slug: str = Query(..., description="Slug of prompt to execute"),
|
||||
|
|
|
|||
|
|
@ -213,9 +213,11 @@ async def execute_workflow(
|
|||
|
||||
logger.info(f"Workflow execution completed: {execution_id}")
|
||||
|
||||
# NEW: Progress-Callback für erfolgreiche Fertigstellung
|
||||
# Fortschritt: kein type=execution_complete — das sendet /execute-stream einmalig
|
||||
# mit vollem execute_prompt_with_data-Result (Pipeline/Base/Workflow), sonst schließt
|
||||
# der Client nach Workflow vorzeitig ohne debug/node_states oder Pipeline bricht ab.
|
||||
if progress_callback:
|
||||
await progress_callback("execution_complete", {
|
||||
await progress_callback("workflow_graph_finished", {
|
||||
"execution_id": execution_id,
|
||||
"status": "completed",
|
||||
"aggregated_result": aggregated,
|
||||
|
|
|
|||
|
|
@ -8,6 +8,9 @@ server {
|
|||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
client_max_body_size 20M;
|
||||
proxy_read_timeout 300s;
|
||||
proxy_connect_timeout 60s;
|
||||
proxy_send_timeout 60s;
|
||||
}
|
||||
|
||||
location / {
|
||||
|
|
|
|||
|
|
@ -484,8 +484,9 @@ export const api = {
|
|||
|
||||
// TODO: Security improvement - use session cookie instead of token in URL
|
||||
// For now, send token as query param since EventSource doesn't support custom headers
|
||||
const token = localStorage.getItem('token')
|
||||
if (token) params.append('token', token)
|
||||
// Using 'ssetoken' to avoid conflicts with endpoint 'token' parameters
|
||||
const token = getToken()
|
||||
if (token) params.append('ssetoken', token)
|
||||
|
||||
if (modules) {
|
||||
Object.entries(modules).forEach(([k, v]) => params.append(`modules[${k}]`, v))
|
||||
|
|
@ -496,7 +497,7 @@ export const api = {
|
|||
|
||||
// Return a Promise that resolves with final result
|
||||
return new Promise((resolve, reject) => {
|
||||
const url = `${BASE_URL}/prompts/execute-stream?${params}`
|
||||
const url = `${BASE}/prompts/execute-stream?${params}`
|
||||
|
||||
const eventSource = new EventSource(url)
|
||||
|
||||
|
|
@ -511,17 +512,16 @@ export const api = {
|
|||
onProgress(data)
|
||||
}
|
||||
|
||||
// Check for final result
|
||||
// Check for final result (/execute-stream liefert volles POST-/execute-Payload unter result)
|
||||
if (data.type === 'execution_complete') {
|
||||
// Transform SSE result to match regular execute format
|
||||
finalResult = {
|
||||
finalResult = data.result
|
||||
? data.result
|
||||
: {
|
||||
type: 'workflow',
|
||||
execution_id: data.execution_id,
|
||||
status: data.status,
|
||||
aggregated_result: data.aggregated_result,
|
||||
debug: {
|
||||
node_states: [] // TODO: collect from progress events if needed
|
||||
}
|
||||
debug: { node_states: [] },
|
||||
}
|
||||
eventSource.close()
|
||||
resolve(finalResult)
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user