Backend:
- logic_evaluator.py (NEU, 307 Zeilen): Deterministischer Logic Evaluator
- Vergleichsoperatoren: EQ, NEQ, IN, NOT_IN, GT, LT, GTE, LTE, CONTAINS
- Logische Operatoren: AND, OR, NOT mit Verschachtelung
- Auflösung von Signal-Referenzen (node_id.question_type)
- Error handling für UNCLEAR/INVALID/NOT_DECIDABLE Signale
- workflow_executor.py (ERWEITERT):
- execute_logic_node(): Bedingungen evaluieren, Pfade aktivieren/deaktivieren
- execute_workflow(): BFS-Traversierung mit Edge-Aktivierung statt sequenzieller Ausführung
- _apply_fallback(): 4 Fallback-Strategien (CONSERVATIVE_SKIP, DEFAULT_PATH, UNCERTAINTY_PATH, DOCUMENT_ONLY)
- _has_active_incoming_edge(): Prüft, ob ein Node erreichbar ist
- _get_edges_by_label(): Findet then/else/uncertainty Pfade
- workflow_models.py (ERWEITERT):
- LogicOperator.CONTAINS hinzugefügt
- version.py: 0.9k → 0.9l, workflow 0.3.0 → 0.4.0
Tests:
- test_phase3_logic_evaluator.py (NEU): 20 Unit Tests (alle passing)
- Comparison operators (EQ, NEQ, IN, GT, LT, CONTAINS)
- Logical operators (AND, OR, NOT)
- Nested expressions
- Error handling (missing refs, UNCLEAR/INVALID signals)
- test_phase2_workflow_executor.py (AKTUALISIERT): 11 Tests (alle passing)
- execute_node(): graph-Parameter hinzugefügt (Phase-3-Anforderung)
- test_execute_node_unknown_type: logic → join (logic jetzt implementiert)
- test_phase3_workflow_branching.py (NEU): Integration Tests vorbereitet
- Erfordert vollständige DB-Mock-Strategie (wird in E2E-Test nachgeholt)
Phase 2 Backward Compatibility: ✅ Alle Phase 2 Tests bestehen weiterhin
Konzept: .claude/task/Workflow_engine_prompting_engine/konzept_workflow_engine_konsolidated.md
Anforderungsanalyse: .claude/task/Workflow_engine_prompting_engine/phase3_anforderungsanalyse.md
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
460 lines
16 KiB
Python
460 lines
16 KiB
Python
"""
Integration tests for workflow branching (Phase 3).

Exercises conditional execution through logic nodes: if/else routing,
compound (AND) conditions, fallback strategies for undecidable signals,
and backward compatibility with linear (Phase 2) workflows.

Run with: PYTHONPATH=./backend pytest tests/backend/test_phase3_workflow_branching.py -v
"""
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
from workflow_executor import execute_workflow
|
|
from workflow_models import (
|
|
WorkflowGraph, WorkflowNode, WorkflowEdge,
|
|
LogicExpression, LogicOperator, Condition, FallbackConfig, FallbackStrategy,
|
|
QuestionAugmentation, NodeStatus
|
|
)
|
|
|
|
|
|
# ── Helper Functions ────────────────────────────────────────────────────────
|
|
|
|
|
|
def create_mock_db():
    """Create a mock DB connection whose cursor is shared by every access path.

    Returns:
        tuple: ``(conn, cur, mock_get_cursor)`` where

        * ``conn`` is a ``MagicMock`` usable as a context manager
          (``with conn as c`` yields ``conn`` itself) and whose ``cursor()``
          returns ``cur``;
        * ``cur`` is the single cursor mock that tests configure via
          ``fetchone`` / ``fetchall``; ``fetchall`` defaults to ``[]`` and
          ``cur`` is also usable as a context manager yielding itself;
        * ``mock_get_cursor`` is a drop-in replacement for ``get_cursor`` that
          ignores its arguments and returns ``cur``. Pass it as ``side_effect``
          when patching ``workflow_executor.get_cursor``.
    """
    conn = MagicMock()
    cur = MagicMock()
    cur.fetchone = MagicMock()
    cur.fetchall = MagicMock(return_value=[])

    # Route every way of obtaining a cursor to the same mock. Without this,
    # conn.cursor() returns an unrelated auto-created MagicMock and the
    # fetchone/fetchall configuration done by the tests is silently ignored.
    conn.cursor.return_value = cur
    cur.__enter__ = MagicMock(return_value=cur)
    cur.__exit__ = MagicMock(return_value=None)

    def mock_get_cursor(*args, **kwargs):
        # Accept any call signature (e.g. get_cursor(conn) or extra kwargs).
        return cur

    # Context manager support: ``with get_db() as c`` yields the same mock.
    conn.__enter__ = MagicMock(return_value=conn)
    conn.__exit__ = MagicMock(return_value=None)

    return conn, cur, mock_get_cursor


