All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
158 lines
5.0 KiB
Python
158 lines
5.0 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
FILE: scripts/assert_payload_schema.py
|
|
VERSION: 2.1.0 (2025-12-15)
|
|
STATUS: Active
|
|
COMPATIBILITY: v2.9.1 (Post-WP14/WP-15b)
|
|
|
|
Zweck:
|
|
-------
|
|
Prüft, ob die erwarteten Payload-Indizes auf allen Collections vorhanden sind.
|
|
Validiert Schema-Integrität für CI/CD und Wartung.
|
|
|
|
Funktionsweise:
|
|
---------------
|
|
1. Ermittelt Collections für das Präfix (notes, chunks, edges)
|
|
2. Lädt Payload-Schema für jede Collection
|
|
3. Prüft, ob alle Pflichtfelder indiziert sind:
|
|
- notes: note_id, type, title, updated, tags
|
|
- chunks: note_id, chunk_id, index, type, tags
|
|
- edges: note_id, kind, scope, source_id, target_id, chunk_id
|
|
4. Gibt Validierungs-Ergebnis aus
|
|
|
|
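Result interpretation:
----------------------
- Output: JSON report printed to stdout
    * prefix: collection prefix that was checked
    * ok: true only if every collection has all required indexes
    * report: one entry per collection kind (notes, chunks, edges) with
      collection, ok, missing (required fields without an index) and present
- Exit code 0: all required indexes are present
- Exit code 1: at least one required index is missing (a payload schema that
  could not be retrieved counts as all fields missing)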
|
|
|
|
Verwendung:
|
|
-----------
|
|
- CI/CD-Validierung nach ensure_payload_indexes
|
|
- Diagnose von Index-Problemen
|
|
- Wartung und Audit
|
|
|
|
Hinweise:
|
|
---------
|
|
- Kompatibel mit verschiedenen qdrant-client Versionen
|
|
- Nutzt Fallback-Strategie für Schema-Abfrage
|
|
|
|
Aufruf:
|
|
-------
|
|
python3 -m scripts.assert_payload_schema --prefix mindnet
|
|
|
|
Parameter:
|
|
----------
|
|
--prefix TEXT Collection-Präfix (Default: ENV COLLECTION_PREFIX oder mindnet)
|
|
|
|
Änderungen:
|
|
-----------
|
|
v2.1.0 (2025-12-15): Dokumentation aktualisiert
|
|
v1.1.0: Kompatibilität mit verschiedenen qdrant-client Versionen
|
|
v1.0.0: Initial Release
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
from typing import Any, Dict, List
|
|
|
|
from app.core.qdrant import QdrantConfig, get_client, collection_names
|
|
|
|
# Payload fields that must be indexed, keyed by collection kind.
# The kinds match the three collections resolved in main().
REQUIRED: Dict[str, List[str]] = {
    "notes": [
        "note_id",
        "type",
        "title",
        "updated",
        "tags",
    ],
    "chunks": [
        "note_id",
        "chunk_id",
        "index",
        "type",
        "tags",
    ],
    "edges": [
        "note_id",
        "kind",
        "scope",
        "source_id",
        "target_id",
        "chunk_id",
    ],
}
|
|
|
|
def _safe_model_dump(obj: Any) -> Dict[str, Any]:
|
|
if hasattr(obj, "model_dump"):
|
|
return obj.model_dump()
|
|
if hasattr(obj, "dict"):
|
|
return obj.dict()
|
|
return obj if isinstance(obj, dict) else {}
|
|
|
|
def _cfg_base_url(cfg) -> str:
|
|
if getattr(cfg, "url", None):
|
|
return cfg.url.rstrip('/')
|
|
host = getattr(cfg, "host", None) or os.getenv("QDRANT_HOST") or "127.0.0.1"
|
|
port = getattr(cfg, "port", None) or os.getenv("QDRANT_PORT") or "6333"
|
|
return f"http://{host}:{port}"
|
|
|
|
def _http_get(url: str, api_key: str | None) -> Dict[str, Any]:
    """Perform a GET request and return the decoded JSON body.

    When an API key is given it is sent both as an ``api-key`` header and as
    an ``Authorization: Bearer`` header.

    Args:
        url: Full URL to request.
        api_key: Optional API key; no auth headers are sent when falsy.

    Returns:
        The response body parsed as JSON.

    Raises:
        Whatever ``urllib.request.urlopen`` or ``json.loads`` raise; errors
        are not handled here.
    """
    header_pairs = []
    if api_key:
        header_pairs.append(("api-key", api_key))
        header_pairs.append(("Authorization", f"Bearer {api_key}"))
    header_pairs.append(("Accept", "application/json"))

    request = urllib.request.Request(url, method="GET")
    for header_name, header_value in header_pairs:
        request.add_header(header_name, header_value)

    with urllib.request.urlopen(request, timeout=30) as response:
        body = response.read()
    return json.loads(body.decode("utf-8"))
|
|
|
|
def _payload_schema_from(dumped: Dict[str, Any]) -> Dict[str, Any] | None:
    """Pick ``payload_schema`` out of a dumped collection-info response.

    Looks under a ``result`` envelope first and then at the top level, because
    the response may or may not be wrapped depending on which API produced it.
    """
    envelope = dumped.get("result")
    if isinstance(envelope, dict) and envelope.get("payload_schema") is not None:
        return envelope["payload_schema"]
    return dumped.get("payload_schema")


def get_collection_payload_schema(client, cfg, name: str) -> Dict[str, Any] | None:
    """Fetch the payload schema of a collection, trying several strategies.

    The strategies are attempted in order and the first one that yields a
    schema wins:

    1. ``client.openapi_client.collections_api.get_collection`` with
       ``with_payload_schema=True``.
    2. ``client.get_collection`` (some versions return the schema without a
       flag).
    3. A direct HTTP GET against ``/collections/{name}``.

    A strategy that fails, or that succeeds without delivering a schema, falls
    through to the next one.

    Args:
        client: Qdrant client instance.
        cfg: Config object used to derive the base URL and the API key for
            the HTTP fallback.
        name: Collection name.

    Returns:
        The payload schema mapping, or ``None`` if no strategy produced one.
    """
    # 1) OpenAPI client with explicit flag
    try:
        oc = getattr(client, "openapi_client", None)
        if oc is not None and hasattr(oc, "collections_api"):
            info = oc.collections_api.get_collection(
                collection_name=name, with_payload_schema=True
            )
            schema = _payload_schema_from(_safe_model_dump(info))
            # Only stop here when a schema was actually delivered; otherwise
            # the remaining strategies still get their chance.
            if schema is not None:
                return schema
    except Exception:
        # Best effort: client versions differ, so any failure just means
        # "try the next strategy".
        pass

    # 2) Wrapper client (some versions return the schema without a flag)
    try:
        info = client.get_collection(collection_name=name)
        schema = _payload_schema_from(_safe_model_dump(info))
        if schema is not None:
            return schema
    except Exception:
        pass

    # 3) Direct HTTP call
    try:
        base = _cfg_base_url(cfg)
        url = f"{base}/collections/{name}?with_payload_schema=true"
        raw = _http_get(url, getattr(cfg, "api_key", None))
        return _payload_schema_from(raw)
    except Exception:
        return None
|
|
|
|
def ensure_fields(schema: Dict[str, dict] | None, required: List[str]) -> Dict[str, bool]:
    """Report for each required field whether it appears in the schema.

    Args:
        schema: Payload schema mapping keyed by field name; ``None`` or an
            empty mapping is treated as "no fields present".
        required: Field names that are expected to be present.

    Returns:
        Mapping of each required field name to ``True`` (present) or
        ``False`` (absent), in the order of ``required``.
    """
    indexed = frozenset(schema) if schema else frozenset()
    presence: Dict[str, bool] = {}
    for field_name in required:
        presence[field_name] = field_name in indexed
    return presence
|
|
|
|
def main():
    """Check the required payload indexes on all collections and report.

    Resolves the notes/chunks/edges collections for the prefix, fetches each
    payload schema, compares it against ``REQUIRED`` and prints a JSON report
    to stdout. Exits with status 1 when any required field is missing; a
    schema that cannot be fetched counts as all fields missing.

    Command line:
        --prefix TEXT   Collection prefix; falls back to ``cfg.prefix`` from
                        the environment-based config when omitted.
    """
    # Local import: only the CLI entry point needs argparse.
    import argparse

    parser = argparse.ArgumentParser(
        description="Assert that the required payload indexes exist on all collections."
    )
    parser.add_argument(
        "--prefix",
        default=None,
        help="Collection prefix (default: prefix from the environment config)",
    )
    # parse_known_args keeps the previous tolerance for unrelated arguments
    # (argv used to be ignored entirely).
    args, _unknown = parser.parse_known_args()

    cfg = QdrantConfig.from_env()
    client = get_client(cfg)
    prefix = args.prefix or cfg.prefix
    notes, chunks, edges = collection_names(prefix)
    names = {"notes": notes, "chunks": chunks, "edges": edges}

    report = {}
    ok_all = True
    for kind, col in names.items():
        sch = get_collection_payload_schema(client, cfg, col)
        checks = ensure_fields(sch, REQUIRED[kind])
        ok = all(checks.values())
        ok_all = ok_all and ok
        report[kind] = {
            "collection": col,
            "ok": ok,
            "missing": [k for k, v in checks.items() if not v],
            "present": [k for k, v in checks.items() if v],
        }

    print(json.dumps({"prefix": prefix, "ok": ok_all, "report": report}, ensure_ascii=False, indent=2))
    if not ok_all:
        sys.exit(1)


# Run the check only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|