mindnet/tests/debug_edge_search.py
Lars 459193e7b1
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
test ui
2025-12-28 11:38:51 +01:00

124 lines
5.0 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Diagnose-Script: Prüft welche Kanten für eine Note gefunden werden.
Hilft beim Debugging der Graph-Anzeige.
"""
import sys
import os
from pathlib import Path
# Projekt-Root zum sys.path hinzufügen
PROJECT_ROOT = Path(__file__).parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from app.core.database.qdrant import QdrantConfig, get_client, collection_names
from qdrant_client.http import models as rest
def find_edges_for_note(note_id: str, prefix: str = "mindnet"):
"""Findet alle Kanten für eine Note und zeigt sie an."""
cfg = QdrantConfig.from_env()
if prefix:
cfg.prefix = prefix
client = get_client(cfg)
_, _, edges_col = collection_names(cfg.prefix)
notes_col = f"{cfg.prefix}_notes"
# 1. Lade Note, um Titel zu bekommen
note_res, _ = client.scroll(
collection_name=notes_col,
scroll_filter=rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]),
limit=1, with_payload=True
)
if not note_res:
print(f"❌ Note '{note_id}' nicht gefunden!")
return
note = note_res[0].payload
note_title = note.get("title", "")
print(f"📄 Note: {note_id}")
print(f" Titel: {note_title}\n")
# 2. Ausgehende Kanten (note_id = unsere Note)
print("🔍 AUSGEHENDE KANTEN (note_id = unsere Note):")
out_filter = rest.Filter(must=[
rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id)),
rest.FieldCondition(key="kind", match=rest.MatchExcept(**{"except": ["prev", "next", "belongs_to"]}))
])
out_edges, _ = client.scroll(edges_col, scroll_filter=out_filter, limit=2000, with_payload=True)
print(f" Gefunden: {len(out_edges)} Kanten")
for i, e in enumerate(out_edges[:10], 1): # Zeige erste 10
pl = e.payload
print(f" {i}. {pl.get('kind')}: {pl.get('source_id')} -> {pl.get('target_id')}")
if len(out_edges) > 10:
print(f" ... und {len(out_edges) - 10} weitere\n")
else:
print()
# 3. Eingehende Kanten - exakte Titel-Suche
print("🔍 EINGEHENDE KANTEN (target_id = exakter Titel):")
if note_title:
in_filter_exact = rest.Filter(
must=[rest.FieldCondition(key="kind", match=rest.MatchExcept(**{"except": ["prev", "next", "belongs_to"]}))],
should=[rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_title))]
)
in_edges_exact, _ = client.scroll(edges_col, scroll_filter=in_filter_exact, limit=2000, with_payload=True)
print(f" Gefunden: {len(in_edges_exact)} Kanten (exakter Titel)")
for i, e in enumerate(in_edges_exact[:10], 1):
pl = e.payload
print(f" {i}. {pl.get('kind')}: {pl.get('source_id')} -> {pl.get('target_id')}")
if len(in_edges_exact) > 10:
print(f" ... und {len(in_edges_exact) - 10} weitere\n")
else:
print()
# 4. Eingehende Kanten - Titel#Abschnitt Varianten
print("🔍 EINGEHENDE KANTEN (target_id beginnt mit 'Titel#'):")
if note_title:
# Lade alle Kanten und filtere clientseitig
all_filter = rest.Filter(
must=[rest.FieldCondition(key="kind", match=rest.MatchExcept(**{"except": ["prev", "next", "belongs_to"]}))]
)
all_edges, _ = client.scroll(edges_col, scroll_filter=all_filter, limit=5000, with_payload=True)
# Clientseitige Filterung
matched = []
for e in all_edges:
tgt_id = e.payload.get("target_id", "")
if tgt_id and (tgt_id == note_title or tgt_id.startswith(note_title + "#")):
matched.append(e)
print(f" Gefunden: {len(matched)} Kanten (mit Titel#Abschnitt Varianten)")
for i, e in enumerate(matched[:10], 1):
pl = e.payload
print(f" {i}. {pl.get('kind')}: {pl.get('source_id')} -> {pl.get('target_id')}")
if len(matched) > 10:
print(f" ... und {len(matched) - 10} weitere\n")
else:
print()
# 5. Zeige Beispiel target_ids aus der Datenbank
print("📊 BEISPIEL target_ids aus der Datenbank (erste 20 mit #):")
sample_filter = rest.Filter(
must=[rest.FieldCondition(key="kind", match=rest.MatchExcept(**{"except": ["prev", "next", "belongs_to"]}))]
)
sample_edges, _ = client.scroll(edges_col, scroll_filter=sample_filter, limit=100, with_payload=True)
hash_edges = [e for e in sample_edges if "#" in str(e.payload.get("target_id", ""))]
for i, e in enumerate(hash_edges[:20], 1):
pl = e.payload
print(f" {i}. target_id: '{pl.get('target_id')}' (kind: {pl.get('kind')})")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--note-id", required=True, help="Note-ID zum Testen")
parser.add_argument("--prefix", default="mindnet", help="Collection-Prefix")
args = parser.parse_args()
find_edges_for_note(args.note_id, args.prefix)