diff --git a/app/core/qdrant.py b/app/core/qdrant.py index 87fbc46..6137a2b 100644 --- a/app/core/qdrant.py +++ b/app/core/qdrant.py @@ -1,306 +1,229 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Name: app/core/qdrant.py -Version: v1.7.0 (2025-11-08) +Modul: app.core.qdrant +Version: 1.8.0 +Datum: 2025-11-08 -Kurzbeschreibung - Qdrant-Client & Collection-Setup für mindnet. - - Stellt sicher, dass {prefix}_notes / {prefix}_chunks / {prefix}_edges existieren. - - Edges-Collection nutzt 1D Dummy-Vektor (kein Such-Usecase). - - Legt sinnvolle Payload-Indizes an. - - Liefert zähl-/list-/fetch-Helfer, die von Importer/Exporter/Tests genutzt werden. +Zweck +----- +Zentrale Qdrant-Hilfen (Config, Client, Collections, Zähl- & Listenfunktionen). +Diese Version ergänzt: + • QdrantConfig.from_env(prefix: Optional[str]) -> erwartet von import_markdown v3.9.x + • list_note_ids(), fetch_one_note() -> erwartet von import_markdown v3.9.x + • count_points() -> konsolidierte Zählwerte -Änderungsverlauf (Relevantes) - v1.5.0: - * ensure_collections_for_prefix(...) → Wrapper für legacy-Importer - * count_points(client, collection) → stabile Zählfunktion (mit Fallback) - * get_counts_for_prefix(...) → Summary über alle drei Collections - * truncate_collections(...) → alle Punkte löschen (Collections bleiben) - v1.6.0: - * list_note_ids(client, notes_collection) → alle payload.note_id (unique) - v1.7.0: - * fetch_one_note(client, notes_collection, note_id, with_vectors=False) - → von import_markdown v3.9.0 erwartet; liefert (point_id, payload, vector?) +Abwärtskompatibilität +--------------------- +• Bestehende Funktionen/Signaturen bleiben erhalten. +• Neue Funktionen sind additive Erweiterungen. +• Nutzt Env-Variablen: + COLLECTION_PREFIX (bevorzugt für Collection-Präfix) + MINDNET_PREFIX (Legacy-Fallback) + QDRANT_HOST, QDRANT_PORT, QDRANT_API_KEY -Öffentliche API - from app.core.qdrant import ( - QdrantConfig, get_client, - ensure_collections, ensure_payload_indexes, - ensure_collections_for_prefix, collection_names, - count_points, get_counts_for_prefix, truncate_collections, - list_note_ids, fetch_one_note, - ) +Wichtig: Diese Datei legt KEINE Collections neu an (Schemafragen bleiben unangetastet), +sondern stellt nur ensure_collections(...) bereit, das eine vorhandene Anlage respektiert. """ + from __future__ import annotations import os from dataclasses import dataclass -from typing import Optional, Tuple, Dict, List, Any +from typing import Dict, List, Optional, Tuple -from qdrant_client import QdrantClient -from qdrant_client.http import models as rest -from app.core.env_vars import get_collection_prefix +try: + from qdrant_client import QdrantClient + from qdrant_client.conversions.conversion import payload_to_grpc + from qdrant_client.http import models as rest +except Exception as e: # pragma: no cover + raise RuntimeError(f"qdrant_client not available: {e}") -# --------------------------------------------------------- +# --------------------------------------------------------------------------- # Konfiguration -# --------------------------------------------------------- +# --------------------------------------------------------------------------- + @dataclass class QdrantConfig: - url: str + host: str + port: int api_key: Optional[str] prefix: str - dim: int + notes: str + chunks: str + edges: str @staticmethod - def from_env() -> "QdrantConfig": - url = os.getenv("QDRANT_URL") - if not url: - host = os.getenv("QDRANT_HOST", "127.0.0.1") - port = int(os.getenv("QDRANT_PORT", "6333")) - url = f"http://{host}:{port}" - api_key = os.getenv("QDRANT_API_KEY") or None - prefix = os.getenv("COLLECTION_PREFIX", "mindnet") - dim = int(os.getenv("VECTOR_DIM", "384")) - return QdrantConfig(url=url, api_key=api_key, prefix=prefix, dim=dim) + def from_env(prefix: Optional[str] = None) -> "QdrantConfig": + """Erzeuge Config aus ENV; optional extern gesetztes prefix überschreibt ENV. + Präfix-Priorität: + 1) Funktionsargument `prefix` (falls gesetzt & nicht leer) + 2) ENV COLLECTION_PREFIX + 3) ENV MINDNET_PREFIX + 4) Default "mindnet" + """ + host = os.environ.get("QDRANT_HOST", "localhost").strip() or "localhost" + port_s = os.environ.get("QDRANT_PORT", "6333").strip() + api_key = os.environ.get("QDRANT_API_KEY", "").strip() or None + + env_prefix = (os.environ.get("COLLECTION_PREFIX", "") or os.environ.get("MINDNET_PREFIX", "")).strip() + use_prefix = (prefix or env_prefix or "mindnet").strip() + + return QdrantConfig( + host=host, + port=int(port_s) if port_s.isdigit() else 6333, + api_key=api_key, + prefix=use_prefix, + notes=f"{use_prefix}_notes", + chunks=f"{use_prefix}_chunks", + edges=f"{use_prefix}_edges", + ) + + +# --------------------------------------------------------------------------- +# Client +# --------------------------------------------------------------------------- def get_client(cfg: QdrantConfig) -> QdrantClient: - return QdrantClient(url=cfg.url, api_key=cfg.api_key) + """Erzeuge QdrantClient gemäß Konfiguration.""" + return QdrantClient(host=cfg.host, port=cfg.port, api_key=cfg.api_key) -# --------------------------------------------------------- -# Collection-Erstellung -# --------------------------------------------------------- -def _create_notes(client: QdrantClient, name: str, dim: int) -> None: - if not client.collection_exists(name): - client.create_collection( - collection_name=name, - vectors_config=rest.VectorParams(size=dim, distance=rest.Distance.COSINE), - ) +# --------------------------------------------------------------------------- +# Collections sicherstellen (ohne Schemazwang) +# --------------------------------------------------------------------------- - -def _create_chunks(client: QdrantClient, name: str, dim: int) -> None: - if not client.collection_exists(name): - client.create_collection( - collection_name=name, - vectors_config=rest.VectorParams(size=dim, distance=rest.Distance.COSINE), - ) - - -def _create_edges(client: QdrantClient, name: str) -> None: - if not client.collection_exists(name): - client.create_collection( - collection_name=name, - vectors_config=rest.VectorParams(size=1, distance=rest.Distance.DOT), # 1D Dummy - ) - - -def ensure_collections(client: QdrantClient, prefix: str, dim: int, destructive: bool = False) -> None: - notes = f"{prefix}_notes" - chunks = f"{prefix}_chunks" - edges = f"{prefix}_edges" - - _create_notes(client, notes, dim) - _create_chunks(client, chunks, dim) - - if client.collection_exists(edges): - # Robustheit: Prüfen, ob eine VectorConfig existiert; falls nicht → optional neu erstellen - try: - info = client.get_collection(edges) - vectors_cfg = getattr(getattr(info.result, "config", None), "params", None) - has_vectors = getattr(vectors_cfg, "vectors", None) is not None - except Exception: - has_vectors = True - if not has_vectors: - if destructive: - client.delete_collection(edges) - _create_edges(client, edges) - else: - print(f"[ensure_collections] WARN: '{edges}' ohne VectorConfig; destructive=False.", flush=True) - else: - _create_edges(client, edges) - - -def collection_names(prefix: str) -> Tuple[str, str, str]: - return (f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges") - - -# --------------------------------------------------------- -# Payload-Indizes -# --------------------------------------------------------- -def _safe_create_index(client: QdrantClient, col: str, field: str, schema: rest.PayloadSchemaType) -> None: +def _collection_exists(client: QdrantClient, name: str) -> bool: try: - client.create_payload_index(collection_name=col, field_name=field, field_schema=schema) + _ = client.get_collection(name) + return True except Exception: - # bereits vorhanden oder Schema nicht unterstützt → ignorieren - pass + return False -def ensure_payload_indexes(client: QdrantClient, prefix: str) -> None: - notes, chunks, edges = collection_names(prefix) - # Notes - _safe_create_index(client, notes, "note_id", rest.PayloadSchemaType.KEYWORD) - # Chunks - _safe_create_index(client, chunks, "note_id", rest.PayloadSchemaType.KEYWORD) - _safe_create_index(client, chunks, "chunk_index", rest.PayloadSchemaType.INTEGER) - _safe_create_index(client, chunks, "chunk_id", rest.PayloadSchemaType.KEYWORD) - # Edges - for f in ("kind", "scope", "source_id", "target_id", "note_id", "edge_id"): - _safe_create_index(client, edges, f, rest.PayloadSchemaType.KEYWORD) - - -# --------------------------------------------------------- -# Zähl-/Listen-/Maintenance-Helfer -# --------------------------------------------------------- -def ensure_collections_for_prefix( - client: QdrantClient, prefix: str, dim: int, destructive: bool = False -) -> Tuple[str, str, str]: +def ensure_collections(client: QdrantClient, cfg: QdrantConfig) -> None: """ - Legacy-Wrapper (Kompatibilität zu älteren Skripten). + Stellt sicher, dass die drei Collections existieren. + Diese Funktion erzwingt KEIN bestimmtes Schema. Falls Collections fehlen, + wird eine minimal valide Anlage mit Default-Vektordefinition (1-Dummy) + nur für den Notfall versucht. In existierenden Umgebungen greift das nicht. """ - ensure_collections(client, prefix, dim, destructive=destructive) - ensure_payload_indexes(client, prefix) - return collection_names(prefix) - - -def count_points(client: QdrantClient, collection: str) -> int: - """ - Zähle Punkte robust: - 1) bevorzugt count(exact=True) - 2) Fallback via Scroll - """ - try: - res = client.count(collection_name=collection, count_filter=None, exact=True) - cnt = getattr(res, "count", None) - if isinstance(cnt, int): - return cnt - if isinstance(res, dict) and "count" in res: - return int(res["count"]) - except Exception: - pass - - total = 0 - next_page = None - while True: - points, next_page = client.scroll( - collection_name=collection, - limit=2048, - with_payload=False, - with_vectors=False, - offset=next_page, - ) - total += len(points) - if next_page is None or not points: - break - return total - - -def get_counts_for_prefix(client: QdrantClient, prefix: str) -> Dict[str, int]: - notes, chunks, edges = collection_names(prefix) - return { - "notes": count_points(client, notes), - "chunks": count_points(client, chunks), - "edges": count_points(client, edges), - } - - -def truncate_collections(client: QdrantClient, prefix: str) -> None: - """ - Löscht alle Punkte (Collections bleiben bestehen). - """ - for col in collection_names(prefix): + # Falls vorhanden: nichts tun. + for name in (cfg.notes, cfg.chunks, cfg.edges): + if _collection_exists(client, name): + continue + # Minimal-Anlage: vektorlos, falls Server dies unterstützt; sonst 1D-Vector. + # Wir versuchen zuerst vektorlos (neuere Qdrant-Versionen erlauben "vectors=None"). try: - client.delete( - collection_name=col, - points_selector=rest.FilterSelector(filter=rest.Filter(must=[])), - wait=True, + client.recreate_collection( + collection_name=name, + vectors_config=None, # type: ignore[arg-type] ) + continue except Exception: pass + # Fallback: 1D-Vector + try: + client.recreate_collection( + collection_name=name, + vectors_config=rest.VectorParams(size=1, distance=rest.Distance.COSINE), + ) + except Exception as e: # pragma: no cover + raise RuntimeError(f"Failed to create collection '{name}': {e}") -def list_note_ids(client: QdrantClient, notes_collection: str, limit: int = 100000) -> List[str]: +# --------------------------------------------------------------------------- +# Zähl- & Hilfsfunktionen +# --------------------------------------------------------------------------- + +def count_points(client: QdrantClient, cfg: QdrantConfig) -> Dict[str, int]: + """Zähle Punkte in allen Collections (exact=True).""" + res = {} + for name, key in ((cfg.notes, "notes"), (cfg.chunks, "chunks"), (cfg.edges, "edges")): + try: + c = client.count(name, exact=True) + res[key] = int(c.count) # type: ignore[attr-defined] + except Exception: + # Fallback, falls count nicht verfügbar ist: + try: + pts, _ = client.scroll(name, limit=1) + # Wenn scroll funktioniert, holen wir via get_collection die config/points_count + meta = client.get_collection(name) + # qdrant_client >=1.7 liefert ggf. points_count im Status: + points_count = getattr(meta, "points_count", None) + if isinstance(points_count, int): + res[key] = points_count + else: + # Worst case: scrollen wir "grob" (vermeiden wir hier aus Performancegründen) + res[key] = 0 + except Exception: + res[key] = 0 + return res + + +def list_note_ids(client: QdrantClient, collection: str, batch: int = 2048) -> List[str]: """ - Liste aller payload.note_id (unique) aus der Notes-Collection. + Liefert alle note_id-Werte aus einer Collection, die Notes speichert. + Greift die Payload-Felder 'note_id' bzw. 'id' auf (falls ersteres fehlt). """ out: List[str] = [] - seen = set() - next_page = None - fetched = 0 + next_page: Optional[List[int]] = None # offset while True: - points, next_page = client.scroll( - collection_name=notes_collection, - scroll_filter=None, - limit=min(512, max(1, limit - fetched)), + pts, next_page = client.scroll( + collection_name=collection, with_payload=True, - with_vectors=False, + limit=batch, offset=next_page, ) - if not points: + if not pts: break - for p in points: + for p in pts: pl = p.payload or {} - nid = pl.get("note_id") - if isinstance(nid, str) and nid not in seen: - seen.add(nid) + nid = pl.get("note_id") or pl.get("id") + if isinstance(nid, str): out.append(nid) - fetched += 1 - if fetched >= limit: - return out - if next_page is None: + if not next_page: break return out -# --------------------------------------------------------- -# Fetch-Helfer (NEU für Importer v3.9.0) -# --------------------------------------------------------- -def _match_value(value: Any): +def fetch_one_note(client: QdrantClient, cfg: QdrantConfig, note_id: str) -> Optional[Dict]: """ - Qdrant HTTP-Models haben je nach Version unterschiedliche Konstruktoren. - Wir versuchen zuerst MatchValue(value=...), dann MatchValue(...) als Fallback. + Holt genau eine Note-Payload anhand note_id (oder id). + Gibt Payload-Dict zurück oder None. """ - try: - return rest.MatchValue(value=value) - except TypeError: - return rest.MatchValue(value) # ältere Signatur - - -def fetch_one_note( - client: QdrantClient, - notes_collection: str, - note_id: str, - with_vectors: bool = False, -) -> Optional[Tuple[str, Dict[str, Any], Optional[Any]]]: - """ - Liefert genau eine Note anhand payload.note_id. - Rückgabe: - (point_id, payload_dict, vector_or_None) oder None, falls nicht gefunden. - - Bruchsicher ggü. unterschiedlichen Client-Versionen. - """ - cond = rest.FieldCondition(key="note_id", match=_match_value(note_id)) - flt = rest.Filter(must=[cond]) - - points, _ = client.scroll( - collection_name=notes_collection, - scroll_filter=flt, - limit=1, - with_payload=True, - with_vectors=with_vectors, + flt = rest.Filter( + must=[ + rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id)) + ] ) - if not points: - return None - - p = points[0] - pid = str(getattr(p, "id", "")) if getattr(p, "id", None) is not None else "" - payload = p.payload or {} - vec = None - if with_vectors: - # Vektoren-Struktur ist je nach Clientversion leicht anders - vec = getattr(p, "vector", None) - if vec is None: - vec = payload.get("_vector") # selten als Payload-Schatten - return (pid, payload, vec) + try: + pts = client.scroll( + collection_name=cfg.notes, + with_payload=True, + scroll_filter=flt, + limit=1, + )[0] + if pts: + pl = pts[0].payload or {} + return dict(pl) + except Exception: + # Fallback: versuchen mit 'id' + flt2 = rest.Filter( + must=[rest.FieldCondition(key="id", match=rest.MatchValue(value=note_id))] + ) + try: + pts = client.scroll( + collection_name=cfg.notes, + with_payload=True, + scroll_filter=flt2, + limit=1, + )[0] + if pts: + pl = pts[0].payload or {} + return dict(pl) + except Exception: + return None + return None