mitai-jinkendo/tests/backend/test_phase2_workflow_executor.py
Lars e2a132353d
All checks were successful
Deploy Development / deploy (push) Successful in 45s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
feat: Phase 4 - Join Nodes and Path Consolidation
Backend Implementation (v0.9m, workflow 0.5.0):
- join_evaluator.py (394 lines): Join-Strategie-Evaluator
  - evaluate_join_node(): Hauptlogik für Join-Node Execution
  - Join-Strategien: wait_all, wait_any, best_effort
  - Skip-Handling: ignore_skipped, use_placeholder, require_minimum
  - Result Consolidation: merge analysis_cores, combine signals
  - Partial Execution: korrekte Behandlung von SKIPPED/FAILED Pfaden

- workflow_executor.py: execute_join_node() Integration
  - BFS-Traversierung erweitert für Join-Nodes
  - NodeExecutionState List → Dict Konvertierung für Signale
  - Signal-Name-Kollisionen via node_id Präfix gelöst

Testing (49 Tests passing):
- test_phase4_join_nodes.py: 18 neue Unit Tests
  - Join-Strategien (wait_all, wait_any, best_effort)
  - Skip-Handling (ignore, placeholder)
  - Result Consolidation (merge, combine)
  - Partial Execution (mixed status paths)
  - Helper Functions (collect, check, merge, combine)

- Backward Compatibility: 31 Phase 2/3 Tests (alle passing)
  - test_phase2_workflow_executor.py: 1 Test aktualisiert
  - test_phase3_logic_evaluator.py: 20 Tests unverändert

Konzept: konzept_workflow_engine_konsolidated.md (Sektion 8.8)
Anforderungsanalyse: phase4_anforderungsanalyse.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-04 12:27:31 +02:00

402 lines
13 KiB
Python

