# mindnet/app/core/retrieval/decision_engine.py
"""
FILE: app/core/retrieval/decision_engine.py
DESCRIPTION: Der Agentic Orchestrator für MindNet (WP-25b Edition).
Realisiert Multi-Stream Retrieval, Intent-basiertes Routing
und die neue Lazy-Prompt Orchestrierung (Module A & B).
VERSION: 1.3.2 (WP-25b: Full Robustness Recovery & Regex Parsing)
STATUS: Active
FIX:
- WP-25b: ULTRA-Robustes Intent-Parsing via Regex (Fix: 'CODING[/S]' -> 'CODING').
- WP-25b: Wiederherstellung der prepend_instruction Logik via variables.
- WP-25a: Voller Erhalt der Profil-Kaskade via LLMService v3.5.5.
- WP-25: Beibehaltung von Stream-Tracing, Edge-Boosts und Pre-Initialization.
- RECOVERY: Wiederherstellung der lokalen Sicherheits-Gates aus v1.2.1.
"""
import asyncio
import logging
import yaml
import os
import re # Neu für robustes Intent-Parsing
from typing import List, Dict, Any, Optional
# Core & Service Imports
from app.models.dto import QueryRequest, QueryResponse
from app.core.retrieval.retriever import Retriever
from app.services.llm_service import LLMService
from app.config import get_settings
logger = logging.getLogger(__name__)
class DecisionEngine:
    def __init__(self):
        """Initialize the engine and load the modular configurations."""
        self.settings = get_settings()             # global application settings
        self.retriever = Retriever()               # graph search backend
        self.llm_service = LLMService()            # prompt loading + LLM calls
        self.config = self._load_engine_config()   # strategies / streams_library
