#!/usr/bin/env python3
# scripts/gc_qdrant_after_vault_scan.py
# Version: 1.0.0 (2025-09-05)
#
# Zweck
# -----
# Garbage-Collector für Qdrant: löscht Einträge (Edges, Chunks, Notes), deren Note-ID
# nicht mehr im Obsidian-Vault vorhanden ist. So bleiben Vektorindex/Graph mit dem Vault konsistent.
#
# Aufrufparameter
# ---------------
# --vault PATH       Pfad zum Obsidian-Vault (z. B. ./vault) [erforderlich]
# --mode {edges,content,all}
#                    edges   -> nur Kanten mit source_id/target_id ∉ Vault-Notes löschen
#                    content -> Chunks & Notes löschen, deren note_id ∉ Vault-Notes (keine Edges)
#                    all     -> zuerst edges, dann content
# --prefix PREFIX    Collection-Präfix (Default: aus ENV COLLECTION_PREFIX oder "mindnet")
# --apply            Ohne diesen Schalter: Dry-Run (nur Vorschau). Mit Schalter: ausführen.
# --yes              Sicherheitsabfrage überspringen (non-interaktiv)
# --batch-size N     Größe für In-Filter (Default: 1000)
#
# Hinweise
# --------
# - Benötigt funktionierendes Python-venv mit allen Abhängigkeiten von mindnet (qdrant-client usw.).
# - Läuft gegen die drei Collections {prefix}_notes, {prefix}_chunks, {prefix}_edges (keine anderen Projekte).
# - Nutzt OR-Filter (Filter.should) korrekt, keine minimum_should (Pydantic v2 lässt die Option nicht zu).
#
# Änderungen ggü. vorher
# ----------------------
# - Neu: eigenes GC-Tool; berührt ausschließlich das übergebene Projekt-Präfix.
# - Sichere Dry-Run-Vorschau mit Zählung pro Collection; interaktive Bestätigung.
# - Löscht Edges mittels Filter auf (source_id ∈ MISSING) ODER (target_id ∈ MISSING).
# - Löscht Chunks/Notes mittels Filter auf note_id ∈ MISSING.
#
# Beispiele
# ---------
# DRY RUN:  python3 -m scripts.gc_qdrant_after_vault_scan --vault ./vault --mode all
# APPLY:    python3 -m scripts.gc_qdrant_after_vault_scan --vault ./vault --mode all --apply
# CI-Modus: python3 -m scripts.gc_qdrant_after_vault_scan --vault ./vault --mode edges --apply --yes
#
# Kompatibilität / Kontext
# ------------------------
# - Frontmatter-Validierung & Parsing wie im Projekt (parser.read_markdown, validate_required_frontmatter).
# - Note-Payload-Schema (u. a. note_id, hash_fulltext) wie spezifiziert.
# - Collection-Namen & Anlage-Logik über app.core.qdrant (notes/chunks = dim, edges = 1D).
# - Edges nutzen Felder source_id/target_id; Chunks/Notes tragen note_id in payload.
#
from __future__ import annotations

import argparse
import glob
import json
import os
import sys
from typing import Iterable, List, Set, Tuple

from dotenv import load_dotenv
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest

from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter
from app.core.qdrant import QdrantConfig, get_client, collection_names

def iter_note_ids_from_vault(vault_root: str) -> Set[str]:
    """Scan all Markdown files in the vault and collect valid frontmatter note IDs.

    Files whose frontmatter cannot be parsed or fails required-field validation
    (e.g. template files) are skipped silently — this is deliberate best-effort.

    Args:
        vault_root: Path to the Obsidian vault root; scanned recursively for ``*.md``.

    Returns:
        Set of note IDs taken from frontmatter ("id" preferred, "note_id" as fallback).
    """
    out: Set[str] = set()
    # glob.glob already returns a list; iterate it directly instead of
    # wrapping it in a redundant list comprehension.
    for path in glob.glob(os.path.join(vault_root, "**", "*.md"), recursive=True):
        try:
            parsed = read_markdown(path)
            fm = normalize_frontmatter(parsed.frontmatter)
            validate_required_frontmatter(fm)
            nid = fm.get("id") or fm.get("note_id")
            if nid:
                out.add(nid)
        except Exception:
            # Ignore invalid files and templates without the required fields.
            continue
    return out
