"""
Unified Prompt Executor (Issue #28 Phase 2)

Executes both base and pipeline-type prompts with:
- Dynamic placeholder resolution
- JSON output validation
- Multi-stage execution (prompts within a stage are conceptually parallel,
  but are currently run sequentially)
- Reference and inline prompt support
"""
import json
import re
from datetime import date, datetime, time as dt_time, timedelta
from decimal import Decimal
from typing import Dict, Any, Optional

from db import get_db, get_cursor, r2d
from fastapi import HTTPException


# {{ key }} placeholder; group 1 is the raw key (whitespace is stripped later).
_PLACEHOLDER_RE = re.compile(r'\{\{([^}]+)\}\}')

# A Markdown code fence wrapping an entire response, e.g. ```json\n{...}\n```.
# Group 1 is the fenced content.
_CODE_FENCE_RE = re.compile(r'^```[A-Za-z]*\s*(.*?)\s*```$', re.DOTALL)


def _json_default(obj: Any) -> Any:
    """
    Serialize values that json.dumps cannot handle natively.

    Rows loaded from the database (via r2d) may contain date/datetime/time or
    Decimal values (depends on the DB driver - assumption), which would
    otherwise make json.dumps raise TypeError while rendering a prompt.
    Dates/times become ISO strings, Decimals become floats, and anything else
    is rendered with str() - acceptable here because the result is only
    embedded in prompt text.
    """
    if isinstance(obj, (date, dt_time)):  # datetime is a subclass of date
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return float(obj)
    return str(obj)


def resolve_placeholders(template: str, variables: Dict[str, Any]) -> str:
    """
    Replace {{placeholder}} markers with values from a variables dict.

    Keys are matched after stripping surrounding whitespace, so ``{{ key }}``
    and ``{{key}}`` are equivalent. dict/list values are rendered as JSON
    (non-ASCII characters kept as-is; dates/Decimals converted via
    _json_default); every other value is rendered with str().

    Args:
        template: String with {{key}} placeholders
        variables: Dict of key -> value mappings

    Returns:
        Template with placeholders replaced. Placeholders whose key is not in
        ``variables`` are left untouched.
    """
    def replacer(match):
        key = match.group(1).strip()
        if key not in variables:
            # Keep placeholder if no value found
            return match.group(0)
        value = variables[key]
        if isinstance(value, (dict, list)):
            return json.dumps(value, ensure_ascii=False, default=_json_default)
        return str(value)

    return _PLACEHOLDER_RE.sub(replacer, template)


def validate_json_output(output: str, schema: Optional[Dict] = None) -> Dict:
    """
    Parse and validate AI output as JSON.

    A Markdown code fence wrapping the whole response (```json ... ```) is
    stripped before parsing, since models frequently format JSON that way.

    Args:
        output: String to validate
        schema: Optional JSON schema to validate against (TODO: jsonschema library)

    Returns:
        The parsed JSON value (usually a dict, but any JSON value is returned
        as parsed).

    Raises:
        HTTPException: 500 if output is missing or not valid JSON
    """
    text = output.strip() if isinstance(output, str) else output
    if isinstance(text, str):
        fenced = _CODE_FENCE_RE.match(text)
        if fenced:
            text = fenced.group(1)

    try:
        # TypeError covers a None / non-string response from the AI call.
        parsed = json.loads(text)
    except (json.JSONDecodeError, TypeError) as e:
        raise HTTPException(
            status_code=500,
            detail=f"AI returned invalid JSON: {str(e)}"
        ) from e

    # TODO: Add jsonschema validation if schema provided
    return parsed


