"""
Workflow Executor (Phase 4)

Executes workflows with conditional branching and path consolidation.

Phase 2: Sequential execution
Phase 3: Conditional branching, logic nodes, fallback strategies
Phase 4: Join nodes, path consolidation

Concept basis: konzept_workflow_engine_konsolidated.md
Requirements analysis: anforderungsanalyse_umsetzungsplan.md (Phase 2-4)
"""

from typing import Dict, Any, List, Optional, Set
from collections import deque
from datetime import datetime
import uuid
import logging
import json

from jinja2 import Environment, ChainableUndefined, TemplateError
from jinja2.sandbox import SandboxedEnvironment

from workflow_models import (
    WorkflowGraph,
    WorkflowNode,
    NodeExecutionState,
    ExecutionResult,
    NodeStatus,
    NormalizedSignal,
    FallbackStrategy,
    SignalStatus,
    EndNodeOutputMode
)
from workflow_engine import parse_workflow_graph, get_execution_order
from question_augmenter import (
    augment_prompt_with_questions,
    parse_question_augmentations_from_jsonb
)
from result_container_parser import parse_result_container
from normalization_engine import normalize_all_signals, normalize_decision_signal, load_question_catalog
from logic_evaluator import evaluate_logic_expression, resolve_signal_reference
from join_evaluator import evaluate_join_node as evaluate_join_node_core
from db import get_db, get_cursor

logger = logging.getLogger(__name__)

# Model passed to the LLM callback for every analysis node.
DEFAULT_LLM_MODEL = "anthropic/claude-sonnet-4"


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    ``datetime.utcnow()`` is deprecated since Python 3.12; it is kept here so
    that the timestamp format (no UTC offset suffix) stays unchanged.
    """
    return datetime.utcnow().isoformat()


def _node_label(graph, node_id: str) -> str:
    """Return the ``label`` from the node's ``data`` dict, or ``node_id`` as fallback."""
    if not graph:
        return node_id
    node_obj = next((n for n in graph.nodes if n.id == node_id), None)
    if node_obj and hasattr(node_obj, 'data') and node_obj.data:
        return node_obj.data.get('label', node_id)
    return node_id


