diff --git a/scripts/health_check_mindnet.py b/scripts/health_check_mindnet.py new file mode 100644 index 0000000..be7d9dd --- /dev/null +++ b/scripts/health_check_mindnet.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python3 +""" +scripts/health_check_mindnet.py + +Ein einfacher Health-Check für den mindnet-Retriever/Query-Endpoint. +Kann z. B. über Cron, systemd oder n8n (SSH / Command-Node) ausgeführt werden. + +Funktion: +- POST auf /query in den Modi "semantic" und "hybrid" +- prüft HTTP-Status, JSON-Struktur, Anzahl Treffer +- gibt eine kompakte JSON-Ausgabe zurück +- Exit-Code 0 = OK, 1 = Fehler + +Beispiel: + + python scripts/health_check_mindnet.py \ + --url http://127.0.0.1:8001/query \ + --query "embeddings" \ + --top-k 3 +""" + +from __future__ import annotations + +import argparse +import json +import sys +import time +import urllib.error +import urllib.request +from typing import Any, Dict, List + + +def _post_query( + url: str, + mode: str, + query: str, + top_k: int, + timeout: float, +) -> Dict[str, Any]: + """ + Führt einen POST auf den /query-Endpoint aus und gibt das decodierte JSON zurück. + + Raises: + urllib.error.URLError, urllib.error.HTTPError, json.JSONDecodeError + """ + payload = { + "mode": mode, + "query": query, + "top_k": top_k, + # Minimal-Expand, damit der Hybridmodus nichts "Exotisches" braucht. + "expand": { + "depth": 1 if mode == "hybrid" else 0, + "edge_types": ["references", "belongs_to", "prev", "next"], + }, + "ret": { + "with_paths": True, + "with_notes": True, + "with_chunks": True, + }, + } + + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request( + url=url, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + + start = time.monotonic() + with urllib.request.urlopen(req, timeout=timeout) as resp: + status_code = resp.getcode() + body = resp.read().decode("utf-8") + end = time.monotonic() + + latency_ms = int((end - start) * 1000) + + if status_code != 200: + raise RuntimeError(f"HTTP {status_code}: {body}") + + try: + doc = json.loads(body) + except json.JSONDecodeError as exc: + raise RuntimeError(f"Antwort ist kein gültiges JSON: {exc}") from exc + + if not isinstance(doc, dict): + raise RuntimeError("Antwort ist kein JSON-Objekt") + + doc["_latency_ms"] = latency_ms + return doc + + +def _validate_response( + mode: str, + doc: Dict[str, Any], + min_results: int, +) -> Dict[str, Any]: + """ + Prüft die wichtigsten Invarianten des Query-Responses. + + Gibt ein Result-Dict zurück mit: + - status: "ok" oder "error" + - message: str + - latency_ms: int + - used_mode: str + - result_count: int + """ + latency_ms = int(doc.get("_latency_ms", -1)) + used_mode = str(doc.get("used_mode", "unknown")) + results = doc.get("results", []) + + if not isinstance(results, list): + return { + "status": "error", + "message": "results ist kein Array", + "latency_ms": latency_ms, + "used_mode": used_mode, + "result_count": -1, + } + + result_count = len(results) + + if result_count < min_results: + return { + "status": "error", + "message": f"zu wenige Ergebnisse: {result_count} < {min_results}", + "latency_ms": latency_ms, + "used_mode": used_mode, + "result_count": result_count, + } + + # Optional: ein paar Felder im ersten Hit prüfen + sample_msg = "OK" + if result_count > 0: + first = results[0] + if not isinstance(first, dict): + sample_msg = "first result ist kein Objekt" + else: + # einfache Plausibilitätschecks + _ = first.get("note_id") + _ = first.get("chunk_id") + _ = first.get("total_score") + + status = "ok" if sample_msg == "OK" else "error" + return { + "status": status, + "message": sample_msg, + "latency_ms": latency_ms, + "used_mode": used_mode, + "result_count": result_count, + } + + +def run_health_check( + url: str, + query: str, + top_k: int, + timeout: float, + modes: List[str], + min_results: int, +) -> Dict[str, Any]: + """ + Führt den Health-Check für die angegebenen Modi aus. + """ + report: Dict[str, Any] = { + "url": url, + "query": query, + "top_k": top_k, + "timeout_s": timeout, + "checks": [], + "overall_status": "ok", + } + + for mode in modes: + entry: Dict[str, Any] = { + "mode": mode, + "status": "error", + "message": "", + } + try: + doc = _post_query(url=url, mode=mode, query=query, top_k=top_k, timeout=timeout) + validation = _validate_response(mode=mode, doc=doc, min_results=min_results) + entry.update(validation) + except (urllib.error.URLError, urllib.error.HTTPError) as exc: + entry["status"] = "error" + entry["message"] = f"HTTP/Netzwerkfehler: {exc}" + entry["latency_ms"] = -1 + entry["used_mode"] = mode + entry["result_count"] = -1 + except Exception as exc: # noqa: BLE001 + entry["status"] = "error" + entry["message"] = f"unerwarteter Fehler: {exc}" + entry["latency_ms"] = -1 + entry["used_mode"] = mode + entry["result_count"] = -1 + + report["checks"].append(entry) + + # Overall-Status bestimmen + for c in report["checks"]: + if c.get("status") != "ok": + report["overall_status"] = "error" + break + + return report + + +def parse_args(argv: List[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Health-Check für mindnet /query-Retriever", + ) + parser.add_argument( + "--url", + default="http://127.0.0.1:8001/query", + help="Vollständige URL des /query-Endpunkts (Default: %(default)s)", + ) + parser.add_argument( + "--query", + default="embeddings", + help="Test-Query für den Health-Check (Default: %(default)s)", + ) + parser.add_argument( + "--top-k", + type=int, + default=3, + help="Top-K für die Testabfrage (Default: %(default)s)", + ) + parser.add_argument( + "--timeout", + type=float, + default=5.0, + help="Timeout in Sekunden pro Request (Default: %(default)s)", + ) + parser.add_argument( + "--modes", + nargs="+", + default=["semantic", "hybrid"], + help="Zu prüfende Modi (Default: semantic hybrid)", + ) + parser.add_argument( + "--min-results", + type=int, + default=1, + help="Minimale Anzahl erwarteter Ergebnisse (Default: %(default)s)", + ) + return parser.parse_args(argv) + + +def main(argv: List[str]) -> int: + args = parse_args(argv) + + report = run_health_check( + url=args.url, + query=args.query, + top_k=args.top_k, + timeout=args.timeout, + modes=args.modes, + min_results=args.min_results, + ) + + overall = report.get("overall_status", "error") + + # Kurze menschenlesbare Zusammenfassung: + print(f"mindnet health: {overall}") + for c in report["checks"]: + mode = c.get("mode") + status = c.get("status") + latency = c.get("latency_ms") + result_count = c.get("result_count") + msg = c.get("message", "") + print(f" - {mode}: {status} (latency={latency} ms, results={result_count}) {msg}") + + # JSON-Ausgabe für n8n / Maschinen: + print(json.dumps(report, ensure_ascii=False)) + + return 0 if overall == "ok" else 1 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:]))