diff --git a/backend/migrations/019_pipeline_system.sql b/backend/migrations/019_pipeline_system.sql new file mode 100644 index 0000000..aa009c2 --- /dev/null +++ b/backend/migrations/019_pipeline_system.sql @@ -0,0 +1,157 @@ +-- Migration 019: Pipeline-System - Konfigurierbare mehrstufige Analysen +-- Ermöglicht Admin-Verwaltung von Pipeline-Konfigurationen (Issue #28) +-- Created: 2026-03-25 + +-- ======================================== +-- 1. Erweitere ai_prompts für Reset-Feature +-- ======================================== +ALTER TABLE ai_prompts +ADD COLUMN IF NOT EXISTS is_system_default BOOLEAN DEFAULT FALSE, +ADD COLUMN IF NOT EXISTS default_template TEXT; + +COMMENT ON COLUMN ai_prompts.is_system_default IS 'true = System-Prompt mit Reset-Funktion'; +COMMENT ON COLUMN ai_prompts.default_template IS 'Original-Template für Reset-to-Default'; + +-- Markiere bestehende Pipeline-Prompts als System-Defaults +UPDATE ai_prompts +SET + is_system_default = true, + default_template = template +WHERE slug LIKE 'pipeline_%'; + +-- ======================================== +-- 2. Create pipeline_configs table +-- ======================================== +CREATE TABLE IF NOT EXISTS pipeline_configs ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + name VARCHAR(255) NOT NULL UNIQUE, + description TEXT, + is_default BOOLEAN DEFAULT FALSE, + active BOOLEAN DEFAULT TRUE, + + -- Module configuration: which data sources to include + modules JSONB NOT NULL DEFAULT '{}'::jsonb, + -- Example: {"körper": true, "ernährung": true, "training": true, "schlaf": false} + + -- Timeframes per module (days) + timeframes JSONB NOT NULL DEFAULT '{}'::jsonb, + -- Example: {"körper": 30, "ernährung": 30, "training": 14} + + -- Stage 1 prompts (parallel execution) + stage1_prompts TEXT[] NOT NULL DEFAULT ARRAY[]::TEXT[], + -- Example: ARRAY['pipeline_body', 'pipeline_nutrition', 'pipeline_activity'] + + -- Stage 2 prompt (synthesis) + stage2_prompt VARCHAR(100) NOT NULL, + -- Example: 'pipeline_synthesis' + + -- Stage 3 prompt (optional, e.g., goals) + stage3_prompt VARCHAR(100), + -- Example: 'pipeline_goals' + + created TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + updated TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +-- ======================================== +-- 3. Create indexes +-- ======================================== +CREATE INDEX IF NOT EXISTS idx_pipeline_configs_default ON pipeline_configs(is_default) WHERE is_default = true; +CREATE INDEX IF NOT EXISTS idx_pipeline_configs_active ON pipeline_configs(active); + +-- ======================================== +-- 4. Seed: Standard-Pipeline "Alltags-Check" +-- ======================================== +INSERT INTO pipeline_configs ( + name, + description, + is_default, + modules, + timeframes, + stage1_prompts, + stage2_prompt, + stage3_prompt +) VALUES ( + 'Alltags-Check', + 'Standard-Analyse: Körper, Ernährung, Training über die letzten 2-4 Wochen', + true, + '{"körper": true, "ernährung": true, "training": true, "schlaf": false, "vitalwerte": false, "mentales": false, "ziele": false}'::jsonb, + '{"körper": 30, "ernährung": 30, "training": 14}'::jsonb, + ARRAY['pipeline_body', 'pipeline_nutrition', 'pipeline_activity'], + 'pipeline_synthesis', + 'pipeline_goals' +) ON CONFLICT (name) DO NOTHING; + +-- ======================================== +-- 5. Seed: Erweiterte Pipelines (optional) +-- ======================================== + +-- Schlaf-Fokus Pipeline (wenn Schlaf-Prompts existieren) +INSERT INTO pipeline_configs ( + name, + description, + is_default, + modules, + timeframes, + stage1_prompts, + stage2_prompt, + stage3_prompt +) VALUES ( + 'Schlaf & Erholung', + 'Analyse von Schlaf, Vitalwerten und Erholungsstatus', + false, + '{"schlaf": true, "vitalwerte": true, "training": true, "körper": false, "ernährung": false, "mentales": false, "ziele": false}'::jsonb, + '{"schlaf": 14, "vitalwerte": 7, "training": 14}'::jsonb, + ARRAY['pipeline_sleep', 'pipeline_vitals', 'pipeline_activity'], + 'pipeline_synthesis', + NULL +) ON CONFLICT (name) DO NOTHING; + +-- Wettkampf-Analyse (langfristiger Trend) +INSERT INTO pipeline_configs ( + name, + description, + is_default, + modules, + timeframes, + stage1_prompts, + stage2_prompt, + stage3_prompt +) VALUES ( + 'Wettkampf-Analyse', + 'Langfristige Analyse für Wettkampfvorbereitung (90 Tage)', + false, + '{"körper": true, "training": true, "vitalwerte": true, "ernährung": true, "schlaf": false, "mentales": false, "ziele": true}'::jsonb, + '{"körper": 90, "training": 90, "vitalwerte": 30, "ernährung": 60}'::jsonb, + ARRAY['pipeline_body', 'pipeline_activity', 'pipeline_vitals', 'pipeline_nutrition'], + 'pipeline_synthesis', + 'pipeline_goals' +) ON CONFLICT (name) DO NOTHING; + +-- ======================================== +-- 6. Trigger für updated timestamp +-- ======================================== +DROP TRIGGER IF EXISTS trigger_pipeline_configs_updated ON pipeline_configs; +CREATE TRIGGER trigger_pipeline_configs_updated + BEFORE UPDATE ON pipeline_configs + FOR EACH ROW + EXECUTE FUNCTION update_updated_timestamp(); + +-- ======================================== +-- 7. Constraints & Validation +-- ======================================== + +-- Only one default config allowed (enforced via partial unique index) +CREATE UNIQUE INDEX IF NOT EXISTS idx_pipeline_configs_single_default +ON pipeline_configs(is_default) +WHERE is_default = true; + +-- ======================================== +-- 8. Comments (Documentation) +-- ======================================== +COMMENT ON TABLE pipeline_configs IS 'v9f Issue #28: Konfigurierbare Pipeline-Analysen. Admins können mehrere Pipeline-Configs erstellen mit unterschiedlichen Modulen und Zeiträumen.'; +COMMENT ON COLUMN pipeline_configs.modules IS 'JSONB: Welche Module aktiv sind (boolean flags)'; +COMMENT ON COLUMN pipeline_configs.timeframes IS 'JSONB: Zeiträume pro Modul in Tagen'; +COMMENT ON COLUMN pipeline_configs.stage1_prompts IS 'Array von slug-Werten für parallele Stage-1-Prompts'; +COMMENT ON COLUMN pipeline_configs.stage2_prompt IS 'Slug des Synthese-Prompts (kombiniert Stage-1-Ergebnisse)'; +COMMENT ON COLUMN pipeline_configs.stage3_prompt IS 'Optionaler Slug für Stage-3-Prompt (z.B. Zielabgleich)'; diff --git a/backend/models.py b/backend/models.py index 8b2d846..7566e50 100644 --- a/backend/models.py +++ b/backend/models.py @@ -156,3 +156,33 @@ class PromptGenerateRequest(BaseModel): goal: str data_categories: list[str] example_output: Optional[str] = None + + +# ── Pipeline Config Models (Issue #28) ───────────────────────────────────── + +class PipelineConfigCreate(BaseModel): + name: str + description: Optional[str] = None + is_default: bool = False + active: bool = True + modules: dict # {"körper": true, "ernährung": true, ...} + timeframes: dict # {"körper": 30, "ernährung": 30, ...} + stage1_prompts: list[str] # Array of slugs + stage2_prompt: str # slug + stage3_prompt: Optional[str] = None # slug + + +class PipelineConfigUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + is_default: Optional[bool] = None + active: Optional[bool] = None + modules: Optional[dict] = None + timeframes: Optional[dict] = None + stage1_prompts: Optional[list[str]] = None + stage2_prompt: Optional[str] = None + stage3_prompt: Optional[str] = None + + +class PipelineExecuteRequest(BaseModel): + config_id: Optional[str] = None # None = use default config diff --git a/backend/routers/insights.py b/backend/routers/insights.py index 0127241..a9dd7fe 100644 --- a/backend/routers/insights.py +++ b/backend/routers/insights.py @@ -433,8 +433,17 @@ async def analyze_with_prompt(slug: str, x_profile_id: Optional[str]=Header(defa @router.post("/insights/pipeline") -async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)): - """Run 3-stage pipeline analysis.""" +async def analyze_pipeline( + config_id: Optional[str] = None, + x_profile_id: Optional[str] = Header(default=None), + session: dict = Depends(require_auth) +): + """ + Run configurable multi-stage pipeline analysis. + + Args: + config_id: Pipeline config ID (optional, uses default if not specified) + """ pid = get_pid(x_profile_id) # Phase 4: Check pipeline feature access (boolean - enabled/disabled) @@ -466,14 +475,34 @@ async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), ses f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset." ) + # Load pipeline config + with get_db() as conn: + cur = get_cursor(conn) + if config_id: + cur.execute("SELECT * FROM pipeline_configs WHERE id=%s AND active=true", (config_id,)) + else: + cur.execute("SELECT * FROM pipeline_configs WHERE is_default=true AND active=true") + + config = r2d(cur.fetchone()) + if not config: + raise HTTPException(404, "Pipeline-Konfiguration nicht gefunden") + + logger.info(f"[PIPELINE] Using config '{config['name']}' (id={config['id']})") + data = _get_profile_data(pid) vars = _prepare_template_vars(data) - # Stage 1: Parallel JSON analyses + # Stage 1: Load and execute prompts from config + stage1_prompts = [] with get_db() as conn: cur = get_cursor(conn) - cur.execute("SELECT slug, template FROM ai_prompts WHERE slug LIKE 'pipeline_%' AND slug NOT IN ('pipeline_synthesis','pipeline_goals') AND active=true") - stage1_prompts = [r2d(r) for r in cur.fetchall()] + for slug in config['stage1_prompts']: + cur.execute("SELECT slug, template FROM ai_prompts WHERE slug=%s AND active=true", (slug,)) + prompt = r2d(cur.fetchone()) + if prompt: + stage1_prompts.append(prompt) + else: + logger.warning(f"[PIPELINE] Stage 1 prompt '{slug}' not found or inactive") stage1_results = {} for p in stage1_prompts: @@ -510,17 +539,20 @@ async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), ses except: stage1_results[slug] = content - # Stage 2: Synthesis - vars['stage1_body'] = json.dumps(stage1_results.get('pipeline_body', {}), ensure_ascii=False) - vars['stage1_nutrition'] = json.dumps(stage1_results.get('pipeline_nutrition', {}), ensure_ascii=False) - vars['stage1_activity'] = json.dumps(stage1_results.get('pipeline_activity', {}), ensure_ascii=False) + # Stage 2: Synthesis with dynamic placeholders + # Inject all stage1 results as {{stage1_}} placeholders + for slug, result in stage1_results.items(): + # Convert slug like "pipeline_body" to placeholder name "stage1_body" + placeholder_name = slug.replace('pipeline_', 'stage1_') + vars[placeholder_name] = json.dumps(result, ensure_ascii=False) if isinstance(result, dict) else str(result) + # Load stage 2 prompt from config with get_db() as conn: cur = get_cursor(conn) - cur.execute("SELECT template FROM ai_prompts WHERE slug='pipeline_synthesis' AND active=true") + cur.execute("SELECT template FROM ai_prompts WHERE slug=%s AND active=true", (config['stage2_prompt'],)) synth_row = cur.fetchone() if not synth_row: - raise HTTPException(500, "Pipeline synthesis prompt not found") + raise HTTPException(500, f"Pipeline synthesis prompt '{config['stage2_prompt']}' not found") synth_prompt = _render_template(synth_row['template'], vars) @@ -548,16 +580,24 @@ async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), ses else: raise HTTPException(500, "Keine KI-API konfiguriert") - # Stage 3: Goals (only if goals are set) + # Stage 3: Optional (e.g., Goals) goals_text = None - prof = data['profile'] - if prof.get('goal_weight') or prof.get('goal_bf_pct'): - with get_db() as conn: - cur = get_cursor(conn) - cur.execute("SELECT template FROM ai_prompts WHERE slug='pipeline_goals' AND active=true") - goals_row = cur.fetchone() - if goals_row: - goals_prompt = _render_template(goals_row['template'], vars) + if config.get('stage3_prompt'): + # Check if conditions are met (for backwards compatibility with goals check) + prof = data['profile'] + should_run_stage3 = True + + # Special case: goals prompt only runs if goals are set + if config['stage3_prompt'] == 'pipeline_goals': + should_run_stage3 = bool(prof.get('goal_weight') or prof.get('goal_bf_pct')) + + if should_run_stage3: + with get_db() as conn: + cur = get_cursor(conn) + cur.execute("SELECT template FROM ai_prompts WHERE slug=%s AND active=true", (config['stage3_prompt'],)) + goals_row = cur.fetchone() + if goals_row: + goals_prompt = _render_template(goals_row['template'], vars) if ANTHROPIC_KEY: import anthropic @@ -586,11 +626,14 @@ async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), ses if goals_text: final_content += "\n\n" + goals_text - # Save as 'pipeline' scope (with history - no DELETE) + # Save with config-specific scope (with history - no DELETE) + scope = f"pipeline_{config['name'].lower().replace(' ', '_')}" with get_db() as conn: cur = get_cursor(conn) - cur.execute("INSERT INTO ai_insights (id, profile_id, scope, content, created) VALUES (%s,%s,'pipeline',%s,CURRENT_TIMESTAMP)", - (str(uuid.uuid4()), pid, final_content)) + cur.execute("INSERT INTO ai_insights (id, profile_id, scope, content, created) VALUES (%s,%s,%s,%s,CURRENT_TIMESTAMP)", + (str(uuid.uuid4()), pid, scope, final_content)) + + logger.info(f"[PIPELINE] Completed '{config['name']}' - saved as scope='{scope}'") # Phase 2: Increment ai_calls usage (pipeline uses multiple API calls) # Note: We increment once per pipeline run, not per individual call @@ -599,7 +642,15 @@ async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), ses # Old usage tracking (keep for now) inc_ai_usage(pid) - return {"scope": "pipeline", "content": final_content, "stage1": stage1_results} + return { + "scope": scope, + "content": final_content, + "stage1": stage1_results, + "config": { + "id": config['id'], + "name": config['name'] + } + } @router.get("/ai/usage") diff --git a/backend/routers/prompts.py b/backend/routers/prompts.py index 9d5340c..a7128a4 100644 --- a/backend/routers/prompts.py +++ b/backend/routers/prompts.py @@ -12,7 +12,10 @@ from fastapi import APIRouter, Depends, HTTPException from db import get_db, get_cursor, r2d from auth import require_auth, require_admin -from models import PromptCreate, PromptUpdate, PromptGenerateRequest +from models import ( + PromptCreate, PromptUpdate, PromptGenerateRequest, + PipelineConfigCreate, PipelineConfigUpdate +) from placeholder_resolver import ( resolve_placeholders, get_unknown_placeholders, @@ -485,3 +488,199 @@ async def optimize_prompt(prompt_id: str, session: dict=Depends(require_admin)): ) return analysis + + +# ── Pipeline Config Management (Issue #28) ──────────────────────────────────── + +@router.get("/pipeline-configs") +def list_pipeline_configs(session: dict=Depends(require_auth)): + """ + List pipeline configurations. + - Admins: see ALL configs + - Users: see only active configs + """ + with get_db() as conn: + cur = get_cursor(conn) + is_admin = session.get('role') == 'admin' + + if is_admin: + cur.execute("SELECT * FROM pipeline_configs ORDER BY is_default DESC, name") + else: + cur.execute("SELECT * FROM pipeline_configs WHERE active=true ORDER BY is_default DESC, name") + + return [r2d(r) for r in cur.fetchall()] + + +@router.post("/pipeline-configs") +def create_pipeline_config(p: PipelineConfigCreate, session: dict=Depends(require_admin)): + """Create new pipeline configuration (admin only).""" + with get_db() as conn: + cur = get_cursor(conn) + + # Check if name already exists + cur.execute("SELECT id FROM pipeline_configs WHERE name=%s", (p.name,)) + if cur.fetchone(): + raise HTTPException(status_code=400, detail=f"Pipeline config with name '{p.name}' already exists") + + # Validate: stage prompts must exist + all_slugs = p.stage1_prompts + [p.stage2_prompt] + if p.stage3_prompt: + all_slugs.append(p.stage3_prompt) + + for slug in all_slugs: + cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (slug,)) + if not cur.fetchone(): + raise HTTPException(status_code=400, detail=f"Prompt '{slug}' does not exist") + + # If is_default=true, unset other defaults + if p.is_default: + cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true") + + config_id = str(uuid.uuid4()) + cur.execute( + """INSERT INTO pipeline_configs ( + id, name, description, is_default, active, + modules, timeframes, stage1_prompts, stage2_prompt, stage3_prompt, + created, updated + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""", + ( + config_id, p.name, p.description, p.is_default, p.active, + json.dumps(p.modules), json.dumps(p.timeframes), + p.stage1_prompts, p.stage2_prompt, p.stage3_prompt + ) + ) + + return {"id": config_id, "name": p.name} + + +@router.put("/pipeline-configs/{config_id}") +def update_pipeline_config(config_id: str, p: PipelineConfigUpdate, session: dict=Depends(require_admin)): + """Update pipeline configuration (admin only).""" + with get_db() as conn: + cur = get_cursor(conn) + + # Check if config exists + cur.execute("SELECT id FROM pipeline_configs WHERE id=%s", (config_id,)) + if not cur.fetchone(): + raise HTTPException(status_code=404, detail="Pipeline config not found") + + # Build dynamic UPDATE query + updates = [] + values = [] + + if p.name is not None: + updates.append('name=%s') + values.append(p.name) + if p.description is not None: + updates.append('description=%s') + values.append(p.description) + if p.is_default is not None: + # If setting to default, unset others + if p.is_default: + cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true AND id!=%s", (config_id,)) + updates.append('is_default=%s') + values.append(p.is_default) + if p.active is not None: + updates.append('active=%s') + values.append(p.active) + if p.modules is not None: + updates.append('modules=%s') + values.append(json.dumps(p.modules)) + if p.timeframes is not None: + updates.append('timeframes=%s') + values.append(json.dumps(p.timeframes)) + if p.stage1_prompts is not None: + updates.append('stage1_prompts=%s') + values.append(p.stage1_prompts) + if p.stage2_prompt is not None: + updates.append('stage2_prompt=%s') + values.append(p.stage2_prompt) + if p.stage3_prompt is not None: + updates.append('stage3_prompt=%s') + values.append(p.stage3_prompt) + + if not updates: + return {"ok": True} + + cur.execute( + f"UPDATE pipeline_configs SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s", + values + [config_id] + ) + + return {"ok": True} + + +@router.delete("/pipeline-configs/{config_id}") +def delete_pipeline_config(config_id: str, session: dict=Depends(require_admin)): + """Delete pipeline configuration (admin only).""" + with get_db() as conn: + cur = get_cursor(conn) + + # Check if it's the only default + cur.execute("SELECT is_default FROM pipeline_configs WHERE id=%s", (config_id,)) + row = cur.fetchone() + if not row: + raise HTTPException(status_code=404, detail="Pipeline config not found") + + if row['is_default']: + # Check if there are other configs + cur.execute("SELECT COUNT(*) as count FROM pipeline_configs WHERE id!=%s", (config_id,)) + if cur.fetchone()['count'] > 0: + raise HTTPException( + status_code=400, + detail="Cannot delete the default config. Please set another config as default first." + ) + + cur.execute("DELETE FROM pipeline_configs WHERE id=%s", (config_id,)) + + return {"ok": True} + + +@router.post("/pipeline-configs/{config_id}/set-default") +def set_default_pipeline_config(config_id: str, session: dict=Depends(require_admin)): + """Set a pipeline config as default (admin only).""" + with get_db() as conn: + cur = get_cursor(conn) + + # Check if config exists + cur.execute("SELECT id FROM pipeline_configs WHERE id=%s", (config_id,)) + if not cur.fetchone(): + raise HTTPException(status_code=404, detail="Pipeline config not found") + + # Unset all other defaults + cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true") + + # Set this one as default + cur.execute("UPDATE pipeline_configs SET is_default=true, updated=CURRENT_TIMESTAMP WHERE id=%s", (config_id,)) + + return {"ok": True} + + +@router.post("/{prompt_id}/reset-to-default") +def reset_prompt_to_default(prompt_id: str, session: dict=Depends(require_admin)): + """ + Reset a system prompt to its default template (admin only). + Only works for prompts with is_system_default=true. + """ + with get_db() as conn: + cur = get_cursor(conn) + + cur.execute("SELECT is_system_default, default_template FROM ai_prompts WHERE id=%s", (prompt_id,)) + row = cur.fetchone() + + if not row: + raise HTTPException(status_code=404, detail="Prompt not found") + + if not row['is_system_default']: + raise HTTPException(status_code=400, detail="Only system prompts can be reset to default") + + if not row['default_template']: + raise HTTPException(status_code=400, detail="No default template available for this prompt") + + # Reset template to default + cur.execute( + "UPDATE ai_prompts SET template=%s, updated=CURRENT_TIMESTAMP WHERE id=%s", + (row['default_template'], prompt_id) + ) + + return {"ok": True}