""" FILE: app/services/llm_service.py DESCRIPTION: Hybrid-Client für Ollama, Google GenAI (Gemini) und OpenRouter. Verwaltet provider-spezifische Prompts und Background-Last. WP-20: Optimiertes Fallback-Management zum Schutz von Cloud-Quoten. WP-20 Fix: Bulletproof Prompt-Auflösung für format() Aufrufe. WP-22/JSON: Optionales JSON-Schema + strict (für OpenRouter structured outputs). FIX: Intelligente Rate-Limit Erkennung (429 Handling), v1-API Sync & Timeouts. VERSION: 3.3.6 STATUS: Active DEPENDENCIES: httpx, yaml, logging, asyncio, json, google-genai, openai, app.config """ import httpx import yaml import logging import asyncio import json from google import genai from google.genai import types from openai import AsyncOpenAI # Für OpenRouter (OpenAI-kompatibel) from pathlib import Path from typing import Optional, Dict, Any, Literal from app.config import get_settings logger = logging.getLogger(__name__) class LLMService: # GLOBALER SEMAPHOR für Hintergrund-Last Steuerung (WP-06) _background_semaphore = None def __init__(self): self.settings = get_settings() self.prompts = self._load_prompts() # Initialisiere Semaphore einmalig auf Klassen-Ebene if LLMService._background_semaphore is None: limit = getattr(self.settings, "BACKGROUND_LIMIT", 2) logger.info(f"🚦 LLMService: Initializing Background Semaphore with limit: {limit}") LLMService._background_semaphore = asyncio.Semaphore(limit) # 1. Lokaler Ollama Client self.ollama_client = httpx.AsyncClient( base_url=self.settings.OLLAMA_URL, timeout=httpx.Timeout(self.settings.LLM_TIMEOUT) ) # 2. Google GenAI Client (Modern SDK) self.google_client = None if self.settings.GOOGLE_API_KEY: # FIX: Wir erzwingen api_version 'v1' für höhere Stabilität bei 2.5er Modellen. self.google_client = genai.Client( api_key=self.settings.GOOGLE_API_KEY, http_options={'api_version': 'v1'} ) logger.info("✨ LLMService: Google GenAI (Gemini) active.") # 3. OpenRouter Client self.openrouter_client = None if self.settings.OPENROUTER_API_KEY: self.openrouter_client = AsyncOpenAI( base_url="https://openrouter.ai/api/v1", api_key=self.settings.OPENROUTER_API_KEY, # Strikter Timeout für OpenRouter Free-Tier zur Vermeidung von Hangs. timeout=45.0 ) logger.info("🛰️ LLMService: OpenRouter Integration active.") def _load_prompts(self) -> dict: """Lädt die Prompt-Konfiguration aus der YAML-Datei.""" path = Path(self.settings.PROMPTS_PATH) if not path.exists(): logger.error(f"❌ Prompts file not found at {path}") return {} try: with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} except Exception as e: logger.error(f"❌ Failed to load prompts: {e}") return {} def get_prompt(self, key: str, provider: str = None) -> str: """ Hole provider-spezifisches Template mit intelligenter Text-Kaskade. HINWEIS: Dies ist nur ein Text-Lookup und verbraucht kein API-Kontingent. Kaskade: Gewählter Provider -> Gemini (Cloud-Stil) -> Ollama (Basis-Stil). WP-20 Fix: Garantiert die Rückgabe eines Strings, um AttributeError zu vermeiden. """ active_provider = provider or self.settings.MINDNET_LLM_PROVIDER data = self.prompts.get(key, "") if isinstance(data, dict): # Wir versuchen erst den Provider, dann Gemini, dann Ollama val = data.get(active_provider, data.get("gemini", data.get("ollama", ""))) # Falls val durch YAML-Fehler immer noch ein Dict ist, extrahiere ersten String if isinstance(val, dict): logger.warning(f"⚠️ [LLMService] Nested dictionary detected for key '{key}'. Using first entry.") val = next(iter(val.values()), "") if val else "" return str(val) return str(data) async def generate_raw_response( self, prompt: str, system: str = None, force_json: bool = False, max_retries: int = 2, base_delay: float = 2.0, priority: Literal["realtime", "background"] = "realtime", provider: Optional[str] = None, model_override: Optional[str] = None, json_schema: Optional[Dict[str, Any]] = None, json_schema_name: str = "mindnet_json", strict_json_schema: bool = True ) -> str: """ Haupteinstiegspunkt für LLM-Anfragen mit Priorisierung. force_json: - Ollama: nutzt payload["format"]="json" - Gemini: nutzt response_mime_type="application/json" - OpenRouter: nutzt response_format=json_object (Fallback) oder json_schema """ target_provider = provider or self.settings.MINDNET_LLM_PROVIDER if priority == "background": async with LLMService._background_semaphore: return await self._dispatch( target_provider, prompt, system, force_json, max_retries, base_delay, model_override, json_schema, json_schema_name, strict_json_schema ) return await self._dispatch( target_provider, prompt, system, force_json, max_retries, base_delay, model_override, json_schema, json_schema_name, strict_json_schema ) async def _dispatch( self, provider: str, prompt: str, system: Optional[str], force_json: bool, max_retries: int, base_delay: float, model_override: Optional[str], json_schema: Optional[Dict[str, Any]], json_schema_name: str, strict_json_schema: bool ) -> str: """ Routet die Anfrage mit intelligenter Rate-Limit Erkennung (WP-20 + WP-76). Schleife läuft über MINDNET_LLM_RATE_LIMIT_RETRIES. """ rate_limit_attempts = 0 max_rate_retries = getattr(self.settings, "LLM_RATE_LIMIT_RETRIES", 3) wait_time = getattr(self.settings, "LLM_RATE_LIMIT_WAIT", 60.0) while rate_limit_attempts <= max_rate_retries: try: if provider == "openrouter" and self.openrouter_client: return await self._execute_openrouter( prompt=prompt, system=system, force_json=force_json, model_override=model_override, json_schema=json_schema, json_schema_name=json_schema_name, strict_json_schema=strict_json_schema ) if provider == "gemini" and self.google_client: return await self._execute_google(prompt, system, force_json, model_override) # Default/Fallback zu Ollama return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay) except Exception as e: err_str = str(e) # Intelligente 429 Erkennung für alle Cloud-Provider is_rate_limit = any(x in err_str for x in ["429", "RESOURCE_EXHAUSTED", "rate_limited", "Too Many Requests"]) if is_rate_limit and rate_limit_attempts < max_rate_retries: rate_limit_attempts += 1 logger.warning( f"⏳ [LLMService] Rate Limit (429) detected from {provider}. " f"Attempt {rate_limit_attempts}/{max_rate_retries}. " f"Waiting {wait_time}s before cloud retry..." ) await asyncio.sleep(wait_time) continue # Nächster Versuch in der Cloud-Schleife # Wenn kein Rate-Limit oder Retries erschöpft -> Fallback zu Ollama (falls aktiviert) if self.settings.LLM_FALLBACK_ENABLED and provider != "ollama": logger.warning( f"🔄 Provider {provider} failed ({err_str}). Falling back to LOCAL OLLAMA." ) return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay) raise e async def _execute_google(self, prompt, system, force_json, model_override): """Native Google SDK Integration (Gemini) mit v1 Fix.""" model = model_override or self.settings.GEMINI_MODEL # Fix: Bereinige Modellnamen (Entfernung von 'models/' Präfix) clean_model = model.replace("models/", "") config = types.GenerateContentConfig( system_instruction=system, response_mime_type="application/json" if force_json else "text/plain" ) # Thread-Offloading mit striktem Timeout gegen "Hangs" response = await asyncio.wait_for( asyncio.to_thread( self.google_client.models.generate_content, model=clean_model, contents=prompt, config=config ), timeout=45.0 ) return response.text.strip() async def _execute_openrouter( self, prompt: str, system: Optional[str], force_json: bool, model_override: Optional[str], json_schema: Optional[Dict[str, Any]] = None, json_schema_name: str = "mindnet_json", strict_json_schema: bool = True ) -> str: """OpenRouter API Integration (OpenAI-kompatibel) mit Schema-Support.""" model = model_override or self.settings.OPENROUTER_MODEL messages = [] if system: messages.append({"role": "system", "content": system}) messages.append({"role": "user", "content": prompt}) kwargs: Dict[str, Any] = {} if force_json: if json_schema: kwargs["response_format"] = { "type": "json_schema", "json_schema": { "name": json_schema_name, "strict": strict_json_schema, "schema": json_schema } } else: kwargs["response_format"] = {"type": "json_object"} response = await self.openrouter_client.chat.completions.create( model=model, messages=messages, **kwargs ) return response.choices[0].message.content.strip() async def _execute_ollama(self, prompt, system, force_json, max_retries, base_delay): """Lokaler Ollama Call mit exponentiellem Backoff.""" payload = { "model": self.settings.LLM_MODEL, "prompt": prompt, "stream": False, "options": { "temperature": 0.1 if force_json else 0.7, "num_ctx": 8192 } } if force_json: payload["format"] = "json" if system: payload["system"] = system attempt = 0 while True: try: res = await self.ollama_client.post("/api/generate", json=payload) res.raise_for_status() return res.json().get("response", "").strip() except Exception as e: attempt += 1 if attempt > max_retries: logger.error(f"❌ Ollama Error after {attempt} retries: {e}") raise e wait_time = base_delay * (2 ** (attempt - 1)) logger.warning(f"⚠️ Ollama attempt {attempt} failed. Retrying in {wait_time}s...") await asyncio.sleep(wait_time) async def generate_rag_response(self, query: str, context_str: str) -> str: """Vollständiges RAG Chat-Interface.""" provider = self.settings.MINDNET_LLM_PROVIDER system_prompt = self.get_prompt("system_prompt", provider) rag_template = self.get_prompt("rag_template", provider) final_prompt = rag_template.format(context_str=context_str, query=query) return await self.generate_raw_response( final_prompt, system=system_prompt, priority="realtime" ) async def close(self): """Schließt die HTTP-Verbindungen.""" if self.ollama_client: await self.ollama_client.aclose()