""" Logic Evaluator (Phase 3) Rein deterministischer boolescher Evaluator für Workflow-Bedingungen. Keine KI, keine Interpretation - nur strikte Logik-Auswertung. Unterstützt: - Vergleichsoperatoren: EQ, NEQ, IN, NOT_IN, GT, LT, GTE, LTE, CONTAINS - Logische Operatoren: AND, OR, NOT - Verschachtelung via operands """ from typing import Dict, Any, Optional, Tuple, List import logging from workflow_models import ( LogicExpression, LogicOperator, NormalizedSignal, SignalStatus ) logger = logging.getLogger(__name__) def evaluate_logic_expression( expression: LogicExpression, context: Dict[str, Any] ) -> Tuple[bool, Optional[str]]: """ Evaluiert LogicExpression gegen context. Args: expression: Die auszuwertende Bedingung context: Execution context mit node_results Returns: (result: bool, error: Optional[str]) - result: True/False basierend auf Auswertung - error: Fehlermeldung wenn Evaluation fehlschlägt Beispiel: expression = LogicExpression( operator=LogicOperator.EQ, ref="body.relevanz", value="decrease" ) result, error = evaluate_logic_expression(expression, context) """ try: operator = expression.operator # Logische Operatoren (verschachtelt) if operator == LogicOperator.AND: return _evaluate_and(expression, context) elif operator == LogicOperator.OR: return _evaluate_or(expression, context) elif operator == LogicOperator.NOT: return _evaluate_not(expression, context) # Vergleichsoperatoren (benötigen ref) elif operator in [ LogicOperator.EQ, LogicOperator.NEQ, LogicOperator.IN, LogicOperator.NOT_IN, LogicOperator.GT, LogicOperator.LT, LogicOperator.GTE, LogicOperator.LTE, LogicOperator.CONTAINS ]: return _evaluate_comparison(expression, context) else: return False, f"Unknown operator: {operator}" except Exception as e: logger.error(f"Logic evaluation failed: {e}", exc_info=True) return False, str(e) def _evaluate_and( expression: LogicExpression, context: Dict[str, Any] ) -> Tuple[bool, Optional[str]]: """Evaluiert AND - alle operands müssen True sein.""" if not expression.operands: return False, "AND requires operands" for operand in expression.operands: result, error = evaluate_logic_expression(operand, context) if error: return False, f"AND operand failed: {error}" if not result: return False, None # Short-circuit: einer False → gesamt False return True, None def _evaluate_or( expression: LogicExpression, context: Dict[str, Any] ) -> Tuple[bool, Optional[str]]: """Evaluiert OR - mindestens ein operand muss True sein.""" if not expression.operands: return False, "OR requires operands" errors = [] for operand in expression.operands: result, error = evaluate_logic_expression(operand, context) if error: errors.append(error) continue if result: return True, None # Short-circuit: einer True → gesamt True # Alle False oder Fehler if errors: return False, f"All OR operands failed: {', '.join(errors)}" return False, None def _evaluate_not( expression: LogicExpression, context: Dict[str, Any] ) -> Tuple[bool, Optional[str]]: """Evaluiert NOT - negiert operand.""" if not expression.operands or len(expression.operands) != 1: return False, "NOT requires exactly one operand" result, error = evaluate_logic_expression(expression.operands[0], context) if error: return False, f"NOT operand failed: {error}" return not result, None def _evaluate_comparison( expression: LogicExpression, context: Dict[str, Any] ) -> Tuple[bool, Optional[str]]: """Evaluiert Vergleichsoperator.""" if not expression.ref: return False, f"{expression.operator} requires ref" # Signal auflösen signal, error = resolve_signal_reference(expression.ref, context) if error: return False, error if signal is None: return False, f"Signal not found: {expression.ref}" # Prüfe Signal-Status if signal.status in [SignalStatus.UNCLEAR, SignalStatus.INVALID, SignalStatus.NOT_DECIDABLE]: return False, f"Signal '{expression.ref}' has status {signal.status} - cannot evaluate" # Vergleich durchführen left_value = signal.normalized_value or signal.raw_value right_value = expression.value return compare_values(expression.operator, left_value, right_value) def resolve_signal_reference( ref: str, context: Dict[str, Any] ) -> Tuple[Optional[NormalizedSignal], Optional[str]]: """ Löst Referenz wie "node_1.relevanz" auf. Args: ref: Signal-Referenz im Format "node_id.question_type" context: Execution context mit node_results Returns: (signal, error_message) Error wenn: - Node existiert nicht in context - Signal-Typ existiert nicht für diesen Node """ # Parse reference parts = ref.split(".") if len(parts) != 2: return None, f"Invalid reference format: '{ref}' (expected 'node_id.question_type')" node_id, question_type = parts # Hole Node-Results aus context node_results = context.get("node_results", {}) if node_id not in node_results: return None, f"Node '{node_id}' not found in context" node_state = node_results[node_id] # Suche Signal for signal in node_state.normalized_signals: if signal.question_type == question_type: return signal, None return None, f"Signal '{question_type}' not found in node '{node_id}'" def compare_values( operator: LogicOperator, left: Any, right: Any ) -> Tuple[bool, Optional[str]]: """ Führt Vergleichsoperation durch. Args: operator: Vergleichsoperator left: Linker Wert (aus Signal) right: Rechter Wert (aus Expression) Returns: (result: bool, error: Optional[str]) """ try: if operator == LogicOperator.EQ: return left == right, None elif operator == LogicOperator.NEQ: return left != right, None elif operator == LogicOperator.IN: if not isinstance(right, (list, tuple, set)): return False, f"IN operator requires list, got {type(right)}" return left in right, None elif operator == LogicOperator.NOT_IN: if not isinstance(right, (list, tuple, set)): return False, f"NOT_IN operator requires list, got {type(right)}" return left not in right, None elif operator == LogicOperator.CONTAINS: # left enthält right (z.B. String-Suche) if isinstance(left, str) and isinstance(right, str): return right in left, None elif isinstance(left, (list, tuple, set)): return right in left, None else: return False, f"CONTAINS not supported for types {type(left)}/{type(right)}" elif operator == LogicOperator.GT: return _numeric_compare(left, right, lambda a, b: a > b) elif operator == LogicOperator.LT: return _numeric_compare(left, right, lambda a, b: a < b) elif operator == LogicOperator.GTE: return _numeric_compare(left, right, lambda a, b: a >= b) elif operator == LogicOperator.LTE: return _numeric_compare(left, right, lambda a, b: a <= b) else: return False, f"Unknown comparison operator: {operator}" except Exception as e: return False, f"Comparison failed: {e}" def _numeric_compare(left: Any, right: Any, compare_fn) -> Tuple[bool, Optional[str]]: """Hilfsfunktion für numerische Vergleiche.""" try: left_num = float(left) if not isinstance(left, (int, float)) else left right_num = float(right) if not isinstance(right, (int, float)) else right return compare_fn(left_num, right_num), None except (ValueError, TypeError) as e: return False, f"Cannot compare as numbers: {left} vs {right} ({e})"