Merge pull request 'Async Workflow' (#82) from develop into main
Reviewed-on: #82
This commit is contained in:
commit
f34e46b04f
|
|
@ -167,7 +167,8 @@ async def execute_prompt(
|
|||
prompt_slug: str,
|
||||
variables: Dict[str, Any],
|
||||
openrouter_call_func,
|
||||
enable_debug: bool = False
|
||||
enable_debug: bool = False,
|
||||
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Execute a single prompt (base or pipeline type).
|
||||
|
|
@ -217,7 +218,7 @@ async def execute_prompt(
|
|||
|
||||
elif prompt_type == 'workflow':
|
||||
# Workflow prompt: graph-based execution (Phase 0: Foundation)
|
||||
return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog)
|
||||
return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog, progress_callback)
|
||||
|
||||
else:
|
||||
raise HTTPException(400, f"Unknown prompt type: {prompt_type}")
|
||||
|
|
@ -469,7 +470,8 @@ async def execute_prompt_with_data(
|
|||
modules: Optional[Dict[str, bool]] = None,
|
||||
timeframes: Optional[Dict[str, int]] = None,
|
||||
openrouter_call_func = None,
|
||||
enable_debug: bool = False
|
||||
enable_debug: bool = False,
|
||||
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Execute prompt with data loaded from database.
|
||||
|
|
@ -605,7 +607,7 @@ async def execute_prompt_with_data(
|
|||
variables['goals_data'] = []
|
||||
|
||||
# Execute prompt
|
||||
return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug)
|
||||
return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug, progress_callback)
|
||||
|
||||
|
||||
async def execute_workflow_prompt(
|
||||
|
|
@ -613,7 +615,8 @@ async def execute_workflow_prompt(
|
|||
variables: Dict[str, Any],
|
||||
openrouter_call_func,
|
||||
enable_debug: bool = False,
|
||||
catalog: Optional[Dict] = None
|
||||
catalog: Optional[Dict] = None,
|
||||
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Execute a workflow-type prompt (graph-based execution).
|
||||
|
|
@ -652,7 +655,8 @@ async def execute_workflow_prompt(
|
|||
profile_id=variables.get('profile_id', 'unknown'), # From context
|
||||
variables=variables,
|
||||
openrouter_call_func=openrouter_call_func,
|
||||
enable_debug=enable_debug
|
||||
enable_debug=enable_debug,
|
||||
progress_callback=progress_callback # NEW: Progress-Callbacks durchreichen
|
||||
)
|
||||
|
||||
# Convert ExecutionResult to dict for API response
|
||||
|
|
|
|||
|
|
@ -1445,13 +1445,176 @@ from prompt_executor import execute_prompt_with_data
|
|||
from models import UnifiedPromptCreate, UnifiedPromptUpdate
|
||||
|
||||
|
||||
@router.post("/execute")
|
||||
async def execute_unified_prompt(
|
||||
prompt_slug: str,
|
||||
@router.post("/execute-stream")
|
||||
async def execute_unified_prompt_stream(
|
||||
prompt_slug: str = Query(..., description="Slug of prompt to execute"),
|
||||
token: Optional[str] = Query(None, description="Auth token (temporary solution for SSE)"),
|
||||
modules: Optional[dict] = None,
|
||||
timeframes: Optional[dict] = None,
|
||||
debug: bool = False,
|
||||
save: bool = False,
|
||||
debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
|
||||
save: bool = Query(False, description="Save result to ai_insights")
|
||||
):
|
||||
"""
|
||||
Execute a unified prompt with Server-Sent Events (SSE) streaming.
|
||||
|
||||
Returns live progress updates during workflow execution:
|
||||
- execution_started: Workflow has begun
|
||||
- node_complete: Each node completes
|
||||
- execution_complete: Final result ready
|
||||
- execution_failed: Error occurred
|
||||
|
||||
Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts.
|
||||
"""
|
||||
# Manual auth: verify token and get profile_id
|
||||
if not token:
|
||||
raise HTTPException(401, "Missing auth token")
|
||||
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute("SELECT profile_id FROM sessions WHERE token = %s", (token,))
|
||||
row = cur.fetchone()
|
||||
if not row:
|
||||
raise HTTPException(401, "Invalid or expired token")
|
||||
profile_id = row['profile_id']
|
||||
|
||||
# Use default modules/timeframes if not provided
|
||||
if not modules:
|
||||
modules = {
|
||||
'körper': True,
|
||||
'ernährung': True,
|
||||
'training': True,
|
||||
'schlaf': True,
|
||||
'vitalwerte': True
|
||||
}
|
||||
|
||||
if not timeframes:
|
||||
timeframes = {
|
||||
'körper': 30,
|
||||
'ernährung': 30,
|
||||
'training': 14,
|
||||
'schlaf': 14,
|
||||
'vitalwerte': 7
|
||||
}
|
||||
|
||||
# Wrapper function for OpenRouter calls
|
||||
async def workflow_llm_call(prompt: str, model: str = None) -> str:
|
||||
return await call_openrouter(prompt)
|
||||
|
||||
# SSE Event Generator
|
||||
async def event_stream():
|
||||
"""Generate Server-Sent Events during workflow execution."""
|
||||
import asyncio
|
||||
from asyncio import Queue
|
||||
|
||||
# Event queue for progress updates
|
||||
event_queue = Queue()
|
||||
|
||||
# Flag to track execution completion
|
||||
execution_complete = False
|
||||
|
||||
# Define progress callback for streaming updates
|
||||
async def progress_callback(event_type: str, data: dict):
|
||||
"""Queue SSE event for streaming to client."""
|
||||
event_data = {
|
||||
"type": event_type,
|
||||
**data
|
||||
}
|
||||
await event_queue.put(event_data)
|
||||
|
||||
# Start workflow execution in background task
|
||||
async def execute_workflow_async():
|
||||
nonlocal execution_complete
|
||||
try:
|
||||
# Execute workflow with progress callbacks
|
||||
result = await execute_prompt_with_data(
|
||||
prompt_slug=prompt_slug,
|
||||
profile_id=profile_id,
|
||||
modules=modules,
|
||||
timeframes=timeframes,
|
||||
openrouter_call_func=workflow_llm_call,
|
||||
enable_debug=debug or save,
|
||||
progress_callback=progress_callback
|
||||
)
|
||||
|
||||
# Save to ai_insights if requested (same logic as /execute)
|
||||
if save:
|
||||
if result['type'] == 'pipeline':
|
||||
final_output = result.get('output', {})
|
||||
if isinstance(final_output, dict) and len(final_output) == 1:
|
||||
content = list(final_output.values())[0]
|
||||
else:
|
||||
content = json.dumps(final_output, ensure_ascii=False)
|
||||
elif result['type'] == 'workflow':
|
||||
content = _workflow_user_facing_content(result.get('aggregated_result'))
|
||||
else:
|
||||
content = result.get('output', '')
|
||||
if isinstance(content, dict):
|
||||
content = json.dumps(content, ensure_ascii=False)
|
||||
|
||||
# Save to database (minimal metadata for now)
|
||||
with get_db() as conn:
|
||||
cur = get_cursor(conn)
|
||||
cur.execute(
|
||||
"""INSERT INTO ai_insights (profile_id, scope, content, metadata, created)
|
||||
VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""",
|
||||
(profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']}))
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
except Exception as e:
|
||||
# Queue error event
|
||||
await event_queue.put({
|
||||
"type": "execution_failed",
|
||||
"error": str(e)
|
||||
})
|
||||
finally:
|
||||
execution_complete = True
|
||||
|
||||
# Start workflow execution in background
|
||||
import asyncio
|
||||
execution_task = asyncio.create_task(execute_workflow_async())
|
||||
|
||||
# Stream events from queue
|
||||
try:
|
||||
while not execution_complete or not event_queue.empty():
|
||||
try:
|
||||
# Wait for event with timeout
|
||||
event = await asyncio.wait_for(event_queue.get(), timeout=0.5)
|
||||
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
# Send keepalive ping
|
||||
yield f": keepalive\n\n"
|
||||
continue
|
||||
|
||||
# Wait for execution task to complete
|
||||
await execution_task
|
||||
|
||||
except Exception as e:
|
||||
# Send final error event
|
||||
error_event = {
|
||||
"type": "execution_failed",
|
||||
"error": str(e)
|
||||
}
|
||||
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
|
||||
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no" # Disable nginx buffering
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@router.post("/execute")
|
||||
async def execute_unified_prompt(
|
||||
prompt_slug: str = Query(..., description="Slug of prompt to execute"),
|
||||
modules: Optional[dict] = None,
|
||||
timeframes: Optional[dict] = None,
|
||||
debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
|
||||
save: bool = Query(False, description="Save result to ai_insights"),
|
||||
session: dict = Depends(require_auth)
|
||||
):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -42,7 +42,8 @@ async def execute_workflow(
|
|||
profile_id: str = None,
|
||||
variables: Dict[str, Any] = None,
|
||||
openrouter_call_func = None, # Callback für LLM-Calls: async (prompt, model) -> str
|
||||
enable_debug: bool = False
|
||||
enable_debug: bool = False,
|
||||
progress_callback = None # NEW: Optional callback für Progress-Updates: async (event_type, data) -> None
|
||||
) -> ExecutionResult:
|
||||
"""
|
||||
Führt einen Workflow aus (mit conditional branching und path consolidation).
|
||||
|
|
@ -76,6 +77,13 @@ async def execute_workflow(
|
|||
|
||||
logger.info(f"Starting workflow execution: {execution_id}")
|
||||
|
||||
# NEW: Progress-Callback für Start
|
||||
if progress_callback:
|
||||
await progress_callback("execution_started", {
|
||||
"execution_id": execution_id,
|
||||
"started_at": started_at
|
||||
})
|
||||
|
||||
try:
|
||||
# 1. Lade Workflow-Definition
|
||||
if graph_data:
|
||||
|
|
@ -161,6 +169,20 @@ async def execute_workflow(
|
|||
node_states.append(node_state)
|
||||
context["node_results"][node_id] = node_state
|
||||
|
||||
# NEW: Progress-Callback aufrufen (für SSE Streaming)
|
||||
if progress_callback:
|
||||
# Create a meaningful label for the node
|
||||
node_label = node.prompt_slug if hasattr(node, 'prompt_slug') and node.prompt_slug else f"{node.type.value}-{node_id[:8]}"
|
||||
await progress_callback("node_complete", {
|
||||
"node_id": node_id,
|
||||
"node_type": node.type,
|
||||
"node_label": node_label,
|
||||
"status": node_state.status.value,
|
||||
"total_nodes": len(graph.nodes),
|
||||
"completed_nodes": len([ns for ns in node_states if ns.status in [NodeStatus.EXECUTED, NodeStatus.SKIPPED]]),
|
||||
"error": node_state.error if node_state.status == NodeStatus.FAILED else None
|
||||
})
|
||||
|
||||
# Füge Nachfolger zur Queue hinzu
|
||||
outgoing_edges = [e for e in graph.edges if e.from_node == node_id]
|
||||
for edge in outgoing_edges:
|
||||
|
|
@ -185,6 +207,19 @@ async def execute_workflow(
|
|||
|
||||
logger.info(f"Workflow execution completed: {execution_id}")
|
||||
|
||||
# NEW: Progress-Callback für erfolgreiche Fertigstellung
|
||||
if progress_callback:
|
||||
await progress_callback("execution_complete", {
|
||||
"execution_id": execution_id,
|
||||
"status": "completed",
|
||||
"aggregated_result": aggregated,
|
||||
"total_nodes": len(node_states),
|
||||
"completed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.EXECUTED]),
|
||||
"skipped_nodes": len([ns for ns in node_states if ns.status == NodeStatus.SKIPPED]),
|
||||
"failed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.FAILED]),
|
||||
"completed_at": completed_at
|
||||
})
|
||||
|
||||
return ExecutionResult(
|
||||
execution_id=execution_id,
|
||||
workflow_id=workflow_id or "N/A", # Placeholder when graph_data is used directly
|
||||
|
|
@ -198,6 +233,15 @@ async def execute_workflow(
|
|||
except Exception as e:
|
||||
logger.error(f"Workflow execution failed: {e}", exc_info=True)
|
||||
|
||||
# NEW: Progress-Callback für Fehler
|
||||
if progress_callback:
|
||||
await progress_callback("execution_failed", {
|
||||
"execution_id": execution_id,
|
||||
"status": "failed",
|
||||
"error": str(e),
|
||||
"completed_at": datetime.utcnow().isoformat()
|
||||
})
|
||||
|
||||
# Speichere Failed State
|
||||
completed_at = datetime.utcnow().isoformat()
|
||||
save_execution_state(
|
||||
|
|
@ -399,10 +443,49 @@ def execute_logic_node(
|
|||
|
||||
try:
|
||||
if not node.condition:
|
||||
raise ValueError(f"Logic node {node.id} has no condition")
|
||||
error_msg = f"Logic node {node.id} has no condition defined"
|
||||
logger.error(error_msg)
|
||||
return NodeExecutionState(
|
||||
node_id=node.id,
|
||||
status=NodeStatus.FAILED,
|
||||
error=error_msg,
|
||||
started_at=started_at,
|
||||
completed_at=datetime.utcnow().isoformat()
|
||||
)
|
||||
|
||||
# Handle both formats (thanks to Union[LogicExpression, Condition] type):
|
||||
# 1. Direct LogicExpression (UI format): node.condition is LogicExpression
|
||||
# 2. Wrapped in Condition (legacy): node.condition is Condition with .expression
|
||||
from workflow_models import LogicExpression, Condition
|
||||
|
||||
expression = None
|
||||
|
||||
if isinstance(node.condition, LogicExpression):
|
||||
# UI format: direct LogicExpression
|
||||
expression = node.condition
|
||||
elif isinstance(node.condition, Condition):
|
||||
# Legacy format: wrapped in Condition
|
||||
expression = node.condition.expression
|
||||
else:
|
||||
# Fallback: try to detect format manually
|
||||
if hasattr(node.condition, 'operator') and hasattr(node.condition, 'operands'):
|
||||
expression = node.condition # Looks like LogicExpression
|
||||
elif hasattr(node.condition, 'expression'):
|
||||
expression = node.condition.expression # Looks like Condition
|
||||
|
||||
if expression is None:
|
||||
error_msg = f"Logic node {node.id} has no valid condition/expression defined"
|
||||
logger.error(error_msg)
|
||||
return NodeExecutionState(
|
||||
node_id=node.id,
|
||||
status=NodeStatus.FAILED,
|
||||
error=error_msg,
|
||||
started_at=started_at,
|
||||
completed_at=datetime.utcnow().isoformat()
|
||||
)
|
||||
|
||||
# 1. Evaluiere Bedingung
|
||||
result, error = evaluate_logic_expression(node.condition.expression, context)
|
||||
result, error = evaluate_logic_expression(expression, context)
|
||||
|
||||
if error:
|
||||
# Fehler bei Evaluation → Fallback anwenden
|
||||
|
|
@ -777,17 +860,31 @@ def _get_edges_by_label(node_id: str, label: str, graph: WorkflowGraph) -> List[
|
|||
"""
|
||||
Findet alle ausgehenden Edges mit bestimmtem Label.
|
||||
|
||||
Unterstützt beide Formate:
|
||||
- Legacy: e.label == label (z.B. "then", "else")
|
||||
- UI: e.source_handle == label (z.B. "true", "false")
|
||||
|
||||
Args:
|
||||
node_id: Node-ID
|
||||
label: Edge-Label (z.B. "then", "else", "uncertainty")
|
||||
label: Edge-Label oder sourceHandle (z.B. "then"/"true", "else"/"false")
|
||||
graph: WorkflowGraph
|
||||
|
||||
Returns:
|
||||
Liste von Edge-IDs
|
||||
"""
|
||||
# Map label to sourceHandle equivalents
|
||||
label_to_handle = {
|
||||
"then": "true",
|
||||
"else": "false"
|
||||
}
|
||||
handle_equivalent = label_to_handle.get(label, label)
|
||||
|
||||
matching_edges = [
|
||||
e.id for e in graph.edges
|
||||
if e.from_node == node_id and e.label == label
|
||||
if e.from_node == node_id and (
|
||||
e.label == label or # Legacy format
|
||||
(hasattr(e, 'source_handle') and e.source_handle == handle_equivalent) # UI format
|
||||
)
|
||||
]
|
||||
return matching_edges
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ Data validation schemas for Workflow-Graph, Knoten, Kanten, Bedingungen.
|
|||
Konzept-Basis: konzept_workflow_engine_konsolidated.md
|
||||
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md
|
||||
"""
|
||||
from typing import Optional, List, Dict, Any
|
||||
from typing import Optional, List, Dict, Any, Union
|
||||
from pydantic import BaseModel, Field
|
||||
from enum import Enum
|
||||
|
||||
|
|
@ -148,18 +148,27 @@ class LogicExpression(BaseModel):
|
|||
}
|
||||
"""
|
||||
operator: LogicOperator = Field(..., description="Logischer Operator (and, or, not) oder Vergleichsoperator")
|
||||
operands: Optional[List[Any]] = Field(None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)")
|
||||
operands: Optional[List['LogicExpression']] = Field(None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)")
|
||||
# Bei einfachem Vergleich:
|
||||
ref: Optional[str] = Field(None, description="Signal-Referenz (nur bei Vergleichsoperatoren)")
|
||||
value: Optional[Any] = Field(None, description="Vergleichswert (nur bei Vergleichsoperatoren)")
|
||||
|
||||
# Enable forward reference resolution for recursive model
|
||||
LogicExpression.model_rebuild()
|
||||
|
||||
|
||||
class Condition(BaseModel):
|
||||
"""
|
||||
Bedingung für einen Logik-Knoten.
|
||||
|
||||
Unterstützt if/else-if/else-Logik.
|
||||
|
||||
Note: Uses extra='forbid' to ensure proper Union resolution with LogicExpression.
|
||||
If unknown fields are present (like 'operator', 'operands'), deserialization fails
|
||||
and Pydantic tries LogicExpression instead.
|
||||
"""
|
||||
model_config = {'extra': 'forbid'}
|
||||
|
||||
type: str = Field(default="if", description="Bedingungstyp: if, else-if, else")
|
||||
expression: Optional[LogicExpression] = Field(None, description="Logischer Ausdruck (null bei 'else')")
|
||||
then_path: Optional[str] = Field(None, description="Edge-ID für 'then'-Pfad")
|
||||
|
|
@ -195,7 +204,8 @@ class WorkflowNode(BaseModel):
|
|||
question_augmentations: Optional[List[QuestionAugmentation]] = Field(None, description="Fragenergänzungen (knotengebunden, überschreiben Prompt-Defaults)")
|
||||
|
||||
# LOGIC-Knoten
|
||||
condition: Optional[Condition] = Field(None, description="Bedingung für Pfad-Routing")
|
||||
# Support both formats: direct LogicExpression (UI) or wrapped in Condition (legacy)
|
||||
condition: Optional[Union[LogicExpression, Condition]] = Field(None, description="Bedingung für Pfad-Routing")
|
||||
fallback: Optional[FallbackConfig] = Field(None, description="Fallback-Konfiguration")
|
||||
|
||||
# JOIN-Knoten (Phase 4)
|
||||
|
|
@ -220,6 +230,10 @@ class WorkflowEdge(BaseModel):
|
|||
to_node: str = Field(..., alias="to", description="Ziel-Knoten-ID")
|
||||
label: Optional[str] = Field(None, description="Label für visuelle Darstellung (z.B. 'then', 'else')")
|
||||
|
||||
# UI-Format fields (React Flow)
|
||||
source_handle: Optional[str] = Field(None, alias="sourceHandle", description="Source handle ID (UI format: 'true', 'false', 'out')")
|
||||
target_handle: Optional[str] = Field(None, alias="targetHandle", description="Target handle ID (UI format: 'in', 'path_1', etc.)")
|
||||
|
||||
|
||||
class WorkflowGraph(BaseModel):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -338,6 +338,8 @@ export default function Analysis() {
|
|||
/** Kategorie-Schlüssel aus `buildPipelineGroups` (Navigation); Detail = alle Pipelines dieser Kategorie */
|
||||
const [activeCategoryKey, setActiveCategoryKey] = useState(null)
|
||||
const [historyScopePick, setHistoryScopePick] = useState(null)
|
||||
// NEW: Progress tracking for SSE workflows
|
||||
const [progress, setProgress] = useState(null) // { total_nodes, completed_nodes, current_node_label }
|
||||
|
||||
const loadAll = async () => {
|
||||
const [p, i] = await Promise.all([
|
||||
|
|
@ -377,10 +379,21 @@ export default function Analysis() {
|
|||
}, [newResult?.scope, prompts])
|
||||
|
||||
const runPrompt = async (slug) => {
|
||||
setLoading(slug); setError(null); setNewResult(null)
|
||||
setLoading(slug); setError(null); setNewResult(null); setProgress(null)
|
||||
try {
|
||||
// Use new unified executor with save=true
|
||||
const result = await api.executeUnifiedPrompt(slug, null, null, false, true)
|
||||
// Use SSE-based executor for long-running workflows
|
||||
const result = await api.executeUnifiedPromptStream(slug, null, null, false, true, (event) => {
|
||||
// Progress callback: update UI in real-time
|
||||
if (event.type === 'execution_started') {
|
||||
setProgress({ total_nodes: 0, completed_nodes: 0, current_node_label: 'Starte...' })
|
||||
} else if (event.type === 'node_complete') {
|
||||
setProgress({
|
||||
total_nodes: event.total_nodes || 0,
|
||||
completed_nodes: event.completed_nodes || 0,
|
||||
current_node_label: event.node_label || `Node ${event.node_id}`
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
// Transform result to match old format for InsightCard
|
||||
let content = ''
|
||||
|
|
@ -434,7 +447,10 @@ export default function Analysis() {
|
|||
setTab('run')
|
||||
} catch(e) {
|
||||
setError('Fehler: ' + e.message)
|
||||
} finally { setLoading(null) }
|
||||
} finally {
|
||||
setLoading(null)
|
||||
setProgress(null) // Clear progress
|
||||
}
|
||||
}
|
||||
|
||||
const deleteInsight = async (id) => {
|
||||
|
|
@ -618,7 +634,9 @@ export default function Analysis() {
|
|||
disabled={!!loading||!canUseAI||(aiUsage && !aiUsage.allowed)}
|
||||
>
|
||||
{loading===p.slug
|
||||
? <><div className="spinner" style={{width:13,height:13}}/> Läuft…</>
|
||||
? (progress
|
||||
? <><div className="spinner" style={{width:13,height:13}}/> {progress.completed_nodes}/{progress.total_nodes} Nodes</>
|
||||
: <><div className="spinner" style={{width:13,height:13}}/> Läuft…</>)
|
||||
: (aiUsage && !aiUsage.allowed) ? '🔒 Limit'
|
||||
: <><Brain size={13}/> Starten</>}
|
||||
</button>
|
||||
|
|
|
|||
|
|
@ -402,6 +402,72 @@ export const api = {
|
|||
return req('/prompts/execute?' + params, json(body))
|
||||
},
|
||||
|
||||
// NEW: SSE-based execution for long-running workflows
|
||||
executeUnifiedPromptStream: (slug, modules=null, timeframes=null, debug=false, save=false, onProgress=null) => {
|
||||
const params = new URLSearchParams({ prompt_slug: slug })
|
||||
if (debug) params.append('debug', 'true')
|
||||
if (save) params.append('save', 'true')
|
||||
|
||||
// TODO: Security improvement - use session cookie instead of token in URL
|
||||
// For now, send token as query param since EventSource doesn't support custom headers
|
||||
const token = localStorage.getItem('token')
|
||||
if (token) params.append('token', token)
|
||||
|
||||
if (modules) {
|
||||
Object.entries(modules).forEach(([k, v]) => params.append(`modules[${k}]`, v))
|
||||
}
|
||||
if (timeframes) {
|
||||
Object.entries(timeframes).forEach(([k, v]) => params.append(`timeframes[${k}]`, v))
|
||||
}
|
||||
|
||||
// Return a Promise that resolves with final result
|
||||
return new Promise((resolve, reject) => {
|
||||
const url = `${BASE_URL}/prompts/execute-stream?${params}`
|
||||
|
||||
const eventSource = new EventSource(url)
|
||||
|
||||
let finalResult = null
|
||||
|
||||
eventSource.onmessage = (event) => {
|
||||
try {
|
||||
const data = JSON.parse(event.data)
|
||||
|
||||
// Call progress callback if provided
|
||||
if (onProgress) {
|
||||
onProgress(data)
|
||||
}
|
||||
|
||||
// Check for final result
|
||||
if (data.type === 'execution_complete') {
|
||||
// Transform SSE result to match regular execute format
|
||||
finalResult = {
|
||||
type: 'workflow',
|
||||
execution_id: data.execution_id,
|
||||
status: data.status,
|
||||
aggregated_result: data.aggregated_result,
|
||||
debug: {
|
||||
node_states: [] // TODO: collect from progress events if needed
|
||||
}
|
||||
}
|
||||
eventSource.close()
|
||||
resolve(finalResult)
|
||||
} else if (data.type === 'execution_failed') {
|
||||
eventSource.close()
|
||||
reject(new Error(data.error || 'Workflow execution failed'))
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('Error parsing SSE event:', e)
|
||||
}
|
||||
}
|
||||
|
||||
eventSource.onerror = (error) => {
|
||||
console.error('SSE connection error:', error)
|
||||
eventSource.close()
|
||||
reject(new Error('Connection to server lost'))
|
||||
}
|
||||
})
|
||||
},
|
||||
|
||||
// Workflow Execution (Part 2: Frontend Execute Integration)
|
||||
executeWorkflow: (slug, variables=null, debug=true, save=false) => {
|
||||
const params = new URLSearchParams({ prompt_slug: slug })
|
||||
|
|
|
|||
246
test_condition_parsing.py
Normal file
246
test_condition_parsing.py
Normal file
|
|
@ -0,0 +1,246 @@
|
|||
"""
|
||||
Test Condition Parsing - Alle Formate und Verschachtelungen
|
||||
|
||||
Testet ob Pydantic die verschiedenen Condition-Formate korrekt deserialisiert.
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Force UTF-8 encoding on Windows
|
||||
if sys.platform == 'win32':
|
||||
import io
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
|
||||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
|
||||
|
||||
sys.path.insert(0, 'backend')
|
||||
|
||||
from workflow_models import LogicExpression, Condition, WorkflowNode, LogicOperator
|
||||
from pydantic import ValidationError
|
||||
import json
|
||||
|
||||
def test_case(name, data, expected_type, should_fail=False):
|
||||
"""Test a single condition format"""
|
||||
print(f"\n{'='*60}")
|
||||
print(f"TEST: {name}")
|
||||
print(f"{'='*60}")
|
||||
print(f"Input: {json.dumps(data, indent=2)}")
|
||||
|
||||
try:
|
||||
# Test 1: Direct deserialization
|
||||
if expected_type == LogicExpression:
|
||||
result = LogicExpression(**data)
|
||||
elif expected_type == Condition:
|
||||
result = Condition(**data)
|
||||
|
||||
if should_fail:
|
||||
print("❌ FAILED: Should have raised ValidationError but didn't")
|
||||
return False
|
||||
|
||||
print(f"✅ PASSED: Deserialized as {type(result).__name__}")
|
||||
print(f"Result: {result.model_dump()}")
|
||||
|
||||
# Test 2: As part of WorkflowNode
|
||||
node_data = {
|
||||
"id": "test_node",
|
||||
"type": "logic",
|
||||
"condition": data
|
||||
}
|
||||
node = WorkflowNode(**node_data)
|
||||
print(f"✅ PASSED: WorkflowNode.condition type: {type(node.condition).__name__}")
|
||||
|
||||
return True
|
||||
|
||||
except ValidationError as e:
|
||||
if should_fail:
|
||||
print(f"✅ PASSED: Correctly raised ValidationError")
|
||||
return True
|
||||
else:
|
||||
print(f"❌ FAILED: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"❌ FAILED: Unexpected error: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Test Cases
|
||||
# ============================================================================
|
||||
|
||||
test_results = []
|
||||
|
||||
# Test 1: Simple comparison (UI format - einfachster Fall)
|
||||
test_results.append(test_case(
|
||||
"Simple comparison (eq)",
|
||||
{
|
||||
"operator": "eq",
|
||||
"ref": "node_1.q1",
|
||||
"value": "ja"
|
||||
},
|
||||
LogicExpression
|
||||
))
|
||||
|
||||
# Test 2: Simple AND (UI format - wie im Workflow)
|
||||
test_results.append(test_case(
|
||||
"Simple AND with 2 operands",
|
||||
{
|
||||
"operator": "and",
|
||||
"operands": [
|
||||
{"operator": "eq", "ref": "node_5.qTiefananalyseRecovery", "value": "ja"},
|
||||
{"operator": "neq", "ref": "node_6.qKonsistenz", "value": "nein"}
|
||||
]
|
||||
},
|
||||
LogicExpression
|
||||
))
|
||||
|
||||
# Test 3: Simple OR
|
||||
test_results.append(test_case(
|
||||
"Simple OR with 3 operands",
|
||||
{
|
||||
"operator": "or",
|
||||
"operands": [
|
||||
{"operator": "eq", "ref": "node_1.q1", "value": "ja"},
|
||||
{"operator": "eq", "ref": "node_1.q2", "value": "nein"},
|
||||
{"operator": "eq", "ref": "node_1.q3", "value": "unklar"}
|
||||
]
|
||||
},
|
||||
LogicExpression
|
||||
))
|
||||
|
||||
# Test 4: Nested AND/OR
|
||||
test_results.append(test_case(
|
||||
"Nested: OR with nested AND",
|
||||
{
|
||||
"operator": "or",
|
||||
"operands": [
|
||||
{
|
||||
"operator": "and",
|
||||
"operands": [
|
||||
{"operator": "eq", "ref": "node_1.q1", "value": "ja"},
|
||||
{"operator": "neq", "ref": "node_1.q2", "value": "nein"}
|
||||
]
|
||||
},
|
||||
{"operator": "eq", "ref": "node_2.q1", "value": "hoch"}
|
||||
]
|
||||
},
|
||||
LogicExpression
|
||||
))
|
||||
|
||||
# Test 5: Deep nesting (3 levels)
|
||||
test_results.append(test_case(
|
||||
"Deep nesting (3 levels)",
|
||||
{
|
||||
"operator": "and",
|
||||
"operands": [
|
||||
{
|
||||
"operator": "or",
|
||||
"operands": [
|
||||
{"operator": "eq", "ref": "node_1.q1", "value": "ja"},
|
||||
{
|
||||
"operator": "and",
|
||||
"operands": [
|
||||
{"operator": "eq", "ref": "node_2.q1", "value": "hoch"},
|
||||
{"operator": "neq", "ref": "node_2.q2", "value": "niedrig"}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{"operator": "eq", "ref": "node_3.q1", "value": "aktiv"}
|
||||
]
|
||||
},
|
||||
LogicExpression
|
||||
))
|
||||
|
||||
# Test 6: Different operators
|
||||
test_results.append(test_case(
|
||||
"Different comparison operators (gt, lt, in)",
|
||||
{
|
||||
"operator": "and",
|
||||
"operands": [
|
||||
{"operator": "gt", "ref": "node_1.score", "value": 50},
|
||||
{"operator": "lt", "ref": "node_1.score", "value": 100},
|
||||
{"operator": "in", "ref": "node_1.category", "value": ["high", "medium"]}
|
||||
]
|
||||
},
|
||||
LogicExpression
|
||||
))
|
||||
|
||||
# Test 7: Legacy format (wrapped in Condition)
|
||||
test_results.append(test_case(
|
||||
"Legacy format: Condition with expression",
|
||||
{
|
||||
"type": "if",
|
||||
"expression": {
|
||||
"operator": "eq",
|
||||
"ref": "node_1.q1",
|
||||
"value": "ja"
|
||||
},
|
||||
"then_path": "edge_1",
|
||||
"else_path": "edge_2"
|
||||
},
|
||||
Condition
|
||||
))
|
||||
|
||||
# Test 8: NOT operator
|
||||
test_results.append(test_case(
|
||||
"NOT operator",
|
||||
{
|
||||
"operator": "not",
|
||||
"operands": [
|
||||
{"operator": "eq", "ref": "node_1.q1", "value": "nein"}
|
||||
]
|
||||
},
|
||||
LogicExpression
|
||||
))
|
||||
|
||||
# Test 9: Complex real-world scenario
|
||||
test_results.append(test_case(
|
||||
"Complex real-world: Multiple nested conditions",
|
||||
{
|
||||
"operator": "and",
|
||||
"operands": [
|
||||
{
|
||||
"operator": "or",
|
||||
"operands": [
|
||||
{"operator": "eq", "ref": "node_1.relevance", "value": "high"},
|
||||
{
|
||||
"operator": "and",
|
||||
"operands": [
|
||||
{"operator": "eq", "ref": "node_1.relevance", "value": "medium"},
|
||||
{"operator": "gt", "ref": "node_1.priority", "value": 5}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{"operator": "neq", "ref": "node_2.status", "value": "blocked"},
|
||||
{
|
||||
"operator": "in",
|
||||
"ref": "node_3.category",
|
||||
"value": ["training", "nutrition", "recovery"]
|
||||
}
|
||||
]
|
||||
},
|
||||
LogicExpression
|
||||
))
|
||||
|
||||
# ============================================================================
|
||||
# Results Summary
|
||||
# ============================================================================
|
||||
|
||||
print("\n" + "="*60)
|
||||
print("TEST RESULTS SUMMARY")
|
||||
print("="*60)
|
||||
|
||||
passed = sum(test_results)
|
||||
total = len(test_results)
|
||||
failed = total - passed
|
||||
|
||||
print(f"\n✅ Passed: {passed}/{total}")
|
||||
if failed > 0:
|
||||
print(f"❌ Failed: {failed}/{total}")
|
||||
print(f"\n⚠️ CRITICAL: Some tests failed! Do NOT deploy until fixed.")
|
||||
sys.exit(1)
|
||||
else:
|
||||
print(f"\n🎉 All tests passed! Safe to deploy.")
|
||||
sys.exit(0)
|
||||
113
test_condition_union.py
Normal file
113
test_condition_union.py
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
"""
|
||||
Test Union[LogicExpression, Condition] type resolution
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
|
||||
if sys.platform == 'win32':
|
||||
import io
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
|
||||
|
||||
sys.path.insert(0, 'backend')
|
||||
|
||||
from workflow_models import LogicExpression, Condition, WorkflowNode
|
||||
import json
|
||||
|
||||
# Test 1: UI format (should be LogicExpression)
|
||||
print("\n" + "="*60)
|
||||
print("TEST 1: UI Format (direct LogicExpression)")
|
||||
print("="*60)
|
||||
|
||||
ui_format = {
|
||||
"operator": "and",
|
||||
"operands": [
|
||||
{"operator": "eq", "ref": "node_5.qTiefananalyseRecovery", "value": "ja"},
|
||||
{"operator": "neq", "ref": "node_6.qKonsistenz", "value": "nein"}
|
||||
]
|
||||
}
|
||||
|
||||
node = WorkflowNode(
|
||||
id="test_node",
|
||||
type="logic",
|
||||
condition=ui_format
|
||||
)
|
||||
|
||||
print(f"Input: {json.dumps(ui_format, indent=2)}")
|
||||
print(f"node.condition type: {type(node.condition).__name__}")
|
||||
print(f"Expected: LogicExpression")
|
||||
|
||||
if isinstance(node.condition, LogicExpression):
|
||||
print("✅ CORRECT: Deserialized as LogicExpression")
|
||||
print(f"Has operator: {hasattr(node.condition, 'operator')} = {node.condition.operator if hasattr(node.condition, 'operator') else 'N/A'}")
|
||||
print(f"Has operands: {hasattr(node.condition, 'operands')} = {len(node.condition.operands) if hasattr(node.condition, 'operands') and node.condition.operands else 'N/A'}")
|
||||
elif isinstance(node.condition, Condition):
|
||||
print("❌ WRONG: Deserialized as Condition (should be LogicExpression)")
|
||||
print(f"Has expression: {hasattr(node.condition, 'expression')} = {type(node.condition.expression).__name__ if hasattr(node.condition, 'expression') and node.condition.expression else 'N/A'}")
|
||||
if hasattr(node.condition, 'expression') and node.condition.expression:
|
||||
print(f"expression.operator: {node.condition.expression.operator}")
|
||||
print(f"expression.operands: {len(node.condition.expression.operands) if node.condition.expression.operands else 0}")
|
||||
else:
|
||||
print(f"❌ UNEXPECTED TYPE: {type(node.condition)}")
|
||||
|
||||
# Test 2: Legacy format (should be Condition)
|
||||
print("\n" + "="*60)
|
||||
print("TEST 2: Legacy Format (Condition with expression)")
|
||||
print("="*60)
|
||||
|
||||
legacy_format = {
|
||||
"type": "if",
|
||||
"expression": {
|
||||
"operator": "eq",
|
||||
"ref": "node_1.q1",
|
||||
"value": "ja"
|
||||
},
|
||||
"then_path": "edge_1",
|
||||
"else_path": "edge_2"
|
||||
}
|
||||
|
||||
node2 = WorkflowNode(
|
||||
id="test_node2",
|
||||
type="logic",
|
||||
condition=legacy_format
|
||||
)
|
||||
|
||||
print(f"Input: {json.dumps(legacy_format, indent=2)}")
|
||||
print(f"node.condition type: {type(node2.condition).__name__}")
|
||||
print(f"Expected: Condition")
|
||||
|
||||
if isinstance(node2.condition, Condition):
|
||||
print("✅ CORRECT: Deserialized as Condition")
|
||||
print(f"Has expression: {hasattr(node2.condition, 'expression')}")
|
||||
print(f"Has then_path: {hasattr(node2.condition, 'then_path')} = {node2.condition.then_path}")
|
||||
elif isinstance(node2.condition, LogicExpression):
|
||||
print("❌ WRONG: Deserialized as LogicExpression (should be Condition)")
|
||||
else:
|
||||
print(f"❌ UNEXPECTED TYPE: {type(node2.condition)}")
|
||||
|
||||
# Test 3: Check what executor would do
|
||||
print("\n" + "="*60)
|
||||
print("TEST 3: Executor Logic Simulation")
|
||||
print("="*60)
|
||||
|
||||
from workflow_models import LogicExpression, Condition
|
||||
|
||||
for test_name, node in [("UI Format", node), ("Legacy Format", node2)]:
|
||||
print(f"\n{test_name}:")
|
||||
print(f" condition type: {type(node.condition).__name__}")
|
||||
|
||||
if isinstance(node.condition, LogicExpression):
|
||||
print(" ✅ Executor would use: node.condition directly")
|
||||
expression = node.condition
|
||||
elif isinstance(node.condition, Condition):
|
||||
print(" ✅ Executor would use: node.condition.expression")
|
||||
expression = node.condition.expression if node.condition.expression else None
|
||||
else:
|
||||
print(f" ❌ Executor would fail: Unknown type {type(node.condition)}")
|
||||
expression = None
|
||||
|
||||
if expression:
|
||||
print(f" Expression type: {type(expression).__name__}")
|
||||
print(f" Expression operator: {expression.operator}")
|
||||
print(f" Expression has operands: {expression.operands is not None}")
|
||||
else:
|
||||
print(f" ❌ No expression found!")
|
||||
Loading…
Reference in New Issue
Block a user