"""
Phase 4 tests: join nodes and path consolidation.

Tests for join_evaluator.py and the execute_join_node() functionality.

Test categories:
- Join strategy tests (wait_all, wait_any, best_effort)
- Skip handling tests (ignore_skipped, use_placeholder)
- Result consolidation tests (merge analysis cores, combine signals)
- Partial execution tests (failed paths, skipped paths, mixed status)
"""

import pytest
from typing import Dict, Any

from workflow_models import (
    WorkflowNode,
    WorkflowGraph,
    WorkflowEdge,
    JoinStrategy,
    SkipHandling,
    NodeStatus,
    NormalizedSignal,
    SignalStatus,
    NodeExecutionState
)
from join_evaluator import (
    evaluate_join_node,
    _collect_incoming_paths,
    _check_join_strategy,
    _merge_analysis_cores,
    _combine_signals,
    PathStatus
)


# ── Helpers ───────────────────────────────────────────────────────────────────

def _get_join_node(graph: WorkflowGraph) -> WorkflowNode:
    """Return the node with id ``"join"`` from *graph*.

    Looking the node up by id (instead of by list position) keeps the tests
    independent of the order in which nodes are declared.

    Raises:
        StopIteration: if the graph contains no node with id ``"join"``.
    """
    return next(n for n in graph.nodes if n.id == "join")


# ── Fixtures ──────────────────────────────────────────────────────────────────

@pytest.fixture
def simple_graph():
    """
    Simple graph with two paths and a join:

        start → path_a → join → end
              → path_b →
    """
    nodes = [
        WorkflowNode(id="start", type="start"),
        WorkflowNode(id="path_a", type="analysis", prompt_slug="prompt_a"),
        WorkflowNode(id="path_b", type="analysis", prompt_slug="prompt_b"),
        WorkflowNode(id="join", type="join", join_strategy=JoinStrategy.WAIT_ALL),
        WorkflowNode(id="end", type="end")
    ]
    edges = [
        WorkflowEdge(id="e1", from_node="start", to_node="path_a"),
        WorkflowEdge(id="e2", from_node="start", to_node="path_b"),
        WorkflowEdge(id="e3", from_node="path_a", to_node="join"),
        WorkflowEdge(id="e4", from_node="path_b", to_node="join"),
        WorkflowEdge(id="e5", from_node="join", to_node="end")
    ]
    return WorkflowGraph(nodes=nodes, edges=edges)


@pytest.fixture
def context_all_executed():
    """Context in which both paths are EXECUTED, each with one valid signal."""
    return {
        "node_results": {
            "path_a": NodeExecutionState(
                node_id="path_a",
                status=NodeStatus.EXECUTED,
                analysis_core="Analysis from path A",
                normalized_signals=[
                    NormalizedSignal(
                        question_type="relevanz",
                        raw_value="hoch",
                        normalized_value="hoch",
                        status=SignalStatus.VALID
                    )
                ]
            ),
            "path_b": NodeExecutionState(
                node_id="path_b",
                status=NodeStatus.EXECUTED,
                analysis_core="Analysis from path B",
                normalized_signals=[
                    NormalizedSignal(
                        question_type="prioritaet",
                        raw_value="mittel",
                        normalized_value="mittel",
                        status=SignalStatus.VALID
                    )
                ]
            )
        }
    }


@pytest.fixture
def context_one_skipped():
    """Context in which path_a is EXECUTED and path_b is SKIPPED."""
    return {
        "node_results": {
            "path_a": NodeExecutionState(
                node_id="path_a",
                status=NodeStatus.EXECUTED,
                analysis_core="Analysis from path A"
            ),
            "path_b": NodeExecutionState(
                node_id="path_b",
                status=NodeStatus.SKIPPED,
                analysis_core=None
            )
        }
    }


@pytest.fixture
def context_one_failed():
    """Context in which path_a is EXECUTED and path_b is FAILED."""
    return {
        "node_results": {
            "path_a": NodeExecutionState(
                node_id="path_a",
                status=NodeStatus.EXECUTED,
                analysis_core="Analysis from path A"
            ),
            "path_b": NodeExecutionState(
                node_id="path_b",
                status=NodeStatus.FAILED,
                analysis_core=None,
                error="LLM call failed"
            )
        }
    }


@pytest.fixture
def context_no_paths():
    """Context without any executed path (both paths are SKIPPED)."""
    return {
        "node_results": {
            "path_a": NodeExecutionState(
                node_id="path_a",
                status=NodeStatus.SKIPPED,
                analysis_core=None
            ),
            "path_b": NodeExecutionState(
                node_id="path_b",
                status=NodeStatus.SKIPPED,
                analysis_core=None
            )
        }
    }