def scroll_payload_values(client: QdrantClient, collection: str, field: str, limit: int = 2000) -> Set[str]:
    """Scroll through an entire collection and collect all string payload values of *field*.

    Args:
        client: Connected Qdrant client.
        collection: Name of the collection to scan.
        field: Payload key whose values are collected (non-string values are skipped).
        limit: Page size per scroll request.

    Returns:
        Set of distinct string values found under *field*.
    """
    values: Set[str] = set()
    cursor = None
    while True:
        res = client.scroll(
            collection_name=collection,
            scroll_filter=None,
            with_payload=True,
            with_vectors=False,
            limit=limit,
            offset=cursor,
        )
        # Recent qdrant-client versions return a (points, next_offset) tuple.
        if isinstance(res, tuple) and len(res) == 2:
            points, cursor = res
        else:
            # Defensive fallback in case the API shape changes significantly.
            points, cursor = res[0], res[1] if len(res) > 1 else None

        for point in points or []:
            payload = getattr(point, "payload", None) or {}
            value = payload.get(field)
            if isinstance(value, str):
                values.add(value)

        if not cursor:
            break
    return values
def chunked(iterable: Iterable[str], n: int) -> Iterable[List[str]]:
    """Yield successive lists of at most *n* consecutive items from *iterable*.

    The final batch may be shorter than *n*; an empty iterable yields nothing.
    """
    batch: List[str] = []
    for item in iterable:
        batch.append(item)
        if len(batch) >= n:
            yield batch
            batch = []
    if batch:
        yield batch
def count_matches(client: QdrantClient, collection: str, filter_: rest.Filter) -> int:
    """Count points matching *filter_* by scrolling through them.

    Used instead of the Qdrant count API, which is optional depending on the
    server/client version. The result is approximate under concurrent writes.
    """
    matched = 0
    cursor = None
    while True:
        points, cursor = client.scroll(
            collection_name=collection,
            scroll_filter=filter_,
            with_payload=False,
            with_vectors=False,
            limit=2000,
            offset=cursor,
        )
        matched += len(points or [])
        if not cursor:
            break
    return matched
def build_filters_for_missing(prefix: str, missing_note_ids: Set[str]) -> Tuple[rest.Filter, rest.Filter, rest.Filter]:
    """Build per-collection delete filters for notes that vanished from the vault.

    - Notes/Chunks: note_id ∈ missing
    - Edges: (source_id ∈ missing) OR (target_id ∈ missing)

    Args:
        prefix: Collection prefix (kept for interface compatibility; the
            filters themselves do not depend on it).
        missing_note_ids: Note IDs present in Qdrant but absent from the vault.

    Returns:
        Tuple of (notes_filter, chunks_filter, edges_filter).
    """
    # Fix: the previous version called collection_names(prefix) and unpacked
    # the result into three locals that were never used — dead code removed.

    # MatchAny instead of many MatchValue conditions scales to large ID sets.
    any_missing = rest.MatchAny(any=list(missing_note_ids))

    f_notes = rest.Filter(must=[rest.FieldCondition(key="note_id", match=any_missing)])
    f_chunks = rest.Filter(must=[rest.FieldCondition(key="note_id", match=any_missing)])
    # OR semantics via Filter.should; minimum_should is not accepted (Pydantic v2).
    f_edges = rest.Filter(should=[
        rest.FieldCondition(key="source_id", match=any_missing),
        rest.FieldCondition(key="target_id", match=any_missing),
    ])
    return f_notes, f_chunks, f_edges
def preview(client: QdrantClient, prefix: str, missing: Set[str]) -> dict:
    """Build a dry-run summary: per-collection match counts for the pending deletion.

    Returns a JSON-serializable dict with the prefix, resolved collection names,
    the number of missing note IDs, and the number of points each filter matches.
    """
    notes_col, chunks_col, edges_col = collection_names(prefix)
    f_notes, f_chunks, f_edges = build_filters_for_missing(prefix, missing)

    counts = {
        "edges": count_matches(client, edges_col, f_edges),
        "chunks": count_matches(client, chunks_col, f_chunks),
        "notes": count_matches(client, notes_col, f_notes),
    }
    return {
        "prefix": prefix,
        "collections": {"notes": notes_col, "chunks": chunks_col, "edges": edges_col},
        "missing_note_ids_count": len(missing),
        "to_delete_counts": counts,
    }
