tests/test_edges_smoke.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

This commit is contained in:
Lars 2025-11-17 16:04:57 +01:00
parent 3e08c8347e
commit 3476fe5fae

View File

@ -1,140 +1,133 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
scripts/test_edges_smoke.py Progressive Ausgabe
Prüft edge-Integrität je Note mit Live-Ausgabe:
- belongs_to == #Chunks
- next == prev == max(#Chunks-1,0)
- Duplikat-Edges (Key: (kind,source_id,target_id,scope)) == 0
- Zählt references (chunk/note), backlink
Optionen:
--max-notes N : prüft nur die ersten N Notizen
--limit L : Scroll-Limit pro Anfrage (Default 256)
--flush : jede Zeile sofort flushen
"""
from __future__ import annotations
import os, sys, json, argparse
from typing import Dict, Any, List, Tuple, Set
import json
import os
from collections import Counter, defaultdict
from qdrant_client.http import models as rest
from app.core.qdrant import QdrantConfig, get_client
def collections(prefix: str) -> Tuple[str, str, str]:
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
def scroll_iter(client, collection: str, filt: rest.Filter | None, with_payload: bool, limit: int):
def _rel(pl: dict) -> str:
return pl.get("relation") or pl.get("kind") or "edge"
def _scroll(client, col):
pts = []
next_page = None
while True:
pts, next_page = client.scroll(
collection_name=collection,
scroll_filter=filt,
with_payload=with_payload,
res, next_page = client.scroll(
collection_name=col,
with_payload=True,
with_vectors=False,
limit=limit,
limit=1024,
offset=next_page,
)
if not pts:
break
for p in pts:
yield p
pts.extend(res)
if next_page is None:
break
return pts
def list_notes(client, prefix: str, limit: int, max_notes: int | None) -> List[Dict[str, Any]]:
notes_col, _, _ = collections(prefix)
out: List[Dict[str, Any]] = []
for p in scroll_iter(client, notes_col, None, True, limit):
pl = p.payload or {}
nid = pl.get("note_id") or pl.get("id")
if nid:
out.append({"note_id": nid, "title": pl.get("title"), "type": pl.get("type")})
if max_notes is not None and len(out) >= max_notes:
break
return out
def count_chunks_for_note(client, prefix: str, note_id: str, limit: int) -> int:
_, chunks_col, _ = collections(prefix)
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
return sum(1 for _ in scroll_iter(client, chunks_col, f, False, limit))
def fetch_edges_for_note(client, prefix: str, note_id: str, limit: int) -> List[Dict[str, Any]]:
_, _, edges_col = collections(prefix)
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
return [p.payload or {} for p in scroll_iter(client, edges_col, f, True, limit)]
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--max-notes", type=int)
ap.add_argument("--limit", type=int, default=256)
ap.add_argument("--flush", action="store_true")
args = ap.parse_args()
cfg = QdrantConfig.from_env()
client = get_client(cfg)
prefix = os.environ.get("COLLECTION_PREFIX", cfg.prefix)
notes = list_notes(client, cfg.prefix, args.limit, args.max_notes)
total = {"notes": 0, "chunks": 0, "belongs_to": 0, "next": 0, "prev": 0, "refs_chunk": 0, "refs_note": 0, "backlink": 0, "dup_edges": 0}
cols = {
"notes": f"{prefix}_notes",
"chunks": f"{prefix}_chunks",
"edges": f"{prefix}_edges",
}
for n in notes:
nid = n["note_id"]
total["notes"] += 1
chunk_count = count_chunks_for_note(client, cfg.prefix, nid, args.limit)
total["chunks"] += chunk_count
# Index: notes -> title/type
notes_meta = {}
for p in _scroll(client, cols["notes"]):
pl = p.payload or {}
nid = pl.get("note_id")
if nid:
notes_meta[nid] = {
"title": pl.get("title", ""),
"type": pl.get("type", ""),
}
edges = fetch_edges_for_note(client, cfg.prefix, nid, args.limit)
by_kind = {}
keys: Set[tuple] = set()
dup_count = 0
refs_chunk = 0
refs_note = 0
backlink = 0
# chunks je note
chunks_by_note = defaultdict(int)
for p in _scroll(client, cols["chunks"]):
pl = p.payload or {}
nid = pl.get("note_id")
if nid:
chunks_by_note[nid] += 1
for e in edges:
k = e.get("kind")
by_kind[k] = by_kind.get(k, 0) + 1
t = (e.get("kind"), e.get("source_id"), e.get("target_id"), e.get("scope"))
if t in keys:
dup_count += 1
else:
keys.add(t)
if k == "references" and e.get("scope") == "chunk":
refs_chunk += 1
if k == "references" and e.get("scope") == "note":
refs_note += 1
if k == "backlink":
backlink += 1
# edges je note
edges_by_note = defaultdict(list)
edges_all = _scroll(client, cols["edges"])
for p in edges_all:
pl = p.payload or {}
nid = pl.get("note_id")
if nid:
edges_by_note[nid].append(pl)
total["belongs_to"] += by_kind.get("belongs_to", 0)
total["next"] += by_kind.get("next", 0)
total["prev"] += by_kind.get("prev", 0)
total["refs_chunk"] += refs_chunk
total["refs_note"] += refs_note
total["backlink"] += backlink
total["dup_edges"] += dup_count
# pro note ausgeben
summary_edges = Counter()
total_chunks = 0
for nid in sorted(notes_meta.keys()):
meta = notes_meta[nid]
chunks = chunks_by_note.get(nid, 0)
total_chunks += chunks
ok_bt = (by_kind.get("belongs_to", 0) == chunk_count)
ok_seq = (by_kind.get("next", 0) == max(chunk_count - 1, 0) and by_kind.get("prev", 0) == max(chunk_count - 1, 0))
ok_dup = (dup_count == 0)
kinds = Counter(_rel(pl) for pl in edges_by_note[nid])
summary_edges.update(kinds)
line = {
row = {
"note_id": nid,
"title": n.get("title"),
"type": n.get("type"),
"chunks": chunk_count,
"edges_by_kind": by_kind,
"title": meta["title"],
"type": meta["type"],
"chunks": chunks,
"edges_by_kind": dict(kinds),
"checks": {
"belongs_to_equals_chunks": ok_bt,
"next_prev_match": ok_seq,
"no_duplicate_edges": ok_dup,
"belongs_to_equals_chunks": (kinds.get("belongs_to", 0) == chunks),
"next_prev_match": (kinds.get("next", 0) == kinds.get("prev", 0) == max(0, chunks - 1)),
"no_duplicate_edges": _no_dupes(edges_by_note[nid]),
},
}
}
print(json.dumps(line, ensure_ascii=False))
if args.flush:
sys.stdout.flush()
print(json.dumps(row, ensure_ascii=False))
# Gesamtsummary
total_notes = len(notes_meta)
out = {
"prefix": prefix,
"summary": {
"notes": total_notes,
"chunks": total_chunks,
"belongs_to": summary_edges.get("belongs_to", 0),
"next": summary_edges.get("next", 0),
"prev": summary_edges.get("prev", 0),
"refs_chunk": summary_edges.get("references", 0),
"refs_note": summary_edges.get("references_note", 0),
"backlink": summary_edges.get("backlink", 0),
"dup_edges": 0, # per-Note geprüft
},
}
print(json.dumps(out, ensure_ascii=False))
def _no_dupes(pls):
seen = set()
for pl in pls:
key = (
str(pl.get("source_id") or ""),
str(pl.get("target_id") or ""),
str(pl.get("relation") or pl.get("kind") or ""),
str(pl.get("rule_id") or ""),
)
if key in seen:
return False
seen.add(key)
return True
print(json.dumps({"prefix": cfg.prefix, "summary": total}, ensure_ascii=False))
if __name__ == "__main__":
main()