All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
143 lines
5.2 KiB
Python
143 lines
5.2 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Modul: app.core.qdrant_points
|
||
Version: 1.7.0
|
||
Datum: 2025-11-08
|
||
|
||
Zweck
|
||
-----
|
||
Einheitliche Upsert-/Delete-Helfer für Notes/Chunks/Edges.
|
||
Diese Version ergänzt nur Namen/Wrapper, die von neueren Skripten erwartet werden:
|
||
|
||
Neu/kompatibel:
|
||
• upsert_notes(client, cfg, notes: List[dict])
|
||
• upsert_chunks(client, cfg, chunks: List[dict])
|
||
• upsert_edges(client, cfg, edges: List[dict])
|
||
• delete_by_note(client, cfg, note_id: str)
|
||
|
||
und mappt sie – falls vorhanden – auf bestehende Implementierungen:
|
||
• upsert_batch(...)
|
||
• delete_by_filter(...)
|
||
|
||
Damit bleiben alle Aufrufer (alt & neu) funktionsfähig.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from typing import Dict, List, Optional, Tuple
|
||
|
||
try:
|
||
from qdrant_client import QdrantClient
|
||
from qdrant_client.http import models as rest
|
||
from qdrant_client.http.models import PointStruct
|
||
except Exception as e: # pragma: no cover
|
||
raise RuntimeError(f"qdrant_client not available: {e}")
|
||
|
||
# ----------------------------------------------------------------------------
|
||
# Hilfen
|
||
# ----------------------------------------------------------------------------
|
||
|
||
def _as_points(payloads: List[dict], id_field: Optional[str] = None) -> List[PointStruct]:
    """Build ``PointStruct`` objects from a list of payload dicts.

    The point id is the first truthy value among ``payload[id_field]`` (only
    tried when ``id_field`` is given), ``payload["id"]``, ``payload["note_id"]``
    and ``payload["edge_id"]``. If the payload carries a ``"vector"`` entry it
    is passed on as the point's vector; otherwise the point is created without
    a vector (the collection then has to be vector-free or allow a default).

    Args:
        payloads: Payload dicts, one per point. Each dict is handed over
            unchanged as the point payload, so a ``"vector"`` entry stays part
            of the payload as well.
        id_field: Optional payload key that is tried first when resolving the
            point id.

    Returns:
        One ``PointStruct`` per payload, in input order.

    Raises:
        ValueError: If the id lookup for a payload ends in ``None``.
    """
    pts: List[PointStruct] = []
    for pl in payloads:
        pid = pl.get(id_field) if id_field else None
        pid = pid or pl.get("id") or pl.get("note_id") or pl.get("edge_id")
        if pid is None:
            # Fail early with a readable message instead of handing id=None to
            # PointStruct.
            tried = ([id_field] if id_field else []) + ["id", "note_id", "edge_id"]
            raise ValueError(f"cannot resolve a point id: payload has no value for any of {tried}")

        vec = pl.get("vector")  # optional
        if vec is None:
            pts.append(PointStruct(id=pid, payload=pl))
        else:
            pts.append(PointStruct(id=pid, vector=vec, payload=pl))
    return pts
|
||
|
||
|
||
# ----------------------------------------------------------------------------
|
||
# Bestehende (mögliche) APIs referenzieren, wenn vorhanden
|
||
# ----------------------------------------------------------------------------
|
||
|
||
# Placeholders for legacy implementations. They stay ``None`` unless one of the
# imports below succeeds; the public wrappers check them before falling back
# to direct client calls.
_legacy_upsert_batch = None
_legacy_delete_by_filter = None

# Separate try blocks: a failing upsert_batch import does not stop the
# delete_by_filter lookup.
# NOTE(review): the module docstring names this module app.core.qdrant_points,
# so these look like self-imports. Presumably they can only succeed if
# upsert_batch / delete_by_filter are already defined when this code runs -
# confirm the legacy functions still exist, otherwise both names remain None.
try:
    # Reuse an existing upsert_batch, if there is one, to keep its behaviour 1:1.
    from app.core.qdrant_points import upsert_batch as _legacy_upsert_batch  # type: ignore # noqa
except Exception:
    # Best effort: any failure leaves the placeholder at None.
    pass

try:
    from app.core.qdrant_points import delete_by_filter as _legacy_delete_by_filter  # type: ignore # noqa
except Exception:
    # Best effort: any failure leaves the placeholder at None.
    pass
