"""
|
|
app/services/llm_service.py — LLM Client
|
|
Version: 2.8.0 (Configurable Concurrency Limit)
|
|
"""
|
|
|
|
import httpx
|
|
import yaml
|
|
import logging
|
|
import os
|
|
import asyncio
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, Literal
|
|
|
|
logger = logging.getLogger(__name__)


class Settings:
    OLLAMA_URL = os.getenv("MINDNET_OLLAMA_URL", "http://127.0.0.1:11434")
    LLM_TIMEOUT = float(os.getenv("MINDNET_LLM_TIMEOUT", 300.0))
    LLM_MODEL = os.getenv("MINDNET_LLM_MODEL", "phi3:mini")
    PROMPTS_PATH = os.getenv("MINDNET_PROMPTS_PATH", "./config/prompts.yaml")

    # NEW: configurable limit for background load.
    # Defaults to 2 (conservative); can be raised via .env.
    BACKGROUND_LIMIT = int(os.getenv("MINDNET_LLM_BACKGROUND_LIMIT", "2"))
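
# Illustrative .env overrides (example values only, not shipped defaults;
# the variable names match the os.getenv() lookups above):
#   MINDNET_OLLAMA_URL=http://127.0.0.1:11434
#   MINDNET_LLM_MODEL=phi3:mini
#   MINDNET_LLM_BACKGROUND_LIMIT=4   # e.g. allow 4 concurrent background calls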


def get_settings():
    return Settings()


class LLMService:
    # GLOBAL SEMAPHORE (lazy initialization):
    # we only create it once the settings are known.
    _background_semaphore = None

    def __init__(self):
        self.settings = get_settings()
        self.prompts = self._load_prompts()

        # Initialize the semaphore once, at class level, based on the configured limit.
        if LLMService._background_semaphore is None:
            limit = self.settings.BACKGROUND_LIMIT
            logger.info(f"🚦 LLMService: Initializing Background Semaphore with limit: {limit}")
            LLMService._background_semaphore = asyncio.Semaphore(limit)

        self.timeout = httpx.Timeout(self.settings.LLM_TIMEOUT, connect=10.0)

        self.client = httpx.AsyncClient(
            base_url=self.settings.OLLAMA_URL,
            timeout=self.timeout
        )

    def _load_prompts(self) -> dict:
        path = Path(self.settings.PROMPTS_PATH)
        if not path.exists():
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                # safe_load returns None for an empty file; keep the dict contract.
                return yaml.safe_load(f) or {}
        except Exception as e:
            logger.error(f"Failed to load prompts: {e}")
            return {}
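
    # The prompts file is a flat YAML mapping. Illustrative structure only;
    # the keys below are the ones generate_rag_response() reads, while the
    # actual text in config/prompts.yaml may differ:
    #
    #   system_prompt: "You are a helpful assistant."
    #   rag_template: "{context_str}\n\n{query}"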

    async def generate_raw_response(
        self,
        prompt: str,
        system: Optional[str] = None,
        force_json: bool = False,
        max_retries: int = 0,
        base_delay: float = 2.0,
        priority: Literal["realtime", "background"] = "realtime"
    ) -> str:
        """
        Executes a single LLM call.

        priority="realtime": chat traffic (immediate, no throttling).
        priority="background": import/analysis traffic (throttled by the
        class-level semaphore). See the usage sketch at the bottom of this module.
        """

        use_semaphore = (priority == "background")

        if use_semaphore and LLMService._background_semaphore:
            async with LLMService._background_semaphore:
                return await self._execute_request(prompt, system, force_json, max_retries, base_delay)
        else:
            # Realtime, or fallback in case the semaphore initialization failed.
            return await self._execute_request(prompt, system, force_json, max_retries, base_delay)

    async def _execute_request(self, prompt, system, force_json, max_retries, base_delay):
        payload: Dict[str, Any] = {
            "model": self.settings.LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            "options": {
                # Low temperature for structured (JSON) output, higher for chat.
                "temperature": 0.1 if force_json else 0.7,
                "num_ctx": 8192
            }
        }

        if force_json:
            payload["format"] = "json"

        if system:
            payload["system"] = system

        attempt = 0

        while True:
            try:
                response = await self.client.post("/api/generate", json=payload)

                if response.status_code == 200:
                    data = response.json()
                    return data.get("response", "").strip()
                else:
                    response.raise_for_status()

            except Exception as e:
                attempt += 1
                if attempt > max_retries:
                    logger.error(f"LLM Final Error (attempt {attempt}): {e}")
                    raise e

                # Exponential backoff: with base_delay=2.0 the waits are 2s, 4s, 8s, ...
                wait_time = base_delay * (2 ** (attempt - 1))
                logger.warning(f"⚠️ LLM Retry ({attempt}/{max_retries}) in {wait_time}s: {e}")
                await asyncio.sleep(wait_time)

    async def generate_rag_response(self, query: str, context_str: str) -> str:
        """
        Chat wrapper: always realtime.
        """
        system_prompt = self.prompts.get("system_prompt", "")
        rag_template = self.prompts.get("rag_template", "{context_str}\n\n{query}")

        final_prompt = rag_template.format(context_str=context_str, query=query)

        return await self.generate_raw_response(
            final_prompt,
            system=system_prompt,
            max_retries=0,
            force_json=False,
            priority="realtime"
        )

    async def close(self):
        if self.client:
            await self.client.aclose()
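

if __name__ == "__main__":
    # Minimal smoke test, illustrative only: it assumes an Ollama server is
    # reachable at MINDNET_OLLAMA_URL, and the prompts below are made up.
    # The real application drives this service from its API layer instead.
    async def _demo() -> None:
        service = LLMService()
        try:
            # Chat path: runs immediately, never waits on the background semaphore.
            answer = await service.generate_rag_response(
                "What does this service do?", context_str="(no context)"
            )
            print(answer)

            # Import/analysis path: at most BACKGROUND_LIMIT of these talk to the
            # model concurrently; additional callers queue on the class-level semaphore.
            raw = await service.generate_raw_response(
                "Return an empty JSON object.",
                force_json=True,
                max_retries=2,
                priority="background",
            )
            print(raw)
        finally:
            await service.close()

    asyncio.run(_demo())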