# Provenance: this code is the added hunk (@@ -134,3 +134,200 @@) of git patch
# 55c6a5058c1fa0da332c62a5ed07ce749deabbd5 (author: Lars, 2025-10-07) against
# app/core/qdrant_points.py. It is appended directly after upsert_batch().
# `rest` is already in scope in that module (upsert_batch is annotated with
# rest.PointStruct). `_names(prefix)` is assumed to be defined earlier in the
# same module and to return the (notes, chunks, edges) collection names in
# that order - TODO confirm against the module.

# --- WP-04 additions: graph/retriever helper functions ---
from typing import Optional, Dict, Any, Iterable
from qdrant_client import QdrantClient


def _filter_any(field: str, values: Iterable[str]) -> rest.Filter:
    """Build an OR filter: payload[field] equals any of *values*.

    An empty *values* produces a filter with an empty ``should`` list, i.e.
    a filter without any condition. The ID-based lookups in this module
    return early on empty input instead of sending such a filter.
    """
    return rest.Filter(
        should=[
            rest.FieldCondition(key=field, match=rest.MatchValue(value=v))
            for v in values
        ]
    )


def _as_condition_list(clause) -> list:
    """Normalize a filter clause (None, one condition, or a sequence) to a list."""
    if clause is None:
        return []
    if isinstance(clause, (list, tuple)):
        return list(clause)
    return [clause]


def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]:
    """Combine several filters into one AND filter.

    ``None`` entries are ignored. Returns ``None`` when nothing remains and
    the filter itself when exactly one remains. Otherwise:

    - all ``must`` conditions are concatenated,
    - every non-empty ``should`` group is wrapped in its own nested filter
      and added to ``must`` so that it keeps its OR semantics,
    - all ``must_not`` conditions are concatenated and carried over.
    """
    present = [f for f in filters if f is not None]
    if not present:
        return None
    if len(present) == 1:
        return present[0]

    must: list = []
    must_not: list = []
    for f in present:
        must.extend(_as_condition_list(getattr(f, "must", None)))
        should = _as_condition_list(getattr(f, "should", None))
        if should:
            # Keep the OR group intact by nesting it as a single AND member.
            must.append(rest.Filter(should=should))
        # Negative conditions must survive the merge; dropping them would
        # silently widen the result set.
        must_not.extend(_as_condition_list(getattr(f, "must_not", None)))
    return rest.Filter(must=must, must_not=must_not or None)


def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]:
    """Translate a simple ``{field: value}`` mapping into a Qdrant filter.

    - list/tuple/set values: OR over the values (field == any(values))
    - scalar values: equality (field == value)

    All fields are combined with AND. More complex filters (e.g. membership
    in a payload list such as tags) are not covered and need an extension.

    Returns ``None`` when *filters* is empty or ``None``.
    """
    if not filters:
        return None
    parts = []
    for key, value in filters.items():
        if isinstance(value, (list, tuple, set)):
            # NOTE(review): collection members are stringified while scalar
            # values are passed through unchanged - confirm this asymmetry
            # is intended for non-string payload fields.
            parts.append(_filter_any(key, [str(x) for x in value]))
        else:
            condition = rest.FieldCondition(
                key=key, match=rest.MatchValue(value=value)
            )
            parts.append(rest.Filter(must=[condition]))
    return _merge_filters(*parts)


def search_chunks_by_vector(
    client: QdrantClient,
    prefix: str,
    vector: list[float],
    top: int = 10,
    filters: Optional[Dict[str, Any]] = None,
) -> list[tuple[str, float, dict]]:
    """Run a vector search in the chunks collection of *prefix*.

    Args:
        client: Qdrant client used for the query.
        prefix: Collection prefix passed to ``_names``.
        vector: Query vector.
        top: Maximum number of hits.
        filters: Optional simple payload filter, see ``_filter_from_dict``.

    Returns:
        List of ``(point_id, score, payload)`` tuples in the order returned
        by the client.
    """
    _, chunks_col, _ = _names(prefix)
    hits = client.search(
        collection_name=chunks_col,
        query_vector=vector,
        limit=top,
        with_payload=True,
        with_vectors=False,
        query_filter=_filter_from_dict(filters),
    )
    return [(str(h.id), float(h.score), dict(h.payload or {})) for h in hits]


