From 97bc283ce198c508b53042b0580f71be7c42580a Mon Sep 17 00:00:00 2001
From: Lars
Date: Mon, 11 Aug 2025 11:12:19 +0200
Subject: [PATCH] scripts/wiki_importer.py updated

---
 scripts/wiki_importer.py | 267 ++++++++++++++++++++++++++++++++-------
 1 file changed, 220 insertions(+), 47 deletions(-)

diff --git a/scripts/wiki_importer.py b/scripts/wiki_importer.py
index 7b81ec8..d20f3fd 100644
--- a/scripts/wiki_importer.py
+++ b/scripts/wiki_importer.py
@@ -10,35 +10,44 @@ Description:
   * Fetch pageinfo (pageid, fullurl) via `/info`
   * Parse wikitext (templates: ÜbungInfoBox, Übungsbeschreibung, SkillDevelopment) via `/parsepage`
   * Build the payload according to the Exercise data model
-  * POST to the `/exercise` endpoint (exercise_router)
+  * Idempotent upsert: external_id="mw:{pageid}", sha256 fingerprint over core fields,
+    lookup via `/exercise/by-external-id`, then create/update/skip with counters.
 - Supports single import via `--title` (or ENV `WIKI_EXERCISE_TITLE`) and full import via `--all`
 - Optional: credentials via CLI (--username/--password) or `.env` (WIKI_BOT_USER / WIKI_BOT_PASSWORD)
+- Smoke test (`--smoke-test`): 3 consecutive runs (create → skip → update) without changing API signatures.
 
-Version: 2.1.0
+Version: 2.3.1
 """
 import os
 import sys
 import argparse
-from typing import Dict, Any
+from typing import Dict, Any, Tuple, Optional
 import requests
 import mwparserfromhell
 from dotenv import load_dotenv
+import hashlib
+import json
+import time
 
 # ----- Configuration / defaults -----
 load_dotenv()  # load .env if present
+# CHANGED: base URLs clearly separated (wiki proxy vs. Exercise API)
 API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000/import/wiki")  # FastAPI wiki proxy
-EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise")  # Exercise endpoint
+EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise")  # Exercise endpoint (base, no trailing slash)
 DEFAULT_CAT = os.getenv("WIKI_CATEGORY", "Übungen")
 DEFAULT_TITLE = os.getenv("WIKI_EXERCISE_TITLE", "Affenklatschen")
+REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60"))
 
 # ---- Helper functions for the wiki router ----
+
 def wiki_health() -> None:
     r = requests.get(f"{API_BASE_URL}/health", timeout=15)
     r.raise_for_status()
     print("[Sanity] Wiki health OK")
 
+
 def wiki_login(username: str, password: str) -> None:
     """
     Performs a login against the wiki_router.
@@ -59,23 +68,26 @@ def wiki_login(username: str, password: str) -> None:
         raise RuntimeError(f"[Login] {msg}")
     print("[Login] success")
 
+
 def fetch_all_pages(category: str) -> Dict[str, Any]:
-    resp = requests.get(f"{API_BASE_URL}/semantic/pages", params={"category": category}, timeout=60)
+    resp = requests.get(f"{API_BASE_URL}/semantic/pages", params={"category": category}, timeout=REQUEST_TIMEOUT)
     resp.raise_for_status()
     return resp.json()
 
+
 def fetch_page_info(title: str) -> Dict[str, Any]:
     r = requests.get(f"{API_BASE_URL}/info", params={"title": title}, timeout=30)
     r.raise_for_status()
     info = r.json()
     return {"pageid": info.get("pageid"), "fullurl": info.get("fullurl")}
 
+
 def parse_exercise(title: str, pageid: int) -> Dict[str, Any]:
     print(f"[Parse] Loading '{title}' (ID={pageid})")
     resp = requests.get(
         f"{API_BASE_URL}/parsepage",
         params={"pageid": pageid, "title": title},
-        timeout=60
+        timeout=REQUEST_TIMEOUT
     )
     resp.raise_for_status()
     wikitext = resp.json().get("wikitext", "")
@@ -105,7 +117,40 @@ def parse_exercise(title: str, pageid: int) -> Dict[str, Any]:
     raw["wikitext"] = wikitext
     return raw
 
-def build_payload(raw: Dict[str, Any], fullurl: str, category: str) -> Dict[str, Any]:
+
+# NEW: stable normalization for hashing
+
+def _normalize(v: Any) -> str:
+    if v is None:
+        return ""
+    if isinstance(v, (list, tuple)):
+        return ",".join(_normalize(x) for x in v)
+    if isinstance(v, dict):
+        # sort by key for a stable hash
+        return json.dumps(v, sort_keys=True, ensure_ascii=False)
+    return str(v).strip()
+
+
+# NEW: fingerprint over core fields
+
+def compute_fingerprint(payload: Dict[str, Any]) -> str:
+    """sha256 over core fields: title, summary, execution, notes, duration_minutes, capabilities, keywords"""
+    fields = [
+        payload.get("title", ""),
+        payload.get("summary", ""),
+        payload.get("execution", ""),
+        payload.get("notes", ""),
+        payload.get("duration_minutes", 0),
+        payload.get("capabilities", {}),
+        payload.get("keywords", []),
+    ]
+    base = "|".join(_normalize(f) for f in fields)
+    return hashlib.sha256(base.encode("utf-8")).hexdigest()
+
+
+# CHANGED: payload includes external_id (mw:{pageid}) + fingerprint
+
+def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: bool = False) -> Dict[str, Any]:
     # Exercise.capabilities expects Dict[str, int]
     caps_list = raw.get("capabilities", [])
     capabilities = {}
@@ -137,6 +182,11 @@ def build_payload(raw: Dict[str, Any], fullurl: str, category: str) -> Dict[str,
     elif isinstance(eq_raw, list):
         equipment = [str(e).strip() for e in eq_raw if str(e).strip()]
 
+    notes = raw.get("Hinweise", "") or ""
+    if mutate:
+        # Produce a small change for the smoke test (3rd run)
+        notes = (notes + " [auto-update]").strip()
+
     payload: Dict[str, Any] = {
         "title": raw.get("title") or "",
         "summary": raw.get("Summary", "") or "",
@@ -153,31 +203,170 @@ def build_payload(raw: Dict[str, Any], fullurl: str, category: str) -> Dict[str,
         "category": category or "",
         "purpose": raw.get("Ziel", "") or "",
         "execution": raw.get("Durchführung", "") or "",
-        "notes": raw.get("Hinweise", "") or "",
+        "notes": notes,
         "preparation": raw.get("RefMethode", "") or "",
         "method": raw.get("method", "") or "",  # if present in the wikitext
         "equipment": equipment,
         "fullurl": fullurl or "",  # optional field
-        # Idempotency (optionally usable in exercise_router):
-        "external_id": f"wiki:{raw.get('pageid')}",
+        # Idempotency fields (the backend may ignore them; the API stays unchanged):
+        "external_id": f"mw:{raw.get('pageid')}",
         "source": "MediaWiki"
     }
+    # Add the fingerprint (not API-relevant, but useful for the lookup decision)
+    payload["fingerprint"] = compute_fingerprint(payload)
     return payload
 
-def ingest_exercise(payload: Dict[str, Any]) -> None:
+
+# NEW: lookup against the Exercise API
+
+def lookup_by_external_id(external_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[int]]:
+    """Queries /exercise/by-external-id?external_id=...
+    Returns (json, http_status). On 404 -> (None, 404)"""
+    url = f"{EXERCISE_API}/by-external-id"
+    try:
+        r = requests.get(url, params={"external_id": external_id}, timeout=REQUEST_TIMEOUT)
+        if r.status_code == 404:
+            return None, 404
+        r.raise_for_status()
+        return r.json(), r.status_code
+    except requests.HTTPError as e:
+        return {"error": str(e), "status_code": getattr(e.response, "status_code", None)}, getattr(e.response, "status_code", None)
+    except Exception as e:
+        return {"error": str(e)}, None
+
+
+# NEW: upsert decision (create/update/skip)
+
+def upsert_exercise(payload: Dict[str, Any], *, dry_run: bool = False) -> str:
     title = payload.get("title", "")
-    resp = requests.post(EXERCISE_API, json=payload, timeout=60)
-    if resp.status_code == 422:
-        print(f"[Ingest] '{title}' -> FAILED 422:\n{resp.text}")
-        try:
+    ext_id = payload.get("external_id")
+    fp_new = payload.get("fingerprint")
+    found, status = lookup_by_external_id(ext_id)
+
+    action = "create"
+    if status == 404 or found is None:
+        action = "create"
+    elif isinstance(found, dict):
+        fp_old = found.get("fingerprint") or found.get("payload", {}).get("fingerprint")
+        if fp_old == fp_new:
+            action = "skip"
+        else:
+            action = "update"
+    else:
+        action = "create"
+
+    if dry_run:
+        print(f"[DryRun] {action.upper():6} '{title}' ({ext_id})")
+        return action
+
+    if action == "create":
+        resp = requests.post(EXERCISE_API, json=payload, timeout=REQUEST_TIMEOUT)
+        if resp.status_code == 422:
+            print(f"[Create] '{title}' -> FAILED 422:\n{resp.text}")
+            try:
+                resp.raise_for_status()
+            except Exception:
+                pass
+        else:
             resp.raise_for_status()
-        except Exception:
-            pass
-        return
-    resp.raise_for_status()
-    print(f"[Ingest] '{title}' -> OK")
+            print(f"[Create] '{title}' -> OK")
+    elif action == "update":
+        # Specification: "update (POST /exercise) + imported_at"
+        payload2 = dict(payload)
+        payload2["imported_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
+        resp = requests.post(EXERCISE_API, json=payload2, timeout=REQUEST_TIMEOUT)
+        if resp.status_code == 422:
+            print(f"[Update] '{title}' -> FAILED 422:\n{resp.text}")
+            try:
+                resp.raise_for_status()
+            except Exception:
+                pass
+        else:
+            resp.raise_for_status()
+            print(f"[Update] '{title}' -> OK")
+    else:
+        print(f"[Skip] '{title}' (unchanged)")
+    return action
+
+
+# CHANGED: single-item processing uses the upsert
+
+def process_one(title: str, category: str, *, mutate: bool = False, dry_run: bool = False) -> str:
+    info = fetch_page_info(title)
+    pid = info.get("pageid")
+    fullurl = info.get("fullurl") or ""
+    if not pid:
+        print(f"[Error] pageid for '{title}' not found.", file=sys.stderr)
+        return "failed"
+    raw = parse_exercise(title, pid)
+    payload = build_payload(raw, fullurl, category, mutate=mutate)
+    return upsert_exercise(payload, dry_run=dry_run)
+
+
+# CHANGED: batch processing with stats & error tolerance
+
+def process_all(category: str, *, dry_run: bool = False) -> Dict[str, int]:
+    stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0}
+    print(f"[Main] Loading list of exercises from category '{category}'…")
+    pages = fetch_all_pages(category)
+    print(f"[Main] Found {len(pages)} pages.")
+
+    for title, entry in pages.items():
+        try:
+            pid = entry.get("pageid") if isinstance(entry, dict) else None
+            fullurl = entry.get("fullurl") if isinstance(entry, dict) else None
+            if not pid:
+                info = fetch_page_info(title)
+                pid = info.get("pageid")
+                fullurl = fullurl or info.get("fullurl")
+            if not pid:
+                print(f"[Skip] '{title}' has no pageid")
+                stats["failed"] += 1
+                continue
+            raw = parse_exercise(title, pid)
+            payload = build_payload(raw, fullurl or "", category)
+            act = upsert_exercise(payload, dry_run=dry_run)
+            if act == "create":
+                stats["created"] += 1
+            elif act == "update":
+                stats["updated"] += 1
+            elif act == "skip":
+                stats["skipped"] += 1
+        except requests.HTTPError as e:
+            code = e.response.status_code if e.response is not None else None
+            if code == 404:
+                print(f"[Skip] '{title}': page not found (404)")
+                stats["failed"] += 1
+            else:
+                print(f"[Error] '{title}': {e}")
+                stats["failed"] += 1
+        except Exception as e:
+            print(f"[Error] '{title}': {e}")
+            stats["failed"] += 1
+    return stats
+
+
+# NEW: smoke test (3 runs: create → skip → update)
+
+def run_smoke_test(title: str, category: str) -> None:
+    print("\n[SmokeTest] Run 1/3: CREATE (initial import)")
+    act1 = process_one(title, category, mutate=False)
+    print("[SmokeTest] Action:", act1)
+
+    print("\n[SmokeTest] Run 2/3: SKIP (repeat, unchanged)")
+    act2 = process_one(title, category, mutate=False)
+    print("[SmokeTest] Action:", act2)
+
+    print("\n[SmokeTest] Run 3/3: UPDATE (simulated wiki change to 'notes')")
+    act3 = process_one(title, category, mutate=True)
+    print("[SmokeTest] Action:", act3)
+
+    print("\n[SmokeTest] Summary:")
+    print(json.dumps({"run1": act1, "run2": act2, "run3": act3}, ensure_ascii=False, indent=2))
+
 
 # ----- Main -----
+
 def main() -> None:
     parser = argparse.ArgumentParser(description="Import exercises from Wiki to Qdrant (via FastAPI wiki_router)")
     parser.add_argument("--all", action="store_true", help="Import all exercises (SMW Ask)")
@@ -186,7 +375,8 @@ def main() -> None:
     parser.add_argument("--username", type=str, default=os.getenv("WIKI_BOT_USER"), help="Wiki login user (overrides .env)")
     parser.add_argument("--password", type=str, default=os.getenv("WIKI_BOT_PASSWORD"), help="Wiki login password (overrides .env)")
     parser.add_argument("--skip-login", action="store_true", help="Skip the login step (if a session is already active)")
-
+    parser.add_argument("--dry-run", action="store_true", help="No writes; only log decisions (create/update/skip)")
+    parser.add_argument("--smoke-test", action="store_true", help="3 runs (create→skip→update) for --title")
     args = parser.parse_args()
 
     # Sanity
@@ -203,36 +393,19 @@ def main() -> None:
         print(str(e), file=sys.stderr)
         sys.exit(1)
 
+    if args.smoke_test:
+        run_smoke_test(args.title, args.category)
+        return
+
     # Single or full import
     if args.all:
-        print(f"[Main] Loading list of exercises from category '{args.category}'…")
-        pages = fetch_all_pages(args.category)
-        print(f"[Main] Found {len(pages)} pages.")
-        for title, entry in pages.items():
-            pid = entry.get("pageid")
-            fullurl = entry.get("fullurl")
-            if not pid:
-                # Look up core info
-                info = fetch_page_info(title)
-                pid = info.get("pageid")
-                fullurl = fullurl or info.get("fullurl")
-            if not pid:
-                print(f"[Skip] '{title}' has no pageid")
-                continue
-            raw = parse_exercise(title, pid)
-            payload = build_payload(raw, fullurl or "", args.category)
-            ingest_exercise(payload)
+        stats = process_all(args.category, dry_run=args.dry_run)
+        print("\n[Stats] created={created} updated={updated} skipped={skipped} failed={failed}".format(**stats))
     else:
         print(f"[Main] Import single exercise: {args.title}")
-        info = fetch_page_info(args.title)
-        pid = info.get("pageid")
-        fullurl = info.get("fullurl") or ""
-        if not pid:
-            print(f"[Error] pageid for '{args.title}' not found.", file=sys.stderr)
-            sys.exit(1)
-        raw = parse_exercise(args.title, pid)
-        payload = build_payload(raw, fullurl, args.category)
-        ingest_exercise(payload)
+        result = process_one(args.title, args.category, mutate=False, dry_run=args.dry_run)
+        print(f"[Result] {result}")
+
 
 if __name__ == "__main__":
     main()
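
Reviewer note (not part of the patch): the upsert logic assumes the Exercise
API exposes GET /exercise/by-external-id and returns the stored fingerprint
with each exercise, while the patch itself says the backend may ignore the
new fields. As a sanity aid, here is a minimal sketch of one possible
counterpart, assuming FastAPI and an in-memory store; the handler names and
the store are hypothetical, not the project's actual exercise_router.

    # Hypothetical sketch of the Exercise API contract the importer relies on.
    from typing import Any, Dict
    from fastapi import FastAPI, HTTPException

    app = FastAPI()
    store: Dict[str, Dict[str, Any]] = {}  # keyed by external_id, e.g. "mw:1234" (assumed store)

    @app.get("/exercise/by-external-id")
    def by_external_id(external_id: str) -> Dict[str, Any]:
        # A 404 tells the importer "not found", so it chooses "create".
        exercise = store.get(external_id)
        if exercise is None:
            raise HTTPException(status_code=404, detail="exercise not found")
        # Echoing the stored fingerprint lets the importer decide skip vs. update.
        return exercise

    @app.post("/exercise")
    def create_or_replace(payload: Dict[str, Any]) -> Dict[str, Any]:
        # The importer POSTs here for both create and update.
        ext_id = payload.get("external_id") or ""
        if ext_id:
            store[ext_id] = payload
        return {"status": "ok", "external_id": ext_id}

Against such a backend, a single `python scripts/wiki_importer.py --smoke-test`
run should log create, then skip, then update, matching the three runs in
run_smoke_test().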