mindnet/app/routers/qdrant_router.py
Lars e65ba23941
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
app/routers/qdrant_router.py aktualisiert
2025-09-02 17:05:36 +02:00

161 lines
5.6 KiB
Python

"""
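Qdrant endpoints for mindnet: upsert chunks, notes and graph edges, and run
vector queries over the chunks collection.

Version 0.1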
"""
from __future__ import annotations
from typing import Any, Optional, List
import uuid
from fastapi import APIRouter
from pydantic import BaseModel, Field
from qdrant_client import QdrantClient
from qdrant_client.http.models import (
Distance,
VectorParams,
PointStruct,
Filter,
FieldCondition,
MatchValue,
)
from ..config import get_settings
from ..embeddings import embed_texts
# Router that collects the Qdrant endpoints; every route is served under /qdrant.
router = APIRouter(
    prefix="/qdrant",
    tags=["qdrant"],
)
def _client() -> QdrantClient:
    """Build a Qdrant client from the application settings.

    A new client object is constructed on every call, using the configured
    ``QDRANT_URL`` and ``QDRANT_API_KEY``.
    """
    settings = get_settings()
    return QdrantClient(
        url=settings.QDRANT_URL,
        api_key=settings.QDRANT_API_KEY,
    )
def _col(name: str) -> str:
    """Return the full collection name for the logical collection *name*.

    The result has the form ``<COLLECTION_PREFIX>_<name>``, with the prefix
    read from the application settings.
    """
    prefix = get_settings().COLLECTION_PREFIX
    return f"{prefix}_{name}"
def _uuid5(s: str) -> str:
    """Map an arbitrary string to a deterministic point ID.

    The ID is a version-5 UUID computed in the URL namespace, so the same
    input string always yields the same ID.
    """
    point_uuid = uuid.uuid5(uuid.NAMESPACE_URL, s)
    return str(point_uuid)
# --- Models ---
class BaseMeta(BaseModel):
    """Metadata fields shared by the note and chunk upsert requests."""

    note_id: str = Field(
        default=...,
        description="Stable ID of the note (e.g., hash of vault-relative path)",
    )
    title: Optional[str] = Field(default=None, description="Note or chunk title")
    path: Optional[str] = Field(
        default=None,
        description="Vault-relative path to the .md file",
    )
    # Optional classification fields; the capitalised names are part of the
    # request schema and must stay as they are.
    Typ: Optional[str] = Field(default=None)
    Status: Optional[str] = Field(default=None)
    tags: Optional[List[str]] = Field(default=None)
    Rolle: Optional[List[str]] = Field(default=None)  # a list of values is allowed
class UpsertChunkRequest(BaseMeta):
    """Request body for upserting a single chunk of a note."""

    chunk_id: str = Field(
        default=...,
        description="Stable ID of the chunk within the note",
    )
    text: str = Field(default=..., description="Chunk text content")
    links: Optional[List[str]] = Field(
        None,
        description="Outbound links detected in the chunk",
    )
class UpsertNoteRequest(BaseMeta):
    """Request body for upserting a whole note; the note text may be omitted."""

    text: Optional[str] = Field(
        default=None,
        description="Full note text (optional)",
    )
class UpsertEdgeRequest(BaseModel):
    """Request body describing one directed edge between notes and/or chunks."""

    # Source side: the note is required, the chunk is optional.
    src_note_id: str
    # Destination note; may be absent.
    dst_note_id: Optional[str] = Field(default=None)
    src_chunk_id: Optional[str] = Field(default=None)
    dst_chunk_id: Optional[str] = Field(default=None)
    # Relation label; defaults to a plain link.
    relation: str = "links_to"
    link_text: Optional[str] = Field(default=None)
class QueryRequest(BaseModel):
    """Request body for a vector query over the chunks collection.

    ``query`` is the text to embed and search for.  ``note_id``, ``path`` and
    ``tags`` are optional exact-match filters; when several are given, all of
    them must match.
    """

    query: str
    # A limit below 1 cannot produce a meaningful search, so it is rejected at
    # validation time instead of failing later inside the Qdrant call.
    limit: int = Field(default=5, ge=1, description="Maximum number of hits to return")
    note_id: Optional[str] = None
    path: Optional[str] = None
    tags: Optional[List[str]] = None
# --- Helpers ---
def _ensure_collections():
    """Create the chunks, notes and edges collections if they do not exist.

    Existence is checked explicitly and only missing collections are created.
    Treating every exception from ``get_collection`` as "collection missing"
    and answering with ``recreate_collection`` is unsafe: a transient failure
    (timeout, authentication error) would drop and recreate a populated
    collection.  With the explicit check, client errors propagate to the
    caller and existing data is never touched.
    """
    s = get_settings()
    cli = _client()
    # (logical name, vector size).  Edges carry no embedding, so their
    # collection uses a one-dimensional dummy vector.
    wanted = (
        ("chunks", s.VECTOR_SIZE),
        ("notes", s.VECTOR_SIZE),
        ("edges", 1),
    )
    for name, size in wanted:
        collection = _col(name)
        if cli.collection_exists(collection):
            continue
        cli.create_collection(
            collection_name=collection,
            vectors_config=VectorParams(size=size, distance=Distance.COSINE),
        )
@router.post("/upsert_chunk", summary="Upsert a chunk into mindnet_chunks")
def upsert_chunk(req: UpsertChunkRequest) -> dict:
    """Embed a chunk's text and upsert it into the chunks collection.

    The stored payload contains every request field except ``text``; in its
    place a ``preview`` holding at most the first 240 characters is kept.
    The point ID is derived deterministically from ``chunk_id``.

    Returns:
        ``{"status": "ok", "id": <point id>}``.
    """
    _ensure_collections()
    cli = _client()
    vec = embed_texts([req.text])[0]
    payload: dict[str, Any] = req.model_dump()
    payload.pop("text", None)
    # A slice already yields the whole string when it is short enough, so no
    # length check is needed.
    # NOTE(review): the previous code appended an empty string after
    # truncating — possibly a lost truncation marker; confirm whether a
    # suffix is wanted.
    payload["preview"] = req.text[:240]
    # NOTE(review): the ID is built from chunk_id alone; this assumes chunk
    # IDs are unique across notes — confirm against the caller.
    qdrant_id = _uuid5(f"chunk:{req.chunk_id}")
    pt = PointStruct(id=qdrant_id, vector=vec, payload=payload)
    cli.upsert(collection_name=_col("chunks"), points=[pt])
    return {"status": "ok", "id": qdrant_id}
@router.post("/upsert_note", summary="Upsert a note into mindnet_notes")
def upsert_note(req: UpsertNoteRequest) -> dict:
    """Embed a note and upsert it into the notes collection.

    The embedding is computed from the note text; when no text is given the
    title is used, and failing that the note ID.  The stored payload holds
    every request field except ``text``.

    Returns:
        ``{"status": "ok", "id": <point id>}``.
    """
    _ensure_collections()
    cli = _client()

    # First non-empty candidate wins: text, then title, then the note ID.
    embedding_source = req.text or req.title or req.note_id
    vector = embed_texts([embedding_source])[0]

    # Keep all metadata but drop the (potentially large) full text.
    stored: dict[str, Any] = {
        key: value for key, value in req.model_dump().items() if key != "text"
    }

    qdrant_id = _uuid5(f"note:{req.note_id}")
    cli.upsert(
        collection_name=_col("notes"),
        points=[PointStruct(id=qdrant_id, vector=vector, payload=stored)],
    )
    return {"status": "ok", "id": qdrant_id}
@router.post("/upsert_edge", summary="Upsert a graph edge into mindnet_edges")
def upsert_edge(req: UpsertEdgeRequest) -> dict:
    """Store a graph edge as a point in the edges collection.

    The point ID is derived deterministically from the source and destination
    note/chunk IDs and the relation, so an edge with the same endpoints and
    relation always maps to the same ID.  The full request is stored as the
    payload.

    Returns:
        ``{"status": "ok", "id": <point id>}``.
    """
    _ensure_collections()
    cli = _client()

    # Missing optional endpoint parts are rendered as empty strings in the key.
    raw_edge_id = f"{req.src_note_id}|{req.src_chunk_id or ''}->{req.dst_note_id or ''}|{req.dst_chunk_id or ''}|{req.relation}"
    qdrant_id = _uuid5(f"edge:{raw_edge_id}")

    # Edges have no embedding; a one-dimensional dummy vector is stored.
    edge_point = PointStruct(
        id=qdrant_id,
        vector=[0.0],
        payload=req.model_dump(),
    )
    cli.upsert(collection_name=_col("edges"), points=[edge_point])
    return {"status": "ok", "id": qdrant_id}
@router.post("/query", summary="Vector query over mindnet_chunks with optional filters")
def query(req: QueryRequest) -> dict:
    """Run a vector search over the chunks collection.

    The query text is embedded and searched with an optional filter built
    from ``note_id``, ``path`` and ``tags``.  All supplied conditions must
    match; each tag adds its own condition.

    Returns:
        ``{"results": [...]}`` where each entry carries the point ID (under
        ``chunk_id``), the score, and selected payload fields.
    """
    _ensure_collections()
    cli = _client()
    vec = embed_texts([req.query])[0]

    # Single-valued exact-match filters, in the order they are applied.
    exact_matches = (("note_id", req.note_id), ("path", req.path))
    conditions = [
        FieldCondition(key=field, match=MatchValue(value=wanted))
        for field, wanted in exact_matches
        if wanted
    ]
    conditions.extend(
        FieldCondition(key="tags", match=MatchValue(value=tag))
        for tag in (req.tags or [])
    )
    # No conditions means an unfiltered search.
    search_filter = Filter(must=conditions) if conditions else None

    found = cli.search(
        collection_name=_col("chunks"),
        query_vector=vec,
        limit=req.limit,
        with_payload=True,
        with_vectors=False,
        query_filter=search_filter,
    )

    def as_hit(point) -> dict:
        """Flatten one search result into the response entry."""
        fields = point.payload or {}
        # NOTE(review): "chunk_id" carries the Qdrant point ID, not the
        # chunk_id stored in the payload — confirm this is what callers expect.
        return {
            "chunk_id": point.id,
            "score": point.score,
            "note_id": fields.get("note_id"),
            "title": fields.get("title"),
            "path": fields.get("path"),
            "preview": fields.get("preview"),
            "tags": fields.get("tags"),
        }

    return {"results": [as_hit(point) for point in found]}