mindnet/app/frontend/ui_graph_service.py
2025-12-14 08:35:33 +01:00

181 lines
7.3 KiB
Python

from qdrant_client import QdrantClient, models
from streamlit_agraph import Node, Edge
from ui_config import GRAPH_COLORS, EDGE_COLORS
class GraphExplorerService:
def __init__(self, url, api_key=None, prefix="mindnet"):
self.client = QdrantClient(url=url, api_key=api_key)
self.prefix = prefix
self.notes_col = f"{prefix}_notes"
self.chunks_col = f"{prefix}_chunks"
self.edges_col = f"{prefix}_edges"
self._note_cache = {}
def get_ego_graph(self, center_note_id: str):
nodes_dict = {}
unique_edges = {}
# 1. Center Note laden
center_note = self._fetch_note_cached(center_note_id)
if not center_note: return [], []
self._add_node_to_dict(nodes_dict, center_note, is_center=True)
center_title = center_note.get("title")
# 2. Chunks der Center Note finden
scroll_filter = models.Filter(
must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=center_note_id))]
)
chunks, _ = self.client.scroll(
collection_name=self.chunks_col, scroll_filter=scroll_filter, limit=100, with_payload=True
)
center_chunk_ids = [c.id for c in chunks]
raw_edges = []
# 3. OUTGOING EDGES: Source = einer meiner Chunks
if center_chunk_ids:
out_filter = models.Filter(
must=[models.FieldCondition(key="source_id", match=models.MatchAny(any=center_chunk_ids))]
)
res_out, _ = self.client.scroll(
collection_name=self.edges_col, scroll_filter=out_filter, limit=100, with_payload=True
)
raw_edges.extend(res_out)
# 4. INCOMING EDGES: Target = Chunk, Titel oder Note-ID
must_conditions = []
if center_chunk_ids:
must_conditions.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=center_chunk_ids)))
if center_title:
must_conditions.append(models.FieldCondition(key="target_id", match=models.MatchValue(value=center_title)))
# FIX: Auch exakte Note-ID als Target prüfen
must_conditions.append(models.FieldCondition(key="target_id", match=models.MatchValue(value=center_note_id)))
if must_conditions:
in_filter = models.Filter(should=must_conditions) # 'should' = OR
res_in, _ = self.client.scroll(
collection_name=self.edges_col, scroll_filter=in_filter, limit=100, with_payload=True
)
raw_edges.extend(res_in)
# 5. Verarbeitung & Auflösung
for record in raw_edges:
payload = record.payload
src_ref = payload.get("source_id")
tgt_ref = payload.get("target_id")
kind = payload.get("kind", "related_to")
provenance = payload.get("provenance", "explicit")
src_note = self._resolve_note_from_ref(src_ref)
tgt_note = self._resolve_note_from_ref(tgt_ref)
if src_note and tgt_note:
src_id = src_note['note_id']
tgt_id = tgt_note['note_id']
# Keine Self-Loops und valide Verbindung
if src_id != tgt_id:
self._add_node_to_dict(nodes_dict, src_note)
self._add_node_to_dict(nodes_dict, tgt_note)
key = (src_id, tgt_id)
existing = unique_edges.get(key)
# Deduplizierung: Explizite Kanten überschreiben Smart Edges
is_current_explicit = (provenance == "explicit" or provenance == "rule")
should_update = True
if existing:
is_existing_explicit = (existing['provenance'] == "explicit" or existing['provenance'] == "rule")
if is_existing_explicit and not is_current_explicit:
should_update = False
if should_update:
unique_edges[key] = {
"source": src_id, "target": tgt_id, "kind": kind, "provenance": provenance
}
# 6. Agraph Objekte bauen
final_edges = []
for (src, tgt), data in unique_edges.items():
kind = data['kind']
prov = data['provenance']
color = EDGE_COLORS.get(kind, "#bdc3c7")
is_smart = (prov != "explicit" and prov != "rule")
final_edges.append(Edge(
source=src, target=tgt, label=kind, color=color, dashes=is_smart,
title=f"Provenance: {prov}\nType: {kind}"
))
return list(nodes_dict.values()), final_edges
def _fetch_note_cached(self, note_id):
if note_id in self._note_cache: return self._note_cache[note_id]
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]),
limit=1, with_payload=True
)
if res:
self._note_cache[note_id] = res[0].payload
return res[0].payload
return None
def _resolve_note_from_ref(self, ref_str):
if not ref_str: return None
# Fall A: Chunk ID (Format: note_id#cXX)
if "#" in ref_str:
# Versuch 1: Echte Chunk ID in DB
try:
res = self.client.retrieve(collection_name=self.chunks_col, ids=[ref_str], with_payload=True)
if res:
parent_id = res[0].payload.get("note_id")
return self._fetch_note_cached(parent_id)
except: pass
# Versuch 2: Section Link (note-id#Header) -> Hash abschneiden
possible_note_id = ref_str.split("#")[0]
note_by_id = self._fetch_note_cached(possible_note_id)
if note_by_id: return note_by_id
# Fall B: Es ist direkt die Note ID
note_by_id = self._fetch_note_cached(ref_str)
if note_by_id: return note_by_id
# Fall C: Es ist der Titel (Wikilink)
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchValue(value=ref_str))]),
limit=1, with_payload=True
)
if res:
p = res[0].payload
self._note_cache[p['note_id']] = p
return p
return None
def _add_node_to_dict(self, node_dict, note_payload, is_center=False):
nid = note_payload.get("note_id")
if nid in node_dict: return
ntype = note_payload.get("type", "default")
color = GRAPH_COLORS.get(ntype, GRAPH_COLORS["default"])
size = 35 if is_center else 20
node_dict[nid] = Node(
id=nid,
label=note_payload.get("title", nid),
size=size,
color=color,
shape="dot" if not is_center else "diamond",
title=f"Type: {ntype}\nTags: {note_payload.get('tags')}",
font={'color': 'black'}
)