mitai-jinkendo/tests/backend/test_workflow_engine.py
Lars b5be6e21a5
All checks were successful
Deploy Development / deploy (push) Successful in 50s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
feat: Phase 0 - Workflow Engine Foundation
Backend:
- DB-Migration 034: workflow_definitions, workflow_question_catalog, workflow_executions
- ai_prompts.question_augmentations JSONB-Spalte (Hybridmodell: Prompt-Defaults)
- 6 Grundtypen Fragenergänzungen mit Normalisierungsregeln (Seed-Daten)
- Pydantic-Modelle (16 Models, 11 Enums) in workflow_models.py
- Workflow-Engine: Graph-Parsing, Topologische Sortierung, DAG-Validierung
- Dispatcher-Erweiterung type='workflow' (Stub für Phase 1-3)
- Adjacency Lists, Erreichbarkeits-Prüfungen, Zyklen-Erkennung

Testing:
- 22 Unit-Tests (alle bestanden): Graph-Parsing, Validierung, Topologische Sortierung
- Fixtures: simple_valid_graph, parallel_graph, branching_graph

Version:
- APP_VERSION 0.9i
- DB_SCHEMA_VERSION 20260403
- Module: workflow 0.1.0

Anforderungsanalyse: .claude/task/Workflow_engine_prompting_engine/anforderungsanalyse_umsetzungsplan.md
Konzept-Basis: .claude/task/Workflow_engine_prompting_engine/konzept_workflow_engine_konsolidated.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-03 16:55:51 +02:00

414 lines
17 KiB
Python

"""
Unit Tests für Workflow Engine (Phase 0: Foundation)
Tests für:
- Graph-Parsing
- Topologische Sortierung
- DAG-Validierung (Zyklen-Erkennung)
- Erreichbarkeits-Prüfungen
Run with: pytest tests/backend/test_workflow_engine.py -v
"""
import pytest
import sys
import os
# Add backend to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../backend'))
from workflow_models import WorkflowGraph, WorkflowNode, WorkflowEdge, NodeType, Position
from workflow_engine import WorkflowEngine, parse_workflow_graph, validate_workflow_graph
from fastapi import HTTPException
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
def simple_valid_graph():
    """Provide a minimal linear workflow graph: START -> ANALYSIS -> END."""
    start_node = WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0))
    analysis_node = WorkflowNode(
        id="analysis1",
        type=NodeType.ANALYSIS,
        position=Position(x=100, y=0),
        prompt_slug="test_prompt",
    )
    end_node = WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0))

    # Two edges chain the three nodes into a single path.
    connections = [
        WorkflowEdge(id="e1", from_node="start", to_node="analysis1"),
        WorkflowEdge(id="e2", from_node="analysis1", to_node="end"),
    ]
    return WorkflowGraph(nodes=[start_node, analysis_node, end_node], edges=connections)
@pytest.fixture
def parallel_graph():
    """Provide a fan-out/fan-in graph: START -> (A1 || A2) -> JOIN -> END."""
    graph_nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=50)),
        WorkflowNode(
            id="analysis1",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=0),
            prompt_slug="prompt1",
        ),
        WorkflowNode(
            id="analysis2",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=100),
            prompt_slug="prompt2",
        ),
        WorkflowNode(id="join", type=NodeType.JOIN, position=Position(x=200, y=50)),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=50)),
    ]

    # (edge id, source node id, target node id)
    edge_specs = (
        ("e1", "start", "analysis1"),
        ("e2", "start", "analysis2"),
        ("e3", "analysis1", "join"),
        ("e4", "analysis2", "join"),
        ("e5", "join", "end"),
    )
    graph_edges = [
        WorkflowEdge(id=edge_id, from_node=source, to_node=target)
        for edge_id, source, target in edge_specs
    ]
    return WorkflowGraph(nodes=graph_nodes, edges=graph_edges)
