Backend: - workflow_executor.py: Add progress_callback parameter, emit events for execution_started, node_complete, execution_complete, execution_failed - prompt_executor.py: Thread progress_callback through execute chain - routers/prompts.py: New /execute-stream endpoint with asyncio Queue for SSE Frontend: - utils/api.js: New executeUnifiedPromptStream() function with EventSource - pages/Analysis.jsx: Use SSE with live progress display (X/Y Nodes) Fixes: - No more gateway timeouts for complex workflows (10+ nodes) - Live progress feedback for users - Unlimited workflow complexity Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
677 lines
24 KiB
Python
677 lines
24 KiB
Python
"""
|
|
Unified Prompt Executor (Issue #28 Phase 2)
|
|
|
|
Executes both base and pipeline-type prompts with:
|
|
- Dynamic placeholder resolution
|
|
- JSON output validation
|
|
- Multi-stage parallel execution
|
|
- Reference and inline prompt support
|
|
"""
|
|
import json
|
|
import re
|
|
from typing import Dict, Any, Optional
|
|
from db import get_db, get_cursor, r2d
|
|
from fastapi import HTTPException
|
|
from placeholder_resolver import get_catalog_row_for_key
|
|
|
|
|
|
def resolve_placeholders(template: str, variables: Dict[str, Any], debug_info: Optional[Dict] = None, catalog: Optional[Dict] = None) -> str:
    """
    Substitute every ``{{placeholder}}`` token in *template*.

    Supported modifiers (a catalog from get_placeholder_catalog is recommended):
    - {{key|d}}    value — description (short)
    - {{key|x}}    explanation only (catalog field ai_caption), without the value
    - {{key|d,x}}  value — description — explanation

    Only the first ``|`` segment after the key is read as the modifier list;
    modifiers inside it are comma-separated and case-insensitive.

    Args:
        template: String with {{key}} or {{key|modifiers}} placeholders
        variables: Dict of key -> value mappings. dict/list values are
            JSON-encoded, everything else goes through str().
        debug_info: Optional dict that receives 'resolved_placeholders'
            (previews cut to 100 chars), 'unresolved_placeholders' and,
            when something is off, 'warnings'.
        catalog: Optional placeholder catalog used for description / ai_caption

    Returns:
        Template with placeholders replaced. Tokens whose key is not in
        *variables* are left in the text unchanged.
    """
    collect_debug = debug_info is not None
    resolved_log = {}
    missing_keys = []

    def _note_warning(message: str):
        # Warnings are only recorded when the caller asked for debug output.
        if collect_debug:
            debug_info.setdefault("warnings", []).append(message)

    def _finish(key, text):
        # Record a shortened preview for the debug report, then hand back the text.
        if collect_debug:
            resolved_log[key] = text[:100] + ("..." if len(text) > 100 else "")
        return text

    def _substitute(match):
        token = match.group(1).strip()

        # "weight_aktuell|d" -> key="weight_aktuell", modifier flags={"d"}
        segments = token.split('|')
        key = segments[0].strip()
        raw_modifiers = segments[1].strip() if len(segments) > 1 else ''
        flags = {m.strip().lower() for m in raw_modifiers.split(",") if m.strip()}
        with_description = "d" in flags
        with_caption = "x" in flags

        entry = get_catalog_row_for_key(catalog, key) if catalog else None

        # Unknown keys stay in the text verbatim, whatever modifiers they carry.
        if key not in variables:
            if collect_debug:
                missing_keys.append(key)
            return match.group(0)

        # |x alone: emit only the catalog explanation, never the value.
        if with_caption and not with_description:
            caption = (entry.get("ai_caption") or "").strip() if entry else ""
            if not caption and catalog is None:
                _note_warning(f"Modifier |x für {key}: Katalog fehlt (ai_caption).")
            return _finish(key, caption)

        raw_value = variables[key]
        if isinstance(raw_value, (dict, list)):
            rendered = json.dumps(raw_value, ensure_ascii=False)
        else:
            rendered = str(raw_value)

        # No modifiers at all: plain value.
        if not (with_description or with_caption):
            return _finish(key, rendered)

        pieces = [rendered]
        if with_description:
            if entry:
                description = (entry.get("description") or "").strip()
                if description:
                    pieces.append(description)
            else:
                _note_warning(f"Modifier |d für {key}: Katalog fehlt (description).")
        if with_caption:
            caption = (entry.get("ai_caption") or "").strip() if entry else ""
            if caption:
                pieces.append(caption)
            elif catalog is not None:
                _note_warning(f"Modifier |x (mit |d) für {key}: ai_caption leer.")

        return _finish(key, " — ".join(pieces))

    rendered_template = re.sub(r'\{\{([^}]+)\}\}', _substitute, template)

    # Publish what was collected during substitution.
    if collect_debug:
        debug_info['resolved_placeholders'] = resolved_log
        debug_info['unresolved_placeholders'] = missing_keys

    return rendered_template
