110 lines
3.8 KiB
Python
110 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Name: app/core/qdrant_points.py
|
|
Version: v1.2.0 (2025-09-05)
|
|
|
|
Kurzbeschreibung:
|
|
Erzeugt Qdrant-Points für Notes, Chunks, Edges.
|
|
- Edges: 1D-Dummy-Vektor (Workaround).
|
|
- **NEU**: Edge-ID berücksichtigt optional 'occ' (Occurrence-Index),
|
|
um Mehrfach-Vorkommen von Wikilinks eindeutig zu machen.
|
|
|
|
Aufruf:
|
|
from app.core.qdrant_points import (
|
|
points_for_note, points_for_chunks, points_for_edges, upsert_batch
|
|
)
|
|
|
|
Kompatibilität:
|
|
- Bestehende Edge-IDs bleiben gültig; neue Edges können ein '@{occ}'-Suffix in der ID haben.
|
|
- Idempotenz weiterhin über UUIDv5 auf stabilem Key.
|
|
|
|
Changelog:
|
|
v1.2.0: Edge-ID-Logik erweitert: "{kind}:{src}->{tgt}#{seq}@{occ}" wenn 'occ' vorhanden.
|
|
v1.1.x: Stabilisierung Note-/Chunk-Upsert.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import uuid
|
|
from typing import List, Tuple
|
|
from qdrant_client.http import models as rest
|
|
|
|
|
|
def _names(prefix: str) -> Tuple[str, str, str]:
|
|
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
|
|
|
|
|
|
def _to_uuid(stable_key: str) -> str:
|
|
"""Stabile UUIDv5 aus einem String-Key (deterministisch)."""
|
|
return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key))
|
|
|
|
|
|
def points_for_note(
|
|
prefix: str,
|
|
note_payload: dict,
|
|
note_vec: List[float] | None,
|
|
dim: int,
|
|
) -> Tuple[str, List[rest.PointStruct]]:
|
|
"""Notes-Collection: falls kein Note-Embedding -> Nullvektor der Länge dim."""
|
|
notes_col, _, _ = _names(prefix)
|
|
vector = note_vec if note_vec is not None else [0.0] * int(dim)
|
|
raw_note_id = note_payload.get("note_id") or note_payload.get("id") or "missing-note-id"
|
|
point_id = _to_uuid(raw_note_id)
|
|
pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload)
|
|
return notes_col, [pt]
|
|
|
|
|
|
def points_for_chunks(
|
|
prefix: str,
|
|
chunk_payloads: List[dict],
|
|
vectors: List[List[float]],
|
|
) -> Tuple[str, List[rest.PointStruct]]:
|
|
"""
|
|
Chunks-Collection: erwartet pro Chunk einen Vektor.
|
|
Robustheit:
|
|
- Fehlt 'chunk_id', nutze 'id', sonst baue '${note_id}#${i}' (1-basiert).
|
|
- Schreibe die abgeleitete ID zurück in die Payload (pl['chunk_id']).
|
|
"""
|
|
_, chunks_col, _ = _names(prefix)
|
|
points: List[rest.PointStruct] = []
|
|
for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1):
|
|
chunk_id = pl.get("chunk_id") or pl.get("id")
|
|
if not chunk_id:
|
|
note_id = pl.get("note_id") or pl.get("parent_note_id") or "missing-note"
|
|
chunk_id = f"{note_id}#{i}"
|
|
pl["chunk_id"] = chunk_id
|
|
point_id = _to_uuid(chunk_id)
|
|
points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl))
|
|
return chunks_col, points
|
|
|
|
|
|
def points_for_edges(prefix: str, edge_payloads: List[dict]) -> Tuple[str, List[rest.PointStruct]]:
|
|
"""
|
|
Edges-Collection mit 1D-Dummy-Vektor.
|
|
ID-Bildung:
|
|
- Basis: "{kind}:{src}->{tgt}#{seq}"
|
|
- **Neu**: Falls 'occ' in Payload, erweitern zu "{...}@{occ}"
|
|
"""
|
|
_, _, edges_col = _names(prefix)
|
|
points: List[rest.PointStruct] = []
|
|
for pl in edge_payloads:
|
|
edge_id = pl.get("edge_id")
|
|
if not edge_id:
|
|
kind = pl.get("kind", "edge")
|
|
s = pl.get("source_id", "unknown-src")
|
|
t = pl.get("target_id", "unknown-tgt")
|
|
seq = pl.get("seq") or pl.get("order") or ""
|
|
occ = pl.get("occ") # optionaler Occurrence-Index
|
|
base = f"{kind}:{s}->{t}#{seq}"
|
|
edge_id = f"{base}@{occ}" if occ is not None else base
|
|
pl["edge_id"] = edge_id
|
|
point_id = _to_uuid(edge_id)
|
|
points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl))
|
|
return edges_col, points
|
|
|
|
|
|
def upsert_batch(client, collection: str, points: List[rest.PointStruct]) -> None:
|
|
if not points:
|
|
return
|
|
client.upsert(collection_name=collection, points=points, wait=True)
|