""" Unit Tests für Workflow Engine (Phase 0: Foundation) Tests für: - Graph-Parsing - Topologische Sortierung - DAG-Validierung (Zyklen-Erkennung) - Erreichbarkeits-Prüfungen Run with: pytest tests/backend/test_workflow_engine.py -v """ import pytest import sys import os # Add backend to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../backend')) from workflow_models import WorkflowGraph, WorkflowNode, WorkflowEdge, NodeType, Position from workflow_engine import WorkflowEngine, parse_workflow_graph, validate_workflow_graph from fastapi import HTTPException # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture def simple_valid_graph(): """Einfacher gültiger Graph: START → ANALYSIS → END""" return WorkflowGraph( nodes=[ WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), WorkflowNode(id="analysis1", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test_prompt"), WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0)) ], edges=[ WorkflowEdge(id="e1", from_node="start", to_node="analysis1"), WorkflowEdge(id="e2", from_node="analysis1", to_node="end") ] ) @pytest.fixture def parallel_graph(): """Graph mit Parallelität: START → (A1 || A2) → JOIN → END""" return WorkflowGraph( nodes=[ WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=50)), WorkflowNode(id="analysis1", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="prompt1"), WorkflowNode(id="analysis2", type=NodeType.ANALYSIS, position=Position(x=100, y=100), prompt_slug="prompt2"), WorkflowNode(id="join", type=NodeType.JOIN, position=Position(x=200, y=50)), WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=50)) ], edges=[ WorkflowEdge(id="e1", from_node="start", to_node="analysis1"), WorkflowEdge(id="e2", from_node="start", to_node="analysis2"), WorkflowEdge(id="e3", from_node="analysis1", to_node="join"), WorkflowEdge(id="e4", from_node="analysis2", to_node="join"), WorkflowEdge(id="e5", from_node="join", to_node="end") ] ) @pytest.fixture def branching_graph(): """Graph mit Verzweigung: START → LOGIC → (A1 | A2) → END""" return WorkflowGraph( nodes=[ WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=50)), WorkflowNode(id="logic1", type=NodeType.LOGIC, position=Position(x=100, y=50)), WorkflowNode(id="analysis1", type=NodeType.ANALYSIS, position=Position(x=200, y=0), prompt_slug="prompt1"), WorkflowNode(id="analysis2", type=NodeType.ANALYSIS, position=Position(x=200, y=100), prompt_slug="prompt2"), WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=50)) ], edges=[ WorkflowEdge(id="e1", from_node="start", to_node="logic1"), WorkflowEdge(id="e2", from_node="logic1", to_node="analysis1", label="then"), WorkflowEdge(id="e3", from_node="logic1", to_node="analysis2", label="else"), WorkflowEdge(id="e4", from_node="analysis1", to_node="end"), WorkflowEdge(id="e5", from_node="analysis2", to_node="end") ] ) # ── Test: Graph-Parsing ─────────────────────────────────────────────────────── def test_parse_workflow_graph_valid(simple_valid_graph): """Test: Gültiger Graph wird korrekt geparst""" graph_dict = simple_valid_graph.model_dump() parsed = parse_workflow_graph(graph_dict) assert len(parsed.nodes) == 3 assert len(parsed.edges) == 2 assert parsed.nodes[0].type == NodeType.START assert parsed.nodes[2].type == NodeType.END def test_parse_workflow_graph_invalid_format(): """Test: Ungültiges Format wirft ValidationError""" invalid_graph = {"nodes": "not a list", "edges": []} with pytest.raises(Exception): # Pydantic ValidationError parse_workflow_graph(invalid_graph) # ── Test: Graph-Validierung ────────────────────────────────────────────────── def test_valid_graph_initialization(simple_valid_graph): """Test: Gültiger Graph kann initialisiert werden""" engine = WorkflowEngine(simple_valid_graph) assert len(engine.nodes_by_id) == 3 assert len(engine.edges_by_id) == 2 assert engine.topological_order == ["start", "analysis1", "end"] def test_validate_graph_no_start_node(): """Test: Graph ohne START-Knoten wird abgelehnt""" graph = WorkflowGraph( nodes=[ WorkflowNode(id="analysis1", type=NodeType.ANALYSIS, position=Position(x=0, y=0), prompt_slug="test"), WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0)) ], edges=[WorkflowEdge(id="e1", from_node="analysis1", to_node="end")] ) with pytest.raises(HTTPException) as exc_info: WorkflowEngine(graph) assert exc_info.value.status_code == 400 assert "Kein START-Knoten" in str(exc_info.value.detail) def test_validate_graph_no_end_node(): """Test: Graph ohne END-Knoten wird abgelehnt""" graph = WorkflowGraph( nodes=[ WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), WorkflowNode(id="analysis1", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test") ], edges=[WorkflowEdge(id="e1", from_node="start", to_node="analysis1")] ) with pytest.raises(HTTPException) as exc_info: WorkflowEngine(graph) assert exc_info.value.status_code == 400 assert "Kein END-Knoten" in str(exc_info.value.detail) def test_validate_graph_multiple_start_nodes(): """Test: Graph mit mehreren START-Knoten wird abgelehnt""" graph = WorkflowGraph( nodes=[ WorkflowNode(id="start1", type=NodeType.START, position=Position(x=0, y=0)), WorkflowNode(id="start2", type=NodeType.START, position=Position(x=0, y=100)), WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0)) ], edges=[ WorkflowEdge(id="e1", from_node="start1", to_node="end"), WorkflowEdge(id="e2", from_node="start2", to_node="end") ] ) with pytest.raises(HTTPException) as exc_info: WorkflowEngine(graph) assert exc_info.value.status_code == 400 assert "Mehrere START-Knoten" in str(exc_info.value.detail) def test_validate_graph_missing_node_reference(): """Test: Edge referenziert nicht-existierenden Knoten""" graph = WorkflowGraph( nodes=[ WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0)) ], edges=[WorkflowEdge(id="e1", from_node="start", to_node="nonexistent")] ) with pytest.raises(HTTPException) as exc_info: WorkflowEngine(graph) assert exc_info.value.status_code == 400 assert "existiert nicht" in str(exc_info.value.detail) # ── Test: Zyklen-Erkennung ──────────────────────────────────────────────────── def test_detect_cycle_simple(): """Test: Einfacher Zyklus wird erkannt (A → B → A)""" graph = WorkflowGraph( nodes=[ WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), WorkflowNode(id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"), WorkflowNode(id="b", type=NodeType.ANALYSIS, position=Position(x=200, y=0), prompt_slug="test"), WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=0)) ], edges=[ WorkflowEdge(id="e1", from_node="start", to_node="a"), WorkflowEdge(id="e2", from_node="a", to_node="b"), WorkflowEdge(id="e3", from_node="b", to_node="a"), # Zyklus! WorkflowEdge(id="e4", from_node="b", to_node="end") ] ) with pytest.raises(HTTPException) as exc_info: WorkflowEngine(graph) assert exc_info.value.status_code == 400 assert "Zyklus erkannt" in str(exc_info.value.detail) def test_detect_cycle_self_loop(): """Test: Selbst-Zyklus wird erkannt (A → A)""" graph = WorkflowGraph( nodes=[ WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), WorkflowNode(id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"), WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0)) ], edges=[ WorkflowEdge(id="e1", from_node="start", to_node="a"), WorkflowEdge(id="e2", from_node="a", to_node="a"), # Selbst-Zyklus! WorkflowEdge(id="e3", from_node="a", to_node="end") ] ) with pytest.raises(HTTPException) as exc_info: WorkflowEngine(graph) assert exc_info.value.status_code == 400 assert "Zyklus erkannt" in str(exc_info.value.detail) def test_no_cycle_branching(branching_graph): """Test: Verzweigung ohne Zyklus wird akzeptiert""" engine = WorkflowEngine(branching_graph) assert engine is not None # Kein Fehler # ── Test: Erreichbarkeit ────────────────────────────────────────────────────── def test_unreachable_node_from_start(): """Test: Nicht vom START erreichbarer Knoten wird erkannt""" graph = WorkflowGraph( nodes=[ WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), WorkflowNode(id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"), WorkflowNode(id="isolated", type=NodeType.ANALYSIS, position=Position(x=100, y=100), prompt_slug="test"), # Isoliert! WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0)) ], edges=[ WorkflowEdge(id="e1", from_node="start", to_node="a"), WorkflowEdge(id="e2", from_node="a", to_node="end") # 'isolated' ist nicht verbunden ] ) with pytest.raises(HTTPException) as exc_info: WorkflowEngine(graph) assert exc_info.value.status_code == 400 assert "nicht erreichbar vom START" in str(exc_info.value.detail) def test_node_cannot_reach_end(): """Test: Knoten der END nicht erreichen kann wird erkannt""" graph = WorkflowGraph( nodes=[ WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), WorkflowNode(id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"), WorkflowNode(id="dead_end", type=NodeType.ANALYSIS, position=Position(x=200, y=0), prompt_slug="test"), WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=100)) ], edges=[ WorkflowEdge(id="e1", from_node="start", to_node="a"), WorkflowEdge(id="e2", from_node="a", to_node="dead_end") # 'dead_end' kann END nicht erreichen (keine Verbindung) ] ) with pytest.raises(HTTPException) as exc_info: WorkflowEngine(graph) assert exc_info.value.status_code == 400 assert "END nicht erreichen" in str(exc_info.value.detail) # ── Test: Topologische Sortierung ───────────────────────────────────────────── def test_topological_sort_simple(simple_valid_graph): """Test: Einfacher linearer Graph hat korrekte topologische Sortierung""" engine = WorkflowEngine(simple_valid_graph) assert engine.topological_order == ["start", "analysis1", "end"] def test_topological_sort_parallel(parallel_graph): """Test: Paralleler Graph - Topologische Sortierung""" engine = WorkflowEngine(parallel_graph) # START muss zuerst kommen assert engine.topological_order[0] == "start" # analysis1 und analysis2 können in beliebiger Reihenfolge kommen (parallel) assert set(engine.topological_order[1:3]) == {"analysis1", "analysis2"} # JOIN kommt nach beiden Analysen assert engine.topological_order[3] == "join" # END kommt zuletzt assert engine.topological_order[4] == "end" def test_topological_sort_branching(branching_graph): """Test: Verzweigter Graph - Topologische Sortierung""" engine = WorkflowEngine(branching_graph) # START → LOGIC muss zuerst kommen assert engine.topological_order[:2] == ["start", "logic1"] # analysis1 und analysis2 können in beliebiger Reihenfolge kommen (alternative Pfade) assert set(engine.topological_order[2:4]) == {"analysis1", "analysis2"} # END kommt zuletzt assert engine.topological_order[4] == "end" # ── Test: Ausführungs-Ebenen (Parallelität) ─────────────────────────────────── def test_execution_order_simple(simple_valid_graph): """Test: Einfacher Graph hat 3 Ebenen (keine Parallelität)""" engine = WorkflowEngine(simple_valid_graph) execution_order = engine.get_execution_order() assert len(execution_order) == 3 assert execution_order[0] == ["start"] assert execution_order[1] == ["analysis1"] assert execution_order[2] == ["end"] def test_execution_order_parallel(parallel_graph): """Test: Paralleler Graph - Ebene 2 hat 2 Knoten (können parallel laufen)""" engine = WorkflowEngine(parallel_graph) execution_order = engine.get_execution_order() assert len(execution_order) == 4 assert execution_order[0] == ["start"] assert set(execution_order[1]) == {"analysis1", "analysis2"} # Parallel! assert execution_order[2] == ["join"] assert execution_order[3] == ["end"] def test_execution_order_branching(branching_graph): """Test: Verzweigter Graph - Alternative Pfade sind auf derselben Ebene""" engine = WorkflowEngine(branching_graph) execution_order = engine.get_execution_order() assert len(execution_order) == 4 assert execution_order[0] == ["start"] assert execution_order[1] == ["logic1"] assert set(execution_order[2]) == {"analysis1", "analysis2"} # Alternative Pfade assert execution_order[3] == ["end"] # ── Test: Validierungs-Hilfsfunktion ────────────────────────────────────────── def test_validate_workflow_graph_valid(simple_valid_graph): """Test: Hilfsfunktion validate_workflow_graph für gültigen Graph""" is_valid, errors = validate_workflow_graph(simple_valid_graph) assert is_valid is True assert errors == [] def test_validate_workflow_graph_invalid(): """Test: Hilfsfunktion validate_workflow_graph für ungültigen Graph""" graph = WorkflowGraph( nodes=[ WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)) # Kein END-Knoten! ], edges=[] ) is_valid, errors = validate_workflow_graph(graph) assert is_valid is False assert len(errors) > 0 assert any("END-Knoten" in str(e) for e in errors) # ── Test: Adjacency Lists ───────────────────────────────────────────────────── def test_adjacency_lists_creation(parallel_graph): """Test: Adjacency Lists werden korrekt erstellt""" engine = WorkflowEngine(parallel_graph) # Outgoing edges vom START start_outgoing = engine.outgoing_edges.get("start", []) assert len(start_outgoing) == 2 assert set(e.to_node for e in start_outgoing) == {"analysis1", "analysis2"} # Incoming edges zu JOIN join_incoming = engine.incoming_edges.get("join", []) assert len(join_incoming) == 2 assert set(e.from_node for e in join_incoming) == {"analysis1", "analysis2"} # ── Run Tests ───────────────────────────────────────────────────────────────── if __name__ == "__main__": pytest.main([__file__, "-v"])