mitai-jinkendo/backend/prompt_executor.py
Lars ba474b0a57
Some checks failed
Deploy Development / deploy (push) Successful in 54s
Build Test / pytest-backend (push) Failing after 1s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Implement Server-Sent Events (SSE) for long-running workflows
Backend:
- workflow_executor.py: Add progress_callback parameter, emit events for execution_started, node_complete, execution_complete, execution_failed
- prompt_executor.py: Thread progress_callback through execute chain
- routers/prompts.py: New /execute-stream endpoint with asyncio Queue for SSE

Frontend:
- utils/api.js: New executeUnifiedPromptStream() function with EventSource
- pages/Analysis.jsx: Use SSE with live progress display (X/Y Nodes)

Fixes:
- No more gateway timeouts for complex workflows (10+ nodes)
- Live progress feedback for users
- Unlimited workflow complexity

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-04-13 11:23:16 +02:00

677 lines
24 KiB
Python

"""
Unified Prompt Executor (Issue #28 Phase 2)
Executes both base and pipeline-type prompts with:
- Dynamic placeholder resolution
- JSON output validation
- Multi-stage parallel execution
- Reference and inline prompt support
"""
import json
import re
from typing import Dict, Any, Optional
from db import get_db, get_cursor, r2d
from fastapi import HTTPException
from placeholder_resolver import get_catalog_row_for_key
def resolve_placeholders(template: str, variables: Dict[str, Any], debug_info: Optional[Dict] = None, catalog: Optional[Dict] = None) -> str:
    """
    Replace ``{{placeholder}}`` tokens in *template* with values from *variables*.

    Modifiers follow the key after a ``|`` and are comma-separated. They read
    their texts from *catalog* (the result of get_placeholder_catalog):

    - ``{{key|d}}``   -> value — description
    - ``{{key|x}}``   -> explanation only (catalog field ``ai_caption``), no value
    - ``{{key|d,x}}`` -> value — description — explanation

    A placeholder whose key is not in *variables* is left in the text untouched.
    Dict and list values are rendered as JSON; everything else via ``str()``.

    Args:
        template: String with ``{{key}}`` or ``{{key|modifiers}}`` placeholders
        variables: Dict of key -> value mappings
        debug_info: Optional dict that receives ``resolved_placeholders``,
            ``unresolved_placeholders`` and, if any occurred, ``warnings``
        catalog: Optional placeholder catalog used by the ``|d`` / ``|x`` modifiers

    Returns:
        Template with placeholders replaced
    """
    resolved = {}
    unresolved = []
    collect_debug = debug_info is not None

    def _warn(msg: str):
        if collect_debug:
            debug_info.setdefault("warnings", []).append(msg)

    def _record(key: str, text: str) -> str:
        # Debug output keeps only a short preview of each substituted text.
        if collect_debug:
            resolved[key] = text[:100] + ("..." if len(text) > 100 else "")
        return text

    def replacer(match):
        token = match.group(1).strip()
        # "weight_aktuell|d" -> key="weight_aktuell", modifiers="d"
        segments = token.split('|')
        key = segments[0].strip()
        modifiers = segments[1].strip() if len(segments) > 1 else ''
        mods = {m.strip().lower() for m in modifiers.split(",") if m.strip()}
        want_d = "d" in mods
        want_x = "x" in mods

        row = get_catalog_row_for_key(catalog, key) if catalog else None

        if key not in variables:
            if collect_debug:
                unresolved.append(key)
            return match.group(0)

        caption = (row.get("ai_caption") or "").strip() if row else ""

        # |x alone: emit only the explanation, never the value.
        if want_x and not want_d:
            if not caption and catalog is None:
                _warn(f"Modifier |x für {key}: Katalog fehlt (ai_caption).")
            return _record(key, caption)

        value = variables[key]
        if isinstance(value, (dict, list)):
            # default=str: nested non-JSON values (dates, Decimals, ...) are
            # rendered as text instead of aborting the whole prompt with a
            # TypeError.
            rendered = json.dumps(value, ensure_ascii=False, default=str)
        else:
            rendered = str(value)

        if not want_d and not want_x:
            return _record(key, rendered)

        pieces = [rendered]
        if want_d:
            if row:
                desc = (row.get("description") or "").strip()
                if desc:
                    pieces.append(desc)
            else:
                _warn(f"Modifier |d für {key}: Katalog fehlt (description).")
        if want_x:
            if caption:
                pieces.append(caption)
            elif catalog is not None:
                _warn(f"Modifier |x (mit |d) für {key}: ai_caption leer.")

        # Separate the parts as documented ("value — description — explanation")
        # so they do not run together.
        return _record(key, " — ".join(pieces))

    result = re.sub(r'\{\{([^}]+)\}\}', replacer, template)

    if collect_debug:
        debug_info['resolved_placeholders'] = resolved
        debug_info['unresolved_placeholders'] = unresolved

    return result