|
|
|
|
|
|
def validate_json_output(output: str, schema: Optional[Dict] = None, debug_info: Optional[Dict] = None) -> Dict:
    """
    Parse *output* as JSON and return the parsed value.

    A surrounding Markdown code fence (```json ... ``` or a bare ``` ... ```)
    is removed first, provided the text spans more than two lines and the
    last line is exactly the closing fence.

    Args:
        output: String to validate
        schema: Optional JSON schema; currently not evaluated
            (TODO: jsonschema library)
        debug_info: Optional dict attached to the error detail when non-empty

    Returns:
        Whatever json.loads produces for the (unwrapped) text

    Raises:
        HTTPException: status 500 if the text is not valid JSON. The detail
            carries the parser message, the first 500 chars of the raw and
            unwrapped text, and the raw length.
    """
    candidate = output.strip()

    # Every fenced variant (with or without a language tag) is unwrapped the
    # same way: drop the first and the last line.
    if candidate.startswith('```'):
        fence_lines = candidate.split('\n')
        if len(fence_lines) > 2 and fence_lines[-1].strip() == '```':
            candidate = '\n'.join(fence_lines[1:-1])

    try:
        # TODO: Add jsonschema validation if schema provided
        return json.loads(candidate)
    except json.JSONDecodeError as exc:
        failure = {
            "error": f"AI returned invalid JSON: {str(exc)}",
            "raw_output": output[:500] + ('...' if len(output) > 500 else ''),
            "unwrapped": candidate[:500] if candidate != output else None,
            "output_length": len(output)
        }
        if debug_info:
            failure["debug"] = debug_info

        raise HTTPException(
            status_code=500,
            detail=failure
        )
|
|
|
|
|
|
async def execute_prompt(
    prompt_slug: str,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    progress_callback = None  # Optional callback for SSE progress updates
) -> Dict[str, Any]:
    """
    Load an active prompt by slug and run it with the matching executor.

    Args:
        prompt_slug: Slug of prompt to execute
        variables: Dict of variables for placeholder replacement. A
            '_catalog' entry, if present, is removed from this dict and
            handed to the executor as the placeholder catalog.
        openrouter_call_func: Async callable used for the AI request;
            passed through unchanged to the executor
        enable_debug: If True, include debug information in response
        progress_callback: Optional progress callback; only forwarded to
            workflow-type prompts

    Returns:
        The result dict of the selected executor (base, pipeline or workflow).

    Raises:
        HTTPException: 404 if no active prompt has this slug, 400 if the
            prompt's type is not one of base / pipeline / workflow.
    """
    # Look the prompt up; the connection is only needed for this one query.
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            """SELECT * FROM ai_prompts
               WHERE slug = %s AND active = true""",
            (prompt_slug,)
        )
        record = cursor.fetchone()
        if not record:
            raise HTTPException(404, f"Prompt nicht gefunden: {prompt_slug}")

        prompt = r2d(record)

    # Prompts without an explicit type are treated as pipelines.
    kind = prompt.get('type', 'pipeline')

    # The catalog travels inside the variables dict (see execute_prompt_with_data)
    # and must not stay there as a regular placeholder value.
    catalog = variables.pop('_catalog', None)

    if kind == 'base':
        # Single template, single AI call
        return await execute_base_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog)

    if kind == 'pipeline':
        # Multi-stage execution
        return await execute_pipeline_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog)

    if kind == 'workflow':
        # Graph-based execution
        return await execute_workflow_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog, progress_callback)

    raise HTTPException(400, f"Unknown prompt type: {kind}")
