""" Workflow Execution Router (Phase 2) Endpunkte für Workflow-Execution und Ergebnis-Abruf. Phase 2: Sequenzielle Execution Phase 3: Conditional branching """ from fastapi import APIRouter, Depends, HTTPException from auth import require_auth from db import get_db, get_cursor, r2d from pydantic import BaseModel from typing import Dict, Any, Optional import logging logger = logging.getLogger(__name__) router = APIRouter() class WorkflowExecuteRequest(BaseModel): """Request-Body für Workflow-Execution""" variables: Dict[str, Any] = {} enable_debug: bool = False @router.post("/api/workflows/{workflow_id}/execute") async def execute_workflow_endpoint( workflow_id: str, request: WorkflowExecuteRequest, session: dict = Depends(require_auth) ): """ Führt einen Workflow aus. Args: workflow_id: UUID des Workflows (aus workflow_definitions) request.variables: Platzhalter-Werte (optional, z.B. {"name": "Lars"}) request.enable_debug: Debug-Modus (zeigt node_states im Response) Returns: { "execution_id": "...", "status": "completed" | "failed", "aggregated_result": { "combined_analysis": "...", "all_signals": [...], "total_nodes": 3, "executed_nodes": 3, "failed_nodes": 0 }, "node_states": [...], # Nur wenn enable_debug=true "error": "..." # Nur wenn failed } Beispiel: POST /api/workflows/abc123/execute { "variables": {"name": "Lars"}, "enable_debug": true } """ from prompt_executor import execute_workflow_prompt from routers.prompts import call_openrouter profile_id = session["profile_id"] # Add profile_id to variables (für placeholder_resolver) variables = {**request.variables, "profile_id": profile_id} # Load workflow as "prompt" (für execute_workflow_prompt) with get_db() as conn: cur = get_cursor(conn) cur.execute( "SELECT id, name, slug FROM workflow_definitions WHERE id = %s AND active = true", (workflow_id,) ) row = cur.fetchone() if not row: raise HTTPException(404, f"Workflow nicht gefunden: {workflow_id}") workflow_prompt = { "id": row['id'], "name": row['name'], "slug": row['slug'], "type": "workflow" } try: result = await execute_workflow_prompt( prompt=workflow_prompt, variables=variables, openrouter_call_func=call_openrouter, enable_debug=request.enable_debug ) return result except Exception as e: logger.error(f"Workflow execution failed: {e}", exc_info=True) raise HTTPException(500, f"Workflow-Ausführung fehlgeschlagen: {str(e)}") @router.get("/api/workflows/executions/{execution_id}") def get_execution_result( execution_id: str, session: dict = Depends(require_auth) ): """ Lädt gespeicherten Execution State aus DB. Args: execution_id: UUID der Execution (aus workflow_executions) Returns: { "id": "...", "workflow_id": "...", "profile_id": "...", "status": "completed" | "failed", "node_states": [...], # JSONB "execution_log": {...}, "started_at": "2026-04-03T12:00:00", "completed_at": "2026-04-03T12:00:10" } Beispiel: GET /api/workflows/executions/abc123 """ profile_id = session["profile_id"] with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT id, workflow_id, profile_id, status, node_states, execution_log, started_at::text, completed_at::text FROM workflow_executions WHERE id = %s AND profile_id = %s """, (execution_id, profile_id)) row = cur.fetchone() if not row: raise HTTPException(404, "Execution nicht gefunden") return r2d(row) @router.get("/api/workflows") def list_workflows( session: dict = Depends(require_auth) ): """ Listet alle aktiven Workflows auf. Returns: [ { "id": "...", "name": "...", "slug": "...", "description": "...", "version": 1, "created_at": "...", "updated_at": "..." }, ... ] Beispiel: GET /api/workflows """ with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT id, name, slug, description, version, created_at::text, updated_at::text FROM workflow_definitions WHERE active = true ORDER BY name """) rows = cur.fetchall() return [r2d(row) for row in rows] @router.get("/api/workflows/{workflow_id}") def get_workflow( workflow_id: str, session: dict = Depends(require_auth) ): """ Lädt einen einzelnen Workflow mit Graph. Args: workflow_id: UUID des Workflows Returns: { "id": "...", "name": "...", "slug": "...", "description": "...", "graph": {...}, # JSONB "version": 1, "active": true, "created_at": "...", "updated_at": "..." } """ with get_db() as conn: cur = get_cursor(conn) cur.execute(""" SELECT id, name, slug, description, graph, version, active, created_at::text, updated_at::text FROM workflow_definitions WHERE id = %s AND active = true """, (workflow_id,)) row = cur.fetchone() if not row: raise HTTPException(404, "Workflow nicht gefunden") return r2d(row)