Backend: - DB-Migration 034: workflow_definitions, workflow_question_catalog, workflow_executions - ai_prompts.question_augmentations JSONB-Spalte (Hybridmodell: Prompt-Defaults) - 6 Grundtypen Fragenergänzungen mit Normalisierungsregeln (Seed-Daten) - Pydantic-Modelle (16 Models, 11 Enums) in workflow_models.py - Workflow-Engine: Graph-Parsing, Topologische Sortierung, DAG-Validierung - Dispatcher-Erweiterung type='workflow' (Stub für Phase 1-3) - Adjacency Lists, Erreichbarkeits-Prüfungen, Zyklen-Erkennung Testing: - 22 Unit-Tests (alle bestanden): Graph-Parsing, Validierung, Topologische Sortierung - Fixtures: simple_valid_graph, parallel_graph, branching_graph Version: - APP_VERSION 0.9i - DB_SCHEMA_VERSION 20260403 - Module: workflow 0.1.0 Anforderungsanalyse: .claude/task/Workflow_engine_prompting_engine/anforderungsanalyse_umsetzungsplan.md Konzept-Basis: .claude/task/Workflow_engine_prompting_engine/konzept_workflow_engine_konsolidated.md Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
414 lines
17 KiB
Python
414 lines
17 KiB
Python
"""
|
|
Unit tests for the workflow engine (Phase 0: Foundation)

Covers:
- Graph parsing
- Topological sorting
- DAG validation (cycle detection)
- Reachability checks

Run with: pytest tests/backend/test_workflow_engine.py -v
|
|
"""
|
|
import pytest
|
|
import sys
|
|
import os
|
|
|
|
# Put the backend sources (two directories up, then `backend/`) at the front
# of the import path so the workflow modules below can be imported.
sys.path.insert(
    0,
    os.path.join(os.path.dirname(__file__), '../../backend'),
)
|
|
|
|
from workflow_models import WorkflowGraph, WorkflowNode, WorkflowEdge, NodeType, Position
|
|
from workflow_engine import WorkflowEngine, parse_workflow_graph, validate_workflow_graph
|
|
from fastapi import HTTPException
|
|
|
|
|
|
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
def simple_valid_graph():
    """Provide a minimal valid linear graph: START → ANALYSIS → END."""
    nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(
            id="analysis1",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=0),
            prompt_slug="test_prompt",
        ),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0)),
    ]
    edges = [
        WorkflowEdge(id="e1", from_node="start", to_node="analysis1"),
        WorkflowEdge(id="e2", from_node="analysis1", to_node="end"),
    ]
    return WorkflowGraph(nodes=nodes, edges=edges)
|
|
|
|
|
|
@pytest.fixture
def parallel_graph():
    """Provide a fan-out/fan-in graph: START → (A1 || A2) → JOIN → END."""
    nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=50)),
        WorkflowNode(
            id="analysis1",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=0),
            prompt_slug="prompt1",
        ),
        WorkflowNode(
            id="analysis2",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=100),
            prompt_slug="prompt2",
        ),
        WorkflowNode(id="join", type=NodeType.JOIN, position=Position(x=200, y=50)),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=50)),
    ]
    edges = [
        # Fan out from START to both analysis nodes.
        WorkflowEdge(id="e1", from_node="start", to_node="analysis1"),
        WorkflowEdge(id="e2", from_node="start", to_node="analysis2"),
        # Fan in to JOIN, then on to END.
        WorkflowEdge(id="e3", from_node="analysis1", to_node="join"),
        WorkflowEdge(id="e4", from_node="analysis2", to_node="join"),
        WorkflowEdge(id="e5", from_node="join", to_node="end"),
    ]
    return WorkflowGraph(nodes=nodes, edges=edges)
|
|
|
|
|
|
@pytest.fixture
def branching_graph():
    """Provide a branching graph: START → LOGIC → (A1 | A2) → END."""
    nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=50)),
        WorkflowNode(id="logic1", type=NodeType.LOGIC, position=Position(x=100, y=50)),
        WorkflowNode(
            id="analysis1",
            type=NodeType.ANALYSIS,
            position=Position(x=200, y=0),
            prompt_slug="prompt1",
        ),
        WorkflowNode(
            id="analysis2",
            type=NodeType.ANALYSIS,
            position=Position(x=200, y=100),
            prompt_slug="prompt2",
        ),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=50)),
    ]
    edges = [
        WorkflowEdge(id="e1", from_node="start", to_node="logic1"),
        # The two outgoing LOGIC edges are labelled "then" and "else".
        WorkflowEdge(id="e2", from_node="logic1", to_node="analysis1", label="then"),
        WorkflowEdge(id="e3", from_node="logic1", to_node="analysis2", label="else"),
        WorkflowEdge(id="e4", from_node="analysis1", to_node="end"),
        WorkflowEdge(id="e5", from_node="analysis2", to_node="end"),
    ]
    return WorkflowGraph(nodes=nodes, edges=edges)
