""" Join Evaluator (Phase 4) Evaluiert Join-Knoten: Sammelt incoming paths, prüft Strategien, konsolidiert Ergebnisse. Join-Strategien: - wait_all: Alle eingehenden Pfade müssen ausgeführt sein (strikt) - wait_any: Mindestens ein Pfad muss ausgeführt sein - best_effort: Verwendet verfügbare Pfade (fehlertoleranz) Skip-Handling: - ignore_skipped: Übersprungene Pfade nicht in Ergebnis - use_placeholder: Platzhalter für übersprungene Pfade - require_minimum: Mindestanzahl erforderlich Konzept-Basis: konzept_workflow_engine_konsolidated.md (Sektion 8.8) Anforderungsanalyse: phase4_anforderungsanalyse.md """ from typing import Dict, Any, Optional, List, NamedTuple from pydantic import BaseModel, Field import logging from workflow_models import ( WorkflowNode, WorkflowGraph, JoinStrategy, SkipHandling, NodeStatus, NormalizedSignal ) logger = logging.getLogger(__name__) # ── Data Structures ─────────────────────────────────────────────────────────── class PathStatus(NamedTuple): """ Status eines eingehenden Pfads am Join-Node. Attributes: node_id: ID des Source-Nodes (Node vor dem Join) status: Ausführungsstatus (EXECUTED, SKIPPED, FAILED) analysis_core: Analyseinhalt (wenn vorhanden) signals: Normalisierte Signale des Nodes (question_type → Signal) """ node_id: str status: NodeStatus analysis_core: Optional[str] signals: Dict[str, NormalizedSignal] # Converted from List to Dict by _collect_incoming_paths class JoinResult(BaseModel): """ Ergebnis der Join-Evaluation. Enthält konsolidierte Daten aller eingehenden Pfade. """ ready: bool = Field(..., description="Sind erforderliche Pfade verfügbar?") consolidated_analysis_core: Dict[str, str] = Field( default_factory=dict, description="Konsolidierte Analysekerne: node_id → analysis_core" ) consolidated_signals: Dict[str, NormalizedSignal] = Field( default_factory=dict, description="Konsolidierte Signale: node_id.question_id → signal" ) metadata: Dict[str, Any] = Field( default_factory=dict, description="Pfad-Status, Statistiken, Hinweise" ) error: Optional[str] = Field(None, description="Fehler bei nicht erfüllter Strategie") # ── Helper Functions ────────────────────────────────────────────────────────── def _collect_incoming_paths( node: WorkflowNode, graph: WorkflowGraph, context: Dict[str, Any] ) -> List[PathStatus]: """ Sammelt alle eingehenden Pfade eines Join-Nodes. Args: node: Join-Node graph: Workflow-Graph context: Execution context mit node_results Returns: List[PathStatus] - Status aller eingehenden Pfade Details: - Findet alle Edges mit to_node == join_node.id - Extrahiert NodeExecutionState aus context["node_results"] - Erstellt PathStatus für jeden incoming node """ incoming_edges = [e for e in graph.edges if e.to_node == node.id] paths: List[PathStatus] = [] logger.debug(f"Join node {node.id}: Found {len(incoming_edges)} incoming edges") for edge in incoming_edges: source_node_id = edge.from_node # Hole NodeExecutionState aus context node_state = context["node_results"].get(source_node_id) if node_state: # Node wurde ausgeführt # Convert List[NormalizedSignal] to Dict[question_type → Signal] signals_dict = {} if node_state.normalized_signals: for signal in node_state.normalized_signals: signals_dict[signal.question_type] = signal path = PathStatus( node_id=source_node_id, status=node_state.status, analysis_core=node_state.analysis_core, signals=signals_dict ) paths.append(path) logger.debug(f" Path {source_node_id}: {node_state.status.value}") else: # Node nicht in node_results → wurde nicht besucht (unreachable) path = PathStatus( node_id=source_node_id, status=NodeStatus.SKIPPED, analysis_core=None, signals={} ) paths.append(path) logger.debug(f" Path {source_node_id}: not visited (unreachable)") return paths def _check_join_strategy( paths: List[PathStatus], strategy: JoinStrategy ) -> tuple[bool, Optional[str]]: """ Prüft ob Join-Strategie erfüllt ist. Args: paths: Liste aller eingehenden Pfade strategy: Join-Strategie (wait_all, wait_any, best_effort) Returns: (ready: bool, error: Optional[str]) - ready: True wenn Strategie erfüllt - error: Fehlermeldung wenn nicht erfüllt Strategien: - wait_all: Alle Pfade EXECUTED (strikt) - wait_any: Mindestens ein Pfad EXECUTED - best_effort: Immer ready (fehlertoleranz) """ executed_paths = [p for p in paths if p.status == NodeStatus.EXECUTED] failed_paths = [p for p in paths if p.status == NodeStatus.FAILED] skipped_paths = [p for p in paths if p.status == NodeStatus.SKIPPED] logger.debug(f"Join strategy check: {strategy.value}") logger.debug(f" Executed: {len(executed_paths)}/{len(paths)}") logger.debug(f" Failed: {len(failed_paths)}") logger.debug(f" Skipped: {len(skipped_paths)}") if strategy == JoinStrategy.WAIT_ALL: # Alle Pfade müssen EXECUTED sein if len(executed_paths) < len(paths): missing = [p.node_id for p in paths if p.status != NodeStatus.EXECUTED] error = f"wait_all strategy failed: {len(executed_paths)}/{len(paths)} paths executed. Missing: {missing}" logger.warning(error) return False, error return True, None elif strategy == JoinStrategy.WAIT_ANY: # Mindestens ein Pfad EXECUTED if len(executed_paths) == 0: error = f"wait_any strategy failed: no paths executed ({len(failed_paths)} failed, {len(skipped_paths)} skipped)" logger.warning(error) return False, error return True, None elif strategy == JoinStrategy.BEST_EFFORT: # Immer bereit (fehlertoleranz) if len(executed_paths) == 0: logger.info(f"best_effort: No paths executed, but continuing (fehlertoleranz)") return True, None else: # Unbekannte Strategie error = f"Unknown join strategy: {strategy}" logger.error(error) return False, error def _merge_analysis_cores( paths: List[PathStatus], skip_handling: Optional[SkipHandling] ) -> Dict[str, str]: """ Merged Analysekerne aller eingehenden Pfade. Args: paths: Liste aller Pfade skip_handling: Umgang mit übersprungenen Pfaden Returns: Dict[node_id → analysis_core] Details: - EXECUTED Pfade: analysis_core übernehmen - SKIPPED Pfade: abhängig von skip_handling - IGNORE_SKIPPED: nicht in Ergebnis - USE_PLACEHOLDER: Platzhalter "[Skipped: {node_id}]" - FAILED Pfade: Platzhalter "[Failed: {node_id}]" """ merged: Dict[str, str] = {} for path in paths: if path.status == NodeStatus.EXECUTED and path.analysis_core: # Normale Übernahme merged[path.node_id] = path.analysis_core elif path.status == NodeStatus.SKIPPED: # Skip-Handling if skip_handling == SkipHandling.USE_PLACEHOLDER: merged[path.node_id] = f"[Path skipped: {path.node_id}]" elif skip_handling == SkipHandling.IGNORE_SKIPPED: # Nicht in Ergebnis pass # REQUIRE_MINIMUM wird in _check_join_strategy behandelt elif path.status == NodeStatus.FAILED: # Failed Pfade dokumentieren merged[path.node_id] = f"[Path failed: {path.node_id}]" logger.debug(f"Merged analysis cores: {len(merged)} entries") return merged def _combine_signals( paths: List[PathStatus] ) -> Dict[str, NormalizedSignal]: """ Kombiniert Signale aller eingehenden Pfade. Args: paths: Liste aller Pfade Returns: Dict[node_id.question_id → NormalizedSignal] Details: - Signal-Namen werden mit node_id geprefixed (verhindert Kollision) - Format: "node_id.question_id" → Signal - Nur EXECUTED Pfade werden berücksichtigt Beispiel: path_a.relevanz → Signal(value="hoch", ...) path_b.relevanz → Signal(value="mittel", ...) """ combined: Dict[str, NormalizedSignal] = {} for path in paths: if path.status != NodeStatus.EXECUTED: # Nur ausgeführte Pfade haben valide Signale continue for signal_key, signal in path.signals.items(): # Präfix mit node_id (verhindert Kollision) prefixed_key = f"{path.node_id}.{signal_key}" combined[prefixed_key] = signal logger.debug(f"Combined signals: {len(combined)} entries") return combined # ── Main Function ───────────────────────────────────────────────────────────── def evaluate_join_node( node: WorkflowNode, graph: WorkflowGraph, context: Dict[str, Any] ) -> JoinResult: """ Evaluiert Join-Node: Sammelt Pfade, prüft Strategie, konsolidiert Ergebnisse. Args: node: Join-Node aus Workflow-Graph graph: Gesamter Workflow-Graph context: Execution context mit node_results Returns: JoinResult mit konsolidierten Daten Workflow: 1. Sammle incoming paths (_collect_incoming_paths) 2. Prüfe Join-Strategie (_check_join_strategy) 3. Merge analysis_cores (_merge_analysis_cores) 4. Combine signals (_combine_signals) 5. Baue metadata (Pfad-Status, Statistiken) Beispiel: >>> result = evaluate_join_node(join_node, graph, context) >>> result.ready True >>> result.consolidated_analysis_core {"path_a": "Analysis A...", "path_b": "Analysis B..."} """ logger.info(f"Evaluating join node: {node.id}") # 1. Sammle incoming paths paths = _collect_incoming_paths(node, graph, context) if not paths: # Kein eingehender Pfad (Fehlkonfiguration) logger.warning(f"Join node {node.id}: No incoming edges found") return JoinResult( ready=False, error="No incoming edges for join node", metadata={"note": "Configuration error: join node must have incoming edges"} ) # 2. Prüfe Join-Strategie strategy = node.join_strategy or JoinStrategy.WAIT_ALL # Default skip_handling = node.skip_handling or SkipHandling.IGNORE_SKIPPED # Default ready, strategy_error = _check_join_strategy(paths, strategy) if not ready: # Strategie nicht erfüllt executed_list = [p.node_id for p in paths if p.status == NodeStatus.EXECUTED] skipped_list = [p.node_id for p in paths if p.status == NodeStatus.SKIPPED] failed_list = [p.node_id for p in paths if p.status == NodeStatus.FAILED] return JoinResult( ready=False, error=strategy_error, metadata={ "join_strategy": strategy.value, "total_paths": len(paths), "executed_paths": len(executed_list), "skipped_paths": len(skipped_list), "failed_paths": len(failed_list), "path_details": { "executed": executed_list, "skipped": skipped_list, "failed": failed_list } } ) # 3. Merge analysis_cores merged_cores = _merge_analysis_cores(paths, skip_handling) # 4. Combine signals combined_signals = _combine_signals(paths) # 5. Baue metadata executed_count = sum(1 for p in paths if p.status == NodeStatus.EXECUTED) skipped_count = sum(1 for p in paths if p.status == NodeStatus.SKIPPED) failed_count = sum(1 for p in paths if p.status == NodeStatus.FAILED) metadata = { "join_strategy": strategy.value, "skip_handling": skip_handling.value, "total_paths": len(paths), "executed_paths": executed_count, "skipped_paths": skipped_count, "failed_paths": failed_count, "path_details": { "executed": [p.node_id for p in paths if p.status == NodeStatus.EXECUTED], "skipped": [p.node_id for p in paths if p.status == NodeStatus.SKIPPED], "failed": [p.node_id for p in paths if p.status == NodeStatus.FAILED] } } # Spezielle Hinweise if executed_count == 0 and strategy == JoinStrategy.BEST_EFFORT: metadata["note"] = "No paths executed, but best_effort allows empty consolidation" logger.info(f"Join node {node.id}: Consolidated {executed_count}/{len(paths)} paths") return JoinResult( ready=True, consolidated_analysis_core=merged_cores, consolidated_signals=combined_signals, metadata=metadata )