mitai-jinkendo/backend/join_evaluator.py
Lars e2a132353d
All checks were successful
Deploy Development / deploy (push) Successful in 45s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
feat: Phase 4 - Join Nodes and Path Consolidation
Backend Implementation (v0.9m, workflow 0.5.0):
- join_evaluator.py (394 lines): Join-Strategie-Evaluator
  - evaluate_join_node(): Hauptlogik für Join-Node Execution
  - Join-Strategien: wait_all, wait_any, best_effort
  - Skip-Handling: ignore_skipped, use_placeholder, require_minimum
  - Result Consolidation: merge analysis_cores, combine signals
  - Partial Execution: korrekte Behandlung von SKIPPED/FAILED Pfaden

- workflow_executor.py: execute_join_node() Integration
  - BFS-Traversierung erweitert für Join-Nodes
  - NodeExecutionState List → Dict Konvertierung für Signale
  - Signal-Name-Kollisionen via node_id Präfix gelöst

Testing (49 Tests passing):
- test_phase4_join_nodes.py: 18 neue Unit Tests
  - Join-Strategien (wait_all, wait_any, best_effort)
  - Skip-Handling (ignore, placeholder)
  - Result Consolidation (merge, combine)
  - Partial Execution (mixed status paths)
  - Helper Functions (collect, check, merge, combine)

- Backward Compatibility: 31 Phase 2/3 Tests (alle passing)
  - test_phase2_workflow_executor.py: 1 Test aktualisiert
  - test_phase3_logic_evaluator.py: 20 Tests unverändert

Konzept: konzept_workflow_engine_konsolidated.md (Sektion 8.8)
Anforderungsanalyse: phase4_anforderungsanalyse.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-04 12:27:31 +02:00

397 lines
13 KiB
Python

"""
Join Evaluator (Phase 4)
Evaluiert Join-Knoten: Sammelt incoming paths, prüft Strategien, konsolidiert Ergebnisse.
Join-Strategien:
- wait_all: Alle eingehenden Pfade müssen ausgeführt sein (strikt)
- wait_any: Mindestens ein Pfad muss ausgeführt sein
- best_effort: Verwendet verfügbare Pfade (fehlertoleranz)
Skip-Handling:
- ignore_skipped: Übersprungene Pfade nicht in Ergebnis
- use_placeholder: Platzhalter für übersprungene Pfade
- require_minimum: Mindestanzahl erforderlich
Konzept-Basis: konzept_workflow_engine_konsolidated.md (Sektion 8.8)
Anforderungsanalyse: phase4_anforderungsanalyse.md
"""
from typing import Dict, Any, Optional, List, NamedTuple
from pydantic import BaseModel, Field
import logging
from workflow_models import (
WorkflowNode,
WorkflowGraph,
JoinStrategy,
SkipHandling,
NodeStatus,
NormalizedSignal
)
logger = logging.getLogger(__name__)
# ── Data Structures ───────────────────────────────────────────────────────────
class PathStatus(NamedTuple):
    """
    Status of one incoming path at a join node.

    One instance is created per source node that has an edge ending at the
    join node.

    Attributes:
        node_id: ID of the source node (the node immediately before the join).
        status: Execution status of the source node; the join logic in this
            module distinguishes EXECUTED, SKIPPED and FAILED.
        analysis_core: Analysis text of the source node, or None (e.g. when
            the source node was never visited).
        signals: Normalized signals of the source node, keyed by the signal's
            ``question_type``.
    """
    node_id: str
    status: NodeStatus
    analysis_core: Optional[str]
    signals: Dict[str, NormalizedSignal]  # Converted from List to Dict by _collect_incoming_paths
