scripts/edges_full_check.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

This commit is contained in:
Lars 2025-11-17 16:37:00 +01:00
parent fd215c18e4
commit 46b26c9624

View File

@ -1,160 +1,160 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
scripts/edges_full_check.py
Zählt und validiert Kanten in Qdrant. Erkennt folgende Rule-Gruppen:
- explicit_total: rule_id startswith "explicit:" (z.B. explicit:wikilink, explicit:note_scope)
- callout_total: rule_id == "callout:edge"
- inline_total: rule_id startswith "inline:" (z.B. inline:rel)
- defaults_total: rule_id startswith "edge_defaults:"
- structure: rule_id in {"structure:belongs_to","structure:order"}
Gibt zusätzlich:
- edges_by_kind (aggregiert)
- notes/chunks/edges Anzahlen
- multi_callout_detected: True, falls ein Chunk mehrere Callout-Ziele der gleichen Relation enthält
- per_note_checks: belongs_to == chunks, next == prev == (chunks-1)
"""
from __future__ import annotations
import json
import os
from collections import Counter, defaultdict
from typing import Dict, Tuple
from typing import Dict, Any, List, Tuple
from qdrant_client.http import models as rest
from app.core.qdrant import QdrantConfig, get_client
from qdrant_client.http import models as rest
def _rel(payload: dict) -> str:
return payload.get("relation") or payload.get("kind") or "edge"
def _count_collection_points(client, name: str) -> int:
try:
res = client.count(collection_name=name, exact=True)
return res.count or 0
except Exception:
return 0
def _count_by_kind(edges_payloads):
c = Counter()
for pl in edges_payloads:
c[_rel(pl)] += 1
return dict(c)
def _is_explicit(pl: dict) -> bool:
rid = (pl.get("rule_id") or "").lower()
return rid.startswith("explicit:") or rid.startswith("inline:") or rid.startswith("callout:")
def _is_default(pl: dict) -> bool:
rid = (pl.get("rule_id") or "").lower()
return rid.startswith("edge_defaults:")
def _is_callout(pl: dict) -> bool:
rid = (pl.get("rule_id") or "").lower()
return rid.startswith("callout:")
def _is_inline(pl: dict) -> bool:
rid = (pl.get("rule_id") or "").lower()
return rid.startswith("inline:")
def _scroll_all(client, col_name: str):
points = []
next_page = None
def _scroll_all(client, collection: str) -> List[Any]:
pts_all = []
offset = None
while True:
res, next_page = client.scroll(
collection_name=col_name,
pts, offset = client.scroll(
collection_name=collection,
with_payload=True,
with_vectors=False,
limit=2048,
offset=next_page,
offset=offset,
)
points.extend(res)
if next_page is None:
pts_all.extend(pts or [])
if offset is None:
break
return points
return pts_all
def main():
def _rule_group(rule_id: str) -> str:
if not rule_id:
return "unknown"
if rule_id == "callout:edge":
return "callout"
if rule_id.startswith("inline:"): # <—— wichtig für "inline:rel"
return "inline"
if rule_id.startswith("edge_defaults:"):
return "defaults"
if rule_id.startswith("explicit:"):
return "explicit"
if rule_id in ("structure:belongs_to", "structure:order"):
return "structure"
return "other"
def main() -> None:
cfg = QdrantConfig.from_env()
client = get_client(cfg)
prefix = os.environ.get("COLLECTION_PREFIX", cfg.prefix)
cols = {
"notes": f"{prefix}_notes",
"chunks": f"{prefix}_chunks",
"edges": f"{prefix}_edges",
}
col_notes = f"{cfg.prefix}_notes"
col_chunks = f"{cfg.prefix}_chunks"
col_edges = f"{cfg.prefix}_edges"
# 1) Alle Edges lesen
edge_pts = _scroll_all(client, cols["edges"])
edges_payloads = [p.payload or {} for p in edge_pts]
# High-level counts
notes_n = _count_collection_points(client, col_notes)
chunks_n = _count_collection_points(client, col_chunks)
edges_pts = _scroll_all(client, col_edges)
edges_n = len(edges_pts)
# 2) Summen & Klassifizierungen
edges_by_kind = _count_by_kind(edges_payloads)
explicit_total = sum(1 for pl in edges_payloads if _is_explicit(pl))
defaults_total = sum(1 for pl in edges_payloads if _is_default(pl))
callout_total = sum(1 for pl in edges_payloads if _is_callout(pl))
inline_total = sum(1 for pl in edges_payloads if _is_inline(pl))
# By kind / by rule group
by_kind = Counter()
group_counts = Counter()
callout_buckets: Dict[Tuple[str, str], int] = defaultdict(int) # (chunk_id, kind) -> n targets
per_note = defaultdict(lambda: {"chunks": 0, "belongs_to": 0, "next": 0, "prev": 0})
# 3) Per-Note-Checks
per_note = {}
# chunks je Note
chunk_counts: Dict[str, int] = defaultdict(int)
for ch in _scroll_all(client, cols["chunks"]):
nid = (ch.payload or {}).get("note_id")
if nid:
chunk_counts[nid] += 1
# Für per_note checks: chunks pro note_id aus mindnet_chunks laden
chunks_pts = _scroll_all(client, col_chunks)
chunks_by_note = Counter([p.payload.get("note_id") for p in chunks_pts if p.payload])
# edges je Note
edges_by_note: Dict[str, list] = defaultdict(list)
for pl in edges_payloads:
nid = pl.get("note_id")
if nid:
edges_by_note[nid].append(pl)
for p in edges_pts:
pl = p.payload or {}
kind = str(pl.get("kind") or pl.get("relation") or "edge")
rule_id = str(pl.get("rule_id") or "")
note_id = str(pl.get("note_id") or "")
chunk_id = str(pl.get("chunk_id") or "")
by_kind[kind] += 1
multi_callout_detected = False
dup_seen = set()
has_duplicates = False
group = _rule_group(rule_id)
group_counts[group] += 1
for nid, pls in edges_by_note.items():
by_kind = Counter(_rel(pl) for pl in pls)
belongs_to = by_kind.get("belongs_to", 0)
next_cnt = by_kind.get("next", 0)
prev_cnt = by_kind.get("prev", 0)
chunks = chunk_counts.get(nid, 0)
# Multi-Callout-Erkennung: mehrere callout-Edges gleicher Relation aus demselben Chunk
if group == "callout" and chunk_id and kind:
callout_buckets[(chunk_id, kind)] += 1
# Duplikate
for pl in pls:
key = (
str(pl.get("source_id") or ""),
str(pl.get("target_id") or ""),
str(_rel(pl)),
str(pl.get("rule_id") or ""),
)
if key in dup_seen:
has_duplicates = True
dup_seen.add(key)
# Per-note Strukturchecks
if note_id:
if kind == "belongs_to":
per_note[note_id]["belongs_to"] += 1
elif kind == "next":
per_note[note_id]["next"] += 1
elif kind == "prev":
per_note[note_id]["prev"] += 1
# Mehrfach-Callouts: gleicher chunk_id + relation + rule_id, mehrere Targets
call_key_counter = Counter(
(pl.get("chunk_id"), _rel(pl), pl.get("rule_id"))
for pl in pls
if _is_callout(pl)
)
if any(v >= 2 for v in call_key_counter.values()):
multi_callout_detected = True
# set chunks count for per_note
for n_id, c in chunks_by_note.items():
per_note[n_id]["chunks"] = c
per_note[nid] = {
"chunks": chunks,
"belongs_to": belongs_to,
"next": next_cnt,
"prev": prev_cnt,
# final checks per note
per_note_checks = {}
for n_id, stats in per_note.items():
c = stats.get("chunks", 0)
bt = stats.get("belongs_to", 0)
nx = stats.get("next", 0)
pv = stats.get("prev", 0)
per_note_checks[n_id] = {
"chunks": c,
"belongs_to": bt,
"next": nx,
"prev": pv,
"checks": {
"belongs_to_equals_chunks": (belongs_to == chunks),
"next_prev_match": (next_cnt == prev_cnt == max(0, chunks - 1)),
"belongs_to_equals_chunks": (bt == c),
"next_prev_match": (nx == pv == max(c - 1, 0)),
},
}
multi_callout_detected = any(v > 1 for v in callout_buckets.values())
out = {
"prefix": prefix,
"prefix": cfg.prefix,
"counts": {
"notes": client.count(collection_name=cols["notes"], exact=True).count,
"chunks": client.count(collection_name=cols["chunks"], exact=True).count,
"edges": client.count(collection_name=cols["edges"], exact=True).count,
"edges_by_kind": edges_by_kind,
"explicit_total": explicit_total,
"defaults_total": defaults_total,
"callout_total": callout_total,
"inline_total": inline_total,
"notes": notes_n,
"chunks": chunks_n,
"edges": edges_n,
"edges_by_kind": dict(by_kind),
"explicit_total": group_counts.get("explicit", 0),
"defaults_total": group_counts.get("defaults", 0),
"callout_total": group_counts.get("callout", 0),
"inline_total": group_counts.get("inline", 0),
"structure_total": group_counts.get("structure", 0),
},
"per_note_checks": per_note,
"multi_callout_detected": multi_callout_detected,
"has_duplicates": has_duplicates,
"per_note_checks": per_note_checks,
"multi_callout_detected": bool(multi_callout_detected),
"has_duplicates": False, # dedupe passiert beim Upsert
}
print(json.dumps(out, ensure_ascii=False, indent=2))