#!/usr/bin/env python3 """ scripts/health_check_mindnet.py Health-Check für den mindnet-Retriever-/Query-Endpoint. Funktion: - Führt POST-Requests auf /query in verschiedenen Modi aus (standard: semantic + hybrid). - Prüft Status-Code, JSON-Struktur und Anzahl der Treffer. - Kennzeichnet Probleme als: - status="ok" - status="warning" (z.B. Timeout) - status="error" (harte Fehler wie HTTP-Fehler, JSON-Fehler etc.) Exit-Code: - Default (tolerant): - overall_status = "ok" (inkl. warnings) → Exit-Code 0 - overall_status = "error" → Exit-Code 1 - Mit --strict: - warnings werden wie errors behandelt → Exit-Code 1 Beispiele: python scripts/health_check_mindnet.py \ --url http://127.0.0.1:8001/query \ --query "embeddings" \ --top-k 3 python scripts/health_check_mindnet.py \ --url http://127.0.0.1:8001/query \ --query "embeddings" \ --top-k 3 \ --timeout 15 \ --modes hybrid # Strenger Modus (warnings → Exit-Code 1) python scripts/health_check_mindnet.py --strict """ from __future__ import annotations import argparse import json import socket import sys import time import urllib.error import urllib.request from typing import Any, Dict, List def _post_query( url: str, mode: str, query: str, top_k: int, timeout: float, ) -> Dict[str, Any]: """ Führt einen POST auf den /query-Endpoint aus und gibt das dekodierte JSON zurück. Raises: urllib.error.URLError, urllib.error.HTTPError, RuntimeError """ payload = { "mode": mode, "query": query, "top_k": top_k, "expand": { "depth": 1 if mode == "hybrid" else 0, "edge_types": ["references", "belongs_to", "prev", "next"], }, "ret": { "with_paths": True, "with_notes": True, "with_chunks": True, }, } data = json.dumps(payload).encode("utf-8") req = urllib.request.Request( url=url, data=data, headers={"Content-Type": "application/json"}, method="POST", ) start = time.monotonic() with urllib.request.urlopen(req, timeout=timeout) as resp: status_code = resp.getcode() body = resp.read().decode("utf-8") end = time.monotonic() latency_ms = int((end - start) * 1000) if status_code != 200: raise RuntimeError(f"HTTP {status_code}: {body}") try: doc = json.loads(body) except json.JSONDecodeError as exc: raise RuntimeError(f"Antwort ist kein gültiges JSON: {exc}") from exc if not isinstance(doc, dict): raise RuntimeError("Antwort ist kein JSON-Objekt") doc["_latency_ms"] = latency_ms return doc def _validate_response( mode: str, doc: Dict[str, Any], min_results: int, ) -> Dict[str, Any]: """ Prüft die wichtigsten Invarianten des Query-Responses. Rückgabe-Format: { "status": "ok" | "warning" | "error", "message": str, "latency_ms": int, "used_mode": str, "result_count": int } """ latency_ms = int(doc.get("_latency_ms", -1)) used_mode = str(doc.get("used_mode", "unknown")) results = doc.get("results", []) if not isinstance(results, list): return { "status": "error", "message": "results ist kein Array", "latency_ms": latency_ms, "used_mode": used_mode, "result_count": -1, } result_count = len(results) if result_count < min_results: # Zu wenige Ergebnisse sind aus Sicht eines Health-Checks eher ein Error, # da die Retrieval-Qualität nicht gewährleistet ist. return { "status": "error", "message": f"zu wenige Ergebnisse: {result_count} < {min_results}", "latency_ms": latency_ms, "used_mode": used_mode, "result_count": result_count, } sample_msg = "OK" if result_count > 0: first = results[0] if not isinstance(first, dict): sample_msg = "first result ist kein Objekt" else: # einfache Plausibilitätschecks (nur Zugriff, kein strikter Typcheck) _ = first.get("note_id") _ = first.get("chunk_id") _ = first.get("total_score") status = "ok" if sample_msg == "OK" else "warning" return { "status": status, "message": sample_msg, "latency_ms": latency_ms, "used_mode": used_mode, "result_count": result_count, } def run_health_check( url: str, query: str, top_k: int, timeout: float, modes: List[str], min_results: int, ) -> Dict[str, Any]: """ Führt den Health-Check für die angegebenen Modi aus. """ report: Dict[str, Any] = { "url": url, "query": query, "top_k": top_k, "timeout_s": timeout, "checks": [], "overall_status": "ok", # wird unten ggf. auf warning oder error gesetzt } overall_status = "ok" for mode in modes: entry: Dict[str, Any] = { "mode": mode, "status": "error", "message": "", } try: doc = _post_query(url=url, mode=mode, query=query, top_k=top_k, timeout=timeout) validation = _validate_response(mode=mode, doc=doc, min_results=min_results) entry.update(validation) except urllib.error.HTTPError as exc: entry["status"] = "error" entry["message"] = f"HTTP-Fehler: {exc}" entry["latency_ms"] = -1 entry["used_mode"] = mode entry["result_count"] = -1 except urllib.error.URLError as exc: # URLError kann u.a. socket.timeout enthalten. lat = -1 entry["latency_ms"] = lat entry["used_mode"] = mode entry["result_count"] = -1 if isinstance(exc.reason, socket.timeout): entry["status"] = "warning" entry["message"] = f"Timeout (URLError/socket.timeout): {exc}" else: entry["status"] = "error" entry["message"] = f"HTTP/Netzwerkfehler: {exc}" except socket.timeout as exc: # Direkter Timeout (falls nicht in URLError verpackt) entry["status"] = "warning" entry["message"] = f"Timeout (socket.timeout): {exc}" entry["latency_ms"] = -1 entry["used_mode"] = mode entry["result_count"] = -1 except Exception as exc: # noqa: BLE001 entry["status"] = "error" entry["message"] = f"unerwarteter Fehler: {exc}" entry["latency_ms"] = -1 entry["used_mode"] = mode entry["result_count"] = -1 # overall_status bestimmen: # - error > warning > ok if entry["status"] == "error": overall_status = "error" elif entry["status"] == "warning" and overall_status == "ok": overall_status = "warning" report["checks"].append(entry) report["overall_status"] = overall_status return report def parse_args(argv: List[str]) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Health-Check für mindnet /query-Retriever", ) parser.add_argument( "--url", default="http://127.0.0.1:8001/query", help="Vollständige URL des /query-Endpunkts (Default: %(default)s)", ) parser.add_argument( "--query", default="embeddings", help="Test-Query für den Health-Check (Default: %(default)s)", ) parser.add_argument( "--top-k", type=int, default=3, help="Top-K für die Testabfrage (Default: %(default)s)", ) parser.add_argument( "--timeout", type=float, default=5.0, help="Timeout in Sekunden pro Request (Default: %(default)s)", ) parser.add_argument( "--modes", nargs="+", default=["semantic", "hybrid"], help="Zu prüfende Modi (Default: semantic hybrid)", ) parser.add_argument( "--min-results", type=int, default=1, help="Minimale Anzahl erwarteter Ergebnisse (Default: %(default)s)", ) parser.add_argument( "--strict", action="store_true", help="Warnings als Fehler behandeln (overall_status=warning führt zu Exit-Code 1)", ) return parser.parse_args(argv) def main(argv: List[str]) -> int: args = parse_args(argv) report = run_health_check( url=args.url, query=args.query, top_k=args.top_k, timeout=args.timeout, modes=args.modes, min_results=args.min_results, ) overall = report.get("overall_status", "error") # Menschlich lesbare Zusammenfassung: print(f"mindnet health: {overall}") for c in report["checks"]: mode = c.get("mode") status = c.get("status") latency = c.get("latency_ms") result_count = c.get("result_count") msg = c.get("message", "") print(f" - {mode}: {status} (latency={latency} ms, results={result_count}) {msg}") # JSON-Ausgabe (z.B. für n8n): print(json.dumps(report, ensure_ascii=False)) if args.strict: # strict: warning wird wie error behandelt return 0 if overall == "ok" else 1 else: # tolerant: nur echte errors führen zu Exit-Code 1 return 0 if overall in ("ok", "warning") else 1 if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))