def validate_json_output(output: str, schema: Optional[Dict] = None, debug_info: Optional[Dict] = None) -> Dict:
    """
    Parse *output* as JSON, tolerating Markdown code fences around it.

    Parsing is attempted in two steps:
    1. If the stripped text starts with a fence line and ends with a closing
       fence line, the lines in between are parsed; otherwise the stripped
       text itself is parsed.
    2. If that fails, every fenced block found anywhere in the text is tried
       in order. This covers responses with a sentence before or after the
       fenced JSON.

    Args:
        output: String to validate
        schema: Optional JSON schema (currently unused; TODO: jsonschema validation)
        debug_info: Optional dict attached to the error detail for debugging

    Returns:
        The parsed JSON value (whatever ``json.loads`` yields for the payload)

    Raises:
        HTTPException: 500 if no parseable JSON was found; the detail carries
            the decode error, a preview of the raw output and *debug_info*
    """
    stripped = output.strip()
    unwrapped = stripped

    # "```json" and a bare "```" opening fence are handled the same way:
    # drop the first and last line when the last line is the closing fence.
    if stripped.startswith('```'):
        lines = stripped.split('\n')
        if len(lines) > 2 and lines[-1].strip() == '```':
            unwrapped = '\n'.join(lines[1:-1])

    try:
        # TODO: Add jsonschema validation if schema provided
        return json.loads(unwrapped)
    except json.JSONDecodeError as e:
        # Fallback: look for a fenced block embedded in surrounding prose.
        for fenced in re.finditer(r'```[^\n`]*\n(.*?)```', stripped, re.DOTALL):
            try:
                return json.loads(fenced.group(1))
            except json.JSONDecodeError:
                continue

        error_detail = {
            "error": f"AI returned invalid JSON: {str(e)}",
            "raw_output": output[:500] + ('...' if len(output) > 500 else ''),
            "unwrapped": unwrapped[:500] if unwrapped != output else None,
            "output_length": len(output)
        }
        if debug_info:
            error_detail["debug"] = debug_info
        raise HTTPException(
            status_code=500,
            detail=error_detail
        ) from e
async def execute_prompt(
    prompt_slug: str,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    progress_callback = None  # Optional callback for SSE progress updates
) -> Dict[str, Any]:
    """
    Load the active prompt with the given slug and run it according to its type.

    Args:
        prompt_slug: Slug of prompt to execute
        variables: Dict of variables for placeholder replacement. A
            ``_catalog`` entry, if present, is removed from this dict and
            handed to the type-specific executor as the placeholder catalog.
        openrouter_call_func: Async function(prompt_text) -> response_text
        enable_debug: If True, include debug information in response
        progress_callback: Optional progress callback; only passed on to
            workflow-type prompts

    Returns:
        The result dict of the type-specific executor
        (execute_base_prompt / execute_pipeline_prompt / execute_workflow_prompt).

    Raises:
        HTTPException: 404 if no active prompt has this slug,
            400 if the stored prompt type is not base/pipeline/workflow
    """
    # Fetch the prompt definition
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            """SELECT * FROM ai_prompts
               WHERE slug = %s AND active = true""",
            (prompt_slug,)
        )
        record = cursor.fetchone()
        if not record:
            raise HTTPException(404, f"Prompt nicht gefunden: {prompt_slug}")
        prompt = r2d(record)

    # A missing 'type' key is treated as a pipeline prompt
    kind = prompt.get('type', 'pipeline')

    # The catalog travels inside variables (see execute_prompt_with_data);
    # take it out so it is not treated as a regular placeholder value.
    catalog = variables.pop('_catalog', None)

    if kind == 'base':
        return await execute_base_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog)

    if kind == 'pipeline':
        return await execute_pipeline_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog)

    if kind == 'workflow':
        return await execute_workflow_prompt(
            prompt, variables, openrouter_call_func, enable_debug, catalog, progress_callback
        )

    raise HTTPException(400, f"Unknown prompt type: {kind}")
