"""
FILE: app/core/retrieval/decision_engine.py
DESCRIPTION: Der Agentic Orchestrator für MindNet (WP-25b Edition).
             Realisiert Multi-Stream Retrieval, Intent-basiertes Routing
             und die neue Lazy-Prompt Orchestrierung (Module A & B).
VERSION: 1.3.2 (WP-25b: Full Robustness Recovery & Regex Parsing)
STATUS: Active
FIX:
- WP-25b: ULTRA-Robustes Intent-Parsing via Regex (Fix: 'CODING[/S]' -> 'CODING').
- WP-25b: Wiederherstellung der prepend_instruction Logik via variables.
- WP-25a: Voller Erhalt der Profil-Kaskade via LLMService v3.5.5.
- WP-25: Beibehaltung von Stream-Tracing, Edge-Boosts und Pre-Initialization.
- RECOVERY: Wiederherstellung der lokalen Sicherheits-Gates aus v1.2.1.
"""
# Standard library
import asyncio
import logging
import os
import re  # for robust intent parsing
from typing import List, Dict, Any, Optional

# Third-party
import yaml

# Core & service imports
from app.models.dto import QueryRequest, QueryResponse
from app.core.retrieval.retriever import Retriever
from app.services.llm_service import LLMService
from app.config import get_settings

logger = logging.getLogger(__name__)
class DecisionEngine:
    """Agentic orchestrator: routes intents, runs retrieval streams, and
    synthesizes the final answer (WP-25b edition)."""

    def __init__(self):
        """Initialize the engine and load the modular configurations."""
        self.settings = get_settings()
        self.retriever = Retriever()
        self.llm_service = LLMService()
        self.config = self._load_engine_config()
