""" FILE: app/frontend/ui_graph_service.py DESCRIPTION: Data Layer für den Graphen. Greift direkt auf Qdrant zu (Performance), um Knoten/Kanten zu laden und Texte zu rekonstruieren ("Stitching"). VERSION: 2.6.1 (Fix: Anchor-Link & Fragment Resolution) STATUS: Active DEPENDENCIES: qdrant_client, streamlit_agraph, ui_config, re LAST_ANALYSIS: 2025-12-28 """ import re from qdrant_client import QdrantClient, models from streamlit_agraph import Node, Edge from ui_config import COLLECTION_PREFIX, GRAPH_COLORS, get_edge_color, SYSTEM_EDGES class GraphExplorerService: def __init__(self, url, api_key=None, prefix=None): """ Initialisiert den Service. Nutzt COLLECTION_PREFIX aus der Config, sofern kein spezifischer Prefix übergeben wurde. """ self.client = QdrantClient(url=url, api_key=api_key) self.prefix = prefix if prefix else COLLECTION_PREFIX self.notes_col = f"{self.prefix}_notes" self.chunks_col = f"{self.prefix}_chunks" self.edges_col = f"{self.prefix}_edges" self._note_cache = {} self._ref_resolution_cache = {} def get_note_with_full_content(self, note_id): """ Lädt die Metadaten der Note und rekonstruiert den gesamten Text aus den Chunks (Stitching). Wichtig für den Editor-Fallback. """ # 1. Metadaten holen meta = self._fetch_note_cached(note_id) if not meta: return None # 2. Volltext aus Chunks bauen full_text = self._fetch_full_text_stitched(note_id) # 3. Ergebnis kombinieren (Kopie zurückgeben) complete_note = meta.copy() if full_text: complete_note['fulltext'] = full_text return complete_note def get_ego_graph(self, center_note_id: str, depth=2, show_labels=True): """ Erstellt den Ego-Graphen um eine zentrale Notiz. Lädt Volltext für das Zentrum und Snippets für Nachbarn. """ nodes_dict = {} unique_edges = {} # 1. Center Note laden center_note = self._fetch_note_cached(center_note_id) if not center_note: return [], [] self._add_node_to_dict(nodes_dict, center_note, level=0) # Initialset für Suche level_1_ids = {center_note_id} # Suche Kanten für Center (L1) inkl. Titel für Anchor-Suche l1_edges = self._find_connected_edges([center_note_id], center_note.get("title")) for edge_data in l1_edges: src_id, tgt_id = self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=1) if src_id: level_1_ids.add(src_id) if tgt_id: level_1_ids.add(tgt_id) # Level 2 Suche (begrenzt für Performance) if depth > 1 and len(level_1_ids) > 1 and len(level_1_ids) < 80: l1_subset = list(level_1_ids - {center_note_id}) if l1_subset: l2_edges = self._find_connected_edges_batch(l1_subset) for edge_data in l2_edges: self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=2) # --- SMART CONTENT LOADING --- # A. Fulltext für Center Node holen (Chunks zusammenfügen) center_text = self._fetch_full_text_stitched(center_note_id) if center_note_id in nodes_dict: orig_title = nodes_dict[center_note_id].title clean_full = self._clean_markdown(center_text[:2000]) nodes_dict[center_note_id].title = f"{orig_title}\n\n📄 INHALT:\n{clean_full}..." # B. Previews für alle Nachbarn holen (Batch) all_ids = list(nodes_dict.keys()) previews = self._fetch_previews_for_nodes(all_ids) for nid, node_obj in nodes_dict.items(): if nid != center_note_id: prev_raw = previews.get(nid, "Kein Vorschau-Text.") clean_prev = self._clean_markdown(prev_raw[:600]) node_obj.title = f"{node_obj.title}\n\n🔍 VORSCHAU:\n{clean_prev}..." # Graphen bauen (Nodes & Edges finalisieren) final_edges = [] for (src, tgt), data in unique_edges.items(): kind = data['kind'] prov = data['provenance'] color = get_edge_color(kind) is_smart = (prov != "explicit" and prov != "rule") label_text = kind if show_labels else " " final_edges.append(Edge( source=src, target=tgt, label=label_text, color=color, dashes=is_smart, title=f"Relation: {kind}\nProvenance: {prov}" )) return list(nodes_dict.values()), final_edges def _clean_markdown(self, text): """Entfernt Markdown-Sonderzeichen für saubere Tooltips.""" if not text: return "" text = re.sub(r'#+\s', '', text) text = re.sub(r'\*\*|__|\*|_', '', text) text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text) text = re.sub(r'\[\[([^\]]+)\]\]', r'\1', text) return text def _fetch_full_text_stitched(self, note_id): """Lädt alle Chunks einer Note und baut den Text zusammen.""" try: scroll_filter = models.Filter( must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))] ) chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=100, with_payload=True) chunks.sort(key=lambda x: x.payload.get('ord', 999)) full_text = [c.payload.get('text', '') for c in chunks if c.payload.get('text')] return "\n\n".join(full_text) except: return "Fehler beim Laden des Volltexts." def _fetch_previews_for_nodes(self, node_ids): """ Holt Batch-weise den ersten relevanten Textabschnitt für eine Liste von Nodes. Optimiert die Ladezeit durch Reduzierung der API-Calls. """ if not node_ids: return {} previews = {} try: scroll_filter = models.Filter( must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=node_ids))] ) # Genügend Chunks laden, um für jede ID eine Vorschau zu finden chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=len(node_ids)*3, with_payload=True) for c in chunks: nid = c.payload.get("note_id") # Wir nehmen den ersten gefundenen Chunk if nid and nid not in previews: previews[nid] = c.payload.get("window") or c.payload.get("text") or "" except Exception: pass return previews def _find_connected_edges(self, note_ids, note_title=None): """ Findet ein- und ausgehende Kanten für eine Liste von IDs. Implementiert den Fix für Anker-Links [[Titel#Abschnitt]] durch Präfix-Suche in der target_id. """ results = [] if not note_ids: return results # 1. AUSGEHENDE KANTEN (Outgoing) # Suche über 'note_id' als Besitzer der Kante. out_filter = models.Filter(must=[ models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids)), models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES})) ]) res_out, _ = self.client.scroll(self.edges_col, scroll_filter=out_filter, limit=2000, with_payload=True) results.extend(res_out) # 2. EINGEHENDE KANTEN (Incoming) # Suche über target_id (Ziel der Kante). # Sammele alle Chunk-IDs für exakte Treffer auf Segment-Ebene c_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids))]) chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=c_filter, limit=1000, with_payload=False) chunk_ids = [c.id for c in chunks] should_conditions = [] if chunk_ids: should_conditions.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=chunk_ids))) should_conditions.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=note_ids))) # TITEL-BASIERTE SUCHE (Inkl. Anker-Fix) titles_to_check = [] if note_title: titles_to_check.append(note_title) # Aliase laden für robuste Verlinkung (auch wenn note_title fehlt) for nid in note_ids: note = self._fetch_note_cached(nid) if note: # Füge Titel hinzu, falls noch nicht vorhanden note_title_from_db = note.get("title") if note_title_from_db and note_title_from_db not in titles_to_check: titles_to_check.append(note_title_from_db) # Aliase hinzufügen aliases = note.get("aliases", []) if isinstance(aliases, str): aliases = [aliases] titles_to_check.extend([a for a in aliases if a and a not in titles_to_check]) # Exakte Titel-Matches hinzufügen for t in titles_to_check: should_conditions.append(models.FieldCondition(key="target_id", match=models.MatchValue(value=t))) if should_conditions: in_filter = models.Filter( must=[models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))], should=should_conditions ) res_in, _ = self.client.scroll(self.edges_col, scroll_filter=in_filter, limit=2000, with_payload=True) results.extend(res_in) # FIX FÜR [[Titel#Abschnitt]]: Suche nach Fragmenten if titles_to_check: for t in titles_to_check: anchor_filter = models.Filter(must=[ models.FieldCondition(key="target_id", match=models.MatchText(text=t)), models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES})) ]) res_anchor, _ = self.client.scroll(self.edges_col, scroll_filter=anchor_filter, limit=1000, with_payload=True) existing_ids = {r.id for r in results} for edge in res_anchor: tgt = edge.payload.get("target_id", "") # Client-seitige Filterung: Nur Kanten nehmen, die mit Titel# beginnen if edge.id not in existing_ids and (tgt == t or tgt.startswith(f"{t}#")): results.append(edge) return results def _find_connected_edges_batch(self, note_ids): """Wrapper für die Suche in tieferen Ebenen des Graphen.""" first_note = self._fetch_note_cached(note_ids[0]) if note_ids else None title = first_note.get("title") if first_note else None return self._find_connected_edges(note_ids, note_title=title) def _process_edge(self, record, nodes_dict, unique_edges, current_depth): """ Verarbeitet eine rohe Kante, löst Quell- und Ziel-Referenzen auf und fügt sie den Dictionaries für den Graphen hinzu. """ if not record or not record.payload: return None, None payload = record.payload src_ref = payload.get("source_id") tgt_ref = payload.get("target_id") kind = payload.get("kind") provenance = payload.get("provenance", "explicit") if not src_ref or not tgt_ref: return None, None # IDs zu Notes auflösen (Hier greift der Fragment-Fix) src_note = self._resolve_note_from_ref(src_ref) tgt_note = self._resolve_note_from_ref(tgt_ref) if src_note and tgt_note: src_id = src_note.get('note_id') tgt_id = tgt_note.get('note_id') if src_id and tgt_id and src_id != tgt_id: # Knoten zum Set hinzufügen self._add_node_to_dict(nodes_dict, src_note, level=current_depth) self._add_node_to_dict(nodes_dict, tgt_note, level=current_depth) # Kante registrieren (Deduplizierung) key = (src_id, tgt_id) existing = unique_edges.get(key) is_current_explicit = (provenance in ["explicit", "rule"]) should_update = True if existing: is_existing_explicit = (existing.get('provenance', '') in ["explicit", "rule"]) if is_existing_explicit and not is_current_explicit: should_update = False if should_update: unique_edges[key] = { "source": src_id, "target": tgt_id, "kind": kind, "provenance": provenance } return src_id, tgt_id return None, None def _fetch_note_cached(self, note_id): """Lädt eine Note aus Qdrant mit Session-Caching.""" if not note_id: return None if note_id in self._note_cache: return self._note_cache[note_id] try: res, _ = self.client.scroll( collection_name=self.notes_col, scroll_filter=models.Filter(must=[ models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id)) ]), limit=1, with_payload=True ) if res and res[0].payload: payload = res[0].payload self._note_cache[note_id] = payload return payload except Exception: pass return None def _resolve_note_from_ref(self, ref_str): """ Löst eine Referenz (ID, Chunk-ID oder Wikilink mit Anker) auf eine Note auf. Bereinigt Anker (#) vor der Suche. """ if not ref_str: return None if ref_str in self._ref_resolution_cache: return self._ref_resolution_cache[ref_str] # Fragment-Behandlung: Trenne Anker ab base_ref = ref_str.split("#")[0].strip() # 1. Versuch: Direkte Note-ID Suche note = self._fetch_note_cached(base_ref) if note: self._ref_resolution_cache[ref_str] = note return note # 2. Versuch: Titel-Suche (erst exakt, dann Text-Suche für Fuzzy-Matching) try: # 2a: Exakte Übereinstimmung res, _ = self.client.scroll( collection_name=self.notes_col, scroll_filter=models.Filter(must=[ models.FieldCondition(key="title", match=models.MatchValue(value=base_ref)) ]), limit=1, with_payload=True ) if res and res[0].payload: payload = res[0].payload self._ref_resolution_cache[ref_str] = payload return payload # 2b: Text-Suche für Fuzzy-Matching (falls exakt fehlschlägt) res, _ = self.client.scroll( collection_name=self.notes_col, scroll_filter=models.Filter(must=[ models.FieldCondition(key="title", match=models.MatchText(text=base_ref)) ]), limit=10, with_payload=True ) if res: # Prüfe alle Ergebnisse und nimm das beste Match for r in res: if r.payload: note_title = r.payload.get("title", "") # Exakte Übereinstimmung oder beginnt mit base_ref if note_title == base_ref or note_title.startswith(base_ref): payload = r.payload self._ref_resolution_cache[ref_str] = payload return payload except Exception: pass # 3. Versuch: Auflösung über Chunks if "#" in ref_str: try: res_chunk = self.client.retrieve(self.chunks_col, ids=[ref_str], with_payload=True) if res_chunk and res_chunk[0].payload: note_id = res_chunk[0].payload.get("note_id") note = self._fetch_note_cached(note_id) if note: self._ref_resolution_cache[ref_str] = note return note except Exception: pass return None def _add_node_to_dict(self, node_dict, note_payload, level=1): """Erstellt ein Node-Objekt für streamlit-agraph mit Styling.""" nid = note_payload.get("note_id") if not nid or nid in node_dict: return ntype = note_payload.get("type", "default") color = GRAPH_COLORS.get(ntype, GRAPH_COLORS.get("default", "#8395a7")) tooltip = f"Titel: {note_payload.get('title')}\nTyp: {ntype}" size = 45 if level == 0 else (25 if level == 1 else 15) node_dict[nid] = Node( id=nid, label=note_payload.get('title', nid), size=size, color=color, shape="dot" if level > 0 else "diamond", title=tooltip, font={'color': 'black', 'face': 'arial', 'size': 14 if level < 2 else 0} )