mindnet/scripts/prune_qdrant_vs_vault.py
Lars e93bab6ea7
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
Fassadenauflösung unter app/core
2025-12-28 11:04:40 +01:00

275 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FILE: scripts/prune_qdrant_vs_vault.py
VERSION: 2.1.0 (2025-12-15)
STATUS: Active
COMPATIBILITY: v2.9.1 (Post-WP14/WP-15b)
Zweck:
-------
Bereinigt verwaiste Einträge in Qdrant, wenn die zugehörigen Markdown-Dateien
im Vault nicht mehr existieren. Stellt Konsistenz zwischen Vault und Datenbank her.
Funktionsweise:
---------------
1. Liest alle note_id aus dem Vault (aus Frontmatter: id / note_id)
2. Liest alle note_id aus Qdrant (Collection: {prefix}_notes)
3. Berechnet Differenz: note_ids nur in Qdrant vorhanden (verwaist)
4. Für jede verwaiste note_id (nur mit --apply):
- Löscht Note-Point(s) aus {prefix}_notes
- Löscht alle Chunks der Note aus {prefix}_chunks
- Löscht alle Edges, die auf diese Note referenzieren:
* source_id == note_id
* target_id == note_id
* source_note_id == note_id
* target_note_id == note_id
Ergebnis-Interpretation:
------------------------
- Ausgabe: JSON mit Preview-Informationen
* mode: "DRY-RUN" oder "APPLY"
* counts: Statistiken (vault_note_ids, qdrant_note_ids, orphans)
* orphans_sample: Erste 20 verwaiste note_ids
- Bei --apply: Zusätzlich JSON mit deleted-Statistiken
* deleted.notes: Anzahl gelöschter Note-Points
* deleted.chunks: Anzahl gelöschter Chunks
* deleted.edges: Anzahl gelöschter Edges
Verwendung:
-----------
- Regelmäßige Wartung nach Vault-Bereinigung
- Vor Migrationen oder größeren Umstrukturierungen
- Konsistenz-Check zwischen Vault und Datenbank
Sicherheitsmerkmale:
-------------------
- Dry-Run standardmäßig (keine Änderungen ohne --apply)
- Interaktive Bestätigung (außer mit --yes)
- Nur betroffene note_ids werden gelöscht (kein globaler Delete)
- Zeigt Preview vor Ausführung
Aufruf:
-------
# Dry-Run (nur Analyse)
python3 -m scripts.prune_qdrant_vs_vault --vault ./vault --prefix mindnet
# Tatsächliches Löschen
python3 -m scripts.prune_qdrant_vs_vault --vault ./vault --prefix mindnet --apply
# Ohne Rückfrage
python3 -m scripts.prune_qdrant_vs_vault --vault ./vault --prefix mindnet --apply --yes
Parameter:
----------
--vault PATH Pfad zum Vault-Verzeichnis (erforderlich)
--prefix TEXT Collection-Präfix (Default: mindnet)
--apply Führt tatsächliches Löschen durch (sonst nur Dry-Run)
--yes Keine interaktive Bestätigung
Änderungen:
-----------
v2.1.0 (2025-12-15): Kompatibilität mit WP-14 Modularisierung
- Geändert: parse_markdown_file → read_markdown + normalize_frontmatter
- Parsing-Logik an neue API angepasst
v1.0.0 (2025-09-08): Erster Release
"""
import argparse
import json
import os
import sys
from pathlib import Path
from typing import Dict, List, Set, Tuple
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from app.core.database.qdrant import QdrantConfig, get_client, collection_names
from app.core.parser import read_markdown, normalize_frontmatter
def read_vault_note_ids(vault_dir: Path) -> Set[str]:
note_ids: Set[str] = set()
for p in vault_dir.rglob("*.md"):
try:
parsed = read_markdown(str(p))
if parsed:
fm = normalize_frontmatter(parsed.frontmatter) if parsed.frontmatter else {}
nid = fm.get("id") or fm.get("note_id")
if nid:
note_ids.add(str(nid))
except Exception:
# still und leise weiter wir wollen robust sein
continue
return note_ids
def qdrant_note_ids(client: QdrantClient, notes_col: str) -> Set[str]:
ids: Set[str] = set()
scroll_filter = None
offset = None
while True:
res = client.scroll(
collection_name=notes_col,
scroll_filter=scroll_filter,
offset=offset,
limit=256,
with_payload=True,
with_vectors=False,
)
points, next_offset = res
for pt in points:
pl = pt.payload or {}
nid = pl.get("note_id")
if nid:
ids.add(str(nid))
if next_offset is None:
break
offset = next_offset
return ids
def delete_by_filter(client: QdrantClient, collection: str, flt: rest.Filter) -> int:
# Sammel erst IDs via scroll (robuster Überblick), lösche dann über point_ids (batch)
to_delete: List[int] = []
offset = None
while True:
pts, next_offset = client.scroll(
collection_name=collection,
scroll_filter=flt,
offset=offset,
limit=256,
with_payload=False,
with_vectors=False,
)
for pt in pts:
to_delete.append(pt.id)
if next_offset is None:
break
offset = next_offset
if not to_delete:
return 0
client.delete(collection_name=collection, points_selector=rest.PointIdsList(points=to_delete), wait=True)
return len(to_delete)
def prune_for_note(
client: QdrantClient,
cols: Dict[str, str],
note_id: str,
dry_run: bool = True,
) -> Dict[str, int]:
stats = {"notes": 0, "chunks": 0, "edges": 0}
# Notes löschen (Filter auf payload.note_id)
f_notes = rest.Filter(
must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
)
# Chunks löschen (Filter auf payload.note_id)
f_chunks = rest.Filter(
must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
)
# Edges löschen: mehrere Teilmengen erfassen
edge_filters = [
rest.Filter(must=[rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id))]),
rest.Filter(must=[rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id))]),
rest.Filter(must=[rest.FieldCondition(key="source_note_id", match=rest.MatchValue(value=note_id))]),
rest.Filter(must=[rest.FieldCondition(key="target_note_id", match=rest.MatchValue(value=note_id))]),
]
if dry_run:
# Zähle nur, was *wäre* gelöscht worden
def count(flt: rest.Filter, col: str) -> int:
total = 0
offset = None
while True:
pts, next_offset = client.scroll(
collection_name=col,
scroll_filter=flt,
offset=offset,
limit=256,
with_payload=False,
with_vectors=False,
)
total += len(pts)
if next_offset is None:
break
offset = next_offset
return total
stats["notes"] = count(f_notes, cols["notes"])
stats["chunks"] = count(f_chunks, cols["chunks"])
stats["edges"] = sum(count(f, cols["edges"]) for f in edge_filters)
return stats
# tatsächliches Löschen
stats["notes"] = delete_by_filter(client, cols["notes"], f_notes)
stats["chunks"] = delete_by_filter(client, cols["chunks"], f_chunks)
e_deleted = 0
for f in edge_filters:
e_deleted += delete_by_filter(client, cols["edges"], f)
stats["edges"] = e_deleted
return stats
def main():
ap = argparse.ArgumentParser(description="Prune Qdrant data for notes missing in the Vault.")
ap.add_argument("--vault", required=True, help="Pfad zum Vault-Verzeichnis (Root).")
ap.add_argument("--prefix", default="mindnet", help="Collections-Präfix (Default: mindnet).")
ap.add_argument("--apply", action="store_true", help="Ohne diesen Schalter wird nur ein Dry-Run gemacht.")
ap.add_argument("--yes", action="store_true", help="Ohne Nachfrage ausführen.")
args = ap.parse_args()
vault_root = Path(args.vault).resolve()
if not vault_root.exists():
print(f"Vault-Verzeichnis nicht gefunden: {vault_root}", file=sys.stderr)
sys.exit(1)
cfg = QdrantConfig()
client = get_client(cfg)
cols = collection_names(args.prefix)
vault_ids = read_vault_note_ids(vault_root)
qdrant_ids = qdrant_note_ids(client, cols["notes"])
orphans = sorted(qdrant_ids - vault_ids)
preview = {
"mode": "APPLY" if args.apply else "DRY-RUN",
"prefix": args.prefix,
"vault_root": str(vault_root),
"collections": cols,
"counts": {
"vault_note_ids": len(vault_ids),
"qdrant_note_ids": len(qdrant_ids),
"orphans": len(orphans),
},
"orphans_sample": orphans[:20],
}
print(json.dumps(preview, ensure_ascii=False, indent=2))
if not orphans:
print("Keine verwaisten Einträge gefunden. Nichts zu tun.")
return
if not args.yes:
resp = input("\nFortfahren mit dem oben angezeigten Modus? (yes/no): ").strip().lower()
if resp not in ("y", "yes", "j", "ja"):
print("Abgebrochen.")
return
total_stats = {"notes": 0, "chunks": 0, "edges": 0}
for nid in orphans:
s = prune_for_note(client, cols, nid, dry_run=(not args.apply))
total_stats["notes"] += s["notes"]
total_stats["chunks"] += s["chunks"]
total_stats["edges"] += s["edges"]
print(json.dumps({"deleted": total_stats}, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()