mitai-jinkendo/backend/workflow_engine.py
Lars acd4830795
All checks were successful
Deploy Development / deploy (push) Successful in 50s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 14s
fix: Access topological_order directly from engine, not from non-existent validator attribute
2026-04-03 21:38:45 +02:00

428 lines
14 KiB
Python

"""
Workflow Engine (Phase 0: Foundation)
Graph-Parsing, topologische Sortierung, DAG-Validierung.
Konzept-Basis: konzept_workflow_engine_konsolidated.md
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md
Phase 0: Foundation (Graph-Parsing, Validation)
Phase 1-3: Vollständige Execution-Logic
"""
from typing import Dict, List, Set, Optional, Tuple, Any
from workflow_models import (
WorkflowGraph,
WorkflowNode,
WorkflowEdge,
NodeType,
NodeStatus
)
from fastapi import HTTPException
class WorkflowEngine:
"""
Workflow-Execution-Engine für graph-basierte Prompt-Workflows.
Phase 0: Graph-Parsing und Validierung
Phase 1-3: Execution-Logic
"""
def __init__(self, graph: WorkflowGraph):
"""
Initialisiere Engine mit Workflow-Graph.
Args:
graph: Workflow-Graph (Knoten + Kanten)
Raises:
HTTPException: Bei ungültigem Graph (Zyklen, fehlende Knoten, etc.)
"""
self.graph = graph
self.nodes_by_id: Dict[str, WorkflowNode] = {node.id: node for node in graph.nodes}
self.edges_by_id: Dict[str, WorkflowEdge] = {edge.id: edge for edge in graph.edges}
# Adjacency lists für Traversierung
self.outgoing_edges: Dict[str, List[WorkflowEdge]] = {}
self.incoming_edges: Dict[str, List[WorkflowEdge]] = {}
# Build adjacency lists
for edge in graph.edges:
# Outgoing edges (from node)
if edge.from_node not in self.outgoing_edges:
self.outgoing_edges[edge.from_node] = []
self.outgoing_edges[edge.from_node].append(edge)
# Incoming edges (to node)
if edge.to_node not in self.incoming_edges:
self.incoming_edges[edge.to_node] = []
self.incoming_edges[edge.to_node].append(edge)
# Validiere Graph
self._validate_graph()
# Topologische Sortierung
self.topological_order = self._topological_sort()
def _validate_graph(self):
"""
Validiere Workflow-Graph.
Prüfungen:
1. Alle referenzierten Knoten existieren
2. Genau ein START-Knoten
3. Mindestens ein END-Knoten
4. Keine Zyklen (DAG)
5. Alle Knoten erreichbar vom START
6. Alle Knoten können END erreichen
Raises:
HTTPException: Bei Validierungsfehlern
"""
errors = []
# 1. Prüfe ob alle referenzierten Knoten existieren
for edge in self.graph.edges:
if edge.from_node not in self.nodes_by_id:
errors.append(f"Edge {edge.id}: from_node '{edge.from_node}' existiert nicht")
if edge.to_node not in self.nodes_by_id:
errors.append(f"Edge {edge.id}: to_node '{edge.to_node}' existiert nicht")
if errors:
raise HTTPException(400, {"error": "Ungültiger Graph", "details": errors})
# 2. Genau ein START-Knoten
start_nodes = [n for n in self.graph.nodes if n.type == NodeType.START]
if len(start_nodes) == 0:
errors.append("Kein START-Knoten gefunden")
elif len(start_nodes) > 1:
errors.append(f"Mehrere START-Knoten gefunden: {[n.id for n in start_nodes]}")
# 3. Mindestens ein END-Knoten
end_nodes = [n for n in self.graph.nodes if n.type == NodeType.END]
if len(end_nodes) == 0:
errors.append("Kein END-Knoten gefunden")
if errors:
raise HTTPException(400, {"error": "Ungültiger Graph", "details": errors})
# 4. Keine Zyklen (DAG-Prüfung)
cycle = self._detect_cycle()
if cycle:
errors.append(f"Zyklus erkannt: {''.join(cycle)}")
if errors:
raise HTTPException(400, {"error": "Ungültiger Graph (Zyklus)", "details": errors})
# 5. Alle Knoten erreichbar vom START
start_node = start_nodes[0]
reachable = self._get_reachable_nodes(start_node.id)
unreachable = [n.id for n in self.graph.nodes if n.id not in reachable]
if unreachable:
errors.append(f"Knoten nicht erreichbar vom START: {unreachable}")
# 6. Alle Knoten können END erreichen (Rückwärts-Prüfung)
end_nodes_ids = [n.id for n in end_nodes]
can_reach_end = self._get_nodes_reaching_end(end_nodes_ids)
cannot_reach_end = [n.id for n in self.graph.nodes if n.id not in can_reach_end]
if cannot_reach_end:
errors.append(f"Knoten können END nicht erreichen: {cannot_reach_end}")
if errors:
raise HTTPException(400, {"error": "Ungültiger Graph (Erreichbarkeit)", "details": errors})
def _detect_cycle(self) -> Optional[List[str]]:
"""
Erkenne Zyklen im Graph mittels DFS.
Returns:
Liste der Knoten im Zyklus, oder None wenn kein Zyklus
"""
visited: Set[str] = set()
rec_stack: Set[str] = set() # Recursion stack für DFS
parent: Dict[str, Optional[str]] = {}
def dfs(node_id: str) -> Optional[List[str]]:
visited.add(node_id)
rec_stack.add(node_id)
# Besuche alle Nachbarn
for edge in self.outgoing_edges.get(node_id, []):
neighbor = edge.to_node
if neighbor not in visited:
parent[neighbor] = node_id
cycle = dfs(neighbor)
if cycle:
return cycle
elif neighbor in rec_stack:
# Zyklus gefunden! Rekonstruiere Pfad
cycle_path = [neighbor]
current = node_id
while current != neighbor:
cycle_path.append(current)
current = parent.get(current)
cycle_path.append(neighbor) # Schließe Zyklus
return list(reversed(cycle_path))
rec_stack.remove(node_id)
return None
# Starte DFS von allen Knoten (für disconnected components)
for node in self.graph.nodes:
if node.id not in visited:
parent[node.id] = None
cycle = dfs(node.id)
if cycle:
return cycle
return None
def _get_reachable_nodes(self, start_node_id: str) -> Set[str]:
"""
Finde alle vom Startknoten aus erreichbaren Knoten (Vorwärts-Traversierung).
Args:
start_node_id: ID des Startknotens
Returns:
Set aller erreichbaren Knoten-IDs
"""
reachable: Set[str] = set()
stack = [start_node_id]
while stack:
current = stack.pop()
if current in reachable:
continue
reachable.add(current)
# Füge alle Nachbarn hinzu
for edge in self.outgoing_edges.get(current, []):
stack.append(edge.to_node)
return reachable
def _get_nodes_reaching_end(self, end_node_ids: List[str]) -> Set[str]:
"""
Finde alle Knoten die mindestens einen END-Knoten erreichen können.
Rückwärts-Traversierung von END-Knoten.
Args:
end_node_ids: Liste der END-Knoten-IDs
Returns:
Set aller Knoten-IDs die END erreichen können
"""
can_reach_end: Set[str] = set()
stack = list(end_node_ids)
while stack:
current = stack.pop()
if current in can_reach_end:
continue
can_reach_end.add(current)
# Füge alle Vorgänger hinzu (rückwärts)
for edge in self.incoming_edges.get(current, []):
stack.append(edge.from_node)
return can_reach_end
def _topological_sort(self) -> List[str]:
"""
Berechne topologische Sortierung des Graphen (Kahn's Algorithm).
Die topologische Sortierung gibt eine Reihenfolge vor, in der Knoten
ausgeführt werden können, ohne dass Abhängigkeiten verletzt werden.
Returns:
Liste der Knoten-IDs in topologischer Reihenfolge
Raises:
HTTPException: Bei Zyklen (sollte durch _validate_graph verhindert sein)
"""
# Berechne In-Degree für jeden Knoten
in_degree: Dict[str, int] = {node.id: 0 for node in self.graph.nodes}
for edge in self.graph.edges:
in_degree[edge.to_node] += 1
# Queue mit Knoten ohne Vorgänger (In-Degree = 0)
queue = [node_id for node_id, degree in in_degree.items() if degree == 0]
sorted_order = []
while queue:
# Entferne Knoten mit In-Degree 0
current = queue.pop(0)
sorted_order.append(current)
# Reduziere In-Degree aller Nachbarn
for edge in self.outgoing_edges.get(current, []):
neighbor = edge.to_node
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
# Wenn nicht alle Knoten sortiert wurden: Zyklus (sollte nicht passieren)
if len(sorted_order) != len(self.graph.nodes):
raise HTTPException(
500,
"Topologische Sortierung fehlgeschlagen (Zyklus?). "
"Dies sollte durch _validate_graph verhindert worden sein."
)
return sorted_order
def get_execution_order(self) -> List[List[str]]:
"""
Berechne Ausführungs-Reihenfolge als Ebenen (für parallele Execution).
Knoten auf derselben Ebene können parallel ausgeführt werden.
Returns:
Liste von Ebenen, jede Ebene ist eine Liste von Knoten-IDs:
[
["node_start"],
["node_1", "node_2"], # können parallel laufen
["node_3"],
["node_end"]
]
"""
# Berechne Level für jeden Knoten (längster Pfad vom START)
levels: Dict[str, int] = {}
for node_id in self.topological_order:
# Finde maximales Level aller Vorgänger
incoming = self.incoming_edges.get(node_id, [])
if not incoming:
# Knoten ohne Vorgänger (START)
levels[node_id] = 0
else:
max_parent_level = max(levels[edge.from_node] for edge in incoming)
levels[node_id] = max_parent_level + 1
# Gruppiere Knoten nach Level
max_level = max(levels.values()) if levels else 0
execution_order = [[] for _ in range(max_level + 1)]
for node_id, level in levels.items():
execution_order[level].append(node_id)
return execution_order
async def execute(
self,
variables: Dict[str, Any],
openrouter_call_func,
profile_id: str,
enable_debug: bool = False
) -> Dict[str, Any]:
"""
Führe Workflow aus.
Phase 0: Stub-Implementierung
Phase 1-3: Vollständige Implementierung mit:
- Fragenergänzung
- Normalisierung
- Logik-Auswertung
- Pfad-Routing
- Join-Konsolidierung
Args:
variables: Dict of variables for placeholder replacement
openrouter_call_func: Async function(prompt_text) -> response_text
profile_id: User profile ID
enable_debug: If True, include debug information
Returns:
Dict with execution results
Raises:
HTTPException: 501 Not Implemented (Phase 0)
"""
raise HTTPException(
status_code=501,
detail="Workflow-Execution noch nicht implementiert (Phase 0: Foundation). "
"Vollständige Implementierung erfolgt in Phase 1-3."
)
def parse_workflow_graph(graph_jsonb: Dict) -> WorkflowGraph:
"""
Parse JSONB-Graph aus Datenbank zu Pydantic WorkflowGraph-Modell.
Args:
graph_jsonb: JSONB dict aus workflow_definitions.graph
Returns:
Validiertes WorkflowGraph-Objekt
Raises:
ValidationError: Bei ungültigem Graph-Format
"""
return WorkflowGraph(**graph_jsonb)
def validate_workflow_graph(graph: WorkflowGraph) -> Tuple[bool, List[str]]:
"""
Validiere Workflow-Graph (ohne Engine zu initialisieren).
Kann für UI-Validierung verwendet werden (vor dem Speichern).
Args:
graph: Workflow-Graph
Returns:
Tuple (is_valid, errors)
- is_valid: True wenn Graph gültig
- errors: Liste von Fehler-Strings (leer wenn valid)
"""
try:
engine = WorkflowEngine(graph)
return True, []
except HTTPException as e:
# Extrahiere Fehler aus HTTPException detail
detail = e.detail
if isinstance(detail, dict):
errors = detail.get('details', [detail.get('error', str(e))])
else:
errors = [str(detail)]
return False, errors
except Exception as e:
return False, [f"Unerwarteter Fehler: {str(e)}"]
def get_execution_order(graph: WorkflowGraph) -> List[str]:
"""
Berechne sequenzielle Ausführungs-Reihenfolge (Phase 2).
Phase 2: Sequenziell (flattened topological sort).
Phase 7: Parallele Execution (levels statt flat list).
Args:
graph: Workflow-Graph
Returns:
Liste von Knoten-IDs in Ausführungsreihenfolge
Beispiel: ["start", "node_1", "node_2", "end"]
Raises:
HTTPException: Bei ungültigem Graph
Beispiel:
>>> from workflow_models import WorkflowGraph, WorkflowNode, WorkflowEdge
>>> graph = WorkflowGraph(
... nodes=[
... WorkflowNode(id="start", type="start"),
... WorkflowNode(id="end", type="end")
... ],
... edges=[WorkflowEdge(id="e1", from_node="start", to_node="end")]
... )
>>> get_execution_order(graph)
['start', 'end']
"""
engine = WorkflowEngine(graph)
# Nutze topological_order (flattened)
return engine.topological_order