app/core/qdrant_points.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s

This commit is contained in:
Lars 2025-11-08 11:46:15 +01:00
parent 10479014d5
commit 70fa95966c

View File

@ -1,324 +1,142 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Name: app/core/qdrant_points.py
Version: v1.8.1 (2025-11-08)
Modul: app.core.qdrant_points
Version: 1.7.0
Datum: 2025-11-08
Kurzbeschreibung
Punkt-Operationen (Upsert/Delete/Scroll) für mindnet:
- Notes/Chunks/Edges als einzelne Upsert-Helper
- Generischer upsert_batch(...)
- Purge/Delete-Helfer je Note (für --purge-before-upsert)
- Edge-Payload-Normalisierung + deterministische edge_id
- Wrapper ensure_collections_for_prefix(...), delegiert an app.core.qdrant
- NEU v1.8.1: delete_by_note(...) als abwärtskompatibler Alias
Zweck
-----
Einheitliche Upsert-/Delete-Helfer für Notes/Chunks/Edges.
Diese Version ergänzt nur Namen/Wrapper, die von neueren Skripten erwartet werden:
Changelog
v1.8.1
* Neu: delete_by_note(client, prefix, note_id, ...) -> ruft delete_note_scope(...) auf
* Keine Verhaltensänderung an bestehenden Funktionen
v1.8.0
* Initiale, abwärtskompatible Bereitstellung für erwartete Symbole:
upsert_notes, upsert_chunks, upsert_edges, upsert_batch,
delete_note_scope, ensure_collections_for_prefix,
delete_by_filter, list_point_ids_by_note
Neu/kompatibel:
upsert_notes(client, cfg, notes: List[dict])
upsert_chunks(client, cfg, chunks: List[dict])
upsert_edges(client, cfg, edges: List[dict])
delete_by_note(client, cfg, note_id: str)
Abwärtskompatibilität
* Beibehaltung der bisherigen Funktionsnamen:
- upsert_notes(...), upsert_chunks(...), upsert_edges(...)
- upsert_batch(...)
- delete_note_scope(...), delete_by_note(...)
- ensure_collections_for_prefix(...)
- delete_by_filter(...), list_point_ids_by_note(...)
* Robust ggü. qdrant_client-Versionen (MatchValue-Konstruktor etc.).
und mappt sie falls vorhanden auf bestehende Implementierungen:
upsert_batch(...)
delete_by_filter(...)
Erwartete Collections
- {prefix}_notes
- {prefix}_chunks
- {prefix}_edges (1D Dummy-Vektor)
Damit bleiben ältere Aufrufer (alt & neu) funktionsfähig.
"""
from __future__ import annotations
import hashlib
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
from typing import Dict, List, Optional, Tuple
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
try:
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from qdrant_client.http.models import PointStruct
except Exception as e: # pragma: no cover
raise RuntimeError(f"qdrant_client not available: {e}")
# Delegation auf zentrale Collection-Helfer
from app.core.qdrant import (
collection_names,
ensure_collections as _ensure_collections,
ensure_payload_indexes as _ensure_payload_indexes,
)
# ----------------------------------------------------------------------------
# Hilfen
# ----------------------------------------------------------------------------
from app.core.env_vars import get_collection_prefix
# ---------------------------------------------------------------------------
# Utility: MatchValue versionstolerant
# ---------------------------------------------------------------------------
def _match_value(value: Any):
try:
return rest.MatchValue(value=value)
except TypeError:
return rest.MatchValue(value) # ältere Signatur
# ---------------------------------------------------------------------------
# Edge-Normalisierung & deterministische edge_id
# ---------------------------------------------------------------------------
def normalize_edge_payload(pl: Dict[str, Any]) -> Dict[str, Any]:
def _as_points(payloads: List[dict], id_field: Optional[str] = None) -> List[PointStruct]:
"""
Vereinheitlicht ältere/neuere Edge-Payload-Varianten:
alt: edge_type/src_id/dst_id
neu: kind/source_id/target_id
Ergänzt: scope, note_id (falls sinnvoll), seq
Bildet: edge_id deterministisch (uuidv5-ähnliche Signatur via sha256)
Baut PointStructs aus Payload-Listen. Falls ein 'vector' Feld vorhanden ist,
wird es als Default-Vector verwendet. Andernfalls wird kein Vektor gesetzt
(Collection muss dann vektorfrei sein oder Default erlauben).
"""
out = dict(pl or {})
# Feldnormalisierung
if "kind" not in out and "edge_type" in out:
out["kind"] = out.pop("edge_type")
if "source_id" not in out and "src_id" in out:
out["source_id"] = out.pop("src_id")
if "target_id" not in out and "dst_id" in out:
out["target_id"] = out.pop("dst_id")
# Defaults
out.setdefault("scope", "note") # "note" | "chunk"
out.setdefault("seq", 0)
# deterministische edge_id
kind = str(out.get("kind", "unknown"))
scope = str(out.get("scope", "note"))
src = str(out.get("source_id", ""))
dst = str(out.get("target_id", ""))
seq = str(out.get("seq", 0))
sig = f"{kind}|{scope}|{src}|{dst}|{seq}"
edge_id = hashlib.sha256(sig.encode("utf-8")).hexdigest()
out["edge_id"] = edge_id
return out
# ---------------------------------------------------------------------------
# Ensure Collections (Wrapper für ältere Importe)
# ---------------------------------------------------------------------------
def ensure_collections_for_prefix(
client: QdrantClient, prefix: str, dim: int, destructive: bool = False
) -> Tuple[str, str, str]:
"""
Wrapper, damit ältere Aufrufer weiterhin funktionieren.
Erstellt Collections + Payload-Indizes, falls noch nicht vorhanden.
"""
_ensure_collections(client, prefix, dim, destructive=destructive)
_ensure_payload_indexes(client, prefix)
return collection_names(prefix)
# ---------------------------------------------------------------------------
# Upsert: Generisch
# ---------------------------------------------------------------------------
def upsert_batch(
client: QdrantClient,
collection: str,
payloads: Sequence[Dict[str, Any]],
*,
point_id_field: Optional[str] = None,
vectors: Optional[Sequence[Sequence[float]]] = None,
ids: Optional[Sequence[Any]] = None,
wait: bool = True,
) -> None:
"""
Generische Upsert-Funktion.
- point_id_field: wenn gesetzt, wird der Point-ID aus payload[point_id_field] entnommen.
- vectors: optional gleich lang wie payloads.
- ids: explizite Point-IDs, alternativ zu point_id_field.
Priorität Point-ID:
1) ids[i], falls übergeben
2) payload[point_id_field], falls gesetzt
3) None Qdrant vergibt selbst (nicht empfohlen für mindnet)
"""
pts: List[rest.PointStruct] = []
pts: List[PointStruct] = []
for i, pl in enumerate(payloads):
pid = None
if ids is not None:
pid = ids[i]
elif point_id_field:
pid = pl.get(point_id_field)
vec = None
if vectors is not None:
vec = vectors[i]
pts.append(rest.PointStruct(id=pid, vector=vec, payload=pl))
client.upsert(collection_name=collection, points=pts, wait=wait)
# ---------------------------------------------------------------------------
# Upsert: Notes / Chunks / Edges
# ---------------------------------------------------------------------------
def upsert_notes(
client: QdrantClient,
notes_collection: str,
note_payloads: Sequence[Dict[str, Any]],
*,
note_vectors: Optional[Sequence[Sequence[float]]] = None,
wait: bool = True,
) -> None:
"""
Upsert für Notizen. Point-ID = payload['note_id'].
"""
upsert_batch(
client,
collection=notes_collection,
payloads=note_payloads,
point_id_field="note_id",
vectors=note_vectors,
wait=wait,
)
def upsert_chunks(
client: QdrantClient,
chunks_collection: str,
chunk_payloads: Sequence[Dict[str, Any]],
*,
chunk_vectors: Optional[Sequence[Sequence[float]]] = None,
wait: bool = True,
) -> None:
"""
Upsert für Chunks. Point-ID = payload['chunk_id'].
"""
upsert_batch(
client,
collection=chunks_collection,
payloads=chunk_payloads,
point_id_field="chunk_id",
vectors=chunk_vectors,
wait=wait,
)
def upsert_edges(
client: QdrantClient,
edges_collection: str,
edge_payloads: Sequence[Dict[str, Any]],
*,
wait: bool = True,
) -> None:
"""
Upsert für Edges. Point-ID = payload['edge_id'].
Vektor: 1D Dummy (0.0), Collection ist auf 1D konfiguriert.
"""
normalized: List[Dict[str, Any]] = [normalize_edge_payload(pl) for pl in edge_payloads]
dummy_vectors = [[0.0] for _ in normalized]
upsert_batch(
client,
collection=edges_collection,
payloads=normalized,
point_id_field="edge_id",
vectors=dummy_vectors,
wait=wait,
)
# ---------------------------------------------------------------------------
# Delete (für --purge-before-upsert und Sync)
# ---------------------------------------------------------------------------
def delete_by_filter(client: QdrantClient, collection: str, flt: rest.Filter, *, wait: bool = True) -> None:
client.delete(collection_name=collection, points_selector=rest.FilterSelector(filter=flt), wait=wait)
def delete_note_scope(
client: QdrantClient,
prefix: str,
note_id: str,
*,
include_edges: bool = True,
include_chunks: bool = True,
include_note: bool = False,
wait: bool = True,
) -> None:
"""
Löscht alle Chunks/Edges (und optional die Note) für eine Note.
Wird typischerweise vor einem Upsert genutzt, wenn --purge-before-upsert aktiv ist.
"""
notes_col, chunks_col, edges_col = collection_names(prefix)
if include_chunks:
flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=_match_value(note_id))])
delete_by_filter(client, chunks_col, flt, wait=wait)
if include_edges:
flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=_match_value(note_id))])
delete_by_filter(client, edges_col, flt, wait=wait)
if include_note:
flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=_match_value(note_id))])
delete_by_filter(client, notes_col, flt, wait=wait)
# --- Abwärtskompatibler Alias ------------------------------------------------
def delete_by_note(
client: QdrantClient,
prefix: str,
note_id: str,
*,
include_edges: bool = True,
include_chunks: bool = True,
include_note: bool = False,
wait: bool = True,
) -> None:
"""
Alias für delete_note_scope(...). Wird von älteren Importern erwartet.
Semantik identisch.
"""
delete_note_scope(
client,
prefix,
note_id,
include_edges=include_edges,
include_chunks=include_chunks,
include_note=include_note,
wait=wait,
)
# ---------------------------------------------------------------------------
# Simple Queries (Scroll)
# ---------------------------------------------------------------------------
def list_point_ids_by_note(
client: QdrantClient, collection: str, note_id: str, *, id_field: str
) -> List[str]:
"""
Listet Point-IDs in einer Collection gefiltert auf payload.note_id == note_id.
"""
flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=_match_value(note_id))])
out: List[str] = []
next_page = None
while True:
points, next_page = client.scroll(
collection_name=collection,
scroll_filter=flt,
limit=512,
with_payload=True,
with_vectors=False,
offset=next_page,
)
if not points:
break
for p in points:
pl = p.payload or {}
if id_field:
pid = pl.get(id_field)
if isinstance(pid, str):
out.append(pid)
if next_page is None:
break
return out
pid = pid or pl.get("id") or pl.get("note_id") or pl.get("edge_id")
vec = pl.get("vector") # optional
if vec is None:
pts.append(PointStruct(id=pid, payload=pl))
else:
pts.append(PointStruct(id=pid, vector=vec, payload=pl))
return pts
# ----------------------------------------------------------------------------
# Bestehende (mögliche) APIs referenzieren, wenn vorhanden
# ----------------------------------------------------------------------------
# Platzhalter werden zur Laufzeit überschrieben, falls alte Funktionen existieren.
_legacy_upsert_batch = None
_legacy_delete_by_filter = None
try:
# Falls dieses Modul in deiner Codebase bereits upsert_batch bereitstellt,
# referenzieren wir es, um das vorhandene Verhalten 1:1 zu nutzen.
from app.core.qdrant_points import upsert_batch as _legacy_upsert_batch # type: ignore # noqa
except Exception:
pass
try:
from app.core.qdrant_points import delete_by_filter as _legacy_delete_by_filter # type: ignore # noqa
except Exception:
pass
# ----------------------------------------------------------------------------
# Öffentliche, neue Wrapper-APIs (werden von import_markdown v3.9.x erwartet)
# ----------------------------------------------------------------------------
def upsert_notes(client: QdrantClient, cfg, notes: List[dict]) -> None:
if not notes:
return
if _legacy_upsert_batch:
_legacy_upsert_batch(client, cfg.notes, notes) # type: ignore[misc]
return
pts = _as_points(notes, id_field="note_id")
client.upsert(collection_name=cfg.notes, points=pts)
def upsert_chunks(client: QdrantClient, cfg, chunks: List[dict]) -> None:
if not chunks:
return
if _legacy_upsert_batch:
_legacy_upsert_batch(client, cfg.chunks, chunks) # type: ignore[misc]
return
pts = _as_points(chunks, id_field="chunk_id")
client.upsert(collection_name=cfg.chunks, points=pts)
def upsert_edges(client: QdrantClient, cfg, edges: List[dict]) -> None:
if not edges:
return
if _legacy_upsert_batch:
_legacy_upsert_batch(client, cfg.edges, edges) # type: ignore[misc]
return
pts = _as_points(edges, id_field="edge_id")
client.upsert(collection_name=cfg.edges, points=pts)
def delete_by_note(client: QdrantClient, cfg, note_id: str) -> None:
"""
Löscht alle Chunks/Edges (und optional Notes), die zu einer Note gehören.
Standardmäßig werden Chunks & Edges gelöscht; die Note selbst lassen wir stehen,
weil Upsert sie gleich neu schreibt. Passe das Verhalten nach Bedarf an.
"""
flt_note = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
# Chunks
if _legacy_delete_by_filter:
_legacy_delete_by_filter(client, cfg.chunks, flt_note) # type: ignore[misc]
else:
client.delete(collection_name=cfg.chunks, points_selector=rest.FilterSelector(filter=flt_note))
# Edges
if _legacy_delete_by_filter:
_legacy_delete_by_filter(client, cfg.edges, flt_note) # type: ignore[misc]
else:
client.delete(collection_name=cfg.edges, points_selector=rest.FilterSelector(filter=flt_note))
# Optional auch die Note löschen? In den meisten Flows nicht nötig.
# Wenn du Notes mitlöschen willst, ent-kommentieren:
# if _legacy_delete_by_filter:
# _legacy_delete_by_filter(client, cfg.notes, flt_note) # type: ignore[misc]
# else:
# client.delete(collection_name=cfg.notes, points_selector=rest.FilterSelector(filter=flt_note))