mitai-jinkendo/backend/routers/prompts.py
Lars 555ff62b56
All checks were successful
Deploy Development / deploy (push) Successful in 45s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
feat: global placeholder export with values (Settings page)
Zentraler Export aller verfügbaren Platzhalter mit aktuellen Werten.

Backend:
- GET /api/prompts/placeholders/export-values
  - Returns all placeholders organized by category
  - Includes resolved values for current profile
  - Includes metadata (description, example)
  - Flat list + categorized structure

Frontend SettingsPage:
- Button "📊 Platzhalter exportieren"
- Downloads: placeholders-{profile}-{date}.json
- Shows all 38+ placeholders with current values
- Useful for:
  - Understanding available data
  - Debugging prompt templates
  - Verifying placeholder resolution

Frontend api.js:
- exportPlaceholderValues()

Export Format:
{
  "export_date": "2026-03-26T...",
  "profile_id": "...",
  "count": 38,
  "all_placeholders": { "name": "Lars", ... },
  "placeholders_by_category": {
    "Profil": [...],
    "Körper": [...],
    ...
  }
}

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-26 10:05:11 +01:00

1102 lines
38 KiB
Python

"""
AI Prompts Management Endpoints for Mitai Jinkendo
Handles prompt template configuration (admin-editable).
"""
import os
import json
import uuid
import httpx
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from db import get_db, get_cursor, r2d
from auth import require_auth, require_admin
from models import (
PromptCreate, PromptUpdate, PromptGenerateRequest,
PipelineConfigCreate, PipelineConfigUpdate
)
from placeholder_resolver import (
resolve_placeholders,
get_unknown_placeholders,
get_placeholder_example_values,
get_available_placeholders,
get_placeholder_catalog
)
# Environment variables
# OpenRouter credentials for the KI prompt-engineering endpoints below.
OPENROUTER_KEY = os.getenv("OPENROUTER_API_KEY")
# Model is overridable via env; falls back to Claude Sonnet.
OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "anthropic/claude-sonnet-4")
# All routes in this module are mounted under /api/prompts.
router = APIRouter(prefix="/api/prompts", tags=["prompts"])
@router.get("")
def list_prompts(session: dict=Depends(require_auth)):
"""
List AI prompts.
- Admins: see ALL prompts (including pipeline and inactive)
- Users: see only active single-analysis prompts
"""
with get_db() as conn:
cur = get_cursor(conn)
is_admin = session.get('role') == 'admin'
if is_admin:
# Admin sees everything
cur.execute("SELECT * FROM ai_prompts ORDER BY sort_order, slug")
else:
# Users see only active, non-pipeline prompts
cur.execute("SELECT * FROM ai_prompts WHERE active=true AND slug NOT LIKE 'pipeline_%' ORDER BY sort_order")
return [r2d(r) for r in cur.fetchall()]
@router.post("")
def create_prompt(p: PromptCreate, session: dict=Depends(require_admin)):
"""Create new AI prompt (admin only)."""
with get_db() as conn:
cur = get_cursor(conn)
# Check if slug already exists
cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (p.slug,))
if cur.fetchone():
raise HTTPException(status_code=400, detail=f"Prompt with slug '{p.slug}' already exists")
prompt_id = str(uuid.uuid4())
cur.execute(
"""INSERT INTO ai_prompts (id, name, slug, display_name, description, template, category, active, sort_order, created, updated)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""",
(prompt_id, p.name, p.slug, p.display_name or p.name, p.description, p.template, p.category, p.active, p.sort_order)
)
return {"id": prompt_id, "slug": p.slug}
@router.put("/{prompt_id}")
def update_prompt(prompt_id: str, p: PromptUpdate, session: dict=Depends(require_admin)):
"""Update AI prompt template (admin only)."""
with get_db() as conn:
cur = get_cursor(conn)
# Build dynamic UPDATE query
updates = []
values = []
if p.name is not None:
updates.append('name=%s')
values.append(p.name)
if p.display_name is not None:
updates.append('display_name=%s')
values.append(p.display_name)
if p.description is not None:
updates.append('description=%s')
values.append(p.description)
if p.template is not None:
updates.append('template=%s')
values.append(p.template)
if p.category is not None:
updates.append('category=%s')
values.append(p.category)
if p.active is not None:
updates.append('active=%s')
values.append(p.active)
if p.sort_order is not None:
updates.append('sort_order=%s')
values.append(p.sort_order)
if not updates:
return {"ok": True}
cur.execute(
f"UPDATE ai_prompts SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s",
values + [prompt_id]
)
return {"ok": True}
@router.delete("/{prompt_id}")
def delete_prompt(prompt_id: str, session: dict=Depends(require_admin)):
"""Delete AI prompt (admin only)."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("DELETE FROM ai_prompts WHERE id=%s", (prompt_id,))
if cur.rowcount == 0:
raise HTTPException(status_code=404, detail="Prompt not found")
return {"ok": True}
@router.post("/{prompt_id}/duplicate")
def duplicate_prompt(prompt_id: str, session: dict=Depends(require_admin)):
"""Duplicate an existing prompt (admin only)."""
with get_db() as conn:
cur = get_cursor(conn)
# Load original prompt
cur.execute("SELECT * FROM ai_prompts WHERE id=%s", (prompt_id,))
original = r2d(cur.fetchone())
if not original:
raise HTTPException(status_code=404, detail="Prompt not found")
# Create duplicate with new ID and modified name/slug
new_id = str(uuid.uuid4())
new_name = f"{original['name']} (Kopie)"
new_slug = f"{original['slug']}_copy_{uuid.uuid4().hex[:6]}"
new_display_name = f"{original.get('display_name') or original['name']} (Kopie)"
cur.execute(
"""INSERT INTO ai_prompts (id, name, slug, display_name, description, template, category, active, sort_order, created, updated)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""",
(new_id, new_name, new_slug, new_display_name, original['description'], original['template'],
original.get('category', 'ganzheitlich'), original['active'], original['sort_order'])
)
return {"id": new_id, "slug": new_slug, "name": new_name}
@router.put("/reorder")
def reorder_prompts(order: list[str], session: dict=Depends(require_admin)):
"""
Reorder prompts by providing list of IDs in desired order.
Args:
order: List of prompt IDs in new order
"""
with get_db() as conn:
cur = get_cursor(conn)
for idx, prompt_id in enumerate(order):
cur.execute(
"UPDATE ai_prompts SET sort_order=%s WHERE id=%s",
(idx, prompt_id)
)
return {"ok": True}
@router.post("/preview")
def preview_prompt(data: dict, session: dict=Depends(require_auth)):
"""
Preview a prompt template with real user data (without calling AI).
Args:
data: {"template": "Your template with {{placeholders}}"}
Returns:
{
"resolved": "Template with replaced placeholders",
"unknown_placeholders": ["list", "of", "unknown"]
}
"""
template = data.get('template', '')
profile_id = session['profile_id']
resolved = resolve_placeholders(template, profile_id)
unknown = get_unknown_placeholders(template)
return {
"resolved": resolved,
"unknown_placeholders": unknown
}
@router.get("/placeholders")
def list_placeholders(session: dict=Depends(require_auth)):
"""
Get grouped catalog of available placeholders with descriptions and examples.
Returns:
Dict mapping category to list of {key, description, example}
"""
profile_id = session['profile_id']
return get_placeholder_catalog(profile_id)
@router.get("/placeholders/export-values")
def export_placeholder_values(session: dict = Depends(require_auth)):
"""
Export all available placeholders with their current resolved values.
Returns JSON export suitable for download with all placeholders
resolved for the current user's profile.
"""
from datetime import datetime
profile_id = session['profile_id']
# Get all resolved placeholder values
resolved_values = get_placeholder_example_values(profile_id)
# Clean up keys (remove {{ }})
cleaned_values = {
key.replace('{{', '').replace('}}', ''): value
for key, value in resolved_values.items()
}
# Get catalog for metadata
catalog = get_placeholder_catalog(profile_id)
# Organize by category with metadata
export_data = {
'export_date': datetime.now().isoformat(),
'profile_id': profile_id,
'placeholders_by_category': {}
}
for category, items in catalog.items():
export_data['placeholders_by_category'][category] = []
for item in items:
key = item['key'].replace('{{', '').replace('}}', '')
export_data['placeholders_by_category'][category].append({
'key': item['key'],
'description': item['description'],
'value': cleaned_values.get(key, 'nicht verfügbar'),
'example': item.get('example')
})
# Also include flat list for easy access
export_data['all_placeholders'] = cleaned_values
export_data['count'] = len(cleaned_values)
return export_data
# ── KI-Assisted Prompt Engineering ───────────────────────────────────────────
async def call_openrouter(prompt: str, max_tokens: int = 1500) -> str:
    """Send a single-message chat completion to OpenRouter and return the text."""
    if not OPENROUTER_KEY:
        raise HTTPException(status_code=500, detail="OpenRouter API key not configured")
    payload = {
        "model": OPENROUTER_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens
    }
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers={"Authorization": f"Bearer {OPENROUTER_KEY}"},
            json=payload,
            timeout=60.0
        )
    # Surface upstream errors with their original status code.
    if resp.status_code != 200:
        raise HTTPException(status_code=resp.status_code, detail=f"OpenRouter API error: {resp.text}")
    return resp.json()['choices'][0]['message']['content'].strip()
def collect_example_data(profile_id: str, data_categories: list[str]) -> dict:
    """
    Collect small samples of recent profile data for the given categories.

    Args:
        profile_id: Profile whose data is sampled.
        data_categories: Category names ('körper', 'ernährung', 'training');
            the profile section is always included.

    Returns:
        Dict keyed by category with recent entries for each.

    Raises:
        HTTPException: 404 if the profile does not exist (previously this
        crashed with an AttributeError on the missing row).
    """
    example_data = {}
    with get_db() as conn:
        cur = get_cursor(conn)
        # Profil (always included)
        cur.execute("SELECT * FROM profiles WHERE id=%s", (profile_id,))
        profile = r2d(cur.fetchone())
        if not profile:
            # Robustness fix: r2d yields a falsy value for a missing row
            # (see duplicate_prompt); fail clearly instead of crashing on
            # profile.get below.
            raise HTTPException(status_code=404, detail="Profile not found")
        example_data['profil'] = {
            # NOTE(review): 'age' carries the raw dob value, not an age —
            # confirm this is intended by consumers of example_data.
            'name': profile.get('name', 'Nutzer'),
            'age': profile.get('dob', 'unbekannt'),
            'height': profile.get('height', 'unbekannt'),
            'sex': profile.get('sex', 'unbekannt')
        }
        # Körper: last three weight entries
        if 'körper' in data_categories:
            cur.execute(
                "SELECT weight, date FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 3",
                (profile_id,)
            )
            weights = [r2d(r) for r in cur.fetchall()]
            example_data['körper'] = {
                'weight_entries': weights,
                'latest_weight': f"{weights[0]['weight']:.1f} kg" if weights else "nicht verfügbar"
            }
        # Ernährung: last three nutrition entries
        if 'ernährung' in data_categories:
            cur.execute(
                """SELECT kcal, protein, carb, fat, date FROM nutrition_log
                   WHERE profile_id=%s ORDER BY date DESC LIMIT 3""",
                (profile_id,)
            )
            nutrition = [r2d(r) for r in cur.fetchall()]
            example_data['ernährung'] = {
                'recent_entries': nutrition
            }
        # Training: last five activities
        if 'training' in data_categories:
            cur.execute(
                """SELECT activity_type, duration_min, kcal_active, date FROM activity_log
                   WHERE profile_id=%s ORDER BY date DESC LIMIT 5""",
                (profile_id,)
            )
            activities = [r2d(r) for r in cur.fetchall()]
            example_data['training'] = {
                'recent_activities': activities
            }
    return example_data
@router.post("/generate")
async def generate_prompt(req: PromptGenerateRequest, session: dict=Depends(require_admin)):
"""
Generate AI prompt using KI based on user's goal description.
This is a meta-feature: KI helps create better prompts for KI analysis.
"""
profile_id = session['profile_id']
# Collect example data
example_data = collect_example_data(profile_id, req.data_categories)
# Get available placeholders for selected categories
available_placeholders = get_available_placeholders(req.data_categories)
placeholders_list = []
for cat, phs in available_placeholders.items():
placeholders_list.extend(phs)
# Build meta-prompt for prompt generation
meta_prompt = f"""Du bist ein Experte für Prompt-Engineering im Bereich Fitness & Gesundheit.
**Aufgabe:**
Erstelle einen optimalen KI-Prompt für folgendes Analyseziel:
"{req.goal}"
**Verfügbare Datenbereiche:**
{', '.join(req.data_categories)}
**Beispieldaten (aktuelle Werte des Nutzers):**
```json
{json.dumps(example_data, indent=2, ensure_ascii=False)}
```
**Verfügbare Platzhalter:**
{', '.join(placeholders_list)}
**Anforderungen an den Prompt:**
1. Nutze relevante Platzhalter ({{{{platzhalter_name}}}}) - diese werden durch echte Daten ersetzt
2. Sei spezifisch und klar in den Anweisungen
3. Fordere strukturierte Antworten (z.B. Abschnitte, Bullet Points)
4. Gib der KI Kontext über ihre Rolle/Expertise (z.B. "Du bist ein Sportwissenschaftler")
5. Fordere konkrete, umsetzbare Handlungsempfehlungen
6. Sprache: Deutsch
7. Der Prompt sollte 150-300 Wörter lang sein
{f'**Gewünschtes Antwort-Format:**\\n{req.example_output}' if req.example_output else ''}
**Generiere jetzt NUR den Prompt-Text (keine Erklärung, keine Metakommentare):**
"""
# Call AI to generate prompt
generated_prompt = await call_openrouter(meta_prompt, max_tokens=1000)
# Extract placeholders used
import re
placeholders_used = list(set(re.findall(r'\{\{(\w+)\}\}', generated_prompt)))
# Generate title from goal
title = generate_title_from_goal(req.goal)
# Infer category
category = infer_category(req.data_categories)
return {
"template": generated_prompt,
"placeholders_used": placeholders_used,
"example_data": example_data,
"suggested_title": title,
"suggested_category": category
}
def generate_title_from_goal(goal: str) -> str:
    """Derive a short analysis title from keywords in the goal text.

    The first matching keyword group (in priority order) wins; unknown
    goals fall back to a generic title.
    """
    keyword_titles = [
        (('protein',), 'Protein-Analyse'),
        (('gewicht', 'abnehmen'), 'Gewichtstrend-Analyse'),
        (('training', 'aktivität'), 'Trainingsanalyse'),
        (('schlaf',), 'Schlaf-Analyse'),
        (('regeneration', 'erholung'), 'Regenerations-Analyse'),
        (('kraft', 'muskel'), 'Kraftentwicklung'),
        (('ausdauer', 'cardio'), 'Ausdauer-Analyse'),
    ]
    text = goal.lower()
    for keywords, title in keyword_titles:
        if any(keyword in text for keyword in keywords):
            return title
    return 'Neue Analyse'
def infer_category(data_categories: list[str]) -> str:
    """Pick a prompt category from the selected data categories.

    One or two categories: use the first; zero or more than two:
    'ganzheitlich' (holistic).
    """
    if not data_categories:
        return 'ganzheitlich'
    if len(data_categories) <= 2:
        return data_categories[0]
    return 'ganzheitlich'
@router.post("/{prompt_id}/optimize")
async def optimize_prompt(prompt_id: str, session: dict=Depends(require_admin)):
"""
Analyze and optimize an existing prompt using KI.
Returns suggestions for improvement with score, strengths, weaknesses,
and an optimized version of the prompt.
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT * FROM ai_prompts WHERE id=%s", (prompt_id,))
prompt = r2d(cur.fetchone())
if not prompt:
raise HTTPException(status_code=404, detail="Prompt not found")
# Build meta-prompt for optimization
meta_prompt = f"""Du bist ein Experte für Prompt-Engineering.
**Analysiere folgenden KI-Prompt und schlage Verbesserungen vor:**
```
{prompt['template']}
```
**Analysiere folgende Aspekte:**
1. **Klarheit & Präzision:** Ist die Anweisung klar und eindeutig?
2. **Struktur & Lesbarkeit:** Ist der Prompt gut strukturiert?
3. **Platzhalter-Nutzung:** Werden relevante Platzhalter genutzt? Fehlen wichtige Daten?
4. **Antwort-Format:** Wird eine strukturierte Ausgabe gefordert?
5. **Kontext:** Hat die KI genug Rollenkontext (z.B. "Du bist ein Ernährungsexperte")?
6. **Handlungsempfehlungen:** Werden konkrete, umsetzbare Schritte gefordert?
**Gib deine Analyse als JSON zurück (NUR das JSON, keine zusätzlichen Kommentare):**
```json
{{
"score": 0-100,
"strengths": ["Stärke 1", "Stärke 2", "Stärke 3"],
"weaknesses": ["Schwäche 1", "Schwäche 2"],
"optimized_prompt": "Vollständig optimierte Version des Prompts",
"changes_summary": "Kurze Zusammenfassung was verbessert wurde (2-3 Sätze)"
}}
```
**Wichtig:**
- Die optimierte Version sollte alle Platzhalter beibehalten und ggf. ergänzen
- Sprache: Deutsch
- Der optimierte Prompt sollte 150-400 Wörter lang sein
"""
# Call AI for optimization
response = await call_openrouter(meta_prompt, max_tokens=1500)
# Parse JSON response
try:
# Extract JSON from markdown code blocks if present
if '```json' in response:
json_start = response.find('```json') + 7
json_end = response.find('```', json_start)
json_str = response[json_start:json_end].strip()
elif '```' in response:
json_start = response.find('```') + 3
json_end = response.find('```', json_start)
json_str = response[json_start:json_end].strip()
else:
json_str = response
analysis = json.loads(json_str)
except json.JSONDecodeError as e:
raise HTTPException(
status_code=500,
detail=f"Failed to parse AI response as JSON: {str(e)}. Response: {response[:200]}"
)
# Ensure required fields
if not all(k in analysis for k in ['score', 'strengths', 'weaknesses', 'optimized_prompt', 'changes_summary']):
raise HTTPException(
status_code=500,
detail=f"AI response missing required fields. Got: {list(analysis.keys())}"
)
return analysis
# ── Pipeline Config Management (Issue #28) ────────────────────────────────────
@router.get("/pipeline-configs")
def list_pipeline_configs(session: dict=Depends(require_auth)):
    """
    List pipeline configurations: admins see all, users only active ones.
    The default config sorts first, then alphabetically by name.
    """
    if session.get('role') == 'admin':
        query = "SELECT * FROM pipeline_configs ORDER BY is_default DESC, name"
    else:
        query = "SELECT * FROM pipeline_configs WHERE active=true ORDER BY is_default DESC, name"
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(query)
        return [r2d(row) for row in cur.fetchall()]
