mitai-jinkendo/backend/workflow_executor.py
Lars c588372f3a
All checks were successful
Deploy Development / deploy (push) Successful in 44s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 21s
fix: Hybrid model - node-specific question spectrums override catalog (Phase 1 requirement)
2026-04-03 21:49:13 +02:00

437 lines
14 KiB
Python

"""
Workflow Executor (Phase 2)
Führt Workflows sequenziell aus (noch keine Verzweigung/Logik).
Konzept-Basis: konzept_workflow_engine_konsolidated.md
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Phase 2)
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
import uuid
import logging
import json
from workflow_models import (
WorkflowGraph, NodeExecutionState, ExecutionResult,
NodeStatus, NormalizedSignal
)
from workflow_engine import parse_workflow_graph, get_execution_order
from question_augmenter import (
augment_prompt_with_questions,
parse_question_augmentations_from_jsonb
)
from result_container_parser import parse_result_container
from normalization_engine import normalize_all_signals, load_question_catalog
from db import get_db, get_cursor
logger = logging.getLogger(__name__)
async def execute_workflow(
    workflow_id: str,
    profile_id: str,
    variables: Dict[str, Any],
    openrouter_call_func,  # Callback for LLM calls: async (prompt, model) -> str
    enable_debug: bool = False
) -> ExecutionResult:
    """
    Execute a workflow sequentially (no branching).

    Phase 2: linear execution in the order returned by get_execution_order().
    Phase 3 (planned): conditional branching based on normalized_signals.

    Steps: load the active workflow definition, parse its graph, determine
    the execution order, load the question catalog, run every node through
    execute_node(), aggregate the node results and persist the execution
    state via save_execution_state().

    A node that fails does not abort the run: execute_node() reports the
    failure as a FAILED node state instead of raising, so the workflow
    status is still "completed" in that case.

    Args:
        workflow_id: UUID of the workflow
        profile_id: UUID of the profile
        variables: Placeholder values (e.g. {"name": "Lars", ...})
        openrouter_call_func: async (prompt, model) -> str
        enable_debug: Debug mode (forwarded to execute_node)

    Returns:
        ExecutionResult with all node_states. If anything raises, the
        exception is logged and an ExecutionResult with status "failed",
        the node states collected so far and the error message is returned
        instead of re-raising.

    Example:
        >>> result = await execute_workflow(
        ...     workflow_id="test-workflow",
        ...     profile_id="test-profile",
        ...     variables={"name": "Lars"},
        ...     openrouter_call_func=my_llm_func
        ... )
        >>> result.status
        'completed'
        >>> len(result.node_states)
        3
    """
    execution_id = str(uuid.uuid4())
    started_at = datetime.utcnow().isoformat()
    # Created before the try block so the failure path can always report
    # whatever node states were collected before the error occurred.
    node_states: List[NodeExecutionState] = []
    logger.info(f"Starting workflow execution: {execution_id} (workflow: {workflow_id})")
    try:
        # 1. Load the workflow definition
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute(
                "SELECT graph FROM workflow_definitions WHERE id = %s AND active = true",
                (workflow_id,)
            )
            row = cur.fetchone()
            if not row:
                raise ValueError(f"Workflow not found: {workflow_id}")
            graph_json = row['graph']

        # 2. Parse the graph
        graph = parse_workflow_graph(graph_json)
        logger.debug(f"Parsed graph: {len(graph.nodes)} nodes, {len(graph.edges)} edges")

        # 3. Determine the execution order
        execution_order = get_execution_order(graph)
        logger.info(f"Execution order: {execution_order}")

        # 4. Load the question catalog
        with get_db() as conn:
            catalog = load_question_catalog(conn)
        logger.debug(f"Loaded catalog: {len(catalog)} question types")

        # 5. Execute the nodes one after another
        # Index the nodes once instead of scanning graph.nodes per node;
        # setdefault keeps the first node for an id, like the former scan.
        nodes_by_id = {}
        for graph_node in graph.nodes:
            nodes_by_id.setdefault(graph_node.id, graph_node)

        context = {"variables": variables, "profile_id": profile_id}
        for node_id in execution_order:
            node = nodes_by_id.get(node_id)
            if node is None:
                # Explicit message: a bare StopIteration would stringify to ""
                raise ValueError(f"Node in execution order not found in graph: {node_id}")
            logger.info(f"Executing node: {node_id} (type: {node.type})")
            node_state = await execute_node(
                node=node,
                context=context,
                catalog=catalog,
                openrouter_call_func=openrouter_call_func,
                enable_debug=enable_debug
            )
            node_states.append(node_state)
            # Expose the results in the context (for later access in Phase 3)
            context[f"node_{node_id}"] = {
                "analysis_core": node_state.analysis_core,
                "normalized_signals": [s.model_dump() for s in node_state.normalized_signals]
            }

        # 6. Aggregate the results
        aggregated = aggregate_results(node_states)

        # 7. Persist the execution state
        completed_at = datetime.utcnow().isoformat()
        save_execution_state(
            execution_id=execution_id,
            workflow_id=workflow_id,
            profile_id=profile_id,
            node_states=node_states,
            status="completed",
            started_at=started_at,
            completed_at=completed_at
        )
        logger.info(f"Workflow execution completed: {execution_id}")
        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status="completed",
            node_states=node_states,
            aggregated_result=aggregated,
            started_at=started_at,
            completed_at=completed_at
        )
    except Exception as e:
        logger.error(f"Workflow execution failed: {e}", exc_info=True)
        completed_at = datetime.utcnow().isoformat()
        # Persisting the failed state is best effort: if it raises as well
        # (e.g. the database is the reason for the failure), log it and
        # still hand the failed result back to the caller.
        try:
            save_execution_state(
                execution_id=execution_id,
                workflow_id=workflow_id,
                profile_id=profile_id,
                node_states=node_states,
                status="failed",
                started_at=started_at,
                completed_at=completed_at,
                error=str(e)
            )
        except Exception:
            logger.error(
                f"Could not persist failed execution state: {execution_id}",
                exc_info=True
            )
        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status="failed",
            node_states=node_states,
            aggregated_result={},
            started_at=started_at,
            completed_at=completed_at,
            error=str(e)
        )
async def execute_node(
    node,
    context: Dict[str, Any],
    catalog: Dict[str, Dict],
    openrouter_call_func,
    enable_debug: bool = False
) -> NodeExecutionState:
    """
    Execute a single workflow node and report the outcome as a node state.

    Args:
        node: WorkflowNode (from graph.nodes)
        context: Execution context (variables, profile_id, prior results)
        catalog: Question catalog
        openrouter_call_func: LLM callback: async (prompt, model) -> str
        enable_debug: Debug mode (not read inside this function)

    Returns:
        NodeExecutionState. Any exception raised while processing the node
        is logged and turned into a state with status FAILED and the error
        message; nothing is re-raised.

    Node types:
        - start/end: no-op
        - analysis: load prompt -> augment -> LLM -> parse -> normalize
        - anything else (logic/join): not implemented in Phase 2, reported
          as a FAILED state
    """
    def _as_dict(question):
        # Model objects expose model_dump(); plain dicts pass through as-is.
        return question.model_dump() if hasattr(question, 'model_dump') else question

    started_at = datetime.utcnow().isoformat()
    try:
        # Start/end nodes carry no work
        if node.type in ("start", "end"):
            logger.debug(f"Node {node.id}: No-op ({node.type})")
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        # Everything that is not an analysis node is unsupported (Phase 3: logic, join)
        if node.type != "analysis":
            raise ValueError(f"Node type '{node.type}' not implemented in Phase 2")

        # 1. Load the prompt
        base_prompt = await load_prompt_template(node.prompt_slug, context)
        logger.debug(f"Node {node.id}: Loaded prompt '{node.prompt_slug}'")

        # 2. Parse the node's question augmentations
        parsed_questions = []
        if node.question_augmentations:
            # The parser expects a JSONB-like list of dicts
            raw_questions = [_as_dict(entry) for entry in node.question_augmentations]
            parsed_questions = parse_question_augmentations_from_jsonb(raw_questions)
            logger.debug(f"Node {node.id}: {len(parsed_questions)} question augmentations")

        # 3. Augment the prompt only when there are questions to add
        final_prompt = (
            augment_prompt_with_questions(
                base_prompt=base_prompt,
                questions=parsed_questions
            )
            if parsed_questions
            else base_prompt
        )

        # 4. Call the LLM
        logger.debug(f"Node {node.id}: Calling LLM")
        raw_response = await openrouter_call_func(
            final_prompt,
            "anthropic/claude-sonnet-4"  # Default model
        )

        # 5. Parse the result container
        container = parse_result_container(raw_response)
        logger.debug(f"Node {node.id}: Parsed response (status: {container['parsing_status']})")

        # 6. Normalize the decision signals
        signals = []
        if container["decision_signals"]:
            # Hybrid model: node-specific questions override the catalog.
            # Work on a copy so the shared catalog stays untouched.
            effective_catalog = catalog.copy()
            for entry in node.question_augmentations or ():
                spec = _as_dict(entry)
                effective_catalog[spec['type']] = {
                    "answer_spectrum": spec['answer_spectrum'],
                    "normalization_rules": None  # node questions have no synonyms
                }
                logger.debug(f"Node {node.id}: Override catalog for '{spec['type']}' with node-specific spectrum")
            signals = normalize_all_signals(
                decision_signals=container["decision_signals"],
                catalog_dict=effective_catalog
            )
            logger.info(f"Node {node.id}: Normalized {len(signals)} signals")

        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.EXECUTED,
            analysis_core=container["analysis_core"],
            decision_signals=container["decision_signals"],
            normalized_signals=signals,
            reasoning_anchors=container.get("reasoning_anchors"),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )
    except Exception as e:
        logger.error(f"Node execution failed ({node.id}): {e}", exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )
async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str:
    """
    Load an active prompt template from the database and resolve its placeholders.

    Args:
        prompt_slug: Slug of the prompt (e.g. "pipeline_body")
        context: {"variables": {"name": "Lars", ...}, "profile_id": "..."}
            Only "profile_id" is read here; custom variables are not
            applied yet (see the TODO below).

    Returns:
        The template after resolve_placeholders() has been applied.

    Raises:
        ValueError: if no active prompt with this slug exists.

    Example:
        >>> template = await load_prompt_template("pipeline_body", {"profile_id": "123"})
        >>> "{{name}}" not in template
        True
    """
    from placeholder_resolver import resolve_placeholders

    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            "SELECT template FROM ai_prompts WHERE slug = %s AND active = true",
            (prompt_slug,)
        )
        record = cursor.fetchone()
        if not record:
            raise ValueError(f"Prompt not found: {prompt_slug}")
        raw_template = record['template']

    # TODO Phase 3: Support custom variables from workflow context
    return resolve_placeholders(
        template=raw_template,
        profile_id=context.get("profile_id")
    )
def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]:
    """
    Combine the results of all nodes into one summary dict.

    Args:
        node_states: List of all NodeExecutionState objects

    Returns:
        {
            "combined_analysis": "## node_1\\n...\\n\\n## node_2\\n...",
            "all_signals": [{question_type, normalized_value, status}, ...],
            "total_nodes": 3,
            "executed_nodes": 3,
            "failed_nodes": 0
        }
        Only executed nodes with a non-empty analysis_core contribute a
        section to "combined_analysis"; signals are collected from every
        node that has any, regardless of its status.

    Example:
        >>> states = [
        ...     NodeExecutionState(node_id="n1", status=NodeStatus.EXECUTED, analysis_core="Test 1"),
        ...     NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2")
        ... ]
        >>> result = aggregate_results(states)
        >>> "## n1" in result["combined_analysis"]
        True
        >>> result["executed_nodes"]
        2
    """
    sections = []
    collected_signals = []
    executed_count = 0
    failed_count = 0

    # Single pass: build the analysis sections, gather signals and count statuses
    for entry in node_states:
        was_executed = entry.status == NodeStatus.EXECUTED
        if was_executed:
            executed_count += 1
        if entry.status == NodeStatus.FAILED:
            failed_count += 1
        if was_executed and entry.analysis_core:
            sections.append(f"## {entry.node_id}\n{entry.analysis_core}")
        if entry.normalized_signals:
            for signal in entry.normalized_signals:
                collected_signals.append(signal.model_dump())

    summary = {
        "combined_analysis": "\n\n".join(sections),
        "all_signals": collected_signals,
        "total_nodes": len(node_states),
        "executed_nodes": executed_count,
        "failed_nodes": failed_count
    }
    return summary
def save_execution_state(
    execution_id: str,
    workflow_id: str,
    profile_id: str,
    node_states: List[NodeExecutionState],
    status: str,
    started_at: str,
    completed_at: Optional[str] = None,
    error: Optional[str] = None
):
    """
    Insert one row describing an execution into workflow_executions.

    The node states are stored as a JSON array; the execution log is a
    JSON object that holds the error message when one is given and is
    empty otherwise.

    Args:
        execution_id: UUID of the execution
        workflow_id: UUID of the workflow
        profile_id: UUID of the profile
        node_states: List of all NodeExecutionState objects
        status: 'completed' | 'failed' | 'partial'
        started_at: ISO timestamp
        completed_at: ISO timestamp (optional)
        error: Error message (optional)

    Example:
        >>> save_execution_state(
        ...     execution_id="exec-123",
        ...     workflow_id="wf-456",
        ...     profile_id="prof-789",
        ...     node_states=[],
        ...     status="completed",
        ...     started_at="2026-04-03T12:00:00"
        ... )
    """
    # Turn the node state models into plain dicts for JSON serialization
    state_dicts = [state.model_dump() for state in node_states]

    insert_sql = (
        "\n"
        "INSERT INTO workflow_executions\n"
        "(id, workflow_id, profile_id, status, node_states, execution_log, started_at, completed_at)\n"
        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)\n"
    )

    with get_db() as conn:
        cursor = get_cursor(conn)
        if error:
            log_payload = {"error": error}
        else:
            log_payload = {}
        row_values = (
            execution_id,
            workflow_id,
            profile_id,
            status,
            json.dumps(state_dicts),
            json.dumps(log_payload),
            started_at,
            completed_at
        )
        cursor.execute(insert_sql, row_values)
        conn.commit()
    logger.info(f"Saved execution state: {execution_id} (status: {status})")