mitai-jinkendo/backend/workflow_executor.py
Lars aeb0ee6ad9
All checks were successful
Deploy Development / deploy (push) Successful in 50s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 15s
debug: Add comprehensive placeholder resolution logging to workflow executor
- Log placeholder loading (count, sample keys)
- Log key cleaning process (before/after)
- Log sample values (name, age, geschlecht)
- Log template before/after resolution
- Log resolved and unresolved placeholders
- Add .strip() to key cleaning to handle spaces

This will help diagnose why {{ name }}, {{ age }}, {{ geschlecht }} are not resolving in inline templates.
Issue: Part 3 Inline Prompts - placeholder resolution debugging
2026-04-11 09:38:18 +02:00

1054 lines
40 KiB
Python

"""
Workflow Executor (Phase 4)
Führt Workflows mit conditional branching und path consolidation aus.
Phase 2: Sequential execution
Phase 3: Conditional branching, Logic Nodes, Fallback strategies
Phase 4: Join Nodes, Path consolidation
Konzept-Basis: konzept_workflow_engine_konsolidated.md
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Phase 2-4)
"""
from typing import Dict, Any, List, Optional, Set
from datetime import datetime
import uuid
import logging
import json
from jinja2 import Environment, ChainableUndefined, TemplateError
from workflow_models import (
WorkflowGraph, WorkflowNode, NodeExecutionState, ExecutionResult,
NodeStatus, NormalizedSignal, FallbackStrategy, SignalStatus,
EndNodeOutputMode
)
from workflow_engine import parse_workflow_graph, get_execution_order
from question_augmenter import (
augment_prompt_with_questions,
parse_question_augmentations_from_jsonb
)
from result_container_parser import parse_result_container
from normalization_engine import normalize_all_signals, normalize_decision_signal, load_question_catalog
from logic_evaluator import evaluate_logic_expression, resolve_signal_reference
from join_evaluator import evaluate_join_node as evaluate_join_node_core
from db import get_db, get_cursor
logger = logging.getLogger(__name__)
async def execute_workflow(
    workflow_id: Optional[str] = None,
    graph_data: Optional[Dict] = None,  # Phase 5: taken directly from ai_prompts.graph_data
    profile_id: str = None,
    variables: Dict[str, Any] = None,
    openrouter_call_func = None,  # LLM callback: async (prompt, model) -> str
    enable_debug: bool = False
) -> ExecutionResult:
    """
    Execute a workflow with conditional branching and path consolidation.

    The graph is walked breadth-first starting at the node of type "start".
    After a node has run, its successors are queued for every outgoing edge
    that is not marked inactive in ``context["active_edges"]`` (edges default
    to active; logic nodes switch them off).

    Errors raised while loading, parsing or executing are caught, logged and
    reported as an ExecutionResult with status "failed".

    Args:
        workflow_id: UUID of the workflow (legacy lookup in workflow_definitions).
        graph_data: Workflow graph as a dict; takes precedence over workflow_id.
        profile_id: UUID of the profile.
        variables: Placeholder values (e.g. {"name": "Lars", ...}).
        openrouter_call_func: async (prompt, model) -> str
        enable_debug: Debug flag, forwarded to execute_node.

    Returns:
        ExecutionResult containing every recorded node state.

    Example (Phase 5):
        >>> result = await execute_workflow(
        ...     graph_data={"nodes": [...], "edges": [...]},
        ...     profile_id="test-profile",
        ...     variables={"name": "Lars"},
        ...     openrouter_call_func=my_llm_func
        ... )
    """
    from collections import deque  # local import keeps this block self-contained

    execution_id = str(uuid.uuid4())
    started_at = datetime.utcnow().isoformat()
    # Defined before the try block so the failure path can always report the
    # node states collected so far (no locals() introspection needed).
    node_states: List[NodeExecutionState] = []
    logger.info(f"Starting workflow execution: {execution_id}")
    try:
        # 1. Load the workflow definition
        if graph_data:
            # Phase 5: graph handed in directly (already a dict)
            graph_dict = graph_data
            logger.debug(f"Using provided graph_data")
        elif workflow_id:
            # Phase 0-4: graph from the workflow_definitions table (legacy)
            with get_db() as conn:
                cur = get_cursor(conn)
                cur.execute(
                    "SELECT graph FROM workflow_definitions WHERE id = %s AND active = true",
                    (workflow_id,)
                )
                row = cur.fetchone()
                if not row:
                    raise ValueError(f"Workflow not found: {workflow_id}")
                graph_dict = row['graph']  # JSONB column; presumably delivered as a dict
            logger.debug(f"Loaded graph from workflow_definitions: {workflow_id}")
        else:
            raise ValueError("Entweder workflow_id oder graph_data muss übergeben werden")

        # 2. Parse the graph
        graph = parse_workflow_graph(graph_dict)
        logger.debug(f"Parsed graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges")

        # 3. Load the question catalog
        with get_db() as conn:
            catalog = load_question_catalog(conn)
        logger.debug(f"Loaded catalog: {len(catalog)} question types")

        # 4. Execute nodes with conditional branching
        context = {
            "variables": variables if variables is not None else {},
            "profile_id": profile_id,
            "node_results": {},  # node_id -> full NodeExecutionState
            "active_edges": {},  # edge_id -> bool, written by logic nodes
            "graph": graph       # the end node reads question texts from it
        }

        start_node = next((n for n in graph.nodes if n.type == "start"), None)
        if not start_node:
            raise ValueError("No start node found in workflow")

        # Index nodes once; setdefault keeps the first node for a duplicated id,
        # matching a first-match linear search.
        nodes_by_id: Dict[str, Any] = {}
        for candidate in graph.nodes:
            nodes_by_id.setdefault(candidate.id, candidate)

        visited: Set[str] = set()
        queue = deque([start_node.id])
        while queue:
            node_id = queue.popleft()
            if node_id in visited:
                continue
            visited.add(node_id)

            node = nodes_by_id.get(node_id)
            if node is None:
                # An edge points at a node that does not exist in the graph.
                raise ValueError(f"Edge references unknown node: {node_id}")

            # A node only runs when at least one incoming edge is active
            if not _has_active_incoming_edge(node, graph, context):
                logger.info(f"Skipping node {node_id}: no active incoming edges")
                node_states.append(NodeExecutionState(
                    node_id=node_id,
                    status=NodeStatus.SKIPPED,
                    error="no_active_incoming_edges",
                    started_at=datetime.utcnow().isoformat(),
                    completed_at=datetime.utcnow().isoformat()
                ))
                continue

            logger.info(f"Executing node: {node_id} (type: {node.type})")
            node_state = await execute_node(
                node=node,
                context=context,
                catalog=catalog,
                graph=graph,
                openrouter_call_func=openrouter_call_func,
                enable_debug=enable_debug
            )
            node_states.append(node_state)
            context["node_results"][node_id] = node_state

            # Queue successors reachable through edges that are still active
            for edge in graph.edges:
                if edge.from_node == node_id and context["active_edges"].get(edge.id, True):
                    queue.append(edge.to_node)

        # 5. Aggregate results
        aggregated = aggregate_results(node_states, graph)

        # 6. Persist the execution state
        completed_at = datetime.utcnow().isoformat()
        save_execution_state(
            execution_id=execution_id,
            workflow_id=workflow_id,
            profile_id=profile_id,
            node_states=node_states,
            status="completed",
            started_at=started_at,
            completed_at=completed_at
        )
        logger.info(f"Workflow execution completed: {execution_id}")
        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id or "N/A",  # placeholder when graph_data is used directly
            status="completed",
            node_states=node_states,
            aggregated_result=aggregated,
            started_at=started_at,
            completed_at=completed_at
        )
    except Exception as e:
        logger.error(f"Workflow execution failed: {e}", exc_info=True)
        completed_at = datetime.utcnow().isoformat()
        # Persisting the failure is best effort: if the store itself is the
        # problem, the caller should still receive the failed result.
        try:
            save_execution_state(
                execution_id=execution_id,
                workflow_id=workflow_id,  # may be None when graph_data is used
                profile_id=profile_id,
                node_states=node_states,
                status="failed",
                started_at=started_at,
                completed_at=completed_at,
                error=str(e)
            )
        except Exception as save_error:
            logger.error(
                f"Could not persist failed execution state ({execution_id}): {save_error}",
                exc_info=True
            )
        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id or "N/A",  # ExecutionResult requires a string
            status="failed",
            node_states=node_states,
            aggregated_result={},
            started_at=started_at,
            completed_at=completed_at,
            error=str(e)
        )
