Backend Implementation (v0.9m, workflow 0.5.0): - join_evaluator.py (394 lines): Join-Strategie-Evaluator - evaluate_join_node(): Hauptlogik für Join-Node Execution - Join-Strategien: wait_all, wait_any, best_effort - Skip-Handling: ignore_skipped, use_placeholder, require_minimum - Result Consolidation: merge analysis_cores, combine signals - Partial Execution: korrekte Behandlung von SKIPPED/FAILED Pfaden - workflow_executor.py: execute_join_node() Integration - BFS-Traversierung erweitert für Join-Nodes - NodeExecutionState List → Dict Konvertierung für Signale - Signal-Name-Kollisionen via node_id Präfix gelöst Testing (49 Tests passing): - test_phase4_join_nodes.py: 18 neue Unit Tests - Join-Strategien (wait_all, wait_any, best_effort) - Skip-Handling (ignore, placeholder) - Result Consolidation (merge, combine) - Partial Execution (mixed status paths) - Helper Functions (collect, check, merge, combine) - Backward Compatibility: 31 Phase 2/3 Tests (alle passing) - test_phase2_workflow_executor.py: 1 Test aktualisiert - test_phase3_logic_evaluator.py: 20 Tests unverändert Konzept: konzept_workflow_engine_konsolidated.md (Sektion 8.8) Anforderungsanalyse: phase4_anforderungsanalyse.md Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
397 lines
13 KiB
Python
397 lines
13 KiB
Python
"""
|
|
Join Evaluator (Phase 4)
|
|
|
|
Evaluiert Join-Knoten: Sammelt incoming paths, prüft Strategien, konsolidiert Ergebnisse.
|
|
|
|
Join-Strategien:
|
|
- wait_all: Alle eingehenden Pfade müssen ausgeführt sein (strikt)
|
|
- wait_any: Mindestens ein Pfad muss ausgeführt sein
|
|
- best_effort: Verwendet verfügbare Pfade (Fehlertoleranz)
|
|
|
|
Skip-Handling:
|
|
- ignore_skipped: Übersprungene Pfade nicht in Ergebnis
|
|
- use_placeholder: Platzhalter für übersprungene Pfade
|
|
- require_minimum: Mindestanzahl erforderlich
|
|
|
|
Konzept-Basis: konzept_workflow_engine_konsolidated.md (Sektion 8.8)
|
|
Anforderungsanalyse: phase4_anforderungsanalyse.md
|
|
"""
|
|
|
|
from typing import Dict, Any, Optional, List, NamedTuple
|
|
from pydantic import BaseModel, Field
|
|
import logging
|
|
|
|
from workflow_models import (
|
|
WorkflowNode,
|
|
WorkflowGraph,
|
|
JoinStrategy,
|
|
SkipHandling,
|
|
NodeStatus,
|
|
NormalizedSignal
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ── Data Structures ───────────────────────────────────────────────────────────
|
|
|
|
|
|
class PathStatus(NamedTuple):
    """Snapshot of one incoming path as seen from a join node.

    Fields:
        node_id: ID of the source node, i.e. the node an edge into the
            join starts from.
        status: Execution status recorded for that source node.
        analysis_core: Analysis text of the source node, or None when
            there is none.
        signals: Normalized signals of the source node, keyed by each
            signal's ``question_type``.
    """

    node_id: str
    status: NodeStatus
    analysis_core: Optional[str]
    # _collect_incoming_paths converts the node's signal list into this dict.
    signals: Dict[str, NormalizedSignal]
|
|
|
|
|
|
class JoinResult(BaseModel):
    """Outcome of evaluating a join node.

    Carries the readiness flag plus the data consolidated from all
    incoming paths. ``error`` is set when the join strategy was not
    satisfied; the consolidated dicts default to empty.
    """

    # Required: whether the join may proceed.
    ready: bool = Field(
        ...,
        description="Sind erforderliche Pfade verfügbar?",
    )
    # node_id -> analysis_core (or a placeholder string).
    consolidated_analysis_core: Dict[str, str] = Field(
        default_factory=dict,
        description="Konsolidierte Analysekerne: node_id → analysis_core",
    )
    # Prefixed signal key -> signal.
    consolidated_signals: Dict[str, NormalizedSignal] = Field(
        default_factory=dict,
        description="Konsolidierte Signale: node_id.question_id → signal",
    )
    # Path statuses, counters and optional notes.
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Pfad-Status, Statistiken, Hinweise",
    )
    error: Optional[str] = Field(
        default=None,
        description="Fehler bei nicht erfüllter Strategie",
    )
