mitai-jinkendo/tests/backend/test_phase4_join_nodes.py
Lars e2a132353d
All checks were successful
Deploy Development / deploy (push) Successful in 45s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
feat: Phase 4 - Join Nodes and Path Consolidation
Backend Implementation (v0.9m, workflow 0.5.0):
- join_evaluator.py (394 lines): Join-Strategie-Evaluator
  - evaluate_join_node(): Hauptlogik für Join-Node Execution
  - Join-Strategien: wait_all, wait_any, best_effort
  - Skip-Handling: ignore_skipped, use_placeholder, require_minimum
  - Result Consolidation: merge analysis_cores, combine signals
  - Partial Execution: korrekte Behandlung von SKIPPED/FAILED Pfaden

- workflow_executor.py: execute_join_node() Integration
  - BFS-Traversierung erweitert für Join-Nodes
  - NodeExecutionState List → Dict Konvertierung für Signale
  - Signal-Name-Kollisionen via node_id Präfix gelöst

Testing (49 Tests passing):
- test_phase4_join_nodes.py: 18 neue Unit Tests
  - Join-Strategien (wait_all, wait_any, best_effort)
  - Skip-Handling (ignore, placeholder)
  - Result Consolidation (merge, combine)
  - Partial Execution (mixed status paths)
  - Helper Functions (collect, check, merge, combine)

- Backward Compatibility: 31 Phase 2/3 Tests (alle passing)
  - test_phase2_workflow_executor.py: 1 Test aktualisiert
  - test_phase3_logic_evaluator.py: 20 Tests unverändert

Konzept: konzept_workflow_engine_konsolidated.md (Sektion 8.8)
Anforderungsanalyse: phase4_anforderungsanalyse.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-04 12:27:31 +02:00

512 lines
18 KiB
Python

"""
Phase 4 Tests: Join Nodes and Path Consolidation
Tests for join_evaluator.py and the execute_join_node() functionality.
Test categories:
- Join Strategy Tests (wait_all, wait_any, best_effort)
- Skip Handling Tests (ignore_skipped, use_placeholder)
- Result Consolidation Tests (merge analysis_cores, combine signals)
- Partial Execution Tests (failed paths, skipped paths, mixed status)
"""
import pytest
from typing import Dict, Any
from workflow_models import (
WorkflowNode,
WorkflowGraph,
WorkflowEdge,
JoinStrategy,
SkipHandling,
NodeStatus,
NormalizedSignal,
SignalStatus,
NodeExecutionState
)
from join_evaluator import (
evaluate_join_node,
_collect_incoming_paths,
_check_join_strategy,
_merge_analysis_cores,
_combine_signals,
PathStatus
)
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
def simple_graph():
    """
    Build a small graph in which two parallel paths meet in a join node.

    Topology::

        start → path_a → join → end
              → path_b →

    The join node is created with the WAIT_ALL strategy; tests that need a
    different strategy overwrite it on the node they look up.
    """
    # (edge id, source node id, target node id)
    edge_specs = [
        ("e1", "start", "path_a"),
        ("e2", "start", "path_b"),
        ("e3", "path_a", "join"),
        ("e4", "path_b", "join"),
        ("e5", "join", "end"),
    ]
    return WorkflowGraph(
        nodes=[
            WorkflowNode(id="start", type="start"),
            WorkflowNode(id="path_a", type="analysis", prompt_slug="prompt_a"),
            WorkflowNode(id="path_b", type="analysis", prompt_slug="prompt_b"),
            WorkflowNode(id="join", type="join", join_strategy=JoinStrategy.WAIT_ALL),
            WorkflowNode(id="end", type="end"),
        ],
        edges=[
            WorkflowEdge(id=edge_id, from_node=source, to_node=target)
            for edge_id, source, target in edge_specs
        ],
    )
