"""
FILE: app/core/retrieval/decision_engine.py
DESCRIPTION: Der Agentic Orchestrator für MindNet (WP-25b Edition).
Realisiert Multi-Stream Retrieval, Intent-basiertes Routing
und die neue Lazy-Prompt Orchestrierung (Module A & B).
VERSION: 1.3.1 (WP-25b: Robust Intent Cleaning & Lazy Loading)
STATUS: Active
FIX:
- WP-25b: Robuste Bereinigung von Intent-Strings (Fix: CODING[/S] -> CODING).
- WP-25b: Umstellung auf Lazy-Loading (Übergabe von prompt_key + variables).
- WP-25a: Voller Erhalt der Profil-Kaskade via LLMService v3.5.5.
- WP-25: Beibehaltung von Stream-Tracing, Edge-Boosts und Pre-Initialization.
"""
import asyncio
import logging
import os
from typing import Any, Dict, Optional

import yaml

# Core & service imports
from app.models.dto import QueryRequest, QueryResponse
from app.core.retrieval.retriever import Retriever
from app.services.llm_service import LLMService
from app.config import get_settings

logger = logging.getLogger(__name__)


class DecisionEngine:
    def __init__(self):
        """Initializes the engine and loads the modular configurations."""
        self.settings = get_settings()
        self.retriever = Retriever()
        self.llm_service = LLMService()
        self.config = self._load_engine_config()

    def _load_engine_config(self) -> Dict[str, Any]:
        """Loads the multi-stream configuration (WP-25/25a)."""
        path = os.getenv("MINDNET_DECISION_CONFIG", "config/decision_engine.yaml")
        if not os.path.exists(path):
            logger.error(f"❌ Decision Engine Config not found at {path}")
            return {"strategies": {}}
        try:
            with open(path, "r", encoding="utf-8") as f:
                return yaml.safe_load(f) or {}
        except Exception as e:
            logger.error(f"❌ Failed to load decision_engine.yaml: {e}")
            return {"strategies": {}}

    async def ask(self, query: str) -> str:
        """
        Main entry point of the MindNet chat.
        Orchestrates the agentic process: routing -> retrieval -> compression -> synthesis.
        """
        # 1. Intent recognition (strategy routing)
        strategy_key = await self._determine_strategy(query)
        strategies = self.config.get("strategies", {})
        strategy = strategies.get(strategy_key)

        if not strategy:
            logger.warning(f"⚠️ Unknown strategy '{strategy_key}'. Fallback to FACT_WHAT.")
            strategy_key = "FACT_WHAT"
            strategy = strategies.get("FACT_WHAT")
            if not strategy and strategies:
                # Last resort: fall back to the first configured strategy.
                strategy_key = next(iter(strategies))
                strategy = strategies[strategy_key]
        if not strategy:
            return "Entschuldigung, meine Wissensbasis ist aktuell nicht konfiguriert."

        # 2. Multi-stream retrieval & pre-synthesis (parallel tasks incl. compression)
        stream_results = await self._execute_parallel_streams(strategy, query)

        # 3. Final synthesis
        return await self._generate_final_answer(strategy_key, strategy, query, stream_results)

    async def _determine_strategy(self, query: str) -> str:
        """WP-25b: Uses the LLM router via lazy loading and strips model artifacts."""
        settings_cfg = self.config.get("settings", {})
        prompt_key = settings_cfg.get("router_prompt_key", "intent_router_v1")
        router_profile = settings_cfg.get("router_profile")
        try:
            # WP-25b: Delegate to the LLMService without manual pre-formatting.
            response = await self.llm_service.generate_raw_response(
                prompt_key=prompt_key,
                variables={"query": query},
                max_retries=1,
                priority="realtime",
                profile_name=router_profile,
            )
            # WP-25b FIX: Strip stop markers such as [/S] or </s>.
            raw_intent = str(response).replace("[/S]", "").replace("</s>", "").strip().upper()
            # Robustness: take only the first word in case the model rambles.
            intent = raw_intent.split()[0] if raw_intent else "FACT_WHAT"

            # Validate against the known strategies from decision_engine.yaml.
            known_strategies = self.config.get("strategies", {}).keys()
            if intent not in known_strategies:
                logger.warning(f"⚠️ Unmapped intent '{intent}' from router. Falling back.")
                return "FACT_WHAT"
            return intent
        except Exception as e:
            logger.error(f"Strategy Routing failed: {e}")
            return "FACT_WHAT"

    async def _execute_parallel_streams(self, strategy: Dict, query: str) -> Dict[str, str]:
        """
        Runs the search streams and compresses oversized results (pre-synthesis).
        WP-25b: Supports lazy compression via expert profiles.
        """
        stream_keys = strategy.get("use_streams", [])
        library = self.config.get("streams_library", {})

        # Phase 1: launch the retrieval tasks.
        retrieval_tasks = []
        active_streams = []
        for key in stream_keys:
            stream_cfg = library.get(key)
            if stream_cfg:
                active_streams.append(key)
                retrieval_tasks.append(self._run_single_stream(key, stream_cfg, query))

        # Collect the results.
        retrieval_results = await asyncio.gather(*retrieval_tasks, return_exceptions=True)

        # Phase 2: formatting and optional compression.
        final_stream_tasks = []
        for name, res in zip(active_streams, retrieval_results):
            if isinstance(res, Exception):
                logger.error(f"Stream '{name}' failed during retrieval: {res}")

                async def _err():
                    return "[Fehler beim Abruf dieses Wissens-Streams]"

                final_stream_tasks.append(_err())
                continue

            # Format the hits into text.
            formatted_context = self._format_stream_context(res)

            # WP-25a: compression check (content condensation).
            stream_cfg = library.get(name, {})
            threshold = stream_cfg.get("compression_threshold", 4000)
            if len(formatted_context) > threshold:
                logger.info(f"⚙️ [WP-25b] Triggering Lazy-Compression for stream '{name}'...")
                comp_profile = stream_cfg.get("compression_profile")
                final_stream_tasks.append(
                    self._compress_stream_content(name, formatted_context, query, comp_profile)
                )
            else:
                # Bind the current value via a default argument; a plain closure
                # would late-bind and every task would see the last stream's text.
                async def _direct(c=formatted_context):
                    return c

                final_stream_tasks.append(_direct())

        # Finish the final contents in parallel.
        final_contents = await asyncio.gather(*final_stream_tasks)
        return dict(zip(active_streams, final_contents))

    async def _compress_stream_content(self, stream_name: str, content: str, query: str, profile: Optional[str]) -> str:
        """WP-25b module A: content condensation via the lazily loaded 'compression_template'."""
        try:
            # WP-25b: Delegate the condensation to the LLMService.
            summary = await self.llm_service.generate_raw_response(
                prompt_key="compression_template",
                variables={
                    "stream_name": stream_name,
                    "content": content,
                    "query": query,
                },
                profile_name=profile,
                priority="background",
                max_retries=1,
            )
            # Guard against empty or degenerate summaries: keep the original text.
            return summary.strip() if (summary and len(summary.strip()) > 10) else content
        except Exception as e:
            logger.error(f"❌ Compression of {stream_name} failed: {e}")
            return content

    async def _run_single_stream(self, name: str, cfg: Dict, query: str) -> QueryResponse:
        """Specialized graph search with stream tracing and edge boosts (WP-25)."""
        transformed_query = cfg.get("query_template", "{query}").format(query=query)
        request = QueryRequest(
            query=transformed_query,
            top_k=cfg.get("top_k", 5),
            filters={"type": cfg.get("filter_types", [])},
            expand={"depth": 1},
            boost_edges=cfg.get("edge_boosts", {}),  # WP-25a preservation
            explain=True,
        )
        response = await self.retriever.search(request)
        # Stream tracing: tag every hit with the stream it came from.
        for hit in response.results:
            hit.stream_origin = name
        return response

    def _format_stream_context(self, response: QueryResponse) -> str:
        """Converts QueryHits into a formatted context string."""
        if not response.results:
            return "Keine spezifischen Informationen in diesem Stream gefunden."
        lines = []
        for i, hit in enumerate(response.results, 1):
            source = hit.source.get("path", "Unbekannt")
            content = hit.source.get("text", "").strip()
            lines.append(f"[{i}] QUELLE: {source}\nINHALT: {content}")
        return "\n\n".join(lines)

    async def _generate_final_answer(
        self,
        strategy_key: str,
        strategy: Dict,
        query: str,
        stream_results: Dict[str, str],
    ) -> str:
        """WP-25b: Final synthesis via lazy-prompt orchestration."""
        profile = strategy.get("llm_profile")
        # Use the key from the YAML, or 'fact_synthesis_v1' as a safe default.
        template_key = strategy.get("prompt_template", "fact_synthesis_v1")
        system_prompt = self.llm_service.get_prompt("system_prompt")

        # WP-25 ROBUSTNESS: pre-initialize all template variables so that
        # templates referencing unused streams still render with empty strings.
        all_possible_streams = ["values_stream", "facts_stream", "biography_stream", "risk_stream", "tech_stream"]
        template_vars = {s: "" for s in all_possible_streams}
        template_vars.update(stream_results)
        template_vars["query"] = query
        # WP-25a preservation: optional prepend instruction.
        template_vars["prepend_instruction"] = strategy.get("prepend_instruction", "")

        # WP-25b: Delegate the synthesis to the LLMService.
        # Formatting happens only after profile resolution (Gemini vs. Llama vs. Qwen).
        try:
            return await self.llm_service.generate_raw_response(
                prompt_key=template_key,
                variables=template_vars,
                system=system_prompt,
                profile_name=profile,
                priority="realtime",
            )
        except Exception as e:
            logger.error(f"Final Synthesis failed: {e}")
            return "Ich konnte keine Antwort generieren."