All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
128 lines
4.1 KiB
Python
128 lines
4.1 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
FILE: scripts/audit_chunks.py
|
|
VERSION: 2.1.0 (2025-12-15)
|
|
STATUS: Active
|
|
COMPATIBILITY: v2.9.1 (Post-WP14/WP-15b)
|
|
|
|
Zweck:
|
|
-------
|
|
Audit-Tool zur Analyse der Chunk-Qualität im Vault.
|
|
Erkennt Probleme wie überdimensionierte Chunks, leere Chunks und defekte Nachbarschafts-Links.
|
|
|
|
Funktionsweise:
|
|
---------------
|
|
1. Scannt alle Markdown-Dateien im Vault
|
|
2. Für jede Datei:
|
|
- Erzeugt Chunks via assemble_chunks
|
|
- Prüft Token-Anzahl pro Chunk
|
|
- Validiert Nachbarschafts-Links (prev/next)
|
|
3. Aggregiert Statistiken und identifiziert Probleme
|
|
|
|
Ergebnis-Interpretation:
|
|
------------------------
|
|
- Ausgabe: JSON mit Zusammenfassung
|
|
* notes: Anzahl verarbeiteter Notizen
|
|
* chunks_total: Gesamtanzahl Chunks
|
|
* chunks_per_note_avg: Durchschnittliche Chunks pro Note
|
|
* tokens_avg: Durchschnittliche Token pro Chunk
|
|
* tokens_p95: 95. Perzentil der Token-Verteilung
|
|
* issues_counts: Anzahl gefundener Probleme
|
|
- Zusätzlich: Liste der ersten 20 Probleme pro Kategorie
|
|
* oversize: Chunks > 600 Tokens
|
|
* broken_neighbors: Defekte prev/next Links
|
|
* empty: Leere Chunks (0 Tokens)
|
|
|
|
Verwendung:
|
|
-----------
|
|
- Qualitätskontrolle nach Chunking-Änderungen
|
|
- Identifikation von Problemen vor dem Import
|
|
- Monitoring der Chunk-Verteilung
|
|
|
|
Hinweise:
|
|
---------
|
|
- Nutzt synchrones assemble_chunks (kann async sein)
|
|
- Heuristik für Oversize: > 600 Tokens
|
|
- Prüft nur strukturelle Integrität, keine semantische Qualität
|
|
|
|
Aufruf:
|
|
-------
|
|
python3 -m scripts.audit_chunks --vault ./vault
|
|
|
|
Parameter:
|
|
----------
|
|
--vault PATH Pfad zum Vault-Verzeichnis (erforderlich)
|
|
|
|
Änderungen:
|
|
-----------
|
|
v2.1.0 (2025-12-15): Dokumentation aktualisiert
|
|
v1.0.0: Initial Release
|
|
"""
|
|
from __future__ import annotations
|
|
import argparse, os, json, glob, statistics as stats
|
|
from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter
|
|
from app.core.chunking import assemble_chunks
|
|
|
|
def iter_md(root: str):
|
|
for p in glob.glob(os.path.join(root, "**", "*.md"), recursive=True):
|
|
pn = p.replace("\\","/")
|
|
if any(x in pn for x in ("/.obsidian/", "/_backup_frontmatter/", "/_imported/")):
|
|
continue
|
|
yield p
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--vault", required=True)
|
|
args = ap.parse_args()
|
|
|
|
root = os.path.abspath(args.vault)
|
|
totals = []
|
|
token_all = []
|
|
issues = {"oversize": [], "broken_neighbors": [], "empty": []}
|
|
|
|
for path in iter_md(root):
|
|
parsed = read_markdown(path)
|
|
fm = normalize_frontmatter(parsed.frontmatter)
|
|
try:
|
|
validate_required_frontmatter(fm)
|
|
except Exception:
|
|
continue
|
|
chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
|
|
totals.append(len(chunks))
|
|
|
|
# Checks
|
|
prev = None
|
|
for ch in chunks:
|
|
token_all.append(ch.token_count)
|
|
if ch.token_count <= 0:
|
|
issues["empty"].append((fm["id"], ch.id))
|
|
if prev and ch.neighbors_prev != prev.id:
|
|
issues["broken_neighbors"].append((fm["id"], ch.id, "prev-mismatch"))
|
|
prev = ch
|
|
# Oversize Heuristik: > 600 Tokens (global) markieren
|
|
for ch in chunks:
|
|
if ch.token_count > 600:
|
|
issues["oversize"].append((fm["id"], ch.id, ch.token_count))
|
|
|
|
summary = {
|
|
"notes": len(totals),
|
|
"chunks_total": sum(totals),
|
|
"chunks_per_note_avg": round(sum(totals)/max(1,len(totals)),2),
|
|
"tokens_avg": round(stats.mean(token_all),1) if token_all else 0,
|
|
"tokens_p95": (sorted(token_all)[int(0.95*len(token_all))] if token_all else 0),
|
|
"issues_counts": {k: len(v) for k,v in issues.items()}
|
|
}
|
|
print(json.dumps(summary, ensure_ascii=False, indent=2))
|
|
# Optional: Liste der Issues ausgeben
|
|
for k, lst in issues.items():
|
|
if lst:
|
|
print(f"\n{k.upper()} ({len(lst)}):")
|
|
for item in lst[:20]:
|
|
print(" -", item)
|
|
if len(lst) > 20:
|
|
print(f" … (+{len(lst)-20} weitere)")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|