diff --git a/backend/workflow_executor.py b/backend/workflow_executor.py index 2239804..16a1482 100644 --- a/backend/workflow_executor.py +++ b/backend/workflow_executor.py @@ -281,13 +281,21 @@ async def execute_node( prompt_template = await load_prompt_template(node.prompt_slug, context) logger.debug(f"Node {node.id}: Loaded prompt '{node.prompt_slug}'") - # 2. Parse question_augmentations + # 2. Parse question_augmentations (Hybrid Model) questions = [] if node.question_augmentations: - # Convert list of dicts to JSONB-like format for parser + # Node-specific questions (override base prompt questions) questions_jsonb = [q.model_dump() if hasattr(q, 'model_dump') else q for q in node.question_augmentations] questions = parse_question_augmentations_from_jsonb(questions_jsonb) - logger.debug(f"Node {node.id}: {len(questions)} question augmentations") + logger.debug(f"Node {node.id}: {len(questions)} node-specific questions") + else: + # Fallback: Load questions from base prompt (Hybrid Model) + base_questions = await load_prompt_questions(node.prompt_slug) + if base_questions: + questions = parse_question_augmentations_from_jsonb(base_questions) + logger.debug(f"Node {node.id}: {len(questions)} questions from base prompt '{node.prompt_slug}'") + else: + logger.debug(f"Node {node.id}: No questions (neither node-specific nor base prompt)") # 3. Augment Prompt if questions: @@ -295,8 +303,10 @@ async def execute_node( base_prompt=prompt_template, questions=questions ) + logger.debug(f"Node {node.id}: Augmented prompt with {len(questions)} questions") else: augmented_prompt = prompt_template + logger.debug(f"Node {node.id}: No augmentation (no questions)") # 4. LLM Call logger.debug(f"Node {node.id}: Calling LLM") @@ -312,16 +322,17 @@ async def execute_node( # 6. Normalize Signals normalized_signals = [] if parsed["decision_signals"]: - # Hybrid Model: Node-spezifische Questions überschreiben Catalog + # Hybrid Model: Questions (node-specific or base prompt) override Catalog node_catalog = catalog.copy() - if node.question_augmentations: - for q in node.question_augmentations: + if questions: + for q in questions: q_dict = q.model_dump() if hasattr(q, 'model_dump') else q node_catalog[q_dict['type']] = { "answer_spectrum": q_dict['answer_spectrum'], - "normalization_rules": None # Node-Questions haben keine Synonyme + "normalization_rules": None # Questions haben keine Synonyme } - logger.debug(f"Node {node.id}: Override catalog for '{q_dict['type']}' with node-specific spectrum") + source = "node-specific" if node.question_augmentations else "base prompt" + logger.debug(f"Node {node.id}: Override catalog for '{q_dict['type']}' with {source} spectrum") normalized_signals = normalize_all_signals( decision_signals=parsed["decision_signals"], @@ -816,6 +827,52 @@ async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str return resolved +async def load_prompt_questions(prompt_slug: str) -> List[Dict]: + """ + Lädt Fragen aus einem Basis-Prompt (Hybrid Model - Fallback). + + Wenn ein Analysis Node KEINE node-spezifischen Fragen hat, + werden die Fragen aus dem referenzierten Basis-Prompt geladen. + + Args: + prompt_slug: Slug des Prompts (z.B. "pipeline_body") + + Returns: + Liste von Question-Dicts im format: + [ + { + "id": "q1", + "type": "relevanz", + "question": "Ist eine vertiefte Analyse relevant?", + "answer_spectrum": ["ja", "nein", "unklar"] + }, + ... + ] + + Beispiel: + >>> questions = await load_prompt_questions("pipeline_body") + >>> len(questions) > 0 + True + """ + with get_db() as conn: + cur = get_cursor(conn) + cur.execute( + "SELECT questions FROM ai_prompts WHERE slug = %s AND active = true", + (prompt_slug,) + ) + row = cur.fetchone() + if not row or not row.get('questions'): + return [] + + questions = row['questions'] + # PostgreSQL JSONB wird automatisch zu Python list/dict konvertiert + if isinstance(questions, list): + return questions + else: + logger.warning(f"Unexpected questions format for {prompt_slug}: {type(questions)}") + return [] + + def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]: """ Aggregiert Ergebnisse aller Knoten.