""" app/services/llm_service.py — LLM Client Version: 2.8.0 (Configurable Concurrency Limit) """ import httpx import yaml import logging import os import asyncio from pathlib import Path from typing import Optional, Dict, Any, Literal logger = logging.getLogger(__name__) class Settings: OLLAMA_URL = os.getenv("MINDNET_OLLAMA_URL", "http://127.0.0.1:11434") LLM_TIMEOUT = float(os.getenv("MINDNET_LLM_TIMEOUT", 300.0)) LLM_MODEL = os.getenv("MINDNET_LLM_MODEL", "phi3:mini") PROMPTS_PATH = os.getenv("MINDNET_PROMPTS_PATH", "./config/prompts.yaml") # NEU: Konfigurierbares Limit für Hintergrund-Last # Default auf 2 (konservativ), kann in .env erhöht werden. BACKGROUND_LIMIT = int(os.getenv("MINDNET_LLM_BACKGROUND_LIMIT", "2")) def get_settings(): return Settings() class LLMService: # GLOBALER SEMAPHOR (Lazy Initialization) # Wir initialisieren ihn erst, wenn wir die Settings kennen. _background_semaphore = None def __init__(self): self.settings = get_settings() self.prompts = self._load_prompts() # Initialisiere Semaphore einmalig auf Klassen-Ebene basierend auf Config if LLMService._background_semaphore is None: limit = self.settings.BACKGROUND_LIMIT logger.info(f"🚦 LLMService: Initializing Background Semaphore with limit: {limit}") LLMService._background_semaphore = asyncio.Semaphore(limit) self.timeout = httpx.Timeout(self.settings.LLM_TIMEOUT, connect=10.0) self.client = httpx.AsyncClient( base_url=self.settings.OLLAMA_URL, timeout=self.timeout ) def _load_prompts(self) -> dict: path = Path(self.settings.PROMPTS_PATH) if not path.exists(): return {} try: with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) except Exception as e: logger.error(f"Failed to load prompts: {e}") return {} async def generate_raw_response( self, prompt: str, system: str = None, force_json: bool = False, max_retries: int = 0, base_delay: float = 2.0, priority: Literal["realtime", "background"] = "realtime" ) -> str: """ Führt einen LLM Call aus. priority="realtime": Chat (Sofort, keine Bremse). priority="background": Import/Analyse (Gedrosselt durch Semaphore). """ use_semaphore = (priority == "background") if use_semaphore and LLMService._background_semaphore: async with LLMService._background_semaphore: return await self._execute_request(prompt, system, force_json, max_retries, base_delay) else: # Realtime oder Fallback (falls Semaphore Init fehlschlug) return await self._execute_request(prompt, system, force_json, max_retries, base_delay) async def _execute_request(self, prompt, system, force_json, max_retries, base_delay): payload: Dict[str, Any] = { "model": self.settings.LLM_MODEL, "prompt": prompt, "stream": False, "options": { "temperature": 0.1 if force_json else 0.7, "num_ctx": 8192 } } if force_json: payload["format"] = "json" if system: payload["system"] = system attempt = 0 while True: try: response = await self.client.post("/api/generate", json=payload) if response.status_code == 200: data = response.json() return data.get("response", "").strip() else: response.raise_for_status() except Exception as e: attempt += 1 if attempt > max_retries: logger.error(f"LLM Final Error (Versuch {attempt}): {e}") raise e wait_time = base_delay * (2 ** (attempt - 1)) logger.warning(f"⚠️ LLM Retry ({attempt}/{max_retries}) in {wait_time}s: {e}") await asyncio.sleep(wait_time) async def generate_rag_response(self, query: str, context_str: str) -> str: """ Chat-Wrapper: Immer Realtime. """ system_prompt = self.prompts.get("system_prompt", "") rag_template = self.prompts.get("rag_template", "{context_str}\n\n{query}") final_prompt = rag_template.format(context_str=context_str, query=query) return await self.generate_raw_response( final_prompt, system=system_prompt, max_retries=0, force_json=False, priority="realtime" ) async def close(self): if self.client: await self.client.aclose()