437 lines
14 KiB
Python
437 lines
14 KiB
Python
"""
Workflow Executor (Phase 2)

Executes workflows sequentially (no branching/logic yet).

Concept basis: konzept_workflow_engine_konsolidated.md
Requirements analysis: anforderungsanalyse_umsetzungsplan.md (Phase 2)
"""
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime
|
|
import uuid
|
|
import logging
|
|
import json
|
|
|
|
from workflow_models import (
|
|
WorkflowGraph, NodeExecutionState, ExecutionResult,
|
|
NodeStatus, NormalizedSignal
|
|
)
|
|
from workflow_engine import parse_workflow_graph, get_execution_order
|
|
from question_augmenter import (
|
|
augment_prompt_with_questions,
|
|
parse_question_augmentations_from_jsonb
|
|
)
|
|
from result_container_parser import parse_result_container
|
|
from normalization_engine import normalize_all_signals, load_question_catalog
|
|
from db import get_db, get_cursor
|
|
|
|
# Module-level logger, named after this module's import name.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def execute_workflow(
    workflow_id: str,
    profile_id: str,
    variables: Dict[str, Any],
    openrouter_call_func,  # Callback for LLM calls: async (prompt, model) -> str
    enable_debug: bool = False
) -> ExecutionResult:
    """
    Execute a workflow sequentially (no branching).

    Phase 2: linear execution in topological order.
    Phase 3: conditional branching based on normalized_signals (not yet implemented).

    Args:
        workflow_id: UUID of the workflow; only active definitions are loaded
        profile_id: UUID of the profile
        variables: placeholder values (e.g. {"name": "Lars", ...}), stored in
            the execution context that is passed to every node
        openrouter_call_func: async (prompt, model) -> str
        enable_debug: debug mode, forwarded to execute_node

    Returns:
        ExecutionResult with all node_states. Errors while loading, parsing or
        executing the workflow are not raised. They are logged, persisted
        (best effort) and returned as a result with status "failed".

    Example:
        >>> result = await execute_workflow(
        ...     workflow_id="test-workflow",
        ...     profile_id="test-profile",
        ...     variables={"name": "Lars"},
        ...     openrouter_call_func=my_llm_func
        ... )
        >>> result.status
        'completed'
        >>> len(result.node_states)
        3
    """
    execution_id = str(uuid.uuid4())
    started_at = datetime.utcnow().isoformat()
    # Defined before the try block so the failure path can always report the
    # nodes that ran before the error (no locals() introspection needed).
    node_states: List[NodeExecutionState] = []

    logger.info(f"Starting workflow execution: {execution_id} (workflow: {workflow_id})")

    try:
        # 1. Load the workflow definition
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute(
                "SELECT graph FROM workflow_definitions WHERE id = %s AND active = true",
                (workflow_id,)
            )
            row = cur.fetchone()
            if not row:
                raise ValueError(f"Workflow not found: {workflow_id}")
            graph_json = row['graph']

        # 2. Parse the graph
        graph = parse_workflow_graph(graph_json)
        logger.debug(f"Parsed graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges")

        # 3. Topological sort
        execution_order = get_execution_order(graph)
        logger.info(f"Execution order: {execution_order}")

        # 4. Load the question catalog
        with get_db() as conn:
            catalog = load_question_catalog(conn)
        logger.debug(f"Loaded catalog: {len(catalog)} question types")

        # 5. Execute nodes sequentially.
        # Index nodes once instead of scanning graph.nodes for every id.
        # setdefault keeps the first occurrence, matching a linear first-match search.
        nodes_by_id: Dict[str, Any] = {}
        for graph_node in graph.nodes:
            nodes_by_id.setdefault(graph_node.id, graph_node)

        context = {"variables": variables, "profile_id": profile_id}

        for node_id in execution_order:
            node = nodes_by_id.get(node_id)
            if node is None:
                # A bare next() would raise a message-less StopIteration here.
                raise ValueError(f"Node not found in graph: {node_id}")

            logger.info(f"Executing node: {node_id} (type: {node.type})")

            node_state = await execute_node(
                node=node,
                context=context,
                catalog=catalog,
                openrouter_call_func=openrouter_call_func,
                enable_debug=enable_debug
            )
            node_states.append(node_state)

            # Expose this node's results to later nodes (for Phase 3 branching)
            context[f"node_{node_id}"] = {
                "analysis_core": node_state.analysis_core,
                "normalized_signals": [s.model_dump() for s in node_state.normalized_signals]
            }

        # 6. Aggregate results
        aggregated = aggregate_results(node_states)
        if aggregated["failed_nodes"]:
            # NOTE(review): the run is still reported as "completed" even when
            # individual nodes failed; save_execution_state documents a
            # 'partial' status that may fit better — confirm before changing.
            logger.warning(
                f"Workflow execution {execution_id}: "
                f"{aggregated['failed_nodes']} node(s) failed"
            )

        # 7. Persist the execution state
        completed_at = datetime.utcnow().isoformat()
        save_execution_state(
            execution_id=execution_id,
            workflow_id=workflow_id,
            profile_id=profile_id,
            node_states=node_states,
            status="completed",
            started_at=started_at,
            completed_at=completed_at
        )

        logger.info(f"Workflow execution completed: {execution_id}")

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status="completed",
            node_states=node_states,
            aggregated_result=aggregated,
            started_at=started_at,
            completed_at=completed_at
        )

    except Exception as e:
        logger.exception(f"Workflow execution failed: {e}")

        # Persist the failed state
        completed_at = datetime.utcnow().isoformat()
        try:
            save_execution_state(
                execution_id=execution_id,
                workflow_id=workflow_id,
                profile_id=profile_id,
                node_states=node_states,
                status="failed",
                started_at=started_at,
                completed_at=completed_at,
                error=str(e)
            )
        except Exception:
            # Best effort: if persistence itself is broken (e.g. the DB is the
            # original failure), still hand the failed result back to the caller
            # instead of letting a secondary error mask the original one.
            logger.exception(f"Could not save failed execution state: {execution_id}")

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status="failed",
            node_states=node_states,
            aggregated_result={},
            started_at=started_at,
            completed_at=completed_at,
            error=str(e)
        )
