app/core/qdrant_points.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
This commit is contained in:
parent
52d5d01337
commit
55c6a5058c
|
|
@ -134,3 +134,200 @@ def upsert_batch(client, collection: str, points: List[rest.PointStruct]) -> Non
|
||||||
if not points:
|
if not points:
|
||||||
return
|
return
|
||||||
client.upsert(collection_name=collection, points=points, wait=True)
|
client.upsert(collection_name=collection, points=points, wait=True)
|
||||||
|
|
||||||
|
# --- WP-04 Ergänzungen: Graph/Retriever Hilfsfunktionen ---
|
||||||
|
from typing import Optional, Dict, Any, Iterable
|
||||||
|
from qdrant_client import QdrantClient
|
||||||
|
|
||||||
|
def _filter_any(field: str, values: Iterable[str]) -> rest.Filter:
    """Build an OR filter matching payload[field] == any of *values*."""
    conditions = [
        rest.FieldCondition(key=field, match=rest.MatchValue(value=candidate))
        for candidate in values
    ]
    # "should" is Qdrant's OR-combinator over the listed conditions.
    return rest.Filter(should=conditions)
|
||||||
|
|
||||||
|
def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]:
    """Combine several filters with AND semantics (``None`` entries are ignored).

    Each input filter's ``must`` conditions are flattened into the combined
    ``must`` list; a ``should`` group is wrapped in a nested ``rest.Filter``
    so Qdrant keeps interpreting it as OR; ``must_not`` conditions are
    collected and carried over (the previous implementation silently dropped
    them — the ``if "must_not" not in locals(): pass`` branch was dead code).

    Returns ``None`` when no filter is given, the single filter unchanged
    when only one is given, otherwise a new combined ``rest.Filter``.
    """
    fs = [f for f in filters if f is not None]
    if not fs:
        return None
    if len(fs) == 1:
        return fs[0]

    must = []
    must_not = []
    for f in fs:
        if getattr(f, "must", None):
            must.extend(f.must)
        if getattr(f, "should", None):
            # Keep the OR group intact by nesting it as its own filter.
            must.append(rest.Filter(should=f.should))
        if getattr(f, "must_not", None):
            # Bug fix: propagate negative conditions instead of dropping them.
            must_not.extend(f.must_not)
    return rest.Filter(must=must, must_not=must_not or None)
|
||||||
|
|
||||||
|
def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]:
    """
    Translate a plain dict into a Qdrant filter.

    - List/tuple/set values: OR over several MatchValue conditions
      (field == any(values)); values are stringified first.
    - Scalar values: equality (field == value).

    Extend this for more complex filters (e.g. tags ∈ payload.tags).
    Returns ``None`` for an empty/missing dict.
    """
    if not filters:
        return None

    parts = []
    for field, value in filters.items():
        if isinstance(value, (list, tuple, set)):
            parts.append(_filter_any(field, [str(item) for item in value]))
        else:
            condition = rest.FieldCondition(key=field, match=rest.MatchValue(value=value))
            parts.append(rest.Filter(must=[condition]))
    return _merge_filters(*parts)
|
||||||
|
|
||||||
|
def search_chunks_by_vector(
    client: QdrantClient,
    prefix: str,
    vector: list[float],
    top: int = 10,
    filters: Optional[Dict[str, Any]] = None,
) -> list[tuple[str, float, dict]]:
    """
    Vector search in the {prefix}_chunks collection.

    Returns a list of (point_id, score, payload) tuples, best match first.
    """
    _, chunks_col, _ = _names(prefix)
    hits = client.search(
        collection_name=chunks_col,
        query_vector=vector,
        limit=top,
        with_payload=True,
        with_vectors=False,
        query_filter=_filter_from_dict(filters),
    )
    # Normalize each hit to plain stdlib types for callers.
    return [(str(hit.id), float(hit.score), dict(hit.payload or {})) for hit in hits]
