mindnet/scripts/verify_chunk_texts.py
Lars 5d5de06290
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 2s
scripts/verify_chunk_texts.py hinzugefügt
2025-09-09 15:58:12 +02:00

186 lines
6.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: scripts/verify_chunk_texts.py
Version: 1.0.0
Date: 2025-09-09

Summary
-------
Verifies that, for every note in Qdrant, the associated chunks contain a text
field and that the note body (notes.payload.fulltext) can be sensibly
reconstructed from the chunk texts.

Checks per note:
- All chunks present (>=1).
- Every chunk payload has a usable text key: "text" (preferred), else
  "content", else "raw".
- Chunk order is determined stably (payload.chunk_index -> number parsed
  from the chunk id).
- Coverage: sum of chunk text segments found in the fulltext / len(fulltext)
  (tolerant of overlaps).
  -> OK when coverage >= 0.90 (configurable via --min-coverage)

Output:
- JSON with overall totals and per-note details.

ENV:
- QDRANT_URL | QDRANT_HOST/QDRANT_PORT | QDRANT_API_KEY
- COLLECTION_PREFIX (fallback when --prefix is not given)

Examples:
    # Check all notes (prefix from ENV)
    python3 -m scripts.verify_chunk_texts

    # Check a single note only
    python3 -m scripts.verify_chunk_texts --note-id concept-alpha

    # Set the prefix explicitly and require stricter coverage
    python3 -m scripts.verify_chunk_texts --prefix mindnet --min-coverage 0.95
"""
from __future__ import annotations
import argparse, json, os, re, sys
from typing import Dict, List, Tuple, Optional
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
def _names(prefix: str) -> Tuple[str,str,str]:
return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
def _client() -> QdrantClient:
    """Build a QdrantClient from environment variables.

    QDRANT_URL wins when set (and non-empty); otherwise the URL is assembled
    from QDRANT_HOST/QDRANT_PORT.  QDRANT_API_KEY is forwarded when present
    (an empty value is passed as None).
    """
    env = os.getenv
    url = env("QDRANT_URL")
    if not url:
        host, port = env("QDRANT_HOST", "127.0.0.1"), int(env("QDRANT_PORT", "6333"))
        url = f"http://{host}:{port}"
    return QdrantClient(url=url, api_key=env("QDRANT_API_KEY") or None)
def _chunk_sort_key(p: Dict, pid: str) -> Tuple[int,int,str]:
# Primär: payload.chunk_index, sekundär: Nummer am Ende der ID (#cNN oder #NN), sonst 0
ci = p.get("chunk_index")
n = 0
m = re.search(r'#c?(\d+)$', pid or "")
if m:
try:
n = int(m.group(1))
except Exception:
n = 0
return (ci if isinstance(ci, int) else 1_000_000 + n, n, pid)
def _choose_text(payload: Dict) -> Optional[str]:
for k in ("text", "content", "raw"):
v = payload.get(k)
if isinstance(v, str) and v.strip():
return v
return None
def _coverage(fulltext: str, pieces: List[str]) -> float:
"""Berechnet die Abdeckungsquote der Stücke im Fulltext (sequenzielles Matching)."""
if not fulltext:
return 0.0 if pieces else 1.0
cursor = 0
covered = 0
ft = fulltext
for piece in pieces:
if not piece:
continue
# Tolerant gegen Whitespace-Unterschiede: normalisieren nur \r\n→\n
p = piece.replace("\r\n", "\n").replace("\r", "\n")
idx = ft.find(p, cursor)
if idx == -1:
# Versuche ein paar Heuristiken: trimmen
p2 = p.strip()
if p2 and len(p2) > 8:
idx = ft.find(p2, cursor)
if idx != -1:
covered += len(p)
cursor = idx + len(p)
# sonst: nicht abgedeckt
return covered / max(1, len(ft))
def _scroll_all(cli: QdrantClient, collection: str,
                flt: Optional[rest.Filter]) -> List[Dict]:
    """Fetch all points of *collection* matching *flt* via paged scroll.

    Returns a list of ``{"id": point_id, "payload": payload_dict}`` entries;
    vectors are never requested.  (Extracted: this loop was duplicated
    verbatim for notes and for chunks.)
    """
    points: List[Dict] = []
    offset = None
    while True:
        batch, offset = cli.scroll(collection_name=collection, scroll_filter=flt,
                                   with_payload=True, with_vectors=False,
                                   limit=256, offset=offset)
        if not batch:
            break
        for p in batch:
            points.append({"id": p.id, "payload": p.payload or {}})
        if offset is None:  # no further pages
            break
    return points


def main() -> None:
    """Verify chunk texts and fulltext coverage; print a JSON report to stdout.

    Exits normally in all cases; per-note "ok" flags and the totals in the
    JSON output carry the verification result.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--prefix", help="Collection-Prefix (Default: ENV COLLECTION_PREFIX oder 'mindnet')")
    ap.add_argument("--note-id", help="Nur eine bestimmte Note prüfen")
    ap.add_argument("--min-coverage", type=float, default=0.90,
                    help="Mindestabdeckung durch Chunks (Default: 0.90)")
    args = ap.parse_args()

    prefix = args.prefix or os.getenv("COLLECTION_PREFIX", "mindnet")
    notes_col, chunks_col, _ = _names(prefix)
    cli = _client()

    # Fetch notes (optionally restricted to a single note_id).
    notes_filter = None
    if args.note_id:
        notes_filter = rest.Filter(must=[rest.FieldCondition(
            key="note_id", match=rest.MatchValue(value=args.note_id))])
    notes = _scroll_all(cli, notes_col, notes_filter)

    results = []
    total_missing_text = 0
    total_notes_ok = 0
    for n in notes:
        pl = n["payload"]
        nid = pl.get("note_id") or pl.get("id") or n.get("id")
        fulltext = pl.get("fulltext") or ""

        # Fetch this note's chunks and bring them into a stable order.
        f = rest.Filter(must=[rest.FieldCondition(
            key="note_id", match=rest.MatchValue(value=nid))])
        chunks = _scroll_all(cli, chunks_col, f)
        chunks.sort(key=lambda c: _chunk_sort_key(c["payload"], c["id"]))

        # Collect chunk texts; an empty placeholder keeps positions aligned
        # for chunks without a usable text field.
        texts = []
        missing_text = 0
        for c in chunks:
            t = _choose_text(c["payload"])
            if t is None:
                missing_text += 1
                texts.append("")
            else:
                texts.append(t)

        cov = _coverage(fulltext, texts)
        # Notes without a fulltext cannot be coverage-checked: only require
        # that every chunk carries text.
        ok = (missing_text == 0) and (cov >= args.min_coverage or not fulltext)
        if ok:
            total_notes_ok += 1
        total_missing_text += missing_text
        results.append({
            "note_id": nid,
            "title": pl.get("title"),
            "chunks": len(chunks),
            "missing_chunk_texts": missing_text,
            "coverage": round(cov, 4),
            "has_fulltext": bool(fulltext),
            "ok": ok,
        })

    out = {
        "collections": {"notes": notes_col, "chunks": chunks_col},
        "notes_checked": len(notes),
        "notes_ok": total_notes_ok,
        "total_missing_chunk_texts": total_missing_text,
        "min_coverage": args.min_coverage,
        "details": results,
    }
    print(json.dumps(out, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()