"""
Workflow Executor (Phase 2)

Executes workflows sequentially (no branching/logic yet).

Concept basis: konzept_workflow_engine_konsolidated.md
Requirements analysis: anforderungsanalyse_umsetzungsplan.md (Phase 2)
"""

from typing import Dict, Any, List, Optional
from datetime import datetime
import uuid
import logging
import json

from workflow_models import (
    WorkflowGraph,
    NodeExecutionState,
    ExecutionResult,
    NodeStatus,
    NormalizedSignal
)
from workflow_engine import parse_workflow_graph, get_execution_order
from question_augmenter import (
    augment_prompt_with_questions,
    parse_question_augmentations_from_jsonb
)
from result_container_parser import parse_result_container
from normalization_engine import normalize_all_signals, load_question_catalog
from db import get_db, get_cursor

logger = logging.getLogger(__name__)

# Model identifier handed to the LLM callback for every analysis node.
DEFAULT_MODEL = "anthropic/claude-sonnet-4"


async def execute_workflow(
    workflow_id: str,
    profile_id: str,
    variables: Dict[str, Any],
    openrouter_call_func,  # Callback for LLM calls: async (prompt, model) -> str
    enable_debug: bool = False
) -> ExecutionResult:
    """
    Execute a workflow sequentially, without branching.

    Phase 2: linear execution in topological order.
    Phase 3 (planned): conditional branching based on normalized_signals.

    The function never raises for errors that occur while running the
    workflow: any exception is logged, a "failed" execution record is
    written on a best-effort basis, and a failed ExecutionResult is returned.

    Args:
        workflow_id: ID of the workflow definition to load.
        profile_id: ID of the profile the workflow runs for.
        variables: Placeholder values (e.g. {"name": "Lars", ...}).
        openrouter_call_func: async (prompt, model) -> str
        enable_debug: Debug flag, forwarded to execute_node.

    Returns:
        ExecutionResult containing the state of every node that was run.

    Example (illustrative, must be awaited inside a coroutine):
        result = await execute_workflow(
            workflow_id="test-workflow",
            profile_id="test-profile",
            variables={"name": "Lars"},
            openrouter_call_func=my_llm_func
        )
        result.status        # 'completed'
        len(result.node_states)  # 3
    """
    execution_id = str(uuid.uuid4())
    started_at = datetime.utcnow().isoformat()
    # Bound before the try block so the failure path can report the nodes
    # that already ran without having to probe locals().
    node_states: List[NodeExecutionState] = []

    logger.info(
        "Starting workflow execution: %s (workflow: %s)",
        execution_id, workflow_id
    )

    try:
        # 1. Load the workflow definition
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute(
                "SELECT graph FROM workflow_definitions WHERE id = %s AND active = true",
                (workflow_id,)
            )
            row = cur.fetchone()

            if not row:
                raise ValueError(f"Workflow not found: {workflow_id}")

            graph_json = row[0]

        # 2. Parse the graph
        graph = parse_workflow_graph(graph_json)
        logger.debug(
            "Parsed graph: %s nodes, %s edges",
            len(graph.nodes), len(graph.edges)
        )

        # 3. Topological sort
        execution_order = get_execution_order(graph)
        logger.info("Execution order: %s", execution_order)

        # 4. Load the question catalog
        with get_db() as conn:
            catalog = load_question_catalog(conn)
        logger.debug("Loaded catalog: %s question types", len(catalog))

        # 5. Execute nodes sequentially
        context = {"variables": variables, "profile_id": profile_id}

        # Index nodes once instead of scanning the list for every step.
        # setdefault keeps the first node for a given id, matching a
        # first-match linear search.
        nodes_by_id = {}
        for candidate in graph.nodes:
            nodes_by_id.setdefault(candidate.id, candidate)

        for node_id in execution_order:
            node = nodes_by_id.get(node_id)
            if node is None:
                # An explicit error gives the failure record a usable
                # message; a bare next() would surface as an empty string.
                raise ValueError(
                    f"Node '{node_id}' from execution order not found in graph"
                )

            logger.info("Executing node: %s (type: %s)", node_id, node.type)

            # NOTE: execute_node reports failures through the returned state
            # instead of raising, so a failed node does not stop the loop and
            # the run is still recorded as "completed". Failures are visible
            # through the aggregated "failed_nodes" counter.
            node_state = await execute_node(
                node=node,
                context=context,
                catalog=catalog,
                openrouter_call_func=openrouter_call_func,
                enable_debug=enable_debug
            )

            node_states.append(node_state)

            # Expose the results in the context (for later access in Phase 3)
            context[f"node_{node_id}"] = {
                "analysis_core": node_state.analysis_core,
                "normalized_signals": [
                    s.model_dump() for s in node_state.normalized_signals
                ]
            }

        # 6. Aggregate results
        aggregated = aggregate_results(node_states)

        # 7. Persist the execution state
        completed_at = datetime.utcnow().isoformat()
        save_execution_state(
            execution_id=execution_id,
            workflow_id=workflow_id,
            profile_id=profile_id,
            node_states=node_states,
            status="completed",
            started_at=started_at,
            completed_at=completed_at
        )

        logger.info("Workflow execution completed: %s", execution_id)

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status="completed",
            node_states=node_states,
            aggregated_result=aggregated,
            started_at=started_at,
            completed_at=completed_at
        )

    except Exception as e:
        logger.exception("Workflow execution failed: %s", e)

        error_message = str(e)
        completed_at = datetime.utcnow().isoformat()

        # Persisting the failed state is best-effort: if the database is the
        # reason we got here, a second failure must not replace the original
        # error or prevent the caller from receiving the failed result.
        try:
            save_execution_state(
                execution_id=execution_id,
                workflow_id=workflow_id,
                profile_id=profile_id,
                node_states=node_states,
                status="failed",
                started_at=started_at,
                completed_at=completed_at,
                error=error_message
            )
        except Exception:
            logger.exception(
                "Could not persist failed execution state: %s", execution_id
            )

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status="failed",
            node_states=node_states,
            aggregated_result={},
            started_at=started_at,
            completed_at=completed_at,
            error=error_message
        )


