Backend Implementation (v0.9m, workflow 0.5.0): - join_evaluator.py (394 lines): Join-Strategie-Evaluator - evaluate_join_node(): Hauptlogik für Join-Node Execution - Join-Strategien: wait_all, wait_any, best_effort - Skip-Handling: ignore_skipped, use_placeholder, require_minimum - Result Consolidation: merge analysis_cores, combine signals - Partial Execution: korrekte Behandlung von SKIPPED/FAILED Pfaden - workflow_executor.py: execute_join_node() Integration - BFS-Traversierung erweitert für Join-Nodes - NodeExecutionState List → Dict Konvertierung für Signale - Signal-Name-Kollisionen via node_id Präfix gelöst Testing (49 Tests passing): - test_phase4_join_nodes.py: 18 neue Unit Tests - Join-Strategien (wait_all, wait_any, best_effort) - Skip-Handling (ignore, placeholder) - Result Consolidation (merge, combine) - Partial Execution (mixed status paths) - Helper Functions (collect, check, merge, combine) - Backward Compatibility: 31 Phase 2/3 Tests (alle passing) - test_phase2_workflow_executor.py: 1 Test aktualisiert - test_phase3_logic_evaluator.py: 20 Tests unverändert Konzept: konzept_workflow_engine_konsolidated.md (Sektion 8.8) Anforderungsanalyse: phase4_anforderungsanalyse.md Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
512 lines
18 KiB
Python
512 lines
18 KiB
Python
"""
|
|
Phase 4 Tests: Join Nodes and Path Consolidation
|
|
|
|
Tests für join_evaluator.py und execute_join_node() Funktionalität.
|
|
|
|
Test-Kategorien:
|
|
- Join Strategy Tests (wait_all, wait_any, best_effort)
|
|
- Skip Handling Tests (ignore_skipped, use_placeholder)
|
|
- Result Consolidation Tests (merge analysis_cores, combine signals)
|
|
- Partial Execution Tests (failed paths, skipped paths, mixed status)
|
|
"""
|
|
|
|
import pytest
|
|
from typing import Dict, Any
|
|
from workflow_models import (
|
|
WorkflowNode,
|
|
WorkflowGraph,
|
|
WorkflowEdge,
|
|
JoinStrategy,
|
|
SkipHandling,
|
|
NodeStatus,
|
|
NormalizedSignal,
|
|
SignalStatus,
|
|
NodeExecutionState
|
|
)
|
|
from join_evaluator import (
|
|
evaluate_join_node,
|
|
_collect_incoming_paths,
|
|
_check_join_strategy,
|
|
_merge_analysis_cores,
|
|
_combine_signals,
|
|
PathStatus
|
|
)
|
|
|
|
|
|
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.fixture
|
|
def simple_graph():
|
|
"""
|
|
Einfacher Graph mit 2 Pfaden + Join:
|
|
start → path_a → join → end
|
|
→ path_b →
|
|
"""
|
|
nodes = [
|
|
WorkflowNode(id="start", type="start"),
|
|
WorkflowNode(id="path_a", type="analysis", prompt_slug="prompt_a"),
|
|
WorkflowNode(id="path_b", type="analysis", prompt_slug="prompt_b"),
|
|
WorkflowNode(id="join", type="join", join_strategy=JoinStrategy.WAIT_ALL),
|
|
WorkflowNode(id="end", type="end")
|
|
]
|
|
edges = [
|
|
WorkflowEdge(id="e1", from_node="start", to_node="path_a"),
|
|
WorkflowEdge(id="e2", from_node="start", to_node="path_b"),
|
|
WorkflowEdge(id="e3", from_node="path_a", to_node="join"),
|
|
WorkflowEdge(id="e4", from_node="path_b", to_node="join"),
|
|
WorkflowEdge(id="e5", from_node="join", to_node="end")
|
|
]
|
|
return WorkflowGraph(nodes=nodes, edges=edges)
|
|
|
|
|
|
@pytest.fixture
|
|
def context_all_executed():
|
|
"""Context mit beiden Pfaden EXECUTED"""
|
|
return {
|
|
"node_results": {
|
|
"path_a": NodeExecutionState(
|
|
node_id="path_a",
|
|
status=NodeStatus.EXECUTED,
|
|
analysis_core="Analysis from path A",
|
|
normalized_signals=[
|
|
NormalizedSignal(
|
|
question_type="relevanz",
|
|
raw_value="hoch",
|
|
normalized_value="hoch",
|
|
status=SignalStatus.VALID
|
|
)
|
|
]
|
|
),
|
|
"path_b": NodeExecutionState(
|
|
node_id="path_b",
|
|
status=NodeStatus.EXECUTED,
|
|
analysis_core="Analysis from path B",
|
|
normalized_signals=[
|
|
NormalizedSignal(
|
|
question_type="prioritaet",
|
|
raw_value="mittel",
|
|
normalized_value="mittel",
|
|
status=SignalStatus.VALID
|
|
)
|
|
]
|
|
)
|
|
}
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def context_one_skipped():
|
|
"""Context mit einem Pfad SKIPPED"""
|
|
return {
|
|
"node_results": {
|
|
"path_a": NodeExecutionState(
|
|
node_id="path_a",
|
|
status=NodeStatus.EXECUTED,
|
|
analysis_core="Analysis from path A"
|
|
),
|
|
"path_b": NodeExecutionState(
|
|
node_id="path_b",
|
|
status=NodeStatus.SKIPPED,
|
|
analysis_core=None
|
|
)
|
|
}
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def context_one_failed():
|
|
"""Context mit einem Pfad FAILED"""
|
|
return {
|
|
"node_results": {
|
|
"path_a": NodeExecutionState(
|
|
node_id="path_a",
|
|
status=NodeStatus.EXECUTED,
|
|
analysis_core="Analysis from path A"
|
|
),
|
|
"path_b": NodeExecutionState(
|
|
node_id="path_b",
|
|
status=NodeStatus.FAILED,
|
|
analysis_core=None,
|
|
error="LLM call failed"
|
|
)
|
|
}
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def context_no_paths():
|
|
"""Context ohne ausgeführte Pfade"""
|
|
return {
|
|
"node_results": {
|
|
"path_a": NodeExecutionState(
|
|
node_id="path_a",
|
|
status=NodeStatus.SKIPPED,
|
|
analysis_core=None
|
|
),
|
|
"path_b": NodeExecutionState(
|
|
node_id="path_b",
|
|
status=NodeStatus.SKIPPED,
|
|
analysis_core=None
|
|
)
|
|
}
|
|
}
|
|
|
|
|
|
# ── Join Strategy Tests ───────────────────────────────────────────────────────
|
|
|
|
|
|
def test_wait_all_success(simple_graph, context_all_executed):
|
|
"""wait_all: Alle Pfade verfügbar → ready=True, EXECUTED"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
join_node.join_strategy = JoinStrategy.WAIT_ALL
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_all_executed)
|
|
|
|
assert result.ready is True
|
|
assert result.error is None
|
|
assert len(result.consolidated_analysis_core) == 2
|
|
assert "path_a" in result.consolidated_analysis_core
|
|
assert "path_b" in result.consolidated_analysis_core
|
|
assert result.metadata["executed_paths"] == 2
|
|
|
|
|
|
def test_wait_all_missing_path(simple_graph, context_one_skipped):
|
|
"""wait_all: Ein Pfad fehlt → ready=False, FAILED"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
join_node.join_strategy = JoinStrategy.WAIT_ALL
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_one_skipped)
|
|
|
|
assert result.ready is False
|
|
assert result.error is not None
|
|
assert "wait_all strategy failed" in result.error
|
|
assert "path_b" in result.error
|
|
assert result.metadata["executed_paths"] == 1
|
|
assert result.metadata["skipped_paths"] == 1
|
|
|
|
|
|
def test_wait_any_one_path(simple_graph, context_one_skipped):
|
|
"""wait_any: Mindestens ein Pfad → ready=True, EXECUTED"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
join_node.join_strategy = JoinStrategy.WAIT_ANY
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_one_skipped)
|
|
|
|
assert result.ready is True
|
|
assert result.error is None
|
|
assert len(result.consolidated_analysis_core) == 1
|
|
assert "path_a" in result.consolidated_analysis_core
|
|
assert result.metadata["executed_paths"] == 1
|
|
|
|
|
|
def test_wait_any_no_paths(simple_graph, context_no_paths):
|
|
"""wait_any: Keine Pfade → ready=False, FAILED"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
join_node.join_strategy = JoinStrategy.WAIT_ANY
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_no_paths)
|
|
|
|
assert result.ready is False
|
|
assert result.error is not None
|
|
assert "wait_any strategy failed" in result.error
|
|
assert result.metadata["executed_paths"] == 0
|
|
|
|
|
|
def test_best_effort_partial(simple_graph, context_one_skipped):
|
|
"""best_effort: Einige Pfade fehlen → ready=True, EXECUTED"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
join_node.join_strategy = JoinStrategy.BEST_EFFORT
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_one_skipped)
|
|
|
|
assert result.ready is True
|
|
assert result.error is None
|
|
assert len(result.consolidated_analysis_core) == 1
|
|
assert result.metadata["executed_paths"] == 1
|
|
assert result.metadata["skipped_paths"] == 1
|
|
|
|
|
|
def test_best_effort_no_paths(simple_graph, context_no_paths):
|
|
"""best_effort: Keine Pfade → ready=True, EXECUTED (leere Konsolidierung)"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
join_node.join_strategy = JoinStrategy.BEST_EFFORT
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_no_paths)
|
|
|
|
assert result.ready is True
|
|
assert result.error is None
|
|
assert len(result.consolidated_analysis_core) == 0 # Leer
|
|
assert result.metadata["executed_paths"] == 0
|
|
assert "note" in result.metadata # Hinweis auf leere Konsolidierung
|
|
|
|
|
|
# ── Skip Handling Tests ───────────────────────────────────────────────────────
|
|
|
|
|
|
def test_ignore_skipped(simple_graph, context_one_skipped):
|
|
"""IGNORE_SKIPPED: Übersprungene Pfade nicht in Ergebnis"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
join_node.join_strategy = JoinStrategy.BEST_EFFORT
|
|
join_node.skip_handling = SkipHandling.IGNORE_SKIPPED
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_one_skipped)
|
|
|
|
assert result.ready is True
|
|
assert len(result.consolidated_analysis_core) == 1
|
|
assert "path_a" in result.consolidated_analysis_core
|
|
assert "path_b" not in result.consolidated_analysis_core
|
|
|
|
|
|
def test_use_placeholder(simple_graph, context_one_skipped):
|
|
"""USE_PLACEHOLDER: Platzhalter für übersprungene Pfade"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
join_node.join_strategy = JoinStrategy.BEST_EFFORT
|
|
join_node.skip_handling = SkipHandling.USE_PLACEHOLDER
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_one_skipped)
|
|
|
|
assert result.ready is True
|
|
assert len(result.consolidated_analysis_core) == 2
|
|
assert "path_a" in result.consolidated_analysis_core
|
|
assert "path_b" in result.consolidated_analysis_core
|
|
assert "[Path skipped:" in result.consolidated_analysis_core["path_b"]
|
|
|
|
|
|
def test_failed_path_placeholder(simple_graph, context_one_failed):
|
|
"""FAILED Pfade bekommen Platzhalter (unabhängig von skip_handling)"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
join_node.join_strategy = JoinStrategy.BEST_EFFORT
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_one_failed)
|
|
|
|
assert result.ready is True
|
|
assert len(result.consolidated_analysis_core) == 2
|
|
assert "path_a" in result.consolidated_analysis_core
|
|
assert "[Path failed:" in result.consolidated_analysis_core["path_b"]
|
|
|
|
|
|
# ── Result Consolidation Tests ────────────────────────────────────────────────
|
|
|
|
|
|
def test_merge_analysis_cores(simple_graph, context_all_executed):
|
|
"""Analyse-Kerne korrekt merged"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_all_executed)
|
|
|
|
assert result.ready is True
|
|
assert len(result.consolidated_analysis_core) == 2
|
|
assert result.consolidated_analysis_core["path_a"] == "Analysis from path A"
|
|
assert result.consolidated_analysis_core["path_b"] == "Analysis from path B"
|
|
|
|
|
|
def test_combine_signals(simple_graph, context_all_executed):
|
|
"""Signale aller Pfade kombiniert (mit node_id Präfix)"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
|
|
result = evaluate_join_node(join_node, simple_graph, context_all_executed)
|
|
|
|
assert result.ready is True
|
|
assert len(result.consolidated_signals) == 2
|
|
|
|
# Signale sind mit node_id geprefixed
|
|
assert "path_a.relevanz" in result.consolidated_signals
|
|
assert "path_b.prioritaet" in result.consolidated_signals
|
|
|
|
# Signal-Werte korrekt übernommen
|
|
assert result.consolidated_signals["path_a.relevanz"].normalized_value == "hoch"
|
|
assert result.consolidated_signals["path_b.prioritaet"].normalized_value == "mittel"
|
|
|
|
|
|
def test_signal_name_collision():
|
|
"""Gleiche Signal-Namen in verschiedenen Pfaden (Präfix verhindert Kollision)"""
|
|
graph = WorkflowGraph(
|
|
nodes=[
|
|
WorkflowNode(id="path_a", type="analysis"),
|
|
WorkflowNode(id="path_b", type="analysis"),
|
|
WorkflowNode(id="join", type="join", join_strategy=JoinStrategy.WAIT_ALL)
|
|
],
|
|
edges=[
|
|
WorkflowEdge(id="e1", from_node="path_a", to_node="join"),
|
|
WorkflowEdge(id="e2", from_node="path_b", to_node="join")
|
|
]
|
|
)
|
|
|
|
context = {
|
|
"node_results": {
|
|
"path_a": NodeExecutionState(
|
|
node_id="path_a",
|
|
status=NodeStatus.EXECUTED,
|
|
analysis_core="A",
|
|
normalized_signals=[
|
|
NormalizedSignal(
|
|
question_type="relevanz",
|
|
raw_value="hoch",
|
|
normalized_value="hoch",
|
|
status=SignalStatus.VALID
|
|
)
|
|
]
|
|
),
|
|
"path_b": NodeExecutionState(
|
|
node_id="path_b",
|
|
status=NodeStatus.EXECUTED,
|
|
analysis_core="B",
|
|
normalized_signals=[
|
|
NormalizedSignal( # Gleicher Name!
|
|
question_type="relevanz",
|
|
raw_value="mittel",
|
|
normalized_value="mittel",
|
|
status=SignalStatus.VALID
|
|
)
|
|
]
|
|
)
|
|
}
|
|
}
|
|
|
|
join_node = graph.nodes[2]
|
|
result = evaluate_join_node(join_node, graph, context)
|
|
|
|
# Beide Signale vorhanden (durch Präfix unterscheidbar)
|
|
assert "path_a.relevanz" in result.consolidated_signals
|
|
assert "path_b.relevanz" in result.consolidated_signals
|
|
assert result.consolidated_signals["path_a.relevanz"].normalized_value == "hoch"
|
|
assert result.consolidated_signals["path_b.relevanz"].normalized_value == "mittel"
|
|
|
|
|
|
# ── Partial Execution Tests ───────────────────────────────────────────────────
|
|
|
|
|
|
def test_mixed_status_paths():
|
|
"""Kombination EXECUTED/SKIPPED/FAILED"""
|
|
graph = WorkflowGraph(
|
|
nodes=[
|
|
WorkflowNode(id="path_a", type="analysis"),
|
|
WorkflowNode(id="path_b", type="analysis"),
|
|
WorkflowNode(id="path_c", type="analysis"),
|
|
WorkflowNode(id="join", type="join", join_strategy=JoinStrategy.BEST_EFFORT)
|
|
],
|
|
edges=[
|
|
WorkflowEdge(id="e1", from_node="path_a", to_node="join"),
|
|
WorkflowEdge(id="e2", from_node="path_b", to_node="join"),
|
|
WorkflowEdge(id="e3", from_node="path_c", to_node="join")
|
|
]
|
|
)
|
|
|
|
context = {
|
|
"node_results": {
|
|
"path_a": NodeExecutionState(
|
|
node_id="path_a",
|
|
status=NodeStatus.EXECUTED,
|
|
analysis_core="Analysis A"
|
|
),
|
|
"path_b": NodeExecutionState(
|
|
node_id="path_b",
|
|
status=NodeStatus.SKIPPED,
|
|
analysis_core=None
|
|
),
|
|
"path_c": NodeExecutionState(
|
|
node_id="path_c",
|
|
status=NodeStatus.FAILED,
|
|
error="Error",
|
|
analysis_core=None
|
|
)
|
|
}
|
|
}
|
|
|
|
join_node = graph.nodes[3]
|
|
result = evaluate_join_node(join_node, graph, context)
|
|
|
|
assert result.ready is True
|
|
assert result.metadata["executed_paths"] == 1
|
|
assert result.metadata["skipped_paths"] == 1
|
|
assert result.metadata["failed_paths"] == 1
|
|
|
|
# Analysis core nur von path_a (executed)
|
|
assert "path_a" in result.consolidated_analysis_core
|
|
assert "path_c" in result.consolidated_analysis_core # Failed → Placeholder
|
|
assert "[Path failed:" in result.consolidated_analysis_core["path_c"]
|
|
|
|
|
|
# ── Helper Function Tests ─────────────────────────────────────────────────────
|
|
|
|
|
|
def test_collect_incoming_paths(simple_graph, context_all_executed):
|
|
"""_collect_incoming_paths sammelt alle Pfade"""
|
|
join_node = next(n for n in simple_graph.nodes if n.id == "join")
|
|
|
|
paths = _collect_incoming_paths(join_node, simple_graph, context_all_executed)
|
|
|
|
assert len(paths) == 2
|
|
assert paths[0].node_id in ["path_a", "path_b"]
|
|
assert paths[1].node_id in ["path_a", "path_b"]
|
|
assert all(p.status == NodeStatus.EXECUTED for p in paths)
|
|
|
|
|
|
def test_check_join_strategy_wait_all():
|
|
"""_check_join_strategy für wait_all"""
|
|
paths = [
|
|
PathStatus("path_a", NodeStatus.EXECUTED, "A", {}),
|
|
PathStatus("path_b", NodeStatus.EXECUTED, "B", {})
|
|
]
|
|
|
|
ready, error = _check_join_strategy(paths, JoinStrategy.WAIT_ALL)
|
|
assert ready is True
|
|
assert error is None
|
|
|
|
|
|
def test_check_join_strategy_wait_all_failed():
|
|
"""_check_join_strategy für wait_all mit fehlenden Pfaden"""
|
|
paths = [
|
|
PathStatus("path_a", NodeStatus.EXECUTED, "A", {}),
|
|
PathStatus("path_b", NodeStatus.SKIPPED, None, {})
|
|
]
|
|
|
|
ready, error = _check_join_strategy(paths, JoinStrategy.WAIT_ALL)
|
|
assert ready is False
|
|
assert error is not None
|
|
assert "wait_all strategy failed" in error
|
|
|
|
|
|
def test_merge_analysis_cores_helper():
|
|
"""_merge_analysis_cores merged Kerne korrekt"""
|
|
paths = [
|
|
PathStatus("path_a", NodeStatus.EXECUTED, "Analysis A", {}),
|
|
PathStatus("path_b", NodeStatus.EXECUTED, "Analysis B", {}),
|
|
PathStatus("path_c", NodeStatus.SKIPPED, None, {})
|
|
]
|
|
|
|
merged = _merge_analysis_cores(paths, SkipHandling.IGNORE_SKIPPED)
|
|
|
|
assert len(merged) == 2
|
|
assert merged["path_a"] == "Analysis A"
|
|
assert merged["path_b"] == "Analysis B"
|
|
assert "path_c" not in merged
|
|
|
|
|
|
def test_combine_signals_helper():
|
|
"""_combine_signals kombiniert Signale mit Präfix"""
|
|
signal_a = NormalizedSignal(
|
|
question_type="relevanz",
|
|
raw_value="hoch",
|
|
normalized_value="hoch",
|
|
status=SignalStatus.VALID
|
|
)
|
|
signal_b = NormalizedSignal(
|
|
question_type="prioritaet",
|
|
raw_value="mittel",
|
|
normalized_value="mittel",
|
|
status=SignalStatus.VALID
|
|
)
|
|
|
|
paths = [
|
|
PathStatus("path_a", NodeStatus.EXECUTED, "A", {"relevanz": signal_a}),
|
|
PathStatus("path_b", NodeStatus.EXECUTED, "B", {"prioritaet": signal_b})
|
|
]
|
|
|
|
combined = _combine_signals(paths)
|
|
|
|
assert len(combined) == 2
|
|
assert "path_a.relevanz" in combined
|
|
assert "path_b.prioritaet" in combined
|