"""
Workflow Executor (Phase 4)

Executes workflows with conditional branching and path consolidation.

Phase 2: Sequential execution
Phase 3: Conditional branching, logic nodes, fallback strategies
Phase 4: Join nodes, path consolidation

Concept basis: konzept_workflow_engine_konsolidated.md
Requirements analysis: anforderungsanalyse_umsetzungsplan.md (Phase 2-4)
"""

from collections import deque
from typing import Dict, Any, List, Optional, Set
from datetime import datetime
import uuid
import logging
import json

from workflow_models import (
    WorkflowGraph, NodeExecutionState, ExecutionResult, NodeStatus,
    NormalizedSignal, FallbackStrategy, SignalStatus
)
from workflow_engine import parse_workflow_graph, get_execution_order
from question_augmenter import (
    augment_prompt_with_questions,
    parse_question_augmentations_from_jsonb
)
from result_container_parser import parse_result_container
from normalization_engine import normalize_all_signals, load_question_catalog
from logic_evaluator import evaluate_logic_expression, resolve_signal_reference
from join_evaluator import evaluate_join_node as evaluate_join_node_core
from db import get_db, get_cursor

logger = logging.getLogger(__name__)


async def execute_workflow(
    workflow_id: Optional[str] = None,
    graph_data: Optional[Dict] = None,  # Phase 5: taken directly from ai_prompts.graph_data
    profile_id: Optional[str] = None,
    variables: Optional[Dict[str, Any]] = None,
    openrouter_call_func=None,  # LLM callback: async (prompt, model) -> str
    enable_debug: bool = False
) -> ExecutionResult:
    """
    Execute a workflow (with conditional branching and path consolidation).

    Phase 2: Linear execution in topological order.
    Phase 3: Conditional branching driven by logic nodes.
    Phase 4: Join nodes and path consolidation.
    Phase 5: Accepts graph_data directly (from ai_prompts rather than
             workflow_definitions).

    Args:
        workflow_id: UUID of the workflow (legacy; looked up in the
            workflow_definitions table when graph_data is not given)
        graph_data: Workflow graph as a dict (takes precedence over
            workflow_id when truthy)
        profile_id: UUID of the profile
        variables: Placeholder values (e.g. {"name": "Lars", ...});
            None is stored in the context as an empty dict
        openrouter_call_func: async (prompt, model) -> str
        enable_debug: Debug mode (forwarded to execute_node)

    Returns:
        ExecutionResult containing all node_states. Errors raised while
        loading or traversing the graph are caught and reported as
        status="failed" with the message in `error`; they are not re-raised.

    Example (Phase 5):
        >>> result = await execute_workflow(
        ...     graph_data={"nodes": [...], "edges": [...]},
        ...     profile_id="test-profile",
        ...     variables={"name": "Lars"},
        ...     openrouter_call_func=my_llm_func
        ... )
    """
    execution_id = str(uuid.uuid4())
    started_at = datetime.utcnow().isoformat()
    # Defined before the try so the failure handler can always report the
    # states collected so far (no locals() probing needed).
    node_states: List[NodeExecutionState] = []

    logger.info(f"Starting workflow execution: {execution_id}")

    try:
        # 1. Load the workflow definition
        if graph_data:
            # Phase 5: graph comes straight from ai_prompts.graph_data
            graph_json = json.dumps(graph_data)
            logger.debug("Using provided graph_data")
        elif workflow_id:
            # Phase 0-4: graph from the workflow_definitions table (legacy)
            with get_db() as conn:
                cur = get_cursor(conn)
                cur.execute(
                    "SELECT graph FROM workflow_definitions WHERE id = %s AND active = true",
                    (workflow_id,)
                )
                row = cur.fetchone()
                if not row:
                    raise ValueError(f"Workflow not found: {workflow_id}")
                graph_json = row['graph']
            logger.debug(f"Loaded graph from workflow_definitions: {workflow_id}")
        else:
            raise ValueError("Entweder workflow_id oder graph_data muss übergeben werden")

        # 2. Parse the graph
        graph = parse_workflow_graph(graph_json)
        logger.debug(f"Parsed graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges")

        # 3. Load the question catalog
        with get_db() as conn:
            catalog = load_question_catalog(conn)
        logger.debug(f"Loaded catalog: {len(catalog)} question types")

        # 4. Execute nodes with conditional branching (Phase 3)
        context = {
            "variables": variables if variables is not None else {},
            "profile_id": profile_id,
            "node_results": {},  # Phase 3: full NodeExecutionState per node id
            "active_edges": {}   # Phase 3: edge id -> bool, written by logic nodes
        }

        # Phase 3: BFS traversal with edge activation
        start_node = next((n for n in graph.nodes if n.type == "start"), None)
        if not start_node:
            raise ValueError("No start node found in workflow")

        # NOTE(review): a node is marked visited the first time it is dequeued,
        # so a join node reached via a short branch may be evaluated before a
        # longer parallel branch has run. Confirm whether traversal should
        # follow get_execution_order() instead.
        visited: Set[str] = set()
        queue = deque([start_node.id])

        while queue:
            node_id = queue.popleft()

            if node_id in visited:
                continue
            visited.add(node_id)

            # An edge may reference a node that does not exist; fail with a
            # clear message instead of leaking StopIteration out of next().
            node = next((n for n in graph.nodes if n.id == node_id), None)
            if node is None:
                raise ValueError(f"Edge references unknown node: {node_id}")

            # A node only runs when at least one incoming edge is active
            if not _has_active_incoming_edge(node, graph, context):
                logger.info(f"Skipping node {node_id}: no active incoming edges")
                skipped_at = datetime.utcnow().isoformat()
                node_states.append(NodeExecutionState(
                    node_id=node_id,
                    status=NodeStatus.SKIPPED,
                    error="no_active_incoming_edges",
                    started_at=skipped_at,
                    completed_at=skipped_at
                ))
                continue

            logger.info(f"Executing node: {node_id} (type: {node.type})")

            node_state = await execute_node(
                node=node,
                context=context,
                catalog=catalog,
                graph=graph,  # Phase 3: needed by logic nodes
                openrouter_call_func=openrouter_call_func,
                enable_debug=enable_debug
            )

            node_states.append(node_state)
            context["node_results"][node_id] = node_state

            # Enqueue successors reachable over active edges
            outgoing_edges = [e for e in graph.edges if e.from_node == node_id]
            for edge in outgoing_edges:
                # Edges are active by default; logic nodes set them to False
                if context["active_edges"].get(edge.id, True):
                    queue.append(edge.to_node)

        # 5. Aggregate results
        aggregated = aggregate_results(node_states)

        # 6. Persist the execution state
        completed_at = datetime.utcnow().isoformat()

        save_execution_state(
            execution_id=execution_id,
            workflow_id=workflow_id,
            profile_id=profile_id,
            node_states=node_states,
            status="completed",
            started_at=started_at,
            completed_at=completed_at
        )

        logger.info(f"Workflow execution completed: {execution_id}")

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status="completed",
            node_states=node_states,
            aggregated_result=aggregated,
            started_at=started_at,
            completed_at=completed_at
        )

    except Exception as e:
        logger.error(f"Workflow execution failed: {e}", exc_info=True)

        # Persist the failed state. A persistence problem here must not hide
        # the original error from the caller, so it is logged and swallowed.
        completed_at = datetime.utcnow().isoformat()
        try:
            save_execution_state(
                execution_id=execution_id,
                workflow_id=workflow_id,
                profile_id=profile_id,
                node_states=node_states,
                status="failed",
                started_at=started_at,
                completed_at=completed_at,
                error=str(e)
            )
        except Exception:
            logger.exception(
                "Could not persist failed execution state: %s", execution_id
            )

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status="failed",
            node_states=node_states,
            aggregated_result={},
            started_at=started_at,
            completed_at=completed_at,
            error=str(e)
        )


