All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
95 lines
3.0 KiB
Python
95 lines
3.0 KiB
Python
# app/core/edges_writer.py
|
||
from __future__ import annotations
|
||
import hashlib
|
||
from typing import Dict, List, Iterable, Tuple
|
||
|
||
try:
|
||
# Project module that provides the edge schema definition and the builder function
|
||
from app.core.edges import build_edges_for_note # noqa: F401
|
||
except Exception as e:
|
||
raise RuntimeError("Konnte app.core.edges nicht importieren. "
|
||
"Bitte sicherstellen, dass app/core/edges.py vorhanden ist.") from e
|
||
|
||
def _edge_uid(kind: str, source_id: str, target_id: str, scope: str) -> str:
|
||
"""
|
||
Deterministische, kurze ID für eine Edge.
|
||
Kollisionen sind praktisch ausgeschlossen (BLAKE2s über den Kanonischen Schlüssel).
|
||
"""
|
||
key = f"{kind}|{source_id}|{target_id}|{scope}"
|
||
return hashlib.blake2s(key.encode("utf-8"), digest_size=12).hexdigest()
|
||
|
||
def ensure_edges_collection(qdrant_client, collection: str) -> None:
    """Create the edge collection if it does not exist yet.

    The collection is configured with a minimal 1-dimensional dummy vector
    (cosine distance) and on-disk payload storage. A dummy vector is used
    because payload-only collections are awkward depending on the Qdrant
    version.

    Args:
        qdrant_client: Qdrant client used to list and create collections.
        collection: Name of the edge collection.

    Returns:
        None. Returns early without any write if the collection already exists.
    """
    from qdrant_client.http import models as qm

    existing_names = {c.name for c in qdrant_client.get_collections().collections}
    if collection in existing_names:
        return

    # Use create_collection rather than recreate_collection: the latter drops
    # an existing collection first, so a collection created concurrently
    # between the check above and this call would lose all of its points.
    qdrant_client.create_collection(
        collection_name=collection,
        vectors_config=qm.VectorParams(size=1, distance=qm.Distance.COSINE),
        on_disk_payload=True,
    )
|
||
|
||
def edges_from_note(
    note_id: str,
    chunk_payloads: List[Dict],
    note_level_refs: Iterable[str] | None,
    *,
    include_note_scope_refs: bool = False,
) -> List[Dict]:
    """Build the edge payloads for a single note.

    Delegates to ``build_edges_for_note`` from ``app.core.edges`` and returns
    its result unchanged, so the edge schema is exactly the one defined there.

    Args:
        note_id: ID of the note the edges belong to.
        chunk_payloads: Payload dicts of the note's chunks.
        note_level_refs: Note-level references; ``None`` or an empty value is
            passed on as an empty list.
        include_note_scope_refs: Forwarded as-is to the builder.

    Returns:
        The list of edge payload dicts produced by the builder.
    """
    # The builder always receives a concrete list, never None or a lazy iterable.
    refs = list(note_level_refs) if note_level_refs else []
    builder_kwargs = {
        "note_id": note_id,
        "chunk_payloads": chunk_payloads,
        "note_level_refs": refs,
        "include_note_scope_refs": include_note_scope_refs,
    }
    return build_edges_for_note(**builder_kwargs)
|
||
|
||
def upsert_edges(
    qdrant_client,
    collection: str,
    edge_payloads: List[Dict],
) -> Tuple[int, int]:
    """Write edges to Qdrant as points.

    Each point is built as follows:

    - id: deterministic, computed by ``_edge_uid`` from
      ``(kind, source_id, target_id, scope)``
    - vector: ``[0.0]`` dummy
    - payload: the edge dict, unmodified

    Edges sharing the same ``(kind, source_id, target_id, scope)`` key are
    written only once; the first occurrence wins.

    Args:
        qdrant_client: Qdrant client used for the upsert.
        collection: Name of the edge collection; it is created if missing.
        edge_payloads: Edge payload dicts to write.

    Returns:
        A tuple ``(number_of_points, number_of_unique_keys)``; ``(0, 0)`` when
        there is nothing to write.
    """
    from qdrant_client.models import PointStruct

    if not edge_payloads:
        return 0, 0

    # Dicts preserve insertion order, so setdefault keeps the first payload
    # seen for each key and the original ordering of the surviving edges.
    first_by_key = {}
    for payload in edge_payloads:
        edge_key = (
            payload.get("kind"),
            payload.get("source_id"),
            payload.get("target_id"),
            payload.get("scope"),
        )
        first_by_key.setdefault(edge_key, payload)

    points = [
        PointStruct(id=_edge_uid(*edge_key), vector=[0.0], payload=payload)
        for edge_key, payload in first_by_key.items()
    ]

    # Reachable when edge_payloads is a truthy iterable that yields nothing.
    if not points:
        return 0, 0

    ensure_edges_collection(qdrant_client, collection)
    qdrant_client.upsert(collection_name=collection, points=points)
    return len(points), len(first_by_key)
|