async def execute_node(
    node,
    context: Dict[str, Any],
    catalog: Dict[str, Dict],
    graph: WorkflowGraph,  # needed by logic and join nodes
    openrouter_call_func,
    enable_debug: bool = False
) -> NodeExecutionState:
    """
    Execute a single workflow node and return its execution state.

    Any exception raised while executing the node is caught, logged and
    turned into a NodeExecutionState with status FAILED.

    Args:
        node: WorkflowNode (from graph.nodes).
        context: Execution context (variables, profile_id, node_results, active_edges).
        catalog: Question catalog keyed by question type.
        graph: WorkflowGraph (passed on to logic and join nodes).
        openrouter_call_func: LLM callback: async (prompt, model) -> str.
        enable_debug: Debug flag (not read in this function).

    Returns:
        NodeExecutionState

    Node types:
        - start: no-op
        - end: delegates to execute_end_node
        - logic: delegates to execute_logic_node
        - join: delegates to execute_join_node
        - analysis: load prompt -> augment -> LLM -> parse -> normalize
    """
    started_at = datetime.utcnow().isoformat()
    try:
        # Start node: nothing to do
        if node.type == "start":
            logger.debug(f"Node {node.id}: No-op (start)")
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        if node.type == "end":
            return execute_end_node(node, context)

        if node.type == "logic":
            return execute_logic_node(node, context, graph)

        if node.type == "join":
            return execute_join_node(node, context, graph)

        if node.type == "analysis":
            # Fail early with a readable message instead of
            # "'NoneType' object is not callable" at the LLM call.
            if openrouter_call_func is None:
                raise ValueError(
                    f"Node {node.id}: openrouter_call_func is required for analysis nodes"
                )

            # 1. Load the prompt (inline template or referenced prompt)
            prompt_template = await load_prompt_template(node, context)
            source_type = "inline" if node.inline_template else "reference"
            logger.debug(f"Node {node.id}: Loaded prompt from {source_type}")

            # 2. Parse question augmentations
            questions = []
            if node.question_augmentations:
                # The parser expects plain dicts; dump model objects first
                questions_jsonb = [
                    q.model_dump() if hasattr(q, 'model_dump') else q
                    for q in node.question_augmentations
                ]
                questions = parse_question_augmentations_from_jsonb(questions_jsonb)
            logger.debug(f"Node {node.id}: {len(questions)} question augmentations")

            # 3. Augment the prompt
            if questions:
                augmented_prompt = augment_prompt_with_questions(
                    base_prompt=prompt_template,
                    questions=questions
                )
            else:
                augmented_prompt = prompt_template

            # 4. LLM call
            logger.debug(f"Node {node.id}: Calling LLM")
            llm_response = await openrouter_call_func(
                augmented_prompt,
                "anthropic/claude-sonnet-4"  # default model
            )

            # 5. Parse the result container
            parsed = parse_result_container(llm_response)
            logger.debug(f"Node {node.id}: Parsed response (status: {parsed['parsing_status']})")

            # 6. Normalize signals.
            # decision_signals are keyed by question id (not by type), so build
            # an id -> {type, answer_spectrum} lookup from this node's questions.
            normalized_signals = []
            if parsed["decision_signals"]:
                id_catalog = {}
                for q in questions:
                    q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
                    id_catalog[q_dict['id']] = {
                        "type": q_dict['type'],  # kept to look up type-level rules
                        "answer_spectrum": q_dict['answer_spectrum'],
                        "normalization_rules": None  # node questions carry no synonyms
                    }

                for signal_id, signal_value in parsed["decision_signals"].items():
                    q_config = id_catalog.get(signal_id)
                    if q_config is None:
                        # Previously dropped without a trace; make it visible.
                        logger.warning(
                            f"Node {node.id}: Signal '{signal_id}' has no matching question, "
                            f"skipping normalization"
                        )
                        continue
                    # Normalization rules (if any) come from the type-based catalog
                    type_catalog_entry = catalog.get(q_config['type'], {})
                    normalized_signal = normalize_decision_signal(
                        question_type=signal_id,  # the id is used as question_type
                        raw_value=signal_value,
                        answer_spectrum=q_config['answer_spectrum'],
                        normalization_rules=type_catalog_entry.get('normalization_rules')
                    )
                    normalized_signals.append(normalized_signal)
                    logger.debug(
                        f"Node {node.id}: Normalized signal '{signal_id}' = '{signal_value}' → "
                        f"'{normalized_signal.normalized_value}' (status: {normalized_signal.status})"
                    )
                logger.info(f"Node {node.id}: Normalized {len(normalized_signals)} signals")

            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                analysis_core=parsed["analysis_core"],
                decision_signals=parsed["decision_signals"],
                normalized_signals=normalized_signals,
                reasoning_anchors=parsed.get("reasoning_anchors"),
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        # None of the known node types matched
        raise ValueError(f"Node type '{node.type}' not implemented yet (Phase 4+)")
    except Exception as e:
        logger.error(f"Node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )
def execute_logic_node(
    node,
    context: Dict[str, Any],
    graph: WorkflowGraph
) -> NodeExecutionState:
    """
    Run a logic node (Phase 3): evaluate its condition and switch edges.

    Steps:
        1. Evaluate node.condition.expression against the context.
        2. On success pick the "then" edges (truthy result) or the "else"
           edges (falsy result); on an evaluation error ask _apply_fallback.
        3. Record every outgoing edge of the node in context["active_edges"]
           as active (selected) or inactive (not selected).

    Args:
        node: WorkflowNode of type "logic".
        context: Execution context; its "active_edges" dict is updated in place.
        graph: WorkflowGraph.

    Returns:
        NodeExecutionState whose analysis_core is a JSON string holding
        evaluation_result, error and activated_edges. Any exception yields
        a state with status FAILED instead.
    """
    began_at = datetime.utcnow().isoformat()
    try:
        condition = node.condition
        if not condition:
            raise ValueError(f"Logic node {node.id} has no condition")

        result, error = evaluate_logic_expression(condition.expression, context)

        if not error:
            logger.info(f"Logic node {node.id}: evaluated to {result}")
            # Truthy result follows the "then" edges, anything else "else"
            branch_label = "then" if result else "else"
            activated_edges = _get_edges_by_label(node.id, branch_label, graph)
        else:
            logger.warning(f"Logic node {node.id}: evaluation error: {error}")
            activated_edges = _apply_fallback(node, graph, context, error)

        # Flag each outgoing edge according to the selection above
        edge_flags = context["active_edges"]
        for candidate in graph.edges:
            if candidate.from_node == node.id:
                edge_flags[candidate.id] = candidate.id in activated_edges
        logger.debug(f"Logic node {node.id}: activated edges: {activated_edges}")

        finished_at = datetime.utcnow().isoformat()
        summary = json.dumps({
            "evaluation_result": None if error else result,
            "error": error,
            "activated_edges": activated_edges
        })
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            started_at=began_at,
            completed_at=finished_at,
            analysis_core=summary
        )
    except Exception as e:
        logger.error(f"Logic node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=began_at,
            completed_at=datetime.utcnow().isoformat()
        )
