#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Name: app/core/qdrant.py
Version: v1.7.0 (2025-11-08)

Summary
    Qdrant client & collection setup for mindnet.
    - Ensures that {prefix}_notes / {prefix}_chunks / {prefix}_edges exist.
    - The edges collection uses a 1D dummy vector (no search use case).
    - Creates sensible payload indexes.
    - Provides count/list/fetch helpers used by importer/exporter/tests.

Changelog (relevant entries)
    v1.5.0:
      * ensure_collections_for_prefix(...) → wrapper for legacy importers
      * count_points(client, collection)   → stable count function (with fallback)
      * get_counts_for_prefix(...)         → summary across all three collections
      * truncate_collections(...)          → delete all points (collections remain)
    v1.6.0:
      * list_note_ids(client, notes_collection) → all payload.note_id (unique)
    v1.7.0:
      * fetch_one_note(client, notes_collection, note_id, with_vectors=False)
        → expected by import_markdown v3.9.0; returns (point_id, payload, vector?)

Public API
    from app.core.qdrant import (
        QdrantConfig,
        get_client,
        ensure_collections,
        ensure_payload_indexes,
        ensure_collections_for_prefix,
        collection_names,
        count_points,
        get_counts_for_prefix,
        truncate_collections,
        list_note_ids,
        fetch_one_note,
    )
