Backend:
- prompt_executor.py: Universal executor for base + pipeline prompts
- Dynamic placeholder resolution
- JSON output validation
- Multi-stage parallel execution (sequential impl)
- Reference and inline prompt support
- Data loading per module (körper, ernährung, training, schlaf, vitalwerte)
Endpoints:
- POST /api/prompts/execute - Execute unified prompts
- POST /api/prompts/unified - Create unified prompts
- PUT /api/prompts/unified/{id} - Update unified prompts
Frontend:
- api.js: executeUnifiedPrompt, createUnifiedPrompt, updateUnifiedPrompt
Next: Phase 3 - Frontend UI consolidation
352 lines
12 KiB
Python
"""
|
|
Unified Prompt Executor (Issue #28 Phase 2)
|
|
|
|
Executes both base and pipeline-type prompts with:
|
|
- Dynamic placeholder resolution
|
|
- JSON output validation
|
|
- Multi-stage parallel execution
|
|
- Reference and inline prompt support
|
|
"""
|
|
import json
|
|
import re
|
|
from typing import Dict, Any, Optional
|
|
from db import get_db, get_cursor, r2d
|
|
from fastapi import HTTPException
|
|
|
|
|
|
def resolve_placeholders(template: str, variables: Dict[str, Any]) -> str:
    """Substitute {{key}} placeholders in *template* with values from *variables*.

    Dict and list values are serialized as JSON (non-ASCII characters are
    preserved); every other value is converted with str(). Placeholders whose
    key is not present in *variables* are left in place untouched.

    Args:
        template: String containing {{key}} placeholders.
        variables: Mapping of placeholder key -> replacement value.

    Returns:
        The template with all known placeholders replaced.
    """
    def _substitute(match):
        name = match.group(1).strip()
        if name not in variables:
            # Unknown key: keep the literal {{...}} text so the gap is visible.
            return match.group(0)
        value = variables[name]
        if isinstance(value, (dict, list)):
            return json.dumps(value, ensure_ascii=False)
        return str(value)

    return re.sub(r'\{\{([^}]+)\}\}', _substitute, template)
|
|
|
|
|
|
def validate_json_output(output: str, schema: Optional[Dict] = None) -> Dict:
    """Parse *output* as JSON and return the parsed value.

    The optional *schema* argument is accepted but not yet enforced
    (jsonschema-based validation remains a TODO).

    Args:
        output: Raw string that should contain JSON.
        schema: Optional JSON schema (currently unused).

    Returns:
        The parsed JSON value.

    Raises:
        HTTPException: 500 when *output* is not valid JSON.
    """
    try:
        return json.loads(output)
        # TODO: Add jsonschema validation if schema provided
    except json.JSONDecodeError as exc:
        # Surface the parse failure to the API caller: the AI response
        # did not honor the requested JSON output format.
        raise HTTPException(
            status_code=500,
            detail=f"AI returned invalid JSON: {str(exc)}"
        )
