""" FILE: app/frontend/ui_graph_service.py DESCRIPTION: Data Layer für den Graphen. Greift direkt auf Qdrant zu (Performance), um Knoten/Kanten zu laden und Texte zu rekonstruieren ("Stitching"). VERSION: 2.6.0 STATUS: Active DEPENDENCIES: qdrant_client, streamlit_agraph, ui_config, re LAST_ANALYSIS: 2025-12-15 """ import re from qdrant_client import QdrantClient, models from streamlit_agraph import Node, Edge from ui_config import GRAPH_COLORS, get_edge_color, SYSTEM_EDGES class GraphExplorerService: def __init__(self, url, api_key=None, prefix="mindnet"): self.client = QdrantClient(url=url, api_key=api_key) self.prefix = prefix self.notes_col = f"{prefix}_notes" self.chunks_col = f"{prefix}_chunks" self.edges_col = f"{prefix}_edges" self._note_cache = {} # Cache für Note-Payloads self._ref_resolution_cache = {} # Cache für aufgelöste Referenzen (ref_str -> note_payload) self._note_cache = {} def get_note_with_full_content(self, note_id): """ Lädt die Metadaten der Note und rekonstruiert den gesamten Text aus den Chunks (Stitching). Wichtig für den Editor-Fallback. """ # 1. Metadaten holen meta = self._fetch_note_cached(note_id) if not meta: return None # 2. Volltext aus Chunks bauen full_text = self._fetch_full_text_stitched(note_id) # 3. Ergebnis kombinieren (Wir überschreiben das 'fulltext' Feld mit dem frischen Stitching) # Wir geben eine Kopie zurück, um den Cache nicht zu verfälschen complete_note = meta.copy() if full_text: complete_note['fulltext'] = full_text return complete_note def get_ego_graph(self, center_note_id: str, depth=2, show_labels=True): """ Erstellt den Ego-Graphen um eine zentrale Notiz. Lädt Volltext für das Zentrum und Snippets für Nachbarn. """ nodes_dict = {} unique_edges = {} # 1. Center Note laden center_note = self._fetch_note_cached(center_note_id) if not center_note: return [], [] self._add_node_to_dict(nodes_dict, center_note, level=0) # Initialset für Suche level_1_ids = {center_note_id} # Suche Kanten für Center (L1) l1_edges = self._find_connected_edges([center_note_id], center_note.get("title")) # DEBUG: Zeige gefundene Kanten print(f"DEBUG: Gefundene L1-Kanten: {len(l1_edges)}") processed_count = 0 failed_count = 0 for edge_data in l1_edges: src_id, tgt_id = self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=1) if src_id and tgt_id: processed_count += 1 level_1_ids.add(src_id) level_1_ids.add(tgt_id) else: failed_count += 1 # DEBUG: Zeige fehlgeschlagene Kanten if edge_data and edge_data.payload: print(f"DEBUG: Kante konnte nicht verarbeitet werden: {edge_data.payload.get('source_id')} -> {edge_data.payload.get('target_id')}") print(f"DEBUG: Verarbeitete Kanten: {processed_count}, Fehlgeschlagen: {failed_count}") print(f"DEBUG: Nodes im Dict: {len(nodes_dict)}, Edges im Dict: {len(unique_edges)}") # Level 2 Suche (begrenzt für Performance) if depth > 1 and len(level_1_ids) > 1 and len(level_1_ids) < 80: l1_subset = list(level_1_ids - {center_note_id}) if l1_subset: l2_edges = self._find_connected_edges_batch(l1_subset) for edge_data in l2_edges: self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=2) # --- SMART CONTENT LOADING --- # A. Fulltext für Center Node holen (Chunks zusammenfügen) center_text = self._fetch_full_text_stitched(center_note_id) if center_note_id in nodes_dict: orig_title = getattr(nodes_dict[center_note_id], 'title', None) or getattr(nodes_dict[center_note_id], 'label', '') clean_full = self._clean_markdown(center_text[:2000]) # Wir packen den Text in den Tooltip (title attribute) nodes_dict[center_note_id].title = f"{orig_title}\n\n📄 INHALT:\n{clean_full}..." # B. Previews für alle Nachbarn holen (Batch) all_ids = list(nodes_dict.keys()) previews = self._fetch_previews_for_nodes(all_ids) for nid, node_obj in nodes_dict.items(): if nid != center_note_id: prev_raw = previews.get(nid, "Kein Vorschau-Text.") clean_prev = self._clean_markdown(prev_raw[:600]) current_title = getattr(node_obj, 'title', None) or getattr(node_obj, 'label', '') node_obj.title = f"{current_title}\n\n🔍 VORSCHAU:\n{clean_prev}..." # Graphen bauen (Nodes & Edges finalisieren) final_edges = [] for (src, tgt), data in unique_edges.items(): kind = data['kind'] prov = data['provenance'] color = get_edge_color(kind) is_smart = (prov != "explicit" and prov != "rule") # Label Logik label_text = kind if show_labels else " " final_edges.append(Edge( source=src, target=tgt, label=label_text, color=color, dashes=is_smart, title=f"Relation: {kind}\nProvenance: {prov}" )) return list(nodes_dict.values()), final_edges def _clean_markdown(self, text): """Entfernt Markdown-Sonderzeichen für saubere Tooltips im Browser.""" if not text: return "" # Entferne Header Marker (## ) text = re.sub(r'#+\s', '', text) # Entferne Bold/Italic (** oder *) text = re.sub(r'\*\*|__|\*|_', '', text) # Entferne Links [Text](Url) -> Text text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text) # Entferne Wikilinks [[Link]] -> Link text = re.sub(r'\[\[([^\]]+)\]\]', r'\1', text) return text def _fetch_full_text_stitched(self, note_id): """Lädt alle Chunks einer Note und baut den Text zusammen.""" try: scroll_filter = models.Filter( must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))] ) # Limit hoch genug setzen chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=100, with_payload=True) # Sortieren nach 'ord' (Reihenfolge im Dokument) chunks.sort(key=lambda x: x.payload.get('ord', 999)) full_text = [] for c in chunks: # 'text' ist der reine Inhalt ohne Overlap txt = c.payload.get('text', '') if txt: full_text.append(txt) return "\n\n".join(full_text) except: return "Fehler beim Laden des Volltexts." def _fetch_previews_for_nodes(self, node_ids): """Holt Batch-weise den ersten Chunk für eine Liste von Nodes.""" if not node_ids: return {} previews = {} try: scroll_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=node_ids))]) # Limit = Anzahl Nodes * 3 (Puffer) chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=len(node_ids)*3, with_payload=True) for c in chunks: nid = c.payload.get("note_id") # Nur den ersten gefundenen Chunk pro Note nehmen if nid and nid not in previews: previews[nid] = c.payload.get("window") or c.payload.get("text") or "" except: pass return previews def _find_connected_edges(self, note_ids, note_title=None): """Findet eingehende und ausgehende Kanten.""" results = [] if not note_ids: return results # 1. OUTGOING EDGES (Der "Owner"-Fix) # Wir suchen Kanten, die im Feld 'note_id' (Owner) eine unserer Notizen haben. # Das findet ALLE ausgehenden Kanten, egal ob sie an einem Chunk oder der Note hängen. out_filter = models.Filter(must=[ models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids)), models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES})) ]) # Limit erhöht, um alle Kanten zu finden res_out, _ = self.client.scroll(self.edges_col, scroll_filter=out_filter, limit=2000, with_payload=True) results.extend(res_out) # 2. INCOMING EDGES (Ziel = Chunk ID oder Titel oder Note ID) # Hier müssen wir Chunks auflösen, um Treffer auf Chunks zu finden. # Chunk IDs der aktuellen Notes holen (Limit erhöht) chunk_ids = [] c_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids))]) chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=c_filter, limit=1000, with_payload=False) chunk_ids = [c.id for c in chunks] shoulds = [] # Case A: Edge zeigt auf einen unserer Chunks if chunk_ids: shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=chunk_ids))) # Case B: Edge zeigt direkt auf unsere Note ID shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=note_ids))) # Case C: Edge zeigt auf unseren Titel (Wikilinks) # WICHTIG: target_id ist der vollständige Wikilink-Text ohne [[]], z.B. "Meine Prinzipien 2025#P3 – Disziplin" # Das kann sein: "Titel" oder "Titel#Abschnitt" oder "Titel#Abschnitt (Details)" # PROBLEM: Wikilinks können andere Titel verwenden als der gespeicherte Note-Titel! # Beispiel: Note-Titel = "Persönliches Leitbild (2025–2029)", aber Wikilink = "Mein Persönliches Leitbild 2025" note_titles_to_search = [] if note_title: note_titles_to_search.append(note_title) else: # Fallback: Lade Titel der Notes, wenn note_title nicht übergeben wurde for nid in note_ids: note = self._fetch_note_cached(nid) if note: if note.get("title"): note_titles_to_search.append(note.get("title")) # WICHTIG: Auch Aliases hinzufügen (falls vorhanden) aliases = note.get("aliases", []) if isinstance(aliases, str): aliases = [aliases] for alias in aliases: if alias and alias not in note_titles_to_search: note_titles_to_search.append(alias) # ZUSÄTZLICH: Lade auch Aliases für alle Notes, wenn note_title übergeben wurde # (um auch Varianten wie "Mein Persönliches Leitbild 2025" zu finden) if note_title: for nid in note_ids: note = self._fetch_note_cached(nid) if note: aliases = note.get("aliases", []) if isinstance(aliases, str): aliases = [aliases] for alias in aliases: if alias and alias not in note_titles_to_search: note_titles_to_search.append(alias) # Für jeden Titel: Suche nach exaktem Match # WICHTIG: target_id kann "Titel" oder "Titel#Abschnitt" oder "Titel#Abschnitt (Details)" sein # Wir suchen nach exaktem Match für "Titel" for title in note_titles_to_search: # Exakte Übereinstimmung (für target_id = "Titel") shoulds.append(models.FieldCondition(key="target_id", match=models.MatchValue(value=title))) # WICHTIG: "Titel#*" Varianten werden in Case D gefunden (clientseitige Filterung) if shoulds: in_filter = models.Filter( must=[models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))], should=shoulds ) # Limit erhöht, um alle eingehenden Kanten zu finden res_in, _ = self.client.scroll(self.edges_col, scroll_filter=in_filter, limit=2000, with_payload=True) results.extend(res_in) # Case D: ZUSÄTZLICHE Suche für "Titel#Abschnitt" Format (nur für INCOMING edges) # PROBLEM: target_id ist der vollständige Wikilink-Text, z.B. "Meine Prinzipien 2025#P3 – Disziplin" # Da Qdrant keine Wildcard-Suche hat, müssen wir breiter suchen und clientseitig filtern # WICHTIG: Diese Suche ist nur für eingehende Kanten relevant # Für ausgehende Kanten werden alle über note_id gefunden, unabhängig vom target_id Format if note_titles_to_search: # Normalisierungs-Funktion (außerhalb der Schleife für Performance) def normalize_title(t): if not t: return "" # Entferne Klammern und deren Inhalt (z.B. "(2025–2029)") t = re.sub(r'\s*\([^)]*\)', '', t) # Entferne Jahreszahlen (4-stellig, mit oder ohne Bindestrich/En-Dash) # Beispiele: "2025", "2025–2029", "2025-2029" t = re.sub(r'\s*\d{4}[\s–\-]*\d{0,4}', '', t) # Entferne "Mein/Meine" Präfixe t = re.sub(r'^(Mein|Meine)\s+', '', t, flags=re.IGNORECASE) # Normalisiere Whitespace t = re.sub(r'\s+', ' ', t).strip() return t.lower() # Case-insensitive Vergleich # Normalisiere alle Note-Titel VORHER (Performance-Optimierung) note_titles_normalized = {title: normalize_title(title) for title in note_titles_to_search} # Erweiterte Suche: Lade alle relevanten Kanten und filtere clientseitig # Da target_id KEYWORD ist (nicht TEXT), können wir keine Präfix-Suche direkt machen # STRATEGIE: Lade alle Kanten (mit Limit) und filtere clientseitig nach target_id.startswith(title + "#") # Erstelle Set der bereits gefundenen Edge-IDs für schnelle Deduplizierung existing_edge_ids = {r.id for r in results} # Lade alle relevanten Kanten (ohne target_id Filter, da wir Präfixe suchen) extended_filter = models.Filter( must=[models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))] ) # WICHTIG: Wir müssen genug Kanten laden, um alle "Titel#Abschnitt" Varianten zu finden # Verwende paginierte Suche, um sicherzustellen, dass wir alle Kanten durchsuchen res_extended = [] next_offset = None max_iterations = 50 # Maximal 50 Iterationen (50 * 5000 = 250000 Kanten) iteration = 0 while iteration < max_iterations: batch, next_offset = self.client.scroll( self.edges_col, scroll_filter=extended_filter, limit=5000, offset=next_offset, with_payload=True ) if batch: res_extended.extend(batch) if next_offset is None or not batch: break iteration += 1 # Clientseitige Filterung: Finde Kanten, deren target_id mit einem unserer Titel beginnt # WICHTIG: target_id ist der vollständige Wikilink-Text, z.B. "Meine Prinzipien 2025#P3 – Disziplin" matched_count = 0 for edge in res_extended: tgt_id = edge.payload.get("target_id", "") if not tgt_id or edge.id in existing_edge_ids: continue # Prüfe, ob target_id mit einem unserer Titel beginnt (exakte Matches) matched_exact = False for title in note_titles_to_search: # Exakte Übereinstimmung ODER beginnt mit "Titel#" # WICHTIG: startswith findet alle Varianten wie "Titel#P3 – Disziplin" if tgt_id == title or tgt_id.startswith(title + "#"): results.append(edge) existing_edge_ids.add(edge.id) matched_count += 1 matched_exact = True break # Nur einmal hinzufügen, auch wenn mehrere Titel passen # ZUSÄTZLICH: Fuzzy-Matching für ähnliche Titel (nur wenn kein exakter Match) # PROBLEM: Wikilinks können andere Titel verwenden als der gespeicherte Note-Titel # Beispiel: Note-Titel = "Persönliches Leitbild (2025–2029)", aber Wikilink = "Mein Persönliches Leitbild 2025" # STRATEGIE: Da target_id immer "Titel#Abschnitt" ist, nehmen wir einfach den Teil vor dem # # und normalisieren beide (Note-Titel und target_id-Basis) für Vergleich if not matched_exact: # Extrahiere Basis-Titel aus target_id (Teil vor dem #) tgt_base = tgt_id.split("#")[0].strip() tgt_norm = normalize_title(tgt_base) # Prüfe gegen alle normalisierten Note-Titel (inkl. Aliases) for title, title_norm in note_titles_normalized.items(): # Prüfe auf Ähnlichkeit: Exakt gleich oder einer beginnt mit dem anderen if title_norm and tgt_norm and len(title_norm) > 5: if (title_norm == tgt_norm or title_norm.startswith(tgt_norm) or tgt_norm.startswith(title_norm)): results.append(edge) existing_edge_ids.add(edge.id) matched_count += 1 break # Nur einmal hinzufügen return results def _find_connected_edges_batch(self, note_ids): # Wrapper für Level 2 Suche - lade Titel für alle Notes note_titles = [] for nid in note_ids: note = self._fetch_note_cached(nid) if note and note.get("title"): note_titles.append(note.get("title")) # Verwende den ersten Titel als Fallback (oder None, wenn keine gefunden) title = note_titles[0] if note_titles else None return self._find_connected_edges(note_ids, note_title=title) def _process_edge(self, record, nodes_dict, unique_edges, current_depth): """Verarbeitet eine rohe Edge, löst IDs auf und fügt sie den Dictionaries hinzu.""" if not record or not record.payload: return None, None payload = record.payload src_ref = payload.get("source_id") tgt_ref = payload.get("target_id") kind = payload.get("kind") provenance = payload.get("provenance", "explicit") # Prüfe, ob beide Referenzen vorhanden sind if not src_ref or not tgt_ref: return None, None # IDs zu Notes auflösen src_note = self._resolve_note_from_ref(src_ref) tgt_note = self._resolve_note_from_ref(tgt_ref) # DEBUG: Zeige Auflösungs-Ergebnisse (nur bei Fehlern) if not src_note: print(f"DEBUG _process_edge: Konnte src_note nicht auflösen für: {src_ref}") if not tgt_note: print(f"DEBUG _process_edge: Konnte tgt_note nicht auflösen für: {tgt_ref}") if src_note and tgt_note: src_id = src_note.get('note_id') tgt_id = tgt_note.get('note_id') # Prüfe, ob beide IDs vorhanden sind if not src_id or not tgt_id: return None, None if src_id != tgt_id: # Nodes hinzufügen self._add_node_to_dict(nodes_dict, src_note, level=current_depth) self._add_node_to_dict(nodes_dict, tgt_note, level=current_depth) # Kante hinzufügen (mit Deduplizierung) key = (src_id, tgt_id) existing = unique_edges.get(key) should_update = True # Bevorzuge explizite Kanten vor Smart Kanten is_current_explicit = (provenance in ["explicit", "rule"]) if existing: is_existing_explicit = (existing.get('provenance', '') in ["explicit", "rule"]) if is_existing_explicit and not is_current_explicit: should_update = False if should_update: unique_edges[key] = {"source": src_id, "target": tgt_id, "kind": kind, "provenance": provenance} return src_id, tgt_id return None, None def _fetch_note_cached(self, note_id): if note_id in self._note_cache: return self._note_cache[note_id] res, _ = self.client.scroll( collection_name=self.notes_col, scroll_filter=models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]), limit=1, with_payload=True ) if res: self._note_cache[note_id] = res[0].payload return res[0].payload return None def _resolve_note_from_ref(self, ref_str): """ Löst eine Referenz zu einer Note Payload auf. ref_str kann sein: - Note-ID: "20250101-meine-note" - Chunk-ID: "20250101-meine-note#c01" - Titel: "Meine Prinzipien 2025" - Wikilink-Text: "Meine Prinzipien 2025#P3 – Disziplin (Selbstführung & Familie)" """ if not ref_str: return None # Cache-Check: Wenn wir diese Referenz bereits aufgelöst haben, verwende das Ergebnis if ref_str in self._ref_resolution_cache: cached_result = self._ref_resolution_cache[ref_str] return cached_result # Fall A: Enthält # (kann Chunk-ID oder Wikilink mit Abschnitt sein) if "#" in ref_str: try: # Versuch 1: Chunk ID direkt (Format: note_id#c01) res = self.client.retrieve(self.chunks_col, ids=[ref_str], with_payload=True) if res and res[0].payload: note_id = res[0].payload.get("note_id") if note_id: note = self._fetch_note_cached(note_id) if note: self._ref_resolution_cache[ref_str] = note return note except Exception: pass # Versuch 2: NoteID#Section (Hash abtrennen und als Note-ID versuchen) # z.B. "20250101-meine-note#Abschnitt" -> "20250101-meine-note" possible_note_id = ref_str.split("#")[0] note = self._fetch_note_cached(possible_note_id) if note: self._ref_resolution_cache[ref_str] = note return note # Versuch 3: Wikilink-Text mit Abschnitt (z.B. "Meine Prinzipien 2025#P3 – Disziplin") # WICHTIG: target_id ist der vollständige Wikilink-Text, wir müssen den Titel-Teil extrahieren # Der Teil vor dem ersten "#" ist der Titel possible_title = ref_str.split("#")[0].strip() if possible_title: # Normalisierungs-Funktion (wie in _find_connected_edges) def normalize_title(t): if not t: return "" t = re.sub(r'\s*\([^)]*\)', '', t) t = re.sub(r'\s*\d{4}[\s–\-]*\d{0,4}', '', t) t = re.sub(r'^(Mein|Meine)\s+', '', t, flags=re.IGNORECASE) t = re.sub(r'\s+', ' ', t).strip() return t.lower() possible_title_norm = normalize_title(possible_title) try: # Versuch 3a: Exakte Titel-Suche res, _ = self.client.scroll( collection_name=self.notes_col, scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchValue(value=possible_title))]), limit=1, with_payload=True ) if res and res[0].payload: self._note_cache[res[0].payload['note_id']] = res[0].payload return res[0].payload except Exception: pass try: # Versuch 3b: Text-Suche (kann Teilmatches finden) res, _ = self.client.scroll( collection_name=self.notes_col, scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchText(text=possible_title))]), limit=50, with_payload=True # Mehr Ergebnisse für Fuzzy-Matching ) if res: # Prüfe alle Ergebnisse mit normalisiertem Vergleich for r in res: if r.payload: note_title = r.payload.get("title", "") note_title_norm = normalize_title(note_title) if note_title_norm and possible_title_norm and len(possible_title_norm) > 5: if (note_title_norm == possible_title_norm or note_title_norm.startswith(possible_title_norm) or possible_title_norm.startswith(note_title_norm)): self._note_cache[r.payload['note_id']] = r.payload return r.payload except Exception: pass # Versuch 3c: Fallback - Lade alle Notes und filtere clientseitig (nur wenn Text-Suche fehlschlägt) # OPTIMIERUNG: Verwende einen globalen Cache für alle Notes-Titel, um Performance zu verbessern # Dies ist langsamer, aber findet auch Notes, die die Text-Suche nicht findet if possible_title_norm and len(possible_title_norm) > 5: try: # Lade alle Notes (einmalig, dann Cache) - OPTIMIERT mit paginierter Suche if not hasattr(self, '_all_notes_cache') or not self._all_notes_cache: self._all_notes_cache = {} # note_id -> payload self._all_notes_title_map = {} # normalisierter Titel -> Liste von Notes # Paginierte Suche, um ALLE Notes zu laden next_offset = None total_loaded = 0 while True: res_all, next_offset = self.client.scroll( collection_name=self.notes_col, limit=1000, offset=next_offset, with_payload=True ) for r in res_all: if r.payload: note_id = r.payload.get('note_id') note_title = r.payload.get("title", "") if note_id and note_title: self._all_notes_cache[note_id] = r.payload # Normalisiere Titel und speichere Mapping note_title_norm_cached = normalize_title(note_title) if note_title_norm_cached: if note_title_norm_cached not in self._all_notes_title_map: self._all_notes_title_map[note_title_norm_cached] = [] self._all_notes_title_map[note_title_norm_cached].append(r.payload) total_loaded += 1 if next_offset is None: break # Suche im Cache (verwende _all_notes_title_map statt _all_notes_cache) if hasattr(self, '_all_notes_title_map') and possible_title_norm in self._all_notes_title_map: matches = self._all_notes_title_map[possible_title_norm] if matches: # Nimm das erste Match result = matches[0] self._note_cache[result['note_id']] = result self._ref_resolution_cache[ref_str] = result return result # Fallback: Durchsuche alle normalisierten Titel if hasattr(self, '_all_notes_title_map'): for norm_title, notes_list in self._all_notes_title_map.items(): if isinstance(notes_list, list) and notes_list: if (norm_title == possible_title_norm or norm_title.startswith(possible_title_norm) or possible_title_norm.startswith(norm_title)): result = notes_list[0] self._note_cache[result['note_id']] = result self._ref_resolution_cache[ref_str] = result return result except Exception: pass # Fall B: Note ID direkt note = self._fetch_note_cached(ref_str) if note: return note # Fall C: Titel (exakte Übereinstimmung) try: res, _ = self.client.scroll( collection_name=self.notes_col, scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchValue(value=str(ref_str)))]), limit=1, with_payload=True ) if res and res[0].payload: self._note_cache[res[0].payload['note_id']] = res[0].payload return res[0].payload except Exception: pass # Fall D: Titel (Text-Suche für Fuzzy-Matching, falls exakte Suche fehlschlägt) try: res, _ = self.client.scroll( collection_name=self.notes_col, scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchText(text=str(ref_str)))]), limit=1, with_payload=True ) if res and res[0].payload: self._note_cache[res[0].payload['note_id']] = res[0].payload return res[0].payload except Exception: pass return None def _add_node_to_dict(self, node_dict, note_payload, level=1): nid = note_payload.get("note_id") if nid in node_dict: return ntype = note_payload.get("type", "default") color = GRAPH_COLORS.get(ntype, GRAPH_COLORS["default"]) # Basis-Tooltip (wird später erweitert) tooltip = f"Titel: {note_payload.get('title')}\nTyp: {ntype}" if level == 0: size = 45 elif level == 1: size = 25 else: size = 15 node_dict[nid] = Node( id=nid, label=note_payload.get('title', nid), size=size, color=color, shape="dot" if level > 0 else "diamond", title=tooltip, font={'color': 'black', 'face': 'arial', 'size': 14 if level < 2 else 0} )