mitai-jinkendo/backend/workflow_models.py
Lars ba04e0c0b6
All checks were successful
Deploy Development / deploy (push) Successful in 55s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s
fix: Add extra='forbid' to Condition for proper Union resolution
Critical fix: without extra='forbid', Pydantic accepted the UI format
{operator: "and", operands: [...]} as a valid Condition by ignoring the
unknown fields, which produced Condition(expression=None).

With extra='forbid':
- Condition rejects the unknown fields, so its validation fails
- the Union therefore resolves to LogicExpression, which validates successfully

Test Results (9/9 passed):
- Simple comparisons (eq, neq, gt, lt, in) 
- AND/OR combinations 
- Deep nesting (3+ levels) 
- NOT operator 
- All operators (eq, neq, in, not_in, gt, lt, gte, lte, and, or, not) 
- Legacy format (Condition wrapper) 
- Complex real-world scenarios 

Added comprehensive test suite in:
- test_condition_parsing.py (9 test cases)
- test_condition_union.py (Union resolution verification)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-13 09:01:53 +02:00

370 lines
15 KiB
Python

"""
Pydantic Models for Workflow Engine (Phase 0)
Data validation schemas for Workflow-Graph, Knoten, Kanten, Bedingungen.
Konzept-Basis: konzept_workflow_engine_konsolidated.md
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md
"""
from typing import Optional, List, Dict, Any, Union
from pydantic import BaseModel, Field
from enum import Enum
# ── Enums ─────────────────────────────────────────────────────────────────────
class NodeType(str, Enum):
    """
    Kinds of nodes a workflow graph can contain.

    Members mix in ``str``, so each one compares equal to its string value
    (e.g. ``NodeType.START == "start"``).
    """

    # Entry point of the graph.
    START = "start"
    # Executes a prompt (see WorkflowNode.prompt_slug / inline_template).
    ANALYSIS = "analysis"
    # Routes paths based on a condition.
    LOGIC = "logic"
    # Consolidates several incoming paths.
    JOIN = "join"
    # Terminal node.
    END = "end"
class JoinStrategy(str, Enum):
    """How a JOIN node consolidates its incoming paths."""

    # Wait for every incoming path.
    WAIT_ALL = "wait_all"
    # Wait for at least one incoming path.
    WAIT_ANY = "wait_any"
    # Proceed with whichever paths are available.
    BEST_EFFORT = "best_effort"
class SkipHandling(str, Enum):
    """How a JOIN node treats incoming paths that were skipped."""

    # Leave skipped paths out entirely.
    IGNORE_SKIPPED = "ignore_skipped"
    # Substitute a placeholder for each skipped path.
    USE_PLACEHOLDER = "use_placeholder"
    # Require a minimum number of paths (see WorkflowNode.min_paths).
    REQUIRE_MINIMUM = "require_minimum"
class FallbackStrategy(str, Enum):
    """What a logic node does when its input signals are unclear or invalid."""

    # Conservative: skip the path.
    CONSERVATIVE_SKIP = "conservative_skip"
    # Run the default path.
    DEFAULT_PATH = "default_path"
    # Take an explicit "uncertainty" path.
    UNCERTAINTY_PATH = "uncertainty_path"
    # Only record the situation; perform no routing.
    DOCUMENT_ONLY = "document_only"
class NodeStatus(str, Enum):
    """Execution status of a single workflow node."""

    PENDING = "pending"
    # Phase 2: the node is currently running.
    EXECUTING = "executing"
    EXECUTED = "executed"
    SKIPPED = "skipped"
    UNCLEAR = "unclear"
    FAILED = "failed"
class SignalStatus(str, Enum):
    """Outcome of normalizing a raw answer against its answer spectrum (phase 2)."""

    # Exact match with a value from the spectrum.
    VALID = "valid"
    # Mapped onto the spectrum (synonym / case-insensitive match).
    NORMALIZED = "normalized"
    # Ambiguous or contradictory answer.
    UNCLEAR = "unclear"
    # Answer lies outside the spectrum.
    INVALID = "invalid"
    # No signal present at all.
    NOT_DECIDABLE = "not_decidable"
