From 8b3b34364566d55fd689008d2bdbcde105bc9830 Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 8 Nov 2025 16:27:47 +0100 Subject: [PATCH] Dateien nach "app/core" hochladen --- app/core/qdrant.py | 297 +++++++++----------------- app/core/qdrant_points.py | 425 +++++++++++++++++++++++++++----------- 2 files changed, 404 insertions(+), 318 deletions(-) diff --git a/app/core/qdrant.py b/app/core/qdrant.py index 6137a2b..8cea113 100644 --- a/app/core/qdrant.py +++ b/app/core/qdrant.py @@ -1,229 +1,124 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Modul: app.core.qdrant -Version: 1.8.0 -Datum: 2025-11-08 +Name: app/core/qdrant.py +Version: v1.4.0 (2025-09-09) -Zweck ------ -Zentrale Qdrant-Hilfen (Config, Client, Collections, Zähl- & Listenfunktionen). -Diese Version ergänzt: - • QdrantConfig.from_env(prefix: Optional[str]) -> erwartet von import_markdown v3.9.x - • list_note_ids(), fetch_one_note() -> erwartet von import_markdown v3.9.x - • count_points() -> konsolidierte Zählwerte +Kurzbeschreibung: + Qdrant-Client & Collection-Setup für mindnet. + - Stellt sicher, dass {prefix}_notes / {prefix}_chunks / {prefix}_edges existieren. + - Edges-Collection nutzt 1D Dummy-Vektor. + - NEW: ensure_payload_indexes(...) legt sinnvolle Payload-Indizes an. -Abwärtskompatibilität ---------------------- -• Bestehende Funktionen/Signaturen bleiben erhalten. -• Neue Funktionen sind additive Erweiterungen. -• Nutzt Env-Variablen: - COLLECTION_PREFIX (bevorzugt für Collection-Präfix) - MINDNET_PREFIX (Legacy-Fallback) - QDRANT_HOST, QDRANT_PORT, QDRANT_API_KEY - -Wichtig: Diese Datei legt KEINE Collections neu an (Schemafragen bleiben unangetastet), -sondern stellt nur ensure_collections(...) bereit, das eine vorhandene Anlage respektiert. +Aufruf: + from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes """ - from __future__ import annotations - import os from dataclasses import dataclass -from typing import Dict, List, Optional, Tuple +from typing import Optional, Tuple -try: - from qdrant_client import QdrantClient - from qdrant_client.conversions.conversion import payload_to_grpc - from qdrant_client.http import models as rest -except Exception as e: # pragma: no cover - raise RuntimeError(f"qdrant_client not available: {e}") +from qdrant_client import QdrantClient +from qdrant_client.http import models as rest -# --------------------------------------------------------------------------- -# Konfiguration -# --------------------------------------------------------------------------- - @dataclass class QdrantConfig: - host: str - port: int + url: str api_key: Optional[str] prefix: str - notes: str - chunks: str - edges: str + dim: int @staticmethod - def from_env(prefix: Optional[str] = None) -> "QdrantConfig": - """Erzeuge Config aus ENV; optional extern gesetztes prefix überschreibt ENV. + def from_env() -> "QdrantConfig": + url = os.getenv("QDRANT_URL") + if not url: + host = os.getenv("QDRANT_HOST", "127.0.0.1") + port = int(os.getenv("QDRANT_PORT", "6333")) + url = f"http://{host}:{port}" + api_key = os.getenv("QDRANT_API_KEY") or None + prefix = os.getenv("COLLECTION_PREFIX", "mindnet") + dim = int(os.getenv("VECTOR_DIM", "384")) + return QdrantConfig(url=url, api_key=api_key, prefix=prefix, dim=dim) - Präfix-Priorität: - 1) Funktionsargument `prefix` (falls gesetzt & nicht leer) - 2) ENV COLLECTION_PREFIX - 3) ENV MINDNET_PREFIX - 4) Default "mindnet" - """ - host = os.environ.get("QDRANT_HOST", "localhost").strip() or "localhost" - port_s = os.environ.get("QDRANT_PORT", "6333").strip() - api_key = os.environ.get("QDRANT_API_KEY", "").strip() or None - - env_prefix = (os.environ.get("COLLECTION_PREFIX", "") or os.environ.get("MINDNET_PREFIX", "")).strip() - use_prefix = (prefix or env_prefix or "mindnet").strip() - - return QdrantConfig( - host=host, - port=int(port_s) if port_s.isdigit() else 6333, - api_key=api_key, - prefix=use_prefix, - notes=f"{use_prefix}_notes", - chunks=f"{use_prefix}_chunks", - edges=f"{use_prefix}_edges", - ) - - -# --------------------------------------------------------------------------- -# Client -# --------------------------------------------------------------------------- def get_client(cfg: QdrantConfig) -> QdrantClient: - """Erzeuge QdrantClient gemäß Konfiguration.""" - return QdrantClient(host=cfg.host, port=cfg.port, api_key=cfg.api_key) + return QdrantClient(url=cfg.url, api_key=cfg.api_key) -# --------------------------------------------------------------------------- -# Collections sicherstellen (ohne Schemazwang) -# --------------------------------------------------------------------------- - -def _collection_exists(client: QdrantClient, name: str) -> bool: - try: - _ = client.get_collection(name) - return True - except Exception: - return False - - -def ensure_collections(client: QdrantClient, cfg: QdrantConfig) -> None: - """ - Stellt sicher, dass die drei Collections existieren. - Diese Funktion erzwingt KEIN bestimmtes Schema. Falls Collections fehlen, - wird eine minimal valide Anlage mit Default-Vektordefinition (1-Dummy) - nur für den Notfall versucht. In existierenden Umgebungen greift das nicht. - """ - # Falls vorhanden: nichts tun. - for name in (cfg.notes, cfg.chunks, cfg.edges): - if _collection_exists(client, name): - continue - # Minimal-Anlage: vektorlos, falls Server dies unterstützt; sonst 1D-Vector. - # Wir versuchen zuerst vektorlos (neuere Qdrant-Versionen erlauben "vectors=None"). - try: - client.recreate_collection( - collection_name=name, - vectors_config=None, # type: ignore[arg-type] - ) - continue - except Exception: - pass - # Fallback: 1D-Vector - try: - client.recreate_collection( - collection_name=name, - vectors_config=rest.VectorParams(size=1, distance=rest.Distance.COSINE), - ) - except Exception as e: # pragma: no cover - raise RuntimeError(f"Failed to create collection '{name}': {e}") - - -# --------------------------------------------------------------------------- -# Zähl- & Hilfsfunktionen -# --------------------------------------------------------------------------- - -def count_points(client: QdrantClient, cfg: QdrantConfig) -> Dict[str, int]: - """Zähle Punkte in allen Collections (exact=True).""" - res = {} - for name, key in ((cfg.notes, "notes"), (cfg.chunks, "chunks"), (cfg.edges, "edges")): - try: - c = client.count(name, exact=True) - res[key] = int(c.count) # type: ignore[attr-defined] - except Exception: - # Fallback, falls count nicht verfügbar ist: - try: - pts, _ = client.scroll(name, limit=1) - # Wenn scroll funktioniert, holen wir via get_collection die config/points_count - meta = client.get_collection(name) - # qdrant_client >=1.7 liefert ggf. points_count im Status: - points_count = getattr(meta, "points_count", None) - if isinstance(points_count, int): - res[key] = points_count - else: - # Worst case: scrollen wir "grob" (vermeiden wir hier aus Performancegründen) - res[key] = 0 - except Exception: - res[key] = 0 - return res - - -def list_note_ids(client: QdrantClient, collection: str, batch: int = 2048) -> List[str]: - """ - Liefert alle note_id-Werte aus einer Collection, die Notes speichert. - Greift die Payload-Felder 'note_id' bzw. 'id' auf (falls ersteres fehlt). - """ - out: List[str] = [] - next_page: Optional[List[int]] = None # offset - while True: - pts, next_page = client.scroll( - collection_name=collection, - with_payload=True, - limit=batch, - offset=next_page, +def _create_notes(client: QdrantClient, name: str, dim: int) -> None: + if not client.collection_exists(name): + client.create_collection( + collection_name=name, + vectors_config=rest.VectorParams(size=dim, distance=rest.Distance.COSINE), ) - if not pts: - break - for p in pts: - pl = p.payload or {} - nid = pl.get("note_id") or pl.get("id") - if isinstance(nid, str): - out.append(nid) - if not next_page: - break - return out - -def fetch_one_note(client: QdrantClient, cfg: QdrantConfig, note_id: str) -> Optional[Dict]: - """ - Holt genau eine Note-Payload anhand note_id (oder id). - Gibt Payload-Dict zurück oder None. - """ - flt = rest.Filter( - must=[ - rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id)) - ] - ) - try: - pts = client.scroll( - collection_name=cfg.notes, - with_payload=True, - scroll_filter=flt, - limit=1, - )[0] - if pts: - pl = pts[0].payload or {} - return dict(pl) - except Exception: - # Fallback: versuchen mit 'id' - flt2 = rest.Filter( - must=[rest.FieldCondition(key="id", match=rest.MatchValue(value=note_id))] +def _create_chunks(client: QdrantClient, name: str, dim: int) -> None: + if not client.collection_exists(name): + client.create_collection( + collection_name=name, + vectors_config=rest.VectorParams(size=dim, distance=rest.Distance.COSINE), ) + +def _create_edges(client: QdrantClient, name: str) -> None: + if not client.collection_exists(name): + client.create_collection( + collection_name=name, + vectors_config=rest.VectorParams(size=1, distance=rest.Distance.DOT), # 1D-Dummy + ) + + +def ensure_collections(client: QdrantClient, prefix: str, dim: int, destructive: bool = False) -> None: + notes = f"{prefix}_notes" + chunks = f"{prefix}_chunks" + edges = f"{prefix}_edges" + + _create_notes(client, notes, dim) + _create_chunks(client, chunks, dim) + + if client.collection_exists(edges): try: - pts = client.scroll( - collection_name=cfg.notes, - with_payload=True, - scroll_filter=flt2, - limit=1, - )[0] - if pts: - pl = pts[0].payload or {} - return dict(pl) + info = client.get_collection(edges) + vectors_cfg = getattr(getattr(info.result, "config", None), "params", None) + has_vectors = getattr(vectors_cfg, "vectors", None) is not None except Exception: - return None - return None + has_vectors = True + if not has_vectors: + if destructive: + client.delete_collection(edges) + _create_edges(client, edges) + else: + print(f"[ensure_collections] WARN: '{edges}' ohne VectorConfig; destructive=False.", flush=True) + else: + _create_edges(client, edges) + + +def collection_names(prefix: str) -> Tuple[str, str, str]: + return (f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges") + + +# ------------------------------- +# NEW: Payload-Indexing +# ------------------------------- + +def _safe_create_index(client: QdrantClient, col: str, field: str, schema: rest.PayloadSchemaType): + try: + client.create_payload_index( + collection_name=col, + field_name=field, + field_schema=schema, + ) + except Exception: + # bereits vorhanden oder nicht unterstütztes Schema → ignorieren + pass + +def ensure_payload_indexes(client: QdrantClient, prefix: str) -> None: + notes, chunks, edges = collection_names(prefix) + # Notes + _safe_create_index(client, notes, "note_id", rest.PayloadSchemaType.KEYWORD) + # Chunks + _safe_create_index(client, chunks, "note_id", rest.PayloadSchemaType.KEYWORD) + _safe_create_index(client, chunks, "chunk_index", rest.PayloadSchemaType.INTEGER) + # Edges + for f in ("kind", "scope", "source_id", "target_id", "note_id"): + _safe_create_index(client, edges, f, rest.PayloadSchemaType.KEYWORD) diff --git a/app/core/qdrant_points.py b/app/core/qdrant_points.py index 2efa801..e0b608d 100644 --- a/app/core/qdrant_points.py +++ b/app/core/qdrant_points.py @@ -1,142 +1,333 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Modul: app.core.qdrant_points -Version: 1.7.0 -Datum: 2025-11-08 +app/core/qdrant_points.py Zweck ------ -Einheitliche Upsert-/Delete-Helfer für Notes/Chunks/Edges. -Diese Version ergänzt nur Namen/Wrapper, die von neueren Skripten erwartet werden: +- Gemeinsame Helfer zum Erzeugen von Qdrant-Points für Notes, Chunks und Edges. +- Abwärtskompatibel zu altem Edge-Payload-Schema aus edges.py: + - alt: {'edge_type','src_id','dst_id', ...} + - neu: {'kind','source_id','target_id', ...} -Neu/kompatibel: - • upsert_notes(client, cfg, notes: List[dict]) - • upsert_chunks(client, cfg, chunks: List[dict]) - • upsert_edges(client, cfg, edges: List[dict]) - • delete_by_note(client, cfg, note_id: str) +Version +- 1.3 (2025-09-08) -und mappt sie – falls vorhanden – auf bestehende Implementierungen: - • upsert_batch(...) - • delete_by_filter(...) +Änderungen (ggü. 1.2) +- points_for_edges() akzeptiert jetzt beide Edge-Schemata. +- Normalisiert alte Felder auf 'kind' / 'source_id' / 'target_id' und schreibt eine + stabile 'edge_id' zurück in die Payload. +- Verhindert, dass mehrere Edges dieselbe Point-ID erhalten (Root Cause deiner 1-Edge-Sammlung). -Damit bleiben ältere Aufrufer (alt & neu) funktionsfähig. +Aufruf / Verwendung +- Wird von Import-/Backfill-Skripten via: + from app.core.qdrant_points import points_for_note, points_for_chunks, points_for_edges, upsert_batch + eingebunden. Keine CLI. + +Hinweise +- Edges bekommen absichtlich einen 1D-Dummy-Vektor [0.0], damit Qdrant das Objekt akzeptiert. +- Die Point-IDs werden deterministisch aus stabilen Strings (UUIDv5) abgeleitet. """ from __future__ import annotations +import uuid +from typing import List, Tuple +from qdrant_client.http import models as rest -from typing import Dict, List, Optional, Tuple -try: - from qdrant_client import QdrantClient - from qdrant_client.http import models as rest - from qdrant_client.http.models import PointStruct -except Exception as e: # pragma: no cover - raise RuntimeError(f"qdrant_client not available: {e}") +def _names(prefix: str) -> Tuple[str, str, str]: + return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" -# ---------------------------------------------------------------------------- -# Hilfen -# ---------------------------------------------------------------------------- -def _as_points(payloads: List[dict], id_field: Optional[str] = None) -> List[PointStruct]: +def _to_uuid(stable_key: str) -> str: + """Stabile UUIDv5 aus einem String-Key (deterministisch).""" + return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key)) + + +def points_for_note( + prefix: str, + note_payload: dict, + note_vec: List[float] | None, + dim: int, +) -> Tuple[str, List[rest.PointStruct]]: + """Notes-Collection: falls kein Note-Embedding -> Nullvektor der Länge dim.""" + notes_col, _, _ = _names(prefix) + vector = note_vec if note_vec is not None else [0.0] * int(dim) + raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id" + point_id = _to_uuid(raw_note_id) + pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload) + return notes_col, [pt] + + +def points_for_chunks( + prefix: str, + chunk_payloads: List[dict], + vectors: List[List[float]], +) -> Tuple[str, List[rest.PointStruct]]: """ - Baut PointStructs aus Payload-Listen. Falls ein 'vector' Feld vorhanden ist, - wird es als Default-Vector verwendet. Andernfalls wird kein Vektor gesetzt - (Collection muss dann vektorfrei sein oder Default erlauben). + Chunks-Collection: erwartet pro Chunk einen Vektor. + Robustheit: + - Fehlt 'chunk_id', nutze 'id', sonst baue '${note_id}#${i}' (1-basiert). + - Schreibe die abgeleitete ID zurück in die Payload (pl['chunk_id']). """ - pts: List[PointStruct] = [] - for i, pl in enumerate(payloads): - pid = None - if id_field: - pid = pl.get(id_field) - pid = pid or pl.get("id") or pl.get("note_id") or pl.get("edge_id") - vec = pl.get("vector") # optional + _, chunks_col, _ = _names(prefix) + points: List[rest.PointStruct] = [] + for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1): + chunk_id = pl.get("chunk_id") or pl.get("id") + if not chunk_id: + note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note" + chunk_id = f"{note_id}#{i}" + pl["chunk_id"] = chunk_id + point_id = _to_uuid(chunk_id) + points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl)) + return chunks_col, points - if vec is None: - pts.append(PointStruct(id=pid, payload=pl)) + +def _normalize_edge_payload(pl: dict) -> dict: + """ + Sorgt für kompatible Feldnamen. + akzeptiert: + - neu: kind, source_id, target_id, seq? + - alt: edge_type, src_id, dst_id, order?/index? + schreibt zurück: kind, source_id, target_id, seq? + """ + # bereits neu? + kind = pl.get("kind") or pl.get("edge_type") or "edge" + source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src" + target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt" + seq = pl.get("seq") or pl.get("order") or pl.get("index") + + # in Payload zurückschreiben (ohne alte Felder zu entfernen → maximal kompatibel) + pl.setdefault("kind", kind) + pl.setdefault("source_id", source_id) + pl.setdefault("target_id", target_id) + if seq is not None and "seq" not in pl: + pl["seq"] = seq + return pl + + +def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]: + """ + Edges-Collection mit 1D-Dummy-Vektor. + - Akzeptiert sowohl neues als auch altes Edge-Schema (siehe _normalize_edge_payload). + - Fehlt 'edge_id', wird sie stabil aus (kind, source_id, target_id, seq) konstruiert. + """ + _, _, edges_col = _names(prefix) + points: List[rest.PointStruct] = [] + for raw in edge_payloads: + pl = _normalize_edge_payload(raw) + + edge_id = pl.get("edge_id") + if not edge_id: + kind = pl.get("kind", "edge") + s = pl.get("source_id", "unknown-src") + t = pl.get("target_id", "unknown-tgt") + seq = pl.get("seq") or "" + edge_id = f"{kind}:{s}->{t}#{seq}" + pl["edge_id"] = edge_id + + point_id = _to_uuid(edge_id) + points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl)) + return edges_col, points + + +def upsert_batch(client, collection: str, points: List[rest.PointStruct]) -> None: + if not points: + return + client.upsert(collection_name=collection, points=points, wait=True) + +# --- WP-04 Ergänzungen: Graph/Retriever Hilfsfunktionen --- +from typing import Optional, Dict, Any, Iterable +from qdrant_client import QdrantClient + +def _filter_any(field: str, values: Iterable[str]) -> rest.Filter: + """Erzeuge OR-Filter: payload[field] == any(values).""" + return rest.Filter( + should=[ + rest.FieldCondition(key=field, match=rest.MatchValue(value=v)) + for v in values + ] + ) + +def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]: + """Fasst mehrere Filter zu einem AND zusammen (None wird ignoriert).""" + fs = [f for f in filters if f is not None] + if not fs: + return None + if len(fs) == 1: + return fs[0] + # rest.Filter hat must/should; wir kombinieren als must=[...] + must = [] + for f in fs: + # Überführe vorhandene Bedingungen in must + if getattr(f, "must", None): + must.extend(f.must) + if getattr(f, "should", None): + # "should" als eigene Gruppe beilegen (Qdrant interpretiert OR) + must.append(rest.Filter(should=f.should)) + if getattr(f, "must_not", None): + # negative Bedingungen weiterreichen + if "must_not" not in locals(): + pass + return rest.Filter(must=must) + +def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]: + """ + Einfache Filterumsetzung: + - Bei Listenwerten: OR über mehrere MatchValue (field == any(values)) + - Bei Skalarwerten: Gleichheit (field == value) + Für komplexere Filter (z. B. tags ∈ payload.tags) bitte erweitern. + """ + if not filters: + return None + parts = [] + for k, v in filters.items(): + if isinstance(v, (list, tuple, set)): + parts.append(_filter_any(k, [str(x) for x in v])) else: - pts.append(PointStruct(id=pid, vector=vec, payload=pl)) - return pts + parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=v))])) + return _merge_filters(*parts) - -# ---------------------------------------------------------------------------- -# Bestehende (mögliche) APIs referenzieren, wenn vorhanden -# ---------------------------------------------------------------------------- - -# Platzhalter – werden zur Laufzeit überschrieben, falls alte Funktionen existieren. -_legacy_upsert_batch = None -_legacy_delete_by_filter = None - -try: - # Falls dieses Modul in deiner Codebase bereits upsert_batch bereitstellt, - # referenzieren wir es, um das vorhandene Verhalten 1:1 zu nutzen. - from app.core.qdrant_points import upsert_batch as _legacy_upsert_batch # type: ignore # noqa -except Exception: - pass - -try: - from app.core.qdrant_points import delete_by_filter as _legacy_delete_by_filter # type: ignore # noqa -except Exception: - pass - - -# ---------------------------------------------------------------------------- -# Öffentliche, neue Wrapper-APIs (werden von import_markdown v3.9.x erwartet) -# ---------------------------------------------------------------------------- - -def upsert_notes(client: QdrantClient, cfg, notes: List[dict]) -> None: - if not notes: - return - if _legacy_upsert_batch: - _legacy_upsert_batch(client, cfg.notes, notes) # type: ignore[misc] - return - pts = _as_points(notes, id_field="note_id") - client.upsert(collection_name=cfg.notes, points=pts) - - -def upsert_chunks(client: QdrantClient, cfg, chunks: List[dict]) -> None: - if not chunks: - return - if _legacy_upsert_batch: - _legacy_upsert_batch(client, cfg.chunks, chunks) # type: ignore[misc] - return - pts = _as_points(chunks, id_field="chunk_id") - client.upsert(collection_name=cfg.chunks, points=pts) - - -def upsert_edges(client: QdrantClient, cfg, edges: List[dict]) -> None: - if not edges: - return - if _legacy_upsert_batch: - _legacy_upsert_batch(client, cfg.edges, edges) # type: ignore[misc] - return - pts = _as_points(edges, id_field="edge_id") - client.upsert(collection_name=cfg.edges, points=pts) - - -def delete_by_note(client: QdrantClient, cfg, note_id: str) -> None: +def search_chunks_by_vector( + client: QdrantClient, + prefix: str, + vector: list[float], + top: int = 10, + filters: Optional[Dict[str, Any]] = None, +) -> list[tuple[str, float, dict]]: """ - Löscht alle Chunks/Edges (und optional Notes), die zu einer Note gehören. - Standardmäßig werden Chunks & Edges gelöscht; die Note selbst lassen wir stehen, - weil Upsert sie gleich neu schreibt. Passe das Verhalten nach Bedarf an. + Vektorielle Suche in {prefix}_chunks. + Rückgabe: Liste von (point_id, score, payload) """ - flt_note = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) + _, chunks_col, _ = _names(prefix) + flt = _filter_from_dict(filters) + res = client.search( + collection_name=chunks_col, + query_vector=vector, + limit=top, + with_payload=True, + with_vectors=False, + query_filter=flt, + ) + out: list[tuple[str, float, dict]] = [] + for r in res: + out.append((str(r.id), float(r.score), dict(r.payload or {}))) + return out + +def get_edges_for_sources( + client: QdrantClient, + prefix: str, + source_ids: list[str], + edge_types: Optional[list[str]] = None, + limit: int = 2048, +) -> list[dict]: + """ + Hole Edges aus {prefix}_edges mit source_id ∈ source_ids (und optional kind ∈ edge_types). + Liefert Payload-Dicts inkl. edge_id/source_id/target_id/kind/seq (falls vorhanden). + """ + _, _, edges_col = _names(prefix) + f_src = _filter_any("source_id", source_ids) + f_kind = _filter_any("kind", edge_types) if edge_types else None + flt = _merge_filters(f_src, f_kind) + + collected: list[dict] = [] + next_page = None + while True: + points, next_page = client.scroll( + collection_name=edges_col, + scroll_filter=flt, + limit=min(512, limit - len(collected)), + with_payload=True, + with_vectors=False, + offset=next_page, + ) + for p in points: + pl = dict(p.payload or {}) + # füge die deterministische ID hinzu (nützlich für Clients) + pl.setdefault("id", str(p.id)) + collected.append(pl) + if len(collected) >= limit: + return collected + if next_page is None: + break + return collected + +def get_note_payload( + client: QdrantClient, + prefix: str, + note_id: str, +) -> Optional[dict]: + """ + Hole eine Note anhand ihres payload.note_id (nicht internal UUID!). + """ + notes_col, _, _ = _names(prefix) + flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) + points, _ = client.scroll( + collection_name=notes_col, + scroll_filter=flt, + limit=1, + with_payload=True, + with_vectors=False, + ) + if not points: + return None + pl = dict(points[0].payload or {}) + pl.setdefault("id", str(points[0].id)) + return pl + +def get_neighbor_nodes( + client: QdrantClient, + prefix: str, + target_ids: list[str], + limit_per_collection: int = 2048, +) -> dict[str, dict]: + """ + Hole Payloads der Zielknoten (Notes/Chunks) zu den angegebenen IDs. + IDs sind die stabilen payload-IDs (note_id/chunk_id), nicht internal UUIDs. + Rückgabe: Mapping target_id -> payload + """ + notes_col, chunks_col, _ = _names(prefix) + out: dict[str, dict] = {} + + # Notes + flt_notes = _filter_any("note_id", target_ids) + next_page = None + while True: + pts, next_page = client.scroll( + collection_name=notes_col, + scroll_filter=flt_notes, + limit=256, + with_payload=True, + with_vectors=False, + offset=next_page, + ) + for p in pts: + pl = dict(p.payload or {}) + nid = pl.get("note_id") + if nid and nid not in out: + pl.setdefault("id", str(p.id)) + out[nid] = pl + if next_page is None or len(out) >= limit_per_collection: + break # Chunks - if _legacy_delete_by_filter: - _legacy_delete_by_filter(client, cfg.chunks, flt_note) # type: ignore[misc] - else: - client.delete(collection_name=cfg.chunks, points_selector=rest.FilterSelector(filter=flt_note)) + flt_chunks = _filter_any("chunk_id", target_ids) + next_page = None + while True: + pts, next_page = client.scroll( + collection_name=chunks_col, + scroll_filter=flt_chunks, + limit=256, + with_payload=True, + with_vectors=False, + offset=next_page, + ) + for p in pts: + pl = dict(p.payload or {}) + cid = pl.get("chunk_id") + if cid and cid not in out: + pl.setdefault("id", str(p.id)) + out[cid] = pl + if next_page is None or len(out) >= limit_per_collection: + break - # Edges - if _legacy_delete_by_filter: - _legacy_delete_by_filter(client, cfg.edges, flt_note) # type: ignore[misc] - else: - client.delete(collection_name=cfg.edges, points_selector=rest.FilterSelector(filter=flt_note)) - - # Optional auch die Note löschen? In den meisten Flows nicht nötig. - # Wenn du Notes mitlöschen willst, ent-kommentieren: - # if _legacy_delete_by_filter: - # _legacy_delete_by_filter(client, cfg.notes, flt_note) # type: ignore[misc] - # else: - # client.delete(collection_name=cfg.notes, points_selector=rest.FilterSelector(filter=flt_note)) + return out