mitai-jinkendo/tests/backend/test_phase3_workflow_branching.py
Lars 2ce0874dcb
All checks were successful
Deploy Development / deploy (push) Successful in 50s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 14s
feat: Phase 3 - Logic Nodes + Conditional Branching
Backend:
- logic_evaluator.py (NEU, 307 Zeilen): Deterministischer Logic Evaluator
  - Vergleichsoperatoren: EQ, NEQ, IN, NOT_IN, GT, LT, GTE, LTE, CONTAINS
  - Logische Operatoren: AND, OR, NOT mit Verschachtelung
  - Resolve signal references (node_id.question_type)
  - Error handling für UNCLEAR/INVALID/NOT_DECIDABLE Signale

- workflow_executor.py (ERWEITERT):
  - execute_logic_node(): Bedingungen evaluieren, Pfade aktivieren/deaktivieren
  - execute_workflow(): BFS-Traversierung mit Edge-Activation statt Sequential
  - _apply_fallback(): 4 Fallback-Strategien (CONSERVATIVE_SKIP, DEFAULT_PATH, UNCERTAINTY_PATH, DOCUMENT_ONLY)
  - _has_active_incoming_edge(): Prüft ob Node erreichbar ist
  - _get_edges_by_label(): Findet then/else/uncertainty Pfade

- workflow_models.py (ERWEITERT):
  - LogicOperator.CONTAINS hinzugefügt

- version.py: 0.9k → 0.9l, workflow 0.3.0 → 0.4.0

Tests:
- test_phase3_logic_evaluator.py (NEU): 20 Unit Tests (alle passing)
  - Comparison operators (EQ, NEQ, IN, GT, LT, CONTAINS)
  - Logical operators (AND, OR, NOT)
  - Nested expressions
  - Error handling (missing refs, UNCLEAR/INVALID signals)

- test_phase2_workflow_executor.py (AKTUALISIERT): 11 Tests (alle passing)
  - execute_node() graph parameter hinzugefügt (Phase 3 requirement)
  - test_execute_node_unknown_type: logic → join (logic jetzt implementiert)

- test_phase3_workflow_branching.py (NEU): Integration Tests vorbereitet
  - Erfordert vollständige DB-Mock-Strategie (wird in E2E-Test nachgeholt)

Phase 2 Backward Compatibility:  Alle Phase 2 Tests bestehen weiterhin

Konzept: .claude/task/Workflow_engine_prompting_engine/konzept_workflow_engine_konsolidated.md
Anforderungsanalyse: .claude/task/Workflow_engine_prompting_engine/phase3_anforderungsanalyse.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-04 08:02:22 +02:00

460 lines
16 KiB
Python

