mindnet/app/core/qdrant.py
Lars 10479014d5
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
app/core/qdrant.py aktualisiert
2025-11-08 11:45:59 +01:00

230 lines
7.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Modul: app.core.qdrant
Version: 1.8.0
Datum: 2025-11-08
Zweck
-----
Zentrale Qdrant-Hilfen (Config, Client, Collections, Zähl- & Listenfunktionen).
Diese Version ergänzt:
• QdrantConfig.from_env(prefix: Optional[str]) -> erwartet von import_markdown v3.9.x
• list_note_ids(), fetch_one_note() -> erwartet von import_markdown v3.9.x
• count_points() -> konsolidierte Zählwerte
Abwärtskompatibilität
---------------------
• Bestehende Funktionen/Signaturen bleiben erhalten.
• Neue Funktionen sind additive Erweiterungen.
• Nutzt Env-Variablen:
COLLECTION_PREFIX (bevorzugt für Collection-Präfix)
MINDNET_PREFIX (Legacy-Fallback)
QDRANT_HOST, QDRANT_PORT, QDRANT_API_KEY
Wichtig: Diese Datei legt KEINE Collections neu an (Schemafragen bleiben unangetastet),
sondern stellt nur ensure_collections(...) bereit, das eine vorhandene Anlage respektiert.
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
try:
from qdrant_client import QdrantClient
from qdrant_client.conversions.conversion import payload_to_grpc
from qdrant_client.http import models as rest
except Exception as e: # pragma: no cover
raise RuntimeError(f"qdrant_client not available: {e}")
# ---------------------------------------------------------------------------
# Konfiguration
# ---------------------------------------------------------------------------
@dataclass
class QdrantConfig:
host: str
port: int
api_key: Optional[str]
prefix: str
notes: str
chunks: str
edges: str
@staticmethod
def from_env(prefix: Optional[str] = None) -> "QdrantConfig":
"""Erzeuge Config aus ENV; optional extern gesetztes prefix überschreibt ENV.
Präfix-Priorität:
1) Funktionsargument `prefix` (falls gesetzt & nicht leer)
2) ENV COLLECTION_PREFIX
3) ENV MINDNET_PREFIX
4) Default "mindnet"
"""
host = os.environ.get("QDRANT_HOST", "localhost").strip() or "localhost"
port_s = os.environ.get("QDRANT_PORT", "6333").strip()
api_key = os.environ.get("QDRANT_API_KEY", "").strip() or None
env_prefix = (os.environ.get("COLLECTION_PREFIX", "") or os.environ.get("MINDNET_PREFIX", "")).strip()
use_prefix = (prefix or env_prefix or "mindnet").strip()
return QdrantConfig(
host=host,
port=int(port_s) if port_s.isdigit() else 6333,
api_key=api_key,
prefix=use_prefix,
notes=f"{use_prefix}_notes",
chunks=f"{use_prefix}_chunks",
edges=f"{use_prefix}_edges",
)
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
def get_client(cfg: QdrantConfig) -> QdrantClient:
"""Erzeuge QdrantClient gemäß Konfiguration."""
return QdrantClient(host=cfg.host, port=cfg.port, api_key=cfg.api_key)
# ---------------------------------------------------------------------------
# Collections sicherstellen (ohne Schemazwang)
# ---------------------------------------------------------------------------
def _collection_exists(client: QdrantClient, name: str) -> bool:
try:
_ = client.get_collection(name)
return True
except Exception:
return False
def ensure_collections(client: QdrantClient, cfg: QdrantConfig) -> None:
"""
Stellt sicher, dass die drei Collections existieren.
Diese Funktion erzwingt KEIN bestimmtes Schema. Falls Collections fehlen,
wird eine minimal valide Anlage mit Default-Vektordefinition (1-Dummy)
nur für den Notfall versucht. In existierenden Umgebungen greift das nicht.
"""
# Falls vorhanden: nichts tun.
for name in (cfg.notes, cfg.chunks, cfg.edges):
if _collection_exists(client, name):
continue
# Minimal-Anlage: vektorlos, falls Server dies unterstützt; sonst 1D-Vector.
# Wir versuchen zuerst vektorlos (neuere Qdrant-Versionen erlauben "vectors=None").
try:
client.recreate_collection(
collection_name=name,
vectors_config=None, # type: ignore[arg-type]
)
continue
except Exception:
pass
# Fallback: 1D-Vector
try:
client.recreate_collection(
collection_name=name,
vectors_config=rest.VectorParams(size=1, distance=rest.Distance.COSINE),
)
except Exception as e: # pragma: no cover
raise RuntimeError(f"Failed to create collection '{name}': {e}")
# ---------------------------------------------------------------------------
# Zähl- & Hilfsfunktionen
# ---------------------------------------------------------------------------
def count_points(client: QdrantClient, cfg: QdrantConfig) -> Dict[str, int]:
"""Zähle Punkte in allen Collections (exact=True)."""
res = {}
for name, key in ((cfg.notes, "notes"), (cfg.chunks, "chunks"), (cfg.edges, "edges")):
try:
c = client.count(name, exact=True)
res[key] = int(c.count) # type: ignore[attr-defined]
except Exception:
# Fallback, falls count nicht verfügbar ist:
try:
pts, _ = client.scroll(name, limit=1)
# Wenn scroll funktioniert, holen wir via get_collection die config/points_count
meta = client.get_collection(name)
# qdrant_client >=1.7 liefert ggf. points_count im Status:
points_count = getattr(meta, "points_count", None)
if isinstance(points_count, int):
res[key] = points_count
else:
# Worst case: scrollen wir "grob" (vermeiden wir hier aus Performancegründen)
res[key] = 0
except Exception:
res[key] = 0
return res
def list_note_ids(client: QdrantClient, collection: str, batch: int = 2048) -> List[str]:
"""
Liefert alle note_id-Werte aus einer Collection, die Notes speichert.
Greift die Payload-Felder 'note_id' bzw. 'id' auf (falls ersteres fehlt).
"""
out: List[str] = []
next_page: Optional[List[int]] = None # offset
while True:
pts, next_page = client.scroll(
collection_name=collection,
with_payload=True,
limit=batch,
offset=next_page,
)
if not pts:
break
for p in pts:
pl = p.payload or {}
nid = pl.get("note_id") or pl.get("id")
if isinstance(nid, str):
out.append(nid)
if not next_page:
break
return out
def fetch_one_note(client: QdrantClient, cfg: QdrantConfig, note_id: str) -> Optional[Dict]:
"""
Holt genau eine Note-Payload anhand note_id (oder id).
Gibt Payload-Dict zurück oder None.
"""
flt = rest.Filter(
must=[
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
]
)
try:
pts = client.scroll(
collection_name=cfg.notes,
with_payload=True,
scroll_filter=flt,
limit=1,
)[0]
if pts:
pl = pts[0].payload or {}
return dict(pl)
except Exception:
# Fallback: versuchen mit 'id'
flt2 = rest.Filter(
must=[rest.FieldCondition(key="id", match=rest.MatchValue(value=note_id))]
)
try:
pts = client.scroll(
collection_name=cfg.notes,
with_payload=True,
scroll_filter=flt2,
limit=1,
)[0]
if pts:
pl = pts[0].payload or {}
return dict(pl)
except Exception:
return None
return None