|
||||||
|
|
||||||
|
def get_edges_for_sources(
    client: QdrantClient,
    prefix: str,
    source_ids: list[str],
    edge_types: Optional[list[str]] = None,
    limit: int = 2048,
) -> list[dict]:
    """
    Fetch edges from {prefix}_edges where source_id ∈ source_ids
    (and optionally kind ∈ edge_types).

    Returns payload dicts incl. edge_id/source_id/target_id/kind/seq
    (when present); the deterministic point id is added under "id".
    At most *limit* edges are collected.
    """
    _, _, edges_col = _names(prefix)
    kind_filter = _filter_any("kind", edge_types) if edge_types else None
    combined = _merge_filters(_filter_any("source_id", source_ids), kind_filter)

    edges: list[dict] = []
    cursor = None
    while True:
        batch, cursor = client.scroll(
            collection_name=edges_col,
            scroll_filter=combined,
            # never request more than the caller still needs
            limit=min(512, limit - len(edges)),
            with_payload=True,
            with_vectors=False,
            offset=cursor,
        )
        for point in batch:
            payload = dict(point.payload or {})
            # expose the deterministic id (useful for clients)
            payload.setdefault("id", str(point.id))
            edges.append(payload)
        if len(edges) >= limit:
            return edges
        if cursor is None:
            return edges
|
||||||
|
|
||||||
|
def get_note_payload(
    client: QdrantClient,
    prefix: str,
    note_id: str,
) -> Optional[dict]:
    """
    Fetch a single note by its payload.note_id (not the internal point UUID!).

    Returns the payload dict (with the internal point id added under "id"),
    or ``None`` when no note matches.
    """
    notes_col, _, _ = _names(prefix)
    condition = rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))
    hits, _ = client.scroll(
        collection_name=notes_col,
        scroll_filter=rest.Filter(must=[condition]),
        limit=1,
        with_payload=True,
        with_vectors=False,
    )
    if not hits:
        return None
    first = hits[0]
    payload = dict(first.payload or {})
    payload.setdefault("id", str(first.id))
    return payload
|
||||||
|
|
||||||
|
def get_neighbor_nodes(
    client: QdrantClient,
    prefix: str,
    target_ids: list[str],
    limit_per_collection: int = 2048,
) -> dict[str, dict]:
    """
    Fetch payloads of target nodes (notes and chunks) for the given IDs.

    IDs are the stable payload IDs (note_id/chunk_id), not internal UUIDs.
    Returns a mapping target_id -> payload (internal point id added as "id").

    Bug fix vs. the previous version: *limit_per_collection* is now enforced
    per collection. Before, the break condition compared against the combined
    result dict, so notes found in the first pass consumed the chunk budget.
    The duplicated notes/chunks scan loops are also folded into one helper.
    """
    notes_col, chunks_col, _ = _names(prefix)
    out: dict[str, dict] = {}
    _collect_payloads_by_key(client, notes_col, "note_id", target_ids, out, limit_per_collection)
    _collect_payloads_by_key(client, chunks_col, "chunk_id", target_ids, out, limit_per_collection)
    return out


def _collect_payloads_by_key(
    client: QdrantClient,
    collection: str,
    key_field: str,
    target_ids: list[str],
    out: dict[str, dict],
    limit: int,
) -> None:
    """Scroll *collection* for points whose payload[key_field] ∈ target_ids.

    Adds up to *limit* new payloads to *out* keyed by payload[key_field];
    existing keys are never overwritten. Mutates *out* in place.
    """
    flt = _filter_any(key_field, target_ids)
    found = 0  # per-collection counter, independent of len(out)
    cursor = None
    while True:
        points, cursor = client.scroll(
            collection_name=collection,
            scroll_filter=flt,
            limit=256,
            with_payload=True,
            with_vectors=False,
            offset=cursor,
        )
        for point in points:
            payload = dict(point.payload or {})
            key = payload.get(key_field)
            if key and key not in out:
                payload.setdefault("id", str(point.id))
                out[key] = payload
                found += 1
        if cursor is None or found >= limit:
            break
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user