def _load_engine_config(self) -> Dict[str, Any]:
"""Lädt die Multi-Stream Konfiguration (WP-25/25a)."""
path = os.getenv("MINDNET_DECISION_CONFIG", "config/decision_engine.yaml")
if not os.path.exists(path):
logger.error(f"❌ Decision Engine Config not found at {path}")
return {"strategies": {}, "streams_library": {}}
try:
with open(path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f) or {}
# WP-25b FIX: Schema-Validierung
required_keys = ["strategies", "streams_library"]
missing = [k for k in required_keys if k not in config]
if missing:
logger.error(f"❌ Missing required keys in decision_engine.yaml: {missing}")
return {"strategies": {}, "streams_library": {}}
# Warnung bei unbekannten Top-Level-Keys
known_keys = {"version", "settings", "strategies", "streams_library"}
unknown = set(config.keys()) - known_keys
if unknown:
logger.warning(f"⚠️ Unknown keys in decision_engine.yaml: {unknown}")
logger.info(f"⚙️ Decision Engine Config loaded (v{config.get('version', 'unknown')})")
return config
except yaml.YAMLError as e:
logger.error(f"❌ YAML syntax error in decision_engine.yaml: {e}")
return {"strategies": {}, "streams_library": {}}
except Exception as e:
logger.error(f"❌ Failed to load decision_engine.yaml: {e}")
return {"strategies": {}, "streams_library": {}}
async def ask(self, query: str) -> str:
"""
Hauptmethode des MindNet Chats.
Orchestriert den agentischen Prozess: Routing -> Retrieval -> Kompression -> Synthese.
"""
# 1. Intent Recognition (Strategy Routing)
strategy_key = await self._determine_strategy(query)
strategies = self.config.get("strategies", {})
strategy = strategies.get(strategy_key)
if not strategy:
logger.warning(f"⚠️ Unknown strategy '{strategy_key}'. Fallback to FACT_WHAT.")
strategy_key = "FACT_WHAT"
strategy = strategies.get("FACT_WHAT")
if not strategy and strategies:
strategy_key = next(iter(strategies))
strategy = strategies[strategy_key]
if not strategy:
return "Entschuldigung, meine Wissensbasis ist aktuell nicht konfiguriert."
# 2. Multi-Stream Retrieval & Pre-Synthesis (Parallel Tasks inkl. Kompression)
stream_results = await self._execute_parallel_streams(strategy, query)
# 3. Finale Synthese
return await self._generate_final_answer(strategy_key, strategy, query, stream_results)
async def _determine_strategy(self, query: str) -> str:
"""WP-25b: Nutzt den LLM-Router via Lazy-Loading und bereinigt Modell-Artefakte via Regex."""
settings_cfg = self.config.get("settings", {})
prompt_key = settings_cfg.get("router_prompt_key", "intent_router_v1")
router_profile = settings_cfg.get("router_profile")
try:
# Delegation an LLMService ohne manuelle Vor-Formatierung
response = await self.llm_service.generate_raw_response(
prompt_key=prompt_key,
variables={"query": query},
max_retries=1,
priority="realtime",
profile_name=router_profile
)
# --- ULTRA-ROBUST PARSING (Fix für 'CODING[/S]') ---
# 1. Alles in Großbuchstaben umwandeln
raw_text = str(response).upper()
# 2. Regex: Suche das erste Wort, das nur aus A-Z und Unterstrichen besteht
# Dies ignoriert [/S], </s>, Newlines oder Plaudereien des Modells
match = re.search(r'\b(FACT_WHEN|FACT_WHAT|DECISION|EMPATHY|CODING|INTERVIEW)\b', raw_text)
if match:
intent = match.group(1)
logger.info(f"🎯 [ROUTING] Parsed Intent: '{intent}' from raw response: '{response.strip()}'")
return intent
# Fallback, falls Regex nicht greift
logger.warning(f"⚠️ Unmapped intent '{response.strip()}' from router. Falling back to FACT_WHAT.")
return "FACT_WHAT"
except Exception as e:
logger.error(f"Strategy Routing failed: {e}")
return "FACT_WHAT"
async def _execute_parallel_streams(self, strategy: Dict, query: str) -> Dict[str, str]:
"""Führt Such-Streams aus und komprimiert überlange Ergebnisse (Pre-Synthesis)."""
stream_keys = strategy.get("use_streams", [])
library = self.config.get("streams_library", {})
# Phase 1: Retrieval Tasks starten
retrieval_tasks = []
active_streams = []
for key in stream_keys:
stream_cfg = library.get(key)
if stream_cfg:
active_streams.append(key)
retrieval_tasks.append(self._run_single_stream(key, stream_cfg, query))
# Ergebnisse sammeln
retrieval_results = await asyncio.gather(*retrieval_tasks, return_exceptions=True)
# Phase 2: Formatierung und optionale Kompression
final_stream_tasks = []
for name, res in zip(active_streams, retrieval_results):
if isinstance(res, Exception):
logger.error(f"Stream '{name}' failed during retrieval: {res}")
async def _err(): return f"[Fehler im Wissens-Stream {name}]"
final_stream_tasks.append(_err())
continue
formatted_context = self._format_stream_context(res)
# WP-25a: Kompressions-Check (Inhaltsverdichtung)
stream_cfg = library.get(name, {})
threshold = stream_cfg.get("compression_threshold", 4000)
if len(formatted_context) > threshold:
logger.info(f"⚙️ [WP-25b] Triggering Lazy-Compression for stream '{name}'...")
comp_profile = stream_cfg.get("compression_profile")
final_stream_tasks.append(
self._compress_stream_content(name, formatted_context, query, comp_profile)
)
else:
async def _direct(c=formatted_context): return c
final_stream_tasks.append(_direct())
# Finale Inhalte parallel fertigstellen
final_contents = await asyncio.gather(*final_stream_tasks)
return dict(zip(active_streams, final_contents))
async def _compress_stream_content(self, stream_name: str, content: str, query: str, profile: Optional[str]) -> str:
"""WP-25b: Inhaltsverdichtung via Lazy-Loading 'compression_template'."""
try:
summary = await self.llm_service.generate_raw_response(
prompt_key="compression_template",
variables={
"stream_name": stream_name,
"content": content,
"query": query
},
profile_name=profile,
priority="background",
max_retries=1
)
return summary.strip() if (summary and len(summary.strip()) > 10) else content
except Exception as e:
logger.error(f"❌ Compression of {stream_name} failed: {e}")
return content
async def _run_single_stream(self, name: str, cfg: Dict, query: str) -> QueryResponse:
"""Spezialisierte Graph-Suche mit Stream-Tracing und Edge-Boosts."""
transformed_query = cfg.get("query_template", "{query}").format(query=query)
request = QueryRequest(
query=transformed_query,
top_k=cfg.get("top_k", 5),
filters={"type": cfg.get("filter_types", [])},
expand={"depth": 1},
boost_edges=cfg.get("edge_boosts", {}), # Erhalt der Gewichtung
explain=True
)
response = await self.retriever.search(request)
for hit in response.results:
hit.stream_origin = name
return response
def _format_stream_context(self, response: QueryResponse) -> str:
"""Wandelt QueryHits in einen formatierten Kontext-String um."""
if not response.results:
return "Keine spezifischen Informationen gefunden."
lines = []
for i, hit in enumerate(response.results, 1):
source = hit.source.get("path", "Unbekannt")
content = hit.source.get("text", "").strip()
lines.append(f"[{i}] QUELLE: {source}\nINHALT: {content}")
return "\n\n".join(lines)
    async def _generate_final_answer(
        self,
        strategy_key: str,
        strategy: Dict,
        query: str,
        stream_results: Dict[str, str]
    ) -> str:
        """WP-25b: Final synthesis via lazy prompt, with the v1.2.1 robustness gates.

        Pre-initializes all stream template variables (missing streams render as
        empty strings), delegates synthesis to the LLMService, and cascades
        through two fallbacks when the primary call fails.
        """
        # NOTE(review): strategy_key is currently unused in this body — kept for
        # interface stability; confirm whether tracing should log it.
        profile = strategy.get("llm_profile")
        template_key = strategy.get("prompt_template", "fact_synthesis_v1")
        system_prompt = self.llm_service.get_prompt("system_prompt")
        # WP-25 ROBUSTNESS: pre-initialize the variables so a template that
        # references an unused stream never fails on a missing key.
        all_possible_streams = ["values_stream", "facts_stream", "biography_stream", "risk_stream", "tech_stream"]
        template_vars = {s: "" for s in all_possible_streams}
        template_vars.update(stream_results)
        template_vars["query"] = query
        # WP-25a: keep prepend instructions coming from the strategy config.
        prepend = strategy.get("prepend_instruction", "")
        template_vars["prepend_instruction"] = prepend
        try:
            # WP-25b: delegate the synthesis to the LLMService.
            response = await self.llm_service.generate_raw_response(
                prompt_key=template_key,
                variables=template_vars,
                system=system_prompt,
                profile_name=profile,
                priority="realtime"
            )
            # WP-25a RECOVERY: if the prepend_instruction was not rendered by
            # the template in prompts.yaml (WP-25b lazy loading), prepend it
            # here manually to preserve the v1.2.1 logic. The slice allows up
            # to 50 leading characters before the instruction.
            if prepend and prepend not in response[:len(prepend)+50]:
                logger.info(" Adding prepend_instruction manually (not found in response).")
                response = f"{prepend}\n\n{response}"
            return response
        except Exception as e:
            logger.error(f"Final Synthesis failed: {e}")
            # ROBUST FALLBACK (v1.2.1 gate): try to generate a minimal answer.
            # WP-25b FIX: consistently use a prompt_key instead of a hardcoded prompt.
            # NOTE(review): streams shorter than 20 chars are dropped as noise — confirm threshold.
            fallback_context = "\n\n".join([v for v in stream_results.values() if len(v) > 20])
            try:
                return await self.llm_service.generate_raw_response(
                    prompt_key="fallback_synthesis",
                    variables={"query": query, "context": fallback_context},
                    system=system_prompt, priority="realtime", profile_name=profile
                )
            except (ValueError, KeyError):
                # Last resort: direct prompt when the fallback template is missing.
                logger.warning("⚠️ Fallback template 'fallback_synthesis' not found. Using direct prompt.")
                return await self.llm_service.generate_raw_response(
                    prompt=f"Beantworte: {query}\n\nKontext:\n{fallback_context}",
                    system=system_prompt, priority="realtime", profile_name=profile
                )