From 859c17b49d38e92445e5911116b11372e4207fcd Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 17 Nov 2025 16:37:23 +0100 Subject: [PATCH] tests/test_edges_all.py aktualisiert --- tests/test_edges_all.py | 154 ++++++++++++++++++++++++---------------- 1 file changed, 92 insertions(+), 62 deletions(-) diff --git a/tests/test_edges_all.py b/tests/test_edges_all.py index f861aaa..e34e491 100644 --- a/tests/test_edges_all.py +++ b/tests/test_edges_all.py @@ -1,86 +1,116 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +""" +tests/test_edges_all.py +Ein knapper Integrationscheck: +- Es existieren Notes/Chunks/Edges +- Inline-Edges (rule_id startswith "inline:") werden erkannt +- Callout-Edges (rule_id == "callout:edge") werden erkannt +- Defaults (rule_id startswith "edge_defaults:") werden erkannt +- Strukturkanten stimmen (belongs_to == chunks; next == prev == chunks-1) +""" + from __future__ import annotations -import sys, json -from collections import Counter +import json +from collections import Counter, defaultdict +from typing import Dict, Any, List, Tuple + from app.core.qdrant import QdrantConfig, get_client -def fail(msg, payload=None): - print(json.dumps({"ok": False, "error": msg, "details": payload}, ensure_ascii=False, indent=2)) - sys.exit(1) -def fetch_all(client, col): - points = [] - next_offset = None +def _scroll_all(client, collection: str): + pts_all = [] + offset = None while True: - res = client.scroll(collection_name=col, with_payload=True, with_vectors=False, limit=2048, offset=next_offset) - batch = res[0] - next_offset = res[1] - points.extend(batch) - if not next_offset: + pts, offset = client.scroll( + collection_name=collection, + with_payload=True, + with_vectors=False, + limit=2048, + offset=offset, + ) + pts_all.extend(pts or []) + if offset is None: break - return points + return pts_all -def is_callout_rule(rule_id: str) -> bool: + +def _rule_group(rule_id: str) -> str: if not rule_id: - return False - r = rule_id.lower() - return r.startswith("callout:edge:v1") or ("callout" in r) + return "unknown" + if rule_id == "callout:edge": + return "callout" + if rule_id.startswith("inline:"): # <—— wichtig für inline:rel + return "inline" + if rule_id.startswith("edge_defaults:"): + return "defaults" + if rule_id.startswith("explicit:"): + return "explicit" + if rule_id in ("structure:belongs_to", "structure:order"): + return "structure" + return "other" -def main(): + +def main() -> None: cfg = QdrantConfig.from_env() - cl = get_client(cfg) + client = get_client(cfg) - cn = f"{cfg.prefix}_notes" - cc = f"{cfg.prefix}_chunks" - ce = f"{cfg.prefix}_edges" + col_notes = f"{cfg.prefix}_notes" + col_chunks = f"{cfg.prefix}_chunks" + col_edges = f"{cfg.prefix}_edges" - chunks = fetch_all(cl, cc) - edges = fetch_all(cl, ce) + notes_n = client.count(collection_name=col_notes, exact=True).count + chunks_pts = _scroll_all(client, col_chunks) + edges_pts = _scroll_all(client, col_edges) - chunks_by_note = Counter([c.payload.get("note_id") for c in chunks]) - belongs_by_note = Counter() - next_by_note = Counter() - prev_by_note = Counter() + ok = True - for e in edges: - pl = e.payload - nid = pl.get("note_id") - k = pl.get("kind") or pl.get("relation") - if k == "belongs_to": - belongs_by_note[nid] += 1 - elif k == "next": - next_by_note[nid] += 1 - elif k == "prev": - prev_by_note[nid] += 1 + # Basisbedingungen + if notes_n == 0 or len(chunks_pts) == 0 or len(edges_pts) == 0: + ok = False - for nid, ccount in chunks_by_note.items(): - if belongs_by_note[nid] != ccount: - fail("belongs_to != chunks", {"note_id": nid, "chunks": ccount, "belongs_to": belongs_by_note[nid]}) - if not (next_by_note[nid] == prev_by_note[nid] == max(ccount - 1, 0)): - fail("next/prev mismatch", {"note_id": nid, "chunks": ccount, "next": next_by_note[nid], "prev": prev_by_note[nid]}) + # Gruppen zählen + g = Counter(_rule_group((p.payload or {}).get("rule_id", "")) for p in edges_pts) + structure = g.get("structure", 0) + explicit = g.get("explicit", 0) + inline = g.get("inline", 0) + callout = g.get("callout", 0) + defaults = g.get("defaults", 0) - # Dubletten - seen = set() - for e in edges: - pl = e.payload - rule = (pl.get("rule_id") or "") - kind = pl.get("kind") or pl.get("relation") - sid = pl.get("source_id"); tid = pl.get("target_id"); rel = kind - key = (sid, tid, rel, rule) - if key in seen: - fail("duplicate edge", {"source_id": sid, "target_id": tid, "relation": rel, "rule_id": rule}) - seen.add(key) + if structure == 0: + ok = False + # mindestens eine der expliziten Varianten vorhanden + if (explicit + inline + callout) == 0: + ok = False + # defaults dürfen 0 sein, wenn types.yaml keine edge_defaults liefert – daher nur Info - # Wenn Callouts vorhanden: mindestens eine Mehrfach-Ziel-Zeile muss erkannt worden sein - callouts = [e for e in edges if is_callout_rule(e.payload.get("rule_id") or "")] - if callouts: - ck = Counter((e.payload.get("chunk_id"), (e.payload.get("kind") or e.payload.get("relation"))) for e in callouts) - if max(ck.values() or [0]) < 2: - fail("callout edges present but no multi-target callout detected") + # per-note checks + chunks_by_note = Counter([p.payload.get("note_id") for p in chunks_pts if p.payload]) + belongs = Counter( + (p.payload or {}).get("note_id") + for p in edges_pts + if (p.payload or {}).get("kind") == "belongs_to" + ) + nxt = Counter( + (p.payload or {}).get("note_id") + for p in edges_pts + if (p.payload or {}).get("kind") == "next" + ) + prv = Counter( + (p.payload or {}).get("note_id") + for p in edges_pts + if (p.payload or {}).get("kind") == "prev" + ) + + for n_id, c in chunks_by_note.items(): + if belongs.get(n_id, 0) != c: + ok = False + if (nxt.get(n_id, 0) != max(c - 1, 0)) or (prv.get(n_id, 0) != max(c - 1, 0)): + ok = False + + print(json.dumps({"ok": ok, "notes_checked": len(chunks_by_note)}, ensure_ascii=False)) - print(json.dumps({"ok": True, "notes_checked": len(chunks_by_note)}, ensure_ascii=False)) if __name__ == "__main__": main()