mitai-jinkendo/backend/prompt_executor.py
Lars 4b6e1bed11
All checks were successful
Deploy Development / deploy (push) Successful in 54s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Enhance OpenRouter API interaction and error handling
- Increased the maximum token limit in the `call_openrouter` function from 1500 to 4096 to allow for more extensive responses.
- Implemented robust error handling for API requests, including timeout and request errors, with detailed HTTP exceptions for better debugging.
- Improved JSON response handling to ensure valid data is returned, with specific error messages for missing content in the response.
- Enhanced the overall reliability of the OpenRouter API integration, providing clearer feedback for users in case of issues.

These changes improve the user experience by ensuring more comprehensive responses and clearer error reporting during API interactions.
2026-04-12 11:03:07 +02:00

673 lines
24 KiB
Python

"""
Unified Prompt Executor (Issue #28 Phase 2)
Executes both base and pipeline-type prompts with:
- Dynamic placeholder resolution
- JSON output validation
- Multi-stage execution (stages run in order; prompts within a stage are currently executed sequentially)
- Reference and inline prompt support
"""
import json
import re
from typing import Dict, Any, Optional
from db import get_db, get_cursor, r2d
from fastapi import HTTPException
from placeholder_resolver import get_catalog_row_for_key
def resolve_placeholders(template: str, variables: Dict[str, Any], debug_info: Optional[Dict] = None, catalog: Optional[Dict] = None) -> str:
    """
    Replace {{placeholder}} occurrences in a template with values from ``variables``.

    Supported forms (a catalog from get_placeholder_catalog is recommended
    whenever modifiers are used):
    - {{key}}     -> value
    - {{key|d}}   -> value — description
    - {{key|x}}   -> explanation only (catalog field ``ai_caption``), no value
    - {{key|d,x}} -> value — description — explanation

    dict and list values are rendered as JSON (non-ASCII characters are kept),
    everything else via ``str()``. A placeholder whose key is not present in
    ``variables`` is left in the template unchanged.

    Args:
        template: String with {{key}} or {{key|modifiers}} placeholders
        variables: Dict of key -> value mappings
        debug_info: Optional dict to collect debug information. Receives
            ``resolved_placeholders`` (previews cut to 100 characters),
            ``unresolved_placeholders`` and, when a modifier could not be
            fully served, ``warnings``.
        catalog: Optional placeholder catalog for descriptions (from get_placeholder_catalog)

    Returns:
        Template with placeholders replaced
    """
    resolved = {}
    unresolved = []

    def _warn(msg: str):
        # Warnings are only collected when the caller asked for debug output.
        if debug_info is not None:
            debug_info.setdefault("warnings", []).append(msg)

    def _record(key: str, text: str):
        # Store a short preview of the replacement for the debug view.
        if debug_info is not None:
            resolved[key] = text[:100] + ("..." if len(text) > 100 else "")

    def replacer(match):
        full_placeholder = match.group(1).strip()

        # Parse key and modifiers (e.g., "weight_aktuell|d" -> key="weight_aktuell", modifiers="d").
        # Only the first segment after '|' is read as the modifier list.
        parts = full_placeholder.split('|')
        key = parts[0].strip()
        modifiers = parts[1].strip() if len(parts) > 1 else ''
        mods = {x.strip().lower() for x in modifiers.split(",") if x.strip()}
        want_d = "d" in mods
        want_x = "x" in mods

        row = get_catalog_row_for_key(catalog, key) if catalog else None

        # Unknown key: keep the placeholder text as-is, whatever the modifiers.
        if key not in variables:
            if debug_info is not None:
                unresolved.append(key)
            return match.group(0)

        # |x alone: emit only the explanation text, never the value.
        if want_x and not want_d:
            expl = (row.get("ai_caption") or "").strip() if row else ""
            if not expl and catalog is None:
                _warn(f"Modifier |x für {key}: Katalog fehlt (ai_caption).")
            _record(key, expl)
            return expl

        value = variables[key]
        if isinstance(value, (dict, list)):
            resolved_value = json.dumps(value, ensure_ascii=False)
        else:
            resolved_value = str(value)

        # No modifiers: plain value.
        if not want_d and not want_x:
            _record(key, resolved_value)
            return resolved_value

        # |d or |d,x: value first, then description, then explanation.
        segments = [resolved_value]
        if want_d:
            if row:
                desc = (row.get("description") or "").strip()
                if desc:
                    segments.append(desc)
            else:
                _warn(f"Modifier |d für {key}: Katalog fehlt (description).")
        if want_x:
            expl = (row.get("ai_caption") or "").strip() if row else ""
            if expl:
                segments.append(expl)
            elif catalog is not None:
                _warn(f"Modifier |x (mit |d) für {key}: ai_caption leer.")

        # Separate the pieces with " — " as documented above; joining without a
        # separator glued value and description together ("80Current weight").
        out = " — ".join(segments)
        _record(key, out)
        return out

    result = re.sub(r'\{\{([^}]+)\}\}', replacer, template)

    # Store debug info
    if debug_info is not None:
        debug_info['resolved_placeholders'] = resolved
        debug_info['unresolved_placeholders'] = unresolved

    return result