# ── Join Strategy Tests ───────────────────────────────────────────────────────

def test_wait_all_success(simple_graph, context_all_executed):
    """wait_all: all paths available → ready is True and no error."""
    join_node = _get_join_node(simple_graph)
    join_node.join_strategy = JoinStrategy.WAIT_ALL

    result = evaluate_join_node(join_node, simple_graph, context_all_executed)

    assert result.ready is True
    assert result.error is None
    assert len(result.consolidated_analysis_core) == 2
    assert "path_a" in result.consolidated_analysis_core
    assert "path_b" in result.consolidated_analysis_core
    assert result.metadata["executed_paths"] == 2


def test_wait_all_missing_path(simple_graph, context_one_skipped):
    """wait_all: one path missing → ready is False and an error is reported."""
    join_node = _get_join_node(simple_graph)
    join_node.join_strategy = JoinStrategy.WAIT_ALL

    result = evaluate_join_node(join_node, simple_graph, context_one_skipped)

    assert result.ready is False
    assert result.error is not None
    assert "wait_all strategy failed" in result.error
    assert "path_b" in result.error
    assert result.metadata["executed_paths"] == 1
    assert result.metadata["skipped_paths"] == 1


def test_wait_any_one_path(simple_graph, context_one_skipped):
    """wait_any: at least one path available → ready is True and no error."""
    join_node = _get_join_node(simple_graph)
    join_node.join_strategy = JoinStrategy.WAIT_ANY

    result = evaluate_join_node(join_node, simple_graph, context_one_skipped)

    assert result.ready is True
    assert result.error is None
    assert len(result.consolidated_analysis_core) == 1
    assert "path_a" in result.consolidated_analysis_core
    assert result.metadata["executed_paths"] == 1


def test_wait_any_no_paths(simple_graph, context_no_paths):
    """wait_any: no executed path → ready is False and an error is reported."""
    join_node = _get_join_node(simple_graph)
    join_node.join_strategy = JoinStrategy.WAIT_ANY

    result = evaluate_join_node(join_node, simple_graph, context_no_paths)

    assert result.ready is False
    assert result.error is not None
    assert "wait_any strategy failed" in result.error
    assert result.metadata["executed_paths"] == 0


def test_best_effort_partial(simple_graph, context_one_skipped):
    """best_effort: some paths missing → ready is True and no error."""
    join_node = _get_join_node(simple_graph)
    join_node.join_strategy = JoinStrategy.BEST_EFFORT

    result = evaluate_join_node(join_node, simple_graph, context_one_skipped)

    assert result.ready is True
    assert result.error is None
    assert len(result.consolidated_analysis_core) == 1
    assert result.metadata["executed_paths"] == 1
    assert result.metadata["skipped_paths"] == 1


def test_best_effort_no_paths(simple_graph, context_no_paths):
    """best_effort: no executed path → ready is True with an empty consolidation."""
    join_node = _get_join_node(simple_graph)
    join_node.join_strategy = JoinStrategy.BEST_EFFORT

    result = evaluate_join_node(join_node, simple_graph, context_no_paths)

    assert result.ready is True
    assert result.error is None
    assert len(result.consolidated_analysis_core) == 0  # empty
    assert result.metadata["executed_paths"] == 0
    assert "note" in result.metadata  # hint about the empty consolidation


# ── Skip Handling Tests ───────────────────────────────────────────────────────

def test_ignore_skipped(simple_graph, context_one_skipped):
    """IGNORE_SKIPPED: skipped paths do not appear in the result."""
    join_node = _get_join_node(simple_graph)
    join_node.join_strategy = JoinStrategy.BEST_EFFORT
    join_node.skip_handling = SkipHandling.IGNORE_SKIPPED

    result = evaluate_join_node(join_node, simple_graph, context_one_skipped)

    assert result.ready is True
    assert len(result.consolidated_analysis_core) == 1
    assert "path_a" in result.consolidated_analysis_core
    assert "path_b" not in result.consolidated_analysis_core