@pytest.mark.asyncio
async def test_simple_if_else_branching():
    """Simple if/else branching where the condition holds, so the "then" path runs.

    Graph: start → analysis → logic → then_path / else_path → end.
    The mocked LLM answers "Relevanz: ja", so ``analysis.relevanz == "ja"``
    is true: ``then_path`` must execute and ``else_path`` must be skipped.
    """
    workflow_graph = {
        "nodes": [
            {"id": "start", "type": "start"},
            {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt",
             "question_augmentations": [
                 {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]}
             ]},
            {"id": "logic", "type": "logic",
             "condition": {
                 "expression": {
                     "operator": "eq",
                     "ref": "analysis.relevanz",
                     "value": "ja"
                 }
             }},
            {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"},
            {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"},
            {"id": "end", "type": "end"}
        ],
        "edges": [
            {"id": "e1", "from": "start", "to": "analysis"},
            {"id": "e2", "from": "analysis", "to": "logic"},
            {"id": "e3", "from": "logic", "to": "then_path", "label": "then"},
            {"id": "e4", "from": "logic", "to": "else_path", "label": "else"},
            {"id": "e5", "from": "then_path", "to": "end"},
            {"id": "e6", "from": "else_path", "to": "end"}
        ]
    }

    conn, cur, mock_get_cursor = create_mock_db()
    # The first fetchone() yields the workflow definition; every later call
    # yields the prompt template. A fixed two-item list covered only one
    # template lookup, but two analysis nodes run here (analysis, then_path),
    # presumably one template query each; verify against workflow_executor.
    db_rows = iter([{"graph": workflow_graph}])
    cur.fetchone.side_effect = lambda *args, **kwargs: next(
        db_rows, {"template": "Test prompt"}
    )
    cur.fetchall.return_value = [
        {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None}
    ]

    # Mock LLM: answers the decision question with "ja" (condition true).
    async def mock_llm(prompt, model):
        return "## Analyse\nTest analysis\n\n## Entscheidungsfragen\n- Relevanz: ja\n"

    with patch('workflow_executor.get_db', return_value=conn):
        with patch('workflow_executor.get_cursor', side_effect=mock_get_cursor):
            with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
                result = await execute_workflow(
                    workflow_id="test-workflow",
                    profile_id="test-profile",
                    variables={},
                    openrouter_call_func=mock_llm
                )

    assert result.status == "completed"

    # Every graph node gets exactly one state, including the skipped branch.
    # The former ``len(...) == 5`` contradicted ``else_path in skipped_nodes``.
    assert sorted(s.node_id for s in result.node_states) == sorted(
        ["start", "analysis", "logic", "then_path", "else_path", "end"]
    )

    executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED]
    skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED]

    assert "then_path" in executed_nodes
    assert skipped_nodes == ["else_path"]

    # Aggregate counters must agree with the per-node statuses. The former
    # hard-coded ``executed_nodes == 4`` assumed "end" is not counted, while
    # test_linear_workflow_still_works expects "end" to be EXECUTED.
    assert result.aggregated_result["executed_nodes"] == len(executed_nodes)
    assert result.aggregated_result["skipped_nodes"] == 1