def validate_json_output(output: str, schema: Optional[Dict] = None, debug_info: Optional[Dict] = None) -> Dict:
    """
    Parse ``output`` as JSON and return the parsed value.

    A Markdown code fence around the payload (```json ... ``` or a plain
    ``` ... ```) is stripped first, provided the closing fence sits on its
    own last line.

    Args:
        output: String to validate
        schema: Optional JSON schema; currently unused (TODO: jsonschema library)
        debug_info: Optional dict attached to the error detail when non-empty

    Returns:
        Parsed JSON value

    Raises:
        HTTPException: 500 if the (unwrapped) text is not valid JSON. The
            detail carries the parser message, a 500-character preview of the
            raw output, the unwrapped text when it differs, and the raw length.
    """
    candidate = output.strip()

    # A ```json fence is just a special case of a generic ``` fence, and both
    # are unwrapped the same way: drop the first and the last line.
    if candidate.startswith('```'):
        fence_lines = candidate.split('\n')
        closed_properly = len(fence_lines) > 2 and fence_lines[-1].strip() == '```'
        if closed_properly:
            candidate = '\n'.join(fence_lines[1:-1])

    try:
        # TODO: Add jsonschema validation if schema provided
        return json.loads(candidate)
    except json.JSONDecodeError as exc:
        raw_preview = output[:500]
        if len(output) > 500:
            raw_preview += '...'

        failure = {
            "error": f"AI returned invalid JSON: {str(exc)}",
            "raw_output": raw_preview,
            # Only report the unwrapped text when unwrapping/stripping changed something.
            "unwrapped": None if candidate == output else candidate[:500],
            "output_length": len(output),
        }
        if debug_info:
            failure["debug"] = debug_info

        raise HTTPException(status_code=500, detail=failure)
async def execute_prompt(
    prompt_slug: str,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False
) -> Dict[str, Any]:
    """
    Load an active prompt by slug and run it with the executor for its type.

    Args:
        prompt_slug: Slug of prompt to execute
        variables: Dict of variables for placeholder replacement. A
            ``_catalog`` entry, if present, is removed from this dict and
            forwarded separately as the placeholder catalog.
        openrouter_call_func: Async function(prompt_text) -> response_text
        enable_debug: If True, include debug information in response

    Returns:
        The result dict of the type-specific executor (base, pipeline or
        workflow); see those functions for the exact keys.

    Raises:
        HTTPException: 404 if no active prompt has this slug, 400 if the
            prompt's type is not one of base / pipeline / workflow.
    """
    # Fetch the prompt definition
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            """SELECT * FROM ai_prompts
               WHERE slug = %s AND active = true""",
            (prompt_slug,)
        )
        record = cursor.fetchone()
        if not record:
            raise HTTPException(404, f"Prompt nicht gefunden: {prompt_slug}")
        prompt = r2d(record)

    # Rows without a 'type' key are treated as pipeline prompts
    prompt_type = prompt.get('type', 'pipeline')

    # The catalog travels inside variables (put there by execute_prompt_with_data);
    # take it out so it is never treated as a placeholder value.
    catalog = variables.pop('_catalog', None)

    executors = {
        'base': execute_base_prompt,          # single execution with template
        'pipeline': execute_pipeline_prompt,  # multi-stage execution
        'workflow': execute_workflow_prompt,  # graph-based execution
    }
    executor = executors.get(prompt_type)
    if executor is None:
        raise HTTPException(400, f"Unknown prompt type: {prompt_type}")

    return await executor(prompt, variables, openrouter_call_func, enable_debug, catalog)
