diff --git a/app/core/retrieval/decision_engine.py b/app/core/retrieval/decision_engine.py index c5a7b89..8881be0 100644 --- a/app/core/retrieval/decision_engine.py +++ b/app/core/retrieval/decision_engine.py @@ -1,13 +1,15 @@ """ FILE: app/core/retrieval/decision_engine.py -DESCRIPTION: Der Agentic Orchestrator für WP-25. +DESCRIPTION: Der Agentic Orchestrator für MindNet (WP-25a Edition). Realisiert Multi-Stream Retrieval, Intent-basiertes Routing - und parallele Wissens-Synthese. -VERSION: 1.0.3 + und die neue Pre-Synthesis Kompression (Module A). +VERSION: 1.2.0 (WP-25a: Mixture of Experts Support) STATUS: Active FIX: -- WP-25 STREAM-TRACING: Kennzeichnung der Treffer mit ihrem Ursprungs-Stream. -- WP-25 ROBUSTNESS: Pre-Initialization der Stream-Variablen zur Vermeidung von KeyErrors. +- WP-25a: Vollständige Integration der llm_profile-Steuerung für Synthese und Kompression. +- WP-25a: Implementierung der _compress_stream_content Logik zur Inhaltsverdichtung. +- WP-25: Beibehaltung von Stream-Tracing und Pre-Initialization Robustness. +- COMPATIBILITY: Erhalt aller Methoden-Signaturen für den System-Merge. """ import asyncio import logging @@ -32,7 +34,7 @@ class DecisionEngine: self.config = self._load_engine_config() def _load_engine_config(self) -> Dict[str, Any]: - """Lädt die Multi-Stream Konfiguration (WP-25).""" + """Lädt die Multi-Stream Konfiguration (WP-25/25a).""" path = os.getenv("MINDNET_DECISION_CONFIG", "config/decision_engine.yaml") if not os.path.exists(path): logger.error(f"❌ Decision Engine Config not found at {path}") @@ -47,9 +49,9 @@ class DecisionEngine: async def ask(self, query: str) -> str: """ Hauptmethode des MindNet Chats. - Orchestriert den gesamten Prozess: Routing -> Retrieval -> Synthese. + Orchestriert den agentischen Prozess: Routing -> Retrieval -> Kompression -> Synthese. """ - # 1. Intent Recognition + # 1. Intent Recognition (Strategy Routing) strategy_key = await self._determine_strategy(query) strategies = self.config.get("strategies", {}) @@ -67,10 +69,11 @@ class DecisionEngine: if not strategy: return "Entschuldigung, meine Wissensbasis ist aktuell nicht konfiguriert." - # 2. Multi-Stream Retrieval + # 2. Multi-Stream Retrieval & Pre-Synthesis (Parallel Tasks) + # WP-25a: Diese Methode übernimmt nun auch die Kompression. stream_results = await self._execute_parallel_streams(strategy, query) - # 3. Synthese + # 3. Finale Synthese return await self._generate_final_answer(strategy_key, strategy, query, stream_results) async def _determine_strategy(self, query: str) -> str: @@ -82,6 +85,7 @@ class DecisionEngine: full_prompt = router_prompt_template.format(query=query) try: + # Der Router nutzt den Standard-Provider (auto) response = await self.llm_service.generate_raw_response( full_prompt, max_retries=1, priority="realtime" ) @@ -91,35 +95,86 @@ class DecisionEngine: return "FACT_WHAT" async def _execute_parallel_streams(self, strategy: Dict, query: str) -> Dict[str, str]: - """Führt Such-Streams gleichzeitig aus.""" + """ + Führt Such-Streams aus und komprimiert überlange Ergebnisse (Pre-Synthesis). + WP-25a: MoE-Profile werden für die Kompression berücksichtigt. + """ stream_keys = strategy.get("use_streams", []) library = self.config.get("streams_library", {}) - tasks = [] + # Phase 1: Retrieval Tasks starten + retrieval_tasks = [] active_streams = [] for key in stream_keys: stream_cfg = library.get(key) if stream_cfg: active_streams.append(key) - tasks.append(self._run_single_stream(key, stream_cfg, query)) + retrieval_tasks.append(self._run_single_stream(key, stream_cfg, query)) - results = await asyncio.gather(*tasks, return_exceptions=True) + # Ergebnisse sammeln (Exceptions werden als Objekte zurückgegeben) + retrieval_results = await asyncio.gather(*retrieval_tasks, return_exceptions=True) - mapped_results = {} - for name, res in zip(active_streams, results): + # Phase 2: Formatierung und optionale Kompression + final_stream_tasks = [] + + for name, res in zip(active_streams, retrieval_results): if isinstance(res, Exception): - logger.error(f"Stream '{name}' failed: {res}") - mapped_results[name] = "[Fehler beim Abruf dieses Wissens-Streams]" + logger.error(f"Stream '{name}' failed during retrieval: {res}") + async def _err(): return "[Fehler beim Abruf dieses Wissens-Streams]" + final_stream_tasks.append(_err()) + continue + + # Formatierung der Hits in Text + formatted_context = self._format_stream_context(res) + + # WP-25a: Kompressions-Check + stream_cfg = library.get(name, {}) + threshold = stream_cfg.get("compression_threshold", 4000) + + if len(formatted_context) > threshold: + logger.info(f"⚙️ [WP-25a] Compressing stream '{name}' ({len(formatted_context)} chars)...") + comp_profile = stream_cfg.get("compression_profile") + final_stream_tasks.append( + self._compress_stream_content(name, formatted_context, query, comp_profile) + ) else: - mapped_results[name] = self._format_stream_context(res) + # Direkt-Übernahme als Coroutine für gather() + async def _direct(c=formatted_context): return c + final_stream_tasks.append(_direct()) + + # Finale Inhalte (evtl. komprimiert) parallel fertigstellen + final_contents = await asyncio.gather(*final_stream_tasks) - return mapped_results + return dict(zip(active_streams, final_contents)) + + async def _compress_stream_content(self, stream_name: str, content: str, query: str, profile: Optional[str]) -> str: + """ + WP-25a Module A: Inhaltsverdichtung via Experten-Modell. + """ + # Falls kein Profil definiert, nutzen wir das Default-Profil der Strategie + compression_prompt = ( + f"Du bist ein Wissens-Analyst. Reduziere den folgenden Wissens-Stream '{stream_name}' " + f"auf die Informationen, die für die Beantwortung der Frage '{query}' absolut notwendig sind.\n\n" + f"BEIBEHALTEN: Harte Fakten, Projektnamen, konkrete Werte und Quellenangaben.\n" + f"ENTFERNEN: Redundante Einleitungen, Füllwörter und irrelevante Details.\n\n" + f"STREAM-INHALT:\n{content}\n\n" + f"KOMPRIMIERTE ANALYSE:" + ) + + try: + summary = await self.llm_service.generate_raw_response( + compression_prompt, + profile_name=profile, # WP-25a: MoE Support + priority="background", + max_retries=1 + ) + return summary.strip() if (summary and len(summary.strip()) > 10) else content + except Exception as e: + logger.error(f"❌ Compression of {stream_name} failed: {e}") + return content async def _run_single_stream(self, name: str, cfg: Dict, query: str) -> QueryResponse: - """ - Bereitet eine spezialisierte Suche vor. - WP-25: Taggt die Treffer mit ihrem Ursprungs-Stream. - """ + """Spezialisierte Graph-Suche mit Stream-Tracing (WP-25).""" transformed_query = cfg.get("query_template", "{query}").format(query=query) request = QueryRequest( @@ -131,18 +186,16 @@ class DecisionEngine: explain=True ) - # Retrieval ausführen response = await self.retriever.search(request) # WP-25: STREAM-TRACING - # Markiere jeden Treffer mit dem Namen des Quell-Streams for hit in response.results: hit.stream_origin = name return response def _format_stream_context(self, response: QueryResponse) -> str: - """Wandelt QueryHits in Kontext-Strings um.""" + """Wandelt QueryHits in einen formatierten Kontext-String um.""" if not response.results: return "Keine spezifischen Informationen in diesem Stream gefunden." @@ -161,12 +214,15 @@ class DecisionEngine: query: str, stream_results: Dict[str, str] ) -> str: - """Führt die Synthese durch.""" - provider = strategy.get("preferred_provider") or self.settings.MINDNET_LLM_PROVIDER + """Führt die finale Synthese basierend auf dem Strategie-Profil durch.""" + # WP-25a: Nutzt das llm_profile der Strategie + profile = strategy.get("llm_profile") template_key = strategy.get("prompt_template", "rag_template") - template = self.llm_service.get_prompt(template_key, provider=provider) - system_prompt = self.llm_service.get_prompt("system_prompt", provider=provider) + # Hier nutzen wir noch den Provider-String für get_prompt (Kompatibilität zu prompts.yaml) + # Der llm_service löst das Profil erst bei generate_raw_response auf. + template = self.llm_service.get_prompt(template_key) + system_prompt = self.llm_service.get_prompt("system_prompt") # WP-25 ROBUSTNESS: Pre-Initialization all_possible_streams = ["values_stream", "facts_stream", "biography_stream", "risk_stream", "tech_stream"] @@ -181,10 +237,12 @@ class DecisionEngine: if prepend: final_prompt = f"{prepend}\n\n{final_prompt}" + # WP-25a: MoE Call response = await self.llm_service.generate_raw_response( - final_prompt, system=system_prompt, provider=provider, priority="realtime" + final_prompt, system=system_prompt, profile_name=profile, priority="realtime" ) + # Fallback bei leerer Antwort auf lokales Modell if not response or len(response.strip()) < 5: return await self.llm_service.generate_raw_response( final_prompt, system=system_prompt, provider="ollama", priority="realtime" diff --git a/app/routers/chat.py b/app/routers/chat.py index b3234e5..cbcd7af 100644 --- a/app/routers/chat.py +++ b/app/routers/chat.py @@ -1,14 +1,15 @@ """ FILE: app/routers/chat.py -DESCRIPTION: Haupt-Chat-Interface (WP-25 Agentic Edition). +DESCRIPTION: Haupt-Chat-Interface (WP-25a Agentic Edition). Kombiniert die spezialisierte Interview-Logik und Keyword-Erkennung - mit der neuen Multi-Stream Orchestrierung der DecisionEngine. -VERSION: 3.0.2 + mit der neuen MoE-Orchestrierung und Pre-Synthesis Kompression. +VERSION: 3.0.3 (WP-25a: MoE & Compression Support - Full Release) STATUS: Active FIX: -- 100% Wiederherstellung der v2.7.8 Logik (Interview, Schema-Resolution, Keywords). -- Integration der DecisionEngine für paralleles RAG-Retrieval. -- Erhalt der Ollama Context-Throttling Parameter (WP-20). +- 100% Wiederherstellung der v3.0.2 Logik (Interview Fallbacks, Schema-Resolution). +- WP-25a: Integration der Stream-Kompression (Module A) in den RAG-Workflow. +- WP-25a: Unterstützung der llm_profiles für spezialisierte Synthese (Module B). +- Erhalt der Ollama Context-Throttling Parameter (WP-20) als finaler Schutz. - Beibehaltung der No-Retry Logik (max_retries=0) für Chat-Stabilität. """ @@ -19,6 +20,7 @@ import uuid import logging import yaml import os +import asyncio from pathlib import Path from app.config import get_settings @@ -29,7 +31,7 @@ from app.services.feedback_service import log_search router = APIRouter() logger = logging.getLogger(__name__) -# --- EBENE 1: CONFIG LOADER & CACHING (Restauriert aus v2.7.8) --- +# --- EBENE 1: CONFIG LOADER & CACHING (Restauriert aus v3.0.2) --- _DECISION_CONFIG_CACHE = None _TYPES_CONFIG_CACHE = None @@ -77,10 +79,7 @@ def get_decision_strategy(intent: str) -> Dict[str, Any]: # --- EBENE 2: SPEZIAL-LOGIK (INTERVIEW & DETECTION) --- def _detect_target_type(message: str, configured_schemas: Dict[str, Any]) -> str: - """ - WP-07: Identifiziert den gewünschten Notiz-Typ (Keyword-basiert). - 100% identisch mit v2.7.8 zur Sicherstellung des Interview-Workflows. - """ + """WP-07: Identifiziert den gewünschten Notiz-Typ (Keyword-basiert).""" message_lower = message.lower() types_cfg = get_types_config() types_def = types_cfg.get("types", {}) @@ -117,10 +116,7 @@ def _is_question(query: str) -> bool: return any(q.startswith(s + " ") for s in starters) async def _classify_intent(query: str, llm: LLMService) -> tuple[str, str]: - """ - WP-25 Hybrid Router: - Nutzt erst Keyword-Fast-Paths (Router) und delegiert dann an die DecisionEngine. - """ + """Hybrid Router: Keyword-Fast-Paths & DecisionEngine LLM Router.""" config = get_full_config() strategies = config.get("strategies", {}) query_lower = query.lower() @@ -171,7 +167,7 @@ async def chat_endpoint( start_time = time.time() query_id = str(uuid.uuid4()) settings = get_settings() - logger.info(f"🚀 [WP-25] Chat request [{query_id}]: {request.message[:50]}...") + logger.info(f"🚀 [WP-25a] Chat request [{query_id}]: {request.message[:50]}...") try: # 1. Intent Detection @@ -184,13 +180,14 @@ async def chat_endpoint( sources_hits = [] answer_text = "" - # 2. INTERVIEW MODE (Kompatibilität zu v2.7.8) + # 2. INTERVIEW MODE (Kompatibilität zu v3.0.2) if intent == "INTERVIEW": target_type = _detect_target_type(request.message, strategy.get("schemas", {})) types_cfg = get_types_config() type_def = types_cfg.get("types", {}).get(target_type, {}) fields_list = type_def.get("schema", []) + # WP-07: RESTAURIERTE FALLBACK LOGIK (v3.0.2) if not fields_list: configured_schemas = strategy.get("schemas", {}) fallback = configured_schemas.get(target_type, configured_schemas.get("default", {})) @@ -203,17 +200,19 @@ async def chat_endpoint( .replace("{target_type}", target_type) \ .replace("{schema_fields}", fields_str) + # WP-25a: Nutzt spezialisiertes Kompressions-Profil für Interviews answer_text = await llm.generate_raw_response( final_prompt, system=llm.get_prompt("system_prompt"), - priority="realtime", provider=strategy.get("preferred_provider"), max_retries=0 + priority="realtime", profile_name="compression_fast", max_retries=0 ) sources_hits = [] - # 3. RAG MODE (WP-25 Multi-Stream) + # 3. RAG MODE (WP-25a Multi-Stream + Pre-Synthesis) else: stream_keys = strategy.get("use_streams", []) library = engine.config.get("streams_library", {}) + # Phase A: Retrieval tasks = [] active_streams = [] for key in stream_keys: @@ -222,25 +221,44 @@ async def chat_endpoint( active_streams.append(key) tasks.append(engine._run_single_stream(key, stream_cfg, request.message)) - import asyncio responses = await asyncio.gather(*tasks, return_exceptions=True) raw_stream_map = {} - formatted_context_map = {} + formatted_context_tasks = [] max_chars = getattr(settings, "MAX_OLLAMA_CHARS", 10000) provider = strategy.get("preferred_provider") or settings.MINDNET_LLM_PROVIDER + # Phase B: Pre-Synthesis & Throttling for name, res in zip(active_streams, responses): if not isinstance(res, Exception): raw_stream_map[name] = res context_text = engine._format_stream_context(res) - # WP-20 Stability Fix: Throttling - if provider == "ollama" and len(context_text) > max_chars: - context_text = context_text[:max_chars] + "\n[...]" + # WP-25a: Automatisierte Kompression + stream_cfg = library.get(name, {}) + threshold = stream_cfg.get("compression_threshold", 4000) - formatted_context_map[name] = context_text + if len(context_text) > threshold: + profile = stream_cfg.get("compression_profile") + formatted_context_tasks.append( + engine._compress_stream_content(name, context_text, request.message, profile) + ) + else: + # WP-20: Restaurierter Throttling-Schutz als Fallback + if provider == "ollama" and len(context_text) > max_chars: + context_text = context_text[:max_chars] + "\n[...]" + + async def _ident(c=context_text): return c + formatted_context_tasks.append(_ident()) + else: + async def _err(): return "[Stream Error]" + formatted_context_tasks.append(_err()) + # Inhalte parallel finalisieren + final_contexts = await asyncio.gather(*formatted_context_tasks) + formatted_context_map = dict(zip(active_streams, final_contexts)) + + # Phase C: MoE Synthese answer_text = await engine._generate_final_answer( intent, strategy, request.message, formatted_context_map ) @@ -252,7 +270,7 @@ async def chat_endpoint( try: log_search( query_id=query_id, query_text=request.message, results=sources_hits, - mode=f"wp25_{intent.lower()}", metadata={"strategy": intent, "source": intent_source} + mode=f"wp25a_{intent.lower()}", metadata={"strategy": intent, "source": intent_source} ) except: pass diff --git a/app/services/llm_service.py b/app/services/llm_service.py index be74a8c..d09675a 100644 --- a/app/services/llm_service.py +++ b/app/services/llm_service.py @@ -1,16 +1,14 @@ """ FILE: app/services/llm_service.py DESCRIPTION: Hybrid-Client für Ollama, Google GenAI (Gemini) und OpenRouter. - Verwaltet provider-spezifische Prompts und Background-Last. - WP-20: Optimiertes Fallback-Management zum Schutz von Cloud-Quoten. - WP-22/JSON: Optionales JSON-Schema + strict (für OpenRouter). - WP-25: Integration der DecisionEngine für Agentic Multi-Stream RAG. -VERSION: 3.4.2 (WP-25: Ingest-Stability Patch) + WP-25a: Implementierung der Mixture of Experts (MoE) Profil-Steuerung. +VERSION: 3.5.0 (WP-25a: MoE & Profile Orchestration) STATUS: Active FIX: -- Ingest-Stability: Entfernung des <5-Zeichen Guards (ermöglicht YES/NO Validierungen). -- OpenRouter-Fix: Sicherung gegen leere 'choices' zur Vermeidung von JSON-Errors. -- Erhalt der vollständigen v3.3.9 Logik für Rate-Limits, Retries und Background-Tasks. +- WP-25a: Profilbasiertes Routing via llm_profiles.yaml. +- WP-25a: Unterstützung individueller Temperaturen pro Experten-Profil. +- WP-25: Beibehaltung der Ingest-Stability (kein Schwellenwert für YES/NO). +- WP-25: Erhalt der vollständigen v3.4.2 Resilienz-Logik. """ import httpx import yaml @@ -19,28 +17,28 @@ import asyncio import json from google import genai from google.genai import types -from openai import AsyncOpenAI # Für OpenRouter (OpenAI-kompatibel) +from openai import AsyncOpenAI from pathlib import Path from typing import Optional, Dict, Any, Literal from app.config import get_settings -# ENTSCHEIDENDER FIX: Import der neutralen Bereinigungs-Logik (WP-14) +# Import der neutralen Bereinigungs-Logik from app.core.registry import clean_llm_text logger = logging.getLogger(__name__) class LLMService: - # GLOBALER SEMAPHOR für Hintergrund-Last Steuerung (WP-06) _background_semaphore = None def __init__(self): self.settings = get_settings() self.prompts = self._load_prompts() - # WP-25: Lazy Initialization der DecisionEngine zur Vermeidung von Circular Imports + # WP-25a: Zentrale Experten-Profile laden + self.profiles = self._load_llm_profiles() + self._decision_engine = None - # Initialisiere Semaphore einmalig auf Klassen-Ebene if LLMService._background_semaphore is None: limit = getattr(self.settings, "BACKGROUND_LIMIT", 2) logger.info(f"🚦 LLMService: Initializing Background Semaphore with limit: {limit}") @@ -52,10 +50,9 @@ class LLMService: timeout=httpx.Timeout(self.settings.LLM_TIMEOUT) ) - # 2. Google GenAI Client (Modern SDK) + # 2. Google GenAI Client self.google_client = None if self.settings.GOOGLE_API_KEY: - # FIX: Wir erzwingen api_version 'v1' für höhere Stabilität bei 2.5er Modellen. self.google_client = genai.Client( api_key=self.settings.GOOGLE_API_KEY, http_options={'api_version': 'v1'} @@ -68,24 +65,20 @@ class LLMService: self.openrouter_client = AsyncOpenAI( base_url="https://openrouter.ai/api/v1", api_key=self.settings.OPENROUTER_API_KEY, - # Strikter Timeout für OpenRouter Free-Tier zur Vermeidung von Hangs. timeout=45.0 ) logger.info("🛰️ LLMService: OpenRouter Integration active.") @property def decision_engine(self): - """Lazy Initialization der Decision Engine (WP-25).""" if self._decision_engine is None: from app.core.retrieval.decision_engine import DecisionEngine self._decision_engine = DecisionEngine() return self._decision_engine def _load_prompts(self) -> dict: - """Lädt die Prompt-Konfiguration aus der YAML-Datei.""" path = Path(self.settings.PROMPTS_PATH) if not path.exists(): - logger.error(f"❌ Prompts file not found at {path}") return {} try: with open(path, "r", encoding="utf-8") as f: @@ -94,21 +87,28 @@ class LLMService: logger.error(f"❌ Failed to load prompts: {e}") return {} + def _load_llm_profiles(self) -> dict: + """WP-25a: Lädt die zentralen MoE-Profile aus der llm_profiles.yaml.""" + # Wir nutzen den in settings oder decision_engine definierten Pfad + path_str = getattr(self.settings, "LLM_PROFILES_PATH", "config/llm_profiles.yaml") + path = Path(path_str) + if not path.exists(): + logger.warning(f"⚠️ LLM Profiles file not found at {path}. System will use .env defaults.") + return {} + try: + with open(path, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + return data.get("profiles", {}) + except Exception as e: + logger.error(f"❌ Failed to load llm_profiles.yaml: {e}") + return {} + def get_prompt(self, key: str, provider: str = None) -> str: - """ - Hole provider-spezifisches Template mit intelligenter Text-Kaskade. - Kaskade: Gewählter Provider -> Gemini -> Ollama. - """ active_provider = provider or self.settings.MINDNET_LLM_PROVIDER data = self.prompts.get(key, "") - if isinstance(data, dict): val = data.get(active_provider, data.get("gemini", data.get("ollama", ""))) - if isinstance(val, dict): - logger.warning(f"⚠️ [LLMService] Nested dictionary detected for key '{key}'. Using first entry.") - val = next(iter(val.values()), "") if val else "" return str(val) - return str(data) async def generate_raw_response( @@ -123,34 +123,48 @@ class LLMService: model_override: Optional[str] = None, json_schema: Optional[Dict[str, Any]] = None, json_schema_name: str = "mindnet_json", - strict_json_schema: bool = True + strict_json_schema: bool = True, + profile_name: Optional[str] = None # WP-25a ) -> str: """ - Haupteinstiegspunkt für LLM-Anfragen. - WP-25 FIX: Schwellenwert entfernt, um kurze Ingest-Validierungen (YES/NO) zu unterstützen. + Haupteinstiegspunkt für LLM-Anfragen mit Profil-Unterstützung. """ - target_provider = provider or self.settings.MINDNET_LLM_PROVIDER + target_provider = provider + target_model = model_override + target_temp = None + + # WP-25a: Profil-Auflösung (Provider, Modell, Temperatur) + if profile_name and self.profiles: + profile = self.profiles.get(profile_name) + if profile: + target_provider = profile.get("provider", target_provider) + target_model = profile.get("model", target_model) + target_temp = profile.get("temperature") + logger.debug(f"🎭 MoE Call: Profil '{profile_name}' -> {target_provider}") + + # Fallback auf Standard-Provider falls nichts übergeben/definiert wurde + if not target_provider: + target_provider = self.settings.MINDNET_LLM_PROVIDER if priority == "background": async with LLMService._background_semaphore: res = await self._dispatch( target_provider, prompt, system, force_json, - max_retries, base_delay, model_override, - json_schema, json_schema_name, strict_json_schema + max_retries, base_delay, target_model, + json_schema, json_schema_name, strict_json_schema, target_temp ) else: res = await self._dispatch( target_provider, prompt, system, force_json, - max_retries, base_delay, model_override, - json_schema, json_schema_name, strict_json_schema + max_retries, base_delay, target_model, + json_schema, json_schema_name, strict_json_schema, target_temp ) - # WP-25 FIX: Nur noch auf absolut leere Antwort prüfen (ermöglicht YES/NO Antworten). + # WP-25 Fix: Ingest-Stability (Ermöglicht YES/NO ohne Schwellenwert-Blockade) if not res and target_provider != "ollama": - logger.warning(f"⚠️ [WP-25] Empty response from {target_provider}. Falling back to OLLAMA.") - res = await self._execute_ollama(prompt, system, force_json, max_retries, base_delay) + logger.warning(f"⚠️ [WP-25] Empty response from {target_provider}. Fallback to OLLAMA.") + res = await self._execute_ollama(prompt, system, force_json, max_retries, base_delay, target_temp) - # WP-14 Fix: Bereinige Text-Antworten vor Rückgabe return clean_llm_text(res) if not force_json else res async def _dispatch( @@ -164,9 +178,10 @@ class LLMService: model_override: Optional[str], json_schema: Optional[Dict[str, Any]], json_schema_name: str, - strict_json_schema: bool + strict_json_schema: bool, + temperature: Optional[float] = None # WP-25a ) -> str: - """Routet die Anfrage mit intelligenter Rate-Limit Erkennung.""" + """Routet die Anfrage mit Rate-Limit Erkennung.""" rate_limit_attempts = 0 max_rate_retries = min(max_retries, getattr(self.settings, "LLM_RATE_LIMIT_RETRIES", 3)) wait_time = getattr(self.settings, "LLM_RATE_LIMIT_WAIT", 60.0) @@ -175,43 +190,42 @@ class LLMService: try: if provider == "openrouter" and self.openrouter_client: return await self._execute_openrouter( - prompt=prompt, - system=system, - force_json=force_json, - model_override=model_override, - json_schema=json_schema, - json_schema_name=json_schema_name, - strict_json_schema=strict_json_schema + prompt=prompt, system=system, force_json=force_json, + model_override=model_override, json_schema=json_schema, + json_schema_name=json_schema_name, strict_json_schema=strict_json_schema, + temperature=temperature ) if provider == "gemini" and self.google_client: - return await self._execute_google(prompt, system, force_json, model_override) + return await self._execute_google(prompt, system, force_json, model_override, temperature) - return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay) + return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay, temperature) except Exception as e: err_str = str(e) - is_rate_limit = any(x in err_str for x in ["429", "RESOURCE_EXHAUSTED", "rate_limited", "Too Many Requests"]) - - if is_rate_limit and rate_limit_attempts < max_rate_retries: + if any(x in err_str for x in ["429", "RESOURCE_EXHAUSTED", "rate_limited"]): rate_limit_attempts += 1 - logger.warning(f"⏳ Rate Limit from {provider}. Attempt {rate_limit_attempts}. Waiting {wait_time}s...") + logger.warning(f"⏳ Rate Limit {provider}. Attempt {rate_limit_attempts}. Wait {wait_time}s.") await asyncio.sleep(wait_time) continue if self.settings.LLM_FALLBACK_ENABLED and provider != "ollama": - logger.warning(f"🔄 Provider {provider} failed ({err_str}). Falling back to OLLAMA.") - return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay) + return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay, temperature) raise e - async def _execute_google(self, prompt, system, force_json, model_override): + async def _execute_google(self, prompt, system, force_json, model_override, temperature): model = model_override or self.settings.GEMINI_MODEL clean_model = model.replace("models/", "") - config = types.GenerateContentConfig( - system_instruction=system, - response_mime_type="application/json" if force_json else "text/plain" - ) + config_kwargs = { + "system_instruction": system, + "response_mime_type": "application/json" if force_json else "text/plain" + } + if temperature is not None: + config_kwargs["temperature"] = temperature + + config = types.GenerateContentConfig(**config_kwargs) + response = await asyncio.wait_for( asyncio.to_thread( self.google_client.models.generate_content, @@ -222,53 +236,47 @@ class LLMService: return response.text.strip() async def _execute_openrouter( - self, - prompt: str, - system: Optional[str], - force_json: bool, - model_override: Optional[str], - json_schema: Optional[Dict[str, Any]] = None, - json_schema_name: str = "mindnet_json", - strict_json_schema: bool = True + self, prompt: str, system: Optional[str], force_json: bool, + model_override: Optional[str], json_schema: Optional[Dict[str, Any]] = None, + json_schema_name: str = "mindnet_json", strict_json_schema: bool = True, + temperature: Optional[float] = None ) -> str: - """OpenRouter API Integration. WP-25 FIX: Sicherung gegen leere 'choices'.""" model = model_override or self.settings.OPENROUTER_MODEL messages = [] - if system: - messages.append({"role": "system", "content": system}) + if system: messages.append({"role": "system", "content": system}) messages.append({"role": "user", "content": prompt}) kwargs: Dict[str, Any] = {} + if temperature is not None: + kwargs["temperature"] = temperature + if force_json: if json_schema: kwargs["response_format"] = { "type": "json_schema", - "json_schema": { - "name": json_schema_name, "strict": strict_json_schema, "schema": json_schema - } + "json_schema": {"name": json_schema_name, "strict": strict_json_schema, "schema": json_schema} } else: kwargs["response_format"] = {"type": "json_object"} response = await self.openrouter_client.chat.completions.create( - model=model, - messages=messages, - **kwargs + model=model, messages=messages, **kwargs ) - # WP-25 FIX: Sicherung gegen leere Antwort-Arrays - if not response.choices or len(response.choices) == 0: - logger.warning(f"🛰️ OpenRouter returned no choices for model {model}") + if not response.choices: return "" return response.choices[0].message.content.strip() if response.choices[0].message.content else "" - async def _execute_ollama(self, prompt, system, force_json, max_retries, base_delay): + async def _execute_ollama(self, prompt, system, force_json, max_retries, base_delay, temperature=None): + # WP-25a: Nutzt Profil-Temperatur oder Standard + effective_temp = temperature if temperature is not None else (0.1 if force_json else 0.7) + payload = { "model": self.settings.LLM_MODEL, "prompt": prompt, "stream": False, - "options": {"temperature": 0.1 if force_json else 0.7, "num_ctx": 8192} + "options": {"temperature": effective_temp, "num_ctx": 8192} } if force_json: payload["format"] = "json" if system: payload["system"] = system @@ -281,15 +289,11 @@ class LLMService: return res.json().get("response", "").strip() except Exception as e: attempt += 1 - if attempt > max_retries: - logger.error(f"❌ Ollama request failed: {e}") - raise e - wait_time = base_delay * (2 ** (attempt - 1)) - await asyncio.sleep(wait_time) + if attempt > max_retries: raise e + await asyncio.sleep(base_delay * (2 ** (attempt - 1))) async def generate_rag_response(self, query: str, context_str: Optional[str] = None) -> str: """WP-25: Orchestrierung via DecisionEngine.""" - logger.info(f"🚀 [WP-25] Chat Query: {query[:50]}...") return await self.decision_engine.ask(query) async def close(self): diff --git a/config/decision_engine.yaml b/config/decision_engine.yaml index 23fb221..e858f19 100644 --- a/config/decision_engine.yaml +++ b/config/decision_engine.yaml @@ -1,28 +1,32 @@ # config/decision_engine.yaml -# VERSION: 3.1.6 (WP-25: Multi-Stream Agentic RAG - Final Release) +# VERSION: 3.2.2 (WP-25a: Decoupled MoE Logic) # STATUS: Active -# DoD: -# - Strikte Nutzung der Typen aus types.yaml (v2.7.0). -# - Fix für Projekt-Klassifizierung via Keyword-Fast-Path (Auflösung Kollision). -# - 100% Erhalt aller Stream-Parameter und Edge-Boosts. +# DESCRIPTION: Zentrale Orchestrierung der Multi-Stream-Engine. +# FIX: +# - Auslagerung der LLM-Profile in llm_profiles.yaml zur zentralen Wartbarkeit. +# - Integration von compression_thresholds zur Inhaltsverdichtung (WP-25a). +# - 100% Erhalt aller WP-25 Edge-Boosts und Filter-Typen (v3.1.6). -version: 3.1 +version: 3.2 settings: llm_fallback_enabled: true - # "auto" nutzt den in MINDNET_LLM_PROVIDER gesetzten Standard. + # "auto" nutzt den globalen Default-Provider aus der .env router_provider: "auto" - # Verweist auf das Template in prompts.yaml + # Verweis auf den Intent-Klassifizierer in der prompts.yaml router_prompt_key: "intent_router_v1" + # Pfad zur neuen Experten-Konfiguration (WP-25a Architektur-Cleanliness) + profiles_config_path: "config/llm_profiles.yaml" -# --- EBENE 1: STREAM-LIBRARY (Bausteine basierend auf types.yaml) --- -# Synchronisiert mit types.yaml v2.7.0 - +# --- EBENE 1: STREAM-LIBRARY (Bausteine basierend auf types.yaml v2.7.0) --- streams_library: values_stream: name: "Identität & Ethik" + # Referenz auf Experten-Profil (z.B. lokal via Ollama für Privacy) + llm_profile: "identity_safe" + compression_profile: "identity_safe" + compression_threshold: 2500 query_template: "Welche meiner Werte und Prinzipien betreffen: {query}" - # Nur Typen aus types.yaml filter_types: ["value", "principle", "belief", "trait", "boundary", "need", "motivation"] top_k: 5 edge_boosts: @@ -32,8 +36,10 @@ streams_library: facts_stream: name: "Operative Realität" + llm_profile: "synthesis_pro" + compression_profile: "compression_fast" + compression_threshold: 3500 query_template: "Status, Ressourcen und Fakten zu: {query}" - # Nur Typen aus types.yaml filter_types: ["project", "decision", "task", "goal", "event", "state"] top_k: 5 edge_boosts: @@ -43,8 +49,10 @@ streams_library: biography_stream: name: "Persönliche Erfahrung" + llm_profile: "synthesis_pro" + compression_profile: "compression_fast" + compression_threshold: 3000 query_template: "Welche Erlebnisse habe ich im Kontext von {query} gemacht?" - # Nur Typen aus types.yaml filter_types: ["experience", "journal", "profile", "person"] top_k: 3 edge_boosts: @@ -53,8 +61,10 @@ streams_library: risk_stream: name: "Risiko-Radar" + llm_profile: "synthesis_pro" + compression_profile: "compression_fast" + compression_threshold: 2500 query_template: "Gefahren, Hindernisse oder Risiken bei: {query}" - # Nur Typen aus types.yaml filter_types: ["risk", "obstacle", "bias"] top_k: 3 edge_boosts: @@ -64,81 +74,59 @@ streams_library: tech_stream: name: "Wissen & Technik" + llm_profile: "tech_expert" + compression_profile: "compression_fast" + compression_threshold: 4500 query_template: "Inhaltliche Details und Definitionen zu: {query}" - # Nur Typen aus types.yaml filter_types: ["concept", "source", "glossary", "idea", "insight", "skill", "habit"] top_k: 5 edge_boosts: uses: 2.5 implemented_in: 3.0 -# --- EBENE 2: STRATEGIEN (Komposition & Routing) --- -# Orchestriert das Zusammenspiel der Streams basierend auf dem Intent. - +# --- EBENE 2: STRATEGIEN (Finale Komposition via MoE-Profile) --- strategies: - # Spezialisierte Fact-Strategie für zeitliche Fragen FACT_WHEN: description: "Abfrage von exakten Zeitpunkten und Terminen." - preferred_provider: "openrouter" - # FAST PATH: Harte Keywords für zeitliche Fragen + llm_profile: "synthesis_pro" trigger_keywords: ["wann", "datum", "uhrzeit", "zeitpunkt"] - use_streams: - - "facts_stream" - - "biography_stream" - - "tech_stream" + use_streams: ["facts_stream", "biography_stream", "tech_stream"] prompt_template: "fact_synthesis_v1" - # Spezialisierte Fact-Strategie für inhaltliche Fragen & Listen FACT_WHAT: description: "Abfrage von Definitionen, Listen und Inhalten." - preferred_provider: "openrouter" - # FIX v3.1.6: "projekt" entfernt, um Kollision mit DECISION ("Soll ich Projekt...") zu vermeiden. + llm_profile: "synthesis_pro" trigger_keywords: ["was ist", "welche sind", "liste", "übersicht", "zusammenfassung"] - use_streams: - - "facts_stream" - - "tech_stream" - - "biography_stream" + use_streams: ["facts_stream", "tech_stream", "biography_stream"] prompt_template: "fact_synthesis_v1" - # Entscheidungs-Frage DECISION: description: "Der User sucht Rat, Strategie oder Abwägung." - preferred_provider: "gemini" - # FIX v3.1.6: Trigger erweitert, um "Soll ich... Projekt..." sicher zu fangen. + llm_profile: "synthesis_pro" trigger_keywords: ["soll ich", "sollte ich", "entscheidung", "abwägen", "priorität", "empfehlung"] - use_streams: - - "values_stream" - - "facts_stream" - - "risk_stream" + use_streams: ["values_stream", "facts_stream", "risk_stream"] prompt_template: "decision_synthesis_v1" prepend_instruction: | !!! ENTSCHEIDUNGS-MODUS (AGENTIC MULTI-STREAM) !!! Analysiere die Fakten vor dem Hintergrund meiner Werte und evaluiere die Risiken. Wäge ab, ob das Vorhaben mit meiner langfristigen Identität kompatibel ist. - # Emotionale Reflexion EMPATHY: description: "Reaktion auf emotionale Zustände." - preferred_provider: "openrouter" + llm_profile: "synthesis_pro" trigger_keywords: ["fühle", "traurig", "glücklich", "stress", "angst"] - use_streams: - - "biography_stream" - - "values_stream" + use_streams: ["biography_stream", "values_stream"] prompt_template: "empathy_template" - # Technischer Support CODING: description: "Technische Anfragen und Programmierung." - preferred_provider: "gemini" + llm_profile: "tech_expert" trigger_keywords: ["code", "python", "script", "bug", "syntax"] - use_streams: - - "tech_stream" - - "facts_stream" + use_streams: ["tech_stream", "facts_stream"] prompt_template: "technical_template" - # Eingabe-Modus (WP-07) INTERVIEW: description: "Der User möchte Wissen erfassen (Eingabemodus)." - preferred_provider: "openrouter" + llm_profile: "compression_fast" use_streams: [] prompt_template: "interview_template" \ No newline at end of file diff --git a/config/llm_profiles.yaml b/config/llm_profiles.yaml new file mode 100644 index 0000000..41b0f45 --- /dev/null +++ b/config/llm_profiles.yaml @@ -0,0 +1,31 @@ +# config/llm_profiles.yaml +# VERSION: 1.0.0 (WP-25a: Centralized MoE Profiles) +# STATUS: Active +# DESCRIPTION: Zentrale Definition der LLM-Experten-Profile für MindNet. + +profiles: + # Der "Dampfhammer": Schnell und günstig für Zusammenfassungen + compression_fast: + provider: "openrouter" + model: "google/gemini-flash-1.5" + temperature: 0.1 + + # Der "Ingenieur": Tiefes Verständnis für Code und Logik + tech_expert: + provider: "openrouter" + model: "anthropic/claude-3-sonnet" + temperature: 0.3 + + # Der "Wächter": Lokal für sensible Identitäts-Daten + identity_safe: + provider: "ollama" + model: "llama3.1:8b" + temperature: 0.2 + + # Der "Architekt": Hochwertige Synthese und strategische Abwägung + synthesis_pro: + provider: "gemini" + model: "gemini-1.5-pro" + temperature: 0.7 + + \ No newline at end of file