"""
from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Optional, Tuple, Dict, List, Any

from qdrant_client import QdrantClient
from qdrant_client.http import models as rest

# NOTE(review): get_collection_prefix is not used by the code visible in this
# module (from_env reads COLLECTION_PREFIX directly). The import is kept because
# other modules may rely on it being re-exported here — confirm before removing.
from app.core.env_vars import get_collection_prefix


# ---------------------------------------------------------
# Configuration
# ---------------------------------------------------------
@dataclass
class QdrantConfig:
    """Connection and naming settings for the Qdrant backend."""

    url: str                 # base URL passed to QdrantClient
    api_key: Optional[str]   # None when no (or an empty) key is configured
    prefix: str              # collection name prefix, e.g. "mindnet"
    dim: int                 # vector size used for the notes/chunks collections

    @staticmethod
    def from_env() -> "QdrantConfig":
        """Build a configuration from environment variables.

        Variables read:
            QDRANT_URL         full URL; when unset or empty it is assembled
                               from QDRANT_HOST (default "127.0.0.1") and
                               QDRANT_PORT (default "6333").
            QDRANT_API_KEY     optional; an empty value is treated as None.
            COLLECTION_PREFIX  default "mindnet".
            VECTOR_DIM         default "384".

        Raises:
            ValueError: if QDRANT_PORT or VECTOR_DIM is not an integer.
        """
        url = os.getenv("QDRANT_URL")
        if not url:
            host = os.getenv("QDRANT_HOST", "127.0.0.1")
            port = int(os.getenv("QDRANT_PORT", "6333"))
            url = f"http://{host}:{port}"
        api_key = os.getenv("QDRANT_API_KEY") or None
        prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
        dim = int(os.getenv("VECTOR_DIM", "384"))
        return QdrantConfig(url=url, api_key=api_key, prefix=prefix, dim=dim)


def get_client(cfg: QdrantConfig) -> QdrantClient:
    """Return a QdrantClient for the URL and API key in *cfg*."""
    return QdrantClient(url=cfg.url, api_key=cfg.api_key)


# ---------------------------------------------------------
# Collection creation
# ---------------------------------------------------------
def collection_names(prefix: str) -> Tuple[str, str, str]:
    """Return the (notes, chunks, edges) collection names for *prefix*."""
    return (f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges")


def _create_collection_if_missing(
    client: QdrantClient, name: str, size: int, distance: rest.Distance
) -> None:
    """Create collection *name* with one unnamed vector unless it exists."""
    if not client.collection_exists(name):
        client.create_collection(
            collection_name=name,
            vectors_config=rest.VectorParams(size=size, distance=distance),
        )


def _create_notes(client: QdrantClient, name: str, dim: int) -> None:
    """Create the notes collection (cosine distance, *dim* dimensions) if missing."""
    _create_collection_if_missing(client, name, dim, rest.Distance.COSINE)


def _create_chunks(client: QdrantClient, name: str, dim: int) -> None:
    """Create the chunks collection (cosine distance, *dim* dimensions) if missing."""
    _create_collection_if_missing(client, name, dim, rest.Distance.COSINE)


def _create_edges(client: QdrantClient, name: str) -> None:
    """Create the edges collection if missing.

    Edges are not searched by vector, so a 1-dimensional dummy vector is used.
    """
    _create_collection_if_missing(client, name, 1, rest.Distance.DOT)  # 1D dummy


def _edges_has_vector_config(client: QdrantClient, name: str) -> bool:
    """Report whether collection *name* exposes a vector configuration.

    Returns True whenever the answer cannot be determined (lookup failed or
    the response has an unrecognised shape), so that uncertainty never leads
    the caller into the destructive path.
    """
    try:
        info = client.get_collection(name)
        # The previous code only looked at `info.result`. That attribute does
        # not exist when the client returns the collection info object
        # directly, so the resulting AttributeError was swallowed and the
        # check always answered "has vectors". Accept both shapes.
        # NOTE(review): confirm the return shape against the pinned
        # qdrant-client version.
        wrapped = getattr(info, "result", None)
        root = info if wrapped is None else wrapped
        params = getattr(getattr(root, "config", None), "params", None)
        if params is None:
            # Unknown layout: treat as "cannot tell" rather than "missing".
            return True
        return getattr(params, "vectors", None) is not None
    except Exception:
        return True


def ensure_collections(
    client: QdrantClient, prefix: str, dim: int, destructive: bool = False
) -> None:
    """Make sure the notes, chunks and edges collections for *prefix* exist.

    Notes and chunks are created with *dim*-dimensional cosine vectors when
    missing; existing collections are left untouched.

    For an existing edges collection the vector configuration is inspected.
    If it has none, the collection is deleted and recreated when
    *destructive* is True; otherwise a warning is printed and nothing is
    changed. *destructive* affects the edges collection only.
    """
    notes, chunks, edges = collection_names(prefix)
    _create_notes(client, notes, dim)
    _create_chunks(client, chunks, dim)

    if not client.collection_exists(edges):
        _create_edges(client, edges)
        return

    if _edges_has_vector_config(client, edges):
        return

    if destructive:
        client.delete_collection(edges)
        _create_edges(client, edges)
    else:
        print(f"[ensure_collections] WARN: '{edges}' ohne VectorConfig; destructive=False.", flush=True)


# ---------------------------------------------------------
# Payload indexes
# ---------------------------------------------------------
def _safe_create_index(
    client: QdrantClient, col: str, field: str, schema: rest.PayloadSchemaType
) -> None:
    """Create a payload index on *field* of collection *col*, ignoring errors.

    Best effort by design: any exception (for example the index already
    exists, or the schema is not supported) is swallowed.
    """
    try:
        client.create_payload_index(collection_name=col, field_name=field, field_schema=schema)
    except Exception:
        # already present or schema unsupported → ignore
        pass


def ensure_payload_indexes(client: QdrantClient, prefix: str) -> None:
    """Create the payload indexes used on the three collections of *prefix*."""
    notes, chunks, edges = collection_names(prefix)

    # Notes
    _safe_create_index(client, notes, "note_id", rest.PayloadSchemaType.KEYWORD)

    # Chunks
    _safe_create_index(client, chunks, "note_id", rest.PayloadSchemaType.KEYWORD)
    _safe_create_index(client, chunks, "chunk_index", rest.PayloadSchemaType.INTEGER)
    _safe_create_index(client, chunks, "chunk_id", rest.PayloadSchemaType.KEYWORD)

    # Edges
    for f in ("kind", "scope", "source_id", "target_id", "note_id", "edge_id"):
        _safe_create_index(client, edges, f, rest.PayloadSchemaType.KEYWORD)


# ---------------------------------------------------------
# Count / list / maintenance helpers
# ---------------------------------------------------------
def ensure_collections_for_prefix(
    client: QdrantClient, prefix: str, dim: int, destructive: bool = False
) -> Tuple[str, str, str]:
    """Legacy wrapper (compatibility with older scripts).

    Runs ensure_collections and ensure_payload_indexes, then returns the
    (notes, chunks, edges) collection names.
    """
    ensure_collections(client, prefix, dim, destructive=destructive)
    ensure_payload_indexes(client, prefix)
    return collection_names(prefix)


def count_points(client: QdrantClient, collection: str) -> int:
    """Count the points in *collection*.

    Strategy:
        1) preferred: client.count(exact=True)
        2) fallback: page through the collection with scroll and add up the
           page sizes. Used when count() raises or returns an unexpected shape.

    Errors raised by the scroll fallback propagate to the caller.
    """
    try:
        res = client.count(collection_name=collection, count_filter=None, exact=True)
        cnt = getattr(res, "count", None)
        if isinstance(cnt, int):
            return cnt
        if isinstance(res, dict) and "count" in res:
            return int(res["count"])
    except Exception:
        # Deliberate: fall through to the scroll-based count below.
        pass

    total = 0
    next_page = None
    while True:
        points, next_page = client.scroll(
            collection_name=collection,
            limit=2048,
            with_payload=False,
            with_vectors=False,
            offset=next_page,
        )
        total += len(points)
        if next_page is None or not points:
            break
    return total


def get_counts_for_prefix(client: QdrantClient, prefix: str) -> Dict[str, int]:
    """Return point counts keyed "notes", "chunks" and "edges" for *prefix*."""
    notes, chunks, edges = collection_names(prefix)
    return {
        "notes": count_points(client, notes),
        "chunks": count_points(client, chunks),
        "edges": count_points(client, edges),
    }


def truncate_collections(client: QdrantClient, prefix: str) -> None:
    """Delete all points of the three collections (the collections remain).

    Best effort: a failure on one collection is swallowed and the remaining
    collections are still processed, so the caller gets no signal when a
    delete did not happen.
    """
    for col in collection_names(prefix):
        try:
            client.delete(
                collection_name=col,
                # Filter with an empty `must` list, used here to select every point.
                points_selector=rest.FilterSelector(filter=rest.Filter(must=[])),
                wait=True,
            )
        except Exception:
            pass


def list_note_ids(client: QdrantClient, notes_collection: str, limit: int = 100000) -> List[str]:
    """List the unique payload.note_id values of the notes collection.

    Only string note_ids are collected, in the order scroll returns them.
    At most *limit* ids are returned; a *limit* of zero or less yields an
    empty list without querying Qdrant.
    """
    if limit <= 0:
        # Previously the page size was clamped to 1 here, so a non-positive
        # limit still fetched a point and could return one id.
        return []

    out: List[str] = []
    seen = set()
    next_page = None
    fetched = 0  # number of unique ids collected so far
    while True:
        points, next_page = client.scroll(
            collection_name=notes_collection,
            scroll_filter=None,
            # fetched < limit holds at this point, so the page size is >= 1.
            limit=min(512, limit - fetched),
            with_payload=True,
            with_vectors=False,
            offset=next_page,
        )
        if not points:
            break
        for p in points:
            pl = p.payload or {}
            nid = pl.get("note_id")
            if isinstance(nid, str) and nid not in seen:
                seen.add(nid)
                out.append(nid)
                fetched += 1
                if fetched >= limit:
                    return out
        if next_page is None:
            break
    return out


# ---------------------------------------------------------
# Fetch helpers (new for importer v3.9.0)
# ---------------------------------------------------------
def _match_value(value: Any) -> rest.MatchValue:
    """Build a MatchValue condition for *value*.

    The Qdrant HTTP models have differing constructors depending on the
    version: MatchValue(value=...) is tried first, with the positional form
    MatchValue(...) as a fallback on TypeError.
    """
    try:
        return rest.MatchValue(value=value)
    except TypeError:
        return rest.MatchValue(value)  # older signature


def fetch_one_note(
    client: QdrantClient,
    notes_collection: str,
    note_id: str,
    with_vectors: bool = False,
) -> Optional[Tuple[str, Dict[str, Any], Optional[Any]]]:
    """Fetch a single note by payload.note_id.

    The lookup scrolls with limit=1, so when several points carry the same
    note_id only the first one returned is used.

    Returns:
        (point_id, payload_dict, vector_or_None), or None when no point
        matches. point_id is the stringified point id ("" when the point has
        no id). The vector is only looked up when *with_vectors* is True.
    """
    cond = rest.FieldCondition(key="note_id", match=_match_value(note_id))
    flt = rest.Filter(must=[cond])
    points, _ = client.scroll(
        collection_name=notes_collection,
        scroll_filter=flt,
        limit=1,
        with_payload=True,
        with_vectors=with_vectors,
    )
    if not points:
        return None

    p = points[0]
    raw_id = getattr(p, "id", None)
    pid = "" if raw_id is None else str(raw_id)
    payload = p.payload or {}

    vec = None
    if with_vectors:
        # The vector structure differs slightly between client versions.
        vec = getattr(p, "vector", None)
        if vec is None:
            vec = payload.get("_vector")  # rarely mirrored into the payload
    return (pid, payload, vec)