def get_edges_for_sources(
    client: QdrantClient,
    prefix: str,
    source_ids: list[str],
    edge_types: Optional[list[str]] = None,
    limit: int = 2048,
) -> list[dict]:
    """Fetch edges whose ``source_id`` is one of *source_ids*.

    Args:
        client: Qdrant client used for scrolling.
        prefix: Collection prefix passed to ``_names``.
        source_ids: Payload ``source_id`` values to match.
        edge_types: Optional restriction on the payload field ``kind``.
        limit: Maximum number of edges to return.

    Returns:
        At most *limit* payload dicts. Each dict gets the point ID under
        ``"id"`` unless the payload already has that key. Returns an empty
        list when *source_ids* is empty or *limit* is not positive.
    """
    # Without source IDs the OR filter would carry no condition at all, and a
    # non-positive limit would ask the server for a non-positive page size.
    if not source_ids or limit <= 0:
        return []

    _, _, edges_col = _names(prefix)
    by_source = _filter_any("source_id", source_ids)
    by_kind = _filter_any("kind", edge_types) if edge_types else None
    flt = _merge_filters(by_source, by_kind)

    collected: list[dict] = []
    next_page = None
    while len(collected) < limit:
        points, next_page = client.scroll(
            collection_name=edges_col,
            scroll_filter=flt,
            # Never request more than what is still missing.
            limit=min(512, limit - len(collected)),
            with_payload=True,
            with_vectors=False,
            offset=next_page,
        )
        for p in points:
            payload = dict(p.payload or {})
            # Expose the point ID as well (useful for clients).
            payload.setdefault("id", str(p.id))
            collected.append(payload)
        if next_page is None:
            break
    return collected


def get_note_payload(
    client: QdrantClient,
    prefix: str,
    note_id: str,
) -> Optional[dict]:
    """Fetch one note by its payload ``note_id`` (not the internal point ID).

    Returns:
        The payload dict with the point ID added under ``"id"`` (unless the
        payload already has that key), or ``None`` when no note matches.
    """
    notes_col, _, _ = _names(prefix)
    flt = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    points, _ = client.scroll(
        collection_name=notes_col,
        scroll_filter=flt,
        limit=1,
        with_payload=True,
        with_vectors=False,
    )
    if not points:
        return None
    payload = dict(points[0].payload or {})
    payload.setdefault("id", str(points[0].id))
    return payload


def _collect_payloads_by_key(
    client: QdrantClient,
    collection: str,
    key: str,
    ids: list[str],
    limit: int,
    out: dict[str, dict],
    page_size: int = 256,
) -> None:
    """Scroll *collection* for payload[key] in *ids* and add new hits to *out*.

    At most *limit* entries are added by one call. Keys that already exist in
    *out* are left untouched, so earlier collections take precedence.
    """
    if limit <= 0:
        return
    flt = _filter_any(key, ids)
    added = 0
    next_page = None
    while True:
        points, next_page = client.scroll(
            collection_name=collection,
            scroll_filter=flt,
            limit=page_size,
            with_payload=True,
            with_vectors=False,
            offset=next_page,
        )
        for p in points:
            payload = dict(p.payload or {})
            stable_id = payload.get(key)
            if stable_id and stable_id not in out:
                payload.setdefault("id", str(p.id))
                out[stable_id] = payload
                added += 1
                if added >= limit:
                    return
        if next_page is None:
            return


def get_neighbor_nodes(
    client: QdrantClient,
    prefix: str,
    target_ids: list[str],
    limit_per_collection: int = 2048,
) -> dict[str, dict]:
    """Fetch the payloads of target nodes (notes and chunks) by their IDs.

    The IDs are the stable payload IDs (``note_id`` / ``chunk_id``), not the
    internal point IDs. Notes are looked up first; a chunk whose ``chunk_id``
    equals an already collected key does not replace that entry.

    Args:
        client: Qdrant client used for scrolling.
        prefix: Collection prefix passed to ``_names``.
        target_ids: Payload IDs of the nodes to fetch.
        limit_per_collection: Maximum number of entries taken from the notes
            collection and, independently, from the chunks collection.

    Returns:
        Mapping ``target_id -> payload``; each payload gets the point ID
        under ``"id"`` unless the payload already has that key. Empty when
        *target_ids* is empty.
    """
    # Materialize once: the IDs are used to build two separate filters.
    ids = list(target_ids)
    out: dict[str, dict] = {}
    if not ids:
        return out

    notes_col, chunks_col, _ = _names(prefix)
    # The limit is counted per collection, so notes cannot use up the budget
    # that is meant for chunks.
    _collect_payloads_by_key(client, notes_col, "note_id", ids, limit_per_collection, out)
    _collect_payloads_by_key(client, chunks_col, "chunk_id", ids, limit_per_collection, out)
    return out