from __future__ import annotations import os, time, json import urllib.request from typing import List, Dict, Any EMBED_URL = os.getenv("EMBED_URL", "http://127.0.0.1:8990/embed") EMBED_MODEL = os.getenv("EMBED_MODEL", "minilm-384") EMBED_BATCH = int(os.getenv("EMBED_BATCH", "64")) TIMEOUT = 60 class EmbedError(RuntimeError): ... def _post_json(url: str, payload: Dict[str, Any]) -> Dict[str, Any]: data = json.dumps(payload).encode("utf-8") req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}) with urllib.request.urlopen(req, timeout=TIMEOUT) as resp: return json.loads(resp.read().decode("utf-8")) def embed_texts(texts: List[str], model: str | None = None, batch_size: int | None = None) -> List[List[float]]: model = model or EMBED_MODEL batch = batch_size or EMBED_BATCH out: List[List[float]] = [] i = 0 while i < len(texts): chunk = texts[i:i+batch] # einfache Retries bei 429/500er for attempt in range(5): try: resp = _post_json(EMBED_URL, {"model": model, "inputs": chunk}) vecs = resp.get("embeddings") or resp.get("vectors") or resp.get("data") if not isinstance(vecs, list): raise EmbedError(f"Bad embed response keys: {resp.keys()}") out.extend(vecs) break except Exception as e: if attempt == 4: raise time.sleep(1.5 * (attempt + 1)) continue i += batch return out def embed_one(text: str, model: str | None = None) -> List[float]: return embed_texts([text], model=model, batch_size=1)[0]