@pytest.fixture
def context_all_executed():
    """
    Provide an execution context in which both paths have status EXECUTED.

    Each path carries an analysis core and exactly one valid normalized
    signal ("relevanz" on path_a, "prioritaet" on path_b).
    """
    state_a = NodeExecutionState(
        node_id="path_a",
        status=NodeStatus.EXECUTED,
        analysis_core="Analysis from path A",
        normalized_signals=[
            NormalizedSignal(
                question_type="relevanz",
                raw_value="hoch",
                normalized_value="hoch",
                status=SignalStatus.VALID,
            ),
        ],
    )
    state_b = NodeExecutionState(
        node_id="path_b",
        status=NodeStatus.EXECUTED,
        analysis_core="Analysis from path B",
        normalized_signals=[
            NormalizedSignal(
                question_type="prioritaet",
                raw_value="mittel",
                normalized_value="mittel",
                status=SignalStatus.VALID,
            ),
        ],
    )
    return {"node_results": {"path_a": state_a, "path_b": state_b}}
@pytest.fixture
def context_one_skipped():
    """Provide a context where path_a is EXECUTED and path_b is SKIPPED."""
    executed_state = NodeExecutionState(
        node_id="path_a",
        status=NodeStatus.EXECUTED,
        analysis_core="Analysis from path A",
    )
    skipped_state = NodeExecutionState(
        node_id="path_b",
        status=NodeStatus.SKIPPED,
        analysis_core=None,
    )
    return {"node_results": {"path_a": executed_state, "path_b": skipped_state}}
@pytest.fixture
def context_one_failed():
    """Provide a context where path_a is EXECUTED and path_b is FAILED."""
    executed_state = NodeExecutionState(
        node_id="path_a",
        status=NodeStatus.EXECUTED,
        analysis_core="Analysis from path A",
    )
    # The failed path has no analysis core, only an error message.
    failed_state = NodeExecutionState(
        node_id="path_b",
        status=NodeStatus.FAILED,
        analysis_core=None,
        error="LLM call failed",
    )
    return {"node_results": {"path_a": executed_state, "path_b": failed_state}}
@pytest.fixture
def context_no_paths():
    """Provide a context in which no path was executed (both are SKIPPED)."""
    skipped_a = NodeExecutionState(
        node_id="path_a",
        status=NodeStatus.SKIPPED,
        analysis_core=None,
    )
    skipped_b = NodeExecutionState(
        node_id="path_b",
        status=NodeStatus.SKIPPED,
        analysis_core=None,
    )
    return {"node_results": {"path_a": skipped_a, "path_b": skipped_b}}
# ── Join Strategy Tests ───────────────────────────────────────────────────────
def test_wait_all_success(simple_graph, context_all_executed):
    """wait_all: every incoming path is available → ready=True, no error."""
    join = next(node for node in simple_graph.nodes if node.id == "join")
    join.join_strategy = JoinStrategy.WAIT_ALL

    outcome = evaluate_join_node(join, simple_graph, context_all_executed)

    assert outcome.ready is True
    assert outcome.error is None

    # Both executed paths must show up in the consolidated analysis cores.
    cores = outcome.consolidated_analysis_core
    assert len(cores) == 2
    for path_id in ("path_a", "path_b"):
        assert path_id in cores
    assert outcome.metadata["executed_paths"] == 2
def test_wait_all_missing_path(simple_graph, context_one_skipped):
    """wait_all: one path is missing → ready=False with an error message."""
    join = next(node for node in simple_graph.nodes if node.id == "join")
    join.join_strategy = JoinStrategy.WAIT_ALL

    outcome = evaluate_join_node(join, simple_graph, context_one_skipped)

    assert outcome.ready is False

    # The error has to name both the failed strategy and the missing path.
    message = outcome.error
    assert message is not None
    assert "wait_all strategy failed" in message
    assert "path_b" in message

    counters = outcome.metadata
    assert counters["executed_paths"] == 1
    assert counters["skipped_paths"] == 1
def test_wait_any_one_path(simple_graph, context_one_skipped):
    """wait_any: at least one path is available → ready=True, no error."""
    join = next(node for node in simple_graph.nodes if node.id == "join")
    join.join_strategy = JoinStrategy.WAIT_ANY

    outcome = evaluate_join_node(join, simple_graph, context_one_skipped)

    assert outcome.ready is True
    assert outcome.error is None

    # Only the executed path contributes an analysis core.
    cores = outcome.consolidated_analysis_core
    assert len(cores) == 1
    assert "path_a" in cores
    assert outcome.metadata["executed_paths"] == 1