async def execute_node(
    node,
    context: Dict[str, Any],
    catalog: Dict[str, Dict],
    graph: WorkflowGraph,  # Phase 3: needed by logic nodes
    openrouter_call_func,
    enable_debug: bool = False
) -> NodeExecutionState:
    """
    Execute a single node.

    Args:
        node: WorkflowNode (from graph.nodes)
        context: Execution context (variables, profile_id, node_results,
            active_edges)
        catalog: Question catalog
        graph: WorkflowGraph (for logic and join nodes)
        openrouter_call_func: LLM callback: async (prompt, model) -> str
        enable_debug: Debug mode (currently not read inside this function)

    Returns:
        NodeExecutionState. Any exception raised while executing the node is
        caught and returned as a state with status FAILED and the message in
        `error`.

    Node types:
        - start/end: no-op
        - analysis: load prompt -> augment -> LLM -> parse -> normalize
        - logic: evaluate condition -> activate/deactivate edges (Phase 3)
        - join: consolidate paths -> merge results (Phase 4)
    """
    started_at = datetime.utcnow().isoformat()

    try:
        # Start/end nodes: no-op
        if node.type in ["start", "end"]:
            logger.debug(f"Node {node.id}: No-op ({node.type})")
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        # Logic nodes (Phase 3)
        if node.type == "logic":
            return execute_logic_node(node, context, graph)

        # Join nodes (Phase 4)
        if node.type == "join":
            return execute_join_node(node, context, graph)

        # Analysis nodes
        if node.type == "analysis":
            # Fail early with a readable message rather than
            # "'NoneType' object is not callable" at the call site below.
            if openrouter_call_func is None:
                raise ValueError(
                    f"Node {node.id}: openrouter_call_func is required for analysis nodes"
                )

            # 1. Load the prompt
            prompt_template = await load_prompt_template(node.prompt_slug, context)
            logger.debug(f"Node {node.id}: Loaded prompt '{node.prompt_slug}'")

            # 2. Parse question_augmentations
            questions = []
            if node.question_augmentations:
                # Entries may be model objects or plain dicts; the parser
                # is given plain dicts.
                questions_jsonb = [
                    q.model_dump() if hasattr(q, 'model_dump') else q
                    for q in node.question_augmentations
                ]
                questions = parse_question_augmentations_from_jsonb(questions_jsonb)
                logger.debug(f"Node {node.id}: {len(questions)} question augmentations")

            # 3. Augment the prompt
            if questions:
                augmented_prompt = augment_prompt_with_questions(
                    base_prompt=prompt_template,
                    questions=questions
                )
            else:
                augmented_prompt = prompt_template

            # 4. LLM call
            logger.debug(f"Node {node.id}: Calling LLM")
            llm_response = await openrouter_call_func(
                augmented_prompt,
                "anthropic/claude-sonnet-4"  # Default model
            )

            # 5. Parse the result container
            parsed = parse_result_container(llm_response)
            logger.debug(f"Node {node.id}: Parsed response (status: {parsed['parsing_status']})")

            # 6. Normalize signals
            normalized_signals = []
            if parsed["decision_signals"]:
                # Hybrid model: node-specific questions override the catalog.
                # Work on a copy so the shared catalog is not modified.
                node_catalog = catalog.copy()

                if node.question_augmentations:
                    for q in node.question_augmentations:
                        q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
                        node_catalog[q_dict['type']] = {
                            "answer_spectrum": q_dict['answer_spectrum'],
                            "normalization_rules": None  # node questions have no synonyms
                        }
                        logger.debug(f"Node {node.id}: Override catalog for '{q_dict['type']}' with node-specific spectrum")

                normalized_signals = normalize_all_signals(
                    decision_signals=parsed["decision_signals"],
                    catalog_dict=node_catalog
                )
                logger.info(f"Node {node.id}: Normalized {len(normalized_signals)} signals")

            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                analysis_core=parsed["analysis_core"],
                decision_signals=parsed["decision_signals"],
                normalized_signals=normalized_signals,
                reasoning_anchors=parsed.get("reasoning_anchors"),
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        # Unknown node type
        raise ValueError(f"Node type '{node.type}' not implemented yet (Phase 4+)")

    except Exception as e:
        logger.error(f"Node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )


def execute_logic_node(
    node,
    context: Dict[str, Any],
    graph: WorkflowGraph
) -> NodeExecutionState:
    """
    Execute a logic node (Phase 3).

    Args:
        node: WorkflowNode of type "logic"
        context: Execution context with node_results; its "active_edges"
            mapping is updated in place
        graph: WorkflowGraph

    Returns:
        NodeExecutionState whose analysis_core is a JSON string with the keys
        "evaluation_result", "error" and "activated_edges". On an unexpected
        exception (including a missing condition) a FAILED state is returned
        and no edges are changed.

    Logic:
        1. Evaluate node.condition.expression
        2. Choose the "then" or "else" edges based on the result
        3. On an evaluation error: apply the fallback strategy
        4. Mark the chosen edge(s) active and all other outgoing edges inactive
        5. Return the NodeExecutionState
    """
    started_at = datetime.utcnow().isoformat()

    try:
        if not node.condition:
            raise ValueError(f"Logic node {node.id} has no condition")

        # 1. Evaluate the condition
        result, error = evaluate_logic_expression(node.condition.expression, context)

        if error:
            # Evaluation error -> apply fallback
            logger.warning(f"Logic node {node.id}: evaluation error: {error}")
            activated_edges = _apply_fallback(node, graph, context, error)
        else:
            # Successful evaluation
            logger.info(f"Logic node {node.id}: evaluated to {result}")

            # 2. Choose the path based on the result
            if result:
                # Condition TRUE -> activate then-path
                activated_edges = _get_edges_by_label(node.id, "then", graph)
            else:
                # Condition FALSE -> activate else-path
                activated_edges = _get_edges_by_label(node.id, "else", graph)

        # 3. Mark every outgoing edge as active/inactive
        outgoing_edges = [e for e in graph.edges if e.from_node == node.id]
        for edge in outgoing_edges:
            context["active_edges"][edge.id] = edge.id in activated_edges

        logger.debug(f"Logic node {node.id}: activated edges: {activated_edges}")

        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat(),
            analysis_core=json.dumps({
                "evaluation_result": result if not error else None,
                "error": error,
                "activated_edges": activated_edges
            })
        )

    except Exception as e:
        logger.error(f"Logic node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )


def execute_join_node(
    node,
    context: Dict[str, Any],
    graph: WorkflowGraph
) -> NodeExecutionState:
    """
    Execute a join node (Phase 4).

    Args:
        node: WorkflowNode of type "join"
        context: Execution context with node_results
        graph: WorkflowGraph

    Returns:
        NodeExecutionState with the consolidated data: analysis_core is the
        JSON dump of the evaluator's consolidated_analysis_core and
        normalized_signals is the list of values of its consolidated_signals.
        FAILED when the evaluator reports not ready or an exception occurs.

    Logic:
        1. Evaluate the join strategy (via join_evaluator)
        2. Check readiness
        3. Consolidate results
        4. Return the NodeExecutionState

    Error handling (as described by the original author; the strategy rules
    live in join_evaluator and are not visible here):
        - wait_all + missing paths -> FAILED
        - wait_any + no paths -> FAILED
        - best_effort + no paths -> EXECUTED (with metadata)
    """
    started_at = datetime.utcnow().isoformat()

    try:
        logger.info(f"Executing join node: {node.id}")

        # 1. Evaluate the join node
        join_result = evaluate_join_node_core(node, graph, context)

        # 2. Check readiness
        if not join_result.ready:
            # Strategy not satisfied -> FAILED
            logger.warning(f"Join node {node.id}: Not ready - {join_result.error}")
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.FAILED,
                error=join_result.error,
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat(),
                metadata=join_result.metadata
            )

        # 3. Build the consolidated analysis_core as a JSON string
        consolidated_core_json = json.dumps(
            join_result.consolidated_analysis_core,
            ensure_ascii=False,
            indent=2
        )

        # 4. Log the consolidation
        executed_count = join_result.metadata.get("executed_paths", 0)
        total_count = join_result.metadata.get("total_paths", 0)
        logger.info(
            f"Join node {node.id}: Consolidated {executed_count}/{total_count} paths"
        )

        # 5. consolidated_signals is a dict; NodeExecutionState takes a list
        consolidated_signals_list = list(join_result.consolidated_signals.values())

        # 6. Return the NodeExecutionState
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            analysis_core=consolidated_core_json,
            normalized_signals=consolidated_signals_list,
            metadata=join_result.metadata,
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )

    except Exception as e:
        logger.error(f"Join node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )


def _apply_fallback(
    node,
    graph: WorkflowGraph,
    context: Dict[str, Any],
    error: str
) -> List[str]:
    """
    Apply the node's fallback strategy after a failed condition evaluation.

    Args:
        node: WorkflowNode
        graph: WorkflowGraph
        context: Execution context (not read here)
        error: Evaluation error (not read here; kept for the call signature)

    Returns:
        List of edge ids that should be activated
    """
    if not node.fallback:
        # Default: conservative (take the else path)
        logger.warning(f"Node {node.id}: No fallback configured, using DEFAULT_PATH")
        return _get_edges_by_label(node.id, "else", graph)

    strategy = node.fallback.strategy

    if strategy == FallbackStrategy.CONSERVATIVE_SKIP:
        # Deactivate all outgoing edges
        logger.info(f"Node {node.id}: CONSERVATIVE_SKIP - deactivating all paths")
        return []

    elif strategy == FallbackStrategy.DEFAULT_PATH:
        # Take the else path
        logger.info(f"Node {node.id}: DEFAULT_PATH - taking else path")
        return _get_edges_by_label(node.id, "else", graph)

    elif strategy == FallbackStrategy.UNCERTAINTY_PATH:
        # Explicit uncertainty path, if the graph defines one
        logger.info(f"Node {node.id}: UNCERTAINTY_PATH")
        uncertainty_edges = _get_edges_by_label(node.id, "uncertainty", graph)
        if uncertainty_edges:
            return uncertainty_edges
        else:
            # Fall back to else
            logger.warning(f"Node {node.id}: No uncertainty path found, using else")
            return _get_edges_by_label(node.id, "else", graph)

    elif strategy == FallbackStrategy.DOCUMENT_ONLY:
        # Keep every outgoing path active (as if there were no condition)
        logger.info(f"Node {node.id}: DOCUMENT_ONLY - all paths remain active")
        outgoing_edges = [e for e in graph.edges if e.from_node == node.id]
        return [e.id for e in outgoing_edges]

    else:
        logger.warning(f"Node {node.id}: Unknown fallback strategy {strategy}, using DEFAULT_PATH")
        return _get_edges_by_label(node.id, "else", graph)


def _get_edges_by_label(node_id: str, label: str, graph: WorkflowGraph) -> List[str]:
    """
    Find all outgoing edges of a node that carry a given label.

    Args:
        node_id: Node id
        label: Edge label (e.g. "then", "else", "uncertainty")
        graph: WorkflowGraph

    Returns:
        List of edge ids (empty when nothing matches)
    """
    matching_edges = [
        e.id for e in graph.edges
        if e.from_node == node_id and e.label == label
    ]
    return matching_edges


def _has_active_incoming_edge(node, graph: WorkflowGraph, context: Dict[str, Any]) -> bool:
    """
    Check whether a node has at least one active incoming edge.

    Args:
        node: WorkflowNode
        graph: WorkflowGraph
        context: Execution context with active_edges

    Returns:
        True for start nodes, and for any node with at least one incoming
        edge that is not explicitly marked inactive; False otherwise.
    """
    # Start nodes have no incoming edges -> always active
    if node.type == "start":
        return True

    incoming_edges = [e for e in graph.edges if e.to_node == node.id]

    # No incoming edges -> not reachable
    if not incoming_edges:
        return False

    # At least one edge must be active
    active_edges = context.get("active_edges", {})
    for edge in incoming_edges:
        if active_edges.get(edge.id, True):  # Default: True (Phase 2 compatibility)
            return True

    return False


async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str:
    """
    Load a prompt template from the DB and resolve its placeholders.

    Args:
        prompt_slug: Slug of the prompt (e.g. "pipeline_body")
        context: {"variables": {"name": "Lars", ...}, "profile_id": "..."};
            only "profile_id" is read here

    Returns:
        Resolved prompt template

    Raises:
        ValueError: if no active prompt with this slug exists

    Example:
        >>> template = await load_prompt_template("pipeline_body", {"profile_id": "123"})
        >>> "{{name}}" not in template
        True
    """
    from placeholder_resolver import resolve_placeholders

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT template FROM ai_prompts WHERE slug = %s AND active = true",
            (prompt_slug,)
        )
        row = cur.fetchone()
        if not row:
            raise ValueError(f"Prompt not found: {prompt_slug}")

        template = row['template']

    # Resolve placeholders
    # NOTE(review): resolution happens after the DB context is closed;
    # confirm resolve_placeholders does not rely on the surrounding connection.
    profile_id = context.get("profile_id")
    resolved = resolve_placeholders(
        template=template,
        profile_id=profile_id
    )

    # TODO Phase 3: Support custom variables from workflow context

    return resolved


