mindnet/app/frontend/ui_graph_service.py
Lars 5c4ce5d727
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
neuer test
2025-12-28 11:41:47 +01:00

478 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FILE: app/frontend/ui_graph_service.py
DESCRIPTION: Data Layer für den Graphen. Greift direkt auf Qdrant zu (Performance), um Knoten/Kanten zu laden und Texte zu rekonstruieren ("Stitching").
VERSION: 2.6.0
STATUS: Active
DEPENDENCIES: qdrant_client, streamlit_agraph, ui_config, re
LAST_ANALYSIS: 2025-12-15
"""
import re
from qdrant_client import QdrantClient, models
from streamlit_agraph import Node, Edge
from ui_config import GRAPH_COLORS, get_edge_color, SYSTEM_EDGES
class GraphExplorerService:
def __init__(self, url, api_key=None, prefix="mindnet"):
self.client = QdrantClient(url=url, api_key=api_key)
self.prefix = prefix
self.notes_col = f"{prefix}_notes"
self.chunks_col = f"{prefix}_chunks"
self.edges_col = f"{prefix}_edges"
self._note_cache = {}
def get_note_with_full_content(self, note_id):
"""
Lädt die Metadaten der Note und rekonstruiert den gesamten Text
aus den Chunks (Stitching). Wichtig für den Editor-Fallback.
"""
# 1. Metadaten holen
meta = self._fetch_note_cached(note_id)
if not meta: return None
# 2. Volltext aus Chunks bauen
full_text = self._fetch_full_text_stitched(note_id)
# 3. Ergebnis kombinieren (Wir überschreiben das 'fulltext' Feld mit dem frischen Stitching)
# Wir geben eine Kopie zurück, um den Cache nicht zu verfälschen
complete_note = meta.copy()
if full_text:
complete_note['fulltext'] = full_text
return complete_note
def get_ego_graph(self, center_note_id: str, depth=2, show_labels=True):
"""
Erstellt den Ego-Graphen um eine zentrale Notiz.
Lädt Volltext für das Zentrum und Snippets für Nachbarn.
"""
nodes_dict = {}
unique_edges = {}
# 1. Center Note laden
center_note = self._fetch_note_cached(center_note_id)
if not center_note: return [], []
self._add_node_to_dict(nodes_dict, center_note, level=0)
# Initialset für Suche
level_1_ids = {center_note_id}
# Suche Kanten für Center (L1)
l1_edges = self._find_connected_edges([center_note_id], center_note.get("title"))
for edge_data in l1_edges:
src_id, tgt_id = self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=1)
if src_id: level_1_ids.add(src_id)
if tgt_id: level_1_ids.add(tgt_id)
# Level 2 Suche (begrenzt für Performance)
if depth > 1 and len(level_1_ids) > 1 and len(level_1_ids) < 80:
l1_subset = list(level_1_ids - {center_note_id})
if l1_subset:
l2_edges = self._find_connected_edges_batch(l1_subset)
for edge_data in l2_edges:
self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=2)
# --- SMART CONTENT LOADING ---
# A. Fulltext für Center Node holen (Chunks zusammenfügen)
center_text = self._fetch_full_text_stitched(center_note_id)
if center_note_id in nodes_dict:
orig_title = getattr(nodes_dict[center_note_id], 'title', None) or getattr(nodes_dict[center_note_id], 'label', '')
clean_full = self._clean_markdown(center_text[:2000])
# Wir packen den Text in den Tooltip (title attribute)
nodes_dict[center_note_id].title = f"{orig_title}\n\n📄 INHALT:\n{clean_full}..."
# B. Previews für alle Nachbarn holen (Batch)
all_ids = list(nodes_dict.keys())
previews = self._fetch_previews_for_nodes(all_ids)
for nid, node_obj in nodes_dict.items():
if nid != center_note_id:
prev_raw = previews.get(nid, "Kein Vorschau-Text.")
clean_prev = self._clean_markdown(prev_raw[:600])
current_title = getattr(node_obj, 'title', None) or getattr(node_obj, 'label', '')
node_obj.title = f"{current_title}\n\n🔍 VORSCHAU:\n{clean_prev}..."
# Graphen bauen (Nodes & Edges finalisieren)
final_edges = []
for (src, tgt), data in unique_edges.items():
kind = data['kind']
prov = data['provenance']
color = get_edge_color(kind)
is_smart = (prov != "explicit" and prov != "rule")
# Label Logik
label_text = kind if show_labels else " "
final_edges.append(Edge(
source=src, target=tgt, label=label_text, color=color, dashes=is_smart,
title=f"Relation: {kind}\nProvenance: {prov}"
))
return list(nodes_dict.values()), final_edges
def _clean_markdown(self, text):
"""Entfernt Markdown-Sonderzeichen für saubere Tooltips im Browser."""
if not text: return ""
# Entferne Header Marker (## )
text = re.sub(r'#+\s', '', text)
# Entferne Bold/Italic (** oder *)
text = re.sub(r'\*\*|__|\*|_', '', text)
# Entferne Links [Text](Url) -> Text
text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)
# Entferne Wikilinks [[Link]] -> Link
text = re.sub(r'\[\[([^\]]+)\]\]', r'\1', text)
return text
def _fetch_full_text_stitched(self, note_id):
"""Lädt alle Chunks einer Note und baut den Text zusammen."""
try:
scroll_filter = models.Filter(
must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]
)
# Limit hoch genug setzen
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=100, with_payload=True)
# Sortieren nach 'ord' (Reihenfolge im Dokument)
chunks.sort(key=lambda x: x.payload.get('ord', 999))
full_text = []
for c in chunks:
# 'text' ist der reine Inhalt ohne Overlap
txt = c.payload.get('text', '')
if txt: full_text.append(txt)
return "\n\n".join(full_text)
except:
return "Fehler beim Laden des Volltexts."
def _fetch_previews_for_nodes(self, node_ids):
"""Holt Batch-weise den ersten Chunk für eine Liste von Nodes."""
if not node_ids: return {}
previews = {}
try:
scroll_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=node_ids))])
# Limit = Anzahl Nodes * 3 (Puffer)
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=len(node_ids)*3, with_payload=True)
for c in chunks:
nid = c.payload.get("note_id")
# Nur den ersten gefundenen Chunk pro Note nehmen
if nid and nid not in previews:
previews[nid] = c.payload.get("window") or c.payload.get("text") or ""
except: pass
return previews
def _find_connected_edges(self, note_ids, note_title=None):
"""Findet eingehende und ausgehende Kanten."""
results = []
if not note_ids:
return results
# 1. OUTGOING EDGES (Der "Owner"-Fix)
# Wir suchen Kanten, die im Feld 'note_id' (Owner) eine unserer Notizen haben.
# Das findet ALLE ausgehenden Kanten, egal ob sie an einem Chunk oder der Note hängen.
out_filter = models.Filter(must=[
models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids)),
models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))
])
# Limit erhöht, um alle Kanten zu finden
res_out, _ = self.client.scroll(self.edges_col, scroll_filter=out_filter, limit=2000, with_payload=True)
results.extend(res_out)
# 2. INCOMING EDGES (Ziel = Chunk ID oder Titel oder Note ID)
# Hier müssen wir Chunks auflösen, um Treffer auf Chunks zu finden.
# Chunk IDs der aktuellen Notes holen (Limit erhöht)
chunk_ids = []
c_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids))])
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=c_filter, limit=1000, with_payload=False)
chunk_ids = [c.id for c in chunks]
shoulds = []
# Case A: Edge zeigt auf einen unserer Chunks
if chunk_ids:
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=chunk_ids)))
# Case B: Edge zeigt direkt auf unsere Note ID
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=note_ids)))
# Case C: Edge zeigt auf unseren Titel (Wikilinks)
# WICHTIG: target_id ist der vollständige Wikilink-Text ohne [[]], z.B. "Meine Prinzipien 2025#P3 Disziplin"
# Das kann sein: "Titel" oder "Titel#Abschnitt" oder "Titel#Abschnitt (Details)"
note_titles_to_search = []
if note_title:
note_titles_to_search.append(note_title)
else:
# Fallback: Lade Titel der Notes, wenn note_title nicht übergeben wurde
for nid in note_ids:
note = self._fetch_note_cached(nid)
if note and note.get("title"):
note_titles_to_search.append(note.get("title"))
# Für jeden Titel: Suche nach exaktem Match
# WICHTIG: target_id kann "Titel" oder "Titel#Abschnitt" oder "Titel#Abschnitt (Details)" sein
# Wir suchen nach exaktem Match für "Titel"
for title in note_titles_to_search:
# Exakte Übereinstimmung (für target_id = "Titel")
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchValue(value=title)))
# WICHTIG: "Titel#*" Varianten werden in Case D gefunden (clientseitige Filterung)
if shoulds:
in_filter = models.Filter(
must=[models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))],
should=shoulds
)
# Limit erhöht, um alle eingehenden Kanten zu finden
res_in, _ = self.client.scroll(self.edges_col, scroll_filter=in_filter, limit=2000, with_payload=True)
results.extend(res_in)
# Case D: ZUSÄTZLICHE Suche für "Titel#Abschnitt" Format (nur für INCOMING edges)
# PROBLEM: target_id ist der vollständige Wikilink-Text, z.B. "Meine Prinzipien 2025#P3 Disziplin"
# Da Qdrant keine Wildcard-Suche hat, müssen wir breiter suchen und clientseitig filtern
# WICHTIG: Diese Suche ist nur für eingehende Kanten relevant
# Für ausgehende Kanten werden alle über note_id gefunden, unabhängig vom target_id Format
if note_titles_to_search:
# Erweiterte Suche: Lade alle relevanten Kanten und filtere clientseitig
# Da target_id KEYWORD ist (nicht TEXT), können wir keine Präfix-Suche direkt machen
# STRATEGIE: Lade alle Kanten (mit Limit) und filtere clientseitig nach target_id.startswith(title + "#")
# Erstelle Set der bereits gefundenen Edge-IDs für schnelle Deduplizierung
existing_edge_ids = {r.id for r in results}
# Lade alle relevanten Kanten (ohne target_id Filter, da wir Präfixe suchen)
extended_filter = models.Filter(
must=[models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))]
)
# WICHTIG: Wir müssen genug Kanten laden, um alle "Titel#Abschnitt" Varianten zu finden
# Verwende paginierte Suche, um sicherzustellen, dass wir alle Kanten durchsuchen
res_extended = []
next_offset = None
max_iterations = 20 # Maximal 20 Iterationen (20 * 5000 = 100000 Kanten)
iteration = 0
while iteration < max_iterations:
batch, next_offset = self.client.scroll(
self.edges_col,
scroll_filter=extended_filter,
limit=5000,
offset=next_offset,
with_payload=True
)
if batch:
res_extended.extend(batch)
if next_offset is None or not batch:
break
iteration += 1
# Clientseitige Filterung: Finde Kanten, deren target_id mit einem unserer Titel beginnt
# WICHTIG: target_id ist der vollständige Wikilink-Text, z.B. "Meine Prinzipien 2025#P3 Disziplin"
matched_count = 0
for edge in res_extended:
tgt_id = edge.payload.get("target_id", "")
if not tgt_id or edge.id in existing_edge_ids:
continue
# Prüfe, ob target_id mit einem unserer Titel beginnt
# target_id kann sein: "Titel", "Titel#Abschnitt", "Titel#Abschnitt (Details)"
for title in note_titles_to_search:
# Exakte Übereinstimmung ODER beginnt mit "Titel#"
# WICHTIG: startswith findet alle Varianten wie "Titel#P3 Disziplin"
if tgt_id == title or tgt_id.startswith(title + "#"):
results.append(edge)
existing_edge_ids.add(edge.id)
matched_count += 1
break # Nur einmal hinzufügen, auch wenn mehrere Titel passen
return results
def _find_connected_edges_batch(self, note_ids):
# Wrapper für Level 2 Suche - lade Titel für alle Notes
note_titles = []
for nid in note_ids:
note = self._fetch_note_cached(nid)
if note and note.get("title"):
note_titles.append(note.get("title"))
# Verwende den ersten Titel als Fallback (oder None, wenn keine gefunden)
title = note_titles[0] if note_titles else None
return self._find_connected_edges(note_ids, note_title=title)
def _process_edge(self, record, nodes_dict, unique_edges, current_depth):
"""Verarbeitet eine rohe Edge, löst IDs auf und fügt sie den Dictionaries hinzu."""
if not record or not record.payload:
return None, None
payload = record.payload
src_ref = payload.get("source_id")
tgt_ref = payload.get("target_id")
kind = payload.get("kind")
provenance = payload.get("provenance", "explicit")
# Prüfe, ob beide Referenzen vorhanden sind
if not src_ref or not tgt_ref:
return None, None
# IDs zu Notes auflösen
src_note = self._resolve_note_from_ref(src_ref)
tgt_note = self._resolve_note_from_ref(tgt_ref)
if src_note and tgt_note:
src_id = src_note.get('note_id')
tgt_id = tgt_note.get('note_id')
# Prüfe, ob beide IDs vorhanden sind
if not src_id or not tgt_id:
return None, None
if src_id != tgt_id:
# Nodes hinzufügen
self._add_node_to_dict(nodes_dict, src_note, level=current_depth)
self._add_node_to_dict(nodes_dict, tgt_note, level=current_depth)
# Kante hinzufügen (mit Deduplizierung)
key = (src_id, tgt_id)
existing = unique_edges.get(key)
should_update = True
# Bevorzuge explizite Kanten vor Smart Kanten
is_current_explicit = (provenance in ["explicit", "rule"])
if existing:
is_existing_explicit = (existing.get('provenance', '') in ["explicit", "rule"])
if is_existing_explicit and not is_current_explicit:
should_update = False
if should_update:
unique_edges[key] = {"source": src_id, "target": tgt_id, "kind": kind, "provenance": provenance}
return src_id, tgt_id
return None, None
def _fetch_note_cached(self, note_id):
if note_id in self._note_cache: return self._note_cache[note_id]
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]),
limit=1, with_payload=True
)
if res:
self._note_cache[note_id] = res[0].payload
return res[0].payload
return None
def _resolve_note_from_ref(self, ref_str):
"""
Löst eine Referenz zu einer Note Payload auf.
ref_str kann sein:
- Note-ID: "20250101-meine-note"
- Chunk-ID: "20250101-meine-note#c01"
- Titel: "Meine Prinzipien 2025"
- Wikilink-Text: "Meine Prinzipien 2025#P3 Disziplin (Selbstführung & Familie)"
"""
if not ref_str: return None
# Fall A: Enthält # (kann Chunk-ID oder Wikilink mit Abschnitt sein)
if "#" in ref_str:
try:
# Versuch 1: Chunk ID direkt (Format: note_id#c01)
res = self.client.retrieve(self.chunks_col, ids=[ref_str], with_payload=True)
if res and res[0].payload:
note_id = res[0].payload.get("note_id")
if note_id:
note = self._fetch_note_cached(note_id)
if note: return note
except Exception:
pass
# Versuch 2: NoteID#Section (Hash abtrennen und als Note-ID versuchen)
# z.B. "20250101-meine-note#Abschnitt" -> "20250101-meine-note"
possible_note_id = ref_str.split("#")[0]
note = self._fetch_note_cached(possible_note_id)
if note: return note
# Versuch 3: Wikilink-Text mit Abschnitt (z.B. "Meine Prinzipien 2025#P3 Disziplin")
# WICHTIG: target_id ist der vollständige Wikilink-Text, wir müssen den Titel-Teil extrahieren
# Der Teil vor dem ersten "#" ist der Titel
possible_title = ref_str.split("#")[0].strip()
if possible_title:
try:
# Suche nach exaktem Titel-Match
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchValue(value=possible_title))]),
limit=1, with_payload=True
)
if res and res[0].payload:
self._note_cache[res[0].payload['note_id']] = res[0].payload
return res[0].payload
except Exception:
pass
# Fallback: Text-Suche für Fuzzy-Matching
try:
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchText(text=possible_title))]),
limit=1, with_payload=True
)
if res and res[0].payload:
self._note_cache[res[0].payload['note_id']] = res[0].payload
return res[0].payload
except Exception:
pass
# Fall B: Note ID direkt
note = self._fetch_note_cached(ref_str)
if note: return note
# Fall C: Titel (exakte Übereinstimmung)
try:
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchValue(value=str(ref_str)))]),
limit=1, with_payload=True
)
if res and res[0].payload:
self._note_cache[res[0].payload['note_id']] = res[0].payload
return res[0].payload
except Exception:
pass
# Fall D: Titel (Text-Suche für Fuzzy-Matching, falls exakte Suche fehlschlägt)
try:
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchText(text=str(ref_str)))]),
limit=1, with_payload=True
)
if res and res[0].payload:
self._note_cache[res[0].payload['note_id']] = res[0].payload
return res[0].payload
except Exception:
pass
return None
def _add_node_to_dict(self, node_dict, note_payload, level=1):
nid = note_payload.get("note_id")
if nid in node_dict: return
ntype = note_payload.get("type", "default")
color = GRAPH_COLORS.get(ntype, GRAPH_COLORS["default"])
# Basis-Tooltip (wird später erweitert)
tooltip = f"Titel: {note_payload.get('title')}\nTyp: {ntype}"
if level == 0: size = 45
elif level == 1: size = 25
else: size = 15
node_dict[nid] = Node(
id=nid,
label=note_payload.get('title', nid),
size=size,
color=color,
shape="dot" if level > 0 else "diamond",
title=tooltip,
font={'color': 'black', 'face': 'arial', 'size': 14 if level < 2 else 0}
)