|
|
|
|
|
|
# ── Helper Functions ──────────────────────────────────────────────────────────
|
|
|
|
|
|
def _collect_incoming_paths(
    node: WorkflowNode,
    graph: WorkflowGraph,
    context: Dict[str, Any]
) -> List[PathStatus]:
    """Collect the status of every path that leads into a join node.

    Args:
        node: The join node.
        graph: Workflow graph whose ``edges`` are scanned for edges with
            ``to_node == node.id``.
        context: Execution context. ``context["node_results"]`` is read as
            a mapping of node ID to execution state; a missing or ``None``
            entry is treated as "no node has produced a result yet".

    Returns:
        One PathStatus per incoming edge, in edge order. Source nodes
        without an entry in ``node_results`` are reported as SKIPPED with
        no analysis core and no signals.
    """
    incoming_edges = [e for e in graph.edges if e.to_node == node.id]
    logger.debug(f"Join node {node.id}: Found {len(incoming_edges)} incoming edges")

    # Tolerate a context that has no (or a None) "node_results" entry instead
    # of failing with KeyError / AttributeError.
    node_results = context.get("node_results") or {}

    paths: List[PathStatus] = []
    for edge in incoming_edges:
        source_node_id = edge.from_node
        node_state = node_results.get(source_node_id)

        # Explicit None check: a state object that happens to be falsy must
        # still count as a recorded result, not as "never visited".
        if node_state is None:
            paths.append(
                PathStatus(
                    node_id=source_node_id,
                    status=NodeStatus.SKIPPED,
                    analysis_core=None,
                    signals={},
                )
            )
            logger.debug(f" Path {source_node_id}: not visited (unreachable)")
            continue

        # List[NormalizedSignal] -> Dict[question_type, signal]; when two
        # signals share a question_type the later one wins.
        signals_dict = {
            signal.question_type: signal
            for signal in (node_state.normalized_signals or [])
        }
        paths.append(
            PathStatus(
                node_id=source_node_id,
                status=node_state.status,
                analysis_core=node_state.analysis_core,
                signals=signals_dict,
            )
        )
        logger.debug(f" Path {source_node_id}: {node_state.status.value}")

    return paths