def execute_join_node(
    node,
    context: Dict[str, Any],
    graph: WorkflowGraph
) -> NodeExecutionState:
    """
    Run a join node (Phase 4) and return the consolidated state.

    The join strategy itself is evaluated by join_evaluator
    (evaluate_join_node_core). This function only translates its result:

        - result not ready -> FAILED, carrying the evaluator's error and metadata
        - result ready     -> EXECUTED, with analysis_core set to the
          consolidated analysis core as pretty-printed JSON and
          normalized_signals set to the values of the consolidated signal dict

    Args:
        node: WorkflowNode of type "join".
        context: Execution context with node_results.
        graph: WorkflowGraph.

    Returns:
        NodeExecutionState; any exception yields a state with status FAILED.
    """
    began_at = datetime.utcnow().isoformat()
    try:
        logger.info(f"Executing join node: {node.id}")
        outcome = evaluate_join_node_core(node, graph, context)

        if outcome.ready:
            # Serialize the consolidated analysis core for storage in analysis_core
            merged_core = json.dumps(
                outcome.consolidated_analysis_core,
                ensure_ascii=False,
                indent=2
            )

            meta = outcome.metadata
            executed_count = meta.get("executed_paths", 0)
            total_count = meta.get("total_paths", 0)
            logger.info(
                f"Join node {node.id}: Consolidated {executed_count}/{total_count} paths"
            )

            # NodeExecutionState takes a list; the evaluator hands back a dict
            merged_signals = list(outcome.consolidated_signals.values())
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                analysis_core=merged_core,
                normalized_signals=merged_signals,
                metadata=meta,
                started_at=began_at,
                completed_at=datetime.utcnow().isoformat()
            )

        # Join strategy not satisfied
        logger.warning(f"Join node {node.id}: Not ready - {outcome.error}")
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=outcome.error,
            started_at=began_at,
            completed_at=datetime.utcnow().isoformat(),
            metadata=outcome.metadata
        )
    except Exception as e:
        logger.error(f"Join node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=began_at,
            completed_at=datetime.utcnow().isoformat()
        )