|
|
|
|
|
|
async def execute_base_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    catalog: Optional[Dict] = None,
    node_questions: Optional[list] = None  # Phase 1: questions bound to a workflow node
) -> Dict[str, Any]:
    """
    Execute a base-type prompt (single template, single AI call).

    Phase 1: supports question augmentations (hybrid model)
    - node_questions: questions bound to the node (priority 1)
    - prompt.question_augmentations: prompt defaults (priority 2)

    Args:
        prompt: Prompt dict; reads 'template', 'slug', 'question_augmentations',
            'output_format' and 'output_schema'
        variables: Values for placeholder replacement
        openrouter_call_func: Awaited with the final prompt text; its return
            value is used as the AI response
        enable_debug: If True, a 'debug' dict is added to the result
        catalog: Optional placeholder catalog for the |d / |x modifiers
        node_questions: Optional node-level question augmentations

    Returns:
        Dict with 'type' ("base"), 'slug', 'output', 'output_format' and,
        in debug mode, 'debug'. With questions the output is the parsed
        result container and the format is 'structured_container'.

    Raises:
        HTTPException: 400 if the prompt has no template; 500 (from
            validate_json_output) if JSON output was requested but invalid.
    """
    from question_augmenter import (
        parse_question_augmentations_from_jsonb,
        merge_question_augmentations,
        augment_prompt_with_questions
    )
    from result_container_parser import parse_result_container_robust

    template = prompt.get('template')
    if not template:
        raise HTTPException(400, f"Base prompt missing template: {prompt['slug']}")

    debug_info = {} if enable_debug else None

    # Phase 1: prompt-level default questions. Parsing problems are not fatal;
    # they are only reported through the debug dict.
    default_questions = None
    if prompt.get('question_augmentations'):
        try:
            # Not referenced below, but a failing import here lands in the
            # except branch and thereby disables the prompt defaults.
            from workflow_models import QuestionAugmentation  # noqa: F401
            default_questions = parse_question_augmentations_from_jsonb(
                prompt['question_augmentations']
            )
        except Exception as parse_error:
            if enable_debug:
                debug_info['question_augmentations_error'] = str(parse_error)

    # Precedence rule: node questions win over prompt defaults.
    questions = merge_question_augmentations(node_questions, default_questions)

    # Fill in the placeholders (catalog enables the |d / |x modifiers).
    prompt_text = resolve_placeholders(template, variables, debug_info, catalog)

    # Phase 1: append the questions to the prompt, if there are any.
    if questions:
        prompt_text = augment_prompt_with_questions(prompt_text, questions)
        if enable_debug:
            debug_info['question_augmentations_count'] = len(questions)
            debug_info['question_types'] = [q.type for q in questions]

    if enable_debug:
        debug_info['template'] = template
        # Full text for the admin test UI; only very large prompts are cut.
        size_cap = 512 * 1024
        if len(prompt_text) <= size_cap:
            debug_info['final_prompt'] = prompt_text
        else:
            debug_info['final_prompt'] = prompt_text[:size_cap] + "\n… [gekürzt, >512KB]"
        debug_info['available_variables'] = list(variables.keys())

    # Call AI
    response = await openrouter_call_func(prompt_text)

    if enable_debug:
        debug_info['ai_response_length'] = len(response)
        debug_info['ai_response_preview'] = response[:200] + ('...' if len(response) > 200 else '')

    if questions:
        # Phase 1: structured result container keyed by the question types.
        expected_types = [q.type for q in questions]
        container = parse_result_container_robust(response, expected_types)

        if enable_debug:
            debug_info['parsing_status'] = container['parsing_status']
            debug_info['parsing_warnings'] = container.get('warnings', [])

        output = container
        output_format = 'structured_container'
    else:
        # Legacy behaviour: validate JSON only if the prompt asks for it.
        output_format = prompt.get('output_format', 'text')
        if output_format == 'json':
            # debug_info is already None when debugging is off.
            output = validate_json_output(response, prompt.get('output_schema'), debug_info)
        else:
            output = response

    result = {
        "type": "base",
        "slug": prompt['slug'],
        "output": output,
        "output_format": output_format
    }
    if enable_debug:
        result['debug'] = debug_info

    return result
