mindnet/tests/debug_edge_search.py
Lars 53058d1504
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
bug fix
2025-12-28 11:49:41 +01:00

172 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Diagnose-Script: Prüft welche Kanten für eine Note gefunden werden.
Hilft beim Debugging der Graph-Anzeige.
"""
import os
import re
import sys
from pathlib import Path
# Put the project root (two directory levels above this file) on sys.path so
# that the `app` package can be imported when the script is run directly.
# The path is resolved first: on interpreters where a script's __file__ is
# relative, `Path("debug_edge_search.py").parent.parent` would collapse to "."
# and point at the current directory instead of the project root.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from app.core.database.qdrant import QdrantConfig, get_client, collection_names
from qdrant_client.http import models as rest
# Edge kinds that every query in this script filters out.
_EXCLUDED_EDGE_KINDS = ("prev", "next", "belongs_to")

# How many edges each report section lists before summarising the rest.
_PREVIEW_LIMIT = 10

# Patterns used by _normalize_title, compiled once because the helper runs
# for every edge that is loaded.
_PARENTHETICAL_RE = re.compile(r'\s*\([^)]*\)')
_DIGITS_RE = re.compile(r'\s*\d{4}[\s\-]*\d{0,4}')
_LEADING_MEIN_RE = re.compile(r'^(Mein|Meine)\s+', flags=re.IGNORECASE)
_WHITESPACE_RE = re.compile(r'\s+')


def _normalize_title(t):
    """Return a lower-cased, simplified form of title *t* for fuzzy comparison.

    Removes parenthesised parts, four-digit numbers (optionally followed by
    whitespace/hyphens and up to four more digits), a leading "Mein"/"Meine",
    and collapses runs of whitespace. Falsy input yields "".
    """
    if not t:
        return ""
    t = _PARENTHETICAL_RE.sub('', t)
    t = _DIGITS_RE.sub('', t)
    t = _LEADING_MEIN_RE.sub('', t)
    return _WHITESPACE_RE.sub(' ', t).strip().lower()


def _print_edge_preview(edges, labels=None):
    """Print the first _PREVIEW_LIMIT edges plus a note on how many are hidden.

    *labels*, when given, is a list parallel to *edges*; the label is printed
    in square brackets in front of the edge kind.
    """
    for pos, edge in enumerate(edges[:_PREVIEW_LIMIT], 1):
        payload = edge.payload
        tag = f"[{labels[pos - 1]}] " if labels else ""
        print(f" {pos}. {tag}{payload.get('kind')}: {payload.get('source_id')} -> {payload.get('target_id')}")
    hidden = len(edges) - _PREVIEW_LIMIT
    if hidden > 0:
        print(f" ... und {hidden} weitere\n")
    else:
        print()


def find_edges_for_note(note_id: str, prefix: str = "mindnet") -> None:
    """Print a diagnostic report of the edges stored for one note.

    The report is written to stdout and has five parts: the note itself,
    outgoing edges (edge ``note_id`` equals *note_id*), incoming edges whose
    ``target_id`` equals the note title, incoming edges found by client-side
    exact / ``Title#section`` / fuzzy title matching (the original comments
    say this mirrors "Case D" in ui_graph_service.py), and a sample of
    ``target_id`` values that contain ``#``.

    Args:
        note_id: Value of the ``note_id`` payload field of the note to inspect.
        prefix: Collection prefix; when non-empty it overrides the prefix
            read from the environment configuration.

    Returns:
        None. If no note with *note_id* exists, a message is printed and the
        function returns early.
    """
    cfg = QdrantConfig.from_env()
    if prefix:
        cfg.prefix = prefix
    client = get_client(cfg)
    _, _, edges_col = collection_names(cfg.prefix)
    notes_col = f"{cfg.prefix}_notes"

    # `except` is a Python keyword, so it has to be passed via dict unpacking.
    kind_condition = rest.FieldCondition(
        key="kind",
        match=rest.MatchExcept(**{"except": list(_EXCLUDED_EDGE_KINDS)}),
    )
    # Filter that keeps every edge except the excluded kinds.
    kind_only_filter = rest.Filter(must=[kind_condition])

    # 1. Load the note to obtain its title.
    note_res, _ = client.scroll(
        collection_name=notes_col,
        scroll_filter=rest.Filter(
            must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
        ),
        limit=1,
        with_payload=True,
    )
    if not note_res:
        print(f"❌ Note '{note_id}' nicht gefunden!")
        return
    note_title = note_res[0].payload.get("title", "")
    print(f"📄 Note: {note_id}")
    print(f" Titel: {note_title}\n")

    # 2. Outgoing edges (edge note_id == our note).
    print("🔍 AUSGEHENDE KANTEN (note_id = unsere Note):")
    out_filter = rest.Filter(must=[
        rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id)),
        kind_condition,
    ])
    out_edges, _ = client.scroll(edges_col, scroll_filter=out_filter, limit=2000, with_payload=True)
    print(f" Gefunden: {len(out_edges)} Kanten")
    _print_edge_preview(out_edges)

    # 3. Incoming edges - server-side search for the exact title.
    print("🔍 EINGEHENDE KANTEN (target_id = exakter Titel):")
    if note_title:
        in_filter_exact = rest.Filter(
            must=[kind_condition],
            should=[rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_title))],
        )
        in_edges_exact, _ = client.scroll(
            edges_col, scroll_filter=in_filter_exact, limit=2000, with_payload=True
        )
        print(f" Gefunden: {len(in_edges_exact)} Kanten (exakter Titel)")
        _print_edge_preview(in_edges_exact)

    # 4. Incoming edges - "Title#section" variants plus fuzzy matching,
    #    filtered on the client.
    print("🔍 EINGEHENDE KANTEN (target_id beginnt mit 'Titel#' + Fuzzy-Matching):")
    if note_title:
        # Only the first page (at most 10000 edges) is inspected.
        all_edges, _ = client.scroll(
            edges_col, scroll_filter=kind_only_filter, limit=10000, with_payload=True
        )
        print(f" Gesamt geladen: {len(all_edges)} Kanten aus der Datenbank")

        note_title_norm = _normalize_title(note_title)
        print(f" Normalisierter Note-Titel: '{note_title_norm}'")

        section_prefix = note_title + "#"
        # Fuzzy matching is only attempted for normalized titles longer than
        # five characters.
        fuzzy_enabled = len(note_title_norm) > 5

        matched_exact = []
        matched_fuzzy = []
        # Up to 10 (target_id, normalized) samples taken from the first 200
        # loaded edges that matched neither way.
        non_matched = []

        # Classify every edge exactly once instead of re-scanning the result
        # lists with `in` for each edge.
        for idx, edge in enumerate(all_edges):
            tgt_id = edge.payload.get("target_id", "")
            if not tgt_id:
                continue
            if tgt_id == note_title or tgt_id.startswith(section_prefix):
                matched_exact.append(edge)
                continue
            tgt_norm = _normalize_title(tgt_id.split("#")[0].strip())
            if fuzzy_enabled and tgt_norm and (
                tgt_norm.startswith(note_title_norm)
                or note_title_norm.startswith(tgt_norm)
            ):
                matched_fuzzy.append(edge)
            elif idx < 200 and len(non_matched) < 10:
                non_matched.append((tgt_id, tgt_norm))

        # Exact matches are listed first, then fuzzy ones.
        matched = matched_exact + matched_fuzzy
        labels = ["EXAKT"] * len(matched_exact) + ["FUZZY"] * len(matched_fuzzy)
        print(f" Gefunden: {len(matched_exact)} exakte Matches, {len(matched_fuzzy)} Fuzzy-Matches")
        print(f" Gesamt: {len(matched)} Kanten")
        _print_edge_preview(matched, labels)

        # Also show a few target_ids that did NOT match.
        print(" 🔍 DEBUG: Beispiel target_ids die NICHT matchen (erste 10):")
        for i, (tgt, tgt_norm) in enumerate(non_matched, 1):
            print(f" {i}. '{tgt}' (normalisiert: '{tgt_norm}')")
        print()

    # 5. Show sample target_ids from the database that contain '#'.
    print("📊 BEISPIEL target_ids aus der Datenbank (erste 20 mit #):")
    sample_edges, _ = client.scroll(
        edges_col, scroll_filter=kind_only_filter, limit=100, with_payload=True
    )
    hash_edges = [e for e in sample_edges if "#" in str(e.payload.get("target_id", ""))]
    for i, e in enumerate(hash_edges[:20], 1):
        pl = e.payload
        print(f" {i}. target_id: '{pl.get('target_id')}' (kind: {pl.get('kind')})")
if __name__ == "__main__":
    import argparse

    # Command-line entry point: --note-id is mandatory, --prefix falls back
    # to "mindnet"; both are handed to find_edges_for_note.
    cli = argparse.ArgumentParser()
    cli.add_argument("--note-id", help="Note-ID zum Testen", required=True)
    cli.add_argument("--prefix", help="Collection-Prefix", default="mindnet")
    options = cli.parse_args()
    find_edges_for_note(note_id=options.note_id, prefix=options.prefix)