Logic-Nodes evaluated correctly, but activated_edges was empty because
_get_edges_by_label() only checked e.label, which is null in the UI format.

The UI format uses:
- sourceHandle: "true" / "false" (instead of label: "then" / "else")
- targetHandle: "in" / "path_1" / etc.

Changes:
1. Added source_handle/target_handle fields to the WorkflowEdge model
   - With aliases sourceHandle/targetHandle for camelCase JSON
2. Updated _get_edges_by_label() to check both formats:
   - Legacy: e.label == "then" / "else"
   - UI: e.source_handle == "true" / "false"

Now Logic-Nodes correctly activate outgoing edges → Join-Node receives
completed paths → End-Node executes → Workflow completes!

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
374 lines
15 KiB
Python
374 lines
15 KiB
Python
"""
|
|
Pydantic Models for Workflow Engine (Phase 0)
|
|
|
|
Data validation schemas for Workflow-Graph, Knoten, Kanten, Bedingungen.
|
|
|
|
Konzept-Basis: konzept_workflow_engine_konsolidated.md
|
|
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md
|
|
"""
|
|
from typing import Optional, List, Dict, Any, Union
|
|
from pydantic import BaseModel, Field
|
|
from enum import Enum
|
|
|
|
|
|
# ── Enums ─────────────────────────────────────────────────────────────────────
|
|
|
|
class NodeType(str, Enum):
    """Kinds of nodes a workflow graph can contain."""

    START = "start"
    ANALYSIS = "analysis"
    LOGIC = "logic"
    JOIN = "join"
    END = "end"
|
|
|
|
|
|
class JoinStrategy(str, Enum):
    """Join strategies for consolidating paths."""

    # Wait for all incoming paths.
    WAIT_ALL = "wait_all"
    # Wait for at least one path.
    WAIT_ANY = "wait_any"
    # Use whichever paths are available.
    BEST_EFFORT = "best_effort"
|
|
|
|
|
|
class SkipHandling(str, Enum):
    """How skipped paths are treated at a join."""

    # Ignore skipped paths.
    IGNORE_SKIPPED = "ignore_skipped"
    # Substitute a placeholder for skipped paths.
    USE_PLACEHOLDER = "use_placeholder"
    # A minimum number of paths is required.
    REQUIRE_MINIMUM = "require_minimum"
|
|
|
|
|
|
class FallbackStrategy(str, Enum):
    """Fallback strategies for unclear or invalid signals."""

    # Conservative: skip the path.
    CONSERVATIVE_SKIP = "conservative_skip"
    # Execute the default path.
    DEFAULT_PATH = "default_path"
    # Explicit uncertainty path.
    UNCERTAINTY_PATH = "uncertainty_path"
    # Document only, no routing.
    DOCUMENT_ONLY = "document_only"
|
|
|
|
|
|
class NodeStatus(str, Enum):
    """Execution status of a node."""

    PENDING = "pending"
    # Phase 2: currently being executed.
    EXECUTING = "executing"
    EXECUTED = "executed"
    SKIPPED = "skipped"
    UNCLEAR = "unclear"
    FAILED = "failed"
|
|
|
|
|
|
class SignalStatus(str, Enum):
    """Status of a signal after normalization (Phase 2)."""

    # Exact match with the answer spectrum.
    VALID = "valid"
    # Mapped (synonym / case-insensitive).
    NORMALIZED = "normalized"
    # Ambiguous or contradictory.
    UNCLEAR = "unclear"
    # Outside the spectrum.
    INVALID = "invalid"
    # No signal present.
    NOT_DECIDABLE = "not_decidable"
|
|
|
|
|
|
class LogicOperator(str, Enum):
    """Logical operators usable in conditions."""

    # Comparison operators (Python equivalent noted per member).
    EQ = "eq"            # ==
    NEQ = "neq"          # !=
    IN = "in"            # in
    NOT_IN = "not_in"    # not in
    GT = "gt"            # >
    LT = "lt"            # <
    GTE = "gte"          # >=
    LTE = "lte"          # <=
    # String/list containment (Phase 3).
    CONTAINS = "contains"

    # Boolean connectives.
    AND = "and"
    OR = "or"
    NOT = "not"
|
|
|
|
|
|
class EndNodeOutputMode(str, Enum):
    """Output modes for the end node (Phase 4)."""

    # Automatic: concatenate all analyses.
    AUTO = "auto"
    # Custom Jinja2 template.
    TEMPLATE = "template"
|
|
|
|
|
|
# ── Hilfsmodelle ──────────────────────────────────────────────────────────────
|
|
|
|
class Position(BaseModel):
    """Position of a node in the visual editor."""

    x: float
    y: float
