#!/usr/bin/env python3 # scripts/gc_qdrant_after_vault_scan.py # Version: 1.0.0 (2025-09-05) # # Zweck # ----- # Garbage-Collector für Qdrant: löscht Einträge (Edges, Chunks, Notes), deren Note-ID # nicht mehr im Obsidian-Vault vorhanden ist. So bleiben Vektorindex/Graph mit dem Vault konsistent. # # Aufrufparameter # --------------- # --vault PATH Pfad zum Obsidian-Vault (z. B. ./vault) [erforderlich] # --mode {edges,content,all} # edges -> nur Kanten mit source_id/target_id ∉ Vault-Notes löschen # content -> Chunks & Notes löschen, deren note_id ∉ Vault-Notes (keine Edges) # all -> zuerst edges, dann content # --prefix PREFIX Collection-Präfix (Default: aus ENV COLLECTION_PREFIX oder "mindnet") # --apply Ohne diesen Schalter: Dry-Run (nur Vorschau). Mit Schalter: ausführen. # --yes Sicherheitsabfrage überspringen (non-interaktiv) # --batch-size N Größe für In-Filter (Default: 1000) # # Hinweise # -------- # - Benötigt funktionierendes Python-venv mit allen Abhängigkeiten von mindnet (qdrant-client usw.). # - Läuft gegen die drei Collections {prefix}_notes, {prefix}_chunks, {prefix}_edges (keine anderen Projekte). # - Nutzt OR-Filter (Filter.should) korrekt, keine minimum_should (Pydantic v2 lässt die Option nicht zu). # # Änderungen ggü. vorher # ---------------------- # - Neu: eigenes GC-Tool; berührt ausschließlich das übergebene Projekt-Präfix. # - Sichere Dry-Run-Vorschau mit Zählung pro Collection; interaktive Bestätigung. # - Löscht Edges mittels Filter auf (source_id ∈ MISSING) ODER (target_id ∈ MISSING). # - Löscht Chunks/Notes mittels Filter auf note_id ∈ MISSING. # # Beispiele # --------- # DRY RUN: python3 -m scripts.gc_qdrant_after_vault_scan --vault ./vault --mode all # APPLY: python3 -m scripts.gc_qdrant_after_vault_scan --vault ./vault --mode all --apply # CI-Modus: python3 -m scripts.gc_qdrant_after_vault_scan --vault ./vault --mode edges --apply --yes # # Kompatibilität / Kontext # ------------------------ # - Frontmatter-Validierung & Parsing wie im Projekt (parser.read_markdown, validate_required_frontmatter). :contentReference[oaicite:0]{index=0} # - Note-Payload-Schema (u. a. note_id, hash_fulltext) wie spezifiziert. :contentReference[oaicite:1]{index=1} # - Collection-Namen & Anlage-Logik über app.core.qdrant (notes/chunks = dim, edges = 1D). :contentReference[oaicite:2]{index=2} # - Edges nutzen Felder source_id/target_id; Chunks/Notes tragen note_id in payload. :contentReference[oaicite:3]{index=3} :contentReference[oaicite:4]{index=4} # from __future__ import annotations import argparse, os, glob, json, sys from typing import Iterable, Set, Tuple, List from dotenv import load_dotenv from qdrant_client import QdrantClient from qdrant_client.http import models as rest from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter # :contentReference[oaicite:5]{index=5} from app.core.qdrant import QdrantConfig, get_client, collection_names # :contentReference[oaicite:6]{index=6} def iter_note_ids_from_vault(vault_root: str) -> Set[str]: """Liest alle Markdown-Dateien im Vault und sammelt valide Frontmatter-IDs.""" paths = [p for p in glob.glob(os.path.join(vault_root, "**", "*.md"), recursive=True)] out: Set[str] = set() for p in paths: try: parsed = read_markdown(p) fm = normalize_frontmatter(parsed.frontmatter) validate_required_frontmatter(fm) nid = fm.get("id") or fm.get("note_id") if nid: out.add(nid) except Exception: # ungültige oder Template-Dateien ohne Pflichtfelder ignorieren continue return out def scroll_payload_values(client: QdrantClient, collection: str, field: str, limit: int = 2000) -> Set[str]: """Scrollt eine Collection und sammelt alle Payload-Werte eines Feldes (falls vorhanden).""" out: Set[str] = set() offset = None while True: res = client.scroll( collection_name=collection, scroll_filter=None, with_payload=True, with_vectors=False, limit=limit, offset=offset ) # qdrant-client (neuere Versionen) geben (points, next_offset) if isinstance(res, tuple) and len(res) == 2: points, offset = res else: # Fallback, falls sich API signifikant ändert points, offset = res[0], res[1] if len(res) > 1 else None for pt in points or []: pl = getattr(pt, "payload", None) or {} v = pl.get(field) if isinstance(v, str): out.add(v) if not offset: break return out def chunked(iterable: Iterable[str], n: int) -> Iterable[List[str]]: buf: List[str] = [] for x in iterable: buf.append(x) if len(buf) >= n: yield buf buf = [] if buf: yield buf def count_matches(client: QdrantClient, collection: str, filter_: rest.Filter) -> int: """Grober Zähler über scroll (Qdrant Count-API ist optional je Version).""" total = 0 offset = None while True: points, offset = client.scroll( collection_name=collection, scroll_filter=filter_, with_payload=False, with_vectors=False, limit=2000, offset=offset ) total += len(points or []) if not offset: break return total def build_filters_for_missing(prefix: str, missing_note_ids: Set[str]) -> Tuple[rest.Filter, rest.Filter, rest.Filter]: """ Erzeugt je Collection passende Filter: - Notes/Chunks: note_id ∈ missing - Edges: (source_id ∈ missing) ODER (target_id ∈ missing) """ notes_col, chunks_col, edges_col = collection_names(prefix) # :contentReference[oaicite:7]{index=7} # MatchAny statt MatchValue für große Mengen any_missing = rest.MatchAny(any=list(missing_note_ids)) f_notes = rest.Filter(must=[rest.FieldCondition(key="note_id", match=any_missing)]) f_chunks = rest.Filter(must=[rest.FieldCondition(key="note_id", match=any_missing)]) f_edges = rest.Filter(should=[ rest.FieldCondition(key="source_id", match=any_missing), rest.FieldCondition(key="target_id", match=any_missing), ]) return f_notes, f_chunks, f_edges def preview(client: QdrantClient, prefix: str, missing: Set[str]) -> dict: """Erstellt eine Dry-Run-Vorschau (Anzahl Treffer pro Collection).""" notes_col, chunks_col, edges_col = collection_names(prefix) f_notes, f_chunks, f_edges = build_filters_for_missing(prefix, missing) return { "prefix": prefix, "collections": {"notes": notes_col, "chunks": chunks_col, "edges": edges_col}, "missing_note_ids_count": len(missing), "to_delete_counts": { "edges": count_matches(client, edges_col, f_edges), "chunks": count_matches(client, chunks_col, f_chunks), "notes": count_matches(client, notes_col, f_notes), } } def delete_with_filter(client: QdrantClient, collection: str, filter_: rest.Filter) -> int: """Löscht alle Punkte, die den Filter matchen. Gibt grobe Trefferzahl zurück.""" # erst zählen (für Feedback) to_del = count_matches(client, collection, filter_) if to_del == 0: return 0 client.delete( collection_name=collection, points_selector=filter_, wait=True ) return to_del def main(): load_dotenv() ap = argparse.ArgumentParser() ap.add_argument("--vault", required=True, help="Pfad zum Obsidian-Vault (z. B. ./vault)") ap.add_argument("--mode", choices=["edges", "content", "all"], default="all", help="Löschmodus (edges=nur Kanten, content=Chunks+Notes, all=beides)") ap.add_argument("--prefix", help="Collection-Präfix (Default aus ENV COLLECTION_PREFIX)") ap.add_argument("--apply", action="store_true", help="Ohne diesen Schalter: Dry-Run") ap.add_argument("--yes", action="store_true", help="Rückfrage unterdrücken") ap.add_argument("--batch-size", type=int, default=1000, help="Batch-Größe für MatchAny") args = ap.parse_args() cfg = QdrantConfig.from_env() if args.prefix: cfg.prefix = args.prefix client = get_client(cfg) notes_col, chunks_col, edges_col = collection_names(cfg.prefix) # 1) Vault lesen → Menge gültiger Note-IDs vault_root = os.path.abspath(args.vault) vault_ids = iter_note_ids_from_vault(vault_root) # 2) Qdrant vorhandene Note-IDs einsammeln existing_note_ids = scroll_payload_values(client, notes_col, "note_id") # 3) Differenz ermitteln missing = existing_note_ids - vault_ids # 4) Vorschau summary = preview(client, cfg.prefix, missing) print(json.dumps({"mode": args.mode, "apply": args.apply, "summary": summary}, ensure_ascii=False, indent=2)) if not missing: # nichts zu tun return if not args.apply: # Dry-Run Ende return if not args.yes: resp = input("Fortfahren und die oben gezeigten Objekte löschen? (yes/no): ").strip().lower() if resp not in ("y", "yes"): print("Abgebrochen.") return # 5) Löschen je Modus f_notes, f_chunks, f_edges = build_filters_for_missing(cfg.prefix, missing) report = {"deleted": {"edges": 0, "chunks": 0, "notes": 0}} try: if args.mode in ("edges", "all"): report["deleted"]["edges"] = delete_with_filter(client, edges_col, f_edges) if args.mode in ("content", "all"): # Reihenfolge: erst chunks, dann notes report["deleted"]["chunks"] = delete_with_filter(client, chunks_col, f_chunks) report["deleted"]["notes"] = delete_with_filter(client, notes_col, f_notes) finally: print(json.dumps(report, ensure_ascii=False, indent=2)) if __name__ == "__main__": main()