async def execute_base_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    catalog: Optional[Dict] = None,
    node_questions: Optional[list] = None  # Phase 1: node-bound questions
) -> Dict[str, Any]:
    """
    Execute a base-type prompt (one template, one AI call).

    Phase 1 adds question augmentations (hybrid model):
    - node_questions: questions bound to a workflow node (priority 1)
    - prompt['question_augmentations']: prompt-level defaults (priority 2)

    When questions are in effect, the AI response is parsed into a result
    container and ``output_format`` is ``'structured_container'``. Otherwise
    the prompt's own ``output_format`` applies and ``'json'`` output is
    validated.

    Returns:
        Dict with "type", "slug", "output", "output_format" and, if
        enable_debug is set, "debug".

    Raises:
        HTTPException: 400 if the prompt has no template
    """
    from question_augmenter import (
        parse_question_augmentations_from_jsonb,
        merge_question_augmentations,
        augment_prompt_with_questions
    )
    from result_container_parser import parse_result_container_robust

    template = prompt.get('template')
    if not template:
        raise HTTPException(400, f"Base prompt missing template: {prompt['slug']}")

    debug_info = {} if enable_debug else None

    # Prompt-level default questions; a failure here is non-fatal and only
    # surfaces in the debug output.
    prompt_default_questions = None
    raw_augmentations = prompt.get('question_augmentations')
    if raw_augmentations:
        try:
            # Not referenced below; kept inside the try so an import failure is
            # handled like a parse failure. NOTE(review): confirm it is still needed.
            from workflow_models import QuestionAugmentation  # noqa: F401
            prompt_default_questions = parse_question_augmentations_from_jsonb(
                prompt['question_augmentations']
            )
        except Exception as e:
            if enable_debug:
                debug_info['question_augmentations_error'] = str(e)

    # Node questions take precedence over prompt defaults
    questions = merge_question_augmentations(node_questions, prompt_default_questions)

    # Fill in placeholders (the catalog enables the |d / |x modifiers)
    prompt_text = resolve_placeholders(template, variables, debug_info, catalog)

    if questions:
        prompt_text = augment_prompt_with_questions(prompt_text, questions)
        if enable_debug:
            debug_info['question_augmentations_count'] = len(questions)
            debug_info['question_types'] = [q.type for q in questions]

    if enable_debug:
        debug_info['template'] = template
        # Full prompt text for the admin test UI; only very large prompts are cut
        size_cap = 512 * 1024
        if len(prompt_text) <= size_cap:
            debug_info['final_prompt'] = prompt_text
        else:
            debug_info['final_prompt'] = prompt_text[:size_cap] + "\n… [gekürzt, >512KB]"
        debug_info['available_variables'] = list(variables.keys())

    # AI call
    response = await openrouter_call_func(prompt_text)

    if enable_debug:
        debug_info['ai_response_length'] = len(response)
        preview_suffix = '...' if len(response) > 200 else ''
        debug_info['ai_response_preview'] = response[:200] + preview_suffix

    if questions:
        # Structured path: parse the response into a result container
        container = parse_result_container_robust(response, [q.type for q in questions])
        if enable_debug:
            debug_info['parsing_status'] = container['parsing_status']
            debug_info['parsing_warnings'] = container.get('warnings', [])
        output = container
        output_format = 'structured_container'
    else:
        # Legacy path: plain text, or validated JSON when the prompt asks for it
        output_format = prompt.get('output_format', 'text')
        output = response
        if output_format == 'json':
            # debug_info is already None when debugging is disabled
            output = validate_json_output(response, prompt.get('output_schema'), debug_info)

    result = {
        "type": "base",
        "slug": prompt['slug'],
        "output": output,
        "output_format": output_format
    }
    if enable_debug:
        result['debug'] = debug_info
    return result