|
|
|
|
|
|
# ── Test: Graph-Parsing ───────────────────────────────────────────────────────
|
|
|
|
def test_parse_workflow_graph_valid(simple_valid_graph):
    """A dumped valid graph is parsed into three nodes and two edges."""
    parsed = parse_workflow_graph(simple_valid_graph.model_dump())

    node_count, edge_count = len(parsed.nodes), len(parsed.edges)
    assert node_count == 3
    assert edge_count == 2

    # The first and last parsed nodes are the START and END nodes.
    assert parsed.nodes[0].type == NodeType.START
    assert parsed.nodes[2].type == NodeType.END
|
|
|
|
|
|
def test_parse_workflow_graph_invalid_format():
    """A payload whose `nodes` is not a list makes parsing raise."""
    malformed_payload = {"nodes": "not a list", "edges": []}

    # The original comment names Pydantic's ValidationError as the expected
    # error; the check itself accepts any Exception subclass.
    with pytest.raises(Exception):
        parse_workflow_graph(malformed_payload)
|
|
|
|
|
|
# ── Test: Graph-Validierung ──────────────────────────────────────────────────
|
|
|
|
def test_valid_graph_initialization(simple_valid_graph):
    """A valid graph builds an engine with populated lookups and ordering."""
    engine = WorkflowEngine(simple_valid_graph)

    node_count = len(engine.nodes_by_id)
    edge_count = len(engine.edges_by_id)

    assert node_count == 3
    assert edge_count == 2
    assert engine.topological_order == ["start", "analysis1", "end"]
|
|
|
|
|
|
def test_validate_graph_no_start_node():
    """A graph without a START node is rejected with HTTP 400."""
    nodes = [
        WorkflowNode(
            id="analysis1",
            type=NodeType.ANALYSIS,
            position=Position(x=0, y=0),
            prompt_slug="test",
        ),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0)),
    ]
    edges = [WorkflowEdge(id="e1", from_node="analysis1", to_node="end")]
    graph = WorkflowGraph(nodes=nodes, edges=edges)

    with pytest.raises(HTTPException) as raised:
        WorkflowEngine(graph)

    error = raised.value
    assert error.status_code == 400
    assert "Kein START-Knoten" in str(error.detail)
|
|
|
|
|
|
def test_validate_graph_no_end_node():
    """A graph without an END node is rejected with HTTP 400."""
    nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(
            id="analysis1",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=0),
            prompt_slug="test",
        ),
    ]
    edges = [WorkflowEdge(id="e1", from_node="start", to_node="analysis1")]
    graph = WorkflowGraph(nodes=nodes, edges=edges)

    with pytest.raises(HTTPException) as raised:
        WorkflowEngine(graph)

    error = raised.value
    assert error.status_code == 400
    assert "Kein END-Knoten" in str(error.detail)
|
|
|
|
|
|
def test_validate_graph_multiple_start_nodes():
    """A graph with two START nodes is rejected with HTTP 400."""
    nodes = [
        WorkflowNode(id="start1", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(id="start2", type=NodeType.START, position=Position(x=0, y=100)),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0)),
    ]
    edges = [
        WorkflowEdge(id="e1", from_node="start1", to_node="end"),
        WorkflowEdge(id="e2", from_node="start2", to_node="end"),
    ]
    graph = WorkflowGraph(nodes=nodes, edges=edges)

    with pytest.raises(HTTPException) as raised:
        WorkflowEngine(graph)

    error = raised.value
    assert error.status_code == 400
    assert "Mehrere START-Knoten" in str(error.detail)
|
|
|
|
|
|
def test_validate_graph_missing_node_reference():
    """An edge pointing at an unknown node id is rejected with HTTP 400."""
    nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0)),
    ]
    # The edge target "nonexistent" is not among the node ids above.
    dangling_edge = WorkflowEdge(id="e1", from_node="start", to_node="nonexistent")
    graph = WorkflowGraph(nodes=nodes, edges=[dangling_edge])

    with pytest.raises(HTTPException) as raised:
        WorkflowEngine(graph)

    error = raised.value
    assert error.status_code == 400
    assert "existiert nicht" in str(error.detail)