@pytest.mark.asyncio
async def test_else_path_taken():
    """Simple if/else branching where the condition fails, so the "else" path runs.

    The mocked LLM answers "Relevanz: nein", so ``analysis.relevanz == "ja"``
    is false: ``else_path`` must execute and ``then_path`` must be skipped.
    """
    workflow_graph = {
        "nodes": [
            {"id": "start", "type": "start"},
            {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt",
             "question_augmentations": [
                 {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]}
             ]},
            {"id": "logic", "type": "logic",
             "condition": {
                 "expression": {
                     "operator": "eq",
                     "ref": "analysis.relevanz",
                     "value": "ja"
                 }
             }},
            {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"},
            {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"},
            {"id": "end", "type": "end"}
        ],
        "edges": [
            {"id": "e1", "from": "start", "to": "analysis"},
            {"id": "e2", "from": "analysis", "to": "logic"},
            {"id": "e3", "from": "logic", "to": "then_path", "label": "then"},
            {"id": "e4", "from": "logic", "to": "else_path", "label": "else"},
            {"id": "e5", "from": "then_path", "to": "end"},
            {"id": "e6", "from": "else_path", "to": "end"}
        ]
    }

    # create_mock_db() returns three values; unpacking only two raised
    # ValueError before the test could do anything.
    conn, cur, mock_get_cursor = create_mock_db()
    # First fetchone() -> workflow definition, every later call -> prompt
    # template (analysis and else_path both need one, presumably; a fixed
    # two-item list would be exhausted and raise StopIteration).
    db_rows = iter([{"graph": workflow_graph}])
    cur.fetchone.side_effect = lambda *args, **kwargs: next(
        db_rows, {"template": "Test prompt"}
    )
    cur.fetchall.return_value = [
        {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None}
    ]

    # Mock LLM: answers the decision question with "nein" (condition false).
    async def mock_llm(prompt, model):
        return "## Analyse\nTest analysis\n\n## Entscheidungsfragen\n- Relevanz: nein\n"

    # get_cursor is patched as well, matching test_simple_if_else_branching,
    # so queries go through the configured mock cursor.
    with patch('workflow_executor.get_db', return_value=conn):
        with patch('workflow_executor.get_cursor', side_effect=mock_get_cursor):
            with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
                result = await execute_workflow(
                    workflow_id="test-workflow",
                    profile_id="test-profile",
                    variables={},
                    openrouter_call_func=mock_llm
                )

    executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED]
    skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED]

    assert "else_path" in executed_nodes
    assert "then_path" in skipped_nodes


@pytest.mark.asyncio
async def test_and_condition():
    """AND condition: both operands must be true for the "then" path to run.

    analysis1 yields ``relevanz == "ja"`` and analysis2 yields
    ``prioritaet == "hoch"``, so the AND holds: ``then_path`` executes and
    ``else_path`` is skipped.
    """
    workflow_graph = {
        "nodes": [
            {"id": "start", "type": "start"},
            {"id": "analysis1", "type": "analysis", "prompt_slug": "test_prompt",
             "question_augmentations": [
                 {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]}
             ]},
            {"id": "analysis2", "type": "analysis", "prompt_slug": "test_prompt",
             "question_augmentations": [
                 {"id": "q2", "type": "prioritaet", "question": "Priority?", "answer_spectrum": ["hoch", "niedrig"]}
             ]},
            {"id": "logic", "type": "logic",
             "condition": {
                 "expression": {
                     "operator": "and",
                     "operands": [
                         {"operator": "eq", "ref": "analysis1.relevanz", "value": "ja"},
                         {"operator": "eq", "ref": "analysis2.prioritaet", "value": "hoch"}
                     ]
                 }
             }},
            {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"},
            {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"},
            {"id": "end", "type": "end"}
        ],
        "edges": [
            {"id": "e1", "from": "start", "to": "analysis1"},
            {"id": "e2", "from": "analysis1", "to": "analysis2"},
            {"id": "e3", "from": "analysis2", "to": "logic"},
            {"id": "e4", "from": "logic", "to": "then_path", "label": "then"},
            {"id": "e5", "from": "logic", "to": "else_path", "label": "else"},
            {"id": "e6", "from": "then_path", "to": "end"},
            {"id": "e7", "from": "else_path", "to": "end"}
        ]
    }

    # create_mock_db() returns three values; unpacking two raised ValueError.
    conn, cur, mock_get_cursor = create_mock_db()
    # First fetchone() -> workflow definition, every later call -> prompt
    # template. Three analysis nodes run here (analysis1, analysis2,
    # then_path), presumably one template query each, so the former fixed
    # three-item list would run out.
    db_rows = iter([{"graph": workflow_graph}])
    cur.fetchone.side_effect = lambda *args, **kwargs: next(
        db_rows, {"template": "Test prompt"}
    )
    cur.fetchall.return_value = [
        {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None},
        {"question_type": "prioritaet", "answer_spectrum": ["hoch", "niedrig"], "normalization_rules": None}
    ]

    # Mock LLM: the first call answers "Relevanz: ja"; every later call
    # answers "Prioritaet: hoch", so both AND operands are true.
    call_count = 0

    async def mock_llm(prompt, model):
        nonlocal call_count
        call_count += 1
        if call_count == 1:
            return "## Analyse\nAnalysis 1\n\n## Entscheidungsfragen\n- Relevanz: ja\n"
        return "## Analyse\nAnalysis 2\n\n## Entscheidungsfragen\n- Prioritaet: hoch\n"

    with patch('workflow_executor.get_db', return_value=conn):
        with patch('workflow_executor.get_cursor', side_effect=mock_get_cursor):
            with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
                result = await execute_workflow(
                    workflow_id="test-workflow",
                    profile_id="test-profile",
                    variables={},
                    openrouter_call_func=mock_llm
                )

    # Both operands true → then path taken.
    executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED]
    skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED]

    assert "then_path" in executed_nodes
    assert "else_path" in skipped_nodes


