#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FILE: scripts/assert_payload_schema.py
VERSION: 2.1.0 (2025-12-15)
STATUS: Active
COMPATIBILITY: v2.9.1 (Post-WP14/WP-15b)

Purpose:
--------
Checks that the expected payload indexes exist on all collections.
Validates schema integrity for CI/CD and maintenance.

How it works:
-------------
1. Resolves the collections for the prefix (notes, chunks, edges)
2. Loads the payload schema of each collection
3. Checks that all required fields are indexed:
   - notes:  note_id, type, title, updated, tags
   - chunks: note_id, chunk_id, index, type, tags
   - edges:  note_id, kind, scope, source_id, target_id, chunk_id
4. Prints the validation result

Interpreting the result:
------------------------
- Output: JSON document on stdout
  * prefix: collection prefix that was checked
  * ok:     true/false for the whole run
  * report: per kind (notes/chunks/edges) the collection name, an "ok" flag
            and the lists of "missing" and "present" index fields
- A schema that cannot be fetched at all is reported as "all fields missing".
- Exit code 0: all indexes present
- Exit code 1: missing indexes found

Usage:
------
- CI/CD validation after ensure_payload_indexes
- Diagnosing index problems
- Maintenance and audit

Notes:
------
- Compatible with different qdrant-client versions
- Uses a fallback strategy for fetching the schema

Invocation:
-----------
python3 -m scripts.assert_payload_schema --prefix mindnet

Parameters:
-----------
--prefix TEXT    Collection prefix (default: the prefix resolved by
                 QdrantConfig.from_env())