|
|
|
|
|
|
# ── Test: Zyklen-Erkennung ────────────────────────────────────────────────────
|
|
|
|
def test_detect_cycle_simple():
    """A two-node cycle (A → B → A) is rejected with HTTP 400."""
    nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(
            id="a",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=0),
            prompt_slug="test",
        ),
        WorkflowNode(
            id="b",
            type=NodeType.ANALYSIS,
            position=Position(x=200, y=0),
            prompt_slug="test",
        ),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=0)),
    ]
    edges = [
        WorkflowEdge(id="e1", from_node="start", to_node="a"),
        WorkflowEdge(id="e2", from_node="a", to_node="b"),
        # Back edge b → a closes the cycle.
        WorkflowEdge(id="e3", from_node="b", to_node="a"),
        WorkflowEdge(id="e4", from_node="b", to_node="end"),
    ]
    graph = WorkflowGraph(nodes=nodes, edges=edges)

    with pytest.raises(HTTPException) as raised:
        WorkflowEngine(graph)

    error = raised.value
    assert error.status_code == 400
    assert "Zyklus erkannt" in str(error.detail)
|
|
|
|
|
|
def test_detect_cycle_self_loop():
    """A self-loop (A → A) is rejected with HTTP 400."""
    nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(
            id="a",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=0),
            prompt_slug="test",
        ),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0)),
    ]
    edges = [
        WorkflowEdge(id="e1", from_node="start", to_node="a"),
        # Edge from "a" back to itself forms the self-loop.
        WorkflowEdge(id="e2", from_node="a", to_node="a"),
        WorkflowEdge(id="e3", from_node="a", to_node="end"),
    ]
    graph = WorkflowGraph(nodes=nodes, edges=edges)

    with pytest.raises(HTTPException) as raised:
        WorkflowEngine(graph)

    error = raised.value
    assert error.status_code == 400
    assert "Zyklus erkannt" in str(error.detail)
|
|
|
|
|
|
def test_no_cycle_branching(branching_graph):
    """A branching graph without cycles constructs an engine without error."""
    built_engine = WorkflowEngine(branching_graph)

    # Reaching this point means construction did not raise.
    assert built_engine is not None
|
|
|
|
|
|
# ── Test: Erreichbarkeit ──────────────────────────────────────────────────────
|
|
|
|
def test_unreachable_node_from_start():
    """A node with no path from START is rejected with HTTP 400."""
    nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(
            id="a",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=0),
            prompt_slug="test",
        ),
        # "isolated" has no incoming or outgoing edges.
        WorkflowNode(
            id="isolated",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=100),
            prompt_slug="test",
        ),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0)),
    ]
    edges = [
        WorkflowEdge(id="e1", from_node="start", to_node="a"),
        WorkflowEdge(id="e2", from_node="a", to_node="end"),
    ]
    graph = WorkflowGraph(nodes=nodes, edges=edges)

    with pytest.raises(HTTPException) as raised:
        WorkflowEngine(graph)

    error = raised.value
    assert error.status_code == 400
    assert "nicht erreichbar vom START" in str(error.detail)
|
|
|
|
|
|
def test_node_cannot_reach_end():
    """A node with no path to END is rejected with HTTP 400."""
    nodes = [
        WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)),
        WorkflowNode(
            id="a",
            type=NodeType.ANALYSIS,
            position=Position(x=100, y=0),
            prompt_slug="test",
        ),
        WorkflowNode(
            id="dead_end",
            type=NodeType.ANALYSIS,
            position=Position(x=200, y=0),
            prompt_slug="test",
        ),
        WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=100)),
    ]
    # No edge leaves "dead_end", so it has no path to END.
    edges = [
        WorkflowEdge(id="e1", from_node="start", to_node="a"),
        WorkflowEdge(id="e2", from_node="a", to_node="dead_end"),
    ]
    graph = WorkflowGraph(nodes=nodes, edges=edges)

    with pytest.raises(HTTPException) as raised:
        WorkflowEngine(graph)

    error = raised.value
    assert error.status_code == 400
    assert "END nicht erreichen" in str(error.detail)
|
|
|
|
|
|
# ── Test: Topologische Sortierung ─────────────────────────────────────────────
|
|
|
|
def test_topological_sort_simple(simple_valid_graph):
    """The linear graph is ordered start → analysis1 → end."""
    expected_order = ["start", "analysis1", "end"]

    engine = WorkflowEngine(simple_valid_graph)

    assert engine.topological_order == expected_order
