mindnet/app/core/retrieval/decision_engine.py

217 lines
8.9 KiB
Python

"""
FILE: app/core/retrieval/decision_engine.py
DESCRIPTION: Der Agentic Orchestrator für WP-25.
Realisiert Multi-Stream Retrieval, Intent-basiertes Routing
und parallele Wissens-Synthese.
VERSION: 1.0.2
STATUS: Active
FIX:
- WP-25 ROBUSTNESS: Pre-Initialization aller Stream-Variablen zur Vermeidung von KeyErrors.
- Behebung von Template-Mismatches bei unvollständigen Strategie-Definitionen.
- Erhalt der Sicherheitskaskade für die Strategiewahl.
"""
import asyncio
import logging
import yaml
import os
from typing import List, Dict, Any, Optional
# Core & Service Imports
from app.models.dto import QueryRequest, QueryResponse
from app.core.retrieval.retriever import Retriever
from app.services.llm_service import LLMService
from app.config import get_settings
logger = logging.getLogger(__name__)
class DecisionEngine:
def __init__(self):
"""Initialisiert die Engine und lädt die modularen Konfigurationen."""
self.settings = get_settings()
self.retriever = Retriever()
self.llm_service = LLMService()
self.config = self._load_engine_config()
def _load_engine_config(self) -> Dict[str, Any]:
"""Lädt die Multi-Stream Konfiguration (WP-25)."""
path = os.getenv("MINDNET_DECISION_CONFIG", "config/decision_engine.yaml")
if not os.path.exists(path):
logger.error(f"❌ Decision Engine Config not found at {path}")
return {"strategies": {}}
try:
with open(path, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
except Exception as e:
logger.error(f"❌ Failed to load decision_engine.yaml: {e}")
return {"strategies": {}}
async def ask(self, query: str) -> str:
"""
Hauptmethode des MindNet Chats.
Orchestriert den gesamten Prozess: Routing -> Retrieval -> Synthese.
"""
# 1. Intent Recognition (Welches Werkzeug brauchen wir?)
strategy_key = await self._determine_strategy(query)
# Sicherheits-Kaskade für die Strategiewahl
strategies = self.config.get("strategies", {})
strategy = strategies.get(strategy_key)
if not strategy:
logger.warning(f"⚠️ Unknown strategy '{strategy_key}'. Attempting fallback to FACT_WHAT.")
strategy_key = "FACT_WHAT"
strategy = strategies.get("FACT_WHAT")
# WP-25 FIX: Wenn FACT_WHAT ebenfalls fehlt, wähle die erste verfügbare Strategie
if not strategy and strategies:
strategy_key = next(iter(strategies))
strategy = strategies[strategy_key]
logger.warning(f"⚠️ 'FACT_WHAT' missing in config. Using first available: {strategy_key}")
# Letzte Rettung: Falls gar keine Strategien definiert sind
if not strategy:
logger.error("❌ CRITICAL: No strategies defined in decision_engine.yaml!")
return "Entschuldigung, meine Wissensbasis ist aktuell nicht konfiguriert."
# 2. Multi-Stream Retrieval (Wissen parallel sammeln)
stream_results = await self._execute_parallel_streams(strategy, query)
# 3. Synthese (Ergebnisse zu einer Antwort verweben)
return await self._generate_final_answer(strategy_key, strategy, query, stream_results)
async def _determine_strategy(self, query: str) -> str:
"""Nutzt den LLM-Router zur dynamischen Wahl der Such-Strategie."""
prompt_key = self.config.get("settings", {}).get("router_prompt_key", "intent_router_v1")
router_prompt_template = self.llm_service.get_prompt(prompt_key)
if not router_prompt_template:
return "FACT_WHAT"
full_prompt = router_prompt_template.format(query=query)
try:
response = await self.llm_service.generate_raw_response(
full_prompt,
max_retries=1,
priority="realtime"
)
return str(response).strip().upper()
except Exception as e:
logger.error(f"Strategy Routing failed: {e}")
return "FACT_WHAT"
async def _execute_parallel_streams(self, strategy: Dict, query: str) -> Dict[str, str]:
"""Führt alle in der Strategie definierten Such-Streams gleichzeitig aus."""
stream_keys = strategy.get("use_streams", [])
library = self.config.get("streams_library", {})
tasks = []
active_streams = []
for key in stream_keys:
stream_cfg = library.get(key)
if stream_cfg:
active_streams.append(key)
tasks.append(self._run_single_stream(key, stream_cfg, query))
results = await asyncio.gather(*tasks, return_exceptions=True)
mapped_results = {}
for name, res in zip(active_streams, results):
if isinstance(res, Exception):
logger.error(f"Stream '{name}' failed: {res}")
mapped_results[name] = "[Fehler beim Abruf dieses Wissens-Streams]"
else:
mapped_results[name] = self._format_stream_context(res)
return mapped_results
async def _run_single_stream(self, name: str, cfg: Dict, query: str) -> QueryResponse:
"""Bereitet eine spezialisierte Suche für einen Stream vor und führt sie aus."""
transformed_query = cfg.get("query_template", "{query}").format(query=query)
request = QueryRequest(
query=transformed_query,
top_k=cfg.get("top_k", 5),
filters={"type": cfg.get("filter_types", [])},
expand={"depth": 1},
boost_edges=cfg.get("edge_boosts", {}),
explain=True
)
return await self.retriever.search(request)
def _format_stream_context(self, response: QueryResponse) -> str:
"""Wandelt QueryHits in einen kompakten String für das LLM um."""
if not response.results:
return "Keine spezifischen Informationen in diesem Stream gefunden."
lines = []
for i, hit in enumerate(response.results, 1):
source = hit.source.get("path", "Unbekannt")
content = hit.source.get("text", "").strip()
lines.append(f"[{i}] QUELLE: {source}\nINHALT: {content}")
return "\n\n".join(lines)
async def _generate_final_answer(
self,
strategy_key: str,
strategy: Dict,
query: str,
stream_results: Dict[str, str]
) -> str:
"""Führt die Multi-Stream Synthese durch."""
provider = strategy.get("preferred_provider") or self.settings.MINDNET_LLM_PROVIDER
template_key = strategy.get("prompt_template", "rag_template")
template = self.llm_service.get_prompt(template_key, provider=provider)
system_prompt = self.llm_service.get_prompt("system_prompt", provider=provider)
# WP-25 ROBUSTNESS FIX:
# Wir stellen sicher, dass alle Variablen, die im Template vorkommen könnten,
# zumindest mit einem leeren String initialisiert sind.
all_possible_streams = ["values_stream", "facts_stream", "biography_stream", "risk_stream", "tech_stream"]
template_vars = {s: "" for s in all_possible_streams}
# Überschreiben mit tatsächlichen Ergebnissen
template_vars.update(stream_results)
template_vars["query"] = query
prepend = strategy.get("prepend_instruction", "")
try:
# Sicherheitscheck: Sind alle benötigten Platzhalter im Template vorhanden?
final_prompt = template.format(**template_vars)
if prepend:
final_prompt = f"{prepend}\n\n{final_prompt}"
response = await self.llm_service.generate_raw_response(
final_prompt,
system=system_prompt,
provider=provider,
priority="realtime"
)
if not response or len(response.strip()) < 5:
return await self.llm_service.generate_raw_response(
final_prompt,
system=system_prompt,
provider="ollama",
priority="realtime"
)
return response
except KeyError as e:
logger.error(f"Template Variable mismatch in '{template_key}': Missing {e}")
# Fallback: Einfaches Aneinanderreihen der gefundenen Stream-Inhalte
fallback_context = "\n\n".join([v for v in stream_results.values() if v])
return await self.llm_service.generate_raw_response(
f"Beantworte: {query}\n\nKontext:\n{fallback_context}",
system=system_prompt,
priority="realtime"
)
except Exception as e:
logger.error(f"Final Synthesis failed: {e}")
return "Ich konnte keine Antwort generieren."