Async Workflow #82

Merged
Lars merged 14 commits from develop into main 2026-04-13 11:58:01 +02:00
8 changed files with 745 additions and 24 deletions

View File

@ -167,7 +167,8 @@ async def execute_prompt(
prompt_slug: str, prompt_slug: str,
variables: Dict[str, Any], variables: Dict[str, Any],
openrouter_call_func, openrouter_call_func,
enable_debug: bool = False enable_debug: bool = False,
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Execute a single prompt (base or pipeline type). Execute a single prompt (base or pipeline type).
@ -217,7 +218,7 @@ async def execute_prompt(
elif prompt_type == 'workflow': elif prompt_type == 'workflow':
# Workflow prompt: graph-based execution (Phase 0: Foundation) # Workflow prompt: graph-based execution (Phase 0: Foundation)
return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog) return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog, progress_callback)
else: else:
raise HTTPException(400, f"Unknown prompt type: {prompt_type}") raise HTTPException(400, f"Unknown prompt type: {prompt_type}")
@ -469,7 +470,8 @@ async def execute_prompt_with_data(
modules: Optional[Dict[str, bool]] = None, modules: Optional[Dict[str, bool]] = None,
timeframes: Optional[Dict[str, int]] = None, timeframes: Optional[Dict[str, int]] = None,
openrouter_call_func = None, openrouter_call_func = None,
enable_debug: bool = False enable_debug: bool = False,
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Execute prompt with data loaded from database. Execute prompt with data loaded from database.
@ -605,7 +607,7 @@ async def execute_prompt_with_data(
variables['goals_data'] = [] variables['goals_data'] = []
# Execute prompt # Execute prompt
return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug) return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug, progress_callback)
async def execute_workflow_prompt( async def execute_workflow_prompt(
@ -613,7 +615,8 @@ async def execute_workflow_prompt(
variables: Dict[str, Any], variables: Dict[str, Any],
openrouter_call_func, openrouter_call_func,
enable_debug: bool = False, enable_debug: bool = False,
catalog: Optional[Dict] = None catalog: Optional[Dict] = None,
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Execute a workflow-type prompt (graph-based execution). Execute a workflow-type prompt (graph-based execution).
@ -652,7 +655,8 @@ async def execute_workflow_prompt(
profile_id=variables.get('profile_id', 'unknown'), # From context profile_id=variables.get('profile_id', 'unknown'), # From context
variables=variables, variables=variables,
openrouter_call_func=openrouter_call_func, openrouter_call_func=openrouter_call_func,
enable_debug=enable_debug enable_debug=enable_debug,
progress_callback=progress_callback # NEW: Progress-Callbacks durchreichen
) )
# Convert ExecutionResult to dict for API response # Convert ExecutionResult to dict for API response

View File

@ -1445,13 +1445,176 @@ from prompt_executor import execute_prompt_with_data
from models import UnifiedPromptCreate, UnifiedPromptUpdate from models import UnifiedPromptCreate, UnifiedPromptUpdate
@router.post("/execute") @router.post("/execute-stream")
async def execute_unified_prompt( async def execute_unified_prompt_stream(
prompt_slug: str, prompt_slug: str = Query(..., description="Slug of prompt to execute"),
token: Optional[str] = Query(None, description="Auth token (temporary solution for SSE)"),
modules: Optional[dict] = None, modules: Optional[dict] = None,
timeframes: Optional[dict] = None, timeframes: Optional[dict] = None,
debug: bool = False, debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
save: bool = False, save: bool = Query(False, description="Save result to ai_insights")
):
"""
Execute a unified prompt with Server-Sent Events (SSE) streaming.
Returns live progress updates during workflow execution:
- execution_started: Workflow has begun
- node_complete: Each node completes
- execution_complete: Final result ready
- execution_failed: Error occurred
Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts.
"""
# Manual auth: verify token and get profile_id
if not token:
raise HTTPException(401, "Missing auth token")
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT profile_id FROM sessions WHERE token = %s", (token,))
row = cur.fetchone()
if not row:
raise HTTPException(401, "Invalid or expired token")
profile_id = row['profile_id']
# Use default modules/timeframes if not provided
if not modules:
modules = {
'körper': True,
'ernährung': True,
'training': True,
'schlaf': True,
'vitalwerte': True
}
if not timeframes:
timeframes = {
'körper': 30,
'ernährung': 30,
'training': 14,
'schlaf': 14,
'vitalwerte': 7
}
# Wrapper function for OpenRouter calls
async def workflow_llm_call(prompt: str, model: str = None) -> str:
return await call_openrouter(prompt)
# SSE Event Generator
async def event_stream():
"""Generate Server-Sent Events during workflow execution."""
import asyncio
from asyncio import Queue
# Event queue for progress updates
event_queue = Queue()
# Flag to track execution completion
execution_complete = False
# Define progress callback for streaming updates
async def progress_callback(event_type: str, data: dict):
"""Queue SSE event for streaming to client."""
event_data = {
"type": event_type,
**data
}
await event_queue.put(event_data)
# Start workflow execution in background task
async def execute_workflow_async():
nonlocal execution_complete
try:
# Execute workflow with progress callbacks
result = await execute_prompt_with_data(
prompt_slug=prompt_slug,
profile_id=profile_id,
modules=modules,
timeframes=timeframes,
openrouter_call_func=workflow_llm_call,
enable_debug=debug or save,
progress_callback=progress_callback
)
# Save to ai_insights if requested (same logic as /execute)
if save:
if result['type'] == 'pipeline':
final_output = result.get('output', {})
if isinstance(final_output, dict) and len(final_output) == 1:
content = list(final_output.values())[0]
else:
content = json.dumps(final_output, ensure_ascii=False)
elif result['type'] == 'workflow':
content = _workflow_user_facing_content(result.get('aggregated_result'))
else:
content = result.get('output', '')
if isinstance(content, dict):
content = json.dumps(content, ensure_ascii=False)
# Save to database (minimal metadata for now)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""INSERT INTO ai_insights (profile_id, scope, content, metadata, created)
VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""",
(profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']}))
)
conn.commit()
except Exception as e:
# Queue error event
await event_queue.put({
"type": "execution_failed",
"error": str(e)
})
finally:
execution_complete = True
# Start workflow execution in background
import asyncio
execution_task = asyncio.create_task(execute_workflow_async())
# Stream events from queue
try:
while not execution_complete or not event_queue.empty():
try:
# Wait for event with timeout
event = await asyncio.wait_for(event_queue.get(), timeout=0.5)
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
except asyncio.TimeoutError:
# Send keepalive ping
yield f": keepalive\n\n"
continue
# Wait for execution task to complete
await execution_task
except Exception as e:
# Send final error event
error_event = {
"type": "execution_failed",
"error": str(e)
}
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Disable nginx buffering
}
)
@router.post("/execute")
async def execute_unified_prompt(
prompt_slug: str = Query(..., description="Slug of prompt to execute"),
modules: Optional[dict] = None,
timeframes: Optional[dict] = None,
debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
save: bool = Query(False, description="Save result to ai_insights"),
session: dict = Depends(require_auth) session: dict = Depends(require_auth)
): ):
""" """

View File

@ -42,7 +42,8 @@ async def execute_workflow(
profile_id: str = None, profile_id: str = None,
variables: Dict[str, Any] = None, variables: Dict[str, Any] = None,
openrouter_call_func = None, # Callback für LLM-Calls: async (prompt, model) -> str openrouter_call_func = None, # Callback für LLM-Calls: async (prompt, model) -> str
enable_debug: bool = False enable_debug: bool = False,
progress_callback = None # NEW: Optional callback für Progress-Updates: async (event_type, data) -> None
) -> ExecutionResult: ) -> ExecutionResult:
""" """
Führt einen Workflow aus (mit conditional branching und path consolidation). Führt einen Workflow aus (mit conditional branching und path consolidation).
@ -76,6 +77,13 @@ async def execute_workflow(
logger.info(f"Starting workflow execution: {execution_id}") logger.info(f"Starting workflow execution: {execution_id}")
# NEW: Progress-Callback für Start
if progress_callback:
await progress_callback("execution_started", {
"execution_id": execution_id,
"started_at": started_at
})
try: try:
# 1. Lade Workflow-Definition # 1. Lade Workflow-Definition
if graph_data: if graph_data:
@ -161,6 +169,20 @@ async def execute_workflow(
node_states.append(node_state) node_states.append(node_state)
context["node_results"][node_id] = node_state context["node_results"][node_id] = node_state
# NEW: Progress-Callback aufrufen (für SSE Streaming)
if progress_callback:
# Create a meaningful label for the node
node_label = node.prompt_slug if hasattr(node, 'prompt_slug') and node.prompt_slug else f"{node.type.value}-{node_id[:8]}"
await progress_callback("node_complete", {
"node_id": node_id,
"node_type": node.type,
"node_label": node_label,
"status": node_state.status.value,
"total_nodes": len(graph.nodes),
"completed_nodes": len([ns for ns in node_states if ns.status in [NodeStatus.EXECUTED, NodeStatus.SKIPPED]]),
"error": node_state.error if node_state.status == NodeStatus.FAILED else None
})
# Füge Nachfolger zur Queue hinzu # Füge Nachfolger zur Queue hinzu
outgoing_edges = [e for e in graph.edges if e.from_node == node_id] outgoing_edges = [e for e in graph.edges if e.from_node == node_id]
for edge in outgoing_edges: for edge in outgoing_edges:
@ -185,6 +207,19 @@ async def execute_workflow(
logger.info(f"Workflow execution completed: {execution_id}") logger.info(f"Workflow execution completed: {execution_id}")
# NEW: Progress-Callback für erfolgreiche Fertigstellung
if progress_callback:
await progress_callback("execution_complete", {
"execution_id": execution_id,
"status": "completed",
"aggregated_result": aggregated,
"total_nodes": len(node_states),
"completed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.EXECUTED]),
"skipped_nodes": len([ns for ns in node_states if ns.status == NodeStatus.SKIPPED]),
"failed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.FAILED]),
"completed_at": completed_at
})
return ExecutionResult( return ExecutionResult(
execution_id=execution_id, execution_id=execution_id,
workflow_id=workflow_id or "N/A", # Placeholder when graph_data is used directly workflow_id=workflow_id or "N/A", # Placeholder when graph_data is used directly
@ -198,6 +233,15 @@ async def execute_workflow(
except Exception as e: except Exception as e:
logger.error(f"Workflow execution failed: {e}", exc_info=True) logger.error(f"Workflow execution failed: {e}", exc_info=True)
# NEW: Progress-Callback für Fehler
if progress_callback:
await progress_callback("execution_failed", {
"execution_id": execution_id,
"status": "failed",
"error": str(e),
"completed_at": datetime.utcnow().isoformat()
})
# Speichere Failed State # Speichere Failed State
completed_at = datetime.utcnow().isoformat() completed_at = datetime.utcnow().isoformat()
save_execution_state( save_execution_state(
@ -399,10 +443,49 @@ def execute_logic_node(
try: try:
if not node.condition: if not node.condition:
raise ValueError(f"Logic node {node.id} has no condition") error_msg = f"Logic node {node.id} has no condition defined"
logger.error(error_msg)
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.FAILED,
error=error_msg,
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
# Handle both formats (thanks to Union[LogicExpression, Condition] type):
# 1. Direct LogicExpression (UI format): node.condition is LogicExpression
# 2. Wrapped in Condition (legacy): node.condition is Condition with .expression
from workflow_models import LogicExpression, Condition
expression = None
if isinstance(node.condition, LogicExpression):
# UI format: direct LogicExpression
expression = node.condition
elif isinstance(node.condition, Condition):
# Legacy format: wrapped in Condition
expression = node.condition.expression
else:
# Fallback: try to detect format manually
if hasattr(node.condition, 'operator') and hasattr(node.condition, 'operands'):
expression = node.condition # Looks like LogicExpression
elif hasattr(node.condition, 'expression'):
expression = node.condition.expression # Looks like Condition
if expression is None:
error_msg = f"Logic node {node.id} has no valid condition/expression defined"
logger.error(error_msg)
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.FAILED,
error=error_msg,
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
# 1. Evaluiere Bedingung # 1. Evaluiere Bedingung
result, error = evaluate_logic_expression(node.condition.expression, context) result, error = evaluate_logic_expression(expression, context)
if error: if error:
# Fehler bei Evaluation → Fallback anwenden # Fehler bei Evaluation → Fallback anwenden
@ -777,17 +860,31 @@ def _get_edges_by_label(node_id: str, label: str, graph: WorkflowGraph) -> List[
""" """
Findet alle ausgehenden Edges mit bestimmtem Label. Findet alle ausgehenden Edges mit bestimmtem Label.
Unterstützt beide Formate:
- Legacy: e.label == label (z.B. "then", "else")
- UI: e.source_handle == label (z.B. "true", "false")
Args: Args:
node_id: Node-ID node_id: Node-ID
label: Edge-Label (z.B. "then", "else", "uncertainty") label: Edge-Label oder sourceHandle (z.B. "then"/"true", "else"/"false")
graph: WorkflowGraph graph: WorkflowGraph
Returns: Returns:
Liste von Edge-IDs Liste von Edge-IDs
""" """
# Map label to sourceHandle equivalents
label_to_handle = {
"then": "true",
"else": "false"
}
handle_equivalent = label_to_handle.get(label, label)
matching_edges = [ matching_edges = [
e.id for e in graph.edges e.id for e in graph.edges
if e.from_node == node_id and e.label == label if e.from_node == node_id and (
e.label == label or # Legacy format
(hasattr(e, 'source_handle') and e.source_handle == handle_equivalent) # UI format
)
] ]
return matching_edges return matching_edges

View File

@ -6,7 +6,7 @@ Data validation schemas for Workflow-Graph, Knoten, Kanten, Bedingungen.
Konzept-Basis: konzept_workflow_engine_konsolidated.md Konzept-Basis: konzept_workflow_engine_konsolidated.md
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md
""" """
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any, Union
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from enum import Enum from enum import Enum
@ -148,18 +148,27 @@ class LogicExpression(BaseModel):
} }
""" """
operator: LogicOperator = Field(..., description="Logischer Operator (and, or, not) oder Vergleichsoperator") operator: LogicOperator = Field(..., description="Logischer Operator (and, or, not) oder Vergleichsoperator")
operands: Optional[List[Any]] = Field(None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)") operands: Optional[List['LogicExpression']] = Field(None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)")
# Bei einfachem Vergleich: # Bei einfachem Vergleich:
ref: Optional[str] = Field(None, description="Signal-Referenz (nur bei Vergleichsoperatoren)") ref: Optional[str] = Field(None, description="Signal-Referenz (nur bei Vergleichsoperatoren)")
value: Optional[Any] = Field(None, description="Vergleichswert (nur bei Vergleichsoperatoren)") value: Optional[Any] = Field(None, description="Vergleichswert (nur bei Vergleichsoperatoren)")
# Enable forward reference resolution for recursive model
LogicExpression.model_rebuild()
class Condition(BaseModel): class Condition(BaseModel):
""" """
Bedingung für einen Logik-Knoten. Bedingung für einen Logik-Knoten.
Unterstützt if/else-if/else-Logik. Unterstützt if/else-if/else-Logik.
Note: Uses extra='forbid' to ensure proper Union resolution with LogicExpression.
If unknown fields are present (like 'operator', 'operands'), deserialization fails
and Pydantic tries LogicExpression instead.
""" """
model_config = {'extra': 'forbid'}
type: str = Field(default="if", description="Bedingungstyp: if, else-if, else") type: str = Field(default="if", description="Bedingungstyp: if, else-if, else")
expression: Optional[LogicExpression] = Field(None, description="Logischer Ausdruck (null bei 'else')") expression: Optional[LogicExpression] = Field(None, description="Logischer Ausdruck (null bei 'else')")
then_path: Optional[str] = Field(None, description="Edge-ID für 'then'-Pfad") then_path: Optional[str] = Field(None, description="Edge-ID für 'then'-Pfad")
@ -195,7 +204,8 @@ class WorkflowNode(BaseModel):
question_augmentations: Optional[List[QuestionAugmentation]] = Field(None, description="Fragenergänzungen (knotengebunden, überschreiben Prompt-Defaults)") question_augmentations: Optional[List[QuestionAugmentation]] = Field(None, description="Fragenergänzungen (knotengebunden, überschreiben Prompt-Defaults)")
# LOGIC-Knoten # LOGIC-Knoten
condition: Optional[Condition] = Field(None, description="Bedingung für Pfad-Routing") # Support both formats: direct LogicExpression (UI) or wrapped in Condition (legacy)
condition: Optional[Union[LogicExpression, Condition]] = Field(None, description="Bedingung für Pfad-Routing")
fallback: Optional[FallbackConfig] = Field(None, description="Fallback-Konfiguration") fallback: Optional[FallbackConfig] = Field(None, description="Fallback-Konfiguration")
# JOIN-Knoten (Phase 4) # JOIN-Knoten (Phase 4)
@ -220,6 +230,10 @@ class WorkflowEdge(BaseModel):
to_node: str = Field(..., alias="to", description="Ziel-Knoten-ID") to_node: str = Field(..., alias="to", description="Ziel-Knoten-ID")
label: Optional[str] = Field(None, description="Label für visuelle Darstellung (z.B. 'then', 'else')") label: Optional[str] = Field(None, description="Label für visuelle Darstellung (z.B. 'then', 'else')")
# UI-Format fields (React Flow)
source_handle: Optional[str] = Field(None, alias="sourceHandle", description="Source handle ID (UI format: 'true', 'false', 'out')")
target_handle: Optional[str] = Field(None, alias="targetHandle", description="Target handle ID (UI format: 'in', 'path_1', etc.)")
class WorkflowGraph(BaseModel): class WorkflowGraph(BaseModel):
""" """

View File

@ -338,6 +338,8 @@ export default function Analysis() {
/** Kategorie-Schlüssel aus `buildPipelineGroups` (Navigation); Detail = alle Pipelines dieser Kategorie */ /** Kategorie-Schlüssel aus `buildPipelineGroups` (Navigation); Detail = alle Pipelines dieser Kategorie */
const [activeCategoryKey, setActiveCategoryKey] = useState(null) const [activeCategoryKey, setActiveCategoryKey] = useState(null)
const [historyScopePick, setHistoryScopePick] = useState(null) const [historyScopePick, setHistoryScopePick] = useState(null)
// NEW: Progress tracking for SSE workflows
const [progress, setProgress] = useState(null) // { total_nodes, completed_nodes, current_node_label }
const loadAll = async () => { const loadAll = async () => {
const [p, i] = await Promise.all([ const [p, i] = await Promise.all([
@ -377,10 +379,21 @@ export default function Analysis() {
}, [newResult?.scope, prompts]) }, [newResult?.scope, prompts])
const runPrompt = async (slug) => { const runPrompt = async (slug) => {
setLoading(slug); setError(null); setNewResult(null) setLoading(slug); setError(null); setNewResult(null); setProgress(null)
try { try {
// Use new unified executor with save=true // Use SSE-based executor for long-running workflows
const result = await api.executeUnifiedPrompt(slug, null, null, false, true) const result = await api.executeUnifiedPromptStream(slug, null, null, false, true, (event) => {
// Progress callback: update UI in real-time
if (event.type === 'execution_started') {
setProgress({ total_nodes: 0, completed_nodes: 0, current_node_label: 'Starte...' })
} else if (event.type === 'node_complete') {
setProgress({
total_nodes: event.total_nodes || 0,
completed_nodes: event.completed_nodes || 0,
current_node_label: event.node_label || `Node ${event.node_id}`
})
}
})
// Transform result to match old format for InsightCard // Transform result to match old format for InsightCard
let content = '' let content = ''
@ -434,7 +447,10 @@ export default function Analysis() {
setTab('run') setTab('run')
} catch(e) { } catch(e) {
setError('Fehler: ' + e.message) setError('Fehler: ' + e.message)
} finally { setLoading(null) } } finally {
setLoading(null)
setProgress(null) // Clear progress
}
} }
const deleteInsight = async (id) => { const deleteInsight = async (id) => {
@ -618,7 +634,9 @@ export default function Analysis() {
disabled={!!loading||!canUseAI||(aiUsage && !aiUsage.allowed)} disabled={!!loading||!canUseAI||(aiUsage && !aiUsage.allowed)}
> >
{loading===p.slug {loading===p.slug
? <><div className="spinner" style={{width:13,height:13}}/> Läuft</> ? (progress
? <><div className="spinner" style={{width:13,height:13}}/> {progress.completed_nodes}/{progress.total_nodes} Nodes</>
: <><div className="spinner" style={{width:13,height:13}}/> Läuft</>)
: (aiUsage && !aiUsage.allowed) ? '🔒 Limit' : (aiUsage && !aiUsage.allowed) ? '🔒 Limit'
: <><Brain size={13}/> Starten</>} : <><Brain size={13}/> Starten</>}
</button> </button>

View File

@ -402,6 +402,72 @@ export const api = {
return req('/prompts/execute?' + params, json(body)) return req('/prompts/execute?' + params, json(body))
}, },
// NEW: SSE-based execution for long-running workflows
executeUnifiedPromptStream: (slug, modules=null, timeframes=null, debug=false, save=false, onProgress=null) => {
const params = new URLSearchParams({ prompt_slug: slug })
if (debug) params.append('debug', 'true')
if (save) params.append('save', 'true')
// TODO: Security improvement - use session cookie instead of token in URL
// For now, send token as query param since EventSource doesn't support custom headers
const token = localStorage.getItem('token')
if (token) params.append('token', token)
if (modules) {
Object.entries(modules).forEach(([k, v]) => params.append(`modules[${k}]`, v))
}
if (timeframes) {
Object.entries(timeframes).forEach(([k, v]) => params.append(`timeframes[${k}]`, v))
}
// Return a Promise that resolves with final result
return new Promise((resolve, reject) => {
const url = `${BASE_URL}/prompts/execute-stream?${params}`
const eventSource = new EventSource(url)
let finalResult = null
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data)
// Call progress callback if provided
if (onProgress) {
onProgress(data)
}
// Check for final result
if (data.type === 'execution_complete') {
// Transform SSE result to match regular execute format
finalResult = {
type: 'workflow',
execution_id: data.execution_id,
status: data.status,
aggregated_result: data.aggregated_result,
debug: {
node_states: [] // TODO: collect from progress events if needed
}
}
eventSource.close()
resolve(finalResult)
} else if (data.type === 'execution_failed') {
eventSource.close()
reject(new Error(data.error || 'Workflow execution failed'))
}
} catch (e) {
console.error('Error parsing SSE event:', e)
}
}
eventSource.onerror = (error) => {
console.error('SSE connection error:', error)
eventSource.close()
reject(new Error('Connection to server lost'))
}
})
},
// Workflow Execution (Part 2: Frontend Execute Integration) // Workflow Execution (Part 2: Frontend Execute Integration)
executeWorkflow: (slug, variables=null, debug=true, save=false) => { executeWorkflow: (slug, variables=null, debug=true, save=false) => {
const params = new URLSearchParams({ prompt_slug: slug }) const params = new URLSearchParams({ prompt_slug: slug })

246
test_condition_parsing.py Normal file
View File

@ -0,0 +1,246 @@
"""
Test Condition Parsing - Alle Formate und Verschachtelungen
Testet ob Pydantic die verschiedenen Condition-Formate korrekt deserialisiert.
"""
import sys
import os
# Force UTF-8 encoding on Windows
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
sys.path.insert(0, 'backend')
from workflow_models import LogicExpression, Condition, WorkflowNode, LogicOperator
from pydantic import ValidationError
import json
def test_case(name, data, expected_type, should_fail=False):
"""Test a single condition format"""
print(f"\n{'='*60}")
print(f"TEST: {name}")
print(f"{'='*60}")
print(f"Input: {json.dumps(data, indent=2)}")
try:
# Test 1: Direct deserialization
if expected_type == LogicExpression:
result = LogicExpression(**data)
elif expected_type == Condition:
result = Condition(**data)
if should_fail:
print("❌ FAILED: Should have raised ValidationError but didn't")
return False
print(f"✅ PASSED: Deserialized as {type(result).__name__}")
print(f"Result: {result.model_dump()}")
# Test 2: As part of WorkflowNode
node_data = {
"id": "test_node",
"type": "logic",
"condition": data
}
node = WorkflowNode(**node_data)
print(f"✅ PASSED: WorkflowNode.condition type: {type(node.condition).__name__}")
return True
except ValidationError as e:
if should_fail:
print(f"✅ PASSED: Correctly raised ValidationError")
return True
else:
print(f"❌ FAILED: {e}")
return False
except Exception as e:
print(f"❌ FAILED: Unexpected error: {e}")
import traceback
traceback.print_exc()
return False
# ============================================================================
# Test Cases
# ============================================================================
test_results = []
# Test 1: Simple comparison (UI format - einfachster Fall)
test_results.append(test_case(
"Simple comparison (eq)",
{
"operator": "eq",
"ref": "node_1.q1",
"value": "ja"
},
LogicExpression
))
# Test 2: Simple AND (UI format - wie im Workflow)
test_results.append(test_case(
"Simple AND with 2 operands",
{
"operator": "and",
"operands": [
{"operator": "eq", "ref": "node_5.qTiefananalyseRecovery", "value": "ja"},
{"operator": "neq", "ref": "node_6.qKonsistenz", "value": "nein"}
]
},
LogicExpression
))
# Test 3: Simple OR
test_results.append(test_case(
"Simple OR with 3 operands",
{
"operator": "or",
"operands": [
{"operator": "eq", "ref": "node_1.q1", "value": "ja"},
{"operator": "eq", "ref": "node_1.q2", "value": "nein"},
{"operator": "eq", "ref": "node_1.q3", "value": "unklar"}
]
},
LogicExpression
))
# Test 4: Nested AND/OR
test_results.append(test_case(
"Nested: OR with nested AND",
{
"operator": "or",
"operands": [
{
"operator": "and",
"operands": [
{"operator": "eq", "ref": "node_1.q1", "value": "ja"},
{"operator": "neq", "ref": "node_1.q2", "value": "nein"}
]
},
{"operator": "eq", "ref": "node_2.q1", "value": "hoch"}
]
},
LogicExpression
))
# Test 5: Deep nesting (3 levels)
test_results.append(test_case(
"Deep nesting (3 levels)",
{
"operator": "and",
"operands": [
{
"operator": "or",
"operands": [
{"operator": "eq", "ref": "node_1.q1", "value": "ja"},
{
"operator": "and",
"operands": [
{"operator": "eq", "ref": "node_2.q1", "value": "hoch"},
{"operator": "neq", "ref": "node_2.q2", "value": "niedrig"}
]
}
]
},
{"operator": "eq", "ref": "node_3.q1", "value": "aktiv"}
]
},
LogicExpression
))
# Test 6: Different operators
test_results.append(test_case(
"Different comparison operators (gt, lt, in)",
{
"operator": "and",
"operands": [
{"operator": "gt", "ref": "node_1.score", "value": 50},
{"operator": "lt", "ref": "node_1.score", "value": 100},
{"operator": "in", "ref": "node_1.category", "value": ["high", "medium"]}
]
},
LogicExpression
))
# Test 7: Legacy format (wrapped in Condition)
test_results.append(test_case(
"Legacy format: Condition with expression",
{
"type": "if",
"expression": {
"operator": "eq",
"ref": "node_1.q1",
"value": "ja"
},
"then_path": "edge_1",
"else_path": "edge_2"
},
Condition
))
# Test 8: NOT operator
test_results.append(test_case(
"NOT operator",
{
"operator": "not",
"operands": [
{"operator": "eq", "ref": "node_1.q1", "value": "nein"}
]
},
LogicExpression
))
# Test 9: Complex real-world scenario
test_results.append(test_case(
"Complex real-world: Multiple nested conditions",
{
"operator": "and",
"operands": [
{
"operator": "or",
"operands": [
{"operator": "eq", "ref": "node_1.relevance", "value": "high"},
{
"operator": "and",
"operands": [
{"operator": "eq", "ref": "node_1.relevance", "value": "medium"},
{"operator": "gt", "ref": "node_1.priority", "value": 5}
]
}
]
},
{"operator": "neq", "ref": "node_2.status", "value": "blocked"},
{
"operator": "in",
"ref": "node_3.category",
"value": ["training", "nutrition", "recovery"]
}
]
},
LogicExpression
))
# ============================================================================
# Results Summary
# ============================================================================
print("\n" + "="*60)
print("TEST RESULTS SUMMARY")
print("="*60)
passed = sum(test_results)
total = len(test_results)
failed = total - passed
print(f"\n✅ Passed: {passed}/{total}")
if failed > 0:
print(f"❌ Failed: {failed}/{total}")
print(f"\n⚠️ CRITICAL: Some tests failed! Do NOT deploy until fixed.")
sys.exit(1)
else:
print(f"\n🎉 All tests passed! Safe to deploy.")
sys.exit(0)

113
test_condition_union.py Normal file
View File

@ -0,0 +1,113 @@
"""
Test Union[LogicExpression, Condition] type resolution
"""
import sys
import os
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.path.insert(0, 'backend')
from workflow_models import LogicExpression, Condition, WorkflowNode
import json
# Test 1: UI format (should be LogicExpression)
print("\n" + "="*60)
print("TEST 1: UI Format (direct LogicExpression)")
print("="*60)
ui_format = {
"operator": "and",
"operands": [
{"operator": "eq", "ref": "node_5.qTiefananalyseRecovery", "value": "ja"},
{"operator": "neq", "ref": "node_6.qKonsistenz", "value": "nein"}
]
}
node = WorkflowNode(
id="test_node",
type="logic",
condition=ui_format
)
print(f"Input: {json.dumps(ui_format, indent=2)}")
print(f"node.condition type: {type(node.condition).__name__}")
print(f"Expected: LogicExpression")
if isinstance(node.condition, LogicExpression):
print("✅ CORRECT: Deserialized as LogicExpression")
print(f"Has operator: {hasattr(node.condition, 'operator')} = {node.condition.operator if hasattr(node.condition, 'operator') else 'N/A'}")
print(f"Has operands: {hasattr(node.condition, 'operands')} = {len(node.condition.operands) if hasattr(node.condition, 'operands') and node.condition.operands else 'N/A'}")
elif isinstance(node.condition, Condition):
print("❌ WRONG: Deserialized as Condition (should be LogicExpression)")
print(f"Has expression: {hasattr(node.condition, 'expression')} = {type(node.condition.expression).__name__ if hasattr(node.condition, 'expression') and node.condition.expression else 'N/A'}")
if hasattr(node.condition, 'expression') and node.condition.expression:
print(f"expression.operator: {node.condition.expression.operator}")
print(f"expression.operands: {len(node.condition.expression.operands) if node.condition.expression.operands else 0}")
else:
print(f"❌ UNEXPECTED TYPE: {type(node.condition)}")
# Test 2: Legacy format (should be Condition)
print("\n" + "="*60)
print("TEST 2: Legacy Format (Condition with expression)")
print("="*60)
legacy_format = {
"type": "if",
"expression": {
"operator": "eq",
"ref": "node_1.q1",
"value": "ja"
},
"then_path": "edge_1",
"else_path": "edge_2"
}
node2 = WorkflowNode(
id="test_node2",
type="logic",
condition=legacy_format
)
print(f"Input: {json.dumps(legacy_format, indent=2)}")
print(f"node.condition type: {type(node2.condition).__name__}")
print(f"Expected: Condition")
if isinstance(node2.condition, Condition):
print("✅ CORRECT: Deserialized as Condition")
print(f"Has expression: {hasattr(node2.condition, 'expression')}")
print(f"Has then_path: {hasattr(node2.condition, 'then_path')} = {node2.condition.then_path}")
elif isinstance(node2.condition, LogicExpression):
print("❌ WRONG: Deserialized as LogicExpression (should be Condition)")
else:
print(f"❌ UNEXPECTED TYPE: {type(node2.condition)}")
# Test 3: Check what executor would do
print("\n" + "="*60)
print("TEST 3: Executor Logic Simulation")
print("="*60)
from workflow_models import LogicExpression, Condition
for test_name, node in [("UI Format", node), ("Legacy Format", node2)]:
print(f"\n{test_name}:")
print(f" condition type: {type(node.condition).__name__}")
if isinstance(node.condition, LogicExpression):
print(" ✅ Executor would use: node.condition directly")
expression = node.condition
elif isinstance(node.condition, Condition):
print(" ✅ Executor would use: node.condition.expression")
expression = node.condition.expression if node.condition.expression else None
else:
print(f" ❌ Executor would fail: Unknown type {type(node.condition)}")
expression = None
if expression:
print(f" Expression type: {type(expression).__name__}")
print(f" Expression operator: {expression.operator}")
print(f" Expression has operands: {expression.operands is not None}")
else:
print(f" ❌ No expression found!")