#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module: app.core.qdrant_points
Version: 1.7.0
Date: 2025-11-08

Purpose
-------
Uniform upsert/delete helpers for the notes, chunks and edges collections.

Public API:
  • upsert_notes(client, cfg, notes: List[dict])
  • upsert_chunks(client, cfg, chunks: List[dict])
  • upsert_edges(client, cfg, edges: List[dict])
  • delete_by_note(client, cfg, note_id: str, *, include_chunks, include_edges,
                   include_note, wait)

``cfg`` is any object that exposes the collection names through the
attributes ``notes``, ``chunks`` and ``edges``.

The helpers talk to the Qdrant client directly. This module deliberately does
not try to import ``upsert_batch`` / ``delete_by_filter`` from itself: such a
self-import can only see a partially initialised module and therefore never
resolves, which would leave every "legacy" branch unreachable.
"""

from __future__ import annotations

from typing import Any, Dict, List, Optional, Tuple

try:
    from qdrant_client import QdrantClient
    from qdrant_client.http import models as rest
    from qdrant_client.http.models import PointStruct
except Exception as e:  # pragma: no cover
    # Keep the original cause attached so the real import problem stays visible.
    raise RuntimeError(f"qdrant_client not available: {e}") from e

# ----------------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------------

# Payload keys tried (in this order) after the preferred ID field.
# Most specific first, so a chunk/edge is not keyed by its parent note_id
# when it carries an ID of its own.
_FALLBACK_ID_FIELDS: Tuple[str, ...] = ("id", "chunk_id", "edge_id", "note_id")

# Edges carry no embedding; earlier revisions of this module upserted them
# with a 1-D placeholder vector because the edges collection is configured
# with a 1-D vector. NOTE(review): confirm against the collection setup.
_EDGE_DUMMY_VECTOR: List[float] = [0.0]


def _pick_point_id(payload: Dict[str, Any], id_field: Optional[str]) -> Any:
    """
    Return the point ID for *payload*.

    The preferred ``id_field`` is consulted first, then the generic fallbacks.
    Only ``None`` and the empty string count as "missing", so an integer ID of
    ``0`` is accepted.

    Raises:
        ValueError: if none of the candidate keys holds a usable value.
    """
    preferred: Tuple[str, ...] = (id_field,) if id_field else ()
    candidates = preferred + _FALLBACK_ID_FIELDS
    for key in candidates:
        value = payload.get(key)
        if value is not None and value != "":
            return value
    tried = ", ".join(dict.fromkeys(candidates))
    raise ValueError(f"payload has no usable point id (tried: {tried})")


def _as_points(
    payloads: List[dict],
    id_field: Optional[str] = None,
    default_vector: Optional[List[float]] = None,
) -> List[PointStruct]:
    """
    Build PointStructs from a list of payload dicts.

    Args:
        payloads: One dict per point. An optional ``"vector"`` entry is used as
            the point's vector and is NOT copied into the stored payload.
        id_field: Payload key that preferably supplies the point ID.
        default_vector: Vector to use for payloads without a ``"vector"``
            entry. When this is ``None`` as well, the point is built without a
            vector (the collection must then accept vector-less points).

    Returns:
        The points in the same order as *payloads*.

    Raises:
        ValueError: if a payload provides no usable point ID.
    """
    points: List[PointStruct] = []
    for payload in payloads:
        point_id = _pick_point_id(payload, id_field)

        vector = payload.get("vector")
        if vector is None:
            vector = default_vector

        # Work on a copy: the caller's dict stays untouched and the embedding
        # is not stored a second time inside the payload.
        stored = {key: value for key, value in payload.items() if key != "vector"}

        if vector is None:
            points.append(PointStruct(id=point_id, payload=stored))
        else:
            points.append(PointStruct(id=point_id, vector=vector, payload=stored))
    return points


# ----------------------------------------------------------------------------
# Public wrapper APIs (expected by import_markdown v3.9.x)
# ----------------------------------------------------------------------------

def upsert_notes(client: QdrantClient, cfg, notes: List[dict]) -> None:
    """
    Upsert note payloads into ``cfg.notes``; point ID = ``payload["note_id"]``.

    Does nothing when *notes* is empty.
    """
    if not notes:
        return
    client.upsert(collection_name=cfg.notes, points=_as_points(notes, id_field="note_id"))


def upsert_chunks(client: QdrantClient, cfg, chunks: List[dict]) -> None:
    """
    Upsert chunk payloads into ``cfg.chunks``; point ID = ``payload["chunk_id"]``.

    Does nothing when *chunks* is empty.
    """
    if not chunks:
        return
    client.upsert(collection_name=cfg.chunks, points=_as_points(chunks, id_field="chunk_id"))


def upsert_edges(client: QdrantClient, cfg, edges: List[dict]) -> None:
    """
    Upsert edge payloads into ``cfg.edges``; point ID = ``payload["edge_id"]``.

    Edges without their own ``"vector"`` entry receive a 1-D placeholder
    vector. Does nothing when *edges* is empty.
    """
    if not edges:
        return
    points = _as_points(edges, id_field="edge_id", default_vector=_EDGE_DUMMY_VECTOR)
    client.upsert(collection_name=cfg.edges, points=points)


def delete_by_note(
    client: QdrantClient,
    cfg,
    note_id: str,
    *,
    include_chunks: bool = True,
    include_edges: bool = True,
    include_note: bool = False,
    wait: bool = True,
) -> None:
    """
    Delete the points that belong to one note (matched on payload ``note_id``).

    By default chunks and edges are removed while the note itself is kept,
    because a following upsert rewrites it anyway.

    Args:
        client: Qdrant client used for the delete calls.
        cfg: Object exposing the collection names ``chunks``, ``edges`` and
            (only read when ``include_note`` is true) ``notes``.
        note_id: Value the ``note_id`` payload field must equal.
        include_chunks: Delete matching points from ``cfg.chunks``.
        include_edges: Delete matching points from ``cfg.edges``.
        include_note: Also delete matching points from ``cfg.notes``.
        wait: Forwarded to ``client.delete``.
    """
    flt_note = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )

    # Collect targets lazily so cfg attributes are only read when needed.
    collections: List[str] = []
    if include_chunks:
        collections.append(cfg.chunks)
    if include_edges:
        collections.append(cfg.edges)
    if include_note:
        collections.append(cfg.notes)

    for collection in collections:
        client.delete(
            collection_name=collection,
            points_selector=rest.FilterSelector(filter=flt_note),
            wait=wait,
        )