Flexibles KI Prompt System #48

Merged
Lars merged 56 commits from develop into main 2026-03-26 14:49:48 +01:00
4 changed files with 462 additions and 25 deletions
Showing only changes of commit 6627b5eee7 - Show all commits

View File

@@ -0,0 +1,157 @@
-- Migration 019: Pipeline system - configurable multi-stage analyses
-- Enables admin management of pipeline configurations (Issue #28)
-- Created: 2026-03-25
-- ========================================
-- 1. Extend ai_prompts for the reset feature
-- ========================================
-- Idempotent: ADD COLUMN IF NOT EXISTS allows safe re-runs of this migration.
ALTER TABLE ai_prompts
ADD COLUMN IF NOT EXISTS is_system_default BOOLEAN DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS default_template TEXT;
COMMENT ON COLUMN ai_prompts.is_system_default IS 'true = System-Prompt mit Reset-Funktion';
COMMENT ON COLUMN ai_prompts.default_template IS 'Original-Template für Reset-to-Default';
-- Mark existing pipeline prompts as system defaults and snapshot their
-- current template so they can later be restored via reset-to-default.
-- NOTE(review): on a re-run this re-snapshots default_template from the
-- (possibly admin-edited) current template - confirm that is intended.
UPDATE ai_prompts
SET
is_system_default = true,
default_template = template
WHERE slug LIKE 'pipeline_%';
-- ========================================
-- 2. Create pipeline_configs table
-- ========================================
-- One row per admin-defined pipeline. At most one row may be the default;
-- that invariant is enforced by a partial unique index in section 7.
CREATE TABLE IF NOT EXISTS pipeline_configs (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
name VARCHAR(255) NOT NULL UNIQUE,
description TEXT,
is_default BOOLEAN DEFAULT FALSE,
active BOOLEAN DEFAULT TRUE,
-- Module configuration: which data sources to include (boolean flags)
modules JSONB NOT NULL DEFAULT '{}'::jsonb,
-- Example: {"körper": true, "ernährung": true, "training": true, "schlaf": false}
-- Analysis timeframe per module, in days
timeframes JSONB NOT NULL DEFAULT '{}'::jsonb,
-- Example: {"körper": 30, "ernährung": 30, "training": 14}
-- Stage 1 prompts (executed in parallel); values are ai_prompts.slug
stage1_prompts TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[],
-- Example: ARRAY['pipeline_body', 'pipeline_nutrition', 'pipeline_activity']
-- Stage 2 prompt (synthesis of all stage-1 results); slug of ai_prompts
stage2_prompt VARCHAR(100) NOT NULL,
-- Example: 'pipeline_synthesis'
-- Stage 3 prompt (optional follow-up, e.g., goal alignment); slug of ai_prompts
stage3_prompt VARCHAR(100),
-- Example: 'pipeline_goals'
created TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
-- 'updated' is maintained automatically by the trigger in section 6
updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- ========================================
-- 3. Create indexes
-- ========================================
-- NOTE: no separate partial index on is_default here. The UNIQUE partial
-- index idx_pipeline_configs_single_default (section 7) covers the same
-- column and the same predicate (WHERE is_default = true), so an extra
-- non-unique index would only add write overhead without speeding reads.
CREATE INDEX IF NOT EXISTS idx_pipeline_configs_active ON pipeline_configs(active);
-- ========================================
-- 4. Seed: standard pipeline "Alltags-Check"
-- ========================================
-- Default configuration: body, nutrition and training over the last 2-4 weeks.
-- ON CONFLICT (name) DO NOTHING keeps this seed idempotent on re-runs.
INSERT INTO pipeline_configs (
name,
description,
is_default,
modules,
timeframes,
stage1_prompts,
stage2_prompt,
stage3_prompt
) VALUES (
'Alltags-Check',
'Standard-Analyse: Körper, Ernährung, Training über die letzten 2-4 Wochen',
true,
'{"körper": true, "ernährung": true, "training": true, "schlaf": false, "vitalwerte": false, "mentales": false, "ziele": false}'::jsonb,
'{"körper": 30, "ernährung": 30, "training": 14}'::jsonb,
ARRAY['pipeline_body', 'pipeline_nutrition', 'pipeline_activity'],
'pipeline_synthesis',
'pipeline_goals'
) ON CONFLICT (name) DO NOTHING;
-- ========================================
-- 5. Seed: additional pipelines (optional)
-- ========================================
-- Sleep-focus pipeline (sleep, vitals, recovery).
-- NOTE(review): assumes prompts 'pipeline_sleep' and 'pipeline_vitals'
-- exist in ai_prompts - verify before enabling this config for users.
INSERT INTO pipeline_configs (
name,
description,
is_default,
modules,
timeframes,
stage1_prompts,
stage2_prompt,
stage3_prompt
) VALUES (
'Schlaf & Erholung',
'Analyse von Schlaf, Vitalwerten und Erholungsstatus',
false,
'{"schlaf": true, "vitalwerte": true, "training": true, "körper": false, "ernährung": false, "mentales": false, "ziele": false}'::jsonb,
'{"schlaf": 14, "vitalwerte": 7, "training": 14}'::jsonb,
ARRAY['pipeline_sleep', 'pipeline_vitals', 'pipeline_activity'],
'pipeline_synthesis',
NULL
) ON CONFLICT (name) DO NOTHING;
-- Competition analysis (long-term trend over 90 days); stage 3 runs the
-- goal-alignment prompt.
INSERT INTO pipeline_configs (
name,
description,
is_default,
modules,
timeframes,
stage1_prompts,
stage2_prompt,
stage3_prompt
) VALUES (
'Wettkampf-Analyse',
'Langfristige Analyse für Wettkampfvorbereitung (90 Tage)',
false,
'{"körper": true, "training": true, "vitalwerte": true, "ernährung": true, "schlaf": false, "mentales": false, "ziele": true}'::jsonb,
'{"körper": 90, "training": 90, "vitalwerte": 30, "ernährung": 60}'::jsonb,
ARRAY['pipeline_body', 'pipeline_activity', 'pipeline_vitals', 'pipeline_nutrition'],
'pipeline_synthesis',
'pipeline_goals'
) ON CONFLICT (name) DO NOTHING;
-- ========================================
-- 6. Trigger for the updated timestamp
-- ========================================
-- DROP first so the migration is safe to re-run.
-- NOTE(review): assumes update_updated_timestamp() was created by an
-- earlier migration - verify it exists before 019 is applied.
DROP TRIGGER IF EXISTS trigger_pipeline_configs_updated ON pipeline_configs;
CREATE TRIGGER trigger_pipeline_configs_updated
BEFORE UPDATE ON pipeline_configs
FOR EACH ROW
EXECUTE FUNCTION update_updated_timestamp();
-- ========================================
-- 7. Constraints & Validation
-- ========================================
-- At most one default config: only rows with is_default = true enter the
-- partial index, and they all share the single indexed value (true), so a
-- second default row violates uniqueness.
CREATE UNIQUE INDEX IF NOT EXISTS idx_pipeline_configs_single_default
ON pipeline_configs(is_default)
WHERE is_default = true;
-- ========================================
-- 8. Comments (Documentation)
-- ========================================
COMMENT ON TABLE pipeline_configs IS 'v9f Issue #28: Konfigurierbare Pipeline-Analysen. Admins können mehrere Pipeline-Configs erstellen mit unterschiedlichen Modulen und Zeiträumen.';
COMMENT ON COLUMN pipeline_configs.modules IS 'JSONB: Welche Module aktiv sind (boolean flags)';
COMMENT ON COLUMN pipeline_configs.timeframes IS 'JSONB: Zeiträume pro Modul in Tagen';
COMMENT ON COLUMN pipeline_configs.stage1_prompts IS 'Array von slug-Werten für parallele Stage-1-Prompts';
COMMENT ON COLUMN pipeline_configs.stage2_prompt IS 'Slug des Synthese-Prompts (kombiniert Stage-1-Ergebnisse)';
COMMENT ON COLUMN pipeline_configs.stage3_prompt IS 'Optionaler Slug für Stage-3-Prompt (z.B. Zielabgleich)';

View File

@@ -156,3 +156,33 @@ class PromptGenerateRequest(BaseModel):
goal: str
data_categories: list[str]
example_output: Optional[str] = None
# ── Pipeline Config Models (Issue #28) ─────────────────────────────────────
class PipelineConfigCreate(BaseModel):
    """Payload for creating a pipeline configuration (Issue #28)."""
    name: str
    description: Optional[str] = None
    is_default: bool = False
    active: bool = True
    modules: dict  # boolean flag per module, e.g. {"körper": true, "ernährung": true, ...}
    timeframes: dict  # analysis window in days per module, e.g. {"körper": 30, "ernährung": 30, ...}
    stage1_prompts: list[str]  # ai_prompts slugs executed in stage 1
    stage2_prompt: str  # slug of the stage-2 synthesis prompt
    stage3_prompt: Optional[str] = None  # optional stage-3 slug (e.g. goals)
class PipelineConfigUpdate(BaseModel):
    """Partial-update payload: None means "leave this field unchanged".

    NOTE(review): because None is the "not provided" sentinel, the nullable
    columns (description, stage3_prompt) cannot be cleared back to NULL
    through this model - confirm whether that is acceptable.
    """
    name: Optional[str] = None
    description: Optional[str] = None
    is_default: Optional[bool] = None
    active: Optional[bool] = None
    modules: Optional[dict] = None
    timeframes: Optional[dict] = None
    stage1_prompts: Optional[list[str]] = None
    stage2_prompt: Optional[str] = None
    stage3_prompt: Optional[str] = None
class PipelineExecuteRequest(BaseModel):
    """Request body for running a pipeline analysis."""
    config_id: Optional[str] = None  # None = use the default pipeline config

View File

@ -433,8 +433,17 @@ async def analyze_with_prompt(slug: str, x_profile_id: Optional[str]=Header(defa
@router.post("/insights/pipeline")
async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
"""Run 3-stage pipeline analysis."""
async def analyze_pipeline(
config_id: Optional[str] = None,
x_profile_id: Optional[str] = Header(default=None),
session: dict = Depends(require_auth)
):
"""
Run configurable multi-stage pipeline analysis.
Args:
config_id: Pipeline config ID (optional, uses default if not specified)
"""
pid = get_pid(x_profile_id)
# Phase 4: Check pipeline feature access (boolean - enabled/disabled)
@ -466,14 +475,34 @@ async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), ses
f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
)
# Load pipeline config
with get_db() as conn:
cur = get_cursor(conn)
if config_id:
cur.execute("SELECT * FROM pipeline_configs WHERE id=%s AND active=true", (config_id,))
else:
cur.execute("SELECT * FROM pipeline_configs WHERE is_default=true AND active=true")
config = r2d(cur.fetchone())
if not config:
raise HTTPException(404, "Pipeline-Konfiguration nicht gefunden")
logger.info(f"[PIPELINE] Using config '{config['name']}' (id={config['id']})")
data = _get_profile_data(pid)
vars = _prepare_template_vars(data)
# Stage 1: Parallel JSON analyses
# Stage 1: Load and execute prompts from config
stage1_prompts = []
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT slug, template FROM ai_prompts WHERE slug LIKE 'pipeline_%' AND slug NOT IN ('pipeline_synthesis','pipeline_goals') AND active=true")
stage1_prompts = [r2d(r) for r in cur.fetchall()]
for slug in config['stage1_prompts']:
cur.execute("SELECT slug, template FROM ai_prompts WHERE slug=%s AND active=true", (slug,))
prompt = r2d(cur.fetchone())
if prompt:
stage1_prompts.append(prompt)
else:
logger.warning(f"[PIPELINE] Stage 1 prompt '{slug}' not found or inactive")
stage1_results = {}
for p in stage1_prompts:
@ -510,17 +539,20 @@ async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), ses
except:
stage1_results[slug] = content
# Stage 2: Synthesis
vars['stage1_body'] = json.dumps(stage1_results.get('pipeline_body', {}), ensure_ascii=False)
vars['stage1_nutrition'] = json.dumps(stage1_results.get('pipeline_nutrition', {}), ensure_ascii=False)
vars['stage1_activity'] = json.dumps(stage1_results.get('pipeline_activity', {}), ensure_ascii=False)
# Stage 2: Synthesis with dynamic placeholders
# Inject all stage1 results as {{stage1_<slug>}} placeholders
for slug, result in stage1_results.items():
# Convert slug like "pipeline_body" to placeholder name "stage1_body"
placeholder_name = slug.replace('pipeline_', 'stage1_')
vars[placeholder_name] = json.dumps(result, ensure_ascii=False) if isinstance(result, dict) else str(result)
# Load stage 2 prompt from config
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT template FROM ai_prompts WHERE slug='pipeline_synthesis' AND active=true")
cur.execute("SELECT template FROM ai_prompts WHERE slug=%s AND active=true", (config['stage2_prompt'],))
synth_row = cur.fetchone()
if not synth_row:
raise HTTPException(500, "Pipeline synthesis prompt not found")
raise HTTPException(500, f"Pipeline synthesis prompt '{config['stage2_prompt']}' not found")
synth_prompt = _render_template(synth_row['template'], vars)
@ -548,16 +580,24 @@ async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), ses
else:
raise HTTPException(500, "Keine KI-API konfiguriert")
# Stage 3: Goals (only if goals are set)
# Stage 3: Optional (e.g., Goals)
goals_text = None
prof = data['profile']
if prof.get('goal_weight') or prof.get('goal_bf_pct'):
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT template FROM ai_prompts WHERE slug='pipeline_goals' AND active=true")
goals_row = cur.fetchone()
if goals_row:
goals_prompt = _render_template(goals_row['template'], vars)
if config.get('stage3_prompt'):
# Check if conditions are met (for backwards compatibility with goals check)
prof = data['profile']
should_run_stage3 = True
# Special case: goals prompt only runs if goals are set
if config['stage3_prompt'] == 'pipeline_goals':
should_run_stage3 = bool(prof.get('goal_weight') or prof.get('goal_bf_pct'))
if should_run_stage3:
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT template FROM ai_prompts WHERE slug=%s AND active=true", (config['stage3_prompt'],))
goals_row = cur.fetchone()
if goals_row:
goals_prompt = _render_template(goals_row['template'], vars)
if ANTHROPIC_KEY:
import anthropic
@ -586,11 +626,14 @@ async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), ses
if goals_text:
final_content += "\n\n" + goals_text
# Save as 'pipeline' scope (with history - no DELETE)
# Save with config-specific scope (with history - no DELETE)
scope = f"pipeline_{config['name'].lower().replace(' ', '_')}"
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("INSERT INTO ai_insights (id, profile_id, scope, content, created) VALUES (%s,%s,'pipeline',%s,CURRENT_TIMESTAMP)",
(str(uuid.uuid4()), pid, final_content))
cur.execute("INSERT INTO ai_insights (id, profile_id, scope, content, created) VALUES (%s,%s,%s,%s,CURRENT_TIMESTAMP)",
(str(uuid.uuid4()), pid, scope, final_content))
logger.info(f"[PIPELINE] Completed '{config['name']}' - saved as scope='{scope}'")
# Phase 2: Increment ai_calls usage (pipeline uses multiple API calls)
# Note: We increment once per pipeline run, not per individual call
@ -599,7 +642,15 @@ async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), ses
# Old usage tracking (keep for now)
inc_ai_usage(pid)
return {"scope": "pipeline", "content": final_content, "stage1": stage1_results}
return {
"scope": scope,
"content": final_content,
"stage1": stage1_results,
"config": {
"id": config['id'],
"name": config['name']
}
}
@router.get("/ai/usage")

