mindnet/app/services/llm_service.py
2025-12-23 14:33:51 +01:00

185 lines
7.6 KiB
Python

"""
FILE: app/services/llm_service.py
DESCRIPTION: Hybrid-Client für Ollama & Google Gemini.
Verwaltet Prompts, Background-Last (Semaphore) und Cloud-Routing.
VERSION: 3.1.0 (WP-20 Full Integration: Provider-Aware Prompting)
STATUS: Active
DEPENDENCIES: httpx, yaml, asyncio, google-generativeai, app.config
EXTERNAL_CONFIG: config/prompts.yaml
"""
import httpx
import yaml
import logging
import os
import asyncio
import json
import google.generativeai as genai
from pathlib import Path
from typing import Optional, Dict, Any, Literal
from app.config import get_settings
logger = logging.getLogger(__name__)


class LLMService:
    """Hybrid LLM client for a local Ollama server and (optionally) Google Gemini.

    Responsibilities visible in this class:

    * load prompt templates from the YAML file at ``settings.PROMPTS_PATH``;
    * throttle ``priority="background"`` calls with one class-wide semaphore;
    * route each request to the selected provider and fall back from Gemini
      to Ollama when the cloud call fails (unless the fallback is disabled).
    """

    # Class-wide semaphore limiting concurrent background calls (WP-06 / WP-20).
    # Created by the first instance and shared by every later instance.
    _background_semaphore = None

    def __init__(self):
        """Read settings, load prompts and set up the Ollama / Gemini clients."""
        self.settings = get_settings()
        self.prompts = self._load_prompts()

        # Initialise the semaphore exactly once, at class level.
        if LLMService._background_semaphore is None:
            limit = getattr(self.settings, "BACKGROUND_LIMIT", 2)
            logger.info(f"🚦 LLMService: Initializing Background Semaphore with limit: {limit}")
            LLMService._background_semaphore = asyncio.Semaphore(limit)

        # Ollama setup
        self.timeout = httpx.Timeout(self.settings.LLM_TIMEOUT, connect=10.0)
        self.ollama_client = httpx.AsyncClient(
            base_url=self.settings.OLLAMA_URL,
            timeout=self.timeout,
        )

        # Gemini setup [WP-20]: only active when an API key is configured.
        api_key = getattr(self.settings, "GOOGLE_API_KEY", None)
        if api_key:
            genai.configure(api_key=api_key)
            model_name = getattr(self.settings, "GEMINI_MODEL", "gemini-1.5-flash")
            self.gemini_model = genai.GenerativeModel(model_name)
            logger.info(f"✨ LLMService: Gemini Cloud Mode active ({model_name})")
        else:
            self.gemini_model = None
            logger.warning("⚠️ LLMService: No GOOGLE_API_KEY found. Gemini mode disabled.")

    def _load_prompts(self) -> dict:
        """Load the prompt configuration from the YAML file.

        Returns:
            The parsed mapping, or an empty dict when the file is missing,
            unreadable, empty, or does not contain a mapping at top level.
        """
        path = Path(self.settings.PROMPTS_PATH)
        if not path.exists():
            return {}
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)
        except Exception as e:
            logger.error(f"Failed to load prompts: {e}")
            return {}
        if isinstance(data, dict):
            return data
        # safe_load yields None for an empty document and may yield a list or
        # scalar; neither supports the ``.get`` lookups done in get_prompt.
        if data is not None:
            logger.error(
                f"Failed to load prompts: expected a mapping, got {type(data).__name__}"
            )
        return {}

    def get_prompt(self, key: str, provider: Optional[str] = None) -> str:
        """Return the prompt template for ``key``, chosen by provider (WP-20).

        An entry may be a flat value (shared by all providers) or a dict with
        one template per provider name.

        Args:
            key: Name of the entry in the loaded prompt configuration.
            provider: Provider name; defaults to ``settings.MINDNET_LLM_PROVIDER``
                and finally to ``"ollama"``.

        Returns:
            The template as a string. A missing or null entry yields ``""``.
        """
        active_provider = provider or getattr(self.settings, "MINDNET_LLM_PROVIDER", "ollama")
        data = (self.prompts or {}).get(key, "")
        if isinstance(data, dict):
            # Try the provider-specific branch first, then fall back to 'ollama'.
            branch = data.get(active_provider)
            data = branch if branch is not None else data.get("ollama")
        # A YAML null must not leak out as None or as the literal text "None".
        return "" if data is None else str(data)

    async def generate_raw_response(
        self,
        prompt: str,
        system: str = None,
        force_json: bool = False,
        max_retries: int = 2,
        base_delay: float = 2.0,
        priority: Literal["realtime", "background"] = "realtime",
        provider: Optional[str] = None
    ) -> str:
        """Run one LLM call with priority handling and provider selection.

        Args:
            prompt: User prompt sent to the model.
            system: Optional system prompt.
            force_json: Request JSON output from the provider.
            max_retries: Number of retries for the Ollama call after the
                first attempt.
            base_delay: Initial back-off delay in seconds for Ollama retries.
            priority: ``"background"`` calls are serialised through the
                class-wide semaphore; ``"realtime"`` calls bypass it.
            provider: Overrides ``settings.MINDNET_LLM_PROVIDER`` when given.

        Returns:
            The stripped response text.
        """
        # Provider precedence: explicit argument > configured default.
        target_provider = provider or getattr(self.settings, "MINDNET_LLM_PROVIDER", "ollama")
        call_args = (target_provider, prompt, system, force_json, max_retries, base_delay)

        semaphore = LLMService._background_semaphore
        if priority == "background" and semaphore is not None:
            async with semaphore:
                return await self._dispatch_request(*call_args)
        return await self._dispatch_request(*call_args)

    async def _dispatch_request(self, provider, prompt, system, force_json, max_retries, base_delay):
        """Route the request to the chosen provider, with Gemini -> Ollama fallback.

        Gemini is used only when it was requested AND a Gemini model is
        configured; every other case goes straight to Ollama. The fallback is
        taken only after Gemini was actually attempted, so an Ollama failure
        is never retried a second time or reported as a Gemini failure.

        Raises:
            Exception: The Gemini error when the fallback is disabled via
                ``settings.LLM_FALLBACK_ENABLED``, otherwise whatever the
                Ollama call raises.
        """
        if provider == "gemini" and self.gemini_model is not None:
            try:
                return await self._execute_gemini(prompt, system, force_json)
            except Exception as e:
                # Automatic fallback to Ollama on cloud errors (WP-20).
                if not getattr(self.settings, "LLM_FALLBACK_ENABLED", True):
                    raise
                logger.warning(f"🔄 Gemini failed: {e}. Falling back to Ollama.")
        return await self._execute_ollama(prompt, system, force_json, max_retries, base_delay)

    async def _execute_gemini(self, prompt, system, force_json) -> str:
        """Perform an asynchronous Google Gemini call (WP-20).

        The system prompt, when given, is prepended to the user prompt in a
        single text ("System: ...\\n\\nUser: ...").
        """
        full_prompt = f"System: {system}\n\nUser: {prompt}" if system else prompt

        # Gemini JSON mode support
        gen_config = {}
        if force_json:
            gen_config["response_mime_type"] = "application/json"

        response = await self.gemini_model.generate_content_async(
            full_prompt,
            generation_config=gen_config
        )
        return response.text.strip()

    async def _execute_ollama(self, prompt, system, force_json, max_retries, base_delay) -> str:
        """Call Ollama's ``/api/generate`` with exponential back-off retries.

        Any exception (transport error, HTTP error status, undecodable body)
        counts as a failed attempt. After ``max_retries`` retries the last
        exception is re-raised.

        Returns:
            The stripped ``"response"`` field of the JSON body (``""`` when the
            field is missing or null).
        """
        payload: Dict[str, Any] = {
            "model": self.settings.LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": 0.1 if force_json else 0.7,
                "num_ctx": 8192
            }
        }
        if force_json:
            payload["format"] = "json"
        if system:
            payload["system"] = system

        attempt = 0
        while True:
            try:
                response = await self.ollama_client.post("/api/generate", json=payload)
                # Raise on error statuses, then always return or fail below.
                # Previously a non-200 success status fell through and the
                # loop spun forever without counting an attempt or sleeping.
                response.raise_for_status()
                return (response.json().get("response") or "").strip()
            except Exception as e:
                attempt += 1
                if attempt > max_retries:
                    logger.error(f"Ollama Error after {attempt} attempts: {e}")
                    raise
                # Exponential back-off: base_delay * (2 ^ (attempt - 1))
                wait_time = base_delay * (2 ** (attempt - 1))
                logger.warning(f"⚠️ Ollama attempt {attempt} failed. Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)

    async def generate_rag_response(self, query: str, context_str: str) -> str:
        """Standard RAG chat interface using provider-specific templates.

        Looks up the ``system_prompt`` and ``rag_template`` entries for the
        configured provider, fills the template with ``context_str`` and
        ``query`` and issues a realtime call.
        """
        provider = getattr(self.settings, "MINDNET_LLM_PROVIDER", "ollama")

        system_prompt = self.get_prompt("system_prompt", provider)
        rag_template = self.get_prompt("rag_template", provider)

        # Minimal built-in template when none is configured.
        if not rag_template:
            rag_template = "{context_str}\n\n{query}"

        final_prompt = rag_template.format(context_str=context_str, query=query)
        return await self.generate_raw_response(
            final_prompt,
            system=system_prompt,
            priority="realtime"
        )

    async def close(self):
        """Close the open HTTP connections of the Ollama client."""
        if self.ollama_client:
            await self.ollama_client.aclose()