""" Workflow Engine (Phase 0: Foundation) Graph-Parsing, topologische Sortierung, DAG-Validierung. Konzept-Basis: konzept_workflow_engine_konsolidated.md Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md Phase 0: Foundation (Graph-Parsing, Validation) Phase 1-3: Vollständige Execution-Logic """ from typing import Dict, List, Set, Optional, Tuple, Any from workflow_models import ( WorkflowGraph, WorkflowNode, WorkflowEdge, NodeType, NodeStatus ) from fastapi import HTTPException class WorkflowEngine: """ Workflow-Execution-Engine für graph-basierte Prompt-Workflows. Phase 0: Graph-Parsing und Validierung Phase 1-3: Execution-Logic """ def __init__(self, graph: WorkflowGraph): """ Initialisiere Engine mit Workflow-Graph. Args: graph: Workflow-Graph (Knoten + Kanten) Raises: HTTPException: Bei ungültigem Graph (Zyklen, fehlende Knoten, etc.) """ self.graph = graph self.nodes_by_id: Dict[str, WorkflowNode] = {node.id: node for node in graph.nodes} self.edges_by_id: Dict[str, WorkflowEdge] = {edge.id: edge for edge in graph.edges} # Adjacency lists für Traversierung self.outgoing_edges: Dict[str, List[WorkflowEdge]] = {} self.incoming_edges: Dict[str, List[WorkflowEdge]] = {} # Build adjacency lists for edge in graph.edges: # Outgoing edges (from node) if edge.from_node not in self.outgoing_edges: self.outgoing_edges[edge.from_node] = [] self.outgoing_edges[edge.from_node].append(edge) # Incoming edges (to node) if edge.to_node not in self.incoming_edges: self.incoming_edges[edge.to_node] = [] self.incoming_edges[edge.to_node].append(edge) # Validiere Graph self._validate_graph() # Topologische Sortierung self.topological_order = self._topological_sort() def _validate_graph(self): """ Validiere Workflow-Graph. Prüfungen: 1. Alle referenzierten Knoten existieren 2. Genau ein START-Knoten 3. Mindestens ein END-Knoten 4. Keine Zyklen (DAG) 5. Alle Knoten erreichbar vom START 6. Alle Knoten können END erreichen Raises: HTTPException: Bei Validierungsfehlern """ errors = [] # 1. Prüfe ob alle referenzierten Knoten existieren for edge in self.graph.edges: if edge.from_node not in self.nodes_by_id: errors.append(f"Edge {edge.id}: from_node '{edge.from_node}' existiert nicht") if edge.to_node not in self.nodes_by_id: errors.append(f"Edge {edge.id}: to_node '{edge.to_node}' existiert nicht") if errors: raise HTTPException(400, {"error": "Ungültiger Graph", "details": errors}) # 2. Genau ein START-Knoten start_nodes = [n for n in self.graph.nodes if n.type == NodeType.START] if len(start_nodes) == 0: errors.append("Kein START-Knoten gefunden") elif len(start_nodes) > 1: errors.append(f"Mehrere START-Knoten gefunden: {[n.id for n in start_nodes]}") # 3. Mindestens ein END-Knoten end_nodes = [n for n in self.graph.nodes if n.type == NodeType.END] if len(end_nodes) == 0: errors.append("Kein END-Knoten gefunden") if errors: raise HTTPException(400, {"error": "Ungültiger Graph", "details": errors}) # 4. Keine Zyklen (DAG-Prüfung) cycle = self._detect_cycle() if cycle: errors.append(f"Zyklus erkannt: {' → '.join(cycle)}") if errors: raise HTTPException(400, {"error": "Ungültiger Graph (Zyklus)", "details": errors}) # 5. Alle Knoten erreichbar vom START start_node = start_nodes[0] reachable = self._get_reachable_nodes(start_node.id) unreachable = [n.id for n in self.graph.nodes if n.id not in reachable] if unreachable: errors.append(f"Knoten nicht erreichbar vom START: {unreachable}") # 6. Alle Knoten können END erreichen (Rückwärts-Prüfung) end_nodes_ids = [n.id for n in end_nodes] can_reach_end = self._get_nodes_reaching_end(end_nodes_ids) cannot_reach_end = [n.id for n in self.graph.nodes if n.id not in can_reach_end] if cannot_reach_end: errors.append(f"Knoten können END nicht erreichen: {cannot_reach_end}") if errors: raise HTTPException(400, {"error": "Ungültiger Graph (Erreichbarkeit)", "details": errors}) def _detect_cycle(self) -> Optional[List[str]]: """ Erkenne Zyklen im Graph mittels DFS. Returns: Liste der Knoten im Zyklus, oder None wenn kein Zyklus """ visited: Set[str] = set() rec_stack: Set[str] = set() # Recursion stack für DFS parent: Dict[str, Optional[str]] = {} def dfs(node_id: str) -> Optional[List[str]]: visited.add(node_id) rec_stack.add(node_id) # Besuche alle Nachbarn for edge in self.outgoing_edges.get(node_id, []): neighbor = edge.to_node if neighbor not in visited: parent[neighbor] = node_id cycle = dfs(neighbor) if cycle: return cycle elif neighbor in rec_stack: # Zyklus gefunden! Rekonstruiere Pfad cycle_path = [neighbor] current = node_id while current != neighbor: cycle_path.append(current) current = parent.get(current) cycle_path.append(neighbor) # Schließe Zyklus return list(reversed(cycle_path)) rec_stack.remove(node_id) return None # Starte DFS von allen Knoten (für disconnected components) for node in self.graph.nodes: if node.id not in visited: parent[node.id] = None cycle = dfs(node.id) if cycle: return cycle return None def _get_reachable_nodes(self, start_node_id: str) -> Set[str]: """ Finde alle vom Startknoten aus erreichbaren Knoten (Vorwärts-Traversierung). Args: start_node_id: ID des Startknotens Returns: Set aller erreichbaren Knoten-IDs """ reachable: Set[str] = set() stack = [start_node_id] while stack: current = stack.pop() if current in reachable: continue reachable.add(current) # Füge alle Nachbarn hinzu for edge in self.outgoing_edges.get(current, []): stack.append(edge.to_node) return reachable def _get_nodes_reaching_end(self, end_node_ids: List[str]) -> Set[str]: """ Finde alle Knoten die mindestens einen END-Knoten erreichen können. Rückwärts-Traversierung von END-Knoten. Args: end_node_ids: Liste der END-Knoten-IDs Returns: Set aller Knoten-IDs die END erreichen können """ can_reach_end: Set[str] = set() stack = list(end_node_ids) while stack: current = stack.pop() if current in can_reach_end: continue can_reach_end.add(current) # Füge alle Vorgänger hinzu (rückwärts) for edge in self.incoming_edges.get(current, []): stack.append(edge.from_node) return can_reach_end def _topological_sort(self) -> List[str]: """ Berechne topologische Sortierung des Graphen (Kahn's Algorithm). Die topologische Sortierung gibt eine Reihenfolge vor, in der Knoten ausgeführt werden können, ohne dass Abhängigkeiten verletzt werden. Returns: Liste der Knoten-IDs in topologischer Reihenfolge Raises: HTTPException: Bei Zyklen (sollte durch _validate_graph verhindert sein) """ # Berechne In-Degree für jeden Knoten in_degree: Dict[str, int] = {node.id: 0 for node in self.graph.nodes} for edge in self.graph.edges: in_degree[edge.to_node] += 1 # Queue mit Knoten ohne Vorgänger (In-Degree = 0) queue = [node_id for node_id, degree in in_degree.items() if degree == 0] sorted_order = [] while queue: # Entferne Knoten mit In-Degree 0 current = queue.pop(0) sorted_order.append(current) # Reduziere In-Degree aller Nachbarn for edge in self.outgoing_edges.get(current, []): neighbor = edge.to_node in_degree[neighbor] -= 1 if in_degree[neighbor] == 0: queue.append(neighbor) # Wenn nicht alle Knoten sortiert wurden: Zyklus (sollte nicht passieren) if len(sorted_order) != len(self.graph.nodes): raise HTTPException( 500, "Topologische Sortierung fehlgeschlagen (Zyklus?). " "Dies sollte durch _validate_graph verhindert worden sein." ) return sorted_order def get_execution_order(self) -> List[List[str]]: """ Berechne Ausführungs-Reihenfolge als Ebenen (für parallele Execution). Knoten auf derselben Ebene können parallel ausgeführt werden. Returns: Liste von Ebenen, jede Ebene ist eine Liste von Knoten-IDs: [ ["node_start"], ["node_1", "node_2"], # können parallel laufen ["node_3"], ["node_end"] ] """ # Berechne Level für jeden Knoten (längster Pfad vom START) levels: Dict[str, int] = {} for node_id in self.topological_order: # Finde maximales Level aller Vorgänger incoming = self.incoming_edges.get(node_id, []) if not incoming: # Knoten ohne Vorgänger (START) levels[node_id] = 0 else: max_parent_level = max(levels[edge.from_node] for edge in incoming) levels[node_id] = max_parent_level + 1 # Gruppiere Knoten nach Level max_level = max(levels.values()) if levels else 0 execution_order = [[] for _ in range(max_level + 1)] for node_id, level in levels.items(): execution_order[level].append(node_id) return execution_order async def execute( self, variables: Dict[str, Any], openrouter_call_func, profile_id: str, enable_debug: bool = False ) -> Dict[str, Any]: """ Führe Workflow aus. Phase 0: Stub-Implementierung Phase 1-3: Vollständige Implementierung mit: - Fragenergänzung - Normalisierung - Logik-Auswertung - Pfad-Routing - Join-Konsolidierung Args: variables: Dict of variables for placeholder replacement openrouter_call_func: Async function(prompt_text) -> response_text profile_id: User profile ID enable_debug: If True, include debug information Returns: Dict with execution results Raises: HTTPException: 501 Not Implemented (Phase 0) """ raise HTTPException( status_code=501, detail="Workflow-Execution noch nicht implementiert (Phase 0: Foundation). " "Vollständige Implementierung erfolgt in Phase 1-3." ) def parse_workflow_graph(graph_jsonb: Dict) -> WorkflowGraph: """ Parse JSONB-Graph aus Datenbank zu Pydantic WorkflowGraph-Modell. Args: graph_jsonb: JSONB dict aus workflow_definitions.graph Unterstützt beide Edge-Formate: - Backend: {"from": "...", "to": "..."} - Frontend: {"source": "...", "target": "..."} Returns: Validiertes WorkflowGraph-Objekt Raises: ValidationError: Bei ungültigem Graph-Format """ # Normalize edges: convert React Flow format (source/target) to backend format (from/to) if "edges" in graph_jsonb: normalized_edges = [] for edge in graph_jsonb["edges"]: normalized_edge = edge.copy() # Convert source → from, target → to (if present) if "source" in normalized_edge and "from" not in normalized_edge: normalized_edge["from"] = normalized_edge.pop("source") if "target" in normalized_edge and "to" not in normalized_edge: normalized_edge["to"] = normalized_edge.pop("target") normalized_edges.append(normalized_edge) graph_jsonb = {**graph_jsonb, "edges": normalized_edges} return WorkflowGraph(**graph_jsonb) def validate_workflow_graph(graph: WorkflowGraph) -> Tuple[bool, List[str]]: """ Validiere Workflow-Graph (ohne Engine zu initialisieren). Kann für UI-Validierung verwendet werden (vor dem Speichern). Args: graph: Workflow-Graph Returns: Tuple (is_valid, errors) - is_valid: True wenn Graph gültig - errors: Liste von Fehler-Strings (leer wenn valid) """ try: engine = WorkflowEngine(graph) return True, [] except HTTPException as e: # Extrahiere Fehler aus HTTPException detail detail = e.detail if isinstance(detail, dict): errors = detail.get('details', [detail.get('error', str(e))]) else: errors = [str(detail)] return False, errors except Exception as e: return False, [f"Unerwarteter Fehler: {str(e)}"] def get_execution_order(graph: WorkflowGraph) -> List[str]: """ Berechne sequenzielle Ausführungs-Reihenfolge (Phase 2). Phase 2: Sequenziell (flattened topological sort). Phase 7: Parallele Execution (levels statt flat list). Args: graph: Workflow-Graph Returns: Liste von Knoten-IDs in Ausführungsreihenfolge Beispiel: ["start", "node_1", "node_2", "end"] Raises: HTTPException: Bei ungültigem Graph Beispiel: >>> from workflow_models import WorkflowGraph, WorkflowNode, WorkflowEdge >>> graph = WorkflowGraph( ... nodes=[ ... WorkflowNode(id="start", type="start"), ... WorkflowNode(id="end", type="end") ... ], ... edges=[WorkflowEdge(id="e1", from_node="start", to_node="end")] ... ) >>> get_execution_order(graph) ['start', 'end'] """ engine = WorkflowEngine(graph) # Nutze topological_order (flattened) return engine.topological_order