mitai-jinkendo/backend/workflow_executor.py
Lars 857c55aeb8
All checks were successful
Deploy Development / deploy (push) Successful in 51s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
fix: Workflow placeholder resolution + complete catalog display
Backend workflow_executor.py:
- load_prompt_template() now uses modern resolve_placeholders() from prompt_executor
- Calls get_placeholder_example_values() to populate ALL registered placeholders
- Passes catalog for |d modifier support
- Fixes issue where basis prompts had empty/null placeholder values in workflows

Backend placeholder_resolver.py:
- get_placeholder_catalog() now includes ALL placeholders from PLACEHOLDER_MAP
- Uncategorized placeholders added to "Sonstiges" category
- Fixes discrepancy: 111 total placeholders but only ~30 shown in picker

Root Cause:
- Workflow used old resolve_placeholders() (only PLACEHOLDER_MAP, no variables)
- Isolated execution used modern resolve_placeholders() (full variables dict)
- Catalog excluded non-registry placeholders from PLACEHOLDER_MAP

Impact:
- All placeholders now resolve correctly in workflow execution
- PlaceholderPicker shows all 111+ placeholders (not just registry ones)

Version: 0.9p (workflow module)
Part 3: End Node Template Engine - Bug Fixes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-09 18:10:04 +02:00

926 lines
33 KiB
Python