class LogicOperator(str, Enum):
    """
    Operators usable in logic conditions.

    Comparison operators (Python equivalent):
        eq ``==``, neq ``!=``, in ``in``, not_in ``not in``,
        gt ``>``, lt ``<``, gte ``>=``, lte ``<=``,
        contains -- string/list containment (phase 3).

    Logical combinators: and, or, not.
    """

    EQ = "eq"
    NEQ = "neq"
    IN = "in"
    NOT_IN = "not_in"
    GT = "gt"
    LT = "lt"
    GTE = "gte"
    LTE = "lte"
    CONTAINS = "contains"
    AND = "and"
    OR = "or"
    NOT = "not"
class EndNodeOutputMode(str, Enum):
    """How an END node assembles the final output (phase 4)."""

    # Automatic: concatenate all analyses.
    AUTO = "auto"
    # Render a custom Jinja2 template.
    TEMPLATE = "template"
# ── Hilfsmodelle ──────────────────────────────────────────────────────────────
class Position(BaseModel):
    """Canvas coordinates of a node in the visual workflow editor."""

    # x coordinate on the editor canvas
    x: float
    # y coordinate on the editor canvas
    y: float
class QuestionAugmentation(BaseModel):
    """
    Supplementary question attached to an analysis prompt.

    Hybrid model (section 6 of the requirements analysis):
    - Primary: bound to the workflow node (defined on the node itself).
    - Secondary: optional default questions bound to the prompt.
    - Precedence: node-specific questions override the prompt defaults.
    """

    # Unique question ID; logic nodes reference it.
    id: str = Field(description="Eindeutige ID der Frage (für Referenzierung in Logik-Knoten)")
    # Question category (plain string, not validated against a fixed set here).
    type: str = Field(description="Fragetyp: relevanz, prioritaet, selektion, ausschluss, eskalation, unsicherheit")
    # Question text; may contain template variables.
    question: str = Field(description="Fragetext (kann Template-Variablen enthalten)")
    # Allowed answers.
    answer_spectrum: List[str] = Field(description="Erlaubte Antworten, z.B. ['ja', 'nein', 'unklar']")
# ── Bedingungsmodelle ─────────────────────────────────────────────────────────
class LogicOperand(BaseModel):
    """
    Single comparison inside a logic condition.

    Points at a normalized signal produced by an earlier node, using the
    reference format "node_id.question_id" (e.g. "node_1.q1").
    """

    # Signal reference in "node_id.question_id" form.
    ref: str = Field(description="Referenz zum Signal: node_id.question_id")
    # Comparison operator to apply.
    operator: LogicOperator = Field(description="Vergleichsoperator")
    # Value to compare against; required, but of any type.
    value: Any = Field(description="Vergleichswert")
class LogicExpression(BaseModel):
    """
    Nestable logic expression.

    A node is either a logical combination (``operator`` in and/or/not with
    ``operands``) or a comparison leaf (comparison ``operator`` with ``ref``
    and ``value``). No validator here enforces that split.

    Example::

        {
            "operator": "and",
            "operands": [
                {"ref": "node_1.q1", "operator": "eq", "value": "ja"},
                {"ref": "node_1.q2", "operator": "in", "value": ["hoch", "mittel"]}
            ]
        }

    or nested::

        {
            "operator": "or",
            "operands": [
                {
                    "operator": "and",
                    "operands": [...]
                },
                {"ref": "node_2.q1", "operator": "eq", "value": "ja"}
            ]
        }
    """

    # Logical combinator (and/or/not) or, for a leaf, a comparison operator.
    operator: LogicOperator = Field(description="Logischer Operator (and, or, not) oder Vergleichsoperator")
    # Child expressions; leaf dicts shaped like LogicOperand also parse as LogicExpression.
    operands: Union[List['LogicExpression'], None] = Field(default=None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)")

    # Leaf-only fields (simple comparison):
    ref: Union[str, None] = Field(default=None, description="Signal-Referenz (nur bei Vergleichsoperatoren)")
    value: Union[Any, None] = Field(default=None, description="Vergleichswert (nur bei Vergleichsoperatoren)")


