from qdrant_client import QdrantClient, models
from streamlit_agraph import Node, Edge
from ui_config import GRAPH_COLORS, get_edge_color, SYSTEM_EDGES


class GraphExplorerService:
    """Builds ego-graphs (center node + neighbors) from a Qdrant-backed note store.

    Data layout (one Qdrant collection per suffix, sharing a common prefix):
      * ``{prefix}_notes``  — one point per note, payload holds ``note_id``, ``title``, ``type``
      * ``{prefix}_chunks`` — text chunks, payload holds ``note_id``, ``text``/``window``
      * ``{prefix}_edges``  — relations, payload holds ``source_id``, ``target_id``,
        ``kind`` and ``provenance``

    Nodes/edges are returned as ``streamlit_agraph`` ``Node``/``Edge`` objects.
    """

    def __init__(self, url, api_key=None, prefix="mindnet"):
        """Connect to Qdrant and derive the per-prefix collection names.

        Args:
            url: Qdrant endpoint URL.
            api_key: Optional Qdrant API key.
            prefix: Collection-name prefix (default ``"mindnet"``).
        """
        self.client = QdrantClient(url=url, api_key=api_key)
        self.prefix = prefix
        self.notes_col = f"{prefix}_notes"
        self.chunks_col = f"{prefix}_chunks"
        self.edges_col = f"{prefix}_edges"
        # note_id -> note payload; avoids re-fetching the same note repeatedly
        # while resolving edge endpoints.
        self._note_cache = {}

    def get_ego_graph(self, center_note_id: str, depth=2, show_labels=True):
        """Build the ego-graph around ``center_note_id``.

        Args:
            center_note_id: ``note_id`` of the center note.
            depth: 1 = direct neighbors only; >1 additionally expands one
                more hop from the level-1 neighbors.
            show_labels: When False, edge labels are rendered as a blank
                placeholder instead of the relation kind.

        Returns:
            ``(nodes, edges)`` — lists of agraph ``Node``/``Edge`` objects.
            Both lists are empty when the center note does not exist.
        """
        nodes_dict = {}
        unique_edges = {}

        # 1. Load the center note; bail out early if it is unknown.
        center_note = self._fetch_note_cached(center_note_id)
        if not center_note:
            return [], []

        # Add the node without a content preview for now (injected below).
        self._add_node_to_dict(nodes_dict, center_note, level=0)
        level_1_ids = {center_note_id}

        # Find edges touching the center note.
        l1_edges = self._find_connected_edges([center_note_id], center_note.get("title"))
        for edge_data in l1_edges:
            src_id, tgt_id = self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=1)
            if src_id:
                level_1_ids.add(src_id)
            if tgt_id:
                level_1_ids.add(tgt_id)

        # Level-2 expansion, capped at 60 level-1 nodes to keep the graph readable
        # and the batch queries bounded.
        if depth > 1 and 1 < len(level_1_ids) < 60:
            l1_subset = list(level_1_ids - {center_note_id})
            if l1_subset:
                l2_edges = self._find_connected_edges_batch(l1_subset)
                for edge_data in l2_edges:
                    self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=2)

        # Load content previews (first chunk) for every collected node and
        # append them to the hover tooltip.
        all_node_ids = list(nodes_dict.keys())
        previews = self._fetch_previews_for_nodes(all_node_ids)

        final_nodes = []
        for nid, node_obj in nodes_dict.items():
            prev_text = previews.get(nid, "Kein Inhaltstext gefunden.")
            # Append the preview to the existing tooltip (agraph uses `title`
            # as the hover text); truncate to keep tooltips compact.
            node_obj.title = f"{node_obj.title}\n\n📝 VORSCHAU:\n{prev_text[:400]}..."
            final_nodes.append(node_obj)

        # Build the edge objects. "Smart" (inferred) edges are rendered dashed.
        final_edges = []
        for (src, tgt), data in unique_edges.items():
            kind = data['kind']
            prov = data['provenance']
            color = get_edge_color(kind)
            is_smart = prov not in ("explicit", "rule")
            label_text = kind if show_labels else " "
            final_edges.append(Edge(
                source=src,
                target=tgt,
                label=label_text,
                color=color,
                dashes=is_smart,
                title=f"Relation: {kind}\nProvenance: {prov}"
            ))

        return final_nodes, final_edges

    def _fetch_previews_for_nodes(self, node_ids):
        """Fetch one chunk per note id to use as a content preview.

        Returns a dict ``note_id -> preview text``; notes without chunks are
        simply absent. Errors are swallowed deliberately (previews are
        best-effort decoration, never a reason to fail the graph build).
        """
        if not node_ids:
            return {}

        previews = {}
        try:
            scroll_filter = models.Filter(
                must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=node_ids))]
            )
            # Qdrant scroll has no payload-based ordering; it usually returns
            # insertion/id order, which is good enough for a preview.
            # NOTE(review): the limit assumes ~3 chunks per note; notes stored
            # after many-chunk notes may miss their preview — confirm chunk
            # counts against the ingestion pipeline.
            chunks, _ = self.client.scroll(
                collection_name=self.chunks_col,
                scroll_filter=scroll_filter,
                limit=len(node_ids) * 3,  # buffer: ~3 chunks per note
                with_payload=True
            )
            for c in chunks:
                nid = c.payload.get("note_id")
                # Keep only the first chunk seen per note.
                if nid and nid not in previews:
                    # Prefer 'window' (chunk with surrounding context) over raw 'text'.
                    text = c.payload.get("window") or c.payload.get("text") or ""
                    previews[nid] = text
        except Exception as e:
            print(f"Preview fetch error: {e}")
        return previews

    def _find_connected_edges(self, note_ids, note_title=None):
        """Return edge records whose source or target references the given notes.

        Edges may reference a note directly (by ``note_id``), via one of its
        chunks (chunk point id), or — for incoming edges — by the note title.
        System edges (``SYSTEM_EDGES``) are always excluded.

        Args:
            note_ids: Note ids to search around.
            note_title: Optional title of the (single) note, used to match
                incoming edges that target the note by title.
        """
        # Collect the chunk ids belonging to these notes — edges may point at chunks.
        scroll_filter = models.Filter(
            must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids))]
        )
        chunks, _ = self.client.scroll(
            collection_name=self.chunks_col,
            scroll_filter=scroll_filter,
            limit=200
        )
        chunk_ids = [c.id for c in chunks]

        results = []

        # Outgoing edges: source is a note id or one of its chunk ids.
        source_candidates = chunk_ids + note_ids
        if source_candidates:
            out_f = models.Filter(must=[
                models.FieldCondition(key="source_id", match=models.MatchAny(any=source_candidates)),
                # "except" is a Python keyword, hence the **{...} construction.
                models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))
            ])
            res_out, _ = self.client.scroll(self.edges_col, scroll_filter=out_f, limit=100, with_payload=True)
            results.extend(res_out)

        # Incoming edges: target may be a chunk id, the note title, or a note id.
        shoulds = []
        if chunk_ids:
            shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=chunk_ids)))
        if note_title:
            shoulds.append(models.FieldCondition(key="target_id", match=models.MatchValue(value=note_title)))
        shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=note_ids)))

        if shoulds:
            in_f = models.Filter(
                must=[models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))],
                should=shoulds
            )
            res_in, _ = self.client.scroll(self.edges_col, scroll_filter=in_f, limit=100, with_payload=True)
            results.extend(res_in)

        return results

    def _find_connected_edges_batch(self, note_ids):
        """Batch variant of `_find_connected_edges` (no title matching)."""
        return self._find_connected_edges(note_ids)

    def _process_edge(self, record, nodes_dict, unique_edges, current_depth):
        """Resolve an edge record's endpoints to notes and register them.

        Adds both endpoint nodes to ``nodes_dict`` and the deduplicated edge
        to ``unique_edges``. An explicit/rule edge is never overwritten by an
        inferred ("smart") one for the same (source, target) pair.

        Returns:
            ``(src_note_id, tgt_note_id)`` on success, ``(None, None)`` when
            an endpoint cannot be resolved or the edge is a self-loop.
        """
        payload = record.payload
        kind = payload.get("kind")
        provenance = payload.get("provenance", "explicit")

        src_note = self._resolve_note_from_ref(payload.get("source_id"))
        tgt_note = self._resolve_note_from_ref(payload.get("target_id"))
        if not (src_note and tgt_note):
            return None, None

        src_id = src_note['note_id']
        tgt_id = tgt_note['note_id']
        if src_id == tgt_id:
            # Drop self-loops (e.g. edges between two chunks of the same note).
            return None, None

        self._add_node_to_dict(nodes_dict, src_note, level=current_depth)
        self._add_node_to_dict(nodes_dict, tgt_note, level=current_depth)

        key = (src_id, tgt_id)
        existing = unique_edges.get(key)
        is_current_explicit = provenance in ("explicit", "rule")

        should_update = True
        if existing:
            is_existing_explicit = existing['provenance'] in ("explicit", "rule")
            # Keep the stronger (explicit/rule) edge over an inferred one.
            if is_existing_explicit and not is_current_explicit:
                should_update = False

        if should_update:
            unique_edges[key] = {
                "source": src_id,
                "target": tgt_id,
                "kind": kind,
                "provenance": provenance
            }
        return src_id, tgt_id

    def _fetch_note_cached(self, note_id):
        """Fetch a note payload by ``note_id``, memoized in ``self._note_cache``.

        Returns the payload dict, or ``None`` if no such note exists.
        """
        if note_id in self._note_cache:
            return self._note_cache[note_id]

        res, _ = self.client.scroll(
            collection_name=self.notes_col,
            scroll_filter=models.Filter(
                must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]
            ),
            limit=1,
            with_payload=True
        )
        if res:
            self._note_cache[note_id] = res[0].payload
            return res[0].payload
        return None

    def _resolve_note_from_ref(self, ref_str):
        """Resolve an edge endpoint reference to a note payload.

        ``ref_str`` may be a chunk id (contains ``#``), a plain note id, or a
        note title; the strategies are tried in that order. Returns ``None``
        when nothing matches.
        """
        if not ref_str:
            return None

        # Strategy 1: a chunk reference ("note_id#chunk_n") — look the chunk
        # up and follow its note_id; fall back to the part before '#'.
        if "#" in ref_str:
            try:
                res = self.client.retrieve(self.chunks_col, ids=[ref_str], with_payload=True)
                if res:
                    return self._fetch_note_cached(res[0].payload.get("note_id"))
            except Exception:
                # Retrieve can fail for malformed/unknown ids; fall through
                # to the id-prefix heuristic below.
                pass
            note = self._fetch_note_cached(ref_str.split("#")[0])
            if note:
                return note

        # Strategy 2: the reference is a note id itself.
        note = self._fetch_note_cached(ref_str)
        if note:
            return note

        # Strategy 3: the reference is a note title.
        res, _ = self.client.scroll(
            collection_name=self.notes_col,
            scroll_filter=models.Filter(
                must=[models.FieldCondition(key="title", match=models.MatchValue(value=ref_str))]
            ),
            limit=1,
            with_payload=True
        )
        if res:
            self._note_cache[res[0].payload['note_id']] = res[0].payload
            return res[0].payload
        return None

    def _add_node_to_dict(self, node_dict, note_payload, level=1):
        """Create an agraph ``Node`` for a note and add it to ``node_dict``.

        No-op when the note is already present, so the first (lowest-level)
        insertion wins — the center stays big, neighbors stay smaller.

        Args:
            node_dict: Target dict ``note_id -> Node``.
            note_payload: Note payload providing ``note_id``/``title``/``type``.
            level: Graph distance from the center (0 = center).
        """
        nid = note_payload.get("note_id")
        if nid in node_dict:
            return

        ntype = note_payload.get("type", "default")
        color = GRAPH_COLORS.get(ntype, GRAPH_COLORS["default"])

        # Base tooltip; the content preview is appended later in get_ego_graph.
        tooltip = f"Titel: {note_payload.get('title')}\nTyp: {ntype}"

        # Size by distance from the center: 45 (center) / 25 (level 1) / 15 (beyond).
        if level == 0:
            size = 45
        elif level == 1:
            size = 25
        else:
            size = 15

        node_dict[nid] = Node(
            id=nid,
            label=note_payload.get('title', nid),
            size=size,
            color=color,
            # Diamond marks the center; level-2 labels are hidden (font size 0).
            shape="dot" if level > 0 else "diamond",
            title=tooltip,
            font={'color': 'black', 'face': 'arial', 'size': 14 if level < 2 else 0}
        )