@pytest.mark.asyncio
async def test_fallback_conservative_skip():
    """Fallback strategy CONSERVATIVE_SKIP: an unclear signal skips both branches.

    The mocked LLM answers "Relevanz: unklar". The test expects the logic
    node's ``conservative_skip`` fallback to leave both ``then_path`` and
    ``else_path`` skipped.
    """
    workflow_graph = {
        "nodes": [
            {"id": "start", "type": "start"},
            {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt",
             "question_augmentations": [
                 {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]}
             ]},
            {"id": "logic", "type": "logic",
             "condition": {
                 "expression": {
                     "operator": "eq",
                     "ref": "analysis.relevanz",
                     "value": "ja"
                 }
             },
             "fallback": {
                 "strategy": "conservative_skip"
             }},
            {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"},
            {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"},
            {"id": "end", "type": "end"}
        ],
        "edges": [
            {"id": "e1", "from": "start", "to": "analysis"},
            {"id": "e2", "from": "analysis", "to": "logic"},
            {"id": "e3", "from": "logic", "to": "then_path", "label": "then"},
            {"id": "e4", "from": "logic", "to": "else_path", "label": "else"},
            {"id": "e5", "from": "then_path", "to": "end"},
            {"id": "e6", "from": "else_path", "to": "end"}
        ]
    }

    # create_mock_db() returns three values; unpacking two raised ValueError.
    conn, cur, mock_get_cursor = create_mock_db()
    # First fetchone() -> workflow definition, every later call -> prompt
    # template (robust against the exact number of template lookups).
    db_rows = iter([{"graph": workflow_graph}])
    cur.fetchone.side_effect = lambda *args, **kwargs: next(
        db_rows, {"template": "Test prompt"}
    )
    cur.fetchall.return_value = [
        {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None}
    ]

    # Mock LLM: answers with an UNCLEAR signal, which should trigger the fallback.
    async def mock_llm(prompt, model):
        return "## Analyse\nTest analysis\n\n## Entscheidungsfragen\n- Relevanz: unklar\n"

    with patch('workflow_executor.get_db', return_value=conn):
        with patch('workflow_executor.get_cursor', side_effect=mock_get_cursor):
            with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
                result = await execute_workflow(
                    workflow_id="test-workflow",
                    profile_id="test-profile",
                    variables={},
                    openrouter_call_func=mock_llm
                )

    # CONSERVATIVE_SKIP → both paths skipped.
    skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED]

    assert "then_path" in skipped_nodes
    assert "else_path" in skipped_nodes
    assert result.aggregated_result["skipped_nodes"] == 2


