""" AI Prompts Management Endpoints for Mitai Jinkendo Handles prompt template configuration (admin-editable). """ import os import json import uuid import httpx from typing import Optional from fastapi import APIRouter, Depends, HTTPException from db import get_db, get_cursor, r2d from auth import require_auth, require_admin from models import ( PromptCreate, PromptUpdate, PromptGenerateRequest, PipelineConfigCreate, PipelineConfigUpdate ) from placeholder_resolver import ( resolve_placeholders, get_unknown_placeholders, get_placeholder_example_values, get_available_placeholders, get_placeholder_catalog ) # Environment variables OPENROUTER_KEY = os.getenv("OPENROUTER_API_KEY") OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "anthropic/claude-sonnet-4") router = APIRouter(prefix="/api/prompts", tags=["prompts"]) @router.get("") def list_prompts(session: dict=Depends(require_auth)): """ List AI prompts. - Admins: see ALL prompts (including pipeline and inactive) - Users: see only active single-analysis prompts """ with get_db() as conn: cur = get_cursor(conn) is_admin = session.get('role') == 'admin' if is_admin: # Admin sees everything cur.execute("SELECT * FROM ai_prompts ORDER BY sort_order, slug") else: # Users see only active, non-pipeline prompts cur.execute("SELECT * FROM ai_prompts WHERE active=true AND slug NOT LIKE 'pipeline_%' ORDER BY sort_order") return [r2d(r) for r in cur.fetchall()] @router.post("") def create_prompt(p: PromptCreate, session: dict=Depends(require_admin)): """Create new AI prompt (admin only).""" with get_db() as conn: cur = get_cursor(conn) # Check if slug already exists cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (p.slug,)) if cur.fetchone(): raise HTTPException(status_code=400, detail=f"Prompt with slug '{p.slug}' already exists") prompt_id = str(uuid.uuid4()) cur.execute( """INSERT INTO ai_prompts (id, name, slug, display_name, description, template, category, active, sort_order, created, updated) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""", (prompt_id, p.name, p.slug, p.display_name or p.name, p.description, p.template, p.category, p.active, p.sort_order) ) return {"id": prompt_id, "slug": p.slug} @router.put("/{prompt_id}") def update_prompt(prompt_id: str, p: PromptUpdate, session: dict=Depends(require_admin)): """Update AI prompt template (admin only).""" with get_db() as conn: cur = get_cursor(conn) # Build dynamic UPDATE query updates = [] values = [] if p.name is not None: updates.append('name=%s') values.append(p.name) if p.display_name is not None: updates.append('display_name=%s') values.append(p.display_name) if p.description is not None: updates.append('description=%s') values.append(p.description) if p.template is not None: updates.append('template=%s') values.append(p.template) if p.category is not None: updates.append('category=%s') values.append(p.category) if p.active is not None: updates.append('active=%s') values.append(p.active) if p.sort_order is not None: updates.append('sort_order=%s') values.append(p.sort_order) if not updates: return {"ok": True} cur.execute( f"UPDATE ai_prompts SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s", values + [prompt_id] ) return {"ok": True} @router.delete("/{prompt_id}") def delete_prompt(prompt_id: str, session: dict=Depends(require_admin)): """Delete AI prompt (admin only).""" with get_db() as conn: cur = get_cursor(conn) cur.execute("DELETE FROM ai_prompts WHERE id=%s", (prompt_id,)) if cur.rowcount == 0: raise HTTPException(status_code=404, detail="Prompt not found") return {"ok": True} @router.post("/{prompt_id}/duplicate") def duplicate_prompt(prompt_id: str, session: dict=Depends(require_admin)): """Duplicate an existing prompt (admin only).""" with get_db() as conn: cur = get_cursor(conn) # Load original prompt cur.execute("SELECT * FROM ai_prompts WHERE id=%s", (prompt_id,)) original = r2d(cur.fetchone()) if not original: raise HTTPException(status_code=404, detail="Prompt not found") # Create duplicate with new ID and modified name/slug new_id = str(uuid.uuid4()) new_name = f"{original['name']} (Kopie)" new_slug = f"{original['slug']}_copy_{uuid.uuid4().hex[:6]}" new_display_name = f"{original.get('display_name') or original['name']} (Kopie)" cur.execute( """INSERT INTO ai_prompts (id, name, slug, display_name, description, template, category, active, sort_order, created, updated) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""", (new_id, new_name, new_slug, new_display_name, original['description'], original['template'], original.get('category', 'ganzheitlich'), original['active'], original['sort_order']) ) return {"id": new_id, "slug": new_slug, "name": new_name} @router.put("/reorder") def reorder_prompts(order: list[str], session: dict=Depends(require_admin)): """ Reorder prompts by providing list of IDs in desired order. Args: order: List of prompt IDs in new order """ with get_db() as conn: cur = get_cursor(conn) for idx, prompt_id in enumerate(order): cur.execute( "UPDATE ai_prompts SET sort_order=%s WHERE id=%s", (idx, prompt_id) ) return {"ok": True} @router.post("/preview") def preview_prompt(data: dict, session: dict=Depends(require_auth)): """ Preview a prompt template with real user data (without calling AI). Args: data: {"template": "Your template with {{placeholders}}"} Returns: { "resolved": "Template with replaced placeholders", "unknown_placeholders": ["list", "of", "unknown"] } """ template = data.get('template', '') profile_id = session['profile_id'] resolved = resolve_placeholders(template, profile_id) unknown = get_unknown_placeholders(template) return { "resolved": resolved, "unknown_placeholders": unknown } @router.get("/placeholders") def list_placeholders(session: dict=Depends(require_auth)): """ Get grouped catalog of available placeholders with descriptions and examples. Returns: Dict mapping category to list of {key, description, example} """ profile_id = session['profile_id'] return get_placeholder_catalog(profile_id) # ── KI-Assisted Prompt Engineering ─────────────────────────────────────────── async def call_openrouter(prompt: str, max_tokens: int = 1500) -> str: """Call OpenRouter API to get AI response.""" if not OPENROUTER_KEY: raise HTTPException(status_code=500, detail="OpenRouter API key not configured") async with httpx.AsyncClient() as client: resp = await client.post( "https://openrouter.ai/api/v1/chat/completions", headers={"Authorization": f"Bearer {OPENROUTER_KEY}"}, json={ "model": OPENROUTER_MODEL, "messages": [{"role": "user", "content": prompt}], "max_tokens": max_tokens }, timeout=60.0 ) if resp.status_code != 200: raise HTTPException(status_code=resp.status_code, detail=f"OpenRouter API error: {resp.text}") return resp.json()['choices'][0]['message']['content'].strip() def collect_example_data(profile_id: str, data_categories: list[str]) -> dict: """Collect example data from user's profile for specified categories.""" example_data = {} with get_db() as conn: cur = get_cursor(conn) # Profil cur.execute("SELECT * FROM profiles WHERE id=%s", (profile_id,)) profile = r2d(cur.fetchone()) example_data['profil'] = { 'name': profile.get('name', 'Nutzer'), 'age': profile.get('dob', 'unbekannt'), 'height': profile.get('height', 'unbekannt'), 'sex': profile.get('sex', 'unbekannt') } # Körper if 'körper' in data_categories: cur.execute( "SELECT weight, date FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 3", (profile_id,) ) weights = [r2d(r) for r in cur.fetchall()] example_data['körper'] = { 'weight_entries': weights, 'latest_weight': f"{weights[0]['weight']:.1f} kg" if weights else "nicht verfügbar" } # Ernährung if 'ernährung' in data_categories: cur.execute( """SELECT kcal, protein, carb, fat, date FROM nutrition_log WHERE profile_id=%s ORDER BY date DESC LIMIT 3""", (profile_id,) ) nutrition = [r2d(r) for r in cur.fetchall()] example_data['ernährung'] = { 'recent_entries': nutrition } # Training if 'training' in data_categories: cur.execute( """SELECT activity_type, duration_min, kcal_active, date FROM activity_log WHERE profile_id=%s ORDER BY date DESC LIMIT 5""", (profile_id,) ) activities = [r2d(r) for r in cur.fetchall()] example_data['training'] = { 'recent_activities': activities } return example_data @router.post("/generate") async def generate_prompt(req: PromptGenerateRequest, session: dict=Depends(require_admin)): """ Generate AI prompt using KI based on user's goal description. This is a meta-feature: KI helps create better prompts for KI analysis. """ profile_id = session['profile_id'] # Collect example data example_data = collect_example_data(profile_id, req.data_categories) # Get available placeholders for selected categories available_placeholders = get_available_placeholders(req.data_categories) placeholders_list = [] for cat, phs in available_placeholders.items(): placeholders_list.extend(phs) # Build meta-prompt for prompt generation meta_prompt = f"""Du bist ein Experte für Prompt-Engineering im Bereich Fitness & Gesundheit. **Aufgabe:** Erstelle einen optimalen KI-Prompt für folgendes Analyseziel: "{req.goal}" **Verfügbare Datenbereiche:** {', '.join(req.data_categories)} **Beispieldaten (aktuelle Werte des Nutzers):** ```json {json.dumps(example_data, indent=2, ensure_ascii=False)} ``` **Verfügbare Platzhalter:** {', '.join(placeholders_list)} **Anforderungen an den Prompt:** 1. Nutze relevante Platzhalter ({{{{platzhalter_name}}}}) - diese werden durch echte Daten ersetzt 2. Sei spezifisch und klar in den Anweisungen 3. Fordere strukturierte Antworten (z.B. Abschnitte, Bullet Points) 4. Gib der KI Kontext über ihre Rolle/Expertise (z.B. "Du bist ein Sportwissenschaftler") 5. Fordere konkrete, umsetzbare Handlungsempfehlungen 6. Sprache: Deutsch 7. Der Prompt sollte 150-300 Wörter lang sein {f'**Gewünschtes Antwort-Format:**\\n{req.example_output}' if req.example_output else ''} **Generiere jetzt NUR den Prompt-Text (keine Erklärung, keine Metakommentare):** """ # Call AI to generate prompt generated_prompt = await call_openrouter(meta_prompt, max_tokens=1000) # Extract placeholders used import re placeholders_used = list(set(re.findall(r'\{\{(\w+)\}\}', generated_prompt))) # Generate title from goal title = generate_title_from_goal(req.goal) # Infer category category = infer_category(req.data_categories) return { "template": generated_prompt, "placeholders_used": placeholders_used, "example_data": example_data, "suggested_title": title, "suggested_category": category } def generate_title_from_goal(goal: str) -> str: """Generate a title from the goal description.""" goal_lower = goal.lower() # Simple keyword matching if 'protein' in goal_lower: return 'Protein-Analyse' elif 'gewicht' in goal_lower or 'abnehmen' in goal_lower: return 'Gewichtstrend-Analyse' elif 'training' in goal_lower or 'aktivität' in goal_lower: return 'Trainingsanalyse' elif 'schlaf' in goal_lower: return 'Schlaf-Analyse' elif 'regeneration' in goal_lower or 'erholung' in goal_lower: return 'Regenerations-Analyse' elif 'kraft' in goal_lower or 'muskel' in goal_lower: return 'Kraftentwicklung' elif 'ausdauer' in goal_lower or 'cardio' in goal_lower: return 'Ausdauer-Analyse' else: return 'Neue Analyse' def infer_category(data_categories: list[str]) -> str: """Infer prompt category from selected data categories.""" if len(data_categories) == 1: return data_categories[0] elif len(data_categories) > 2: return 'ganzheitlich' else: # 2 categories: prefer the first one return data_categories[0] if data_categories else 'ganzheitlich' @router.post("/{prompt_id}/optimize") async def optimize_prompt(prompt_id: str, session: dict=Depends(require_admin)): """ Analyze and optimize an existing prompt using KI. Returns suggestions for improvement with score, strengths, weaknesses, and an optimized version of the prompt. """ with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT * FROM ai_prompts WHERE id=%s", (prompt_id,)) prompt = r2d(cur.fetchone()) if not prompt: raise HTTPException(status_code=404, detail="Prompt not found") # Build meta-prompt for optimization meta_prompt = f"""Du bist ein Experte für Prompt-Engineering. **Analysiere folgenden KI-Prompt und schlage Verbesserungen vor:** ``` {prompt['template']} ``` **Analysiere folgende Aspekte:** 1. **Klarheit & Präzision:** Ist die Anweisung klar und eindeutig? 2. **Struktur & Lesbarkeit:** Ist der Prompt gut strukturiert? 3. **Platzhalter-Nutzung:** Werden relevante Platzhalter genutzt? Fehlen wichtige Daten? 4. **Antwort-Format:** Wird eine strukturierte Ausgabe gefordert? 5. **Kontext:** Hat die KI genug Rollenkontext (z.B. "Du bist ein Ernährungsexperte")? 6. **Handlungsempfehlungen:** Werden konkrete, umsetzbare Schritte gefordert? **Gib deine Analyse als JSON zurück (NUR das JSON, keine zusätzlichen Kommentare):** ```json {{ "score": 0-100, "strengths": ["Stärke 1", "Stärke 2", "Stärke 3"], "weaknesses": ["Schwäche 1", "Schwäche 2"], "optimized_prompt": "Vollständig optimierte Version des Prompts", "changes_summary": "Kurze Zusammenfassung was verbessert wurde (2-3 Sätze)" }} ``` **Wichtig:** - Die optimierte Version sollte alle Platzhalter beibehalten und ggf. ergänzen - Sprache: Deutsch - Der optimierte Prompt sollte 150-400 Wörter lang sein """ # Call AI for optimization response = await call_openrouter(meta_prompt, max_tokens=1500) # Parse JSON response try: # Extract JSON from markdown code blocks if present if '```json' in response: json_start = response.find('```json') + 7 json_end = response.find('```', json_start) json_str = response[json_start:json_end].strip() elif '```' in response: json_start = response.find('```') + 3 json_end = response.find('```', json_start) json_str = response[json_start:json_end].strip() else: json_str = response analysis = json.loads(json_str) except json.JSONDecodeError as e: raise HTTPException( status_code=500, detail=f"Failed to parse AI response as JSON: {str(e)}. Response: {response[:200]}" ) # Ensure required fields if not all(k in analysis for k in ['score', 'strengths', 'weaknesses', 'optimized_prompt', 'changes_summary']): raise HTTPException( status_code=500, detail=f"AI response missing required fields. Got: {list(analysis.keys())}" ) return analysis # ── Pipeline Config Management (Issue #28) ──────────────────────────────────── @router.get("/pipeline-configs") def list_pipeline_configs(session: dict=Depends(require_auth)): """ List pipeline configurations. - Admins: see ALL configs - Users: see only active configs """ with get_db() as conn: cur = get_cursor(conn) is_admin = session.get('role') == 'admin' if is_admin: cur.execute("SELECT * FROM pipeline_configs ORDER BY is_default DESC, name") else: cur.execute("SELECT * FROM pipeline_configs WHERE active=true ORDER BY is_default DESC, name") return [r2d(r) for r in cur.fetchall()] @router.post("/pipeline-configs") def create_pipeline_config(p: PipelineConfigCreate, session: dict=Depends(require_admin)): """Create new pipeline configuration (admin only).""" with get_db() as conn: cur = get_cursor(conn) # Check if name already exists cur.execute("SELECT id FROM pipeline_configs WHERE name=%s", (p.name,)) if cur.fetchone(): raise HTTPException(status_code=400, detail=f"Pipeline config with name '{p.name}' already exists") # Validate: stage prompts must exist all_slugs = p.stage1_prompts + [p.stage2_prompt] if p.stage3_prompt: all_slugs.append(p.stage3_prompt) for slug in all_slugs: cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (slug,)) if not cur.fetchone(): raise HTTPException(status_code=400, detail=f"Prompt '{slug}' does not exist") # If is_default=true, unset other defaults if p.is_default: cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true") config_id = str(uuid.uuid4()) cur.execute( """INSERT INTO pipeline_configs ( id, name, description, is_default, active, modules, timeframes, stage1_prompts, stage2_prompt, stage3_prompt, created, updated ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""", ( config_id, p.name, p.description, p.is_default, p.active, json.dumps(p.modules), json.dumps(p.timeframes), p.stage1_prompts, p.stage2_prompt, p.stage3_prompt ) ) return {"id": config_id, "name": p.name} @router.put("/pipeline-configs/{config_id}") def update_pipeline_config(config_id: str, p: PipelineConfigUpdate, session: dict=Depends(require_admin)): """Update pipeline configuration (admin only).""" with get_db() as conn: cur = get_cursor(conn) # Check if config exists cur.execute("SELECT id FROM pipeline_configs WHERE id=%s", (config_id,)) if not cur.fetchone(): raise HTTPException(status_code=404, detail="Pipeline config not found") # Build dynamic UPDATE query updates = [] values = [] if p.name is not None: updates.append('name=%s') values.append(p.name) if p.description is not None: updates.append('description=%s') values.append(p.description) if p.is_default is not None: # If setting to default, unset others if p.is_default: cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true AND id!=%s", (config_id,)) updates.append('is_default=%s') values.append(p.is_default) if p.active is not None: updates.append('active=%s') values.append(p.active) if p.modules is not None: updates.append('modules=%s') values.append(json.dumps(p.modules)) if p.timeframes is not None: updates.append('timeframes=%s') values.append(json.dumps(p.timeframes)) if p.stage1_prompts is not None: updates.append('stage1_prompts=%s') values.append(p.stage1_prompts) if p.stage2_prompt is not None: updates.append('stage2_prompt=%s') values.append(p.stage2_prompt) if p.stage3_prompt is not None: updates.append('stage3_prompt=%s') values.append(p.stage3_prompt) if not updates: return {"ok": True} cur.execute( f"UPDATE pipeline_configs SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s", values + [config_id] ) return {"ok": True} @router.delete("/pipeline-configs/{config_id}") def delete_pipeline_config(config_id: str, session: dict=Depends(require_admin)): """Delete pipeline configuration (admin only).""" with get_db() as conn: cur = get_cursor(conn) # Check if it's the only default cur.execute("SELECT is_default FROM pipeline_configs WHERE id=%s", (config_id,)) row = cur.fetchone() if not row: raise HTTPException(status_code=404, detail="Pipeline config not found") if row['is_default']: # Check if there are other configs cur.execute("SELECT COUNT(*) as count FROM pipeline_configs WHERE id!=%s", (config_id,)) if cur.fetchone()['count'] > 0: raise HTTPException( status_code=400, detail="Cannot delete the default config. Please set another config as default first." ) cur.execute("DELETE FROM pipeline_configs WHERE id=%s", (config_id,)) return {"ok": True} @router.post("/pipeline-configs/{config_id}/set-default") def set_default_pipeline_config(config_id: str, session: dict=Depends(require_admin)): """Set a pipeline config as default (admin only).""" with get_db() as conn: cur = get_cursor(conn) # Check if config exists cur.execute("SELECT id FROM pipeline_configs WHERE id=%s", (config_id,)) if not cur.fetchone(): raise HTTPException(status_code=404, detail="Pipeline config not found") # Unset all other defaults cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true") # Set this one as default cur.execute("UPDATE pipeline_configs SET is_default=true, updated=CURRENT_TIMESTAMP WHERE id=%s", (config_id,)) return {"ok": True} @router.post("/{prompt_id}/reset-to-default") def reset_prompt_to_default(prompt_id: str, session: dict=Depends(require_admin)): """ Reset a system prompt to its default template (admin only). Only works for prompts with is_system_default=true. """ with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT is_system_default, default_template FROM ai_prompts WHERE id=%s", (prompt_id,)) row = cur.fetchone() if not row: raise HTTPException(status_code=404, detail="Prompt not found") if not row['is_system_default']: raise HTTPException(status_code=400, detail="Only system prompts can be reset to default") if not row['default_template']: raise HTTPException(status_code=400, detail="No default template available for this prompt") # Reset template to default cur.execute( "UPDATE ai_prompts SET template=%s, updated=CURRENT_TIMESTAMP WHERE id=%s", (row['default_template'], prompt_id) ) return {"ok": True} # ══════════════════════════════════════════════════════════════════════════════ # UNIFIED PROMPT SYSTEM (Issue #28 Phase 2) # ══════════════════════════════════════════════════════════════════════════════ from prompt_executor import execute_prompt_with_data from models import UnifiedPromptCreate, UnifiedPromptUpdate @router.post("/execute") async def execute_unified_prompt( prompt_slug: str, modules: Optional[dict] = None, timeframes: Optional[dict] = None, debug: bool = False, session: dict = Depends(require_auth) ): """ Execute a unified prompt (base or pipeline type). Args: prompt_slug: Slug of prompt to execute modules: Dict of enabled modules (e.g., {"körper": true}) timeframes: Dict of timeframes per module (e.g., {"körper": 30}) debug: If true, include debug information (placeholders, final prompts, etc.) Returns: Execution result with outputs (and debug info if debug=true) """ profile_id = session['profile_id'] # Use default modules/timeframes if not provided if not modules: modules = { 'körper': True, 'ernährung': True, 'training': True, 'schlaf': True, 'vitalwerte': True } if not timeframes: timeframes = { 'körper': 30, 'ernährung': 30, 'training': 14, 'schlaf': 14, 'vitalwerte': 7 } # Execute with prompt_executor result = await execute_prompt_with_data( prompt_slug=prompt_slug, profile_id=profile_id, modules=modules, timeframes=timeframes, openrouter_call_func=call_openrouter, enable_debug=debug ) return result @router.post("/unified") def create_unified_prompt(p: UnifiedPromptCreate, session: dict = Depends(require_admin)): """ Create a new unified prompt (base or pipeline type). Admin only. """ with get_db() as conn: cur = get_cursor(conn) # Check for duplicate slug cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (p.slug,)) if cur.fetchone(): raise HTTPException(status_code=400, detail="Slug already exists") # Validate type if p.type not in ['base', 'pipeline']: raise HTTPException(status_code=400, detail="Type must be 'base' or 'pipeline'") # Validate base type has template if p.type == 'base' and not p.template: raise HTTPException(status_code=400, detail="Base prompts require a template") # Validate pipeline type has stages if p.type == 'pipeline' and not p.stages: raise HTTPException(status_code=400, detail="Pipeline prompts require stages") # Convert stages to JSONB stages_json = None if p.stages: stages_json = json.dumps([ { 'stage': s.stage, 'prompts': [ { 'source': pr.source, 'slug': pr.slug, 'template': pr.template, 'output_key': pr.output_key, 'output_format': pr.output_format, 'output_schema': pr.output_schema } for pr in s.prompts ] } for s in p.stages ]) prompt_id = str(uuid.uuid4()) cur.execute( """INSERT INTO ai_prompts (id, slug, name, display_name, description, template, category, active, sort_order, type, stages, output_format, output_schema) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""", ( prompt_id, p.slug, p.name, p.display_name, p.description, p.template, p.category, p.active, p.sort_order, p.type, stages_json, p.output_format, json.dumps(p.output_schema) if p.output_schema else None ) ) return {"id": prompt_id, "slug": p.slug} @router.put("/unified/{prompt_id}") def update_unified_prompt(prompt_id: str, p: UnifiedPromptUpdate, session: dict = Depends(require_admin)): """ Update a unified prompt. Admin only. """ with get_db() as conn: cur = get_cursor(conn) # Check if exists cur.execute("SELECT id FROM ai_prompts WHERE id=%s", (prompt_id,)) if not cur.fetchone(): raise HTTPException(status_code=404, detail="Prompt not found") # Build update query updates = [] values = [] if p.name is not None: updates.append('name=%s') values.append(p.name) if p.display_name is not None: updates.append('display_name=%s') values.append(p.display_name) if p.description is not None: updates.append('description=%s') values.append(p.description) if p.type is not None: if p.type not in ['base', 'pipeline']: raise HTTPException(status_code=400, detail="Type must be 'base' or 'pipeline'") updates.append('type=%s') values.append(p.type) if p.category is not None: updates.append('category=%s') values.append(p.category) if p.active is not None: updates.append('active=%s') values.append(p.active) if p.sort_order is not None: updates.append('sort_order=%s') values.append(p.sort_order) if p.template is not None: updates.append('template=%s') values.append(p.template) if p.output_format is not None: updates.append('output_format=%s') values.append(p.output_format) if p.output_schema is not None: updates.append('output_schema=%s') values.append(json.dumps(p.output_schema)) if p.stages is not None: stages_json = json.dumps([ { 'stage': s.stage, 'prompts': [ { 'source': pr.source, 'slug': pr.slug, 'template': pr.template, 'output_key': pr.output_key, 'output_format': pr.output_format, 'output_schema': pr.output_schema } for pr in s.prompts ] } for s in p.stages ]) updates.append('stages=%s') values.append(stages_json) if not updates: return {"ok": True} cur.execute( f"UPDATE ai_prompts SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s", values + [prompt_id] ) return {"ok": True}