async def execute_pipeline_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    catalog: Optional[Dict] = None
) -> Dict[str, Any]:
    """
    Execute a pipeline-type prompt (multi-stage).

    Stages run in ascending ``stage`` order. Each prompt of a stage is either
    a ``reference`` to another stored prompt or an ``inline`` template. Its
    output is stored under ``output_key`` in the stage result and added to the
    variables of all following prompts as ``stage_<n>_<output_key>``.
    Prompts within a stage currently run one after another.

    Args:
        prompt: Prompt dict; ``stages`` may be a list or a JSON string
        variables: Initial variables (copied, not modified)
        openrouter_call_func: Async function(prompt_text) -> response_text
        enable_debug: If True, include per-stage debug information
        catalog: Optional placeholder catalog for the |d / |x modifiers

    Returns:
        Dict with "type", "slug", "stages", "output" (all outputs of the last
        executed stage), "output_format" and, if enable_debug is set, "debug".

    Raises:
        HTTPException: 400 for missing stages, a reference without slug, an
            inline prompt without template, or an unknown prompt source
    """
    stages = prompt.get('stages')
    if not stages:
        raise HTTPException(400, f"Pipeline prompt missing stages: {prompt['slug']}")

    # Stages may be stored as a JSON string
    if isinstance(stages, str):
        stages = json.loads(stages)

    stage_results = []
    context_vars = variables.copy()
    pipeline_debug = [] if enable_debug else None

    for stage_def in sorted(stages, key=lambda s: s['stage']):
        stage_num = stage_def['stage']
        stage_prompts = stage_def.get('prompts', [])
        if not stage_prompts:
            continue

        stage_debug = {} if enable_debug else None
        if enable_debug:
            stage_debug['stage'] = stage_num
            stage_debug['available_variables'] = list(context_vars.keys())
            stage_debug['prompts'] = []

        # Parallel in concept, sequential in the current implementation
        stage_outputs = {}
        for prompt_def in stage_prompts:
            source = prompt_def.get('source')
            output_key = prompt_def.get('output_key', f'stage{stage_num}')
            output_format = prompt_def.get('output_format', 'text')
            prompt_debug = {} if enable_debug else None

            if source == 'reference':
                ref_slug = prompt_def.get('slug')
                if not ref_slug:
                    raise HTTPException(400, f"Reference prompt missing slug in stage {stage_num}")

                if enable_debug:
                    prompt_debug['source'] = 'reference'
                    prompt_debug['ref_slug'] = ref_slug

                # execute_prompt takes the catalog from the '_catalog' entry of
                # its variables, and that entry was already removed before this
                # function ran. Re-attach it on a per-call copy so the
                # referenced prompt can resolve |d / |x modifiers as well.
                ref_vars = dict(context_vars)
                ref_vars['_catalog'] = catalog
                result = await execute_prompt(ref_slug, ref_vars, openrouter_call_func, enable_debug)
                output = result['output']

                if enable_debug and 'debug' in result:
                    prompt_debug['ref_debug'] = result['debug']

            elif source == 'inline':
                template = prompt_def.get('template')
                if not template:
                    raise HTTPException(400, f"Inline prompt missing template in stage {stage_num}")

                placeholder_debug = {} if enable_debug else None
                prompt_text = resolve_placeholders(template, context_vars, placeholder_debug, catalog)

                if enable_debug:
                    prompt_debug['source'] = 'inline'
                    prompt_debug['template'] = template
                    # Full prompt text for debugging; only very large prompts are cut
                    _max = 512 * 1024
                    prompt_debug['final_prompt'] = (
                        prompt_text if len(prompt_text) <= _max else prompt_text[:_max] + "\n… [gekürzt, >512KB]"
                    )
                    prompt_debug.update(placeholder_debug)

                response = await openrouter_call_func(prompt_text)

                if enable_debug:
                    prompt_debug['ai_response_length'] = len(response)
                    prompt_debug['ai_response_preview'] = response[:200] + ('...' if len(response) > 200 else '')

                # Validate JSON if required
                if output_format == 'json':
                    output = validate_json_output(response, prompt_def.get('output_schema'), prompt_debug)
                else:
                    output = response

            else:
                raise HTTPException(400, f"Unknown prompt source: {source}")

            # Store the output and expose it to all following prompts
            stage_outputs[output_key] = output
            context_var_key = f'stage_{stage_num}_{output_key}'
            context_vars[context_var_key] = output

            if enable_debug:
                prompt_debug['output_key'] = output_key
                prompt_debug['context_var_key'] = context_var_key
                stage_debug['prompts'].append(prompt_debug)

        stage_results.append({
            "stage": stage_num,
            "outputs": stage_outputs
        })

        if enable_debug:
            stage_debug['output'] = stage_outputs  # outputs for the debug value table
            pipeline_debug.append(stage_debug)

    # Final output: the complete outputs dict of the last executed stage
    final_output = stage_results[-1]['outputs'] if stage_results else {}

    result = {
        "type": "pipeline",
        "slug": prompt['slug'],
        "stages": stage_results,
        "output": final_output,
        "output_format": prompt.get('output_format', 'text')
    }

    if enable_debug:
        result['debug'] = {
            'initial_variables': list(variables.keys()),
            'stages': pipeline_debug
        }

    return result
