mindnet/app/core/qdrant_points.py
Lars b49d97b8d7
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
app/core/qdrant_points.py aktualisiert
2025-11-08 11:20:29 +01:00

287 lines
9.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Name: app/core/qdrant_points.py
Version: v1.8.0 (2025-11-08)
Kurzbeschreibung
Punkt-Operationen (Upsert/Delete/Scroll) für mindnet:
- Notes/Chunks/Edges als einzelne Upsert-Helper
- Generischer upsert_batch(...)
- Purge/Delete-Helfer je Note (für --purge-before-upsert)
- Edge-Payload-Normalisierung + deterministische edge_id
- Wrapper ensure_collections_for_prefix(...), delegiert an app.core.qdrant
Abwärtskompatibilität
* Beibehaltung der bisherigen Funktionsnamen:
- upsert_notes(...), upsert_chunks(...), upsert_edges(...)
- upsert_batch(...)
- delete_note_scope(...)
- ensure_collections_for_prefix(...)
* Robust ggü. qdrant_client-Versionen (MatchValue-Konstruktor etc.).
Erwartete Collections
- {prefix}_notes
- {prefix}_chunks
- {prefix}_edges (1D Dummy-Vektor)
"""
from __future__ import annotations
import hashlib
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
# Delegation auf zentrale Collection-Helfer
from app.core.qdrant import (
collection_names,
ensure_collections as _ensure_collections,
ensure_payload_indexes as _ensure_payload_indexes,
)
# ---------------------------------------------------------------------------
# Utility: MatchValue versionstolerant
# ---------------------------------------------------------------------------
def _match_value(value: Any):
    """Build a ``rest.MatchValue`` for *value* across qdrant_client versions.

    The keyword form ``MatchValue(value=...)`` is tried first. If the
    constructor rejects it with a ``TypeError``, the value is passed
    positionally instead.
    """
    try:
        matcher = rest.MatchValue(value=value)
    except TypeError:
        # Fallback for constructors that only take the value positionally.
        matcher = rest.MatchValue(value)
    return matcher
# ---------------------------------------------------------------------------
# Edge-Normalisierung & deterministische edge_id
# ---------------------------------------------------------------------------
def normalize_edge_payload(pl: Dict[str, Any]) -> Dict[str, Any]:
    """Return a normalized copy of an edge payload with a deterministic ``edge_id``.

    Legacy field names are renamed to the current ones, but only when the
    current name is not already present in the payload:

    * ``edge_type`` -> ``kind``
    * ``src_id``    -> ``source_id``
    * ``dst_id``    -> ``target_id``

    ``scope`` defaults to ``"note"`` and ``seq`` to ``0``. No other field
    (in particular ``note_id``) is added or changed.

    ``edge_id`` is always (re)computed as the SHA-256 hex digest of
    ``"kind|scope|source_id|target_id|seq"``; a missing ``kind`` counts as
    ``"unknown"`` and missing source/target IDs as empty strings.

    The input mapping is not mutated; ``None`` or an empty mapping is
    treated as an empty payload.
    """
    edge = dict(pl or {})

    # (current name, legacy name) pairs, applied in this order.
    renames = (
        ("kind", "edge_type"),
        ("source_id", "src_id"),
        ("target_id", "dst_id"),
    )
    for current, legacy in renames:
        if current not in edge and legacy in edge:
            edge[current] = edge.pop(legacy)

    edge.setdefault("scope", "note")  # "note" | "chunk"
    edge.setdefault("seq", 0)

    # The ID depends only on these five fields, so re-normalizing the same
    # edge always yields the same edge_id.
    signature_parts = (
        edge.get("kind", "unknown"),
        edge.get("scope", "note"),
        edge.get("source_id", ""),
        edge.get("target_id", ""),
        edge.get("seq", 0),
    )
    signature = "|".join(str(part) for part in signature_parts)
    edge["edge_id"] = hashlib.sha256(signature.encode("utf-8")).hexdigest()
    return edge
# ---------------------------------------------------------------------------
# Ensure Collections (Wrapper für ältere Importe)
# ---------------------------------------------------------------------------
def ensure_collections_for_prefix(
    client: QdrantClient,
    prefix: str,
    dim: int,
    destructive: bool = False,
) -> Tuple[str, str, str]:
    """Backward-compatible wrapper kept for older callers.

    Delegates to the helpers imported from ``app.core.qdrant``: first the
    collection setup (``client``, ``prefix``, ``dim`` and ``destructive``
    are forwarded), then the payload-index setup, and finally returns
    whatever ``collection_names(prefix)`` yields.
    """
    _ensure_collections(client, prefix, dim, destructive=destructive)
    _ensure_payload_indexes(client, prefix)
    names = collection_names(prefix)
    return names
# ---------------------------------------------------------------------------
# Upsert: Generisch
# ---------------------------------------------------------------------------
def upsert_batch(
    client: QdrantClient,
    collection: str,
    payloads: Sequence[Dict[str, Any]],
    *,
    point_id_field: Optional[str] = None,
    vectors: Optional[Sequence[Sequence[float]]] = None,
    ids: Optional[Sequence[Any]] = None,
    wait: bool = True,
) -> None:
    """Upsert one point per payload into *collection*.

    Args:
        client: Qdrant client used for the upsert call.
        collection: Name of the target collection.
        payloads: One payload dict per point.
        point_id_field: If set, the point ID is read from
            ``payload[point_id_field]``.
        vectors: Optional vectors; must have the same length as *payloads*.
        ids: Optional explicit point IDs; must have the same length as
            *payloads*. Takes precedence over *point_id_field*.
        wait: Forwarded to ``client.upsert``.

    Point-ID priority:
        1) ``ids[i]`` if *ids* is given,
        2) ``payload[point_id_field]`` if *point_id_field* is set,
        3) otherwise ``None`` is passed on unchanged.
           NOTE(review): Qdrant documents point IDs as unsigned integers or
           UUIDs, so a missing ID is probably rejected -- confirm against
           the client version in use.

    String IDs that are not parseable as a UUID (for example the 64-char
    SHA-256 ``edge_id``) are mapped to a deterministic UUIDv5 so that the
    same key always addresses the same point. Integers and valid UUID
    strings are passed through untouched. The payload itself is never
    modified, so the original key stays available there.

    An empty *payloads* sequence is a no-op: no request is sent.

    Raises:
        ValueError: If *ids* or *vectors* is given with a length different
            from *payloads*.
    """
    items = list(payloads)
    if not items:
        # Nothing to write; skip the round trip to the server.
        return

    # Validate up front so a mismatch neither fails halfway through building
    # the batch nor silently drops the surplus entries.
    if ids is not None and len(ids) != len(items):
        raise ValueError(
            f"ids has {len(ids)} entries but payloads has {len(items)}"
        )
    if vectors is not None and len(vectors) != len(items):
        raise ValueError(
            f"vectors has {len(vectors)} entries but payloads has {len(items)}"
        )

    points: List[rest.PointStruct] = []
    for index, payload in enumerate(items):
        if ids is not None:
            raw_id = ids[index]
        elif point_id_field:
            raw_id = payload.get(point_id_field)
        else:
            raw_id = None
        vector = vectors[index] if vectors is not None else None
        points.append(
            rest.PointStruct(
                id=_coerce_point_id(raw_id), vector=vector, payload=payload
            )
        )
    client.upsert(collection_name=collection, points=points, wait=wait)


def _coerce_point_id(raw_id: Any) -> Any:
    """Map a non-UUID string key to a deterministic UUIDv5 string.

    Anything that is not a ``str`` and any string that ``uuid.UUID`` can
    parse is returned unchanged.
    """
    # Function-scope import: the module's import block does not provide uuid.
    import uuid

    if not isinstance(raw_id, str):
        return raw_id
    try:
        uuid.UUID(raw_id)
    except ValueError:
        return str(uuid.uuid5(uuid.NAMESPACE_URL, raw_id))
    return raw_id
# ---------------------------------------------------------------------------
# Upsert: Notes / Chunks / Edges
# ---------------------------------------------------------------------------
def upsert_notes(
    client: QdrantClient,
    notes_collection: str,
    note_payloads: Sequence[Dict[str, Any]],
    *,
    note_vectors: Optional[Sequence[Sequence[float]]] = None,
    wait: bool = True,
) -> None:
    """Upsert note payloads into *notes_collection*.

    Delegates to ``upsert_batch`` with ``point_id_field="note_id"``, so each
    point's ID is derived from ``payload["note_id"]``. *note_vectors*, if
    given, is forwarded as the per-point vectors.
    """
    upsert_batch(
        client,
        notes_collection,
        note_payloads,
        point_id_field="note_id",
        vectors=note_vectors,
        wait=wait,
    )
def upsert_chunks(
    client: QdrantClient,
    chunks_collection: str,
    chunk_payloads: Sequence[Dict[str, Any]],
    *,
    chunk_vectors: Optional[Sequence[Sequence[float]]] = None,
    wait: bool = True,
) -> None:
    """Upsert chunk payloads into *chunks_collection*.

    Delegates to ``upsert_batch`` with ``point_id_field="chunk_id"``, so each
    point's ID is derived from ``payload["chunk_id"]``. *chunk_vectors*, if
    given, is forwarded as the per-point vectors.
    """
    upsert_batch(
        client,
        chunks_collection,
        chunk_payloads,
        point_id_field="chunk_id",
        vectors=chunk_vectors,
        wait=wait,
    )
def upsert_edges(
    client: QdrantClient,
    edges_collection: str,
    edge_payloads: Sequence[Dict[str, Any]],
    *,
    wait: bool = True,
) -> None:
    """Upsert edge payloads into *edges_collection*.

    Every payload is first passed through ``normalize_edge_payload``; the
    batch is then handed to ``upsert_batch`` with
    ``point_id_field="edge_id"``. Each edge gets a one-element dummy vector
    ``[0.0]``.
    NOTE(review): this presumes the edges collection is configured with a
    1-dimensional vector -- the collection setup is not visible here.
    """
    edges: List[Dict[str, Any]] = list(map(normalize_edge_payload, edge_payloads))
    # One fresh list per edge so no two points share the same vector object.
    placeholder_vectors = [[0.0] for _ in range(len(edges))]
    upsert_batch(
        client,
        edges_collection,
        edges,
        point_id_field="edge_id",
        vectors=placeholder_vectors,
        wait=wait,
    )
# ---------------------------------------------------------------------------
# Delete (für --purge-before-upsert und Sync)
# ---------------------------------------------------------------------------
def delete_by_filter(client: QdrantClient, collection: str, flt: rest.Filter, *, wait: bool = True) -> None:
    """Delete every point in *collection* that matches the filter *flt*.

    The filter is wrapped in a ``rest.FilterSelector`` and passed to
    ``client.delete``; *wait* is forwarded unchanged.
    """
    selector = rest.FilterSelector(filter=flt)
    client.delete(collection_name=collection, points_selector=selector, wait=wait)
def delete_note_scope(
    client: QdrantClient,
    prefix: str,
    note_id: str,
    *,
    include_edges: bool = True,
    include_chunks: bool = True,
    include_note: bool = False,
    wait: bool = True,
) -> None:
    """Delete the points belonging to one note from the prefix's collections.

    For each enabled target, all points whose payload field ``note_id``
    equals *note_id* are removed. Targets are processed in the order
    chunks, edges, note. By default chunks and edges are purged and the
    note itself is kept. Typically used ahead of an upsert when
    ``--purge-before-upsert`` is active.

    NOTE(review): edges are matched on a ``note_id`` payload field, which
    ``normalize_edge_payload`` does not add -- confirm that callers set it.
    """
    notes_col, chunks_col, edges_col = collection_names(prefix)

    # (enabled?, collection) in the order the deletes must be issued.
    plan = (
        (include_chunks, chunks_col),
        (include_edges, edges_col),
        (include_note, notes_col),
    )
    for enabled, target in plan:
        if not enabled:
            continue
        note_filter = rest.Filter(
            must=[rest.FieldCondition(key="note_id", match=_match_value(note_id))]
        )
        delete_by_filter(client, target, note_filter, wait=wait)
# ---------------------------------------------------------------------------
# Simple Queries (Scroll)
# ---------------------------------------------------------------------------
def list_point_ids_by_note(
    client: QdrantClient, collection: str, note_id: str, *, id_field: str
) -> List[str]:
    """Collect ``payload[id_field]`` for all points of one note.

    Scrolls through *collection* in pages of 512 points (payload only, no
    vectors), restricted to points whose payload field ``note_id`` equals
    *note_id*. The returned values are read from the payload, not from the
    Qdrant point ID; values that are not ``str`` are skipped.
    """
    note_filter = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=_match_value(note_id))]
    )
    found: List[str] = []
    cursor = None
    while True:
        batch, cursor = client.scroll(
            collection_name=collection,
            scroll_filter=note_filter,
            limit=512,
            with_payload=True,
            with_vectors=False,
            offset=cursor,
        )
        # An empty page ends the scroll.
        if not batch:
            return found
        payload_values = ((point.payload or {}).get(id_field) for point in batch)
        found.extend(value for value in payload_values if isinstance(value, str))
        # No continuation offset means this was the last page.
        if cursor is None:
            return found