mitai-jinkendo/backend/prompt_executor.py
Lars 7be7266477
All checks were successful
Deploy Development / deploy (push) Successful in 52s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 13s
feat: unified prompt executor - Phase 2 complete (Issue #28)
Backend:
- prompt_executor.py: Universal executor for base + pipeline prompts
  - Dynamic placeholder resolution
  - JSON output validation
  - Multi-stage parallel execution (sequential impl)
  - Reference and inline prompt support
  - Data loading per module (körper, ernährung, training, schlaf, vitalwerte)

Endpoints:
- POST /api/prompts/execute - Execute unified prompts
- POST /api/prompts/unified - Create unified prompts
- PUT /api/prompts/unified/{id} - Update unified prompts

Frontend:
- api.js: executeUnifiedPrompt, createUnifiedPrompt, updateUnifiedPrompt

Next: Phase 3 - Frontend UI consolidation
2026-03-25 14:52:24 +01:00

352 lines
12 KiB
Python

"""
Unified Prompt Executor (Issue #28 Phase 2)
Executes both base and pipeline-type prompts with:
- Dynamic placeholder resolution
- JSON output validation
- Multi-stage parallel execution
- Reference and inline prompt support
"""
import json
import re
from typing import Dict, Any, Optional
from db import get_db, get_cursor, r2d
from fastapi import HTTPException
def resolve_placeholders(template: str, variables: Dict[str, Any]) -> str:
"""
Replace {{placeholder}} with values from variables dict.
Args:
template: String with {{key}} placeholders
variables: Dict of key -> value mappings
Returns:
Template with placeholders replaced
"""
def replacer(match):
key = match.group(1).strip()
if key in variables:
value = variables[key]
# Convert dict/list to JSON string
if isinstance(value, (dict, list)):
return json.dumps(value, ensure_ascii=False)
return str(value)
# Keep placeholder if no value found
return match.group(0)
return re.sub(r'\{\{([^}]+)\}\}', replacer, template)
def validate_json_output(output: str, schema: Optional[Dict] = None) -> Dict:
"""
Validate that output is valid JSON.
Args:
output: String to validate
schema: Optional JSON schema to validate against (TODO: jsonschema library)
Returns:
Parsed JSON dict
Raises:
HTTPException: If output is not valid JSON
"""
try:
parsed = json.loads(output)
# TODO: Add jsonschema validation if schema provided
return parsed
except json.JSONDecodeError as e:
raise HTTPException(
status_code=500,
detail=f"AI returned invalid JSON: {str(e)}"
)
async def execute_prompt(
prompt_slug: str,
variables: Dict[str, Any],
openrouter_call_func
) -> Dict[str, Any]:
"""
Execute a single prompt (base or pipeline type).
Args:
prompt_slug: Slug of prompt to execute
variables: Dict of variables for placeholder replacement
openrouter_call_func: Async function(prompt_text) -> response_text
Returns:
Dict with execution results:
{
"type": "base" | "pipeline",
"slug": "...",
"output": "..." | {...}, # String or parsed JSON
"stages": [...] # Only for pipeline type
}
"""
# Load prompt from database
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""SELECT * FROM ai_prompts
WHERE slug = %s AND active = true""",
(prompt_slug,)
)
row = cur.fetchone()
if not row:
raise HTTPException(404, f"Prompt nicht gefunden: {prompt_slug}")
prompt = r2d(row)
prompt_type = prompt.get('type', 'pipeline')
if prompt_type == 'base':
# Base prompt: single execution with template
return await execute_base_prompt(prompt, variables, openrouter_call_func)
elif prompt_type == 'pipeline':
# Pipeline prompt: multi-stage execution
return await execute_pipeline_prompt(prompt, variables, openrouter_call_func)
else:
raise HTTPException(400, f"Unknown prompt type: {prompt_type}")
async def execute_base_prompt(
prompt: Dict,
variables: Dict[str, Any],
openrouter_call_func
) -> Dict[str, Any]:
"""Execute a base-type prompt (single template)."""
template = prompt.get('template')
if not template:
raise HTTPException(400, f"Base prompt missing template: {prompt['slug']}")
# Resolve placeholders
prompt_text = resolve_placeholders(template, variables)
# Call AI
response = await openrouter_call_func(prompt_text)
# Validate JSON if required
output_format = prompt.get('output_format', 'text')
if output_format == 'json':
output = validate_json_output(response, prompt.get('output_schema'))
else:
output = response
return {
"type": "base",
"slug": prompt['slug'],
"output": output,
"output_format": output_format
}
async def execute_pipeline_prompt(
prompt: Dict,
variables: Dict[str, Any],
openrouter_call_func
) -> Dict[str, Any]:
"""
Execute a pipeline-type prompt (multi-stage).
Each stage's results are added to variables for next stage.
"""
stages = prompt.get('stages')
if not stages:
raise HTTPException(400, f"Pipeline prompt missing stages: {prompt['slug']}")
# Parse stages if stored as JSON string
if isinstance(stages, str):
stages = json.loads(stages)
stage_results = []
context_vars = variables.copy()
# Execute stages in order
for stage_def in sorted(stages, key=lambda s: s['stage']):
stage_num = stage_def['stage']
stage_prompts = stage_def.get('prompts', [])
if not stage_prompts:
continue
# Execute all prompts in this stage (parallel concept, sequential impl for now)
stage_outputs = {}
for prompt_def in stage_prompts:
source = prompt_def.get('source')
output_key = prompt_def.get('output_key', f'stage{stage_num}')
output_format = prompt_def.get('output_format', 'text')
if source == 'reference':
# Reference to another prompt
ref_slug = prompt_def.get('slug')
if not ref_slug:
raise HTTPException(400, f"Reference prompt missing slug in stage {stage_num}")
# Load referenced prompt
result = await execute_prompt(ref_slug, context_vars, openrouter_call_func)
output = result['output']
elif source == 'inline':
# Inline template
template = prompt_def.get('template')
if not template:
raise HTTPException(400, f"Inline prompt missing template in stage {stage_num}")
prompt_text = resolve_placeholders(template, context_vars)
response = await openrouter_call_func(prompt_text)
# Validate JSON if required
if output_format == 'json':
output = validate_json_output(response, prompt_def.get('output_schema'))
else:
output = response
else:
raise HTTPException(400, f"Unknown prompt source: {source}")
# Store output with key
stage_outputs[output_key] = output
# Add to context for next stage
context_vars[f'stage_{stage_num}_{output_key}'] = output
stage_results.append({
"stage": stage_num,
"outputs": stage_outputs
})
# Final output is last stage's first output
final_output = stage_results[-1]['outputs'] if stage_results else {}
return {
"type": "pipeline",
"slug": prompt['slug'],
"stages": stage_results,
"output": final_output,
"output_format": prompt.get('output_format', 'text')
}
async def execute_prompt_with_data(
prompt_slug: str,
profile_id: str,
modules: Optional[Dict[str, bool]] = None,
timeframes: Optional[Dict[str, int]] = None,
openrouter_call_func = None
) -> Dict[str, Any]:
"""
Execute prompt with data loaded from database.
Args:
prompt_slug: Slug of prompt to execute
profile_id: User profile ID
modules: Dict of module -> enabled (e.g., {"körper": true})
timeframes: Dict of module -> days (e.g., {"körper": 30})
openrouter_call_func: Async function for AI calls
Returns:
Execution result dict
"""
from datetime import datetime, timedelta
# Build variables from data modules
variables = {
'profile_id': profile_id,
'today': datetime.now().strftime('%Y-%m-%d')
}
# Load data for enabled modules
if modules:
with get_db() as conn:
cur = get_cursor(conn)
# Weight data
if modules.get('körper'):
days = timeframes.get('körper', 30)
since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
cur.execute(
"""SELECT date, weight FROM weight_log
WHERE profile_id = %s AND date >= %s
ORDER BY date DESC""",
(profile_id, since)
)
variables['weight_data'] = [r2d(r) for r in cur.fetchall()]
# Nutrition data
if modules.get('ernährung'):
days = timeframes.get('ernährung', 30)
since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
cur.execute(
"""SELECT date, kcal, protein_g, fat_g, carbs_g
FROM nutrition_log
WHERE profile_id = %s AND date >= %s
ORDER BY date DESC""",
(profile_id, since)
)
variables['nutrition_data'] = [r2d(r) for r in cur.fetchall()]
# Activity data
if modules.get('training'):
days = timeframes.get('training', 14)
since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
cur.execute(
"""SELECT date, activity_type, duration_min, kcal_active, hr_avg
FROM activity_log
WHERE profile_id = %s AND date >= %s
ORDER BY date DESC""",
(profile_id, since)
)
variables['activity_data'] = [r2d(r) for r in cur.fetchall()]
# Sleep data
if modules.get('schlaf'):
days = timeframes.get('schlaf', 14)
since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
cur.execute(
"""SELECT date, sleep_segments, source
FROM sleep_log
WHERE profile_id = %s AND date >= %s
ORDER BY date DESC""",
(profile_id, since)
)
variables['sleep_data'] = [r2d(r) for r in cur.fetchall()]
# Vitals data
if modules.get('vitalwerte'):
days = timeframes.get('vitalwerte', 7)
since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
# Baseline vitals
cur.execute(
"""SELECT date, resting_hr, hrv, vo2_max, spo2, respiratory_rate
FROM vitals_baseline
WHERE profile_id = %s AND date >= %s
ORDER BY date DESC""",
(profile_id, since)
)
variables['vitals_baseline'] = [r2d(r) for r in cur.fetchall()]
# Blood pressure
cur.execute(
"""SELECT measured_at, systolic, diastolic, pulse
FROM blood_pressure_log
WHERE profile_id = %s AND measured_at >= %s
ORDER BY measured_at DESC""",
(profile_id, since + ' 00:00:00')
)
variables['blood_pressure'] = [r2d(r) for r in cur.fetchall()]
# Mental/Goals (no timeframe, just current state)
if modules.get('mentales') or modules.get('ziele'):
# TODO: Add mental state / goals data when implemented
variables['goals_data'] = []
# Execute prompt
return await execute_prompt(prompt_slug, variables, openrouter_call_func)