diff --git a/tests/assert_payload_schema.py b/tests/assert_payload_schema.py index fc369a1..c5cc279 100644 --- a/tests/assert_payload_schema.py +++ b/tests/assert_payload_schema.py @@ -1,19 +1,28 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -tests/assert_payload_schema.py +tests/assert_payload_schema.py (compat v1.1) Prüft, ob die erwarteten Payload-Indizes (payload_schema) auf den drei Collections vorhanden sind. +Kompatibel mit älteren qdrant-client Versionen (ohne with_payload_schema im Wrapper). + +Collections & Pflichtfelder: - mindnet_notes : note_id, type, title, updated, tags - mindnet_chunks : note_id, chunk_id, index, type, tags - mindnet_edges : note_id, kind, scope, source_id, target_id, chunk_id -Aufruf: +Nutzung: python3 -m tests.assert_payload_schema """ from __future__ import annotations -import json, sys -from typing import Dict, List + +import json +import os +import sys +import urllib.error +import urllib.request +from typing import Any, Dict, List + from app.core.qdrant import QdrantConfig, get_client, collection_names REQUIRED = { @@ -22,13 +31,62 @@ REQUIRED = { "edges": ["note_id", "kind", "scope", "source_id", "target_id", "chunk_id"], } -def fetch_schema(client, name: str) -> Dict[str, dict]: - info = client.get_collection(collection_name=name) - d = info.dict() if hasattr(info, "dict") else info - return (d.get("result", {}) or {}).get("payload_schema") or {} +def _safe_model_dump(obj: Any) -> Dict[str, Any]: + if hasattr(obj, "model_dump"): + return obj.model_dump() + if hasattr(obj, "dict"): + return obj.dict() + return obj if isinstance(obj, dict) else {} -def ensure_fields(schema: Dict[str, dict], required: List[str]) -> Dict[str, bool]: - return {k: (k in schema) for k in required} +def _cfg_base_url(cfg) -> str: + if getattr(cfg, "url", None): + return cfg.url.rstrip('/') + host = getattr(cfg, "host", None) or os.getenv("QDRANT_HOST") or "127.0.0.1" + port = getattr(cfg, "port", None) or os.getenv("QDRANT_PORT") or "6333" + return f"http://{host}:{port}" + +def _http_get(url: str, api_key: str | None) -> Dict[str, Any]: + req = urllib.request.Request(url, method="GET") + if api_key: + req.add_header("api-key", api_key) + req.add_header("Authorization", f"Bearer {api_key}") + req.add_header("Accept", "application/json") + with urllib.request.urlopen(req, timeout=30) as resp: + data = resp.read() + return json.loads(data.decode("utf-8")) + +def get_collection_payload_schema(client, cfg, name: str) -> Dict[str, Any] | None: + # 1) OpenAPI client mit Flag + try: + oc = getattr(client, "openapi_client", None) + if oc is not None and hasattr(oc, "collections_api"): + api = oc.collections_api + info = api.get_collection(collection_name=name, with_payload_schema=True) + d = _safe_model_dump(info) + return (d.get("result") or {}).get("payload_schema") + except Exception: + pass + # 2) Wrapper (manche Versionen liefern Schema auch ohne Flag) + try: + info = client.get_collection(collection_name=name) + d = _safe_model_dump(info) + ps = (d.get("result") or {}).get("payload_schema") + if ps is not None: + return ps + except Exception: + pass + # 3) Direkter HTTP-Call + try: + base = _cfg_base_url(cfg) + url = f"{base}/collections/{name}?with_payload_schema=true" + raw = _http_get(url, getattr(cfg, "api_key", None)) + return (raw.get("result") or {}).get("payload_schema") + except Exception: + return None + +def ensure_fields(schema: Dict[str, dict] | None, required: List[str]) -> Dict[str, bool]: + ks = set((schema or {}).keys()) + return {k: (k in ks) for k in required} def main(): cfg = QdrantConfig.from_env() @@ -39,7 +97,7 @@ def main(): report = {} ok_all = True for kind, col in names.items(): - sch = fetch_schema(client, col) + sch = get_collection_payload_schema(client, cfg, col) checks = ensure_fields(sch, REQUIRED[kind]) ok = all(checks.values()) ok_all = ok_all and ok