Dateien nach "tests" hochladen
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
This commit is contained in:
parent
eb6e4028ff
commit
e451ea64ae
|
|
@ -5,49 +5,22 @@ check_types_registry_edges.py
|
|||
--------------------------------------------------
|
||||
Prüft, ob die in config/types.yaml hinterlegten Default-Kanten
|
||||
pro Note-Typ tatsächlich in der Qdrant *edges*-Collection auftauchen.
|
||||
|
||||
Benötigte ENV (wie im Projekt üblich):
|
||||
- QDRANT_URL (optional; default http://127.0.0.1:6333)
|
||||
- QDRANT_API_KEY (optional)
|
||||
- COLLECTION_PREFIX oder MINDNET_PREFIX (bestimmt Collection-Namen)
|
||||
- TYPES_FILE (optional; default ./config/types.yaml)
|
||||
|
||||
Collections (Standard):
|
||||
- {prefix}_notes
|
||||
- {prefix}_edges
|
||||
|
||||
Ausgabe: JSON-Zeilen mit Countern und ggf. Missing-Hinweisen.
|
||||
|
||||
Nutzung:
|
||||
python3 tests/check_types_registry_edges.py --prefix mindnet
|
||||
# oder Prefix aus ENV (COLLECTION_PREFIX/MINDNET_PREFIX)
|
||||
|
||||
Hinweis:
|
||||
- Scrollt alle Notes (id → type) und alle Edges (edge_type, src_* Felder)
|
||||
- Mappt Edges zurück auf Note-Typ (über src_note_id oder src_id, heuristisch)
|
||||
- Vergleicht beobachtete edge_types je Typ mit den in types.yaml geforderten
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import os, sys, json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple, Set
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, asdict
|
||||
|
||||
try:
|
||||
import yaml # PyYAML
|
||||
except Exception as e:
|
||||
print(json.dumps({"error": f"PyYAML not installed: {e}"}))
|
||||
sys.exit(2)
|
||||
print(json.dumps({"error": f"PyYAML not installed: {e}"})); sys.exit(2)
|
||||
|
||||
try:
|
||||
from qdrant_client import QdrantClient
|
||||
from qdrant_client.http import models as rest
|
||||
from qdrant_client.http import models as rest # noqa: F401
|
||||
except Exception as e:
|
||||
print(json.dumps({"error": f"qdrant-client not installed: {e}"}))
|
||||
sys.exit(2)
|
||||
|
||||
print(json.dumps({"error": f"qdrant-client not installed: {e}"})); sys.exit(2)
|
||||
|
||||
@dataclass
|
||||
class Cfg:
|
||||
|
|
@ -58,7 +31,6 @@ class Cfg:
|
|||
edges: str
|
||||
types_file: Path
|
||||
|
||||
|
||||
def _env_prefix() -> Optional[str]:
|
||||
for k in ("COLLECTION_PREFIX", "MINDNET_PREFIX"):
|
||||
v = os.environ.get(k, "").strip()
|
||||
|
|
@ -66,7 +38,6 @@ def _env_prefix() -> Optional[str]:
|
|||
return v
|
||||
return None
|
||||
|
||||
|
||||
def _load_types_yaml(path: Path) -> Dict:
|
||||
if not path.exists():
|
||||
print(json.dumps({"warn": f"types.yaml fehlt: {path}. Fallback: keine Vorgaben."}))
|
||||
|
|
@ -74,92 +45,65 @@ def _load_types_yaml(path: Path) -> Dict:
|
|||
try:
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
data = yaml.safe_load(f) or {}
|
||||
# erwartet: {"version": "1.0", "types": {type_name: {"edge_defaults":[...]}}}
|
||||
return data if isinstance(data, dict) else {}
|
||||
except Exception as e:
|
||||
print(json.dumps({"warn": f"types.yaml defekt ({path}): {e}. Fallback: keine Vorgaben."}))
|
||||
return {}
|
||||
|
||||
|
||||
def _cfg_from_env(argv_prefix: Optional[str]) -> Cfg:
|
||||
url = os.environ.get("QDRANT_URL", "http://127.0.0.1:6333").strip()
|
||||
api_key = os.environ.get("QDRANT_API_KEY", "").strip() or None
|
||||
|
||||
prefix = (argv_prefix or _env_prefix() or "mindnet").strip()
|
||||
notes = f"{prefix}_notes"
|
||||
edges = f"{prefix}_edges"
|
||||
|
||||
types_path = Path(os.environ.get("TYPES_FILE", "config/types.yaml")).resolve()
|
||||
return Cfg(url=url, api_key=api_key, prefix=prefix, notes=notes, edges=edges, types_file=types_path)
|
||||
|
||||
|
||||
def _mk_client(cfg: Cfg) -> QdrantClient:
|
||||
return QdrantClient(url=cfg.url, api_key=cfg.api_key, timeout=30.0)
|
||||
|
||||
|
||||
def _scroll_all_notes(client: QdrantClient, notes_col: str) -> Dict[str, Dict]:
|
||||
"""returns dict note_id -> payload"""
|
||||
out = {}
|
||||
offset = None
|
||||
while True:
|
||||
res = client.scroll(
|
||||
points, offset = client.scroll(
|
||||
collection_name=notes_col,
|
||||
scroll_filter=None, # kein Filter, wir holen alles
|
||||
limit=256,
|
||||
offset=offset,
|
||||
with_payload=True,
|
||||
with_vectors=False,
|
||||
scroll_filter=None, limit=256, offset=offset,
|
||||
with_payload=True, with_vectors=False,
|
||||
)
|
||||
points, offset = res
|
||||
if not points:
|
||||
break
|
||||
for p in points:
|
||||
payload = p.payload or {}
|
||||
# Normalisierung: note_id kann in 'id' oder 'note_id' liegen
|
||||
nid = payload.get("note_id") or payload.get("id") or payload.get("uid") or payload.get("slug")
|
||||
if not nid:
|
||||
# try: some pipelines store it also as top-level id; keep point.id fallback
|
||||
nid = str(p.id)
|
||||
out[str(nid)] = payload
|
||||
return out
|
||||
|
||||
|
||||
def _scroll_all_edges(client: QdrantClient, edges_col: str) -> List[Tuple[str, dict]]:
|
||||
"""returns list of tuples (edge_point_id, payload)"""
|
||||
out = []
|
||||
offset = None
|
||||
while True:
|
||||
res = client.scroll(
|
||||
points, offset = client.scroll(
|
||||
collection_name=edges_col,
|
||||
scroll_filter=None,
|
||||
limit=256,
|
||||
offset=offset,
|
||||
with_payload=True,
|
||||
with_vectors=False,
|
||||
scroll_filter=None, limit=256, offset=offset,
|
||||
with_payload=True, with_vectors=False,
|
||||
)
|
||||
points, offset = res
|
||||
if not points:
|
||||
break
|
||||
for p in points:
|
||||
out.append((str(p.id), p.payload or {}))
|
||||
return out
|
||||
|
||||
|
||||
def _guess_src_note_id(ed_pl: dict) -> Optional[str]:
|
||||
"""
|
||||
Versucht, die Quell-Note-ID aus der Edge-Payload zu lesen.
|
||||
Unterstützt mehrere mögliche Feldnamen/Schemata.
|
||||
"""
|
||||
# gängigste Varianten
|
||||
for k in ("src_note_id", "note_id", "src_id", "src"):
|
||||
nid = ed_pl.get(k)
|
||||
if nid:
|
||||
return str(nid)
|
||||
# manchmal liegt sie in 'src_ref' oder 'from'
|
||||
for k in ("src_ref", "from"):
|
||||
nid = ed_pl.get(k)
|
||||
if isinstance(nid, dict):
|
||||
# z.B. {"kind":"note","id":"..."} oder {"note_id":"..."}
|
||||
for kk in ("note_id", "id"):
|
||||
if nid.get(kk):
|
||||
return str(nid[kk])
|
||||
|
|
@ -167,13 +111,16 @@ def _guess_src_note_id(ed_pl: dict) -> Optional[str]:
|
|||
return nid
|
||||
return None
|
||||
|
||||
|
||||
def _edge_type(ed_pl: dict) -> Optional[str]:
|
||||
for k in ("edge_type", "type", "rel", "relation"):
|
||||
if ed_pl.get(k):
|
||||
return str(ed_pl[k])
|
||||
return None
|
||||
|
||||
def _cfg_to_jsonable(cfg: Cfg) -> dict:
|
||||
d = asdict(cfg)
|
||||
d["types_file"] = str(d.get("types_file"))
|
||||
return d
|
||||
|
||||
def main(argv=None):
|
||||
import argparse
|
||||
|
|
@ -182,59 +129,47 @@ def main(argv=None):
|
|||
args = ap.parse_args(argv)
|
||||
|
||||
cfg = _cfg_from_env(args.prefix)
|
||||
print(json.dumps({"cfg": cfg.__dict__}, ensure_ascii=False))
|
||||
print(json.dumps({"cfg": _cfg_to_jsonable(cfg)}, ensure_ascii=False))
|
||||
|
||||
# Type-Registry laden
|
||||
tr = _load_types_yaml(cfg.types_file)
|
||||
types_def = (tr.get("types") if isinstance(tr, dict) else {}) or {}
|
||||
print(json.dumps({"types_defined": list(types_def.keys())}, ensure_ascii=False))
|
||||
|
||||
client = _mk_client(cfg)
|
||||
|
||||
# alle Notes (id -> type) aufbauen
|
||||
notes = _scroll_all_notes(client, cfg.notes)
|
||||
print(json.dumps({"notes_count": len(notes)}, ensure_ascii=False))
|
||||
|
||||
# alle Edges lesen
|
||||
edges = _scroll_all_edges(client, cfg.edges)
|
||||
print(json.dumps({"edges_count": len(edges)}, ensure_ascii=False))
|
||||
|
||||
# Map: note_id -> type
|
||||
note_type: Dict[str, str] = {}
|
||||
note_type = {}
|
||||
for nid, pl in notes.items():
|
||||
t = pl.get("type") or "concept"
|
||||
note_type[str(nid)] = str(t)
|
||||
|
||||
# Beobachtete Kanten je Note-Typ sammeln
|
||||
seen: Dict[str, Set[str]] = {}
|
||||
# auch Zähler
|
||||
counts: Dict[str, Dict[str, int]] = {}
|
||||
seen = {}
|
||||
counts = {}
|
||||
|
||||
for edge_pid, ed_pl in edges:
|
||||
et = _edge_type(ed_pl)
|
||||
if not et:
|
||||
# nicht auswertbar
|
||||
continue
|
||||
src_nid = _guess_src_note_id(ed_pl)
|
||||
if not src_nid:
|
||||
# evtl. chunk->note edges, die nicht auf Note verweisen. überspringen
|
||||
continue
|
||||
t = note_type.get(src_nid)
|
||||
if not t:
|
||||
# Quelle unbekannt (z.B. Note nicht (mehr) vorhanden)
|
||||
continue
|
||||
|
||||
seen.setdefault(t, set()).add(et)
|
||||
counts.setdefault(t, {}).setdefault(et, 0)
|
||||
counts[t][et] += 1
|
||||
|
||||
# Erwartete Defaults je Typ aus Registry
|
||||
expected: Dict[str, Set[str]] = {}
|
||||
expected = {}
|
||||
for tname, tdef in types_def.items():
|
||||
eddefs = (tdef or {}).get("edge_defaults") or []
|
||||
expected[tname] = set([str(x) for x in eddefs if x])
|
||||
|
||||
# Report
|
||||
for tname, exp in expected.items():
|
||||
obs = seen.get(tname, set())
|
||||
missing = sorted(list(exp - obs))
|
||||
|
|
@ -248,7 +183,6 @@ def main(argv=None):
|
|||
"counts": counts.get(tname, {}),
|
||||
}, ensure_ascii=False))
|
||||
|
||||
# Hinweis, wenn keine Typen konfiguriert
|
||||
if not expected:
|
||||
print(json.dumps({"warn": "Keine Typ-Defaults in types.yaml gefunden (edge_defaults leer?)."}))
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user