mindnet/app/core/qdrant_points.py
Lars 62f8f4b313
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
Dateien nach "app/core" hochladen
2025-11-08 16:42:33 +01:00

224 lines
9.0 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
app/core/qdrant_points.py — robust points helpers for Qdrant
- Single source of truth for building PointStruct for notes/chunks/edges
- Backward-compatible to older payload schemas for edges
- NEW: Upsert path auto-detects collection vector schema (single vs named vectors)
and coerces points accordingly to avoid 'Not existing vector name' errors.
Version: 1.4.0 (2025-11-08)
"""
from __future__ import annotations
import os
import uuid
from typing import List, Tuple, Iterable, Optional, Dict, Any
from qdrant_client.http import models as rest
from qdrant_client import QdrantClient
# --------------------- ID helpers ---------------------
def _to_uuid(stable_key: str) -> str:
"""Deterministic UUIDv5 from a stable string key."""
return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key))
def _names(prefix: str) -> Tuple[str, str, str]:
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
# --------------------- Notes / Chunks ---------------------
def points_for_note(prefix: str, note_payload: dict, note_vec: List[float] | None, dim: int) -> Tuple[str, List[rest.PointStruct]]:
"""Notes-Collection: if no note embedding -> zero vector of length dim."""
notes_col, _, _ = _names(prefix)
vector = note_vec if note_vec is not None else [0.0] * int(dim)
raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id"
point_id = _to_uuid(raw_note_id)
pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload)
return notes_col, [pt]
def points_for_chunks(prefix: str, chunk_payloads: List[dict], vectors: List[List[float]]) -> Tuple[str, List[rest.PointStruct]]:
"""Create point structs for the chunk collection (expects one vector per chunk)."""
_, chunks_col, _ = _names(prefix)
points: List[rest.PointStruct] = []
for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1):
chunk_id = pl.get("chunk_id") or pl.get("id")
if not chunk_id:
note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note"
chunk_id = f"{note_id}#{i}"
pl["chunk_id"] = chunk_id
point_id = _to_uuid(chunk_id)
points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl))
return chunks_col, points
# --------------------- Edges ---------------------
def _normalize_edge_payload(pl: dict) -> dict:
"""Normalize edge payload keys to a common schema."""
kind = pl.get("kind") or pl.get("edge_type") or "edge"
source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src"
target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt"
seq = pl.get("seq") or pl.get("order") or pl.get("index")
pl.setdefault("kind", kind)
pl.setdefault("source_id", source_id)
pl.setdefault("target_id", target_id)
if seq is not None and "seq" not in pl:
pl["seq"] = seq
return pl
def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]:
"""Edges collection (1D dummy vector)."""
_, _, edges_col = _names(prefix)
points: List[rest.PointStruct] = []
for raw in edge_payloads:
pl = _normalize_edge_payload(raw)
edge_id = pl.get("edge_id")
if not edge_id:
kind = pl.get("kind", "edge")
s = pl.get("source_id", "unknown-src")
t = pl.get("target_id", "unknown-tgt")
seq = pl.get("seq") or ""
edge_id = f"{kind}:{s}->{t}#{seq}"
pl["edge_id"] = edge_id
point_id = _to_uuid(edge_id)
points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl))
return edges_col, points
# --------------------- Vector schema detection ---------------------
def _preferred_name(candidates: List[str]) -> str:
"""Pick a preferred vector name using env overrides then common fallbacks."""
env_prefs = [
os.getenv("NOTES_VECTOR_NAME"),
os.getenv("CHUNKS_VECTOR_NAME"),
os.getenv("EDGES_VECTOR_NAME"),
os.getenv("MINDNET_VECTOR_NAME"),
os.getenv("QDRANT_VECTOR_NAME"),
]
for p in env_prefs:
if p and p in candidates:
return p
for k in ("text", "default", "embedding", "content"):
if k in candidates:
return k
return sorted(candidates)[0]
def _get_vector_schema(client: QdrantClient, collection_name: str) -> dict:
"""Return {"kind": "single", "size": int} or {"kind": "named", "names": [...], "primary": str}."""
try:
info = client.get_collection(collection_name=collection_name)
vecs = getattr(info, "vectors", None)
if hasattr(vecs, "size") and isinstance(vecs.size, int):
return {"kind": "single", "size": vecs.size}
cfg = getattr(vecs, "config", None)
if isinstance(cfg, dict) and cfg:
names = list(cfg.keys())
if names:
return {"kind": "named", "names": names, "primary": _preferred_name(names)}
except Exception:
pass
return {"kind": "single", "size": None}
def _coerce_for_collection(client: QdrantClient, collection_name: str, points: List[rest.PointStruct]) -> List[rest.PointStruct]:
"""If collection uses named vectors, convert vector=[...] -> vector={name: [...]}"""
try:
schema = _get_vector_schema(client, collection_name)
if schema.get("kind") != "named":
return points
primary = schema.get("primary")
if not primary:
return points
fixed: List[rest.PointStruct] = []
for pt in points:
vec = getattr(pt, "vector", None)
if isinstance(vec, dict):
fixed.append(pt) # already named
elif vec is not None:
fixed.append(rest.PointStruct(id=pt.id, vectors={primary: vec}, payload=pt.payload))
else:
fixed.append(pt) # edges with no vector (shouldn't happen) or already correct
return fixed
except Exception:
return points
def _try_upsert_with_names(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None:
schema = _get_vector_schema(client, collection)
if schema.get("kind") != "named":
raise
names = schema.get("names") or []
# prefer env-defined names first
pref = _preferred_name(names)
order = [pref] + [n for n in names if n != pref]
for name in order:
converted: List[rest.PointStruct] = []
for pt in points:
vec = getattr(pt, "vector", None)
if isinstance(vec, dict) and name in vec:
converted.append(pt)
elif vec is not None:
converted.append(rest.PointStruct(id=pt.id, vectors={name: vec}, payload=pt.payload))
else:
converted.append(pt)
try:
client.upsert(collection_name=collection, points=converted, wait=True)
return
except Exception:
continue
raise
# --------------------- Qdrant ops ---------------------
def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None:
if not points:
return
pts = _coerce_for_collection(client, collection, points)
try:
client.upsert(collection_name=collection, points=pts, wait=True)
except Exception as e:
msg = str(e)
if "Not existing vector name" in msg or "named vector" in msg:
_try_upsert_with_names(client, collection, points)
else:
raise
# --- Optional search helpers ---
def _filter_any(field: str, values: Iterable[str]) -> rest.Filter:
return rest.Filter(should=[rest.FieldCondition(key=field, match=rest.MatchValue(value=v)) for v in values])
def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]:
fs = [f for f in filters if f is not None]
if not fs:
return None
if len(fs) == 1:
return fs[0]
must = []
for f in fs:
if getattr(f, "must", None):
must.extend(f.must)
if getattr(f, "should", None):
must.append(rest.Filter(should=f.should))
return rest.Filter(must=must)
def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]:
if not filters:
return None
parts = []
for k, v in filters.items():
if isinstance(v, (list, tuple, set)):
parts.append(_filter_any(k, [str(x) for x in v]))
else:
parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=v))]))
return _merge_filters(*parts)
def search_chunks_by_vector(client: QdrantClient, prefix: str, vector: List[float], top: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Tuple[str, float, dict]]:
_, chunks_col, _ = _names(prefix)
flt = _filter_from_dict(filters)
res = client.search(collection_name=chunks_col, query_vector=vector, limit=top, with_payload=True, with_vectors=False, query_filter=flt)
out: List[Tuple[str, float, dict]] = []
for r in res:
out.append((str(r.id), float(r.score), dict(r.payload or {})))
return out