class JoinResult(BaseModel):
    """
    Result of evaluating a join node.

    Holds the consolidated data of all incoming paths. As produced by
    evaluate_join_node in this module, a result with ``ready=False`` carries
    the reason in ``error`` and leaves the consolidation fields empty.
    """
    # True when the join strategy is satisfied (required paths are available).
    ready: bool = Field(..., description="Sind erforderliche Pfade verfügbar?")
    # Analysis text (or a skipped/failed placeholder) per source node_id.
    consolidated_analysis_core: Dict[str, str] = Field(
        default_factory=dict,
        description="Konsolidierte Analysekerne: node_id → analysis_core"
    )
    # Signals of executed paths keyed "<node_id>.<key>". Note: the key suffix
    # is the signal's question_type as collected by _collect_incoming_paths,
    # despite the "question_id" wording in the description string.
    consolidated_signals: Dict[str, NormalizedSignal] = Field(
        default_factory=dict,
        description="Konsolidierte Signale: node_id.question_id → signal"
    )
    # Path statuses, per-status counts and free-form notes.
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Pfad-Status, Statistiken, Hinweise"
    )
    # Reason the join could not be performed; None on success.
    error: Optional[str] = Field(None, description="Fehler bei nicht erfüllter Strategie")
# ── Helper Functions ──────────────────────────────────────────────────────────
def _collect_incoming_paths(
    node: WorkflowNode,
    graph: WorkflowGraph,
    context: Dict[str, Any]
) -> List[PathStatus]:
    """
    Collect the status of every incoming path of a join node.

    Args:
        node: The join node.
        graph: The workflow graph containing ``node``.
        context: Execution context; ``context["node_results"]`` maps a
            node_id to the execution state of each visited node.

    Returns:
        One PathStatus per distinct source node that has an edge into
        ``node``, in the order the edges appear in ``graph.edges``.

    Details:
        - Source nodes are the ``from_node`` of all edges whose
          ``to_node == node.id``. Several parallel edges from the same source
          count as ONE path, so path counts agree with the node_id-keyed
          dicts built by _merge_analysis_cores / _combine_signals.
        - A source without an entry in ``node_results`` was never visited and
          is reported as SKIPPED with no analysis core and no signals.
        - The source's list of normalized signals is converted into a dict
          keyed by ``question_type``; a later signal with the same
          question_type replaces an earlier one.

    Raises:
        KeyError: if the node has incoming edges and ``context`` has no
            ``"node_results"`` entry.
    """
    incoming_edges = [e for e in graph.edges if e.to_node == node.id]
    logger.debug(f"Join node {node.id}: Found {len(incoming_edges)} incoming edges")

    # dict.fromkeys de-duplicates while keeping first-seen edge order.
    source_node_ids = list(dict.fromkeys(edge.from_node for edge in incoming_edges))

    paths: List[PathStatus] = []
    for source_node_id in source_node_ids:
        node_state = context["node_results"].get(source_node_id)

        if node_state is None:
            # Not in node_results → never visited (unreachable)
            paths.append(PathStatus(
                node_id=source_node_id,
                status=NodeStatus.SKIPPED,
                analysis_core=None,
                signals={}
            ))
            logger.debug(f"  Path {source_node_id}: not visited (unreachable)")
            continue

        # Convert List[NormalizedSignal] → Dict[question_type → Signal]
        signals_dict = {
            signal.question_type: signal
            for signal in (node_state.normalized_signals or [])
        }
        paths.append(PathStatus(
            node_id=source_node_id,
            status=node_state.status,
            analysis_core=node_state.analysis_core,
            signals=signals_dict
        ))
        logger.debug(f"  Path {source_node_id}: {node_state.status.value}")

    return paths
