diff --git a/app/core/qdrant_points.py b/app/core/qdrant_points.py index e0b608d..ffd39f1 100644 --- a/app/core/qdrant_points.py +++ b/app/core/qdrant_points.py @@ -1,55 +1,36 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -app/core/qdrant_points.py +app/core/qdrant_points.py — robust points helpers for Qdrant -Zweck -- Gemeinsame Helfer zum Erzeugen von Qdrant-Points für Notes, Chunks und Edges. -- Abwärtskompatibel zu altem Edge-Payload-Schema aus edges.py: - - alt: {'edge_type','src_id','dst_id', ...} - - neu: {'kind','source_id','target_id', ...} +- Single source of truth for building PointStruct for notes/chunks/edges +- Backward-compatible to older payload schemas for edges +- NEW: Upsert path auto-detects collection vector schema (single vs named vectors) + and coerces points accordingly to avoid 'Not existing vector name' errors. -Version -- 1.3 (2025-09-08) - -Änderungen (ggü. 1.2) -- points_for_edges() akzeptiert jetzt beide Edge-Schemata. -- Normalisiert alte Felder auf 'kind' / 'source_id' / 'target_id' und schreibt eine - stabile 'edge_id' zurück in die Payload. -- Verhindert, dass mehrere Edges dieselbe Point-ID erhalten (Root Cause deiner 1-Edge-Sammlung). - -Aufruf / Verwendung -- Wird von Import-/Backfill-Skripten via: - from app.core.qdrant_points import points_for_note, points_for_chunks, points_for_edges, upsert_batch - eingebunden. Keine CLI. - -Hinweise -- Edges bekommen absichtlich einen 1D-Dummy-Vektor [0.0], damit Qdrant das Objekt akzeptiert. -- Die Point-IDs werden deterministisch aus stabilen Strings (UUIDv5) abgeleitet. 
+Version: 1.4.0 (2025-11-08) """ - from __future__ import annotations +import os import uuid -from typing import List, Tuple -from qdrant_client.http import models as rest +from typing import List, Tuple, Iterable, Optional, Dict, Any +from qdrant_client.http import models as rest +from qdrant_client import QdrantClient + +# --------------------- ID helpers --------------------- + +def _to_uuid(stable_key: str) -> str: + """Deterministic UUIDv5 from a stable string key.""" + return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key)) def _names(prefix: str) -> Tuple[str, str, str]: return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" +# --------------------- Notes / Chunks --------------------- -def _to_uuid(stable_key: str) -> str: - """Stabile UUIDv5 aus einem String-Key (deterministisch).""" - return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key)) - - -def points_for_note( - prefix: str, - note_payload: dict, - note_vec: List[float] | None, - dim: int, -) -> Tuple[str, List[rest.PointStruct]]: - """Notes-Collection: falls kein Note-Embedding -> Nullvektor der Länge dim.""" +def points_for_note(prefix: str, note_payload: dict, note_vec: List[float] | None, dim: int) -> Tuple[str, List[rest.PointStruct]]: + """Notes-Collection: if no note embedding -> zero vector of length dim.""" notes_col, _, _ = _names(prefix) vector = note_vec if note_vec is not None else [0.0] * int(dim) raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id" @@ -57,18 +38,8 @@ def points_for_note( pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload) return notes_col, [pt] - -def points_for_chunks( - prefix: str, - chunk_payloads: List[dict], - vectors: List[List[float]], -) -> Tuple[str, List[rest.PointStruct]]: - """ - Chunks-Collection: erwartet pro Chunk einen Vektor. - Robustheit: - - Fehlt 'chunk_id', nutze 'id', sonst baue '${note_id}#${i}' (1-basiert). - - Schreibe die abgeleitete ID zurück in die Payload (pl['chunk_id']). 
- """ +def points_for_chunks(prefix: str, chunk_payloads: List[dict], vectors: List[List[float]]) -> Tuple[str, List[rest.PointStruct]]: + """Create point structs for the chunk collection (expects one vector per chunk).""" _, chunks_col, _ = _names(prefix) points: List[rest.PointStruct] = [] for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1): @@ -81,22 +52,15 @@ def points_for_chunks( points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl)) return chunks_col, points +# --------------------- Edges --------------------- def _normalize_edge_payload(pl: dict) -> dict: - """ - Sorgt für kompatible Feldnamen. - akzeptiert: - - neu: kind, source_id, target_id, seq? - - alt: edge_type, src_id, dst_id, order?/index? - schreibt zurück: kind, source_id, target_id, seq? - """ - # bereits neu? + """Normalize edge payload keys to a common schema.""" kind = pl.get("kind") or pl.get("edge_type") or "edge" source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src" target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt" seq = pl.get("seq") or pl.get("order") or pl.get("index") - # in Payload zurückschreiben (ohne alte Felder zu entfernen → maximal kompatibel) pl.setdefault("kind", kind) pl.setdefault("source_id", source_id) pl.setdefault("target_id", target_id) @@ -104,18 +68,12 @@ def _normalize_edge_payload(pl: dict) -> dict: pl["seq"] = seq return pl - def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]: - """ - Edges-Collection mit 1D-Dummy-Vektor. - - Akzeptiert sowohl neues als auch altes Edge-Schema (siehe _normalize_edge_payload). - - Fehlt 'edge_id', wird sie stabil aus (kind, source_id, target_id, seq) konstruiert. 
- """ + """Edges collection (1D dummy vector).""" _, _, edges_col = _names(prefix) points: List[rest.PointStruct] = [] for raw in edge_payloads: pl = _normalize_edge_payload(raw) - edge_id = pl.get("edge_id") if not edge_id: kind = pl.get("kind", "edge") @@ -124,59 +82,95 @@ def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[ seq = pl.get("seq") or "" edge_id = f"{kind}:{s}->{t}#{seq}" pl["edge_id"] = edge_id - point_id = _to_uuid(edge_id) points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl)) return edges_col, points +# --------------------- Vector schema detection --------------------- -def upsert_batch(client, collection: str, points: List[rest.PointStruct]) -> None: +def _preferred_name(candidates: List[str]) -> str: + """Pick a preferred vector name using env overrides then common fallbacks.""" + env_prefs = [ + os.getenv("NOTES_VECTOR_NAME"), + os.getenv("CHUNKS_VECTOR_NAME"), + os.getenv("EDGES_VECTOR_NAME"), + os.getenv("MINDNET_VECTOR_NAME"), + os.getenv("QDRANT_VECTOR_NAME"), + ] + for p in env_prefs: + if p and p in candidates: + return p + for k in ("text", "default", "embedding", "content"): + if k in candidates: + return k + return sorted(candidates)[0] + +def _get_vector_schema(client: QdrantClient, collection_name: str) -> dict: + """Return {"kind": "single", "size": int} or {"kind": "named", "names": [...], "primary": str}.""" + try: + info = client.get_collection(collection_name=collection_name) + vecs = getattr(info, "vectors", None) + if hasattr(vecs, "size") and isinstance(vecs.size, int): + return {"kind": "single", "size": vecs.size} + cfg = getattr(vecs, "config", None) + if isinstance(cfg, dict) and cfg: + names = list(cfg.keys()) + if names: + return {"kind": "named", "names": names, "primary": _preferred_name(names)} + except Exception: + pass + return {"kind": "single", "size": None} + +def _coerce_for_collection(client: QdrantClient, collection_name: str, points: List[rest.PointStruct]) -> 
def _coerce_for_collection(client: QdrantClient, collection_name: str, points: List[rest.PointStruct]) -> List[rest.PointStruct]:
    """Adapt bare vectors to named-vector collections.

    Collections declared with named vectors reject points whose vector is a
    plain list ("Not existing vector name"); wrap such vectors as
    ``{primary_name: vector}``. Points already carrying a dict vector, or no
    vector at all, pass through untouched. Any detection failure returns the
    points unchanged (best effort).
    """
    try:
        schema = _get_vector_schema(client, collection_name)
        if schema.get("kind") != "named":
            return points
        name = schema.get("primary")
        if not name:
            return points
        coerced: List[rest.PointStruct] = []
        for p in points:
            vec = getattr(p, "vector", None)
            if vec is None or isinstance(vec, dict):
                coerced.append(p)  # already named, or nothing to wrap
            else:
                coerced.append(rest.PointStruct(id=p.id, vector={name: vec}, payload=p.payload))
        return coerced
    except Exception:
        return points

# --------------------- Qdrant ops ---------------------

def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None:
    """Upsert *points* into *collection* (vector shape coerced first); no-op when empty."""
    if not points:
        return
    prepared = _coerce_for_collection(client, collection, points)
    client.upsert(collection_name=collection, points=prepared, wait=True)

# --- Optional search helpers ---

def _filter_any(field: str, values: Iterable[str]) -> rest.Filter:
    """OR filter: payload[field] == any of *values*."""
    conditions = []
    for v in values:
        conditions.append(rest.FieldCondition(key=field, match=rest.MatchValue(value=v)))
    return rest.Filter(should=conditions)
def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]:
    """AND-combine several filters; ``None`` entries are ignored.

    - ``must`` conditions are concatenated.
    - ``should`` groups are nested as sub-filters to preserve OR semantics.
    - FIX: ``must_not`` conditions are now carried over as well; previously
      they were silently dropped when merging two or more filters, which
      could widen a query beyond what the caller asked for.

    Returns ``None`` when every input is ``None``; a single non-None filter
    is returned as-is (unchanged behavior).
    """
    fs = [f for f in filters if f is not None]
    if not fs:
        return None
    if len(fs) == 1:
        return fs[0]
    must = []
    must_not = []
    for f in fs:
        if getattr(f, "must", None):
            must.extend(f.must)
        if getattr(f, "should", None):
            # nest the OR group so it stays an OR inside the outer AND
            must.append(rest.Filter(should=f.should))
        if getattr(f, "must_not", None):
            must_not.extend(f.must_not)
    return rest.Filter(must=must, must_not=must_not or None)
def search_chunks_by_vector(client: QdrantClient, prefix: str, vector: List[float], top: int = 10, filters: Optional[Dict[str, Any]] = None) -> List[Tuple[str, float, dict]]:
    """Vector search in ``{prefix}_chunks``.

    Returns a list of ``(point_id, score, payload)`` triples; *filters* is
    translated via ``_filter_from_dict`` (scalar equality / list-OR).
    """
    _, chunks_col, _ = _names(prefix)
    hits = client.search(
        collection_name=chunks_col,
        query_vector=vector,
        limit=top,
        with_payload=True,
        with_vectors=False,
        query_filter=_filter_from_dict(filters),
    )
    return [(str(h.id), float(h.score), dict(h.payload or {})) for h in hits]
- """ - _, _, edges_col = _names(prefix) - f_src = _filter_any("source_id", source_ids) - f_kind = _filter_any("kind", edge_types) if edge_types else None - flt = _merge_filters(f_src, f_kind) - - collected: list[dict] = [] - next_page = None - while True: - points, next_page = client.scroll( - collection_name=edges_col, - scroll_filter=flt, - limit=min(512, limit - len(collected)), - with_payload=True, - with_vectors=False, - offset=next_page, - ) - for p in points: - pl = dict(p.payload or {}) - # füge die deterministische ID hinzu (nützlich für Clients) - pl.setdefault("id", str(p.id)) - collected.append(pl) - if len(collected) >= limit: - return collected - if next_page is None: - break - return collected - -def get_note_payload( - client: QdrantClient, - prefix: str, - note_id: str, -) -> Optional[dict]: - """ - Hole eine Note anhand ihres payload.note_id (nicht internal UUID!). - """ - notes_col, _, _ = _names(prefix) - flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - points, _ = client.scroll( - collection_name=notes_col, - scroll_filter=flt, - limit=1, - with_payload=True, - with_vectors=False, - ) - if not points: - return None - pl = dict(points[0].payload or {}) - pl.setdefault("id", str(points[0].id)) - return pl - -def get_neighbor_nodes( - client: QdrantClient, - prefix: str, - target_ids: list[str], - limit_per_collection: int = 2048, -) -> dict[str, dict]: - """ - Hole Payloads der Zielknoten (Notes/Chunks) zu den angegebenen IDs. - IDs sind die stabilen payload-IDs (note_id/chunk_id), nicht internal UUIDs. 
- Rückgabe: Mapping target_id -> payload - """ - notes_col, chunks_col, _ = _names(prefix) - out: dict[str, dict] = {} - - # Notes - flt_notes = _filter_any("note_id", target_ids) - next_page = None - while True: - pts, next_page = client.scroll( - collection_name=notes_col, - scroll_filter=flt_notes, - limit=256, - with_payload=True, - with_vectors=False, - offset=next_page, - ) - for p in pts: - pl = dict(p.payload or {}) - nid = pl.get("note_id") - if nid and nid not in out: - pl.setdefault("id", str(p.id)) - out[nid] = pl - if next_page is None or len(out) >= limit_per_collection: - break - - # Chunks - flt_chunks = _filter_any("chunk_id", target_ids) - next_page = None - while True: - pts, next_page = client.scroll( - collection_name=chunks_col, - scroll_filter=flt_chunks, - limit=256, - with_payload=True, - with_vectors=False, - offset=next_page, - ) - for p in pts: - pl = dict(p.payload or {}) - cid = pl.get("chunk_id") - if cid and cid not in out: - pl.setdefault("id", str(p.id)) - out[cid] = pl - if next_page is None or len(out) >= limit_per_collection: - break - - return out