def delete_with_filter(client: QdrantClient, collection: str, filter_: rest.Filter) -> int:
    """Delete all points matching *filter_* and return the approximate match count.

    The count is taken *before* deletion (for user feedback) and may differ
    from the true number deleted under concurrent writes.
    """
    # Count first so the caller can report what was removed.
    matched = count_matches(client, collection, filter_)
    if not matched:
        return 0
    client.delete(
        collection_name=collection,
        points_selector=filter_,
        wait=True,
    )
    return matched
def main():
    """CLI entry point: diff vault note IDs against Qdrant and delete orphaned points.

    Flow: parse args -> read vault IDs -> read Qdrant note IDs -> compute the
    missing set -> print a dry-run preview -> (with --apply and confirmation)
    delete edges and/or chunks+notes depending on --mode.
    """
    load_dotenv()
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Pfad zum Obsidian-Vault (z. B. ./vault)")
    ap.add_argument("--mode", choices=["edges", "content", "all"], default="all",
                    help="Löschmodus (edges=nur Kanten, content=Chunks+Notes, all=beides)")
    ap.add_argument("--prefix", help="Collection-Präfix (Default aus ENV COLLECTION_PREFIX)")
    ap.add_argument("--apply", action="store_true", help="Ohne diesen Schalter: Dry-Run")
    ap.add_argument("--yes", action="store_true", help="Rückfrage unterdrücken")
    # NOTE(review): --batch-size is parsed but never read below (and the
    # chunked() helper is never called) — confirm whether batching of the
    # MatchAny filter was intended here.
    ap.add_argument("--batch-size", type=int, default=1000, help="Batch-Größe für MatchAny")
    args = ap.parse_args()

    cfg = QdrantConfig.from_env()
    if args.prefix:
        # CLI prefix overrides the environment-derived one.
        cfg.prefix = args.prefix
    client = get_client(cfg)

    notes_col, chunks_col, edges_col = collection_names(cfg.prefix)

    # 1) Read the vault -> set of valid note IDs
    vault_root = os.path.abspath(args.vault)
    vault_ids = iter_note_ids_from_vault(vault_root)

    # 2) Collect the note IDs currently stored in Qdrant
    existing_note_ids = scroll_payload_values(client, notes_col, "note_id")

    # 3) Compute the difference: IDs in Qdrant but no longer in the vault
    missing = existing_note_ids - vault_ids

    # 4) Dry-run preview (always printed, even when applying)
    summary = preview(client, cfg.prefix, missing)
    print(json.dumps({"mode": args.mode, "apply": args.apply, "summary": summary}, ensure_ascii=False, indent=2))

    if not missing:
        # Nothing to do.
        return

    if not args.apply:
        # Dry-run ends here.
        return

    if not args.yes:
        # Interactive safety prompt; anything but y/yes aborts.
        resp = input("Fortfahren und die oben gezeigten Objekte löschen? (yes/no): ").strip().lower()
        if resp not in ("y", "yes"):
            print("Abgebrochen.")
            return

    # 5) Delete according to --mode
    f_notes, f_chunks, f_edges = build_filters_for_missing(cfg.prefix, missing)
    report = {"deleted": {"edges": 0, "chunks": 0, "notes": 0}}

    try:
        if args.mode in ("edges", "all"):
            report["deleted"]["edges"] = delete_with_filter(client, edges_col, f_edges)
        if args.mode in ("content", "all"):
            # Order: chunks first, then notes.
            report["deleted"]["chunks"] = delete_with_filter(client, chunks_col, f_chunks)
            report["deleted"]["notes"] = delete_with_filter(client, notes_col, f_notes)
    finally:
        # Print the (possibly partial) report even if a delete call raised.
        print(json.dumps(report, ensure_ascii=False, indent=2))
if __name__ == "__main__":
    main()