# The model refers to itself through the string annotation 'LogicExpression';
# rebuild it once the class exists so that reference gets resolved.
LogicExpression.model_rebuild()
class Condition(BaseModel):
    """
    Condition wrapper for a logic node, supporting if / else-if / else.

    Unknown keys are rejected (extra='forbid'). ``WorkflowNode.condition`` is
    a Union of LogicExpression and Condition, and every field here has a
    default. Without the restriction, a UI payload such as
    {"operator": ..., "operands": [...]} would also validate here, yielding a
    Condition with ``expression=None``. With it, that payload can only resolve
    to LogicExpression.
    """

    model_config = dict(extra='forbid')

    # Branch kind: "if", "else-if" or "else".
    type: str = Field("if", description="Bedingungstyp: if, else-if, else")
    # Expression to evaluate; None for an "else" branch.
    expression: Union[LogicExpression, None] = Field(default=None, description="Logischer Ausdruck (null bei 'else')")
    # Edge IDs of the branch targets.
    then_path: Union[str, None] = Field(default=None, description="Edge-ID für 'then'-Pfad")
    else_path: Union[str, None] = Field(default=None, description="Edge-ID für 'else'-Pfad")
class FallbackConfig(BaseModel):
    """Fallback behaviour of a logic node when its signals cannot be used directly."""

    # Which fallback strategy to apply.
    strategy: FallbackStrategy = Field(description="Fallback-Strategie")
    # Edge ID taken for unclear signals (uncertainty path).
    on_unclear: Union[str, None] = Field(default=None, description="Edge-ID für Unsicherheits-Pfad")
    # Edge ID taken for invalid signals.
    on_invalid: Union[str, None] = Field(default=None, description="Edge-ID bei ungültigen Signalen")
# ── Workflow-Graph-Modelle ────────────────────────────────────────────────────
class WorkflowNode(BaseModel):
    """
    One node of a workflow graph (part of the graph JSONB).

    A single flat model serves every node type; which optional fields are
    meaningful depends on ``type``:
    - START / END: id, type, position (END additionally output_mode / template)
    - ANALYSIS: prompt_slug or inline_template, question_augmentations
    - LOGIC: condition, fallback
    - JOIN: join_strategy, skip_handling, min_paths, timeout_seconds

    No validator here enforces that split; all type-specific fields are optional.
    """

    # --- common to all node types ---
    id: str = Field(description="Eindeutige Knoten-ID")
    type: NodeType = Field(description="Knotentyp")
    position: Union[Position, None] = Field(default=None, description="Position im visuellen Editor")

    # --- ANALYSIS nodes ---
    # prompt_slug = reference mode, inline_template = inline mode.
    prompt_slug: Union[str, None] = Field(default=None, description="Slug des auszuführenden Prompts (reference mode)")
    inline_template: Union[str, None] = Field(default=None, description="Inline-Prompt-Template (inline mode, Part 3)")
    # Node-bound questions; they override the prompt's default questions.
    question_augmentations: Union[List[QuestionAugmentation], None] = Field(default=None, description="Fragenergänzungen (knotengebunden, überschreiben Prompt-Defaults)")

    # --- LOGIC nodes ---
    # Two accepted shapes: the direct LogicExpression sent by the UI, or the
    # legacy Condition wrapper. Condition forbids extra keys, so a UI payload
    # cannot be mistaken for an empty Condition.
    condition: Union[LogicExpression, Condition, None] = Field(default=None, description="Bedingung für Pfad-Routing")
    fallback: Union[FallbackConfig, None] = Field(default=None, description="Fallback-Konfiguration")

    # --- JOIN nodes (phase 4) ---
    join_strategy: Union[JoinStrategy, None] = Field(default=None, description="Join-Strategie")
    skip_handling: Union[SkipHandling, None] = Field(default=None, description="Umgang mit übersprungenen Pfaden")
    # Used together with SkipHandling.REQUIRE_MINIMUM.
    min_paths: Union[int, None] = Field(default=None, description="Mindestanzahl erforderlicher Pfade (für REQUIRE_MINIMUM)")
    # Used together with JoinStrategy.BEST_EFFORT.
    timeout_seconds: Union[int, None] = Field(default=None, description="Timeout für BEST_EFFORT-Strategie")

    # --- END nodes (phase 4) ---
    output_mode: Union[EndNodeOutputMode, None] = Field(default=None, description="Output-Modus: auto oder template")
    # Jinja2 template for the final result, used when output_mode is "template".
    template: Union[str, None] = Field(default=None, description="Jinja2 template für finales Ergebnis (wenn output_mode=template)")