def test_wait_any_no_paths(simple_graph, context_no_paths):
    """wait_any: no path is available → ready=False with an error message."""
    join = next(node for node in simple_graph.nodes if node.id == "join")
    join.join_strategy = JoinStrategy.WAIT_ANY

    outcome = evaluate_join_node(join, simple_graph, context_no_paths)

    assert outcome.ready is False
    message = outcome.error
    assert message is not None
    assert "wait_any strategy failed" in message
    assert outcome.metadata["executed_paths"] == 0
def test_best_effort_partial(simple_graph, context_one_skipped):
    """best_effort: some paths are missing → still ready=True, no error."""
    join = next(node for node in simple_graph.nodes if node.id == "join")
    join.join_strategy = JoinStrategy.BEST_EFFORT

    outcome = evaluate_join_node(join, simple_graph, context_one_skipped)

    assert outcome.ready is True
    assert outcome.error is None
    assert len(outcome.consolidated_analysis_core) == 1

    counters = outcome.metadata
    assert counters["executed_paths"] == 1
    assert counters["skipped_paths"] == 1
def test_best_effort_no_paths(simple_graph, context_no_paths):
    """best_effort: no paths at all → ready=True with an empty consolidation."""
    join = next(node for node in simple_graph.nodes if node.id == "join")
    join.join_strategy = JoinStrategy.BEST_EFFORT

    outcome = evaluate_join_node(join, simple_graph, context_no_paths)

    assert outcome.ready is True
    assert outcome.error is None
    assert len(outcome.consolidated_analysis_core) == 0  # nothing to merge

    counters = outcome.metadata
    assert counters["executed_paths"] == 0
    assert "note" in counters  # metadata flags the empty consolidation
# ── Skip Handling Tests ───────────────────────────────────────────────────────
def test_ignore_skipped(simple_graph, context_one_skipped):
    """IGNORE_SKIPPED: skipped paths are left out of the result."""
    join = next(node for node in simple_graph.nodes if node.id == "join")
    join.join_strategy = JoinStrategy.BEST_EFFORT
    join.skip_handling = SkipHandling.IGNORE_SKIPPED

    outcome = evaluate_join_node(join, simple_graph, context_one_skipped)

    assert outcome.ready is True
    cores = outcome.consolidated_analysis_core
    assert len(cores) == 1
    assert "path_a" in cores
    assert "path_b" not in cores
def test_use_placeholder(simple_graph, context_one_skipped):
    """USE_PLACEHOLDER: skipped paths are represented by a placeholder text."""
    join = next(node for node in simple_graph.nodes if node.id == "join")
    join.join_strategy = JoinStrategy.BEST_EFFORT
    join.skip_handling = SkipHandling.USE_PLACEHOLDER

    outcome = evaluate_join_node(join, simple_graph, context_one_skipped)

    assert outcome.ready is True
    cores = outcome.consolidated_analysis_core
    assert len(cores) == 2
    for path_id in ("path_a", "path_b"):
        assert path_id in cores
    # The skipped path carries the placeholder marker instead of an analysis.
    assert "[Path skipped:" in cores["path_b"]
def test_failed_path_placeholder(simple_graph, context_one_failed):
    """
    FAILED paths receive a placeholder entry.

    NOTE(review): the original description says this holds independently of
    skip_handling; this test only exercises the node's default skip_handling.
    """
    join = next(node for node in simple_graph.nodes if node.id == "join")
    join.join_strategy = JoinStrategy.BEST_EFFORT

    outcome = evaluate_join_node(join, simple_graph, context_one_failed)

    assert outcome.ready is True
    cores = outcome.consolidated_analysis_core
    assert len(cores) == 2
    assert "path_a" in cores
    assert "[Path failed:" in cores["path_b"]
# ── Result Consolidation Tests ────────────────────────────────────────────────
def test_merge_analysis_cores(simple_graph, context_all_executed):
    """Analysis cores of all executed paths are merged, keyed by node id."""
    join = next(node for node in simple_graph.nodes if node.id == "join")

    outcome = evaluate_join_node(join, simple_graph, context_all_executed)

    assert outcome.ready is True
    cores = outcome.consolidated_analysis_core
    assert len(cores) == 2
    expected_cores = (
        ("path_a", "Analysis from path A"),
        ("path_b", "Analysis from path B"),
    )
    for path_id, expected_text in expected_cores:
        assert cores[path_id] == expected_text