"""
Integration Tests für Workflow Branching (Phase 3)
Testet conditional execution mit Logic Nodes.
Run with: PYTHONPATH=./backend pytest tests/backend/test_phase3_workflow_branching.py -v
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from workflow_executor import execute_workflow
from workflow_models import (
WorkflowGraph, WorkflowNode, WorkflowEdge,
LogicExpression, LogicOperator, Condition, FallbackConfig, FallbackStrategy,
QuestionAugmentation, NodeStatus
)
# ── Helper Functions ────────────────────────────────────────────────────────
def create_mock_db():
"""Creates mock DB connection with cursor"""
conn = MagicMock()
cur = MagicMock()
cur.fetchone = MagicMock()
cur.fetchall = MagicMock(return_value=[])
# Mock get_cursor()
def mock_get_cursor(c):
return cur
# Context manager support
conn.__enter__ = MagicMock(return_value=conn)
conn.__exit__ = MagicMock(return_value=None)
return conn, cur, mock_get_cursor
@pytest.mark.asyncio
async def test_simple_if_else_branching():
"""Test: Simple if/else branching - then path taken"""
# Workflow: start → analysis → logic → then_path / else_path → end
workflow_graph = {
"nodes": [
{"id": "start", "type": "start"},
{"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt",
"question_augmentations": [
{"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]}
]},
{"id": "logic", "type": "logic",
"condition": {
"expression": {
"operator": "eq",
"ref": "analysis.relevanz",
"value": "ja"
}
}},
{"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"},
{"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"},
{"id": "end", "type": "end"}
],
"edges": [
{"id": "e1", "from": "start", "to": "analysis"},
{"id": "e2", "from": "analysis", "to": "logic"},
{"id": "e3", "from": "logic", "to": "then_path", "label": "then"},
{"id": "e4", "from": "logic", "to": "else_path", "label": "else"},
{"id": "e5", "from": "then_path", "to": "end"},
{"id": "e6", "from": "else_path", "to": "end"}
]
}
# Mock DB
conn, cur, mock_get_cursor = create_mock_db()
cur.fetchone.side_effect = [
{"graph": workflow_graph}, # Workflow definition
{"template": "Test prompt"} # Prompt template
]
cur.fetchall.return_value = [
{"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None}
]
# Mock LLM - returns "ja" signal
async def mock_llm(prompt, model):
return """## Analyse
Test analysis
## Entscheidungsfragen
- Relevanz: ja
"""
with patch('workflow_executor.get_db', return_value=conn):
with patch('workflow_executor.get_cursor', side_effect=mock_get_cursor):
with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
result = await execute_workflow(
workflow_id="test-workflow",
profile_id="test-profile",
variables={},
openrouter_call_func=mock_llm
)
# Assertions
assert result.status == "completed"
assert len(result.node_states) == 5 # start, analysis, logic, then_path, end (else_path skipped)
# Check which nodes were executed
executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED]
skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED]
assert "then_path" in executed_nodes
assert "else_path" in skipped_nodes
# Check aggregation
assert result.aggregated_result["executed_nodes"] == 4 # start, analysis, logic, then_path (end is no-op)
assert result.aggregated_result["skipped_nodes"] == 1 # else_path
@pytest.mark.asyncio
async def test_else_path_taken():
"""Test: Simple if/else branching - else path taken"""
workflow_graph = {
"nodes": [
{"id": "start", "type": "start"},
{"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt",
"question_augmentations": [
{"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]}
]},
{"id": "logic", "type": "logic",
"condition": {
"expression": {
"operator": "eq",
"ref": "analysis.relevanz",
"value": "ja"
}
}},
{"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"},
{"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"},
{"id": "end", "type": "end"}
],
"edges": [
{"id": "e1", "from": "start", "to": "analysis"},
{"id": "e2", "from": "analysis", "to": "logic"},
{"id": "e3", "from": "logic", "to": "then_path", "label": "then"},
{"id": "e4", "from": "logic", "to": "else_path", "label": "else"},
{"id": "e5", "from": "then_path", "to": "end"},
{"id": "e6", "from": "else_path", "to": "end"}
]
}
conn, cur = create_mock_db()
cur.fetchone.side_effect = [
{"graph": workflow_graph},
{"template": "Test prompt"}
]
cur.fetchall.return_value = [
{"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None}
]
# Mock LLM - returns "nein" signal (condition false)
async def mock_llm(prompt, model):
return """## Analyse
Test analysis
## Entscheidungsfragen
- Relevanz: nein
"""
with patch('workflow_executor.get_db', return_value=conn):
with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
result = await execute_workflow(
workflow_id="test-workflow",
profile_id="test-profile",
variables={},
openrouter_call_func=mock_llm
)
# Assertions
executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED]
skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED]
assert "else_path" in executed_nodes
assert "then_path" in skipped_nodes
@pytest.mark.asyncio
async def test_and_condition():
"""Test: AND condition - both must be true"""
workflow_graph = {
"nodes": [
{"id": "start", "type": "start"},
{"id": "analysis1", "type": "analysis", "prompt_slug": "test_prompt",
"question_augmentations": [
{"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]}
]},
{"id": "analysis2", "type": "analysis", "prompt_slug": "test_prompt",
"question_augmentations": [
{"id": "q2", "type": "prioritaet", "question": "Priority?", "answer_spectrum": ["hoch", "niedrig"]}
]},
{"id": "logic", "type": "logic",
"condition": {
"expression": {
"operator": "and",
"operands": [
{"operator": "eq", "ref": "analysis1.relevanz", "value": "ja"},
{"operator": "eq", "ref": "analysis2.prioritaet", "value": "hoch"}
]
}
}},
{"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"},
{"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"},
{"id": "end", "type": "end"}
],
"edges": [
{"id": "e1", "from": "start", "to": "analysis1"},
{"id": "e2", "from": "analysis1", "to": "analysis2"},
{"id": "e3", "from": "analysis2", "to": "logic"},
{"id": "e4", "from": "logic", "to": "then_path", "label": "then"},
{"id": "e5", "from": "logic", "to": "else_path", "label": "else"},
{"id": "e6", "from": "then_path", "to": "end"},
{"id": "e7", "from": "else_path", "to": "end"}
]
}
conn, cur = create_mock_db()
cur.fetchone.side_effect = [
{"graph": workflow_graph},
{"template": "Test prompt"},
{"template": "Test prompt"}
]
cur.fetchall.return_value = [
{"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None},
{"question_type": "prioritaet", "answer_spectrum": ["hoch", "niedrig"], "normalization_rules": None}
]
# Mock LLM - returns ja AND hoch (both true)
call_count = 0
async def mock_llm(prompt, model):
nonlocal call_count
call_count += 1
if call_count == 1:
return """## Analyse
Analysis 1
## Entscheidungsfragen
- Relevanz: ja
"""
else:
return """## Analyse
Analysis 2
## Entscheidungsfragen
- Prioritaet: hoch
"""
with patch('workflow_executor.get_db', return_value=conn):
with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
result = await execute_workflow(
workflow_id="test-workflow",
profile_id="test-profile",
variables={},
openrouter_call_func=mock_llm
)
# Assertions: Both true → then path taken
executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED]
skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED]
assert "then_path" in executed_nodes
assert "else_path" in skipped_nodes
@pytest.mark.asyncio
async def test_fallback_conservative_skip():
"""Test: Fallback strategy CONSERVATIVE_SKIP"""
workflow_graph = {
"nodes": [
{"id": "start", "type": "start"},
{"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt",
"question_augmentations": [
{"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]}
]},
{"id": "logic", "type": "logic",
"condition": {
"expression": {
"operator": "eq",
"ref": "analysis.relevanz",
"value": "ja"
}
},
"fallback": {
"strategy": "conservative_skip"
}},
{"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"},
{"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"},
{"id": "end", "type": "end"}
],
"edges": [
{"id": "e1", "from": "start", "to": "analysis"},
{"id": "e2", "from": "analysis", "to": "logic"},
{"id": "e3", "from": "logic", "to": "then_path", "label": "then"},
{"id": "e4", "from": "logic", "to": "else_path", "label": "else"},
{"id": "e5", "from": "then_path", "to": "end"},
{"id": "e6", "from": "else_path", "to": "end"}
]
}
conn, cur = create_mock_db()
cur.fetchone.side_effect = [
{"graph": workflow_graph},
{"template": "Test prompt"}
]
cur.fetchall.return_value = [
{"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None}
]
# Mock LLM - returns UNCLEAR signal (triggers fallback)
async def mock_llm(prompt, model):
return """## Analyse
Test analysis
## Entscheidungsfragen
- Relevanz: unklar
"""
with patch('workflow_executor.get_db', return_value=conn):
with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
result = await execute_workflow(
workflow_id="test-workflow",
profile_id="test-profile",
variables={},
openrouter_call_func=mock_llm
)
# Assertions: CONSERVATIVE_SKIP → both paths skipped
skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED]
assert "then_path" in skipped_nodes
assert "else_path" in skipped_nodes
assert result.aggregated_result["skipped_nodes"] == 2
@pytest.mark.asyncio
async def test_fallback_default_path():
"""Test: Fallback strategy DEFAULT_PATH"""
workflow_graph = {
"nodes": [
{"id": "start", "type": "start"},
{"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt",
"question_augmentations": [
{"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]}
]},
{"id": "logic", "type": "logic",
"condition": {
"expression": {
"operator": "eq",
"ref": "analysis.relevanz",
"value": "ja"
}
},
"fallback": {
"strategy": "default_path"
}},
{"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"},
{"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"},
{"id": "end", "type": "end"}
],
"edges": [
{"id": "e1", "from": "start", "to": "analysis"},
{"id": "e2", "from": "analysis", "to": "logic"},
{"id": "e3", "from": "logic", "to": "then_path", "label": "then"},
{"id": "e4", "from": "logic", "to": "else_path", "label": "else"},
{"id": "e5", "from": "then_path", "to": "end"},
{"id": "e6", "from": "else_path", "to": "end"}
]
}
conn, cur = create_mock_db()
cur.fetchone.side_effect = [
{"graph": workflow_graph},
{"template": "Test prompt"}
]
cur.fetchall.return_value = [
{"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None}
]
# Mock LLM - returns INVALID signal (triggers fallback)
async def mock_llm(prompt, model):
return """## Analyse
Test analysis
## Entscheidungsfragen
- Relevanz: totally_invalid_value
"""
with patch('workflow_executor.get_db', return_value=conn):
with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
result = await execute_workflow(
workflow_id="test-workflow",
profile_id="test-profile",
variables={},
openrouter_call_func=mock_llm
)
# Assertions: DEFAULT_PATH → else path taken
executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED]
skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED]
assert "else_path" in executed_nodes
assert "then_path" in skipped_nodes
@pytest.mark.asyncio
async def test_linear_workflow_still_works():
"""Test: Linear workflow (no logic nodes) still works (Phase 2 compatibility)"""
workflow_graph = {
"nodes": [
{"id": "start", "type": "start"},
{"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt"},
{"id": "end", "type": "end"}
],
"edges": [
{"id": "e1", "from": "start", "to": "analysis"},
{"id": "e2", "from": "analysis", "to": "end"}
]
}
conn, cur = create_mock_db()
cur.fetchone.side_effect = [
{"graph": workflow_graph},
{"template": "Test prompt"}
]
cur.fetchall.return_value = []
async def mock_llm(prompt, model):
return "## Analyse\nTest analysis"
with patch('workflow_executor.get_db', return_value=conn):
with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"):
result = await execute_workflow(
workflow_id="test-workflow",
profile_id="test-profile",
variables={},
openrouter_call_func=mock_llm
)
# Assertions: All nodes executed
assert result.status == "completed"
assert len(result.node_states) == 3
assert all(s.status == NodeStatus.EXECUTED for s in result.node_states)
assert result.aggregated_result["skipped_nodes"] == 0
if __name__ == "__main__":
pytest.main([__file__, "-v"])