All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
230 lines
7.9 KiB
Python
230 lines
7.9 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Modul: app.core.qdrant
|
|
Version: 1.8.0
|
|
Datum: 2025-11-08
|
|
|
|
Zweck
|
|
-----
|
|
Zentrale Qdrant-Hilfen (Config, Client, Collections, Zähl- & Listenfunktionen).
|
|
Diese Version ergänzt:
|
|
• QdrantConfig.from_env(prefix: Optional[str]) -> erwartet von import_markdown v3.9.x
|
|
• list_note_ids(), fetch_one_note() -> erwartet von import_markdown v3.9.x
|
|
• count_points() -> konsolidierte Zählwerte
|
|
|
|
Abwärtskompatibilität
|
|
---------------------
|
|
• Bestehende Funktionen/Signaturen bleiben erhalten.
|
|
• Neue Funktionen sind additive Erweiterungen.
|
|
• Nutzt Env-Variablen:
|
|
COLLECTION_PREFIX (bevorzugt für Collection-Präfix)
|
|
MINDNET_PREFIX (Legacy-Fallback)
|
|
QDRANT_HOST, QDRANT_PORT, QDRANT_API_KEY
|
|
|
|
Wichtig: Diese Datei erzwingt KEIN Schema und lässt vorhandene Collections unangetastet;
ensure_collections(...) legt ausschließlich fehlende Collections als Minimal-Notfallanlage an.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from dataclasses import dataclass
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
try:
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.conversions.conversion import payload_to_grpc
|
|
from qdrant_client.http import models as rest
|
|
except Exception as e: # pragma: no cover
|
|
raise RuntimeError(f"qdrant_client not available: {e}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Konfiguration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class QdrantConfig:
    """Connection settings and collection names for a Qdrant instance.

    ``notes``, ``chunks`` and ``edges`` hold the full collection names; when
    the config is built via :meth:`from_env` they are ``<prefix>_notes``,
    ``<prefix>_chunks`` and ``<prefix>_edges``.
    """

    host: str
    port: int
    api_key: Optional[str]  # None when no key is configured
    prefix: str
    notes: str
    chunks: str
    edges: str

    @staticmethod
    def from_env(prefix: Optional[str] = None) -> "QdrantConfig":
        """Build a config from environment variables.

        Prefix priority (the first non-blank candidate wins):
          1) the ``prefix`` argument
          2) ENV ``COLLECTION_PREFIX``
          3) ENV ``MINDNET_PREFIX``
          4) the default ``"mindnet"``

        Connection settings are read from ``QDRANT_HOST`` (default
        ``"localhost"``), ``QDRANT_PORT`` (default ``6333``, also used when
        the value is not a plain decimal number) and ``QDRANT_API_KEY``
        (blank or unset becomes ``None``).

        Args:
            prefix: Optional collection prefix overriding the environment.

        Returns:
            A populated ``QdrantConfig``.
        """
        env = os.environ
        host = env.get("QDRANT_HOST", "localhost").strip() or "localhost"
        port_s = env.get("QDRANT_PORT", "6333").strip()
        api_key = env.get("QDRANT_API_KEY", "").strip() or None

        # Strip each candidate *before* testing it, so a whitespace-only value
        # neither wins the priority chain nor yields an empty prefix
        # (which would produce collection names such as "_notes").
        candidates = (
            prefix or "",
            env.get("COLLECTION_PREFIX", ""),
            env.get("MINDNET_PREFIX", ""),
        )
        use_prefix = next(
            (stripped for stripped in (c.strip() for c in candidates) if stripped),
            "mindnet",
        )

        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, on which int() raises ValueError.
        port = int(port_s) if port_s.isdecimal() else 6333

        return QdrantConfig(
            host=host,
            port=port,
            api_key=api_key,
            prefix=use_prefix,
            notes=f"{use_prefix}_notes",
            chunks=f"{use_prefix}_chunks",
            edges=f"{use_prefix}_edges",
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Client
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_client(cfg: QdrantConfig) -> QdrantClient:
    """Create a QdrantClient from the host, port and API key held in *cfg*."""
    connection = {
        "host": cfg.host,
        "port": cfg.port,
        "api_key": cfg.api_key,
    }
    return QdrantClient(**connection)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Collections sicherstellen (ohne Schemazwang)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _collection_exists(client: QdrantClient, name: str) -> bool:
    """Report whether ``client.get_collection(name)`` succeeds.

    Every exception raised by the lookup is reported as ``False``, so the
    result does not distinguish a missing collection from any other failure
    of the call.
    """
    try:
        client.get_collection(name)
    except Exception:
        return False
    else:
        return True
|
|
|
|
|
|
def ensure_collections(client: QdrantClient, cfg: QdrantConfig) -> None:
    """Make sure the notes, chunks and edges collections exist.

    Collections that already exist are left untouched; no schema is enforced.
    A missing collection is created in minimal form: first without any vector
    configuration, and, if that attempt fails, with a single 1-dimensional
    cosine vector as an emergency fallback.

    Creation deliberately uses ``create_collection`` instead of
    ``recreate_collection``: the latter deletes an existing collection before
    creating it, and the existence probe reports *any* error as "missing", so
    a transient probe failure must never be able to wipe stored data.

    Args:
        client: Qdrant client used for the existence checks and creation.
        cfg: Configuration providing the three collection names.

    Raises:
        RuntimeError: If the fallback creation fails as well.
    """
    for name in (cfg.notes, cfg.chunks, cfg.edges):
        if _collection_exists(client, name):
            continue  # already present: do nothing

        # First attempt: vector-less collection (accepted by newer versions).
        try:
            client.create_collection(
                collection_name=name,
                vectors_config=None,  # type: ignore[arg-type]
            )
            continue
        except Exception:
            # Rejected by this client/server combination; try the 1D fallback.
            pass

        # Fallback: minimal 1-dimensional vector definition.
        try:
            client.create_collection(
                collection_name=name,
                vectors_config=rest.VectorParams(size=1, distance=rest.Distance.COSINE),
            )
        except Exception as e:  # pragma: no cover
            raise RuntimeError(f"Failed to create collection '{name}': {e}") from e
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Zähl- & Hilfsfunktionen
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def count_points(client: QdrantClient, cfg: QdrantConfig) -> Dict[str, int]:
    """Count the points in the notes, chunks and edges collections.

    For each collection an exact count is requested via ``client.count``.
    If that call fails, the ``points_count`` attribute of
    ``client.get_collection(...)`` is used instead; if that is unavailable
    too (call fails, attribute missing or not an int), the count is 0.

    Args:
        client: Qdrant client to query.
        cfg: Configuration providing the three collection names.

    Returns:
        Dict with the keys ``"notes"``, ``"chunks"`` and ``"edges"``.
    """
    res: Dict[str, int] = {}
    targets = (("notes", cfg.notes), ("chunks", cfg.chunks), ("edges", cfg.edges))
    for key, name in targets:
        try:
            res[key] = int(client.count(name, exact=True).count)  # type: ignore[attr-defined]
            continue
        except Exception:
            # count unavailable or failed: fall back to collection metadata.
            pass

        # No scroll probe here: its result was never used, it cost an extra
        # round trip, and its failure discarded a usable points_count.
        try:
            points_count = getattr(client.get_collection(name), "points_count", None)
        except Exception:
            points_count = None
        # A full scroll to count manually is avoided for performance reasons.
        res[key] = points_count if isinstance(points_count, int) else 0
    return res
|
|
|
|
|
|
def list_note_ids(client: QdrantClient, collection: str, batch: int = 2048) -> List[str]:
    """Return the note ids stored in the payloads of *collection*.

    The collection is scrolled page by page. For every point the payload
    field ``note_id`` is used, falling back to ``id`` when ``note_id`` is
    missing or falsy; only string values are collected.

    Args:
        client: Qdrant client to query.
        collection: Name of the collection to scroll.
        batch: Page size passed to ``scroll`` as ``limit``.

    Returns:
        The collected ids in scroll order (duplicates are not removed).
    """
    out: List[str] = []
    offset = None  # scroll cursor; None requests the first page
    while True:
        pts, offset = client.scroll(
            collection_name=collection,
            with_payload=True,
            limit=batch,
            offset=offset,
        )
        if not pts:
            break
        for p in pts:
            pl = p.payload or {}
            nid = pl.get("note_id") or pl.get("id")
            if isinstance(nid, str):
                out.append(nid)
        # Only None marks the end; a falsy-but-valid offset such as 0 must
        # not terminate the pagination early.
        if offset is None:
            break
    return out
|
|
|
|
|
|
def fetch_one_note(client: QdrantClient, cfg: QdrantConfig, note_id: str) -> Optional[Dict]:
    """Fetch the payload of one note by ``note_id``, falling back to ``id``.

    The notes collection is queried with a filter on the payload field
    ``note_id`` first. If that query fails *or finds nothing*, the same
    value is looked up in the payload field ``id``. (Previously the ``id``
    lookup only ran when the first query raised, so notes stored under
    ``id`` were never found after a successful but empty first query.)

    Args:
        client: Qdrant client to query.
        cfg: Configuration providing the notes collection name.
        note_id: Identifier value to match.

    Returns:
        A copy of the first matching point's payload as a dict (empty when
        the point has no payload), or ``None`` when nothing matched or both
        queries failed.
    """
    for key in ("note_id", "id"):
        flt = rest.Filter(
            must=[rest.FieldCondition(key=key, match=rest.MatchValue(value=note_id))]
        )
        try:
            pts = client.scroll(
                collection_name=cfg.notes,
                with_payload=True,
                scroll_filter=flt,
                limit=1,
            )[0]
        except Exception:
            # Best effort: a failed query simply moves on to the next key.
            continue
        if pts:
            return dict(pts[0].payload or {})
    return None
|