scripts/backfill_capability_facets.py aktualisiert
All checks were successful
Deploy Trainer_LLM to llm-node / deploy (push) Successful in 2s

This commit is contained in:
Lars 2025-08-11 19:24:17 +02:00
parent fa8a92208a
commit a6d68134cd

View File

@ -1,22 +1,12 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Idempotentes Backfill-Skript für Capability-Facetten in Qdrant. Backfill Capability-Facetten in Qdrant v1.2
Fix: beendet korrekt, wenn `next_page_offset` (offset) None ist.
- Kompatibel mit qdrant-client 1.15.x: **kein** WithPayloadSelector-Import nötig
- Liest alle Punkte der Collection mit Payload (scroll, with_payload=True)
- Schreibt folgende Felder pro Point nach:
* capability_keys
* capability_ge1 .. capability_ge5
* capability_eq1 .. capability_eq5
Hinweis: Das Skript setzt KEINE Vektoren neu, es aktualisiert nur Payload-Felder.
""" """
import os import os
from typing import Dict, Any, List, Tuple, Optional from typing import Dict, Any, List
from qdrant_client import QdrantClient from qdrant_client import QdrantClient
from qdrant_client.models import Filter # nur für API-Kompatibilität; wird hier leer genutzt
COLL = os.getenv("EXERCISE_COLLECTION", "exercises") COLL = os.getenv("EXERCISE_COLLECTION", "exercises")
QDRANT_HOST = os.getenv("QDRANT_HOST", "localhost") QDRANT_HOST = os.getenv("QDRANT_HOST", "localhost")
@ -26,7 +16,6 @@ BATCH = int(os.getenv("BACKFILL_BATCH", "256"))
def _facet_capabilities(caps: Dict[str, Any]) -> Dict[str, List[str]]: def _facet_capabilities(caps: Dict[str, Any]) -> Dict[str, List[str]]:
caps = caps or {} caps = caps or {}
def names_where(pred) -> List[str]: def names_where(pred) -> List[str]:
out = [] out = []
for k, v in caps.items(): for k, v in caps.items():
@ -38,20 +27,16 @@ def _facet_capabilities(caps: Dict[str, Any]) -> Dict[str, List[str]]:
s = str(k).strip() s = str(k).strip()
if s: if s:
out.append(s) out.append(s)
# stabil sortieren
return sorted({s for s in out}, key=str.casefold) return sorted({s for s in out}, key=str.casefold)
all_keys = sorted({str(k).strip() for k in caps.keys() if str(k).strip()}, key=str.casefold) all_keys = sorted({str(k).strip() for k in caps.keys() if str(k).strip()}, key=str.casefold)
return { return {
"capability_keys": all_keys, "capability_keys": all_keys,
# >= N
"capability_ge1": names_where(lambda lv: lv >= 1), "capability_ge1": names_where(lambda lv: lv >= 1),
"capability_ge2": names_where(lambda lv: lv >= 2), "capability_ge2": names_where(lambda lv: lv >= 2),
"capability_ge3": names_where(lambda lv: lv >= 3), "capability_ge3": names_where(lambda lv: lv >= 3),
"capability_ge4": names_where(lambda lv: lv >= 4), "capability_ge4": names_where(lambda lv: lv >= 4),
"capability_ge5": names_where(lambda lv: lv >= 5), "capability_ge5": names_where(lambda lv: lv >= 5),
# == N
"capability_eq1": names_where(lambda lv: lv == 1), "capability_eq1": names_where(lambda lv: lv == 1),
"capability_eq2": names_where(lambda lv: lv == 2), "capability_eq2": names_where(lambda lv: lv == 2),
"capability_eq3": names_where(lambda lv: lv == 3), "capability_eq3": names_where(lambda lv: lv == 3),
@ -62,47 +47,48 @@ def _facet_capabilities(caps: Dict[str, Any]) -> Dict[str, List[str]]:
def main() -> None:
    """Backfill the capability facet payload fields for every point in COLL.

    Scrolls through the collection page by page (BATCH points per request),
    derives the facet fields from each point's ``capabilities`` payload via
    ``_facet_capabilities`` and writes them back with ``set_payload``.
    A point is only written when at least one stored facet value differs
    from the freshly computed one, so re-running the script leaves
    already-backfilled points untouched. Only payload fields are written.
    """
    client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)

    # Sanity check: the collection must exist before we start scrolling.
    info = client.get_collection(COLL)
    print(f"[Backfill] Collection '{COLL}' ok vectors={info.config.params.vectors}")

    updated_total = 0
    offset = None
    page = 0

    while True:
        # Keep the returned cursor in its own variable: feeding a None cursor
        # back into scroll() would start again from the first page.
        points, next_offset = client.scroll(
            collection_name=COLL,
            scroll_filter=None,
            offset=offset,
            limit=BATCH,
            with_payload=True,
        )
        page += 1

        if not points:
            print("[Backfill] no more points done")
            break

        updated_page = 0
        for pt in points:
            pld = pt.payload or {}
            caps = pld.get("capabilities") or {}
            facets = _facet_capabilities(caps)

            # Only write when a facet field is missing or differs.
            need = any(pld.get(k) != v for k, v in facets.items())
            if not need:
                continue

            # Facets differ per point, so each point gets its own call.
            client.set_payload(collection_name=COLL, points=[pt.id], payload=facets)
            updated_total += 1
            updated_page += 1

        print(f"[Backfill] page={page} processed={len(points)} updated_page={updated_page} updated_total={updated_total}")

        # End reached? Then leave the loop after this page.
        if next_offset is None:
            break
        offset = next_offset

    print(f"[Backfill] done. total_updated={updated_total}")
if __name__ == "__main__": if __name__ == "__main__":