# --- forge page header (scrape artifact, kept as comments for provenance) ---
# mindnet/app/core/qdrant_points.py
# Lars 8b3b343645
# All checks were successful
# Deploy mindnet to llm-node / deploy (push) Successful in 4s
# Upload files to "app/core"
# 2025-11-08 16:27:47 +01:00
#
# 334 lines
# 11 KiB
# Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
app/core/qdrant_points.py
Zweck
- Gemeinsame Helfer zum Erzeugen von Qdrant-Points für Notes, Chunks und Edges.
- Abwärtskompatibel zu altem Edge-Payload-Schema aus edges.py:
- alt: {'edge_type','src_id','dst_id', ...}
- neu: {'kind','source_id','target_id', ...}
Version
- 1.3 (2025-09-08)
Änderungen (ggü. 1.2)
- points_for_edges() akzeptiert jetzt beide Edge-Schemata.
- Normalisiert alte Felder auf 'kind' / 'source_id' / 'target_id' und schreibt eine
stabile 'edge_id' zurück in die Payload.
- Verhindert, dass mehrere Edges dieselbe Point-ID erhalten (Root Cause deiner 1-Edge-Sammlung).
Aufruf / Verwendung
- Wird von Import-/Backfill-Skripten via:
from app.core.qdrant_points import points_for_note, points_for_chunks, points_for_edges, upsert_batch
eingebunden. Keine CLI.
Hinweise
- Edges bekommen absichtlich einen 1D-Dummy-Vektor [0.0], damit Qdrant das Objekt akzeptiert.
- Die Point-IDs werden deterministisch aus stabilen Strings (UUIDv5) abgeleitet.
"""
from __future__ import annotations
import uuid
from typing import List, Tuple
from qdrant_client.http import models as rest
def _names(prefix: str) -> Tuple[str, str, str]:
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
def _to_uuid(stable_key: str) -> str:
"""Stabile UUIDv5 aus einem String-Key (deterministisch)."""
return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key))
def points_for_note(
    prefix: str,
    note_payload: dict,
    note_vec: List[float] | None,
    dim: int,
) -> Tuple[str, List[rest.PointStruct]]:
    """Build the single point for the notes collection.

    Falls back to a zero vector of length *dim* when no note embedding
    was supplied.
    """
    notes_col = _names(prefix)[0]
    if note_vec is None:
        vector = [0.0] * int(dim)
    else:
        vector = note_vec
    # Stable key for the deterministic point id; tolerate missing ids.
    stable = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id"
    point = rest.PointStruct(id=_to_uuid(stable), vector=vector, payload=note_payload)
    return notes_col, [point]
def points_for_chunks(
    prefix: str,
    chunk_payloads: List[dict],
    vectors: List[List[float]],
) -> Tuple[str, List[rest.PointStruct]]:
    """Chunks collection: expects exactly one vector per chunk payload.

    Robustness:
    - When 'chunk_id' is missing, fall back to 'id'; otherwise derive
      '<note_id>#<i>' (1-based index).
    - The (possibly derived) id is written back into the payload
      (pl['chunk_id']).
    """
    chunks_col = _names(prefix)[1]

    def _ensure_chunk_id(payload: dict, index: int) -> str:
        # Prefer an explicit chunk id, then a generic id, then derive one.
        cid = payload.get("chunk_id") or payload.get("id")
        if not cid:
            parent = payload.get("note_id") or payload.get("parent_note_id") or "missing-note"
            cid = f"{parent}#{index}"
        payload["chunk_id"] = cid
        return cid

    points = [
        rest.PointStruct(id=_to_uuid(_ensure_chunk_id(pl, idx)), vector=vec, payload=pl)
        for idx, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1)
    ]
    return chunks_col, points
def _normalize_edge_payload(pl: dict) -> dict:
"""
Sorgt für kompatible Feldnamen.
akzeptiert:
- neu: kind, source_id, target_id, seq?
- alt: edge_type, src_id, dst_id, order?/index?
schreibt zurück: kind, source_id, target_id, seq?
"""
# bereits neu?
kind = pl.get("kind") or pl.get("edge_type") or "edge"
source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src"
target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt"
seq = pl.get("seq") or pl.get("order") or pl.get("index")
# in Payload zurückschreiben (ohne alte Felder zu entfernen → maximal kompatibel)
pl.setdefault("kind", kind)
pl.setdefault("source_id", source_id)
pl.setdefault("target_id", target_id)
if seq is not None and "seq" not in pl:
pl["seq"] = seq
return pl
def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]:
    """Edges collection with a 1-D dummy vector ([0.0]).

    - Accepts both the new and the old edge schema
      (see _normalize_edge_payload).
    - When 'edge_id' is missing, a stable one is derived from
      (kind, source_id, target_id, seq) and written back into the payload.

    Bug fix vs. 1.3: ``seq == 0`` no longer collapses to the empty string
    in the derived edge_id (the old ``pl.get("seq") or ""`` treated 0 as
    missing), so an edge with seq 0 keeps an id distinct from a seq-less
    edge between the same nodes — avoiding point-id collisions.
    """
    edges_col = _names(prefix)[2]
    points: List[rest.PointStruct] = []
    for raw in edge_payloads:
        pl = _normalize_edge_payload(raw)
        edge_id = pl.get("edge_id")
        if not edge_id:
            kind = pl.get("kind", "edge")
            src = pl.get("source_id", "unknown-src")
            tgt = pl.get("target_id", "unknown-tgt")
            seq = pl.get("seq")
            seq_part = "" if seq is None else seq
            edge_id = f"{kind}:{src}->{tgt}#{seq_part}"
            pl["edge_id"] = edge_id
        points.append(rest.PointStruct(id=_to_uuid(edge_id), vector=[0.0], payload=pl))
    return edges_col, points
def upsert_batch(client, collection: str, points: List[rest.PointStruct]) -> None:
    """Upsert *points* into *collection*, waiting for completion.

    No-op when the point list is empty.
    """
    if points:
        client.upsert(collection_name=collection, points=points, wait=True)
# --- WP-04 Ergänzungen: Graph/Retriever Hilfsfunktionen ---
from typing import Optional, Dict, Any, Iterable
from qdrant_client import QdrantClient
def _filter_any(field: str, values: Iterable[str]) -> rest.Filter:
    """Build an OR filter: payload[field] == any(values)."""
    conditions = []
    for value in values:
        conditions.append(
            rest.FieldCondition(key=field, match=rest.MatchValue(value=value))
        )
    return rest.Filter(should=conditions)
def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]:
    """Combine several filters into a single AND filter (None entries ignored).

    - 'must' conditions are concatenated.
    - 'should' groups are wrapped in a nested Filter so Qdrant keeps their
      OR semantics inside the outer AND.
    - 'must_not' conditions are propagated into the result.

    Bug fix vs. 1.3: the previous 'must_not' branch was dead code
    (``if "must_not" not in locals(): pass``) and silently dropped all
    negative conditions; they are now carried over.
    """
    fs = [f for f in filters if f is not None]
    if not fs:
        return None
    if len(fs) == 1:
        return fs[0]
    must: list = []
    must_not: list = []
    for f in fs:
        if getattr(f, "must", None):
            must.extend(f.must)
        if getattr(f, "should", None):
            # Nest the OR group so it stays an OR within the outer AND.
            must.append(rest.Filter(should=f.should))
        if getattr(f, "must_not", None):
            must_not.extend(f.must_not)
    if must_not:
        return rest.Filter(must=must, must_not=must_not)
    return rest.Filter(must=must)
def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]:
    """Translate a simple {field: value(s)} dict into a Qdrant filter.

    - list/tuple/set values: OR over MatchValue (field == any(values))
    - scalar values: equality (field == value)
    Extend this for more complex filters (e.g. tags ∈ payload.tags).
    """
    if not filters:
        return None
    parts = []
    for field, value in filters.items():
        if isinstance(value, (list, tuple, set)):
            parts.append(_filter_any(field, [str(item) for item in value]))
        else:
            condition = rest.FieldCondition(key=field, match=rest.MatchValue(value=value))
            parts.append(rest.Filter(must=[condition]))
    return _merge_filters(*parts)
def search_chunks_by_vector(
    client: QdrantClient,
    prefix: str,
    vector: list[float],
    top: int = 10,
    filters: Optional[Dict[str, Any]] = None,
) -> list[tuple[str, float, dict]]:
    """Vector search in {prefix}_chunks.

    Returns a list of (point_id, score, payload) tuples.
    """
    chunks_col = _names(prefix)[1]
    hits = client.search(
        collection_name=chunks_col,
        query_vector=vector,
        limit=top,
        with_payload=True,
        with_vectors=False,
        query_filter=_filter_from_dict(filters),
    )
    return [(str(hit.id), float(hit.score), dict(hit.payload or {})) for hit in hits]
def get_edges_for_sources(
    client: QdrantClient,
    prefix: str,
    source_ids: list[str],
    edge_types: Optional[list[str]] = None,
    limit: int = 2048,
) -> list[dict]:
    """Fetch edges from {prefix}_edges with source_id ∈ source_ids
    (optionally restricted to kind ∈ edge_types).

    Returns payload dicts incl. edge_id/source_id/target_id/kind/seq
    (when present); the internal point id is added under 'id'.

    Robustness vs. 1.3: returns [] immediately for empty *source_ids* or a
    non-positive *limit* — previously this built an empty OR filter and
    issued a scroll call with ``limit <= 0``.
    """
    if not source_ids or limit <= 0:
        return []
    edges_col = _names(prefix)[2]
    flt = _merge_filters(
        _filter_any("source_id", source_ids),
        _filter_any("kind", edge_types) if edge_types else None,
    )
    collected: list[dict] = []
    next_page = None
    while True:
        # Batch size never exceeds the remaining budget, so we cannot
        # overshoot *limit*.
        points, next_page = client.scroll(
            collection_name=edges_col,
            scroll_filter=flt,
            limit=min(512, limit - len(collected)),
            with_payload=True,
            with_vectors=False,
            offset=next_page,
        )
        for p in points:
            pl = dict(p.payload or {})
            # Expose the deterministic point id to clients.
            pl.setdefault("id", str(p.id))
            collected.append(pl)
        if len(collected) >= limit:
            return collected
        if next_page is None:
            break
    return collected
def get_note_payload(
    client: QdrantClient,
    prefix: str,
    note_id: str,
) -> Optional[dict]:
    """Look up a note by its payload 'note_id' (not the internal UUID!)."""
    notes_col = _names(prefix)[0]
    condition = rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
    points, _ = client.scroll(
        collection_name=notes_col,
        scroll_filter=rest.Filter(must=[condition]),
        limit=1,
        with_payload=True,
        with_vectors=False,
    )
    if not points:
        return None
    first = points[0]
    payload = dict(first.payload or {})
    payload.setdefault("id", str(first.id))
    return payload
def _collect_payloads_by_field(
    client: QdrantClient,
    collection: str,
    id_field: str,
    wanted_ids: list[str],
    out: dict[str, dict],
    max_total: int,
) -> None:
    """Scroll *collection* for payloads whose *id_field* ∈ *wanted_ids* and
    merge them into *out* (first hit per id wins).

    Stops once *out* holds *max_total* entries; *out* is shared across
    calls, matching the original combined-cap behavior.
    """
    flt = _filter_any(id_field, wanted_ids)
    next_page = None
    while True:
        pts, next_page = client.scroll(
            collection_name=collection,
            scroll_filter=flt,
            limit=256,
            with_payload=True,
            with_vectors=False,
            offset=next_page,
        )
        for p in pts:
            pl = dict(p.payload or {})
            stable = pl.get(id_field)
            if stable and stable not in out:
                pl.setdefault("id", str(p.id))
                out[stable] = pl
        if next_page is None or len(out) >= max_total:
            break


def get_neighbor_nodes(
    client: QdrantClient,
    prefix: str,
    target_ids: list[str],
    limit_per_collection: int = 2048,
) -> dict[str, dict]:
    """Fetch the payloads of target nodes (notes and chunks) for the given ids.

    The ids are the stable payload ids (note_id/chunk_id), not the internal
    UUIDs. Returns a mapping target_id -> payload.

    Refactor vs. 1.3: the two verbatim-duplicated scroll loops are folded
    into _collect_payloads_by_field; behavior is unchanged.

    NOTE(review): the cap is checked against the combined result dict, so
    the chunks phase also counts notes already collected — kept as-is for
    backward compatibility; confirm whether a true per-collection cap is
    intended.
    """
    notes_col, chunks_col, _ = _names(prefix)
    out: dict[str, dict] = {}
    _collect_payloads_by_field(client, notes_col, "note_id", target_ids, out, limit_per_collection)
    _collect_payloads_by_field(client, chunks_col, "chunk_id", target_ids, out, limit_per_collection)
    return out