ui_fraph.old Version

This commit is contained in:
Lars 2025-12-29 07:51:07 +01:00
parent fdf99b2bb0
commit ea9a54421a

View File

@ -1,30 +1,25 @@
""" """
FILE: app/frontend/ui_graph_service.py FILE: app/frontend/ui_graph_service.py
DESCRIPTION: Data Layer für den Graphen. Greift direkt auf Qdrant zu (Performance), um Knoten/Kanten zu laden und Texte zu rekonstruieren ("Stitching"). DESCRIPTION: Data Layer für den Graphen. Greift direkt auf Qdrant zu (Performance), um Knoten/Kanten zu laden und Texte zu rekonstruieren ("Stitching").
VERSION: 2.6.1 (Fix: Anchor-Link & Fragment Resolution) VERSION: 2.6.0
STATUS: Active STATUS: Active
DEPENDENCIES: qdrant_client, streamlit_agraph, ui_config, re DEPENDENCIES: qdrant_client, streamlit_agraph, ui_config, re
LAST_ANALYSIS: 2025-12-28 LAST_ANALYSIS: 2025-12-15
""" """
import re import re
from qdrant_client import QdrantClient, models from qdrant_client import QdrantClient, models
from streamlit_agraph import Node, Edge from streamlit_agraph import Node, Edge
from ui_config import COLLECTION_PREFIX, GRAPH_COLORS, get_edge_color, SYSTEM_EDGES from ui_config import GRAPH_COLORS, get_edge_color, SYSTEM_EDGES
class GraphExplorerService: class GraphExplorerService:
def __init__(self, url, api_key=None, prefix=None):
    """
    Initialise the graph data service.

    Uses COLLECTION_PREFIX from ui_config when no explicit prefix is
    supplied, so all frontends agree on the same collection names.

    Args:
        url: Qdrant endpoint URL.
        api_key: Optional API key for the Qdrant instance.
        prefix: Optional collection-name prefix; defaults to COLLECTION_PREFIX.
    """
    self.client = QdrantClient(url=url, api_key=api_key)
    self.prefix = prefix if prefix else COLLECTION_PREFIX
    # Derived collection names: notes = metadata, chunks = text segments,
    # edges = graph links.
    self.notes_col = f"{self.prefix}_notes"
    self.chunks_col = f"{self.prefix}_chunks"
    self.edges_col = f"{self.prefix}_edges"
    # Per-instance caches to avoid repeated Qdrant round-trips.
    self._note_cache = {}
    self._ref_resolution_cache = {}
