mindnet/app/services/llm_service.py

"""
FILE: app/services/llm_service.py
DESCRIPTION: Hybrid-Client für Ollama, Google GenAI (Gemini) und OpenRouter.
WP-25b: Implementierung der Lazy-Prompt-Orchestration (Modell-spezifisch).
VERSION: 3.5.5 (WP-25b: Prompt Orchestration & Full Resilience)
STATUS: Active
FIX:
- WP-25b: get_prompt() unterstützt Hierarchie: Model-ID -> Provider -> Default.
- WP-25b: generate_raw_response() unterstützt prompt_key + variables für Lazy-Formatting.
- WP-25a: Voller Erhalt der rekursiven Fallback-Kaskade und visited_profiles Schutz.
- WP-20: Restaurierung des internen Ollama-Retry-Loops für Hardware-Stabilität.
"""
import httpx
import yaml
import logging
import asyncio
import json
from google import genai
from google.genai import types
from openai import AsyncOpenAI
from pathlib import Path
from typing import Optional, Dict, Any, Literal
from app.config import get_settings
# Import the provider-neutral text clean-up logic
from app.core.registry import clean_llm_text

logger = logging.getLogger(__name__)

class LLMService:
    _background_semaphore = None

    def __init__(self):
        self.settings = get_settings()
        self.prompts = self._load_prompts()
        self.profiles = self._load_llm_profiles()
        self._decision_engine = None
        if LLMService._background_semaphore is None:
            limit = getattr(self.settings, "BACKGROUND_LIMIT", 2)
            logger.info(f"🚦 LLMService: Initializing Background Semaphore with limit: {limit}")
            LLMService._background_semaphore = asyncio.Semaphore(limit)
        # 1. Local Ollama client
        self.ollama_client = httpx.AsyncClient(
            base_url=self.settings.OLLAMA_URL,
            timeout=httpx.Timeout(self.settings.LLM_TIMEOUT)
        )
        # 2. Google GenAI client
        self.google_client = None
        if self.settings.GOOGLE_API_KEY:
            self.google_client = genai.Client(
                api_key=self.settings.GOOGLE_API_KEY,
                http_options={'api_version': 'v1'}
            )
            logger.info("✨ LLMService: Google GenAI (Gemini) active.")
        # 3. OpenRouter client
        self.openrouter_client = None
        if self.settings.OPENROUTER_API_KEY:
            self.openrouter_client = AsyncOpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=self.settings.OPENROUTER_API_KEY,
                timeout=45.0
            )
            logger.info("🛰️ LLMService: OpenRouter Integration active.")

    @property
    def decision_engine(self):
        if self._decision_engine is None:
            from app.core.retrieval.decision_engine import DecisionEngine
            self._decision_engine = DecisionEngine()
        return self._decision_engine

    def _load_prompts(self) -> dict:
        path = Path(self.settings.PROMPTS_PATH)
        if not path.exists():
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                return yaml.safe_load(f) or {}
        except Exception as e:
            logger.error(f"❌ Failed to load prompts: {e}")
            return {}

    def _load_llm_profiles(self) -> dict:
        """WP-25a: Loads the central MoE profiles from llm_profiles.yaml."""
        path_str = getattr(self.settings, "LLM_PROFILES_PATH", "config/llm_profiles.yaml")
        path = Path(path_str)
        if not path.exists():
            logger.warning(f"⚠️ LLM Profiles file not found at {path}.")
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}
            return data.get("profiles", {})
        except Exception as e:
            logger.error(f"❌ Failed to load llm_profiles.yaml: {e}")
            return {}
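
    # Illustrative sketch of the llm_profiles.yaml layout this loader expects. The code
    # above only assumes a top-level "profiles" mapping; the per-profile keys "provider",
    # "model", "temperature" and "fallback_profile" are the ones read later in
    # generate_raw_response(). The profile and model names below are hypothetical
    # placeholders, not values shipped with this repository:
    #
    #   profiles:
    #     analysis_cloud:
    #       provider: openrouter
    #       model: some-vendor/some-model-id
    #       temperature: 0.2
    #       fallback_profile: analysis_local
    #     analysis_local:
    #       provider: ollama
    #       model: some-local-model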

    def get_prompt(self, key: str, model_id: Optional[str] = None, provider: Optional[str] = None) -> str:
        """
        WP-25b: High-precision prompt lookup with detailed trace logging.
        """
        data = self.prompts.get(key, "")
        if not isinstance(data, dict):
            return str(data)
        # 1. Most specific match: exact model ID
        if model_id and model_id in data:
            logger.info(f"🎯 [PROMPT-TRACE] Level 1 Match: Model-specific ('{model_id}') for key '{key}'")
            return str(data[model_id])
        # 2. Middle level: provider
        if provider and provider in data:
            logger.info(f"📡 [PROMPT-TRACE] Level 2 Match: Provider-fallback ('{provider}') for key '{key}'")
            return str(data[provider])
        # 3. Global fallback
        default_val = data.get("default", data.get("gemini", data.get("ollama", "")))
        logger.info(f"⚓ [PROMPT-TRACE] Level 3 Match: Global Default for key '{key}'")
        return str(default_val)
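
    # Illustrative sketch of a prompts.yaml entry that this three-level lookup can
    # resolve. The key name "summarize_chunk" and the model/provider identifiers are
    # hypothetical; only the "default"/"gemini"/"ollama" fallback keys are actually
    # referenced by the code above:
    #
    #   summarize_chunk:
    #     "some-vendor/some-model-id": |
    #       <prompt text tuned for this exact model ID>
    #     openrouter: |
    #       <prompt text shared by all OpenRouter models>
    #     default: |
    #       <global fallback prompt with {placeholders} for .format()>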

    async def generate_raw_response(
        self,
        prompt: Optional[str] = None,
        prompt_key: Optional[str] = None,   # WP-25b: lazy-loading key
        variables: Optional[dict] = None,   # WP-25b: data for formatting
        system: Optional[str] = None,
        force_json: bool = False,
        max_retries: int = 2,
        base_delay: float = 2.0,
        priority: Literal["realtime", "background"] = "realtime",
        provider: Optional[str] = None,
        model_override: Optional[str] = None,
        json_schema: Optional[Dict[str, Any]] = None,
        json_schema_name: str = "mindnet_json",
        strict_json_schema: bool = True,
        profile_name: Optional[str] = None,
        visited_profiles: Optional[list] = None
    ) -> str:
        """Main entry point for LLM requests with lazy prompt orchestration."""
        visited_profiles = visited_profiles or []
        target_provider = provider
        target_model = model_override
        target_temp = None
        fallback_profile = None
        # 1. Profile resolution (Mixture of Experts)
        if profile_name and self.profiles:
            profile = self.profiles.get(profile_name)
            if profile:
                target_provider = profile.get("provider", target_provider)
                target_model = profile.get("model", target_model)
                target_temp = profile.get("temperature")
                fallback_profile = profile.get("fallback_profile")
                visited_profiles.append(profile_name)
                logger.info(f"🎭 MoE Dispatch: profile='{profile_name}' -> provider='{target_provider}' | model='{target_model}'")
            else:
                logger.warning(f"⚠️ Profile '{profile_name}' not found in llm_profiles.yaml!")
        if not target_provider:
            target_provider = self.settings.MINDNET_LLM_PROVIDER
        # 2. WP-25b: Lazy prompt resolving
        # The prompt is resolved only NOW, based on the model that is currently active.
        current_prompt = prompt
        if prompt_key:
            template = self.get_prompt(prompt_key, model_id=target_model, provider=target_provider)
            try:
                # Format the template with the provided variables
                current_prompt = template.format(**(variables or {}))
            except Exception as e:
                logger.error(f"❌ Prompt formatting failed for key '{prompt_key}': {e}")
                current_prompt = template  # safety fallback
        # 3. Execution with error handling for the cascade
        try:
            if priority == "background":
                async with LLMService._background_semaphore:
                    res = await self._dispatch(
                        target_provider, current_prompt, system, force_json,
                        max_retries, base_delay, target_model,
                        json_schema, json_schema_name, strict_json_schema, target_temp
                    )
            else:
                res = await self._dispatch(
                    target_provider, current_prompt, system, force_json,
                    max_retries, base_delay, target_model,
                    json_schema, json_schema_name, strict_json_schema, target_temp
                )
            # Check for empty cloud responses (WP-25 stability)
            if not res and target_provider != "ollama":
                logger.warning(f"⚠️ Empty response from {target_provider}. Triggering fallback.")
                raise ValueError(f"Empty response from {target_provider}")
            return clean_llm_text(res) if not force_json else res
        except Exception as e:
            logger.error(f"❌ Error during execution of profile '{profile_name}' ({target_provider}): {e}")
            # 4. WP-25b cascade logic (recursive, with model-specific prompt re-loading)
            if fallback_profile and fallback_profile not in visited_profiles:
                logger.info(f"🔄 Switching to fallback profile: '{fallback_profile}'")
                return await self.generate_raw_response(
                    prompt=prompt,
                    prompt_key=prompt_key,
                    variables=variables,  # allows re-formatting for the fallback model
                    system=system, force_json=force_json,
                    max_retries=max_retries, base_delay=base_delay,
                    priority=priority, provider=None, model_override=None,
                    json_schema=json_schema, json_schema_name=json_schema_name,
                    strict_json_schema=strict_json_schema,
                    profile_name=fallback_profile,
                    visited_profiles=visited_profiles
                )
            # 5. Ultimate emergency anchor: if everything else fails, go straight to Ollama
            if target_provider != "ollama" and self.settings.LLM_FALLBACK_ENABLED:
                logger.warning("🚨 Cascade exhausted. Using the final Ollama emergency anchor.")
                res = await self._execute_ollama(current_prompt, system, force_json, max_retries, base_delay, target_temp, target_model)
                return clean_llm_text(res) if not force_json else res
            raise e
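
    # Minimal usage sketch for the lazy-prompt call path above. The prompt key
    # "summarize_chunk" and the profile name "analysis_cloud" are hypothetical and
    # only assume configs shaped like the sketches near the loaders:
    #
    #   service = LLMService()
    #   answer = await service.generate_raw_response(
    #       prompt_key="summarize_chunk",
    #       variables={"text": chunk_text},
    #       profile_name="analysis_cloud",
    #       priority="background",
    #   )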

    async def _dispatch(
        self, provider, prompt, system, force_json, max_retries, base_delay,
        model_override, json_schema, json_schema_name, strict_json_schema, temperature
    ) -> str:
        """Routes the request to the provider-specific executor."""
        rate_limit_attempts = 0
        max_rate_retries = min(max_retries, getattr(self.settings, "LLM_RATE_LIMIT_RETRIES", 3))
        wait_time = getattr(self.settings, "LLM_RATE_LIMIT_WAIT", 60.0)
        while rate_limit_attempts <= max_rate_retries:
            try:
                if provider == "openrouter" and self.openrouter_client:
                    return await self._execute_openrouter(
                        prompt=prompt, system=system, force_json=force_json,
                        model_override=model_override, json_schema=json_schema,
                        json_schema_name=json_schema_name, strict_json_schema=strict_json_schema,
                        temperature=temperature
                    )
                if provider == "gemini" and self.google_client:
                    return await self._execute_google(prompt, system, force_json, model_override, temperature)
                return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay, temperature, model_override)
            except Exception as e:
                err_str = str(e)
                if any(x in err_str for x in ["429", "RESOURCE_EXHAUSTED", "rate_limited"]):
                    rate_limit_attempts += 1
                    logger.warning(f"⏳ Rate Limit {provider}. Attempt {rate_limit_attempts}. Wait {wait_time}s.")
                    await asyncio.sleep(wait_time)
                    continue
                raise e
        # Fail explicitly once all rate-limit retries are exhausted instead of
        # falling through the loop and implicitly returning None.
        raise RuntimeError(f"Rate limit retries exhausted for provider '{provider}'")

    async def _execute_google(self, prompt, system, force_json, model_override, temperature):
        model = (model_override or self.settings.GEMINI_MODEL).replace("models/", "")
        config_kwargs = {
            "system_instruction": system,
            "response_mime_type": "application/json" if force_json else "text/plain"
        }
        if temperature is not None:
            config_kwargs["temperature"] = temperature
        config = types.GenerateContentConfig(**config_kwargs)
        response = await asyncio.wait_for(
            asyncio.to_thread(self.google_client.models.generate_content, model=model, contents=prompt, config=config),
            timeout=45.0
        )
        # response.text can be None (e.g. blocked or empty candidates); guard before strip()
        return (response.text or "").strip()

    async def _execute_openrouter(self, prompt, system, force_json, model_override, json_schema, json_schema_name, strict_json_schema, temperature) -> str:
        model = model_override or self.settings.OPENROUTER_MODEL
        logger.info(f"🛰️ OpenRouter Call: Model='{model}' | Temp={temperature}")
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})
        kwargs: Dict[str, Any] = {}
        if temperature is not None:
            kwargs["temperature"] = temperature
        if force_json:
            if json_schema:
                kwargs["response_format"] = {
                    "type": "json_schema",
                    "json_schema": {"name": json_schema_name, "strict": strict_json_schema, "schema": json_schema}
                }
            else:
                kwargs["response_format"] = {"type": "json_object"}
        response = await self.openrouter_client.chat.completions.create(model=model, messages=messages, **kwargs)
        if not response.choices:
            return ""
        return response.choices[0].message.content.strip() if response.choices[0].message.content else ""
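
    # Illustrative sketch of a json_schema value for the structured-output branch
    # above; the field names ("title", "score") are hypothetical and only show the
    # expected JSON-Schema shape passed through to OpenRouter:
    #
    #   example_schema = {
    #       "type": "object",
    #       "properties": {
    #           "title": {"type": "string"},
    #           "score": {"type": "number"},
    #       },
    #       "required": ["title", "score"],
    #       "additionalProperties": False,
    #   }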

    async def _execute_ollama(self, prompt, system, force_json, max_retries, base_delay, temperature=None, model_override=None):
        # WP-20: restored retry loop for local hardware resilience
        effective_model = model_override or self.settings.LLM_MODEL
        effective_temp = temperature if temperature is not None else (0.1 if force_json else 0.7)
        payload = {
            "model": effective_model,
            "prompt": prompt, "stream": False,
            "options": {"temperature": effective_temp, "num_ctx": 8192}
        }
        if force_json:
            payload["format"] = "json"
        if system:
            payload["system"] = system
        attempt = 0
        while True:
            try:
                res = await self.ollama_client.post("/api/generate", json=payload)
                res.raise_for_status()
                return res.json().get("response", "").strip()
            except Exception as e:
                attempt += 1
                if attempt > max_retries:
                    logger.error(f"❌ Ollama failure after {attempt} attempts: {e}")
                    raise e
                await asyncio.sleep(base_delay * (2 ** (attempt - 1)))

    async def generate_rag_response(self, query: str, context_str: Optional[str] = None) -> str:
        return await self.decision_engine.ask(query)

    async def close(self):
        if self.ollama_client:
            await self.ollama_client.aclose()