async def execute_workflow(
    workflow_id: Optional[str] = None,
    graph_data: Optional[Dict] = None,  # Phase 5: taken directly from ai_prompts.graph_data
    profile_id: Optional[str] = None,
    variables: Optional[Dict[str, Any]] = None,
    openrouter_call_func=None,  # Callback for LLM calls: async (prompt, model) -> str
    enable_debug: bool = False
) -> ExecutionResult:
    """
    Execute a workflow (with conditional branching and path consolidation).

    Phase 2: Linear execution in topological order.
    Phase 3: Conditional branching based on logic nodes.
    Phase 4: Join nodes and path consolidation.
    Phase 5: Accepts graph_data directly (from ai_prompts, not workflow_definitions).

    Args:
        workflow_id: UUID of the workflow (legacy, for the workflow_definitions table)
        graph_data: Workflow graph as a dict (for ai_prompts.graph_data); takes
            precedence over workflow_id when truthy
        profile_id: UUID of the profile
        variables: Placeholder values (e.g. {"name": "Lars", ...}); None is
            stored as an empty dict in the execution context
        openrouter_call_func: async (prompt, model) -> str
        enable_debug: Debug mode (forwarded to execute_node)

    Returns:
        ExecutionResult with all node_states. Errors are not raised; they are
        reported through status="failed" and the ``error`` field.

    Example (Phase 5):
        >>> result = await execute_workflow(
        ...     graph_data={"nodes": [...], "edges": [...]},
        ...     profile_id="test-profile",
        ...     variables={"name": "Lars"},
        ...     openrouter_call_func=my_llm_func
        ... )
    """
    execution_id = str(uuid.uuid4())
    started_at = _utc_now_iso()
    # Initialised before the try block so the failure path can always report
    # whatever nodes were executed so far.
    node_states: List[NodeExecutionState] = []

    logger.info(f"Starting workflow execution: {execution_id}")

    try:
        # 1. Load the workflow definition
        if graph_data:
            # Phase 5: graph comes directly from ai_prompts.graph_data (already a dict)
            graph_dict = graph_data
            logger.debug(f"Using provided graph_data")
        elif workflow_id:
            # Phase 0-4: graph from the workflow_definitions table (legacy)
            with get_db() as conn:
                cur = get_cursor(conn)
                cur.execute(
                    "SELECT graph FROM workflow_definitions WHERE id = %s AND active = true",
                    (workflow_id,)
                )
                row = cur.fetchone()
                if not row:
                    raise ValueError(f"Workflow not found: {workflow_id}")
                graph_dict = row['graph']  # PostgreSQL JSONB returns dict
            logger.debug(f"Loaded graph from workflow_definitions: {workflow_id}")
        else:
            raise ValueError("Entweder workflow_id oder graph_data muss übergeben werden")

        # 2. Parse the graph
        graph = parse_workflow_graph(graph_dict)
        logger.debug(f"Parsed graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges")

        # 3. Load the question catalog
        with get_db() as conn:
            catalog = load_question_catalog(conn)
        logger.debug(f"Loaded catalog: {len(catalog)} question types")

        # 4. Execute nodes with conditional branching (Phase 3)
        context = {
            "variables": variables if variables is not None else {},
            "profile_id": profile_id,
            "node_results": {},   # Phase 3: full NodeExecutionState per node id
            "active_edges": {},   # Phase 3: edge id -> bool, written by logic nodes
            "graph": graph        # Phase 4: end node needs access to questions
        }

        start_node = next((n for n in graph.nodes if n.type == "start"), None)
        if not start_node:
            raise ValueError("No start node found in workflow")

        # Index nodes and outgoing edges once instead of scanning per visit.
        # setdefault keeps the FIRST node for a duplicated id, like next() did.
        nodes_by_id: Dict[str, Any] = {}
        for candidate in graph.nodes:
            nodes_by_id.setdefault(candidate.id, candidate)
        outgoing_by_node: Dict[str, list] = {}
        for edge in graph.edges:
            outgoing_by_node.setdefault(edge.from_node, []).append(edge)

        # Phase 3: BFS traversal with edge activation.
        # NOTE(review): each node is processed once, at its first dequeue. A join
        # node dequeued before all of its upstream branches have run only sees
        # the results available at that moment - confirm this is intended.
        visited: Set[str] = set()
        queue = deque([start_node.id])

        while queue:
            node_id = queue.popleft()

            if node_id in visited:
                continue
            visited.add(node_id)

            node = nodes_by_id.get(node_id)
            if node is None:
                # Previously surfaced as a bare StopIteration with an empty message.
                raise ValueError(f"Edge references unknown node: {node_id}")

            # A node only runs if at least one incoming edge is active
            if not _has_active_incoming_edge(node, graph, context):
                logger.info(f"Skipping node {node_id}: no active incoming edges")
                node_states.append(NodeExecutionState(
                    node_id=node_id,
                    status=NodeStatus.SKIPPED,
                    error="no_active_incoming_edges",
                    started_at=_utc_now_iso(),
                    completed_at=_utc_now_iso()
                ))
                continue

            logger.info(f"Executing node: {node_id} (type: {node.type})")

            node_state = await execute_node(
                node=node,
                context=context,
                catalog=catalog,
                graph=graph,  # Phase 3: needed for logic nodes
                openrouter_call_func=openrouter_call_func,
                enable_debug=enable_debug
            )

            node_states.append(node_state)
            context["node_results"][node_id] = node_state

            # Enqueue successors reachable over active edges
            # (default: active; logic nodes set edges to False)
            for edge in outgoing_by_node.get(node_id, []):
                if context["active_edges"].get(edge.id, True):
                    queue.append(edge.to_node)

        # 5. Aggregate results
        aggregated = aggregate_results(node_states, graph)

        # 6. Persist the execution state
        completed_at = _utc_now_iso()
        save_execution_state(
            execution_id=execution_id,
            workflow_id=workflow_id,
            profile_id=profile_id,
            node_states=node_states,
            status="completed",
            started_at=started_at,
            completed_at=completed_at
        )

        logger.info(f"Workflow execution completed: {execution_id}")

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id or "N/A",  # Placeholder when graph_data is used directly
            status="completed",
            node_states=node_states,
            aggregated_result=aggregated,
            started_at=started_at,
            completed_at=completed_at
        )

    except Exception as e:
        logger.error(f"Workflow execution failed: {e}", exc_info=True)

        completed_at = _utc_now_iso()
        try:
            save_execution_state(
                execution_id=execution_id,
                workflow_id=workflow_id,  # Can be None when graph_data is used
                profile_id=profile_id,
                node_states=node_states,
                status="failed",
                started_at=started_at,
                completed_at=completed_at,
                error=str(e)
            )
        except Exception:
            # Persisting the failure must not mask the original error or keep
            # the caller from receiving the failed ExecutionResult.
            logger.error(
                f"Could not persist failed execution state: {execution_id}",
                exc_info=True
            )

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id or "N/A",  # ExecutionResult requires string, use placeholder
            status="failed",
            node_states=node_states,
            aggregated_result={},
            started_at=started_at,
            completed_at=completed_at,
            error=str(e)
        )