|
|
|
|
|
|
async def execute_node(
    node,
    context: Dict[str, Any],
    catalog: Dict[str, Dict],
    openrouter_call_func,
    enable_debug: bool = False,
    model: str = "anthropic/claude-sonnet-4"
) -> NodeExecutionState:
    """
    Execute a single workflow node.

    Args:
        node: WorkflowNode (from graph.nodes)
        context: execution context (variables, profile_id, prior results)
        catalog: question catalog, keyed by question type
        openrouter_call_func: LLM callback: async (prompt, model) -> str
        enable_debug: debug mode (not used by this function at present)
        model: model identifier passed to ``openrouter_call_func`` for analysis
            nodes. It defaults to the model that was previously hard-coded.

    Returns:
        NodeExecutionState. Any exception raised while executing the node is
        caught and reported as a state with status FAILED and the error text.

    Node types:
        - start/end: no-op
        - analysis: load prompt -> augment -> LLM -> parse -> normalize
        - anything else (e.g. logic/join): not implemented in Phase 2 -> FAILED
    """
    started_at = datetime.utcnow().isoformat()

    try:
        # Start/end nodes: no-op
        if node.type in ("start", "end"):
            logger.debug(f"Node {node.id}: No-op ({node.type})")
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        # Unknown node types (Phase 3: logic, join)
        if node.type != "analysis":
            raise ValueError(f"Node type '{node.type}' not implemented in Phase 2")

        # 1. Load the prompt
        prompt_template = await load_prompt_template(node.prompt_slug, context)
        logger.debug(f"Node {node.id}: Loaded prompt '{node.prompt_slug}'")

        # 2. Convert question augmentations to plain dicts once. The same dicts
        # feed both the parser and the per-node catalog override below.
        questions_jsonb = [
            q.model_dump() if hasattr(q, 'model_dump') else q
            for q in (node.question_augmentations or [])
        ]
        questions = []
        if questions_jsonb:
            questions = parse_question_augmentations_from_jsonb(questions_jsonb)
            logger.debug(f"Node {node.id}: {len(questions)} question augmentations")

        # 3. Augment the prompt
        if questions:
            augmented_prompt = augment_prompt_with_questions(
                base_prompt=prompt_template,
                questions=questions
            )
        else:
            augmented_prompt = prompt_template

        # 4. LLM call
        logger.debug(f"Node {node.id}: Calling LLM")
        llm_response = await openrouter_call_func(augmented_prompt, model)

        # 5. Parse the result container
        parsed = parse_result_container(llm_response)
        logger.debug(f"Node {node.id}: Parsed response (status: {parsed['parsing_status']})")

        # 6. Normalize signals
        normalized_signals = []
        if parsed["decision_signals"]:
            # Hybrid model: node-specific questions override catalog entries.
            # A shallow copy suffices because entries are replaced, never mutated.
            node_catalog = catalog.copy()
            for q_dict in questions_jsonb:
                node_catalog[q_dict['type']] = {
                    "answer_spectrum": q_dict['answer_spectrum'],
                    "normalization_rules": None  # Node questions carry no synonyms
                }
                logger.debug(f"Node {node.id}: Override catalog for '{q_dict['type']}' with node-specific spectrum")

            normalized_signals = normalize_all_signals(
                decision_signals=parsed["decision_signals"],
                catalog_dict=node_catalog
            )
            logger.info(f"Node {node.id}: Normalized {len(normalized_signals)} signals")

        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            analysis_core=parsed["analysis_core"],
            decision_signals=parsed["decision_signals"],
            normalized_signals=normalized_signals,
            reasoning_anchors=parsed.get("reasoning_anchors"),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )

    except Exception as e:
        logger.exception(f"Node execution failed ({node.id}): {e}")
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )
|
|
|
|
|
|
async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str:
    """
    Fetch an active prompt template by slug and resolve its placeholders.

    The template is read from the ``ai_prompts`` table (active rows only).
    It is then passed to ``placeholder_resolver.resolve_placeholders`` together
    with the ``profile_id`` taken from ``context``.

    Args:
        prompt_slug: slug of the prompt (e.g. "pipeline_body")
        context: {"variables": {"name": "Lars", ...}, "profile_id": "..."}.
            Only "profile_id" is used today; "variables" is not yet forwarded.

    Returns:
        The template with placeholders resolved.

    Raises:
        ValueError: if no active prompt with this slug exists.

    Example:
        >>> template = await load_prompt_template("pipeline_body", {"profile_id": "123"})
        >>> "{{name}}" not in template
        True
    """
    # Imported lazily, at call time rather than module import time.
    from placeholder_resolver import resolve_placeholders

    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            "SELECT template FROM ai_prompts WHERE slug = %s AND active = true",
            (prompt_slug,)
        )
        prompt_row = cursor.fetchone()
        if not prompt_row:
            raise ValueError(f"Prompt not found: {prompt_slug}")
        raw_template = prompt_row['template']

    # TODO Phase 3: Support custom variables from workflow context
    return resolve_placeholders(
        template=raw_template,
        profile_id=context.get("profile_id")
    )