def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]:
    """
    Aggregate the results of all nodes.

    Args:
        node_states: List of all NodeExecutionState

    Returns:
        {
            "combined_analysis": "## node_1\\n...\\n\\n## node_2\\n...",
            "all_signals": [<model_dump() of each normalized signal>, ...],
            "total_nodes": 3,
            "executed_nodes": 3,
            "failed_nodes": 0,
            "skipped_nodes": 0
        }
        Only EXECUTED states with a non-empty analysis_core contribute to
        combined_analysis and all_signals.

    Example:
        >>> states = [
        ...     NodeExecutionState(node_id="n1", status=NodeStatus.EXECUTED, analysis_core="Test 1"),
        ...     NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2")
        ... ]
        >>> result = aggregate_results(states)
        >>> "## n1" in result["combined_analysis"]
        True
        >>> result["executed_nodes"]
        2
    """
    combined_analysis = []
    all_signals = []

    for state in node_states:
        if state.status == NodeStatus.EXECUTED and state.analysis_core:
            combined_analysis.append(f"## {state.node_id}\n{state.analysis_core}")

            if state.normalized_signals:
                all_signals.extend([s.model_dump() for s in state.normalized_signals])

    return {
        "combined_analysis": "\n\n".join(combined_analysis),
        "all_signals": all_signals,
        "total_nodes": len(node_states),
        "executed_nodes": sum(1 for s in node_states if s.status == NodeStatus.EXECUTED),
        "failed_nodes": sum(1 for s in node_states if s.status == NodeStatus.FAILED),
        "skipped_nodes": sum(1 for s in node_states if s.status == NodeStatus.SKIPPED)  # Phase 3
    }


