""" FILE: app/services/llm_service.py DESCRIPTION: Hybrid-Client für Ollama, Google GenAI (Gemini) und OpenRouter. WP-25a: Implementierung der Mixture of Experts (MoE) Kaskaden-Steuerung. VERSION: 3.5.2 (WP-25a: MoE & Fallback Cascade Support) STATUS: Active FIX: - WP-25a: Implementierung der rekursiven Fallback-Kaskade via fallback_profile. - WP-25a: Schutz gegen zirkuläre Profil-Referenzen (visited_profiles). - WP-25a: Erweitertes Logging für Tracing der Experten-Entscheidungen. - Erhalt der Ingest-Stability (WP-25) und des Rate-Limit-Managements. """ import httpx import yaml import logging import asyncio import json from google import genai from google.genai import types from openai import AsyncOpenAI from pathlib import Path from typing import Optional, Dict, Any, Literal from app.config import get_settings # Import der neutralen Bereinigungs-Logik from app.core.registry import clean_llm_text logger = logging.getLogger(__name__) class LLMService: _background_semaphore = None def __init__(self): self.settings = get_settings() self.prompts = self._load_prompts() # WP-25a: Zentrale Experten-Profile laden self.profiles = self._load_llm_profiles() self._decision_engine = None if LLMService._background_semaphore is None: limit = getattr(self.settings, "BACKGROUND_LIMIT", 2) logger.info(f"🚦 LLMService: Initializing Background Semaphore with limit: {limit}") LLMService._background_semaphore = asyncio.Semaphore(limit) # 1. Lokaler Ollama Client self.ollama_client = httpx.AsyncClient( base_url=self.settings.OLLAMA_URL, timeout=httpx.Timeout(self.settings.LLM_TIMEOUT) ) # 2. Google GenAI Client self.google_client = None if self.settings.GOOGLE_API_KEY: self.google_client = genai.Client( api_key=self.settings.GOOGLE_API_KEY, http_options={'api_version': 'v1'} ) logger.info("✨ LLMService: Google GenAI (Gemini) active.") # 3. OpenRouter Client self.openrouter_client = None if self.settings.OPENROUTER_API_KEY: self.openrouter_client = AsyncOpenAI( base_url="https://openrouter.ai/api/v1", api_key=self.settings.OPENROUTER_API_KEY, timeout=45.0 ) logger.info("🛰️ LLMService: OpenRouter Integration active.") @property def decision_engine(self): if self._decision_engine is None: from app.core.retrieval.decision_engine import DecisionEngine self._decision_engine = DecisionEngine() return self._decision_engine def _load_prompts(self) -> dict: path = Path(self.settings.PROMPTS_PATH) if not path.exists(): return {} try: with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} except Exception as e: logger.error(f"❌ Failed to load prompts: {e}") return {} def _load_llm_profiles(self) -> dict: """WP-25a: Lädt die zentralen MoE-Profile aus der llm_profiles.yaml.""" path_str = getattr(self.settings, "LLM_PROFILES_PATH", "config/llm_profiles.yaml") path = Path(path_str) if not path.exists(): logger.warning(f"⚠️ LLM Profiles file not found at {path}. 
    def get_prompt(self, key: str, provider: Optional[str] = None) -> str:
        active_provider = provider or self.settings.MINDNET_LLM_PROVIDER
        data = self.prompts.get(key, "")
        if isinstance(data, dict):
            # Prefer the active provider's variant, then fall back to gemini/ollama
            val = data.get(active_provider, data.get("gemini", data.get("ollama", "")))
            return str(val)
        return str(data)

    async def generate_raw_response(
        self,
        prompt: str,
        system: Optional[str] = None,
        force_json: bool = False,
        max_retries: int = 2,
        base_delay: float = 2.0,
        priority: Literal["realtime", "background"] = "realtime",
        provider: Optional[str] = None,
        model_override: Optional[str] = None,
        json_schema: Optional[Dict[str, Any]] = None,
        json_schema_name: str = "mindnet_json",
        strict_json_schema: bool = True,
        profile_name: Optional[str] = None,
        visited_profiles: Optional[list] = None
    ) -> str:
        """
        Main entry point for LLM requests with recursive cascade logic.
        """
        visited_profiles = visited_profiles or []
        target_provider = provider
        target_model = model_override
        target_temp = None
        fallback_profile = None

        # 1. Profile resolution
        if profile_name and self.profiles:
            profile = self.profiles.get(profile_name)
            if profile:
                target_provider = profile.get("provider", target_provider)
                target_model = profile.get("model", target_model)
                target_temp = profile.get("temperature")
                fallback_profile = profile.get("fallback_profile")
                visited_profiles.append(profile_name)
                logger.info(f"🎭 MoE Dispatch: Profile='{profile_name}' -> Provider='{target_provider}' | Model='{target_model}'")
            else:
                logger.warning(f"⚠️ Profile '{profile_name}' not found in llm_profiles.yaml!")

        # Fall back to the default provider if nothing was passed or defined
        if not target_provider:
            target_provider = self.settings.MINDNET_LLM_PROVIDER
            logger.info(f"ℹ️ No provider/profile defined. Using default: {target_provider}")

        # 2. Execution with error handling for the cascade
        try:
            if priority == "background":
                async with LLMService._background_semaphore:
                    res = await self._dispatch(
                        target_provider, prompt, system, force_json, max_retries, base_delay,
                        target_model, json_schema, json_schema_name, strict_json_schema, target_temp
                    )
            else:
                res = await self._dispatch(
                    target_provider, prompt, system, force_json, max_retries, base_delay,
                    target_model, json_schema, json_schema_name, strict_json_schema, target_temp
                )

            # Check for empty cloud responses (WP-25 stability)
            if not res and target_provider != "ollama":
                logger.warning(f"⚠️ Empty response from {target_provider}. Triggering fallback chain.")
                raise ValueError(f"Empty response from {target_provider}")

            return clean_llm_text(res) if not force_json else res

        except Exception as e:
            logger.error(f"❌ Error during execution of profile '{profile_name}' ({target_provider}): {e}")

            # 3. Cascade logic: try the next profile in the chain
            if fallback_profile and fallback_profile not in visited_profiles:
                logger.info(f"🔄 Switching to fallback profile: '{fallback_profile}'")
                return await self.generate_raw_response(
                    prompt=prompt, system=system, force_json=force_json,
                    max_retries=max_retries, base_delay=base_delay, priority=priority,
                    provider=provider, model_override=model_override,
                    json_schema=json_schema, json_schema_name=json_schema_name,
                    strict_json_schema=strict_json_schema,
                    profile_name=fallback_profile,
                    visited_profiles=visited_profiles
                )

            # 4. Final safety anchor: if everything else fails, go straight to Ollama
            if target_provider != "ollama" and self.settings.LLM_FALLBACK_ENABLED:
                logger.warning("🚨 Cascade exhausted. Using final Ollama safety anchor.")
                res = await self._execute_ollama(prompt, system, force_json, max_retries, base_delay)
                return clean_llm_text(res) if not force_json else res

            raise e
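    # Illustrative call path for the cascade above (profile name hypothetical):
    #
    #   await llm_service.generate_raw_response(
    #       prompt="Summarize the document ...",
    #       force_json=True,
    #       profile_name="extraction_expert",
    #   )
    #
    # resolves the profile to its provider/model/temperature, and on failure or an
    # empty cloud response recurses into its fallback_profile. visited_profiles
    # breaks reference cycles, and the final Ollama anchor only fires when
    # LLM_FALLBACK_ENABLED is set.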
    async def _dispatch(
        self,
        provider: str,
        prompt: str,
        system: Optional[str],
        force_json: bool,
        max_retries: int,
        base_delay: float,
        model_override: Optional[str],
        json_schema: Optional[Dict[str, Any]],
        json_schema_name: str,
        strict_json_schema: bool,
        temperature: Optional[float] = None
    ) -> str:
        """Routes the request to the provider-specific executor."""
        rate_limit_attempts = 0
        max_rate_retries = min(max_retries, getattr(self.settings, "LLM_RATE_LIMIT_RETRIES", 3))
        wait_time = getattr(self.settings, "LLM_RATE_LIMIT_WAIT", 60.0)

        while rate_limit_attempts <= max_rate_retries:
            try:
                if provider == "openrouter" and self.openrouter_client:
                    return await self._execute_openrouter(
                        prompt=prompt, system=system, force_json=force_json,
                        model_override=model_override, json_schema=json_schema,
                        json_schema_name=json_schema_name,
                        strict_json_schema=strict_json_schema,
                        temperature=temperature
                    )
                if provider == "gemini" and self.google_client:
                    return await self._execute_google(prompt, system, force_json, model_override, temperature)
                return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay, temperature)
            except Exception as e:
                err_str = str(e)
                # Rate-limit handling (429)
                if any(x in err_str for x in ["429", "RESOURCE_EXHAUSTED", "rate_limited"]):
                    rate_limit_attempts += 1
                    logger.warning(f"⏳ Rate Limit {provider}. Attempt {rate_limit_attempts}. Wait {wait_time}s.")
                    await asyncio.sleep(wait_time)
                    continue
                # Other errors are passed up to generate_raw_response for the cascade
                raise e

        # The loop fell through without returning: rate-limit retries are exhausted.
        # Raise instead of implicitly returning None so the cascade can react.
        raise RuntimeError(f"Rate-limit retries exhausted for provider '{provider}'")

    async def _execute_google(self, prompt, system, force_json, model_override, temperature):
        model = model_override or self.settings.GEMINI_MODEL
        clean_model = model.replace("models/", "")

        config_kwargs = {
            "system_instruction": system,
            "response_mime_type": "application/json" if force_json else "text/plain"
        }
        if temperature is not None:
            config_kwargs["temperature"] = temperature
        config = types.GenerateContentConfig(**config_kwargs)

        response = await asyncio.wait_for(
            asyncio.to_thread(
                self.google_client.models.generate_content,
                model=clean_model,
                contents=prompt,
                config=config
            ),
            timeout=45.0
        )
        # response.text can be None (e.g. for blocked responses)
        return (response.text or "").strip()

    async def _execute_openrouter(
        self,
        prompt: str,
        system: Optional[str],
        force_json: bool,
        model_override: Optional[str],
        json_schema: Optional[Dict[str, Any]] = None,
        json_schema_name: str = "mindnet_json",
        strict_json_schema: bool = True,
        temperature: Optional[float] = None
    ) -> str:
        model = model_override or self.settings.OPENROUTER_MODEL
        logger.info(f"🛰️ OpenRouter Call: Model='{model}' | Temp={temperature}")

        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})

        kwargs: Dict[str, Any] = {}
        if temperature is not None:
            kwargs["temperature"] = temperature
        if force_json:
            if json_schema:
                kwargs["response_format"] = {
                    "type": "json_schema",
                    "json_schema": {
                        "name": json_schema_name,
                        "strict": strict_json_schema,
                        "schema": json_schema
                    }
                }
            else:
                kwargs["response_format"] = {"type": "json_object"}

        response = await self.openrouter_client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )
        if not response.choices:
            return ""
        content = response.choices[0].message.content
        return content.strip() if content else ""
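    # Example of the structured-output wiring above; the response_format envelope
    # matches what _execute_openrouter builds, while the schema itself is a
    # hypothetical illustration:
    #
    #   schema = {
    #       "type": "object",
    #       "properties": {"summary": {"type": "string"}},
    #       "required": ["summary"],
    #   }
    #   text = await self._execute_openrouter(
    #       prompt=prompt, system=None, force_json=True, model_override=None,
    #       json_schema=schema, json_schema_name="mindnet_json",
    #   )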
    async def _execute_ollama(self, prompt, system, force_json, max_retries, base_delay, temperature=None):
        # Use the profile temperature, or strict defaults to go easy on local hardware
        effective_temp = temperature if temperature is not None else (0.1 if force_json else 0.7)
        payload = {
            "model": self.settings.LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": effective_temp, "num_ctx": 8192}
        }
        if force_json:
            payload["format"] = "json"
        if system:
            payload["system"] = system

        attempt = 0
        while True:
            try:
                res = await self.ollama_client.post("/api/generate", json=payload)
                res.raise_for_status()
                return res.json().get("response", "").strip()
            except Exception as e:
                attempt += 1
                if attempt > max_retries:
                    logger.error(f"❌ Ollama final failure after {attempt} attempts: {e}")
                    raise e
                # Exponential backoff between retries
                await asyncio.sleep(base_delay * (2 ** (attempt - 1)))

    async def generate_rag_response(self, query: str, context_str: Optional[str] = None) -> str:
        """WP-25: Orchestration via DecisionEngine."""
        return await self.decision_engine.ask(query)

    async def close(self):
        if self.ollama_client:
            await self.ollama_client.aclose()
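
# Minimal manual smoke test, assuming configured settings (.env) and a reachable
# Ollama instance; the profile name is hypothetical and resolution falls back to
# the default provider when no llm_profiles.yaml is present.
if __name__ == "__main__":
    async def _demo() -> None:
        service = LLMService()
        try:
            answer = await service.generate_raw_response(
                prompt="Reply with the single word: pong",
                profile_name="extraction_expert",  # hypothetical profile
            )
            print(answer)
        finally:
            await service.close()

    asyncio.run(_demo())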