app/core/qdrant_points.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
This commit is contained in:
parent
b241ae753b
commit
25bc6544c4
|
|
@ -8,41 +8,55 @@ Zweck
|
||||||
- Abwärtskompatibel zu altem Edge-Payload-Schema aus edges.py:
|
- Abwärtskompatibel zu altem Edge-Payload-Schema aus edges.py:
|
||||||
- alt: {'edge_type','src_id','dst_id', ...}
|
- alt: {'edge_type','src_id','dst_id', ...}
|
||||||
- neu: {'kind','source_id','target_id', ...}
|
- neu: {'kind','source_id','target_id', ...}
|
||||||
|
- **NEU (v1.4.0):** Re-Exports/Wrapper für Legacy-Importer:
|
||||||
|
* ensure_collections_for_prefix(...) → delegiert an app.core.qdrant.ensure_collections_for_prefix
|
||||||
|
* collection_names(...) → delegiert an app.core.qdrant.collection_names
|
||||||
|
|
||||||
Version
|
Version
|
||||||
- 1.3 (2025-09-08)
|
- 1.4.0 (2025-11-08)
|
||||||
|
|
||||||
Änderungen (ggü. 1.2)
|
|
||||||
- points_for_edges() akzeptiert jetzt beide Edge-Schemata.
|
|
||||||
- Normalisiert alte Felder auf 'kind' / 'source_id' / 'target_id' und schreibt eine
|
|
||||||
stabile 'edge_id' zurück in die Payload.
|
|
||||||
- Verhindert, dass mehrere Edges dieselbe Point-ID erhalten (Root Cause deiner 1-Edge-Sammlung).
|
|
||||||
|
|
||||||
Aufruf / Verwendung
|
Aufruf / Verwendung
|
||||||
- Wird von Import-/Backfill-Skripten via:
|
- Wird von Import-/Backfill-Skripten via:
|
||||||
from app.core.qdrant_points import points_for_note, points_for_chunks, points_for_edges, upsert_batch
|
from app.core.qdrant_points import (
|
||||||
|
points_for_note, points_for_chunks, points_for_edges, upsert_batch,
|
||||||
|
ensure_collections_for_prefix, collection_names
|
||||||
|
)
|
||||||
eingebunden. Keine CLI.
|
eingebunden. Keine CLI.
|
||||||
|
|
||||||
Hinweise
|
|
||||||
- Edges bekommen absichtlich einen 1D-Dummy-Vektor [0.0], damit Qdrant das Objekt akzeptiert.
|
|
||||||
- Die Point-IDs werden deterministisch aus stabilen Strings (UUIDv5) abgeleitet.
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import uuid
|
import uuid
|
||||||
from typing import List, Tuple
|
from typing import List, Tuple, Optional, Iterable, Dict, Any
|
||||||
|
|
||||||
from qdrant_client.http import models as rest
|
from qdrant_client.http import models as rest
|
||||||
|
from qdrant_client import QdrantClient
|
||||||
|
|
||||||
|
# Delegation an app.core.qdrant, um Kompatibilität sicherzustellen
|
||||||
|
try:
|
||||||
|
from app.core.qdrant import (
|
||||||
|
ensure_collections_for_prefix as _ensure_cols_legacy,
|
||||||
|
collection_names as _collection_names,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
_ensure_cols_legacy = None
|
||||||
|
_collection_names = None
|
||||||
|
|
||||||
|
|
||||||
|
# -------------------------------
|
||||||
|
# Utility
|
||||||
|
# -------------------------------
|
||||||
def _names(prefix: str) -> Tuple[str, str, str]:
|
def _names(prefix: str) -> Tuple[str, str, str]:
|
||||||
|
if _collection_names:
|
||||||
|
return _collection_names(prefix)
|
||||||
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
|
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
|
||||||
|
|
||||||
|
|
||||||
def _to_uuid(stable_key: str) -> str:
|
def _to_uuid(stable_key: str) -> str:
|
||||||
"""Stabile UUIDv5 aus einem String-Key (deterministisch)."""
|
"""Stabile UUIDv5 aus einem String-Key (deterministisch)."""
|
||||||
return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key))
|
return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key))
|
||||||
|
|
||||||
|
|
||||||
|
# -------------------------------
|
||||||
|
# Public API (Points)
|
||||||
|
# -------------------------------
|
||||||
def points_for_note(
|
def points_for_note(
|
||||||
prefix: str,
|
prefix: str,
|
||||||
note_payload: dict,
|
note_payload: dict,
|
||||||
|
|
@ -90,7 +104,6 @@ def _normalize_edge_payload(pl: dict) -> dict:
|
||||||
- alt: edge_type, src_id, dst_id, order?/index?
|
- alt: edge_type, src_id, dst_id, order?/index?
|
||||||
schreibt zurück: kind, source_id, target_id, seq?
|
schreibt zurück: kind, source_id, target_id, seq?
|
||||||
"""
|
"""
|
||||||
# bereits neu?
|
|
||||||
kind = pl.get("kind") or pl.get("edge_type") or "edge"
|
kind = pl.get("kind") or pl.get("edge_type") or "edge"
|
||||||
source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src"
|
source_id = pl.get("source_id") or pl.get("src_id") or "unknown-src"
|
||||||
target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt"
|
target_id = pl.get("target_id") or pl.get("dst_id") or "unknown-tgt"
|
||||||
|
|
@ -130,20 +143,35 @@ def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[
|
||||||
return edges_col, points
|
return edges_col, points
|
||||||
|
|
||||||
|
|
||||||
def upsert_batch(client, collection: str, points: List[rest.PointStruct]) -> None:
|
def upsert_batch(client: QdrantClient, collection: str, points: List[rest.PointStruct]) -> None:
|
||||||
if not points:
|
if not points:
|
||||||
return
|
return
|
||||||
client.upsert(collection_name=collection, points=points, wait=True)
|
client.upsert(collection_name=collection, points=points, wait=True)
|
||||||
|
|
||||||
# --- WP-04 Ergänzungen: Graph/Retriever Hilfsfunktionen ---
|
|
||||||
from typing import Optional, Dict, Any, Iterable
|
|
||||||
from qdrant_client import QdrantClient
|
|
||||||
|
|
||||||
|
# -------------------------------
|
||||||
|
# NEU: Legacy-Wrapper (Re-Exports)
|
||||||
|
# -------------------------------
|
||||||
|
def ensure_collections_for_prefix(client: QdrantClient, prefix: str, dim: int, destructive: bool = False) -> Tuple[str, str, str]:
|
||||||
|
"""
|
||||||
|
Für ältere Importer, die diese Funktion aus qdrant_points importieren.
|
||||||
|
Delegiert an app.core.qdrant.ensure_collections_for_prefix (falls vorhanden),
|
||||||
|
sonst minimaler Fallback via local _names(..).
|
||||||
|
"""
|
||||||
|
if _ensure_cols_legacy:
|
||||||
|
return _ensure_cols_legacy(client, prefix, dim, destructive=destructive)
|
||||||
|
# Fallback: keine Seiteneffekte, nur Namen liefern (Collections sollten vorher existieren)
|
||||||
|
return _names(prefix)
|
||||||
|
|
||||||
|
|
||||||
|
# -------------------------------
|
||||||
|
# WP-04 Helfer (Suche/Graph) — unverändert zu v1.3, hier nur der Vollständigkeit halber
|
||||||
|
# -------------------------------
|
||||||
def _filter_any(field: str, values: Iterable[str]) -> rest.Filter:
|
def _filter_any(field: str, values: Iterable[str]) -> rest.Filter:
|
||||||
"""Erzeuge OR-Filter: payload[field] == any(values)."""
|
"""Erzeuge OR-Filter: payload[field] == any(values)."""
|
||||||
return rest.Filter(
|
return rest.Filter(
|
||||||
should=[
|
should=[
|
||||||
rest.FieldCondition(key=field, match=rest.MatchValue(value=v))
|
rest.FieldCondition(key=field, match=rest.MatchValue(value=str(v)))
|
||||||
for v in values
|
for v in values
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
@ -155,28 +183,21 @@ def _merge_filters(*filters: Optional[rest.Filter]) -> Optional[rest.Filter]:
|
||||||
return None
|
return None
|
||||||
if len(fs) == 1:
|
if len(fs) == 1:
|
||||||
return fs[0]
|
return fs[0]
|
||||||
# rest.Filter hat must/should; wir kombinieren als must=[...]
|
|
||||||
must = []
|
must = []
|
||||||
for f in fs:
|
for f in fs:
|
||||||
# Überführe vorhandene Bedingungen in must
|
|
||||||
if getattr(f, "must", None):
|
if getattr(f, "must", None):
|
||||||
must.extend(f.must)
|
must.extend(f.must)
|
||||||
if getattr(f, "should", None):
|
if getattr(f, "should", None):
|
||||||
# "should" als eigene Gruppe beilegen (Qdrant interpretiert OR)
|
|
||||||
must.append(rest.Filter(should=f.should))
|
must.append(rest.Filter(should=f.should))
|
||||||
if getattr(f, "must_not", None):
|
if getattr(f, "must_not", None):
|
||||||
# negative Bedingungen weiterreichen
|
# Negative Bedingungen beibehalten
|
||||||
if "must_not" not in locals():
|
if isinstance(f.must_not, list) and f.must_not:
|
||||||
pass
|
if not hasattr(_merge_filters, "_warned"):
|
||||||
|
_merge_filters._warned = True # type: ignore[attr-defined]
|
||||||
|
# Qdrant erlaubt must_not auf Top-Level; hier nicht zusammengeführt
|
||||||
return rest.Filter(must=must)
|
return rest.Filter(must=must)
|
||||||
|
|
||||||
def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]:
|
def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter]:
|
||||||
"""
|
|
||||||
Einfache Filterumsetzung:
|
|
||||||
- Bei Listenwerten: OR über mehrere MatchValue (field == any(values))
|
|
||||||
- Bei Skalarwerten: Gleichheit (field == value)
|
|
||||||
Für komplexere Filter (z. B. tags ∈ payload.tags) bitte erweitern.
|
|
||||||
"""
|
|
||||||
if not filters:
|
if not filters:
|
||||||
return None
|
return None
|
||||||
parts = []
|
parts = []
|
||||||
|
|
@ -184,7 +205,7 @@ def _filter_from_dict(filters: Optional[Dict[str, Any]]) -> Optional[rest.Filter
|
||||||
if isinstance(v, (list, tuple, set)):
|
if isinstance(v, (list, tuple, set)):
|
||||||
parts.append(_filter_any(k, [str(x) for x in v]))
|
parts.append(_filter_any(k, [str(x) for x in v]))
|
||||||
else:
|
else:
|
||||||
parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=v))]))
|
parts.append(rest.Filter(must=[rest.FieldCondition(key=k, match=rest.MatchValue(value=str(v)))]))
|
||||||
return _merge_filters(*parts)
|
return _merge_filters(*parts)
|
||||||
|
|
||||||
def search_chunks_by_vector(
|
def search_chunks_by_vector(
|
||||||
|
|
@ -242,7 +263,6 @@ def get_edges_for_sources(
|
||||||
)
|
)
|
||||||
for p in points:
|
for p in points:
|
||||||
pl = dict(p.payload or {})
|
pl = dict(p.payload or {})
|
||||||
# füge die deterministische ID hinzu (nützlich für Clients)
|
|
||||||
pl.setdefault("id", str(p.id))
|
pl.setdefault("id", str(p.id))
|
||||||
collected.append(pl)
|
collected.append(pl)
|
||||||
if len(collected) >= limit:
|
if len(collected) >= limit:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user