tests/test_edges_all.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

This commit is contained in:
Lars 2025-11-17 16:37:23 +01:00
parent 46b26c9624
commit 859c17b49d

View File

@ -1,86 +1,116 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
tests/test_edges_all.py
Ein knapper Integrationscheck:
- Es existieren Notes/Chunks/Edges
- Inline-Edges (rule_id startswith "inline:") werden erkannt
- Callout-Edges (rule_id == "callout:edge") werden erkannt
- Defaults (rule_id startswith "edge_defaults:") werden erkannt
- Strukturkanten stimmen (belongs_to == chunks; next == prev == chunks-1)
"""
from __future__ import annotations
import sys, json
from collections import Counter
import json
from collections import Counter, defaultdict
from typing import Dict, Any, List, Tuple
from app.core.qdrant import QdrantConfig, get_client
def fail(msg, payload=None):
print(json.dumps({"ok": False, "error": msg, "details": payload}, ensure_ascii=False, indent=2))
sys.exit(1)
def fetch_all(client, col):
points = []
next_offset = None
def _scroll_all(client, collection: str):
pts_all = []
offset = None
while True:
res = client.scroll(collection_name=col, with_payload=True, with_vectors=False, limit=2048, offset=next_offset)
batch = res[0]
next_offset = res[1]
points.extend(batch)
if not next_offset:
pts, offset = client.scroll(
collection_name=collection,
with_payload=True,
with_vectors=False,
limit=2048,
offset=offset,
)
pts_all.extend(pts or [])
if offset is None:
break
return points
return pts_all
def is_callout_rule(rule_id: str) -> bool:
def _rule_group(rule_id: str) -> str:
if not rule_id:
return False
r = rule_id.lower()
return r.startswith("callout:edge:v1") or ("callout" in r)
return "unknown"
if rule_id == "callout:edge":
return "callout"
if rule_id.startswith("inline:"): # <—— wichtig für inline:rel
return "inline"
if rule_id.startswith("edge_defaults:"):
return "defaults"
if rule_id.startswith("explicit:"):
return "explicit"
if rule_id in ("structure:belongs_to", "structure:order"):
return "structure"
return "other"
def main():
def main() -> None:
cfg = QdrantConfig.from_env()
cl = get_client(cfg)
client = get_client(cfg)
cn = f"{cfg.prefix}_notes"
cc = f"{cfg.prefix}_chunks"
ce = f"{cfg.prefix}_edges"
col_notes = f"{cfg.prefix}_notes"
col_chunks = f"{cfg.prefix}_chunks"
col_edges = f"{cfg.prefix}_edges"
chunks = fetch_all(cl, cc)
edges = fetch_all(cl, ce)
notes_n = client.count(collection_name=col_notes, exact=True).count
chunks_pts = _scroll_all(client, col_chunks)
edges_pts = _scroll_all(client, col_edges)
chunks_by_note = Counter([c.payload.get("note_id") for c in chunks])
belongs_by_note = Counter()
next_by_note = Counter()
prev_by_note = Counter()
ok = True
for e in edges:
pl = e.payload
nid = pl.get("note_id")
k = pl.get("kind") or pl.get("relation")
if k == "belongs_to":
belongs_by_note[nid] += 1
elif k == "next":
next_by_note[nid] += 1
elif k == "prev":
prev_by_note[nid] += 1
# Basisbedingungen
if notes_n == 0 or len(chunks_pts) == 0 or len(edges_pts) == 0:
ok = False
for nid, ccount in chunks_by_note.items():
if belongs_by_note[nid] != ccount:
fail("belongs_to != chunks", {"note_id": nid, "chunks": ccount, "belongs_to": belongs_by_note[nid]})
if not (next_by_note[nid] == prev_by_note[nid] == max(ccount - 1, 0)):
fail("next/prev mismatch", {"note_id": nid, "chunks": ccount, "next": next_by_note[nid], "prev": prev_by_note[nid]})
# Gruppen zählen
g = Counter(_rule_group((p.payload or {}).get("rule_id", "")) for p in edges_pts)
structure = g.get("structure", 0)
explicit = g.get("explicit", 0)
inline = g.get("inline", 0)
callout = g.get("callout", 0)
defaults = g.get("defaults", 0)
# Dubletten
seen = set()
for e in edges:
pl = e.payload
rule = (pl.get("rule_id") or "")
kind = pl.get("kind") or pl.get("relation")
sid = pl.get("source_id"); tid = pl.get("target_id"); rel = kind
key = (sid, tid, rel, rule)
if key in seen:
fail("duplicate edge", {"source_id": sid, "target_id": tid, "relation": rel, "rule_id": rule})
seen.add(key)
if structure == 0:
ok = False
# mindestens eine der expliziten Varianten vorhanden
if (explicit + inline + callout) == 0:
ok = False
# defaults dürfen 0 sein, wenn types.yaml keine edge_defaults liefert daher nur Info
# Wenn Callouts vorhanden: mindestens eine Mehrfach-Ziel-Zeile muss erkannt worden sein
callouts = [e for e in edges if is_callout_rule(e.payload.get("rule_id") or "")]
if callouts:
ck = Counter((e.payload.get("chunk_id"), (e.payload.get("kind") or e.payload.get("relation"))) for e in callouts)
if max(ck.values() or [0]) < 2:
fail("callout edges present but no multi-target callout detected")
# per-note checks
chunks_by_note = Counter([p.payload.get("note_id") for p in chunks_pts if p.payload])
belongs = Counter(
(p.payload or {}).get("note_id")
for p in edges_pts
if (p.payload or {}).get("kind") == "belongs_to"
)
nxt = Counter(
(p.payload or {}).get("note_id")
for p in edges_pts
if (p.payload or {}).get("kind") == "next"
)
prv = Counter(
(p.payload or {}).get("note_id")
for p in edges_pts
if (p.payload or {}).get("kind") == "prev"
)
for n_id, c in chunks_by_note.items():
if belongs.get(n_id, 0) != c:
ok = False
if (nxt.get(n_id, 0) != max(c - 1, 0)) or (prv.get(n_id, 0) != max(c - 1, 0)):
ok = False
print(json.dumps({"ok": ok, "notes_checked": len(chunks_by_note)}, ensure_ascii=False))
print(json.dumps({"ok": True, "notes_checked": len(chunks_by_note)}, ensure_ascii=False))
if __name__ == "__main__":
main()