Universal CSV Importer #70

Merged
Lars merged 54 commits from develop into main 2026-04-11 07:06:47 +02:00
2 changed files with 60 additions and 17 deletions
Showing only changes of commit 76b4b36617 - Show all commits

View File

@ -114,7 +114,8 @@ async def execute_workflow(
"variables": variables,
"profile_id": profile_id,
"node_results": {}, # Phase 3: Store full NodeExecutionState
"active_edges": {} # Phase 3: Track edge activation
"active_edges": {}, # Phase 3: Track edge activation
"graph": graph # Phase 4: Needed for End Node to access questions
}
# Phase 3: BFS traversal mit edge activation
@ -168,7 +169,7 @@ async def execute_workflow(
queue.append(edge.to_node)
# 6. Aggregiere Ergebnisse
aggregated = aggregate_results(node_states)
aggregated = aggregate_results(node_states, graph)
# 7. Speichere Execution State
completed_at = datetime.utcnow().isoformat()
@ -590,18 +591,37 @@ def execute_end_node(
if not node.template:
raise ValueError(f"End node {node.id} has output_mode=TEMPLATE but no template defined")
# Build template context: {{node_id}} → {analysis_core, decision_signals, status}
# Build template context: {{node_id}} → {analysis_core, signal_X, question_X, status}
template_context = {}
graph = context.get("graph") # WorkflowGraph object
for node_id, node_state in context.get("node_results", {}).items():
template_context[node_id] = {
node_context = {
"analysis_core": node_state.analysis_core or "",
"decision_signals": node_state.decision_signals or {},
"reasoning_anchors": node_state.reasoning_anchors or "",
"status": node_state.status.value if node_state.status else "unknown",
# Add individual signal access: {{node_id.signal_name}}
**node_state.decision_signals # Flatten signals into node context
}
# Add normalized signals as {{node_id.signal_X}}
if node_state.normalized_signals:
for signal in node_state.normalized_signals:
# Convert NormalizedSignal object to dict if needed
signal_dict = signal.model_dump() if hasattr(signal, 'model_dump') else signal
signal_key = f"signal_{signal_dict['question_type']}"
node_context[signal_key] = signal_dict['normalized_value'] or signal_dict['raw_value']
# Add question texts as {{node_id.question_X}}
if graph:
workflow_node = next((n for n in graph.nodes if n.id == node_id), None)
if workflow_node and workflow_node.question_augmentations:
for q in workflow_node.question_augmentations:
q_dict = q.model_dump() if hasattr(q, 'model_dump') else q
question_key = f"question_{q_dict['type']}"
node_context[question_key] = q_dict.get('question', '')
template_context[node_id] = node_context
logger.debug(f"End node {node.id}: Built template context for {len(template_context)} nodes")
# Render template
@ -818,16 +838,20 @@ async def load_prompt_template(prompt_slug: str, context: Dict[str, Any]) -> str
def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]:
def aggregate_results(node_states: List[NodeExecutionState], graph) -> Dict[str, Any]:
"""
Aggregiert Ergebnisse aller Knoten.
Phase 4: Wenn End Node existiert, verwende NUR dessen Output.
Sonst (backward compatible): Kombiniere alle analysis_core Werte.
Args:
node_states: Liste aller NodeExecutionState
graph: WorkflowGraph object (to identify End Nodes)
Returns:
{
"combined_analysis": "## node_1\n...\n\n## node_2\n...",
"analysis_core": "Final output (from End Node or combined)",
"all_signals": [{question_type, normalized_value, status}, ...],
"total_nodes": 3,
"executed_nodes": 3,
@ -837,26 +861,45 @@ def aggregate_results(node_states: List[NodeExecutionState]) -> Dict[str, Any]:
Beispiel:
>>> states = [
... NodeExecutionState(node_id="n1", status=NodeStatus.EXECUTED, analysis_core="Test 1"),
... NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2")
... NodeExecutionState(node_id="n2", status=NodeStatus.EXECUTED, analysis_core="Test 2"),
... NodeExecutionState(node_id="end", status=NodeStatus.EXECUTED, analysis_core="Final")
... ]
>>> result = aggregate_results(states)
>>> "## n1" in result["combined_analysis"]
True
>>> result["executed_nodes"]
2
>>> result = aggregate_results(states, graph)
>>> result["analysis_core"]
"Final"
"""
# Find End Node in graph
end_node_ids = [n.id for n in graph.nodes if n.type == "end"]
combined_analysis = []
all_signals = []
final_output = None
for state in node_states:
# Check if this is an End Node
is_end_node = state.node_id in end_node_ids
if state.status == NodeStatus.EXECUTED and state.analysis_core:
combined_analysis.append(f"## {state.node_id}\n{state.analysis_core}")
if is_end_node:
# End Node output is the final output (don't add to combined_analysis)
final_output = state.analysis_core
else:
# Regular node - add to combined analysis
combined_analysis.append(f"## {state.node_id}\n{state.analysis_core}")
if state.normalized_signals:
all_signals.extend([s.model_dump() for s in state.normalized_signals])
# Phase 4: If End Node exists, use its output ONLY
# Backward compatible: If no End Node, use combined_analysis
if final_output:
primary_output = final_output
else:
primary_output = "\n\n".join(combined_analysis)
return {
"combined_analysis": "\n\n".join(combined_analysis),
"analysis_core": primary_output,
"combined_analysis": "\n\n".join(combined_analysis), # Keep for debugging (excludes End Node)
"all_signals": all_signals,
"total_nodes": len(node_states),
"executed_nodes": sum(1 for s in node_states if s.status == NodeStatus.EXECUTED),

View File

@ -149,7 +149,7 @@ export function WorkflowResultViewer({ result, onClose }) {
overflowY: 'auto'
}}
>
{aggregated.analysis_core || aggregated.combined_analysis || '(Kein Output)'}
{aggregated.analysis_core || '(Kein Output)'}
</div>
</div>