def get_note_with_full_content(self, note_id): def get_note_with_full_content(self, note_id):
""" """
@ -38,7 +33,8 @@ class GraphExplorerService:
# 2. Volltext aus Chunks bauen # 2. Volltext aus Chunks bauen
full_text = self._fetch_full_text_stitched(note_id) full_text = self._fetch_full_text_stitched(note_id)
# 3. Ergebnis kombinieren (Kopie zurückgeben) # 3. Ergebnis kombinieren (Wir überschreiben das 'fulltext' Feld mit dem frischen Stitching)
# Wir geben eine Kopie zurück, um den Cache nicht zu verfälschen
complete_note = meta.copy() complete_note = meta.copy()
if full_text: if full_text:
complete_note['fulltext'] = full_text complete_note['fulltext'] = full_text
@ -61,7 +57,7 @@ class GraphExplorerService:
# Initialset für Suche # Initialset für Suche
level_1_ids = {center_note_id} level_1_ids = {center_note_id}
# Suche Kanten für Center (L1) inkl. Titel für Anchor-Suche # Suche Kanten für Center (L1)
l1_edges = self._find_connected_edges([center_note_id], center_note.get("title")) l1_edges = self._find_connected_edges([center_note_id], center_note.get("title"))
for edge_data in l1_edges: for edge_data in l1_edges:
@ -84,6 +80,7 @@ class GraphExplorerService:
if center_note_id in nodes_dict: if center_note_id in nodes_dict:
orig_title = nodes_dict[center_note_id].title orig_title = nodes_dict[center_note_id].title
clean_full = self._clean_markdown(center_text[:2000]) clean_full = self._clean_markdown(center_text[:2000])
# Wir packen den Text in den Tooltip (title attribute)
nodes_dict[center_note_id].title = f"{orig_title}\n\n📄 INHALT:\n{clean_full}..." nodes_dict[center_note_id].title = f"{orig_title}\n\n📄 INHALT:\n{clean_full}..."
# B. Previews für alle Nachbarn holen (Batch) # B. Previews für alle Nachbarn holen (Batch)
@ -103,6 +100,8 @@ class GraphExplorerService:
prov = data['provenance'] prov = data['provenance']
color = get_edge_color(kind) color = get_edge_color(kind)
is_smart = (prov != "explicit" and prov != "rule") is_smart = (prov != "explicit" and prov != "rule")
# Label Logik
label_text = kind if show_labels else " " label_text = kind if show_labels else " "
final_edges.append(Edge( final_edges.append(Edge(
@ -113,11 +112,15 @@ class GraphExplorerService:
return list(nodes_dict.values()), final_edges return list(nodes_dict.values()), final_edges
def _clean_markdown(self, text): def _clean_markdown(self, text):
"""Entfernt Markdown-Sonderzeichen für saubere Tooltips.""" """Entfernt Markdown-Sonderzeichen für saubere Tooltips im Browser."""
if not text: return "" if not text: return ""
# Entferne Header Marker (## )
text = re.sub(r'#+\s', '', text) text = re.sub(r'#+\s', '', text)
# Entferne Bold/Italic (** oder *)
text = re.sub(r'\*\*|__|\*|_', '', text) text = re.sub(r'\*\*|__|\*|_', '', text)
# Entferne Links [Text](Url) -> Text
text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text) text = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', text)
# Entferne Wikilinks [[Link]] -> Link
text = re.sub(r'\[\[([^\]]+)\]\]', r'\1', text) text = re.sub(r'\[\[([^\]]+)\]\]', r'\1', text)
return text return text
@ -127,252 +130,186 @@ class GraphExplorerService:
scroll_filter = models.Filter( scroll_filter = models.Filter(
must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))] must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]
) )
# Limit hoch genug setzen
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=100, with_payload=True) chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=100, with_payload=True)
# Sortieren nach 'ord' (Reihenfolge im Dokument)
chunks.sort(key=lambda x: x.payload.get('ord', 999)) chunks.sort(key=lambda x: x.payload.get('ord', 999))
full_text = [c.payload.get('text', '') for c in chunks if c.payload.get('text')]
full_text = []
for c in chunks:
# 'text' ist der reine Inhalt ohne Overlap
txt = c.payload.get('text', '')
if txt: full_text.append(txt)
return "\n\n".join(full_text) return "\n\n".join(full_text)
except: except:
return "Fehler beim Laden des Volltexts." return "Fehler beim Laden des Volltexts."
def _fetch_previews_for_nodes(self, node_ids): def _fetch_previews_for_nodes(self, node_ids):
""" """Holt Batch-weise den ersten Chunk für eine Liste von Nodes."""
Holt Batch-weise den ersten relevanten Textabschnitt für eine Liste von Nodes. if not node_ids: return {}
Optimiert die Ladezeit durch Reduzierung der API-Calls.
"""
if not node_ids:
return {}
previews = {} previews = {}
try: try:
scroll_filter = models.Filter( scroll_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=node_ids))])
must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=node_ids))] # Limit = Anzahl Nodes * 3 (Puffer)
)
# Genügend Chunks laden, um für jede ID eine Vorschau zu finden
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=len(node_ids)*3, with_payload=True) chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=scroll_filter, limit=len(node_ids)*3, with_payload=True)
for c in chunks: for c in chunks:
nid = c.payload.get("note_id") nid = c.payload.get("note_id")
# Wir nehmen den ersten gefundenen Chunk # Nur den ersten gefundenen Chunk pro Note nehmen
if nid and nid not in previews: if nid and nid not in previews:
previews[nid] = c.payload.get("window") or c.payload.get("text") or "" previews[nid] = c.payload.get("window") or c.payload.get("text") or ""
except Exception: except: pass
pass
return previews return previews
def _find_connected_edges(self, note_ids, note_title=None): def _find_connected_edges(self, note_ids, note_title=None):
""" """Findet eingehende und ausgehende Kanten."""
Findet ein- und ausgehende Kanten für eine Liste von IDs.
Implementiert den Fix für Anker-Links [[Titel#Abschnitt]] durch Präfix-Suche in der target_id.
"""
results = [] results = []
if not note_ids:
return results
# 1. AUSGEHENDE KANTEN (Outgoing) # 1. OUTGOING EDGES (Der "Owner"-Fix)
# Suche über 'note_id' als Besitzer der Kante. # Wir suchen Kanten, die im Feld 'note_id' (Owner) eine unserer Notizen haben.
out_filter = models.Filter(must=[ # Das findet ALLE ausgehenden Kanten, egal ob sie an einem Chunk oder der Note hängen.
models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids)), if note_ids:
models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES})) out_filter = models.Filter(must=[
]) models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids)),
res_out, _ = self.client.scroll(self.edges_col, scroll_filter=out_filter, limit=2000, with_payload=True) models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))
results.extend(res_out) ])
# Limit hoch, um alles zu finden
res_out, _ = self.client.scroll(self.edges_col, scroll_filter=out_filter, limit=500, with_payload=True)
results.extend(res_out)
# 2. EINGEHENDE KANTEN (Incoming) # 2. INCOMING EDGES (Ziel = Chunk ID oder Titel oder Note ID)
# Suche über target_id (Ziel der Kante). # Hier müssen wir Chunks auflösen, um Treffer auf Chunks zu finden.
# Sammele alle Chunk-IDs für exakte Treffer auf Segment-Ebene # Chunk IDs der aktuellen Notes holen
c_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids))]) chunk_ids = []
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=c_filter, limit=1000, with_payload=False) if note_ids:
chunk_ids = [c.id for c in chunks] c_filter = models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchAny(any=note_ids))])
chunks, _ = self.client.scroll(self.chunks_col, scroll_filter=c_filter, limit=300)
chunk_ids = [c.id for c in chunks]
should_conditions = [] shoulds = []
# Case A: Edge zeigt auf einen unserer Chunks
if chunk_ids: if chunk_ids:
should_conditions.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=chunk_ids))) shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=chunk_ids)))
should_conditions.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=note_ids)))
# TITEL-BASIERTE SUCHE (Inkl. Anker-Fix)
titles_to_check = []
if note_title:
titles_to_check.append(note_title)
# Aliase laden für robuste Verlinkung
for nid in note_ids:
note = self._fetch_note_cached(nid)
if note:
aliases = note.get("aliases", [])
if isinstance(aliases, str): aliases = [aliases]
titles_to_check.extend([a for a in aliases if a not in titles_to_check])
# Exakte Titel-Matches hinzufügen
for t in titles_to_check:
should_conditions.append(models.FieldCondition(key="target_id", match=models.MatchValue(value=t)))
if should_conditions: # Case B: Edge zeigt direkt auf unsere Note ID
if note_ids:
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchAny(any=note_ids)))
# Case C: Edge zeigt auf unseren Titel (Wikilinks)
if note_title:
shoulds.append(models.FieldCondition(key="target_id", match=models.MatchValue(value=note_title)))
if shoulds:
in_filter = models.Filter( in_filter = models.Filter(
must=[models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))], must=[models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))],
should=should_conditions should=shoulds
) )
res_in, _ = self.client.scroll(self.edges_col, scroll_filter=in_filter, limit=2000, with_payload=True) res_in, _ = self.client.scroll(self.edges_col, scroll_filter=in_filter, limit=500, with_payload=True)
results.extend(res_in) results.extend(res_in)
# FIX FÜR [[Titel#Abschnitt]]: Suche nach Fragmenten
if titles_to_check:
for t in titles_to_check:
anchor_filter = models.Filter(must=[
models.FieldCondition(key="target_id", match=models.MatchText(text=t)),
models.FieldCondition(key="kind", match=models.MatchExcept(**{"except": SYSTEM_EDGES}))
])
res_anchor, _ = self.client.scroll(self.edges_col, scroll_filter=anchor_filter, limit=1000, with_payload=True)
existing_ids = {r.id for r in results}
for edge in res_anchor:
tgt = edge.payload.get("target_id", "")
# Client-seitige Filterung: Nur Kanten nehmen, die mit Titel# beginnen
if edge.id not in existing_ids and (tgt == t or tgt.startswith(f"{t}#")):
results.append(edge)
return results return results
def _find_connected_edges_batch(self, note_ids):
    """
    Wrapper for edge search on deeper graph levels (level >= 2).

    Resolves the first note's title so that title-based (wikilink)
    targets are also found for batch lookups.

    Args:
        note_ids: List of note ids for this level.

    Returns:
        List of raw edge records, as from _find_connected_edges.
    """
    first_note = self._fetch_note_cached(note_ids[0]) if note_ids else None
    title = first_note.get("title") if first_note else None
    return self._find_connected_edges(note_ids, note_title=title)