View File

@@ -12,7 +12,10 @@ from fastapi import APIRouter, Depends, HTTPException
from db import get_db, get_cursor, r2d
from auth import require_auth, require_admin
from models import PromptCreate, PromptUpdate, PromptGenerateRequest
from models import (
PromptCreate, PromptUpdate, PromptGenerateRequest,
PipelineConfigCreate, PipelineConfigUpdate
)
from placeholder_resolver import (
resolve_placeholders,
get_unknown_placeholders,
@ -485,3 +488,199 @@ async def optimize_prompt(prompt_id: str, session: dict=Depends(require_admin)):
)
return analysis
# ── Pipeline Config Management (Issue #28) ────────────────────────────────────
@router.get("/pipeline-configs")
def list_pipeline_configs(session: dict=Depends(require_auth)):
    """
    Return pipeline configurations, default config first, then by name.

    Admins see every config; regular users only see active ones.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        # Admins get the unfiltered list; everyone else only active configs.
        if session.get('role') == 'admin':
            query = "SELECT * FROM pipeline_configs ORDER BY is_default DESC, name"
        else:
            query = "SELECT * FROM pipeline_configs WHERE active=true ORDER BY is_default DESC, name"
        cur.execute(query)
        return [r2d(row) for row in cur.fetchall()]
@router.post("/pipeline-configs")
def create_pipeline_config(p: PipelineConfigCreate, session: dict=Depends(require_admin)):
    """Create a new pipeline configuration (admin only).

    Validates that the name is unused and that every referenced stage
    prompt slug exists in ai_prompts before inserting. If is_default is
    set, any existing default is demoted first (the partial unique index
    allows only one default row).

    Returns:
        dict with the new config's ``id`` and ``name``.

    Raises:
        HTTPException 400: duplicate name or unknown prompt slug.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        # Friendly duplicate check for a clean 400.
        # NOTE(review): SELECT-then-INSERT is racy; the UNIQUE constraint on
        # name is the real guard - a concurrent insert surfaces as a DB error.
        cur.execute("SELECT id FROM pipeline_configs WHERE name=%s", (p.name,))
        if cur.fetchone():
            raise HTTPException(status_code=400, detail=f"Pipeline config with name '{p.name}' already exists")
        # Validate all referenced prompt slugs in ONE round-trip instead of
        # one query per slug (the original looped N+1 queries).
        required_slugs = set(p.stage1_prompts) | {p.stage2_prompt}
        if p.stage3_prompt:
            required_slugs.add(p.stage3_prompt)
        cur.execute("SELECT slug FROM ai_prompts WHERE slug = ANY(%s)", (list(required_slugs),))
        found_slugs = {row['slug'] for row in cur.fetchall()}
        missing = sorted(required_slugs - found_slugs)
        if missing:
            raise HTTPException(status_code=400, detail=f"Prompt '{missing[0]}' does not exist")
        # Only one default config may exist: demote the current default first.
        if p.is_default:
            cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true")
        config_id = str(uuid.uuid4())
        cur.execute(
            """INSERT INTO pipeline_configs (
                id, name, description, is_default, active,
                modules, timeframes, stage1_prompts, stage2_prompt, stage3_prompt,
                created, updated
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""",
            (
                config_id, p.name, p.description, p.is_default, p.active,
                json.dumps(p.modules), json.dumps(p.timeframes),
                p.stage1_prompts, p.stage2_prompt, p.stage3_prompt
            )
        )
    return {"id": config_id, "name": p.name}
