from __future__ import annotations

import uuid
from typing import List, Tuple

from qdrant_client.http import models as rest


def _names(prefix: str) -> Tuple[str, str, str]:
    """Return the (notes, chunks, edges) collection names for *prefix*."""
    return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"


def _to_uuid(stable_key: str) -> str:
    """Return a deterministic UUIDv5 string derived from *stable_key*.

    The same key always maps to the same UUID (namespace
    ``uuid.NAMESPACE_URL``).

    Keys that are neither ``str`` nor ``bytes`` (e.g. integer IDs read from
    a payload dict) are converted with ``str()`` first, because
    ``uuid.uuid5`` raises ``TypeError`` for them. String keys produce exactly
    the same UUID as before.
    """
    if not isinstance(stable_key, (str, bytes)):
        stable_key = str(stable_key)
    return str(uuid.uuid5(uuid.NAMESPACE_URL, stable_key))


def points_for_note(
    prefix: str,
    note_payload: dict,
    note_vec: List[float] | None,
    dim: int,
) -> Tuple[str, List[rest.PointStruct]]:
    """Build the single point for the notes collection.

    Args:
        prefix: Collection name prefix; the target is ``f"{prefix}_notes"``.
        note_payload: Payload stored with the point. Its ``note_id`` (or,
            failing that, ``id``) is the stable key for the point ID.
        note_vec: Note embedding, or ``None`` if there is none.
        dim: Length of the zero vector used when ``note_vec`` is ``None``.

    Returns:
        ``(collection_name, [point])``.
    """
    notes_col, _, _ = _names(prefix)
    # No note embedding -> fall back to a zero vector of length ``dim``.
    vector = note_vec if note_vec is not None else [0.0] * int(dim)
    # A Qdrant point ID must be an int or a UUID, so the raw ID is hashed.
    # NOTE: every payload lacking both keys shares the "missing-note-id"
    # fallback and therefore maps to the same point ID.
    raw_note_id = (
        note_payload.get("note_id")
        or note_payload.get("id")
        or "missing-note-id"
    )
    point_id = _to_uuid(raw_note_id)
    pt = rest.PointStruct(id=point_id, vector=vector, payload=note_payload)
    return notes_col, [pt]


def points_for_chunks(
    prefix: str,
    chunk_payloads: List[dict],
    vectors: List[List[float]],
) -> Tuple[str, List[rest.PointStruct]]:
    """Build the points for the chunks collection (one vector per chunk).

    Robustness:
    - If ``chunk_id`` is missing, ``id`` is used; otherwise the ID is built
      as ``"{note_id}#{i}"`` with a 1-based index ``i``.
    - A derived ID is written back into the payload (``pl["chunk_id"]``),
      i.e. the caller's dict is mutated.

    Payloads and vectors are paired positionally with ``zip``; if the two
    lists differ in length, the surplus items of the longer one are ignored.

    Returns:
        ``(collection_name, points)``.
    """
    _, chunks_col, _ = _names(prefix)
    points: List[rest.PointStruct] = []
    for i, (pl, vec) in enumerate(zip(chunk_payloads, vectors), start=1):
        chunk_id = pl.get("chunk_id") or pl.get("id")
        if not chunk_id:
            note_id = (
                pl.get("note_id") or pl.get("parent_note_id") or "missing-note"
            )
            chunk_id = f"{note_id}#{i}"
            pl["chunk_id"] = chunk_id  # persist the fallback in the payload
        point_id = _to_uuid(chunk_id)
        points.append(rest.PointStruct(id=point_id, vector=vec, payload=pl))
    return chunks_col, points


def points_for_edges(
    prefix: str, edge_payloads: List[dict]
) -> Tuple[str, List[rest.PointStruct]]:
    """Build the points for the edges collection, using a 1-D dummy vector.

    - If ``edge_id`` is missing, a stable ID is constructed from
      ``(kind, source_id, target_id, seq)`` and written back into the
      payload (``pl["edge_id"]``), i.e. the caller's dict is mutated.
    - ``vector=[0.0]`` is a placeholder that satisfies client validation.

    Returns:
        ``(collection_name, points)``.
    """
    _, _, edges_col = _names(prefix)
    points: List[rest.PointStruct] = []
    for pl in edge_payloads:
        edge_id = pl.get("edge_id")
        if not edge_id:
            kind = pl.get("kind", "edge")
            s = pl.get("source_id", "unknown-src")
            t = pl.get("target_id", "unknown-tgt")
            # NOTE(review): a falsy ``seq`` (including 0) falls through to
            # ``order`` and then to "". Kept as-is so that previously
            # generated edge IDs remain stable.
            seq = pl.get("seq") or pl.get("order") or ""
            edge_id = f"{kind}:{s}->{t}#{seq}"
            pl["edge_id"] = edge_id
        point_id = _to_uuid(edge_id)
        points.append(rest.PointStruct(id=point_id, vector=[0.0], payload=pl))
    return edges_col, points


def upsert_batch(client, collection: str, points: List[rest.PointStruct]) -> None:
    """Upsert *points* into *collection*; do nothing when *points* is empty.

    ``wait=True`` is passed to ``client.upsert``.
    """
    if not points:
        return
    client.upsert(collection_name=collection, points=points, wait=True)