""" Pydantic Models for Workflow Engine (Phase 0) Data validation schemas for Workflow-Graph, Knoten, Kanten, Bedingungen. Konzept-Basis: konzept_workflow_engine_konsolidated.md Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md """ from typing import Optional, List, Dict, Any, Union from pydantic import BaseModel, Field from enum import Enum # ── Enums ───────────────────────────────────────────────────────────────────── class NodeType(str, Enum): """Workflow-Knotentypen""" START = "start" ANALYSIS = "analysis" LOGIC = "logic" JOIN = "join" END = "end" class JoinStrategy(str, Enum): """Join-Strategien für Pfad-Konsolidierung""" WAIT_ALL = "wait_all" # Warte auf alle eingehenden Pfade WAIT_ANY = "wait_any" # Warte auf mindestens einen Pfad BEST_EFFORT = "best_effort" # Verwende verfügbare Pfade class SkipHandling(str, Enum): """Umgang mit übersprungenen Pfaden am Join""" IGNORE_SKIPPED = "ignore_skipped" # Übersprungene Pfade ignorieren USE_PLACEHOLDER = "use_placeholder" # Platzhalter für übersprungene Pfade REQUIRE_MINIMUM = "require_minimum" # Mindestanzahl erforderlich class FallbackStrategy(str, Enum): """Fallback-Strategien bei unklaren/ungültigen Signalen""" CONSERVATIVE_SKIP = "conservative_skip" # Konservativ: Pfad überspringen DEFAULT_PATH = "default_path" # Standard-Pfad ausführen UNCERTAINTY_PATH = "uncertainty_path" # Expliziter Unsicherheits-Pfad DOCUMENT_ONLY = "document_only" # Nur dokumentieren, kein Routing class NodeStatus(str, Enum): """Ausführungsstatus eines Knotens""" PENDING = "pending" EXECUTING = "executing" # Phase 2: Gerade in Ausführung EXECUTED = "executed" SKIPPED = "skipped" UNCLEAR = "unclear" FAILED = "failed" class SignalStatus(str, Enum): """Status nach Normalisierung (Phase 2)""" VALID = "valid" # Exakte Übereinstimmung mit Spektrum NORMALIZED = "normalized" # Gemappt (Synonym/Case-insensitive) UNCLEAR = "unclear" # Mehrdeutig oder widersprüchlich INVALID = "invalid" # Außerhalb des Spektrums NOT_DECIDABLE = "not_decidable" # Kein Signal vorhanden class LogicOperator(str, Enum): """Logische Operatoren für Bedingungen""" EQ = "eq" # == NEQ = "neq" # != IN = "in" # in NOT_IN = "not_in" # not in GT = "gt" # > LT = "lt" # < GTE = "gte" # >= LTE = "lte" # <= CONTAINS = "contains" # String/List contains (Phase 3) AND = "and" OR = "or" NOT = "not" class EndNodeOutputMode(str, Enum): """Output-Modi für End Node (Phase 4)""" AUTO = "auto" # Automatisch: Concatenate all analyses TEMPLATE = "template" # Custom Jinja2 template # ── Hilfsmodelle ────────────────────────────────────────────────────────────── class Position(BaseModel): """Position eines Knotens im visuellen Editor""" x: float y: float class QuestionAugmentation(BaseModel): """ Fragenergänzung zu einem Analyseprompt. Hybridmodell (Sektion 6 der Anforderungsanalyse): - Primär: Knotengebunden (am Workflow-Knoten definiert) - Sekundär: Prompt-gebundene Standardfragen (optional) - Vorrangregel: Knotenspezifische überschreiben Prompt-Defaults """ id: str = Field(..., description="Eindeutige ID der Frage (für Referenzierung in Logik-Knoten)") type: str = Field(..., description="Fragetyp: relevanz, prioritaet, selektion, ausschluss, eskalation, unsicherheit") question: str = Field(..., description="Fragetext (kann Template-Variablen enthalten)") answer_spectrum: List[str] = Field(..., description="Erlaubte Antworten, z.B. ['ja', 'nein', 'unklar']") # ── Bedingungsmodelle ───────────────────────────────────────────────────────── class LogicOperand(BaseModel): """ Operand einer Logik-Bedingung. Referenziert normalisierte Signale aus vorangegangenen Knoten. Format: "node_id.question_id" (z.B. "node_1.q1") """ ref: str = Field(..., description="Referenz zum Signal: node_id.question_id") operator: LogicOperator = Field(..., description="Vergleichsoperator") value: Any = Field(..., description="Vergleichswert") class LogicExpression(BaseModel): """ Logik-Ausdruck (verschachtelbar). Beispiel: { "operator": "and", "operands": [ {"ref": "node_1.q1", "operator": "eq", "value": "ja"}, {"ref": "node_1.q2", "operator": "in", "value": ["hoch", "mittel"]} ] } oder verschachtelt: { "operator": "or", "operands": [ { "operator": "and", "operands": [...] }, {"ref": "node_2.q1", "operator": "eq", "value": "ja"} ] } """ operator: LogicOperator = Field(..., description="Logischer Operator (and, or, not) oder Vergleichsoperator") operands: Optional[List['LogicExpression']] = Field(None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)") # Bei einfachem Vergleich: ref: Optional[str] = Field(None, description="Signal-Referenz (nur bei Vergleichsoperatoren)") value: Optional[Any] = Field(None, description="Vergleichswert (nur bei Vergleichsoperatoren)") # Enable forward reference resolution for recursive model LogicExpression.model_rebuild() class Condition(BaseModel): """ Bedingung für einen Logik-Knoten. Unterstützt if/else-if/else-Logik. Note: Uses extra='forbid' to ensure proper Union resolution with LogicExpression. If unknown fields are present (like 'operator', 'operands'), deserialization fails and Pydantic tries LogicExpression instead. """ model_config = {'extra': 'forbid'} type: str = Field(default="if", description="Bedingungstyp: if, else-if, else") expression: Optional[LogicExpression] = Field(None, description="Logischer Ausdruck (null bei 'else')") then_path: Optional[str] = Field(None, description="Edge-ID für 'then'-Pfad") else_path: Optional[str] = Field(None, description="Edge-ID für 'else'-Pfad") class FallbackConfig(BaseModel): """Fallback-Konfiguration für Logik-Knoten""" strategy: FallbackStrategy = Field(..., description="Fallback-Strategie") on_unclear: Optional[str] = Field(None, description="Edge-ID für Unsicherheits-Pfad") on_invalid: Optional[str] = Field(None, description="Edge-ID bei ungültigen Signalen") # ── Workflow-Graph-Modelle ──────────────────────────────────────────────────── class WorkflowNode(BaseModel): """ Workflow-Knoten (Teil des Graph-JSONB). Verschiedene Typen haben unterschiedliche Felder: - START/END: nur id, type, position - ANALYSIS: prompt_slug, question_augmentations - LOGIC: condition, fallback - JOIN: join_strategy, skip_handling """ id: str = Field(..., description="Eindeutige Knoten-ID") type: NodeType = Field(..., description="Knotentyp") label: Optional[str] = Field(None, description="Node-Label (vom Editor, z.B. 'Qualitätseinschätzung')") position: Optional[Position] = Field(None, description="Position im visuellen Editor") # ANALYSIS-Knoten prompt_slug: Optional[str] = Field(None, description="Slug des auszuführenden Prompts (reference mode)") inline_template: Optional[str] = Field(None, description="Inline-Prompt-Template (inline mode, Part 3)") question_augmentations: Optional[List[QuestionAugmentation]] = Field(None, description="Fragenergänzungen (knotengebunden, überschreiben Prompt-Defaults)") # LOGIC-Knoten # Support both formats: direct LogicExpression (UI) or wrapped in Condition (legacy) condition: Optional[Union[LogicExpression, Condition]] = Field(None, description="Bedingung für Pfad-Routing") fallback: Optional[FallbackConfig] = Field(None, description="Fallback-Konfiguration") # JOIN-Knoten (Phase 4) join_strategy: Optional[JoinStrategy] = Field(None, description="Join-Strategie") skip_handling: Optional[SkipHandling] = Field(None, description="Umgang mit übersprungenen Pfaden") min_paths: Optional[int] = Field(None, description="Mindestanzahl erforderlicher Pfade (für REQUIRE_MINIMUM)") timeout_seconds: Optional[int] = Field(None, description="Timeout für BEST_EFFORT-Strategie") # END-Knoten (Phase 4) output_mode: Optional[EndNodeOutputMode] = Field(None, description="Output-Modus: auto oder template") template: Optional[str] = Field(None, description="Jinja2 template für finales Ergebnis (wenn output_mode=template)") class WorkflowEdge(BaseModel): """ Workflow-Kante (Verbindung zwischen Knoten). """ model_config = {"populate_by_name": True} # Erlaubt sowohl 'from_node' als auch 'from' (Alias) id: str = Field(..., description="Eindeutige Edge-ID") from_node: str = Field(..., alias="from", description="Quell-Knoten-ID") to_node: str = Field(..., alias="to", description="Ziel-Knoten-ID") label: Optional[str] = Field(None, description="Label für visuelle Darstellung (z.B. 'then', 'else')") # UI-Format fields (React Flow) source_handle: Optional[str] = Field(None, alias="sourceHandle", description="Source handle ID (UI format: 'true', 'false', 'out')") target_handle: Optional[str] = Field(None, alias="targetHandle", description="Target handle ID (UI format: 'in', 'path_1', etc.)") class WorkflowGraph(BaseModel): """ Workflow-Graph (gespeichert als JSONB in workflow_definitions.graph). Repräsentiert einen DAG (Directed Acyclic Graph). """ nodes: List[WorkflowNode] = Field(..., description="Liste aller Knoten") edges: List[WorkflowEdge] = Field(..., description="Liste aller Kanten") # ── Workflow-Definition (DB-Modell) ─────────────────────────────────────────── class WorkflowDefinitionCreate(BaseModel): """Request-Modell für Workflow-Erstellung""" name: str slug: str description: Optional[str] = None graph: WorkflowGraph class WorkflowDefinitionUpdate(BaseModel): """Request-Modell für Workflow-Update""" name: Optional[str] = None description: Optional[str] = None graph: Optional[WorkflowGraph] = None active: Optional[bool] = None class WorkflowDefinition(BaseModel): """Response-Modell für Workflow-Definition""" id: str name: str slug: str description: Optional[str] = None graph: WorkflowGraph version: int active: bool created_at: str updated_at: str # ── Workflow-Execution (DB-Modell) ──────────────────────────────────────────── class NodeState(BaseModel): """Ausführungsstatus eines Knotens""" status: NodeStatus result: Optional[Dict[str, Any]] = Field(None, description="Ergebnis der Knoten-Ausführung (Prompt-Output, normalisierte Signale, etc.)") timestamp: Optional[str] = Field(None, description="Zeitpunkt der Ausführung") error: Optional[str] = Field(None, description="Fehlermeldung bei Status=failed") class WorkflowExecutionCreate(BaseModel): """Request-Modell für Workflow-Ausführung (Start)""" workflow_id: str # profile_id kommt aus Session class WorkflowExecution(BaseModel): """Response-Modell für Workflow-Execution""" id: str workflow_id: str profile_id: str status: str node_states: Optional[Dict[str, NodeState]] = None execution_log: Optional[List[Dict[str, Any]]] = None started_at: str completed_at: Optional[str] = None # ── Question Catalog (DB-Modell) ────────────────────────────────────────────── class QuestionCatalogEntry(BaseModel): """Eintrag im Fragenkatalog""" id: str question_type: str label: str question_template: str answer_spectrum: List[str] normalization_rules: Optional[Dict[str, Any]] = None active: bool created_at: str # ── Phase 2: Normalisierung & Execution ─────────────────────────────────────── class NormalizedSignal(BaseModel): """ Normalisiertes Entscheidungssignal (Phase 2). Resultat der Normalisierung einer Rohantwort gegen das Antwortspektrum. """ question_type: str = Field(..., description="Typ der Frage (z.B. 'relevanz')") raw_value: str = Field(..., description="Original LLM-Antwort") normalized_value: Optional[str] = Field(None, description="Gemappter Wert (null bei invalid/not_decidable)") status: SignalStatus = Field(..., description="Normalisierungsstatus") confidence: float = Field(default=1.0, description="Konfidenz (für späteren Einsatz)") metadata: Dict[str, Any] = Field(default_factory=dict, description="Zusatzinfo (z.B. method: 'synonym')") class NodeExecutionState(BaseModel): """ Detaillierter Ausführungsstatus eines Knotens (Phase 2). Erweitert NodeState um Phase-1-Komponenten (analysis_core, decision_signals, etc.) """ node_id: str = Field(..., description="Knoten-ID") status: NodeStatus = Field(..., description="Ausführungsstatus") # Phase 1 Result Container analysis_core: Optional[str] = Field(None, description="Hauptanalyse aus ## Analyse Sektion") decision_signals: Dict[str, str] = Field(default_factory=dict, description="Rohe Signale (pre-normalization)") normalized_signals: List[NormalizedSignal] = Field(default_factory=list, description="Normalisierte Signale (Phase 2)") reasoning_anchors: Optional[str] = Field(None, description="Begründungsanker aus ## Begründung") # Debug Information (nur wenn enable_debug=True) debug_prompt: Optional[str] = Field(None, description="Vollständiger Prompt der an die KI gesendet wurde") debug_raw_response: Optional[str] = Field(None, description="Rohe KI-Antwort (ungeparst)") debug_node_type: Optional[str] = Field(None, description="Node-Typ (analysis, logic, join, etc.)") debug_prompt_slug: Optional[str] = Field(None, description="Verwendeter Prompt-Slug (bei ANALYSIS nodes)") # Metadata (für Join Nodes und andere Zusatzinfos) metadata: Optional[Dict[str, Any]] = Field(None, description="Zusätzliche Metadaten (z.B. Join-Statistiken)") # Error & Timing error: Optional[str] = Field(None, description="Fehlermeldung bei failed") started_at: Optional[str] = Field(None, description="Start-Timestamp (ISO)") completed_at: Optional[str] = Field(None, description="End-Timestamp (ISO)") class ExecutionResult(BaseModel): """ Ergebnis einer Workflow-Ausführung (Phase 2). Wird von workflow_executor.execute_workflow() zurückgegeben. """ execution_id: str = Field(..., description="UUID der Execution") workflow_id: str = Field(..., description="UUID des Workflows") status: str = Field(..., description="Gesamt-Status: 'completed', 'failed', 'partial'") node_states: List[NodeExecutionState] = Field(..., description="States aller ausgeführten Knoten") aggregated_result: Dict[str, Any] = Field(default_factory=dict, description="Aggregierte Ergebnisse (combined_analysis, all_signals, etc.)") started_at: str = Field(..., description="Start-Timestamp (ISO)") completed_at: Optional[str] = Field(None, description="End-Timestamp (ISO)") error: Optional[str] = Field(None, description="Fehlermeldung bei failed")