Dateien nach "tests" hochladen
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
This commit is contained in:
parent
6311456917
commit
7ce3f982f2
|
|
@ -1,19 +1,28 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
tests/assert_payload_schema.py
|
||||
tests/assert_payload_schema.py (compat v1.1)
|
||||
|
||||
Prüft, ob die erwarteten Payload-Indizes (payload_schema) auf den drei Collections vorhanden sind.
|
||||
Kompatibel mit älteren qdrant-client Versionen (ohne with_payload_schema im Wrapper).
|
||||
|
||||
Collections & Pflichtfelder:
|
||||
- mindnet_notes : note_id, type, title, updated, tags
|
||||
- mindnet_chunks : note_id, chunk_id, index, type, tags
|
||||
- mindnet_edges : note_id, kind, scope, source_id, target_id, chunk_id
|
||||
|
||||
Aufruf:
|
||||
Nutzung:
|
||||
python3 -m tests.assert_payload_schema
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import json, sys
|
||||
from typing import Dict, List
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from app.core.qdrant import QdrantConfig, get_client, collection_names
|
||||
|
||||
REQUIRED = {
|
||||
|
|
@ -22,13 +31,62 @@ REQUIRED = {
|
|||
"edges": ["note_id", "kind", "scope", "source_id", "target_id", "chunk_id"],
|
||||
}
|
||||
|
||||
def fetch_schema(client, name: str) -> Dict[str, dict]:
|
||||
info = client.get_collection(collection_name=name)
|
||||
d = info.dict() if hasattr(info, "dict") else info
|
||||
return (d.get("result", {}) or {}).get("payload_schema") or {}
|
||||
def _safe_model_dump(obj: Any) -> Dict[str, Any]:
|
||||
if hasattr(obj, "model_dump"):
|
||||
return obj.model_dump()
|
||||
if hasattr(obj, "dict"):
|
||||
return obj.dict()
|
||||
return obj if isinstance(obj, dict) else {}
|
||||
|
||||
def ensure_fields(schema: Dict[str, dict], required: List[str]) -> Dict[str, bool]:
|
||||
return {k: (k in schema) for k in required}
|
||||
def _cfg_base_url(cfg) -> str:
|
||||
if getattr(cfg, "url", None):
|
||||
return cfg.url.rstrip('/')
|
||||
host = getattr(cfg, "host", None) or os.getenv("QDRANT_HOST") or "127.0.0.1"
|
||||
port = getattr(cfg, "port", None) or os.getenv("QDRANT_PORT") or "6333"
|
||||
return f"http://{host}:{port}"
|
||||
|
||||
def _http_get(url: str, api_key: str | None) -> Dict[str, Any]:
|
||||
req = urllib.request.Request(url, method="GET")
|
||||
if api_key:
|
||||
req.add_header("api-key", api_key)
|
||||
req.add_header("Authorization", f"Bearer {api_key}")
|
||||
req.add_header("Accept", "application/json")
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
data = resp.read()
|
||||
return json.loads(data.decode("utf-8"))
|
||||
|
||||
def get_collection_payload_schema(client, cfg, name: str) -> Dict[str, Any] | None:
|
||||
# 1) OpenAPI client mit Flag
|
||||
try:
|
||||
oc = getattr(client, "openapi_client", None)
|
||||
if oc is not None and hasattr(oc, "collections_api"):
|
||||
api = oc.collections_api
|
||||
info = api.get_collection(collection_name=name, with_payload_schema=True)
|
||||
d = _safe_model_dump(info)
|
||||
return (d.get("result") or {}).get("payload_schema")
|
||||
except Exception:
|
||||
pass
|
||||
# 2) Wrapper (manche Versionen liefern Schema auch ohne Flag)
|
||||
try:
|
||||
info = client.get_collection(collection_name=name)
|
||||
d = _safe_model_dump(info)
|
||||
ps = (d.get("result") or {}).get("payload_schema")
|
||||
if ps is not None:
|
||||
return ps
|
||||
except Exception:
|
||||
pass
|
||||
# 3) Direkter HTTP-Call
|
||||
try:
|
||||
base = _cfg_base_url(cfg)
|
||||
url = f"{base}/collections/{name}?with_payload_schema=true"
|
||||
raw = _http_get(url, getattr(cfg, "api_key", None))
|
||||
return (raw.get("result") or {}).get("payload_schema")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def ensure_fields(schema: Dict[str, dict] | None, required: List[str]) -> Dict[str, bool]:
|
||||
ks = set((schema or {}).keys())
|
||||
return {k: (k in ks) for k in required}
|
||||
|
||||
def main():
|
||||
cfg = QdrantConfig.from_env()
|
||||
|
|
@ -39,7 +97,7 @@ def main():
|
|||
report = {}
|
||||
ok_all = True
|
||||
for kind, col in names.items():
|
||||
sch = fetch_schema(client, col)
|
||||
sch = get_collection_payload_schema(client, cfg, col)
|
||||
checks = ensure_fields(sch, REQUIRED[kind])
|
||||
ok = all(checks.values())
|
||||
ok_all = ok_all and ok
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user