#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script: tests/verify_chunks_integrity.py Version: 1.0.1 Datum: 2025-09-10 Zweck ----- Verifiziert die Text-Integrität der gespeicherten Chunks: 1) Rekonstruiert den Body aus den Chunks (Sortierung: seq → chunk_index → # in chunk_id). 2) Vergleicht mit dem in Qdrant gespeicherten Note-`fulltext` (falls vorhanden). 3) Optional: Vergleicht zusätzlich mit dem Body der zugehörigen Markdown-Datei im Vault. Aufrufe ------- # Nur gegen Qdrant (fulltext vs. Chunks) python3 tests/verify_chunks_integrity.py --prefix mindnet # Zusätzlich gegen den Vault abgleichen (Body der .md-Datei) python3 tests/verify_chunks_integrity.py --prefix mindnet --vault ./test_vault # Streng + CI-geeignet (Fehlercode bei Abweichungen): python3 tests/verify_chunks_integrity.py --prefix mindnet --vault ./test_vault --strict --fail-on-mismatch """ from __future__ import annotations import argparse import json import os import sys from typing import Any, Dict, List, Optional, Tuple # --- FIX: Projekt-Root in sys.path aufnehmen, damit 'app.*' importierbar ist --- PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if PROJECT_ROOT not in sys.path: sys.path.insert(0, PROJECT_ROOT) import yaml from qdrant_client.http import models as rest from app.core.qdrant import QdrantConfig, get_client # --------------------------- Helpers --------------------------- def collections(prefix: str) -> Tuple[str, str, str]: return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" def scroll_all(client, collection: str, flt: Optional[rest.Filter] = None, limit: int = 256) -> List[Any]: out: List[Any] = [] nextp = None while True: pts, nextp = client.scroll( collection_name=collection, with_payload=True, with_vectors=False, limit=limit, offset=nextp, scroll_filter=flt, ) if not pts: break out.extend(pts) if nextp is None: break return out def sort_key_for_chunk_payload(pl: Dict[str, Any]) -> Tuple[int, int, int]: s = pl.get("seq") or 0 ci = pl.get("chunk_index") or 0 n = 0 cid = pl.get("chunk_id") or "" if isinstance(cid, str) and "#" in cid: try: n = int(cid.rsplit("#", 1)[-1]) except Exception: n = 0 return (int(s), int(ci), int(n)) def reconstruct_from_chunks(chunks_points: List[Any]) -> Tuple[str, int, int]: """Gibt (text, total_chunks, chunks_with_text) zurück.""" chunks_sorted = sorted(chunks_points, key=lambda p: sort_key_for_chunk_payload(p.payload or {})) texts: List[str] = [] have = 0 for p in chunks_sorted: pl = p.payload or {} t = pl.get("text") or pl.get("content") or pl.get("raw") or "" if isinstance(t, str) and t: have += 1 texts.append(t) return ("\n".join(texts).strip(), len(chunks_sorted), have) def normalize(s: str, strict: bool = False) -> str: if s is None: return "" s = s.replace("\r\n", "\n").replace("\r", "\n") if strict: return s lines = [ln.rstrip() for ln in s.strip().split("\n")] return "\n".join(lines).strip() def read_vault_body(vault_root: str, rel_path: str) -> Optional[str]: if not rel_path: return None p = os.path.join(vault_root, rel_path.replace("\\", "/").lstrip("/")) if not os.path.exists(p): return None with open(p, "r", encoding="utf-8") as f: s = f.read() if s.startswith("---"): try: fm_txt, body = s.split("\n---\n", 1) except Exception: return s else: body = s return body # --------------------------- Main ------------------------------ def main(): ap = argparse.ArgumentParser() ap.add_argument("--prefix", help="Collection-Prefix (überschreibt ENV COLLECTION_PREFIX)") ap.add_argument("--vault", help="Optional: Vault-Wurzelordner für Abgleich gegen .md") ap.add_argument("--strict", action="store_true", help="Strikter Vergleich (kein Trimmen/Normalisieren)") ap.add_argument("--fail-on-mismatch", action="store_true", help="Exit 1, wenn ein Mismatch gefunden wurde") args = ap.parse_args() cfg = QdrantConfig.from_env() if args.prefix: cfg.prefix = args.prefix.strip() client = get_client(cfg) notes_col, chunks_col, _ = collections(cfg.prefix) notes = scroll_all(client, notes_col) mismatches = 0 total = 0 for n in notes: total += 1 pl = n.payload or {} nid = pl.get("note_id") rel_path = (pl.get("path") or "").replace("\\", "/").lstrip("/") fulltext = (pl.get("fulltext") or "").strip() flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=nid))]) chunks = scroll_all(client, chunks_col, flt) recon, cnt, have = reconstruct_from_chunks(chunks) norm_recon = normalize(recon, strict=args.strict) norm_full = normalize(fulltext, strict=args.strict) match_full = (bool(norm_full) and (norm_recon == norm_full)) match_vault = None issues: List[str] = [] if cnt == 0: issues.append("no_chunks") if have < cnt: issues.append(f"missing_text_in_chunks:{cnt-have}/{cnt}") if norm_full == "": issues.append("note_fulltext_empty") if not match_full: issues.append("reconstructed_vs_fulltext_mismatch") if args.vault: body = read_vault_body(args.vault, rel_path) or "" norm_body = normalize(body, strict=args.strict) match_vault = (norm_body == norm_recon) if not match_vault: issues.append("reconstructed_vs_vault_body_mismatch") obj = { "note_id": nid, "path": rel_path, "chunks_count": cnt, "chunks_with_text": have, "match_fulltext": match_full, "match_vault": match_vault, "issues": issues, } print(json.dumps(obj, ensure_ascii=False)) if ("reconstructed_vs_fulltext_mismatch" in issues) or ("reconstructed_vs_vault_body_mismatch" in issues): mismatches += 1 summary = {"summary": "OK" if mismatches == 0 else "DIFFS", "mismatch_count": mismatches, "notes": total} print(json.dumps(summary, ensure_ascii=False)) if args.fail_on_mismatch and mismatches: raise SystemExit(1) if __name__ == "__main__": main()