from __future__ import annotations import os, time, json import urllib.request from typing import List, Dict, Any # Backend-Auswahl: # - EMBED_BACKEND=ollama -> EMBED_URL=/api/embeddings (Ollama), EMBED_MODEL=z.B. nomic-embed-text # - EMBED_BACKEND=mini -> EMBED_URL=/embed (unser MiniLM-Server), EMBED_MODEL=minilm-384 EMBED_BACKEND = os.getenv("EMBED_BACKEND", "mini").lower() EMBED_URL = os.getenv("EMBED_URL", "http://127.0.0.1:8990/embed") EMBED_MODEL = os.getenv("EMBED_MODEL", "minilm-384") EMBED_BATCH = int(os.getenv("EMBED_BATCH", "64")) TIMEOUT = 60 class EmbedError(RuntimeError): ... def _post_json(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: data = json.dumps(payload).encode("utf-8") req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}) with urllib.request.urlopen(req, timeout=TIMEOUT) as resp: return json.loads(resp.read().decode("utf-8")) def _embed_mini(inputs: List[str], model: str, batch: int) -> List[List[float]]: out: List[List[float]] = [] i = 0 while i < len(inputs): chunk = inputs[i:i+batch] # einfache Retries for attempt in range(5): try: resp = _post_json(EMBED_URL, {"model": model, "inputs": chunk}) vecs = resp.get("embeddings") or resp.get("vectors") or resp.get("data") if not isinstance(vecs, list): raise EmbedError(f"Bad embed response keys: {list(resp.keys())}") out.extend(vecs) break except Exception: if attempt == 4: raise time.sleep(1.5 * (attempt + 1)) i += batch return out def _embed_ollama(inputs: List[str], model: str, batch: int) -> List[List[float]]: # Ollama /api/embeddings akzeptiert "input" als String ODER Array. # Die Response enthält: # - für single input: {"embedding":[...], "model":"...", ...} # - für array input: {"embeddings":[[...],[...],...], "model":"...", ...} (je nach Version) # Um maximal kompatibel zu sein, rufen wir pro Text einzeln auf. out: List[List[float]] = [] for text in inputs: # Retries for attempt in range(5): try: resp = _post_json(EMBED_URL, {"model": model, "input": text}) if "embedding" in resp and isinstance(resp["embedding"], list): out.append(resp["embedding"]) elif "embeddings" in resp and isinstance(resp["embeddings"], list): # Falls Server array zurückgibt, nimm das erste Element vecs = resp["embeddings"] out.append(vecs[0] if vecs else []) else: raise EmbedError(f"Ollama response unexpected keys: {list(resp.keys())}") break except Exception: if attempt == 4: raise time.sleep(1.5 * (attempt + 1)) return out def embed_texts(texts: List[str], model: str | None = None, batch_size: int | None = None) -> List[List[float]]: model = model or EMBED_MODEL batch = batch_size or EMBED_BATCH if not texts: return [] if EMBED_BACKEND == "ollama": return _embed_ollama(texts, model, batch) # default: mini return _embed_mini(texts, model, batch) def embed_one(text: str, model: str | None = None) -> List[float]: return embed_texts([text], model=model, batch_size=1)[0]