#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FILE: scripts/validate_edges.py
VERSION: 2.1.0 (2025-12-15)
STATUS: Active
COMPATIBILITY: v2.9.1 (Post-WP14/WP-15b)

Purpose:
--------
Validates the integrity of the edges stored in Qdrant. Checks structural
correctness, referential integrity and consistency.

How it works:
-------------
1. Loads the note IDs from {prefix}_notes, the chunk IDs from
   {prefix}_chunks and all edges from {prefix}_edges.
2. Runs several validations:
   - Counts edges by kind (references, backlink, references_at, others)
   - Checks: every resolved "references" edge has a reverse "backlink" edge
   - Checks: a "backlink" edge must never be "unresolved"
   - Checks: "references_at".source_id exists in chunks and its target_id
     exists in notes
   - Checks: source/target of "references" edges exist in notes
   - Checks: duplicate edge_id values (should be 0); edges without any
     edge_id are counted separately
3. Prints a JSON report.

Interpreting the result:
------------------------
- Output: JSON with the keys
    * collections:      names of the collections that were read
    * counts:           edge counts (total, per kind, unresolved, edge IDs)
    * invariants:       one boolean per check (true = check passed)
    * problems_samples: sample lists of the problems found (30 entries per
                        list, 100 with --details)
- Exit code 0: all invariants hold and no duplicate edge_id was found
- Exit code 1: at least one validation failed

Usage:
------
- Quality control after imports
- Debugging graph problems
- CI/CD validation

Notes:
------
- Checks structural integrity, not semantic correctness
- Can be slow for large graphs

Invocation:
-----------
python3 -m scripts.validate_edges --prefix mindnet
python3 -m scripts.validate_edges --prefix mindnet --details

