diff --git a/backend/prompt_executor.py b/backend/prompt_executor.py new file mode 100644 index 0000000..e21dfdc --- /dev/null +++ b/backend/prompt_executor.py @@ -0,0 +1,351 @@ +""" +Unified Prompt Executor (Issue #28 Phase 2) + +Executes both base and pipeline-type prompts with: +- Dynamic placeholder resolution +- JSON output validation +- Multi-stage parallel execution +- Reference and inline prompt support +""" +import json +import re +from typing import Dict, Any, Optional +from db import get_db, get_cursor, r2d +from fastapi import HTTPException + + +def resolve_placeholders(template: str, variables: Dict[str, Any]) -> str: + """ + Replace {{placeholder}} with values from variables dict. + + Args: + template: String with {{key}} placeholders + variables: Dict of key -> value mappings + + Returns: + Template with placeholders replaced + """ + def replacer(match): + key = match.group(1).strip() + if key in variables: + value = variables[key] + # Convert dict/list to JSON string + if isinstance(value, (dict, list)): + return json.dumps(value, ensure_ascii=False) + return str(value) + # Keep placeholder if no value found + return match.group(0) + + return re.sub(r'\{\{([^}]+)\}\}', replacer, template) + + +def validate_json_output(output: str, schema: Optional[Dict] = None) -> Dict: + """ + Validate that output is valid JSON. + + Args: + output: String to validate + schema: Optional JSON schema to validate against (TODO: jsonschema library) + + Returns: + Parsed JSON dict + + Raises: + HTTPException: If output is not valid JSON + """ + try: + parsed = json.loads(output) + # TODO: Add jsonschema validation if schema provided + return parsed + except json.JSONDecodeError as e: + raise HTTPException( + status_code=500, + detail=f"AI returned invalid JSON: {str(e)}" + ) + + +async def execute_prompt( + prompt_slug: str, + variables: Dict[str, Any], + openrouter_call_func +) -> Dict[str, Any]: + """ + Execute a single prompt (base or pipeline type). + + Args: + prompt_slug: Slug of prompt to execute + variables: Dict of variables for placeholder replacement + openrouter_call_func: Async function(prompt_text) -> response_text + + Returns: + Dict with execution results: + { + "type": "base" | "pipeline", + "slug": "...", + "output": "..." | {...}, # String or parsed JSON + "stages": [...] # Only for pipeline type + } + """ + # Load prompt from database + with get_db() as conn: + cur = get_cursor(conn) + cur.execute( + """SELECT * FROM ai_prompts + WHERE slug = %s AND active = true""", + (prompt_slug,) + ) + row = cur.fetchone() + if not row: + raise HTTPException(404, f"Prompt nicht gefunden: {prompt_slug}") + + prompt = r2d(row) + + prompt_type = prompt.get('type', 'pipeline') + + if prompt_type == 'base': + # Base prompt: single execution with template + return await execute_base_prompt(prompt, variables, openrouter_call_func) + + elif prompt_type == 'pipeline': + # Pipeline prompt: multi-stage execution + return await execute_pipeline_prompt(prompt, variables, openrouter_call_func) + + else: + raise HTTPException(400, f"Unknown prompt type: {prompt_type}") + + +async def execute_base_prompt( + prompt: Dict, + variables: Dict[str, Any], + openrouter_call_func +) -> Dict[str, Any]: + """Execute a base-type prompt (single template).""" + template = prompt.get('template') + if not template: + raise HTTPException(400, f"Base prompt missing template: {prompt['slug']}") + + # Resolve placeholders + prompt_text = resolve_placeholders(template, variables) + + # Call AI + response = await openrouter_call_func(prompt_text) + + # Validate JSON if required + output_format = prompt.get('output_format', 'text') + if output_format == 'json': + output = validate_json_output(response, prompt.get('output_schema')) + else: + output = response + + return { + "type": "base", + "slug": prompt['slug'], + "output": output, + "output_format": output_format + } + + +async def execute_pipeline_prompt( + prompt: Dict, + variables: Dict[str, Any], + openrouter_call_func +) -> Dict[str, Any]: + """ + Execute a pipeline-type prompt (multi-stage). + + Each stage's results are added to variables for next stage. + """ + stages = prompt.get('stages') + if not stages: + raise HTTPException(400, f"Pipeline prompt missing stages: {prompt['slug']}") + + # Parse stages if stored as JSON string + if isinstance(stages, str): + stages = json.loads(stages) + + stage_results = [] + context_vars = variables.copy() + + # Execute stages in order + for stage_def in sorted(stages, key=lambda s: s['stage']): + stage_num = stage_def['stage'] + stage_prompts = stage_def.get('prompts', []) + + if not stage_prompts: + continue + + # Execute all prompts in this stage (parallel concept, sequential impl for now) + stage_outputs = {} + + for prompt_def in stage_prompts: + source = prompt_def.get('source') + output_key = prompt_def.get('output_key', f'stage{stage_num}') + output_format = prompt_def.get('output_format', 'text') + + if source == 'reference': + # Reference to another prompt + ref_slug = prompt_def.get('slug') + if not ref_slug: + raise HTTPException(400, f"Reference prompt missing slug in stage {stage_num}") + + # Load referenced prompt + result = await execute_prompt(ref_slug, context_vars, openrouter_call_func) + output = result['output'] + + elif source == 'inline': + # Inline template + template = prompt_def.get('template') + if not template: + raise HTTPException(400, f"Inline prompt missing template in stage {stage_num}") + + prompt_text = resolve_placeholders(template, context_vars) + response = await openrouter_call_func(prompt_text) + + # Validate JSON if required + if output_format == 'json': + output = validate_json_output(response, prompt_def.get('output_schema')) + else: + output = response + + else: + raise HTTPException(400, f"Unknown prompt source: {source}") + + # Store output with key + stage_outputs[output_key] = output + + # Add to context for next stage + context_vars[f'stage_{stage_num}_{output_key}'] = output + + stage_results.append({ + "stage": stage_num, + "outputs": stage_outputs + }) + + # Final output is last stage's first output + final_output = stage_results[-1]['outputs'] if stage_results else {} + + return { + "type": "pipeline", + "slug": prompt['slug'], + "stages": stage_results, + "output": final_output, + "output_format": prompt.get('output_format', 'text') + } + + +async def execute_prompt_with_data( + prompt_slug: str, + profile_id: str, + modules: Optional[Dict[str, bool]] = None, + timeframes: Optional[Dict[str, int]] = None, + openrouter_call_func = None +) -> Dict[str, Any]: + """ + Execute prompt with data loaded from database. + + Args: + prompt_slug: Slug of prompt to execute + profile_id: User profile ID + modules: Dict of module -> enabled (e.g., {"körper": true}) + timeframes: Dict of module -> days (e.g., {"körper": 30}) + openrouter_call_func: Async function for AI calls + + Returns: + Execution result dict + """ + from datetime import datetime, timedelta + + # Build variables from data modules + variables = { + 'profile_id': profile_id, + 'today': datetime.now().strftime('%Y-%m-%d') + } + + # Load data for enabled modules + if modules: + with get_db() as conn: + cur = get_cursor(conn) + + # Weight data + if modules.get('körper'): + days = timeframes.get('körper', 30) + since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') + cur.execute( + """SELECT date, weight FROM weight_log + WHERE profile_id = %s AND date >= %s + ORDER BY date DESC""", + (profile_id, since) + ) + variables['weight_data'] = [r2d(r) for r in cur.fetchall()] + + # Nutrition data + if modules.get('ernährung'): + days = timeframes.get('ernährung', 30) + since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') + cur.execute( + """SELECT date, kcal, protein_g, fat_g, carbs_g + FROM nutrition_log + WHERE profile_id = %s AND date >= %s + ORDER BY date DESC""", + (profile_id, since) + ) + variables['nutrition_data'] = [r2d(r) for r in cur.fetchall()] + + # Activity data + if modules.get('training'): + days = timeframes.get('training', 14) + since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') + cur.execute( + """SELECT date, activity_type, duration_min, kcal_active, hr_avg + FROM activity_log + WHERE profile_id = %s AND date >= %s + ORDER BY date DESC""", + (profile_id, since) + ) + variables['activity_data'] = [r2d(r) for r in cur.fetchall()] + + # Sleep data + if modules.get('schlaf'): + days = timeframes.get('schlaf', 14) + since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') + cur.execute( + """SELECT date, sleep_segments, source + FROM sleep_log + WHERE profile_id = %s AND date >= %s + ORDER BY date DESC""", + (profile_id, since) + ) + variables['sleep_data'] = [r2d(r) for r in cur.fetchall()] + + # Vitals data + if modules.get('vitalwerte'): + days = timeframes.get('vitalwerte', 7) + since = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') + + # Baseline vitals + cur.execute( + """SELECT date, resting_hr, hrv, vo2_max, spo2, respiratory_rate + FROM vitals_baseline + WHERE profile_id = %s AND date >= %s + ORDER BY date DESC""", + (profile_id, since) + ) + variables['vitals_baseline'] = [r2d(r) for r in cur.fetchall()] + + # Blood pressure + cur.execute( + """SELECT measured_at, systolic, diastolic, pulse + FROM blood_pressure_log + WHERE profile_id = %s AND measured_at >= %s + ORDER BY measured_at DESC""", + (profile_id, since + ' 00:00:00') + ) + variables['blood_pressure'] = [r2d(r) for r in cur.fetchall()] + + # Mental/Goals (no timeframe, just current state) + if modules.get('mentales') or modules.get('ziele'): + # TODO: Add mental state / goals data when implemented + variables['goals_data'] = [] + + # Execute prompt + return await execute_prompt(prompt_slug, variables, openrouter_call_func) diff --git a/backend/routers/prompts.py b/backend/routers/prompts.py index a7128a4..804d402 100644 --- a/backend/routers/prompts.py +++ b/backend/routers/prompts.py @@ -684,3 +684,209 @@ def reset_prompt_to_default(prompt_id: str, session: dict=Depends(require_admin) ) return {"ok": True} + + +# ══════════════════════════════════════════════════════════════════════════════ +# UNIFIED PROMPT SYSTEM (Issue #28 Phase 2) +# ══════════════════════════════════════════════════════════════════════════════ + +from prompt_executor import execute_prompt_with_data +from models import UnifiedPromptCreate, UnifiedPromptUpdate + + +@router.post("/execute") +async def execute_unified_prompt( + prompt_slug: str, + modules: Optional[dict] = None, + timeframes: Optional[dict] = None, + session: dict = Depends(require_auth) +): + """ + Execute a unified prompt (base or pipeline type). + + Args: + prompt_slug: Slug of prompt to execute + modules: Dict of enabled modules (e.g., {"körper": true}) + timeframes: Dict of timeframes per module (e.g., {"körper": 30}) + + Returns: + Execution result with outputs + """ + profile_id = session['profile_id'] + + # Use default modules/timeframes if not provided + if not modules: + modules = { + 'körper': True, + 'ernährung': True, + 'training': True, + 'schlaf': True, + 'vitalwerte': True + } + + if not timeframes: + timeframes = { + 'körper': 30, + 'ernährung': 30, + 'training': 14, + 'schlaf': 14, + 'vitalwerte': 7 + } + + # Execute with prompt_executor + result = await execute_prompt_with_data( + prompt_slug=prompt_slug, + profile_id=profile_id, + modules=modules, + timeframes=timeframes, + openrouter_call_func=call_openrouter + ) + + return result + + +@router.post("/unified") +def create_unified_prompt(p: UnifiedPromptCreate, session: dict = Depends(require_admin)): + """ + Create a new unified prompt (base or pipeline type). + Admin only. + """ + with get_db() as conn: + cur = get_cursor(conn) + + # Check for duplicate slug + cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (p.slug,)) + if cur.fetchone(): + raise HTTPException(status_code=400, detail="Slug already exists") + + # Validate type + if p.type not in ['base', 'pipeline']: + raise HTTPException(status_code=400, detail="Type must be 'base' or 'pipeline'") + + # Validate base type has template + if p.type == 'base' and not p.template: + raise HTTPException(status_code=400, detail="Base prompts require a template") + + # Validate pipeline type has stages + if p.type == 'pipeline' and not p.stages: + raise HTTPException(status_code=400, detail="Pipeline prompts require stages") + + # Convert stages to JSONB + stages_json = None + if p.stages: + stages_json = json.dumps([ + { + 'stage': s.stage, + 'prompts': [ + { + 'source': pr.source, + 'slug': pr.slug, + 'template': pr.template, + 'output_key': pr.output_key, + 'output_format': pr.output_format, + 'output_schema': pr.output_schema + } + for pr in s.prompts + ] + } + for s in p.stages + ]) + + prompt_id = str(uuid.uuid4()) + + cur.execute( + """INSERT INTO ai_prompts + (id, slug, name, display_name, description, template, category, active, sort_order, + type, stages, output_format, output_schema) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""", + ( + prompt_id, p.slug, p.name, p.display_name, p.description, + p.template, p.category, p.active, p.sort_order, + p.type, stages_json, p.output_format, + json.dumps(p.output_schema) if p.output_schema else None + ) + ) + + return {"id": prompt_id, "slug": p.slug} + + +@router.put("/unified/{prompt_id}") +def update_unified_prompt(prompt_id: str, p: UnifiedPromptUpdate, session: dict = Depends(require_admin)): + """ + Update a unified prompt. + Admin only. + """ + with get_db() as conn: + cur = get_cursor(conn) + + # Check if exists + cur.execute("SELECT id FROM ai_prompts WHERE id=%s", (prompt_id,)) + if not cur.fetchone(): + raise HTTPException(status_code=404, detail="Prompt not found") + + # Build update query + updates = [] + values = [] + + if p.name is not None: + updates.append('name=%s') + values.append(p.name) + if p.display_name is not None: + updates.append('display_name=%s') + values.append(p.display_name) + if p.description is not None: + updates.append('description=%s') + values.append(p.description) + if p.type is not None: + if p.type not in ['base', 'pipeline']: + raise HTTPException(status_code=400, detail="Type must be 'base' or 'pipeline'") + updates.append('type=%s') + values.append(p.type) + if p.category is not None: + updates.append('category=%s') + values.append(p.category) + if p.active is not None: + updates.append('active=%s') + values.append(p.active) + if p.sort_order is not None: + updates.append('sort_order=%s') + values.append(p.sort_order) + if p.template is not None: + updates.append('template=%s') + values.append(p.template) + if p.output_format is not None: + updates.append('output_format=%s') + values.append(p.output_format) + if p.output_schema is not None: + updates.append('output_schema=%s') + values.append(json.dumps(p.output_schema)) + if p.stages is not None: + stages_json = json.dumps([ + { + 'stage': s.stage, + 'prompts': [ + { + 'source': pr.source, + 'slug': pr.slug, + 'template': pr.template, + 'output_key': pr.output_key, + 'output_format': pr.output_format, + 'output_schema': pr.output_schema + } + for pr in s.prompts + ] + } + for s in p.stages + ]) + updates.append('stages=%s') + values.append(stages_json) + + if not updates: + return {"ok": True} + + cur.execute( + f"UPDATE ai_prompts SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s", + values + [prompt_id] + ) + + return {"ok": True} diff --git a/frontend/src/utils/api.js b/frontend/src/utils/api.js index a3c86ed..eed3255 100644 --- a/frontend/src/utils/api.js +++ b/frontend/src/utils/api.js @@ -305,4 +305,15 @@ export const api = { // Pipeline Execution (Issue #28 Phase 2) executePipeline: (configId=null) => req('/insights/pipeline' + (configId ? `?config_id=${configId}` : ''), json({})), + + // Unified Prompt System (Issue #28 Phase 2) + executeUnifiedPrompt: (slug, modules=null, timeframes=null) => { + const params = new URLSearchParams({ prompt_slug: slug }) + const body = {} + if (modules) body.modules = modules + if (timeframes) body.timeframes = timeframes + return req('/prompts/execute?' + params, json(body)) + }, + createUnifiedPrompt: (d) => req('/prompts/unified', json(d)), + updateUnifiedPrompt: (id,d) => req(`/prompts/unified/${id}`, jput(d)), }