def test_use_placeholder(simple_graph, context_one_skipped):
    """USE_PLACEHOLDER: skipped paths are represented by a placeholder."""
    join_node = _get_join_node(simple_graph)
    join_node.join_strategy = JoinStrategy.BEST_EFFORT
    join_node.skip_handling = SkipHandling.USE_PLACEHOLDER

    result = evaluate_join_node(join_node, simple_graph, context_one_skipped)

    assert result.ready is True
    assert len(result.consolidated_analysis_core) == 2
    assert "path_a" in result.consolidated_analysis_core
    assert "path_b" in result.consolidated_analysis_core
    assert "[Path skipped:" in result.consolidated_analysis_core["path_b"]


def test_failed_path_placeholder(simple_graph, context_one_failed):
    """FAILED paths get a placeholder (checked here with the node's default skip_handling)."""
    join_node = _get_join_node(simple_graph)
    join_node.join_strategy = JoinStrategy.BEST_EFFORT

    result = evaluate_join_node(join_node, simple_graph, context_one_failed)

    assert result.ready is True
    assert len(result.consolidated_analysis_core) == 2
    assert "path_a" in result.consolidated_analysis_core
    assert "[Path failed:" in result.consolidated_analysis_core["path_b"]


# ── Result Consolidation Tests ────────────────────────────────────────────────

def test_merge_analysis_cores(simple_graph, context_all_executed):
    """Analysis cores of all executed paths are merged, keyed by node id."""
    join_node = _get_join_node(simple_graph)

    result = evaluate_join_node(join_node, simple_graph, context_all_executed)

    assert result.ready is True
    assert len(result.consolidated_analysis_core) == 2
    assert result.consolidated_analysis_core["path_a"] == "Analysis from path A"
    assert result.consolidated_analysis_core["path_b"] == "Analysis from path B"


def test_combine_signals(simple_graph, context_all_executed):
    """Signals of all paths are combined, keyed with a node_id prefix."""
    join_node = _get_join_node(simple_graph)

    result = evaluate_join_node(join_node, simple_graph, context_all_executed)

    assert result.ready is True
    assert len(result.consolidated_signals) == 2

    # Signal keys are prefixed with the originating node id.
    assert "path_a.relevanz" in result.consolidated_signals
    assert "path_b.prioritaet" in result.consolidated_signals

    # Signal values are carried over unchanged.
    assert result.consolidated_signals["path_a.relevanz"].normalized_value == "hoch"
    assert result.consolidated_signals["path_b.prioritaet"].normalized_value == "mittel"


def test_signal_name_collision():
    """Identical signal names on different paths stay distinct thanks to the prefix."""
    graph = WorkflowGraph(
        nodes=[
            WorkflowNode(id="path_a", type="analysis"),
            WorkflowNode(id="path_b", type="analysis"),
            WorkflowNode(id="join", type="join", join_strategy=JoinStrategy.WAIT_ALL)
        ],
        edges=[
            WorkflowEdge(id="e1", from_node="path_a", to_node="join"),
            WorkflowEdge(id="e2", from_node="path_b", to_node="join")
        ]
    )
    context = {
        "node_results": {
            "path_a": NodeExecutionState(
                node_id="path_a",
                status=NodeStatus.EXECUTED,
                analysis_core="A",
                normalized_signals=[
                    NormalizedSignal(
                        question_type="relevanz",
                        raw_value="hoch",
                        normalized_value="hoch",
                        status=SignalStatus.VALID
                    )
                ]
            ),
            "path_b": NodeExecutionState(
                node_id="path_b",
                status=NodeStatus.EXECUTED,
                analysis_core="B",
                normalized_signals=[
                    NormalizedSignal(
                        # Same signal name as on path_a!
                        question_type="relevanz",
                        raw_value="mittel",
                        normalized_value="mittel",
                        status=SignalStatus.VALID
                    )
                ]
            )
        }
    }
    join_node = _get_join_node(graph)

    result = evaluate_join_node(join_node, graph, context)

    # Both signals are present and distinguishable by their prefix.
    assert "path_a.relevanz" in result.consolidated_signals
    assert "path_b.relevanz" in result.consolidated_signals
    assert result.consolidated_signals["path_a.relevanz"].normalized_value == "hoch"
    assert result.consolidated_signals["path_b.relevanz"].normalized_value == "mittel"


# ── Partial Execution Tests ───────────────────────────────────────────────────