@router.post("/pipeline-configs")
def create_pipeline_config(p: PipelineConfigCreate, session: dict=Depends(require_admin)):
"""Create new pipeline configuration (admin only)."""
with get_db() as conn:
cur = get_cursor(conn)
# Check if name already exists
cur.execute("SELECT id FROM pipeline_configs WHERE name=%s", (p.name,))
if cur.fetchone():
raise HTTPException(status_code=400, detail=f"Pipeline config with name '{p.name}' already exists")
# Validate: stage prompts must exist
all_slugs = p.stage1_prompts + [p.stage2_prompt]
if p.stage3_prompt:
all_slugs.append(p.stage3_prompt)
for slug in all_slugs:
cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (slug,))
if not cur.fetchone():
raise HTTPException(status_code=400, detail=f"Prompt '{slug}' does not exist")
# If is_default=true, unset other defaults
if p.is_default:
cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true")
config_id = str(uuid.uuid4())
cur.execute(
"""INSERT INTO pipeline_configs (
id, name, description, is_default, active,
modules, timeframes, stage1_prompts, stage2_prompt, stage3_prompt,
created, updated
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""",
(
config_id, p.name, p.description, p.is_default, p.active,
json.dumps(p.modules), json.dumps(p.timeframes),
p.stage1_prompts, p.stage2_prompt, p.stage3_prompt
)
)
return {"id": config_id, "name": p.name}
@router.put("/pipeline-configs/{config_id}")
def update_pipeline_config(config_id: str, p: PipelineConfigUpdate, session: dict=Depends(require_admin)):
"""Update pipeline configuration (admin only)."""
with get_db() as conn:
cur = get_cursor(conn)
# Check if config exists
cur.execute("SELECT id FROM pipeline_configs WHERE id=%s", (config_id,))
if not cur.fetchone():
raise HTTPException(status_code=404, detail="Pipeline config not found")
# Build dynamic UPDATE query
updates = []
values = []
if p.name is not None:
updates.append('name=%s')
values.append(p.name)
if p.description is not None:
updates.append('description=%s')
values.append(p.description)
if p.is_default is not None:
# If setting to default, unset others
if p.is_default:
cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true AND id!=%s", (config_id,))
updates.append('is_default=%s')
values.append(p.is_default)
if p.active is not None:
updates.append('active=%s')
values.append(p.active)
if p.modules is not None:
updates.append('modules=%s')
values.append(json.dumps(p.modules))
if p.timeframes is not None:
updates.append('timeframes=%s')
values.append(json.dumps(p.timeframes))
if p.stage1_prompts is not None:
updates.append('stage1_prompts=%s')
values.append(p.stage1_prompts)
if p.stage2_prompt is not None:
updates.append('stage2_prompt=%s')
values.append(p.stage2_prompt)
if p.stage3_prompt is not None:
updates.append('stage3_prompt=%s')
values.append(p.stage3_prompt)
if not updates:
return {"ok": True}
cur.execute(
f"UPDATE pipeline_configs SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s",
values + [config_id]
)
return {"ok": True}
@router.delete("/pipeline-configs/{config_id}")
def delete_pipeline_config(config_id: str, session: dict=Depends(require_admin)):
"""Delete pipeline configuration (admin only)."""
with get_db() as conn:
cur = get_cursor(conn)
# Check if it's the only default
cur.execute("SELECT is_default FROM pipeline_configs WHERE id=%s", (config_id,))
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Pipeline config not found")
if row['is_default']:
# Check if there are other configs
cur.execute("SELECT COUNT(*) as count FROM pipeline_configs WHERE id!=%s", (config_id,))
if cur.fetchone()['count'] > 0:
raise HTTPException(
status_code=400,
detail="Cannot delete the default config. Please set another config as default first."
)
cur.execute("DELETE FROM pipeline_configs WHERE id=%s", (config_id,))
return {"ok": True}
@router.post("/pipeline-configs/{config_id}/set-default")
def set_default_pipeline_config(config_id: str, session: dict=Depends(require_admin)):
"""Set a pipeline config as default (admin only)."""
with get_db() as conn:
cur = get_cursor(conn)
# Check if config exists
cur.execute("SELECT id FROM pipeline_configs WHERE id=%s", (config_id,))
if not cur.fetchone():
raise HTTPException(status_code=404, detail="Pipeline config not found")
# Unset all other defaults
cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true")
# Set this one as default
cur.execute("UPDATE pipeline_configs SET is_default=true, updated=CURRENT_TIMESTAMP WHERE id=%s", (config_id,))
return {"ok": True}
@router.post("/{prompt_id}/reset-to-default")
def reset_prompt_to_default(prompt_id: str, session: dict=Depends(require_admin)):
"""
Reset a system prompt to its default template (admin only).
Only works for prompts with is_system_default=true.
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT is_system_default, default_template FROM ai_prompts WHERE id=%s", (prompt_id,))
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Prompt not found")
if not row['is_system_default']:
raise HTTPException(status_code=400, detail="Only system prompts can be reset to default")
if not row['default_template']:
raise HTTPException(status_code=400, detail="No default template available for this prompt")
# Reset template to default
cur.execute(
"UPDATE ai_prompts SET template=%s, updated=CURRENT_TIMESTAMP WHERE id=%s",
(row['default_template'], prompt_id)
)
return {"ok": True}
# ══════════════════════════════════════════════════════════════════════════════
# UNIFIED PROMPT SYSTEM (Issue #28 Phase 2)
# ══════════════════════════════════════════════════════════════════════════════
from prompt_executor import execute_prompt_with_data
from models import UnifiedPromptCreate, UnifiedPromptUpdate
@router.post("/execute")
async def execute_unified_prompt(
prompt_slug: str,
modules: Optional[dict] = None,
timeframes: Optional[dict] = None,
debug: bool = False,
save: bool = False,
session: dict = Depends(require_auth)
):
"""
Execute a unified prompt (base or pipeline type).
Args:
prompt_slug: Slug of prompt to execute
modules: Dict of enabled modules (e.g., {"körper": true})
timeframes: Dict of timeframes per module (e.g., {"körper": 30})
debug: If true, include debug information (placeholders, final prompts, etc.)
save: If true, save result to ai_insights table
Returns:
Execution result with outputs (and debug info if debug=true)
"""
profile_id = session['profile_id']
# Use default modules/timeframes if not provided
if not modules:
modules = {
'körper': True,
'ernährung': True,
'training': True,
'schlaf': True,
'vitalwerte': True
}
if not timeframes:
timeframes = {
'körper': 30,
'ernährung': 30,
'training': 14,
'schlaf': 14,
'vitalwerte': 7
}
# Execute with prompt_executor
result = await execute_prompt_with_data(
prompt_slug=prompt_slug,
profile_id=profile_id,
modules=modules,
timeframes=timeframes,
openrouter_call_func=call_openrouter,
enable_debug=debug
)
# Save to ai_insights if requested
if save:
# Extract final output text/markdown
if result['type'] == 'pipeline':
# For pipeline, get the last stage's output
final_output = result.get('output', {})
# If output is dict with single key, use that value
if isinstance(final_output, dict) and len(final_output) == 1:
content = list(final_output.values())[0]
else:
content = json.dumps(final_output, ensure_ascii=False)
else:
# For base prompts, use output directly
content = result.get('output', '')
if isinstance(content, dict):
content = json.dumps(content, ensure_ascii=False)
# Save to database
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""INSERT INTO ai_insights (id, profile_id, scope, content, created)
VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP)""",
(str(uuid.uuid4()), profile_id, prompt_slug, content)
)
conn.commit()
return result
@router.post("/unified")
def create_unified_prompt(p: UnifiedPromptCreate, session: dict = Depends(require_admin)):
"""
Create a new unified prompt (base or pipeline type).
Admin only.
"""
with get_db() as conn:
cur = get_cursor(conn)
# Check for duplicate slug
cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (p.slug,))
if cur.fetchone():
raise HTTPException(status_code=400, detail="Slug already exists")
# Validate type
if p.type not in ['base', 'pipeline']:
raise HTTPException(status_code=400, detail="Type must be 'base' or 'pipeline'")
# Validate base type has template
if p.type == 'base' and not p.template:
raise HTTPException(status_code=400, detail="Base prompts require a template")
# Validate pipeline type has stages
if p.type == 'pipeline' and not p.stages:
raise HTTPException(status_code=400, detail="Pipeline prompts require stages")
# Convert stages to JSONB
stages_json = None
if p.stages:
stages_json = json.dumps([
{
'stage': s.stage,
'prompts': [
{
'source': pr.source,
'slug': pr.slug,
'template': pr.template,
'output_key': pr.output_key,
'output_format': pr.output_format,
'output_schema': pr.output_schema
}
for pr in s.prompts
]
}
for s in p.stages
])
prompt_id = str(uuid.uuid4())
cur.execute(
"""INSERT INTO ai_prompts
(id, slug, name, display_name, description, template, category, active, sort_order,
type, stages, output_format, output_schema)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
(
prompt_id, p.slug, p.name, p.display_name, p.description,
p.template, p.category, p.active, p.sort_order,
p.type, stages_json, p.output_format,
json.dumps(p.output_schema) if p.output_schema else None
)
)
return {"id": prompt_id, "slug": p.slug}
@router.put("/unified/{prompt_id}")
def update_unified_prompt(prompt_id: str, p: UnifiedPromptUpdate, session: dict = Depends(require_admin)):
"""
Update a unified prompt.
Admin only.
"""
with get_db() as conn:
cur = get_cursor(conn)
# Check if exists
cur.execute("SELECT id FROM ai_prompts WHERE id=%s", (prompt_id,))
if not cur.fetchone():
raise HTTPException(status_code=404, detail="Prompt not found")
# Build update query
updates = []
values = []
if p.name is not None:
updates.append('name=%s')
values.append(p.name)
if p.display_name is not None:
updates.append('display_name=%s')
values.append(p.display_name)
if p.description is not None:
updates.append('description=%s')
values.append(p.description)
if p.type is not None:
if p.type not in ['base', 'pipeline']:
raise HTTPException(status_code=400, detail="Type must be 'base' or 'pipeline'")
updates.append('type=%s')
values.append(p.type)
if p.category is not None:
updates.append('category=%s')
values.append(p.category)
if p.active is not None:
updates.append('active=%s')
values.append(p.active)
if p.sort_order is not None:
updates.append('sort_order=%s')
values.append(p.sort_order)
if p.template is not None:
updates.append('template=%s')
values.append(p.template)
if p.output_format is not None:
updates.append('output_format=%s')
values.append(p.output_format)
if p.output_schema is not None:
updates.append('output_schema=%s')
values.append(json.dumps(p.output_schema))
if p.stages is not None:
stages_json = json.dumps([
{
'stage': s.stage,
'prompts': [
{
'source': pr.source,
'slug': pr.slug,
'template': pr.template,
'output_key': pr.output_key,
'output_format': pr.output_format,
'output_schema': pr.output_schema
}
for pr in s.prompts
]
}
for s in p.stages
])
updates.append('stages=%s')
values.append(stages_json)
if not updates:
return {"ok": True}
cur.execute(
f"UPDATE ai_prompts SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s",
values + [prompt_id]
)
return {"ok": True}
@router.get("/export-all")
def export_all_prompts(session: dict = Depends(require_admin)):
"""
Export all prompts as JSON array.
Admin only. Used for backup and dev→prod sync.
"""
from datetime import datetime
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT * FROM ai_prompts ORDER BY sort_order, slug")
prompts = [r2d(row) for row in cur.fetchall()]
# Convert to export format (clean up DB-specific fields)
export_data = []
for p in prompts:
export_item = {
'slug': p['slug'],
'name': p['name'],
'display_name': p.get('display_name'),
'description': p.get('description'),
'type': p.get('type', 'pipeline'),
'category': p.get('category', 'ganzheitlich'),
'template': p.get('template'),
'stages': p.get('stages'),
'output_format': p.get('output_format', 'text'),
'output_schema': p.get('output_schema'),
'active': p.get('active', True),
'sort_order': p.get('sort_order', 0)
}
export_data.append(export_item)
return {
'export_date': datetime.now().isoformat(),
'count': len(export_data),
'prompts': export_data
}
@router.post("/import")
def import_prompts(
data: dict,
overwrite: bool = False,
session: dict = Depends(require_admin)
):
"""
Import prompts from JSON export.
Args:
data: Export data from /export-all endpoint
overwrite: If true, update existing prompts. If false, skip duplicates.
Returns:
Summary of import results (created, updated, skipped)
"""
if 'prompts' not in data:
raise HTTPException(400, "Invalid import data: missing 'prompts' key")
prompts = data['prompts']
created = 0
updated = 0
skipped = 0
errors = []
with get_db() as conn:
cur = get_cursor(conn)
for p in prompts:
slug = p.get('slug')
if not slug:
errors.append('Prompt without slug skipped')
continue
# Check if exists
cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (slug,))
existing = cur.fetchone()
if existing and not overwrite:
skipped += 1
continue
# Prepare stages JSON if present
stages_json = None
if p.get('stages'):
stages_json = json.dumps(p['stages']) if isinstance(p['stages'], list) else p['stages']
if existing:
# Update existing
cur.execute("""
UPDATE ai_prompts SET
name=%s, display_name=%s, description=%s, type=%s,
category=%s, template=%s, stages=%s, output_format=%s,
output_schema=%s, active=%s, sort_order=%s,
updated=CURRENT_TIMESTAMP
WHERE slug=%s
""", (
p.get('name'), p.get('display_name'), p.get('description'),
p.get('type', 'pipeline'), p.get('category', 'ganzheitlich'),
p.get('template'), stages_json, p.get('output_format', 'text'),
p.get('output_schema'), p.get('active', True),
p.get('sort_order', 0), slug
))
updated += 1
else:
# Create new
cur.execute("""
INSERT INTO ai_prompts (
slug, name, display_name, description, type, category,
template, stages, output_format, output_schema,
active, sort_order, created, updated
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)
""", (
slug, p.get('name'), p.get('display_name'), p.get('description'),
p.get('type', 'pipeline'), p.get('category', 'ganzheitlich'),
p.get('template'), stages_json, p.get('output_format', 'text'),
p.get('output_schema'), p.get('active', True), p.get('sort_order', 0)
))
created += 1
conn.commit()
return {
'success': True,
'created': created,
'updated': updated,
'skipped': skipped,
'errors': errors if errors else None
}