def save_execution_state(
    execution_id: str,
    workflow_id: Optional[str],
    profile_id: Optional[str],
    node_states: List[NodeExecutionState],
    status: str,
    started_at: str,
    completed_at: Optional[str] = None,
    error: Optional[str] = None
):
    """
    Store the execution state in workflow_executions.

    Args:
        execution_id: UUID of the execution
        workflow_id: UUID of the workflow (None when the graph was passed
            in directly as graph_data)
        profile_id: UUID of the profile
        node_states: List of all NodeExecutionState
        status: 'completed' | 'failed' | 'partial'
        started_at: ISO timestamp
        completed_at: ISO timestamp (optional)
        error: Error message (optional); stored as {"error": ...} in
            execution_log, or {} when absent

    Example:
        >>> save_execution_state(
        ...     execution_id="exec-123",
        ...     workflow_id="wf-456",
        ...     profile_id="prof-789",
        ...     node_states=[],
        ...     status="completed",
        ...     started_at="2026-04-03T12:00:00"
        ... )
    """
    # Serialize node_states to JSON
    # NOTE(review): assumes model_dump() yields only JSON-serializable values
    # (e.g. str-based enums) — confirm against workflow_models.
    node_states_json = [s.model_dump() for s in node_states]

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            INSERT INTO workflow_executions
            (id, workflow_id, profile_id, status, node_states, execution_log, started_at, completed_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            execution_id,
            workflow_id,
            profile_id,
            status,
            json.dumps(node_states_json),
            json.dumps({"error": error} if error else {}),
            started_at,
            completed_at
        ))
        conn.commit()

    logger.info(f"Saved execution state: {execution_id} (status: {status})")