From b5be6e21a5c7f387a867d6dfd5206b475682df63 Mon Sep 17 00:00:00 2001 From: Lars Date: Fri, 3 Apr 2026 16:55:51 +0200 Subject: [PATCH] feat: Phase 0 - Workflow Engine Foundation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend: - DB-Migration 034: workflow_definitions, workflow_question_catalog, workflow_executions - ai_prompts.question_augmentations JSONB-Spalte (Hybridmodell: Prompt-Defaults) - 6 Grundtypen Fragenergänzungen mit Normalisierungsregeln (Seed-Daten) - Pydantic-Modelle (16 Models, 11 Enums) in workflow_models.py - Workflow-Engine: Graph-Parsing, Topologische Sortierung, DAG-Validierung - Dispatcher-Erweiterung type='workflow' (Stub für Phase 1-3) - Adjacency Lists, Erreichbarkeits-Prüfungen, Zyklen-Erkennung Testing: - 22 Unit-Tests (alle bestanden): Graph-Parsing, Validierung, Topologische Sortierung - Fixtures: simple_valid_graph, parallel_graph, branching_graph Version: - APP_VERSION 0.9i - DB_SCHEMA_VERSION 20260403 - Module: workflow 0.1.0 Anforderungsanalyse: .claude/task/Workflow_engine_prompting_engine/anforderungsanalyse_umsetzungsplan.md Konzept-Basis: .claude/task/Workflow_engine_prompting_engine/konzept_workflow_engine_konsolidated.md Co-Authored-By: Claude Opus 4.6 --- .../migrations/034_workflow_foundation.sql | 132 ++++++ backend/prompt_executor.py | 45 ++ backend/version.py | 56 +++ backend/workflow_engine.py | 393 +++++++++++++++++ backend/workflow_models.py | 280 ++++++++++++ tests/backend/test_workflow_engine.py | 413 ++++++++++++++++++ 6 files changed, 1319 insertions(+) create mode 100644 backend/migrations/034_workflow_foundation.sql create mode 100644 backend/version.py create mode 100644 backend/workflow_engine.py create mode 100644 backend/workflow_models.py create mode 100644 tests/backend/test_workflow_engine.py diff --git a/backend/migrations/034_workflow_foundation.sql b/backend/migrations/034_workflow_foundation.sql new file mode 100644 index 0000000..dfe1d12 --- 
-- Migration 034: Workflow Foundation
-- Phase 0: data model for the workflow extension of the prompt engine
-- Created: 2026-04-03
--
-- The whole migration is written to be re-runnable: DDL uses IF NOT EXISTS
-- and the seed rows are only inserted when their question_type is missing.

-- ============================================================
-- Table: workflow_definitions
-- Stores workflow graphs (nodes + edges)
-- ============================================================
CREATE TABLE IF NOT EXISTS workflow_definitions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(100) UNIQUE NOT NULL,
    description TEXT,
    graph JSONB NOT NULL,  -- the workflow graph (nodes + edges)
    version INTEGER DEFAULT 1,
    active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_workflow_definitions_slug ON workflow_definitions(slug);
CREATE INDEX IF NOT EXISTS idx_workflow_definitions_active ON workflow_definitions(active);

-- ============================================================
-- Table: workflow_question_catalog
-- Catalog of question augmentations (6 base types)
-- ============================================================
CREATE TABLE IF NOT EXISTS workflow_question_catalog (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    question_type VARCHAR(50) NOT NULL,  -- relevanz, prioritaet, selektion, ausschluss, eskalation, unsicherheit
    label VARCHAR(255) NOT NULL,
    question_template TEXT NOT NULL,     -- template for the question text
    answer_spectrum JSONB NOT NULL,      -- e.g. ["ja", "nein", "unklar"]
    normalization_rules JSONB,           -- normalization rules (synonyms, etc.)
    active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_workflow_question_catalog_type ON workflow_question_catalog(question_type);
CREATE INDEX IF NOT EXISTS idx_workflow_question_catalog_active ON workflow_question_catalog(active);

-- Seed: the 6 base types of question augmentations.
-- Guarded per question_type so that re-running this migration does not
-- insert the same catalog rows a second time.
INSERT INTO workflow_question_catalog
    (question_type, label, question_template, answer_spectrum, normalization_rules, active)
SELECT
    seed.question_type,
    seed.label,
    seed.question_template,
    seed.answer_spectrum,
    seed.normalization_rules,
    seed.active
FROM (VALUES
(
    'relevanz',
    'Relevanz-Frage',
    'Ist eine vertiefte Analyse in diesem Bereich relevant?',
    '["ja", "nein", "unklar"]'::JSONB,
    '{"synonyms": {"ja": ["yes", "Ja", "JA", "relevant", "sinnvoll"], "nein": ["no", "Nein", "NEIN", "nicht relevant", "unwichtig"], "unklar": ["unclear", "unsure", "vielleicht", "möglicherweise"]}}'::JSONB,
    true
),
(
    'prioritaet',
    'Prioritäts-Frage',
    'Wie hoch ist die Priorität für eine Analyse in diesem Bereich?',
    '["hoch", "mittel", "niedrig", "unklar"]'::JSONB,
    '{"synonyms": {"hoch": ["high", "Hoch", "HOCH", "urgent", "dringend"], "mittel": ["medium", "Mittel", "MITTEL", "moderat"], "niedrig": ["low", "Niedrig", "NIEDRIG", "gering"], "unklar": ["unclear", "unsure", "kann nicht einschätzen"]}}'::JSONB,
    true
),
(
    'selektion',
    'Selektions-Frage',
    'Soll dieser Pfad ausgewählt werden?',
    '["ja", "nein", "unklar"]'::JSONB,
    '{"synonyms": {"ja": ["yes", "Ja", "JA", "auswählen", "select"], "nein": ["no", "Nein", "NEIN", "nicht auswählen", "skip"], "unklar": ["unclear", "unsure", "vielleicht"]}}'::JSONB,
    true
),
(
    'ausschluss',
    'Ausschluss-Frage',
    'Soll dieser Pfad ausgeschlossen werden?',
    '["ja", "nein", "unklar"]'::JSONB,
    '{"synonyms": {"ja": ["yes", "Ja", "JA", "ausschließen", "exclude"], "nein": ["no", "Nein", "NEIN", "nicht ausschließen", "include"], "unklar": ["unclear", "unsure", "vielleicht"]}}'::JSONB,
    true
),
(
    'eskalation',
    'Eskalations-Frage',
    'Ist eine Eskalation oder besondere Aufmerksamkeit erforderlich?',
    '["ja", "nein", "unklar"]'::JSONB,
    '{"synonyms": {"ja": ["yes", "Ja", "JA", "eskalieren", "alert"], "nein": ["no", "Nein", "NEIN", "normal", "routine"], "unklar": ["unclear", "unsure", "vielleicht"]}}'::JSONB,
    true
),
(
    'unsicherheit',
    'Unsicherheits-Frage',
    'Besteht Unsicherheit in der Bewertung?',
    '["ja", "nein", "unklar"]'::JSONB,
    '{"synonyms": {"ja": ["yes", "Ja", "JA", "unsicher", "uncertain"], "nein": ["no", "Nein", "NEIN", "sicher", "certain"], "unklar": ["unclear", "unsure", "vielleicht"]}}'::JSONB,
    true
)
) AS seed (question_type, label, question_template, answer_spectrum, normalization_rules, active)
WHERE NOT EXISTS (
    SELECT 1
    FROM workflow_question_catalog existing
    WHERE existing.question_type = seed.question_type
);

-- ============================================================
-- Table: workflow_executions
-- Execution log for workflow runs
-- ============================================================
CREATE TABLE IF NOT EXISTS workflow_executions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_id UUID REFERENCES workflow_definitions(id) ON DELETE CASCADE,
    profile_id UUID REFERENCES profiles(id) ON DELETE CASCADE,
    status VARCHAR(20) NOT NULL DEFAULT 'running',  -- running, completed, failed, partial
    node_states JSONB,    -- status of every node (executed, skipped, unclear, failed)
    execution_log JSONB,  -- detailed run history
    started_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    completed_at TIMESTAMP WITH TIME ZONE
);