|
|
|
|
|
|
async def execute_pipeline_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    catalog: Optional[Dict] = None
) -> Dict[str, Any]:
    """
    Execute a pipeline-type prompt (multi-stage).

    Stages run in ascending order of their 'stage' number. Every prompt
    output is stored under its 'output_key' in the stage's outputs and is
    also made available to later prompts as the variable
    ``stage_<stage>_<output_key>``.

    Args:
        prompt: Prompt dict; reads 'stages' (list or JSON string), 'slug'
            and 'output_format'
        variables: Initial variables; copied, the caller's dict is not changed
        openrouter_call_func: Awaited with the prompt text for inline prompts
            and handed on to execute_prompt for referenced prompts
        enable_debug: If True, per-stage debug data is added under 'debug'
        catalog: Optional placeholder catalog for the |d / |x modifiers; used
            for inline templates and forwarded to referenced prompts

    Returns:
        Dict with 'type' ("pipeline"), 'slug', 'stages' (list of
        {"stage", "outputs"}), 'output' (the outputs dict of the last
        executed stage, or {} if no stage ran), 'output_format' and,
        in debug mode, 'debug'.

    Raises:
        HTTPException: 400 for missing stages, a reference without slug, an
            inline prompt without template, or an unknown source.
    """
    stages = prompt.get('stages')
    if not stages:
        raise HTTPException(400, f"Pipeline prompt missing stages: {prompt['slug']}")

    # Parse stages if stored as JSON string
    if isinstance(stages, str):
        stages = json.loads(stages)

    stage_results = []
    context_vars = variables.copy()
    pipeline_debug = [] if enable_debug else None

    # Execute stages in order
    for stage_def in sorted(stages, key=lambda s: s['stage']):
        stage_num = stage_def['stage']
        stage_prompts = stage_def.get('prompts', [])

        # Stages without prompts leave no trace in results or debug output.
        if not stage_prompts:
            continue

        stage_debug = {} if enable_debug else None
        if enable_debug:
            stage_debug['stage'] = stage_num
            stage_debug['available_variables'] = list(context_vars.keys())
            stage_debug['prompts'] = []

        # Execute all prompts in this stage (parallel concept, sequential impl for now)
        stage_outputs = {}

        for prompt_def in stage_prompts:
            source = prompt_def.get('source')
            output_key = prompt_def.get('output_key', f'stage{stage_num}')
            output_format = prompt_def.get('output_format', 'text')

            prompt_debug = {} if enable_debug else None

            if source == 'reference':
                # Reference to another stored prompt
                ref_slug = prompt_def.get('slug')
                if not ref_slug:
                    raise HTTPException(400, f"Reference prompt missing slug in stage {stage_num}")

                if enable_debug:
                    prompt_debug['source'] = 'reference'
                    prompt_debug['ref_slug'] = ref_slug

                # execute_prompt takes the catalog from variables['_catalog'].
                # That entry was already consumed before this function was
                # called, so it is put back for the duration of the call;
                # otherwise referenced prompts would run without a catalog.
                if catalog is not None:
                    context_vars['_catalog'] = catalog
                try:
                    result = await execute_prompt(ref_slug, context_vars, openrouter_call_func, enable_debug)
                finally:
                    # Never let the catalog linger as a placeholder variable.
                    context_vars.pop('_catalog', None)
                output = result['output']

                if enable_debug and 'debug' in result:
                    prompt_debug['ref_debug'] = result['debug']

            elif source == 'inline':
                # Template defined directly inside the stage
                template = prompt_def.get('template')
                if not template:
                    raise HTTPException(400, f"Inline prompt missing template in stage {stage_num}")

                placeholder_debug = {} if enable_debug else None
                prompt_text = resolve_placeholders(template, context_vars, placeholder_debug, catalog)

                if enable_debug:
                    prompt_debug['source'] = 'inline'
                    prompt_debug['template'] = template
                    # Full text for debugging; only very large prompts are cut.
                    _max = 512 * 1024
                    prompt_debug['final_prompt'] = (
                        prompt_text if len(prompt_text) <= _max else prompt_text[:_max] + "\n… [gekürzt, >512KB]"
                    )
                    prompt_debug.update(placeholder_debug)

                response = await openrouter_call_func(prompt_text)

                if enable_debug:
                    prompt_debug['ai_response_length'] = len(response)
                    prompt_debug['ai_response_preview'] = response[:200] + ('...' if len(response) > 200 else '')

                # Validate JSON if required
                if output_format == 'json':
                    output = validate_json_output(response, prompt_def.get('output_schema'), prompt_debug if enable_debug else None)
                else:
                    output = response

            else:
                raise HTTPException(400, f"Unknown prompt source: {source}")

            # Store output with key
            stage_outputs[output_key] = output

            # Add to context for the following prompts and stages
            context_var_key = f'stage_{stage_num}_{output_key}'
            context_vars[context_var_key] = output

            if enable_debug:
                prompt_debug['output_key'] = output_key
                prompt_debug['context_var_key'] = context_var_key
                stage_debug['prompts'].append(prompt_debug)

        stage_results.append({
            "stage": stage_num,
            "outputs": stage_outputs
        })

        if enable_debug:
            stage_debug['output'] = stage_outputs  # Add outputs to debug info for value table
            pipeline_debug.append(stage_debug)

    # Final output is the complete outputs dict of the last executed stage
    final_output = stage_results[-1]['outputs'] if stage_results else {}

    result = {
        "type": "pipeline",
        "slug": prompt['slug'],
        "stages": stage_results,
        "output": final_output,
        "output_format": prompt.get('output_format', 'text')
    }

    if enable_debug:
        result['debug'] = {
            'initial_variables': list(variables.keys()),
            'stages': pipeline_debug
        }

    return result
