106 lines
3.0 KiB
Python
106 lines
3.0 KiB
Python
#!/usr/bin/env python3
|
|
"""Integration test: Full workflow with join node"""
|
|
|
|
import asyncio
|
|
import sys
|
|
from workflow_executor import execute_node
|
|
from workflow_models import (
|
|
WorkflowNode, WorkflowGraph, WorkflowEdge,
|
|
JoinStrategy, NodeStatus, NodeExecutionState
|
|
)
|
|
|
|
async def test_join_node_integration():
|
|
"""Test execute_join_node via execute_node dispatcher"""
|
|
|
|
# Setup: 2 executed paths
|
|
path_a_state = NodeExecutionState(
|
|
node_id="path_a",
|
|
status=NodeStatus.EXECUTED,
|
|
analysis_core="Path A completed successfully"
|
|
)
|
|
|
|
path_b_state = NodeExecutionState(
|
|
node_id="path_b",
|
|
status=NodeStatus.EXECUTED,
|
|
analysis_core="Path B completed successfully"
|
|
)
|
|
|
|
# Join node
|
|
join_node = WorkflowNode(
|
|
id="join",
|
|
type="join",
|
|
join_strategy=JoinStrategy.WAIT_ALL
|
|
)
|
|
|
|
# Graph
|
|
graph = WorkflowGraph(
|
|
nodes=[
|
|
WorkflowNode(id="path_a", type="analysis"),
|
|
WorkflowNode(id="path_b", type="analysis"),
|
|
join_node
|
|
],
|
|
edges=[
|
|
WorkflowEdge(id="e1", from_node="path_a", to_node="join"),
|
|
WorkflowEdge(id="e2", from_node="path_b", to_node="join")
|
|
]
|
|
)
|
|
|
|
# Context with previous node results
|
|
context = {
|
|
"variables": {},
|
|
"profile_id": "test-profile",
|
|
"node_results": {
|
|
"path_a": path_a_state,
|
|
"path_b": path_b_state
|
|
},
|
|
"active_edges": {}
|
|
}
|
|
|
|
# Execute join node via dispatcher
|
|
async def mock_llm(prompt, model):
|
|
return "Mock LLM response"
|
|
|
|
result = await execute_node(
|
|
node=join_node,
|
|
context=context,
|
|
catalog={},
|
|
graph=graph,
|
|
openrouter_call_func=mock_llm
|
|
)
|
|
|
|
# Verify
|
|
print(f"✅ Node executed: {result.node_id}")
|
|
print(f"✅ Status: {result.status.value}")
|
|
print(f"✅ Analysis core exists: {result.analysis_core is not None}")
|
|
|
|
assert result.node_id == "join"
|
|
assert result.status == NodeStatus.EXECUTED
|
|
assert result.analysis_core is not None
|
|
|
|
# Check consolidated data
|
|
import json
|
|
consolidated = json.loads(result.analysis_core)
|
|
print(f"✅ Consolidated paths: {len(consolidated)}")
|
|
assert len(consolidated) == 2
|
|
assert "path_a" in consolidated
|
|
assert "path_b" in consolidated
|
|
|
|
print(f"✅ Path A analysis: {consolidated['path_a'][:50]}...")
|
|
print(f"✅ Path B analysis: {consolidated['path_b'][:50]}...")
|
|
|
|
print("\n🎉 Integration Test: JOIN NODE WORKING IN WORKFLOW EXECUTOR!")
|
|
print(" - Both paths consolidated successfully")
|
|
print(" - Analysis cores merged correctly")
|
|
print(" - Join strategy executed properly")
|
|
return True
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
success = asyncio.run(test_join_node_integration())
|
|
sys.exit(0 if success else 1)
|
|
except Exception as e:
|
|
print(f"❌ Test failed: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|