CREATE INDEX IF NOT EXISTS idx_workflow_executions_workflow_id ON workflow_executions(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_executions_profile_id ON workflow_executions(profile_id);
CREATE INDEX IF NOT EXISTS idx_workflow_executions_status ON workflow_executions(status);
CREATE INDEX IF NOT EXISTS idx_workflow_executions_started_at ON workflow_executions(started_at DESC);

-- ============================================================
-- Extension of existing table: ai_prompts
-- Optional prompt-bound default question augmentations
-- (secondary: node-specific questions take precedence)
-- ============================================================
ALTER TABLE ai_prompts
ADD COLUMN IF NOT EXISTS question_augmentations JSONB;

COMMENT ON COLUMN ai_prompts.question_augmentations IS 'Optionale Standard-Fragenergänzungen für diesen Prompt. Knotenspezifische Fragen im Workflow-Graph haben Vorrang (Hybridmodell mit Vorrangregel).';

-- ============================================================
-- Catalog comments for documentation
-- ============================================================
COMMENT ON TABLE workflow_definitions IS 'Workflow-Graphen (Knoten + Kanten) als JSONB. Erweitert die Prompt Engine um verzweigbare, bedingte Analysen.';
COMMENT ON TABLE workflow_question_catalog IS 'Katalog der 6 Grundtypen von Fragenergänzungen mit Antwortspektren und Normalisierungsregeln.';
COMMENT ON TABLE workflow_executions IS 'Ausführungs-Log für Workflow-Runs mit Knoten-Status und detailliertem Ablauf.';

COMMENT ON COLUMN workflow_definitions.graph IS 'JSONB: {nodes: [{id, type, prompt_slug, question_augmentations, position}, ...], edges: [{id, from, to}, ...]}';
COMMENT ON COLUMN workflow_executions.node_states IS 'JSONB: {node_id: {status: "executed|skipped|unclear|failed", ...}, ...}';
COMMENT ON COLUMN workflow_executions.execution_log IS 'JSONB: Chronologischer Ablauf mit Timestamps, Entscheidungen, normalisierten Signalen.';
async def execute_workflow_prompt(
    prompt: Dict,
    variables: Dict[str, Any],
    openrouter_call_func,
    enable_debug: bool = False,
    catalog: Optional[Dict] = None
) -> Dict[str, Any]:
    """
    Dispatch target for prompts of type ``'workflow'`` (graph-based execution).

    Phase 0 ships only this placeholder: none of the arguments are used and
    the call always fails with HTTP 501 so that a workflow prompt cannot be
    run by accident before the real execution logic exists.

    Args:
        prompt: Prompt dict from the database.
        variables: Variables for placeholder replacement.
        openrouter_call_func: Async function(prompt_text) -> response_text.
        enable_debug: If True, debug information is meant to be included.
        catalog: Optional placeholder catalog.

    Returns:
        Never returns in Phase 0. The planned result shape is a dict with the
        keys "type", "slug", "output", "execution_id", "node_states" and,
        when debugging is enabled, "debug".

    Raises:
        HTTPException: Always, with status 501 (not implemented).
    """
    not_implemented_detail = (
        "Workflow-Execution noch nicht implementiert (Phase 0: Foundation). "
        "Vollständige Implementierung erfolgt in Phase 1-3."
    )
    raise HTTPException(status_code=501, detail=not_implemented_detail)
class WorkflowEngine:
    """
    Execution engine for graph-based prompt workflows.

    Phase 0 covers graph parsing and validation only: constructing an engine
    builds lookup tables and adjacency lists, validates the graph and computes
    a topological order. ``execute`` is a stub that raises HTTP 501.
    """

    def __init__(self, graph: "WorkflowGraph"):
        """
        Initialise the engine for one workflow graph.

        Args:
            graph: Workflow graph (nodes + edges).

        Raises:
            HTTPException: 400 if the graph is invalid (duplicate IDs, dangling
                edges, missing START/END, cycles, unreachable nodes).
        """
        self.graph = graph
        self.nodes_by_id: Dict[str, WorkflowNode] = {node.id: node for node in graph.nodes}
        self.edges_by_id: Dict[str, WorkflowEdge] = {edge.id: edge for edge in graph.edges}

        # Adjacency lists for traversal. Nodes without edges in a direction
        # have no key, so readers use ``.get(node_id, [])``.
        self.outgoing_edges: Dict[str, List[WorkflowEdge]] = {}
        self.incoming_edges: Dict[str, List[WorkflowEdge]] = {}
        for edge in graph.edges:
            self.outgoing_edges.setdefault(edge.from_node, []).append(edge)
            self.incoming_edges.setdefault(edge.to_node, []).append(edge)

        self._validate_graph()
        self.topological_order = self._topological_sort()

    @staticmethod
    def _find_duplicate_ids(ids) -> List[str]:
        """Return every ID that occurs more than once, listed once each in order of first repeat."""
        seen: Set[str] = set()
        duplicates: List[str] = []
        for item_id in ids:
            if item_id not in seen:
                seen.add(item_id)
            elif item_id not in duplicates:
                duplicates.append(item_id)
        return duplicates

    def _validate_graph(self):
        """
        Validate the workflow graph.

        Checks, in order (each stage raises before the next one runs):
            0. Node IDs and edge IDs are unique
            1. Every node referenced by an edge exists
            2. Exactly one START node
            3. At least one END node
            4. No cycles (DAG)
            5. Every node is reachable from START
            6. Every node can reach an END node

        Raises:
            HTTPException: 400 with ``{"error": ..., "details": [...]}``.
        """
        errors: List[str] = []

        # 0. IDs must be unique. The by-id dicts silently keep only the last
        #    entry for a repeated ID, which would otherwise surface much later
        #    as a misleading 500 from the topological sort.
        duplicate_nodes = self._find_duplicate_ids(n.id for n in self.graph.nodes)
        if duplicate_nodes:
            errors.append(f"Doppelte Knoten-IDs gefunden: {duplicate_nodes}")
        duplicate_edges = self._find_duplicate_ids(e.id for e in self.graph.edges)
        if duplicate_edges:
            errors.append(f"Doppelte Edge-IDs gefunden: {duplicate_edges}")

        # 1. Every node referenced by an edge must exist
        for edge in self.graph.edges:
            if edge.from_node not in self.nodes_by_id:
                errors.append(f"Edge {edge.id}: from_node '{edge.from_node}' existiert nicht")
            if edge.to_node not in self.nodes_by_id:
                errors.append(f"Edge {edge.id}: to_node '{edge.to_node}' existiert nicht")

        if errors:
            raise HTTPException(400, {"error": "Ungültiger Graph", "details": errors})

        # 2. Exactly one START node
        start_nodes = [n for n in self.graph.nodes if n.type == NodeType.START]
        if len(start_nodes) == 0:
            errors.append("Kein START-Knoten gefunden")
        elif len(start_nodes) > 1:
            errors.append(f"Mehrere START-Knoten gefunden: {[n.id for n in start_nodes]}")

        # 3. At least one END node
        end_nodes = [n for n in self.graph.nodes if n.type == NodeType.END]
        if len(end_nodes) == 0:
            errors.append("Kein END-Knoten gefunden")

        if errors:
            raise HTTPException(400, {"error": "Ungültiger Graph", "details": errors})

        # 4. No cycles (DAG check)
        cycle = self._detect_cycle()
        if cycle:
            errors.append(f"Zyklus erkannt: {' → '.join(cycle)}")

        if errors:
            raise HTTPException(400, {"error": "Ungültiger Graph (Zyklus)", "details": errors})

        # 5. Every node is reachable from START
        start_node = start_nodes[0]
        reachable = self._get_reachable_nodes(start_node.id)
        unreachable = [n.id for n in self.graph.nodes if n.id not in reachable]
        if unreachable:
            errors.append(f"Knoten nicht erreichbar vom START: {unreachable}")

        # 6. Every node can reach an END node (backward check)
        end_nodes_ids = [n.id for n in end_nodes]
        can_reach_end = self._get_nodes_reaching_end(end_nodes_ids)
        cannot_reach_end = [n.id for n in self.graph.nodes if n.id not in can_reach_end]
        if cannot_reach_end:
            errors.append(f"Knoten können END nicht erreichen: {cannot_reach_end}")

        if errors:
            raise HTTPException(400, {"error": "Ungültiger Graph (Erreichbarkeit)", "details": errors})

    def _detect_cycle(self) -> Optional[List[str]]:
        """
        Detect a cycle in the graph with a depth-first search.

        Returns:
            The node IDs along the first cycle found, starting and ending with
            the same node (e.g. ``["a", "b", "a"]``), or None if the graph is
            acyclic.
        """
        visited: Set[str] = set()
        rec_stack: Set[str] = set()  # nodes on the current DFS path
        parent: Dict[str, Optional[str]] = {}

        def dfs(node_id: str) -> Optional[List[str]]:
            visited.add(node_id)
            rec_stack.add(node_id)

            for edge in self.outgoing_edges.get(node_id, []):
                neighbor = edge.to_node

                if neighbor not in visited:
                    parent[neighbor] = node_id
                    cycle = dfs(neighbor)
                    if cycle:
                        return cycle
                elif neighbor in rec_stack:
                    # Back edge to a node on the current path: walk the parent
                    # chain from here up to that node to rebuild the cycle.
                    cycle_path = [neighbor]
                    current = node_id
                    while current != neighbor:
                        cycle_path.append(current)
                        current = parent.get(current)
                    cycle_path.append(neighbor)  # close the cycle
                    return list(reversed(cycle_path))

            rec_stack.remove(node_id)
            return None

        # Start a DFS from every unvisited node so disconnected components
        # are covered as well.
        for node in self.graph.nodes:
            if node.id not in visited:
                parent[node.id] = None
                cycle = dfs(node.id)
                if cycle:
                    return cycle

        return None

    def _get_reachable_nodes(self, start_node_id: str) -> Set[str]:
        """
        Collect all nodes reachable from a node by following outgoing edges.

        Args:
            start_node_id: ID of the node to start from.

        Returns:
            Set of reachable node IDs, including ``start_node_id`` itself.
        """
        reachable: Set[str] = set()
        stack = [start_node_id]

        while stack:
            current = stack.pop()
            if current in reachable:
                continue
            reachable.add(current)

            for edge in self.outgoing_edges.get(current, []):
                stack.append(edge.to_node)

        return reachable

    def _get_nodes_reaching_end(self, end_node_ids: List[str]) -> Set[str]:
        """
        Collect all nodes that can reach at least one END node.

        Traverses incoming edges backwards, starting at the END nodes.

        Args:
            end_node_ids: IDs of the END nodes.

        Returns:
            Set of node IDs that can reach an END node, including the END
            nodes themselves.
        """
        can_reach_end: Set[str] = set()
        stack = list(end_node_ids)

        while stack:
            current = stack.pop()
            if current in can_reach_end:
                continue
            can_reach_end.add(current)

            for edge in self.incoming_edges.get(current, []):
                stack.append(edge.from_node)

        return can_reach_end

    def _topological_sort(self) -> List[str]:
        """
        Compute a topological order of the graph (Kahn's algorithm).

        Returns:
            Node IDs ordered so that every edge points from an earlier to a
            later entry. Nodes that become ready together keep the order in
            which they were discovered (FIFO).

        Raises:
            HTTPException: 500 if not every node could be ordered, which
                means a cycle slipped past ``_validate_graph``.
        """
        # Local import keeps this change self-contained within the class.
        from collections import deque

        in_degree: Dict[str, int] = {node.id: 0 for node in self.graph.nodes}
        for edge in self.graph.edges:
            in_degree[edge.to_node] += 1

        # deque gives O(1) pops from the left; list.pop(0) is O(n) per pop.
        queue = deque(node_id for node_id, degree in in_degree.items() if degree == 0)
        sorted_order: List[str] = []

        while queue:
            current = queue.popleft()
            sorted_order.append(current)

            # Removing `current` frees every neighbour whose last remaining
            # predecessor it was.
            for edge in self.outgoing_edges.get(current, []):
                neighbor = edge.to_node
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        if len(sorted_order) != len(self.graph.nodes):
            raise HTTPException(
                500,
                "Topologische Sortierung fehlgeschlagen (Zyklus?). "
                "Dies sollte durch _validate_graph verhindert worden sein."
            )

        return sorted_order

    def get_execution_order(self) -> List[List[str]]:
        """
        Group the nodes into levels for parallel execution.

        A node's level is the length of the longest path from a node without
        predecessors to it, so nodes on the same level do not depend on each
        other.

        Returns:
            List of levels, each a list of node IDs, e.g.::

                [
                    ["node_start"],
                    ["node_1", "node_2"],  # may run in parallel
                    ["node_3"],
                    ["node_end"]
                ]
        """
        levels: Dict[str, int] = {}

        # Walking in topological order guarantees every predecessor already
        # has its level assigned.
        for node_id in self.topological_order:
            incoming = self.incoming_edges.get(node_id, [])
            if not incoming:
                levels[node_id] = 0
            else:
                max_parent_level = max(levels[edge.from_node] for edge in incoming)
                levels[node_id] = max_parent_level + 1

        max_level = max(levels.values()) if levels else 0
        execution_order: List[List[str]] = [[] for _ in range(max_level + 1)]

        for node_id, level in levels.items():
            execution_order[level].append(node_id)

        return execution_order

    async def execute(
        self,
        variables: Dict[str, Any],
        openrouter_call_func,
        profile_id: str,
        enable_debug: bool = False
    ) -> Dict[str, Any]:
        """
        Run the workflow.

        Phase 0: stub that always raises. Phase 1-3 are planned to add
        question augmentation, normalization, logic evaluation, path routing
        and join consolidation.

        Args:
            variables: Variables for placeholder replacement.
            openrouter_call_func: Async function(prompt_text) -> response_text.
            profile_id: User profile ID.
            enable_debug: If True, debug information is meant to be included.

        Returns:
            Never returns in Phase 0.

        Raises:
            HTTPException: Always, with status 501 (not implemented).
        """
        raise HTTPException(
            status_code=501,
            detail="Workflow-Execution noch nicht implementiert (Phase 0: Foundation). "
                   "Vollständige Implementierung erfolgt in Phase 1-3."
        )
def validate_workflow_graph(graph: WorkflowGraph) -> Tuple[bool, List[str]]:
    """
    Validate a workflow graph and report the problems as strings.

    Validation is performed by constructing a throwaway ``WorkflowEngine``
    (its constructor runs all graph checks); the instance is discarded.
    Intended for UI validation before a workflow is saved, where a list of
    messages is more useful than an exception.

    Args:
        graph: Workflow graph to check.

    Returns:
        Tuple ``(is_valid, errors)``:
        - is_valid: True if the graph passed every check.
        - errors: List of error strings (empty when valid).
    """
    try:
        # Only the constructor's validation side effect is needed.
        WorkflowEngine(graph)
    except HTTPException as exc:
        # The engine reports validation problems as
        # {"error": ..., "details": [...]}; fall back to the plain detail
        # for any other HTTPException shape.
        detail = exc.detail
        if isinstance(detail, dict):
            errors = detail.get('details', [detail.get('error', str(exc))])
        else:
            errors = [str(detail)]
        return False, errors
    except Exception as exc:
        # Deliberate catch-all: a validator for the UI must not crash on an
        # unexpected graph shape, so the failure is reported as a message.
        return False, [f"Unerwarteter Fehler: {str(exc)}"]
    return True, []
# ── Enums ─────────────────────────────────────────────────────────────────────

class NodeType(str, Enum):
    """Workflow node types."""
    START = "start"
    ANALYSIS = "analysis"
    LOGIC = "logic"
    JOIN = "join"
    END = "end"


class JoinStrategy(str, Enum):
    """Join strategies for consolidating paths."""
    WAIT_ALL = "wait_all"        # wait for all incoming paths
    WAIT_ANY = "wait_any"        # wait for at least one path
    BEST_EFFORT = "best_effort"  # use whichever paths are available


class SkipHandling(str, Enum):
    """How a join treats skipped paths."""
    IGNORE_SKIPPED = "ignore_skipped"    # ignore skipped paths
    USE_PLACEHOLDER = "use_placeholder"  # placeholder for skipped paths
    REQUIRE_MINIMUM = "require_minimum"  # a minimum number is required


class FallbackStrategy(str, Enum):
    """Fallback strategies for unclear/invalid signals."""
    CONSERVATIVE_SKIP = "conservative_skip"  # conservative: skip the path
    DEFAULT_PATH = "default_path"            # run the default path
    UNCERTAINTY_PATH = "uncertainty_path"    # explicit uncertainty path
    DOCUMENT_ONLY = "document_only"          # only document, no routing


class NodeStatus(str, Enum):
    """Execution status of a node."""
    PENDING = "pending"
    EXECUTED = "executed"
    SKIPPED = "skipped"
    UNCLEAR = "unclear"
    FAILED = "failed"


class LogicOperator(str, Enum):
    """Logical and comparison operators for conditions."""
    EQ = "eq"          # ==
    NEQ = "neq"        # !=
    IN = "in"          # in
    NOT_IN = "not_in"  # not in
    GT = "gt"          # >
    LT = "lt"          # <
    GTE = "gte"        # >=
    LTE = "lte"        # <=
    AND = "and"
    OR = "or"
    NOT = "not"


# ── Helper models ─────────────────────────────────────────────────────────────

class Position(BaseModel):
    """Position of a node in the visual editor."""
    x: float
    y: float


class QuestionAugmentation(BaseModel):
    """
    Question augmentation attached to an analysis prompt.

    Hybrid model (section 6 of the requirements analysis):
    - Primary: node-bound (defined on the workflow node)
    - Secondary: prompt-bound default questions (optional)
    - Precedence rule: node-specific questions override prompt defaults
    """
    id: str = Field(..., description="Eindeutige ID der Frage (für Referenzierung in Logik-Knoten)")
    type: str = Field(..., description="Fragetyp: relevanz, prioritaet, selektion, ausschluss, eskalation, unsicherheit")
    question: str = Field(..., description="Fragetext (kann Template-Variablen enthalten)")
    answer_spectrum: List[str] = Field(..., description="Erlaubte Antworten, z.B. ['ja', 'nein', 'unklar']")


# ── Condition models ──────────────────────────────────────────────────────────

class LogicOperand(BaseModel):
    """
    Operand of a logic condition.

    References normalized signals from preceding nodes.
    Format: "node_id.question_id" (e.g. "node_1.q1")
    """
    ref: str = Field(..., description="Referenz zum Signal: node_id.question_id")
    operator: LogicOperator = Field(..., description="Vergleichsoperator")
    value: Any = Field(..., description="Vergleichswert")


class LogicExpression(BaseModel):
    """
    Logic expression (nestable).

    Example:
    {
        "operator": "and",
        "operands": [
            {"ref": "node_1.q1", "operator": "eq", "value": "ja"},
            {"ref": "node_1.q2", "operator": "in", "value": ["hoch", "mittel"]}
        ]
    }

    or nested:
    {
        "operator": "or",
        "operands": [
            {
                "operator": "and",
                "operands": [...]
            },
            {"ref": "node_2.q1", "operator": "eq", "value": "ja"}
        ]
    }
    """
    operator: LogicOperator = Field(..., description="Logischer Operator (and, or, not) oder Vergleichsoperator")
    # NOTE(review): operands are typed as Any, so nested entries are not
    # validated by this model itself.
    operands: Optional[List[Any]] = Field(None, description="Liste von Operanden (LogicOperand oder verschachtelte LogicExpression)")
    # For a plain comparison:
    ref: Optional[str] = Field(None, description="Signal-Referenz (nur bei Vergleichsoperatoren)")
    value: Optional[Any] = Field(None, description="Vergleichswert (nur bei Vergleichsoperatoren)")


class Condition(BaseModel):
    """
    Condition of a logic node.

    Supports if/else-if/else logic.
    """
    type: str = Field(default="if", description="Bedingungstyp: if, else-if, else")
    expression: Optional[LogicExpression] = Field(None, description="Logischer Ausdruck (null bei 'else')")
    then_path: Optional[str] = Field(None, description="Edge-ID für 'then'-Pfad")
    else_path: Optional[str] = Field(None, description="Edge-ID für 'else'-Pfad")


class FallbackConfig(BaseModel):
    """Fallback configuration for logic nodes."""
    strategy: FallbackStrategy = Field(..., description="Fallback-Strategie")
    on_unclear: Optional[str] = Field(None, description="Edge-ID für Unsicherheits-Pfad")
    on_invalid: Optional[str] = Field(None, description="Edge-ID bei ungültigen Signalen")


# ── Workflow graph models ─────────────────────────────────────────────────────

class WorkflowNode(BaseModel):
    """
    Workflow node (part of the graph JSONB).

    Different node types use different fields:
    - START/END: only id, type, position
    - ANALYSIS: prompt_slug, question_augmentations
    - LOGIC: condition, fallback
    - JOIN: join_strategy, skip_handling

    All type-specific fields are optional on the model; the pairing of
    fields to node types above is a convention, not enforced here.
    """
    id: str = Field(..., description="Eindeutige Knoten-ID")
    type: NodeType = Field(..., description="Knotentyp")
    position: Optional[Position] = Field(None, description="Position im visuellen Editor")

    # ANALYSIS nodes
    prompt_slug: Optional[str] = Field(None, description="Slug des auszuführenden Prompts")
    question_augmentations: Optional[List[QuestionAugmentation]] = Field(None, description="Fragenergänzungen (knotengebunden, überschreiben Prompt-Defaults)")

    # LOGIC nodes
    condition: Optional[Condition] = Field(None, description="Bedingung für Pfad-Routing")
    fallback: Optional[FallbackConfig] = Field(None, description="Fallback-Konfiguration")

    # JOIN nodes
    join_strategy: Optional[JoinStrategy] = Field(None, description="Join-Strategie")
    skip_handling: Optional[SkipHandling] = Field(None, description="Umgang mit übersprungenen Pfaden")


class WorkflowEdge(BaseModel):
    """
    Workflow edge (connection between two nodes).
    """
    model_config = {"populate_by_name": True}  # accept both 'from_node' and 'from' (alias)

    id: str = Field(..., description="Eindeutige Edge-ID")
    from_node: str = Field(..., alias="from", description="Quell-Knoten-ID")
    to_node: str = Field(..., alias="to", description="Ziel-Knoten-ID")
    label: Optional[str] = Field(None, description="Label für visuelle Darstellung (z.B. 'then', 'else')")


class WorkflowGraph(BaseModel):
    """
    Workflow graph (stored as JSONB in workflow_definitions.graph).

    Represents a DAG (directed acyclic graph). The model only checks the
    shape of nodes and edges; it does not itself check for cycles.
    """
    nodes: List[WorkflowNode] = Field(..., description="Liste aller Knoten")
    edges: List[WorkflowEdge] = Field(..., description="Liste aller Kanten")


# ── Workflow definition (DB model) ────────────────────────────────────────────

class WorkflowDefinitionCreate(BaseModel):
    """Request model for creating a workflow."""
    name: str
    slug: str
    description: Optional[str] = None
    graph: WorkflowGraph


class WorkflowDefinitionUpdate(BaseModel):
    """Request model for updating a workflow."""
    name: Optional[str] = None
    description: Optional[str] = None
    graph: Optional[WorkflowGraph] = None
    active: Optional[bool] = None


class WorkflowDefinition(BaseModel):
    """Response model for a workflow definition."""
    id: str
    name: str
    slug: str
    description: Optional[str] = None
    graph: WorkflowGraph
    version: int
    active: bool
    created_at: str
    updated_at: str


# ── Workflow execution (DB model) ─────────────────────────────────────────────

class NodeState(BaseModel):
    """Execution state of a single node."""
    status: NodeStatus
    result: Optional[Dict[str, Any]] = Field(None, description="Ergebnis der Knoten-Ausführung (Prompt-Output, normalisierte Signale, etc.)")
    timestamp: Optional[str] = Field(None, description="Zeitpunkt der Ausführung")
    error: Optional[str] = Field(None, description="Fehlermeldung bei Status=failed")


class WorkflowExecutionCreate(BaseModel):
    """Request model for starting a workflow execution."""
    workflow_id: str
    # profile_id comes from the session


class WorkflowExecution(BaseModel):
    """Response model for a workflow execution."""
    id: str
    workflow_id: str
    profile_id: str
    status: str
    node_states: Optional[Dict[str, NodeState]] = None
    execution_log: Optional[List[Dict[str, Any]]] = None
    started_at: str
    completed_at: Optional[str] = None


# ── Question catalog (DB model) ───────────────────────────────────────────────

class QuestionCatalogEntry(BaseModel):
    """Entry in the question catalog."""
    id: str
    question_type: str
    label: str
    question_template: str
    answer_spectrum: List[str]
    normalization_rules: Optional[Dict[str, Any]] = None
    active: bool
    created_at: str
WorkflowNode(id="analysis2", type=NodeType.ANALYSIS, position=Position(x=100, y=100), prompt_slug="prompt2"), + WorkflowNode(id="join", type=NodeType.JOIN, position=Position(x=200, y=50)), + WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=50)) + ], + edges=[ + WorkflowEdge(id="e1", from_node="start", to_node="analysis1"), + WorkflowEdge(id="e2", from_node="start", to_node="analysis2"), + WorkflowEdge(id="e3", from_node="analysis1", to_node="join"), + WorkflowEdge(id="e4", from_node="analysis2", to_node="join"), + WorkflowEdge(id="e5", from_node="join", to_node="end") + ] + ) + + +@pytest.fixture +def branching_graph(): + """Graph mit Verzweigung: START → LOGIC → (A1 | A2) → END""" + return WorkflowGraph( + nodes=[ + WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=50)), + WorkflowNode(id="logic1", type=NodeType.LOGIC, position=Position(x=100, y=50)), + WorkflowNode(id="analysis1", type=NodeType.ANALYSIS, position=Position(x=200, y=0), prompt_slug="prompt1"), + WorkflowNode(id="analysis2", type=NodeType.ANALYSIS, position=Position(x=200, y=100), prompt_slug="prompt2"), + WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=50)) + ], + edges=[ + WorkflowEdge(id="e1", from_node="start", to_node="logic1"), + WorkflowEdge(id="e2", from_node="logic1", to_node="analysis1", label="then"), + WorkflowEdge(id="e3", from_node="logic1", to_node="analysis2", label="else"), + WorkflowEdge(id="e4", from_node="analysis1", to_node="end"), + WorkflowEdge(id="e5", from_node="analysis2", to_node="end") + ] + ) + + +# ── Test: Graph-Parsing ─────────────────────────────────────────────────────── + +def test_parse_workflow_graph_valid(simple_valid_graph): + """Test: Gültiger Graph wird korrekt geparst""" + graph_dict = simple_valid_graph.model_dump() + parsed = parse_workflow_graph(graph_dict) + + assert len(parsed.nodes) == 3 + assert len(parsed.edges) == 2 + assert parsed.nodes[0].type == NodeType.START + assert 
parsed.nodes[2].type == NodeType.END + + +def test_parse_workflow_graph_invalid_format(): + """Test: Ungültiges Format wirft ValidationError""" + invalid_graph = {"nodes": "not a list", "edges": []} + + with pytest.raises(Exception): # Pydantic ValidationError + parse_workflow_graph(invalid_graph) + + +# ── Test: Graph-Validierung ────────────────────────────────────────────────── + +def test_valid_graph_initialization(simple_valid_graph): + """Test: Gültiger Graph kann initialisiert werden""" + engine = WorkflowEngine(simple_valid_graph) + + assert len(engine.nodes_by_id) == 3 + assert len(engine.edges_by_id) == 2 + assert engine.topological_order == ["start", "analysis1", "end"] + + +def test_validate_graph_no_start_node(): + """Test: Graph ohne START-Knoten wird abgelehnt""" + graph = WorkflowGraph( + nodes=[ + WorkflowNode(id="analysis1", type=NodeType.ANALYSIS, position=Position(x=0, y=0), prompt_slug="test"), + WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0)) + ], + edges=[WorkflowEdge(id="e1", from_node="analysis1", to_node="end")] + ) + + with pytest.raises(HTTPException) as exc_info: + WorkflowEngine(graph) + + assert exc_info.value.status_code == 400 + assert "Kein START-Knoten" in str(exc_info.value.detail) + + +def test_validate_graph_no_end_node(): + """Test: Graph ohne END-Knoten wird abgelehnt""" + graph = WorkflowGraph( + nodes=[ + WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), + WorkflowNode(id="analysis1", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test") + ], + edges=[WorkflowEdge(id="e1", from_node="start", to_node="analysis1")] + ) + + with pytest.raises(HTTPException) as exc_info: + WorkflowEngine(graph) + + assert exc_info.value.status_code == 400 + assert "Kein END-Knoten" in str(exc_info.value.detail) + + +def test_validate_graph_multiple_start_nodes(): + """Test: Graph mit mehreren START-Knoten wird abgelehnt""" + graph = WorkflowGraph( + nodes=[ + 
WorkflowNode(id="start1", type=NodeType.START, position=Position(x=0, y=0)), + WorkflowNode(id="start2", type=NodeType.START, position=Position(x=0, y=100)), + WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0)) + ], + edges=[ + WorkflowEdge(id="e1", from_node="start1", to_node="end"), + WorkflowEdge(id="e2", from_node="start2", to_node="end") + ] + ) + + with pytest.raises(HTTPException) as exc_info: + WorkflowEngine(graph) + + assert exc_info.value.status_code == 400 + assert "Mehrere START-Knoten" in str(exc_info.value.detail) + + +def test_validate_graph_missing_node_reference(): + """Test: Edge referenziert nicht-existierenden Knoten""" + graph = WorkflowGraph( + nodes=[ + WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), + WorkflowNode(id="end", type=NodeType.END, position=Position(x=100, y=0)) + ], + edges=[WorkflowEdge(id="e1", from_node="start", to_node="nonexistent")] + ) + + with pytest.raises(HTTPException) as exc_info: + WorkflowEngine(graph) + + assert exc_info.value.status_code == 400 + assert "existiert nicht" in str(exc_info.value.detail) + + +# ── Test: Zyklen-Erkennung ──────────────────────────────────────────────────── + +def test_detect_cycle_simple(): + """Test: Einfacher Zyklus wird erkannt (A → B → A)""" + graph = WorkflowGraph( + nodes=[ + WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), + WorkflowNode(id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"), + WorkflowNode(id="b", type=NodeType.ANALYSIS, position=Position(x=200, y=0), prompt_slug="test"), + WorkflowNode(id="end", type=NodeType.END, position=Position(x=300, y=0)) + ], + edges=[ + WorkflowEdge(id="e1", from_node="start", to_node="a"), + WorkflowEdge(id="e2", from_node="a", to_node="b"), + WorkflowEdge(id="e3", from_node="b", to_node="a"), # Zyklus! 
+ WorkflowEdge(id="e4", from_node="b", to_node="end") + ] + ) + + with pytest.raises(HTTPException) as exc_info: + WorkflowEngine(graph) + + assert exc_info.value.status_code == 400 + assert "Zyklus erkannt" in str(exc_info.value.detail) + + +def test_detect_cycle_self_loop(): + """Test: Selbst-Zyklus wird erkannt (A → A)""" + graph = WorkflowGraph( + nodes=[ + WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), + WorkflowNode(id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"), + WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0)) + ], + edges=[ + WorkflowEdge(id="e1", from_node="start", to_node="a"), + WorkflowEdge(id="e2", from_node="a", to_node="a"), # Selbst-Zyklus! + WorkflowEdge(id="e3", from_node="a", to_node="end") + ] + ) + + with pytest.raises(HTTPException) as exc_info: + WorkflowEngine(graph) + + assert exc_info.value.status_code == 400 + assert "Zyklus erkannt" in str(exc_info.value.detail) + + +def test_no_cycle_branching(branching_graph): + """Test: Verzweigung ohne Zyklus wird akzeptiert""" + engine = WorkflowEngine(branching_graph) + assert engine is not None # Kein Fehler + + +# ── Test: Erreichbarkeit ────────────────────────────────────────────────────── + +def test_unreachable_node_from_start(): + """Test: Nicht vom START erreichbarer Knoten wird erkannt""" + graph = WorkflowGraph( + nodes=[ + WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), + WorkflowNode(id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"), + WorkflowNode(id="isolated", type=NodeType.ANALYSIS, position=Position(x=100, y=100), prompt_slug="test"), # Isoliert! 
+ WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=0)) + ], + edges=[ + WorkflowEdge(id="e1", from_node="start", to_node="a"), + WorkflowEdge(id="e2", from_node="a", to_node="end") + # 'isolated' ist nicht verbunden + ] + ) + + with pytest.raises(HTTPException) as exc_info: + WorkflowEngine(graph) + + assert exc_info.value.status_code == 400 + assert "nicht erreichbar vom START" in str(exc_info.value.detail) + + +def test_node_cannot_reach_end(): + """Test: Knoten der END nicht erreichen kann wird erkannt""" + graph = WorkflowGraph( + nodes=[ + WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)), + WorkflowNode(id="a", type=NodeType.ANALYSIS, position=Position(x=100, y=0), prompt_slug="test"), + WorkflowNode(id="dead_end", type=NodeType.ANALYSIS, position=Position(x=200, y=0), prompt_slug="test"), + WorkflowNode(id="end", type=NodeType.END, position=Position(x=200, y=100)) + ], + edges=[ + WorkflowEdge(id="e1", from_node="start", to_node="a"), + WorkflowEdge(id="e2", from_node="a", to_node="dead_end") + # 'dead_end' kann END nicht erreichen (keine Verbindung) + ] + ) + + with pytest.raises(HTTPException) as exc_info: + WorkflowEngine(graph) + + assert exc_info.value.status_code == 400 + assert "END nicht erreichen" in str(exc_info.value.detail) + + +# ── Test: Topologische Sortierung ───────────────────────────────────────────── + +def test_topological_sort_simple(simple_valid_graph): + """Test: Einfacher linearer Graph hat korrekte topologische Sortierung""" + engine = WorkflowEngine(simple_valid_graph) + assert engine.topological_order == ["start", "analysis1", "end"] + + +def test_topological_sort_parallel(parallel_graph): + """Test: Paralleler Graph - Topologische Sortierung""" + engine = WorkflowEngine(parallel_graph) + + # START muss zuerst kommen + assert engine.topological_order[0] == "start" + + # analysis1 und analysis2 können in beliebiger Reihenfolge kommen (parallel) + assert set(engine.topological_order[1:3]) 
== {"analysis1", "analysis2"} + + # JOIN kommt nach beiden Analysen + assert engine.topological_order[3] == "join" + + # END kommt zuletzt + assert engine.topological_order[4] == "end" + + +def test_topological_sort_branching(branching_graph): + """Test: Verzweigter Graph - Topologische Sortierung""" + engine = WorkflowEngine(branching_graph) + + # START → LOGIC muss zuerst kommen + assert engine.topological_order[:2] == ["start", "logic1"] + + # analysis1 und analysis2 können in beliebiger Reihenfolge kommen (alternative Pfade) + assert set(engine.topological_order[2:4]) == {"analysis1", "analysis2"} + + # END kommt zuletzt + assert engine.topological_order[4] == "end" + + +# ── Test: Ausführungs-Ebenen (Parallelität) ─────────────────────────────────── + +def test_execution_order_simple(simple_valid_graph): + """Test: Einfacher Graph hat 3 Ebenen (keine Parallelität)""" + engine = WorkflowEngine(simple_valid_graph) + execution_order = engine.get_execution_order() + + assert len(execution_order) == 3 + assert execution_order[0] == ["start"] + assert execution_order[1] == ["analysis1"] + assert execution_order[2] == ["end"] + + +def test_execution_order_parallel(parallel_graph): + """Test: Paralleler Graph - Ebene 2 hat 2 Knoten (können parallel laufen)""" + engine = WorkflowEngine(parallel_graph) + execution_order = engine.get_execution_order() + + assert len(execution_order) == 4 + assert execution_order[0] == ["start"] + assert set(execution_order[1]) == {"analysis1", "analysis2"} # Parallel! 
+ assert execution_order[2] == ["join"] + assert execution_order[3] == ["end"] + + +def test_execution_order_branching(branching_graph): + """Test: Verzweigter Graph - Alternative Pfade sind auf derselben Ebene""" + engine = WorkflowEngine(branching_graph) + execution_order = engine.get_execution_order() + + assert len(execution_order) == 4 + assert execution_order[0] == ["start"] + assert execution_order[1] == ["logic1"] + assert set(execution_order[2]) == {"analysis1", "analysis2"} # Alternative Pfade + assert execution_order[3] == ["end"] + + +# ── Test: Validierungs-Hilfsfunktion ────────────────────────────────────────── + +def test_validate_workflow_graph_valid(simple_valid_graph): + """Test: Hilfsfunktion validate_workflow_graph für gültigen Graph""" + is_valid, errors = validate_workflow_graph(simple_valid_graph) + + assert is_valid is True + assert errors == [] + + +def test_validate_workflow_graph_invalid(): + """Test: Hilfsfunktion validate_workflow_graph für ungültigen Graph""" + graph = WorkflowGraph( + nodes=[ + WorkflowNode(id="start", type=NodeType.START, position=Position(x=0, y=0)) + # Kein END-Knoten! 
+ ], + edges=[] + ) + + is_valid, errors = validate_workflow_graph(graph) + + assert is_valid is False + assert len(errors) > 0 + assert any("END-Knoten" in str(e) for e in errors) + + +# ── Test: Adjacency Lists ───────────────────────────────────────────────────── + +def test_adjacency_lists_creation(parallel_graph): + """Test: Adjacency Lists werden korrekt erstellt""" + engine = WorkflowEngine(parallel_graph) + + # Outgoing edges vom START + start_outgoing = engine.outgoing_edges.get("start", []) + assert len(start_outgoing) == 2 + assert set(e.to_node for e in start_outgoing) == {"analysis1", "analysis2"} + + # Incoming edges zu JOIN + join_incoming = engine.incoming_edges.get("join", []) + assert len(join_incoming) == 2 + assert set(e.from_node for e in join_incoming) == {"analysis1", "analysis2"} + + +# ── Run Tests ───────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + pytest.main([__file__, "-v"])