mindnet/scripts/validate_edges.py
Lars e9532e8878
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
script_Überprüfung und Kommentarheader
2025-12-28 10:40:28 +01:00

243 lines
8.7 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FILE: scripts/validate_edges.py
VERSION: 2.1.0 (2025-12-15)
STATUS: Active
COMPATIBILITY: v2.9.1 (Post-WP14/WP-15b)
Zweck:
-------
Validiert die Integrität der Edges in Qdrant.
Prüft strukturelle Korrektheit, Referenz-Integrität und Konsistenz.
Funktionsweise:
---------------
1. Lädt alle Edges aus {prefix}_edges
2. Führt mehrere Validierungen durch:
- Zählt Edges nach Typ (references, backlink, etc.)
- Prüft: Für jede "references" existiert "backlink"-Gegenkante
- Prüft: "backlink" darf nicht "unresolved" sein
- Prüft: "references_at".source_id existiert in chunks
- Prüft: source/target existieren in notes
- Prüft: doppelte edge_id (sollte 0 sein)
3. Gibt kompaktes JSON-Resultat + optionale Detail-Listen aus
Ergebnis-Interpretation:
------------------------
- Ausgabe: JSON mit Validierungs-Ergebnissen
* counts_by_kind: Anzahl Edges pro Typ
* validation_results: Ergebnisse der einzelnen Prüfungen
* issues: Liste gefundener Probleme (optional mit --verbose)
- Exit-Code 0: Alle Validierungen bestanden
- Exit-Code 1: Validierungsfehler gefunden
Verwendung:
-----------
- Qualitätskontrolle nach Importen
- Debugging von Graph-Problemen
- CI/CD-Validierung
Hinweise:
---------
- Prüft strukturelle Integrität, nicht semantische Korrektheit
- Kann bei großen Graphen langsam sein
Aufruf:
-------
python3 -m scripts.validate_edges --prefix mindnet
python3 -m scripts.validate_edges --prefix mindnet --verbose
Parameter:
----------
--prefix TEXT Collection-Präfix (Default: ENV COLLECTION_PREFIX oder mindnet)
--verbose Zeigt detaillierte Problem-Listen
Umgebungsvariablen:
-------------------
QDRANT_URL (Default: http://127.0.0.1:6333), QDRANT_API_KEY, COLLECTION_PREFIX
Änderungen:
-----------
v2.1.0 (2025-12-15): Dokumentation aktualisiert
v1.0.0: Initial Release
"""
from __future__ import annotations
import argparse
import os
from collections import Counter
from typing import Any, Dict, List, Optional, Set, Tuple
from qdrant_client import QdrantClient
def scroll_all(client: QdrantClient, collection: str, with_payload: bool = True, batch: int = 1000):
next_offset = None
while True:
points, next_offset = client.scroll(
collection_name=collection,
limit=batch,
with_payload=with_payload,
with_vectors=False,
offset=next_offset,
)
for p in points:
yield p
if next_offset is None:
break
def get_env_or_default(args_value: Optional[str], env_name: str, default: Optional[str]) -> Optional[str]:
if args_value is not None and args_value != "":
return args_value
v = os.getenv(env_name)
return v if v is not None and v != "" else default
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--url", help="Qdrant URL (z. B. http://127.0.0.1:6333)")
ap.add_argument("--api-key", help="Qdrant API Key (optional)")
ap.add_argument("--prefix", help="Collection Prefix (Default: mindnet)")
ap.add_argument("--details", action="store_true", help="Auch Problem-Listen ausführlich ausgeben (bis 200 Einträge pro Liste)")
args = ap.parse_args()
url = get_env_or_default(args.url, "QDRANT_URL", "http://127.0.0.1:6333")
api_key = get_env_or_default(args.api_key, "QDRANT_API_KEY", None)
prefix = get_env_or_default(args.prefix, "COLLECTION_PREFIX", "mindnet")
notes_col = f"{prefix}_notes"
chunks_col = f"{prefix}_chunks"
edges_col = f"{prefix}_edges"
client = QdrantClient(url=url, api_key=api_key)
# --- Laden ---
notes_ids: Set[str] = set()
for p in scroll_all(client, notes_col):
pl = p.payload or {}
nid = pl.get("note_id") or pl.get("id")
if nid:
notes_ids.add(nid)
chunk_ids: Set[str] = set()
for p in scroll_all(client, chunks_col):
pl = p.payload or {}
cid = pl.get("chunk_id") or pl.get("id")
if cid:
chunk_ids.add(cid)
counts = Counter()
unresolved_counts = Counter()
other_kinds: Set[str] = set()
references: Set[Tuple[str, str]] = set() # (src_note, tgt_note) nur resolved
backlinks: Set[Tuple[str, str]] = set() # (src_note, tgt_note)
references_at: Set[Tuple[str, str, int]] = set() # (src_chunk, tgt_note, seq)
unresolved_refs: List[Dict[str, Any]] = []
edge_ids: Set[str] = set()
duplicate_edge_ids: List[str] = []
for p in scroll_all(client, edges_col):
pl = p.payload or {}
edge_id = pl.get("edge_id") or ""
if edge_id in edge_ids:
duplicate_edge_ids.append(edge_id)
else:
edge_ids.add(edge_id)
kind = (pl.get("kind") or "").strip()
counts[kind] += 1
status = pl.get("status") or ""
if kind not in {"references", "backlink", "references_at"}:
other_kinds.add(kind or "<missing>")
# classify
s = pl.get("source_id")
t = pl.get("target_id")
seq = pl.get("seq")
# unresolved bookkeeping
if status == "unresolved":
unresolved_counts[kind] += 1
if kind == "references":
unresolved_refs.append({"source_id": s, "raw": pl.get("raw"), "target_label": pl.get("target_label")})
# backlink sollte NIE unresolved sein
continue
if kind == "references":
if s and t:
references.add((s, t))
elif kind == "backlink":
if s and t:
backlinks.add((s, t))
elif kind == "references_at":
if s and t:
try:
seq_i = int(seq) if seq is not None else -1
except Exception:
seq_i = -1
references_at.add((s, t, seq_i))
# --- Invarianten prüfen ---
# 1) Für jede resolved "references" muss es eine "backlink"-Gegenkante geben
missing_backlinks = sorted([(s, t) for (s, t) in references if (t, s) not in backlinks])
# 2) "backlink" darf nicht unresolved sein (bereits oben gefiltert); wir prüfen nur, ob solche Einträge existierten
backlink_unresolved_flag = unresolved_counts.get("backlink", 0) > 0
# 3) "references_at": Quelle (Chunk) muss existieren, Ziel (Note) muss existieren
dangling_refat_source = sorted([(s, t, seq) for (s, t, seq) in references_at if s not in chunk_ids])
dangling_refat_target = sorted([(s, t, seq) for (s, t, seq) in references_at if t not in notes_ids])
# 4) "references"/"backlink": Quelle/Ziel müssen Notes sein
missing_source_notes = sorted([(s, t) for (s, t) in references if s not in notes_ids])
missing_target_notes = sorted([(s, t) for (s, t) in references if t not in notes_ids])
# --- Ergebnis zusammenstellen ---
total_edges = sum(v for k, v in counts.items() if k)
def head(lst, n=50):
return lst[:n]
result = {
"collections": {"notes": notes_col, "chunks": chunks_col, "edges": edges_col},
"counts": {
"total": total_edges,
"references": counts.get("references", 0),
"backlink": counts.get("backlink", 0),
"references_at": counts.get("references_at", 0),
"unresolved_total": sum(unresolved_counts.values()),
"unresolved_by_kind": dict(unresolved_counts),
"other_kinds": sorted(list(other_kinds)),
"unique_edge_ids": len(edge_ids),
"duplicate_edge_ids": len(duplicate_edge_ids),
},
"invariants": {
"references_have_backlink": len(missing_backlinks) == 0,
"no_unresolved_backlink": not backlink_unresolved_flag,
"references_at_source_exist": len(dangling_refat_source) == 0,
"references_at_target_exist": len(dangling_refat_target) == 0,
"references_source_in_notes": len(missing_source_notes) == 0,
"references_target_in_notes": len(missing_target_notes) == 0,
},
"problems_samples": {
"missing_backlinks": head(missing_backlinks, 100 if args.details else 30),
"dangling_references_at_source": head(dangling_refat_source, 100 if args.details else 30),
"dangling_references_at_target": head(dangling_refat_target, 100 if args.details else 30),
"missing_source_notes": head(missing_source_notes, 100 if args.details else 30),
"missing_target_notes": head(missing_target_notes, 100 if args.details else 30),
"duplicate_edge_ids": head(duplicate_edge_ids, 100 if args.details else 30),
"unresolved_references": head(unresolved_refs, 100 if args.details else 30),
},
}
import json
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()