diff --git a/backend/main.py b/backend/main.py index de2e90a..22d2717 100644 --- a/backend/main.py +++ b/backend/main.py @@ -27,6 +27,7 @@ from routers import goals, focus_areas # v9e/v9g Goal System v2.0 (Dynamic Focu from routers import goal_types, goal_progress, training_phases, fitness_tests # v9h Goal System (Split routers) from routers import charts # Phase 0c Multi-Layer Architecture from routers import workflow_questions # Phase 1 Workflow Engine - Question Catalog +from routers import workflows # Phase 2 Workflow Engine - Execution # ── App Configuration ───────────────────────────────────────────────────────── DATA_DIR = Path(os.getenv("DATA_DIR", "./data")) @@ -111,8 +112,9 @@ app.include_router(focus_areas.router) # /api/focus-areas/* (v9g Focus # Phase 0c Multi-Layer Architecture app.include_router(charts.router) # /api/charts/* (Phase 0c Charts API) -# Phase 1 Workflow Engine +# Phase 1-2 Workflow Engine app.include_router(workflow_questions.router) # /api/workflow/questions/* (Phase 1 Question Catalog) +app.include_router(workflows.router) # /api/workflows/* (Phase 2 Execution) # ── Health Check ────────────────────────────────────────────────────────────── @app.get("/") diff --git a/backend/normalization_engine.py b/backend/normalization_engine.py new file mode 100644 index 0000000..cbac847 --- /dev/null +++ b/backend/normalization_engine.py @@ -0,0 +1,237 @@ +""" +Normalization Engine (Phase 2) + +Normalisiert decision_signals gegen answer_spectrum mit Synonymen. 
logger = logging.getLogger(__name__)


def normalize_decision_signal(
    question_type: str,
    raw_value: str,
    answer_spectrum: List[str],
    normalization_rules: Optional[Dict] = None
) -> "NormalizedSignal":
    """
    Normalize a single decision signal against its answer spectrum.

    Cascade (first hit wins):
        1. Exact match with an allowed value          -> VALID
        2. Case-insensitive match (after stripping)   -> NORMALIZED
        3. Synonym mapping from ``normalization_rules`` -> NORMALIZED
        4. No match                                   -> INVALID

    Args:
        question_type: Question type key (e.g. "relevanz").
        raw_value: Raw LLM answer (e.g. "JA", "yes", "ja").
        answer_spectrum: Allowed values (e.g. ["ja", "nein", "unklar"]).
            ``None`` is treated as an empty spectrum.
        normalization_rules: Optional rules; only the "synonyms" entry is
            read here, e.g. {"synonyms": {"ja": ["yes", "Ja", "JA"]}}.
            A missing or null "synonyms" entry simply skips step 3.

    Returns:
        NormalizedSignal carrying the raw value, the mapped value
        (``None`` when invalid) and the resulting status.
    """
    spectrum = answer_spectrum or []

    # 1. Exact match
    if raw_value in spectrum:
        logger.debug("%s: '%s' → valid (exact match)", question_type, raw_value)
        return NormalizedSignal(
            question_type=question_type,
            raw_value=raw_value,
            normalized_value=raw_value,
            status=SignalStatus.VALID
        )

    # 2. Case-insensitive match (surrounding whitespace ignored)
    raw_lower = raw_value.strip().lower()
    for allowed in spectrum:
        if raw_lower == allowed.lower():
            logger.debug(
                "%s: '%s' → normalized (case-insensitive)", question_type, raw_value
            )
            return NormalizedSignal(
                question_type=question_type,
                raw_value=raw_value,
                normalized_value=allowed,
                status=SignalStatus.NORMALIZED,
                metadata={"method": "case_insensitive"}
            )

    # 3. Synonym mapping. A JSON null under "synonyms" must not crash the
    #    lookup, so the entry is fetched and checked for truthiness.
    synonyms = normalization_rules.get("synonyms") if normalization_rules else None
    if synonyms:
        normalized = apply_synonym_mapping(raw_value=raw_value, synonyms=synonyms)
        if normalized:
            # NOTE(review): the canonical key is not checked against the
            # spectrum; confirm that rules never name values outside it.
            logger.debug(
                "%s: '%s' → normalized (synonym → '%s')",
                question_type, raw_value, normalized
            )
            return NormalizedSignal(
                question_type=question_type,
                raw_value=raw_value,
                normalized_value=normalized,
                status=SignalStatus.NORMALIZED,
                metadata={"method": "synonym"}
            )

    # 4. No match
    logger.warning(
        "%s: '%s' → invalid (no match in spectrum %s)",
        question_type, raw_value, answer_spectrum
    )
    return NormalizedSignal(
        question_type=question_type,
        raw_value=raw_value,
        normalized_value=None,
        status=SignalStatus.INVALID
    )


def apply_synonym_mapping(
    raw_value: str,
    synonyms: Dict[str, List[str]]
) -> Optional[str]:
    """
    Map ``raw_value`` to the key of the synonym group that contains it.

    The comparison is case-insensitive and ignores surrounding whitespace
    of ``raw_value``. Only the synonym lists are searched, not the keys.

    Args:
        raw_value: Value to look up, e.g. "yes", "YES" or " Yes ".
        synonyms: {"ja": ["yes", "Ja"], "nein": ["no"]}. ``None``/empty
            yields ``None``. A group given as a bare string is treated as
            a one-element list; null groups and non-string items are
            skipped.

    Returns:
        The canonical key of the first matching group, or ``None``.

    Examples:
        >>> apply_synonym_mapping("YES", {"ja": ["yes"], "nein": ["no"]})
        'ja'
        >>> apply_synonym_mapping("vielleicht", {"ja": ["yes"]}) is None
        True
    """
    if not synonyms:
        return None

    raw_lower = raw_value.strip().lower()

    for canonical_value, synonym_list in synonyms.items():
        # Iterating a bare string would compare single characters and let
        # "y" match "yes"; wrap it so it is compared as a whole word.
        if isinstance(synonym_list, str):
            synonym_list = [synonym_list]
        for syn in synonym_list or ():
            if isinstance(syn, str) and raw_lower == syn.lower():
                return canonical_value

    return None