def _check_join_strategy(
    paths: List[PathStatus],
    strategy: JoinStrategy
) -> tuple[bool, Optional[str]]:
    """
    Check whether a join strategy is satisfied by the incoming paths.

    Args:
        paths: All incoming paths of the join node.
        strategy: Join strategy (wait_all, wait_any, best_effort).

    Returns:
        ``(ready, error)``: ``ready`` is True when the strategy is satisfied;
        ``error`` is a message when it is not, otherwise None.

    Strategies:
        - wait_all: every path must be EXECUTED; the error lists the node_ids
          of all paths with any other status.
        - wait_any: at least one path must be EXECUTED.
        - best_effort: always ready (fault tolerant); only logs when no path
          was executed.
        - anything else: not ready, with an "Unknown join strategy" error.
    """
    executed_paths = [p for p in paths if p.status == NodeStatus.EXECUTED]
    failed_paths = [p for p in paths if p.status == NodeStatus.FAILED]
    skipped_paths = [p for p in paths if p.status == NodeStatus.SKIPPED]

    # getattr: a value that is not a JoinStrategy member must still reach the
    # "Unknown join strategy" branch below instead of raising AttributeError
    # in this log line. For enum members this is identical to strategy.value.
    strategy_name = getattr(strategy, "value", strategy)
    logger.debug(f"Join strategy check: {strategy_name}")
    logger.debug(f"  Executed: {len(executed_paths)}/{len(paths)}")
    logger.debug(f"  Failed: {len(failed_paths)}")
    logger.debug(f"  Skipped: {len(skipped_paths)}")

    if strategy == JoinStrategy.WAIT_ALL:
        # Strict: every path must be EXECUTED
        if len(executed_paths) < len(paths):
            missing = [p.node_id for p in paths if p.status != NodeStatus.EXECUTED]
            error = f"wait_all strategy failed: {len(executed_paths)}/{len(paths)} paths executed. Missing: {missing}"
            logger.warning(error)
            return False, error
        return True, None

    if strategy == JoinStrategy.WAIT_ANY:
        # At least one path must be EXECUTED
        if not executed_paths:
            error = f"wait_any strategy failed: no paths executed ({len(failed_paths)} failed, {len(skipped_paths)} skipped)"
            logger.warning(error)
            return False, error
        return True, None

    if strategy == JoinStrategy.BEST_EFFORT:
        # Always ready (fault tolerant)
        if not executed_paths:
            logger.info("best_effort: No paths executed, but continuing (fehlertoleranz)")
        return True, None

    error = f"Unknown join strategy: {strategy}"
    logger.error(error)
    return False, error
def _merge_analysis_cores(
    paths: List[PathStatus],
    skip_handling: Optional[SkipHandling]
) -> Dict[str, str]:
    """
    Merge the analysis cores of all incoming paths into one dict.

    Args:
        paths: All incoming paths of the join node.
        skip_handling: How SKIPPED paths are represented.

    Returns:
        Mapping of source node_id → analysis text or placeholder.

    Per path status:
        - EXECUTED: its analysis_core is copied when non-empty; an empty or
          missing core leaves the path out.
        - SKIPPED: with USE_PLACEHOLDER the entry is
          ``"[Path skipped: <node_id>]"``; with any other value (including
          IGNORE_SKIPPED, REQUIRE_MINIMUM and None) the path is left out.
        - FAILED: always ``"[Path failed: <node_id>]"``, independent of
          skip_handling.

    NOTE(review): no minimum is enforced here for REQUIRE_MINIMUM, and
    _check_join_strategy does not receive skip_handling either, so that
    option currently behaves like IGNORE_SKIPPED — confirm intended.
    """
    placeholder_for_skipped = skip_handling == SkipHandling.USE_PLACEHOLDER
    cores: Dict[str, str] = {}

    for entry in paths:
        source_id = entry.node_id
        state = entry.status

        if state == NodeStatus.FAILED:
            cores[source_id] = f"[Path failed: {source_id}]"
        elif state == NodeStatus.SKIPPED:
            if placeholder_for_skipped:
                cores[source_id] = f"[Path skipped: {source_id}]"
        elif state == NodeStatus.EXECUTED:
            if entry.analysis_core:
                cores[source_id] = entry.analysis_core

    logger.debug(f"Merged analysis cores: {len(cores)} entries")
    return cores
def _combine_signals(
    paths: List[PathStatus]
) -> Dict[str, NormalizedSignal]:
    """
    Combine the signals of all executed incoming paths.

    Args:
        paths: All incoming paths of the join node.

    Returns:
        Mapping ``"<node_id>.<signal_key>"`` → NormalizedSignal, where
        ``signal_key`` is the key in ``PathStatus.signals`` (the signal's
        question_type when built by _collect_incoming_paths).

    Details:
        - Only EXECUTED paths contribute; other paths carry no valid signals.
        - Prefixing with the source node_id keeps equally named signals from
          different paths apart, e.g.
          ``path_a.relevanz`` and ``path_b.relevanz``.
    """
    combined: Dict[str, NormalizedSignal] = {
        f"{entry.node_id}.{key}": value
        for entry in paths
        if entry.status == NodeStatus.EXECUTED
        for key, value in entry.signals.items()
    }
    logger.debug(f"Combined signals: {len(combined)} entries")
    return combined