def _process_edge(self, record, nodes_dict, unique_edges, current_depth): def _process_edge(self, record, nodes_dict, unique_edges, current_depth):
""" """Verarbeitet eine rohe Edge, löst IDs auf und fügt sie den Dictionaries hinzu."""
Verarbeitet eine rohe Kante, löst Quell- und Ziel-Referenzen auf
und fügt sie den Dictionaries für den Graphen hinzu.
"""
if not record or not record.payload:
return None, None
payload = record.payload payload = record.payload
src_ref = payload.get("source_id") src_ref = payload.get("source_id")
tgt_ref = payload.get("target_id") tgt_ref = payload.get("target_id")
kind = payload.get("kind") kind = payload.get("kind")
provenance = payload.get("provenance", "explicit") provenance = payload.get("provenance", "explicit")
if not src_ref or not tgt_ref: # IDs zu Notes auflösen
return None, None
# IDs zu Notes auflösen (Hier greift der Fragment-Fix)
src_note = self._resolve_note_from_ref(src_ref) src_note = self._resolve_note_from_ref(src_ref)
tgt_note = self._resolve_note_from_ref(tgt_ref) tgt_note = self._resolve_note_from_ref(tgt_ref)
if src_note and tgt_note: if src_note and tgt_note:
src_id = src_note.get('note_id') src_id = src_note['note_id']
tgt_id = tgt_note.get('note_id') tgt_id = tgt_note['note_id']
if src_id and tgt_id and src_id != tgt_id: if src_id != tgt_id:
# Knoten zum Set hinzufügen # Nodes hinzufügen
self._add_node_to_dict(nodes_dict, src_note, level=current_depth) self._add_node_to_dict(nodes_dict, src_note, level=current_depth)
self._add_node_to_dict(nodes_dict, tgt_note, level=current_depth) self._add_node_to_dict(nodes_dict, tgt_note, level=current_depth)
# Kante registrieren (Deduplizierung) # Kante hinzufügen (mit Deduplizierung)
key = (src_id, tgt_id) key = (src_id, tgt_id)
existing = unique_edges.get(key) existing = unique_edges.get(key)
is_current_explicit = (provenance in ["explicit", "rule"])
should_update = True should_update = True
# Bevorzuge explizite Kanten vor Smart Kanten
is_current_explicit = (provenance in ["explicit", "rule"])
if existing: if existing:
is_existing_explicit = (existing.get('provenance', '') in ["explicit", "rule"]) is_existing_explicit = (existing['provenance'] in ["explicit", "rule"])
if is_existing_explicit and not is_current_explicit: if is_existing_explicit and not is_current_explicit:
should_update = False should_update = False
if should_update: if should_update:
unique_edges[key] = { unique_edges[key] = {"source": src_id, "target": tgt_id, "kind": kind, "provenance": provenance}
"source": src_id,
"target": tgt_id,
"kind": kind,
"provenance": provenance
}
return src_id, tgt_id return src_id, tgt_id
return None, None return None, None
def _fetch_note_cached(self, note_id): def _fetch_note_cached(self, note_id):
"""Lädt eine Note aus Qdrant mit Session-Caching.""" if note_id in self._note_cache: return self._note_cache[note_id]
if not note_id: res, _ = self.client.scroll(
return None collection_name=self.notes_col,
if note_id in self._note_cache: scroll_filter=models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]),
return self._note_cache[note_id] limit=1, with_payload=True
)
try: if res:
res, _ = self.client.scroll( self._note_cache[note_id] = res[0].payload
collection_name=self.notes_col, return res[0].payload
scroll_filter=models.Filter(must=[
models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))
]),
limit=1, with_payload=True
)
if res and res[0].payload:
payload = res[0].payload
self._note_cache[note_id] = payload
return payload
except Exception:
pass
return None return None
def _resolve_note_from_ref(self, ref_str): def _resolve_note_from_ref(self, ref_str):
""" """Löst eine ID (Chunk, Note oder Titel) zu einer Note Payload auf."""
Löst eine Referenz (ID, Chunk-ID oder Wikilink mit Anker) auf eine Note auf. if not ref_str: return None
Bereinigt Anker (#) vor der Suche.
"""
if not ref_str:
return None
if ref_str in self._ref_resolution_cache: # Fall A: Chunk ID (enthält #)
return self._ref_resolution_cache[ref_str]
# Fragment-Behandlung: Trenne Anker ab
base_ref = ref_str.split("#")[0].strip()
# 1. Versuch: Direkte Note-ID Suche
note = self._fetch_note_cached(base_ref)
if note:
self._ref_resolution_cache[ref_str] = note
return note
# 2. Versuch: Titel-Suche (Keyword-Match)
try:
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[
models.FieldCondition(key="title", match=models.MatchValue(value=base_ref))
]),
limit=1, with_payload=True
)
if res and res[0].payload:
payload = res[0].payload
self._ref_resolution_cache[ref_str] = payload
return payload
except Exception:
pass
# 3. Versuch: Auflösung über Chunks
if "#" in ref_str: if "#" in ref_str:
try: try:
res_chunk = self.client.retrieve(self.chunks_col, ids=[ref_str], with_payload=True) # Versuch 1: Chunk ID direkt
if res_chunk and res_chunk[0].payload: res = self.client.retrieve(self.chunks_col, ids=[ref_str], with_payload=True)
note_id = res_chunk[0].payload.get("note_id") if res: return self._fetch_note_cached(res[0].payload.get("note_id"))
note = self._fetch_note_cached(note_id) except: pass
if note:
self._ref_resolution_cache[ref_str] = note # Versuch 2: NoteID#Section (Hash abtrennen)
return note possible_note_id = ref_str.split("#")[0]
except Exception: if self._fetch_note_cached(possible_note_id): return self._fetch_note_cached(possible_note_id)
pass
# Fall B: Note ID direkt
if self._fetch_note_cached(ref_str): return self._fetch_note_cached(ref_str)
# Fall C: Titel
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchValue(value=ref_str))]),
limit=1, with_payload=True
)
if res:
self._note_cache[res[0].payload['note_id']] = res[0].payload
return res[0].payload
return None return None
def _add_node_to_dict(self, node_dict, note_payload, level=1): def _add_node_to_dict(self, node_dict, note_payload, level=1):
"""Erstellt ein Node-Objekt für streamlit-agraph mit Styling."""
nid = note_payload.get("note_id") nid = note_payload.get("note_id")
if not nid or nid in node_dict: if nid in node_dict: return
return
ntype = note_payload.get("type", "default") ntype = note_payload.get("type", "default")
color = GRAPH_COLORS.get(ntype, GRAPH_COLORS.get("default", "#8395a7")) color = GRAPH_COLORS.get(ntype, GRAPH_COLORS["default"])
# Basis-Tooltip (wird später erweitert)
tooltip = f"Titel: {note_payload.get('title')}\nTyp: {ntype}" tooltip = f"Titel: {note_payload.get('title')}\nTyp: {ntype}"
size = 45 if level == 0 else (25 if level == 1 else 15) if level == 0: size = 45
elif level == 1: size = 25
else: size = 15
node_dict[nid] = Node( node_dict[nid] = Node(
id=nid, id=nid,
label=note_payload.get('title', nid), label=note_payload.get('title', nid),