async def execute_base_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    catalog: Optional[Dict] = None,
    node_questions: Optional[list] = None  # Phase 1: node-bound questions
) -> Dict[str, Any]:
    """
    Execute a base-type prompt (single template, single AI call).

    Phase 1: supports question augmentations (hybrid model)
    - node_questions: node-bound questions (priority 1)
    - prompt.question_augmentations: prompt defaults (priority 2)

    When questions are in effect, the AI response is parsed into a structured
    result container and ``output_format`` is ``'structured_container'``.
    Otherwise the response is returned as text, or parsed as JSON when the
    prompt's ``output_format`` is ``'json'``.

    Returns:
        Dict with ``type`` ("base"), ``slug``, ``output``, ``output_format``
        and, if enable_debug is True, ``debug``.

    Raises:
        HTTPException: 400 if the prompt has no template.
    """
    from question_augmenter import (
        augment_prompt_with_questions,
        merge_question_augmentations,
        parse_question_augmentations_from_jsonb,
    )
    from result_container_parser import parse_result_container_robust

    def _clip(text, limit, suffix):
        # Return text unchanged up to `limit` characters, otherwise cut and mark it.
        return text if len(text) <= limit else text[:limit] + suffix

    template = prompt.get('template')
    if not template:
        raise HTTPException(400, f"Base prompt missing template: {prompt['slug']}")

    debug_info = {} if enable_debug else None

    # Phase 1: prompt-level default questions (hybrid model)
    prompt_default_questions = None
    raw_defaults = prompt.get('question_augmentations')
    if raw_defaults:
        try:
            # Import kept inside the try: a failure here is handled like a parse failure.
            from workflow_models import QuestionAugmentation
            prompt_default_questions = parse_question_augmentations_from_jsonb(raw_defaults)
        except Exception as exc:
            # Best effort: broken defaults must not block the prompt itself.
            if enable_debug:
                debug_info['question_augmentations_error'] = str(exc)

    # Precedence rule: node questions win over prompt defaults
    questions = merge_question_augmentations(node_questions, prompt_default_questions)

    # Fill in placeholders (catalog enables the |d / |x modifiers)
    prompt_text = resolve_placeholders(template, variables, debug_info, catalog)

    # Phase 1: append the questions to the prompt, if there are any
    if questions:
        prompt_text = augment_prompt_with_questions(prompt_text, questions)
        if enable_debug:
            debug_info['question_augmentations_count'] = len(questions)
            debug_info['question_types'] = [q.type for q in questions]

    if enable_debug:
        debug_info['template'] = template
        # Full text for the admin test UI; only very large prompts are cut
        debug_info['final_prompt'] = _clip(prompt_text, 512 * 1024, "\n… [gekürzt, >512KB]")
        debug_info['available_variables'] = list(variables.keys())

    # Call AI
    response = await openrouter_call_func(prompt_text)

    if enable_debug:
        debug_info['ai_response_length'] = len(response)
        debug_info['ai_response_preview'] = _clip(response, 200, '...')

    if not questions:
        # Legacy behavior: plain text, or validated JSON when requested
        output_format = prompt.get('output_format', 'text')
        if output_format == 'json':
            # debug_info is already None when debugging is off
            output = validate_json_output(response, prompt.get('output_schema'), debug_info)
        else:
            output = response
    else:
        # Phase 1: questions were asked, so parse the structured result
        expected_question_types = [q.type for q in questions]
        container = parse_result_container_robust(response, expected_question_types)
        if enable_debug:
            debug_info['parsing_status'] = container['parsing_status']
            debug_info['parsing_warnings'] = container.get('warnings', [])
        output = container
        output_format = 'structured_container'

    result = {
        "type": "base",
        "slug": prompt['slug'],
        "output": output,
        "output_format": output_format,
    }
    if enable_debug:
        result['debug'] = debug_info
    return result
