#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script: scripts/verify_chunk_texts.py Version: 1.0.0 Datum: 2025-09-09 Kurzbeschreibung --------------- Verifiziert, dass in Qdrant für jede Note die zugehörigen Chunks ein Textfeld enthalten und der Body (notes.payload.fulltext) aus den Chunk-Texten sinnvoll rekonstruiert werden kann. Prüfungen pro Note: - Alle Chunks vorhanden (>=1). - Jedes Chunk-Payload hat einen nutzbaren Textschlüssel: "text" (bevorzugt), sonst "content", sonst "raw". - Reihenfolge der Chunks wird stabil bestimmt (payload.chunk_index -> Nummer aus chunk_id). - Coverage: Summe der im Fulltext gefundenen Chunk-Textsegmente / len(Fulltext) (Toleranz für Overlaps). -> OK wenn coverage >= 0.90 (konfigurierbar via --min-coverage) Ausgabe: - JSON mit Gesamtsummen und Details je Note. ENV: - QDRANT_URL | QDRANT_HOST/QDRANT_PORT | QDRANT_API_KEY - COLLECTION_PREFIX (Fallback, wenn --prefix fehlt) Beispiele: # Alle Notes prüfen (Prefix aus ENV) python3 -m scripts.verify_chunk_texts # Nur eine Note prüfen python3 -m scripts.verify_chunk_texts --note-id concept-alpha # Prefix explizit setzen und strengere Coverage verlangen python3 -m scripts.verify_chunk_texts --prefix mindnet --min-coverage 0.95 """ from __future__ import annotations import argparse, json, os, re, sys from typing import Dict, List, Tuple, Optional from qdrant_client import QdrantClient from qdrant_client.http import models as rest def _names(prefix: str) -> Tuple[str,str,str]: return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" def _client() -> QdrantClient: url = os.getenv("QDRANT_URL") if not url: host = os.getenv("QDRANT_HOST", "127.0.0.1") port = int(os.getenv("QDRANT_PORT", "6333")) url = f"http://{host}:{port}" api_key = os.getenv("QDRANT_API_KEY") or None return QdrantClient(url=url, api_key=api_key) def _chunk_sort_key(p: Dict, pid: str) -> Tuple[int,int,str]: # Primär: payload.chunk_index, sekundär: Nummer am Ende der ID (#cNN oder #NN), sonst 0 ci = p.get("chunk_index") n = 0 m = re.search(r'#c?(\d+)$', pid or "") if m: try: n = int(m.group(1)) except Exception: n = 0 return (ci if isinstance(ci, int) else 1_000_000 + n, n, pid) def _choose_text(payload: Dict) -> Optional[str]: for k in ("text", "content", "raw"): v = payload.get(k) if isinstance(v, str) and v.strip(): return v return None def _coverage(fulltext: str, pieces: List[str]) -> float: """Berechnet die Abdeckungsquote der Stücke im Fulltext (sequenzielles Matching).""" if not fulltext: return 0.0 if pieces else 1.0 cursor = 0 covered = 0 ft = fulltext for piece in pieces: if not piece: continue # Tolerant gegen Whitespace-Unterschiede: normalisieren nur \r\n→\n p = piece.replace("\r\n", "\n").replace("\r", "\n") idx = ft.find(p, cursor) if idx == -1: # Versuche ein paar Heuristiken: trimmen p2 = p.strip() if p2 and len(p2) > 8: idx = ft.find(p2, cursor) if idx != -1: covered += len(p) cursor = idx + len(p) # sonst: nicht abgedeckt return covered / max(1, len(ft)) def main() -> None: ap = argparse.ArgumentParser() ap.add_argument("--prefix", help="Collection-Prefix (Default: ENV COLLECTION_PREFIX oder 'mindnet')") ap.add_argument("--note-id", help="Nur eine bestimmte Note prüfen") ap.add_argument("--min-coverage", type=float, default=0.90, help="Mindestabdeckung durch Chunks (Default: 0.90)") args = ap.parse_args() prefix = args.prefix or os.getenv("COLLECTION_PREFIX", "mindnet") notes_col, chunks_col, _ = _names(prefix) cli = _client() # Notes abrufen (optional filter by note_id) notes_filter = None if args.note_id: notes_filter = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=args.note_id))]) notes: List[Dict] = [] off = None while True: pts, off = cli.scroll(collection_name=notes_col, scroll_filter=notes_filter, with_payload=True, with_vectors=False, limit=256, offset=off) if not pts: break for p in pts: notes.append({"id": p.id, "payload": p.payload or {}}) if off is None: break results = [] total_missing_text = 0 total_notes_ok = 0 for n in notes: pl = n["payload"] nid = pl.get("note_id") or pl.get("id") or n.get("id") fulltext = pl.get("fulltext") or "" # Chunks der Note holen f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=nid))]) chunks = [] off = None while True: pts, off = cli.scroll(collection_name=chunks_col, scroll_filter=f, with_payload=True, with_vectors=False, limit=256, offset=off) if not pts: break for p in pts: chunks.append({"id": p.id, "payload": p.payload or {}}) if off is None: break # sortieren chunks.sort(key=lambda c: _chunk_sort_key(c["payload"], c["id"])) texts = [] missing_text = 0 for c in chunks: t = _choose_text(c["payload"]) if t is None: missing_text += 1 texts.append("") else: texts.append(t) cov = _coverage(fulltext, texts) ok = (missing_text == 0) and (cov >= args.min_coverage or not fulltext) if ok: total_notes_ok += 1 total_missing_text += missing_text results.append({ "note_id": nid, "title": pl.get("title"), "chunks": len(chunks), "missing_chunk_texts": missing_text, "coverage": round(cov, 4), "has_fulltext": bool(fulltext), "ok": ok }) out = { "collections": {"notes": notes_col, "chunks": chunks_col}, "notes_checked": len(notes), "notes_ok": total_notes_ok, "total_missing_chunk_texts": total_missing_text, "min_coverage": args.min_coverage, "details": results } print(json.dumps(out, ensure_ascii=False, indent=2)) if __name__ == "__main__": main()