|
|
|
|
|
|
def _check_join_strategy(
    paths: List[PathStatus],
    strategy: JoinStrategy
) -> tuple[bool, Optional[str]]:
    """Decide whether the join strategy is satisfied by the given paths.

    Args:
        paths: All incoming paths of the join node.
        strategy: Join strategy to check.

    Returns:
        ``(ready, error)``. ``error`` is None when ``ready`` is True and a
        message otherwise.

    Strategies:
        - WAIT_ALL: every path must be EXECUTED.
        - WAIT_ANY: at least one path must be EXECUTED.
        - BEST_EFFORT: always ready, even with no executed path.
        - anything else: not ready, with an "unknown strategy" error.
    """
    total = len(paths)
    executed = sum(1 for p in paths if p.status == NodeStatus.EXECUTED)
    failed = sum(1 for p in paths if p.status == NodeStatus.FAILED)
    skipped = sum(1 for p in paths if p.status == NodeStatus.SKIPPED)

    logger.debug(f"Join strategy check: {strategy.value}")
    logger.debug(f" Executed: {executed}/{total}")
    logger.debug(f" Failed: {failed}")
    logger.debug(f" Skipped: {skipped}")

    if strategy == JoinStrategy.WAIT_ALL:
        if executed == total:
            return True, None
        # Report every path that is in any state other than EXECUTED.
        missing = [p.node_id for p in paths if p.status != NodeStatus.EXECUTED]
        error = f"wait_all strategy failed: {executed}/{total} paths executed. Missing: {missing}"
        logger.warning(error)
        return False, error

    if strategy == JoinStrategy.WAIT_ANY:
        if executed > 0:
            return True, None
        error = f"wait_any strategy failed: no paths executed ({failed} failed, {skipped} skipped)"
        logger.warning(error)
        return False, error

    if strategy == JoinStrategy.BEST_EFFORT:
        if executed == 0:
            logger.info("best_effort: No paths executed, but continuing (fehlertoleranz)")
        return True, None

    # Unknown strategy value.
    error = f"Unknown join strategy: {strategy}"
    logger.error(error)
    return False, error
|
|
|
|
|
|
def _merge_analysis_cores(
    paths: List[PathStatus],
    skip_handling: Optional[SkipHandling]
) -> Dict[str, str]:
    """Merge the analysis cores of all incoming paths into one dict.

    Args:
        paths: All incoming paths of the join node.
        skip_handling: How SKIPPED paths are represented in the result.

    Returns:
        Mapping of node ID to analysis core or placeholder text.

    Per status:
        - EXECUTED: the path's ``analysis_core`` is taken over; paths with
          an empty or missing core are left out.
        - SKIPPED: with USE_PLACEHOLDER the entry is
          ``"[Path skipped: {node_id}]"``; with any other value
          (IGNORE_SKIPPED, REQUIRE_MINIMUM, None) the path is left out.
        - FAILED: the entry is ``"[Path failed: {node_id}]"``.

    NOTE(review): REQUIRE_MINIMUM gets no special treatment here, and
    _check_join_strategy does not receive ``skip_handling`` either, so a
    minimum path count is not enforced by these helpers - confirm where
    (or whether) it is checked.
    """
    merged: Dict[str, str] = {}

    for path in paths:
        entry: Optional[str] = None

        if path.status == NodeStatus.EXECUTED:
            # Falsy cores (None, "") are not carried into the result.
            entry = path.analysis_core if path.analysis_core else None
        elif path.status == NodeStatus.SKIPPED:
            if skip_handling == SkipHandling.USE_PLACEHOLDER:
                entry = f"[Path skipped: {path.node_id}]"
        elif path.status == NodeStatus.FAILED:
            # Failed paths are always documented in the result.
            entry = f"[Path failed: {path.node_id}]"

        if entry is not None:
            merged[path.node_id] = entry

    logger.debug(f"Merged analysis cores: {len(merged)} entries")
    return merged
|
|
|
|
|
|
def _combine_signals(
    paths: List[PathStatus]
) -> Dict[str, NormalizedSignal]:
    """Combine the signals of all executed paths into one dict.

    Args:
        paths: All incoming paths of the join node.

    Returns:
        Mapping of ``"{node_id}.{signal_key}"`` to signal, where
        ``signal_key`` is the key used in ``PathStatus.signals``.

    Details:
        - Keys are prefixed with the node ID so that equally named signals
          from different paths do not overwrite each other.
        - Paths whose status is not EXECUTED contribute nothing.

    Example:
        path_a.relevanz -> Signal(value="hoch", ...)
        path_b.relevanz -> Signal(value="mittel", ...)
    """
    combined: Dict[str, NormalizedSignal] = {
        f"{path.node_id}.{signal_key}": signal
        for path in paths
        if path.status == NodeStatus.EXECUTED
        for signal_key, signal in path.signals.items()
    }

    logger.debug(f"Combined signals: {len(combined)} entries")
    return combined