class WorkflowEdge(BaseModel):
    """
    Directed connection between two workflow nodes.

    The serialized keys are ``from`` / ``to`` (field aliases; ``from`` is a
    reserved word in Python). ``populate_by_name`` additionally allows
    constructing the model with ``from_node`` / ``to_node``.
    """

    model_config = dict(populate_by_name=True)

    id: str = Field(description="Eindeutige Edge-ID")
    # Source node ID (key "from").
    from_node: str = Field(alias="from", description="Quell-Knoten-ID")
    # Target node ID (key "to").
    to_node: str = Field(alias="to", description="Ziel-Knoten-ID")
    # Label for the visual editor, e.g. 'then' / 'else'.
    label: Union[str, None] = Field(default=None, description="Label für visuelle Darstellung (z.B. 'then', 'else')")
class WorkflowGraph(BaseModel):
    """
    Complete workflow graph, persisted as JSONB in ``workflow_definitions.graph``.

    Intended to describe a DAG (directed acyclic graph). This model itself only
    checks the shape of ``nodes`` and ``edges``; it does not check acyclicity
    or that edge endpoints refer to existing nodes.
    """

    # All nodes of the graph.
    nodes: List[WorkflowNode] = Field(description="Liste aller Knoten")
    # All edges of the graph.
    edges: List[WorkflowEdge] = Field(description="Liste aller Kanten")
# ── Workflow-Definition (DB-Modell) ───────────────────────────────────────────
class WorkflowDefinitionCreate(BaseModel):
    """Request body for creating a workflow definition."""

    name: str
    slug: str
    # Optional free-text description.
    description: Union[str, None] = None
    # Full graph of the new workflow.
    graph: WorkflowGraph
class WorkflowDefinitionUpdate(BaseModel):
    """
    Request body for updating a workflow definition.

    Every field is optional and defaults to None. Presumably a None field is
    left unchanged by the update handler (TODO: confirm). ``slug`` is not
    part of this model.
    """

    name: Union[str, None] = None
    description: Union[str, None] = None
    graph: Union[WorkflowGraph, None] = None
    active: Union[bool, None] = None
class WorkflowDefinition(BaseModel):
    """Response body describing a stored workflow definition."""

    id: str
    name: str
    slug: str
    description: Union[str, None] = None
    graph: WorkflowGraph
    version: int
    active: bool
    # Timestamps are typed as plain strings here.
    created_at: str
    updated_at: str
# ── Workflow-Execution (DB-Modell) ────────────────────────────────────────────
class NodeState(BaseModel):
    """Execution status of a single node, together with its optional result/error."""

    status: NodeStatus
    # Output of the node (prompt output, normalized signals, ...).
    result: Union[Dict[str, Any], None] = Field(default=None, description="Ergebnis der Knoten-Ausführung (Prompt-Output, normalisierte Signale, etc.)")
    # When the node was executed.
    timestamp: Union[str, None] = Field(default=None, description="Zeitpunkt der Ausführung")
    # Error message when status is "failed".
    error: Union[str, None] = Field(default=None, description="Fehlermeldung bei Status=failed")
class WorkflowExecutionCreate(BaseModel):
    """Request body for starting a workflow execution."""

    # ID of the workflow to run.
    workflow_id: str
    # profile_id is intentionally absent: it is taken from the session.