def test_combine_signals(simple_graph, context_all_executed):
    """Signals of all paths are combined under a node_id-prefixed key."""
    join = next(node for node in simple_graph.nodes if node.id == "join")

    outcome = evaluate_join_node(join, simple_graph, context_all_executed)

    assert outcome.ready is True
    signals = outcome.consolidated_signals
    assert len(signals) == 2

    # Keys carry the originating node id as prefix.
    assert "path_a.relevanz" in signals
    assert "path_b.prioritaet" in signals

    # Signal values are carried over unchanged.
    expected_values = (
        ("path_a.relevanz", "hoch"),
        ("path_b.prioritaet", "mittel"),
    )
    for signal_key, expected_value in expected_values:
        assert signals[signal_key].normalized_value == expected_value
def test_signal_name_collision():
    """
    Identical signal names on different paths must not overwrite each other.

    Both paths emit a signal named "relevanz". The node_id prefix is expected
    to keep them apart, so the join result must contain exactly the two
    prefixed keys, each with the value from its own path.
    """
    graph = WorkflowGraph(
        nodes=[
            WorkflowNode(id="path_a", type="analysis"),
            WorkflowNode(id="path_b", type="analysis"),
            WorkflowNode(id="join", type="join", join_strategy=JoinStrategy.WAIT_ALL),
        ],
        edges=[
            WorkflowEdge(id="e1", from_node="path_a", to_node="join"),
            WorkflowEdge(id="e2", from_node="path_b", to_node="join"),
        ],
    )
    context = {
        "node_results": {
            "path_a": NodeExecutionState(
                node_id="path_a",
                status=NodeStatus.EXECUTED,
                analysis_core="A",
                normalized_signals=[
                    NormalizedSignal(
                        question_type="relevanz",
                        raw_value="hoch",
                        normalized_value="hoch",
                        status=SignalStatus.VALID,
                    ),
                ],
            ),
            "path_b": NodeExecutionState(
                node_id="path_b",
                status=NodeStatus.EXECUTED,
                analysis_core="B",
                normalized_signals=[
                    NormalizedSignal(  # same signal name as on path_a
                        question_type="relevanz",
                        raw_value="mittel",
                        normalized_value="mittel",
                        status=SignalStatus.VALID,
                    ),
                ],
            ),
        }
    }
    # Look the join node up by id (as the other tests do) instead of relying
    # on its position in the node list.
    join_node = next(n for n in graph.nodes if n.id == "join")

    result = evaluate_join_node(join_node, graph, context)

    # Without these two checks a not-ready join, or an additional unprefixed
    # "relevanz" entry, would go unnoticed.
    assert result.ready is True
    assert len(result.consolidated_signals) == 2

    # Both signals are present and distinguishable through the prefix.
    assert "path_a.relevanz" in result.consolidated_signals
    assert "path_b.relevanz" in result.consolidated_signals
    assert result.consolidated_signals["path_a.relevanz"].normalized_value == "hoch"
    assert result.consolidated_signals["path_b.relevanz"].normalized_value == "mittel"
# ── Partial Execution Tests ───────────────────────────────────────────────────
def test_mixed_status_paths():
    """best_effort join over one EXECUTED, one SKIPPED and one FAILED path."""
    graph = WorkflowGraph(
        nodes=[
            WorkflowNode(id="path_a", type="analysis"),
            WorkflowNode(id="path_b", type="analysis"),
            WorkflowNode(id="path_c", type="analysis"),
            WorkflowNode(id="join", type="join", join_strategy=JoinStrategy.BEST_EFFORT),
        ],
        edges=[
            WorkflowEdge(id="e1", from_node="path_a", to_node="join"),
            WorkflowEdge(id="e2", from_node="path_b", to_node="join"),
            WorkflowEdge(id="e3", from_node="path_c", to_node="join"),
        ],
    )
    executed_state = NodeExecutionState(
        node_id="path_a",
        status=NodeStatus.EXECUTED,
        analysis_core="Analysis A",
    )
    skipped_state = NodeExecutionState(
        node_id="path_b",
        status=NodeStatus.SKIPPED,
        analysis_core=None,
    )
    failed_state = NodeExecutionState(
        node_id="path_c",
        status=NodeStatus.FAILED,
        error="Error",
        analysis_core=None,
    )
    context = {
        "node_results": {
            "path_a": executed_state,
            "path_b": skipped_state,
            "path_c": failed_state,
        }
    }
    join = graph.nodes[3]

    outcome = evaluate_join_node(join, graph, context)

    assert outcome.ready is True

    # One path per status category.
    counters = outcome.metadata
    assert counters["executed_paths"] == 1
    assert counters["skipped_paths"] == 1
    assert counters["failed_paths"] == 1

    # The executed path contributes its analysis; the failed path is present
    # as a placeholder entry.
    cores = outcome.consolidated_analysis_core
    assert "path_a" in cores
    assert "path_c" in cores
    assert "[Path failed:" in cores["path_c"]