Changes:
--------
v2.1.0 (2025-12-15): Documentation updated
v1.1.0: Compatibility with different qdrant-client versions
v1.0.0: Initial release
"""
from __future__ import annotations

import argparse
import json
import logging
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Dict, List

from app.core.database.qdrant import QdrantConfig, get_client, collection_names

logger = logging.getLogger(__name__)

# Payload fields that must carry an index, keyed by collection kind.
REQUIRED = {
    "notes": ["note_id", "type", "title", "updated", "tags"],
    "chunks": ["note_id", "chunk_id", "index", "type", "tags"],
    "edges": ["note_id", "kind", "scope", "source_id", "target_id", "chunk_id"],
}


def _safe_model_dump(obj: Any) -> Dict[str, Any]:
    """Convert a client response object into a plain dict.

    Objects exposing ``model_dump()`` (pydantic v2 style) or ``dict()``
    (pydantic v1 style) are dumped through that method; a plain dict is
    returned unchanged; anything else yields an empty dict.
    """
    if hasattr(obj, "model_dump"):
        return obj.model_dump()
    if hasattr(obj, "dict"):
        return obj.dict()
    return obj if isinstance(obj, dict) else {}


def _cfg_base_url(cfg) -> str:
    """Return the HTTP base URL of the Qdrant instance without a trailing slash.

    Uses ``cfg.url`` when set; otherwise builds ``http://host:port`` from
    ``cfg.host`` / ``cfg.port``, then the ``QDRANT_HOST`` / ``QDRANT_PORT``
    environment variables, then ``127.0.0.1:6333``.
    """
    if getattr(cfg, "url", None):
        return cfg.url.rstrip("/")
    host = getattr(cfg, "host", None) or os.getenv("QDRANT_HOST") or "127.0.0.1"
    port = getattr(cfg, "port", None) or os.getenv("QDRANT_PORT") or "6333"
    return f"http://{host}:{port}"


def _http_get(url: str, api_key: str | None) -> Dict[str, Any]:
    """Perform a GET request and return the decoded JSON body.

    When ``api_key`` is given it is sent both as ``api-key`` header and as
    ``Authorization: Bearer`` header. The request times out after 30 seconds.

    Raises:
        urllib.error.URLError: on connection or HTTP errors.
        ValueError: when the body is not valid UTF-8 encoded JSON.
    """
    req = urllib.request.Request(url, method="GET")
    if api_key:
        req.add_header("api-key", api_key)
        req.add_header("Authorization", f"Bearer {api_key}")
    req.add_header("Accept", "application/json")
    with urllib.request.urlopen(req, timeout=30) as resp:
        data = resp.read()
    return json.loads(data.decode("utf-8"))


def _extract_payload_schema(data: Any) -> Dict[str, Any] | None:
    """Pull ``payload_schema`` out of a dumped collection response.

    Raw REST-style responses nest the collection info under ``"result"``.
    The high-level client wrapper is expected to return the already unwrapped
    collection info, so the top level is checked as a second location.
    Returns ``None`` when neither location carries a schema.
    """
    if not isinstance(data, dict):
        return None
    result = data.get("result")
    if isinstance(result, dict) and result.get("payload_schema") is not None:
        return result["payload_schema"]
    return data.get("payload_schema")


def get_collection_payload_schema(client, cfg, name: str) -> Dict[str, Any] | None:
    """Fetch the payload schema of collection ``name``, trying several strategies.

    Strategies, in order:
      1. the low-level OpenAPI client with ``with_payload_schema=True``
      2. the high-level ``client.get_collection`` wrapper
      3. a direct HTTP GET against the REST endpoint

    Each strategy is best-effort: an exception is logged at debug level and
    the next strategy is tried. The first non-empty schema wins. An empty
    schema is treated as inconclusive (a later strategy may see more) and is
    only returned when no strategy produced a non-empty one.

    Returns:
        The payload schema mapping (field name -> index info), possibly
        empty, or ``None`` when no strategy could obtain a schema.
    """

    def via_openapi() -> Dict[str, Any] | None:
        oc = getattr(client, "openapi_client", None)
        if oc is None or not hasattr(oc, "collections_api"):
            return None
        info = oc.collections_api.get_collection(
            collection_name=name, with_payload_schema=True
        )
        return _extract_payload_schema(_safe_model_dump(info))

    def via_wrapper() -> Dict[str, Any] | None:
        info = client.get_collection(collection_name=name)
        return _extract_payload_schema(_safe_model_dump(info))

    def via_http() -> Dict[str, Any] | None:
        # Percent-encode the name so special characters cannot alter the URL.
        quoted = urllib.parse.quote(name, safe="")
        url = f"{_cfg_base_url(cfg)}/collections/{quoted}?with_payload_schema=true"
        raw = _http_get(url, getattr(cfg, "api_key", None))
        return _extract_payload_schema(raw)

    strategies = (
        ("openapi client", via_openapi),
        ("client wrapper", via_wrapper),
        ("direct http", via_http),
    )

    empty_result: Dict[str, Any] | None = None
    for label, strategy in strategies:
        try:
            schema = strategy()
        except Exception as exc:  # deliberate: client versions differ widely
            logger.debug(
                "payload schema lookup via %s failed for %r: %s", label, name, exc
            )
            continue
        if schema:
            return schema
        if schema is not None and empty_result is None:
            # Remember "reachable but empty" in case nothing better turns up.
            empty_result = schema
    return empty_result


def ensure_fields(schema: Dict[str, dict] | None, required: List[str]) -> Dict[str, bool]:
    """Map every required field name to whether it is a key of ``schema``.

    A missing (``None``) or empty schema marks every field as absent.
    """
    indexed = set(schema or {})
    return {field: field in indexed for field in required}


def main(argv: List[str] | None = None) -> None:
    """Validate the payload indexes of all collections and print a JSON report.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]``. Only ``--prefix`` is recognized. Other arguments
            are ignored with a warning on stderr, because earlier versions
            of this script ignored the command line entirely.

    Exits with status 1 when at least one required index is missing.
    """
    parser = argparse.ArgumentParser(
        description="Assert that the required payload indexes exist on all collections."
    )
    parser.add_argument(
        "--prefix",
        default=None,
        help="Collection prefix (default: prefix from the environment configuration)",
    )
    args, unknown = parser.parse_known_args(argv)
    if unknown:
        print(
            f"warning: ignoring unrecognized arguments: {' '.join(unknown)}",
            file=sys.stderr,
        )

    cfg = QdrantConfig.from_env()
    client = get_client(cfg)
    prefix = args.prefix or cfg.prefix

    notes, chunks, edges = collection_names(prefix)
    names = {"notes": notes, "chunks": chunks, "edges": edges}

    report = {}
    ok_all = True
    for kind, col in names.items():
        schema = get_collection_payload_schema(client, cfg, col)
        checks = ensure_fields(schema, REQUIRED[kind])
        ok = all(checks.values())
        ok_all = ok_all and ok
        report[kind] = {
            "collection": col,
            "ok": ok,
            "missing": [field for field, found in checks.items() if not found],
            "present": [field for field, found in checks.items() if found],
        }

    print(
        json.dumps(
            {"prefix": prefix, "ok": ok_all, "report": report},
            ensure_ascii=False,
            indent=2,
        )
    )
    if not ok_all:
        sys.exit(1)


if __name__ == "__main__":
    main()