|
|
|
|
|
|
async def execute_prompt_with_data(
    prompt_slug: str,
    profile_id: str,
    modules: Optional[Dict[str, bool]] = None,
    timeframes: Optional[Dict[str, int]] = None,
    openrouter_call_func = None,
    enable_debug: bool = False,
    progress_callback = None  # Optional callback for SSE progress updates
) -> Dict[str, Any]:
    """
    Execute prompt with data loaded from database.

    Builds the variables dict (profile id, today's date, processed
    placeholder values, raw rows of the enabled data modules) and then
    delegates to execute_prompt.

    Args:
        prompt_slug: Slug of prompt to execute
        profile_id: User profile ID
        modules: Dict of module -> enabled (e.g., {"körper": true}).
            Keys read here: körper, ernährung, training, schlaf,
            vitalwerte, mentales, ziele.
        timeframes: Dict of module -> days (e.g., {"körper": 30}). May be
            omitted; missing entries fall back to 30 days (körper,
            ernährung), 14 days (training, schlaf) or 7 days (vitalwerte).
        openrouter_call_func: Async function for AI calls
        enable_debug: If True, include debug information in response
        progress_callback: Optional progress callback, forwarded to execute_prompt

    Returns:
        Execution result dict
    """
    from datetime import datetime, timedelta
    from placeholder_resolver import get_placeholder_example_values, get_placeholder_catalog

    # Build variables from data modules
    variables = {
        'profile_id': profile_id,
        'today': datetime.now().strftime('%Y-%m-%d')
    }

    # Load placeholder catalog for the |d / |x modifiers (best effort)
    try:
        catalog = get_placeholder_catalog(profile_id)
    except Exception as e:
        catalog = None
        print(f"Warning: Could not load placeholder catalog: {e}")

    variables['_catalog'] = catalog  # Will be popped in execute_prompt (can be None)

    # Add PROCESSED placeholders (name, weight_trend, caliper_summary, etc.)
    # This makes old-style prompts work with the new executor
    try:
        processed_placeholders = get_placeholder_example_values(profile_id)
        # Remove {{ }} from keys (placeholder_resolver returns them with wrappers)
        cleaned_placeholders = {
            key.replace('{{', '').replace('}}', ''): value
            for key, value in processed_placeholders.items()
        }
        variables.update(cleaned_placeholders)
    except Exception as e:
        # Continue even if placeholder resolution fails, but do not hide the
        # failure: report it the same way as a catalog failure.
        print(f"Warning: Could not load placeholder values: {e}")
        if enable_debug:
            variables['_placeholder_error'] = str(e)

    # Load data for enabled modules
    if modules:
        # timeframes is optional; without this guard the .get() calls below
        # would raise AttributeError on None.
        day_windows = timeframes or {}

        def _since(module_key, default_days):
            # First date (YYYY-MM-DD) of the look-back window for a module.
            days = day_windows.get(module_key, default_days)
            return (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

        with get_db() as conn:
            cur = get_cursor(conn)

            def _rows(sql, params):
                # Run one query and convert every row with r2d.
                cur.execute(sql, params)
                return [r2d(r) for r in cur.fetchall()]

            # Weight data
            if modules.get('körper'):
                variables['weight_data'] = _rows(
                    """SELECT date, weight FROM weight_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('körper', 30))
                )

            # Nutrition data
            if modules.get('ernährung'):
                variables['nutrition_data'] = _rows(
                    """SELECT date, kcal, protein_g, fat_g, carbs_g
                       FROM nutrition_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('ernährung', 30))
                )

            # Activity data
            if modules.get('training'):
                variables['activity_data'] = _rows(
                    """SELECT date, activity_type, duration_min, kcal_active, hr_avg
                       FROM activity_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('training', 14))
                )

            # Sleep data
            if modules.get('schlaf'):
                variables['sleep_data'] = _rows(
                    """SELECT date, sleep_segments, source
                       FROM sleep_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('schlaf', 14))
                )

            # Vitals data
            if modules.get('vitalwerte'):
                since = _since('vitalwerte', 7)

                # Baseline vitals
                variables['vitals_baseline'] = _rows(
                    """SELECT date, resting_hr, hrv, vo2_max, spo2, respiratory_rate
                       FROM vitals_baseline
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, since)
                )

                # Blood pressure; compared against measured_at, so the date
                # gets a midnight time part appended.
                variables['blood_pressure'] = _rows(
                    """SELECT measured_at, systolic, diastolic, pulse
                       FROM blood_pressure_log
                       WHERE profile_id = %s AND measured_at >= %s
                       ORDER BY measured_at DESC""",
                    (profile_id, since + ' 00:00:00')
                )

        # Mental/Goals (no timeframe, just current state)
        if modules.get('mentales') or modules.get('ziele'):
            # TODO: Add mental state / goals data when implemented
            variables['goals_data'] = []

    # Execute prompt
    return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug, progress_callback)
|
|
|
|
|
|
async def execute_workflow_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    catalog: Optional[Dict] = None,
    progress_callback = None  # Optional callback for SSE progress updates
) -> Dict[str, Any]:
    """
    Execute a workflow-type prompt (graph-based execution).

    The graph is read from the prompt's 'graph_data' field (ai_prompts.graph_data,
    not workflow_definitions) and handed to workflow_executor.execute_workflow.

    Args:
        prompt: Prompt dict from database (must have 'graph_data' field)
        variables: Dict of variables for placeholder replacement; its
            'profile_id' entry (default 'unknown') is passed as profile id
        openrouter_call_func: Async AI call function, passed through unchanged
        enable_debug: If True, include debug information in response
        catalog: Optional placeholder catalog (accepted but not used in
            this function)
        progress_callback: Optional progress callback, passed through to
            execute_workflow

    Returns:
        Dict with execution results:
        {
            "type": "workflow",
            "execution_id": "...",
            "status": "...",
            "aggregated_result": {...},
            "node_states": [...],  # Only if enable_debug=True
            "error": "..."         # Only if the result carries an error
        }

    Raises:
        HTTPException: 400 if the prompt has no graph_data.
    """
    from workflow_executor import execute_workflow

    graph = prompt.get('graph_data')
    if not graph:
        raise HTTPException(400, "Workflow-Prompt fehlt 'graph_data' Feld")

    # Run the graph; graph_data is passed directly instead of a workflow id.
    outcome = await execute_workflow(
        graph_data=graph,
        profile_id=variables.get('profile_id', 'unknown'),
        variables=variables,
        openrouter_call_func=openrouter_call_func,
        enable_debug=enable_debug,
        progress_callback=progress_callback
    )

    # Flatten the execution result object into a plain dict for the API response.
    payload = {
        "type": "workflow",
        "execution_id": outcome.execution_id,
        "status": outcome.status,
        "aggregated_result": outcome.aggregated_result
    }

    if enable_debug:
        payload["node_states"] = [state.model_dump() for state in outcome.node_states]

    if outcome.error:
        payload["error"] = outcome.error

    return payload
|