def _build_end_node_template_context(context: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Build the Jinja2 render context for an end node: one dict per node in node_results."""
    template_context: Dict[str, Dict[str, Any]] = {}
    graph = context.get("graph")  # WorkflowGraph object, may be absent
    for node_id, node_state in context.get("node_results", {}).items():
        node_context = {
            "analysis_core": node_state.analysis_core or "",
            "decision_signals": node_state.decision_signals or {},
            "reasoning_anchors": node_state.reasoning_anchors or "",
            "status": node_state.status.value if node_state.status else "unknown",
        }

        # Backward-compatible shortcut: {{node_id.relevanz}} in addition to
        # {{node_id.decision_signals.relevanz}}. setdefault keeps a signal that
        # happens to be called "status" or "analysis_core" from overwriting
        # the core fields above.
        if node_state.decision_signals:
            for signal_key, signal_value in node_state.decision_signals.items():
                node_context.setdefault(signal_key, signal_value)

        # Normalized signals as {{node_id.signal_ID}}.
        # question_type holds the question id here (not the type).
        if node_state.normalized_signals:
            for signal in node_state.normalized_signals:
                signal_dict = signal.model_dump() if hasattr(signal, 'model_dump') else signal
                q_id = signal_dict['question_type']
                signal_key = f"signal_{q_id}"
                signal_value = signal_dict['normalized_value'] or signal_dict['raw_value']
                node_context[signal_key] = signal_value
                logger.info(f"Mapped signal: {q_id} → {signal_key} = '{signal_value}'")

        # Question texts as {{node_id.question_ID}}
        if graph:
            workflow_node = next((n for n in graph.nodes if n.id == node_id), None)
            if workflow_node and workflow_node.question_augmentations:
                for q in workflow_node.question_augmentations:
                    q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
                    q_id = q_dict.get('id')
                    if q_id:
                        node_context[f"question_{q_id}"] = q_dict.get('question', '')

        template_context[node_id] = node_context
    return template_context


def execute_end_node(
    node,
    context: Dict[str, Any]
) -> NodeExecutionState:
    """
    Run an end node (Phase 4 - template engine) and produce the final output.

    Args:
        node: WorkflowNode of type "end".
        context: Execution context with node_results (and optionally graph).

    Returns:
        NodeExecutionState with the final output in analysis_core, or a state
        with status FAILED when rendering or anything else goes wrong.

    Output modes:
        - AUTO (default when node.output_mode is unset): concatenates the
          analysis_core of every EXECUTED node under a "## node_id" heading.
        - TEMPLATE: renders node.template with Jinja2.

    Template context (TEMPLATE mode):
        - {{node_id.analysis_core}}: analysis text of the node
        - {{node_id.decision_signals}}: dict of raw decision signals
        - {{node_id.decision_signals.key}} / {{node_id.key}}: one raw signal
        - {{node_id.signal_ID}}: normalized (or raw) value of a signal
        - {{node_id.question_ID}}: question text
        - {{node_id.status}}: node execution status
        - Conditional rendering: {% if node_id %}...{% endif %}
        - Default values: {{node_id|default("fallback")}}

    Example template:
        ```
        # Final Analysis
        ## Body Composition
        {{body_analysis.analysis_core}}
        {% if training_analysis %}
        ## Training Recommendation
        {{training_analysis.analysis_core}}
        {% endif %}
        ## Decision Factors
        - Relevance: {{body_analysis.decision_signals.relevanz}}
        - Priority: {{body_analysis.decision_signals.prioritaet}}
        ```
    """
    # Templates are authored as part of workflow definitions, so render them in
    # Jinja's sandbox rather than a plain Environment.
    from jinja2.sandbox import SandboxedEnvironment

    started_at = datetime.utcnow().isoformat()
    try:
        logger.info(f"Executing end node: {node.id}")
        # AUTO is the default for backward compatibility
        output_mode = node.output_mode or EndNodeOutputMode.AUTO

        if output_mode == EndNodeOutputMode.AUTO:
            logger.debug(f"End node {node.id}: Using AUTO output mode")
            combined_analysis = [
                f"## {node_id}\n{node_state.analysis_core}"
                for node_id, node_state in context.get("node_results", {}).items()
                if node_state.status == NodeStatus.EXECUTED and node_state.analysis_core
            ]
            final_output = "\n\n".join(combined_analysis) if combined_analysis else "[No analysis generated]"

        elif output_mode == EndNodeOutputMode.TEMPLATE:
            logger.debug(f"End node {node.id}: Using TEMPLATE output mode")
            if not node.template:
                raise ValueError(f"End node {node.id} has output_mode=TEMPLATE but no template defined")

            template_context = _build_end_node_template_context(context)
            logger.debug(f"End node {node.id}: Built template context for {len(template_context)} nodes")

            # Diagnostic dump of the keys available to the template
            for node_id, node_ctx in template_context.items():
                logger.info(f"End node template context[{node_id}]: {list(node_ctx.keys())}")
                signal_keys = [k for k in node_ctx.keys() if k.startswith('signal_')]
                question_keys = [k for k in node_ctx.keys() if k.startswith('question_')]
                if signal_keys:
                    logger.info(f" Signals: {signal_keys} → values: {[(k, node_ctx[k]) for k in signal_keys]}")
                if question_keys:
                    logger.info(f" Questions: {question_keys} → values: {[(k, node_ctx[k]) for k in question_keys]}")

            try:
                # ChainableUndefined keeps missing nested attributes renderable
                # so Jinja's default filter can handle them.
                jinja_env = SandboxedEnvironment(undefined=ChainableUndefined)
                jinja_template = jinja_env.from_string(node.template)
                final_output = jinja_template.render(template_context)
                logger.info(f"End node {node.id}: Template rendered successfully ({len(final_output)} chars)")
            except TemplateError as te:
                error_msg = f"Template rendering failed: {str(te)}"
                logger.error(f"End node {node.id}: {error_msg}")
                return NodeExecutionState(
                    node_id=node.id,
                    status=NodeStatus.FAILED,
                    error=error_msg,
                    started_at=started_at,
                    completed_at=datetime.utcnow().isoformat()
                )
        else:
            raise ValueError(f"Unknown output_mode: {output_mode}")

        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            analysis_core=final_output,
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )
    except Exception as e:
        logger.error(f"End node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )
def _apply_fallback(
    node,
    graph: WorkflowGraph,
    context: Dict[str, Any],
    error: str
) -> List[str]:
    """
    Choose which outgoing edges to activate after a failed condition evaluation.

    Strategy handling:
        - no fallback configured -> "else" edges
        - CONSERVATIVE_SKIP      -> no edges
        - DEFAULT_PATH           -> "else" edges
        - UNCERTAINTY_PATH       -> "uncertainty" edges, or "else" edges if none exist
        - DOCUMENT_ONLY          -> every outgoing edge of the node
        - anything else          -> "else" edges

    Args:
        node: WorkflowNode whose fallback configuration is consulted.
        graph: WorkflowGraph.
        context: Execution context (not read here).
        error: Evaluation error (not read here).

    Returns:
        List of edge ids to activate.
    """
    fallback = node.fallback
    if not fallback:
        # Conservative default: behave like DEFAULT_PATH
        logger.warning(f"Node {node.id}: No fallback configured, using DEFAULT_PATH")
        return _get_edges_by_label(node.id, "else", graph)

    strategy = fallback.strategy

    if strategy == FallbackStrategy.CONSERVATIVE_SKIP:
        logger.info(f"Node {node.id}: CONSERVATIVE_SKIP - deactivating all paths")
        return []

    if strategy == FallbackStrategy.DEFAULT_PATH:
        logger.info(f"Node {node.id}: DEFAULT_PATH - taking else path")
        return _get_edges_by_label(node.id, "else", graph)

    if strategy == FallbackStrategy.UNCERTAINTY_PATH:
        logger.info(f"Node {node.id}: UNCERTAINTY_PATH")
        dedicated = _get_edges_by_label(node.id, "uncertainty", graph)
        if not dedicated:
            # No explicit uncertainty edge: fall back to the else path
            logger.warning(f"Node {node.id}: No uncertainty path found, using else")
            return _get_edges_by_label(node.id, "else", graph)
        return dedicated

    if strategy == FallbackStrategy.DOCUMENT_ONLY:
        # Keep every path open, as if the node had no condition
        logger.info(f"Node {node.id}: DOCUMENT_ONLY - all paths remain active")
        return [edge.id for edge in graph.edges if edge.from_node == node.id]

    logger.warning(f"Node {node.id}: Unknown fallback strategy {strategy}, using DEFAULT_PATH")
    return _get_edges_by_label(node.id, "else", graph)
def _get_edges_by_label(node_id: str, label: str, graph: WorkflowGraph) -> List[str]:
    """
    Collect the ids of all edges leaving a node that carry a given label.

    Args:
        node_id: Id of the source node.
        label: Edge label to match (e.g. "then", "else", "uncertainty").
        graph: WorkflowGraph whose edges are scanned in order.

    Returns:
        Edge ids in graph order; empty when nothing matches.
    """
    edge_ids: List[str] = []
    for edge in graph.edges:
        if edge.from_node == node_id and edge.label == label:
            edge_ids.append(edge.id)
    return edge_ids
def _has_active_incoming_edge(node, graph: WorkflowGraph, context: Dict[str, Any]) -> bool:
    """
    Tell whether a node is reachable through at least one active edge.

    Args:
        node: WorkflowNode to check.
        graph: WorkflowGraph.
        context: Execution context; "active_edges" maps edge id -> bool.

    Returns:
        True for start nodes, and for any node with an incoming edge that is
        not marked inactive. Edges missing from "active_edges" count as
        active (Phase 2 compatibility). False when the node has no incoming
        edges or all of them are inactive.
    """
    # Start nodes have no incoming edges and always run
    if node.type == "start":
        return True

    edge_flags = context.get("active_edges", {})
    incoming = (edge for edge in graph.edges if edge.to_node == node.id)
    # any() over an empty sequence is False, covering unreachable nodes
    return any(edge_flags.get(edge.id, True) for edge in incoming)
async def load_prompt_template(node: WorkflowNode, context: Dict[str, Any]) -> str:
    """
    Load a node's prompt template and resolve its placeholders.

    Two sources are supported (Part 3: inline prompts):
        - Inline mode: node.inline_template is used directly.
        - Reference mode: node.prompt_slug is looked up in the ai_prompts table.
    Inline takes precedence when both are set.

    Placeholder values come from two places: the caller-supplied
    context["variables"] and the placeholder registry for the profile.
    Registry values win on key collisions, so caller variables only fill in
    keys the registry does not provide (or everything, if the registry fails
    to load).

    Args:
        node: WorkflowNode with prompt_slug OR inline_template.
        context: {"variables": {"name": "Lars", ...}, "profile_id": "..."}

    Returns:
        The template with placeholders resolved.

    Raises:
        HTTPException: 404 if the referenced prompt does not exist, 400 if it
            has no template text, if the node has neither prompt_slug nor
            inline_template, or if profile_id is missing from the context.

    Example:
        >>> node = WorkflowNode(id="n1", prompt_slug="pipeline_body")
        >>> template = await load_prompt_template(node, {"profile_id": "123"})
        >>> "{{name}}" not in template
        True
    """
    from placeholder_resolver import get_placeholder_example_values, get_placeholder_catalog
    from prompt_executor import resolve_placeholders
    from fastapi import HTTPException

    # Mode 1: inline template
    if node.inline_template:
        logger.debug(f"Node {node.id}: Using inline template ({len(node.inline_template)} chars)")
        template = node.inline_template
    # Mode 2: reference into ai_prompts
    elif node.prompt_slug:
        logger.debug(f"Node {node.id}: Loading prompt '{node.prompt_slug}' from DB")
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute(
                "SELECT template FROM ai_prompts WHERE slug = %s AND active = true",
                (node.prompt_slug,)
            )
            row = cur.fetchone()
            if not row:
                raise HTTPException(status_code=404, detail=f"Prompt not found: {node.prompt_slug}")
            template = row['template']
        if template is None:
            # A NULL template would otherwise fail later with a TypeError on slicing
            raise HTTPException(
                status_code=400,
                detail=f"Prompt '{node.prompt_slug}' has no template"
            )
    # Mode 3: neither inline nor reference
    else:
        raise HTTPException(
            status_code=400,
            detail=f"Node {node.id}: Either prompt_slug or inline_template required"
        )

    profile_id = context.get("profile_id")
    if not profile_id:
        raise HTTPException(status_code=400, detail="profile_id required in context")

    # Start from the caller-supplied variables (previously ignored), then let
    # the registry values override them.
    variables: Dict[str, Any] = dict(context.get("variables") or {})
    try:
        processed_placeholders = get_placeholder_example_values(profile_id)
        logger.info(f"🔍 DEBUG: Loaded {len(processed_placeholders)} placeholders from registry")
        logger.info(f"🔍 DEBUG: Sample keys (first 3): {list(processed_placeholders.keys())[:3]}")
        # Registry keys arrive wrapped in {{ }}; strip the braces and any padding
        cleaned_placeholders = {
            key.replace('{{', '').replace('}}', '').strip(): value
            for key, value in processed_placeholders.items()
        }
        logger.info(f"🔍 DEBUG: Cleaned keys (first 3): {list(cleaned_placeholders.keys())[:3]}")
        logger.info(f"🔍 DEBUG: Sample values: name={cleaned_placeholders.get('name')}, age={cleaned_placeholders.get('age')}, geschlecht={cleaned_placeholders.get('geschlecht')}")
        variables.update(cleaned_placeholders)
    except Exception as e:
        # Best effort: resolution continues with whatever variables are available
        logger.error(f"❌ CRITICAL: Failed to load placeholders for workflow: {e}", exc_info=True)

    # The catalog is handed to resolve_placeholders (per the original note it
    # backs the |d modifier); resolution proceeds without it on failure.
    try:
        catalog = get_placeholder_catalog(profile_id)
    except Exception as e:
        catalog = None
        logger.warning(f"Failed to load placeholder catalog for workflow: {e}")

    logger.info(f"🔍 DEBUG: Template before resolution:\n{template[:200]}...")
    logger.info(f"🔍 DEBUG: Variables dict has {len(variables)} entries")

    debug_info = {}
    resolved = resolve_placeholders(
        template=template,
        variables=variables,
        debug_info=debug_info,
        catalog=catalog
    )
    logger.info(f"🔍 DEBUG: Resolved placeholders: {debug_info.get('resolved_placeholders', {})}")
    logger.info(f"🔍 DEBUG: Unresolved placeholders: {debug_info.get('unresolved_placeholders', [])}")
    logger.info(f"🔍 DEBUG: Template after resolution:\n{resolved[:200]}...")
    return resolved
def aggregate_results(node_states: List[NodeExecutionState], graph) -> Dict[str, Any]:
    """
    Aggregate the results of all node states into one summary dict.

    Phase 4: if an end node executed and produced output, that output alone
    becomes "analysis_core". Otherwise (backward compatible) the analysis_core
    values of all executed non-end nodes are joined under "## node_id" headings.

    Args:
        node_states: All NodeExecutionState objects of the run.
        graph: WorkflowGraph object (used to identify end nodes).

    Returns:
        {
            "analysis_core": "Final output (from end node or combined)",
            "combined_analysis": "Combined output of non-end nodes",
            "all_signals": [{question_type, normalized_value, status}, ...],
            "total_nodes": 3,
            "executed_nodes": 3,
            "failed_nodes": 0,
            "skipped_nodes": 0
        }

    Example:
        >>> states = [
        ...     NodeExecutionState(node_id="n1", status=NodeStatus.EXECUTED, analysis_core="Test 1"),
        ...     NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2"),
        ...     NodeExecutionState(node_id="end", status=NodeStatus.EXECUTED, analysis_core="Final")
        ... ]
        >>> result = aggregate_results(states, graph)
        >>> result["analysis_core"]
        "Final"
    """
    end_node_ids = {n.id for n in graph.nodes if n.type == "end"}
    combined_analysis = []
    all_signals = []
    final_output = None

    for state in node_states:
        if state.status == NodeStatus.EXECUTED and state.analysis_core:
            if state.node_id in end_node_ids:
                # End node output is the final output; keep it out of the combined text
                final_output = state.analysis_core
            else:
                combined_analysis.append(f"## {state.node_id}\n{state.analysis_core}")
        if state.normalized_signals:
            # Signals may be model objects or already plain dicts; accept both,
            # the same way execute_end_node does.
            all_signals.extend(
                s.model_dump() if hasattr(s, 'model_dump') else s
                for s in state.normalized_signals
            )

    combined_text = "\n\n".join(combined_analysis)
    # An end node's output replaces the combined text when present
    primary_output = final_output if final_output else combined_text

    return {
        "analysis_core": primary_output,
        "combined_analysis": combined_text,  # kept for debugging (excludes end node)
        "all_signals": all_signals,
        "total_nodes": len(node_states),
        "executed_nodes": sum(1 for s in node_states if s.status == NodeStatus.EXECUTED),
        "failed_nodes": sum(1 for s in node_states if s.status == NodeStatus.FAILED),
        "skipped_nodes": sum(1 for s in node_states if s.status == NodeStatus.SKIPPED)  # Phase 3
    }
def save_execution_state(
    execution_id: str,
    workflow_id: Optional[str],  # None when graph_data is used directly (Phase 5)
    profile_id: str,
    node_states: List[NodeExecutionState],
    status: str,
    started_at: str,
    completed_at: Optional[str] = None,
    error: Optional[str] = None
):
    """
    Upsert one execution record into workflow_executions.

    A new row is inserted for the execution id; if the id already exists, its
    status, node_states, execution_log and completed_at are overwritten.

    Args:
        execution_id: UUID of the execution.
        workflow_id: UUID of the workflow (None when graph_data was used directly).
        profile_id: UUID of the profile.
        node_states: All NodeExecutionState objects; stored as a JSON array.
        status: 'completed' | 'failed' | 'partial'
        started_at: ISO timestamp.
        completed_at: ISO timestamp (optional).
        error: Error message (optional); stored as {"error": ...} in execution_log.

    Example:
        >>> save_execution_state(
        ...     execution_id="exec-123",
        ...     workflow_id="wf-456",
        ...     profile_id="prof-789",
        ...     node_states=[],
        ...     status="completed",
        ...     started_at="2026-04-03T12:00:00"
        ... )
    """
    upsert_sql = """
        INSERT INTO workflow_executions
        (id, workflow_id, profile_id, status, node_states, execution_log, started_at, completed_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
        status = EXCLUDED.status,
        node_states = EXCLUDED.node_states,
        execution_log = EXCLUDED.execution_log,
        completed_at = EXCLUDED.completed_at
    """
    # Dump the state objects to plain dicts before touching the database
    dumped_states = [state.model_dump() for state in node_states]

    with get_db() as conn:
        cur = get_cursor(conn)
        log_payload = {"error": error} if error else {}
        row_values = (
            execution_id,
            workflow_id,
            profile_id,
            status,
            json.dumps(dumped_states),
            json.dumps(log_payload),
            started_at,
            completed_at
        )
        cur.execute(upsert_sql, row_values)
        conn.commit()
    logger.info(f"Saved execution state: {execution_id} (status: {status})")