@pytest.fixture
def branching_graph():
    """Provide a branching graph: START -> LOGIC -> (A1 | A2) -> END."""
    start_node = WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=50))
    logic_node = WorkflowNode(id="logic1", type=NodeType.LOGIC, position=Position(x=100, y=50))
    then_branch = WorkflowNode(
        id="analysis1",
        type=NodeType.ANALYSIS,
        position=Position(x=200, y=0),
        prompt_slug="prompt1",
    )
    else_branch = WorkflowNode(
        id="analysis2",
        type=NodeType.ANALYSIS,
        position=Position(x=200, y=100),
        prompt_slug="prompt2",
    )
    end_node = WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=50))

    graph_edges = [
        WorkflowEdge(id="e1", from_node="start", to_node="logic1"),
        # The two edges leaving the LOGIC node carry the branch labels.
        WorkflowEdge(id="e2", from_node="logic1", to_node="analysis1", label="then"),
        WorkflowEdge(id="e3", from_node="logic1", to_node="analysis2", label="else"),
        WorkflowEdge(id="e4", from_node="analysis1", to_node="end"),
        WorkflowEdge(id="e5", from_node="analysis2", to_node="end"),
    ]
    return WorkflowGraph(
        nodes=[start_node, logic_node, then_branch, else_branch, end_node],
        edges=graph_edges,
    )
# ── Test: Graph-Parsing ───────────────────────────────────────────────────────
def test_parse_workflow_graph_valid(simple_valid_graph):
    """Parsing the dumped form of a valid graph keeps its nodes and edges."""
    result = parse_workflow_graph(simple_valid_graph.model_dump())
    parsed_nodes = result.nodes

    assert len(parsed_nodes) == 3
    assert len(result.edges) == 2
    # Node order is expected to be preserved: START first, END last.
    assert parsed_nodes[0].type == NodeType.START
    assert parsed_nodes[2].type == NodeType.END
def test_parse_workflow_graph_invalid_format():
    """Parsing a payload whose ``nodes`` entry is not a list must raise."""
    malformed_payload = dict(nodes="not a list", edges=[])
    # Deliberately broad; presumably a pydantic ValidationError — TODO confirm
    # against parse_workflow_graph before narrowing the expected type.
    with pytest.raises(Exception):
        parse_workflow_graph(malformed_payload)
# ── Test: Graph-Validierung ──────────────────────────────────────────────────
def test_valid_graph_initialization(simple_valid_graph):
    """A valid graph yields an engine with node/edge indexes and an order."""
    workflow = WorkflowEngine(simple_valid_graph)
    expected_order = ["start", "analysis1", "end"]

    assert len(workflow.nodes_by_id) == 3
    assert len(workflow.edges_by_id) == 2
    assert workflow.topological_order == expected_order
def test_validate_graph_no_start_node():
    """Constructing an engine for a graph without a START node raises HTTP 400."""
    analysis_node = WorkflowNode(
        id="analysis1",
        type=NodeType.ANALYSIS,
        position=Position(x=0, y=0),
        prompt_slug="test",
    )
    end_node = WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0))
    only_edge = WorkflowEdge(id="e1", from_node="analysis1", to_node="end")
    graph_without_start = WorkflowGraph(nodes=[analysis_node, end_node], edges=[only_edge])

    with pytest.raises(HTTPException) as caught:
        WorkflowEngine(graph_without_start)

    error = caught.value
    assert error.status_code == 400
    assert "Kein START-Knoten" in str(error.detail)
def test_validate_graph_no_end_node():
    """Constructing an engine for a graph without an END node raises HTTP 400."""
    start_node = WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0))
    analysis_node = WorkflowNode(
        id="analysis1",
        type=NodeType.ANALYSIS,
        position=Position(x=100, y=0),
        prompt_slug="test",
    )
    only_edge = WorkflowEdge(id="e1", from_node="start", to_node="analysis1")
    graph_without_end = WorkflowGraph(nodes=[start_node, analysis_node], edges=[only_edge])

    with pytest.raises(HTTPException) as caught:
        WorkflowEngine(graph_without_end)

    error = caught.value
    assert error.status_code == 400
    assert "Kein END-Knoten" in str(error.detail)
def test_validate_graph_multiple_start_nodes():
    """Constructing an engine for a graph with two START nodes raises HTTP 400."""
    graph_nodes = [
        WorkflowNode(id="start1", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(id="start2", type=NodeType.START, position=Position(x=0, y=100)),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0)),
    ]
    # Both START nodes lead directly to the single END node.
    graph_edges = [
        WorkflowEdge(id=edge_id, from_node=source, to_node="end")
        for edge_id, source in (("e1", "start1"), ("e2", "start2"))
    ]
    two_start_graph = WorkflowGraph(nodes=graph_nodes, edges=graph_edges)

    with pytest.raises(HTTPException) as caught:
        WorkflowEngine(two_start_graph)

    error = caught.value
    assert error.status_code == 400
    assert "Mehrere START-Knoten" in str(error.detail)
