bidirektionale Kanten

This commit is contained in:
Lars 2025-12-14 08:07:53 +01:00
parent 028ad7e941
commit dc16bbf8a4

View File

@ -76,22 +76,30 @@ if "messages" not in st.session_state: st.session_state.messages = []
if "user_id" not in st.session_state: st.session_state.user_id = str(uuid.uuid4()) if "user_id" not in st.session_state: st.session_state.user_id = str(uuid.uuid4())
# --- GRAPH STYLING CONFIG (WP-19) --- # --- GRAPH STYLING CONFIG (WP-19) ---
# Colors based on types.yaml and standard conventions
GRAPH_COLORS = { GRAPH_COLORS = {
"project": "#ff9f43", # Orange "project": "#ff9f43", # Orange
"concept": "#54a0ff", # Blau "concept": "#54a0ff", # Blue
"decision": "#5f27cd", # Lila "decision": "#5f27cd", # Purple
"risk": "#ff6b6b", # Rot "risk": "#ff6b6b", # Red
"person": "#1dd1a1", # Grün "person": "#1dd1a1", # Green
"experience": "#feca57",# Gelb "experience": "#feca57",# Yellow
"default": "#8395a7" # Grau "value": "#00d2d3", # Cyan
"goal": "#ff9ff3", # Pink
"default": "#8395a7" # Grey
} }
# Colors based on edge 'kind'
EDGE_COLORS = { EDGE_COLORS = {
"depends_on": "#ff6b6b", # Rot (Blocker) "depends_on": "#ff6b6b", # Red (Blocker)
"blocks": "#ee5253", # Dunkelrot "blocks": "#ee5253", # Dark Red
"related_to": "#c8d6e5", # Hellgrau "caused_by": "#ff9ff3", # Pink
"next": "#54a0ff", # Blau "related_to": "#c8d6e5", # Light Grey
"derived_from": "#ff9ff3"# Pink "similar_to": "#c8d6e5", # Light Grey
"next": "#54a0ff", # Blue
"derived_from": "#ff9ff3",# Pink
"references": "#bdc3c7", # Grey
"belongs_to": "#2e86de" # Dark Blue
} }
# --- HELPER FUNCTIONS --- # --- HELPER FUNCTIONS ---
@ -228,7 +236,7 @@ def load_history_from_logs(limit=10):
except: pass except: pass
return queries return queries
# --- WP-19 GRAPH SERVICE --- # --- WP-19 GRAPH SERVICE (Advanced) ---
class GraphExplorerService: class GraphExplorerService:
def __init__(self, url, api_key=None, prefix="mindnet"): def __init__(self, url, api_key=None, prefix="mindnet"):
@ -237,106 +245,199 @@ class GraphExplorerService:
self.notes_col = f"{prefix}_notes" self.notes_col = f"{prefix}_notes"
self.chunks_col = f"{prefix}_chunks" self.chunks_col = f"{prefix}_chunks"
self.edges_col = f"{prefix}_edges" self.edges_col = f"{prefix}_edges"
self._note_cache = {} # Simple in-memory cache for the session
def get_ego_graph(self, center_note_id: str): def get_ego_graph(self, center_note_id: str):
"""Erzeugt einen Ego-Graphen (Node + Nachbarn) für die Visualisierung.""" """
nodes = {} # id -> Node Object Bidirektionaler Ego-Graph:
edges_list = [] # List of Edge Objects 1. Lädt Center Node.
2. Findet OUTGOING Edges (Source = Chunk von Center).
3. Findet INCOMING Edges (Target = Chunk von Center ODER Target = Titel von Center).
4. Dedupliziert auf Notiz-Ebene.
"""
nodes_dict = {} # note_id -> Node Object
unique_edges = {} # (source_note_id, target_note_id) -> Edge Data
# 1. Zentrale Note laden # 1. Zentrale Note laden
center_note = self._fetch_note(center_note_id) center_note = self._fetch_note_cached(center_note_id)
if not center_note: return [], [] if not center_note: return [], []
self._add_node(nodes, center_note, is_center=True) self._add_node_to_dict(nodes_dict, center_note, is_center=True)
# 2. Chunks der Note finden (Source Chunks) center_title = center_note.get("title")
# 2. Chunks der Note finden (für Edge-Suche)
scroll_filter = models.Filter( scroll_filter = models.Filter(
must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=center_note_id))] must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=center_note_id))]
) )
chunks, _ = self.client.scroll( chunks, _ = self.client.scroll(
collection_name=self.chunks_col, scroll_filter=scroll_filter, limit=100, with_payload=True collection_name=self.chunks_col, scroll_filter=scroll_filter, limit=100, with_payload=True
) )
chunk_ids = [c.id for c in chunks] center_chunk_ids = [c.id for c in chunks]
# 3. Kanten finden raw_edges = []
if chunk_ids:
edge_filter = models.Filter( # 3. OUTGOING EDGES Suche
must=[models.FieldCondition(key="source_id", match=models.MatchAny(any=chunk_ids))] if center_chunk_ids:
out_filter = models.Filter(
must=[models.FieldCondition(key="source_id", match=models.MatchAny(any=center_chunk_ids))]
) )
raw_edges, _ = self.client.scroll( res_out, _ = self.client.scroll(
collection_name=self.edges_col, scroll_filter=edge_filter, limit=100, with_payload=True collection_name=self.edges_col, scroll_filter=out_filter, limit=100, with_payload=True
) )
raw_edges.extend(res_out)
# 4. Targets auflösen # 4. INCOMING EDGES Suche
for re in raw_edges: # Case A: Target ist einer unserer Chunks
payload = re.payload if center_chunk_ids:
target_chunk_id = payload.get("target_id") in_chunk_filter = models.Filter(
kind = payload.get("kind") must=[models.FieldCondition(key="target_id", match=models.MatchAny(any=center_chunk_ids))]
provenance = payload.get("provenance", "explicit") )
res_in_c, _ = self.client.scroll(
collection_name=self.edges_col, scroll_filter=in_chunk_filter, limit=100, with_payload=True
)
raw_edges.extend(res_in_c)
target_note = self._resolve_note_from_chunk(target_chunk_id) # Case B: Target ist unser Titel (Wikilinks)
if center_title:
in_title_filter = models.Filter(
must=[models.FieldCondition(key="target_id", match=models.MatchValue(value=center_title))]
)
res_in_t, _ = self.client.scroll(
collection_name=self.edges_col, scroll_filter=in_title_filter, limit=50, with_payload=True
)
raw_edges.extend(res_in_t)
if target_note and target_note['note_id'] != center_note_id: # 5. Kanten verarbeiten und auflösen
self._add_node(nodes, target_note) for record in raw_edges:
payload = record.payload
# Styling src_ref = payload.get("source_id")
color = EDGE_COLORS.get(kind, "#bdc3c7") tgt_ref = payload.get("target_id")
is_smart = provenance != "explicit" and provenance != "rule" kind = payload.get("kind", "related_to")
provenance = payload.get("provenance", "explicit")
label = f"{kind}" # Resolve Source Note
if is_smart: label += " 🤖" src_note = self._resolve_note_from_ref(src_ref)
# Resolve Target Note
tgt_note = self._resolve_note_from_ref(tgt_ref)
edges_list.append(Edge( if src_note and tgt_note:
source=center_note_id, src_id = src_note['note_id']
target=target_note['note_id'], tgt_id = tgt_note['note_id']
label=label,
color=color,
dashes=is_smart,
title=f"Provenance: {provenance}"
))
return list(nodes.values()), edges_list # Keine Self-Loops und valide Verbindung
if src_id != tgt_id:
# Nodes hinzufügen (falls noch nicht da)
self._add_node_to_dict(nodes_dict, src_note)
self._add_node_to_dict(nodes_dict, tgt_note)
# Deduplizierung: Wir behalten die "stärkste" Kante
# Wenn bereits eine explizite Kante existiert, überschreiben wir sie nicht mit einer AI-Kante
key = (src_id, tgt_id)
existing = unique_edges.get(key)
is_current_explicit = (provenance == "explicit" or provenance == "rule")
should_update = True
if existing:
is_existing_explicit = (existing['provenance'] == "explicit" or existing['provenance'] == "rule")
if is_existing_explicit and not is_current_explicit:
should_update = False
if should_update:
unique_edges[key] = {
"source": src_id,
"target": tgt_id,
"kind": kind,
"provenance": provenance
}
# 6. Agraph Edge Objekte erstellen
final_edges = []
for (src, tgt), data in unique_edges.items():
kind = data['kind']
prov = data['provenance']
color = EDGE_COLORS.get(kind, "#bdc3c7")
is_smart = (prov != "explicit" and prov != "rule")
label = f"{kind}"
# AI Edges gestrichelt
dashes = is_smart
final_edges.append(Edge(
source=src,
target=tgt,
label=label,
color=color,
dashes=dashes,
title=f"Provenance: {prov}, Type: {kind}"
))
return list(nodes_dict.values()), final_edges
def _fetch_note_cached(self, note_id):
if note_id in self._note_cache: return self._note_cache[note_id]
def _fetch_note(self, note_id):
res, _ = self.client.scroll( res, _ = self.client.scroll(
collection_name=self.notes_col, collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]), scroll_filter=models.Filter(must=[models.FieldCondition(key="note_id", match=models.MatchValue(value=note_id))]),
limit=1, with_payload=True limit=1, with_payload=True
) )
return res[0].payload if res else None if res:
self._note_cache[note_id] = res[0].payload
def _resolve_note_from_chunk(self, chunk_id_or_title): return res[0].payload
if "#" in chunk_id_or_title:
res = self.client.retrieve(collection_name=self.chunks_col, ids=[chunk_id_or_title], with_payload=True)
if res:
parent_id = res[0].payload.get("note_id")
return self._fetch_note(parent_id)
else:
# Versuch: Direkter Match auf Note Titel (für WikiLinks)
# In Production sollte das optimiert werden (Cache)
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchValue(value=chunk_id_or_title))]),
limit=1, with_payload=True
)
return res[0].payload if res else None
return None return None
def _add_node(self, node_dict, note_payload, is_center=False): def _resolve_note_from_ref(self, ref_str):
"""Löst eine ID (Chunk) oder einen String (Titel) zu einer Note Payload auf."""
if not ref_str: return None
# Fall 1: Chunk ID (enthält '#')
if "#" in ref_str:
# Wir könnten den Chunk holen, aber effizienter ist es, die note_id aus dem Chunk-String zu parsen,
# WENN das Format strikt 'note_id#cXX' ist. Um sicher zu gehen, fragen wir Qdrant.
try:
res = self.client.retrieve(collection_name=self.chunks_col, ids=[ref_str], with_payload=True)
if res:
parent_id = res[0].payload.get("note_id")
return self._fetch_note_cached(parent_id)
except: pass # Falls ID nicht existiert
# Fall 2: Vermutlich ein Titel (Wikilink) oder Note ID
# Versuch als Note ID
note_by_id = self._fetch_note_cached(ref_str)
if note_by_id: return note_by_id
# Versuch als Titel
res, _ = self.client.scroll(
collection_name=self.notes_col,
scroll_filter=models.Filter(must=[models.FieldCondition(key="title", match=models.MatchValue(value=ref_str))]),
limit=1, with_payload=True
)
if res:
payload = res[0].payload
self._note_cache[payload['note_id']] = payload
return payload
return None
def _add_node_to_dict(self, node_dict, note_payload, is_center=False):
nid = note_payload.get("note_id") nid = note_payload.get("note_id")
if nid in node_dict: return if nid in node_dict: return
ntype = note_payload.get("type", "default") ntype = note_payload.get("type", "default")
color = GRAPH_COLORS.get(ntype, GRAPH_COLORS["default"]) color = GRAPH_COLORS.get(ntype, GRAPH_COLORS["default"])
size = 35 if is_center else 20 size = 40 if is_center else 20
node_dict[nid] = Node( node_dict[nid] = Node(
id=nid, id=nid,
label=note_payload.get("title", nid), label=note_payload.get("title", nid),
size=size, size=size,
color=color, color=color,
shape="dot", shape="dot" if not is_center else "diamond",
title=f"Type: {ntype}\nTags: {note_payload.get('tags')}", title=f"Type: {ntype}\nTags: {note_payload.get('tags')}",
font={'color': 'black'} font={'color': 'black', 'face': 'arial'}
) )
# Init Graph Service # Init Graph Service
@ -626,6 +727,7 @@ def render_graph_explorer():
selected_note_id = None selected_note_id = None
if search_term: if search_term:
# Suche nach Titel für Autocomplete
hits, _ = graph_service.client.scroll( hits, _ = graph_service.client.scroll(
collection_name=f"{COLLECTION_PREFIX}_notes", collection_name=f"{COLLECTION_PREFIX}_notes",
scroll_filter=models.Filter( scroll_filter=models.Filter(
@ -645,7 +747,8 @@ def render_graph_explorer():
st.markdown(f"🔴 **Blocker** (Risk/Block)") st.markdown(f"🔴 **Blocker** (Risk/Block)")
st.markdown(f"🔵 **Konzept/Struktur**") st.markdown(f"🔵 **Konzept/Struktur**")
st.markdown(f"🟣 **Entscheidung**") st.markdown(f"🟣 **Entscheidung**")
st.markdown(f"🤖 _Gestrichelt = Smart Edge (KI)_") st.markdown(f"--- **Solid**: Explicit Link")
st.markdown(f"- - **Dashed**: Smart/AI Link")
with col_graph: with col_graph:
if selected_note_id: if selected_note_id:
@ -656,8 +759,8 @@ def render_graph_explorer():
st.error("Knoten konnte nicht geladen werden.") st.error("Knoten konnte nicht geladen werden.")
else: else:
config = Config( config = Config(
width=800, width=900,
height=600, height=700,
directed=True, directed=True,
physics=True, physics=True,
hierarchical=False, hierarchical=False,
@ -665,9 +768,12 @@ def render_graph_explorer():
highlightColor="#F7A7A6", highlightColor="#F7A7A6",
collapsible=False collapsible=False
) )
# Rendering the Graph
st.caption(f"Graph zeigt {len(nodes)} Knoten und {len(edges)} Kanten.")
return_value = agraph(nodes=nodes, edges=edges, config=config) return_value = agraph(nodes=nodes, edges=edges, config=config)
if return_value: if return_value:
st.info(f"Node geklickt: {return_value}") st.info(f"Auswahl: {return_value}")
else: else:
st.info("👈 Bitte wähle links eine Notiz aus, um den Graphen zu starten.") st.info("👈 Bitte wähle links eine Notiz aus, um den Graphen zu starten.")