@router.put("/pipeline-configs/{config_id}")
def update_pipeline_config(config_id: str, p: PipelineConfigUpdate, session: dict=Depends(require_admin)):
    """Update a pipeline configuration (admin only).

    Only fields explicitly provided (non-None) are written. Setting
    is_default=true demotes any other default config first.
    NOTE(review): None doubles as the "not provided" sentinel, so nullable
    columns cannot be cleared to NULL here - confirm that is acceptable.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        # 404 early if the config does not exist.
        cur.execute("SELECT id FROM pipeline_configs WHERE id=%s", (config_id,))
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail="Pipeline config not found")
        # (column, value, optional value transform) for the dynamic SET clause.
        field_specs = [
            ('name', p.name, None),
            ('description', p.description, None),
            ('is_default', p.is_default, None),
            ('active', p.active, None),
            ('modules', p.modules, json.dumps),
            ('timeframes', p.timeframes, json.dumps),
            ('stage1_prompts', p.stage1_prompts, None),
            ('stage2_prompt', p.stage2_prompt, None),
            ('stage3_prompt', p.stage3_prompt, None),
        ]
        set_clauses = []
        params = []
        for column, value, transform in field_specs:
            if value is None:
                continue
            if column == 'is_default' and value:
                # Promoting this config: demote any other default first.
                cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true AND id!=%s", (config_id,))
            set_clauses.append(f'{column}=%s')
            params.append(transform(value) if transform else value)
        if not set_clauses:
            # Nothing to change - treat as a no-op success.
            return {"ok": True}
        cur.execute(
            f"UPDATE pipeline_configs SET {', '.join(set_clauses)}, updated=CURRENT_TIMESTAMP WHERE id=%s",
            params + [config_id]
        )
    return {"ok": True}
@router.delete("/pipeline-configs/{config_id}")
def delete_pipeline_config(config_id: str, session: dict=Depends(require_admin)):
    """Delete a pipeline configuration (admin only).

    The default config may only be deleted when it is the last remaining
    config, so other configs can never be left without a default.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT is_default FROM pipeline_configs WHERE id=%s", (config_id,))
        existing = cur.fetchone()
        if existing is None:
            raise HTTPException(status_code=404, detail="Pipeline config not found")
        if existing['is_default']:
            # Refuse to remove the default while alternatives still exist.
            cur.execute("SELECT COUNT(*) as count FROM pipeline_configs WHERE id!=%s", (config_id,))
            if cur.fetchone()['count'] > 0:
                raise HTTPException(
                    status_code=400,
                    detail="Cannot delete the default config. Please set another config as default first."
                )
        cur.execute("DELETE FROM pipeline_configs WHERE id=%s", (config_id,))
    return {"ok": True}
