diff --git a/backend/logic_evaluator.py b/backend/logic_evaluator.py new file mode 100644 index 0000000..9e1adfd --- /dev/null +++ b/backend/logic_evaluator.py @@ -0,0 +1,268 @@ +""" +Logic Evaluator (Phase 3) + +Rein deterministischer boolescher Evaluator für Workflow-Bedingungen. +Keine KI, keine Interpretation - nur strikte Logik-Auswertung. + +Unterstützt: +- Vergleichsoperatoren: EQ, NEQ, IN, NOT_IN, GT, LT, GTE, LTE, CONTAINS +- Logische Operatoren: AND, OR, NOT +- Verschachtelung via operands +""" + +from typing import Dict, Any, Optional, Tuple, List +import logging +from workflow_models import ( + LogicExpression, + LogicOperator, + NormalizedSignal, + SignalStatus +) + +logger = logging.getLogger(__name__) + + +def evaluate_logic_expression( + expression: LogicExpression, + context: Dict[str, Any] +) -> Tuple[bool, Optional[str]]: + """ + Evaluiert LogicExpression gegen context. + + Args: + expression: Die auszuwertende Bedingung + context: Execution context mit node_results + + Returns: + (result: bool, error: Optional[str]) + - result: True/False basierend auf Auswertung + - error: Fehlermeldung wenn Evaluation fehlschlägt + + Beispiel: + expression = LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="decrease" + ) + result, error = evaluate_logic_expression(expression, context) + """ + try: + operator = expression.operator + + # Logische Operatoren (verschachtelt) + if operator == LogicOperator.AND: + return _evaluate_and(expression, context) + elif operator == LogicOperator.OR: + return _evaluate_or(expression, context) + elif operator == LogicOperator.NOT: + return _evaluate_not(expression, context) + + # Vergleichsoperatoren (benötigen ref) + elif operator in [ + LogicOperator.EQ, LogicOperator.NEQ, + LogicOperator.IN, LogicOperator.NOT_IN, + LogicOperator.GT, LogicOperator.LT, + LogicOperator.GTE, LogicOperator.LTE, + LogicOperator.CONTAINS + ]: + return _evaluate_comparison(expression, context) + + else: + return False, f"Unknown operator: {operator}" + + except Exception as e: + logger.error(f"Logic evaluation failed: {e}", exc_info=True) + return False, str(e) + + +def _evaluate_and( + expression: LogicExpression, + context: Dict[str, Any] +) -> Tuple[bool, Optional[str]]: + """Evaluiert AND - alle operands müssen True sein.""" + if not expression.operands: + return False, "AND requires operands" + + for operand in expression.operands: + result, error = evaluate_logic_expression(operand, context) + if error: + return False, f"AND operand failed: {error}" + if not result: + return False, None # Short-circuit: einer False → gesamt False + + return True, None + + +def _evaluate_or( + expression: LogicExpression, + context: Dict[str, Any] +) -> Tuple[bool, Optional[str]]: + """Evaluiert OR - mindestens ein operand muss True sein.""" + if not expression.operands: + return False, "OR requires operands" + + errors = [] + for operand in expression.operands: + result, error = evaluate_logic_expression(operand, context) + if error: + errors.append(error) + continue + if result: + return True, None # Short-circuit: einer True → gesamt True + + # Alle False oder Fehler + if errors: + return False, f"All OR operands failed: {', '.join(errors)}" + return False, None + + +def _evaluate_not( + expression: LogicExpression, + context: Dict[str, Any] +) -> Tuple[bool, Optional[str]]: + """Evaluiert NOT - negiert operand.""" + if not expression.operands or len(expression.operands) != 1: + return False, "NOT requires exactly one operand" + + result, error = evaluate_logic_expression(expression.operands[0], context) + if error: + return False, f"NOT operand failed: {error}" + + return not result, None + + +def _evaluate_comparison( + expression: LogicExpression, + context: Dict[str, Any] +) -> Tuple[bool, Optional[str]]: + """Evaluiert Vergleichsoperator.""" + if not expression.ref: + return False, f"{expression.operator} requires ref" + + # Signal auflösen + signal, error = resolve_signal_reference(expression.ref, context) + if error: + return False, error + if signal is None: + return False, f"Signal not found: {expression.ref}" + + # Prüfe Signal-Status + if signal.status in [SignalStatus.UNCLEAR, SignalStatus.INVALID, SignalStatus.NOT_DECIDABLE]: + return False, f"Signal '{expression.ref}' has status {signal.status} - cannot evaluate" + + # Vergleich durchführen + left_value = signal.normalized_value or signal.raw_value + right_value = expression.value + + return compare_values(expression.operator, left_value, right_value) + + +def resolve_signal_reference( + ref: str, + context: Dict[str, Any] +) -> Tuple[Optional[NormalizedSignal], Optional[str]]: + """ + Löst Referenz wie "node_1.relevanz" auf. + + Args: + ref: Signal-Referenz im Format "node_id.question_type" + context: Execution context mit node_results + + Returns: + (signal, error_message) + + Error wenn: + - Node existiert nicht in context + - Signal-Typ existiert nicht für diesen Node + """ + # Parse reference + parts = ref.split(".") + if len(parts) != 2: + return None, f"Invalid reference format: '{ref}' (expected 'node_id.question_type')" + + node_id, question_type = parts + + # Hole Node-Results aus context + node_results = context.get("node_results", {}) + if node_id not in node_results: + return None, f"Node '{node_id}' not found in context" + + node_state = node_results[node_id] + + # Suche Signal + for signal in node_state.normalized_signals: + if signal.question_type == question_type: + return signal, None + + return None, f"Signal '{question_type}' not found in node '{node_id}'" + + +def compare_values( + operator: LogicOperator, + left: Any, + right: Any +) -> Tuple[bool, Optional[str]]: + """ + Führt Vergleichsoperation durch. + + Args: + operator: Vergleichsoperator + left: Linker Wert (aus Signal) + right: Rechter Wert (aus Expression) + + Returns: + (result: bool, error: Optional[str]) + """ + try: + if operator == LogicOperator.EQ: + return left == right, None + + elif operator == LogicOperator.NEQ: + return left != right, None + + elif operator == LogicOperator.IN: + if not isinstance(right, (list, tuple, set)): + return False, f"IN operator requires list, got {type(right)}" + return left in right, None + + elif operator == LogicOperator.NOT_IN: + if not isinstance(right, (list, tuple, set)): + return False, f"NOT_IN operator requires list, got {type(right)}" + return left not in right, None + + elif operator == LogicOperator.CONTAINS: + # left enthält right (z.B. String-Suche) + if isinstance(left, str) and isinstance(right, str): + return right in left, None + elif isinstance(left, (list, tuple, set)): + return right in left, None + else: + return False, f"CONTAINS not supported for types {type(left)}/{type(right)}" + + elif operator == LogicOperator.GT: + return _numeric_compare(left, right, lambda a, b: a > b) + + elif operator == LogicOperator.LT: + return _numeric_compare(left, right, lambda a, b: a < b) + + elif operator == LogicOperator.GTE: + return _numeric_compare(left, right, lambda a, b: a >= b) + + elif operator == LogicOperator.LTE: + return _numeric_compare(left, right, lambda a, b: a <= b) + + else: + return False, f"Unknown comparison operator: {operator}" + + except Exception as e: + return False, f"Comparison failed: {e}" + + +def _numeric_compare(left: Any, right: Any, compare_fn) -> Tuple[bool, Optional[str]]: + """Hilfsfunktion für numerische Vergleiche.""" + try: + left_num = float(left) if not isinstance(left, (int, float)) else left + right_num = float(right) if not isinstance(right, (int, float)) else right + return compare_fn(left_num, right_num), None + except (ValueError, TypeError) as e: + return False, f"Cannot compare as numbers: {left} vs {right} ({e})" diff --git a/backend/version.py b/backend/version.py index adcdbf2..39f746b 100644 --- a/backend/version.py +++ b/backend/version.py @@ -7,8 +7,8 @@ Semantic Versioning: MAJOR.MINOR.PATCH - PATCH: Bugfix, kleine Änderung, Refactor """ -APP_VERSION = "0.9k" -BUILD_DATE = "2026-04-03" +APP_VERSION = "0.9l" +BUILD_DATE = "2026-04-04" DB_SCHEMA_VERSION = "20260403" # Migration 034 MODULE_VERSIONS = { @@ -27,10 +27,24 @@ MODULE_VERSIONS = { "exportdata": "1.1.0", "importdata": "1.0.0", "membership": "2.1.0", - "workflow": "0.3.0", # Phase 2: Normalisierung + Workflow Executor + "workflow": "0.4.0", # Phase 3: Logic Nodes + Conditional Branching } CHANGELOG = [ + { + "version": "0.9l", + "date": "2026-04-04", + "changes": [ + "Phase 3: Logic Nodes + Conditional Branching", + "logic_evaluator.py: Deterministischer Logic Evaluator (EQ, NEQ, IN, GT, LT, CONTAINS, AND, OR, NOT)", + "workflow_executor.py: Conditional Branching via BFS-Traversierung mit Edge-Activation", + "execute_logic_node(): Bedingungen evaluieren, then/else Pfade aktivieren", + "Fallback-Strategien: CONSERVATIVE_SKIP, DEFAULT_PATH, UNCERTAINTY_PATH, DOCUMENT_ONLY", + "workflow_models.py: CONTAINS Operator hinzugefügt", + "Unit-Tests Phase 3: 20 Tests für Logic Evaluator (alle passing)", + "Phase 2 Backward Compatibility: 11 Tests aktualisiert (alle passing)", + ] + }, { "version": "0.9k", "date": "2026-04-03", diff --git a/backend/workflow_executor.py b/backend/workflow_executor.py index 017fda9..f6ed1b9 100644 --- a/backend/workflow_executor.py +++ b/backend/workflow_executor.py @@ -1,12 +1,15 @@ """ -Workflow Executor (Phase 2) +Workflow Executor (Phase 3) -Führt Workflows sequenziell aus (noch keine Verzweigung/Logik). +Führt Workflows mit conditional branching aus (Logic Nodes). + +Phase 2: Sequential execution +Phase 3: Conditional branching, Logic Nodes, Fallback strategies Konzept-Basis: konzept_workflow_engine_konsolidated.md -Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Phase 2) +Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Phase 2-3) """ -from typing import Dict, Any, List, Optional +from typing import Dict, Any, List, Optional, Set from datetime import datetime import uuid import logging @@ -14,7 +17,7 @@ import json from workflow_models import ( WorkflowGraph, NodeExecutionState, ExecutionResult, - NodeStatus, NormalizedSignal + NodeStatus, NormalizedSignal, FallbackStrategy, SignalStatus ) from workflow_engine import parse_workflow_graph, get_execution_order from question_augmenter import ( @@ -23,6 +26,7 @@ from question_augmenter import ( ) from result_container_parser import parse_result_container from normalization_engine import normalize_all_signals, load_question_catalog +from logic_evaluator import evaluate_logic_expression, resolve_signal_reference from db import get_db, get_cursor logger = logging.getLogger(__name__) @@ -36,10 +40,10 @@ async def execute_workflow( enable_debug: bool = False ) -> ExecutionResult: """ - Führt einen Workflow aus (sequenziell, ohne Verzweigung). + Führt einen Workflow aus (mit conditional branching). Phase 2: Linear execution in topological order. - Phase 3: Conditional branching basierend auf normalized_signals. + Phase 3: Conditional branching basierend auf logic nodes. Args: workflow_id: UUID des Workflows @@ -86,39 +90,69 @@ async def execute_workflow( graph = parse_workflow_graph(graph_json) logger.debug(f"Parsed graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges") - # 3. Topologische Sortierung - execution_order = get_execution_order(graph) - logger.info(f"Execution order: {execution_order}") - # 4. Lade Question Catalog with get_db() as conn: catalog = load_question_catalog(conn) logger.debug(f"Loaded catalog: {len(catalog)} question types") - # 5. Execute Nodes sequenziell + # 5. Execute Nodes mit conditional branching (Phase 3) node_states: List[NodeExecutionState] = [] - context = {"variables": variables, "profile_id": profile_id} + context = { + "variables": variables, + "profile_id": profile_id, + "node_results": {}, # Phase 3: Store full NodeExecutionState + "active_edges": {} # Phase 3: Track edge activation + } - for node_id in execution_order: + # Phase 3: BFS traversal mit edge activation + start_node = next((n for n in graph.nodes if n.type == "start"), None) + if not start_node: + raise ValueError("No start node found in workflow") + + visited: Set[str] = set() + queue = [start_node.id] + + while queue: + node_id = queue.pop(0) + + if node_id in visited: + continue + + visited.add(node_id) node = next(n for n in graph.nodes if n.id == node_id) + # Prüfe ob Node aktiv ist (mindestens eine incoming edge aktiv) + if not _has_active_incoming_edge(node, graph, context): + logger.info(f"Skipping node {node_id}: no active incoming edges") + node_states.append(NodeExecutionState( + node_id=node_id, + status=NodeStatus.SKIPPED, + error="no_active_incoming_edges", + started_at=datetime.utcnow().isoformat(), + completed_at=datetime.utcnow().isoformat() + )) + continue + logger.info(f"Executing node: {node_id} (type: {node.type})") node_state = await execute_node( node=node, context=context, catalog=catalog, + graph=graph, # Phase 3: Needed for logic nodes openrouter_call_func=openrouter_call_func, enable_debug=enable_debug ) node_states.append(node_state) + context["node_results"][node_id] = node_state - # Füge Ergebnisse zu Context hinzu (für späteren Zugriff in Phase 3) - context[f"node_{node_id}"] = { - "analysis_core": node_state.analysis_core, - "normalized_signals": [s.model_dump() for s in node_state.normalized_signals] - } + # Füge Nachfolger zur Queue hinzu + outgoing_edges = [e for e in graph.edges if e.from_node == node_id] + for edge in outgoing_edges: + # Prüfe ob Edge aktiv ist (default: True, Logic Nodes setzen auf False) + if context["active_edges"].get(edge.id, True): + queue.append(edge.to_node) # 6. Aggregiere Ergebnisse aggregated = aggregate_results(node_states) @@ -179,6 +213,7 @@ async def execute_node( node, context: Dict[str, Any], catalog: Dict[str, Dict], + graph: WorkflowGraph, # Phase 3: Needed for logic nodes openrouter_call_func, enable_debug: bool = False ) -> NodeExecutionState: @@ -187,8 +222,9 @@ async def execute_node( Args: node: WorkflowNode (aus graph.nodes) - context: Execution context (variables, profile_id, prior results) + context: Execution context (variables, profile_id, node_results, active_edges) catalog: Question catalog + graph: WorkflowGraph (für Logic Nodes) openrouter_call_func: LLM callback: async (prompt, model) -> str enable_debug: Debug mode @@ -198,7 +234,8 @@ async def execute_node( Node Types: - start/end: No-op - analysis: Load prompt → augment → LLM → parse → normalize - - logic/join: Not implemented in Phase 2 + - logic: Evaluate condition → activate/deactivate edges (Phase 3) + - join: Not implemented (Phase 4) """ started_at = datetime.utcnow().isoformat() @@ -213,6 +250,10 @@ async def execute_node( completed_at=datetime.utcnow().isoformat() ) + # Logic Nodes (Phase 3) + if node.type == "logic": + return execute_logic_node(node, context, graph) + # Analysis Nodes if node.type == "analysis": # 1. Lade Prompt @@ -278,8 +319,8 @@ async def execute_node( completed_at=datetime.utcnow().isoformat() ) - # Unbekannter Node-Typ (Phase 3: logic, join) - raise ValueError(f"Node type '{node.type}' not implemented in Phase 2") + # Unbekannter Node-Typ (Phase 4: join) + raise ValueError(f"Node type '{node.type}' not implemented yet (Phase 4+)") except Exception as e: logger.error(f"Node execution failed ({node.id}): {e}", exc_info=True) @@ -292,6 +333,191 @@ async def execute_node( ) +def execute_logic_node( + node, + context: Dict[str, Any], + graph: WorkflowGraph +) -> NodeExecutionState: + """ + Führt Logic Node aus (Phase 3). + + Args: + node: WorkflowNode vom Typ "logic" + context: Execution context mit node_results + graph: WorkflowGraph + + Returns: + NodeExecutionState mit evaluation_result in metadata + + Logic: + 1. Evaluiere node.condition.expression + 2. Wähle then_path oder else_path basierend auf Ergebnis + 3. Bei Fehler/Unsicherheit: Wende fallback an + 4. Markiere gewählte Edge(s) als aktiv, andere als inaktiv + 5. Return NodeExecutionState mit evaluation_result + """ + started_at = datetime.utcnow().isoformat() + + try: + if not node.condition: + raise ValueError(f"Logic node {node.id} has no condition") + + # 1. Evaluiere Bedingung + result, error = evaluate_logic_expression(node.condition.expression, context) + + if error: + # Fehler bei Evaluation → Fallback anwenden + logger.warning(f"Logic node {node.id}: evaluation error: {error}") + activated_edges = _apply_fallback(node, graph, context, error) + else: + # Erfolgreiche Evaluation + logger.info(f"Logic node {node.id}: evaluated to {result}") + + # 2. Wähle Pfad basierend auf Ergebnis + if result: + # Condition TRUE → then_path aktivieren + activated_edges = _get_edges_by_label(node.id, "then", graph) + else: + # Condition FALSE → else_path aktivieren + activated_edges = _get_edges_by_label(node.id, "else", graph) + + # 3. Markiere Edges als aktiv/inaktiv + outgoing_edges = [e for e in graph.edges if e.from_node == node.id] + for edge in outgoing_edges: + context["active_edges"][edge.id] = edge.id in activated_edges + + logger.debug(f"Logic node {node.id}: activated edges: {activated_edges}") + + return NodeExecutionState( + node_id=node.id, + status=NodeStatus.EXECUTED, + started_at=started_at, + completed_at=datetime.utcnow().isoformat(), + analysis_core=json.dumps({ + "evaluation_result": result if not error else None, + "error": error, + "activated_edges": activated_edges + }) + ) + + except Exception as e: + logger.error(f"Logic node execution failed ({node.id}): {e}", exc_info=True) + return NodeExecutionState( + node_id=node.id, + status=NodeStatus.FAILED, + error=str(e), + started_at=started_at, + completed_at=datetime.utcnow().isoformat() + ) + + +def _apply_fallback( + node, + graph: WorkflowGraph, + context: Dict[str, Any], + error: str +) -> List[str]: + """ + Wendet Fallback-Strategie an. + + Args: + node: WorkflowNode + graph: WorkflowGraph + context: Execution context + error: Fehler bei Evaluation + + Returns: + Liste von Edge-IDs die aktiviert werden sollen + """ + if not node.fallback: + # Default: Konservativ (else-Pfad nehmen) + logger.warning(f"Node {node.id}: No fallback configured, using DEFAULT_PATH") + return _get_edges_by_label(node.id, "else", graph) + + strategy = node.fallback.strategy + + if strategy == FallbackStrategy.CONSERVATIVE_SKIP: + # Skip alle outgoing edges + logger.info(f"Node {node.id}: CONSERVATIVE_SKIP - deactivating all paths") + return [] + + elif strategy == FallbackStrategy.DEFAULT_PATH: + # Nimm else-Pfad + logger.info(f"Node {node.id}: DEFAULT_PATH - taking else path") + return _get_edges_by_label(node.id, "else", graph) + + elif strategy == FallbackStrategy.UNCERTAINTY_PATH: + # Expliziter Unsicherheits-Pfad (falls vorhanden) + logger.info(f"Node {node.id}: UNCERTAINTY_PATH") + uncertainty_edges = _get_edges_by_label(node.id, "uncertainty", graph) + if uncertainty_edges: + return uncertainty_edges + else: + # Fallback to else + logger.warning(f"Node {node.id}: No uncertainty path found, using else") + return _get_edges_by_label(node.id, "else", graph) + + elif strategy == FallbackStrategy.DOCUMENT_ONLY: + # Alle Pfade aktiv lassen (wie ohne Bedingung) + logger.info(f"Node {node.id}: DOCUMENT_ONLY - all paths remain active") + outgoing_edges = [e for e in graph.edges if e.from_node == node.id] + return [e.id for e in outgoing_edges] + + else: + logger.warning(f"Node {node.id}: Unknown fallback strategy {strategy}, using DEFAULT_PATH") + return _get_edges_by_label(node.id, "else", graph) + + +def _get_edges_by_label(node_id: str, label: str, graph: WorkflowGraph) -> List[str]: + """ + Findet alle ausgehenden Edges mit bestimmtem Label. + + Args: + node_id: Node-ID + label: Edge-Label (z.B. "then", "else", "uncertainty") + graph: WorkflowGraph + + Returns: + Liste von Edge-IDs + """ + matching_edges = [ + e.id for e in graph.edges + if e.from_node == node_id and e.label == label + ] + return matching_edges + + +def _has_active_incoming_edge(node, graph: WorkflowGraph, context: Dict[str, Any]) -> bool: + """ + Prüft ob Node mindestens eine aktive incoming edge hat. + + Args: + node: WorkflowNode + graph: WorkflowGraph + context: Execution context mit active_edges + + Returns: + True wenn mindestens eine incoming edge aktiv ist + """ + # Start-Node hat keine incoming edges → immer aktiv + if node.type == "start": + return True + + incoming_edges = [e for e in graph.edges if e.to_node == node.id] + + # Keine incoming edges → nicht erreichbar + if not incoming_edges: + return False + + # Prüfe ob mindestens eine aktiv ist + active_edges = context.get("active_edges", {}) + for edge in incoming_edges: + if active_edges.get(edge.id, True): # Default: True (Phase 2 Kompatibilität) + return True + + return False + + async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str: """ Lädt Prompt-Template aus DB und resolved Platzhalter. @@ -375,7 +601,8 @@ def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]: "all_signals": all_signals, "total_nodes": len(node_states), "executed_nodes": sum(1 for s in node_states if s.status == NodeStatus.EXECUTED), - "failed_nodes": sum(1 for s in node_states if s.status == NodeStatus.FAILED) + "failed_nodes": sum(1 for s in node_states if s.status == NodeStatus.FAILED), + "skipped_nodes": sum(1 for s in node_states if s.status == NodeStatus.SKIPPED) # Phase 3 } diff --git a/backend/workflow_models.py b/backend/workflow_models.py index 175a821..3d6d2ab 100644 --- a/backend/workflow_models.py +++ b/backend/workflow_models.py @@ -73,6 +73,7 @@ class LogicOperator(str, Enum): LT = "lt" # < GTE = "gte" # >= LTE = "lte" # <= + CONTAINS = "contains" # String/List contains (Phase 3) AND = "and" OR = "or" NOT = "not" diff --git a/tests/backend/test_phase2_workflow_executor.py b/tests/backend/test_phase2_workflow_executor.py index 1f49afc..797d2cd 100644 --- a/tests/backend/test_phase2_workflow_executor.py +++ b/tests/backend/test_phase2_workflow_executor.py @@ -192,24 +192,25 @@ def test_aggregate_results_formatting(): async def test_execute_node_start_end(): """Test: Start/End Nodes sind No-Ops""" from workflow_executor import execute_node - from workflow_models import WorkflowNode + from workflow_models import WorkflowNode, WorkflowGraph start_node = WorkflowNode(id="start", type="start") end_node = WorkflowNode(id="end", type="end") context = {"variables": {}, "profile_id": "test"} catalog = {} + mock_graph = WorkflowGraph(nodes=[], edges=[]) # Phase 3: graph parameter required async def mock_llm(prompt, model): return "should not be called" # Test start - result = await execute_node(start_node, context, catalog, mock_llm) + result = await execute_node(start_node, context, catalog, mock_graph, mock_llm) assert result.status == NodeStatus.EXECUTED assert result.analysis_core is None # Test end - result = await execute_node(end_node, context, catalog, mock_llm) + result = await execute_node(end_node, context, catalog, mock_graph, mock_llm) assert result.status == NodeStatus.EXECUTED assert result.analysis_core is None @@ -218,29 +219,30 @@ async def test_execute_node_start_end(): async def test_execute_node_unknown_type(): """Test: Unbekannter Node-Typ wirft Fehler""" from workflow_executor import execute_node - from workflow_models import WorkflowNode + from workflow_models import WorkflowNode, WorkflowGraph - # Phase 2 unterstützt nur start, end, analysis - logic_node = WorkflowNode(id="logic1", type="logic") + # Phase 3: logic is now implemented, test with join instead + join_node = WorkflowNode(id="join1", type="join") context = {"variables": {}, "profile_id": "test"} catalog = {} + mock_graph = WorkflowGraph(nodes=[], edges=[]) async def mock_llm(prompt, model): return "" - result = await execute_node(logic_node, context, catalog, mock_llm) + result = await execute_node(join_node, context, catalog, mock_graph, mock_llm) # Sollte FAILED sein mit Fehlermeldung assert result.status == NodeStatus.FAILED - assert "not implemented in Phase 2" in result.error + assert "not implemented" in result.error.lower() or "phase 4" in result.error.lower() @pytest.mark.asyncio async def test_execute_node_analysis_simple(): """Test: Analysis Node ohne Fragenergänzung""" from workflow_executor import execute_node - from workflow_models import WorkflowNode + from workflow_models import WorkflowNode, WorkflowGraph node = WorkflowNode( id="test_node", @@ -251,6 +253,7 @@ async def test_execute_node_analysis_simple(): context = {"variables": {"name": "Test"}, "profile_id": "test"} catalog = {} + mock_graph = WorkflowGraph(nodes=[], edges=[]) # Mock LLM async def mock_llm(prompt, model): @@ -260,7 +263,7 @@ async def test_execute_node_analysis_simple(): with patch('workflow_executor.load_prompt_template') as mock_load: mock_load.return_value = "Test prompt for {{name}}" - result = await execute_node(node, context, catalog, mock_llm) + result = await execute_node(node, context, catalog, mock_graph, mock_llm) assert result.status == NodeStatus.EXECUTED assert result.analysis_core == "Test analysis content" @@ -271,7 +274,7 @@ async def test_execute_node_analysis_simple(): async def test_execute_node_analysis_with_questions(): """Test: Analysis Node mit Fragenergänzung und Normalisierung""" from workflow_executor import execute_node - from workflow_models import WorkflowNode, QuestionAugmentation + from workflow_models import WorkflowNode, QuestionAugmentation, WorkflowGraph node = WorkflowNode( id="test_node", @@ -294,6 +297,7 @@ async def test_execute_node_analysis_with_questions(): "normalization_rules": None } } + mock_graph = WorkflowGraph(nodes=[], edges=[]) # Mock LLM async def mock_llm(prompt, model): @@ -309,7 +313,7 @@ Test analysis with patch('workflow_executor.load_prompt_template') as mock_load: mock_load.return_value = "Base prompt" - result = await execute_node(node, context, catalog, mock_llm) + result = await execute_node(node, context, catalog, mock_graph, mock_llm) assert result.status == NodeStatus.EXECUTED assert result.analysis_core == "Test analysis" @@ -330,7 +334,7 @@ async def test_execute_node_hybrid_model_override(): Regression-Test für: https://github.com/anthropics/claude-code/issues/XXX """ from workflow_executor import execute_node - from workflow_models import WorkflowNode, QuestionAugmentation + from workflow_models import WorkflowNode, QuestionAugmentation, WorkflowGraph # Node mit ANDEREM Spektrum als Catalog node = WorkflowNode( @@ -356,6 +360,7 @@ async def test_execute_node_hybrid_model_override(): "normalization_rules": None } } + mock_graph = WorkflowGraph(nodes=[], edges=[]) # Mock LLM gibt "decrease" zurück (gültig für Node, ungültig für Catalog) async def mock_llm(prompt, model): @@ -370,7 +375,7 @@ Gewicht gesunken with patch('workflow_executor.load_prompt_template') as mock_load: mock_load.return_value = "Base prompt" - result = await execute_node(node, context, catalog, mock_llm) + result = await execute_node(node, context, catalog, mock_graph, mock_llm) # Assertions: "decrease" muss VALID sein (Node-Spektrum), nicht INVALID (Catalog) assert result.status == NodeStatus.EXECUTED diff --git a/tests/backend/test_phase3_logic_evaluator.py b/tests/backend/test_phase3_logic_evaluator.py new file mode 100644 index 0000000..8b8a548 --- /dev/null +++ b/tests/backend/test_phase3_logic_evaluator.py @@ -0,0 +1,720 @@ +""" +Unit Tests für logic_evaluator.py (Phase 3) + +Run with: PYTHONPATH=./backend pytest tests/backend/test_phase3_logic_evaluator.py -v +""" +import pytest +from logic_evaluator import ( + evaluate_logic_expression, + resolve_signal_reference, + compare_values +) +from workflow_models import ( + LogicExpression, + LogicOperator, + NormalizedSignal, + SignalStatus, + NodeExecutionState, + NodeStatus +) + + +# ── Comparison Operator Tests ────────────────────────────────────────────────── + + +def test_evaluate_eq_true(): + """Test: EQ operator - match""" + expression = LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="decrease" + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="decrease", + normalized_value="decrease", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is True + assert error is None + + +def test_evaluate_eq_false(): + """Test: EQ operator - no match""" + expression = LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="increase" + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="decrease", + normalized_value="decrease", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is False + assert error is None + + +def test_evaluate_neq(): + """Test: NEQ operator""" + expression = LogicExpression( + operator=LogicOperator.NEQ, + ref="body.relevanz", + value="stable" + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="decrease", + normalized_value="decrease", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is True + assert error is None + + +def test_evaluate_in_true(): + """Test: IN operator - value in list""" + expression = LogicExpression( + operator=LogicOperator.IN, + ref="body.prioritaet", + value=["hoch", "mittel"] + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="prioritaet", + raw_value="hoch", + normalized_value="hoch", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is True + assert error is None + + +def test_evaluate_in_false(): + """Test: IN operator - value not in list""" + expression = LogicExpression( + operator=LogicOperator.IN, + ref="body.prioritaet", + value=["hoch", "mittel"] + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="prioritaet", + raw_value="niedrig", + normalized_value="niedrig", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is False + assert error is None + + +def test_evaluate_gt(): + """Test: GT operator - greater than""" + expression = LogicExpression( + operator=LogicOperator.GT, + ref="body.score", + value=50 + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="score", + raw_value="75", + normalized_value="75", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is True + assert error is None + + +def test_evaluate_lt(): + """Test: LT operator - less than""" + expression = LogicExpression( + operator=LogicOperator.LT, + ref="body.score", + value=50 + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="score", + raw_value="25", + normalized_value="25", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is True + assert error is None + + +def test_evaluate_contains_string(): + """Test: CONTAINS operator - string contains substring""" + expression = LogicExpression( + operator=LogicOperator.CONTAINS, + ref="body.kategorie", + value="Gewicht" + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="kategorie", + raw_value="Gewichtsverlust positiv", + normalized_value="Gewichtsverlust positiv", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is True + assert error is None + + +# ── Logical Operator Tests ────────────────────────────────────────────────── + + +def test_evaluate_and_both_true(): + """Test: AND operator - both operands true""" + expression = LogicExpression( + operator=LogicOperator.AND, + operands=[ + LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="decrease" + ), + LogicExpression( + operator=LogicOperator.EQ, + ref="activity.intensitaet", + value="hoch" + ) + ] + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="decrease", + normalized_value="decrease", + status=SignalStatus.VALID + ) + ] + ), + "activity": NodeExecutionState( + node_id="activity", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="intensitaet", + raw_value="hoch", + normalized_value="hoch", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is True + assert error is None + + +def test_evaluate_and_one_false(): + """Test: AND operator - one operand false""" + expression = LogicExpression( + operator=LogicOperator.AND, + operands=[ + LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="decrease" + ), + LogicExpression( + operator=LogicOperator.EQ, + ref="activity.intensitaet", + value="niedrig" + ) + ] + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="decrease", + normalized_value="decrease", + status=SignalStatus.VALID + ) + ] + ), + "activity": NodeExecutionState( + node_id="activity", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="intensitaet", + raw_value="hoch", + normalized_value="hoch", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is False + assert error is None + + +def test_evaluate_or_one_true(): + """Test: OR operator - one operand true""" + expression = LogicExpression( + operator=LogicOperator.OR, + operands=[ + LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="decrease" + ), + LogicExpression( + operator=LogicOperator.EQ, + ref="activity.intensitaet", + value="niedrig" + ) + ] + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="decrease", + normalized_value="decrease", + status=SignalStatus.VALID + ) + ] + ), + "activity": NodeExecutionState( + node_id="activity", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="intensitaet", + raw_value="hoch", + normalized_value="hoch", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is True + assert error is None + + +def test_evaluate_or_both_false(): + """Test: OR operator - both operands false""" + expression = LogicExpression( + operator=LogicOperator.OR, + operands=[ + LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="increase" + ), + LogicExpression( + operator=LogicOperator.EQ, + ref="activity.intensitaet", + value="niedrig" + ) + ] + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="decrease", + normalized_value="decrease", + status=SignalStatus.VALID + ) + ] + ), + "activity": NodeExecutionState( + node_id="activity", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="intensitaet", + raw_value="hoch", + normalized_value="hoch", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is False + assert error is None + + +def test_evaluate_not(): + """Test: NOT operator - negation""" + expression = LogicExpression( + operator=LogicOperator.NOT, + operands=[ + LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="increase" + ) + ] + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="decrease", + normalized_value="decrease", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is True # NOT (decrease == increase) = NOT False = True + assert error is None + + +# ── Nested Expressions Tests ────────────────────────────────────────────────── + + +def test_evaluate_nested_and_or(): + """Test: Nested expression - (A AND B) OR C""" + expression = LogicExpression( + operator=LogicOperator.OR, + operands=[ + LogicExpression( + operator=LogicOperator.AND, + operands=[ + LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="decrease" + ), + LogicExpression( + operator=LogicOperator.EQ, + ref="activity.intensitaet", + value="niedrig" + ) + ] + ), + LogicExpression( + operator=LogicOperator.EQ, + ref="nutrition.defizit", + value="hoch" + ) + ] + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="decrease", + normalized_value="decrease", + status=SignalStatus.VALID + ) + ] + ), + "activity": NodeExecutionState( + node_id="activity", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="intensitaet", + raw_value="hoch", # FALSE für AND-Teil + normalized_value="hoch", + status=SignalStatus.VALID + ) + ] + ), + "nutrition": NodeExecutionState( + node_id="nutrition", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="defizit", + raw_value="hoch", # TRUE für OR-Teil + normalized_value="hoch", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + # (decrease AND niedrig) OR hoch = (True AND False) OR True = False OR True = True + assert result is True + assert error is None + + +# ── Error Handling Tests ────────────────────────────────────────────────── + + +def test_evaluate_missing_node(): + """Test: Error handling - node not found""" + expression = LogicExpression( + operator=LogicOperator.EQ, + ref="missing_node.relevanz", + value="decrease" + ) + + context = { + "node_results": {} + } + + result, error = evaluate_logic_expression(expression, context) + assert result is False + assert error is not None + assert "not found" in error.lower() + + +def test_evaluate_missing_signal(): + """Test: Error handling - signal not found in node""" + expression = LogicExpression( + operator=LogicOperator.EQ, + ref="body.missing_signal", + value="decrease" + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="decrease", + normalized_value="decrease", + status=SignalStatus.VALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is False + assert error is not None + assert "not found" in error.lower() + + +def test_evaluate_unclear_signal(): + """Test: Error handling - signal has UNCLEAR status""" + expression = LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="decrease" + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="maybe", + normalized_value="unklar", + status=SignalStatus.UNCLEAR + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is False + assert error is not None + assert "unclear" in error.lower() or "status" in error.lower() + + +def test_evaluate_invalid_signal(): + """Test: Error handling - signal has INVALID status""" + expression = LogicExpression( + operator=LogicOperator.EQ, + ref="body.relevanz", + value="decrease" + ) + + context = { + "node_results": { + "body": NodeExecutionState( + node_id="body", + status=NodeStatus.EXECUTED, + normalized_signals=[ + NormalizedSignal( + question_type="relevanz", + raw_value="invalid_value", + normalized_value=None, + status=SignalStatus.INVALID + ) + ] + ) + } + } + + result, error = evaluate_logic_expression(expression, context) + assert result is False + assert error is not None + assert "invalid" in error.lower() or "status" in error.lower() + + +def test_compare_values_gt_non_numeric(): + """Test: Error handling - GT with non-numeric values""" + result, error = compare_values(LogicOperator.GT, "text", 50) + assert result is False + assert error is not None + assert "cannot compare" in error.lower() + + +def test_compare_values_in_non_list(): + """Test: Error handling - IN with non-list right value""" + result, error = compare_values(LogicOperator.IN, "value", "not_a_list") + assert result is False + assert error is not None + assert "requires list" in error.lower() + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/backend/test_phase3_workflow_branching.py b/tests/backend/test_phase3_workflow_branching.py new file mode 100644 index 0000000..2ff0777 --- /dev/null +++ b/tests/backend/test_phase3_workflow_branching.py @@ -0,0 +1,459 @@ +""" +Integration Tests für Workflow Branching (Phase 3) + +Testet conditional execution mit Logic Nodes. + +Run with: PYTHONPATH=./backend pytest tests/backend/test_phase3_workflow_branching.py -v +""" +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from workflow_executor import execute_workflow +from workflow_models import ( + WorkflowGraph, WorkflowNode, WorkflowEdge, + LogicExpression, LogicOperator, Condition, FallbackConfig, FallbackStrategy, + QuestionAugmentation, NodeStatus +) + + +# ── Helper Functions ──────────────────────────────────────────────────────── + + +def create_mock_db(): + """Creates mock DB connection with cursor""" + conn = MagicMock() + cur = MagicMock() + cur.fetchone = MagicMock() + cur.fetchall = MagicMock(return_value=[]) + + # Mock get_cursor() + def mock_get_cursor(c): + return cur + + # Context manager support + conn.__enter__ = MagicMock(return_value=conn) + conn.__exit__ = MagicMock(return_value=None) + + return conn, cur, mock_get_cursor + + +@pytest.mark.asyncio +async def test_simple_if_else_branching(): + """Test: Simple if/else branching - then path taken""" + + # Workflow: start → analysis → logic → then_path / else_path → end + workflow_graph = { + "nodes": [ + {"id": "start", "type": "start"}, + {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt", + "question_augmentations": [ + {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]} + ]}, + {"id": "logic", "type": "logic", + "condition": { + "expression": { + "operator": "eq", + "ref": "analysis.relevanz", + "value": "ja" + } + }}, + {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"}, + {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"}, + {"id": "end", "type": "end"} + ], + "edges": [ + {"id": "e1", "from": "start", "to": "analysis"}, + {"id": "e2", "from": "analysis", "to": "logic"}, + {"id": "e3", "from": "logic", "to": "then_path", "label": "then"}, + {"id": "e4", "from": "logic", "to": "else_path", "label": "else"}, + {"id": "e5", "from": "then_path", "to": "end"}, + {"id": "e6", "from": "else_path", "to": "end"} + ] + } + + # Mock DB + conn, cur, mock_get_cursor = create_mock_db() + cur.fetchone.side_effect = [ + {"graph": workflow_graph}, # Workflow definition + {"template": "Test prompt"} # Prompt template + ] + cur.fetchall.return_value = [ + {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None} + ] + + # Mock LLM - returns "ja" signal + async def mock_llm(prompt, model): + return """## Analyse +Test analysis + +## Entscheidungsfragen +- Relevanz: ja +""" + + with patch('workflow_executor.get_db', return_value=conn): + with patch('workflow_executor.get_cursor', side_effect=mock_get_cursor): + with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): + result = await execute_workflow( + workflow_id="test-workflow", + profile_id="test-profile", + variables={}, + openrouter_call_func=mock_llm + ) + + # Assertions + assert result.status == "completed" + assert len(result.node_states) == 5 # start, analysis, logic, then_path, end (else_path skipped) + + # Check which nodes were executed + executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED] + skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED] + + assert "then_path" in executed_nodes + assert "else_path" in skipped_nodes + + # Check aggregation + assert result.aggregated_result["executed_nodes"] == 4 # start, analysis, logic, then_path (end is no-op) + assert result.aggregated_result["skipped_nodes"] == 1 # else_path + + +@pytest.mark.asyncio +async def test_else_path_taken(): + """Test: Simple if/else branching - else path taken""" + + workflow_graph = { + "nodes": [ + {"id": "start", "type": "start"}, + {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt", + "question_augmentations": [ + {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]} + ]}, + {"id": "logic", "type": "logic", + "condition": { + "expression": { + "operator": "eq", + "ref": "analysis.relevanz", + "value": "ja" + } + }}, + {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"}, + {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"}, + {"id": "end", "type": "end"} + ], + "edges": [ + {"id": "e1", "from": "start", "to": "analysis"}, + {"id": "e2", "from": "analysis", "to": "logic"}, + {"id": "e3", "from": "logic", "to": "then_path", "label": "then"}, + {"id": "e4", "from": "logic", "to": "else_path", "label": "else"}, + {"id": "e5", "from": "then_path", "to": "end"}, + {"id": "e6", "from": "else_path", "to": "end"} + ] + } + + conn, cur = create_mock_db() + cur.fetchone.side_effect = [ + {"graph": workflow_graph}, + {"template": "Test prompt"} + ] + cur.fetchall.return_value = [ + {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None} + ] + + # Mock LLM - returns "nein" signal (condition false) + async def mock_llm(prompt, model): + return """## Analyse +Test analysis + +## Entscheidungsfragen +- Relevanz: nein +""" + + with patch('workflow_executor.get_db', return_value=conn): + with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): + result = await execute_workflow( + workflow_id="test-workflow", + profile_id="test-profile", + variables={}, + openrouter_call_func=mock_llm + ) + + # Assertions + executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED] + skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED] + + assert "else_path" in executed_nodes + assert "then_path" in skipped_nodes + + +@pytest.mark.asyncio +async def test_and_condition(): + """Test: AND condition - both must be true""" + + workflow_graph = { + "nodes": [ + {"id": "start", "type": "start"}, + {"id": "analysis1", "type": "analysis", "prompt_slug": "test_prompt", + "question_augmentations": [ + {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]} + ]}, + {"id": "analysis2", "type": "analysis", "prompt_slug": "test_prompt", + "question_augmentations": [ + {"id": "q2", "type": "prioritaet", "question": "Priority?", "answer_spectrum": ["hoch", "niedrig"]} + ]}, + {"id": "logic", "type": "logic", + "condition": { + "expression": { + "operator": "and", + "operands": [ + {"operator": "eq", "ref": "analysis1.relevanz", "value": "ja"}, + {"operator": "eq", "ref": "analysis2.prioritaet", "value": "hoch"} + ] + } + }}, + {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"}, + {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"}, + {"id": "end", "type": "end"} + ], + "edges": [ + {"id": "e1", "from": "start", "to": "analysis1"}, + {"id": "e2", "from": "analysis1", "to": "analysis2"}, + {"id": "e3", "from": "analysis2", "to": "logic"}, + {"id": "e4", "from": "logic", "to": "then_path", "label": "then"}, + {"id": "e5", "from": "logic", "to": "else_path", "label": "else"}, + {"id": "e6", "from": "then_path", "to": "end"}, + {"id": "e7", "from": "else_path", "to": "end"} + ] + } + + conn, cur = create_mock_db() + cur.fetchone.side_effect = [ + {"graph": workflow_graph}, + {"template": "Test prompt"}, + {"template": "Test prompt"} + ] + cur.fetchall.return_value = [ + {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None}, + {"question_type": "prioritaet", "answer_spectrum": ["hoch", "niedrig"], "normalization_rules": None} + ] + + # Mock LLM - returns ja AND hoch (both true) + call_count = 0 + async def mock_llm(prompt, model): + nonlocal call_count + call_count += 1 + if call_count == 1: + return """## Analyse +Analysis 1 + +## Entscheidungsfragen +- Relevanz: ja +""" + else: + return """## Analyse +Analysis 2 + +## Entscheidungsfragen +- Prioritaet: hoch +""" + + with patch('workflow_executor.get_db', return_value=conn): + with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): + result = await execute_workflow( + workflow_id="test-workflow", + profile_id="test-profile", + variables={}, + openrouter_call_func=mock_llm + ) + + # Assertions: Both true → then path taken + executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED] + skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED] + + assert "then_path" in executed_nodes + assert "else_path" in skipped_nodes + + +@pytest.mark.asyncio +async def test_fallback_conservative_skip(): + """Test: Fallback strategy CONSERVATIVE_SKIP""" + + workflow_graph = { + "nodes": [ + {"id": "start", "type": "start"}, + {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt", + "question_augmentations": [ + {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]} + ]}, + {"id": "logic", "type": "logic", + "condition": { + "expression": { + "operator": "eq", + "ref": "analysis.relevanz", + "value": "ja" + } + }, + "fallback": { + "strategy": "conservative_skip" + }}, + {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"}, + {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"}, + {"id": "end", "type": "end"} + ], + "edges": [ + {"id": "e1", "from": "start", "to": "analysis"}, + {"id": "e2", "from": "analysis", "to": "logic"}, + {"id": "e3", "from": "logic", "to": "then_path", "label": "then"}, + {"id": "e4", "from": "logic", "to": "else_path", "label": "else"}, + {"id": "e5", "from": "then_path", "to": "end"}, + {"id": "e6", "from": "else_path", "to": "end"} + ] + } + + conn, cur = create_mock_db() + cur.fetchone.side_effect = [ + {"graph": workflow_graph}, + {"template": "Test prompt"} + ] + cur.fetchall.return_value = [ + {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None} + ] + + # Mock LLM - returns UNCLEAR signal (triggers fallback) + async def mock_llm(prompt, model): + return """## Analyse +Test analysis + +## Entscheidungsfragen +- Relevanz: unklar +""" + + with patch('workflow_executor.get_db', return_value=conn): + with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): + result = await execute_workflow( + workflow_id="test-workflow", + profile_id="test-profile", + variables={}, + openrouter_call_func=mock_llm + ) + + # Assertions: CONSERVATIVE_SKIP → both paths skipped + skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED] + + assert "then_path" in skipped_nodes + assert "else_path" in skipped_nodes + assert result.aggregated_result["skipped_nodes"] == 2 + + +@pytest.mark.asyncio +async def test_fallback_default_path(): + """Test: Fallback strategy DEFAULT_PATH""" + + workflow_graph = { + "nodes": [ + {"id": "start", "type": "start"}, + {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt", + "question_augmentations": [ + {"id": "q1", "type": "relevanz", "question": "Relevant?", "answer_spectrum": ["ja", "nein"]} + ]}, + {"id": "logic", "type": "logic", + "condition": { + "expression": { + "operator": "eq", + "ref": "analysis.relevanz", + "value": "ja" + } + }, + "fallback": { + "strategy": "default_path" + }}, + {"id": "then_path", "type": "analysis", "prompt_slug": "then_prompt"}, + {"id": "else_path", "type": "analysis", "prompt_slug": "else_prompt"}, + {"id": "end", "type": "end"} + ], + "edges": [ + {"id": "e1", "from": "start", "to": "analysis"}, + {"id": "e2", "from": "analysis", "to": "logic"}, + {"id": "e3", "from": "logic", "to": "then_path", "label": "then"}, + {"id": "e4", "from": "logic", "to": "else_path", "label": "else"}, + {"id": "e5", "from": "then_path", "to": "end"}, + {"id": "e6", "from": "else_path", "to": "end"} + ] + } + + conn, cur = create_mock_db() + cur.fetchone.side_effect = [ + {"graph": workflow_graph}, + {"template": "Test prompt"} + ] + cur.fetchall.return_value = [ + {"question_type": "relevanz", "answer_spectrum": ["ja", "nein"], "normalization_rules": None} + ] + + # Mock LLM - returns INVALID signal (triggers fallback) + async def mock_llm(prompt, model): + return """## Analyse +Test analysis + +## Entscheidungsfragen +- Relevanz: totally_invalid_value +""" + + with patch('workflow_executor.get_db', return_value=conn): + with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): + result = await execute_workflow( + workflow_id="test-workflow", + profile_id="test-profile", + variables={}, + openrouter_call_func=mock_llm + ) + + # Assertions: DEFAULT_PATH → else path taken + executed_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.EXECUTED] + skipped_nodes = [s.node_id for s in result.node_states if s.status == NodeStatus.SKIPPED] + + assert "else_path" in executed_nodes + assert "then_path" in skipped_nodes + + +@pytest.mark.asyncio +async def test_linear_workflow_still_works(): + """Test: Linear workflow (no logic nodes) still works (Phase 2 compatibility)""" + + workflow_graph = { + "nodes": [ + {"id": "start", "type": "start"}, + {"id": "analysis", "type": "analysis", "prompt_slug": "test_prompt"}, + {"id": "end", "type": "end"} + ], + "edges": [ + {"id": "e1", "from": "start", "to": "analysis"}, + {"id": "e2", "from": "analysis", "to": "end"} + ] + } + + conn, cur = create_mock_db() + cur.fetchone.side_effect = [ + {"graph": workflow_graph}, + {"template": "Test prompt"} + ] + cur.fetchall.return_value = [] + + async def mock_llm(prompt, model): + return "## Analyse\nTest analysis" + + with patch('workflow_executor.get_db', return_value=conn): + with patch('placeholder_resolver.resolve_placeholders', return_value="Test prompt"): + result = await execute_workflow( + workflow_id="test-workflow", + profile_id="test-profile", + variables={}, + openrouter_call_func=mock_llm + ) + + # Assertions: All nodes executed + assert result.status == "completed" + assert len(result.node_states) == 3 + assert all(s.status == NodeStatus.EXECUTED for s in result.node_states) + assert result.aggregated_result["skipped_nodes"] == 0 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])