""" Unified Prompt Executor (Issue #28 Phase 2) Executes both base and pipeline-type prompts with: - Dynamic placeholder resolution - JSON output validation - Multi-stage parallel execution - Reference and inline prompt support """ import json import re from typing import Dict, Any, Optional from db import get_db, get_cursor, r2d from fastapi import HTTPException def resolve_placeholders(template: str, variables: Dict[str, Any], debug_info: Optional[Dict] = None, catalog: Optional[Dict] = None) -> str: """ Replace {{placeholder}} with values from variables dict. Supports modifiers: - {{key|d}} - Include description in parentheses (requires catalog) Args: template: String with {{key}} or {{key|modifiers}} placeholders variables: Dict of key -> value mappings debug_info: Optional dict to collect debug information catalog: Optional placeholder catalog for descriptions (from get_placeholder_catalog) Returns: Template with placeholders replaced """ resolved = {} unresolved = [] def replacer(match): full_placeholder = match.group(1).strip() # Parse key and modifiers (e.g., "weight_aktuell|d" -> key="weight_aktuell", modifiers="d") parts = full_placeholder.split('|') key = parts[0].strip() modifiers = parts[1].strip() if len(parts) > 1 else '' if key in variables: value = variables[key] # Convert dict/list to JSON string if isinstance(value, (dict, list)): resolved_value = json.dumps(value, ensure_ascii=False) else: resolved_value = str(value) # Apply modifiers if 'd' in modifiers: if catalog: # Add description from catalog description = None for cat_items in catalog.values(): matching = [item for item in cat_items if item['key'] == key] if matching: description = matching[0].get('description', '') break if description: resolved_value = f"{resolved_value} ({description})" else: # Catalog not available - log warning in debug if debug_info is not None: if 'warnings' not in debug_info: debug_info['warnings'] = [] debug_info['warnings'].append(f"Modifier |d used but catalog not available for {key}") # Track resolution for debug if debug_info is not None: resolved[key] = resolved_value[:100] + ('...' if len(resolved_value) > 100 else '') return resolved_value else: # Keep placeholder if no value found if debug_info is not None: unresolved.append(key) return match.group(0) result = re.sub(r'\{\{([^}]+)\}\}', replacer, template) # Store debug info if debug_info is not None: debug_info['resolved_placeholders'] = resolved debug_info['unresolved_placeholders'] = unresolved return result def validate_json_output(output: str, schema: Optional[Dict] = None, debug_info: Optional[Dict] = None) -> Dict: """ Validate that output is valid JSON. Unwraps Markdown-wrapped JSON (```json ... ```) if present. Args: output: String to validate schema: Optional JSON schema to validate against (TODO: jsonschema library) debug_info: Optional dict to attach to error for debugging Returns: Parsed JSON dict Raises: HTTPException: If output is not valid JSON (with debug info attached) """ # Try to unwrap Markdown code blocks (common AI pattern) unwrapped = output.strip() if unwrapped.startswith('```json'): # Extract content between ```json and ``` lines = unwrapped.split('\n') if len(lines) > 2 and lines[-1].strip() == '```': unwrapped = '\n'.join(lines[1:-1]) elif unwrapped.startswith('```'): # Generic code block lines = unwrapped.split('\n') if len(lines) > 2 and lines[-1].strip() == '```': unwrapped = '\n'.join(lines[1:-1]) try: parsed = json.loads(unwrapped) # TODO: Add jsonschema validation if schema provided return parsed except json.JSONDecodeError as e: error_detail = { "error": f"AI returned invalid JSON: {str(e)}", "raw_output": output[:500] + ('...' if len(output) > 500 else ''), "unwrapped": unwrapped[:500] if unwrapped != output else None, "output_length": len(output) } if debug_info: error_detail["debug"] = debug_info raise HTTPException( status_code=500, detail=error_detail ) async def execute_prompt( prompt_slug: str, variables: Dict[str, Any], openrouter_call_func, enable_debug: bool = False ) -> Dict[str, Any]: """ Execute a single prompt (base or pipeline type). Args: prompt_slug: Slug of prompt to execute variables: Dict of variables for placeholder replacement openrouter_call_func: Async function(prompt_text) -> response_text enable_debug: If True, include debug information in response Returns: Dict with execution results: { "type": "base" | "pipeline", "slug": "...", "output": "..." | {...}, # String or parsed JSON "stages": [...] # Only for pipeline type "debug": {...} # Only if enable_debug=True } """ # Load prompt from database with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT * FROM ai_prompts WHERE slug = %s AND active = true""", (prompt_slug,) ) row = cur.fetchone() if not row: raise HTTPException(404, f"Prompt nicht gefunden: {prompt_slug}") prompt = r2d(row) prompt_type = prompt.get('type', 'pipeline') # Get catalog from variables if available (passed from execute_prompt_with_data) catalog = variables.pop('_catalog', None) if '_catalog' in variables else None if prompt_type == 'base': # Base prompt: single execution with template return await execute_base_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog) elif prompt_type == 'pipeline': # Pipeline prompt: multi-stage execution return await execute_pipeline_prompt(prompt, variables, openrouter_call_func, enable_debug, catalog) else: raise HTTPException(400, f"Unknown prompt type: {prompt_type}") async def execute_base_prompt( prompt: Dict, variables: Dict[str, Any], openrouter_call_func, enable_debug: bool = False, catalog: Optional[Dict] = None ) -> Dict[str, Any]: """Execute a base-type prompt (single template).""" template = prompt.get('template') if not template: raise HTTPException(400, f"Base prompt missing template: {prompt['slug']}") debug_info = {} if enable_debug else None # Resolve placeholders (with optional catalog for |d modifier) prompt_text = resolve_placeholders(template, variables, debug_info, catalog) if enable_debug: debug_info['template'] = template debug_info['final_prompt'] = prompt_text[:500] + ('...' if len(prompt_text) > 500 else '') debug_info['available_variables'] = list(variables.keys()) # Call AI response = await openrouter_call_func(prompt_text) if enable_debug: debug_info['ai_response_length'] = len(response) debug_info['ai_response_preview'] = response[:200] + ('...' if len(response) > 200 else '') # Validate JSON if required output_format = prompt.get('output_format', 'text') if output_format == 'json': output = validate_json_output(response, prompt.get('output_schema'), debug_info if enable_debug else None) else: output = response result = { "type": "base", "slug": prompt['slug'], "output": output, "output_format": output_format } if enable_debug: result['debug'] = debug_info return result async def execute_pipeline_prompt( prompt: Dict, variables: Dict[str, Any], openrouter_call_func, enable_debug: bool = False, catalog: Optional[Dict] = None ) -> Dict[str, Any]: """ Execute a pipeline-type prompt (multi-stage). Each stage's results are added to variables for next stage. """ stages = prompt.get('stages') if not stages: raise HTTPException(400, f"Pipeline prompt missing stages: {prompt['slug']}") # Parse stages if stored as JSON string if isinstance(stages, str): stages = json.loads(stages) stage_results = [] context_vars = variables.copy() pipeline_debug = [] if enable_debug else None # Execute stages in order for stage_def in sorted(stages, key=lambda s: s['stage']): stage_num = stage_def['stage'] stage_prompts = stage_def.get('prompts', []) if not stage_prompts: continue stage_debug = {} if enable_debug else None if enable_debug: stage_debug['stage'] = stage_num stage_debug['available_variables'] = list(context_vars.keys()) stage_debug['prompts'] = [] # Execute all prompts in this stage (parallel concept, sequential impl for now) stage_outputs = {} for prompt_def in stage_prompts: source = prompt_def.get('source') output_key = prompt_def.get('output_key', f'stage{stage_num}') output_format = prompt_def.get('output_format', 'text') prompt_debug = {} if enable_debug else None if source == 'reference': # Reference to another prompt ref_slug = prompt_def.get('slug') if not ref_slug: raise HTTPException(400, f"Reference prompt missing slug in stage {stage_num}") if enable_debug: prompt_debug['source'] = 'reference' prompt_debug['ref_slug'] = ref_slug # Load referenced prompt result = await execute_prompt(ref_slug, context_vars, openrouter_call_func, enable_debug) output = result['output'] if enable_debug and 'debug' in result: prompt_debug['ref_debug'] = result['debug'] elif source == 'inline': # Inline template template = prompt_def.get('template') if not template: raise HTTPException(400, f"Inline prompt missing template in stage {stage_num}") placeholder_debug = {} if enable_debug else None prompt_text = resolve_placeholders(template, context_vars, placeholder_debug, catalog) if enable_debug: prompt_debug['source'] = 'inline' prompt_debug['template'] = template prompt_debug['final_prompt'] = prompt_text[:500] + ('...' if len(prompt_text) > 500 else '') prompt_debug.update(placeholder_debug) response = await openrouter_call_func(prompt_text) if enable_debug: prompt_debug['ai_response_length'] = len(response) prompt_debug['ai_response_preview'] = response[:200] + ('...' if len(response) > 200 else '') # Validate JSON if required if output_format == 'json': output = validate_json_output(response, prompt_def.get('output_schema'), prompt_debug if enable_debug else None) else: output = response else: raise HTTPException(400, f"Unknown prompt source: {source}") # Store output with key stage_outputs[output_key] = output # Add to context for next stage context_var_key = f'stage_{stage_num}_{output_key}' context_vars[context_var_key] = output if enable_debug: prompt_debug['output_key'] = output_key prompt_debug['context_var_key'] = context_var_key stage_debug['prompts'].append(prompt_debug) stage_results.append({ "stage": stage_num, "outputs": stage_outputs }) if enable_debug: stage_debug['output'] = stage_outputs # Add outputs to debug info for value table pipeline_debug.append(stage_debug) # Final output is last stage's first output final_output = stage_results[-1]['outputs'] if stage_results else {} result = { "type": "pipeline", "slug": prompt['slug'], "stages": stage_results, "output": final_output, "output_format": prompt.get('output_format', 'text') } if enable_debug: result['debug'] = { 'initial_variables': list(variables.keys()), 'stages': pipeline_debug } return result async def execute_prompt_with_data( prompt_slug: str, profile_id: str, modules: Optional[Dict[str, bool]] = None, timeframes: Optional[Dict[str, int]] = None, openrouter_call_func = None, enable_debug: bool = False ) -> Dict[str, Any]: """ Execute prompt with data loaded from database. Args: prompt_slug: Slug of prompt to execute profile_id: User profile ID modules: Dict of module -> enabled (e.g., {"körper": true}) timeframes: Dict of module -> days (e.g., {"körper": 30}) openrouter_call_func: Async function for AI calls enable_debug: If True, include debug information in response Returns: Execution result dict """ from datetime import datetime, timedelta from placeholder_resolver import get_placeholder_example_values, get_placeholder_catalog # Build variables from data modules variables = { 'profile_id': profile_id, 'today': datetime.now().strftime('%Y-%m-%d') } # Load placeholder catalog for |d modifier support try: catalog = get_placeholder_catalog(profile_id) except Exception as e: catalog = None print(f"Warning: Could not load placeholder catalog: {e}") variables['_catalog'] = catalog # Will be popped in execute_prompt (can be None) # Add PROCESSED placeholders (name, weight_trend, caliper_summary, etc.) # This makes old-style prompts work with the new executor try: processed_placeholders = get_placeholder_example_values(profile_id) # Remove {{ }} from keys (placeholder_resolver returns them with wrappers) cleaned_placeholders = { key.replace('{{', '').replace('}}', ''): value for key, value in processed_placeholders.items() } variables.update(cleaned_placeholders) except Exception as e: # Continue even if placeholder resolution fails if enable_debug: variables['_placeholder_error'] = str(e) # Load data for enabled modules if modules: with get_db() as conn: cur = get_cursor(conn) # Weight data if modules.get('körper'): days = timeframes.get('körper', 30) since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT date, weight FROM weight_log WHERE profile_id = %s AND date >= %s ORDER BY date DESC""", (profile_id, since) ) variables['weight_data'] = [r2d(r) for r in cur.fetchall()] # Nutrition data if modules.get('ernährung'): days = timeframes.get('ernährung', 30) since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT date, kcal, protein_g, fat_g, carbs_g FROM nutrition_log WHERE profile_id = %s AND date >= %s ORDER BY date DESC""", (profile_id, since) ) variables['nutrition_data'] = [r2d(r) for r in cur.fetchall()] # Activity data if modules.get('training'): days = timeframes.get('training', 14) since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT date, activity_type, duration_min, kcal_active, hr_avg FROM activity_log WHERE profile_id = %s AND date >= %s ORDER BY date DESC""", (profile_id, since) ) variables['activity_data'] = [r2d(r) for r in cur.fetchall()] # Sleep data if modules.get('schlaf'): days = timeframes.get('schlaf', 14) since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT date, sleep_segments, source FROM sleep_log WHERE profile_id = %s AND date >= %s ORDER BY date DESC""", (profile_id, since) ) variables['sleep_data'] = [r2d(r) for r in cur.fetchall()] # Vitals data if modules.get('vitalwerte'): days = timeframes.get('vitalwerte', 7) since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') # Baseline vitals cur.execute( """SELECT date, resting_hr, hrv, vo2_max, spo2, respiratory_rate FROM vitals_baseline WHERE profile_id = %s AND date >= %s ORDER BY date DESC""", (profile_id, since) ) variables['vitals_baseline'] = [r2d(r) for r in cur.fetchall()] # Blood pressure cur.execute( """SELECT measured_at, systolic, diastolic, pulse FROM blood_pressure_log WHERE profile_id = %s AND measured_at >= %s ORDER BY measured_at DESC""", (profile_id, since + ' 00:00:00') ) variables['blood_pressure'] = [r2d(r) for r in cur.fetchall()] # Mental/Goals (no timeframe, just current state) if modules.get('mentales') or modules.get('ziele'): # TODO: Add mental state / goals data when implemented variables['goals_data'] = [] # Execute prompt return await execute_prompt(prompt_slug, variables, openrouter_call_func, enable_debug)