mindnet/app/core/qdrant.py
Lars 2e7c497b69
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
Dateien nach "app/core" hochladen
2025-11-11 17:58:21 +01:00

180 lines
6.5 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
app/core/qdrant.py
Version: 2.2.0 (2025-11-11)

Purpose
-------
- Central Qdrant access (client, config)
- Collection creation (notes/chunks/edges)
- **Ensure payload indexes exist** (idempotent)

Note
----
This file is intended as a drop-in replacement in case your project does not
yet have a robust ensure_payload_indexes() implementation. The signatures
remain compatible with scripts.import_markdown and scripts.reset_qdrant.

API notes
---------
- Payload indexes are created with `create_payload_index`.
- Types come from `qdrant_client.http.models.PayloadSchemaType`:
  KEYWORD | TEXT | INTEGER | FLOAT | BOOL | GEO | DATETIME
- For frequently filtered fields (note_id, kind, scope, type, tags, ...) we
  create indexes. According to the Qdrant documentation this is best practice
  for performant filtering.
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Optional, Tuple, Dict, List
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
# ---------------------------------------------------------------------------
# Konfiguration
# ---------------------------------------------------------------------------
@dataclass
class QdrantConfig:
    """Connection and collection settings for the Qdrant store.

    The server is addressed either by ``url`` or by ``host``/``port``;
    ``api_key`` is optional.  The remaining fields describe how collections
    are named and created.
    """

    host: Optional[str] = None
    port: Optional[int] = None
    url: Optional[str] = None
    api_key: Optional[str] = None
    prefix: str = "mindnet"  # prefix of the collection names
    dim: int = 384  # vector size
    distance: str = "Cosine"  # Cosine | Dot | Euclid
    on_disk_payload: bool = True

    @classmethod
    def from_env(cls) -> "QdrantConfig":
        """Build a configuration from environment variables.

        Variables read: ``QDRANT_URL``, ``QDRANT_HOST``, ``QDRANT_PORT``,
        ``QDRANT_API_KEY``, ``COLLECTION_PREFIX``, ``VECTOR_DIM``,
        ``DISTANCE`` and ``ON_DISK_PAYLOAD``.  An unset or empty variable
        falls back to the field default.

        ``ON_DISK_PAYLOAD`` is case-insensitive and accepts ``1``, ``true``,
        ``yes`` and ``on`` as true; any other non-blank value is false.

        Raises:
            ValueError: if ``QDRANT_PORT`` or ``VECTOR_DIM`` is set to
                something that is not an integer.
        """
        # Either URL or host/port; the API key is optional.
        port_raw = os.getenv("QDRANT_PORT")

        # A blank flag counts as "not set", like every other variable here.
        flag = (os.getenv("ON_DISK_PAYLOAD") or "").strip().lower()
        on_disk_payload = flag in ("1", "true", "yes", "on") if flag else True

        return cls(
            host=os.getenv("QDRANT_HOST") or None,
            port=int(port_raw) if port_raw else None,
            url=os.getenv("QDRANT_URL") or None,
            api_key=os.getenv("QDRANT_API_KEY") or None,
            prefix=os.getenv("COLLECTION_PREFIX") or "mindnet",
            dim=int(os.getenv("VECTOR_DIM") or 384),
            # ``or`` (not a getenv default) so an empty DISTANCE is treated as unset.
            distance=os.getenv("DISTANCE") or "Cosine",
            on_disk_payload=on_disk_payload,
        )
def get_client(cfg: QdrantConfig) -> QdrantClient:
    """Create a ``QdrantClient`` from *cfg*.

    A configured ``cfg.url`` takes precedence.  Otherwise the client is built
    from ``cfg.host``/``cfg.port``, falling back to ``127.0.0.1`` and ``6333``
    when those are unset.  Both variants pass ``cfg.api_key`` and
    ``timeout=60.0``.
    """
    # Arguments common to both addressing modes.
    shared = {"api_key": cfg.api_key, "timeout": 60.0}
    if not cfg.url:
        return QdrantClient(
            host=cfg.host or "127.0.0.1",
            port=cfg.port or 6333,
            **shared,
        )
    return QdrantClient(url=cfg.url, **shared)
# ---------------------------------------------------------------------------
# Collections
# ---------------------------------------------------------------------------
def collection_names(prefix: str) -> Tuple[str, str, str]:
    """Return the ``(notes, chunks, edges)`` collection names for *prefix*."""
    notes, chunks, edges = (
        f"{prefix}_{suffix}" for suffix in ("notes", "chunks", "edges")
    )
    return notes, chunks, edges