@router.post("/pipeline-configs/{config_id}/set-default")
def set_default_pipeline_config(config_id: str, session: dict=Depends(require_admin)):
    """Mark one pipeline config as the default (admin only).

    Demotes every current default first, then promotes the requested
    config, so the partial unique index on is_default is never violated
    within the transaction.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT id FROM pipeline_configs WHERE id=%s", (config_id,))
        if cur.fetchone() is None:
            raise HTTPException(status_code=404, detail="Pipeline config not found")
        # Clear current default(s), then promote the requested config.
        cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true")
        cur.execute("UPDATE pipeline_configs SET is_default=true, updated=CURRENT_TIMESTAMP WHERE id=%s", (config_id,))
    return {"ok": True}
@router.post("/{prompt_id}/reset-to-default")
def reset_prompt_to_default(prompt_id: str, session: dict=Depends(require_admin)):
    """
    Restore a system prompt's template from its stored default (admin only).

    Only prompts flagged is_system_default=true carry a snapshot in
    default_template; every other prompt is rejected with a 400.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT is_system_default, default_template FROM ai_prompts WHERE id=%s", (prompt_id,))
        prompt_row = cur.fetchone()
        if prompt_row is None:
            raise HTTPException(status_code=404, detail="Prompt not found")
        if not prompt_row['is_system_default']:
            raise HTTPException(status_code=400, detail="Only system prompts can be reset to default")
        if not prompt_row['default_template']:
            raise HTTPException(status_code=400, detail="No default template available for this prompt")
        # Overwrite the live template with the preserved original snapshot.
        cur.execute(
            "UPDATE ai_prompts SET template=%s, updated=CURRENT_TIMESTAMP WHERE id=%s",
            (prompt_row['default_template'], prompt_id)
        )
    return {"ok": True}