def normalize_all_signals(
    decision_signals: Dict[str, str],
    catalog_dict: Dict[str, Dict]
) -> List["NormalizedSignal"]:
    """
    Normalize every decision signal against the question catalog.

    Args:
        decision_signals: {"relevanz": "ja", "prioritaet": "HOCH"}
        catalog_dict: Per question type a dict with "answer_spectrum" and
            an optional "normalization_rules" entry.

    Returns:
        One NormalizedSignal per entry of ``decision_signals``, in input
        order. Question types missing from the catalog get status
        NOT_DECIDABLE with metadata {"error": "not_in_catalog"}.
    """
    normalized = []

    for question_type, raw_value in decision_signals.items():
        catalog_entry = catalog_dict.get(question_type)
        if question_type not in catalog_dict:
            logger.warning(
                "Question type '%s' not in catalog → not_decidable", question_type
            )
            normalized.append(NormalizedSignal(
                question_type=question_type,
                raw_value=raw_value,
                normalized_value=None,
                status=SignalStatus.NOT_DECIDABLE,
                metadata={"error": "not_in_catalog"}
            ))
            continue

        normalized.append(normalize_decision_signal(
            question_type=question_type,
            raw_value=raw_value,
            answer_spectrum=catalog_entry["answer_spectrum"],
            normalization_rules=catalog_entry.get("normalization_rules")
        ))

    return normalized


def load_question_catalog(db_connection) -> Dict[str, Dict]:
    """
    Load the active rows of ``workflow_question_catalog`` from the DB.

    Args:
        db_connection: Connection handed to ``db.get_cursor``.

    Returns:
        {
            "relevanz": {
                "answer_spectrum": [...],
                "normalization_rules": {...} or None
            },
            ...
        }
    """
    from db import get_cursor

    cur = get_cursor(db_connection)
    cur.execute("""
        SELECT question_type, answer_spectrum, normalization_rules
        FROM workflow_question_catalog
        WHERE active = true
    """)
    rows = cur.fetchall()

    # NOTE(review): positional indexing assumes tuple-like rows and JSONB
    # columns that the driver has already decoded — confirm against
    # get_cursor's row factory.
    catalog = {
        row[0]: {
            "answer_spectrum": row[1],
            "normalization_rules": row[2]
        }
        for row in rows
    }

    logger.info("Loaded question catalog: %s types", len(catalog))
    return catalog
# Only if status=failed } """ - # Phase 0: Stub implementation - # Workflow execution will be implemented in Phase 1-3 - # For now: Return error to prevent accidental use - raise HTTPException( - status_code=501, - detail="Workflow-Execution noch nicht implementiert (Phase 0: Foundation). " - "Vollständige Implementierung erfolgt in Phase 1-3." + from workflow_executor import execute_workflow + + workflow_id = prompt.get('id') + if not workflow_id: + raise HTTPException(400, "Workflow-Prompt fehlt 'id' Feld") + + # Execute workflow + result = await execute_workflow( + workflow_id=workflow_id, + profile_id=variables.get('profile_id', 'unknown'), # From context + variables=variables, + openrouter_call_func=openrouter_call_func, + enable_debug=enable_debug ) + + # Convert ExecutionResult to dict for API response + response = { + "type": "workflow", + "execution_id": result.execution_id, + "status": result.status, + "aggregated_result": result.aggregated_result + } + + if enable_debug: + response["node_states"] = [s.model_dump() for s in result.node_states] + + if result.error: + response["error"] = result.error + + return response diff --git a/backend/routers/workflows.py b/backend/routers/workflows.py new file mode 100644 index 0000000..ec69ed9 --- /dev/null +++ b/backend/routers/workflows.py @@ -0,0 +1,222 @@ +""" +Workflow Execution Router (Phase 2) + +Endpunkte für Workflow-Execution und Ergebnis-Abruf. 
logger = logging.getLogger(__name__)

router = APIRouter()


class WorkflowExecuteRequest(BaseModel):
    """Request body for a workflow execution."""
    # Placeholder values supplied by the client (e.g. {"name": "Lars"}).
    variables: Dict[str, Any] = {}
    # When true, the response additionally carries the per-node states.
    enable_debug: bool = False


@router.post("/api/workflows/{workflow_id}/execute")
async def execute_workflow_endpoint(
    workflow_id: str,
    request: WorkflowExecuteRequest,
    session: dict = Depends(require_auth)
):
    """
    Execute a workflow on behalf of the authenticated profile.

    Args:
        workflow_id: ID of the workflow (row in workflow_definitions).
        request: Body with optional ``variables`` and ``enable_debug``.
        session: Auth session; its "profile_id" is injected into the
            variables and overrides any client-supplied value.

    Returns:
        The dict produced by ``execute_workflow_prompt`` (execution_id,
        status, aggregated_result, plus node_states in debug mode and
        error on failure).

    Raises:
        HTTPException 404: No active workflow with this ID.
        HTTPException 500: Unexpected error during execution.
        Any HTTPException raised by the executor is passed through
        unchanged.

    Example:
        POST /api/workflows/abc123/execute
        {"variables": {"name": "Lars"}, "enable_debug": true}
    """
    from prompt_executor import execute_workflow_prompt
    from openrouter import call_openrouter_async

    profile_id = session["profile_id"]

    # The session value is written last so that the client cannot spoof it.
    variables = {**request.variables, "profile_id": profile_id}

    # Load the workflow as a "prompt" dict for execute_workflow_prompt.
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id, name, slug FROM workflow_definitions WHERE id = %s AND active = true",
            (workflow_id,)
        )
        row = cur.fetchone()
        if not row:
            raise HTTPException(404, f"Workflow nicht gefunden: {workflow_id}")

        # NOTE(review): positional access assumes tuple-like rows, while
        # the other endpoints in this router pass rows to r2d() — confirm
        # what get_cursor returns.
        workflow_prompt = {
            "id": row[0],
            "name": row[1],
            "slug": row[2],
            "type": "workflow"
        }

    try:
        return await execute_workflow_prompt(
            prompt=workflow_prompt,
            variables=variables,
            openrouter_call_func=call_openrouter_async,
            enable_debug=request.enable_debug
        )
    except HTTPException:
        # Deliberate HTTP errors (e.g. 400 from the executor) must keep
        # their status code instead of being rewrapped as a 500.
        raise
    except Exception as e:
        logger.exception("Workflow execution failed: %s", e)
        raise HTTPException(500, f"Workflow-Ausführung fehlgeschlagen: {str(e)}") from e


