From 0cbc4a2d72bc0d470e4807802d620cf295196ab2 Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 17 Nov 2025 15:17:29 +0100 Subject: [PATCH] =?UTF-8?q?tests/test=5Fedges=5Fall.py=20hinzugef=C3=BCgt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_edges_all.py | 75 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 tests/test_edges_all.py diff --git a/tests/test_edges_all.py b/tests/test_edges_all.py new file mode 100644 index 0000000..971b224 --- /dev/null +++ b/tests/test_edges_all.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from __future__ import annotations +import sys, json +from collections import Counter +from app.core.qdrant import QdrantConfig, get_client + +def fail(msg, payload=None): + print(json.dumps({"ok": False, "error": msg, "details": payload}, ensure_ascii=False, indent=2)) + sys.exit(1) + +def fetch_all(client, col): + points = [] + next_offset = None + while True: + res = client.scroll(collection_name=col, with_payload=True, with_vectors=False, limit=2048, offset=next_offset) + batch = res[0] + next_offset = res[1] + points.extend(batch) + if not next_offset: + break + return points + +def main(): + cfg = QdrantConfig.from_env() + cl = get_client(cfg) + + cn = f"{cfg.prefix}_notes" + cc = f"{cfg.prefix}_chunks" + ce = f"{cfg.prefix}_edges" + + chunks = fetch_all(cl, cc) + edges = fetch_all(cl, ce) + + chunks_by_note = Counter([c.payload.get("note_id") for c in chunks]) + belongs_by_note = Counter(); next_by_note = Counter(); prev_by_note = Counter() + + for e in edges: + pl = e.payload + nid = pl.get("note_id") + k = pl.get("kind") or pl.get("relation") + if k == "belongs_to": belongs_by_note[nid] += 1 + elif k == "next": next_by_note[nid] += 1 + elif k == "prev": prev_by_note[nid] += 1 + + for nid, ccount in chunks_by_note.items(): + if belongs_by_note[nid] != ccount: + fail("belongs_to != chunks", {"note_id": nid, "chunks": ccount, "belongs_to": belongs_by_note[nid]}) + if not (next_by_note[nid] == prev_by_note[nid] == max(ccount-1, 0)): + fail("next/prev mismatch", {"note_id": nid, "chunks": ccount, "next": next_by_note[nid], "prev": prev_by_note[nid]}) + + # Dubletten + seen = set() + for e in edges: + pl = e.payload + rule = (pl.get("rule_id") or "") + kind = pl.get("kind") or pl.get("relation") + sid = pl.get("source_id"); tid = pl.get("target_id"); rel = kind + key = (sid, tid, rel, rule) + if key in seen: + fail("duplicate edge", {"source_id": sid, "target_id": tid, "relation": rel, "rule_id": rule}) + seen.add(key) + + # Wenn Callouts vorhanden: mindestens eine Mehrfach-Ziel-Zeile muss erkannt worden sein + callouts = [e for e in edges if (e.payload.get("rule_id") or "").startswith("callout:edge:v1")] + if callouts: + from collections import Counter + cnt = Counter((e.payload.get("chunk_id"), e.payload.get("kind") or e.payload.get("relation")) for e in callouts) + if max(cnt.values() or [0]) < 2: + fail("callout edges present but no multi-target callout detected") + + print(json.dumps({"ok": True, "notes_checked": len(chunks_by_note)}, ensure_ascii=False)) + +if __name__ == "__main__": + main()