Universal CSV Importer #70
|
|
@ -281,13 +281,21 @@ async def execute_node(
|
||||||
prompt_template = await load_prompt_template(node.prompt_slug, context)
|
prompt_template = await load_prompt_template(node.prompt_slug, context)
|
||||||
logger.debug(f"Node {node.id}: Loaded prompt '{node.prompt_slug}'")
|
logger.debug(f"Node {node.id}: Loaded prompt '{node.prompt_slug}'")
|
||||||
|
|
||||||
# 2. Parse question_augmentations
|
# 2. Parse question_augmentations (Hybrid Model)
|
||||||
questions = []
|
questions = []
|
||||||
if node.question_augmentations:
|
if node.question_augmentations:
|
||||||
# Convert list of dicts to JSONB-like format for parser
|
# Node-specific questions (override base prompt questions)
|
||||||
questions_jsonb = [q.model_dump() if hasattr(q, 'model_dump') else q for q in node.question_augmentations]
|
questions_jsonb = [q.model_dump() if hasattr(q, 'model_dump') else q for q in node.question_augmentations]
|
||||||
questions = parse_question_augmentations_from_jsonb(questions_jsonb)
|
questions = parse_question_augmentations_from_jsonb(questions_jsonb)
|
||||||
logger.debug(f"Node {node.id}: {len(questions)} question augmentations")
|
logger.debug(f"Node {node.id}: {len(questions)} node-specific questions")
|
||||||
|
else:
|
||||||
|
# Fallback: Load questions from base prompt (Hybrid Model)
|
||||||
|
base_questions = await load_prompt_questions(node.prompt_slug)
|
||||||
|
if base_questions:
|
||||||
|
questions = parse_question_augmentations_from_jsonb(base_questions)
|
||||||
|
logger.debug(f"Node {node.id}: {len(questions)} questions from base prompt '{node.prompt_slug}'")
|
||||||
|
else:
|
||||||
|
logger.debug(f"Node {node.id}: No questions (neither node-specific nor base prompt)")
|
||||||
|
|
||||||
# 3. Augment Prompt
|
# 3. Augment Prompt
|
||||||
if questions:
|
if questions:
|
||||||
|
|
@ -295,8 +303,10 @@ async def execute_node(
|
||||||
base_prompt=prompt_template,
|
base_prompt=prompt_template,
|
||||||
questions=questions
|
questions=questions
|
||||||
)
|
)
|
||||||
|
logger.debug(f"Node {node.id}: Augmented prompt with {len(questions)} questions")
|
||||||
else:
|
else:
|
||||||
augmented_prompt = prompt_template
|
augmented_prompt = prompt_template
|
||||||
|
logger.debug(f"Node {node.id}: No augmentation (no questions)")
|
||||||
|
|
||||||
# 4. LLM Call
|
# 4. LLM Call
|
||||||
logger.debug(f"Node {node.id}: Calling LLM")
|
logger.debug(f"Node {node.id}: Calling LLM")
|
||||||
|
|
@ -312,16 +322,17 @@ async def execute_node(
|
||||||
# 6. Normalize Signals
|
# 6. Normalize Signals
|
||||||
normalized_signals = []
|
normalized_signals = []
|
||||||
if parsed["decision_signals"]:
|
if parsed["decision_signals"]:
|
||||||
# Hybrid Model: Node-spezifische Questions überschreiben Catalog
|
# Hybrid Model: Questions (node-specific or base prompt) override Catalog
|
||||||
node_catalog = catalog.copy()
|
node_catalog = catalog.copy()
|
||||||
if node.question_augmentations:
|
if questions:
|
||||||
for q in node.question_augmentations:
|
for q in questions:
|
||||||
q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
|
q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
|
||||||
node_catalog[q_dict['type']] = {
|
node_catalog[q_dict['type']] = {
|
||||||
"answer_spectrum": q_dict['answer_spectrum'],
|
"answer_spectrum": q_dict['answer_spectrum'],
|
||||||
"normalization_rules": None # Node-Questions haben keine Synonyme
|
"normalization_rules": None # Questions haben keine Synonyme
|
||||||
}
|
}
|
||||||
logger.debug(f"Node {node.id}: Override catalog for '{q_dict['type']}' with node-specific spectrum")
|
source = "node-specific" if node.question_augmentations else "base prompt"
|
||||||
|
logger.debug(f"Node {node.id}: Override catalog for '{q_dict['type']}' with {source} spectrum")
|
||||||
|
|
||||||
normalized_signals = normalize_all_signals(
|
normalized_signals = normalize_all_signals(
|
||||||
decision_signals=parsed["decision_signals"],
|
decision_signals=parsed["decision_signals"],
|
||||||
|
|
@ -816,6 +827,52 @@ async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str
|
||||||
return resolved
|
return resolved
|
||||||
|
|
||||||
|
|
||||||
|
async def load_prompt_questions(prompt_slug: str) -> List[Dict]:
|
||||||
|
"""
|
||||||
|
Lädt Fragen aus einem Basis-Prompt (Hybrid Model - Fallback).
|
||||||
|
|
||||||
|
Wenn ein Analysis Node KEINE node-spezifischen Fragen hat,
|
||||||
|
werden die Fragen aus dem referenzierten Basis-Prompt geladen.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
prompt_slug: Slug des Prompts (z.B. "pipeline_body")
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Liste von Question-Dicts im format:
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"id": "q1",
|
||||||
|
"type": "relevanz",
|
||||||
|
"question": "Ist eine vertiefte Analyse relevant?",
|
||||||
|
"answer_spectrum": ["ja", "nein", "unklar"]
|
||||||
|
},
|
||||||
|
...
|
||||||
|
]
|
||||||
|
|
||||||
|
Beispiel:
|
||||||
|
>>> questions = await load_prompt_questions("pipeline_body")
|
||||||
|
>>> len(questions) > 0
|
||||||
|
True
|
||||||
|
"""
|
||||||
|
with get_db() as conn:
|
||||||
|
cur = get_cursor(conn)
|
||||||
|
cur.execute(
|
||||||
|
"SELECT questions FROM ai_prompts WHERE slug = %s AND active = true",
|
||||||
|
(prompt_slug,)
|
||||||
|
)
|
||||||
|
row = cur.fetchone()
|
||||||
|
if not row or not row.get('questions'):
|
||||||
|
return []
|
||||||
|
|
||||||
|
questions = row['questions']
|
||||||
|
# PostgreSQL JSONB wird automatisch zu Python list/dict konvertiert
|
||||||
|
if isinstance(questions, list):
|
||||||
|
return questions
|
||||||
|
else:
|
||||||
|
logger.warning(f"Unexpected questions format for {prompt_slug}: {type(questions)}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]:
|
def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Aggregiert Ergebnisse aller Knoten.
|
Aggregiert Ergebnisse aller Knoten.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user