def _vector_params(dim: int, distance: str) -> rest.VectorParams:
    """Build the vector parameters for a collection.

    Args:
        dim: Vector size.
        distance: Name of the distance metric, e.g. ``"Cosine"``, ``"Dot"``
            or ``"Euclid"`` (case-insensitive).

    Returns:
        ``rest.VectorParams`` with the requested size and metric.  An empty
        or unknown metric name falls back to ``rest.Distance.COSINE``.
    """
    # The enum *members* are upper-case (COSINE, DOT, EUCLID); the capitalized
    # spellings are only their values.  Looking up the capitalized name via
    # getattr() never matched and silently selected COSINE for every input,
    # so resolve the member by its upper-cased name instead.
    try:
        dist = rest.Distance[(distance or "").strip().upper()]
    except KeyError:
        dist = rest.Distance.COSINE
    return rest.VectorParams(size=dim, distance=dist)
def ensure_collections(
    client: QdrantClient,
    prefix: str,
    dim: int,
    *,
    distance: Optional[str] = None,
    on_disk_payload: bool = True,
) -> None:
    """Create the notes, chunks and edges collections if they do not exist.

    Args:
        client: Qdrant client used for the existence checks and creation.
        prefix: Collection name prefix (see ``collection_names``).
        dim: Vector size of the notes and chunks collections.
        distance: Distance metric for notes and chunks.  When ``None``
            (default) the ``DISTANCE`` environment variable is used, falling
            back to ``"Cosine"``.
        on_disk_payload: Passed through to ``create_collection`` for all
            three collections (default ``True``).

    Existing collections are left untouched.
    """
    notes, chunks, edges = collection_names(prefix)
    if distance is None:
        distance = os.getenv("DISTANCE", "Cosine")

    specs = (
        (notes, dim, distance),
        (chunks, dim, distance),
        # edges carry a 1-dimensional dummy vector; they are filtered via payload.
        (edges, 1, "Dot"),
    )
    for name, size, metric in specs:
        if client.collection_exists(name):
            continue
        client.create_collection(
            collection_name=name,
            vectors_config=_vector_params(size, metric),
            on_disk_payload=on_disk_payload,
        )
# ---------------------------------------------------------------------------
# Payload-Indizes
# ---------------------------------------------------------------------------
def _ensure_index(client: QdrantClient, collection: str, field: str, schema: rest.PayloadSchemaType) -> None:
"""Idempotentes Anlegen eines Payload-Indexes für ein Feld."""
try:
client.create_payload_index(collection_name=collection, field_name=field, field_schema=schema, wait=True)
except Exception as e:
# Fehler ignorieren, falls Index bereits existiert oder Server "already indexed" meldet.
# Für Debugging ggf. Logging ergänzen.
_ = e
def ensure_payload_indexes(client: QdrantClient, prefix: str) -> None:
    """
    Ensure that all required payload indexes exist.

    - notes:  note_id(KEYWORD), type(KEYWORD), title(TEXT), updated(INTEGER), tags(KEYWORD)
    - chunks: note_id(KEYWORD), chunk_id(KEYWORD), index(INTEGER), type(KEYWORD), tags(KEYWORD)
    - edges:  note_id(KEYWORD), kind(KEYWORD), scope(KEYWORD), source_id(KEYWORD), target_id(KEYWORD), chunk_id(KEYWORD)
    """
    keyword = rest.PayloadSchemaType.KEYWORD
    integer = rest.PayloadSchemaType.INTEGER
    text = rest.PayloadSchemaType.TEXT

    notes, chunks, edges = collection_names(prefix)

    # (collection, ((field, schema), ...)) in the order the indexes are requested.
    plan = (
        (notes, (
            ("note_id", keyword),
            ("type", keyword),
            ("title", text),
            ("updated", integer),
            ("tags", keyword),
        )),
        (chunks, (
            ("note_id", keyword),
            ("chunk_id", keyword),
            ("index", integer),
            ("type", keyword),
            ("tags", keyword),
        )),
        (edges, (
            ("note_id", keyword),
            ("kind", keyword),
            ("scope", keyword),
            ("source_id", keyword),
            ("target_id", keyword),
            ("chunk_id", keyword),
        )),
    )
    for collection, fields in plan:
        for field, schema in fields:
            _ensure_index(client, collection, field, schema)
# Names exported by a star-import of this module.
__all__ = [
"QdrantConfig",
"get_client",
"ensure_collections",
"ensure_payload_indexes",
"collection_names",
]