async def execute_node(
    node,
    context: Dict[str, Any],
    catalog: Dict[str, Dict],
    graph: WorkflowGraph,  # Phase 3: needed for logic nodes
    openrouter_call_func,
    enable_debug: bool = False
) -> NodeExecutionState:
    """
    Execute a single node.

    Args:
        node: WorkflowNode (from graph.nodes)
        context: Execution context (variables, profile_id, node_results, active_edges)
        catalog: Question catalog
        graph: WorkflowGraph (for logic and join nodes)
        openrouter_call_func: LLM callback: async (prompt, model) -> str
        enable_debug: Debug mode (currently unused in this function)

    Returns:
        NodeExecutionState. Any exception is converted into a state with
        status FAILED and the exception text in ``error``.

    Node Types:
        - start: No-op
        - end: Output generation (see execute_end_node)
        - analysis: Load prompt -> augment -> LLM -> parse -> normalize
        - logic: Evaluate condition -> activate/deactivate edges (Phase 3)
        - join: Consolidate paths -> merge results (Phase 4)
    """
    started_at = _utc_now_iso()

    try:
        # Start node: no-op
        if node.type == "start":
            logger.debug(f"Node {node.id}: No-op (start)")
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                started_at=started_at,
                completed_at=_utc_now_iso()
            )

        # End node: output generation (Phase 4)
        if node.type == "end":
            return execute_end_node(node, context)

        # Logic nodes (Phase 3)
        if node.type == "logic":
            return execute_logic_node(node, context, graph)

        # Join nodes (Phase 4)
        if node.type == "join":
            return execute_join_node(node, context, graph)

        # Analysis nodes
        if node.type == "analysis":
            if openrouter_call_func is None:
                # Fail with a readable message instead of
                # "'NoneType' object is not callable".
                raise ValueError(
                    f"Node {node.id}: openrouter_call_func is required for analysis nodes"
                )

            # 1. Load prompt (Part 3: inline_template support)
            prompt_template = await load_prompt_template(node, context)
            source_type = "inline" if node.inline_template else "reference"
            logger.debug(f"Node {node.id}: Loaded prompt from {source_type}")

            # 2. Parse question_augmentations
            questions = []
            if node.question_augmentations:
                # Convert model objects to plain dicts for the parser
                questions_jsonb = [
                    q.model_dump() if hasattr(q, 'model_dump') else q
                    for q in node.question_augmentations
                ]
                questions = parse_question_augmentations_from_jsonb(questions_jsonb)
            logger.debug(f"Node {node.id}: {len(questions)} question augmentations")

            # 3. Augment prompt
            if questions:
                augmented_prompt = augment_prompt_with_questions(
                    base_prompt=prompt_template,
                    questions=questions
                )
            else:
                augmented_prompt = prompt_template

            # 4. LLM call
            logger.debug(f"Node {node.id}: Calling LLM")
            llm_response = await openrouter_call_func(
                augmented_prompt,
                DEFAULT_LLM_MODEL
            )

            # 5. Parse result container
            parsed = parse_result_container(llm_response)
            logger.debug(f"Node {node.id}: Parsed response (status: {parsed['parsing_status']})")

            # 6. Normalize signals
            # NOTE: decision_signals are keyed by question.id (not by type), so a
            # per-node lookup id -> {type, answer_spectrum} is built first.
            normalized_signals = []
            if parsed["decision_signals"]:
                id_catalog = {}
                if questions:
                    for q in questions:
                        q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
                        id_catalog[q_dict['id']] = {
                            "type": q_dict['type'],  # Keep type for normalization
                            "answer_spectrum": q_dict['answer_spectrum'],
                            "normalization_rules": None  # Node questions carry no synonyms
                        }

                # Signals whose id is not among this node's questions are ignored
                for signal_id, signal_value in parsed["decision_signals"].items():
                    if signal_id in id_catalog:
                        q_config = id_catalog[signal_id]
                        # Normalization rules (if any) come from the type-based catalog
                        type_catalog_entry = catalog.get(q_config['type'], {})

                        # Normalize against the question-specific spectrum
                        normalized_signal = normalize_decision_signal(
                            question_type=signal_id,  # Use ID as question_type
                            raw_value=signal_value,
                            answer_spectrum=q_config['answer_spectrum'],
                            normalization_rules=type_catalog_entry.get('normalization_rules')
                        )
                        normalized_signals.append(normalized_signal)
                        logger.debug(f"Node {node.id}: Normalized signal '{signal_id}' = '{signal_value}' → '{normalized_signal.normalized_value}' (status: {normalized_signal.status})")

            logger.info(f"Node {node.id}: Normalized {len(normalized_signals)} signals")

            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                analysis_core=parsed["analysis_core"],
                decision_signals=parsed["decision_signals"],
                normalized_signals=normalized_signals,
                reasoning_anchors=parsed.get("reasoning_anchors"),
                started_at=started_at,
                completed_at=_utc_now_iso()
            )

        # Unknown node type
        raise ValueError(f"Node type '{node.type}' not implemented yet (Phase 4+)")

    except Exception as e:
        logger.error(f"Node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=_utc_now_iso()
        )


