mindnet/app/frontend/ui_graph_service.py
2025-12-29 11:00:00 +01:00

448 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FILE: app/frontend/ui_graph_service.py
DESCRIPTION: Data Layer für den Graphen. Greift direkt auf Qdrant zu (Performance), um Knoten/Kanten zu laden und Texte zu rekonstruieren ("Stitching").
VERSION: 2.6.0
STATUS: Active
DEPENDENCIES: qdrant_client, streamlit_agraph, ui_config, re
LAST_ANALYSIS: 2025-12-15
"""
import re
from qdrant_client import QdrantClient, models
from streamlit_agraph import Node, Edge
from ui_config import COLLECTION_PREFIX, GRAPH_COLORS, get_edge_color, SYSTEM_EDGES
class GraphExplorerService:
def __init__(self, url, api_key=None, prefix=None):
"""
Initialisiert den Service. Nutzt COLLECTION_PREFIX aus der Config,
sofern kein spezifischer Prefix übergeben wurde.
"""
self.client = QdrantClient(url=url, api_key=api_key)
self.prefix = prefix if prefix else COLLECTION_PREFIX
self.notes_col = f"{self.prefix}_notes"
self.chunks_col = f"{self.prefix}_chunks"
self.edges_col = f"{self.prefix}_edges"
self._note_cache = {}
def get_note_with_full_content(self, note_id):
"""
Lädt die Metadaten der Note und rekonstruiert den gesamten Text
aus den Chunks (Stitching). Wichtig für den Editor-Fallback.
"""
# 1. Metadaten holen
meta = self._fetch_note_cached(note_id)
if not meta: return None
# 2. Volltext aus Chunks bauen
full_text = self._fetch_full_text_stitched(note_id)
# 3. Ergebnis kombinieren (Wir überschreiben das 'fulltext' Feld mit dem frischen Stitching)
# Wir geben eine Kopie zurück, um den Cache nicht zu verfälschen
complete_note = meta.copy()
if full_text:
complete_note['fulltext'] = full_text
return complete_note
def get_ego_graph(self, center_note_id: str, depth=2, show_labels=True):
"""
Erstellt den Ego-Graphen um eine zentrale Notiz.
Lädt Volltext für das Zentrum und Snippets für Nachbarn.
"""
nodes_dict = {}
unique_edges = {}
# 1. Center Note laden
center_note = self._fetch_note_cached(center_note_id)
if not center_note: return [], []
self._add_node_to_dict(nodes_dict, center_note, level=0)
# Initialset für Suche
level_1_ids = {center_note_id}
# Suche Kanten für Center (L1)
l1_edges = self._find_connected_edges([center_note_id], center_note.get("title"))
for edge_data in l1_edges:
src_id, tgt_id = self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=1)
if src_id: level_1_ids.add(src_id)
if tgt_id: level_1_ids.add(tgt_id)
# Level 2 Suche (begrenzt für Performance)
if depth > 1 and len(level_1_ids) > 1 and len(level_1_ids) < 80:
l1_subset = list(level_1_ids - {center_note_id})
if l1_subset:
l2_edges = self._find_connected_edges_batch(l1_subset)
for edge_data in l2_edges:
self._process_edge(edge_data, nodes_dict, unique_edges, current_depth=2)
# --- SMART CONTENT LOADING ---
# A. Fulltext für Center Node holen (Chunks zusammenfügen)
center_text = self._fetch_full_text_stitched(center_note_id)
if center_note_id in nodes_dict:
orig_title = nodes_dict[center_note_id].title
clean_full = self._clean_markdown(center_text[:2000])
# Wir packen den Text in den Tooltip (title attribute)
nodes_dict[center_note_id].title = f"{orig_title}\n\n📄 INHALT:\n{clean_full}..."
# B. Previews für alle Nachbarn holen (Batch)
all_ids = list(nodes_dict.keys())
previews = self._fetch_previews_for_nodes(all_ids)
for nid, node_obj in nodes_dict.items():
if nid != center_note_id:
prev_raw = previews.get(nid, "Kein Vorschau-Text.")
clean_prev = self._clean_markdown(prev_raw[:600])
node_obj.title = f"{node_obj.title}\n\n🔍 VORSCHAU:\n{clean_prev}..."
# Graphen bauen (Nodes & Edges finalisieren)
final_edges = []
for (src, tgt), data in unique_edges.items():
kind = data['kind']
prov = data['provenance']
color = get_edge_color(kind)
is_smart = (prov != "explicit" and prov != "rule")
# Label Logik
label_text = kind if show_labels else " "
final_edges.append(Edge(
source=src, target=tgt, label=label_text, color=color, dashes=is_smart,
title=f"Relation: {kind}\nProvenance: {prov}"
))
return list(nodes_dict.values()), final_edges
def _clean_markdown(self, text):
"""Entfernt Markdown-Sonderzeichen für saubere Tooltips im Browser."""
if not text: return ""
# Entferne Header Marker (## )
text = re.sub(r'#+\s', '', text)
# Entferne Bold/Italic (** oder *)
text = re.sub(r'\*\*|__|\*|_', '', text)
# Entferne Links [Text](Url) -> Text
text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)
# Entferne Wikilinks [[Link]] -> Link
text = re.sub(r'\[\[([^\]]+)\]\]', r'\1', text)
return text
def _fetch_full_text_stitched(self, note_id):
"""Lädt alle Chunks einer Note und baut den Text zusammen."""
try:
scroll_filter = models.Filter(
must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]
)
# Limit hoch genug setzen
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=100, with_payload=True)
# Sortieren nach 'ord' (Reihenfolge im Dokument)
chunks.sort(key=lambda x: x.payload.get('ord', 999))
full_text = []
for c in chunks:
# 'text' ist der reine Inhalt ohne Overlap
txt = c.payload.get('text', '')
if txt: full_text.append(txt)
return "\n\n".join(full_text)
except:
return "Fehler beim Laden des Volltexts."
def _fetch_previews_for_nodes(self, node_ids):
"""Holt Batch-weise den ersten Chunk für eine Liste von Nodes."""
if not node_ids: return {}
previews = {}
try:
scroll_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=node_ids))])
# Limit = Anzahl Nodes * 3 (Puffer)
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=len(node_ids)*3, with_payload=True)
for c in chunks:
nid = c.payload.get("note_id")
# Nur den ersten gefundenen Chunk pro Note nehmen
if nid and nid not in previews:
previews[nid] = c.payload.get("window") or c.payload.get("text") or ""
except: pass
return previews
def _find_connected_edges(self, note_ids, note_title=None):
"""
Findet eingehende und ausgehende Kanten.
WICHTIG: target_id enthält nur den Titel (ohne #Abschnitt).
target_section ist ein separates Feld für Abschnitt-Informationen.
"""
results = []
if not note_ids:
return results
# 1. OUTGOING EDGES (Der "Owner"-Fix)
# Wir suchen Kanten, die im Feld 'note_id' (Owner) eine unserer Notizen haben.
# Das findet ALLE ausgehenden Kanten, egal ob sie an einem Chunk oder der Note hängen.
out_filter = models.Filter(must=[
models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids)),
models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))
])
res_out, _ = self.client.scroll(self.edges_col, scroll_filter=out_filter, limit=2000, with_payload=True)
results.extend(res_out)
# 2. INCOMING EDGES (Ziel = Chunk ID, Note ID oder Titel)
# WICHTIG: target_id enthält nur den Titel, target_section ist separat
# Chunk IDs der aktuellen Notes holen
c_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids))])
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=c_filter, limit=1000, with_payload=False)
chunk_ids = [c.id for c in chunks]
shoulds = []
# Case A: Edge zeigt auf einen unserer Chunks
if chunk_ids:
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=chunk_ids)))
# Case B: Edge zeigt direkt auf unsere Note ID
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=note_ids)))
# Case C: Edge zeigt auf unseren Titel
# WICHTIG: target_id enthält nur den Titel (z.B. "Meine Prinzipien 2025")
# target_section enthält die Abschnitt-Information (z.B. "P3 Disziplin"), wenn gesetzt
# Sammle alle relevanten Titel (inkl. Aliase)
titles_to_search = []
if note_title:
titles_to_search.append(note_title)
# Lade auch Titel aus den Notes selbst (falls note_title nicht übergeben wurde)
for nid in note_ids:
note = self._fetch_note_cached(nid)
if note:
note_title_from_db = note.get("title")
if note_title_from_db and note_title_from_db not in titles_to_search:
titles_to_search.append(note_title_from_db)
# Aliase hinzufügen
aliases = note.get("aliases", [])
if isinstance(aliases, str):
aliases = [aliases]
for alias in aliases:
if alias and alias not in titles_to_search:
titles_to_search.append(alias)
# Für jeden Titel: Suche nach exaktem Match
# target_id enthält nur den Titel, daher reicht MatchValue
for title in titles_to_search:
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchValue(value=title)))
if shoulds:
in_filter = models.Filter(
must=[models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))],
should=shoulds
)
res_in, _ = self.client.scroll(self.edges_col, scroll_filter=in_filter, limit=2000, with_payload=True)
results.extend(res_in)
return results
def _find_connected_edges_batch(self, note_ids):
"""
Wrapper für Level 2 Suche.
Lädt Titel der ersten Note für Titel-basierte Suche.
"""
if not note_ids:
return []
first_note = self._fetch_note_cached(note_ids[0])
note_title = first_note.get("title") if first_note else None
return self._find_connected_edges(note_ids, note_title=note_title)
def _process_edge(self, record, nodes_dict, unique_edges, current_depth):
"""
Verarbeitet eine rohe Edge, löst IDs auf und fügt sie den Dictionaries hinzu.
WICHTIG: Beide Richtungen werden unterstützt:
- Ausgehende Kanten: source_id gehört zu unserer Note (via note_id Owner)
- Eingehende Kanten: target_id zeigt auf unsere Note (via target_id Match)
"""
if not record or not record.payload:
return None, None
payload = record.payload
src_ref = payload.get("source_id")
tgt_ref = payload.get("target_id")
kind = payload.get("kind")
provenance = payload.get("provenance", "explicit")
# Prüfe, ob beide Referenzen vorhanden sind
if not src_ref or not tgt_ref:
return None, None
# IDs zu Notes auflösen
# WICHTIG: source_id kann Chunk-ID (note_id#c01), Note-ID oder Titel sein
# WICHTIG: target_id kann Chunk-ID, Note-ID oder Titel sein (ohne #Abschnitt)
src_note = self._resolve_note_from_ref(src_ref)
tgt_note = self._resolve_note_from_ref(tgt_ref)
if src_note and tgt_note:
src_id = src_note.get('note_id')
tgt_id = tgt_note.get('note_id')
# Prüfe, ob beide IDs vorhanden sind
if not src_id or not tgt_id:
return None, None
if src_id != tgt_id:
# Nodes hinzufügen
self._add_node_to_dict(nodes_dict, src_note, level=current_depth)
self._add_node_to_dict(nodes_dict, tgt_note, level=current_depth)
# Kante hinzufügen (mit Deduplizierung)
key = (src_id, tgt_id)
existing = unique_edges.get(key)
should_update = True
# Bevorzuge explizite Kanten vor Smart Kanten
is_current_explicit = (provenance in ["explicit", "rule"])
if existing:
is_existing_explicit = (existing.get('provenance', '') in ["explicit", "rule"])
if is_existing_explicit and not is_current_explicit:
should_update = False
if should_update:
unique_edges[key] = {"source": src_id, "target": tgt_id, "kind": kind, "provenance": provenance}
return src_id, tgt_id
return None, None
def _fetch_note_cached(self, note_id):
if note_id in self._note_cache: return self._note_cache[note_id]
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]),
limit=1, with_payload=True
)
if res:
self._note_cache[note_id] = res[0].payload
return res[0].payload
return None
def _resolve_note_from_ref(self, ref_str):
"""
Löst eine Referenz zu einer Note Payload auf.
WICHTIG: Wenn ref_str ein Titel#Abschnitt Format hat, wird nur der Titel-Teil verwendet.
Unterstützt:
- Note-ID: "20250101-meine-note"
- Chunk-ID: "20250101-meine-note#c01"
- Titel: "Meine Prinzipien 2025"
- Titel#Abschnitt: "Meine Prinzipien 2025#P3 Disziplin" (trennt Abschnitt ab, sucht nur nach Titel)
"""
if not ref_str:
return None
# Fall A: Enthält # (kann Chunk-ID oder Titel#Abschnitt sein)
if "#" in ref_str:
try:
# Versuch 1: Chunk ID direkt (Format: note_id#c01)
res = self.client.retrieve(self.chunks_col, ids=[ref_str], with_payload=True)
if res and res[0].payload:
note_id = res[0].payload.get("note_id")
if note_id:
return self._fetch_note_cached(note_id)
except:
pass
# Versuch 2: NoteID#Section (Hash abtrennen und als Note-ID versuchen)
# z.B. "20250101-meine-note#Abschnitt" -> "20250101-meine-note"
possible_note_id = ref_str.split("#")[0].strip()
note = self._fetch_note_cached(possible_note_id)
if note:
return note
# Versuch 3: Titel#Abschnitt (Hash abtrennen und als Titel suchen)
# z.B. "Meine Prinzipien 2025#P3 Disziplin" -> "Meine Prinzipien 2025"
# WICHTIG: target_id enthält nur den Titel, daher suchen wir nur nach dem Titel-Teil
possible_title = ref_str.split("#")[0].strip()
if possible_title:
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[
models.FieldCondition(key="title", match=models.MatchValue(value=possible_title))
]),
limit=1, with_payload=True
)
if res and res[0].payload:
payload = res[0].payload
self._note_cache[payload['note_id']] = payload
return payload
# Fallback: Text-Suche für Fuzzy-Matching
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[
models.FieldCondition(key="title", match=models.MatchText(text=possible_title))
]),
limit=10, with_payload=True
)
if res:
# Nimm das erste Ergebnis, das exakt oder beginnend mit possible_title übereinstimmt
for r in res:
if r.payload:
note_title = r.payload.get("title", "")
if note_title == possible_title or note_title.startswith(possible_title):
payload = r.payload
self._note_cache[payload['note_id']] = payload
return payload
# Fall B: Note ID direkt
note = self._fetch_note_cached(ref_str)
if note:
return note
# Fall C: Titel (exakte Übereinstimmung)
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[
models.FieldCondition(key="title", match=models.MatchValue(value=ref_str))
]),
limit=1, with_payload=True
)
if res and res[0].payload:
payload = res[0].payload
self._note_cache[payload['note_id']] = payload
return payload
# Fall D: Titel (Text-Suche für Fuzzy-Matching)
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[
models.FieldCondition(key="title", match=models.MatchText(text=ref_str))
]),
limit=1, with_payload=True
)
if res and res[0].payload:
payload = res[0].payload
self._note_cache[payload['note_id']] = payload
return payload
return None
def _add_node_to_dict(self, node_dict, note_payload, level=1):
nid = note_payload.get("note_id")
if not nid or nid in node_dict: return
ntype = note_payload.get("type", "default")
color = GRAPH_COLORS.get(ntype, GRAPH_COLORS["default"])
# Basis-Tooltip (wird später erweitert)
tooltip = f"Titel: {note_payload.get('title')}\nTyp: {ntype}"
if level == 0: size = 45
elif level == 1: size = 25
else: size = 15
node_dict[nid] = Node(
id=nid,
label=note_payload.get('title', nid),
size=size,
color=color,
shape="dot" if level > 0 else "diamond",
title=tooltip,
font={'color': 'black', 'face': 'arial', 'size': 14 if level < 2 else 0}
)