mindnet/app/services/llm_service.py

306 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FILE: app/services/llm_service.py
DESCRIPTION: Hybrid-Client für Ollama, Google GenAI (Gemini) und OpenRouter.
WP-25a: Implementierung der Mixture of Experts (MoE) Profil-Steuerung.
VERSION: 3.5.0 (WP-25a: MoE & Profile Orchestration)
STATUS: Active
FIX:
- WP-25a: Profilbasiertes Routing via llm_profiles.yaml.
- WP-25a: Unterstützung individueller Temperaturen pro Experten-Profil.
- WP-25: Beibehaltung der Ingest-Stability (kein Schwellenwert für YES/NO).
- WP-25: Erhalt der vollständigen v3.4.2 Resilienz-Logik.
"""
import httpx
import yaml
import logging
import asyncio
import json
from google import genai
from google.genai import types
from openai import AsyncOpenAI
from pathlib import Path
from typing import Optional, Dict, Any, Literal
from app.config import get_settings
# Import der neutralen Bereinigungs-Logik
from app.core.registry import clean_llm_text
logger = logging.getLogger(__name__)
class LLMService:
    """Hybrid LLM client for Ollama, Google GenAI (Gemini) and OpenRouter.

    The target provider/model/temperature of a request is resolved from, in
    order: a named expert profile from ``llm_profiles.yaml`` (WP-25a), the
    explicit ``provider`` / ``model_override`` arguments, and finally the
    ``MINDNET_LLM_PROVIDER`` setting.
    """

    # Shared by all instances; created lazily by the first __init__ and used to
    # limit concurrency of requests issued with priority="background".
    _background_semaphore = None

    # Substrings in an exception's text that mark it as a provider rate limit.
    _RATE_LIMIT_MARKERS = ("429", "RESOURCE_EXHAUSTED", "rate_limited")

    def __init__(self):
        self.settings = get_settings()
        self.prompts = self._load_prompts()
        # WP-25a: load the central expert (MoE) profiles.
        self.profiles = self._load_llm_profiles()
        self._decision_engine = None

        if LLMService._background_semaphore is None:
            limit = getattr(self.settings, "BACKGROUND_LIMIT", 2)
            logger.info(f"🚦 LLMService: Initializing Background Semaphore with limit: {limit}")
            LLMService._background_semaphore = asyncio.Semaphore(limit)

        # 1. Local Ollama client (always created).
        self.ollama_client = httpx.AsyncClient(
            base_url=self.settings.OLLAMA_URL,
            timeout=httpx.Timeout(self.settings.LLM_TIMEOUT),
        )

        # 2. Google GenAI client, only when an API key is configured.
        self.google_client = None
        if self.settings.GOOGLE_API_KEY:
            self.google_client = genai.Client(
                api_key=self.settings.GOOGLE_API_KEY,
                http_options={'api_version': 'v1'},
            )
            logger.info("✨ LLMService: Google GenAI (Gemini) active.")

        # 3. OpenRouter client (OpenAI-compatible API), only when an API key is configured.
        self.openrouter_client = None
        if self.settings.OPENROUTER_API_KEY:
            self.openrouter_client = AsyncOpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=self.settings.OPENROUTER_API_KEY,
                timeout=45.0,
            )
            logger.info("🛰️ LLMService: OpenRouter Integration active.")

    @property
    def decision_engine(self):
        """Lazily created DecisionEngine (imported on first use, presumably to avoid an import cycle)."""
        if self._decision_engine is None:
            from app.core.retrieval.decision_engine import DecisionEngine
            self._decision_engine = DecisionEngine()
        return self._decision_engine

    def _load_prompts(self) -> dict:
        """Load prompt templates from ``settings.PROMPTS_PATH``.

        Returns:
            The parsed YAML mapping, or ``{}`` when the file is missing,
            cannot be read/parsed, or does not contain a top-level mapping.
        """
        path = Path(self.settings.PROMPTS_PATH)
        if not path.exists():
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}
        except Exception as e:
            logger.error(f"❌ Failed to load prompts: {e}")
            return {}
        # get_prompt() relies on mapping access; reject e.g. a top-level list.
        if not isinstance(data, dict):
            logger.error(f"❌ Failed to load prompts: expected a mapping, got {type(data).__name__}")
            return {}
        return data

    def _load_llm_profiles(self) -> dict:
        """WP-25a: load the central MoE profiles from ``llm_profiles.yaml``.

        The path comes from ``settings.LLM_PROFILES_PATH`` (default
        ``config/llm_profiles.yaml``).

        Returns:
            The ``profiles`` mapping of the file, or ``{}`` when the file is
            missing, unreadable, or has no usable ``profiles`` mapping.
        """
        path_str = getattr(self.settings, "LLM_PROFILES_PATH", "config/llm_profiles.yaml")
        path = Path(path_str)
        if not path.exists():
            logger.warning(f"⚠️ LLM Profiles file not found at {path}. System will use .env defaults.")
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}
            # An empty "profiles:" key parses as None; normalize it to {}.
            profiles = data.get("profiles") or {}
        except Exception as e:
            logger.error(f"❌ Failed to load llm_profiles.yaml: {e}")
            return {}
        if not isinstance(profiles, dict):
            logger.error(f"❌ Failed to load llm_profiles.yaml: 'profiles' must be a mapping, got {type(profiles).__name__}")
            return {}
        return profiles

    def get_prompt(self, key: str, provider: Optional[str] = None) -> str:
        """Return the prompt template stored under ``key``.

        A prompt entry is either a plain value or a mapping of provider name to
        template. For a mapping, the entry for ``provider`` (default:
        ``settings.MINDNET_LLM_PROVIDER``) is used, falling back to the
        ``gemini`` and then the ``ollama`` entry.

        Returns:
            The template as a string; ``""`` when nothing usable is found
            (missing keys and null YAML values are both treated as absent).
        """
        active_provider = provider or self.settings.MINDNET_LLM_PROVIDER
        data = self.prompts.get(key)
        if data is None:
            return ""
        if not isinstance(data, dict):
            return str(data)
        for candidate in (active_provider, "gemini", "ollama"):
            value = data.get(candidate)
            if value is not None:
                return str(value)
        return ""

    async def generate_raw_response(
        self,
        prompt: str,
        system: Optional[str] = None,
        force_json: bool = False,
        max_retries: int = 2,
        base_delay: float = 2.0,
        priority: Literal["realtime", "background"] = "realtime",
        provider: Optional[str] = None,
        model_override: Optional[str] = None,
        json_schema: Optional[Dict[str, Any]] = None,
        json_schema_name: str = "mindnet_json",
        strict_json_schema: bool = True,
        profile_name: Optional[str] = None,  # WP-25a
    ) -> str:
        """Main entry point for LLM requests, with expert-profile support.

        Args:
            prompt: User prompt text.
            system: Optional system instruction.
            force_json: Request JSON output; the raw text is then returned
                without ``clean_llm_text`` post-processing.
            max_retries: Retry budget passed to the provider executors.
            base_delay: Base delay in seconds for Ollama's exponential backoff.
            priority: ``"background"`` requests are throttled by the shared
                background semaphore.
            provider: Explicit provider (``"openrouter"``, ``"gemini"``,
                anything else routes to Ollama); overridden by a profile.
            model_override: Explicit model name; overridden by a profile.
            json_schema / json_schema_name / strict_json_schema: Structured
                output settings (used by the OpenRouter executor only).
            profile_name: Name of a profile in ``llm_profiles.yaml`` that sets
                provider, model and temperature (WP-25a).

        Returns:
            The model's response text. If a non-Ollama provider yields an empty
            response, the request is retried once against Ollama.
        """
        target_provider = provider
        target_model = model_override
        target_temp = None

        # WP-25a: resolve provider, model and temperature from the profile.
        if profile_name and self.profiles:
            profile = self.profiles.get(profile_name)
            if profile:
                target_provider = profile.get("provider", target_provider)
                target_model = profile.get("model", target_model)
                target_temp = profile.get("temperature")
                logger.info(f"🎭 MoE Dispatch: Profil='{profile_name}' -> Provider='{target_provider}' | Model='{target_model}'")
            else:
                logger.warning(f"⚠️ Profil '{profile_name}' nicht in llm_profiles.yaml gefunden!")

        # Fall back to the configured default provider if nothing was given/defined.
        if not target_provider:
            target_provider = self.settings.MINDNET_LLM_PROVIDER
            logger.info(f" Kein Provider/Profil definiert. Nutze Default: {target_provider}")

        dispatch_args = (
            target_provider, prompt, system, force_json,
            max_retries, base_delay, target_model,
            json_schema, json_schema_name, strict_json_schema, target_temp,
        )
        if priority == "background":
            async with LLMService._background_semaphore:
                res = await self._dispatch(*dispatch_args)
        else:
            res = await self._dispatch(*dispatch_args)

        # WP-25 fix (ingest stability): an empty answer from a remote provider is
        # retried on Ollama, so short YES/NO style answers are never blocked.
        # NOTE(review): this fallback runs outside the background semaphore and
        # does not consult LLM_FALLBACK_ENABLED — confirm that is intended.
        if not res and target_provider != "ollama":
            logger.warning(f"⚠️ [WP-25] Empty response from {target_provider}. Fallback to OLLAMA.")
            res = await self._execute_ollama(prompt, system, force_json, max_retries, base_delay, target_temp)

        return clean_llm_text(res) if not force_json else res

    async def _dispatch(
        self,
        provider: str,
        prompt: str,
        system: Optional[str],
        force_json: bool,
        max_retries: int,
        base_delay: float,
        model_override: Optional[str],
        json_schema: Optional[Dict[str, Any]],
        json_schema_name: str,
        strict_json_schema: bool,
        temperature: Optional[float] = None,  # WP-25a
    ) -> str:
        """Route a request to the provider's executor, retrying on rate limits.

        ``"openrouter"`` / ``"gemini"`` are used only when their client is
        configured; every other case is executed on Ollama.

        Rate-limit errors (exception text containing one of
        ``_RATE_LIMIT_MARKERS``) are retried up to
        ``min(max_retries, LLM_RATE_LIMIT_RETRIES)`` times, waiting
        ``LLM_RATE_LIMIT_WAIT`` seconds between attempts. Other errors fall
        back to Ollama when ``LLM_FALLBACK_ENABLED`` is set and the provider is
        not Ollama; otherwise they are re-raised.

        Returns:
            The response text. When rate-limit retries are exhausted, ``""`` is
            returned for non-Ollama providers (the caller treats an empty
            response as a signal to fall back to Ollama).

        Raises:
            The last rate-limit error when retries are exhausted for Ollama, or
            any non-rate-limit error when no fallback applies.
        """
        max_rate_retries = max(0, min(max_retries, getattr(self.settings, "LLM_RATE_LIMIT_RETRIES", 3)))
        wait_time = getattr(self.settings, "LLM_RATE_LIMIT_WAIT", 60.0)
        last_rate_error = None

        for attempt in range(max_rate_retries + 1):
            try:
                if provider == "openrouter" and self.openrouter_client:
                    return await self._execute_openrouter(
                        prompt=prompt, system=system, force_json=force_json,
                        model_override=model_override, json_schema=json_schema,
                        json_schema_name=json_schema_name, strict_json_schema=strict_json_schema,
                        temperature=temperature,
                    )
                if provider == "gemini" and self.google_client:
                    return await self._execute_google(prompt, system, force_json, model_override, temperature)
                return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay, temperature)
            except Exception as e:
                if not any(marker in str(e) for marker in self._RATE_LIMIT_MARKERS):
                    if self.settings.LLM_FALLBACK_ENABLED and provider != "ollama":
                        return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay, temperature)
                    raise
                last_rate_error = e
                # Only wait if another attempt follows; sleeping after the last
                # failed attempt would just delay the failure.
                if attempt < max_rate_retries:
                    logger.warning(f"⏳ Rate Limit {provider}. Attempt {attempt + 1}. Wait {wait_time}s.")
                    await asyncio.sleep(wait_time)

        # Previously the loop fell through and implicitly returned None here.
        logger.error(f"❌ Rate limit retries exhausted for {provider} after {max_rate_retries + 1} attempts.")
        if provider != "ollama":
            return ""
        raise last_rate_error

    async def _execute_google(self, prompt, system, force_json, model_override, temperature):
        """Run a Gemini request via the synchronous SDK in a worker thread.

        Uses ``model_override`` or ``settings.GEMINI_MODEL`` (a ``models/``
        prefix is stripped) and a hard 45 s timeout.

        Returns:
            The stripped response text, or ``""`` when the SDK returns no text
            (``response.text`` can be ``None``, e.g. for a blocked candidate).
        """
        model = model_override or self.settings.GEMINI_MODEL
        clean_model = model.replace("models/", "")
        config_kwargs = {
            "system_instruction": system,
            "response_mime_type": "application/json" if force_json else "text/plain",
        }
        if temperature is not None:
            config_kwargs["temperature"] = temperature
        config = types.GenerateContentConfig(**config_kwargs)
        response = await asyncio.wait_for(
            asyncio.to_thread(
                self.google_client.models.generate_content,
                model=clean_model, contents=prompt, config=config,
            ),
            timeout=45.0,
        )
        return (response.text or "").strip()

    async def _execute_openrouter(
        self, prompt: str, system: Optional[str], force_json: bool,
        model_override: Optional[str], json_schema: Optional[Dict[str, Any]] = None,
        json_schema_name: str = "mindnet_json", strict_json_schema: bool = True,
        temperature: Optional[float] = None,
    ) -> str:
        """Run a chat completion against OpenRouter.

        With ``force_json``, a ``json_schema`` response format is requested when
        a schema is given, otherwise a plain ``json_object`` format.

        Returns:
            The stripped content of the first choice, or ``""`` when there is
            no choice or no content.
        """
        model = model_override or self.settings.OPENROUTER_MODEL
        # Extended logging before the call.
        logger.info(f"🛰️ OpenRouter Call: Model='{model}' | Temp={temperature}")

        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})

        kwargs: Dict[str, Any] = {}
        if temperature is not None:
            kwargs["temperature"] = temperature
        if force_json:
            if json_schema:
                kwargs["response_format"] = {
                    "type": "json_schema",
                    "json_schema": {"name": json_schema_name, "strict": strict_json_schema, "schema": json_schema},
                }
            else:
                kwargs["response_format"] = {"type": "json_object"}

        response = await self.openrouter_client.chat.completions.create(
            model=model, messages=messages, **kwargs
        )
        if not response.choices:
            return ""
        content = response.choices[0].message.content
        return content.strip() if content else ""

    async def _execute_ollama(self, prompt, system, force_json, max_retries, base_delay, temperature=None):
        """Run a non-streaming ``/api/generate`` request against Ollama.

        WP-25a: uses the profile temperature if given, else 0.1 for JSON and
        0.7 for free text. Any error is retried up to ``max_retries`` times with
        exponential backoff (``base_delay * 2**n``) and re-raised afterwards.

        Returns:
            The stripped ``response`` field ("" if absent or null).
        """
        effective_temp = temperature if temperature is not None else (0.1 if force_json else 0.7)
        payload = {
            "model": self.settings.LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": effective_temp, "num_ctx": 8192},
        }
        if force_json:
            payload["format"] = "json"
        if system:
            payload["system"] = system

        attempt = 0
        while True:
            try:
                res = await self.ollama_client.post("/api/generate", json=payload)
                res.raise_for_status()
                return (res.json().get("response") or "").strip()
            except Exception:
                attempt += 1
                if attempt > max_retries:
                    raise
                await asyncio.sleep(base_delay * (2 ** (attempt - 1)))

    async def generate_rag_response(self, query: str, context_str: Optional[str] = None) -> str:
        """WP-25: answer ``query`` via the DecisionEngine.

        ``context_str`` is accepted for interface compatibility but is not used.
        """
        return await self.decision_engine.ask(query)

    async def close(self):
        """Close the async HTTP clients held by this service (Ollama and OpenRouter)."""
        if self.ollama_client:
            await self.ollama_client.aclose()
        if self.openrouter_client:
            await self.openrouter_client.close()