def execute_logic_node(
    node,
    context: Dict[str, Any],
    graph: WorkflowGraph
) -> NodeExecutionState:
    """
    Execute a logic node (Phase 3).

    Args:
        node: WorkflowNode of type "logic"
        context: Execution context with node_results; ``context["active_edges"]``
            is updated in place for every outgoing edge of this node
        graph: WorkflowGraph

    Returns:
        NodeExecutionState whose analysis_core is a JSON string with
        ``evaluation_result``, ``error`` and ``activated_edges``.

    Logic:
        1. Evaluate the condition expression
        2. Choose the "then" or "else" edges based on the result
        3. On evaluation error: apply the node's fallback strategy
        4. Mark the chosen edge(s) active, all other outgoing edges inactive
    """
    started_at = _utc_now_iso()

    try:
        if not node.condition:
            error_msg = f"Logic node {node.id} has no condition defined"
            logger.error(error_msg)
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.FAILED,
                error=error_msg,
                started_at=started_at,
                completed_at=_utc_now_iso()
            )

        # node.condition comes in two shapes:
        #   1. a LogicExpression directly (UI format)
        #   2. a Condition wrapping it in .expression (legacy format)
        from workflow_models import LogicExpression, Condition

        condition = node.condition
        expression = None
        if isinstance(condition, LogicExpression):
            expression = condition
        elif isinstance(condition, Condition):
            expression = condition.expression
        elif hasattr(condition, 'operator') and hasattr(condition, 'operands'):
            # Duck-typed fallback: looks like a LogicExpression
            expression = condition
        elif hasattr(condition, 'expression'):
            # Duck-typed fallback: looks like a Condition
            expression = condition.expression

        if expression is None:
            error_msg = f"Logic node {node.id} has no valid condition/expression defined"
            logger.error(error_msg)
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.FAILED,
                error=error_msg,
                started_at=started_at,
                completed_at=_utc_now_iso()
            )

        # 1. Evaluate the condition
        result, error = evaluate_logic_expression(expression, context)

        if error:
            # Evaluation error -> apply fallback
            logger.warning(f"Logic node {node.id}: evaluation error: {error}")
            activated_edges = _apply_fallback(node, graph, context, error)
        else:
            logger.info(f"Logic node {node.id}: evaluated to {result}")
            # 2. TRUE -> "then" edges, FALSE -> "else" edges
            chosen_label = "then" if result else "else"
            activated_edges = _get_edges_by_label(node.id, chosen_label, graph)

        # 3. Mark every outgoing edge as active/inactive
        activated_lookup = set(activated_edges)
        for edge in graph.edges:
            if edge.from_node == node.id:
                context["active_edges"][edge.id] = edge.id in activated_lookup

        logger.debug(f"Logic node {node.id}: activated edges: {activated_edges}")

        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            started_at=started_at,
            completed_at=_utc_now_iso(),
            analysis_core=json.dumps({
                "evaluation_result": result if not error else None,
                "error": error,
                "activated_edges": activated_edges
            })
        )

    except Exception as e:
        logger.error(f"Logic node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=_utc_now_iso()
        )


def execute_join_node(
    node,
    context: Dict[str, Any],
    graph: WorkflowGraph
) -> NodeExecutionState:
    """
    Execute a join node (Phase 4).

    Args:
        node: WorkflowNode of type "join"
        context: Execution context with node_results
        graph: WorkflowGraph

    Returns:
        NodeExecutionState with the consolidated data: analysis_core is the
        JSON-serialised consolidated_analysis_core of the join result and
        normalized_signals the values of its consolidated_signals dict.

    Logic:
        1. Evaluate the join strategy (via join_evaluator)
        2. If the result is not ready -> FAILED with the evaluator's error
        3. Otherwise serialise the consolidated results

    Error handling (per the join strategy, decided inside join_evaluator):
        - wait_all + missing paths -> FAILED
        - wait_any + no paths -> FAILED
        - best_effort + no paths -> EXECUTED (with metadata)
    """
    started_at = _utc_now_iso()

    try:
        logger.info(f"Executing join node: {node.id}")

        # 1. Evaluate the join node
        join_result = evaluate_join_node_core(node, graph, context)

        # 2. Strategy not satisfied -> FAILED
        if not join_result.ready:
            logger.warning(f"Join node {node.id}: Not ready - {join_result.error}")
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.FAILED,
                error=join_result.error,
                started_at=started_at,
                completed_at=_utc_now_iso(),
                metadata=join_result.metadata
            )

        # 3. Serialise the consolidated analysis_core as JSON
        consolidated_core_json = json.dumps(
            join_result.consolidated_analysis_core,
            ensure_ascii=False,
            indent=2
        )

        # 4. Log the consolidation
        executed_count = join_result.metadata.get("executed_paths", 0)
        total_count = join_result.metadata.get("total_paths", 0)
        logger.info(
            f"Join node {node.id}: Consolidated {executed_count}/{total_count} paths"
        )

        # 5. NodeExecutionState expects a list; join_evaluator returns a dict
        consolidated_signals_list = list(join_result.consolidated_signals.values())

        # 6. Return the consolidated state
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            analysis_core=consolidated_core_json,
            normalized_signals=consolidated_signals_list,
            metadata=join_result.metadata,
            started_at=started_at,
            completed_at=_utc_now_iso()
        )

    except Exception as e:
        logger.error(f"Join node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=_utc_now_iso()
        )


