All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
384 lines
16 KiB
Python
384 lines
16 KiB
Python
"""
|
|
FILE: app/frontend/ui_graph_service.py
|
|
DESCRIPTION: Data Layer für den Graphen. Greift direkt auf Qdrant zu (Performance), um Knoten/Kanten zu laden und Texte zu rekonstruieren ("Stitching").
|
|
VERSION: 2.6.1 (Fix: Anchor-Link & Fragment Resolution)
|
|
STATUS: Active
|
|
DEPENDENCIES: qdrant_client, streamlit_agraph, ui_config, re
|
|
LAST_ANALYSIS: 2025-12-28
|
|
"""
|
|
|
|
import re
|
|
from qdrant_client import QdrantClient, models
|
|
from streamlit_agraph import Node, Edge
|
|
from ui_config import COLLECTION_PREFIX, GRAPH_COLORS, get_edge_color, SYSTEM_EDGES
|
|
|
|
class GraphExplorerService:
    """Data-access layer of the graph explorer, reading directly from Qdrant.

    The service works on three collections derived from one prefix
    (``<prefix>_notes``, ``<prefix>_chunks``, ``<prefix>_edges``) and turns
    their points into ``streamlit_agraph`` ``Node``/``Edge`` objects.
    Note payloads and resolved references are cached on the instance.
    """

    # Provenance values that are drawn as solid edges; any other provenance is
    # drawn dashed and never replaces an edge carrying one of these values.
    _EXPLICIT_PROVENANCE = ("explicit", "rule")

    # Page size used when a scroll has to walk a complete result set.
    _SCROLL_PAGE_SIZE = 100

    # Sort position of chunks whose payload has no numeric 'ord'.
    _MISSING_ORD = 999

    # Runtime strings shown in tooltips (kept verbatim).
    _STITCH_ERROR_TEXT = "Fehler beim Laden des Volltexts."
    _NO_PREVIEW_TEXT = "Kein Vorschau-Text."

    # Heading markers: '#' run followed by whitespace, but only at the start of
    # the text or after whitespace, so "C# is" keeps its '#'.
    _RE_HEADING = re.compile(r'(?<!\S)#+\s')
    # Inline links and images: keep the link text / alt text only.
    _RE_MD_LINK = re.compile(r'!?\[([^\]]+)\]\([^\)]+\)')
    # Wikilinks: [[Target]], [[Target#Section]] or [[Target|Alias]].
    _RE_WIKILINK = re.compile(r'\[\[([^\]]+)\]\]')
    # Emphasis markers: every '*', and '_' runs that are not inside a word
    # (so identifiers such as note_id survive).
    _RE_EMPHASIS = re.compile(r'\*+|(?<!\w)_+|_+(?!\w)')

    def __init__(self, url, api_key=None, prefix=None):
        """Create the Qdrant client and derive the collection names.

        Args:
            url: Qdrant endpoint, passed to ``QdrantClient``.
            api_key: Optional API key, passed to ``QdrantClient``.
            prefix: Collection prefix; a falsy value selects
                ``COLLECTION_PREFIX`` from ``ui_config``.
        """
        self.client = QdrantClient(url=url, api_key=api_key)
        self.prefix = prefix if prefix else COLLECTION_PREFIX
        self.notes_col = f"{self.prefix}_notes"
        self.chunks_col = f"{self.prefix}_chunks"
        self.edges_col = f"{self.prefix}_edges"
        # note_id -> note payload
        self._note_cache = {}
        # raw reference -> resolved note payload (successful lookups only)
        self._ref_resolution_cache = {}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_note_with_full_content(self, note_id):
        """Return a copy of the note payload, enriched with its stitched text.

        The text is rebuilt from the note's chunks and stored under
        ``'fulltext'`` when it is non-empty. If loading the chunks fails, the
        key is omitted instead of being filled with an error message, so a
        caller that edits and saves the text cannot persist that message.

        Returns:
            A new dict, or ``None`` when the note cannot be loaded.
        """
        meta = self._fetch_note_cached(note_id)
        if not meta:
            return None

        try:
            full_text = self._load_stitched_text(note_id)
        except Exception:
            full_text = ""

        # Copy so callers cannot mutate the cached payload.
        complete_note = meta.copy()
        if full_text:
            complete_note['fulltext'] = full_text
        return complete_note

    def get_ego_graph(self, center_note_id: str, depth=2, show_labels=True):
        """Build the ego graph around one note.

        Level 1 holds every note connected to the centre. When ``depth > 1``
        and level 1 has fewer than 80 ids (centre included), the edges of all
        level-1 notes are loaded as well. The centre tooltip receives up to
        2000 characters of stitched text, every other node a preview of up to
        600 characters.

        Args:
            center_note_id: ``note_id`` of the centre note.
            depth: Values above 1 enable the level-2 expansion.
            show_labels: When false, edge labels are a single space.

        Returns:
            ``(nodes, edges)``; two empty lists if the centre note is unknown.
        """
        nodes_dict = {}
        unique_edges = {}

        center_note = self._fetch_note_cached(center_note_id)
        if not center_note:
            return [], []
        self._add_node_to_dict(nodes_dict, center_note, level=0)

        # Level 1: edges touching the centre (the title enables wikilink lookup).
        level_1_ids = {center_note_id}
        for edge_data in self._find_connected_edges([center_note_id], center_note.get("title")):
            src_id, tgt_id = self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=1)
            if src_id:
                level_1_ids.add(src_id)
            if tgt_id:
                level_1_ids.add(tgt_id)

        # Level 2: bounded so that large neighbourhoods stay responsive.
        if depth > 1 and 1 < len(level_1_ids) < 80:
            l1_subset = list(level_1_ids - {center_note_id})
            if l1_subset:
                for edge_data in self._find_connected_edges_batch(l1_subset):
                    self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=2)

        # Centre node: append the stitched full text to the tooltip.
        if center_note_id in nodes_dict:
            center_node = nodes_dict[center_note_id]
            center_text = self._fetch_full_text_stitched(center_note_id)
            clean_full = self._clean_markdown(center_text[:2000])
            center_node.title = f"{center_node.title}\n\n📄 INHALT:\n{clean_full}..."

        # Neighbours: one batched preview lookup. The centre is excluded so
        # its chunks do not use up the fetch budget.
        neighbor_ids = [nid for nid in nodes_dict if nid != center_note_id]
        previews = self._fetch_previews_for_nodes(neighbor_ids)
        for nid in neighbor_ids:
            node_obj = nodes_dict[nid]
            prev_raw = previews.get(nid) or self._NO_PREVIEW_TEXT
            clean_prev = self._clean_markdown(prev_raw[:600])
            node_obj.title = f"{node_obj.title}\n\n🔍 VORSCHAU:\n{clean_prev}..."

        final_edges = []
        for (src, tgt), data in unique_edges.items():
            kind = data['kind']
            prov = data['provenance']
            final_edges.append(Edge(
                source=src,
                target=tgt,
                label=kind if show_labels else " ",
                color=get_edge_color(kind),
                dashes=prov not in self._EXPLICIT_PROVENANCE,
                title=f"Relation: {kind}\nProvenance: {prov}",
            ))

        return list(nodes_dict.values()), final_edges

    # ------------------------------------------------------------------
    # Text helpers
    # ------------------------------------------------------------------

    def _clean_markdown(self, text):
        """Strip Markdown markup so the text reads cleanly in a tooltip.

        Removes heading markers, reduces links/images to their text, reduces
        wikilinks to their alias (or target when no alias is given) and drops
        emphasis markers. Underscores inside words are kept.

        Returns:
            The cleaned text; ``""`` for falsy input.
        """
        if not text:
            return ""

        def _wikilink_text(match):
            inner = match.group(1)
            target, _, alias = inner.partition("|")
            return alias or target or inner

        text = self._RE_HEADING.sub('', text)
        # Links are reduced before emphasis is stripped, so underscores in the
        # link text are judged in their final context.
        text = self._RE_MD_LINK.sub(r'\1', text)
        text = self._RE_WIKILINK.sub(_wikilink_text, text)
        text = self._RE_EMPHASIS.sub('', text)
        return text

    def _chunk_ord(self, chunk):
        """Return the chunk's numeric payload 'ord', else ``_MISSING_ORD``."""
        payload = getattr(chunk, "payload", None) or {}
        value = payload.get("ord")
        if isinstance(value, (int, float)):
            return value
        return self._MISSING_ORD

    def _stitch_chunks(self, chunks):
        """Join the non-empty chunk texts in 'ord' order, separated by blank lines."""
        ordered = sorted(chunks, key=self._chunk_ord)
        texts = ((c.payload or {}).get('text') for c in ordered)
        return "\n\n".join(t for t in texts if t)

    def _select_previews(self, chunks):
        """Map each note_id to the preview of its lowest-'ord' chunk.

        The preview is the chunk's ``window`` payload, falling back to
        ``text``, then to ``""``. Chunks without a ``note_id`` are ignored.
        """
        best = {}  # note_id -> (ord, preview)
        for chunk in chunks:
            payload = getattr(chunk, "payload", None) or {}
            nid = payload.get("note_id")
            if not nid:
                continue
            rank = self._chunk_ord(chunk)
            if nid not in best or rank < best[nid][0]:
                best[nid] = (rank, payload.get("window") or payload.get("text") or "")
        return {nid: preview for nid, (_, preview) in best.items()}

    # ------------------------------------------------------------------
    # Qdrant access
    # ------------------------------------------------------------------

    def _scroll_all(self, collection, scroll_filter, with_payload=True):
        """Return all points matching ``scroll_filter``, following the scroll cursor.

        Exceptions raised by the client propagate to the caller.
        """
        points = []
        offset = None
        while True:
            page, offset = self.client.scroll(
                collection,
                scroll_filter=scroll_filter,
                limit=self._SCROLL_PAGE_SIZE,
                offset=offset,
                with_payload=with_payload,
            )
            points.extend(page)
            # A missing cursor marks the last page; the empty-page check only
            # guards against looping forever on an unexpected response.
            if offset is None or not page:
                return points

    def _load_stitched_text(self, note_id):
        """Load every chunk of a note and stitch the text; raises on failure."""
        scroll_filter = models.Filter(
            must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]
        )
        return self._stitch_chunks(self._scroll_all(self.chunks_col, scroll_filter))

    def _fetch_full_text_stitched(self, note_id):
        """Return the note's stitched text, or a fixed error message on failure.

        All chunks are read (paged), so notes with more chunks than one scroll
        page are no longer truncated.
        """
        try:
            return self._load_stitched_text(note_id)
        except Exception:
            return self._STITCH_ERROR_TEXT

    def _fetch_previews_for_nodes(self, node_ids):
        """Fetch a short preview text for several notes with one scroll call.

        At most ``len(node_ids) * 3`` chunks are read, so a note may end up
        without a preview. Among the chunks read, the one with the lowest
        'ord' is used per note.

        Returns:
            ``{note_id: preview}``; empty on falsy input or when the lookup
            fails (previews are best-effort decoration).
        """
        if not node_ids:
            return {}
        try:
            scroll_filter = models.Filter(
                must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=node_ids))]
            )
            chunks, _ = self.client.scroll(
                self.chunks_col,
                scroll_filter=scroll_filter,
                limit=len(node_ids) * 3,
                with_payload=True,
            )
        except Exception:
            return {}
        return self._select_previews(chunks)

    def _collect_link_names(self, note_ids, note_title=None):
        """Collect the names under which the given notes can be linked.

        Returns ``note_title`` (if given) followed by the title and aliases of
        every note in ``note_ids``, without duplicates, in first-seen order.
        Missing notes, ``None`` aliases and empty or non-string names are
        skipped.
        """
        names = []
        if note_title:
            names.append(note_title)
        for nid in note_ids:
            note = self._fetch_note_cached(nid)
            if not note:
                continue
            aliases = note.get("aliases") or []
            if isinstance(aliases, str):
                aliases = [aliases]
            for candidate in [note.get("title"), *aliases]:
                if candidate and isinstance(candidate, str) and candidate not in names:
                    names.append(candidate)
        return names

    def _find_connected_edges(self, note_ids, note_title=None):
        """Find outgoing and incoming edges for a list of note ids.

        Outgoing edges are those whose ``note_id`` is one of ``note_ids``.
        Incoming edges are those whose ``target_id`` is a note id, the point
        id of one of the notes' chunks, or a title/alias of one of the notes.
        Anchor links (``Title#Section``) are found with one combined text
        query and then filtered client-side to exact names or ``Name#...``
        targets. Edges whose ``kind`` is in ``SYSTEM_EDGES`` are excluded.

        Args:
            note_ids: Note ids to search around.
            note_title: Optional extra title to match; titles of the notes
                themselves are looked up in any case.

        Returns:
            List of edge records; may contain an edge twice when it is both
            outgoing and incoming for ``note_ids``.
        """
        results = []
        if not note_ids:
            return results

        not_system = models.FieldCondition(
            key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES})
        )
        owned_by_notes = models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids))

        # 1. Outgoing edges: 'note_id' is the owner of the edge.
        res_out, _ = self.client.scroll(
            self.edges_col,
            scroll_filter=models.Filter(must=[owned_by_notes, not_system]),
            limit=2000,
            with_payload=True,
        )
        results.extend(res_out)

        # 2. Incoming edges: collect chunk point ids for segment-level targets.
        chunks, _ = self.client.scroll(
            self.chunks_col,
            scroll_filter=models.Filter(must=[owned_by_notes]),
            limit=1000,
            with_payload=False,
        )
        chunk_ids = [c.id for c in chunks]
        link_names = self._collect_link_names(note_ids, note_title)

        should_conditions = []
        if chunk_ids:
            should_conditions.append(
                models.FieldCondition(key="target_id", match=models.MatchAny(any=chunk_ids))
            )
        should_conditions.append(
            models.FieldCondition(key="target_id", match=models.MatchAny(any=note_ids))
        )
        should_conditions.extend(
            models.FieldCondition(key="target_id", match=models.MatchValue(value=name))
            for name in link_names
        )

        res_in, _ = self.client.scroll(
            self.edges_col,
            scroll_filter=models.Filter(must=[not_system], should=should_conditions),
            limit=2000,
            with_payload=True,
        )
        results.extend(res_in)

        # 3. Anchor links [[Title#Section]]: one text query for all names.
        if link_names:
            anchor_filter = models.Filter(
                must=[not_system],
                should=[
                    models.FieldCondition(key="target_id", match=models.MatchText(text=name))
                    for name in link_names
                ],
            )
            res_anchor, _ = self.client.scroll(
                self.edges_col,
                scroll_filter=anchor_filter,
                limit=1000 * len(link_names),
                with_payload=True,
            )

            exact_names = set(link_names)
            anchor_prefixes = tuple(f"{name}#" for name in link_names)
            seen_ids = {r.id for r in results}
            for edge in res_anchor:
                tgt = (edge.payload or {}).get("target_id")
                if not isinstance(tgt, str) or edge.id in seen_ids:
                    continue
                # The text match is only a pre-filter; accept exact names and
                # "Name#..." targets, nothing that merely contains the name.
                if tgt in exact_names or tgt.startswith(anchor_prefixes):
                    results.append(edge)
                    seen_ids.add(edge.id)

        return results

    def _find_connected_edges_batch(self, note_ids):
        """Find the edges of several notes at once (used for deeper levels).

        Titles and aliases of all notes are resolved by
        ``_find_connected_edges`` itself, so no single title is passed on.
        """
        return self._find_connected_edges(note_ids)

    # ------------------------------------------------------------------
    # Edge / node assembly
    # ------------------------------------------------------------------

    def _register_edge(self, unique_edges, src_id, tgt_id, kind, provenance):
        """Store one edge per ``(src_id, tgt_id)`` pair.

        An existing edge with explicit provenance is kept when the new edge
        is not explicit; in every other case the new edge replaces it.
        """
        key = (src_id, tgt_id)
        existing = unique_edges.get(key)
        if (
            existing
            and provenance not in self._EXPLICIT_PROVENANCE
            and existing.get('provenance', '') in self._EXPLICIT_PROVENANCE
        ):
            return
        unique_edges[key] = {
            "source": src_id,
            "target": tgt_id,
            "kind": kind,
            "provenance": provenance,
        }

    def _process_edge(self, record, nodes_dict, unique_edges, current_depth):
        """Resolve a raw edge record to two notes and add it to the graph.

        Both endpoints are resolved to notes; the edge is dropped when either
        one cannot be resolved or when both resolve to the same note. New
        notes are added to ``nodes_dict`` with ``level=current_depth``.

        Returns:
            ``(src_note_id, tgt_note_id)``, or ``(None, None)`` if dropped.
        """
        if not record or not record.payload:
            return None, None

        payload = record.payload
        src_ref = payload.get("source_id")
        tgt_ref = payload.get("target_id")
        if not src_ref or not tgt_ref:
            return None, None

        src_note = self._resolve_note_from_ref(src_ref)
        tgt_note = self._resolve_note_from_ref(tgt_ref)
        if not src_note or not tgt_note:
            return None, None

        src_id = src_note.get('note_id')
        tgt_id = tgt_note.get('note_id')
        if not src_id or not tgt_id or src_id == tgt_id:
            return None, None

        self._add_node_to_dict(nodes_dict, src_note, level=current_depth)
        self._add_node_to_dict(nodes_dict, tgt_note, level=current_depth)
        self._register_edge(
            unique_edges,
            src_id,
            tgt_id,
            payload.get("kind"),
            payload.get("provenance", "explicit"),
        )
        return src_id, tgt_id

    def _fetch_note_cached(self, note_id):
        """Load a note payload by ``note_id``, using the instance cache.

        Returns:
            The payload dict, or ``None`` when ``note_id`` is falsy, the note
            does not exist or the lookup fails. Misses are not cached.
        """
        if not note_id:
            return None
        if note_id in self._note_cache:
            return self._note_cache[note_id]

        try:
            res, _ = self.client.scroll(
                collection_name=self.notes_col,
                scroll_filter=models.Filter(must=[
                    models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))
                ]),
                limit=1,
                with_payload=True,
            )
        except Exception:
            # Best-effort lookup: a failed query is treated like a missing note.
            return None

        if res and res[0].payload:
            payload = res[0].payload
            self._note_cache[note_id] = payload
            return payload
        return None

    def _resolve_note_from_ref(self, ref_str):
        """Resolve a reference (note id, title, anchor link or chunk id) to a note.

        Successful resolutions are cached per raw reference.

        Returns:
            The note payload, or ``None`` if nothing matches.
        """
        if not ref_str:
            return None
        if ref_str in self._ref_resolution_cache:
            return self._ref_resolution_cache[ref_str]

        note = self._lookup_note_for_ref(ref_str)
        if note:
            self._ref_resolution_cache[ref_str] = note
        return note

    def _lookup_note_for_ref(self, ref_str):
        """Uncached resolution: note id, then title, then chunk point id."""
        is_text = isinstance(ref_str, str)
        # Drop the fragment of "Title#Section" / "note#chunk" style references.
        base_ref = ref_str.split("#")[0].strip() if is_text else ref_str

        # 1. Direct note-id lookup.
        note = self._fetch_note_cached(base_ref)
        if note:
            return note

        # 2. Exact title match.
        if is_text and base_ref:
            try:
                res, _ = self.client.scroll(
                    collection_name=self.notes_col,
                    scroll_filter=models.Filter(must=[
                        models.FieldCondition(key="title", match=models.MatchValue(value=base_ref))
                    ]),
                    limit=1,
                    with_payload=True,
                )
                if res and res[0].payload:
                    return res[0].payload
            except Exception:
                pass

        # 3. Chunk point id -> owning note. Tried for every reference: Qdrant
        # point ids are integers or UUIDs and therefore never contain '#', so
        # restricting this step to '#' references would skip all real ids.
        try:
            res_chunk = self.client.retrieve(self.chunks_col, ids=[ref_str], with_payload=True)
            if res_chunk and res_chunk[0].payload:
                return self._fetch_note_cached(res_chunk[0].payload.get("note_id"))
        except Exception:
            pass

        return None

    def _add_node_to_dict(self, node_dict, note_payload, level=1):
        """Create a styled ``Node`` for a note unless it is already present.

        Level 0 is drawn as a large diamond, level 1 as a medium dot and
        deeper levels as a small dot without a visible label (font size 0).
        Notes without a ``note_id`` are ignored.
        """
        nid = note_payload.get("note_id")
        if not nid or nid in node_dict:
            return

        ntype = note_payload.get("type") or "default"
        color = GRAPH_COLORS.get(ntype, GRAPH_COLORS.get("default", "#8395a7"))
        tooltip = f"Titel: {note_payload.get('title')}\nTyp: {ntype}"

        if level == 0:
            size = 45
        elif level == 1:
            size = 25
        else:
            size = 15

        node_dict[nid] = Node(
            id=nid,
            label=note_payload.get('title') or nid,
            size=size,
            color=color,
            shape="dot" if level > 0 else "diamond",
            title=tooltip,
            font={'color': 'black', 'face': 'arial', 'size': 14 if level < 2 else 0},
        )