Parameters:
-----------
--url TEXT        Qdrant URL (default: ENV QDRANT_URL or http://127.0.0.1:6333)
--api-key TEXT    Qdrant API key (default: ENV QDRANT_API_KEY)
--prefix TEXT     Collection prefix (default: ENV COLLECTION_PREFIX or mindnet)
--details         Emit longer problem lists (--verbose is accepted as an alias)

Environment variables:
----------------------
QDRANT_URL (default: http://127.0.0.1:6333), QDRANT_API_KEY, COLLECTION_PREFIX

Changes:
--------
v2.1.0 (2025-12-15): Documentation updated
v1.0.0: Initial release
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from collections import Counter
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple

if TYPE_CHECKING:
    # Only needed for annotations; the runtime import happens lazily in main()
    # so that --help and the validation helpers work without the client.
    from qdrant_client import QdrantClient

# Maximum number of entries emitted per problem list.
SAMPLE_LIMIT_DEFAULT = 30
SAMPLE_LIMIT_DETAILS = 100

# Edge kinds with dedicated handling; everything else lands in "other_kinds".
_CORE_KINDS = frozenset({"references", "backlink", "references_at"})


def scroll_all(
    client: QdrantClient,
    collection: str,
    with_payload: bool = True,
    batch: int = 1000,
) -> Iterator[Any]:
    """Yield every point of *collection* by paging through ``client.scroll``.

    Args:
        client: Object exposing a Qdrant-style ``scroll`` method.
        collection: Name of the collection to read.
        with_payload: Forwarded to ``scroll``.
        batch: Page size (``limit``) per ``scroll`` call.

    Yields:
        The points returned by ``scroll``, page by page, until ``scroll``
        reports no further offset.
    """
    next_offset = None
    while True:
        points, next_offset = client.scroll(
            collection_name=collection,
            limit=batch,
            with_payload=with_payload,
            with_vectors=False,
            offset=next_offset,
        )
        yield from points
        if next_offset is None:
            break


def get_env_or_default(
    args_value: Optional[str],
    env_name: str,
    default: Optional[str],
) -> Optional[str]:
    """Resolve a setting: CLI value first, then environment, then *default*.

    Empty strings are treated like missing values at both levels.
    """
    if args_value is not None and args_value != "":
        return args_value
    env_value = os.getenv(env_name)
    if env_value is not None and env_value != "":
        return env_value
    return default


def _load_ids(client: QdrantClient, collection: str, id_key: str) -> Set[str]:
    """Collect payload IDs of *collection*, reading *id_key* or falling back to "id"."""
    ids: Set[str] = set()
    for point in scroll_all(client, collection):
        payload = point.payload or {}
        value = payload.get(id_key) or payload.get("id")
        if value:
            ids.add(value)
    return ids


def validate_edges(client: QdrantClient, prefix: str, details: bool = False) -> Dict[str, Any]:
    """Load notes, chunks and edges for *prefix* and build the validation report.

    Args:
        client: Object exposing a Qdrant-style ``scroll`` method.
        prefix: Collection prefix; reads ``{prefix}_notes``, ``{prefix}_chunks``
            and ``{prefix}_edges``.
        details: When true, problem lists hold up to ``SAMPLE_LIMIT_DETAILS``
            entries instead of ``SAMPLE_LIMIT_DEFAULT``.

    Returns:
        A dict with the keys ``collections``, ``counts``, ``invariants`` and
        ``problems_samples``.
    """
    notes_col = f"{prefix}_notes"
    chunks_col = f"{prefix}_chunks"
    edges_col = f"{prefix}_edges"

    # --- Load ---
    notes_ids = _load_ids(client, notes_col, "note_id")
    chunk_ids = _load_ids(client, chunks_col, "chunk_id")

    counts: Counter = Counter()
    unresolved_counts: Counter = Counter()
    other_kinds: Set[str] = set()

    references: Set[Tuple[str, str]] = set()  # (src_note, tgt_note), resolved only
    backlinks: Set[Tuple[str, str]] = set()  # (src_note, tgt_note)
    references_at: Set[Tuple[str, str, int]] = set()  # (src_chunk, tgt_note, seq)
    unresolved_refs: List[Dict[str, Any]] = []

    edge_ids: Set[str] = set()
    duplicate_edge_ids: List[str] = []
    missing_edge_ids = 0

    for point in scroll_all(client, edges_col):
        payload = point.payload or {}

        # Edges without an edge_id are counted on their own; lumping them
        # together under "" would report them as duplicates of each other.
        edge_id = payload.get("edge_id")
        if not edge_id:
            missing_edge_ids += 1
        elif edge_id in edge_ids:
            duplicate_edge_ids.append(edge_id)
        else:
            edge_ids.add(edge_id)

        kind = (payload.get("kind") or "").strip()
        counts[kind] += 1
        if kind not in _CORE_KINDS:
            other_kinds.add(kind)

        source = payload.get("source_id")
        target = payload.get("target_id")

        # Unresolved edges are only counted; they take no part in the
        # invariant checks below. A backlink should never be unresolved.
        if (payload.get("status") or "") == "unresolved":
            unresolved_counts[kind] += 1
            if kind == "references":
                unresolved_refs.append(
                    {
                        "source_id": source,
                        "raw": payload.get("raw"),
                        "target_label": payload.get("target_label"),
                    }
                )
            continue

        if not (source and target):
            continue

        if kind == "references":
            references.add((source, target))
        elif kind == "backlink":
            backlinks.add((source, target))
        elif kind == "references_at":
            seq = payload.get("seq")
            try:
                seq_i = int(seq) if seq is not None else -1
            except (TypeError, ValueError):
                # Non-numeric seq values are normalised to -1.
                seq_i = -1
            references_at.add((source, target, seq_i))

    # --- Check invariants ---
    # 1) Every resolved "references" edge needs a reverse "backlink" edge.
    missing_backlinks = sorted(pair for pair in references if (pair[1], pair[0]) not in backlinks)

    # 2) "backlink" must never be unresolved (such edges were skipped above;
    #    here we only record whether any existed).
    backlink_unresolved = unresolved_counts.get("backlink", 0) > 0

    # 3) "references_at": source (chunk) and target (note) must exist.
    dangling_refat_source = sorted(ref for ref in references_at if ref[0] not in chunk_ids)
    dangling_refat_target = sorted(ref for ref in references_at if ref[1] not in notes_ids)

    # 4) "references": source and target must be known notes.
    missing_source_notes = sorted(pair for pair in references if pair[0] not in notes_ids)
    missing_target_notes = sorted(pair for pair in references if pair[1] not in notes_ids)

    # --- Assemble result ---
    # Edges whose kind is empty are left out of the total.
    total_edges = sum(n for kind, n in counts.items() if kind)
    limit = SAMPLE_LIMIT_DETAILS if details else SAMPLE_LIMIT_DEFAULT

    return {
        "collections": {"notes": notes_col, "chunks": chunks_col, "edges": edges_col},
        "counts": {
            "total": total_edges,
            "references": counts.get("references", 0),
            "backlink": counts.get("backlink", 0),
            "references_at": counts.get("references_at", 0),
            "unresolved_total": sum(unresolved_counts.values()),
            "unresolved_by_kind": dict(unresolved_counts),
            "other_kinds": sorted(other_kinds),
            "unique_edge_ids": len(edge_ids),
            "duplicate_edge_ids": len(duplicate_edge_ids),
            "missing_edge_ids": missing_edge_ids,
        },
        "invariants": {
            "references_have_backlink": not missing_backlinks,
            "no_unresolved_backlink": not backlink_unresolved,
            "references_at_source_exist": not dangling_refat_source,
            "references_at_target_exist": not dangling_refat_target,
            "references_source_in_notes": not missing_source_notes,
            "references_target_in_notes": not missing_target_notes,
        },
        "problems_samples": {
            "missing_backlinks": missing_backlinks[:limit],
            "dangling_references_at_source": dangling_refat_source[:limit],
            "dangling_references_at_target": dangling_refat_target[:limit],
            "missing_source_notes": missing_source_notes[:limit],
            "missing_target_notes": missing_target_notes[:limit],
            "duplicate_edge_ids": duplicate_edge_ids[:limit],
            "unresolved_references": unresolved_refs[:limit],
        },
    }


def report_passed(result: Dict[str, Any]) -> bool:
    """Return True when every invariant holds and no duplicate edge_id exists."""
    return all(result["invariants"].values()) and result["counts"]["duplicate_edge_ids"] == 0


def main(argv: Optional[List[str]] = None) -> int:
    """Parse the command line, validate the edges and print the JSON report.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        0 when all validations passed, 1 otherwise (used as the exit code).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--url", help="Qdrant URL (z. B. http://127.0.0.1:6333)")
    ap.add_argument("--api-key", help="Qdrant API Key (optional)")
    ap.add_argument("--prefix", help="Collection Prefix (Default: mindnet)")
    # "--verbose" is the spelling used in the module documentation; keep both.
    ap.add_argument(
        "--details",
        "--verbose",
        dest="details",
        action="store_true",
        help=f"Auch Problem-Listen ausführlich ausgeben (bis {SAMPLE_LIMIT_DETAILS} Einträge pro Liste)",
    )
    args = ap.parse_args(argv)

    url = get_env_or_default(args.url, "QDRANT_URL", "http://127.0.0.1:6333")
    api_key = get_env_or_default(args.api_key, "QDRANT_API_KEY", None)
    prefix = get_env_or_default(args.prefix, "COLLECTION_PREFIX", "mindnet")

    from qdrant_client import QdrantClient

    client = QdrantClient(url=url, api_key=api_key)
    result = validate_edges(client, prefix, details=args.details)

    print(json.dumps(result, ensure_ascii=False, indent=2))
    return 0 if report_passed(result) else 1


if __name__ == "__main__":
    sys.exit(main())