|
|
|
|
|
|
class QuestionAugmentation(BaseModel):
    """
    Question augmentation attached to an analysis prompt.

    Hybrid model (section 6 of the requirements analysis):
    - Primary: node-bound (defined on the workflow node)
    - Secondary: prompt-bound default questions (optional)
    - Precedence rule: node-specific questions override prompt defaults
    """

    id: str = Field(
        ...,
        description="Eindeutige ID der Frage (für Referenzierung in Logik-Knoten)",
    )
    type: str = Field(
        ...,
        description="Fragetyp: relevanz, prioritaet, selektion, ausschluss, eskalation, unsicherheit",
    )
    question: str = Field(
        ...,
        description="Fragetext (kann Template-Variablen enthalten)",
    )
    answer_spectrum: List[str] = Field(
        ...,
        description="Erlaubte Antworten, z.B. ['ja', 'nein', 'unklar']",
    )
|
|
|
|
|
|
# ── Bedingungsmodelle ─────────────────────────────────────────────────────────
|
|
|
|
class LogicOperand(BaseModel):
    """
    Operand of a logic condition.

    References normalized signals from preceding nodes.
    Format: "node_id.question_id" (e.g. "node_1.q1")
    """

    ref: str = Field(
        ...,
        description="Referenz zum Signal: node_id.question_id",
    )
    operator: LogicOperator = Field(
        ...,
        description="Vergleichsoperator",
    )
    value: Any = Field(
        ...,
        description="Vergleichswert",
    )
|
|
|
|
|
|
class LogicExpression(BaseModel):
    """
    Logic expression (nestable).

    Example:
    {
        "operator": "and",
        "operands": [
            {"ref": "node_1.q1", "operator": "eq", "value": "ja"},
            {"ref": "node_1.q2", "operator": "in", "value": ["hoch", "mittel"]}
        ]
    }

    or nested:
    {
        "operator": "or",
        "operands": [
            {
                "operator": "and",
                "operands": [...]
            },
            {"ref": "node_2.q1", "operator": "eq", "value": "ja"}
        ]
    }
    """

    operator: LogicOperator = Field(
        ...,
        description="Logischer Operator (and, or, not) oder Vergleichsoperator",
    )
    operands: Optional[List['LogicExpression']] = Field(
        default=None,
        description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)",
    )

    # Used for a plain comparison:
    ref: Optional[str] = Field(
        default=None,
        description="Signal-Referenz (nur bei Vergleichsoperatoren)",
    )
    value: Optional[Any] = Field(
        default=None,
        description="Vergleichswert (nur bei Vergleichsoperatoren)",
    )


# Resolve the 'LogicExpression' forward reference used by the recursive model.
LogicExpression.model_rebuild()
|
|
|
|
|
|
class Condition(BaseModel):
    """
    Condition for a logic node.

    Supports if / else-if / else logic.

    Note: Uses extra='forbid' to ensure proper Union resolution with
    LogicExpression. If unknown fields are present (like 'operator',
    'operands'), deserialization fails and Pydantic tries LogicExpression
    instead.
    """

    model_config = {"extra": "forbid"}

    type: str = Field(
        default="if",
        description="Bedingungstyp: if, else-if, else",
    )
    expression: Optional[LogicExpression] = Field(
        default=None,
        description="Logischer Ausdruck (null bei 'else')",
    )
    then_path: Optional[str] = Field(
        default=None,
        description="Edge-ID für 'then'-Pfad",
    )
    else_path: Optional[str] = Field(
        default=None,
        description="Edge-ID für 'else'-Pfad",
    )
|
|
|
|
|
|
class FallbackConfig(BaseModel):
    """Fallback configuration for logic nodes."""

    strategy: FallbackStrategy = Field(
        ...,
        description="Fallback-Strategie",
    )
    on_unclear: Optional[str] = Field(
        default=None,
        description="Edge-ID für Unsicherheits-Pfad",
    )
    on_invalid: Optional[str] = Field(
        default=None,
        description="Edge-ID bei ungültigen Signalen",
    )