def test_validate_graph_missing_node_reference():
    """An edge pointing at an undefined node id makes the engine raise HTTP 400."""
    start_node = WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0))
    end_node = WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0))
    # The target id "nonexistent" is not among the graph's nodes.
    dangling_edge = WorkflowEdge(id="e1", from_node="start", to_node="nonexistent")
    broken_graph = WorkflowGraph(nodes=[start_node, end_node], edges=[dangling_edge])

    with pytest.raises(HTTPException) as caught:
        WorkflowEngine(broken_graph)

    error = caught.value
    assert error.status_code == 400
    assert "existiert nicht" in str(error.detail)
# ── Test: Zyklen-Erkennung ────────────────────────────────────────────────────
def test_detect_cycle_simple():
    """A two-node cycle (a -> b -> a) makes the engine raise HTTP 400."""
    graph_nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(
            id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"
        ),
        WorkflowNode(
            id="b", type=NodeType.ANALYSIS, position=Position(x=200, y=0), prompt_slug="test"
        ),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=0)),
    ]
    # (edge id, source node id, target node id); "e3" closes the cycle.
    edge_specs = (
        ("e1", "start", "a"),
        ("e2", "a", "b"),
        ("e3", "b", "a"),
        ("e4", "b", "end"),
    )
    graph_edges = [
        WorkflowEdge(id=edge_id, from_node=source, to_node=target)
        for edge_id, source, target in edge_specs
    ]
    cyclic_graph = WorkflowGraph(nodes=graph_nodes, edges=graph_edges)

    with pytest.raises(HTTPException) as caught:
        WorkflowEngine(cyclic_graph)

    error = caught.value
    assert error.status_code == 400
    assert "Zyklus erkannt" in str(error.detail)
def test_detect_cycle_self_loop():
    """A self-loop (a -> a) makes the engine raise HTTP 400."""
    start_node = WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0))
    looping_node = WorkflowNode(
        id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"
    )
    end_node = WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0))

    graph_edges = [
        WorkflowEdge(id="e1", from_node="start", to_node="a"),
        # Source and target are the same node: the self-loop under test.
        WorkflowEdge(id="e2", from_node="a", to_node="a"),
        WorkflowEdge(id="e3", from_node="a", to_node="end"),
    ]
    self_loop_graph = WorkflowGraph(
        nodes=[start_node, looping_node, end_node],
        edges=graph_edges,
    )

    with pytest.raises(HTTPException) as caught:
        WorkflowEngine(self_loop_graph)

    error = caught.value
    assert error.status_code == 400
    assert "Zyklus erkannt" in str(error.detail)
def test_no_cycle_branching(branching_graph):
    """A branching graph without cycles is accepted by the engine."""
    # Construction itself is the check: it must complete without raising.
    workflow = WorkflowEngine(branching_graph)
    assert workflow is not None
# ── Test: Erreichbarkeit ──────────────────────────────────────────────────────
def test_unreachable_node_from_start():
    """A node with no edges at all makes the engine raise HTTP 400."""
    start_node = WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0))
    connected_node = WorkflowNode(
        id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"
    )
    # No edge below references "isolated".
    orphan_node = WorkflowNode(
        id="isolated",
        type=NodeType.ANALYSIS,
        position=Position(x=100, y=100),
        prompt_slug="test",
    )
    end_node = WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0))

    graph_edges = [
        WorkflowEdge(id="e1", from_node="start", to_node="a"),
        WorkflowEdge(id="e2", from_node="a", to_node="end"),
    ]
    graph_with_orphan = WorkflowGraph(
        nodes=[start_node, connected_node, orphan_node, end_node],
        edges=graph_edges,
    )

    with pytest.raises(HTTPException) as caught:
        WorkflowEngine(graph_with_orphan)

    error = caught.value
    assert error.status_code == 400
    assert "nicht erreichbar vom START" in str(error.detail)
def test_node_cannot_reach_end():
    """A node with no path to END makes the engine raise HTTP 400."""
    graph_nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(
            id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"
        ),
        WorkflowNode(
            id="dead_end",
            type=NodeType.ANALYSIS,
            position=Position(x=200, y=0),
            prompt_slug="test",
        ),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=100)),
    ]
    # "dead_end" has no outgoing edge, so nothing connects it to "end".
    graph_edges = [
        WorkflowEdge(id=edge_id, from_node=source, to_node=target)
        for edge_id, source, target in (("e1", "start", "a"), ("e2", "a", "dead_end"))
    ]
    dead_end_graph = WorkflowGraph(nodes=graph_nodes, edges=graph_edges)

    with pytest.raises(HTTPException) as caught:
        WorkflowEngine(dead_end_graph)

    error = caught.value
    assert error.status_code == 400
    assert "END nicht erreichen" in str(error.detail)
