""" Integration Tests für Workflow Branching (Phase 3) Testet conditional execution mit Logic Nodes. Run with: PYTHONPATH=./backend pytest tests/backend/test_phase3_workflow_branching.py -v """ import pytest from unittest.mock import AsyncMock, MagicMock, patch from workflow_executor import execute_workflow from workflow_models import ( WorkflowGraph, WorkflowNode, WorkflowEdge, LogicExpression, LogicOperator, Condition, FallbackConfig, FallbackStrategy, QuestionAugmentation, NodeStatus ) # ── Helper Functions ──────────────────────────────────────────────────────── def create_mock_db(): """Creates mock DB connection with cursor""" conn = MagicMock() cur = MagicMock() cur.fetchone = MagicMock() cur.fetchall = MagicMock(return_value=[]) # Mock get_cursor() def mock_get_cursor(c): return cur # Context manager support conn.__enter__ = MagicMock(return_value=conn) conn.__exit__ = MagicMock(return_value=None) return conn, cur, mock_get_cursor @pytest.mark.asyncio async def test_simple_if_else_branching(): """Test: Simple if/else branching - then path taken""" # Workflow: start → analysis → logic → then_path / else_path → end workflow_graph = { "nodes": [ {"id": "start", "type": "start"}, {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt", "question_augmentations": [ {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]} ]}, {"id": "logic", "type": "logic", "condition": { "expression": { "operator": "eq", "ref": "analysis.relevanz", "value": "ja" } }}, {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"}, {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"}, {"id": "end", "type": "end"} ], "edges": [ {"id": "e1", "from": "start", "to": "analysis"}, {"id": "e2", "from": "analysis", "to": "logic"}, {"id": "e3", "from": "logic", "to": "then_path", "label": "then"}, {"id": "e4", "from": "logic", "to": "else_path", "label": "else"}, {"id": "e5", "from": "then_path", "to": "end"}, {"id": "e6", "from": "else_path", "to": "end"} ] } # Mock DB conn, cur, mock_get_cursor = create_mock_db() cur.fetchone.side_effect = [ {"graph": workflow_graph}, # Workflow definition {"template": "Test prompt"} # Prompt template ] cur.fetchall.return_value = [ {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None} ] # Mock LLM - returns "ja" signal async def mock_llm(prompt, model): return """## Analyse Test analysis ## Entscheidungsfragen - Relevanz: ja """ with patch('workflow_executor.get_db', return_value=conn): with patch('workflow_executor.get_cursor', side_effect=mock_get_cursor): with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): result = await execute_workflow( workflow_id="test-workflow", profile_id="test-profile", variables={}, openrouter_call_func=mock_llm ) # Assertions assert result.status == "completed" assert len(result.node_states) == 5 # start, analysis, logic, then_path, end (else_path skipped) # Check which nodes were executed executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED] skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED] assert "then_path" in executed_nodes assert "else_path" in skipped_nodes # Check aggregation assert result.aggregated_result["executed_nodes"] == 4 # start, analysis, logic, then_path (end is no-op) assert result.aggregated_result["skipped_nodes"] == 1 # else_path @pytest.mark.asyncio async def test_else_path_taken(): """Test: Simple if/else branching - else path taken""" workflow_graph = { "nodes": [ {"id": "start", "type": "start"}, {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt", "question_augmentations": [ {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]} ]}, {"id": "logic", "type": "logic", "condition": { "expression": { "operator": "eq", "ref": "analysis.relevanz", "value": "ja" } }}, {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"}, {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"}, {"id": "end", "type": "end"} ], "edges": [ {"id": "e1", "from": "start", "to": "analysis"}, {"id": "e2", "from": "analysis", "to": "logic"}, {"id": "e3", "from": "logic", "to": "then_path", "label": "then"}, {"id": "e4", "from": "logic", "to": "else_path", "label": "else"}, {"id": "e5", "from": "then_path", "to": "end"}, {"id": "e6", "from": "else_path", "to": "end"} ] } conn, cur = create_mock_db() cur.fetchone.side_effect = [ {"graph": workflow_graph}, {"template": "Test prompt"} ] cur.fetchall.return_value = [ {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None} ] # Mock LLM - returns "nein" signal (condition false) async def mock_llm(prompt, model): return """## Analyse Test analysis ## Entscheidungsfragen - Relevanz: nein """ with patch('workflow_executor.get_db', return_value=conn): with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): result = await execute_workflow( workflow_id="test-workflow", profile_id="test-profile", variables={}, openrouter_call_func=mock_llm ) # Assertions executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED] skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED] assert "else_path" in executed_nodes assert "then_path" in skipped_nodes @pytest.mark.asyncio async def test_and_condition(): """Test: AND condition - both must be true""" workflow_graph = { "nodes": [ {"id": "start", "type": "start"}, {"id": "analysis1", "type": "analysis", "prompt_slug": "test_prompt", "question_augmentations": [ {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]} ]}, {"id": "analysis2", "type": "analysis", "prompt_slug": "test_prompt", "question_augmentations": [ {"id": "q2", "type": "prioritaet", "question": "Priority?", "answer_spectrum": ["hoch", "niedrig"]} ]}, {"id": "logic", "type": "logic", "condition": { "expression": { "operator": "and", "operands": [ {"operator": "eq", "ref": "analysis1.relevanz", "value": "ja"}, {"operator": "eq", "ref": "analysis2.prioritaet", "value": "hoch"} ] } }}, {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"}, {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"}, {"id": "end", "type": "end"} ], "edges": [ {"id": "e1", "from": "start", "to": "analysis1"}, {"id": "e2", "from": "analysis1", "to": "analysis2"}, {"id": "e3", "from": "analysis2", "to": "logic"}, {"id": "e4", "from": "logic", "to": "then_path", "label": "then"}, {"id": "e5", "from": "logic", "to": "else_path", "label": "else"}, {"id": "e6", "from": "then_path", "to": "end"}, {"id": "e7", "from": "else_path", "to": "end"} ] } conn, cur = create_mock_db() cur.fetchone.side_effect = [ {"graph": workflow_graph}, {"template": "Test prompt"}, {"template": "Test prompt"} ] cur.fetchall.return_value = [ {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None}, {"question_type": "prioritaet", "answer_spectrum": ["hoch", "niedrig"], "normalization_rules": None} ] # Mock LLM - returns ja AND hoch (both true) call_count = 0 async def mock_llm(prompt, model): nonlocal call_count call_count += 1 if call_count == 1: return """## Analyse Analysis 1 ## Entscheidungsfragen - Relevanz: ja """ else: return """## Analyse Analysis 2 ## Entscheidungsfragen - Prioritaet: hoch """ with patch('workflow_executor.get_db', return_value=conn): with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): result = await execute_workflow( workflow_id="test-workflow", profile_id="test-profile", variables={}, openrouter_call_func=mock_llm ) # Assertions: Both true → then path taken executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED] skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED] assert "then_path" in executed_nodes assert "else_path" in skipped_nodes @pytest.mark.asyncio async def test_fallback_conservative_skip(): """Test: Fallback strategy CONSERVATIVE_SKIP""" workflow_graph = { "nodes": [ {"id": "start", "type": "start"}, {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt", "question_augmentations": [ {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]} ]}, {"id": "logic", "type": "logic", "condition": { "expression": { "operator": "eq", "ref": "analysis.relevanz", "value": "ja" } }, "fallback": { "strategy": "conservative_skip" }}, {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"}, {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"}, {"id": "end", "type": "end"} ], "edges": [ {"id": "e1", "from": "start", "to": "analysis"}, {"id": "e2", "from": "analysis", "to": "logic"}, {"id": "e3", "from": "logic", "to": "then_path", "label": "then"}, {"id": "e4", "from": "logic", "to": "else_path", "label": "else"}, {"id": "e5", "from": "then_path", "to": "end"}, {"id": "e6", "from": "else_path", "to": "end"} ] } conn, cur = create_mock_db() cur.fetchone.side_effect = [ {"graph": workflow_graph}, {"template": "Test prompt"} ] cur.fetchall.return_value = [ {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None} ] # Mock LLM - returns UNCLEAR signal (triggers fallback) async def mock_llm(prompt, model): return """## Analyse Test analysis ## Entscheidungsfragen - Relevanz: unklar """ with patch('workflow_executor.get_db', return_value=conn): with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): result = await execute_workflow( workflow_id="test-workflow", profile_id="test-profile", variables={}, openrouter_call_func=mock_llm ) # Assertions: CONSERVATIVE_SKIP → both paths skipped skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED] assert "then_path" in skipped_nodes assert "else_path" in skipped_nodes assert result.aggregated_result["skipped_nodes"] == 2 @pytest.mark.asyncio async def test_fallback_default_path(): """Test: Fallback strategy DEFAULT_PATH""" workflow_graph = { "nodes": [ {"id": "start", "type": "start"}, {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt", "question_augmentations": [ {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]} ]}, {"id": "logic", "type": "logic", "condition": { "expression": { "operator": "eq", "ref": "analysis.relevanz", "value": "ja" } }, "fallback": { "strategy": "default_path" }}, {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"}, {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"}, {"id": "end", "type": "end"} ], "edges": [ {"id": "e1", "from": "start", "to": "analysis"}, {"id": "e2", "from": "analysis", "to": "logic"}, {"id": "e3", "from": "logic", "to": "then_path", "label": "then"}, {"id": "e4", "from": "logic", "to": "else_path", "label": "else"}, {"id": "e5", "from": "then_path", "to": "end"}, {"id": "e6", "from": "else_path", "to": "end"} ] } conn, cur = create_mock_db() cur.fetchone.side_effect = [ {"graph": workflow_graph}, {"template": "Test prompt"} ] cur.fetchall.return_value = [ {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None} ] # Mock LLM - returns INVALID signal (triggers fallback) async def mock_llm(prompt, model): return """## Analyse Test analysis ## Entscheidungsfragen - Relevanz: totally_invalid_value """ with patch('workflow_executor.get_db', return_value=conn): with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): result = await execute_workflow( workflow_id="test-workflow", profile_id="test-profile", variables={}, openrouter_call_func=mock_llm ) # Assertions: DEFAULT_PATH → else path taken executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED] skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED] assert "else_path" in executed_nodes assert "then_path" in skipped_nodes @pytest.mark.asyncio async def test_linear_workflow_still_works(): """Test: Linear workflow (no logic nodes) still works (Phase 2 compatibility)""" workflow_graph = { "nodes": [ {"id": "start", "type": "start"}, {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt"}, {"id": "end", "type": "end"} ], "edges": [ {"id": "e1", "from": "start", "to": "analysis"}, {"id": "e2", "from": "analysis", "to": "end"} ] } conn, cur = create_mock_db() cur.fetchone.side_effect = [ {"graph": workflow_graph}, {"template": "Test prompt"} ] cur.fetchall.return_value = [] async def mock_llm(prompt, model): return "## Analyse\nTest analysis" with patch('workflow_executor.get_db', return_value=conn): with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): result = await execute_workflow( workflow_id="test-workflow", profile_id="test-profile", variables={}, openrouter_call_func=mock_llm ) # Assertions: All nodes executed assert result.status == "completed" assert len(result.node_states) == 3 assert all(s.status == NodeStatus.EXECUTED for s in result.node_states) assert result.aggregated_result["skipped_nodes"] == 0 if __name__ == "__main__": pytest.main([__file__, "-v"])