def _build_end_node_template_context(context: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Build the Jinja2 context for an end node: node_id -> dict of that node's values."""
    template_context: Dict[str, Dict[str, Any]] = {}
    graph = context.get("graph")  # WorkflowGraph object

    for node_id, node_state in context.get("node_results", {}).items():
        node_context = {
            "analysis_core": node_state.analysis_core or "",
            "decision_signals": node_state.decision_signals or {},
            "reasoning_anchors": node_state.reasoning_anchors or "",
            "status": node_state.status.value if node_state.status else "unknown",
        }

        # Backward-compatible shortcut: allow {{node_id.relevanz}} in addition
        # to {{node_id.decision_signals.relevanz}}. A signal key equal to one
        # of the base keys above overwrites that base entry.
        if node_state.decision_signals:
            for signal_key, signal_value in node_state.decision_signals.items():
                node_context[signal_key] = signal_value

        # Normalized signals as {{node_id.signal_ID}}
        # NOTE: question_type holds the question ID here (not the type)
        if node_state.normalized_signals:
            for signal in node_state.normalized_signals:
                signal_dict = signal.model_dump() if hasattr(signal, 'model_dump') else signal
                q_id = signal_dict['question_type']
                signal_key = f"signal_{q_id}"
                signal_value = signal_dict['normalized_value'] or signal_dict['raw_value']
                node_context[signal_key] = signal_value
                logger.debug(f"Mapped signal: {q_id} → {signal_key} = '{signal_value}'")

        # Question texts as {{node_id.question_ID}}
        if graph:
            workflow_node = next((n for n in graph.nodes if n.id == node_id), None)
            if workflow_node and workflow_node.question_augmentations:
                for q in workflow_node.question_augmentations:
                    q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
                    q_id = q_dict.get('id')
                    if q_id:
                        node_context[f"question_{q_id}"] = q_dict.get('question', '')

        template_context[node_id] = node_context

    return template_context


def execute_end_node(
    node,
    context: Dict[str, Any]
) -> NodeExecutionState:
    """
    Execute an end node (Phase 4 - template engine).

    Args:
        node: WorkflowNode of type "end"
        context: Execution context with node_results

    Returns:
        NodeExecutionState with the final output in analysis_core, or a FAILED
        state if rendering fails or the configuration is invalid.

    Output Modes:
        - AUTO: Concatenates all analysis_core values (backward compatible)
        - TEMPLATE: Renders a Jinja2 template with {{node_id.property}} placeholders.
          Templates are rendered in a sandboxed environment.

    Template Context:
        - {{node_id.analysis_core}}: Analysis text from node
        - {{node_id.decision_signals}}: Dict of raw decision signals
        - {{node_id.decision_signals.key}}: Specific signal value
        - {{node_id.signal_ID}}: Normalized (or raw) value of a signal
        - {{node_id.question_ID}}: Question text of a node question
        - {{node_id.status}}: Node execution status
        - Conditional rendering: {% if node_id %}...{% endif %}
        - Default values: {{node_id|default("fallback")}}

    Example Template:
        ```
        # Final Analysis

        ## Body Composition
        {{body_analysis.analysis_core}}

        {% if training_analysis %}
        ## Training Recommendation
        {{training_analysis.analysis_core}}
        {% endif %}

        ## Decision Factors
        - Relevance: {{body_analysis.decision_signals.relevanz}}
        - Priority: {{body_analysis.decision_signals.prioritaet}}
        ```
    """
    started_at = _utc_now_iso()

    try:
        logger.info(f"Executing end node: {node.id}")

        # Default to AUTO for backward compatibility
        output_mode = node.output_mode or EndNodeOutputMode.AUTO

        if output_mode == EndNodeOutputMode.AUTO:
            # AUTO mode: concatenate the analysis_core of all executed nodes
            logger.debug(f"End node {node.id}: Using AUTO output mode")

            graph = context.get("graph")  # for node label lookup
            combined_analysis = []
            for node_id, node_state in context.get("node_results", {}).items():
                if node_state.status == NodeStatus.EXECUTED and node_state.analysis_core:
                    node_label = _node_label(graph, node_id)
                    combined_analysis.append(f"## {node_label}\n{node_state.analysis_core}")

            final_output = "\n\n".join(combined_analysis) if combined_analysis else "[No analysis generated]"

        elif output_mode == EndNodeOutputMode.TEMPLATE:
            # TEMPLATE mode: render the Jinja2 template
            logger.debug(f"End node {node.id}: Using TEMPLATE output mode")

            if not node.template:
                raise ValueError(f"End node {node.id} has output_mode=TEMPLATE but no template defined")

            template_context = _build_end_node_template_context(context)
            logger.debug(f"End node {node.id}: Built template context for {len(template_context)} nodes")

            # Diagnostic dump of the context; kept at DEBUG because it contains
            # signal values and question texts.
            for node_id, node_ctx in template_context.items():
                logger.debug(f"End node template context[{node_id}]: {list(node_ctx.keys())}")
                signal_keys = [k for k in node_ctx.keys() if k.startswith('signal_')]
                question_keys = [k for k in node_ctx.keys() if k.startswith('question_')]
                if signal_keys:
                    logger.debug(f"  Signals: {signal_keys} → values: {[(k, node_ctx[k]) for k in signal_keys]}")
                if question_keys:
                    logger.debug(f"  Questions: {question_keys} → values: {[(k, node_ctx[k]) for k in question_keys]}")

            try:
                # The template text comes from the workflow definition, so it is
                # rendered in a sandbox: unsafe attribute access raises
                # SecurityError, which is a TemplateError and handled below.
                # ChainableUndefined keeps missing nested attributes renderable
                # so Jinja's default filter can handle them.
                jinja_env = SandboxedEnvironment(undefined=ChainableUndefined)
                jinja_template = jinja_env.from_string(node.template)
                final_output = jinja_template.render(template_context)
                logger.info(f"End node {node.id}: Template rendered successfully ({len(final_output)} chars)")
            except TemplateError as te:
                error_msg = f"Template rendering failed: {str(te)}"
                logger.error(f"End node {node.id}: {error_msg}")
                return NodeExecutionState(
                    node_id=node.id,
                    status=NodeStatus.FAILED,
                    error=error_msg,
                    started_at=started_at,
                    completed_at=_utc_now_iso()
                )

        else:
            raise ValueError(f"Unknown output_mode: {output_mode}")

        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            analysis_core=final_output,
            started_at=started_at,
            completed_at=_utc_now_iso()
        )

    except Exception as e:
        logger.error(f"End node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=_utc_now_iso()
        )