# ── Test: Topologische Sortierung ─────────────────────────────────────────────
def test_topological_sort_simple(simple_valid_graph):
    """The linear graph is ordered exactly along its single path."""
    expected_order = ["start", "analysis1", "end"]
    workflow = WorkflowEngine(simple_valid_graph)
    assert workflow.topological_order == expected_order
def test_topological_sort_parallel(parallel_graph):
    """In the parallel graph, START leads, both analyses follow, then JOIN, END."""
    order = WorkflowEngine(parallel_graph).topological_order

    # START has to come first.
    assert order[0] == "start"
    # The two parallel analyses may appear in either relative order.
    assert set(order[1:3]) == {"analysis1", "analysis2"}
    # JOIN follows both analyses.
    assert order[3] == "join"
    # END is last.
    assert order[4] == "end"
def test_topological_sort_branching(branching_graph):
    """In the branching graph, START and LOGIC lead, the branches follow, then END."""
    order = WorkflowEngine(branching_graph).topological_order

    # START -> LOGIC is the fixed prefix.
    assert order[:2] == ["start", "logic1"]
    # The alternative branches may appear in either relative order.
    assert set(order[2:4]) == {"analysis1", "analysis2"}
    # END is last.
    assert order[4] == "end"
# ── Test: Ausführungs-Ebenen (Parallelität) ───────────────────────────────────
def test_execution_order_simple(simple_valid_graph):
    """The linear graph yields three single-node levels (no parallelism)."""
    levels = WorkflowEngine(simple_valid_graph).get_execution_order()

    assert len(levels) == 3
    assert levels[0] == ["start"]
    assert levels[1] == ["analysis1"]
    assert levels[2] == ["end"]
def test_execution_order_parallel(parallel_graph):
    """The parallel graph yields four levels; the second holds both analyses."""
    levels = WorkflowEngine(parallel_graph).get_execution_order()

    assert len(levels) == 4
    assert levels[0] == ["start"]
    # Both analyses share one level; order inside the level is not asserted.
    assert set(levels[1]) == {"analysis1", "analysis2"}
    assert levels[2] == ["join"]
    assert levels[3] == ["end"]
def test_execution_order_branching(branching_graph):
    """The branching graph yields four levels; both branches share the third."""
    levels = WorkflowEngine(branching_graph).get_execution_order()

    assert len(levels) == 4
    assert levels[0] == ["start"]
    assert levels[1] == ["logic1"]
    # The alternative branches sit on the same level; inner order is not asserted.
    assert set(levels[2]) == {"analysis1", "analysis2"}
    assert levels[3] == ["end"]
# ── Test: Validierungs-Hilfsfunktion ──────────────────────────────────────────
def test_validate_workflow_graph_valid(simple_valid_graph):
    """validate_workflow_graph reports a valid graph as (True, [])."""
    outcome = validate_workflow_graph(simple_valid_graph)
    ok, problems = outcome

    assert ok is True
    assert problems == []
def test_validate_workflow_graph_invalid():
    """validate_workflow_graph flags a graph that has no END node."""
    # Only a START node and no edges: the END node is missing.
    lone_start = WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0))
    incomplete_graph = WorkflowGraph(nodes=[lone_start], edges=[])

    outcome = validate_workflow_graph(incomplete_graph)
    ok, problems = outcome

    assert ok is False
    assert len(problems) > 0
    assert any("END-Knoten" in str(problem) for problem in problems)
# ── Test: Adjacency Lists ─────────────────────────────────────────────────────
def test_adjacency_lists_creation(parallel_graph):
    """The engine exposes per-node outgoing and incoming edge lists."""
    workflow = WorkflowEngine(parallel_graph)
    analysis_ids = {"analysis1", "analysis2"}

    # Edges leaving START fan out to both analyses.
    from_start = workflow.outgoing_edges.get("start", [])
    assert len(from_start) == 2
    assert {edge.to_node for edge in from_start} == analysis_ids

    # Edges entering JOIN come from both analyses.
    into_join = workflow.incoming_edges.get("join", [])
    assert len(into_join) == 2
    assert {edge.from_node for edge in into_join} == analysis_ids
# ── Run Tests ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Allow running this file directly as a script. pytest.main() returns the
    # run's exit code; pass it to sys.exit so a failing run is not reported to
    # the shell as success.
    sys.exit(pytest.main([__file__, "-v"]))