diff --git a/app/core/qdrant_points.py b/app/core/qdrant_points.py index d352fcf..ca22585 100644 --- a/app/core/qdrant_points.py +++ b/app/core/qdrant_points.py @@ -1,353 +1,286 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -app/core/qdrant_points.py +Name: app/core/qdrant_points.py +Version: v1.8.0 (2025-11-08) -Zweck -- Gemeinsame Helfer zum Erzeugen von Qdrant-Points für Notes, Chunks und Edges. -- Abwärtskompatibel zu altem Edge-Payload-Schema aus edges.py: - - alt: {'edge_type','src_id','dst_id', ...} - - neu: {'kind','source_id','target_id', ...} -- **NEU (v1.4.0):** Re-Exports/Wrapper für Legacy-Importer: - * ensure_collections_for_prefix(...) → delegiert an app.core.qdrant.ensure_collections_for_prefix - * collection_names(...) → delegiert an app.core.qdrant.collection_names +Kurzbeschreibung + Punkt-Operationen (Upsert/Delete/Scroll) für mindnet: + - Notes/Chunks/Edges als einzelne Upsert-Helper + - Generischer upsert_batch(...) + - Purge/Delete-Helfer je Note (für --purge-before-upsert) + - Edge-Payload-Normalisierung + deterministische edge_id + - Wrapper ensure_collections_for_prefix(...), delegiert an app.core.qdrant -Version -- 1.4.0 (2025-11-08) +Abwärtskompatibilität + * Beibehaltung der bisherigen Funktionsnamen: + - upsert_notes(...), upsert_chunks(...), upsert_edges(...) + - upsert_batch(...) + - delete_note_scope(...) + - ensure_collections_for_prefix(...) -Aufruf / Verwendung -- Wird von Import-/Backfill-Skripten via: - from app.core.qdrant_points import ( - points_for_note, points_for_chunks, points_for_edges, upsert_batch, - ensure_collections_for_prefix, collection_names - ) - eingebunden. Keine CLI. + * Robust ggü. qdrant_client-Versionen (MatchValue-Konstruktor etc.). + +Erwartete Collections + - {prefix}_notes + - {prefix}_chunks + - {prefix}_edges (1D Dummy-Vektor) """ + from __future__ import annotations -import uuid -from typing import List, Tuple, Optional, Iterable, Dict, Any -from qdrant_client.http import models as rest +import hashlib +from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple + from qdrant_client import QdrantClient +from qdrant_client.http import models as rest -# Delegation an app.core.qdrant, um Kompatibilität sicherzustellen -try: - from app.core.qdrant import ( - ensure_collections_for_prefix as _ensure_cols_legacy, - collection_names as _collection_names, - ) -except Exception: - _ensure_cols_legacy = None - _collection_names = None +# Delegation auf zentrale Collection-Helfer +from app.core.qdrant import ( + collection_names, + ensure_collections as _ensure_collections, + ensure_payload_indexes as _ensure_payload_indexes, +) -# ------------------------------- -# Utility -# ------------------------------- -def _names(prefix: str) -> Tuple[str, str, str]: - if _collection_names: - return _collection_names(prefix) - return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" - -def _to_uuid(stable_key: str) -> str: - """Stabile UUIDv5 aus einem String-Key (deterministisch).""" - return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key)) +# --------------------------------------------------------------------------- +# Utility: MatchValue versionstolerant +# --------------------------------------------------------------------------- +def _match_value(value: Any): + try: + return rest.MatchValue(value=value) + except TypeError: + return rest.MatchValue(value) # ältere Signatur -# ------------------------------- -# Public API (Points) -# ------------------------------- -def points_for_note( - prefix: str, - note_payload: dict, - note_vec: List[float] | None, - dim: int, -) -> Tuple[str, List[rest.PointStruct]]: - """Notes-Collection: falls kein Note-Embedding -> Nullvektor der Länge dim.""" - notes_col, _, _ = _names(prefix) - vector = note_vec if note_vec is not None else [0.0] * int(dim) - raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id" - point_id = _to_uuid(raw_note_id) - pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload) - return notes_col, [pt] - - -def points_for_chunks( - prefix: str, - chunk_payloads: List[dict], - vectors: List[List[float]], -) -> Tuple[str, List[rest.PointStruct]]: +# --------------------------------------------------------------------------- +# Edge-Normalisierung & deterministische edge_id +# --------------------------------------------------------------------------- +def normalize_edge_payload(pl: Dict[str, Any]) -> Dict[str, Any]: """ - Chunks-Collection: erwartet pro Chunk einen Vektor. - Robustheit: - - Fehlt 'chunk_id', nutze 'id', sonst baue '${note_id}#${i}' (1-basiert). - - Schreibe die abgeleitete ID zurück in die Payload (pl['chunk_id']). + Vereinheitlicht ältere/neuere Edge-Payload-Varianten: + alt: edge_type/src_id/dst_id + neu: kind/source_id/target_id + Ergänzt: scope, note_id (falls sinnvoll), seq + Bildet: edge_id deterministisch (uuidv5-ähnliche Signatur via sha256) """ - _, chunks_col, _ = _names(prefix) - points: List[rest.PointStruct] = [] - for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1): - chunk_id = pl.get("chunk_id") or pl.get("id") - if not chunk_id: - note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note" - chunk_id = f"{note_id}#{i}" - pl["chunk_id"] = chunk_id - point_id = _to_uuid(chunk_id) - points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl)) - return chunks_col, points + out = dict(pl or {}) + # Feldnormalisierung + if "kind" not in out and "edge_type" in out: + out["kind"] = out.pop("edge_type") -def _normalize_edge_payload(pl: dict) -> dict: - """ - Sorgt für kompatible Feldnamen. - akzeptiert: - - neu: kind, source_id, target_id, seq? - - alt: edge_type, src_id, dst_id, order?/index? - schreibt zurück: kind, source_id, target_id, seq? - """ - kind = pl.get("kind") or pl.get("edge_type") or "edge" - source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src" - target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt" - seq = pl.get("seq") or pl.get("order") or pl.get("index") + if "source_id" not in out and "src_id" in out: + out["source_id"] = out.pop("src_id") - # in Payload zurückschreiben (ohne alte Felder zu entfernen → maximal kompatibel) - pl.setdefault("kind", kind) - pl.setdefault("source_id", source_id) - pl.setdefault("target_id", target_id) - if seq is not None and "seq" not in pl: - pl["seq"] = seq - return pl + if "target_id" not in out and "dst_id" in out: + out["target_id"] = out.pop("dst_id") + # Defaults + out.setdefault("scope", "note") # "note" | "chunk" + out.setdefault("seq", 0) -def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]: - """ - Edges-Collection mit 1D-Dummy-Vektor. - - Akzeptiert sowohl neues als auch altes Edge-Schema (siehe _normalize_edge_payload). - - Fehlt 'edge_id', wird sie stabil aus (kind, source_id, target_id, seq) konstruiert. - """ - _, _, edges_col = _names(prefix) - points: List[rest.PointStruct] = [] - for raw in edge_payloads: - pl = _normalize_edge_payload(raw) + # deterministische edge_id + kind = str(out.get("kind", "unknown")) + scope = str(out.get("scope", "note")) + src = str(out.get("source_id", "")) + dst = str(out.get("target_id", "")) + seq = str(out.get("seq", 0)) + sig = f"{kind}|{scope}|{src}|{dst}|{seq}" + edge_id = hashlib.sha256(sig.encode("utf-8")).hexdigest() + out["edge_id"] = edge_id - edge_id = pl.get("edge_id") - if not edge_id: - kind = pl.get("kind", "edge") - s = pl.get("source_id", "unknown-src") - t = pl.get("target_id", "unknown-tgt") - seq = pl.get("seq") or "" - edge_id = f"{kind}:{s}->{t}#{seq}" - pl["edge_id"] = edge_id - - point_id = _to_uuid(edge_id) - points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl)) - return edges_col, points - - -def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None: - if not points: - return - client.upsert(collection_name=collection, points=points, wait=True) - - -# ------------------------------- -# NEU: Legacy-Wrapper (Re-Exports) -# ------------------------------- -def ensure_collections_for_prefix(client: QdrantClient, prefix: str, dim: int, destructive: bool = False) -> Tuple[str, str, str]: - """ - Für ältere Importer, die diese Funktion aus qdrant_points importieren. - Delegiert an app.core.qdrant.ensure_collections_for_prefix (falls vorhanden), - sonst minimaler Fallback via local _names(..). - """ - if _ensure_cols_legacy: - return _ensure_cols_legacy(client, prefix, dim, destructive=destructive) - # Fallback: keine Seiteneffekte, nur Namen liefern (Collections sollten vorher existieren) - return _names(prefix) - - -# ------------------------------- -# WP-04 Helfer (Suche/Graph) — unverändert zu v1.3, hier nur der Vollständigkeit halber -# ------------------------------- -def _filter_any(field: str, values: Iterable[str]) -> rest.Filter: - """Erzeuge OR-Filter: payload[field] == any(values).""" - return rest.Filter( - should=[ - rest.FieldCondition(key=field, match=rest.MatchValue(value=str(v))) - for v in values - ] - ) - -def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]: - """Fasst mehrere Filter zu einem AND zusammen (None wird ignoriert).""" - fs = [f for f in filters if f is not None] - if not fs: - return None - if len(fs) == 1: - return fs[0] - must = [] - for f in fs: - if getattr(f, "must", None): - must.extend(f.must) - if getattr(f, "should", None): - must.append(rest.Filter(should=f.should)) - if getattr(f, "must_not", None): - # Negative Bedingungen beibehalten - if isinstance(f.must_not, list) and f.must_not: - if not hasattr(_merge_filters, "_warned"): - _merge_filters._warned = True # type: ignore[attr-defined] - # Qdrant erlaubt must_not auf Top-Level; hier nicht zusammengeführt - return rest.Filter(must=must) - -def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]: - if not filters: - return None - parts = [] - for k, v in filters.items(): - if isinstance(v, (list, tuple, set)): - parts.append(_filter_any(k, [str(x) for x in v])) - else: - parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=str(v)))])) - return _merge_filters(*parts) - -def search_chunks_by_vector( - client: QdrantClient, - prefix: str, - vector: list[float], - top: int = 10, - filters: Optional[Dict[str, Any]] = None, -) -> list[tuple[str, float, dict]]: - """ - Vektorielle Suche in {prefix}_chunks. - Rückgabe: Liste von (point_id, score, payload) - """ - _, chunks_col, _ = _names(prefix) - flt = _filter_from_dict(filters) - res = client.search( - collection_name=chunks_col, - query_vector=vector, - limit=top, - with_payload=True, - with_vectors=False, - query_filter=flt, - ) - out: list[tuple[str, float, dict]] = [] - for r in res: - out.append((str(r.id), float(r.score), dict(r.payload or {}))) return out -def get_edges_for_sources( + +# --------------------------------------------------------------------------- +# Ensure Collections (Wrapper für ältere Importe) +# --------------------------------------------------------------------------- +def ensure_collections_for_prefix( + client: QdrantClient, prefix: str, dim: int, destructive: bool = False +) -> Tuple[str, str, str]: + """ + Wrapper, damit ältere Aufrufer weiterhin funktionieren. + Erstellt Collections + Payload-Indizes, falls noch nicht vorhanden. + """ + _ensure_collections(client, prefix, dim, destructive=destructive) + _ensure_payload_indexes(client, prefix) + return collection_names(prefix) + + +# --------------------------------------------------------------------------- +# Upsert: Generisch +# --------------------------------------------------------------------------- +def upsert_batch( client: QdrantClient, - prefix: str, - source_ids: list[str], - edge_types: Optional[list[str]] = None, - limit: int = 2048, -) -> list[dict]: + collection: str, + payloads: Sequence[Dict[str, Any]], + *, + point_id_field: Optional[str] = None, + vectors: Optional[Sequence[Sequence[float]]] = None, + ids: Optional[Sequence[Any]] = None, + wait: bool = True, +) -> None: """ - Hole Edges aus {prefix}_edges mit source_id ∈ source_ids (und optional kind ∈ edge_types). - Liefert Payload-Dicts inkl. edge_id/source_id/target_id/kind/seq (falls vorhanden). + Generische Upsert-Funktion. + - point_id_field: wenn gesetzt, wird der Point-ID aus payload[point_id_field] entnommen. + - vectors: optional gleich lang wie payloads. + - ids: explizite Point-IDs, alternativ zu point_id_field. + + Priorität Point-ID: + 1) ids[i], falls übergeben + 2) payload[point_id_field], falls gesetzt + 3) None → Qdrant vergibt selbst (nicht empfohlen für mindnet) """ - _, _, edges_col = _names(prefix) - f_src = _filter_any("source_id", source_ids) - f_kind = _filter_any("kind", edge_types) if edge_types else None - flt = _merge_filters(f_src, f_kind) + pts: List[rest.PointStruct] = [] + for i, pl in enumerate(payloads): + pid = None + if ids is not None: + pid = ids[i] + elif point_id_field: + pid = pl.get(point_id_field) - collected: list[dict] = [] - next_page = None - while True: - points, next_page = client.scroll( - collection_name=edges_col, - scroll_filter=flt, - limit=min(512, limit - len(collected)), - with_payload=True, - with_vectors=False, - offset=next_page, - ) - for p in points: - pl = dict(p.payload or {}) - pl.setdefault("id", str(p.id)) - collected.append(pl) - if len(collected) >= limit: - return collected - if next_page is None: - break - return collected + vec = None + if vectors is not None: + vec = vectors[i] -def get_note_payload( + pts.append(rest.PointStruct(id=pid, vector=vec, payload=pl)) + + client.upsert(collection_name=collection, points=pts, wait=wait) + + +# --------------------------------------------------------------------------- +# Upsert: Notes / Chunks / Edges +# --------------------------------------------------------------------------- +def upsert_notes( + client: QdrantClient, + notes_collection: str, + note_payloads: Sequence[Dict[str, Any]], + *, + note_vectors: Optional[Sequence[Sequence[float]]] = None, + wait: bool = True, +) -> None: + """ + Upsert für Notizen. Point-ID = payload['note_id']. + """ + upsert_batch( + client, + collection=notes_collection, + payloads=note_payloads, + point_id_field="note_id", + vectors=note_vectors, + wait=wait, + ) + + +def upsert_chunks( + client: QdrantClient, + chunks_collection: str, + chunk_payloads: Sequence[Dict[str, Any]], + *, + chunk_vectors: Optional[Sequence[Sequence[float]]] = None, + wait: bool = True, +) -> None: + """ + Upsert für Chunks. Point-ID = payload['chunk_id']. + """ + upsert_batch( + client, + collection=chunks_collection, + payloads=chunk_payloads, + point_id_field="chunk_id", + vectors=chunk_vectors, + wait=wait, + ) + + +def upsert_edges( + client: QdrantClient, + edges_collection: str, + edge_payloads: Sequence[Dict[str, Any]], + *, + wait: bool = True, +) -> None: + """ + Upsert für Edges. Point-ID = payload['edge_id']. + Vektor: 1D Dummy (0.0), Collection ist auf 1D konfiguriert. + """ + normalized: List[Dict[str, Any]] = [normalize_edge_payload(pl) for pl in edge_payloads] + dummy_vectors = [[0.0] for _ in normalized] + upsert_batch( + client, + collection=edges_collection, + payloads=normalized, + point_id_field="edge_id", + vectors=dummy_vectors, + wait=wait, + ) + + +# --------------------------------------------------------------------------- +# Delete (für --purge-before-upsert und Sync) +# --------------------------------------------------------------------------- +def delete_by_filter(client: QdrantClient, collection: str, flt: rest.Filter, *, wait: bool = True) -> None: + client.delete(collection_name=collection, points_selector=rest.FilterSelector(filter=flt), wait=wait) + + +def delete_note_scope( client: QdrantClient, prefix: str, note_id: str, -) -> Optional[dict]: + *, + include_edges: bool = True, + include_chunks: bool = True, + include_note: bool = False, + wait: bool = True, +) -> None: """ - Hole eine Note anhand ihres payload.note_id (nicht internal UUID!). + Löscht alle Chunks/Edges (und optional die Note) für eine Note. + Wird typischerweise vor einem Upsert genutzt, wenn --purge-before-upsert aktiv ist. """ - notes_col, _, _ = _names(prefix) - flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - points, _ = client.scroll( - collection_name=notes_col, - scroll_filter=flt, - limit=1, - with_payload=True, - with_vectors=False, - ) - if not points: - return None - pl = dict(points[0].payload or {}) - pl.setdefault("id", str(points[0].id)) - return pl + notes_col, chunks_col, edges_col = collection_names(prefix) -def get_neighbor_nodes( - client: QdrantClient, - prefix: str, - target_ids: list[str], - limit_per_collection: int = 2048, -) -> dict[str, dict]: - """ - Hole Payloads der Zielknoten (Notes/Chunks) zu den angegebenen IDs. - IDs sind die stabilen payload-IDs (note_id/chunk_id), nicht internal UUIDs. - Rückgabe: Mapping target_id -> payload - """ - notes_col, chunks_col, _ = _names(prefix) - out: dict[str, dict] = {} + if include_chunks: + flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=_match_value(note_id))]) + delete_by_filter(client, chunks_col, flt, wait=wait) - # Notes - flt_notes = _filter_any("note_id", target_ids) + if include_edges: + flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=_match_value(note_id))]) + delete_by_filter(client, edges_col, flt, wait=wait) + + if include_note: + flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=_match_value(note_id))]) + delete_by_filter(client, notes_col, flt, wait=wait) + + +# --------------------------------------------------------------------------- +# Simple Queries (Scroll) +# --------------------------------------------------------------------------- +def list_point_ids_by_note( + client: QdrantClient, collection: str, note_id: str, *, id_field: str +) -> List[str]: + """ + Listet Point-IDs in einer Collection gefiltert auf payload.note_id == note_id. + """ + flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=_match_value(note_id))]) + out: List[str] = [] next_page = None while True: - pts, next_page = client.scroll( - collection_name=notes_col, - scroll_filter=flt_notes, - limit=256, + points, next_page = client.scroll( + collection_name=collection, + scroll_filter=flt, + limit=512, with_payload=True, with_vectors=False, offset=next_page, ) - for p in pts: - pl = dict(p.payload or {}) - nid = pl.get("note_id") - if nid and nid not in out: - pl.setdefault("id", str(p.id)) - out[nid] = pl - if next_page is None or len(out) >= limit_per_collection: + if not points: break - - # Chunks - flt_chunks = _filter_any("chunk_id", target_ids) - next_page = None - while True: - pts, next_page = client.scroll( - collection_name=chunks_col, - scroll_filter=flt_chunks, - limit=256, - with_payload=True, - with_vectors=False, - offset=next_page, - ) - for p in pts: - pl = dict(p.payload or {}) - cid = pl.get("chunk_id") - if cid and cid not in out: - pl.setdefault("id", str(p.id)) - out[cid] = pl - if next_page is None or len(out) >= limit_per_collection: + for p in points: + pl = p.payload or {} + pid = pl.get(id_field) + if isinstance(pid, str): + out.append(pid) + if next_page is None: break - return out