"""
Unit Tests für workflow_executor.py (Phase 2)
Run with: PYTHONPATH=./backend pytest tests/backend/test_phase2_workflow_executor.py -v
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from workflow_executor import aggregate_results
from workflow_models import NodeExecutionState, NodeStatus, NormalizedSignal, SignalStatus
# ── aggregate_results Tests ────────────────────────────────────────────────────
def test_aggregate_results_basic():
    """Aggregate three executed nodes where only the middle one carries output.

    The "body" node contributes an analysis text and a single normalized
    signal; the "start" and "end" nodes carry neither.
    """
    relevance_signal = NormalizedSignal(
        question_type="relevanz",
        raw_value="ja",
        normalized_value="ja",
        status=SignalStatus.VALID,
    )
    start_state = NodeExecutionState(
        node_id="start",
        status=NodeStatus.EXECUTED,
        started_at="2026-04-03T12:00:00",
        completed_at="2026-04-03T12:00:01",
    )
    body_state = NodeExecutionState(
        node_id="body",
        status=NodeStatus.EXECUTED,
        analysis_core="Gewichtsentwicklung positiv",
        normalized_signals=[relevance_signal],
        started_at="2026-04-03T12:00:01",
        completed_at="2026-04-03T12:00:05",
    )
    end_state = NodeExecutionState(
        node_id="end",
        status=NodeStatus.EXECUTED,
        started_at="2026-04-03T12:00:05",
        completed_at="2026-04-03T12:00:06",
    )

    summary = aggregate_results([start_state, body_state, end_state])

    # The analysis text of the body node must show up under its own heading.
    combined = summary["combined_analysis"]
    assert "## body" in combined
    assert "Gewichtsentwicklung" in combined

    # All three nodes count as executed, none as failed.
    expected_counts = {"total_nodes": 3, "executed_nodes": 3, "failed_nodes": 0}
    for key, expected in expected_counts.items():
        assert summary[key] == expected

    # Exactly the one signal from the body node is collected.
    signals = summary["all_signals"]
    assert len(signals) == 1
    assert signals[0]["question_type"] == "relevanz"
def test_aggregate_results_with_failed_node():
    """Aggregate one executed and one failed node.

    The failed node must be counted as failed and must not get a section in
    the combined analysis.
    """
    succeeded = NodeExecutionState(
        node_id="node1",
        status=NodeStatus.EXECUTED,
        analysis_core="Success",
        started_at="2026-04-03T12:00:00",
        completed_at="2026-04-03T12:00:01",
    )
    failed = NodeExecutionState(
        node_id="node2",
        status=NodeStatus.FAILED,
        error="LLM timeout",
        started_at="2026-04-03T12:00:01",
        completed_at="2026-04-03T12:00:02",
    )

    summary = aggregate_results([succeeded, failed])

    expected_counts = {"total_nodes": 2, "executed_nodes": 1, "failed_nodes": 1}
    for key, expected in expected_counts.items():
        assert summary[key] == expected

    combined = summary["combined_analysis"]
    assert "## node1" in combined
    assert "## node2" not in combined
def test_aggregate_results_multiple_signals():
    """Aggregate signals from two nodes and check that their order is kept."""

    def valid_signal(question_type, value):
        # Every signal in this test is VALID with raw == normalized value.
        return NormalizedSignal(
            question_type=question_type,
            raw_value=value,
            normalized_value=value,
            status=SignalStatus.VALID,
        )

    first_node = NodeExecutionState(
        node_id="node1",
        status=NodeStatus.EXECUTED,
        analysis_core="Analysis 1",
        normalized_signals=[
            valid_signal("relevanz", "ja"),
            valid_signal("prioritaet", "hoch"),
        ],
        started_at="2026-04-03T12:00:00",
        completed_at="2026-04-03T12:00:01",
    )
    second_node = NodeExecutionState(
        node_id="node2",
        status=NodeStatus.EXECUTED,
        analysis_core="Analysis 2",
        normalized_signals=[valid_signal("selektion", "nein")],
        started_at="2026-04-03T12:00:01",
        completed_at="2026-04-03T12:00:02",
    )

    summary = aggregate_results([first_node, second_node])

    # Three signals total, in node order and then per-node signal order.
    collected_types = [entry["question_type"] for entry in summary["all_signals"]]
    assert collected_types == ["relevanz", "prioritaet", "selektion"]
def test_aggregate_results_empty():
    """Aggregating an empty list of node states yields empty/zero results."""
    summary = aggregate_results([])

    expected = {
        "combined_analysis": "",
        "all_signals": [],
        "total_nodes": 0,
        "executed_nodes": 0,
        "failed_nodes": 0,
    }
    for key, value in expected.items():
        assert summary[key] == value
def test_aggregate_results_no_analysis_core():
    """An executed node without analysis_core adds nothing to the combined text."""
    lone_start = NodeExecutionState(
        node_id="start",
        status=NodeStatus.EXECUTED,
        started_at="2026-04-03T12:00:00",
        completed_at="2026-04-03T12:00:01",
    )

    summary = aggregate_results([lone_start])

    assert summary["combined_analysis"] == ""
    # The node still counts as executed even though it produced no text.
    assert summary["executed_nodes"] == 1
def test_aggregate_results_formatting():
    """Check the layout of combined_analysis for two executed nodes."""
    timeline = [
        ("node1", "First analysis", "2026-04-03T12:00:00", "2026-04-03T12:00:01"),
        ("node2", "Second analysis", "2026-04-03T12:00:01", "2026-04-03T12:00:02"),
    ]
    states = [
        NodeExecutionState(
            node_id=node_id,
            status=NodeStatus.EXECUTED,
            analysis_core=text,
            started_at=begin,
            completed_at=finish,
        )
        for node_id, text, begin, finish in timeline
    ]

    combined = aggregate_results(states)["combined_analysis"]

    # Expected layout: "## node_id\nanalysis_core" sections, one per node,
    # with a blank line separating consecutive sections.
    assert combined.startswith("## node1\nFirst analysis")
    assert "## node2\nSecond analysis" in combined
    assert "\n\n" in combined
# ── Integration-ähnliche Tests (ohne echte DB/LLM) ─────────────────────────────
@pytest.mark.asyncio
async def test_execute_node_start_end():
    """Start and end nodes are no-ops: they execute without calling the LLM.

    Both node types must finish with status EXECUTED and no analysis_core.
    The LLM callable is an AsyncMock so the test can verify that it was
    never invoked, instead of only relying on the result fields.
    """
    from workflow_executor import execute_node
    from workflow_models import WorkflowNode, WorkflowGraph

    context = {"variables": {}, "profile_id": "test"}
    catalog = {}
    # Phase 3: execute_node requires a graph argument.
    mock_graph = WorkflowGraph(nodes=[], edges=[])
    mock_llm = AsyncMock(return_value="should not be called")

    no_op_nodes = (
        WorkflowNode(id="start", type="start"),
        WorkflowNode(id="end", type="end"),
    )
    for node in no_op_nodes:
        result = await execute_node(node, context, catalog, mock_graph, mock_llm)
        assert result.status == NodeStatus.EXECUTED
        assert result.analysis_core is None

    # A no-op node must not touch the LLM at all.
    mock_llm.assert_not_called()
    mock_llm.assert_not_awaited()
@pytest.mark.asyncio
async def test_execute_node_join_implemented():
    """Join nodes are handled by execute_node since Phase 4.

    The join node is executed with a minimal context that contains no
    incoming paths, using the BEST_EFFORT strategy.
    """
    from workflow_executor import execute_node
    from workflow_models import WorkflowNode, WorkflowGraph, JoinStrategy

    async def mock_llm(prompt, model):
        return ""

    join_node = WorkflowNode(
        id="join1",
        type="join",
        join_strategy=JoinStrategy.BEST_EFFORT,
    )
    graph = WorkflowGraph(nodes=[join_node], edges=[])
    # Minimal context: no results and no active edges, i.e. no incoming paths.
    context = {
        "variables": {},
        "profile_id": "test",
        "node_results": {},
        "active_edges": {},
    }
    catalog = {}

    outcome = await execute_node(join_node, context, catalog, graph, mock_llm)

    # Either outcome is accepted here: EXECUTED (best_effort with zero paths)
    # or FAILED (no incoming edges). The test only pins that the node is
    # processed and reported under its own id.
    accepted_statuses = (NodeStatus.EXECUTED, NodeStatus.FAILED)
    assert outcome.status in accepted_statuses
    assert outcome.node_id == "join1"
@pytest.mark.asyncio
async def test_execute_node_analysis_simple():
    """Analysis node without question augmentations.

    The stubbed LLM reply contains only an "## Analyse" section; the test
    expects its text as analysis_core and no normalized signals.
    """
    from workflow_executor import execute_node
    from workflow_models import WorkflowNode, WorkflowGraph

    async def mock_llm(prompt, model):
        # Stubbed LLM reply with an analysis section only.
        return "## Analyse\nTest analysis content"

    analysis_node = WorkflowNode(
        id="test_node",
        type="analysis",
        prompt_slug="test_prompt",
        question_augmentations=None,
    )
    context = {"variables": {"name": "Test"}, "profile_id": "test"}
    catalog = {}
    graph = WorkflowGraph(nodes=[], edges=[])

    # Replace the template loader so no real prompt storage is needed.
    with patch(
        'workflow_executor.load_prompt_template',
        return_value="Test prompt for {{name}}",
    ):
        outcome = await execute_node(analysis_node, context, catalog, graph, mock_llm)

    assert outcome.status == NodeStatus.EXECUTED
    assert outcome.analysis_core == "Test analysis content"
    # No questions were configured, so no signals are expected.
    assert len(outcome.normalized_signals) == 0
@pytest.mark.asyncio
async def test_execute_node_analysis_with_questions():
    """Analysis node with one question augmentation and signal normalization.

    The stubbed LLM reply answers the "relevanz" question with "ja"; the test
    expects exactly one VALID normalized signal carrying that value.
    """
    from workflow_executor import execute_node
    from workflow_models import WorkflowNode, QuestionAugmentation, WorkflowGraph

    async def mock_llm(prompt, model):
        # Stubbed LLM reply: analysis section followed by the decision questions.
        return (
            "## Analyse\n"
            "Test analysis\n"
            "## Entscheidungsfragen\n"
            "- Relevanz: ja\n"
        )

    relevance_question = QuestionAugmentation(
        id="q1",
        type="relevanz",
        question="Ist relevant?",
        answer_spectrum=["ja", "nein", "unklar"],
    )
    analysis_node = WorkflowNode(
        id="test_node",
        type="analysis",
        prompt_slug="test_prompt",
        question_augmentations=[relevance_question],
    )
    context = {"variables": {}, "profile_id": "test"}
    catalog = {
        "relevanz": {
            "answer_spectrum": ["ja", "nein", "unklar"],
            "normalization_rules": None,
        }
    }
    graph = WorkflowGraph(nodes=[], edges=[])

    # Replace the template loader so no real prompt storage is needed.
    with patch('workflow_executor.load_prompt_template', return_value="Base prompt"):
        outcome = await execute_node(analysis_node, context, catalog, graph, mock_llm)

    assert outcome.status == NodeStatus.EXECUTED
    assert outcome.analysis_core == "Test analysis"

    signals = outcome.normalized_signals
    assert len(signals) == 1
    only_signal = signals[0]
    assert only_signal.question_type == "relevanz"
    assert only_signal.normalized_value == "ja"
    assert only_signal.status == SignalStatus.VALID
@pytest.mark.asyncio
async def test_execute_node_hybrid_model_override():
    """Hybrid model: a node-specific answer spectrum overrides the catalog's.

    Regression test for a bug fix: the node declares the spectrum
    ["increase", "stable", "decrease"] for the "relevanz" question, while the
    catalog declares ["ja", "nein", "unklar"] for the same type. The answer
    "decrease" is only valid under the node's spectrum, so it must come back
    as VALID.

    NOTE(review): the original docstring linked a placeholder issue URL
    (".../issues/XXX"); add the real tracker reference if one exists.
    """
    from workflow_executor import execute_node
    from workflow_models import WorkflowNode, QuestionAugmentation, WorkflowGraph

    async def mock_llm(prompt, model):
        # Stubbed LLM reply: "decrease" is valid for the node spectrum but
        # not part of the catalog spectrum.
        return (
            "## Analyse\n"
            "Gewicht gesunken\n"
            "## Entscheidungsfragen\n"
            "- Relevanz: decrease\n"
        )

    # The node uses a DIFFERENT spectrum than the catalog entry below.
    node_question = QuestionAugmentation(
        id="q1",
        type="relevanz",
        question="Hat sich die Fettmasse verändert?",
        answer_spectrum=["increase", "stable", "decrease"],  # node-specific
    )
    analysis_node = WorkflowNode(
        id="test_node",
        type="analysis",
        prompt_slug="test_prompt",
        question_augmentations=[node_question],
    )
    context = {"variables": {}, "profile_id": "test"}
    catalog = {
        "relevanz": {
            "answer_spectrum": ["ja", "nein", "unklar"],  # catalog default
            "normalization_rules": None,
        }
    }
    graph = WorkflowGraph(nodes=[], edges=[])

    # Replace the template loader so no real prompt storage is needed.
    with patch('workflow_executor.load_prompt_template', return_value="Base prompt"):
        outcome = await execute_node(analysis_node, context, catalog, graph, mock_llm)

    assert outcome.status == NodeStatus.EXECUTED
    assert len(outcome.normalized_signals) == 1

    signal = outcome.normalized_signals[0]
    assert signal.question_type == "relevanz"
    assert signal.raw_value == "decrease"
    assert signal.normalized_value == "decrease"
    # Critical check: if this is not VALID, the catalog spectrum was used
    # instead of the node's own spectrum.
    assert signal.status == SignalStatus.VALID
if __name__ == "__main__":
    # Allow running this file directly as a script. pytest.main() returns the
    # exit code of the run; raising SystemExit with it makes the process exit
    # status reflect test failures instead of always being 0.
    raise SystemExit(pytest.main([__file__, "-v"]))