|
|
|
|
|
|
# ── Main Function ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
def evaluate_join_node(
    node: WorkflowNode,
    graph: WorkflowGraph,
    context: Dict[str, Any]
) -> JoinResult:
    """Evaluate a join node: collect paths, check strategy, consolidate.

    Args:
        node: Join node from the workflow graph. ``join_strategy`` falls
            back to WAIT_ALL and ``skip_handling`` to IGNORE_SKIPPED when
            unset.
        graph: The whole workflow graph.
        context: Execution context passed on to _collect_incoming_paths.

    Returns:
        JoinResult. ``ready`` is False (with ``error`` set) when the node
        has no incoming edges or the strategy is not satisfied; otherwise
        it carries the merged analysis cores, the combined signals and
        path statistics in ``metadata``.

    Steps:
        1. Collect incoming paths (_collect_incoming_paths).
        2. Check the join strategy (_check_join_strategy).
        3. Merge analysis cores (_merge_analysis_cores).
        4. Combine signals (_combine_signals).
        5. Build metadata (path statuses, counters).

    Example:
        >>> result = evaluate_join_node(join_node, graph, context)
        >>> result.ready
        True
        >>> result.consolidated_analysis_core
        {"path_a": "Analysis A...", "path_b": "Analysis B..."}
    """
    logger.info(f"Evaluating join node: {node.id}")

    # 1. Collect incoming paths.
    paths = _collect_incoming_paths(node, graph, context)
    if not paths:
        # A join without incoming edges is a configuration error.
        logger.warning(f"Join node {node.id}: No incoming edges found")
        return JoinResult(
            ready=False,
            error="No incoming edges for join node",
            metadata={"note": "Configuration error: join node must have incoming edges"},
        )

    # 2. Check the join strategy (with defaults for unset node options).
    strategy = node.join_strategy or JoinStrategy.WAIT_ALL
    skip_handling = node.skip_handling or SkipHandling.IGNORE_SKIPPED
    # NOTE(review): skip_handling is only passed to _merge_analysis_cores;
    # nothing in this function enforces a minimum for REQUIRE_MINIMUM.

    ready, strategy_error = _check_join_strategy(paths, strategy)

    # Path statistics shared by the "not ready" and the "ready" result.
    executed_ids = [p.node_id for p in paths if p.status == NodeStatus.EXECUTED]
    skipped_ids = [p.node_id for p in paths if p.status == NodeStatus.SKIPPED]
    failed_ids = [p.node_id for p in paths if p.status == NodeStatus.FAILED]
    path_stats: Dict[str, Any] = {
        "total_paths": len(paths),
        "executed_paths": len(executed_ids),
        "skipped_paths": len(skipped_ids),
        "failed_paths": len(failed_ids),
        "path_details": {
            "executed": executed_ids,
            "skipped": skipped_ids,
            "failed": failed_ids,
        },
    }

    if not ready:
        # Strategy not satisfied: report why, without consolidated data.
        return JoinResult(
            ready=False,
            error=strategy_error,
            metadata={"join_strategy": strategy.value, **path_stats},
        )

    # 3. Merge analysis cores.
    merged_cores = _merge_analysis_cores(paths, skip_handling)

    # 4. Combine signals.
    combined_signals = _combine_signals(paths)

    # 5. Build metadata.
    metadata: Dict[str, Any] = {
        "join_strategy": strategy.value,
        "skip_handling": skip_handling.value,
        **path_stats,
    }
    if not executed_ids and strategy == JoinStrategy.BEST_EFFORT:
        metadata["note"] = "No paths executed, but best_effort allows empty consolidation"

    logger.info(f"Join node {node.id}: Consolidated {len(executed_ids)}/{len(paths)} paths")

    return JoinResult(
        ready=True,
        consolidated_analysis_core=merged_cores,
        consolidated_signals=combined_signals,
        metadata=metadata,
    )
|