# mindnet/app/services/llm_service.py
#
# 310 lines
# 12 KiB
# Python

"""
FILE: app/services/llm_service.py
DESCRIPTION: Hybrid-Client für Ollama, Google GenAI (Gemini) und OpenRouter.
Verwaltet provider-spezifische Prompts und Background-Last.
WP-20: Optimiertes Fallback-Management zum Schutz von Cloud-Quoten.
WP-20 Fix: Bulletproof Prompt-Auflösung für format() Aufrufe.
WP-22/JSON: Optionales JSON-Schema + strict (für OpenRouter structured outputs),
OHNE Breaking Changes (neue Parameter nur am Ende).
VERSION: 3.3.3
STATUS: Active
"""
import httpx
import yaml
import logging
import asyncio
import json
from google import genai
from google.genai import types
from openai import AsyncOpenAI # Für OpenRouter (OpenAI-kompatibel)
from pathlib import Path
from typing import Optional, Dict, Any, Literal
from app.config import get_settings
logger = logging.getLogger(__name__)
class LLMService:
    """Hybrid LLM client for local Ollama, Google GenAI (Gemini) and OpenRouter.

    The service resolves provider-specific prompt templates, routes requests
    to the selected provider, throttles "background" priority requests with a
    class-wide semaphore, and (when enabled in the settings) falls back from a
    failed cloud call to the local Ollama instance.
    """

    # Class-wide semaphore that throttles "background" priority requests
    # (WP-06). It is created by the first instance and shared by all others.
    _background_semaphore = None

    def __init__(self):
        """Load settings and prompts and create the provider clients.

        The Google and OpenRouter clients are only created when the matching
        API key is present in the settings; otherwise they stay ``None``.
        """
        self.settings = get_settings()
        self.prompts = self._load_prompts()

        # Initialise the semaphore exactly once at class level.
        if LLMService._background_semaphore is None:
            limit = getattr(self.settings, "BACKGROUND_LIMIT", 2)
            logger.info(f"🚦 LLMService: Initializing Background Semaphore with limit: {limit}")
            LLMService._background_semaphore = asyncio.Semaphore(limit)

        # 1. Local Ollama client
        self.ollama_client = httpx.AsyncClient(
            base_url=self.settings.OLLAMA_URL,
            timeout=httpx.Timeout(self.settings.LLM_TIMEOUT)
        )

        # 2. Google GenAI client (modern SDK)
        self.google_client = None
        if self.settings.GOOGLE_API_KEY:
            self.google_client = genai.Client(api_key=self.settings.GOOGLE_API_KEY)
            logger.info("✨ LLMService: Google GenAI (Gemini) active.")

        # 3. OpenRouter client (OpenAI-compatible endpoint)
        self.openrouter_client = None
        if self.settings.OPENROUTER_API_KEY:
            self.openrouter_client = AsyncOpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=self.settings.OPENROUTER_API_KEY
            )
            logger.info("🛰️ LLMService: OpenRouter Integration active.")

    def _load_prompts(self) -> dict:
        """Load the prompt configuration from the YAML file.

        Returns:
            The parsed top-level mapping, or an empty dict when the file is
            missing, cannot be read/parsed, or does not contain a mapping.
        """
        path = Path(self.settings.PROMPTS_PATH)
        if not path.exists():
            logger.error(f"❌ Prompts file not found at {path}")
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                loaded = yaml.safe_load(f) or {}
        except Exception as e:
            logger.error(f"❌ Failed to load prompts: {e}")
            return {}
        # safe_load may return a list or scalar for a malformed file; every
        # consumer calls .get() on the result, so only a mapping is usable.
        if not isinstance(loaded, dict):
            logger.error(
                f"❌ Prompts file {path} must contain a mapping at top level, "
                f"got {type(loaded).__name__}"
            )
            return {}
        return loaded

    def get_prompt(self, key: str, provider: Optional[str] = None) -> str:
        """Return the provider-specific prompt template for ``key``.

        This is a pure text lookup and does not call any API.

        Lookup cascade for per-provider entries:
        selected provider -> "gemini" -> "ollama". An entry whose value is
        ``None`` (empty YAML value) is treated as missing, so the cascade
        continues instead of yielding the literal string "None".

        Args:
            key: Name of the prompt entry in the loaded prompt configuration.
            provider: Provider to resolve for; defaults to the configured
                ``MINDNET_LLM_PROVIDER``.

        Returns:
            Always a string (WP-20 fix); empty when nothing usable is found.
        """
        active_provider = provider or self.settings.MINDNET_LLM_PROVIDER
        data = self.prompts.get(key, "")

        if isinstance(data, dict):
            val = None
            for candidate in (active_provider, "gemini", "ollama"):
                val = data.get(candidate)
                if val is not None:
                    break

            # If a YAML mistake left a nested mapping here, use its first value.
            if isinstance(val, dict):
                logger.warning(f"⚠️ [LLMService] Nested dictionary detected for key '{key}'. Using first entry.")
                val = next(iter(val.values()), "") if val else ""
            return "" if val is None else str(val)

        return "" if data is None else str(data)

    async def generate_raw_response(
        self,
        prompt: str,
        system: Optional[str] = None,
        force_json: bool = False,
        max_retries: int = 2,
        base_delay: float = 2.0,
        priority: Literal["realtime", "background"] = "realtime",
        provider: Optional[str] = None,
        model_override: Optional[str] = None,
        # --- Added at the end to stay backward compatible ---
        json_schema: Optional[Dict[str, Any]] = None,
        json_schema_name: str = "mindnet_json",
        strict_json_schema: bool = True
    ) -> str:
        """Main entry point for LLM requests, with prioritisation.

        ``force_json`` is translated per provider:
          - Ollama: ``payload["format"] = "json"``
          - Gemini: ``response_mime_type="application/json"``
          - OpenRouter: ``response_format`` of type ``json_object``, or
            ``json_schema`` when ``json_schema`` is also given.

        ``json_schema``, ``json_schema_name`` and ``strict_json_schema`` are
        only used by the OpenRouter path.

        Args:
            prompt: User prompt text.
            system: Optional system instruction.
            force_json: Request JSON output from the provider.
            max_retries: Retry count for the Ollama call.
            base_delay: Base delay in seconds for the Ollama backoff.
            priority: "background" requests are serialised through the
                class-wide semaphore; "realtime" requests are not throttled.
            provider: Provider name; defaults to ``MINDNET_LLM_PROVIDER``.
            model_override: Model name used instead of the configured cloud
                model (Gemini / OpenRouter paths).
            json_schema: Optional JSON schema for structured outputs.
            json_schema_name: Name sent along with the schema.
            strict_json_schema: ``strict`` flag sent along with the schema.

        Returns:
            The stripped text response of the provider.
        """
        target_provider = provider or self.settings.MINDNET_LLM_PROVIDER
        dispatch_args = (
            target_provider,
            prompt,
            system,
            force_json,
            max_retries,
            base_delay,
            model_override,
            json_schema,
            json_schema_name,
            strict_json_schema,
        )

        if priority == "background":
            async with LLMService._background_semaphore:
                return await self._dispatch(*dispatch_args)

        return await self._dispatch(*dispatch_args)

    async def _dispatch(
        self,
        provider: str,
        prompt: str,
        system: Optional[str],
        force_json: bool,
        max_retries: int,
        base_delay: float,
        model_override: Optional[str],
        json_schema: Optional[Dict[str, Any]],
        json_schema_name: str,
        strict_json_schema: bool
    ) -> str:
        """Route the request to the concrete API provider.

        A cloud provider is only used when its client is configured; in every
        other case the request goes to Ollama directly.

        Quota protection: when a cloud call fails and ``LLM_FALLBACK_ENABLED``
        is set, the request is retried on local Ollama only - never on the
        other cloud provider. The fallback runs only if a cloud call was
        actually attempted, so a failing Ollama call is not repeated.

        Raises:
            Exception: Whatever the provider call raised, when no fallback
                applies or the fallback itself fails.
        """
        cloud_attempted = False
        try:
            if provider == "openrouter" and self.openrouter_client:
                cloud_attempted = True
                return await self._execute_openrouter(
                    prompt=prompt,
                    system=system,
                    force_json=force_json,
                    model_override=model_override,
                    json_schema=json_schema,
                    json_schema_name=json_schema_name,
                    strict_json_schema=strict_json_schema
                )

            if provider == "gemini" and self.google_client:
                cloud_attempted = True
                return await self._execute_google(prompt, system, force_json, model_override)

            # Default / unconfigured provider: go to Ollama.
            return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay)

        except Exception as e:
            if cloud_attempted and self.settings.LLM_FALLBACK_ENABLED:
                logger.warning(
                    f"🔄 Provider {provider} failed: {e}. Falling back to LOCAL OLLAMA to protect cloud quotas."
                )
                return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay)
            # Bare raise keeps the original traceback intact.
            raise

    async def _execute_google(self, prompt, system, force_json, model_override):
        """Call Gemini through the native Google GenAI SDK.

        Uses ``GEMINI_MODEL`` from the settings unless ``model_override`` is
        given.

        Raises:
            RuntimeError: If the response carries no text.
        """
        model = model_override or self.settings.GEMINI_MODEL
        config = types.GenerateContentConfig(
            system_instruction=system,
            response_mime_type="application/json" if force_json else "text/plain"
        )
        # Run the SDK call in a worker thread because it may block.
        response = await asyncio.to_thread(
            self.google_client.models.generate_content,
            model=model, contents=prompt, config=config
        )
        text = response.text
        if text is None:
            # Raise explicitly instead of failing on None.strip(); the caller's
            # fallback handling still sees an exception.
            raise RuntimeError("Gemini returned no text content")
        return text.strip()

    async def _execute_openrouter(
        self,
        prompt: str,
        system: Optional[str],
        force_json: bool,
        model_override: Optional[str],
        # --- Optional structured-output parameters ---
        json_schema: Optional[Dict[str, Any]] = None,
        json_schema_name: str = "mindnet_json",
        strict_json_schema: bool = True
    ) -> str:
        """Call OpenRouter through the OpenAI-compatible client.

        With ``force_json=True``:
          - without ``json_schema``: ``response_format={"type": "json_object"}``
          - with ``json_schema``: ``response_format`` of type ``json_schema``
            carrying the name, the ``strict`` flag and the schema.

        ``response_format`` is omitted entirely (not sent as ``None``) when
        JSON output is not requested.

        Raises:
            RuntimeError: If the response has no choices or no message content.
        """
        # Uses OPENROUTER_MODEL from the settings unless overridden.
        model = model_override or self.settings.OPENROUTER_MODEL

        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})

        kwargs: Dict[str, Any] = {}
        if force_json:
            if json_schema:
                kwargs["response_format"] = {
                    "type": "json_schema",
                    "json_schema": {
                        "name": json_schema_name,
                        "strict": strict_json_schema,
                        "schema": json_schema
                    }
                }
            else:
                kwargs["response_format"] = {"type": "json_object"}

        response = await self.openrouter_client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )

        choices = getattr(response, "choices", None)
        content = choices[0].message.content if choices else None
        if content is None:
            # Raise explicitly instead of an opaque AttributeError/IndexError.
            raise RuntimeError("OpenRouter returned no message content")
        return content.strip()

    async def _execute_ollama(self, prompt, system, force_json, max_retries, base_delay):
        """Call the local Ollama ``/api/generate`` endpoint with backoff.

        After each failure the call waits ``base_delay * 2 ** (attempt - 1)``
        seconds; once more than ``max_retries`` attempts have failed, the last
        exception is re-raised.
        """
        payload = {
            "model": self.settings.LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            "options": {
                # Lower temperature when JSON output is requested.
                "temperature": 0.1 if force_json else 0.7,
                "num_ctx": 8192
            }
        }
        if force_json:
            payload["format"] = "json"
        if system:
            payload["system"] = system

        attempt = 0
        while True:
            try:
                res = await self.ollama_client.post("/api/generate", json=payload)
                res.raise_for_status()
                return res.json().get("response", "").strip()
            except Exception as e:
                attempt += 1
                if attempt > max_retries:
                    logger.error(f"❌ Ollama Error after {attempt} retries: {e}")
                    raise
                wait_time = base_delay * (2 ** (attempt - 1))
                logger.warning(f"⚠️ Ollama attempt {attempt} failed. Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)

    async def generate_rag_response(self, query: str, context_str: str) -> str:
        """Answer ``query`` using the RAG template filled with ``context_str``.

        Resolves the "system_prompt" and "rag_template" prompts for the
        configured provider, formats the template with ``context_str`` and
        ``query``, and sends the result as a realtime request.
        """
        provider = self.settings.MINDNET_LLM_PROVIDER
        system_prompt = self.get_prompt("system_prompt", provider)
        rag_template = self.get_prompt("rag_template", provider)

        final_prompt = rag_template.format(context_str=context_str, query=query)

        return await self.generate_raw_response(
            final_prompt,
            system=system_prompt,
            priority="realtime"
        )

    async def close(self):
        """Close the HTTP connections held by the Ollama and OpenRouter clients."""
        # NOTE(review): the Google GenAI client is not closed here; confirm
        # whether the installed SDK version requires an explicit close.
        try:
            if self.ollama_client:
                await self.ollama_client.aclose()
        finally:
            # Close the OpenRouter client even if closing Ollama failed.
            if self.openrouter_client:
                await self.openrouter_client.close()