app/core/qdrant_points.py hinzugefügt
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s

This commit is contained in:
Lars 2025-09-03 07:57:31 +02:00
parent fb4a1bc702
commit f122b2442e

37
app/core/qdrant_points.py Normal file
View File

@ -0,0 +1,37 @@
from __future__ import annotations
import os, datetime
from typing import List, Dict, Tuple
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from .qdrant import QdrantConfig, get_client, _collection_names
def ts_iso() -> str:
return datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
def points_for_chunks(prefix: str, chunk_payloads: List[Dict], vectors: List[List[float]]) -> Tuple[str, List[rest.PointStruct]]:
assert len(chunk_payloads) == len(vectors)
_, chunks, _ = _collection_names(prefix)
pts = []
for pl, vec in zip(chunk_payloads, vectors):
pts.append(rest.PointStruct(id=pl["id"], vector=vec, payload=pl))
return chunks, pts
def points_for_note(prefix: str, note_payload: Dict, vector: List[float] | None) -> Tuple[str, List[rest.PointStruct]]:
notes, _, _ = _collection_names(prefix)
if vector is None:
# Für Notizen erlauben wir auch Payload-only Upserts (Vektor leer)
return notes, [rest.PointStruct(id=note_payload["note_id"], payload=note_payload)]
return notes, [rest.PointStruct(id=note_payload["note_id"], vector=vector, payload=note_payload)]
def points_for_edges(prefix: str, edges: List[Dict]) -> Tuple[str, List[rest.PointStruct]]:
_, _, edges_col = _collection_names(prefix)
pts = []
for e in edges:
pid = f"{e['src_id']}~{e['edge_type']}~{e['dst_id']}~{e.get('scope','note')}"
# 1-dim Dummy-Vektor, weil Collection einen Vektor erwartet
pts.append(rest.PointStruct(id=pid, vector=[0.0], payload=e))
return edges_col, pts
def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]):
if not points: return
client.upsert(collection, points=points, wait=True)