async def execute_node(
    node,
    context: Dict[str, Any],
    catalog: Dict[str, Dict],
    openrouter_call_func,
    enable_debug: bool = False
) -> NodeExecutionState:
    """
    Execute a single workflow node.

    Errors are not propagated: any exception raised while processing the
    node is logged and converted into a NodeExecutionState with status
    FAILED and the exception text in ``error``.

    Args:
        node: Workflow node (taken from graph.nodes).
        context: Execution context (variables, profile_id, prior results).
        catalog: Question catalog.
        openrouter_call_func: LLM callback: async (prompt, model) -> str
        enable_debug: Debug flag (currently not read by this function).

    Returns:
        NodeExecutionState

    Node types:
        - start/end: no-op
        - analysis: load prompt -> augment -> LLM -> parse -> normalize
        - anything else (e.g. logic/join): not implemented in Phase 2,
          reported as FAILED
    """
    started_at = datetime.utcnow().isoformat()

    try:
        # Start/end nodes: no-op
        if node.type in ("start", "end"):
            logger.debug("Node %s: No-op (%s)", node.id, node.type)
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        # Unknown node type (Phase 3: logic, join). Raised inside the try so
        # it is reported as a FAILED state like every other node error.
        if node.type != "analysis":
            raise ValueError(f"Node type '{node.type}' not implemented in Phase 2")

        # 1. Load the prompt
        prompt_template = await load_prompt_template(node.prompt_slug, context)
        logger.debug("Node %s: Loaded prompt '%s'", node.id, node.prompt_slug)

        # 2. Parse question_augmentations
        questions = []
        if node.question_augmentations:
            # Entries may be model objects or plain dicts; the parser
            # receives a list of plain dicts either way.
            questions_jsonb = [
                q.model_dump() if hasattr(q, 'model_dump') else q
                for q in node.question_augmentations
            ]
            questions = parse_question_augmentations_from_jsonb(questions_jsonb)
            logger.debug(
                "Node %s: %s question augmentations", node.id, len(questions)
            )

        # 3. Augment the prompt
        if questions:
            augmented_prompt = augment_prompt_with_questions(
                base_prompt=prompt_template,
                questions=questions
            )
        else:
            augmented_prompt = prompt_template

        # 4. LLM call
        logger.debug("Node %s: Calling LLM", node.id)
        llm_response = await openrouter_call_func(augmented_prompt, DEFAULT_MODEL)

        # 5. Parse the result container
        parsed = parse_result_container(llm_response)
        logger.debug(
            "Node %s: Parsed response (status: %s)",
            node.id, parsed['parsing_status']
        )

        # 6. Normalize signals
        normalized_signals = []
        if parsed["decision_signals"]:
            normalized_signals = normalize_all_signals(
                decision_signals=parsed["decision_signals"],
                catalog_dict=catalog
            )
            logger.info(
                "Node %s: Normalized %s signals",
                node.id, len(normalized_signals)
            )

        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            analysis_core=parsed["analysis_core"],
            decision_signals=parsed["decision_signals"],
            normalized_signals=normalized_signals,
            reasoning_anchors=parsed.get("reasoning_anchors"),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )

    except Exception as e:
        logger.exception("Node execution failed (%s): %s", node.id, e)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )


async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str:
    """
    Load a prompt template from the database and resolve its placeholders.

    Args:
        prompt_slug: Slug of the prompt (e.g. "pipeline_body").
        context: {"variables": {"name": "Lars", ...}, "profile_id": "..."}

    Returns:
        The template as returned by resolve_placeholders.

    Raises:
        ValueError: If no active prompt with the given slug exists.

    Example (illustrative, must be awaited inside a coroutine):
        template = await load_prompt_template("pipeline_body", {"profile_id": "123"})
        "{{name}}" not in template  # True
    """
    # Imported at call time, as in the original implementation.
    from placeholder_resolver import resolve_placeholders

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT template FROM ai_prompts WHERE slug = %s AND active = true",
            (prompt_slug,)
        )
        row = cur.fetchone()

        if not row:
            raise ValueError(f"Prompt not found: {prompt_slug}")

        template = row[0]

    # Resolve placeholders
    profile_id = context.get("profile_id")
    resolved = resolve_placeholders(
        template=template,
        profile_id=profile_id,
        extra_vars=context.get("variables", {})
    )

    return resolved


def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]:
    """
    Aggregate the results of all nodes.

    Args:
        node_states: List of all NodeExecutionState objects.

    Returns:
        A dict with these keys:
            "combined_analysis": one "## <node_id>" section per executed
                node that has an analysis_core, joined by blank lines
            "all_signals": model_dump() of every normalized signal
            "total_nodes": number of node states
            "executed_nodes": number of states with status EXECUTED
            "failed_nodes": number of states with status FAILED

    Example:
        >>> states = [
        ...     NodeExecutionState(node_id="n1", status=NodeStatus.EXECUTED, analysis_core="Test 1"),
        ...     NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2")
        ... ]
        >>> result = aggregate_results(states)
        >>> "## n1" in result["combined_analysis"]
        True
        >>> result["executed_nodes"]
        2
    """
    combined_analysis = []
    all_signals = []

    for state in node_states:
        if state.status == NodeStatus.EXECUTED and state.analysis_core:
            combined_analysis.append(f"## {state.node_id}\n{state.analysis_core}")

        if state.normalized_signals:
            all_signals.extend(s.model_dump() for s in state.normalized_signals)

    return {
        "combined_analysis": "\n\n".join(combined_analysis),
        "all_signals": all_signals,
        "total_nodes": len(node_states),
        "executed_nodes": sum(
            1 for s in node_states if s.status == NodeStatus.EXECUTED
        ),
        "failed_nodes": sum(
            1 for s in node_states if s.status == NodeStatus.FAILED
        )
    }


def save_execution_state(
    execution_id: str,
    workflow_id: str,
    profile_id: str,
    node_states: List[NodeExecutionState],
    status: str,
    started_at: str,
    completed_at: Optional[str] = None,
    error: Optional[str] = None
) -> None:
    """
    Insert one execution record into workflow_executions.

    Args:
        execution_id: ID of the execution.
        workflow_id: ID of the workflow.
        profile_id: ID of the profile.
        node_states: List of all NodeExecutionState objects.
        status: 'completed' | 'failed' | 'partial'
        started_at: ISO timestamp.
        completed_at: ISO timestamp (optional).
        error: Error message (optional); stored as {"error": ...} in
            execution_log, which is an empty object when no error is given.

    Example (illustrative, writes to the database):
        save_execution_state(
            execution_id="exec-123",
            workflow_id="wf-456",
            profile_id="prof-789",
            node_states=[],
            status="completed",
            started_at="2026-04-03T12:00:00"
        )
    """
    # Serialize node_states to JSON
    node_states_json = [s.model_dump() for s in node_states]

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            INSERT INTO workflow_executions
            (id, workflow_id, profile_id, status, node_states, execution_log, started_at, completed_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            execution_id,
            workflow_id,
            profile_id,
            status,
            json.dumps(node_states_json),
            json.dumps({"error": error} if error else {}),
            started_at,
            completed_at
        ))
        conn.commit()

    logger.info("Saved execution state: %s (status: %s)", execution_id, status)