@router.get("/api/workflows/executions/{execution_id}")
def get_execution_result(
    execution_id: str,
    session: dict = Depends(require_auth)
):
    """
    Return the stored execution state of one workflow run.

    The lookup is restricted to the caller's own profile, so executions
    that belong to another profile are reported as not found.

    Args:
        execution_id: ID of the execution (row in workflow_executions).
        session: Auth session providing "profile_id".

    Returns:
        Row converted by ``r2d`` with id, workflow_id, profile_id, status,
        node_states, execution_log, started_at and completed_at.

    Raises:
        HTTPException 404: No matching execution for this profile.
    """
    profile_id = session["profile_id"]

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT id, workflow_id, profile_id, status, node_states, execution_log,
                   started_at::text, completed_at::text
            FROM workflow_executions
            WHERE id = %s AND profile_id = %s
        """, (execution_id, profile_id))
        row = cur.fetchone()

    if not row:
        raise HTTPException(404, "Execution nicht gefunden")

    return r2d(row)


@router.get("/api/workflows")
def list_workflows(
    session: dict = Depends(require_auth)
):
    """
    List all active workflows, ordered by name.

    Returns:
        List of rows converted by ``r2d`` with id, name, slug,
        description, version, created_at and updated_at.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT id, name, slug, description, version,
                   created_at::text, updated_at::text
            FROM workflow_definitions
            WHERE active = true
            ORDER BY name
        """)
        rows = cur.fetchall()

    return [r2d(row) for row in rows]


@router.get("/api/workflows/{workflow_id}")
def get_workflow(
    workflow_id: str,
    session: dict = Depends(require_auth)
):
    """
    Return a single active workflow including its graph.

    Args:
        workflow_id: ID of the workflow.

    Returns:
        Row converted by ``r2d`` with id, name, slug, description, graph,
        version, active, created_at and updated_at.

    Raises:
        HTTPException 404: No active workflow with this ID.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT id, name, slug, description, graph, version, active,
                   created_at::text, updated_at::text
            FROM workflow_definitions
            WHERE id = %s AND active = true
        """, (workflow_id,))
        row = cur.fetchone()

    if not row:
        raise HTTPException(404, "Workflow nicht gefunden")

    return r2d(row)
def get_execution_order(graph: WorkflowGraph) -> List[str]:
    """
    Return the node IDs of ``graph`` in sequential execution order.

    Builds a ``WorkflowEngine`` for the graph and hands back the
    ``topological_order`` held by its validator (a flat list, e.g.
    ["start", "node_1", "node_2", "end"]).

    Args:
        graph: Workflow graph to order.

    Returns:
        List of node IDs in execution order.

    Note:
        Whatever ``WorkflowEngine(graph)`` raises for an invalid graph
        propagates to the caller (presumably an HTTPException — confirm
        against WorkflowEngine).
    """
    # The validator already holds the flattened topological order.
    return WorkflowEngine(graph).validator.topological_order
logger = logging.getLogger(__name__)

# Model passed to the LLM callback for every analysis node.
_DEFAULT_MODEL = "anthropic/claude-sonnet-4"


async def execute_workflow(
    workflow_id: str,
    profile_id: str,
    variables: Dict[str, Any],
    openrouter_call_func,  # LLM callback: async (prompt, model) -> str
    enable_debug: bool = False
) -> ExecutionResult:
    """
    Execute a workflow sequentially (no branching).

    Steps: load the graph from workflow_definitions, parse it, compute
    the execution order, load the question catalog, run every node in
    order, aggregate the node results and persist the execution state.

    Args:
        workflow_id: ID of the workflow.
        profile_id: ID of the profile the run belongs to.
        variables: Placeholder values (e.g. {"name": "Lars"}).
        openrouter_call_func: async (prompt, model) -> str
        enable_debug: Forwarded to ``execute_node``.

    Returns:
        ExecutionResult with status "completed" and all node states, or
        status "failed" with the error text and whatever node states
        were collected before the failure. This function does not raise
        for execution errors; they are reported through the result.
    """
    execution_id = str(uuid.uuid4())
    started_at = datetime.utcnow().isoformat()
    # Created before the try block so the failure path can always report
    # the states collected so far.
    node_states: List[NodeExecutionState] = []

    logger.info("Starting workflow execution: %s (workflow: %s)", execution_id, workflow_id)

    try:
        # 1. Load workflow definition
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute(
                "SELECT graph FROM workflow_definitions WHERE id = %s AND active = true",
                (workflow_id,)
            )
            row = cur.fetchone()
            if not row:
                raise ValueError(f"Workflow not found: {workflow_id}")

            # NOTE(review): positional access assumes tuple-like rows —
            # confirm what get_cursor returns.
            graph_json = row[0]

        # 2. Parse graph
        graph = parse_workflow_graph(graph_json)
        logger.debug("Parsed graph: %s nodes, %s edges", len(graph.nodes), len(graph.edges))

        # 3. Topological order
        execution_order = get_execution_order(graph)
        logger.info("Execution order: %s", execution_order)

        # 4. Load question catalog
        with get_db() as conn:
            catalog = load_question_catalog(conn)
        logger.debug("Loaded catalog: %s question types", len(catalog))

        # 5. Execute nodes sequentially
        context = {"variables": variables, "profile_id": profile_id}
        nodes_by_id = {n.id: n for n in graph.nodes}

        for node_id in execution_order:
            node = nodes_by_id[node_id]

            logger.info("Executing node: %s (type: %s)", node_id, node.type)

            node_state = await execute_node(
                node=node,
                context=context,
                catalog=catalog,
                openrouter_call_func=openrouter_call_func,
                enable_debug=enable_debug
            )
            node_states.append(node_state)

            # Keep the node result in the context for later nodes/phases.
            context[f"node_{node_id}"] = {
                "analysis_core": node_state.analysis_core,
                "normalized_signals": [s.model_dump() for s in node_state.normalized_signals]
            }

        # 6. Aggregate results
        aggregated = aggregate_results(node_states)

        # 7. Persist execution state
        # NOTE(review): the run is stored as "completed" even when single
        # nodes ended FAILED (see failed_nodes in the aggregate) — confirm
        # whether "partial"/"failed" is intended in that case.
        completed_at = datetime.utcnow().isoformat()
        save_execution_state(
            execution_id=execution_id,
            workflow_id=workflow_id,
            profile_id=profile_id,
            node_states=node_states,
            status="completed",
            started_at=started_at,
            completed_at=completed_at
        )

        logger.info("Workflow execution completed: %s", execution_id)

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status="completed",
            node_states=node_states,
            aggregated_result=aggregated,
            started_at=started_at,
            completed_at=completed_at
        )

    except Exception as e:
        logger.error("Workflow execution failed: %s", e, exc_info=True)

        completed_at = datetime.utcnow().isoformat()
        try:
            save_execution_state(
                execution_id=execution_id,
                workflow_id=workflow_id,
                profile_id=profile_id,
                node_states=node_states,
                status="failed",
                started_at=started_at,
                completed_at=completed_at,
                error=str(e)
            )
        except Exception:
            # Persisting is best effort here: a second failure (e.g. the
            # DB being the root cause) must not mask the original error
            # or prevent the failed result from being returned.
            logger.exception("Could not persist failed execution state: %s", execution_id)

        return ExecutionResult(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status="failed",
            node_states=node_states,
            aggregated_result={},
            started_at=started_at,
            completed_at=completed_at,
            error=str(e)
        )


async def execute_node(
    node,
    context: Dict[str, Any],
    catalog: Dict[str, Dict],
    openrouter_call_func,
    enable_debug: bool = False
) -> NodeExecutionState:
    """
    Execute a single workflow node.

    Node types:
        - start/end: no-op, reported as EXECUTED.
        - analysis: load prompt -> augment with questions -> LLM call ->
          parse result container -> normalize decision signals.
        - anything else: reported as FAILED ("not implemented in Phase 2").

    Args:
        node: Node object from ``graph.nodes``.
        context: Execution context (variables, profile_id, prior results).
        catalog: Question catalog used for signal normalization.
        openrouter_call_func: async (prompt, model) -> str
        enable_debug: Currently unused inside this function.

    Returns:
        NodeExecutionState. Any exception raised while executing the node
        is caught and returned as a state with status FAILED and the
        error text; this function does not raise.
    """
    started_at = datetime.utcnow().isoformat()

    try:
        # Start/end nodes carry no work.
        if node.type in ["start", "end"]:
            logger.debug("Node %s: No-op (%s)", node.id, node.type)
            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        if node.type == "analysis":
            # 1. Load prompt
            prompt_template = await load_prompt_template(node.prompt_slug, context)
            logger.debug("Node %s: Loaded prompt '%s'", node.id, node.prompt_slug)

            # 2. Parse question augmentations (models are dumped to plain
            #    dicts first, plain dicts are passed through).
            questions = []
            if node.question_augmentations:
                questions_jsonb = [
                    q.model_dump() if hasattr(q, 'model_dump') else q
                    for q in node.question_augmentations
                ]
                questions = parse_question_augmentations_from_jsonb(questions_jsonb)
                logger.debug("Node %s: %s question augmentations", node.id, len(questions))

            # 3. Augment prompt
            if questions:
                augmented_prompt = augment_prompt_with_questions(
                    base_prompt=prompt_template,
                    questions=questions
                )
            else:
                augmented_prompt = prompt_template

            # 4. LLM call
            logger.debug("Node %s: Calling LLM", node.id)
            llm_response = await openrouter_call_func(augmented_prompt, _DEFAULT_MODEL)

            # 5. Parse result container
            parsed = parse_result_container(llm_response)
            logger.debug(
                "Node %s: Parsed response (status: %s)",
                node.id, parsed.get("parsing_status")
            )

            # 6. Normalize signals. A missing/None signal dict is treated
            #    as "no signals" so that it cannot fail an otherwise
            #    successful analysis during state construction.
            decision_signals = parsed.get("decision_signals") or {}
            normalized_signals = []
            if decision_signals:
                normalized_signals = normalize_all_signals(
                    decision_signals=decision_signals,
                    catalog_dict=catalog
                )
                logger.info("Node %s: Normalized %s signals", node.id, len(normalized_signals))

            return NodeExecutionState(
                node_id=node.id,
                status=NodeStatus.EXECUTED,
                analysis_core=parsed["analysis_core"],
                decision_signals=decision_signals,
                normalized_signals=normalized_signals,
                reasoning_anchors=parsed.get("reasoning_anchors"),
                started_at=started_at,
                completed_at=datetime.utcnow().isoformat()
            )

        # Unknown node type (e.g. logic, join)
        raise ValueError(f"Node type '{node.type}' not implemented in Phase 2")

    except Exception as e:
        logger.error("Node execution failed (%s): %s", node.id, e, exc_info=True)
        return NodeExecutionState(
            node_id=node.id,
            status=NodeStatus.FAILED,
            error=str(e),
            started_at=started_at,
            completed_at=datetime.utcnow().isoformat()
        )


async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str:
    """
    Load an active prompt template from ``ai_prompts`` and resolve its
    placeholders.

    Args:
        prompt_slug: Slug of the prompt (e.g. "pipeline_body").
        context: {"variables": {...}, "profile_id": "..."}; the variables
            are passed to the resolver as ``extra_vars``.

    Returns:
        The template after ``resolve_placeholders`` has been applied.

    Raises:
        ValueError: No active prompt with this slug.
    """
    from placeholder_resolver import resolve_placeholders

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT template FROM ai_prompts WHERE slug = %s AND active = true",
            (prompt_slug,)
        )
        row = cur.fetchone()
        if not row:
            raise ValueError(f"Prompt not found: {prompt_slug}")

        template = row[0]

    return resolve_placeholders(
        template=template,
        profile_id=context.get("profile_id"),
        extra_vars=context.get("variables", {})
    )


def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]:
    """
    Aggregate the results of all node states.

    Args:
        node_states: States of all nodes that were run.

    Returns:
        {
            "combined_analysis": "## node_1\\n...\\n\\n## node_2\\n...",
            "all_signals": [dumped NormalizedSignal, ...],
            "total_nodes": int,
            "executed_nodes": int,   # status EXECUTED
            "failed_nodes": int      # status FAILED
        }
        Only EXECUTED nodes with a non-empty analysis_core contribute to
        "combined_analysis"; signals are collected from every state.
    """
    combined_analysis = [
        f"## {state.node_id}\n{state.analysis_core}"
        for state in node_states
        if state.status == NodeStatus.EXECUTED and state.analysis_core
    ]
    all_signals = [
        signal.model_dump()
        for state in node_states
        for signal in (state.normalized_signals or [])
    ]

    return {
        "combined_analysis": "\n\n".join(combined_analysis),
        "all_signals": all_signals,
        "total_nodes": len(node_states),
        "executed_nodes": sum(1 for s in node_states if s.status == NodeStatus.EXECUTED),
        "failed_nodes": sum(1 for s in node_states if s.status == NodeStatus.FAILED)
    }


def save_execution_state(
    execution_id: str,
    workflow_id: str,
    profile_id: str,
    node_states: List[NodeExecutionState],
    status: str,
    started_at: str,
    completed_at: Optional[str] = None,
    error: Optional[str] = None
):
    """
    Insert one row describing the execution into ``workflow_executions``.

    Args:
        execution_id: ID of the execution.
        workflow_id: ID of the workflow.
        profile_id: ID of the profile.
        node_states: Node states; stored as a JSON array of their dumps.
        status: 'completed' | 'failed' | 'partial'
        started_at: ISO timestamp.
        completed_at: ISO timestamp (optional).
        error: Error text (optional); stored as {"error": ...} in
            execution_log, otherwise execution_log is an empty object.
    """
    node_states_json = [s.model_dump() for s in node_states]

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            INSERT INTO workflow_executions
            (id, workflow_id, profile_id, status, node_states, execution_log, started_at, completed_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            execution_id,
            workflow_id,
            profile_id,
            status,
            json.dumps(node_states_json),
            json.dumps({"error": error} if error else {}),
            started_at,
            completed_at
        ))
        conn.commit()

    logger.info("Saved execution state: %s (status: %s)", execution_id, status)
'relevanz')") + raw_value: str = Field(..., description="Original LLM-Antwort") + normalized_value: Optional[str] = Field(None, description="Gemappter Wert (null bei invalid/not_decidable)") + status: SignalStatus = Field(..., description="Normalisierungsstatus") + confidence: float = Field(default=1.0, description="Konfidenz (für späteren Einsatz)") + metadata: Dict[str, Any] = Field(default_factory=dict, description="Zusatzinfo (z.B. method: 'synonym')") + + +class NodeExecutionState(BaseModel): + """ + Detaillierter Ausführungsstatus eines Knotens (Phase 2). + + Erweitert NodeState um Phase-1-Komponenten (analysis_core, decision_signals, etc.) + """ + node_id: str = Field(..., description="Knoten-ID") + status: NodeStatus = Field(..., description="Ausführungsstatus") + + # Phase 1 Result Container + analysis_core: Optional[str] = Field(None, description="Hauptanalyse aus ## Analyse Sektion") + decision_signals: Dict[str, str] = Field(default_factory=dict, description="Rohe Signale (pre-normalization)") + normalized_signals: List[NormalizedSignal] = Field(default_factory=list, description="Normalisierte Signale (Phase 2)") + reasoning_anchors: Optional[str] = Field(None, description="Begründungsanker aus ## Begründung") + + # Error & Timing + error: Optional[str] = Field(None, description="Fehlermeldung bei failed") + started_at: Optional[str] = Field(None, description="Start-Timestamp (ISO)") + completed_at: Optional[str] = Field(None, description="End-Timestamp (ISO)") + + +class ExecutionResult(BaseModel): + """ + Ergebnis einer Workflow-Ausführung (Phase 2). + + Wird von workflow_executor.execute_workflow() zurückgegeben. 
+ """ + execution_id: str = Field(..., description="UUID der Execution") + workflow_id: str = Field(..., description="UUID des Workflows") + status: str = Field(..., description="Gesamt-Status: 'completed', 'failed', 'partial'") + + node_states: List[NodeExecutionState] = Field(..., description="States aller ausgeführten Knoten") + aggregated_result: Dict[str, Any] = Field(default_factory=dict, description="Aggregierte Ergebnisse (combined_analysis, all_signals, etc.)") + + started_at: str = Field(..., description="Start-Timestamp (ISO)") + completed_at: Optional[str] = Field(None, description="End-Timestamp (ISO)") + error: Optional[str] = Field(None, description="Fehlermeldung bei failed") diff --git a/tests/backend/test_phase2_normalization.py b/tests/backend/test_phase2_normalization.py new file mode 100644 index 0000000..7a6734e --- /dev/null +++ b/tests/backend/test_phase2_normalization.py @@ -0,0 +1,229 @@ +""" +Unit Tests für normalization_engine.py (Phase 2) + +Run with: PYTHONPATH=./backend pytest tests/backend/test_phase2_normalization.py -v +""" +import pytest +from workflow_models import SignalStatus +from normalization_engine import ( + normalize_decision_signal, + apply_synonym_mapping, + normalize_all_signals +) + + +# ── normalize_decision_signal Tests ──────────────────────────────────────────── + +def test_exact_match(): + """Test: Exakte Übereinstimmung mit Spektrum → valid""" + signal = normalize_decision_signal( + question_type="relevanz", + raw_value="ja", + answer_spectrum=["ja", "nein", "unklar"] + ) + assert signal.status == SignalStatus.VALID + assert signal.normalized_value == "ja" + assert signal.raw_value == "ja" + + +def test_case_insensitive_uppercase(): + """Test: Case-insensitive Matching (Großbuchstaben) → normalized""" + signal = normalize_decision_signal( + question_type="relevanz", + raw_value="JA", + answer_spectrum=["ja", "nein", "unklar"] + ) + assert signal.status == SignalStatus.NORMALIZED + assert signal.normalized_value 
== "ja" + assert signal.metadata["method"] == "case_insensitive" + + +def test_case_insensitive_mixed(): + """Test: Case-insensitive Matching (Mixed Case) → normalized""" + signal = normalize_decision_signal( + question_type="prioritaet", + raw_value="Hoch", + answer_spectrum=["hoch", "mittel", "niedrig"] + ) + assert signal.status == SignalStatus.NORMALIZED + assert signal.normalized_value == "hoch" + + +def test_synonym_mapping_simple(): + """Test: Synonym-Mapping → normalized""" + rules = {"synonyms": {"ja": ["yes", "Yes", "YES"]}} + signal = normalize_decision_signal( + question_type="relevanz", + raw_value="yes", + answer_spectrum=["ja", "nein"], + normalization_rules=rules + ) + assert signal.status == SignalStatus.NORMALIZED + assert signal.normalized_value == "ja" + assert signal.metadata["method"] == "synonym" + + +def test_synonym_mapping_case_insensitive(): + """Test: Synonym-Mapping mit case-insensitive → normalized""" + rules = {"synonyms": {"ja": ["yes"]}} + signal = normalize_decision_signal( + question_type="relevanz", + raw_value="YES", + answer_spectrum=["ja", "nein"], + normalization_rules=rules + ) + assert signal.status == SignalStatus.NORMALIZED + assert signal.normalized_value == "ja" + + +def test_invalid_value(): + """Test: Wert außerhalb des Spektrums → invalid""" + signal = normalize_decision_signal( + question_type="relevanz", + raw_value="vielleicht", + answer_spectrum=["ja", "nein", "unklar"] + ) + assert signal.status == SignalStatus.INVALID + assert signal.normalized_value is None + + +def test_whitespace_handling(): + """Test: Whitespace wird getrimmt → normalized""" + signal = normalize_decision_signal( + question_type="relevanz", + raw_value=" ja ", + answer_spectrum=["ja", "nein"] + ) + assert signal.status == SignalStatus.NORMALIZED # Wegen strip() + assert signal.normalized_value == "ja" + + +def test_synonym_no_match(): + """Test: Synonym-Rules vorhanden, aber kein Match → invalid""" + rules = {"synonyms": {"ja": ["yes"], 
"nein": ["no"]}} + signal = normalize_decision_signal( + question_type="relevanz", + raw_value="maybe", + answer_spectrum=["ja", "nein"], + normalization_rules=rules + ) + assert signal.status == SignalStatus.INVALID + + +# ── apply_synonym_mapping Tests ──────────────────────────────────────────────── + +def test_apply_synonym_exact(): + """Test: Exakte Synonym-Übereinstimmung""" + synonyms = {"ja": ["yes", "Yes"], "nein": ["no", "No"]} + result = apply_synonym_mapping("yes", synonyms) + assert result == "ja" + + +def test_apply_synonym_case_insensitive(): + """Test: Case-insensitive Synonym-Matching""" + synonyms = {"ja": ["yes"], "nein": ["no"]} + result = apply_synonym_mapping("YES", synonyms) + assert result == "ja" + + +def test_apply_synonym_no_match(): + """Test: Kein Synonym-Match → None""" + synonyms = {"ja": ["yes"], "nein": ["no"]} + result = apply_synonym_mapping("vielleicht", synonyms) + assert result is None + + +def test_apply_synonym_whitespace(): + """Test: Synonym mit Whitespace""" + synonyms = {"ja": ["yes"]} + result = apply_synonym_mapping(" yes ", synonyms) + assert result == "ja" + + +# ── normalize_all_signals Tests ──────────────────────────────────────────────── + +def test_normalize_all_signals_basic(): + """Test: Mehrere Signale normalisieren""" + signals = { + "relevanz": "ja", + "prioritaet": "HOCH" + } + catalog = { + "relevanz": {"answer_spectrum": ["ja", "nein"], "normalization_rules": None}, + "prioritaet": {"answer_spectrum": ["hoch", "mittel", "niedrig"], "normalization_rules": None} + } + + normalized = normalize_all_signals(signals, catalog) + + assert len(normalized) == 2 + assert normalized[0].question_type == "relevanz" + assert normalized[0].status == SignalStatus.VALID + assert normalized[1].question_type == "prioritaet" + assert normalized[1].status == SignalStatus.NORMALIZED + + +def test_normalize_all_signals_with_synonyms(): + """Test: Normalisierung mit Synonymen""" + signals = { + "relevanz": "yes", + "prioritaet": 
"high" + } + catalog = { + "relevanz": { + "answer_spectrum": ["ja", "nein"], + "normalization_rules": {"synonyms": {"ja": ["yes"], "nein": ["no"]}} + }, + "prioritaet": { + "answer_spectrum": ["hoch", "mittel", "niedrig"], + "normalization_rules": {"synonyms": {"hoch": ["high"], "niedrig": ["low"]}} + } + } + + normalized = normalize_all_signals(signals, catalog) + + assert len(normalized) == 2 + assert normalized[0].normalized_value == "ja" + assert normalized[1].normalized_value == "hoch" + + +def test_normalize_all_signals_not_in_catalog(): + """Test: Question type nicht im Katalog → not_decidable""" + signals = {"unknown_type": "value"} + catalog = {"relevanz": {"answer_spectrum": ["ja", "nein"], "normalization_rules": None}} + + normalized = normalize_all_signals(signals, catalog) + + assert len(normalized) == 1 + assert normalized[0].status == SignalStatus.NOT_DECIDABLE + assert normalized[0].metadata["error"] == "not_in_catalog" + + +def test_normalize_all_signals_mixed_validity(): + """Test: Gemischte Gültigkeit (valid, normalized, invalid)""" + signals = { + "relevanz": "ja", # valid + "prioritaet": "HOCH", # normalized (case) + "selektion": "vielleicht" # invalid + } + catalog = { + "relevanz": {"answer_spectrum": ["ja", "nein"], "normalization_rules": None}, + "prioritaet": {"answer_spectrum": ["hoch", "mittel", "niedrig"], "normalization_rules": None}, + "selektion": {"answer_spectrum": ["ja", "nein"], "normalization_rules": None} + } + + normalized = normalize_all_signals(signals, catalog) + + assert len(normalized) == 3 + assert normalized[0].status == SignalStatus.VALID + assert normalized[1].status == SignalStatus.NORMALIZED + assert normalized[2].status == SignalStatus.INVALID + + +def test_normalize_all_signals_empty(): + """Test: Leere Signal-Liste""" + normalized = normalize_all_signals({}, {}) + assert len(normalized) == 0 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git 
a/tests/backend/test_phase2_workflow_executor.py b/tests/backend/test_phase2_workflow_executor.py
new file mode 100644
index 0000000..42aacd3
--- /dev/null
+++ b/tests/backend/test_phase2_workflow_executor.py
@@ -0,0 +1,323 @@
+"""
+Unit tests for workflow_executor.py (Phase 2)
+
+Run with: PYTHONPATH=./backend pytest tests/backend/test_phase2_workflow_executor.py -v
+"""
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+from workflow_executor import aggregate_results
+from workflow_models import NodeExecutionState, NodeStatus, NormalizedSignal, SignalStatus
+
+
+# ── aggregate_results Tests ────────────────────────────────────────────────────
+
+def test_aggregate_results_basic():
+    """Test: aggregation over three executed nodes (start, body, end)"""
+    states = [
+        NodeExecutionState(
+            node_id="start",
+            status=NodeStatus.EXECUTED,
+            started_at="2026-04-03T12:00:00",
+            completed_at="2026-04-03T12:00:01"
+        ),
+        NodeExecutionState(
+            node_id="body",
+            status=NodeStatus.EXECUTED,
+            analysis_core="Gewichtsentwicklung positiv",
+            normalized_signals=[
+                NormalizedSignal(
+                    question_type="relevanz",
+                    raw_value="ja",
+                    normalized_value="ja",
+                    status=SignalStatus.VALID
+                )
+            ],
+            started_at="2026-04-03T12:00:01",
+            completed_at="2026-04-03T12:00:05"
+        ),
+        NodeExecutionState(
+            node_id="end",
+            status=NodeStatus.EXECUTED,
+            started_at="2026-04-03T12:00:05",
+            completed_at="2026-04-03T12:00:06"
+        )
+    ]
+
+    result = aggregate_results(states)
+
+    assert "## body" in result["combined_analysis"]
+    assert "Gewichtsentwicklung" in result["combined_analysis"]
+    assert result["total_nodes"] == 3
+    assert result["executed_nodes"] == 3
+    assert result["failed_nodes"] == 0
+    assert len(result["all_signals"]) == 1
+    assert result["all_signals"][0]["question_type"] == "relevanz"
+
+
+def test_aggregate_results_with_failed_node():
+    """Test: aggregation with one failed node"""
+    states = [
+        NodeExecutionState(
+            node_id="node1",
+            status=NodeStatus.EXECUTED,
+            analysis_core="Success",
+            started_at="2026-04-03T12:00:00",
+            completed_at="2026-04-03T12:00:01"
+        ),
+        NodeExecutionState(
+            node_id="node2",
+            status=NodeStatus.FAILED,
+            error="LLM timeout",
+            started_at="2026-04-03T12:00:01",
+            completed_at="2026-04-03T12:00:02"
+        )
+    ]
+
+    result = aggregate_results(states)
+
+    assert result["total_nodes"] == 2
+    assert result["executed_nodes"] == 1
+    assert result["failed_nodes"] == 1
+    assert "## node1" in result["combined_analysis"]
+    assert "## node2" not in result["combined_analysis"]
+
+
+def test_aggregate_results_multiple_signals():
+    """Test: aggregation with several normalized signals"""
+    states = [
+        NodeExecutionState(
+            node_id="node1",
+            status=NodeStatus.EXECUTED,
+            analysis_core="Analysis 1",
+            normalized_signals=[
+                NormalizedSignal(
+                    question_type="relevanz",
+                    raw_value="ja",
+                    normalized_value="ja",
+                    status=SignalStatus.VALID
+                ),
+                NormalizedSignal(
+                    question_type="prioritaet",
+                    raw_value="hoch",
+                    normalized_value="hoch",
+                    status=SignalStatus.VALID
+                )
+            ],
+            started_at="2026-04-03T12:00:00",
+            completed_at="2026-04-03T12:00:01"
+        ),
+        NodeExecutionState(
+            node_id="node2",
+            status=NodeStatus.EXECUTED,
+            analysis_core="Analysis 2",
+            normalized_signals=[
+                NormalizedSignal(
+                    question_type="selektion",
+                    raw_value="nein",
+                    normalized_value="nein",
+                    status=SignalStatus.VALID
+                )
+            ],
+            started_at="2026-04-03T12:00:01",
+            completed_at="2026-04-03T12:00:02"
+        )
+    ]
+
+    result = aggregate_results(states)
+
+    assert len(result["all_signals"]) == 3
+    assert result["all_signals"][0]["question_type"] == "relevanz"
+    assert result["all_signals"][1]["question_type"] == "prioritaet"
+    assert result["all_signals"][2]["question_type"] == "selektion"
+
+
+def test_aggregate_results_empty():
+    """Test: aggregation with an empty node_states list"""
+    result = aggregate_results([])
+
+    assert result["combined_analysis"] == ""
+    assert result["all_signals"] == []
+    assert result["total_nodes"] == 0
+    assert result["executed_nodes"] == 0
+    assert result["failed_nodes"] == 0
+
+
+def test_aggregate_results_no_analysis_core():
+    """Test: aggregation with nodes that have no analysis_core"""
+    states = [
+        NodeExecutionState(
+            node_id="start",
+            status=NodeStatus.EXECUTED,
+            started_at="2026-04-03T12:00:00",
+            completed_at="2026-04-03T12:00:01"
+        )
+    ]
+
+    result = aggregate_results(states)
+
+    assert result["combined_analysis"] == ""
+    assert result["executed_nodes"] == 1
+
+
+def test_aggregate_results_formatting():
+    """Test: formatting of combined_analysis"""
+    states = [
+        NodeExecutionState(
+            node_id="node1",
+            status=NodeStatus.EXECUTED,
+            analysis_core="First analysis",
+            started_at="2026-04-03T12:00:00",
+            completed_at="2026-04-03T12:00:01"
+        ),
+        NodeExecutionState(
+            node_id="node2",
+            status=NodeStatus.EXECUTED,
+            analysis_core="Second analysis",
+            started_at="2026-04-03T12:00:01",
+            completed_at="2026-04-03T12:00:02"
+        )
+    ]
+
+    result = aggregate_results(states)
+
+    # Check format: ## node_id\nanalysis_core\n\n## node_id\nanalysis_core
+    assert result["combined_analysis"].startswith("## node1\nFirst analysis")
+    assert "## node2\nSecond analysis" in result["combined_analysis"]
+    assert "\n\n" in result["combined_analysis"]  # Separator between nodes
+
+
+# ── Integration-style tests (no real DB/LLM) ───────────────────────────────────
+
+@pytest.mark.asyncio
+async def test_execute_node_start_end():
+    """Test: start/end nodes are no-ops (EXECUTED, no analysis_core)"""
+    from workflow_executor import execute_node
+    from workflow_models import WorkflowNode
+
+    start_node = WorkflowNode(id="start", type="start")
+    end_node = WorkflowNode(id="end", type="end")
+
+    context = {"variables": {}, "profile_id": "test"}
+    catalog = {}
+
+    async def mock_llm(prompt, model):
+        return "should not be called"
+
+    # Test start
+    result = await execute_node(start_node, context, catalog, mock_llm)
+    assert result.status == NodeStatus.EXECUTED
+    assert result.analysis_core is None
+
+    # Test end
+    result = await execute_node(end_node, context, catalog, mock_llm)
+    assert result.status == NodeStatus.EXECUTED
+    assert result.analysis_core is None
+
+
+@pytest.mark.asyncio
+async def test_execute_node_unknown_type():
+    """Test: unsupported node type yields a FAILED state with an error message"""
+    from workflow_executor import execute_node
+    from workflow_models import WorkflowNode
+
+    # Phase 2 only supports start, end, analysis
+    logic_node = WorkflowNode(id="logic1", type="logic")
+
+    context = {"variables": {}, "profile_id": "test"}
+    catalog = {}
+
+    async def mock_llm(prompt, model):
+        return ""
+
+    result = await execute_node(logic_node, context, catalog, mock_llm)
+
+    # Should be FAILED with an error message
+    assert result.status == NodeStatus.FAILED
+    assert "not implemented in Phase 2" in result.error
+
+
+@pytest.mark.asyncio
+async def test_execute_node_analysis_simple():
+    """Test: analysis node without question augmentation"""
+    from workflow_executor import execute_node
+    from workflow_models import WorkflowNode
+
+    node = WorkflowNode(
+        id="test_node",
+        type="analysis",
+        prompt_slug="test_prompt",
+        question_augmentations=None
+    )
+
+    context = {"variables": {"name": "Test"}, "profile_id": "test"}
+    catalog = {}
+
+    # Mock LLM
+    async def mock_llm(prompt, model):
+        return "## Analyse\nTest analysis content"
+
+    # Mock load_prompt_template
+    with patch('workflow_executor.load_prompt_template') as mock_load:
+        mock_load.return_value = "Test prompt for {{name}}"
+
+        result = await execute_node(node, context, catalog, mock_llm)
+
+    assert result.status == NodeStatus.EXECUTED
+    assert result.analysis_core == "Test analysis content"
+    assert len(result.normalized_signals) == 0  # No questions
+
+
+@pytest.mark.asyncio
+async def test_execute_node_analysis_with_questions():
+    """Test: analysis node with question augmentation and normalization"""
+    from workflow_executor import execute_node
+    from workflow_models import WorkflowNode, QuestionAugmentation
+
+    node = WorkflowNode(
+        id="test_node",
+        type="analysis",
+        prompt_slug="test_prompt",
+        question_augmentations=[
+            QuestionAugmentation(
+                id="q1",
+                type="relevanz",
+                question="Ist relevant?",
+                answer_spectrum=["ja", "nein", "unklar"]
+            )
+        ]
+    )
+
+    context = {"variables": {}, "profile_id": "test"}
+    catalog = {
+        "relevanz": {
+            "answer_spectrum": ["ja", "nein", "unklar"],
+            "normalization_rules": None
+        }
+    }
+
+    # Mock LLM
+    async def mock_llm(prompt, model):
+        # LLM reply that includes the question-augmentation section
+        return """## Analyse
+Test analysis
+
+## Entscheidungsfragen
+- Relevanz: ja
+"""
+
+    # Mock load_prompt_template
+    with patch('workflow_executor.load_prompt_template') as mock_load:
+        mock_load.return_value = "Base prompt"
+
+        result = await execute_node(node, context, catalog, mock_llm)
+
+    assert result.status == NodeStatus.EXECUTED
+    assert result.analysis_core == "Test analysis"
+    assert len(result.normalized_signals) == 1
+    assert result.normalized_signals[0].question_type == "relevanz"
+    assert result.normalized_signals[0].normalized_value == "ja"
+    assert result.normalized_signals[0].status == SignalStatus.VALID
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v"])