|
|
|
|
|
|
# ── Workflow-Graph-Modelle ────────────────────────────────────────────────────
|
|
|
|
class WorkflowNode(BaseModel):
    """
    Workflow node (part of the graph JSONB).

    Different node types use different fields:
    - START/END: only id, type, position
    - ANALYSIS: prompt_slug, question_augmentations
    - LOGIC: condition, fallback
    - JOIN: join_strategy, skip_handling
    """

    id: str = Field(
        ...,
        description="Eindeutige Knoten-ID",
    )
    type: NodeType = Field(
        ...,
        description="Knotentyp",
    )
    position: Optional[Position] = Field(
        default=None,
        description="Position im visuellen Editor",
    )

    # ANALYSIS nodes
    prompt_slug: Optional[str] = Field(
        default=None,
        description="Slug des auszuführenden Prompts (reference mode)",
    )
    inline_template: Optional[str] = Field(
        default=None,
        description="Inline-Prompt-Template (inline mode, Part 3)",
    )
    question_augmentations: Optional[List[QuestionAugmentation]] = Field(
        default=None,
        description="Fragenergänzungen (knotengebunden, überschreiben Prompt-Defaults)",
    )

    # LOGIC nodes
    # Both formats are supported: a direct LogicExpression (UI) or one wrapped
    # in a Condition (legacy).
    condition: Optional[Union[LogicExpression, Condition]] = Field(
        default=None,
        description="Bedingung für Pfad-Routing",
    )
    fallback: Optional[FallbackConfig] = Field(
        default=None,
        description="Fallback-Konfiguration",
    )

    # JOIN nodes (Phase 4)
    join_strategy: Optional[JoinStrategy] = Field(
        default=None,
        description="Join-Strategie",
    )
    skip_handling: Optional[SkipHandling] = Field(
        default=None,
        description="Umgang mit übersprungenen Pfaden",
    )
    min_paths: Optional[int] = Field(
        default=None,
        description="Mindestanzahl erforderlicher Pfade (für REQUIRE_MINIMUM)",
    )
    timeout_seconds: Optional[int] = Field(
        default=None,
        description="Timeout für BEST_EFFORT-Strategie",
    )

    # END nodes (Phase 4)
    output_mode: Optional[EndNodeOutputMode] = Field(
        default=None,
        description="Output-Modus: auto oder template",
    )
    template: Optional[str] = Field(
        default=None,
        description="Jinja2 template für finales Ergebnis (wenn output_mode=template)",
    )
|
|
|
|
|
|
class WorkflowEdge(BaseModel):
    """
    Workflow edge (connection between nodes).
    """

    # Allows both 'from_node' and 'from' (alias) when populating the model.
    model_config = {"populate_by_name": True}

    id: str = Field(
        ...,
        description="Eindeutige Edge-ID",
    )
    from_node: str = Field(
        ...,
        alias="from",
        description="Quell-Knoten-ID",
    )
    to_node: str = Field(
        ...,
        alias="to",
        description="Ziel-Knoten-ID",
    )
    label: Optional[str] = Field(
        default=None,
        description="Label für visuelle Darstellung (z.B. 'then', 'else')",
    )

    # UI-format fields (React Flow)
    source_handle: Optional[str] = Field(
        default=None,
        alias="sourceHandle",
        description="Source handle ID (UI format: 'true', 'false', 'out')",
    )
    target_handle: Optional[str] = Field(
        default=None,
        alias="targetHandle",
        description="Target handle ID (UI format: 'in', 'path_1', etc.)",
    )
|
|
|
|
|
|
class WorkflowGraph(BaseModel):
    """
    Workflow graph (stored as JSONB in workflow_definitions.graph).

    Represents a DAG (Directed Acyclic Graph).
    """

    nodes: List[WorkflowNode] = Field(
        ...,
        description="Liste aller Knoten",
    )
    edges: List[WorkflowEdge] = Field(
        ...,
        description="Liste aller Kanten",
    )
|
|
|
|
|
|
# ── Workflow-Definition (DB-Modell) ───────────────────────────────────────────
|
|
|
|
class WorkflowDefinitionCreate(BaseModel):
    """Request model for creating a workflow."""

    name: str
    slug: str
    description: Optional[str] = None
    graph: WorkflowGraph
|
|
|
|
|
|
class WorkflowDefinitionUpdate(BaseModel):
    """Request model for updating a workflow."""

    # Every field is optional and defaults to None.
    name: Optional[str] = None
    description: Optional[str] = None
    graph: Optional[WorkflowGraph] = None
    active: Optional[bool] = None
|
|
|
|
|
|
class WorkflowDefinition(BaseModel):
    """Response model for a workflow definition."""

    id: str
    name: str
    slug: str
    description: Optional[str] = None
    graph: WorkflowGraph
    version: int
    active: bool
    # Timestamps are carried as plain strings.
    created_at: str
    updated_at: str