def _apply_fallback(
    node,
    graph: WorkflowGraph,
    context: Dict[str, Any],
    error: str
) -> List[str]:
    """
    Apply the node's fallback strategy after a failed condition evaluation.

    Args:
        node: WorkflowNode
        graph: WorkflowGraph
        context: Execution context (not read here)
        error: Evaluation error (not read here)

    Returns:
        List of edge IDs that should be activated
    """
    if not node.fallback:
        # Default: conservative (take the else path)
        logger.warning(f"Node {node.id}: No fallback configured, using DEFAULT_PATH")
        return _get_edges_by_label(node.id, "else", graph)

    strategy = node.fallback.strategy

    if strategy == FallbackStrategy.CONSERVATIVE_SKIP:
        # Deactivate all outgoing edges
        logger.info(f"Node {node.id}: CONSERVATIVE_SKIP - deactivating all paths")
        return []

    if strategy == FallbackStrategy.DEFAULT_PATH:
        # Take the else path
        logger.info(f"Node {node.id}: DEFAULT_PATH - taking else path")
        return _get_edges_by_label(node.id, "else", graph)

    if strategy == FallbackStrategy.UNCERTAINTY_PATH:
        # Explicit uncertainty path, if one exists; otherwise else
        logger.info(f"Node {node.id}: UNCERTAINTY_PATH")
        uncertainty_edges = _get_edges_by_label(node.id, "uncertainty", graph)
        if uncertainty_edges:
            return uncertainty_edges
        logger.warning(f"Node {node.id}: No uncertainty path found, using else")
        return _get_edges_by_label(node.id, "else", graph)

    if strategy == FallbackStrategy.DOCUMENT_ONLY:
        # Keep all paths active (as if there were no condition)
        logger.info(f"Node {node.id}: DOCUMENT_ONLY - all paths remain active")
        return [e.id for e in graph.edges if e.from_node == node.id]

    logger.warning(f"Node {node.id}: Unknown fallback strategy {strategy}, using DEFAULT_PATH")
    return _get_edges_by_label(node.id, "else", graph)


def _get_edges_by_label(node_id: str, label: str, graph: WorkflowGraph) -> List[str]:
    """
    Find all outgoing edges of a node that carry a given label.

    Args:
        node_id: Node ID
        label: Edge label (e.g. "then", "else", "uncertainty")
        graph: WorkflowGraph

    Returns:
        List of edge IDs, in graph.edges order
    """
    return [
        edge.id for edge in graph.edges
        if edge.from_node == node_id and edge.label == label
    ]


def _has_active_incoming_edge(node, graph: WorkflowGraph, context: Dict[str, Any]) -> bool:
    """
    Check whether a node has at least one active incoming edge.

    Args:
        node: WorkflowNode
        graph: WorkflowGraph
        context: Execution context with active_edges

    Returns:
        True for start nodes and for nodes with at least one incoming edge that
        is active; False when there are no incoming edges or all are inactive.
    """
    # Start node has no incoming edges -> always active
    if node.type == "start":
        return True

    active_edges = context.get("active_edges", {})
    # Edges without an entry count as active (Phase 2 compatibility).
    # any() over an empty sequence is False: no incoming edges -> unreachable.
    return any(
        active_edges.get(edge.id, True)
        for edge in graph.edges
        if edge.to_node == node.id
    )


