mitai-jinkendo/tests/test_join_deployed.py
Lars dc87e7f3b8
All checks were successful
Deploy Development / deploy (push) Successful in 44s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
cursor_Setup
2026-04-04 14:05:50 +02:00

59 lines
1.6 KiB
Python

#!/usr/bin/env python3
"""Quick test for deployed join_evaluator.py"""
from join_evaluator import evaluate_join_node
from workflow_models import (
WorkflowNode, WorkflowGraph, WorkflowEdge,
JoinStrategy, NodeStatus, NodeExecutionState
)
# Minimal test: 2 paths → join
join_node = WorkflowNode(
id="test_join",
type="join",
join_strategy=JoinStrategy.BEST_EFFORT
)
graph = WorkflowGraph(
nodes=[
WorkflowNode(id="path_a", type="analysis"),
WorkflowNode(id="path_b", type="analysis"),
join_node
],
edges=[
WorkflowEdge(id="e1", from_node="path_a", to_node="test_join"),
WorkflowEdge(id="e2", from_node="path_b", to_node="test_join")
]
)
context = {
"node_results": {
"path_a": NodeExecutionState(
node_id="path_a",
status=NodeStatus.EXECUTED,
analysis_core="Analysis from path A"
),
"path_b": NodeExecutionState(
node_id="path_b",
status=NodeStatus.EXECUTED,
analysis_core="Analysis from path B"
)
}
}
# Execute
result = evaluate_join_node(join_node, graph, context)
# Verify
print(f"✅ Ready: {result.ready}")
print(f"✅ Consolidated cores: {len(result.consolidated_analysis_core)}")
print(f"✅ Executed paths: {result.metadata.get('executed_paths')}")
print(f"✅ Strategy: {result.metadata.get('join_strategy')}")
assert result.ready is True
assert len(result.consolidated_analysis_core) == 2
assert "path_a" in result.consolidated_analysis_core
assert "path_b" in result.consolidated_analysis_core
print("\n🎉 Phase 4 Join Evaluator: DEPLOYED AND WORKING!")