|
|
|
|
|
|
async def execute_prompt(
    prompt_slug: str,
    variables: Dict[str, Any],
    openrouter_call_func
) -> Dict[str, Any]:
    """Load an active prompt by slug and dispatch on its type.

    Args:
        prompt_slug: Slug of the prompt to execute.
        variables: Variables available for placeholder replacement.
        openrouter_call_func: Async function(prompt_text) -> response_text.

    Returns:
        Execution result dict:
        {
            "type": "base" | "pipeline",
            "slug": "...",
            "output": "..." | {...},   # string or parsed JSON
            "stages": [...]            # pipeline type only
        }

    Raises:
        HTTPException: 404 if no active prompt matches the slug,
            400 for an unrecognized prompt type.
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            """SELECT * FROM ai_prompts
               WHERE slug = %s AND active = true""",
            (prompt_slug,)
        )
        record = cursor.fetchone()

    if not record:
        raise HTTPException(404, f"Prompt nicht gefunden: {prompt_slug}")

    prompt = r2d(record)

    # Rows without an explicit type are treated as pipelines (legacy default).
    kind = prompt.get('type', 'pipeline')

    if kind == 'base':
        # Single-template execution.
        return await execute_base_prompt(prompt, variables, openrouter_call_func)

    if kind == 'pipeline':
        # Multi-stage execution.
        return await execute_pipeline_prompt(prompt, variables, openrouter_call_func)

    raise HTTPException(400, f"Unknown prompt type: {kind}")
|
|
|
|
|
|
async def execute_base_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func
) -> Dict[str, Any]:
    """Execute a base-type prompt: render its single template, make one AI call.

    Placeholders in the template are resolved from *variables*; when the
    prompt declares output_format == 'json', the AI response is parsed and
    validated before being returned.

    Raises:
        HTTPException: 400 if the prompt row has no template.
    """
    template = prompt.get('template')
    if not template:
        raise HTTPException(400, f"Base prompt missing template: {prompt['slug']}")

    # Render the template and hand it to the AI backend.
    rendered = resolve_placeholders(template, variables)
    raw_response = await openrouter_call_func(rendered)

    # Only JSON-formatted prompts get their response parsed/validated;
    # everything else is passed through as plain text.
    fmt = prompt.get('output_format', 'text')
    if fmt == 'json':
        result = validate_json_output(raw_response, prompt.get('output_schema'))
    else:
        result = raw_response

    return {
        "type": "base",
        "slug": prompt['slug'],
        "output": result,
        "output_format": fmt
    }
|
|
|
|
|
|
async def execute_pipeline_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func
) -> Dict[str, Any]:
    """Execute a pipeline-type prompt stage by stage.

    Stages run in ascending order of their 'stage' number. Each prompt's
    output is stored under its output_key and also injected into the
    variable context as 'stage_{n}_{key}' so later stages can reference it.
    Prompts within one stage are conceptually parallel but currently run
    sequentially.

    Raises:
        HTTPException: 400 for missing stages, missing slug/template in a
            stage prompt, or an unknown prompt source.
    """
    stages = prompt.get('stages')
    if not stages:
        raise HTTPException(400, f"Pipeline prompt missing stages: {prompt['slug']}")

    # Stages may be persisted as a JSON string in the database.
    if isinstance(stages, str):
        stages = json.loads(stages)

    stage_results = []
    context_vars = dict(variables)

    for stage_def in sorted(stages, key=lambda s: s['stage']):
        stage_num = stage_def['stage']
        prompts_in_stage = stage_def.get('prompts', [])
        if not prompts_in_stage:
            continue

        # Sequential implementation of the (conceptually parallel) stage.
        outputs = {}
        for prompt_def in prompts_in_stage:
            source = prompt_def.get('source')
            output_key = prompt_def.get('output_key', f'stage{stage_num}')
            output_format = prompt_def.get('output_format', 'text')

            if source == 'reference':
                # Delegate to another stored prompt identified by slug.
                ref_slug = prompt_def.get('slug')
                if not ref_slug:
                    raise HTTPException(400, f"Reference prompt missing slug in stage {stage_num}")
                ref_result = await execute_prompt(ref_slug, context_vars, openrouter_call_func)
                output = ref_result['output']
            elif source == 'inline':
                # Render the inline template against the accumulated context.
                template = prompt_def.get('template')
                if not template:
                    raise HTTPException(400, f"Inline prompt missing template in stage {stage_num}")
                response = await openrouter_call_func(
                    resolve_placeholders(template, context_vars)
                )
                if output_format == 'json':
                    output = validate_json_output(response, prompt_def.get('output_schema'))
                else:
                    output = response
            else:
                raise HTTPException(400, f"Unknown prompt source: {source}")

            outputs[output_key] = output
            # Make this result addressable by subsequent stages.
            context_vars[f'stage_{stage_num}_{output_key}'] = output

        stage_results.append({
            "stage": stage_num,
            "outputs": outputs
        })

    # Overall pipeline output: the full outputs dict of the last executed
    # stage (empty dict when no stage produced anything).
    final_output = stage_results[-1]['outputs'] if stage_results else {}

    return {
        "type": "pipeline",
        "slug": prompt['slug'],
        "stages": stage_results,
        "output": final_output,
        "output_format": prompt.get('output_format', 'text')
    }
|
|
|
|
|
|
async def execute_prompt_with_data(
    prompt_slug: str,
    profile_id: str,
    modules: Optional[Dict[str, bool]] = None,
    timeframes: Optional[Dict[str, int]] = None,
    openrouter_call_func = None
) -> Dict[str, Any]:
    """
    Execute prompt with data loaded from database.

    Args:
        prompt_slug: Slug of prompt to execute
        profile_id: User profile ID
        modules: Dict of module -> enabled (e.g., {"körper": true})
        timeframes: Dict of module -> days (e.g., {"körper": 30})
        openrouter_call_func: Async function for AI calls

    Returns:
        Execution result dict
    """
    from datetime import datetime, timedelta

    # Bug fix: both params default to None, but the per-module lookups below
    # call .get() on them — normalize to empty dicts so callers may omit them.
    modules = modules or {}
    timeframes = timeframes or {}

    def _since(module: str, default_days: int) -> str:
        """Cutoff date (YYYY-MM-DD) for a module's configured lookback window."""
        days = timeframes.get(module, default_days)
        return (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')

    # Build variables from data modules
    variables: Dict[str, Any] = {
        'profile_id': profile_id,
        'today': datetime.now().strftime('%Y-%m-%d')
    }

    # Load data for enabled modules
    if modules:
        with get_db() as conn:
            cur = get_cursor(conn)

            # Weight data
            if modules.get('körper'):
                cur.execute(
                    """SELECT date, weight FROM weight_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('körper', 30))
                )
                variables['weight_data'] = [r2d(r) for r in cur.fetchall()]

            # Nutrition data
            if modules.get('ernährung'):
                cur.execute(
                    """SELECT date, kcal, protein_g, fat_g, carbs_g
                       FROM nutrition_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('ernährung', 30))
                )
                variables['nutrition_data'] = [r2d(r) for r in cur.fetchall()]

            # Activity data
            if modules.get('training'):
                cur.execute(
                    """SELECT date, activity_type, duration_min, kcal_active, hr_avg
                       FROM activity_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('training', 14))
                )
                variables['activity_data'] = [r2d(r) for r in cur.fetchall()]

            # Sleep data
            if modules.get('schlaf'):
                cur.execute(
                    """SELECT date, sleep_segments, source
                       FROM sleep_log
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, _since('schlaf', 14))
                )
                variables['sleep_data'] = [r2d(r) for r in cur.fetchall()]

            # Vitals data
            if modules.get('vitalwerte'):
                since = _since('vitalwerte', 7)

                # Baseline vitals
                cur.execute(
                    """SELECT date, resting_hr, hrv, vo2_max, spo2, respiratory_rate
                       FROM vitals_baseline
                       WHERE profile_id = %s AND date >= %s
                       ORDER BY date DESC""",
                    (profile_id, since)
                )
                variables['vitals_baseline'] = [r2d(r) for r in cur.fetchall()]

                # Blood pressure — measured_at is a timestamp, so anchor the
                # date cutoff to midnight for the comparison.
                cur.execute(
                    """SELECT measured_at, systolic, diastolic, pulse
                       FROM blood_pressure_log
                       WHERE profile_id = %s AND measured_at >= %s
                       ORDER BY measured_at DESC""",
                    (profile_id, since + ' 00:00:00')
                )
                variables['blood_pressure'] = [r2d(r) for r in cur.fetchall()]

            # Mental/Goals (no timeframe, just current state)
            if modules.get('mentales') or modules.get('ziele'):
                # TODO: Add mental state / goals data when implemented
                variables['goals_data'] = []

    # Execute prompt
    return await execute_prompt(prompt_slug, variables, openrouter_call_func)
|