def _load_engine_config(self) -> Dict[str, Any]:
|
||
"""Lädt die Multi-Stream Konfiguration (WP-25/25a)."""
|
||
path = os.getenv("MINDNET_DECISION_CONFIG", "config/decision_engine.yaml")
|
||
if not os.path.exists(path):
|
||
logger.error(f"❌ Decision Engine Config not found at {path}")
|
||
return {"strategies": {}}
|
||
try:
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
config = yaml.safe_load(f) or {}
|
||
logger.info(f"⚙️ Decision Engine Config loaded (v{config.get('version', 'unknown')})")
|
||
return config
|
||
except Exception as e:
|
||
logger.error(f"❌ Failed to load decision_engine.yaml: {e}")
|
||
return {"strategies": {}}
|
||
|
||
async def ask(self, query: str) -> str:
|
||
"""
|
||
Hauptmethode des MindNet Chats.
|
||
Orchestriert den agentischen Prozess: Routing -> Retrieval -> Kompression -> Synthese.
|
||
"""
|
||
# 1. Intent Recognition (Strategy Routing)
|
||
strategy_key = await self._determine_strategy(query)
|
||
|
||
strategies = self.config.get("strategies", {})
|
||
strategy = strategies.get(strategy_key)
|
||
|
||
if not strategy:
|
||
logger.warning(f"⚠️ Unknown strategy '{strategy_key}'. Fallback to FACT_WHAT.")
|
||
strategy_key = "FACT_WHAT"
|
||
strategy = strategies.get("FACT_WHAT")
|
||
|
||
if not strategy and strategies:
|
||
strategy_key = next(iter(strategies))
|
||
strategy = strategies[strategy_key]
|
||
|
||
if not strategy:
|
||
return "Entschuldigung, meine Wissensbasis ist aktuell nicht konfiguriert."
|
||
|
||
# 2. Multi-Stream Retrieval & Pre-Synthesis (Parallel Tasks inkl. Kompression)
|
||
stream_results = await self._execute_parallel_streams(strategy, query)
|
||
|
||
# 3. Finale Synthese
|
||
return await self._generate_final_answer(strategy_key, strategy, query, stream_results)
|
||
|
||
async def _determine_strategy(self, query: str) -> str:
|
||
"""WP-25b: Nutzt den LLM-Router via Lazy-Loading und bereinigt Modell-Artefakte via Regex."""
|
||
settings_cfg = self.config.get("settings", {})
|
||
prompt_key = settings_cfg.get("router_prompt_key", "intent_router_v1")
|
||
router_profile = settings_cfg.get("router_profile")
|
||
|
||
try:
|
||
# Delegation an LLMService ohne manuelle Vor-Formatierung
|
||
response = await self.llm_service.generate_raw_response(
|
||
prompt_key=prompt_key,
|
||
variables={"query": query},
|
||
max_retries=1,
|
||
priority="realtime",
|
||
profile_name=router_profile
|
||
)
|
||
|
||
# --- ULTRA-ROBUST PARSING (Fix für 'CODING[/S]') ---
|
||
# 1. Alles in Großbuchstaben umwandeln
|
||
raw_text = str(response).upper()
|
||
|
||
# 2. Regex: Suche das erste Wort, das nur aus A-Z und Unterstrichen besteht
|
||
# Dies ignoriert [/S], </s>, Newlines oder Plaudereien des Modells
|
||
match = re.search(r'\b(FACT_WHEN|FACT_WHAT|DECISION|EMPATHY|CODING|INTERVIEW)\b', raw_text)
|
||
|
||
if match:
|
||
intent = match.group(1)
|
||
logger.info(f"🎯 [ROUTING] Parsed Intent: '{intent}' from raw response: '{response.strip()}'")
|
||
return intent
|
||
|
||
# Fallback, falls Regex nicht greift
|
||
logger.warning(f"⚠️ Unmapped intent '{response.strip()}' from router. Falling back to FACT_WHAT.")
|
||
return "FACT_WHAT"
|
||
|
||
except Exception as e:
|
||
logger.error(f"Strategy Routing failed: {e}")
|
||
return "FACT_WHAT"
|
||
|
||
async def _execute_parallel_streams(self, strategy: Dict, query: str) -> Dict[str, str]:
|
||
"""Führt Such-Streams aus und komprimiert überlange Ergebnisse (Pre-Synthesis)."""
|
||
stream_keys = strategy.get("use_streams", [])
|
||
library = self.config.get("streams_library", {})
|
||
|
||
# Phase 1: Retrieval Tasks starten
|
||
retrieval_tasks = []
|
||
active_streams = []
|
||
for key in stream_keys:
|
||
stream_cfg = library.get(key)
|
||
if stream_cfg:
|
||
active_streams.append(key)
|
||
retrieval_tasks.append(self._run_single_stream(key, stream_cfg, query))
|
||
|
||
# Ergebnisse sammeln
|
||
retrieval_results = await asyncio.gather(*retrieval_tasks, return_exceptions=True)
|
||
|
||
# Phase 2: Formatierung und optionale Kompression
|
||
final_stream_tasks = []
|
||
for name, res in zip(active_streams, retrieval_results):
|
||
if isinstance(res, Exception):
|
||
logger.error(f"Stream '{name}' failed during retrieval: {res}")
|
||
async def _err(): return f"[Fehler im Wissens-Stream {name}]"
|
||
final_stream_tasks.append(_err())
|
||
continue
|
||
|
||
formatted_context = self._format_stream_context(res)
|
||
|
||
# WP-25a: Kompressions-Check (Inhaltsverdichtung)
|
||
stream_cfg = library.get(name, {})
|
||
threshold = stream_cfg.get("compression_threshold", 4000)
|
||
|
||
if len(formatted_context) > threshold:
|
||
logger.info(f"⚙️ [WP-25b] Triggering Lazy-Compression for stream '{name}'...")
|
||
comp_profile = stream_cfg.get("compression_profile")
|
||
final_stream_tasks.append(
|
||
self._compress_stream_content(name, formatted_context, query, comp_profile)
|
||
)
|
||
else:
|
||
async def _direct(c=formatted_context): return c
|
||
final_stream_tasks.append(_direct())
|
||
|
||
# Finale Inhalte parallel fertigstellen
|
||
final_contents = await asyncio.gather(*final_stream_tasks)
|
||
return dict(zip(active_streams, final_contents))
|
||
|
||
async def _compress_stream_content(self, stream_name: str, content: str, query: str, profile: Optional[str]) -> str:
|
||
"""WP-25b: Inhaltsverdichtung via Lazy-Loading 'compression_template'."""
|
||
try:
|
||
summary = await self.llm_service.generate_raw_response(
|
||
prompt_key="compression_template",
|
||
variables={
|
||
"stream_name": stream_name,
|
||
"content": content,
|
||
"query": query
|
||
},
|
||
profile_name=profile,
|
||
priority="background",
|
||
max_retries=1
|
||
)
|
||
return summary.strip() if (summary and len(summary.strip()) > 10) else content
|
||
except Exception as e:
|
||
logger.error(f"❌ Compression of {stream_name} failed: {e}")
|
||
return content
|
||
|
||
async def _run_single_stream(self, name: str, cfg: Dict, query: str) -> QueryResponse:
|
||
"""Spezialisierte Graph-Suche mit Stream-Tracing und Edge-Boosts."""
|
||
transformed_query = cfg.get("query_template", "{query}").format(query=query)
|
||
|
||
request = QueryRequest(
|
||
query=transformed_query,
|
||
top_k=cfg.get("top_k", 5),
|
||
filters={"type": cfg.get("filter_types", [])},
|
||
expand={"depth": 1},
|
||
boost_edges=cfg.get("edge_boosts", {}), # Erhalt der Gewichtung
|
||
explain=True
|
||
)
|
||
|
||
response = await self.retriever.search(request)
|
||
for hit in response.results:
|
||
hit.stream_origin = name
|
||
return response
|
||
|
||
def _format_stream_context(self, response: QueryResponse) -> str:
|
||
"""Wandelt QueryHits in einen formatierten Kontext-String um."""
|
||
if not response.results:
|
||
return "Keine spezifischen Informationen gefunden."
|
||
lines = []
|
||
for i, hit in enumerate(response.results, 1):
|
||
source = hit.source.get("path", "Unbekannt")
|
||
content = hit.source.get("text", "").strip()
|
||
lines.append(f"[{i}] QUELLE: {source}\nINHALT: {content}")
|
||
return "\n\n".join(lines)
|
||
|
||
async def _generate_final_answer(
|
||
self,
|
||
strategy_key: str,
|
||
strategy: Dict,
|
||
query: str,
|
||
stream_results: Dict[str, str]
|
||
) -> str:
|
||
"""WP-25b: Finale Synthese via Lazy-Prompt mit Robustheit aus v1.2.1."""
|
||
profile = strategy.get("llm_profile")
|
||
template_key = strategy.get("prompt_template", "fact_synthesis_v1")
|
||
system_prompt = self.llm_service.get_prompt("system_prompt")
|
||
|
||
# WP-25 ROBUSTNESS: Pre-Initialization der Variablen
|
||
all_possible_streams = ["values_stream", "facts_stream", "biography_stream", "risk_stream", "tech_stream"]
|
||
template_vars = {s: "" for s in all_possible_streams}
|
||
template_vars.update(stream_results)
|
||
template_vars["query"] = query
|
||
|
||
# WP-25a Erhalt: Prepend Instructions aus der strategy_config
|
||
prepend = strategy.get("prepend_instruction", "")
|
||
template_vars["prepend_instruction"] = prepend
|
||
|
||
try:
|
||
# WP-25b: Delegation der Synthese an den LLMService
|
||
response = await self.llm_service.generate_raw_response(
|
||
prompt_key=template_key,
|
||
variables=template_vars,
|
||
system=system_prompt,
|
||
profile_name=profile,
|
||
priority="realtime"
|
||
)
|
||
|
||
# WP-25a RECOVERY: Falls dieprepend_instruction nicht im Template-Key
|
||
# der prompts.yaml enthalten ist (WP-25b Lazy Loading), fügen wir sie
|
||
# hier manuell an den Anfang, um die Logik aus v1.2.1 zu bewahren.
|
||
if prepend and prepend not in response[:len(prepend)+50]:
|
||
logger.info("ℹ️ Adding prepend_instruction manually (not found in response).")
|
||
response = f"{prepend}\n\n{response}"
|
||
|
||
return response
|
||
|
||
except Exception as e:
|
||
logger.error(f"Final Synthesis failed: {e}")
|
||
# ROBUST FALLBACK (v1.2.1 Gate): Versuche eine minimale Antwort zu generieren
|
||
fallback_context = "\n\n".join([v for v in stream_results.values() if len(v) > 20])
|
||
return await self.llm_service.generate_raw_response(
|
||
prompt=f"Beantworte: {query}\n\nKontext:\n{fallback_context}",
|
||
system=system_prompt, priority="realtime", profile_name=profile
|
||
) |