async def execute_pipeline_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    catalog: Optional[Dict] = None
) -> Dict[str, Any]:
    """
    Execute a pipeline-type prompt (multi-stage).

    Stages run in ascending ``stage`` order. Every prompt output is stored
    under ``stage_<n>_<output_key>`` in a working copy of ``variables`` so
    that later prompts can reference it as a placeholder. The caller's
    ``variables`` dict is not modified by this function itself.

    Each stage prompt is either
    - ``source == 'reference'``: another stored prompt, run via execute_prompt, or
    - ``source == 'inline'``: a template resolved and sent to the AI directly
      (validated as JSON when its ``output_format`` is ``'json'``).

    Args:
        prompt: Prompt dict; ``stages`` may be a list or a JSON string
        variables: Dict of variables for placeholder replacement
        openrouter_call_func: Async function(prompt_text) -> response_text
        enable_debug: If True, include per-stage debug information
        catalog: Optional placeholder catalog for the |d / |x modifiers

    Returns:
        Dict with ``type`` ("pipeline"), ``slug``, ``stages`` (list of
        ``{"stage", "outputs"}``), ``output`` (the outputs dict of the last
        executed stage, ``{}`` if no stage ran), ``output_format`` and, if
        enable_debug is True, ``debug``.

    Raises:
        HTTPException: 400 for missing stages, a reference without slug, an
            inline prompt without template, or an unknown source.
    """
    stages = prompt.get('stages')
    if not stages:
        raise HTTPException(400, f"Pipeline prompt missing stages: {prompt['slug']}")

    # Parse stages if stored as JSON string
    if isinstance(stages, str):
        stages = json.loads(stages)

    stage_results = []
    context_vars = variables.copy()
    pipeline_debug = [] if enable_debug else None

    # Execute stages in order
    for stage_def in sorted(stages, key=lambda s: s['stage']):
        stage_num = stage_def['stage']
        stage_prompts = stage_def.get('prompts', [])

        if not stage_prompts:
            continue

        stage_debug = {} if enable_debug else None
        if enable_debug:
            stage_debug['stage'] = stage_num
            stage_debug['available_variables'] = list(context_vars.keys())
            stage_debug['prompts'] = []

        # Execute all prompts in this stage (parallel concept, sequential impl for now)
        stage_outputs = {}

        for prompt_def in stage_prompts:
            source = prompt_def.get('source')
            output_key = prompt_def.get('output_key', f'stage{stage_num}')
            output_format = prompt_def.get('output_format', 'text')

            prompt_debug = {} if enable_debug else None

            if source == 'reference':
                # Reference to another prompt
                ref_slug = prompt_def.get('slug')
                if not ref_slug:
                    raise HTTPException(400, f"Reference prompt missing slug in stage {stage_num}")

                if enable_debug:
                    prompt_debug['source'] = 'reference'
                    prompt_debug['ref_slug'] = ref_slug

                # execute_prompt reads the catalog from variables['_catalog'] and
                # pops it again. Without handing it over here, referenced prompts
                # would lose the catalog that inline prompts of the same pipeline
                # receive, and their |d / |x modifiers could not be served.
                if catalog is not None:
                    context_vars['_catalog'] = catalog
                try:
                    result = await execute_prompt(ref_slug, context_vars, openrouter_call_func, enable_debug)
                finally:
                    # Never let the catalog linger as a regular context variable.
                    context_vars.pop('_catalog', None)
                output = result['output']

                if enable_debug and 'debug' in result:
                    prompt_debug['ref_debug'] = result['debug']

            elif source == 'inline':
                # Inline template
                template = prompt_def.get('template')
                if not template:
                    raise HTTPException(400, f"Inline prompt missing template in stage {stage_num}")

                placeholder_debug = {} if enable_debug else None
                prompt_text = resolve_placeholders(template, context_vars, placeholder_debug, catalog)

                if enable_debug:
                    prompt_debug['source'] = 'inline'
                    prompt_debug['template'] = template
                    # Full text for the test UI; only very large prompts are cut
                    _max = 512 * 1024
                    prompt_debug['final_prompt'] = (
                        prompt_text if len(prompt_text) <= _max else prompt_text[:_max] + "\n… [gekürzt, >512KB]"
                    )
                    prompt_debug.update(placeholder_debug)

                response = await openrouter_call_func(prompt_text)

                if enable_debug:
                    prompt_debug['ai_response_length'] = len(response)
                    prompt_debug['ai_response_preview'] = response[:200] + ('...' if len(response) > 200 else '')

                # Validate JSON if required
                if output_format == 'json':
                    output = validate_json_output(response, prompt_def.get('output_schema'), prompt_debug if enable_debug else None)
                else:
                    output = response

            else:
                raise HTTPException(400, f"Unknown prompt source: {source}")

            # Store output with key
            stage_outputs[output_key] = output

            # Add to context for next stage
            context_var_key = f'stage_{stage_num}_{output_key}'
            context_vars[context_var_key] = output

            if enable_debug:
                prompt_debug['output_key'] = output_key
                prompt_debug['context_var_key'] = context_var_key
                stage_debug['prompts'].append(prompt_debug)

        stage_results.append({
            "stage": stage_num,
            "outputs": stage_outputs
        })

        if enable_debug:
            stage_debug['output'] = stage_outputs  # Add outputs to debug info for value table
            pipeline_debug.append(stage_debug)

    # Final output is the complete outputs dict of the last executed stage
    final_output = stage_results[-1]['outputs'] if stage_results else {}

    result = {
        "type": "pipeline",
        "slug": prompt['slug'],
        "stages": stage_results,
        "output": final_output,
        "output_format": prompt.get('output_format', 'text')
    }

    if enable_debug:
        result['debug'] = {
            'initial_variables': list(variables.keys()),
            'stages': pipeline_debug
        }

    return result