@pytest.mark.asyncio
async def test_fallback_default_path():
    """Fallback strategy DEFAULT_PATH: an invalid signal routes to the "else" path.

    The mocked LLM answers with a value outside the answer spectrum. The test
    expects the ``default_path`` fallback to execute ``else_path`` and skip
    ``then_path``.
    """
    workflow_graph = {
        "nodes": [
            {"id": "start", "type": "start"},
            {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt",
             "question_augmentations": [
                 {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]}
             ]},
            {"id": "logic", "type": "logic",
             "condition": {
                 "expression": {
                     "operator": "eq",
                     "ref": "analysis.relevanz",
                     "value": "ja"
                 }
             },
             "fallback": {
                 "strategy": "default_path"
             }},
            {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"},
            {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"},
            {"id": "end", "type": "end"}
        ],
        "edges": [
            {"id": "e1", "from": "start", "to": "analysis"},
            {"id": "e2", "from": "analysis", "to": "logic"},
            {"id": "e3", "from": "logic", "to": "then_path", "label": "then"},
            {"id": "e4", "from": "logic", "to": "else_path", "label": "else"},
            {"id": "e5", "from": "then_path", "to": "end"},
            {"id": "e6", "from": "else_path", "to": "end"}
        ]
    }

    # create_mock_db() returns three values; unpacking two raised ValueError.
    conn, cur, mock_get_cursor = create_mock_db()
    # First fetchone() -> workflow definition, every later call -> prompt
    # template (analysis and else_path both need one, presumably; a fixed
    # two-item list would be exhausted).
    db_rows = iter([{"graph": workflow_graph}])
    cur.fetchone.side_effect = lambda *args, **kwargs: next(
        db_rows, {"template": "Test prompt"}
    )
    cur.fetchall.return_value = [
        {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None}
    ]

    # Mock LLM: answers with an INVALID signal, which should trigger the fallback.
    async def mock_llm(prompt, model):
        return "## Analyse\nTest analysis\n\n## Entscheidungsfragen\n- Relevanz: totally_invalid_value\n"

    with patch('workflow_executor.get_db', return_value=conn):
        with patch('workflow_executor.get_cursor', side_effect=mock_get_cursor):
            with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
                result = await execute_workflow(
                    workflow_id="test-workflow",
                    profile_id="test-profile",
                    variables={},
                    openrouter_call_func=mock_llm
                )

    # DEFAULT_PATH → else path taken.
    executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED]
    skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED]

    assert "else_path" in executed_nodes
    assert "then_path" in skipped_nodes


@pytest.mark.asyncio
async def test_linear_workflow_still_works():
    """A linear workflow without logic nodes still runs end to end (Phase 2 compatibility).

    Graph: start → analysis → end. Every node must be EXECUTED and none skipped.
    """
    workflow_graph = {
        "nodes": [
            {"id": "start", "type": "start"},
            {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt"},
            {"id": "end", "type": "end"}
        ],
        "edges": [
            {"id": "e1", "from": "start", "to": "analysis"},
            {"id": "e2", "from": "analysis", "to": "end"}
        ]
    }

    # create_mock_db() returns three values; unpacking two raised ValueError.
    conn, cur, mock_get_cursor = create_mock_db()
    # First fetchone() -> workflow definition, every later call -> prompt template.
    db_rows = iter([{"graph": workflow_graph}])
    cur.fetchone.side_effect = lambda *args, **kwargs: next(
        db_rows, {"template": "Test prompt"}
    )
    cur.fetchall.return_value = []

    async def mock_llm(prompt, model):
        return "## Analyse\nTest analysis"

    with patch('workflow_executor.get_db', return_value=conn):
        with patch('workflow_executor.get_cursor', side_effect=mock_get_cursor):
            with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
                result = await execute_workflow(
                    workflow_id="test-workflow",
                    profile_id="test-profile",
                    variables={},
                    openrouter_call_func=mock_llm
                )

    # All nodes executed, nothing skipped.
    assert result.status == "completed"
    assert len(result.node_states) == 3
    assert all(s.status == NodeStatus.EXECUTED for s in result.node_states)
    assert result.aggregated_result["skipped_nodes"] == 0


if __name__ == "__main__":
    # Allow running this module directly. pytest.main() returns the exit code
    # instead of exiting, so propagate it; otherwise failures exit with status 0.
    raise SystemExit(pytest.main([__file__, "-v"]))