|
|
|
|
|
|
def test_topological_sort_parallel(parallel_graph):
    """The parallel graph is ordered start, both analyses, join, end."""
    engine = WorkflowEngine(parallel_graph)

    # START must come first.
    assert engine.topological_order[0] == "start"

    # The two analysis nodes may appear in either order.
    middle_pair = set(engine.topological_order[1:3])
    assert middle_pair == {"analysis1", "analysis2"}

    # JOIN follows both analyses.
    assert engine.topological_order[3] == "join"

    # END comes last.
    assert engine.topological_order[4] == "end"
|
|
|
|
|
|
def test_topological_sort_branching(branching_graph):
    """The branching graph is ordered start, logic1, both analyses, end."""
    engine = WorkflowEngine(branching_graph)

    # START and then LOGIC lead the ordering.
    leading_pair = engine.topological_order[:2]
    assert leading_pair == ["start", "logic1"]

    # The alternative analysis branches may appear in either order.
    branch_pair = set(engine.topological_order[2:4])
    assert branch_pair == {"analysis1", "analysis2"}

    # END comes last.
    assert engine.topological_order[4] == "end"
|
|
|
|
|
|
# ── Test: Ausführungs-Ebenen (Parallelität) ───────────────────────────────────
|
|
|
|
def test_execution_order_simple(simple_valid_graph):
    """The linear graph yields three single-node execution levels."""
    engine = WorkflowEngine(simple_valid_graph)
    levels = engine.get_execution_order()

    expected_levels = (["start"], ["analysis1"], ["end"])

    assert len(levels) == 3
    for index, expected in enumerate(expected_levels):
        assert levels[index] == expected
|
|
|
|
|
|
def test_execution_order_parallel(parallel_graph):
    """The parallel graph puts both analysis nodes on the second level."""
    engine = WorkflowEngine(parallel_graph)
    levels = engine.get_execution_order()

    assert len(levels) == 4
    assert levels[0] == ["start"]

    # Both analyses share one level; their order within it is not fixed.
    parallel_level = set(levels[1])
    assert parallel_level == {"analysis1", "analysis2"}

    assert levels[2] == ["join"]
    assert levels[3] == ["end"]
|
|
|
|
|
|
def test_execution_order_branching(branching_graph):
    """The branching graph puts both alternative branches on one level."""
    engine = WorkflowEngine(branching_graph)
    levels = engine.get_execution_order()

    assert len(levels) == 4
    assert levels[0] == ["start"]
    assert levels[1] == ["logic1"]

    # The "then"/"else" targets share a level; order within it is not fixed.
    branch_level = set(levels[2])
    assert branch_level == {"analysis1", "analysis2"}

    assert levels[3] == ["end"]
|
|
|
|
|
|
# ── Test: Validierungs-Hilfsfunktion ──────────────────────────────────────────
|
|
|
|
def test_validate_workflow_graph_valid(simple_valid_graph):
    """validate_workflow_graph returns (True, []) for a valid graph."""
    outcome = validate_workflow_graph(simple_valid_graph)
    is_valid, errors = outcome

    assert is_valid is True
    assert errors == []
|
|
|
|
|
|
def test_validate_workflow_graph_invalid():
    """validate_workflow_graph reports an END-node error for a lone START."""
    # The graph deliberately has no END node and no edges.
    only_start = WorkflowNode(
        id="start",
        type=NodeType.START,
        position=Position(x=0, y=0),
    )
    graph = WorkflowGraph(nodes=[only_start], edges=[])

    is_valid, errors = validate_workflow_graph(graph)

    assert is_valid is False
    assert len(errors) > 0
    assert any("END-Knoten" in str(problem) for problem in errors)
|
|
|
|
|
|
# ── Test: Adjacency Lists ─────────────────────────────────────────────────────
|
|
|
|
def test_adjacency_lists_creation(parallel_graph):
    """The engine exposes outgoing and incoming edge lists per node id."""
    engine = WorkflowEngine(parallel_graph)

    # START has two outgoing edges, one to each analysis node.
    from_start = engine.outgoing_edges.get("start", [])
    assert len(from_start) == 2
    start_targets = {edge.to_node for edge in from_start}
    assert start_targets == {"analysis1", "analysis2"}

    # JOIN has two incoming edges, one from each analysis node.
    into_join = engine.incoming_edges.get("join", [])
    assert len(into_join) == 2
    join_sources = {edge.from_node for edge in into_join}
    assert join_sources == {"analysis1", "analysis2"}
|
|
|
|
|
|
# ── Run Tests ─────────────────────────────────────────────────────────────────
|
|
|
|
if __name__ == "__main__":
    # pytest.main() returns the exit code rather than exiting the process.
    # Pass it to sys.exit so that running this file directly fails with a
    # non-zero status when any test fails.
    sys.exit(pytest.main([__file__, "-v"]))
|