# ── Main Function ─────────────────────────────────────────────────────────────
def _build_path_metadata(
    paths: List[PathStatus],
    strategy: JoinStrategy,
    skip_handling: SkipHandling
) -> Dict[str, Any]:
    """Build the path-status metadata shared by successful and failed joins."""
    executed = [p.node_id for p in paths if p.status == NodeStatus.EXECUTED]
    skipped = [p.node_id for p in paths if p.status == NodeStatus.SKIPPED]
    failed = [p.node_id for p in paths if p.status == NodeStatus.FAILED]
    return {
        # getattr mirrors .value for enum members but tolerates a raw value,
        # so an unknown strategy yields an error JoinResult, not AttributeError.
        "join_strategy": getattr(strategy, "value", strategy),
        "skip_handling": getattr(skip_handling, "value", skip_handling),
        "total_paths": len(paths),
        "executed_paths": len(executed),
        "skipped_paths": len(skipped),
        "failed_paths": len(failed),
        "path_details": {
            "executed": executed,
            "skipped": skipped,
            "failed": failed
        }
    }


def evaluate_join_node(
    node: WorkflowNode,
    graph: WorkflowGraph,
    context: Dict[str, Any]
) -> JoinResult:
    """
    Evaluate a join node: collect its paths, check the strategy, consolidate.

    Args:
        node: Join node from the workflow graph. ``node.join_strategy``
            defaults to WAIT_ALL and ``node.skip_handling`` to IGNORE_SKIPPED
            when unset.
        graph: The complete workflow graph.
        context: Execution context containing ``"node_results"``.

    Returns:
        JoinResult. It is never raised as an error; instead:
        - no incoming edges → ``ready=False`` with a configuration note;
        - strategy not satisfied → ``ready=False``, ``error`` set and path
          statistics in ``metadata`` (including ``skip_handling``);
        - otherwise → ``ready=True`` with consolidated analysis cores,
          prefixed signals and path statistics.

    Workflow:
        1. Collect incoming paths (_collect_incoming_paths)
        2. Check the join strategy (_check_join_strategy)
        3. Merge analysis cores (_merge_analysis_cores)
        4. Combine signals (_combine_signals)
        5. Attach metadata (path statuses, counts, notes)

    Example:
        >>> result = evaluate_join_node(join_node, graph, context)
        >>> result.ready
        True
        >>> result.consolidated_analysis_core
        {"path_a": "Analysis A...", "path_b": "Analysis B..."}
    """
    logger.info(f"Evaluating join node: {node.id}")

    # 1. Collect incoming paths
    paths = _collect_incoming_paths(node, graph, context)
    if not paths:
        # No incoming path → misconfigured graph
        logger.warning(f"Join node {node.id}: No incoming edges found")
        return JoinResult(
            ready=False,
            error="No incoming edges for join node",
            metadata={"note": "Configuration error: join node must have incoming edges"}
        )

    # 2. Check join strategy (defaults when the node leaves them unset)
    strategy = node.join_strategy or JoinStrategy.WAIT_ALL
    skip_handling = node.skip_handling or SkipHandling.IGNORE_SKIPPED
    metadata = _build_path_metadata(paths, strategy, skip_handling)

    ready, strategy_error = _check_join_strategy(paths, strategy)
    if not ready:
        return JoinResult(
            ready=False,
            error=strategy_error,
            metadata=metadata
        )

    # 3. Merge analysis cores
    merged_cores = _merge_analysis_cores(paths, skip_handling)

    # 4. Combine signals
    combined_signals = _combine_signals(paths)

    # 5. Special notes
    executed_count = metadata["executed_paths"]
    if executed_count == 0 and strategy == JoinStrategy.BEST_EFFORT:
        metadata["note"] = "No paths executed, but best_effort allows empty consolidation"

    logger.info(f"Join node {node.id}: Consolidated {executed_count}/{len(paths)} paths")
    return JoinResult(
        ready=True,
        consolidated_analysis_core=merged_cores,
        consolidated_signals=combined_signals,
        metadata=metadata
    )