async def execute_prompt_with_data(
    prompt_slug: str,
    profile_id: str,
    modules: Optional[Dict[str, bool]] = None,
    timeframes: Optional[Dict[str, int]] = None,
    openrouter_call_func = None,
    enable_debug: bool = False
) -> Dict[str, Any]:
    """
    Execute prompt with data loaded from database.

    Builds the variables dict (profile id, today's date, processed
    placeholder values, raw rows for every enabled module), attaches the
    placeholder catalog under ``_catalog`` and hands over to execute_prompt.
    Failures while loading the catalog or the processed placeholders do not
    abort the execution.

    Args:
        prompt_slug: Slug of prompt to execute
        profile_id: User profile ID
        modules: Dict of module -> enabled (e.g., {"körper": true})
        timeframes: Dict of module -> days (e.g., {"körper": 30}). May be
            omitted; modules without an entry use their default window
            (körper/ernährung 30, training/schlaf 14, vitalwerte 7 days).
        openrouter_call_func: Async function for AI calls
        enable_debug: If True, include debug information in response

    Returns:
        Execution result dict
    """
    from datetime import datetime, timedelta
    from placeholder_resolver import get_placeholder_example_values, get_placeholder_catalog

    # timeframes is optional; without this guard every enabled module would
    # fail on `None.get(...)` when the caller passes modules only.
    timeframes = timeframes or {}

    def _since(module_key: str, default_days: int) -> str:
        # Cutoff date (YYYY-MM-DD) for a module's look-back window.
        days = timeframes.get(module_key, default_days)
        return (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

    # Build variables from data modules
    variables = {
        'profile_id': profile_id,
        'today': datetime.now().strftime('%Y-%m-%d')
    }

    # Load placeholder catalog for the |d / |x modifiers
    try:
        catalog = get_placeholder_catalog(profile_id)
    except Exception as e:
        # Best effort: prompts still run, modifiers just lack descriptions.
        catalog = None
        print(f"Warning: Could not load placeholder catalog: {e}")
    variables['_catalog'] = catalog  # Will be popped in execute_prompt (can be None)

    # Add PROCESSED placeholders (name, weight_trend, caliper_summary, etc.)
    # This makes old-style prompts work with the new executor
    try:
        processed_placeholders = get_placeholder_example_values(profile_id)
        # Remove {{ }} from keys (placeholder_resolver returns them with wrappers)
        cleaned_placeholders = {
            key.replace('{{', '').replace('}}', ''): value
            for key, value in processed_placeholders.items()
        }
        variables.update(cleaned_placeholders)
    except Exception as e:
        # Continue even if placeholder resolution fails
        if enable_debug:
            variables['_placeholder_error'] = str(e)

    # Load data for enabled modules
    if modules:
        with get_db() as conn:
            cur = get_cursor(conn)

            # Weight data
            if modules.get('körper'):
                since = _since('körper', 30)
                cur.execute(
                    """SELECT date, weight FROM weight_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, since)
                )
                variables['weight_data'] = [r2d(r) for r in cur.fetchall()]

            # Nutrition data
            if modules.get('ernährung'):
                since = _since('ernährung', 30)
                cur.execute(
                    """SELECT date, kcal, protein_g, fat_g, carbs_g
                       FROM nutrition_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, since)
                )
                variables['nutrition_data'] = [r2d(r) for r in cur.fetchall()]

            # Activity data
            if modules.get('training'):
                since = _since('training', 14)
                cur.execute(
                    """SELECT date, activity_type, duration_min, kcal_active, hr_avg
                       FROM activity_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, since)
                )
                variables['activity_data'] = [r2d(r) for r in cur.fetchall()]

            # Sleep data
            if modules.get('schlaf'):
                since = _since('schlaf', 14)
                cur.execute(
                    """SELECT date, sleep_segments, source
                       FROM sleep_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, since)
                )
                variables['sleep_data'] = [r2d(r) for r in cur.fetchall()]

            # Vitals data
            if modules.get('vitalwerte'):
                since = _since('vitalwerte', 7)

                # Baseline vitals
                cur.execute(
                    """SELECT date, resting_hr, hrv, vo2_max, spo2, respiratory_rate
                       FROM vitals_baseline
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, since)
                )
                variables['vitals_baseline'] = [r2d(r) for r in cur.fetchall()]

                # Blood pressure (compared against a timestamp, hence the midnight suffix)
                cur.execute(
                    """SELECT measured_at, systolic, diastolic, pulse
                       FROM blood_pressure_log
                       WHERE profile_id = %s AND measured_at >= %s
                       ORDER BY measured_at DESC""",
                    (profile_id, since + ' 00:00:00')
                )
                variables['blood_pressure'] = [r2d(r) for r in cur.fetchall()]

        # Mental/Goals (no timeframe, just current state)
        if modules.get('mentales') or modules.get('ziele'):
            # TODO: Add mental state / goals data when implemented
            variables['goals_data'] = []

    # Execute prompt
    return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug)
async def execute_workflow_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    catalog: Optional[Dict] = None
) -> Dict[str, Any]:
    """
    Execute a workflow-type prompt (graph-based execution).

    Phase 2-4: sequential workflow execution, conditional branching, path consolidation
    Phase 5: the graph comes from ai_prompts.graph_data (not workflow_definitions)

    Args:
        prompt: Prompt dict from database (must have 'graph_data' field)
        variables: Dict of variables for placeholder replacement; its
            'profile_id' entry (default 'unknown') is passed to the workflow
        openrouter_call_func: Async function for AI calls, handed through to
            the workflow executor
        enable_debug: If True, include debug information in response
        catalog: Optional placeholder catalog (accepted for a uniform
            executor signature; not used here)

    Returns:
        Dict with execution results:
        {
            "type": "workflow",
            "execution_id": "...",
            "status": "...",
            "aggregated_result": {...},
            "node_states": [...],  # Only if enable_debug=True
            "error": "..."         # Only if the result carries an error
        }

    Raises:
        HTTPException: 400 if the prompt has no graph_data.
    """
    from workflow_executor import execute_workflow

    # Phase 5: graph is stored on the prompt itself
    graph = prompt.get('graph_data')
    if not graph:
        raise HTTPException(400, "Workflow-Prompt fehlt 'graph_data' Feld")

    # Run the workflow directly from graph_data (no workflow_id lookup)
    profile = variables.get('profile_id', 'unknown')
    outcome = await execute_workflow(
        graph_data=graph,
        profile_id=profile,
        variables=variables,
        openrouter_call_func=openrouter_call_func,
        enable_debug=enable_debug,
    )

    # Flatten the execution result into a plain dict for the API response
    payload = {
        "type": "workflow",
        "execution_id": outcome.execution_id,
        "status": outcome.status,
        "aggregated_result": outcome.aggregated_result,
    }
    if enable_debug:
        payload["node_states"] = [state.model_dump() for state in outcome.node_states]
    if outcome.error:
        payload["error"] = outcome.error
    return payload