async def execute_prompt(
    prompt_slug: str,
    variables: Dict[str, Any],
    openrouter_call_func
) -> Dict[str, Any]:
    """
    Execute a single prompt (base or pipeline type).

    Args:
        prompt_slug: Slug of an active prompt in ai_prompts
        variables: Dict of variables for placeholder replacement
        openrouter_call_func: Async function(prompt_text) -> response_text

    Returns:
        Dict with execution results:
        {
            "type": "base" | "pipeline",
            "slug": "...",
            "output": "..." | {...},  # String or parsed JSON
            "stages": [...]           # Only for pipeline type
        }

    Raises:
        HTTPException: 404 if no active prompt has this slug, 400 for an
            unknown prompt type or malformed prompt definition.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """SELECT * FROM ai_prompts WHERE slug = %s AND active = true""",
            (prompt_slug,)
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(404, f"Prompt nicht gefunden: {prompt_slug}")
        prompt = r2d(row)

    # A NULL/missing type column falls back to the pipeline type.
    prompt_type = prompt.get('type') or 'pipeline'

    if prompt_type == 'base':
        # Base prompt: single execution with template
        return await execute_base_prompt(prompt, variables, openrouter_call_func)
    if prompt_type == 'pipeline':
        # Pipeline prompt: multi-stage execution
        return await execute_pipeline_prompt(prompt, variables, openrouter_call_func)
    raise HTTPException(400, f"Unknown prompt type: {prompt_type}")


async def execute_base_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func
) -> Dict[str, Any]:
    """
    Execute a base-type prompt (single template, single AI call).

    Returns:
        {"type": "base", "slug", "output", "output_format"}; "output" is the
        parsed JSON when output_format is 'json', otherwise the raw response.

    Raises:
        HTTPException: 400 if the prompt has no template, 500 if JSON output
            was required but the AI response is not valid JSON.
    """
    template = prompt.get('template')
    if not template:
        raise HTTPException(400, f"Base prompt missing template: {prompt['slug']}")

    prompt_text = resolve_placeholders(template, variables)
    response = await openrouter_call_func(prompt_text)

    # A NULL output_format column is treated as plain text.
    output_format = prompt.get('output_format') or 'text'
    if output_format == 'json':
        output = validate_json_output(response, prompt.get('output_schema'))
    else:
        output = response

    return {
        "type": "base",
        "slug": prompt['slug'],
        "output": output,
        "output_format": output_format
    }


async def _run_stage_prompt(
    prompt_def: Dict,
    stage_num: Any,
    context_vars: Dict[str, Any],
    openrouter_call_func
) -> Any:
    """
    Execute one prompt definition of a pipeline stage and return its output.

    'reference' sources run another stored prompt by slug; 'inline' sources
    render their own template against the current context.

    Raises:
        HTTPException: 400 for a missing slug/template or unknown source.
    """
    source = prompt_def.get('source')

    if source == 'reference':
        ref_slug = prompt_def.get('slug')
        if not ref_slug:
            raise HTTPException(400, f"Reference prompt missing slug in stage {stage_num}")
        # NOTE(review): a prompt that (transitively) references itself will
        # recurse until RecursionError - there is no cycle detection.
        result = await execute_prompt(ref_slug, context_vars, openrouter_call_func)
        return result['output']

    if source == 'inline':
        template = prompt_def.get('template')
        if not template:
            raise HTTPException(400, f"Inline prompt missing template in stage {stage_num}")
        prompt_text = resolve_placeholders(template, context_vars)
        response = await openrouter_call_func(prompt_text)
        if (prompt_def.get('output_format') or 'text') == 'json':
            return validate_json_output(response, prompt_def.get('output_schema'))
        return response

    raise HTTPException(400, f"Unknown prompt source: {source}")


async def execute_pipeline_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func
) -> Dict[str, Any]:
    """
    Execute a pipeline-type prompt (multi-stage).

    Stages run in ascending order of their 'stage' number; stages without
    prompts are skipped. Each prompt's output is stored in the context as
    ``stage_{stage_num}_{output_key}`` as soon as it completes, so it is
    visible to later stages (and to later prompts of the same stage).

    Returns:
        {"type": "pipeline", "slug", "stages", "output", "output_format"};
        "output" is the outputs dict (keyed by output_key) of the last stage
        that ran, or {} if none ran.

    Raises:
        HTTPException: 400 if stages are missing or not valid JSON, or a stage
            prompt is malformed; errors from nested executions propagate.
    """
    stages = prompt.get('stages')
    if not stages:
        raise HTTPException(400, f"Pipeline prompt missing stages: {prompt['slug']}")

    # Stages may be stored as a JSON string rather than a decoded structure.
    if isinstance(stages, str):
        try:
            stages = json.loads(stages)
        except json.JSONDecodeError as e:
            raise HTTPException(
                400, f"Pipeline prompt has invalid stages JSON: {prompt['slug']}"
            ) from e

    stage_results = []
    context_vars = variables.copy()  # never mutate the caller's dict

    for stage_def in sorted(stages, key=lambda s: s['stage']):
        stage_num = stage_def['stage']
        stage_prompts = stage_def.get('prompts', [])
        if not stage_prompts:
            continue

        # Parallel concept, sequential implementation for now.
        stage_outputs = {}
        for prompt_def in stage_prompts:
            output_key = prompt_def.get('output_key', f'stage{stage_num}')
            output = await _run_stage_prompt(
                prompt_def, stage_num, context_vars, openrouter_call_func
            )
            stage_outputs[output_key] = output
            context_vars[f'stage_{stage_num}_{output_key}'] = output

        stage_results.append({
            "stage": stage_num,
            "outputs": stage_outputs
        })

    # Final output is the complete outputs dict of the last stage that ran.
    final_output = stage_results[-1]['outputs'] if stage_results else {}

    return {
        "type": "pipeline",
        "slug": prompt['slug'],
        "stages": stage_results,
        "output": final_output,
        "output_format": prompt.get('output_format') or 'text'
    }


def _load_module_data(
    profile_id: str,
    modules: Dict[str, bool],
    timeframes: Dict[str, int],
    now: datetime
) -> Dict[str, Any]:
    """Query the data of every enabled module and return it as prompt variables."""

    def since(module: str, default_days: int) -> str:
        # Start date (YYYY-MM-DD) of the module's look-back window. A missing
        # or None timeframe uses the default; 0 days is honoured.
        days = timeframes.get(module)
        if days is None:
            days = default_days
        return (now - timedelta(days=days)).strftime('%Y-%m-%d')

    data: Dict[str, Any] = {}

    with get_db() as conn:
        cur = get_cursor(conn)

        def fetch(sql: str, params: tuple) -> list:
            cur.execute(sql, params)
            return [r2d(r) for r in cur.fetchall()]

        # Weight data
        if modules.get('körper'):
            data['weight_data'] = fetch(
                """SELECT date, weight FROM weight_log
                   WHERE profile_id = %s AND date >= %s
                   ORDER BY date DESC""",
                (profile_id, since('körper', 30))
            )

        # Nutrition data
        if modules.get('ernährung'):
            data['nutrition_data'] = fetch(
                """SELECT date, kcal, protein_g, fat_g, carbs_g FROM nutrition_log
                   WHERE profile_id = %s AND date >= %s
                   ORDER BY date DESC""",
                (profile_id, since('ernährung', 30))
            )

        # Activity data
        if modules.get('training'):
            data['activity_data'] = fetch(
                """SELECT date, activity_type, duration_min, kcal_active, hr_avg
                   FROM activity_log
                   WHERE profile_id = %s AND date >= %s
                   ORDER BY date DESC""",
                (profile_id, since('training', 14))
            )

        # Sleep data
        if modules.get('schlaf'):
            data['sleep_data'] = fetch(
                """SELECT date, sleep_segments, source FROM sleep_log
                   WHERE profile_id = %s AND date >= %s
                   ORDER BY date DESC""",
                (profile_id, since('schlaf', 14))
            )

        # Vitals data (baseline vitals + blood pressure share one window)
        if modules.get('vitalwerte'):
            vitals_since = since('vitalwerte', 7)
            data['vitals_baseline'] = fetch(
                """SELECT date, resting_hr, hrv, vo2_max, spo2, respiratory_rate
                   FROM vitals_baseline
                   WHERE profile_id = %s AND date >= %s
                   ORDER BY date DESC""",
                (profile_id, vitals_since)
            )
            # measured_at is compared as a timestamp, so start at midnight.
            data['blood_pressure'] = fetch(
                """SELECT measured_at, systolic, diastolic, pulse
                   FROM blood_pressure_log
                   WHERE profile_id = %s AND measured_at >= %s
                   ORDER BY measured_at DESC""",
                (profile_id, vitals_since + ' 00:00:00')
            )

        # Mental/Goals (no timeframe, just current state)
        if modules.get('mentales') or modules.get('ziele'):
            # TODO: Add mental state / goals data when implemented
            data['goals_data'] = []

    return data


async def execute_prompt_with_data(
    prompt_slug: str,
    profile_id: str,
    modules: Optional[Dict[str, bool]] = None,
    timeframes: Optional[Dict[str, int]] = None,
    openrouter_call_func = None
) -> Dict[str, Any]:
    """
    Execute a prompt with per-module data loaded from the database.

    Args:
        prompt_slug: Slug of prompt to execute
        profile_id: User profile ID
        modules: Dict of module -> enabled (e.g., {"körper": True}); known
            modules are körper, ernährung, training, schlaf, vitalwerte,
            mentales, ziele
        timeframes: Dict of module -> look-back days (e.g., {"körper": 30});
            may be None or omit modules, in which case per-module defaults
            apply (körper/ernährung 30, training/schlaf 14, vitalwerte 7)
        openrouter_call_func: Async function for AI calls

    Returns:
        Execution result dict from execute_prompt. Variables always include
        'profile_id' and 'today' (YYYY-MM-DD) plus the loaded module data.
    """
    # One timestamp for "today" and every window, so they agree at midnight.
    now = datetime.now()

    variables: Dict[str, Any] = {
        'profile_id': profile_id,
        'today': now.strftime('%Y-%m-%d')
    }

    if modules:
        variables.update(_load_module_data(profile_id, modules, timeframes or {}, now))

    return await execute_prompt(prompt_slug, variables, openrouter_call_func)