async def load_prompt_template(node: WorkflowNode, context: Dict[str, Any]) -> str:
    """
    Load the prompt template from the DB (reference mode) or from the node (inline mode)
    and resolve its placeholders.

    Part 3: Inline Prompts - two modes are supported:
    - Inline mode: inline_template -> template taken directly from the node
      (checked first)
    - Reference mode: prompt_slug -> template from the ai_prompts table

    Args:
        node: WorkflowNode with prompt_slug OR inline_template
        context: {"variables": {"name": "Lars", ...}, "profile_id": "..."}

    Returns:
        Resolved prompt template

    Raises:
        HTTPException: 400 if neither prompt_slug nor inline_template is set or
            profile_id is missing from the context; 404 if the referenced
            prompt is not found

    Example:
        >>> node = WorkflowNode(id="n1", prompt_slug="pipeline_body")
        >>> template = await load_prompt_template(node, {"profile_id": "123"})
        >>> "{{name}}" not in template
        True
    """
    from placeholder_resolver import get_placeholder_example_values, get_placeholder_catalog
    from prompt_executor import resolve_placeholders
    from fastapi import HTTPException

    # Mode 1: inline template
    if node.inline_template:
        logger.debug(f"Node {node.id}: Using inline template ({len(node.inline_template)} chars)")
        template = node.inline_template

    # Mode 2: reference
    elif node.prompt_slug:
        logger.debug(f"Node {node.id}: Loading prompt '{node.prompt_slug}' from DB")

        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute(
                "SELECT template FROM ai_prompts WHERE slug = %s AND active = true",
                (node.prompt_slug,)
            )
            row = cur.fetchone()

        if not row:
            raise HTTPException(status_code=404, detail=f"Prompt not found: {node.prompt_slug}")

        template = row['template']

    # Mode 3: error - neither inline nor reference
    else:
        raise HTTPException(
            status_code=400,
            detail=f"Node {node.id}: Either prompt_slug or inline_template required"
        )

    profile_id = context.get("profile_id")
    if not profile_id:
        raise HTTPException(status_code=400, detail="profile_id required in context")

    # Build the variables dict from all registered placeholders.
    # Diagnostic messages below are logged at DEBUG, and placeholder VALUES
    # (name, age, gender, ...) are deliberately not logged: they are profile data.
    variables = {}
    try:
        processed_placeholders = get_placeholder_example_values(profile_id)
        logger.debug(f"🔍 DEBUG: Loaded {len(processed_placeholders)} placeholders from registry")
        logger.debug(f"🔍 DEBUG: Sample keys (first 3): {list(processed_placeholders.keys())[:3]}")

        # Strip the {{ }} wrappers from the keys
        cleaned_placeholders = {
            key.replace('{{', '').replace('}}', '').strip(): value
            for key, value in processed_placeholders.items()
        }
        logger.debug(f"🔍 DEBUG: Cleaned keys (first 3): {list(cleaned_placeholders.keys())[:3]}")
        variables.update(cleaned_placeholders)
    except Exception as e:
        # Best effort: continue with whatever placeholders are available
        logger.error(f"❌ CRITICAL: Failed to load placeholders for workflow: {e}", exc_info=True)

    # Add outputs of earlier workflow nodes as placeholders (Part 3: Inline Prompts)
    # Format: node_id.analysis_core, node_id.signal_xyz, node_id.question_xyz
    node_results = context.get("node_results", {})
    if node_results:
        logger.debug(f"🔍 DEBUG: Adding {len(node_results)} node outputs as placeholders")
        for node_id, node_state in node_results.items():
            # analysis_core
            if hasattr(node_state, 'analysis_core') and node_state.analysis_core:
                key = f"{node_id}.analysis_core"
                variables[key] = node_state.analysis_core
                logger.debug(f"  Added placeholder: {key} = {node_state.analysis_core[:50]}...")

            # decision_signals (keyed by question ID)
            if hasattr(node_state, 'decision_signals') and node_state.decision_signals:
                for signal_id, signal_value in node_state.decision_signals.items():
                    signal_key = f"{node_id}.signal_{signal_id}"
                    variables[signal_key] = signal_value
                    logger.debug(f"  Added placeholder: {signal_key} = {signal_value}")

            # Question texts: only added when present in the node state's metadata
            if hasattr(node_state, 'metadata') and isinstance(node_state.metadata, dict):
                questions = node_state.metadata.get('questions', [])
                for q in questions:
                    if isinstance(q, dict) and 'id' in q and 'question' in q:
                        question_key = f"{node_id}.question_{q['id']}"
                        variables[question_key] = q['question']
                        logger.debug(f"  Added placeholder: {question_key}")

    # Load catalog for |d modifier support; resolution continues without it
    try:
        catalog = get_placeholder_catalog(profile_id)
    except Exception as e:
        catalog = None
        logger.warning(f"Failed to load placeholder catalog for workflow: {e}")

    logger.debug(f"🔍 DEBUG: Template before resolution:\n{template[:200]}...")
    logger.debug(f"🔍 DEBUG: Variables dict has {len(variables)} entries")

    debug_info = {}
    resolved = resolve_placeholders(
        template=template,
        variables=variables,
        debug_info=debug_info,
        catalog=catalog
    )

    logger.debug(f"🔍 DEBUG: Resolved placeholders: {debug_info.get('resolved_placeholders', {})}")
    logger.debug(f"🔍 DEBUG: Unresolved placeholders: {debug_info.get('unresolved_placeholders', [])}")
    logger.debug(f"🔍 DEBUG: Template after resolution:\n{resolved[:200]}...")

    return resolved


