#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ app/core/qdrant_points.py Zweck - Gemeinsame Helfer zum Erzeugen von Qdrant-Points für Notes, Chunks und Edges. - Abwärtskompatibel zu altem Edge-Payload-Schema aus edges.py: - alt: {'edge_type','src_id','dst_id', ...} - neu: {'kind','source_id','target_id', ...} - **NEU (v1.4.0):** Re-Exports/Wrapper für Legacy-Importer: * ensure_collections_for_prefix(...) → delegiert an app.core.qdrant.ensure_collections_for_prefix * collection_names(...) → delegiert an app.core.qdrant.collection_names Version - 1.4.0 (2025-11-08) Aufruf / Verwendung - Wird von Import-/Backfill-Skripten via: from app.core.qdrant_points import ( points_for_note, points_for_chunks, points_for_edges, upsert_batch, ensure_collections_for_prefix, collection_names ) eingebunden. Keine CLI. """ from __future__ import annotations import uuid from typing import List, Tuple, Optional, Iterable, Dict, Any from qdrant_client.http import models as rest from qdrant_client import QdrantClient # Delegation an app.core.qdrant, um Kompatibilität sicherzustellen try: from app.core.qdrant import ( ensure_collections_for_prefix as _ensure_cols_legacy, collection_names as _collection_names, ) except Exception: _ensure_cols_legacy = None _collection_names = None # ------------------------------- # Utility # ------------------------------- def _names(prefix: str) -> Tuple[str, str, str]: if _collection_names: return _collection_names(prefix) return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" def _to_uuid(stable_key: str) -> str: """Stabile UUIDv5 aus einem String-Key (deterministisch).""" return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key)) # ------------------------------- # Public API (Points) # ------------------------------- def points_for_note( prefix: str, note_payload: dict, note_vec: List[float] | None, dim: int, ) -> Tuple[str, List[rest.PointStruct]]: """Notes-Collection: falls kein Note-Embedding -> Nullvektor der Länge dim.""" notes_col, _, _ = _names(prefix) vector = note_vec if note_vec is not None else [0.0] * int(dim) raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id" point_id = _to_uuid(raw_note_id) pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload) return notes_col, [pt] def points_for_chunks( prefix: str, chunk_payloads: List[dict], vectors: List[List[float]], ) -> Tuple[str, List[rest.PointStruct]]: """ Chunks-Collection: erwartet pro Chunk einen Vektor. Robustheit: - Fehlt 'chunk_id', nutze 'id', sonst baue '${note_id}#${i}' (1-basiert). - Schreibe die abgeleitete ID zurück in die Payload (pl['chunk_id']). """ _, chunks_col, _ = _names(prefix) points: List[rest.PointStruct] = [] for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1): chunk_id = pl.get("chunk_id") or pl.get("id") if not chunk_id: note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note" chunk_id = f"{note_id}#{i}" pl["chunk_id"] = chunk_id point_id = _to_uuid(chunk_id) points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl)) return chunks_col, points def _normalize_edge_payload(pl: dict) -> dict: """ Sorgt für kompatible Feldnamen. akzeptiert: - neu: kind, source_id, target_id, seq? - alt: edge_type, src_id, dst_id, order?/index? schreibt zurück: kind, source_id, target_id, seq? """ kind = pl.get("kind") or pl.get("edge_type") or "edge" source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src" target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt" seq = pl.get("seq") or pl.get("order") or pl.get("index") # in Payload zurückschreiben (ohne alte Felder zu entfernen → maximal kompatibel) pl.setdefault("kind", kind) pl.setdefault("source_id", source_id) pl.setdefault("target_id", target_id) if seq is not None and "seq" not in pl: pl["seq"] = seq return pl def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]: """ Edges-Collection mit 1D-Dummy-Vektor. - Akzeptiert sowohl neues als auch altes Edge-Schema (siehe _normalize_edge_payload). - Fehlt 'edge_id', wird sie stabil aus (kind, source_id, target_id, seq) konstruiert. """ _, _, edges_col = _names(prefix) points: List[rest.PointStruct] = [] for raw in edge_payloads: pl = _normalize_edge_payload(raw) edge_id = pl.get("edge_id") if not edge_id: kind = pl.get("kind", "edge") s = pl.get("source_id", "unknown-src") t = pl.get("target_id", "unknown-tgt") seq = pl.get("seq") or "" edge_id = f"{kind}:{s}->{t}#{seq}" pl["edge_id"] = edge_id point_id = _to_uuid(edge_id) points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl)) return edges_col, points def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None: if not points: return client.upsert(collection_name=collection, points=points, wait=True) # ------------------------------- # NEU: Legacy-Wrapper (Re-Exports) # ------------------------------- def ensure_collections_for_prefix(client: QdrantClient, prefix: str, dim: int, destructive: bool = False) -> Tuple[str, str, str]: """ Für ältere Importer, die diese Funktion aus qdrant_points importieren. Delegiert an app.core.qdrant.ensure_collections_for_prefix (falls vorhanden), sonst minimaler Fallback via local _names(..). """ if _ensure_cols_legacy: return _ensure_cols_legacy(client, prefix, dim, destructive=destructive) # Fallback: keine Seiteneffekte, nur Namen liefern (Collections sollten vorher existieren) return _names(prefix) # ------------------------------- # WP-04 Helfer (Suche/Graph) — unverändert zu v1.3, hier nur der Vollständigkeit halber # ------------------------------- def _filter_any(field: str, values: Iterable[str]) -> rest.Filter: """Erzeuge OR-Filter: payload[field] == any(values).""" return rest.Filter( should=[ rest.FieldCondition(key=field, match=rest.MatchValue(value=str(v))) for v in values ] ) def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]: """Fasst mehrere Filter zu einem AND zusammen (None wird ignoriert).""" fs = [f for f in filters if f is not None] if not fs: return None if len(fs) == 1: return fs[0] must = [] for f in fs: if getattr(f, "must", None): must.extend(f.must) if getattr(f, "should", None): must.append(rest.Filter(should=f.should)) if getattr(f, "must_not", None): # Negative Bedingungen beibehalten if isinstance(f.must_not, list) and f.must_not: if not hasattr(_merge_filters, "_warned"): _merge_filters._warned = True # type: ignore[attr-defined] # Qdrant erlaubt must_not auf Top-Level; hier nicht zusammengeführt return rest.Filter(must=must) def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]: if not filters: return None parts = [] for k, v in filters.items(): if isinstance(v, (list, tuple, set)): parts.append(_filter_any(k, [str(x) for x in v])) else: parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=str(v)))])) return _merge_filters(*parts) def search_chunks_by_vector( client: QdrantClient, prefix: str, vector: list[float], top: int = 10, filters: Optional[Dict[str, Any]] = None, ) -> list[tuple[str, float, dict]]: """ Vektorielle Suche in {prefix}_chunks. Rückgabe: Liste von (point_id, score, payload) """ _, chunks_col, _ = _names(prefix) flt = _filter_from_dict(filters) res = client.search( collection_name=chunks_col, query_vector=vector, limit=top, with_payload=True, with_vectors=False, query_filter=flt, ) out: list[tuple[str, float, dict]] = [] for r in res: out.append((str(r.id), float(r.score), dict(r.payload or {}))) return out def get_edges_for_sources( client: QdrantClient, prefix: str, source_ids: list[str], edge_types: Optional[list[str]] = None, limit: int = 2048, ) -> list[dict]: """ Hole Edges aus {prefix}_edges mit source_id ∈ source_ids (und optional kind ∈ edge_types). Liefert Payload-Dicts inkl. edge_id/source_id/target_id/kind/seq (falls vorhanden). """ _, _, edges_col = _names(prefix) f_src = _filter_any("source_id", source_ids) f_kind = _filter_any("kind", edge_types) if edge_types else None flt = _merge_filters(f_src, f_kind) collected: list[dict] = [] next_page = None while True: points, next_page = client.scroll( collection_name=edges_col, scroll_filter=flt, limit=min(512, limit - len(collected)), with_payload=True, with_vectors=False, offset=next_page, ) for p in points: pl = dict(p.payload or {}) pl.setdefault("id", str(p.id)) collected.append(pl) if len(collected) >= limit: return collected if next_page is None: break return collected def get_note_payload( client: QdrantClient, prefix: str, note_id: str, ) -> Optional[dict]: """ Hole eine Note anhand ihres payload.note_id (nicht internal UUID!). """ notes_col, _, _ = _names(prefix) flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) points, _ = client.scroll( collection_name=notes_col, scroll_filter=flt, limit=1, with_payload=True, with_vectors=False, ) if not points: return None pl = dict(points[0].payload or {}) pl.setdefault("id", str(points[0].id)) return pl def get_neighbor_nodes( client: QdrantClient, prefix: str, target_ids: list[str], limit_per_collection: int = 2048, ) -> dict[str, dict]: """ Hole Payloads der Zielknoten (Notes/Chunks) zu den angegebenen IDs. IDs sind die stabilen payload-IDs (note_id/chunk_id), nicht internal UUIDs. Rückgabe: Mapping target_id -> payload """ notes_col, chunks_col, _ = _names(prefix) out: dict[str, dict] = {} # Notes flt_notes = _filter_any("note_id", target_ids) next_page = None while True: pts, next_page = client.scroll( collection_name=notes_col, scroll_filter=flt_notes, limit=256, with_payload=True, with_vectors=False, offset=next_page, ) for p in pts: pl = dict(p.payload or {}) nid = pl.get("note_id") if nid and nid not in out: pl.setdefault("id", str(p.id)) out[nid] = pl if next_page is None or len(out) >= limit_per_collection: break # Chunks flt_chunks = _filter_any("chunk_id", target_ids) next_page = None while True: pts, next_page = client.scroll( collection_name=chunks_col, scroll_filter=flt_chunks, limit=256, with_payload=True, with_vectors=False, offset=next_page, ) for p in pts: pl = dict(p.payload or {}) cid = pl.get("chunk_id") if cid and cid not in out: pl.setdefault("id", str(p.id)) out[cid] = pl if next_page is None or len(out) >= limit_per_collection: break return out