|
||
|
||
|
||
# ----------------------------------------------------------------------------
|
||
# Öffentliche, neue Wrapper-APIs (werden von import_markdown v3.9.x erwartet)
|
||
# ----------------------------------------------------------------------------
|
||
|
||
def upsert_notes(client: QdrantClient, cfg, notes: List[dict]) -> None:
    """Upsert note payloads into the collection named by ``cfg.notes``.

    Returns immediately when ``notes`` is empty. If a legacy ``upsert_batch``
    was picked up at import time it is called as
    ``upsert_batch(client, cfg.notes, notes)``; otherwise the payloads are
    converted with ``_as_points`` (trying ``"note_id"`` first for the point id)
    and written with ``client.upsert``.

    Args:
        client: Qdrant client used for the write.
        cfg: Object providing the collection name as ``cfg.notes``.
        notes: Payload dicts, one per note.
    """
    if not notes:
        return
    if _legacy_upsert_batch:
        _legacy_upsert_batch(client, cfg.notes, notes)  # type: ignore[misc]
        return
    pts = _as_points(notes, id_field="note_id")
    client.upsert(collection_name=cfg.notes, points=pts)
|
||
|
||
|
||
def upsert_chunks(client: QdrantClient, cfg, chunks: List[dict]) -> None:
    """Upsert chunk payloads into the collection named by ``cfg.chunks``.

    Returns immediately when ``chunks`` is empty. If a legacy ``upsert_batch``
    was picked up at import time it is called as
    ``upsert_batch(client, cfg.chunks, chunks)``; otherwise the payloads are
    converted with ``_as_points`` (trying ``"chunk_id"`` first for the point
    id) and written with ``client.upsert``.

    Args:
        client: Qdrant client used for the write.
        cfg: Object providing the collection name as ``cfg.chunks``.
        chunks: Payload dicts, one per chunk.
    """
    if not chunks:
        return
    if _legacy_upsert_batch:
        _legacy_upsert_batch(client, cfg.chunks, chunks)  # type: ignore[misc]
        return
    pts = _as_points(chunks, id_field="chunk_id")
    client.upsert(collection_name=cfg.chunks, points=pts)
|
||
|
||
|
||
def upsert_edges(client: QdrantClient, cfg, edges: List[dict]) -> None:
    """Upsert edge payloads into the collection named by ``cfg.edges``.

    Returns immediately when ``edges`` is empty. If a legacy ``upsert_batch``
    was picked up at import time it is called as
    ``upsert_batch(client, cfg.edges, edges)``; otherwise the payloads are
    converted with ``_as_points`` (trying ``"edge_id"`` first for the point id)
    and written with ``client.upsert``.

    Args:
        client: Qdrant client used for the write.
        cfg: Object providing the collection name as ``cfg.edges``.
        edges: Payload dicts, one per edge.
    """
    if not edges:
        return
    if _legacy_upsert_batch:
        _legacy_upsert_batch(client, cfg.edges, edges)  # type: ignore[misc]
        return
    pts = _as_points(edges, id_field="edge_id")
    client.upsert(collection_name=cfg.edges, points=pts)
|
||
|
||
|
||
def delete_by_note(client: QdrantClient, cfg, note_id: str, *, include_note: bool = False) -> None:
    """Delete all chunks and edges whose payload ``note_id`` matches.

    By default only ``cfg.chunks`` and ``cfg.edges`` are cleaned; the note
    itself is kept because the subsequent upsert is expected to rewrite it.
    Pass ``include_note=True`` to apply the same filter to ``cfg.notes`` too.

    For each collection a legacy ``delete_by_filter`` (if one was picked up at
    import time) is called as ``delete_by_filter(client, collection, filter)``;
    otherwise the points are removed through ``client.delete`` with a
    ``FilterSelector``.

    Args:
        client: Qdrant client used for the deletions.
        cfg: Object providing the collection names as ``cfg.chunks``,
            ``cfg.edges`` and (only read when ``include_note`` is set)
            ``cfg.notes``.
        note_id: Value the ``note_id`` payload field must equal.
        include_note: Also delete matching points from ``cfg.notes``.
    """
    flt_note = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )

    # Chunks first, then edges - same order as before; notes only on request.
    collections = [cfg.chunks, cfg.edges]
    if include_note:
        collections.append(cfg.notes)

    for collection in collections:
        if _legacy_delete_by_filter:
            _legacy_delete_by_filter(client, collection, flt_note)  # type: ignore[misc]
        else:
            client.delete(
                collection_name=collection,
                points_selector=rest.FilterSelector(filter=flt_note),
            )
|