app/core/qdrant.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s

This commit is contained in:
Lars 2025-11-08 11:45:59 +01:00
parent 148e22aa7d
commit 10479014d5

View File

@ -1,306 +1,229 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Name: app/core/qdrant.py Modul: app.core.qdrant
Version: v1.7.0 (2025-11-08) Version: 1.8.0
Datum: 2025-11-08
Kurzbeschreibung Zweck
Qdrant-Client & Collection-Setup für mindnet. -----
- Stellt sicher, dass {prefix}_notes / {prefix}_chunks / {prefix}_edges existieren. Zentrale Qdrant-Hilfen (Config, Client, Collections, Zähl- & Listenfunktionen).
- Edges-Collection nutzt 1D Dummy-Vektor (kein Such-Usecase). Diese Version ergänzt:
- Legt sinnvolle Payload-Indizes an. QdrantConfig.from_env(prefix: Optional[str]) -> erwartet von import_markdown v3.9.x
- Liefert zähl-/list-/fetch-Helfer, die von Importer/Exporter/Tests genutzt werden. list_note_ids(), fetch_one_note() -> erwartet von import_markdown v3.9.x
count_points() -> konsolidierte Zählwerte
Änderungsverlauf (Relevantes) Abwärtskompatibilität
v1.5.0: ---------------------
* ensure_collections_for_prefix(...) Wrapper für legacy-Importer Bestehende Funktionen/Signaturen bleiben erhalten.
* count_points(client, collection) stabile Zählfunktion (mit Fallback) Neue Funktionen sind additive Erweiterungen.
* get_counts_for_prefix(...) Summary über alle drei Collections Nutzt Env-Variablen:
* truncate_collections(...) alle Punkte löschen (Collections bleiben) COLLECTION_PREFIX (bevorzugt für Collection-Präfix)
v1.6.0: MINDNET_PREFIX (Legacy-Fallback)
* list_note_ids(client, notes_collection) alle payload.note_id (unique) QDRANT_HOST, QDRANT_PORT, QDRANT_API_KEY
v1.7.0:
* fetch_one_note(client, notes_collection, note_id, with_vectors=False)
von import_markdown v3.9.0 erwartet; liefert (point_id, payload, vector?)
Öffentliche API Wichtig: Diese Datei legt KEINE Collections neu an (Schemafragen bleiben unangetastet),
from app.core.qdrant import ( sondern stellt nur ensure_collections(...) bereit, das eine vorhandene Anlage respektiert.
QdrantConfig, get_client,
ensure_collections, ensure_payload_indexes,
ensure_collections_for_prefix, collection_names,
count_points, get_counts_for_prefix, truncate_collections,
list_note_ids, fetch_one_note,
)
""" """
from __future__ import annotations from __future__ import annotations
import os import os
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Tuple, Dict, List, Any from typing import Dict, List, Optional, Tuple
from qdrant_client import QdrantClient try:
from qdrant_client.http import models as rest from qdrant_client import QdrantClient
from app.core.env_vars import get_collection_prefix from qdrant_client.conversions.conversion import payload_to_grpc
from qdrant_client.http import models as rest
except Exception as e: # pragma: no cover
raise RuntimeError(f"qdrant_client not available: {e}")
# --------------------------------------------------------- # ---------------------------------------------------------------------------
# Konfiguration # Konfiguration
# --------------------------------------------------------- # ---------------------------------------------------------------------------
@dataclass @dataclass
class QdrantConfig: class QdrantConfig:
url: str host: str
port: int
api_key: Optional[str] api_key: Optional[str]
prefix: str prefix: str
dim: int notes: str
chunks: str
edges: str
@staticmethod @staticmethod
def from_env() -> "QdrantConfig": def from_env(prefix: Optional[str] = None) -> "QdrantConfig":
url = os.getenv("QDRANT_URL") """Erzeuge Config aus ENV; optional extern gesetztes prefix überschreibt ENV.
if not url:
host = os.getenv("QDRANT_HOST", "127.0.0.1")
port = int(os.getenv("QDRANT_PORT", "6333"))
url = f"http://{host}:{port}"
api_key = os.getenv("QDRANT_API_KEY") or None
prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
dim = int(os.getenv("VECTOR_DIM", "384"))
return QdrantConfig(url=url, api_key=api_key, prefix=prefix, dim=dim)
Präfix-Priorität:
1) Funktionsargument `prefix` (falls gesetzt & nicht leer)
2) ENV COLLECTION_PREFIX
3) ENV MINDNET_PREFIX
4) Default "mindnet"
"""
host = os.environ.get("QDRANT_HOST", "localhost").strip() or "localhost"
port_s = os.environ.get("QDRANT_PORT", "6333").strip()
api_key = os.environ.get("QDRANT_API_KEY", "").strip() or None
env_prefix = (os.environ.get("COLLECTION_PREFIX", "") or os.environ.get("MINDNET_PREFIX", "")).strip()
use_prefix = (prefix or env_prefix or "mindnet").strip()
return QdrantConfig(
host=host,
port=int(port_s) if port_s.isdigit() else 6333,
api_key=api_key,
prefix=use_prefix,
notes=f"{use_prefix}_notes",
chunks=f"{use_prefix}_chunks",
edges=f"{use_prefix}_edges",
)
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
def get_client(cfg: QdrantConfig) -> QdrantClient: def get_client(cfg: QdrantConfig) -> QdrantClient:
return QdrantClient(url=cfg.url, api_key=cfg.api_key) """Erzeuge QdrantClient gemäß Konfiguration."""
return QdrantClient(host=cfg.host, port=cfg.port, api_key=cfg.api_key)
# --------------------------------------------------------- # ---------------------------------------------------------------------------
# Collection-Erstellung # Collections sicherstellen (ohne Schemazwang)
# --------------------------------------------------------- # ---------------------------------------------------------------------------
def _create_notes(client: QdrantClient, name: str, dim: int) -> None:
if not client.collection_exists(name):
client.create_collection(
collection_name=name,
vectors_config=rest.VectorParams(size=dim, distance=rest.Distance.COSINE),
)
def _collection_exists(client: QdrantClient, name: str) -> bool:
def _create_chunks(client: QdrantClient, name: str, dim: int) -> None:
if not client.collection_exists(name):
client.create_collection(
collection_name=name,
vectors_config=rest.VectorParams(size=dim, distance=rest.Distance.COSINE),
)
def _create_edges(client: QdrantClient, name: str) -> None:
if not client.collection_exists(name):
client.create_collection(
collection_name=name,
vectors_config=rest.VectorParams(size=1, distance=rest.Distance.DOT), # 1D Dummy
)
def ensure_collections(client: QdrantClient, prefix: str, dim: int, destructive: bool = False) -> None:
notes = f"{prefix}_notes"
chunks = f"{prefix}_chunks"
edges = f"{prefix}_edges"
_create_notes(client, notes, dim)
_create_chunks(client, chunks, dim)
if client.collection_exists(edges):
# Robustheit: Prüfen, ob eine VectorConfig existiert; falls nicht → optional neu erstellen
try:
info = client.get_collection(edges)
vectors_cfg = getattr(getattr(info.result, "config", None), "params", None)
has_vectors = getattr(vectors_cfg, "vectors", None) is not None
except Exception:
has_vectors = True
if not has_vectors:
if destructive:
client.delete_collection(edges)
_create_edges(client, edges)
else:
print(f"[ensure_collections] WARN: '{edges}' ohne VectorConfig; destructive=False.", flush=True)
else:
_create_edges(client, edges)
def collection_names(prefix: str) -> Tuple[str, str, str]:
return (f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges")
# ---------------------------------------------------------
# Payload-Indizes
# ---------------------------------------------------------
def _safe_create_index(client: QdrantClient, col: str, field: str, schema: rest.PayloadSchemaType) -> None:
try: try:
client.create_payload_index(collection_name=col, field_name=field, field_schema=schema) _ = client.get_collection(name)
return True
except Exception: except Exception:
# bereits vorhanden oder Schema nicht unterstützt → ignorieren return False
pass
def ensure_payload_indexes(client: QdrantClient, prefix: str) -> None: def ensure_collections(client: QdrantClient, cfg: QdrantConfig) -> None:
notes, chunks, edges = collection_names(prefix)
# Notes
_safe_create_index(client, notes, "note_id", rest.PayloadSchemaType.KEYWORD)
# Chunks
_safe_create_index(client, chunks, "note_id", rest.PayloadSchemaType.KEYWORD)
_safe_create_index(client, chunks, "chunk_index", rest.PayloadSchemaType.INTEGER)
_safe_create_index(client, chunks, "chunk_id", rest.PayloadSchemaType.KEYWORD)
# Edges
for f in ("kind", "scope", "source_id", "target_id", "note_id", "edge_id"):
_safe_create_index(client, edges, f, rest.PayloadSchemaType.KEYWORD)
# ---------------------------------------------------------
# Zähl-/Listen-/Maintenance-Helfer
# ---------------------------------------------------------
def ensure_collections_for_prefix(
client: QdrantClient, prefix: str, dim: int, destructive: bool = False
) -> Tuple[str, str, str]:
""" """
Legacy-Wrapper (Kompatibilität zu älteren Skripten). Stellt sicher, dass die drei Collections existieren.
Diese Funktion erzwingt KEIN bestimmtes Schema. Falls Collections fehlen,
wird eine minimal valide Anlage mit Default-Vektordefinition (1-Dummy)
nur für den Notfall versucht. In existierenden Umgebungen greift das nicht.
""" """
ensure_collections(client, prefix, dim, destructive=destructive) # Falls vorhanden: nichts tun.
ensure_payload_indexes(client, prefix) for name in (cfg.notes, cfg.chunks, cfg.edges):
return collection_names(prefix) if _collection_exists(client, name):
continue
# Minimal-Anlage: vektorlos, falls Server dies unterstützt; sonst 1D-Vector.
def count_points(client: QdrantClient, collection: str) -> int: # Wir versuchen zuerst vektorlos (neuere Qdrant-Versionen erlauben "vectors=None").
"""
Zähle Punkte robust:
1) bevorzugt count(exact=True)
2) Fallback via Scroll
"""
try:
res = client.count(collection_name=collection, count_filter=None, exact=True)
cnt = getattr(res, "count", None)
if isinstance(cnt, int):
return cnt
if isinstance(res, dict) and "count" in res:
return int(res["count"])
except Exception:
pass
total = 0
next_page = None
while True:
points, next_page = client.scroll(
collection_name=collection,
limit=2048,
with_payload=False,
with_vectors=False,
offset=next_page,
)
total += len(points)
if next_page is None or not points:
break
return total
def get_counts_for_prefix(client: QdrantClient, prefix: str) -> Dict[str, int]:
notes, chunks, edges = collection_names(prefix)
return {
"notes": count_points(client, notes),
"chunks": count_points(client, chunks),
"edges": count_points(client, edges),
}
def truncate_collections(client: QdrantClient, prefix: str) -> None:
"""
Löscht alle Punkte (Collections bleiben bestehen).
"""
for col in collection_names(prefix):
try: try:
client.delete( client.recreate_collection(
collection_name=col, collection_name=name,
points_selector=rest.FilterSelector(filter=rest.Filter(must=[])), vectors_config=None, # type: ignore[arg-type]
wait=True,
) )
continue
except Exception: except Exception:
pass pass
# Fallback: 1D-Vector
try:
client.recreate_collection(
collection_name=name,
vectors_config=rest.VectorParams(size=1, distance=rest.Distance.COSINE),
)
except Exception as e: # pragma: no cover
raise RuntimeError(f"Failed to create collection '{name}': {e}")
def list_note_ids(client: QdrantClient, notes_collection: str, limit: int = 100000) -> List[str]: # ---------------------------------------------------------------------------
# Zähl- & Hilfsfunktionen
# ---------------------------------------------------------------------------
def count_points(client: QdrantClient, cfg: QdrantConfig) -> Dict[str, int]:
"""Zähle Punkte in allen Collections (exact=True)."""
res = {}
for name, key in ((cfg.notes, "notes"), (cfg.chunks, "chunks"), (cfg.edges, "edges")):
try:
c = client.count(name, exact=True)
res[key] = int(c.count) # type: ignore[attr-defined]
except Exception:
# Fallback, falls count nicht verfügbar ist:
try:
pts, _ = client.scroll(name, limit=1)
# Wenn scroll funktioniert, holen wir via get_collection die config/points_count
meta = client.get_collection(name)
# qdrant_client >=1.7 liefert ggf. points_count im Status:
points_count = getattr(meta, "points_count", None)
if isinstance(points_count, int):
res[key] = points_count
else:
# Worst case: scrollen wir "grob" (vermeiden wir hier aus Performancegründen)
res[key] = 0
except Exception:
res[key] = 0
return res
def list_note_ids(client: QdrantClient, collection: str, batch: int = 2048) -> List[str]:
""" """
Liste aller payload.note_id (unique) aus der Notes-Collection. Liefert alle note_id-Werte aus einer Collection, die Notes speichert.
Greift die Payload-Felder 'note_id' bzw. 'id' auf (falls ersteres fehlt).
""" """
out: List[str] = [] out: List[str] = []
seen = set() next_page: Optional[List[int]] = None # offset
next_page = None
fetched = 0
while True: while True:
points, next_page = client.scroll( pts, next_page = client.scroll(
collection_name=notes_collection, collection_name=collection,
scroll_filter=None,
limit=min(512, max(1, limit - fetched)),
with_payload=True, with_payload=True,
with_vectors=False, limit=batch,
offset=next_page, offset=next_page,
) )
if not points: if not pts:
break break
for p in points: for p in pts:
pl = p.payload or {} pl = p.payload or {}
nid = pl.get("note_id") nid = pl.get("note_id") or pl.get("id")
if isinstance(nid, str) and nid not in seen: if isinstance(nid, str):
seen.add(nid)
out.append(nid) out.append(nid)
fetched += 1 if not next_page:
if fetched >= limit:
return out
if next_page is None:
break break
return out return out
# --------------------------------------------------------- def fetch_one_note(client: QdrantClient, cfg: QdrantConfig, note_id: str) -> Optional[Dict]:
# Fetch-Helfer (NEU für Importer v3.9.0)
# ---------------------------------------------------------
def _match_value(value: Any):
""" """
Qdrant HTTP-Models haben je nach Version unterschiedliche Konstruktoren. Holt genau eine Note-Payload anhand note_id (oder id).
Wir versuchen zuerst MatchValue(value=...), dann MatchValue(...) als Fallback. Gibt Payload-Dict zurück oder None.
""" """
try: flt = rest.Filter(
return rest.MatchValue(value=value) must=[
except TypeError: rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
return rest.MatchValue(value) # ältere Signatur ]
def fetch_one_note(
client: QdrantClient,
notes_collection: str,
note_id: str,
with_vectors: bool = False,
) -> Optional[Tuple[str, Dict[str, Any], Optional[Any]]]:
"""
Liefert genau eine Note anhand payload.note_id.
Rückgabe:
(point_id, payload_dict, vector_or_None) oder None, falls nicht gefunden.
Bruchsicher ggü. unterschiedlichen Client-Versionen.
"""
cond = rest.FieldCondition(key="note_id", match=_match_value(note_id))
flt = rest.Filter(must=[cond])
points, _ = client.scroll(
collection_name=notes_collection,
scroll_filter=flt,
limit=1,
with_payload=True,
with_vectors=with_vectors,
) )
if not points: try:
return None pts = client.scroll(
collection_name=cfg.notes,
p = points[0] with_payload=True,
pid = str(getattr(p, "id", "")) if getattr(p, "id", None) is not None else "" scroll_filter=flt,
payload = p.payload or {} limit=1,
vec = None )[0]
if with_vectors: if pts:
# Vektoren-Struktur ist je nach Clientversion leicht anders pl = pts[0].payload or {}
vec = getattr(p, "vector", None) return dict(pl)
if vec is None: except Exception:
vec = payload.get("_vector") # selten als Payload-Schatten # Fallback: versuchen mit 'id'
return (pid, payload, vec) flt2 = rest.Filter(
must=[rest.FieldCondition(key="id", match=rest.MatchValue(value=note_id))]
)
try:
pts = client.scroll(
collection_name=cfg.notes,
with_payload=True,
scroll_filter=flt2,
limit=1,
)[0]
if pts:
pl = pts[0].payload or {}
return dict(pl)
except Exception:
return None
return None