mindnet/scripts/validate_edges.py
Lars c9d2c5c4ca
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 2s
scripts/validate_edges.py hinzugefügt
2025-09-04 16:03:44 +02:00

200 lines
7.6 KiB
Python

#!/usr/bin/env python3
"""
scripts/validate_edges.py
Validiert die von WP-03 erzeugten Edges in Qdrant:
- Zählt Kanten je Typ (references, backlink, references_at)
- Prüft: Für jede "references" (resolved) existiert eine "backlink"-Gegenkante
- Prüft: "backlink" darf nicht "unresolved" sein
- Prüft: "references_at".source_id existiert in {prefix}_chunks
- Prüft: "references"/"backlink" source/target existieren in {prefix}_notes
- Prüft: doppelte edge_id (sollte 0 sein, da UUIDv5 aus edge_id)
Gibt ein kompaktes JSON-Resultat + optionale Detail-Listen aus.
Umgebung/Parameter:
- QDRANT_URL (Default: http://127.0.0.1:6333)
- QDRANT_API_KEY (optional)
- COLLECTION_PREFIX (Default: mindnet)
Beispiel:
python scripts/validate_edges.py --prefix mindnet
"""
from __future__ import annotations
import argparse
import os
from collections import Counter
from typing import Any, Dict, List, Optional, Set, Tuple
from qdrant_client import QdrantClient
def scroll_all(client: QdrantClient, collection: str, with_payload: bool = True, batch: int = 1000):
next_offset = None
while True:
points, next_offset = client.scroll(
collection_name=collection,
limit=batch,
with_payload=with_payload,
with_vectors=False,
offset=next_offset,
)
for p in points:
yield p
if next_offset is None:
break
def get_env_or_default(args_value: Optional[str], env_name: str, default: Optional[str]) -> Optional[str]:
if args_value is not None and args_value != "":
return args_value
v = os.getenv(env_name)
return v if v is not None and v != "" else default
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--url", help="Qdrant URL (z. B. http://127.0.0.1:6333)")
ap.add_argument("--api-key", help="Qdrant API Key (optional)")
ap.add_argument("--prefix", help="Collection Prefix (Default: mindnet)")
ap.add_argument("--details", action="store_true", help="Auch Problem-Listen ausführlich ausgeben (bis 200 Einträge pro Liste)")
args = ap.parse_args()
url = get_env_or_default(args.url, "QDRANT_URL", "http://127.0.0.1:6333")
api_key = get_env_or_default(args.api_key, "QDRANT_API_KEY", None)
prefix = get_env_or_default(args.prefix, "COLLECTION_PREFIX", "mindnet")
notes_col = f"{prefix}_notes"
chunks_col = f"{prefix}_chunks"
edges_col = f"{prefix}_edges"
client = QdrantClient(url=url, api_key=api_key)
# --- Laden ---
notes_ids: Set[str] = set()
for p in scroll_all(client, notes_col):
pl = p.payload or {}
nid = pl.get("note_id") or pl.get("id")
if nid:
notes_ids.add(nid)
chunk_ids: Set[str] = set()
for p in scroll_all(client, chunks_col):
pl = p.payload or {}
cid = pl.get("chunk_id") or pl.get("id")
if cid:
chunk_ids.add(cid)
counts = Counter()
unresolved_counts = Counter()
other_kinds: Set[str] = set()
references: Set[Tuple[str, str]] = set() # (src_note, tgt_note) nur resolved
backlinks: Set[Tuple[str, str]] = set() # (src_note, tgt_note)
references_at: Set[Tuple[str, str, int]] = set() # (src_chunk, tgt_note, seq)
unresolved_refs: List[Dict[str, Any]] = []
edge_ids: Set[str] = set()
duplicate_edge_ids: List[str] = []
for p in scroll_all(client, edges_col):
pl = p.payload or {}
edge_id = pl.get("edge_id") or ""
if edge_id in edge_ids:
duplicate_edge_ids.append(edge_id)
else:
edge_ids.add(edge_id)
kind = (pl.get("kind") or "").strip()
counts[kind] += 1
status = pl.get("status") or ""
if kind not in {"references", "backlink", "references_at"}:
other_kinds.add(kind or "<missing>")
# classify
s = pl.get("source_id")
t = pl.get("target_id")
seq = pl.get("seq")
# unresolved bookkeeping
if status == "unresolved":
unresolved_counts[kind] += 1
if kind == "references":
unresolved_refs.append({"source_id": s, "raw": pl.get("raw"), "target_label": pl.get("target_label")})
# backlink sollte NIE unresolved sein
continue
if kind == "references":
if s and t:
references.add((s, t))
elif kind == "backlink":
if s and t:
backlinks.add((s, t))
elif kind == "references_at":
if s and t:
try:
seq_i = int(seq) if seq is not None else -1
except Exception:
seq_i = -1
references_at.add((s, t, seq_i))
# --- Invarianten prüfen ---
# 1) Für jede resolved "references" muss es eine "backlink"-Gegenkante geben
missing_backlinks = sorted([(s, t) for (s, t) in references if (t, s) not in backlinks])
# 2) "backlink" darf nicht unresolved sein (bereits oben gefiltert); wir prüfen nur, ob solche Einträge existierten
backlink_unresolved_flag = unresolved_counts.get("backlink", 0) > 0
# 3) "references_at": Quelle (Chunk) muss existieren, Ziel (Note) muss existieren
dangling_refat_source = sorted([(s, t, seq) for (s, t, seq) in references_at if s not in chunk_ids])
dangling_refat_target = sorted([(s, t, seq) for (s, t, seq) in references_at if t not in notes_ids])
# 4) "references"/"backlink": Quelle/Ziel müssen Notes sein
missing_source_notes = sorted([(s, t) for (s, t) in references if s not in notes_ids])
missing_target_notes = sorted([(s, t) for (s, t) in references if t not in notes_ids])
# --- Ergebnis zusammenstellen ---
total_edges = sum(v for k, v in counts.items() if k)
def head(lst, n=50):
return lst[:n]
result = {
"collections": {"notes": notes_col, "chunks": chunks_col, "edges": edges_col},
"counts": {
"total": total_edges,
"references": counts.get("references", 0),
"backlink": counts.get("backlink", 0),
"references_at": counts.get("references_at", 0),
"unresolved_total": sum(unresolved_counts.values()),
"unresolved_by_kind": dict(unresolved_counts),
"other_kinds": sorted(list(other_kinds)),
"unique_edge_ids": len(edge_ids),
"duplicate_edge_ids": len(duplicate_edge_ids),
},
"invariants": {
"references_have_backlink": len(missing_backlinks) == 0,
"no_unresolved_backlink": not backlink_unresolved_flag,
"references_at_source_exist": len(dangling_refat_source) == 0,
"references_at_target_exist": len(dangling_refat_target) == 0,
"references_source_in_notes": len(missing_source_notes) == 0,
"references_target_in_notes": len(missing_target_notes) == 0,
},
"problems_samples": {
"missing_backlinks": head(missing_backlinks, 100 if args.details else 30),
"dangling_references_at_source": head(dangling_refat_source, 100 if args.details else 30),
"dangling_references_at_target": head(dangling_refat_target, 100 if args.details else 30),
"missing_source_notes": head(missing_source_notes, 100 if args.details else 30),
"missing_target_notes": head(missing_target_notes, 100 if args.details else 30),
"duplicate_edge_ids": head(duplicate_edge_ids, 100 if args.details else 30),
"unresolved_references": head(unresolved_refs, 100 if args.details else 30),
},
}
import json
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()