Compare commits

..

15 Commits

Author SHA1 Message Date
f34e46b04f Merge pull request 'Async Workflow' (#82) from develop into main
All checks were successful
Deploy Production / deploy (push) Successful in 1m3s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
Reviewed-on: #82
2026-04-13 11:58:01 +02:00
3664f53c51 fix: Use NodeStatus.EXECUTED instead of COMPLETED
All checks were successful
Deploy Development / deploy (push) Successful in 53s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s
NodeStatus enum has EXECUTED, not COMPLETED. Fixed in workflow_executor.py progress callback.
2026-04-13 11:49:31 +02:00
fb2e0803c0 fix: SSE streaming - WorkflowNode label attribute and ai_insights column name
All checks were successful
Deploy Development / deploy (push) Successful in 52s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
- workflow_executor.py: Generate node_label from prompt_slug or node.type (WorkflowNode has no label attribute)
- prompts.py: Fix INSERT statement - use 'created' column instead of 'created_at'

SSE endpoint now works correctly for workflow execution streaming.
2026-04-13 11:47:31 +02:00
bb01283727 fix: Correct except/finally indentation in SSE endpoint
All checks were successful
Deploy Development / deploy (push) Successful in 46s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
2026-04-13 11:41:56 +02:00
bc60b9f5c9 fix: Correct indentation in SSE execute_workflow_async function
Some checks failed
Deploy Development / deploy (push) Successful in 49s
Build Test / pytest-backend (push) Failing after 1s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
2026-04-13 11:27:44 +02:00
fbeabcde97 fix: IndentationError in prompts.py SSE endpoint
Some checks failed
Deploy Development / deploy (push) Successful in 52s
Build Test / pytest-backend (push) Failing after 1s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 16s
2026-04-13 11:25:34 +02:00
ba474b0a57 feat: Implement Server-Sent Events (SSE) for long-running workflows
Some checks failed
Deploy Development / deploy (push) Successful in 54s
Build Test / pytest-backend (push) Failing after 1s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
Backend:
- workflow_executor.py: Add progress_callback parameter, emit events for execution_started, node_complete, execution_complete, execution_failed
- prompt_executor.py: Thread progress_callback through execute chain
- routers/prompts.py: New /execute-stream endpoint with asyncio Queue for SSE

Frontend:
- utils/api.js: New executeUnifiedPromptStream() function with EventSource
- pages/Analysis.jsx: Use SSE with live progress display (X/Y Nodes)

Fixes:
- No more gateway timeouts for complex workflows (10+ nodes)
- Live progress feedback for users
- Unlimited workflow complexity

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-04-13 11:23:16 +02:00
790e6df8ef fix: Make debug parameter work as Query parameter in /api/prompts/execute
All checks were successful
Deploy Development / deploy (push) Successful in 49s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 17s
Bug: debug=true in URL was ignored because FastAPI expected it in
request body (POST without Query() expects body params by default).

Result: node_states were never returned, even with ?debug=true

Fix: Changed debug and save to Query parameters:
- debug: bool = Query(False, ...)
- save: bool = Query(False, ...)

Now ?debug=true in URL correctly enables debug output with node_states.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 09:14:30 +02:00
057df0afc8 fix: Support UI-format edge routing with sourceHandle
All checks were successful
Deploy Development / deploy (push) Successful in 52s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 24s
Logic-Nodes evaluated correctly but activated_edges was empty because
_get_edges_by_label() only checked e.label, which is null in UI format.

UI format uses:
- sourceHandle: "true" / "false" (instead of label: "then" / "else")
- targetHandle: "in" / "path_1" / etc.

Changes:
1. Added source_handle/target_handle fields to WorkflowEdge model
   - With aliases sourceHandle/targetHandle for camelCase JSON
2. Updated _get_edges_by_label() to check both formats:
   - Legacy: e.label == "then" / "else"
   - UI: e.source_handle == "true" / "false"

Now Logic-Nodes correctly activate outgoing edges → Join-Node receives
completed paths → End-Node executes → Workflow completes!

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 09:07:50 +02:00
ba04e0c0b6 fix: Add extra='forbid' to Condition for proper Union resolution
All checks were successful
Deploy Development / deploy (push) Successful in 55s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s
Critical fix: Without extra='forbid', Pydantic accepted UI format
{operator: "and", operands: [...]} as valid Condition by ignoring
unknown fields, resulting in Condition(expression=None).

With extra='forbid':
- Condition rejects unknown fields → fails
- Union tries next type → LogicExpression → success

Test Results (9/9 passed):
- Simple comparisons (eq, neq, gt, lt, in) 
- AND/OR combinations 
- Deep nesting (3+ levels) 
- NOT operator 
- All operators (eq, neq, in, not_in, gt, lt, gte, lte, and, or, not) 
- Legacy format (Condition wrapper) 
- Complex real-world scenarios 

Added comprehensive test suite in:
- test_condition_parsing.py (9 test cases)
- test_condition_union.py (Union resolution verification)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 09:01:53 +02:00
f5ce1ec941 refactor: Proper type-safe condition handling with Union types
All checks were successful
Deploy Development / deploy (push) Successful in 52s
Build Test / pytest-backend (push) Successful in 5s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
Previous fix used Any type, breaking type safety and only handling
simple cases. This is the correct implementation:

Changes:
1. LogicExpression.operands: List[Any] → List['LogicExpression']
   - Enables recursive/nested expressions
   - Proper type checking for all operator combinations

2. WorkflowNode.condition: Any → Union[LogicExpression, Condition]
   - Type-safe deserialization
   - Supports both UI format (direct LogicExpression) and legacy (Condition wrapper)
   - Pydantic automatically tries LogicExpression first, then Condition

3. Executor: Simplified with isinstance() checks
   - Clean type detection without dict manipulation
   - Fallback for edge cases

This now correctly handles:
- Simple conditions: {operator: "eq", ref: "...", value: "..."}
- Combined: {operator: "and", operands: [...]}
- Nested: {operator: "or", operands: [{operator: "and", ...}, ...]}
- All operators: eq, neq, in, not_in, gt, lt, gte, lte, contains, and, or, not
- Legacy format: {expression: {...}, then_path: "...", else_path: "..."}

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 08:45:55 +02:00
2deb6510f8 fix: Support UI-format LogicExpression in Logic-Node condition field
All checks were successful
Deploy Development / deploy (push) Successful in 48s
Build Test / pytest-backend (push) Successful in 13s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s
Root cause: UI saves LogicExpression directly as condition:
  {operands: [...], operator: "and"}

But Pydantic model expected Condition with wrapped expression:
  {expression: {operands: [...], operator: "and"}}

Result: Pydantic deserialized it as Condition with expression=None
→ Logic-Nodes failed with "'NoneType' object has no attribute 'operator'"

Fix:
1. Changed WorkflowNode.condition type from Condition to Any
2. Executor now handles both dict and Pydantic model formats
3. Detects UI format (operator+operands) vs legacy format (expression wrapper)
4. Converts dict to LogicExpression before evaluation

Fixes: Logic-Node execution failures in Training-Tiefenanalyse workflow

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 08:40:43 +02:00
0eac40abf6 fix: Add None-check for Logic-Node condition/expression
All checks were successful
Deploy Development / deploy (push) Successful in 51s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
Previous fix handled hasattr() but didn't check for None values.
Now explicitly checks that operator/expression is not None before using it.

Error was: "'NoneType' object has no attribute 'operator'"

Clearer error message: "condition is None or missing"

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 08:32:54 +02:00
e915d3fb13 fix: Support both Logic-Node condition serialization formats
All checks were successful
Deploy Development / deploy (push) Successful in 54s
Build Test / pytest-backend (push) Successful in 5s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
Logic-Nodes were timing out because UI saves condition as:
  {operands: [...], operator: "and"}

But Backend expected:
  {expression: {operands: [...], operator: "and"}}

This caused node.condition.expression to be None, triggering:
- Logic-Node failures
- Join-Node wait_all timeout
- 504 Gateway Timeout

Fix: Accept both formats by checking for operator/operands attributes
directly on condition, falling back to condition.expression.

Fixes: 504 Gateway Timeout in Training-Tiefenanalyse workflow

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 08:26:43 +02:00
60f6cf3c6d fix: Add null check for logic node expression to prevent AttributeError
All checks were successful
Deploy Development / deploy (push) Successful in 58s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
Problem: Logic nodes without logic_expression defined caused AttributeError
"'NoneType' object has no attribute 'operator'" when evaluating condition.

Solution: Check both node.condition AND node.condition.expression before
calling evaluate_logic_expression(). Return clear FAILED state with error
message instead of crashing.

Impact: Workflows with incomplete logic node definitions now fail gracefully
with clear error message instead of cryptic AttributeError.
2026-04-13 08:16:06 +02:00
8 changed files with 745 additions and 24 deletions

View File

@ -167,7 +167,8 @@ async def execute_prompt(
prompt_slug: str, prompt_slug: str,
variables: Dict[str, Any], variables: Dict[str, Any],
openrouter_call_func, openrouter_call_func,
enable_debug: bool = False enable_debug: bool = False,
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Execute a single prompt (base or pipeline type). Execute a single prompt (base or pipeline type).
@ -217,7 +218,7 @@ async def execute_prompt(
elif prompt_type == 'workflow': elif prompt_type == 'workflow':
# Workflow prompt: graph-based execution (Phase 0: Foundation) # Workflow prompt: graph-based execution (Phase 0: Foundation)
return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog) return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog, progress_callback)
else: else:
raise HTTPException(400, f"Unknown prompt type: {prompt_type}") raise HTTPException(400, f"Unknown prompt type: {prompt_type}")
@ -469,7 +470,8 @@ async def execute_prompt_with_data(
modules: Optional[Dict[str, bool]] = None, modules: Optional[Dict[str, bool]] = None,
timeframes: Optional[Dict[str, int]] = None, timeframes: Optional[Dict[str, int]] = None,
openrouter_call_func = None, openrouter_call_func = None,
enable_debug: bool = False enable_debug: bool = False,
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Execute prompt with data loaded from database. Execute prompt with data loaded from database.
@ -605,7 +607,7 @@ async def execute_prompt_with_data(
variables['goals_data'] = [] variables['goals_data'] = []
# Execute prompt # Execute prompt
return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug) return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug, progress_callback)
async def execute_workflow_prompt( async def execute_workflow_prompt(
@ -613,7 +615,8 @@ async def execute_workflow_prompt(
variables: Dict[str, Any], variables: Dict[str, Any],
openrouter_call_func, openrouter_call_func,
enable_debug: bool = False, enable_debug: bool = False,
catalog: Optional[Dict] = None catalog: Optional[Dict] = None,
progress_callback = None # NEW: Optional callback für SSE Progress-Updates
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Execute a workflow-type prompt (graph-based execution). Execute a workflow-type prompt (graph-based execution).
@ -652,7 +655,8 @@ async def execute_workflow_prompt(
profile_id=variables.get('profile_id', 'unknown'), # From context profile_id=variables.get('profile_id', 'unknown'), # From context
variables=variables, variables=variables,
openrouter_call_func=openrouter_call_func, openrouter_call_func=openrouter_call_func,
enable_debug=enable_debug enable_debug=enable_debug,
progress_callback=progress_callback # NEW: Progress-Callbacks durchreichen
) )
# Convert ExecutionResult to dict for API response # Convert ExecutionResult to dict for API response

View File

@ -1445,13 +1445,176 @@ from prompt_executor import execute_prompt_with_data
from models import UnifiedPromptCreate, UnifiedPromptUpdate from models import UnifiedPromptCreate, UnifiedPromptUpdate
@router.post("/execute") @router.post("/execute-stream")
async def execute_unified_prompt( async def execute_unified_prompt_stream(
prompt_slug: str, prompt_slug: str = Query(..., description="Slug of prompt to execute"),
token: Optional[str] = Query(None, description="Auth token (temporary solution for SSE)"),
modules: Optional[dict] = None, modules: Optional[dict] = None,
timeframes: Optional[dict] = None, timeframes: Optional[dict] = None,
debug: bool = False, debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
save: bool = False, save: bool = Query(False, description="Save result to ai_insights")
):
"""
Execute a unified prompt with Server-Sent Events (SSE) streaming.
Returns live progress updates during workflow execution:
- execution_started: Workflow has begun
- node_complete: Each node completes
- execution_complete: Final result ready
- execution_failed: Error occurred
Use this endpoint for long-running workflows (>30s) to avoid gateway timeouts.
"""
# Manual auth: verify token and get profile_id
if not token:
raise HTTPException(401, "Missing auth token")
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT profile_id FROM sessions WHERE token = %s", (token,))
row = cur.fetchone()
if not row:
raise HTTPException(401, "Invalid or expired token")
profile_id = row['profile_id']
# Use default modules/timeframes if not provided
if not modules:
modules = {
'körper': True,
'ernährung': True,
'training': True,
'schlaf': True,
'vitalwerte': True
}
if not timeframes:
timeframes = {
'körper': 30,
'ernährung': 30,
'training': 14,
'schlaf': 14,
'vitalwerte': 7
}
# Wrapper function for OpenRouter calls
async def workflow_llm_call(prompt: str, model: str = None) -> str:
return await call_openrouter(prompt)
# SSE Event Generator
async def event_stream():
"""Generate Server-Sent Events during workflow execution."""
import asyncio
from asyncio import Queue
# Event queue for progress updates
event_queue = Queue()
# Flag to track execution completion
execution_complete = False
# Define progress callback for streaming updates
async def progress_callback(event_type: str, data: dict):
"""Queue SSE event for streaming to client."""
event_data = {
"type": event_type,
**data
}
await event_queue.put(event_data)
# Start workflow execution in background task
async def execute_workflow_async():
nonlocal execution_complete
try:
# Execute workflow with progress callbacks
result = await execute_prompt_with_data(
prompt_slug=prompt_slug,
profile_id=profile_id,
modules=modules,
timeframes=timeframes,
openrouter_call_func=workflow_llm_call,
enable_debug=debug or save,
progress_callback=progress_callback
)
# Save to ai_insights if requested (same logic as /execute)
if save:
if result['type'] == 'pipeline':
final_output = result.get('output', {})
if isinstance(final_output, dict) and len(final_output) == 1:
content = list(final_output.values())[0]
else:
content = json.dumps(final_output, ensure_ascii=False)
elif result['type'] == 'workflow':
content = _workflow_user_facing_content(result.get('aggregated_result'))
else:
content = result.get('output', '')
if isinstance(content, dict):
content = json.dumps(content, ensure_ascii=False)
# Save to database (minimal metadata for now)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""INSERT INTO ai_insights (profile_id, scope, content, metadata, created)
VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""",
(profile_id, prompt_slug, content, json.dumps({"prompt_type": result['type']}))
)
conn.commit()
except Exception as e:
# Queue error event
await event_queue.put({
"type": "execution_failed",
"error": str(e)
})
finally:
execution_complete = True
# Start workflow execution in background
import asyncio
execution_task = asyncio.create_task(execute_workflow_async())
# Stream events from queue
try:
while not execution_complete or not event_queue.empty():
try:
# Wait for event with timeout
event = await asyncio.wait_for(event_queue.get(), timeout=0.5)
yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"
except asyncio.TimeoutError:
# Send keepalive ping
yield f": keepalive\n\n"
continue
# Wait for execution task to complete
await execution_task
except Exception as e:
# Send final error event
error_event = {
"type": "execution_failed",
"error": str(e)
}
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Disable nginx buffering
}
)
@router.post("/execute")
async def execute_unified_prompt(
prompt_slug: str = Query(..., description="Slug of prompt to execute"),
modules: Optional[dict] = None,
timeframes: Optional[dict] = None,
debug: bool = Query(False, description="Include debug information (node_states, etc.)"),
save: bool = Query(False, description="Save result to ai_insights"),
session: dict = Depends(require_auth) session: dict = Depends(require_auth)
): ):
""" """

View File

@ -42,7 +42,8 @@ async def execute_workflow(
profile_id: str = None, profile_id: str = None,
variables: Dict[str, Any] = None, variables: Dict[str, Any] = None,
openrouter_call_func = None, # Callback für LLM-Calls: async (prompt, model) -> str openrouter_call_func = None, # Callback für LLM-Calls: async (prompt, model) -> str
enable_debug: bool = False enable_debug: bool = False,
progress_callback = None # NEW: Optional callback für Progress-Updates: async (event_type, data) -> None
) -> ExecutionResult: ) -> ExecutionResult:
""" """
Führt einen Workflow aus (mit conditional branching und path consolidation). Führt einen Workflow aus (mit conditional branching und path consolidation).
@ -76,6 +77,13 @@ async def execute_workflow(
logger.info(f"Starting workflow execution: {execution_id}") logger.info(f"Starting workflow execution: {execution_id}")
# NEW: Progress-Callback für Start
if progress_callback:
await progress_callback("execution_started", {
"execution_id": execution_id,
"started_at": started_at
})
try: try:
# 1. Lade Workflow-Definition # 1. Lade Workflow-Definition
if graph_data: if graph_data:
@ -161,6 +169,20 @@ async def execute_workflow(
node_states.append(node_state) node_states.append(node_state)
context["node_results"][node_id] = node_state context["node_results"][node_id] = node_state
# NEW: Progress-Callback aufrufen (für SSE Streaming)
if progress_callback:
# Create a meaningful label for the node
node_label = node.prompt_slug if hasattr(node, 'prompt_slug') and node.prompt_slug else f"{node.type.value}-{node_id[:8]}"
await progress_callback("node_complete", {
"node_id": node_id,
"node_type": node.type,
"node_label": node_label,
"status": node_state.status.value,
"total_nodes": len(graph.nodes),
"completed_nodes": len([ns for ns in node_states if ns.status in [NodeStatus.EXECUTED, NodeStatus.SKIPPED]]),
"error": node_state.error if node_state.status == NodeStatus.FAILED else None
})
# Füge Nachfolger zur Queue hinzu # Füge Nachfolger zur Queue hinzu
outgoing_edges = [e for e in graph.edges if e.from_node == node_id] outgoing_edges = [e for e in graph.edges if e.from_node == node_id]
for edge in outgoing_edges: for edge in outgoing_edges:
@ -185,6 +207,19 @@ async def execute_workflow(
logger.info(f"Workflow execution completed: {execution_id}") logger.info(f"Workflow execution completed: {execution_id}")
# NEW: Progress-Callback für erfolgreiche Fertigstellung
if progress_callback:
await progress_callback("execution_complete", {
"execution_id": execution_id,
"status": "completed",
"aggregated_result": aggregated,
"total_nodes": len(node_states),
"completed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.EXECUTED]),
"skipped_nodes": len([ns for ns in node_states if ns.status == NodeStatus.SKIPPED]),
"failed_nodes": len([ns for ns in node_states if ns.status == NodeStatus.FAILED]),
"completed_at": completed_at
})
return ExecutionResult( return ExecutionResult(
execution_id=execution_id, execution_id=execution_id,
workflow_id=workflow_id or "N/A", # Placeholder when graph_data is used directly workflow_id=workflow_id or "N/A", # Placeholder when graph_data is used directly
@ -198,6 +233,15 @@ async def execute_workflow(
except Exception as e: except Exception as e:
logger.error(f"Workflow execution failed: {e}", exc_info=True) logger.error(f"Workflow execution failed: {e}", exc_info=True)
# NEW: Progress-Callback für Fehler
if progress_callback:
await progress_callback("execution_failed", {
"execution_id": execution_id,
"status": "failed",
"error": str(e),
"completed_at": datetime.utcnow().isoformat()
})
# Speichere Failed State # Speichere Failed State
completed_at = datetime.utcnow().isoformat() completed_at = datetime.utcnow().isoformat()
save_execution_state( save_execution_state(
@ -399,10 +443,49 @@ def execute_logic_node(
try: try:
if not node.condition: if not node.condition:
raise ValueError(f"Logic node {node.id} has no condition") error_msg = f"Logic node {node.id} has no condition defined"
logger.error(error_msg)
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.FAILED,
error=error_msg,
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
# Handle both formats (thanks to Union[LogicExpression, Condition] type):
# 1. Direct LogicExpression (UI format): node.condition is LogicExpression
# 2. Wrapped in Condition (legacy): node.condition is Condition with .expression
from workflow_models import LogicExpression, Condition
expression = None
if isinstance(node.condition, LogicExpression):
# UI format: direct LogicExpression
expression = node.condition
elif isinstance(node.condition, Condition):
# Legacy format: wrapped in Condition
expression = node.condition.expression
else:
# Fallback: try to detect format manually
if hasattr(node.condition, 'operator') and hasattr(node.condition, 'operands'):
expression = node.condition # Looks like LogicExpression
elif hasattr(node.condition, 'expression'):
expression = node.condition.expression # Looks like Condition
if expression is None:
error_msg = f"Logic node {node.id} has no valid condition/expression defined"
logger.error(error_msg)
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.FAILED,
error=error_msg,
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
# 1. Evaluiere Bedingung # 1. Evaluiere Bedingung
result, error = evaluate_logic_expression(node.condition.expression, context) result, error = evaluate_logic_expression(expression, context)
if error: if error:
# Fehler bei Evaluation → Fallback anwenden # Fehler bei Evaluation → Fallback anwenden
@ -777,17 +860,31 @@ def _get_edges_by_label(node_id: str, label: str, graph: WorkflowGraph) -> List[
""" """
Findet alle ausgehenden Edges mit bestimmtem Label. Findet alle ausgehenden Edges mit bestimmtem Label.
Unterstützt beide Formate:
- Legacy: e.label == label (z.B. "then", "else")
- UI: e.source_handle == label (z.B. "true", "false")
Args: Args:
node_id: Node-ID node_id: Node-ID
label: Edge-Label (z.B. "then", "else", "uncertainty") label: Edge-Label oder sourceHandle (z.B. "then"/"true", "else"/"false")
graph: WorkflowGraph graph: WorkflowGraph
Returns: Returns:
Liste von Edge-IDs Liste von Edge-IDs
""" """
# Map label to sourceHandle equivalents
label_to_handle = {
"then": "true",
"else": "false"
}
handle_equivalent = label_to_handle.get(label, label)
matching_edges = [ matching_edges = [
e.id for e in graph.edges e.id for e in graph.edges
if e.from_node == node_id and e.label == label if e.from_node == node_id and (
e.label == label or # Legacy format
(hasattr(e, 'source_handle') and e.source_handle == handle_equivalent) # UI format
)
] ]
return matching_edges return matching_edges

View File

@ -6,7 +6,7 @@ Data validation schemas for Workflow-Graph, Knoten, Kanten, Bedingungen.
Konzept-Basis: konzept_workflow_engine_konsolidated.md Konzept-Basis: konzept_workflow_engine_konsolidated.md
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md
""" """
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any, Union
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from enum import Enum from enum import Enum
@ -148,18 +148,27 @@ class LogicExpression(BaseModel):
} }
""" """
operator: LogicOperator = Field(..., description="Logischer Operator (and, or, not) oder Vergleichsoperator") operator: LogicOperator = Field(..., description="Logischer Operator (and, or, not) oder Vergleichsoperator")
operands: Optional[List[Any]] = Field(None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)") operands: Optional[List['LogicExpression']] = Field(None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)")
# Bei einfachem Vergleich: # Bei einfachem Vergleich:
ref: Optional[str] = Field(None, description="Signal-Referenz (nur bei Vergleichsoperatoren)") ref: Optional[str] = Field(None, description="Signal-Referenz (nur bei Vergleichsoperatoren)")
value: Optional[Any] = Field(None, description="Vergleichswert (nur bei Vergleichsoperatoren)") value: Optional[Any] = Field(None, description="Vergleichswert (nur bei Vergleichsoperatoren)")
# Enable forward reference resolution for recursive model
LogicExpression.model_rebuild()
class Condition(BaseModel): class Condition(BaseModel):
""" """
Bedingung für einen Logik-Knoten. Bedingung für einen Logik-Knoten.
Unterstützt if/else-if/else-Logik. Unterstützt if/else-if/else-Logik.
Note: Uses extra='forbid' to ensure proper Union resolution with LogicExpression.
If unknown fields are present (like 'operator', 'operands'), deserialization fails
and Pydantic tries LogicExpression instead.
""" """
model_config = {'extra': 'forbid'}
type: str = Field(default="if", description="Bedingungstyp: if, else-if, else") type: str = Field(default="if", description="Bedingungstyp: if, else-if, else")
expression: Optional[LogicExpression] = Field(None, description="Logischer Ausdruck (null bei 'else')") expression: Optional[LogicExpression] = Field(None, description="Logischer Ausdruck (null bei 'else')")
then_path: Optional[str] = Field(None, description="Edge-ID für 'then'-Pfad") then_path: Optional[str] = Field(None, description="Edge-ID für 'then'-Pfad")
@ -195,7 +204,8 @@ class WorkflowNode(BaseModel):
question_augmentations: Optional[List[QuestionAugmentation]] = Field(None, description="Fragenergänzungen (knotengebunden, überschreiben Prompt-Defaults)") question_augmentations: Optional[List[QuestionAugmentation]] = Field(None, description="Fragenergänzungen (knotengebunden, überschreiben Prompt-Defaults)")
# LOGIC-Knoten # LOGIC-Knoten
condition: Optional[Condition] = Field(None, description="Bedingung für Pfad-Routing") # Support both formats: direct LogicExpression (UI) or wrapped in Condition (legacy)
condition: Optional[Union[LogicExpression, Condition]] = Field(None, description="Bedingung für Pfad-Routing")
fallback: Optional[FallbackConfig] = Field(None, description="Fallback-Konfiguration") fallback: Optional[FallbackConfig] = Field(None, description="Fallback-Konfiguration")
# JOIN-Knoten (Phase 4) # JOIN-Knoten (Phase 4)
@ -220,6 +230,10 @@ class WorkflowEdge(BaseModel):
to_node: str = Field(..., alias="to", description="Ziel-Knoten-ID") to_node: str = Field(..., alias="to", description="Ziel-Knoten-ID")
label: Optional[str] = Field(None, description="Label für visuelle Darstellung (z.B. 'then', 'else')") label: Optional[str] = Field(None, description="Label für visuelle Darstellung (z.B. 'then', 'else')")
# UI-Format fields (React Flow)
source_handle: Optional[str] = Field(None, alias="sourceHandle", description="Source handle ID (UI format: 'true', 'false', 'out')")
target_handle: Optional[str] = Field(None, alias="targetHandle", description="Target handle ID (UI format: 'in', 'path_1', etc.)")
class WorkflowGraph(BaseModel): class WorkflowGraph(BaseModel):
""" """

View File

@ -338,6 +338,8 @@ export default function Analysis() {
/** Kategorie-Schlüssel aus `buildPipelineGroups` (Navigation); Detail = alle Pipelines dieser Kategorie */ /** Kategorie-Schlüssel aus `buildPipelineGroups` (Navigation); Detail = alle Pipelines dieser Kategorie */
const [activeCategoryKey, setActiveCategoryKey] = useState(null) const [activeCategoryKey, setActiveCategoryKey] = useState(null)
const [historyScopePick, setHistoryScopePick] = useState(null) const [historyScopePick, setHistoryScopePick] = useState(null)
// NEW: Progress tracking for SSE workflows
const [progress, setProgress] = useState(null) // { total_nodes, completed_nodes, current_node_label }
const loadAll = async () => { const loadAll = async () => {
const [p, i] = await Promise.all([ const [p, i] = await Promise.all([
@ -377,10 +379,21 @@ export default function Analysis() {
}, [newResult?.scope, prompts]) }, [newResult?.scope, prompts])
const runPrompt = async (slug) => { const runPrompt = async (slug) => {
setLoading(slug); setError(null); setNewResult(null) setLoading(slug); setError(null); setNewResult(null); setProgress(null)
try { try {
// Use new unified executor with save=true // Use SSE-based executor for long-running workflows
const result = await api.executeUnifiedPrompt(slug, null, null, false, true) const result = await api.executeUnifiedPromptStream(slug, null, null, false, true, (event) => {
// Progress callback: update UI in real-time
if (event.type === 'execution_started') {
setProgress({ total_nodes: 0, completed_nodes: 0, current_node_label: 'Starte...' })
} else if (event.type === 'node_complete') {
setProgress({
total_nodes: event.total_nodes || 0,
completed_nodes: event.completed_nodes || 0,
current_node_label: event.node_label || `Node ${event.node_id}`
})
}
})
// Transform result to match old format for InsightCard // Transform result to match old format for InsightCard
let content = '' let content = ''
@ -434,7 +447,10 @@ export default function Analysis() {
setTab('run') setTab('run')
} catch(e) { } catch(e) {
setError('Fehler: ' + e.message) setError('Fehler: ' + e.message)
} finally { setLoading(null) } } finally {
setLoading(null)
setProgress(null) // Clear progress
}
} }
const deleteInsight = async (id) => { const deleteInsight = async (id) => {
@ -618,7 +634,9 @@ export default function Analysis() {
disabled={!!loading||!canUseAI||(aiUsage && !aiUsage.allowed)} disabled={!!loading||!canUseAI||(aiUsage && !aiUsage.allowed)}
> >
{loading===p.slug {loading===p.slug
? <><div className="spinner" style={{width:13,height:13}}/> Läuft</> ? (progress
? <><div className="spinner" style={{width:13,height:13}}/> {progress.completed_nodes}/{progress.total_nodes} Nodes</>
: <><div className="spinner" style={{width:13,height:13}}/> Läuft</>)
: (aiUsage && !aiUsage.allowed) ? '🔒 Limit' : (aiUsage && !aiUsage.allowed) ? '🔒 Limit'
: <><Brain size={13}/> Starten</>} : <><Brain size={13}/> Starten</>}
</button> </button>

View File

@ -402,6 +402,72 @@ export const api = {
return req('/prompts/execute?' + params, json(body)) return req('/prompts/execute?' + params, json(body))
}, },
// NEW: SSE-based execution for long-running workflows
executeUnifiedPromptStream: (slug, modules=null, timeframes=null, debug=false, save=false, onProgress=null) => {
const params = new URLSearchParams({ prompt_slug: slug })
if (debug) params.append('debug', 'true')
if (save) params.append('save', 'true')
// TODO: Security improvement - use session cookie instead of token in URL
// For now, send token as query param since EventSource doesn't support custom headers
const token = localStorage.getItem('token')
if (token) params.append('token', token)
if (modules) {
Object.entries(modules).forEach(([k, v]) => params.append(`modules[${k}]`, v))
}
if (timeframes) {
Object.entries(timeframes).forEach(([k, v]) => params.append(`timeframes[${k}]`, v))
}
// Return a Promise that resolves with final result
return new Promise((resolve, reject) => {
const url = `${BASE_URL}/prompts/execute-stream?${params}`
const eventSource = new EventSource(url)
let finalResult = null
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data)
// Call progress callback if provided
if (onProgress) {
onProgress(data)
}
// Check for final result
if (data.type === 'execution_complete') {
// Transform SSE result to match regular execute format
finalResult = {
type: 'workflow',
execution_id: data.execution_id,
status: data.status,
aggregated_result: data.aggregated_result,
debug: {
node_states: [] // TODO: collect from progress events if needed
}
}
eventSource.close()
resolve(finalResult)
} else if (data.type === 'execution_failed') {
eventSource.close()
reject(new Error(data.error || 'Workflow execution failed'))
}
} catch (e) {
console.error('Error parsing SSE event:', e)
}
}
eventSource.onerror = (error) => {
console.error('SSE connection error:', error)
eventSource.close()
reject(new Error('Connection to server lost'))
}
})
},
// Workflow Execution (Part 2: Frontend Execute Integration) // Workflow Execution (Part 2: Frontend Execute Integration)
executeWorkflow: (slug, variables=null, debug=true, save=false) => { executeWorkflow: (slug, variables=null, debug=true, save=false) => {
const params = new URLSearchParams({ prompt_slug: slug }) const params = new URLSearchParams({ prompt_slug: slug })

246
test_condition_parsing.py Normal file
View File

@ -0,0 +1,246 @@
"""
Test Condition Parsing - Alle Formate und Verschachtelungen
Testet ob Pydantic die verschiedenen Condition-Formate korrekt deserialisiert.
"""
import sys
import os
# Force UTF-8 encoding on Windows
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
sys.path.insert(0, 'backend')
from workflow_models import LogicExpression, Condition, WorkflowNode, LogicOperator
from pydantic import ValidationError
import json
def test_case(name, data, expected_type, should_fail=False):
"""Test a single condition format"""
print(f"\n{'='*60}")
print(f"TEST: {name}")
print(f"{'='*60}")
print(f"Input: {json.dumps(data, indent=2)}")
try:
# Test 1: Direct deserialization
if expected_type == LogicExpression:
result = LogicExpression(**data)
elif expected_type == Condition:
result = Condition(**data)
if should_fail:
print("❌ FAILED: Should have raised ValidationError but didn't")
return False
print(f"✅ PASSED: Deserialized as {type(result).__name__}")
print(f"Result: {result.model_dump()}")
# Test 2: As part of WorkflowNode
node_data = {
"id": "test_node",
"type": "logic",
"condition": data
}
node = WorkflowNode(**node_data)
print(f"✅ PASSED: WorkflowNode.condition type: {type(node.condition).__name__}")
return True
except ValidationError as e:
if should_fail:
print(f"✅ PASSED: Correctly raised ValidationError")
return True
else:
print(f"❌ FAILED: {e}")
return False
except Exception as e:
print(f"❌ FAILED: Unexpected error: {e}")
import traceback
traceback.print_exc()
return False
# ============================================================================
# Test Cases
# ============================================================================
test_results = []
# Test 1: Simple comparison (UI format - einfachster Fall)
test_results.append(test_case(
"Simple comparison (eq)",
{
"operator": "eq",
"ref": "node_1.q1",
"value": "ja"
},
LogicExpression
))
# Test 2: Simple AND (UI format - wie im Workflow)
test_results.append(test_case(
"Simple AND with 2 operands",
{
"operator": "and",
"operands": [
{"operator": "eq", "ref": "node_5.qTiefananalyseRecovery", "value": "ja"},
{"operator": "neq", "ref": "node_6.qKonsistenz", "value": "nein"}
]
},
LogicExpression
))
# Test 3: Simple OR
test_results.append(test_case(
"Simple OR with 3 operands",
{
"operator": "or",
"operands": [
{"operator": "eq", "ref": "node_1.q1", "value": "ja"},
{"operator": "eq", "ref": "node_1.q2", "value": "nein"},
{"operator": "eq", "ref": "node_1.q3", "value": "unklar"}
]
},
LogicExpression
))
# Test 4: Nested AND/OR
test_results.append(test_case(
"Nested: OR with nested AND",
{
"operator": "or",
"operands": [
{
"operator": "and",
"operands": [
{"operator": "eq", "ref": "node_1.q1", "value": "ja"},
{"operator": "neq", "ref": "node_1.q2", "value": "nein"}
]
},
{"operator": "eq", "ref": "node_2.q1", "value": "hoch"}
]
},
LogicExpression
))
# Test 5: Deep nesting (3 levels)
test_results.append(test_case(
"Deep nesting (3 levels)",
{
"operator": "and",
"operands": [
{
"operator": "or",
"operands": [
{"operator": "eq", "ref": "node_1.q1", "value": "ja"},
{
"operator": "and",
"operands": [
{"operator": "eq", "ref": "node_2.q1", "value": "hoch"},
{"operator": "neq", "ref": "node_2.q2", "value": "niedrig"}
]
}
]
},
{"operator": "eq", "ref": "node_3.q1", "value": "aktiv"}
]
},
LogicExpression
))
# Test 6: Different operators
test_results.append(test_case(
"Different comparison operators (gt, lt, in)",
{
"operator": "and",
"operands": [
{"operator": "gt", "ref": "node_1.score", "value": 50},
{"operator": "lt", "ref": "node_1.score", "value": 100},
{"operator": "in", "ref": "node_1.category", "value": ["high", "medium"]}
]
},
LogicExpression
))
# Test 7: Legacy format (wrapped in Condition)
test_results.append(test_case(
"Legacy format: Condition with expression",
{
"type": "if",
"expression": {
"operator": "eq",
"ref": "node_1.q1",
"value": "ja"
},
"then_path": "edge_1",
"else_path": "edge_2"
},
Condition
))
# Test 8: NOT operator
test_results.append(test_case(
"NOT operator",
{
"operator": "not",
"operands": [
{"operator": "eq", "ref": "node_1.q1", "value": "nein"}
]
},
LogicExpression
))
# Test 9: Complex real-world scenario
test_results.append(test_case(
"Complex real-world: Multiple nested conditions",
{
"operator": "and",
"operands": [
{
"operator": "or",
"operands": [
{"operator": "eq", "ref": "node_1.relevance", "value": "high"},
{
"operator": "and",
"operands": [
{"operator": "eq", "ref": "node_1.relevance", "value": "medium"},
{"operator": "gt", "ref": "node_1.priority", "value": 5}
]
}
]
},
{"operator": "neq", "ref": "node_2.status", "value": "blocked"},
{
"operator": "in",
"ref": "node_3.category",
"value": ["training", "nutrition", "recovery"]
}
]
},
LogicExpression
))
# ============================================================================
# Results Summary
# ============================================================================
print("\n" + "="*60)
print("TEST RESULTS SUMMARY")
print("="*60)
passed = sum(test_results)
total = len(test_results)
failed = total - passed
print(f"\n✅ Passed: {passed}/{total}")
if failed > 0:
print(f"❌ Failed: {failed}/{total}")
print(f"\n⚠️ CRITICAL: Some tests failed! Do NOT deploy until fixed.")
sys.exit(1)
else:
print(f"\n🎉 All tests passed! Safe to deploy.")
sys.exit(0)

113
test_condition_union.py Normal file
View File

@ -0,0 +1,113 @@
"""
Test Union[LogicExpression, Condition] type resolution
"""
import sys
import os
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.path.insert(0, 'backend')
from workflow_models import LogicExpression, Condition, WorkflowNode
import json
# Test 1: UI format (should be LogicExpression)
print("\n" + "="*60)
print("TEST 1: UI Format (direct LogicExpression)")
print("="*60)
ui_format = {
"operator": "and",
"operands": [
{"operator": "eq", "ref": "node_5.qTiefananalyseRecovery", "value": "ja"},
{"operator": "neq", "ref": "node_6.qKonsistenz", "value": "nein"}
]
}
node = WorkflowNode(
id="test_node",
type="logic",
condition=ui_format
)
print(f"Input: {json.dumps(ui_format, indent=2)}")
print(f"node.condition type: {type(node.condition).__name__}")
print(f"Expected: LogicExpression")
if isinstance(node.condition, LogicExpression):
print("✅ CORRECT: Deserialized as LogicExpression")
print(f"Has operator: {hasattr(node.condition, 'operator')} = {node.condition.operator if hasattr(node.condition, 'operator') else 'N/A'}")
print(f"Has operands: {hasattr(node.condition, 'operands')} = {len(node.condition.operands) if hasattr(node.condition, 'operands') and node.condition.operands else 'N/A'}")
elif isinstance(node.condition, Condition):
print("❌ WRONG: Deserialized as Condition (should be LogicExpression)")
print(f"Has expression: {hasattr(node.condition, 'expression')} = {type(node.condition.expression).__name__ if hasattr(node.condition, 'expression') and node.condition.expression else 'N/A'}")
if hasattr(node.condition, 'expression') and node.condition.expression:
print(f"expression.operator: {node.condition.expression.operator}")
print(f"expression.operands: {len(node.condition.expression.operands) if node.condition.expression.operands else 0}")
else:
print(f"❌ UNEXPECTED TYPE: {type(node.condition)}")
# Test 2: Legacy format (should be Condition)
print("\n" + "="*60)
print("TEST 2: Legacy Format (Condition with expression)")
print("="*60)
legacy_format = {
"type": "if",
"expression": {
"operator": "eq",
"ref": "node_1.q1",
"value": "ja"
},
"then_path": "edge_1",
"else_path": "edge_2"
}
node2 = WorkflowNode(
id="test_node2",
type="logic",
condition=legacy_format
)
print(f"Input: {json.dumps(legacy_format, indent=2)}")
print(f"node.condition type: {type(node2.condition).__name__}")
print(f"Expected: Condition")
if isinstance(node2.condition, Condition):
print("✅ CORRECT: Deserialized as Condition")
print(f"Has expression: {hasattr(node2.condition, 'expression')}")
print(f"Has then_path: {hasattr(node2.condition, 'then_path')} = {node2.condition.then_path}")
elif isinstance(node2.condition, LogicExpression):
print("❌ WRONG: Deserialized as LogicExpression (should be Condition)")
else:
print(f"❌ UNEXPECTED TYPE: {type(node2.condition)}")
# Test 3: Check what executor would do
print("\n" + "="*60)
print("TEST 3: Executor Logic Simulation")
print("="*60)
from workflow_models import LogicExpression, Condition
for test_name, node in [("UI Format", node), ("Legacy Format", node2)]:
print(f"\n{test_name}:")
print(f" condition type: {type(node.condition).__name__}")
if isinstance(node.condition, LogicExpression):
print(" ✅ Executor would use: node.condition directly")
expression = node.condition
elif isinstance(node.condition, Condition):
print(" ✅ Executor would use: node.condition.expression")
expression = node.condition.expression if node.condition.expression else None
else:
print(f" ❌ Executor would fail: Unknown type {type(node.condition)}")
expression = None
if expression:
print(f" Expression type: {type(expression).__name__}")
print(f" Expression operator: {expression.operator}")
print(f" Expression has operands: {expression.operands is not None}")
else:
print(f" ❌ No expression found!")