def aggregate_results(node_states: List[NodeExecutionState], graph) -> Dict[str, Any]:
    """
    Aggregate the results of all nodes.

    Phase 4: If an end node produced output, ONLY that output is used as
    analysis_core. Otherwise (backward compatible) the analysis_core values
    of all executed nodes are combined.

    Args:
        node_states: List of all NodeExecutionState
        graph: WorkflowGraph object (to identify end nodes and node labels)

    Returns:
        {
            "analysis_core": "Final output (from end node or combined)",
            "combined_analysis": "Combined output of all non-end nodes",
            "all_signals": [normalized signal dicts, ...],
            "total_nodes": 3,
            "executed_nodes": 3,
            "failed_nodes": 0,
            "skipped_nodes": 0
        }

    Example:
        >>> states = [
        ...     NodeExecutionState(node_id="n1", status=NodeStatus.EXECUTED, analysis_core="Test 1"),
        ...     NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2"),
        ...     NodeExecutionState(node_id="end", status=NodeStatus.EXECUTED, analysis_core="Final")
        ... ]
        >>> result = aggregate_results(states, graph)
        >>> result["analysis_core"]
        'Final'
    """
    end_node_ids = {n.id for n in graph.nodes if n.type == "end"}

    combined_analysis = []
    all_signals = []
    final_output = None

    for state in node_states:
        if state.status == NodeStatus.EXECUTED and state.analysis_core:
            if state.node_id in end_node_ids:
                # End node output is the final output (not part of combined_analysis)
                final_output = state.analysis_core
            else:
                node_label = _node_label(graph, state.node_id)
                combined_analysis.append(f"## {node_label}\n{state.analysis_core}")

        if state.normalized_signals:
            all_signals.extend([s.model_dump() for s in state.normalized_signals])

    combined_text = "\n\n".join(combined_analysis)
    # Phase 4: end node output wins; without one, fall back to the combined text
    primary_output = final_output if final_output else combined_text

    return {
        "analysis_core": primary_output,
        "combined_analysis": combined_text,  # Keep for debugging (excludes end node)
        "all_signals": all_signals,
        "total_nodes": len(node_states),
        "executed_nodes": sum(1 for s in node_states if s.status == NodeStatus.EXECUTED),
        "failed_nodes": sum(1 for s in node_states if s.status == NodeStatus.FAILED),
        "skipped_nodes": sum(1 for s in node_states if s.status == NodeStatus.SKIPPED)  # Phase 3
    }


def save_execution_state(
    execution_id: str,
    workflow_id: Optional[str],  # None when using graph_data directly (Phase 5)
    profile_id: str,
    node_states: List[NodeExecutionState],
    status: str,
    started_at: str,
    completed_at: Optional[str] = None,
    error: Optional[str] = None
):
    """
    Persist the execution state in workflow_executions (insert, or update on id conflict).

    Args:
        execution_id: UUID of the execution
        workflow_id: UUID of the workflow (None when graph_data is used directly)
        profile_id: UUID of the profile
        node_states: List of all NodeExecutionState
        status: 'completed' | 'failed' | 'partial'
        started_at: ISO timestamp
        completed_at: ISO timestamp (optional)
        error: Error message (optional); stored as {"error": ...} in execution_log

    Example:
        >>> save_execution_state(
        ...     execution_id="exec-123",
        ...     workflow_id="wf-456",
        ...     profile_id="prof-789",
        ...     node_states=[],
        ...     status="completed",
        ...     started_at="2026-04-03T12:00:00"
        ... )
    """
    # Serialize node_states to JSON-compatible dicts
    node_states_json = [s.model_dump() for s in node_states]

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            INSERT INTO workflow_executions
            (id, workflow_id, profile_id, status, node_states, execution_log, started_at, completed_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (id) DO UPDATE SET
                status = EXCLUDED.status,
                node_states = EXCLUDED.node_states,
                execution_log = EXCLUDED.execution_log,
                completed_at = EXCLUDED.completed_at
        """, (
            execution_id,
            workflow_id,
            profile_id,
            status,
            json.dumps(node_states_json),
            json.dumps({"error": error} if error else {}),
            started_at,
            completed_at
        ))
        conn.commit()

    logger.info(f"Saved execution state: {execution_id} (status: {status})")