"""
Workflow Executor (Phase 4)
Führt Workflows mit conditional branching und path consolidation aus.
Phase 2: Sequential execution
Phase 3: Conditional branching, Logic Nodes, Fallback strategies
Phase 4: Join Nodes, Path consolidation
Konzept-Basis: konzept_workflow_engine_konsolidated.md
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Phase 2-4)
"""
from typing import Dict, Any, List, Optional, Set
from datetime import datetime
import uuid
import logging
import json
from jinja2 import Template, TemplateError
from workflow_models import (
WorkflowGraph, NodeExecutionState, ExecutionResult,
NodeStatus, NormalizedSignal, FallbackStrategy, SignalStatus,
EndNodeOutputMode
)
from workflow_engine import parse_workflow_graph, get_execution_order
from question_augmenter import (
augment_prompt_with_questions,
parse_question_augmentations_from_jsonb
)
from result_container_parser import parse_result_container
from normalization_engine import normalize_all_signals, load_question_catalog
from logic_evaluator import evaluate_logic_expression, resolve_signal_reference
from join_evaluator import evaluate_join_node as evaluate_join_node_core
from db import get_db, get_cursor
logger = logging.getLogger(__name__)
async def execute_workflow(
workflow_id: Optional[str] = None,
graph_data: Optional[Dict] = None, # Phase 5: Direkt von ai_prompts.graph_data
profile_id: str = None,
variables: Dict[str, Any] = None,
openrouter_call_func = None, # Callback für LLM-Calls: async (prompt, model) -> str
enable_debug: bool = False
) -> ExecutionResult:
"""
Führt einen Workflow aus (mit conditional branching und path consolidation).
Phase 2: Linear execution in topological order.
Phase 3: Conditional branching basierend auf logic nodes.
Phase 4: Join nodes und path consolidation.
Phase 5: Unterstützt graph_data direkt (aus ai_prompts, nicht workflow_definitions)
Args:
workflow_id: UUID des Workflows (legacy, für workflow_definitions Tabelle)
graph_data: Workflow-Graph als Dict (NEU, für ai_prompts.graph_data)
profile_id: UUID des Profils
variables: Platzhalter-Werte (z.B. {"name": "Lars", ...})
openrouter_call_func: async (prompt, model) -> str
enable_debug: Debug-Modus
Returns:
ExecutionResult mit allen node_states
Beispiel (Phase 5):
>>> result = await execute_workflow(
... graph_data={"nodes": [...], "edges": [...]},
... profile_id="test-profile",
... variables={"name": "Lars"},
... openrouter_call_func=my_llm_func
... )
"""
execution_id = str(uuid.uuid4())
started_at = datetime.utcnow().isoformat()
logger.info(f"Starting workflow execution: {execution_id}")
try:
# 1. Lade Workflow-Definition
if graph_data:
# Phase 5: Graph direkt aus ai_prompts.graph_data (already a dict)
graph_dict = graph_data
logger.debug(f"Using provided graph_data")
elif workflow_id:
# Phase 0-4: Graph aus workflow_definitions Tabelle (legacy)
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT graph FROM workflow_definitions WHERE id = %s AND active = true",
(workflow_id,)
)
row = cur.fetchone()
if not row:
raise ValueError(f"Workflow not found: {workflow_id}")
graph_dict = row['graph'] # PostgreSQL JSONB returns dict
logger.debug(f"Loaded graph from workflow_definitions: {workflow_id}")
else:
raise ValueError("Entweder workflow_id oder graph_data muss übergeben werden")
# 2. Parse Graph
graph = parse_workflow_graph(graph_dict)
logger.debug(f"Parsed graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges")
# 4. Lade Question Catalog
with get_db() as conn:
catalog = load_question_catalog(conn)
logger.debug(f"Loaded catalog: {len(catalog)} question types")
# 5. Execute Nodes mit conditional branching (Phase 3)
node_states: List[NodeExecutionState] = []
context = {
"variables": variables,
"profile_id": profile_id,
"node_results": {}, # Phase 3: Store full NodeExecutionState
"active_edges": {} # Phase 3: Track edge activation
}
# Phase 3: BFS traversal mit edge activation
start_node = next((n for n in graph.nodes if n.type == "start"), None)
if not start_node:
raise ValueError("No start node found in workflow")
visited: Set[str] = set()
queue = [start_node.id]
while queue:
node_id = queue.pop(0)
if node_id in visited:
continue
visited.add(node_id)
node = next(n for n in graph.nodes if n.id == node_id)
# Prüfe ob Node aktiv ist (mindestens eine incoming edge aktiv)
if not _has_active_incoming_edge(node, graph, context):
logger.info(f"Skipping node {node_id}: no active incoming edges")
node_states.append(NodeExecutionState(
node_id=node_id,
status=NodeStatus.SKIPPED,
error="no_active_incoming_edges",
started_at=datetime.utcnow().isoformat(),
completed_at=datetime.utcnow().isoformat()
))
continue
logger.info(f"Executing node: {node_id} (type: {node.type})")
node_state = await execute_node(
node=node,
context=context,
catalog=catalog,
graph=graph, # Phase 3: Needed for logic nodes
openrouter_call_func=openrouter_call_func,
enable_debug=enable_debug
)
node_states.append(node_state)
context["node_results"][node_id] = node_state
# Füge Nachfolger zur Queue hinzu
outgoing_edges = [e for e in graph.edges if e.from_node == node_id]
for edge in outgoing_edges:
# Prüfe ob Edge aktiv ist (default: True, Logic Nodes setzen auf False)
if context["active_edges"].get(edge.id, True):
queue.append(edge.to_node)
# 6. Aggregiere Ergebnisse
aggregated = aggregate_results(node_states)
# 7. Speichere Execution State
completed_at = datetime.utcnow().isoformat()
save_execution_state(
execution_id=execution_id,
workflow_id=workflow_id,
profile_id=profile_id,
node_states=node_states,
status="completed",
started_at=started_at,
completed_at=completed_at
)
logger.info(f"Workflow execution completed: {execution_id}")
return ExecutionResult(
execution_id=execution_id,
workflow_id=workflow_id or "N/A", # Placeholder when graph_data is used directly
status="completed",
node_states=node_states,
aggregated_result=aggregated,
started_at=started_at,
completed_at=completed_at
)
except Exception as e:
logger.error(f"Workflow execution failed: {e}", exc_info=True)
# Speichere Failed State
completed_at = datetime.utcnow().isoformat()
save_execution_state(
execution_id=execution_id,
workflow_id=workflow_id, # Can be None when graph_data is used
profile_id=profile_id,
node_states=node_states if 'node_states' in locals() else [],
status="failed",
started_at=started_at,
completed_at=completed_at,
error=str(e)
)
return ExecutionResult(
execution_id=execution_id,
workflow_id=workflow_id or "N/A", # ExecutionResult requires string, use placeholder
status="failed",
node_states=node_states if 'node_states' in locals() else [],
aggregated_result={},
started_at=started_at,
completed_at=completed_at,
error=str(e)
)
async def execute_node(
node,
context: Dict[str, Any],
catalog: Dict[str, Dict],
graph: WorkflowGraph, # Phase 3: Needed for logic nodes
openrouter_call_func,
enable_debug: bool = False
) -> NodeExecutionState:
"""
Führt einen einzelnen Knoten aus.
Args:
node: WorkflowNode (aus graph.nodes)
context: Execution context (variables, profile_id, node_results, active_edges)
catalog: Question catalog
graph: WorkflowGraph (für Logic Nodes)
openrouter_call_func: LLM callback: async (prompt, model) -> str
enable_debug: Debug mode
Returns:
NodeExecutionState
Node Types:
- start/end: No-op
- analysis: Load prompt → augment → LLM → parse → normalize
- logic: Evaluate condition → activate/deactivate edges (Phase 3)
- join: Consolidate paths → merge results (Phase 4)
"""
started_at = datetime.utcnow().isoformat()
try:
# Start Node: No-Op
if node.type == "start":
logger.debug(f"Node {node.id}: No-op (start)")
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.EXECUTED,
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
# End Node: Output Generation (Phase 4)
if node.type == "end":
return execute_end_node(node, context)
# Logic Nodes (Phase 3)
if node.type == "logic":
return execute_logic_node(node, context, graph)
# Join Nodes (Phase 4)
if node.type == "join":
return execute_join_node(node, context, graph)
# Analysis Nodes
if node.type == "analysis":
# 1. Lade Prompt
prompt_template = await load_prompt_template(node.prompt_slug, context)
logger.debug(f"Node {node.id}: Loaded prompt '{node.prompt_slug}'")
# 2. Parse question_augmentations
questions = []
if node.question_augmentations:
# Convert list of dicts to JSONB-like format for parser
questions_jsonb = [q.model_dump() if hasattr(q, 'model_dump') else q for q in node.question_augmentations]
questions = parse_question_augmentations_from_jsonb(questions_jsonb)
logger.debug(f"Node {node.id}: {len(questions)} question augmentations")
# 3. Augment Prompt
if questions:
augmented_prompt = augment_prompt_with_questions(
base_prompt=prompt_template,
questions=questions
)
else:
augmented_prompt = prompt_template
# 4. LLM Call
logger.debug(f"Node {node.id}: Calling LLM")
llm_response = await openrouter_call_func(
augmented_prompt,
"anthropic/claude-sonnet-4" # Default model
)
# 5. Parse Result Container
parsed = parse_result_container(llm_response)
logger.debug(f"Node {node.id}: Parsed response (status: {parsed['parsing_status']})")
# 6. Normalize Signals
normalized_signals = []
if parsed["decision_signals"]:
# Hybrid Model: Node-spezifische Questions überschreiben Catalog
node_catalog = catalog.copy()
if node.question_augmentations:
for q in node.question_augmentations:
q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
node_catalog[q_dict['type']] = {
"answer_spectrum": q_dict['answer_spectrum'],
"normalization_rules": None # Node-Questions haben keine Synonyme
}
logger.debug(f"Node {node.id}: Override catalog for '{q_dict['type']}' with node-specific spectrum")
normalized_signals = normalize_all_signals(
decision_signals=parsed["decision_signals"],
catalog_dict=node_catalog
)
logger.info(f"Node {node.id}: Normalized {len(normalized_signals)} signals")
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.EXECUTED,
analysis_core=parsed["analysis_core"],
decision_signals=parsed["decision_signals"],
normalized_signals=normalized_signals,
reasoning_anchors=parsed.get("reasoning_anchors"),
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
# Unbekannter Node-Typ (Phase 4: join)
raise ValueError(f"Node type '{node.type}' not implemented yet (Phase 4+)")
except Exception as e:
logger.error(f"Node execution failed ({node.id}): {e}", exc_info=True)
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.FAILED,
error=str(e),
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
def execute_logic_node(
node,
context: Dict[str, Any],
graph: WorkflowGraph
) -> NodeExecutionState:
"""
Führt Logic Node aus (Phase 3).
Args:
node: WorkflowNode vom Typ "logic"
context: Execution context mit node_results
graph: WorkflowGraph
Returns:
NodeExecutionState mit evaluation_result in metadata
Logic:
1. Evaluiere node.condition.expression
2. Wähle then_path oder else_path basierend auf Ergebnis
3. Bei Fehler/Unsicherheit: Wende fallback an
4. Markiere gewählte Edge(s) als aktiv, andere als inaktiv
5. Return NodeExecutionState mit evaluation_result
"""
started_at = datetime.utcnow().isoformat()
try:
if not node.condition:
raise ValueError(f"Logic node {node.id} has no condition")
# 1. Evaluiere Bedingung
result, error = evaluate_logic_expression(node.condition.expression, context)
if error:
# Fehler bei Evaluation → Fallback anwenden
logger.warning(f"Logic node {node.id}: evaluation error: {error}")
activated_edges = _apply_fallback(node, graph, context, error)
else:
# Erfolgreiche Evaluation
logger.info(f"Logic node {node.id}: evaluated to {result}")
# 2. Wähle Pfad basierend auf Ergebnis
if result:
# Condition TRUE → then_path aktivieren
activated_edges = _get_edges_by_label(node.id, "then", graph)
else:
# Condition FALSE → else_path aktivieren
activated_edges = _get_edges_by_label(node.id, "else", graph)
# 3. Markiere Edges als aktiv/inaktiv
outgoing_edges = [e for e in graph.edges if e.from_node == node.id]
for edge in outgoing_edges:
context["active_edges"][edge.id] = edge.id in activated_edges
logger.debug(f"Logic node {node.id}: activated edges: {activated_edges}")
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.EXECUTED,
started_at=started_at,
completed_at=datetime.utcnow().isoformat(),
analysis_core=json.dumps({
"evaluation_result": result if not error else None,
"error": error,
"activated_edges": activated_edges
})
)
except Exception as e:
logger.error(f"Logic node execution failed ({node.id}): {e}", exc_info=True)
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.FAILED,
error=str(e),
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
def execute_join_node(
node,
context: Dict[str, Any],
graph: WorkflowGraph
) -> NodeExecutionState:
"""
Führt Join Node aus (Phase 4).
Args:
node: WorkflowNode vom Typ "join"
context: Execution context mit node_results
graph: WorkflowGraph
Returns:
NodeExecutionState mit konsolidierten Daten
Logic:
1. Evaluiere Join-Strategie (via join_evaluator)
2. Prüfe ob ready (alle erforderlichen Pfade verfügbar?)
3. Konsolidiere Ergebnisse (merge analysis_core, combine signals)
4. Return NodeExecutionState mit konsolidierten Daten
Error Handling:
- wait_all + fehlende Pfade → FAILED
- wait_any + keine Pfade → FAILED
- best_effort + keine Pfade → EXECUTED (mit metadata)
"""
started_at = datetime.utcnow().isoformat()
try:
logger.info(f"Executing join node: {node.id}")
# 1. Evaluiere Join-Node
join_result = evaluate_join_node_core(node, graph, context)
# 2. Prüfe ob ready
if not join_result.ready:
# Strategie nicht erfüllt → FAILED
logger.warning(f"Join node {node.id}: Not ready - {join_result.error}")
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.FAILED,
error=join_result.error,
started_at=started_at,
completed_at=datetime.utcnow().isoformat(),
metadata=join_result.metadata
)
# 3. Baue konsolidierten analysis_core
# Format: JSON mit node_id → analysis_core mapping
consolidated_core_json = json.dumps(
join_result.consolidated_analysis_core,
ensure_ascii=False,
indent=2
)
# 4. Log Konsolidierung
executed_count = join_result.metadata.get("executed_paths", 0)
total_count = join_result.metadata.get("total_paths", 0)
logger.info(
f"Join node {node.id}: Consolidated {executed_count}/{total_count} paths"
)
# 5. Convert consolidated_signals Dict → List[NormalizedSignal]
# (NodeExecutionState expects List, but join_evaluator returns Dict)
consolidated_signals_list = list(join_result.consolidated_signals.values())
# 6. Return NodeExecutionState
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.EXECUTED,
analysis_core=consolidated_core_json,
normalized_signals=consolidated_signals_list,
metadata=join_result.metadata,
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
except Exception as e:
logger.error(f"Join node execution failed ({node.id}): {e}", exc_info=True)
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.FAILED,
error=str(e),
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
def execute_end_node(
node,
context: Dict[str, Any]
) -> NodeExecutionState:
"""
Führt End Node aus (Phase 4 - Template Engine).
Args:
node: WorkflowNode vom Typ "end"
context: Execution context mit node_results
Returns:
NodeExecutionState mit finaler Ausgabe in analysis_core
Output Modes:
- AUTO: Concatenates all analysis_core values (backward compatible)
- TEMPLATE: Renders Jinja2 template with {{node_id.property}} placeholders
Template Context:
- {{node_id.analysis_core}}: Analysis text from node
- {{node_id.decision_signals}}: Dict of raw decision signals
- {{node_id.decision_signals.key}}: Specific signal value
- {{node_id.status}}: Node execution status
- Conditional rendering: {% if node_id %}...{% endif %}
- Default values: {{node_id|default("fallback")}}
Example Template:
```
# Final Analysis
## Body Composition
{{body_analysis.analysis_core}}
{% if training_analysis %}
## Training Recommendation
{{training_analysis.analysis_core}}
{% endif %}
## Decision Factors
- Relevance: {{body_analysis.decision_signals.relevanz}}
- Priority: {{body_analysis.decision_signals.prioritaet}}
```
"""
started_at = datetime.utcnow().isoformat()
try:
logger.info(f"Executing end node: {node.id}")
# Determine output mode (default: AUTO for backward compatibility)
output_mode = node.output_mode or EndNodeOutputMode.AUTO
if output_mode == EndNodeOutputMode.AUTO:
# AUTO mode: Concatenate all analysis_core values
logger.debug(f"End node {node.id}: Using AUTO output mode")
combined_analysis = []
for node_id, node_state in context.get("node_results", {}).items():
if node_state.status == NodeStatus.EXECUTED and node_state.analysis_core:
combined_analysis.append(f"## {node_id}\n{node_state.analysis_core}")
final_output = "\n\n".join(combined_analysis) if combined_analysis else "[No analysis generated]"
elif output_mode == EndNodeOutputMode.TEMPLATE:
# TEMPLATE mode: Render Jinja2 template
logger.debug(f"End node {node.id}: Using TEMPLATE output mode")
if not node.template:
raise ValueError(f"End node {node.id} has output_mode=TEMPLATE but no template defined")
# Build template context: {{node_id}} → {analysis_core, decision_signals, status}
template_context = {}
for node_id, node_state in context.get("node_results", {}).items():
template_context[node_id] = {
"analysis_core": node_state.analysis_core or "",
"decision_signals": node_state.decision_signals or {},
"reasoning_anchors": node_state.reasoning_anchors or "",
"status": node_state.status.value if node_state.status else "unknown",
# Add individual signal access: {{node_id.signal_name}}
**node_state.decision_signals # Flatten signals into node context
}
logger.debug(f"End node {node.id}: Built template context for {len(template_context)} nodes")
# Render template
try:
jinja_template = Template(node.template)
final_output = jinja_template.render(template_context)
logger.info(f"End node {node.id}: Template rendered successfully ({len(final_output)} chars)")
except TemplateError as te:
error_msg = f"Template rendering failed: {str(te)}"
logger.error(f"End node {node.id}: {error_msg}")
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.FAILED,
error=error_msg,
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
else:
raise ValueError(f"Unknown output_mode: {output_mode}")
# Return NodeExecutionState with final output
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.EXECUTED,
analysis_core=final_output,
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
except Exception as e:
logger.error(f"End node execution failed ({node.id}): {e}", exc_info=True)
return NodeExecutionState(
node_id=node.id,
status=NodeStatus.FAILED,
error=str(e),
started_at=started_at,
completed_at=datetime.utcnow().isoformat()
)
def _apply_fallback(
node,
graph: WorkflowGraph,
context: Dict[str, Any],
error: str
) -> List[str]:
"""
Wendet Fallback-Strategie an.
Args:
node: WorkflowNode
graph: WorkflowGraph
context: Execution context
error: Fehler bei Evaluation
Returns:
Liste von Edge-IDs die aktiviert werden sollen
"""
if not node.fallback:
# Default: Konservativ (else-Pfad nehmen)
logger.warning(f"Node {node.id}: No fallback configured, using DEFAULT_PATH")
return _get_edges_by_label(node.id, "else", graph)
strategy = node.fallback.strategy
if strategy == FallbackStrategy.CONSERVATIVE_SKIP:
# Skip alle outgoing edges
logger.info(f"Node {node.id}: CONSERVATIVE_SKIP - deactivating all paths")
return []
elif strategy == FallbackStrategy.DEFAULT_PATH:
# Nimm else-Pfad
logger.info(f"Node {node.id}: DEFAULT_PATH - taking else path")
return _get_edges_by_label(node.id, "else", graph)
elif strategy == FallbackStrategy.UNCERTAINTY_PATH:
# Expliziter Unsicherheits-Pfad (falls vorhanden)
logger.info(f"Node {node.id}: UNCERTAINTY_PATH")
uncertainty_edges = _get_edges_by_label(node.id, "uncertainty", graph)
if uncertainty_edges:
return uncertainty_edges
else:
# Fallback to else
logger.warning(f"Node {node.id}: No uncertainty path found, using else")
return _get_edges_by_label(node.id, "else", graph)
elif strategy == FallbackStrategy.DOCUMENT_ONLY:
# Alle Pfade aktiv lassen (wie ohne Bedingung)
logger.info(f"Node {node.id}: DOCUMENT_ONLY - all paths remain active")
outgoing_edges = [e for e in graph.edges if e.from_node == node.id]
return [e.id for e in outgoing_edges]
else:
logger.warning(f"Node {node.id}: Unknown fallback strategy {strategy}, using DEFAULT_PATH")
return _get_edges_by_label(node.id, "else", graph)
def _get_edges_by_label(node_id: str, label: str, graph: WorkflowGraph) -> List[str]:
"""
Findet alle ausgehenden Edges mit bestimmtem Label.
Args:
node_id: Node-ID
label: Edge-Label (z.B. "then", "else", "uncertainty")
graph: WorkflowGraph
Returns:
Liste von Edge-IDs
"""
matching_edges = [
e.id for e in graph.edges
if e.from_node == node_id and e.label == label
]
return matching_edges
def _has_active_incoming_edge(node, graph: WorkflowGraph, context: Dict[str, Any]) -> bool:
"""
Prüft ob Node mindestens eine aktive incoming edge hat.
Args:
node: WorkflowNode
graph: WorkflowGraph
context: Execution context mit active_edges
Returns:
True wenn mindestens eine incoming edge aktiv ist
"""
# Start-Node hat keine incoming edges → immer aktiv
if node.type == "start":
return True
incoming_edges = [e for e in graph.edges if e.to_node == node.id]
# Keine incoming edges → nicht erreichbar
if not incoming_edges:
return False
# Prüfe ob mindestens eine aktiv ist
active_edges = context.get("active_edges", {})
for edge in incoming_edges:
if active_edges.get(edge.id, True): # Default: True (Phase 2 Kompatibilität)
return True
return False
async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str:
"""
Lädt Prompt-Template aus DB und resolved Platzhalter.
Args:
prompt_slug: Slug des Prompts (z.B. "pipeline_body")
context: {"variables": {"name": "Lars", ...}, "profile_id": "..."}
Returns:
Resolved prompt template
Beispiel:
>>> template = await load_prompt_template("pipeline_body", {"profile_id": "123"})
>>> "{{name}}" not in template
True
"""
from placeholder_resolver import get_placeholder_example_values, get_placeholder_catalog
from prompt_executor import resolve_placeholders
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT template FROM ai_prompts WHERE slug = %s AND active = true",
(prompt_slug,)
)
row = cur.fetchone()
if not row:
raise ValueError(f"Prompt not found: {prompt_slug}")
template = row['template']
# Resolve Placeholders using modern prompt_executor method
profile_id = context.get("profile_id")
# Build variables dict with ALL registered placeholders
variables = {}
try:
# Get all placeholder values from registry
processed_placeholders = get_placeholder_example_values(profile_id)
# Remove {{ }} from keys (placeholder_resolver returns them with wrappers)
cleaned_placeholders = {
key.replace('{{', '').replace('}}', ''): value
for key, value in processed_placeholders.items()
}
variables.update(cleaned_placeholders)
except Exception as e:
logger.warning(f"Failed to load placeholders for workflow: {e}")
# Load catalog for |d modifier support
try:
catalog = get_placeholder_catalog(profile_id)
except Exception as e:
catalog = None
logger.warning(f"Failed to load placeholder catalog for workflow: {e}")
# Resolve with modern executor
resolved = resolve_placeholders(
template=template,
variables=variables,
catalog=catalog
)
return resolved
def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]:
"""
Aggregiert Ergebnisse aller Knoten.
Args:
node_states: Liste aller NodeExecutionState
Returns:
{
"combined_analysis": "## node_1\n...\n\n## node_2\n...",
"all_signals": [{question_type, normalized_value, status}, ...],
"total_nodes": 3,
"executed_nodes": 3,
"failed_nodes": 0
}
Beispiel:
>>> states = [
... NodeExecutionState(node_id="n1", status=NodeStatus.EXECUTED, analysis_core="Test 1"),
... NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2")
... ]
>>> result = aggregate_results(states)
>>> "## n1" in result["combined_analysis"]
True
>>> result["executed_nodes"]
2
"""
combined_analysis = []
all_signals = []
for state in node_states:
if state.status == NodeStatus.EXECUTED and state.analysis_core:
combined_analysis.append(f"## {state.node_id}\n{state.analysis_core}")
if state.normalized_signals:
all_signals.extend([s.model_dump() for s in state.normalized_signals])
return {
"combined_analysis": "\n\n".join(combined_analysis),
"all_signals": all_signals,
"total_nodes": len(node_states),
"executed_nodes": sum(1 for s in node_states if s.status == NodeStatus.EXECUTED),
"failed_nodes": sum(1 for s in node_states if s.status == NodeStatus.FAILED),
"skipped_nodes": sum(1 for s in node_states if s.status == NodeStatus.SKIPPED) # Phase 3
}
def save_execution_state(
execution_id: str,
workflow_id: Optional[str], # None when using graph_data directly (Phase 5)
profile_id: str,
node_states: List[NodeExecutionState],
status: str,
started_at: str,
completed_at: Optional[str] = None,
error: Optional[str] = None
):
"""
Speichert Execution State in workflow_executions.
Args:
execution_id: UUID der Execution
workflow_id: UUID des Workflows (None wenn graph_data direkt verwendet wird)
profile_id: UUID des Profils
node_states: Liste aller NodeExecutionState
status: 'completed' | 'failed' | 'partial'
started_at: ISO timestamp
completed_at: ISO timestamp (optional)
error: Fehlermeldung (optional)
Beispiel:
>>> save_execution_state(
... execution_id="exec-123",
... workflow_id="wf-456",
... profile_id="prof-789",
... node_states=[],
... status="completed",
... started_at="2026-04-03T12:00:00"
... )
"""
# Serialize node_states to JSON
node_states_json = [s.model_dump() for s in node_states]
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
INSERT INTO workflow_executions
(id, workflow_id, profile_id, status, node_states, execution_log, started_at, completed_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
status = EXCLUDED.status,
node_states = EXCLUDED.node_states,
execution_log = EXCLUDED.execution_log,
completed_at = EXCLUDED.completed_at
""", (
execution_id,
workflow_id,
profile_id,
status,
json.dumps(node_states_json),
json.dumps({"error": error} if error else {}),
started_at,
completed_at
))
conn.commit()
logger.info(f"Saved execution state: {execution_id} (status: {status})")