class WorkflowExecution(BaseModel):
    """Response body describing one workflow execution."""

    id: str
    workflow_id: str
    profile_id: str
    # Overall status, typed as a plain str (not an enum).
    status: str
    # Per-node states; presumably keyed by node ID (TODO: confirm with the executor).
    node_states: Union[Dict[str, NodeState], None] = None
    execution_log: Union[List[Dict[str, Any]], None] = None
    started_at: str
    completed_at: Union[str, None] = None
# ── Question Catalog (DB-Modell) ──────────────────────────────────────────────
class QuestionCatalogEntry(BaseModel):
    """A single entry of the question catalog."""

    id: str
    question_type: str
    label: str
    question_template: str
    # Allowed answers for this question.
    answer_spectrum: List[str]
    # Free-form rules mapping; its schema is not defined in this model.
    normalization_rules: Union[Dict[str, Any], None] = None
    active: bool
    created_at: str
# ── Phase 2: Normalisierung & Execution ───────────────────────────────────────
class NormalizedSignal(BaseModel):
    """
    Decision signal after normalization (phase 2).

    Result of normalizing one raw answer against the question's answer spectrum.
    """

    # Question category, e.g. 'relevanz'.
    question_type: str = Field(description="Typ der Frage (z.B. 'relevanz')")
    # Unmodified LLM answer.
    raw_value: str = Field(description="Original LLM-Antwort")
    # Mapped value; None for invalid / not_decidable signals.
    normalized_value: Union[str, None] = Field(default=None, description="Gemappter Wert (null bei invalid/not_decidable)")
    status: SignalStatus = Field(description="Normalisierungsstatus")
    # Reserved for later use; defaults to 1.0 and is not range-checked here.
    confidence: float = Field(1.0, description="Konfidenz (für späteren Einsatz)")
    # Extra details such as the mapping method (e.g. 'synonym'); fresh dict per instance.
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Zusatzinfo (z.B. method: 'synonym')")
class NodeExecutionState(BaseModel):
    """
    Detailed execution state of one node (phase 2).

    A richer counterpart to NodeState that also carries the phase-1 result
    components (analysis core, decision signals, reasoning anchors).
    """

    node_id: str = Field(description="Knoten-ID")
    status: NodeStatus = Field(description="Ausführungsstatus")

    # --- phase-1 result container ---
    # Main analysis taken from the "## Analyse" section.
    analysis_core: Union[str, None] = Field(default=None, description="Hauptanalyse aus ## Analyse Sektion")
    # Raw signals before normalization.
    decision_signals: Dict[str, str] = Field(default_factory=dict, description="Rohe Signale (pre-normalization)")
    # Signals after normalization (phase 2).
    normalized_signals: List[NormalizedSignal] = Field(default_factory=list, description="Normalisierte Signale (Phase 2)")
    # Reasoning anchors taken from the "## Begründung" section.
    reasoning_anchors: Union[str, None] = Field(default=None, description="Begründungsanker aus ## Begründung")

    # --- error & timing ---
    error: Union[str, None] = Field(default=None, description="Fehlermeldung bei failed")
    started_at: Union[str, None] = Field(default=None, description="Start-Timestamp (ISO)")
    completed_at: Union[str, None] = Field(default=None, description="End-Timestamp (ISO)")
class ExecutionResult(BaseModel):
    """
    Outcome of a whole workflow run (phase 2).

    Returned by ``workflow_executor.execute_workflow()``.
    """

    execution_id: str = Field(description="UUID der Execution")
    workflow_id: str = Field(description="UUID des Workflows")
    # Overall status: 'completed', 'failed' or 'partial' (plain str, not validated).
    status: str = Field(description="Gesamt-Status: 'completed', 'failed', 'partial'")
    # States of all executed nodes.
    node_states: List[NodeExecutionState] = Field(description="States aller ausgeführten Knoten")
    # Aggregated results such as combined_analysis / all_signals.
    aggregated_result: Dict[str, Any] = Field(default_factory=dict, description="Aggregierte Ergebnisse (combined_analysis, all_signals, etc.)")
    started_at: str = Field(description="Start-Timestamp (ISO)")
    completed_at: Union[str, None] = Field(default=None, description="End-Timestamp (ISO)")
    error: Union[str, None] = Field(default=None, description="Fehlermeldung bei failed")