"""
|
|
FILE: app/services/llm_service.py
|
|
DESCRIPTION: Hybrid-Client für Ollama, Google GenAI (Gemini) und OpenRouter.
|
|
Verwaltet provider-spezifische Prompts und Background-Last.
|
|
WP-20: Optimiertes Fallback-Management zum Schutz von Cloud-Quoten.
|
|
WP-20 Fix: Bulletproof Prompt-Auflösung für format() Aufrufe.
|
|
WP-22/JSON: Optionales JSON-Schema + strict (für OpenRouter structured outputs).
|
|
FIX: Intelligente Rate-Limit Erkennung (429 Handling), v1-API Sync & Timeouts.
|
|
VERSION: 3.3.9
|
|
STATUS: Active
|
|
FIX:
|
|
- Importiert clean_llm_text von app.core.registry zur Vermeidung von Circular Imports.
|
|
- Wendet clean_llm_text auf Text-Antworten in generate_raw_response an.
|
|
"""
|
|
import httpx
import yaml
import logging
import asyncio
import json
from google import genai
from google.genai import types
from openai import AsyncOpenAI  # for OpenRouter (OpenAI-compatible)
from pathlib import Path
from typing import Optional, Dict, Any, Literal
from app.config import get_settings

# CRITICAL FIX: import of the neutral cleanup logic (WP-14)
from app.core.registry import clean_llm_text

logger = logging.getLogger(__name__)


class LLMService:
    # GLOBAL SEMAPHORE for background-load throttling (WP-06)
    _background_semaphore = None

    def __init__(self):
        self.settings = get_settings()
        self.prompts = self._load_prompts()

        # Initialize the semaphore once at class level
        if LLMService._background_semaphore is None:
            limit = getattr(self.settings, "BACKGROUND_LIMIT", 2)
            logger.info(f"🚦 LLMService: Initializing Background Semaphore with limit: {limit}")
            LLMService._background_semaphore = asyncio.Semaphore(limit)

        # 1. Local Ollama client
        self.ollama_client = httpx.AsyncClient(
            base_url=self.settings.OLLAMA_URL,
            timeout=httpx.Timeout(self.settings.LLM_TIMEOUT)
        )

        # 2. Google GenAI client (modern SDK)
        self.google_client = None
        if self.settings.GOOGLE_API_KEY:
            # FIX: force api_version 'v1' for better stability with the 2.5 models.
            self.google_client = genai.Client(
                api_key=self.settings.GOOGLE_API_KEY,
                http_options={'api_version': 'v1'}
            )
            logger.info("✨ LLMService: Google GenAI (Gemini) active.")

        # 3. OpenRouter client
        self.openrouter_client = None
        if self.settings.OPENROUTER_API_KEY:
            self.openrouter_client = AsyncOpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=self.settings.OPENROUTER_API_KEY,
                # Strict timeout for the OpenRouter free tier to avoid hangs.
                timeout=45.0
            )
            logger.info("🛰️ LLMService: OpenRouter Integration active.")
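
    # Illustrative sketch (not part of the original file; prompts are made up):
    # how the class-level semaphore throttles background work. With the default
    # BACKGROUND_LIMIT=2, only two of these calls run concurrently; the rest wait
    # on _background_semaphore, while priority="realtime" calls bypass it entirely.
    #
    #   service = LLMService()
    #   tasks = [
    #       service.generate_raw_response(p, priority="background")
    #       for p in ["summarize note A", "summarize note B", "summarize note C"]
    #   ]
    #   results = await asyncio.gather(*tasks)
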
    def _load_prompts(self) -> dict:
        """Loads the prompt configuration from the YAML file."""
        path = Path(self.settings.PROMPTS_PATH)
        if not path.exists():
            logger.error(f"❌ Prompts file not found at {path}")
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                return yaml.safe_load(f) or {}
        except Exception as e:
            logger.error(f"❌ Failed to load prompts: {e}")
            return {}

    def get_prompt(self, key: str, provider: Optional[str] = None) -> str:
        """
        Fetch a provider-specific template with an intelligent text cascade.
        NOTE: This is only a text lookup and does not consume any API quota.
        Cascade: selected provider -> Gemini (cloud style) -> Ollama (base style).
        """
        active_provider = provider or self.settings.MINDNET_LLM_PROVIDER
        data = self.prompts.get(key, "")

        if isinstance(data, dict):
            # Try the active provider first, then Gemini, then Ollama
            val = data.get(active_provider, data.get("gemini", data.get("ollama", "")))

            # If val is still a dict due to a YAML mistake, extract the first string
            if isinstance(val, dict):
                logger.warning(f"⚠️ [LLMService] Nested dictionary detected for key '{key}'. Using first entry.")
                val = next(iter(val.values()), "") if val else ""
            return str(val)

        return str(data)
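
    # Illustrative sketch (hypothetical prompts.yaml content, assumed for the example):
    # how the text cascade above resolves a provider-specific template.
    #
    #   self.prompts = {"rag_template": {"gemini": "G-style ...", "ollama": "O-style ..."}}
    #   get_prompt("rag_template", provider="gemini")      -> "G-style ..."
    #   get_prompt("rag_template", provider="openrouter")  -> "G-style ..."  (no key, falls back to gemini)
    #   get_prompt("rag_template", provider="ollama")      -> "O-style ..."
    #   get_prompt("missing_key")                          -> ""             (empty default)
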
    async def generate_raw_response(
        self,
        prompt: str,
        system: Optional[str] = None,
        force_json: bool = False,
        max_retries: int = 2,
        base_delay: float = 2.0,
        priority: Literal["realtime", "background"] = "realtime",
        provider: Optional[str] = None,
        model_override: Optional[str] = None,
        json_schema: Optional[Dict[str, Any]] = None,
        json_schema_name: str = "mindnet_json",
        strict_json_schema: bool = True
    ) -> str:
        """
        Main entry point for LLM requests with prioritisation.
        Applies the cleanup to text responses.
        """
        target_provider = provider or self.settings.MINDNET_LLM_PROVIDER

        if priority == "background":
            async with LLMService._background_semaphore:
                res = await self._dispatch(
                    target_provider, prompt, system, force_json,
                    max_retries, base_delay, model_override,
                    json_schema, json_schema_name, strict_json_schema
                )
                # WP-14 fix: clean text responses before returning
                return clean_llm_text(res) if not force_json else res

        res = await self._dispatch(
            target_provider, prompt, system, force_json,
            max_retries, base_delay, model_override,
            json_schema, json_schema_name, strict_json_schema
        )
        # WP-14 fix: clean text responses before returning
        return clean_llm_text(res) if not force_json else res
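
    # Illustrative sketch (schema and names are assumptions; real schemas live with
    # the callers): requesting OpenRouter structured output through this entry point.
    # With provider="openrouter", force_json=True and a json_schema, _execute_openrouter
    # sends response_format={"type": "json_schema", ...}; without a schema it falls
    # back to {"type": "json_object"}.
    #
    #   schema = {
    #       "type": "object",
    #       "properties": {
    #           "title": {"type": "string"},
    #           "tags": {"type": "array", "items": {"type": "string"}},
    #       },
    #       "required": ["title", "tags"],
    #       "additionalProperties": False,
    #   }
    #   raw = await service.generate_raw_response(
    #       prompt="Extract title and tags from the note.",
    #       force_json=True,
    #       provider="openrouter",
    #       json_schema=schema,
    #       json_schema_name="note_meta",
    #   )
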
    async def _dispatch(
        self,
        provider: str,
        prompt: str,
        system: Optional[str],
        force_json: bool,
        max_retries: int,
        base_delay: float,
        model_override: Optional[str],
        json_schema: Optional[Dict[str, Any]],
        json_schema_name: str,
        strict_json_schema: bool
    ) -> str:
        """
        Routes the request with intelligent rate-limit detection.
        Uses max_retries to bound the rate-limit loop.
        """
        rate_limit_attempts = 0
        # FIX: use max_retries as the cap for rate-limit attempts when it is explicitly set low (e.g. chat)
        max_rate_retries = min(max_retries, getattr(self.settings, "LLM_RATE_LIMIT_RETRIES", 3))
        wait_time = getattr(self.settings, "LLM_RATE_LIMIT_WAIT", 60.0)

        while rate_limit_attempts <= max_rate_retries:
            try:
                if provider == "openrouter" and self.openrouter_client:
                    return await self._execute_openrouter(
                        prompt=prompt,
                        system=system,
                        force_json=force_json,
                        model_override=model_override,
                        json_schema=json_schema,
                        json_schema_name=json_schema_name,
                        strict_json_schema=strict_json_schema
                    )

                if provider == "gemini" and self.google_client:
                    return await self._execute_google(prompt, system, force_json, model_override)

                # Default/fallback to Ollama
                return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay)

            except Exception as e:
                err_str = str(e)
                # Intelligent 429 detection
                is_rate_limit = any(x in err_str for x in ["429", "RESOURCE_EXHAUSTED", "rate_limited", "Too Many Requests"])

                if is_rate_limit and rate_limit_attempts < max_rate_retries:
                    rate_limit_attempts += 1
                    logger.warning(
                        f"⏳ [LLMService] Rate Limit detected from {provider}. "
                        f"Attempt {rate_limit_attempts}/{max_rate_retries}. Waiting {wait_time}s..."
                    )
                    await asyncio.sleep(wait_time)
                    continue

                # No rate limit, or retries exhausted -> fall back to Ollama (if enabled)
                if self.settings.LLM_FALLBACK_ENABLED and provider != "ollama":
                    logger.warning(
                        f"🔄 Provider {provider} failed ({err_str}). Falling back to LOCAL OLLAMA."
                    )
                    return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay)
                raise e
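
    # Worked example of the retry budget above (assuming the defaults shown):
    # a chat call with max_retries=2 and LLM_RATE_LIMIT_RETRIES=3 gives
    # max_rate_retries = min(2, 3) = 2, i.e. at most two 60s waits (LLM_RATE_LIMIT_WAIT)
    # on 429s before the request falls back to Ollama (if LLM_FALLBACK_ENABLED) or re-raises.
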
    async def _execute_google(self, prompt, system, force_json, model_override):
        """Native Google SDK integration (Gemini) with the v1 fix."""
        model = model_override or self.settings.GEMINI_MODEL
        clean_model = model.replace("models/", "")

        config = types.GenerateContentConfig(
            system_instruction=system,
            response_mime_type="application/json" if force_json else "text/plain"
        )
        response = await asyncio.wait_for(
            asyncio.to_thread(
                self.google_client.models.generate_content,
                model=clean_model, contents=prompt, config=config
            ),
            timeout=45.0
        )
        # response.text can be None for blocked/empty candidates; guard before strip()
        return (response.text or "").strip()
    async def _execute_openrouter(
        self,
        prompt: str,
        system: Optional[str],
        force_json: bool,
        model_override: Optional[str],
        json_schema: Optional[Dict[str, Any]] = None,
        json_schema_name: str = "mindnet_json",
        strict_json_schema: bool = True
    ) -> str:
        """OpenRouter API integration (OpenAI-compatible)."""
        model = model_override or self.settings.OPENROUTER_MODEL
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})

        kwargs: Dict[str, Any] = {}
        if force_json:
            if json_schema:
                kwargs["response_format"] = {
                    "type": "json_schema",
                    "json_schema": {
                        "name": json_schema_name,
                        "strict": strict_json_schema,
                        "schema": json_schema
                    }
                }
            else:
                kwargs["response_format"] = {"type": "json_object"}

        response = await self.openrouter_client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )
        # message.content can be None (e.g. refusals); guard before strip()
        return (response.choices[0].message.content or "").strip()
    async def _execute_ollama(self, prompt, system, force_json, max_retries, base_delay):
        """Local Ollama call with a strict retry limit."""
        payload = {
            "model": self.settings.LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": 0.1 if force_json else 0.7,
                "num_ctx": 8192  # Capped for stability (WP-20)
            }
        }
        if force_json:
            payload["format"] = "json"
        if system:
            payload["system"] = system

        attempt = 0
        while True:
            try:
                res = await self.ollama_client.post("/api/generate", json=payload)
                res.raise_for_status()
                return res.json().get("response", "").strip()
            except Exception as e:
                attempt += 1
                # IMPORTANT: with max_retries=0 (chat), this aborts right after the 1st attempt (attempt=1).
                if attempt > max_retries:
                    logger.error(f"❌ Ollama request failed after {attempt} attempt(s): {e}")
                    raise e

                wait_time = base_delay * (2 ** (attempt - 1))
                logger.warning(f"⚠️ Ollama attempt {attempt} failed. Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
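
    # Worked example of the exponential backoff above (assuming base_delay=2.0):
    # attempt 1 fails -> wait 2s, attempt 2 fails -> wait 4s, attempt 3 fails with
    # max_retries=2 -> the error is re-raised. With max_retries=0 (chat path) the
    # first failure is raised immediately, with no retry.
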
    async def generate_rag_response(self, query: str, context_str: str) -> str:
        """Complete RAG chat interface."""
        provider = self.settings.MINDNET_LLM_PROVIDER
        system_prompt = self.get_prompt("system_prompt", provider)
        rag_template = self.get_prompt("rag_template", provider)

        final_prompt = rag_template.format(context_str=context_str, query=query)

        # RAG calls in the chat now use max_retries=2 by default (overridable).
        # Going through generate_raw_response applies the cleanup automatically.
        return await self.generate_raw_response(
            final_prompt,
            system=system_prompt,
            priority="realtime"
        )
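
    # Illustrative sketch (hypothetical prompts.yaml entry; the real templates are
    # maintained in the file at PROMPTS_PATH): the rag_template must expose the
    # {context_str} and {query} placeholders consumed by the .format() call above.
    #
    #   rag_template:
    #     gemini: |
    #       Answer the question using only the context below.
    #       Context:
    #       {context_str}
    #       Question: {query}
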
    async def close(self):
        """Closes the HTTP connections."""
        if self.ollama_client:
            await self.ollama_client.aclose()
        # Also release the OpenRouter client's HTTP resources if it was configured.
        if self.openrouter_client:
            await self.openrouter_client.close()