# Source: mindnet/app/services/embeddings_client.py (138 lines, 5.3 KiB, Python)

"""
FILE: app/services/embeddings_client.py
DESCRIPTION: Unified Embedding Client. Nutzt MoE-Profile zur Modellsteuerung.
WP-25a: Integration der llm_profiles.yaml für konsistente Vektoren.
VERSION: 2.6.0 (WP-25a: MoE & Profile Support)
STATUS: Active
DEPENDENCIES: httpx, requests, app.config, yaml
"""
from __future__ import annotations
import os
import logging
import httpx
import requests
import yaml
from pathlib import Path
from typing import List, Dict, Any
from app.config import get_settings
logger = logging.getLogger(__name__)
class EmbeddingsClient:
    """
    Async client for embeddings.

    The model and provider are taken from the 'embedding_expert' profile in
    llm_profiles.yaml; environment variables act as legacy fallbacks.
    All request methods are best-effort: on any failure they log the error
    and return an empty vector instead of raising.
    """

    def __init__(self):
        self.settings = get_settings()

        # 1. Load the MoE profile (WP-25a). Always a dict, possibly empty.
        self.profile = self._load_embedding_profile()

        # 2. Resolve model and URL.
        # Priority: llm_profiles.yaml -> .env (legacy) -> fallback.
        self.model = self.profile.get("model") or os.getenv("MINDNET_EMBEDDING_MODEL")
        provider = self.profile.get("provider", "ollama")

        if provider == "ollama":
            self.base_url = self.settings.OLLAMA_URL
        else:
            # Placeholder for future cloud embedding providers.
            self.base_url = os.getenv("MINDNET_OLLAMA_URL", "http://127.0.0.1:11434")

        if not self.model:
            # Last resort: reuse the general LLM model name.
            self.model = os.getenv("MINDNET_LLM_MODEL", "phi3:mini")
            logger.warning(f"⚠️ Kein Embedding-Modell in Profil oder .env gefunden. Fallback auf '{self.model}'.")
        else:
            logger.info(f"🧬 Embedding-Experte aktiv: Model='{self.model}' via {provider}")

    def _load_embedding_profile(self) -> Dict[str, Any]:
        """
        Load the 'embedding_expert' profile from the LLM profiles YAML file.

        Returns:
            The profile mapping, or an empty dict when the file is missing,
            cannot be parsed, or does not contain a mapping under
            profiles -> embedding_expert.
        """
        path_str = getattr(self.settings, "LLM_PROFILES_PATH", "config/llm_profiles.yaml")
        path = Path(path_str)
        if not path.exists():
            return {}

        try:
            with open(path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}
            # `or {}` covers keys that are present but null in the YAML,
            # where dict.get's default would not apply.
            profiles = data.get("profiles") or {}
            profile = profiles.get("embedding_expert") or {}
        except Exception as e:
            logger.error(f"❌ Failed to load embedding profile: {e}")
            return {}

        if not isinstance(profile, dict):
            # A scalar/list here would break the .get() lookups in __init__.
            logger.error("❌ Failed to load embedding profile: 'embedding_expert' is not a mapping")
            return {}
        return profile

    async def embed_query(self, text: str) -> List[float]:
        """Create a vector for a single search query."""
        return await self._request_embedding(text)

    async def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        Create vectors for a batch of documents.

        The texts are embedded one after another over a single shared HTTP
        client. The result has one entry per input text, in order; a text
        whose request fails (or that is blank) yields an empty list.
        """
        vectors = []
        # Longer timeout for batches (WP-20 resilience).
        async with httpx.AsyncClient(timeout=120.0) as client:
            for text in texts:
                vec = await self._request_embedding_with_client(client, text)
                vectors.append(vec)
        return vectors

    async def _request_embedding(self, text: str) -> List[float]:
        """Internal request handler for single queries (30 s timeout)."""
        async with httpx.AsyncClient(timeout=30.0) as client:
            return await self._request_embedding_with_client(client, text)

    async def _request_embedding_with_client(self, client: "httpx.AsyncClient", text: str) -> List[float]:
        """
        Perform the HTTP call against the embedding API.

        Args:
            client: An open async HTTP client used to send the POST request.
            text: The text to embed.

        Returns:
            The embedding vector from the response, or an empty list when the
            text is empty/blank, the request fails, or the response carries
            no embedding.
        """
        if not text or not text.strip():
            return []

        # Strip a trailing slash so a base URL such as "http://host:11434/"
        # does not produce ".../​/api/embeddings".
        root = str(self.base_url).rstrip("/")
        url = f"{root}/api/embeddings"

        try:
            # WP-25: currently tailored to the Ollama API structure.
            response = await client.post(url, json={"model": self.model, "prompt": text})
            response.raise_for_status()
            # `or []` also covers an explicit null embedding in the payload.
            return response.json().get("embedding") or []
        except Exception as e:
            logger.error(f"Async embedding failed (Model: {self.model}): {e}")
            return []
# ==============================================================================
# TEIL 2: SYNCHRONER FALLBACK (Unified)
# ==============================================================================
def embed_text(text: str) -> List[float]:
    """
    LEGACY/SYNC: create an embedding vector for a single text, blocking.

    Uses the same profile lookup as the async client for consistency:
    the model comes from the 'embedding_expert' profile in the LLM profiles
    YAML, then from MINDNET_EMBEDDING_MODEL, then from MINDNET_LLM_MODEL
    (default 'phi3:mini'). Only the profile's model is consulted here; the
    request always goes to settings.OLLAMA_URL.
    Replaces local sentence-transformers to avoid dimension conflicts.

    Args:
        text: The text to embed.

    Returns:
        The embedding vector, or an empty list when the text is empty/blank,
        the request fails, or the response carries no embedding.
    """
    if not text or not text.strip():
        return []

    settings = get_settings()

    # Quick profile lookup for sync mode.
    path = Path(getattr(settings, "LLM_PROFILES_PATH", "config/llm_profiles.yaml"))
    model = os.getenv("MINDNET_EMBEDDING_MODEL")
    base_url = settings.OLLAMA_URL

    if path.exists():
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f) or {}
            # `or {}` covers keys that are present but null in the YAML.
            prof = (data.get("profiles") or {}).get("embedding_expert") or {}
            if prof.get("model"):
                model = prof["model"]
        except Exception as e:
            # Profile loading stays best-effort, but a broken profile file is
            # now reported instead of being silently ignored.
            logger.error(f"❌ Failed to load embedding profile: {e}")

    if not model:
        model = os.getenv("MINDNET_LLM_MODEL", "phi3:mini")

    # Strip a trailing slash so the path never starts with a double slash.
    root = str(base_url).rstrip("/")
    url = f"{root}/api/embeddings"

    try:
        # Synchronous request via requests.
        response = requests.post(url, json={"model": model, "prompt": text}, timeout=30)
        response.raise_for_status()
        # `or []` also covers an explicit null embedding in the payload.
        return response.json().get("embedding") or []
    except Exception as e:
        logger.error(f"Sync embedding failed (Model: {model}): {e}")
        return []