#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
scripts/test_edges_smoke.py — progressive output

Checks edge integrity per note and prints one JSON line per note as it goes:
  - belongs_to == #chunks
  - next == prev == max(#chunks - 1, 0)
  - duplicate edges (key: (kind, source_id, target_id, scope)) == 0
  - counts references (chunk/note) and backlink edges

Options:
  --max-notes N : check only the first N notes
  --limit L     : scroll limit per request (default 256)
  --flush       : flush every line immediately
"""
from __future__ import annotations

import argparse
import json
import os
import sys
from typing import Dict, Any, List, Tuple, Set

from qdrant_client.http import models as rest

from app.core.qdrant import QdrantConfig, get_client


def collections(prefix: str) -> Tuple[str, str, str]:
    """Return the (notes, chunks, edges) collection names for *prefix*."""
    return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"


def scroll_iter(client, collection: str, filt: rest.Filter | None, with_payload: bool, limit: int):
    """Yield every point of *collection* matching *filt*, page by page.

    Calls ``client.scroll`` repeatedly with *limit* points per request and
    feeds the returned offset back in until a page is empty or no further
    offset is returned. Vectors are never requested.
    """
    next_page = None
    while True:
        pts, next_page = client.scroll(
            collection_name=collection,
            scroll_filter=filt,
            with_payload=with_payload,
            with_vectors=False,
            limit=limit,
            offset=next_page,
        )
        if not pts:
            break
        yield from pts
        if next_page is None:
            break


def list_notes(client, prefix: str, limit: int, max_notes: int | None) -> List[Dict[str, Any]]:
    """Collect ``note_id``/``title``/``type`` for notes in the notes collection.

    Points whose payload has neither ``note_id`` nor ``id`` are skipped.
    When *max_notes* is given, at most that many notes are returned; a
    non-positive value yields an empty list without querying.
    """
    out: List[Dict[str, Any]] = []
    # Check the limit up front: testing it only after an append would let
    # ``--max-notes 0`` return one note.
    if max_notes is not None and max_notes <= 0:
        return out

    notes_col, _, _ = collections(prefix)
    for p in scroll_iter(client, notes_col, None, True, limit):
        pl = p.payload or {}
        nid = pl.get("note_id") or pl.get("id")
        if not nid:
            continue
        out.append({"note_id": nid, "title": pl.get("title"), "type": pl.get("type")})
        if max_notes is not None and len(out) >= max_notes:
            break
    return out


def _note_filter(note_id: str) -> rest.Filter:
    """Build a filter matching points whose ``note_id`` payload equals *note_id*."""
    return rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )


def count_chunks_for_note(client, prefix: str, note_id: str, limit: int) -> int:
    """Return how many points in the chunks collection carry *note_id*."""
    _, chunks_col, _ = collections(prefix)
    # Payloads are not needed for counting, so they are not requested.
    return sum(1 for _ in scroll_iter(client, chunks_col, _note_filter(note_id), False, limit))


def fetch_edges_for_note(client, prefix: str, note_id: str, limit: int) -> List[Dict[str, Any]]:
    """Return the payloads of all edge points carrying *note_id*.

    A point without payload contributes an empty dict.
    """
    _, _, edges_col = collections(prefix)
    return [
        p.payload or {}
        for p in scroll_iter(client, edges_col, _note_filter(note_id), True, limit)
    ]


def _summarize_edges(edges: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Count edges per kind, duplicates, references by scope, and backlinks.

    Two edges are duplicates when their
    ``(kind, source_id, target_id, scope)`` tuples are equal; every
    occurrence after the first is counted.
    """
    by_kind: Dict[Any, int] = {}
    seen: Set[tuple] = set()
    stats: Dict[str, Any] = {
        "by_kind": by_kind,
        "dup_edges": 0,
        "refs_chunk": 0,
        "refs_note": 0,
        "backlink": 0,
    }
    for e in edges:
        kind = e.get("kind")
        scope = e.get("scope")
        by_kind[kind] = by_kind.get(kind, 0) + 1

        key = (kind, e.get("source_id"), e.get("target_id"), scope)
        if key in seen:
            stats["dup_edges"] += 1
        else:
            seen.add(key)

        if kind == "references":
            if scope == "chunk":
                stats["refs_chunk"] += 1
            elif scope == "note":
                stats["refs_note"] += 1
        elif kind == "backlink":
            stats["backlink"] += 1
    return stats


def main():
    """Run the smoke test: one JSON line per note, then a JSON summary line.

    NOTE(review): the process does not set a failing exit status when a
    check is false; consumers have to inspect the ``checks`` fields.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--max-notes", type=int)
    ap.add_argument("--limit", type=int, default=256)
    ap.add_argument("--flush", action="store_true")
    args = ap.parse_args()

    cfg = QdrantConfig.from_env()
    client = get_client(cfg)

    notes = list_notes(client, cfg.prefix, args.limit, args.max_notes)

    total = {
        "notes": 0,
        "chunks": 0,
        "belongs_to": 0,
        "next": 0,
        "prev": 0,
        "refs_chunk": 0,
        "refs_note": 0,
        "backlink": 0,
        "dup_edges": 0,
    }

    for n in notes:
        nid = n["note_id"]
        chunk_count = count_chunks_for_note(client, cfg.prefix, nid, args.limit)
        edges = fetch_edges_for_note(client, cfg.prefix, nid, args.limit)
        stats = _summarize_edges(edges)
        by_kind = stats["by_kind"]

        belongs_to = by_kind.get("belongs_to", 0)
        next_count = by_kind.get("next", 0)
        prev_count = by_kind.get("prev", 0)

        total["notes"] += 1
        total["chunks"] += chunk_count
        total["belongs_to"] += belongs_to
        total["next"] += next_count
        total["prev"] += prev_count
        for field in ("refs_chunk", "refs_note", "backlink", "dup_edges"):
            total[field] += stats[field]

        # A note with k chunks is expected to have k-1 next and k-1 prev
        # edges; zero chunks means zero of each.
        expected_seq = max(chunk_count - 1, 0)
        line = {
            "note_id": nid,
            "title": n.get("title"),
            "type": n.get("type"),
            "chunks": chunk_count,
            "edges_by_kind": by_kind,
            "checks": {
                "belongs_to_equals_chunks": belongs_to == chunk_count,
                "next_prev_match": next_count == expected_seq and prev_count == expected_seq,
                "no_duplicate_edges": stats["dup_edges"] == 0,
            },
        }
        print(json.dumps(line, ensure_ascii=False), flush=args.flush)

    print(json.dumps({"prefix": cfg.prefix, "summary": total}, ensure_ascii=False))


if __name__ == "__main__":
    main()