mindnet/scripts/assert_payload_schema.py
Lars 3c06559c5d
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
Dateien nach "scripts" hochladen
2025-11-16 17:50:11 +01:00

117 lines
4.0 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
tests/assert_payload_schema.py (compat v1.1)

Checks that the expected payload indexes (payload_schema) exist on the three collections.
Compatible with older qdrant-client versions (whose wrapper has no with_payload_schema).

Collections and required fields:
- mindnet_notes : note_id, type, title, updated, tags
- mindnet_chunks : note_id, chunk_id, index, type, tags
- mindnet_edges : note_id, kind, scope, source_id, target_id, chunk_id

Usage:
    python3 -m tests.assert_payload_schema
"""
from __future__ import annotations
import json
import os
import sys
import urllib.error
import urllib.request
from typing import Any, Dict, List
from app.core.qdrant import QdrantConfig, get_client, collection_names
# Payload fields that must be present in each collection's payload schema,
# keyed by collection kind ("notes", "chunks", "edges"). main() maps every
# kind to its concrete collection name before checking these fields.
REQUIRED = {
    "notes": ["note_id", "type", "title", "updated", "tags"],
    "chunks": ["note_id", "chunk_id", "index", "type", "tags"],
    "edges": ["note_id", "kind", "scope", "source_id", "target_id", "chunk_id"],
}
def _safe_model_dump(obj: Any) -> Dict[str, Any]:
    """Convert a client response object into a plain dictionary.

    The first available dump method wins: ``model_dump()`` is preferred,
    ``dict()`` is the fallback. An object offering neither is returned
    unchanged when it already is a ``dict``; anything else yields an empty
    dictionary.

    Args:
        obj: Response object, mapping, or arbitrary value.

    Returns:
        The dumped dictionary, ``obj`` itself if it is a dict, else ``{}``.
    """
    for dump_method in ("model_dump", "dict"):
        if hasattr(obj, dump_method):
            return getattr(obj, dump_method)()
    if isinstance(obj, dict):
        return obj
    return {}
def _cfg_base_url(cfg) -> str:
    """Build the base URL of the Qdrant HTTP API from *cfg*.

    A truthy ``cfg.url`` is used as is, minus trailing slashes. Otherwise the
    URL is assembled as ``http://<host>:<port>``, where each part comes from
    the config attribute, then the ``QDRANT_HOST`` / ``QDRANT_PORT``
    environment variable, then the defaults ``127.0.0.1`` and ``6333``.

    Args:
        cfg: Config object; ``url``, ``host`` and ``port`` are all optional.

    Returns:
        The base URL without a trailing slash.
    """
    explicit_url = getattr(cfg, "url", None)
    if explicit_url:
        return explicit_url.rstrip("/")

    def _resolve(attr_name, env_name, default):
        # Falsy values (None, "", 0) count as "not set" at every level.
        return getattr(cfg, attr_name, None) or os.getenv(env_name) or default

    host = _resolve("host", "QDRANT_HOST", "127.0.0.1")
    port = _resolve("port", "QDRANT_PORT", "6333")
    return f"http://{host}:{port}"
def _http_get(url: str, api_key: str | None) -> Dict[str, Any]:
req = urllib.request.Request(url, method="GET")
if api_key:
req.add_header("api-key", api_key)
req.add_header("Authorization", f"Bearer {api_key}")
req.add_header("Accept", "application/json")
with urllib.request.urlopen(req, timeout=30) as resp:
data = resp.read()
return json.loads(data.decode("utf-8"))
def _extract_payload_schema(dumped: Any) -> Dict[str, Any] | None:
    """Pull ``payload_schema`` out of a dumped collection-info response.

    REST-style responses wrap the collection info in a ``result`` envelope,
    while the high-level client wrapper returns the collection info itself,
    so the schema is looked up in the envelope first and at the top level
    second. Returns ``None`` when neither place holds a schema.
    """
    if not isinstance(dumped, dict):
        return None
    envelope = dumped.get("result")
    if isinstance(envelope, dict):
        schema = envelope.get("payload_schema")
        if schema is not None:
            return schema
    return dumped.get("payload_schema")


def get_collection_payload_schema(client, cfg, name: str) -> Dict[str, Any] | None:
    """Return the payload schema of collection *name*, or ``None``.

    Three sources are tried in order and the first one that actually yields
    a schema wins:

    1. The low-level OpenAPI client, called with ``with_payload_schema=True``.
    2. The high-level wrapper ``client.get_collection``.
    3. A direct HTTP GET on ``/collections/{name}``.

    Every stage is best effort: an exception or a response without a
    ``payload_schema`` entry moves on to the next stage, so older client
    versions that reject the flag or omit the schema still get an answer
    from a later stage.

    Args:
        client: Qdrant client; ``openapi_client`` and ``get_collection`` are
            used when present.
        cfg: Config object used to derive the base URL and ``api_key`` for
            the HTTP fallback.
        name: Collection name.

    Returns:
        The payload schema mapping (possibly empty), or ``None`` if no stage
        could provide one.
    """
    # 1) OpenAPI client with the explicit flag.
    try:
        openapi = getattr(client, "openapi_client", None)
        if openapi is not None and hasattr(openapi, "collections_api"):
            info = openapi.collections_api.get_collection(
                collection_name=name, with_payload_schema=True
            )
            schema = _extract_payload_schema(_safe_model_dump(info))
            # Only stop here when a schema was really delivered; otherwise
            # the remaining fallbacks must still get their chance.
            if schema is not None:
                return schema
    except Exception:
        # Deliberately swallowed: client versions without the flag raise here.
        pass

    # 2) Wrapper (some versions deliver the schema even without the flag).
    try:
        info = client.get_collection(collection_name=name)
        schema = _extract_payload_schema(_safe_model_dump(info))
        if schema is not None:
            return schema
    except Exception:
        # Deliberately swallowed: fall back to the raw HTTP call.
        pass

    # 3) Direct HTTP call.
    try:
        url = f"{_cfg_base_url(cfg)}/collections/{name}?with_payload_schema=true"
        raw = _http_get(url, getattr(cfg, "api_key", None))
        return _extract_payload_schema(raw)
    except Exception:
        return None
def ensure_fields(schema: Dict[str, dict] | None, required: List[str]) -> Dict[str, bool]:
ks = set((schema or {}).keys())
return {k: (k in ks) for k in required}
def main():
    """Check the payload schema of all three collections and print a report.

    Loads the Qdrant config from the environment, resolves the notes, chunks
    and edges collection names from the configured prefix, and compares each
    collection's payload schema with the fields listed in ``REQUIRED``. The
    result is printed to stdout as indented JSON. Exits with status 1 when
    any required field is missing.
    """
    cfg = QdrantConfig.from_env()
    client = get_client(cfg)
    notes_col, chunks_col, edges_col = collection_names(cfg.prefix)
    targets = (("notes", notes_col), ("chunks", chunks_col), ("edges", edges_col))

    report = {}
    for kind, collection in targets:
        schema = get_collection_payload_schema(client, cfg, collection)
        presence = ensure_fields(schema, REQUIRED[kind])
        missing = [field for field, found in presence.items() if not found]
        present = [field for field, found in presence.items() if found]
        report[kind] = {
            "collection": collection,
            "ok": not missing,
            "missing": missing,
            "present": present,
        }

    # The run passes only if every collection passed.
    ok_all = all(entry["ok"] for entry in report.values())
    summary = {"prefix": cfg.prefix, "ok": ok_all, "report": report}
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    if not ok_all:
        sys.exit(1)
# Run the schema check only when executed as a script (or via ``python -m``),
# not when the module is merely imported.
if __name__ == "__main__":
    main()