|
|
|
|
|
|
# ── Workflow-Execution (DB-Modell) ────────────────────────────────────────────
|
|
|
|
class NodeState(BaseModel):
    """Execution status of a node."""

    status: NodeStatus
    result: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Ergebnis der Knoten-Ausführung (Prompt-Output, normalisierte Signale, etc.)",
    )
    timestamp: Optional[str] = Field(
        default=None,
        description="Zeitpunkt der Ausführung",
    )
    error: Optional[str] = Field(
        default=None,
        description="Fehlermeldung bei Status=failed",
    )
|
|
|
|
|
|
class WorkflowExecutionCreate(BaseModel):
    """Request model for starting a workflow execution."""

    workflow_id: str
    # profile_id is taken from the session, so it is not part of the request.
|
|
|
|
|
|
class WorkflowExecution(BaseModel):
    """Response model for a workflow execution."""

    id: str
    workflow_id: str
    profile_id: str
    status: str
    # Per-node state, keyed by string.
    node_states: Optional[Dict[str, NodeState]] = None
    execution_log: Optional[List[Dict[str, Any]]] = None
    started_at: str
    completed_at: Optional[str] = None
|
|
|
|
|
|
# ── Question Catalog (DB-Modell) ──────────────────────────────────────────────
|
|
|
|
class QuestionCatalogEntry(BaseModel):
    """Entry in the question catalog."""

    id: str
    question_type: str
    label: str
    question_template: str
    answer_spectrum: List[str]
    normalization_rules: Optional[Dict[str, Any]] = None
    active: bool
    created_at: str
|
|
|
|
|
|
# ── Phase 2: Normalisierung & Execution ───────────────────────────────────────
|
|
|
|
class NormalizedSignal(BaseModel):
    """
    Normalized decision signal (Phase 2).

    Result of normalizing a raw answer against the answer spectrum.
    """

    question_type: str = Field(
        ...,
        description="Typ der Frage (z.B. 'relevanz')",
    )
    raw_value: str = Field(
        ...,
        description="Original LLM-Antwort",
    )
    normalized_value: Optional[str] = Field(
        default=None,
        description="Gemappter Wert (null bei invalid/not_decidable)",
    )
    status: SignalStatus = Field(
        ...,
        description="Normalisierungsstatus",
    )
    confidence: float = Field(
        default=1.0,
        description="Konfidenz (für späteren Einsatz)",
    )
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Zusatzinfo (z.B. method: 'synonym')",
    )
|
|
|
|
|
|
class NodeExecutionState(BaseModel):
    """
    Detailed execution status of a node (Phase 2).

    Extends NodeState with the Phase 1 components (analysis_core,
    decision_signals, etc.)
    """

    node_id: str = Field(
        ...,
        description="Knoten-ID",
    )
    status: NodeStatus = Field(
        ...,
        description="Ausführungsstatus",
    )

    # Phase 1 result container
    analysis_core: Optional[str] = Field(
        default=None,
        description="Hauptanalyse aus ## Analyse Sektion",
    )
    decision_signals: Dict[str, str] = Field(
        default_factory=dict,
        description="Rohe Signale (pre-normalization)",
    )
    normalized_signals: List[NormalizedSignal] = Field(
        default_factory=list,
        description="Normalisierte Signale (Phase 2)",
    )
    reasoning_anchors: Optional[str] = Field(
        default=None,
        description="Begründungsanker aus ## Begründung",
    )

    # Error and timing
    error: Optional[str] = Field(
        default=None,
        description="Fehlermeldung bei failed",
    )
    started_at: Optional[str] = Field(
        default=None,
        description="Start-Timestamp (ISO)",
    )
    completed_at: Optional[str] = Field(
        default=None,
        description="End-Timestamp (ISO)",
    )
|
|
|
|
|
|
class ExecutionResult(BaseModel):
    """
    Result of a workflow execution (Phase 2).

    Returned by workflow_executor.execute_workflow().
    """

    execution_id: str = Field(
        ...,
        description="UUID der Execution",
    )
    workflow_id: str = Field(
        ...,
        description="UUID des Workflows",
    )
    status: str = Field(
        ...,
        description="Gesamt-Status: 'completed', 'failed', 'partial'",
    )

    node_states: List[NodeExecutionState] = Field(
        ...,
        description="States aller ausgeführten Knoten",
    )
    aggregated_result: Dict[str, Any] = Field(
        default_factory=dict,
        description="Aggregierte Ergebnisse (combined_analysis, all_signals, etc.)",
    )

    started_at: str = Field(
        ...,
        description="Start-Timestamp (ISO)",
    )
    completed_at: Optional[str] = Field(
        default=None,
        description="End-Timestamp (ISO)",
    )
    error: Optional[str] = Field(
        default=None,
        description="Fehlermeldung bei failed",
    )
|