def test_mixed_status_paths():
    """Combination of EXECUTED, SKIPPED and FAILED paths under best_effort."""
    graph = WorkflowGraph(
        nodes=[
            WorkflowNode(id="path_a", type="analysis"),
            WorkflowNode(id="path_b", type="analysis"),
            WorkflowNode(id="path_c", type="analysis"),
            WorkflowNode(id="join", type="join", join_strategy=JoinStrategy.BEST_EFFORT)
        ],
        edges=[
            WorkflowEdge(id="e1", from_node="path_a", to_node="join"),
            WorkflowEdge(id="e2", from_node="path_b", to_node="join"),
            WorkflowEdge(id="e3", from_node="path_c", to_node="join")
        ]
    )
    context = {
        "node_results": {
            "path_a": NodeExecutionState(
                node_id="path_a",
                status=NodeStatus.EXECUTED,
                analysis_core="Analysis A"
            ),
            "path_b": NodeExecutionState(
                node_id="path_b",
                status=NodeStatus.SKIPPED,
                analysis_core=None
            ),
            "path_c": NodeExecutionState(
                node_id="path_c",
                status=NodeStatus.FAILED,
                error="Error",
                analysis_core=None
            )
        }
    }
    join_node = _get_join_node(graph)

    result = evaluate_join_node(join_node, graph, context)

    assert result.ready is True
    assert result.metadata["executed_paths"] == 1
    assert result.metadata["skipped_paths"] == 1
    assert result.metadata["failed_paths"] == 1

    # path_a (executed) contributes an entry; path_c (failed) is represented
    # by a placeholder. path_b is not checked here because its presence
    # depends on the node's default skip_handling.
    assert "path_a" in result.consolidated_analysis_core
    assert "path_c" in result.consolidated_analysis_core
    assert "[Path failed:" in result.consolidated_analysis_core["path_c"]


# ── Helper Function Tests ─────────────────────────────────────────────────────

def test_collect_incoming_paths(simple_graph, context_all_executed):
    """_collect_incoming_paths collects every path leading into the join."""
    join_node = _get_join_node(simple_graph)

    paths = _collect_incoming_paths(join_node, simple_graph, context_all_executed)

    assert len(paths) == 2
    # Set comparison: both predecessors must be present exactly once, in any
    # order. Checking each element with `in [...]` would also accept the same
    # path being returned twice.
    assert {p.node_id for p in paths} == {"path_a", "path_b"}
    assert all(p.status == NodeStatus.EXECUTED for p in paths)


def test_check_join_strategy_wait_all():
    """_check_join_strategy with wait_all and all paths executed."""
    paths = [
        PathStatus("path_a", NodeStatus.EXECUTED, "A", {}),
        PathStatus("path_b", NodeStatus.EXECUTED, "B", {})
    ]

    ready, error = _check_join_strategy(paths, JoinStrategy.WAIT_ALL)

    assert ready is True
    assert error is None


def test_check_join_strategy_wait_all_failed():
    """_check_join_strategy with wait_all and a missing (skipped) path."""
    paths = [
        PathStatus("path_a", NodeStatus.EXECUTED, "A", {}),
        PathStatus("path_b", NodeStatus.SKIPPED, None, {})
    ]

    ready, error = _check_join_strategy(paths, JoinStrategy.WAIT_ALL)

    assert ready is False
    assert error is not None
    assert "wait_all strategy failed" in error


def test_merge_analysis_cores_helper():
    """_merge_analysis_cores merges cores and drops skipped paths with IGNORE_SKIPPED."""
    paths = [
        PathStatus("path_a", NodeStatus.EXECUTED, "Analysis A", {}),
        PathStatus("path_b", NodeStatus.EXECUTED, "Analysis B", {}),
        PathStatus("path_c", NodeStatus.SKIPPED, None, {})
    ]

    merged = _merge_analysis_cores(paths, SkipHandling.IGNORE_SKIPPED)

    assert len(merged) == 2
    assert merged["path_a"] == "Analysis A"
    assert merged["path_b"] == "Analysis B"
    assert "path_c" not in merged


def test_combine_signals_helper():
    """_combine_signals combines signals using a node_id prefix."""
    signal_a = NormalizedSignal(
        question_type="relevanz",
        raw_value="hoch",
        normalized_value="hoch",
        status=SignalStatus.VALID
    )
    signal_b = NormalizedSignal(
        question_type="prioritaet",
        raw_value="mittel",
        normalized_value="mittel",
        status=SignalStatus.VALID
    )
    paths = [
        PathStatus("path_a", NodeStatus.EXECUTED, "A", {"relevanz": signal_a}),
        PathStatus("path_b", NodeStatus.EXECUTED, "B", {"prioritaet": signal_b})
    ]

    combined = _combine_signals(paths)

    assert len(combined) == 2
    assert "path_a.relevanz" in combined
    assert "path_b.prioritaet" in combined