mindnet/app/frontend/ui_graph_service.py
2025-12-14 14:50:06 +01:00

312 lines
13 KiB
Python

import re
from qdrant_client import QdrantClient, models
from streamlit_agraph import Node, Edge
from ui_config import GRAPH_COLORS, get_edge_color, SYSTEM_EDGES
class GraphExplorerService:
def __init__(self, url, api_key=None, prefix="mindnet"):
self.client = QdrantClient(url=url, api_key=api_key)
self.prefix = prefix
self.notes_col = f"{prefix}_notes"
self.chunks_col = f"{prefix}_chunks"
self.edges_col = f"{prefix}_edges"
self._note_cache = {}
def get_note_with_full_content(self, note_id):
"""
Lädt die Metadaten der Note und rekonstruiert den gesamten Text
aus den Chunks (Stitching). Wichtig für den Editor-Fallback.
"""
# 1. Metadaten holen
meta = self._fetch_note_cached(note_id)
if not meta: return None
# 2. Volltext aus Chunks bauen
full_text = self._fetch_full_text_stitched(note_id)
# 3. Ergebnis kombinieren (Wir überschreiben das 'fulltext' Feld mit dem frischen Stitching)
# Wir geben eine Kopie zurück, um den Cache nicht zu verfälschen
complete_note = meta.copy()
if full_text:
complete_note['fulltext'] = full_text
return complete_note
def get_ego_graph(self, center_note_id: str, depth=2, show_labels=True):
"""
Erstellt den Ego-Graphen um eine zentrale Notiz.
Lädt Volltext für das Zentrum und Snippets für Nachbarn.
"""
nodes_dict = {}
unique_edges = {}
# 1. Center Note laden
center_note = self._fetch_note_cached(center_note_id)
if not center_note: return [], []
self._add_node_to_dict(nodes_dict, center_note, level=0)
# Initialset für Suche
level_1_ids = {center_note_id}
# Suche Kanten für Center (L1)
l1_edges = self._find_connected_edges([center_note_id], center_note.get("title"))
for edge_data in l1_edges:
src_id, tgt_id = self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=1)
if src_id: level_1_ids.add(src_id)
if tgt_id: level_1_ids.add(tgt_id)
# Level 2 Suche (begrenzt für Performance)
if depth > 1 and len(level_1_ids) > 1 and len(level_1_ids) < 80:
l1_subset = list(level_1_ids - {center_note_id})
if l1_subset:
l2_edges = self._find_connected_edges_batch(l1_subset)
for edge_data in l2_edges:
self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=2)
# --- SMART CONTENT LOADING ---
# A. Fulltext für Center Node holen (Chunks zusammenfügen)
center_text = self._fetch_full_text_stitched(center_note_id)
if center_note_id in nodes_dict:
orig_title = nodes_dict[center_note_id].title
clean_full = self._clean_markdown(center_text[:2000])
# Wir packen den Text in den Tooltip (title attribute)
nodes_dict[center_note_id].title = f"{orig_title}\n\n📄 INHALT:\n{clean_full}..."
# B. Previews für alle Nachbarn holen (Batch)
all_ids = list(nodes_dict.keys())
previews = self._fetch_previews_for_nodes(all_ids)
for nid, node_obj in nodes_dict.items():
if nid != center_note_id:
prev_raw = previews.get(nid, "Kein Vorschau-Text.")
clean_prev = self._clean_markdown(prev_raw[:600])
node_obj.title = f"{node_obj.title}\n\n🔍 VORSCHAU:\n{clean_prev}..."
# Graphen bauen (Nodes & Edges finalisieren)
final_edges = []
for (src, tgt), data in unique_edges.items():
kind = data['kind']
prov = data['provenance']
color = get_edge_color(kind)
is_smart = (prov != "explicit" and prov != "rule")
# Label Logik
label_text = kind if show_labels else " "
final_edges.append(Edge(
source=src, target=tgt, label=label_text, color=color, dashes=is_smart,
title=f"Relation: {kind}\nProvenance: {prov}"
))
return list(nodes_dict.values()), final_edges
def _clean_markdown(self, text):
"""Entfernt Markdown-Sonderzeichen für saubere Tooltips im Browser."""
if not text: return ""
# Entferne Header Marker (## )
text = re.sub(r'#+\s', '', text)
# Entferne Bold/Italic (** oder *)
text = re.sub(r'\*\*|__|\*|_', '', text)
# Entferne Links [Text](Url) -> Text
text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)
# Entferne Wikilinks [[Link]] -> Link
text = re.sub(r'\[\[([^\]]+)\]\]', r'\1', text)
return text
def _fetch_full_text_stitched(self, note_id):
"""Lädt alle Chunks einer Note und baut den Text zusammen."""
try:
scroll_filter = models.Filter(
must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]
)
# Limit hoch genug setzen
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=100, with_payload=True)
# Sortieren nach 'ord' (Reihenfolge im Dokument)
chunks.sort(key=lambda x: x.payload.get('ord', 999))
full_text = []
for c in chunks:
# 'text' ist der reine Inhalt ohne Overlap
txt = c.payload.get('text', '')
if txt: full_text.append(txt)
return "\n\n".join(full_text)
except:
return "Fehler beim Laden des Volltexts."
def _fetch_previews_for_nodes(self, node_ids):
"""Holt Batch-weise den ersten Chunk für eine Liste von Nodes."""
if not node_ids: return {}
previews = {}
try:
scroll_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=node_ids))])
# Limit = Anzahl Nodes * 3 (Puffer)
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=len(node_ids)*3, with_payload=True)
for c in chunks:
nid = c.payload.get("note_id")
# Nur den ersten gefundenen Chunk pro Note nehmen
if nid and nid not in previews:
previews[nid] = c.payload.get("window") or c.payload.get("text") or ""
except: pass
return previews
def _find_connected_edges(self, note_ids, note_title=None):
"""Findet eingehende und ausgehende Kanten."""
results = []
# 1. OUTGOING EDGES (Der "Owner"-Fix)
# Wir suchen Kanten, die im Feld 'note_id' (Owner) eine unserer Notizen haben.
# Das findet ALLE ausgehenden Kanten, egal ob sie an einem Chunk oder der Note hängen.
if note_ids:
out_filter = models.Filter(must=[
models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids)),
models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))
])
# Limit hoch, um alles zu finden
res_out, _ = self.client.scroll(self.edges_col, scroll_filter=out_filter, limit=500, with_payload=True)
results.extend(res_out)
# 2. INCOMING EDGES (Ziel = Chunk ID oder Titel oder Note ID)
# Hier müssen wir Chunks auflösen, um Treffer auf Chunks zu finden.
# Chunk IDs der aktuellen Notes holen
chunk_ids = []
if note_ids:
c_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids))])
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=c_filter, limit=300)
chunk_ids = [c.id for c in chunks]
shoulds = []
# Case A: Edge zeigt auf einen unserer Chunks
if chunk_ids:
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=chunk_ids)))
# Case B: Edge zeigt direkt auf unsere Note ID
if note_ids:
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=note_ids)))
# Case C: Edge zeigt auf unseren Titel (Wikilinks)
if note_title:
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchValue(value=note_title)))
if shoulds:
in_filter = models.Filter(
must=[models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))],
should=shoulds
)
res_in, _ = self.client.scroll(self.edges_col, scroll_filter=in_filter, limit=500, with_payload=True)
results.extend(res_in)
return results
def _find_connected_edges_batch(self, note_ids):
# Wrapper für Level 2 Suche
return self._find_connected_edges(note_ids)
def _process_edge(self, record, nodes_dict, unique_edges, current_depth):
"""Verarbeitet eine rohe Edge, löst IDs auf und fügt sie den Dictionaries hinzu."""
payload = record.payload
src_ref = payload.get("source_id")
tgt_ref = payload.get("target_id")
kind = payload.get("kind")
provenance = payload.get("provenance", "explicit")
# IDs zu Notes auflösen
src_note = self._resolve_note_from_ref(src_ref)
tgt_note = self._resolve_note_from_ref(tgt_ref)
if src_note and tgt_note:
src_id = src_note['note_id']
tgt_id = tgt_note['note_id']
if src_id != tgt_id:
# Nodes hinzufügen
self._add_node_to_dict(nodes_dict, src_note, level=current_depth)
self._add_node_to_dict(nodes_dict, tgt_note, level=current_depth)
# Kante hinzufügen (mit Deduplizierung)
key = (src_id, tgt_id)
existing = unique_edges.get(key)
should_update = True
# Bevorzuge explizite Kanten vor Smart Kanten
is_current_explicit = (provenance in ["explicit", "rule"])
if existing:
is_existing_explicit = (existing['provenance'] in ["explicit", "rule"])
if is_existing_explicit and not is_current_explicit:
should_update = False
if should_update:
unique_edges[key] = {"source": src_id, "target": tgt_id, "kind": kind, "provenance": provenance}
return src_id, tgt_id
return None, None
def _fetch_note_cached(self, note_id):
if note_id in self._note_cache: return self._note_cache[note_id]
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]),
limit=1, with_payload=True
)
if res:
self._note_cache[note_id] = res[0].payload
return res[0].payload
return None
def _resolve_note_from_ref(self, ref_str):
"""Löst eine ID (Chunk, Note oder Titel) zu einer Note Payload auf."""
if not ref_str: return None
# Fall A: Chunk ID (enthält #)
if "#" in ref_str:
try:
# Versuch 1: Chunk ID direkt
res = self.client.retrieve(self.chunks_col, ids=[ref_str], with_payload=True)
if res: return self._fetch_note_cached(res[0].payload.get("note_id"))
except: pass
# Versuch 2: NoteID#Section (Hash abtrennen)
possible_note_id = ref_str.split("#")[0]
if self._fetch_note_cached(possible_note_id): return self._fetch_note_cached(possible_note_id)
# Fall B: Note ID direkt
if self._fetch_note_cached(ref_str): return self._fetch_note_cached(ref_str)
# Fall C: Titel
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchValue(value=ref_str))]),
limit=1, with_payload=True
)
if res:
self._note_cache[res[0].payload['note_id']] = res[0].payload
return res[0].payload
return None
def _add_node_to_dict(self, node_dict, note_payload, level=1):
nid = note_payload.get("note_id")
if nid in node_dict: return
ntype = note_payload.get("type", "default")
color = GRAPH_COLORS.get(ntype, GRAPH_COLORS["default"])
# Basis-Tooltip (wird später erweitert)
tooltip = f"Titel: {note_payload.get('title')}\nTyp: {ntype}"
if level == 0: size = 45
elif level == 1: size = 25
else: size = 15
node_dict[nid] = Node(
id=nid,
label=note_payload.get('title', nid),
size=size,
color=color,
shape="dot" if level > 0 else "diamond",
title=tooltip,
font={'color': 'black', 'face': 'arial', 'size': 14 if level < 2 else 0}
)