async def execute_prompt_with_data(
    prompt_slug: str,
    profile_id: str,
    modules: Optional[Dict[str, bool]] = None,
    timeframes: Optional[Dict[str, int]] = None,
    openrouter_call_func = None,
    enable_debug: bool = False,
    progress_callback = None  # Optional callback for SSE progress updates
) -> Dict[str, Any]:
    """
    Execute a prompt with variables loaded from the database.

    The variables consist of ``profile_id``, ``today``, the processed
    placeholder values of the profile and, per enabled module, the raw log
    rows of the requested timeframe. The placeholder catalog is passed along
    as ``_catalog`` and consumed by execute_prompt.

    Args:
        prompt_slug: Slug of prompt to execute
        profile_id: User profile ID
        modules: Dict of module -> enabled (e.g., {"körper": true})
        timeframes: Dict of module -> days (e.g., {"körper": 30}); modules
            without an entry fall back to their default window
        openrouter_call_func: Async function for AI calls
        enable_debug: If True, include debug information in response
        progress_callback: Optional progress callback, forwarded to execute_prompt

    Returns:
        Execution result dict
    """
    from datetime import datetime, timedelta
    from placeholder_resolver import get_placeholder_example_values, get_placeholder_catalog

    # timeframes is optional; without this, enabling any module while leaving
    # timeframes at its default raised AttributeError on None.get().
    timeframes = timeframes or {}

    def _since(module: str, default_days: int) -> str:
        # First day (YYYY-MM-DD) of the window requested for a module.
        days = timeframes.get(module, default_days)
        return (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

    variables = {
        'profile_id': profile_id,
        'today': datetime.now().strftime('%Y-%m-%d')
    }

    # Placeholder catalog for the |d / |x modifiers; best effort
    try:
        catalog = get_placeholder_catalog(profile_id)
    except Exception as e:
        catalog = None
        print(f"Warning: Could not load placeholder catalog: {e}")
    variables['_catalog'] = catalog  # Popped in execute_prompt (may be None)

    # Processed placeholders (name, weight_trend, caliper_summary, etc.) keep
    # old-style prompts working with this executor.
    try:
        processed_placeholders = get_placeholder_example_values(profile_id)
        # Keys arrive wrapped in {{ }}; strip the wrappers
        cleaned_placeholders = {
            key.replace('{{', '').replace('}}', ''): value
            for key, value in processed_placeholders.items()
        }
        variables.update(cleaned_placeholders)
    except Exception as e:
        # Continue even if placeholder resolution fails
        if enable_debug:
            variables['_placeholder_error'] = str(e)

    # Raw data of the enabled modules
    if modules:
        with get_db() as conn:
            cur = get_cursor(conn)

            # Weight data
            if modules.get('körper'):
                cur.execute(
                    """SELECT date, weight FROM weight_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('körper', 30))
                )
                variables['weight_data'] = [r2d(r) for r in cur.fetchall()]

            # Nutrition data
            if modules.get('ernährung'):
                cur.execute(
                    """SELECT date, kcal, protein_g, fat_g, carbs_g
                       FROM nutrition_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('ernährung', 30))
                )
                variables['nutrition_data'] = [r2d(r) for r in cur.fetchall()]

            # Activity data
            if modules.get('training'):
                cur.execute(
                    """SELECT date, activity_type, duration_min, kcal_active, hr_avg
                       FROM activity_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('training', 14))
                )
                variables['activity_data'] = [r2d(r) for r in cur.fetchall()]

            # Sleep data
            if modules.get('schlaf'):
                cur.execute(
                    """SELECT date, sleep_segments, source
                       FROM sleep_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('schlaf', 14))
                )
                variables['sleep_data'] = [r2d(r) for r in cur.fetchall()]

            # Vitals data
            if modules.get('vitalwerte'):
                since = _since('vitalwerte', 7)

                # Baseline vitals
                cur.execute(
                    """SELECT date, resting_hr, hrv, vo2_max, spo2, respiratory_rate
                       FROM vitals_baseline
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, since)
                )
                variables['vitals_baseline'] = [r2d(r) for r in cur.fetchall()]

                # Blood pressure (compared against a timestamp column)
                cur.execute(
                    """SELECT measured_at, systolic, diastolic, pulse
                       FROM blood_pressure_log
                       WHERE profile_id = %s AND measured_at >= %s
                       ORDER BY measured_at DESC""",
                    (profile_id, since + ' 00:00:00')
                )
                variables['blood_pressure'] = [r2d(r) for r in cur.fetchall()]

            # Mental/Goals (no timeframe, just current state)
            if modules.get('mentales') or modules.get('ziele'):
                # TODO: Add mental state / goals data when implemented
                variables['goals_data'] = []

    # Execute prompt
    return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug, progress_callback)
async def execute_workflow_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    catalog: Optional[Dict] = None,
    progress_callback = None  # Optional callback for SSE progress updates
) -> Dict[str, Any]:
    """
    Execute a workflow-type prompt (graph-based execution).

    Phase 2-4: sequential workflow execution, conditional branching, path consolidation
    Phase 5: the graph comes from ai_prompts.graph_data (not workflow_definitions)

    Args:
        prompt: Prompt dict from database (must have a 'graph_data' field)
        variables: Dict of variables for placeholder replacement; its
            'profile_id' entry (default 'unknown') is passed to the executor
        openrouter_call_func: Async function(prompt_text, model) -> response_text
        enable_debug: If True, include the node states in the response
        catalog: Optional placeholder catalog (accepted but not forwarded to
            the workflow executor by this function)
        progress_callback: Optional progress callback, handed to the workflow executor

    Returns:
        Dict with execution results:
        {
            "type": "workflow",
            "execution_id": "...",
            "status": "completed" | "failed",
            "aggregated_result": {...},
            "node_states": [...],   # Only if enable_debug=True
            "error": "..."          # Only if the execution reported an error
        }

    Raises:
        HTTPException: 400 if the prompt has no graph_data
    """
    from workflow_executor import execute_workflow

    # Phase 5: the graph is stored directly on the prompt
    graph = prompt.get('graph_data')
    if not graph:
        raise HTTPException(400, "Workflow-Prompt fehlt 'graph_data' Feld")

    # Run the workflow from graph_data (no workflow_id involved)
    execution = await execute_workflow(
        graph_data=graph,
        profile_id=variables.get('profile_id', 'unknown'),
        variables=variables,
        openrouter_call_func=openrouter_call_func,
        enable_debug=enable_debug,
        progress_callback=progress_callback
    )

    # Flatten the execution result into a plain dict for the API response
    payload = {
        "type": "workflow",
        "execution_id": execution.execution_id,
        "status": execution.status,
        "aggregated_result": execution.aggregated_result
    }

    if enable_debug:
        payload["node_states"] = [state.model_dump() for state in execution.node_states]

    if execution.error:
        payload["error"] = execution.error

    return payload