mindnet/app/core/qdrant_points.py
Lars 70fa95966c
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
app/core/qdrant_points.py aktualisiert
2025-11-08 11:46:15 +01:00

143 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Modul: app.core.qdrant_points
Version: 1.7.0
Datum: 2025-11-08
Zweck
-----
Einheitliche Upsert-/Delete-Helfer für Notes/Chunks/Edges.
Diese Version ergänzt nur Namen/Wrapper, die von neueren Skripten erwartet werden:
Neu/kompatibel:
• upsert_notes(client, cfg, notes: List[dict])
• upsert_chunks(client, cfg, chunks: List[dict])
• upsert_edges(client, cfg, edges: List[dict])
• delete_by_note(client, cfg, note_id: str)
und mappt sie falls vorhanden auf bestehende Implementierungen:
• upsert_batch(...)
• delete_by_filter(...)
Damit bleiben ältere Aufrufer (alt & neu) funktionsfähig.
"""
from __future__ import annotations
from typing import Dict, List, Optional, Tuple
try:
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from qdrant_client.http.models import PointStruct
except Exception as e: # pragma: no cover
raise RuntimeError(f"qdrant_client not available: {e}")
# ----------------------------------------------------------------------------
# Hilfen
# ----------------------------------------------------------------------------
def _as_points(payloads: List[dict], id_field: Optional[str] = None) -> List[PointStruct]:
"""
Baut PointStructs aus Payload-Listen. Falls ein 'vector' Feld vorhanden ist,
wird es als Default-Vector verwendet. Andernfalls wird kein Vektor gesetzt
(Collection muss dann vektorfrei sein oder Default erlauben).
"""
pts: List[PointStruct] = []
for i, pl in enumerate(payloads):
pid = None
if id_field:
pid = pl.get(id_field)
pid = pid or pl.get("id") or pl.get("note_id") or pl.get("edge_id")
vec = pl.get("vector") # optional
if vec is None:
pts.append(PointStruct(id=pid, payload=pl))
else:
pts.append(PointStruct(id=pid, vector=vec, payload=pl))
return pts
# ----------------------------------------------------------------------------
# Bestehende (mögliche) APIs referenzieren, wenn vorhanden
# ----------------------------------------------------------------------------
# Platzhalter werden zur Laufzeit überschrieben, falls alte Funktionen existieren.
_legacy_upsert_batch = None
_legacy_delete_by_filter = None
try:
# Falls dieses Modul in deiner Codebase bereits upsert_batch bereitstellt,
# referenzieren wir es, um das vorhandene Verhalten 1:1 zu nutzen.
from app.core.qdrant_points import upsert_batch as _legacy_upsert_batch # type: ignore # noqa
except Exception:
pass
try:
from app.core.qdrant_points import delete_by_filter as _legacy_delete_by_filter # type: ignore # noqa
except Exception:
pass
# ----------------------------------------------------------------------------
# Öffentliche, neue Wrapper-APIs (werden von import_markdown v3.9.x erwartet)
# ----------------------------------------------------------------------------
def upsert_notes(client: QdrantClient, cfg, notes: List[dict]) -> None:
if not notes:
return
if _legacy_upsert_batch:
_legacy_upsert_batch(client, cfg.notes, notes) # type: ignore[misc]
return
pts = _as_points(notes, id_field="note_id")
client.upsert(collection_name=cfg.notes, points=pts)
def upsert_chunks(client: QdrantClient, cfg, chunks: List[dict]) -> None:
if not chunks:
return
if _legacy_upsert_batch:
_legacy_upsert_batch(client, cfg.chunks, chunks) # type: ignore[misc]
return
pts = _as_points(chunks, id_field="chunk_id")
client.upsert(collection_name=cfg.chunks, points=pts)
def upsert_edges(client: QdrantClient, cfg, edges: List[dict]) -> None:
if not edges:
return
if _legacy_upsert_batch:
_legacy_upsert_batch(client, cfg.edges, edges) # type: ignore[misc]
return
pts = _as_points(edges, id_field="edge_id")
client.upsert(collection_name=cfg.edges, points=pts)
def delete_by_note(client: QdrantClient, cfg, note_id: str) -> None:
"""
Löscht alle Chunks/Edges (und optional Notes), die zu einer Note gehören.
Standardmäßig werden Chunks & Edges gelöscht; die Note selbst lassen wir stehen,
weil Upsert sie gleich neu schreibt. Passe das Verhalten nach Bedarf an.
"""
flt_note = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
# Chunks
if _legacy_delete_by_filter:
_legacy_delete_by_filter(client, cfg.chunks, flt_note) # type: ignore[misc]
else:
client.delete(collection_name=cfg.chunks, points_selector=rest.FilterSelector(filter=flt_note))
# Edges
if _legacy_delete_by_filter:
_legacy_delete_by_filter(client, cfg.edges, flt_note) # type: ignore[misc]
else:
client.delete(collection_name=cfg.edges, points_selector=rest.FilterSelector(filter=flt_note))
# Optional auch die Note löschen? In den meisten Flows nicht nötig.
# Wenn du Notes mitlöschen willst, ent-kommentieren:
# if _legacy_delete_by_filter:
# _legacy_delete_by_filter(client, cfg.notes, flt_note) # type: ignore[misc]
# else:
# client.delete(collection_name=cfg.notes, points_selector=rest.FilterSelector(filter=flt_note))