# ── Helper Function Tests ─────────────────────────────────────────────────────
def test_collect_incoming_paths(simple_graph, context_all_executed):
    """_collect_incoming_paths returns one entry per incoming path of the join."""
    join_node = next(n for n in simple_graph.nodes if n.id == "join")

    paths = _collect_incoming_paths(join_node, simple_graph, context_all_executed)

    assert len(paths) == 2
    # Compare the full set of ids: two independent membership checks would
    # also pass if the same path were returned twice.
    assert sorted(p.node_id for p in paths) == ["path_a", "path_b"]
    assert all(p.status == NodeStatus.EXECUTED for p in paths)
def test_check_join_strategy_wait_all():
    """_check_join_strategy with wait_all and only EXECUTED paths → ready."""
    all_executed = [
        PathStatus("path_a", NodeStatus.EXECUTED, "A", {}),
        PathStatus("path_b", NodeStatus.EXECUTED, "B", {}),
    ]

    is_ready, message = _check_join_strategy(all_executed, JoinStrategy.WAIT_ALL)

    assert is_ready is True
    assert message is None
def test_check_join_strategy_wait_all_failed():
    """_check_join_strategy with wait_all and a skipped path → not ready."""
    one_skipped = [
        PathStatus("path_a", NodeStatus.EXECUTED, "A", {}),
        PathStatus("path_b", NodeStatus.SKIPPED, None, {}),
    ]

    is_ready, message = _check_join_strategy(one_skipped, JoinStrategy.WAIT_ALL)

    assert is_ready is False
    assert message is not None
    assert "wait_all strategy failed" in message
def test_merge_analysis_cores_helper():
    """_merge_analysis_cores keeps executed cores and, with IGNORE_SKIPPED, drops skipped ones."""
    path_states = [
        PathStatus("path_a", NodeStatus.EXECUTED, "Analysis A", {}),
        PathStatus("path_b", NodeStatus.EXECUTED, "Analysis B", {}),
        PathStatus("path_c", NodeStatus.SKIPPED, None, {}),
    ]

    cores = _merge_analysis_cores(path_states, SkipHandling.IGNORE_SKIPPED)

    assert len(cores) == 2
    for path_id, expected_text in (("path_a", "Analysis A"), ("path_b", "Analysis B")):
        assert cores[path_id] == expected_text
    assert "path_c" not in cores
def test_combine_signals_helper():
    """_combine_signals merges per-path signals under node_id-prefixed keys."""

    def valid_signal(question_type, value):
        # Raw and normalized value are identical for the signals used here.
        return NormalizedSignal(
            question_type=question_type,
            raw_value=value,
            normalized_value=value,
            status=SignalStatus.VALID,
        )

    relevance = valid_signal("relevanz", "hoch")
    priority = valid_signal("prioritaet", "mittel")
    path_states = [
        PathStatus("path_a", NodeStatus.EXECUTED, "A", {"relevanz": relevance}),
        PathStatus("path_b", NodeStatus.EXECUTED, "B", {"prioritaet": priority}),
    ]

    merged_signals = _combine_signals(path_states)

    assert len(merged_signals) == 2
    for prefixed_key in ("path_a.relevanz", "path_b.prioritaet"):
        assert prefixed_key in merged_signals