|
|
|
|
|
|
def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]:
    """
    Combine the per-node results of a run into one summary dict.

    Only nodes with status EXECUTED and a non-empty ``analysis_core`` contribute
    a "## <node_id>" section to ``combined_analysis``. Signals are collected
    from every node that has any, in node order.

    Args:
        node_states: all NodeExecutionState objects of the run, in execution order

    Returns:
        {
            "combined_analysis": "## node_1\n...\n\n## node_2\n...",
            "all_signals": [{question_type, normalized_value, status}, ...],
            "total_nodes": 3,
            "executed_nodes": 3,
            "failed_nodes": 0
        }

    Example:
        >>> states = [
        ...     NodeExecutionState(node_id="n1", status=NodeStatus.EXECUTED, analysis_core="Test 1"),
        ...     NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2")
        ... ]
        >>> result = aggregate_results(states)
        >>> "## n1" in result["combined_analysis"]
        True
        >>> result["executed_nodes"]
        2
    """
    sections = [
        f"## {st.node_id}\n{st.analysis_core}"
        for st in node_states
        if st.status == NodeStatus.EXECUTED and st.analysis_core
    ]

    signal_dicts = [
        sig.model_dump()
        for st in node_states
        if st.normalized_signals
        for sig in st.normalized_signals
    ]

    executed_count = sum(1 for st in node_states if st.status == NodeStatus.EXECUTED)
    failed_count = sum(1 for st in node_states if st.status == NodeStatus.FAILED)

    return {
        "combined_analysis": "\n\n".join(sections),
        "all_signals": signal_dicts,
        "total_nodes": len(node_states),
        "executed_nodes": executed_count,
        "failed_nodes": failed_count
    }
|
|
|
|
|
|
def save_execution_state(
    execution_id: str,
    workflow_id: str,
    profile_id: str,
    node_states: List[NodeExecutionState],
    status: str,
    started_at: str,
    completed_at: Optional[str] = None,
    error: Optional[str] = None
):
    """
    Insert one execution record into workflow_executions and commit.

    ``node_states`` is stored as a JSON array. ``execution_log`` is stored as
    {"error": error} when an error message is given, otherwise as {}.

    Args:
        execution_id: UUID of the execution (used as the row id)
        workflow_id: UUID of the workflow
        profile_id: UUID of the profile
        node_states: all NodeExecutionState objects of the run
        status: 'completed' | 'failed' | 'partial'
        started_at: ISO timestamp
        completed_at: ISO timestamp (optional)
        error: error message (optional)

    Example:
        >>> save_execution_state(
        ...     execution_id="exec-123",
        ...     workflow_id="wf-456",
        ...     profile_id="prof-789",
        ...     node_states=[],
        ...     status="completed",
        ...     started_at="2026-04-03T12:00:00"
        ... )
    """
    # mode="json" makes pydantic convert enums, datetimes, etc. to JSON-compatible
    # primitives. A plain model_dump() keeps them as Python objects, which
    # json.dumps rejects unless they happen to be str/int subclasses.
    node_states_json = [state.model_dump(mode="json") for state in node_states]
    execution_log = {"error": error} if error else {}

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            INSERT INTO workflow_executions
            (id, workflow_id, profile_id, status, node_states, execution_log, started_at, completed_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            execution_id,
            workflow_id,
            profile_id,
            status,
            json.dumps(node_states_json),
            json.dumps(execution_log),
            started_at,
            completed_at
        ))
        conn.commit()

    logger.info(f"Saved execution state: {execution_id} (status: {status})")
|