From b2808f82df45b8bf59f21e0cb6bb6a945b71318f Mon Sep 17 00:00:00 2001 From: Lars Date: Mon, 11 Aug 2025 11:32:29 +0200 Subject: [PATCH] scripts/wiki_importer.py aktualisiert --- scripts/wiki_importer.py | 267 +++++++-------------------------------- 1 file changed, 47 insertions(+), 220 deletions(-) diff --git a/scripts/wiki_importer.py b/scripts/wiki_importer.py index d20f3fd..7b81ec8 100644 --- a/scripts/wiki_importer.py +++ b/scripts/wiki_importer.py @@ -10,44 +10,35 @@ Beschreibung: * Fetch pageinfo (pageid, fullurl) via `/info` * Parse Wikitext (Templates: ÜbungInfoBox, Übungsbeschreibung, SkillDevelopment) via `/parsepage` * Baut Payload entsprechend Exercise-Datenmodell - * Idempotentes Upsert: external_id="mw:{pageid}", Fingerprint (sha256) über Kernfelder, - Lookup via `/exercise/by-external-id`, dann create/update/skip inkl. Zählern. + * POST an `/exercise` Endpoint (exercise_router) - Unterstützt Single-Import via `--title` (oder ENV `WIKI_EXERCISE_TITLE`) und Full-Import via `--all` - Optional: Credentials via CLI (--username/--password) oder `.env` (WIKI_BOT_USER / WIKI_BOT_PASSWORD) -- Smoke-Test (`--smoke-test`): 3 Läufe nacheinander (create → skip → update), ohne API-Signaturen zu ändern. -Version: 2.3.1 +Version: 2.1.0 """ import os import sys import argparse -from typing import Dict, Any, Tuple, Optional +from typing import Dict, Any import requests import mwparserfromhell from dotenv import load_dotenv -import hashlib -import json -import time # ----- Konfiguration / Defaults ----- load_dotenv() # .env laden, falls vorhanden -# CHANGED: Basis-URLs klar getrennt (Wiki-Proxy vs. Exercise-API) API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000/import/wiki") # FastAPI-Wiki-Proxy -EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise") # Exercise-Endpoint (Basis, ohne Slash am Ende) +EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise") # Exercise-Endpoint DEFAULT_CAT = os.getenv("WIKI_CATEGORY", "Übungen") DEFAULT_TITLE = os.getenv("WIKI_EXERCISE_TITLE", "Affenklatschen") -REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60")) # ---- Hilfsfunktionen für Wiki-Router ---- - def wiki_health() -> None: r = requests.get(f"{API_BASE_URL}/health", timeout=15) r.raise_for_status() print("[Sanity] Wiki health OK") - def wiki_login(username: str, password: str) -> None: """ Führt einen Login gegen den wiki_router durch. @@ -68,26 +59,23 @@ def wiki_login(username: str, password: str) -> None: raise RuntimeError(f"[Login] {msg}") print("[Login] success") - def fetch_all_pages(category: str) -> Dict[str, Any]: - resp = requests.get(f"{API_BASE_URL}/semantic/pages", params={"category": category}, timeout=REQUEST_TIMEOUT) + resp = requests.get(f"{API_BASE_URL}/semantic/pages", params={"category": category}, timeout=60) resp.raise_for_status() return resp.json() - def fetch_page_info(title: str) -> Dict[str, Any]: r = requests.get(f"{API_BASE_URL}/info", params={"title": title}, timeout=30) r.raise_for_status() info = r.json() return {"pageid": info.get("pageid"), "fullurl": info.get("fullurl")} - def parse_exercise(title: str, pageid: int) -> Dict[str, Any]: print(f"[Parse] Lade '{title}' (ID={pageid})") resp = requests.get( f"{API_BASE_URL}/parsepage", params={"pageid": pageid, "title": title}, - timeout=REQUEST_TIMEOUT + timeout=60 ) resp.raise_for_status() wikitext = resp.json().get("wikitext", "") @@ -117,40 +105,7 @@ def parse_exercise(title: str, pageid: int) -> Dict[str, Any]: raw["wikitext"] = wikitext return raw - -# NEW: Stabile Normalisierung für Hash-Bildung - -def _normalize(v: Any) -> str: - if v is None: - return "" - if isinstance(v, (list, tuple)): - return ",".join(_normalize(x) for x in v) - if isinstance(v, dict): - # sort by key for stable hash - return json.dumps(v, sort_keys=True, ensure_ascii=False) - return str(v).strip() - - -# NEW: Fingerprint über Kernfelder - -def compute_fingerprint(payload: Dict[str, Any]) -> str: - """sha256 über Kernfelder: title, summary, execution, notes, duration_minutes, capabilities, keywords""" - fields = [ - payload.get("title", ""), - payload.get("summary", ""), - payload.get("execution", ""), - payload.get("notes", ""), - payload.get("duration_minutes", 0), - payload.get("capabilities", {}), - payload.get("keywords", []), - ] - base = "|".join(_normalize(f) for f in fields) - return hashlib.sha256(base.encode("utf-8")).hexdigest() - - -# CHANGED: Payload inkl. external_id (mw:{pageid}) + fingerprint - -def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: bool = False) -> Dict[str, Any]: +def build_payload(raw: Dict[str, Any], fullurl: str, category: str) -> Dict[str, Any]: # Exercise.capabilities erwartet Dict[str,int] caps_list = raw.get("capabilities", []) capabilities = {} @@ -182,11 +137,6 @@ def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: b elif isinstance(eq_raw, list): equipment = [str(e).strip() for e in eq_raw if str(e).strip()] - notes = raw.get("Hinweise", "") or "" - if mutate: - # Für Smoke-Test (3. Lauf) geringfügige Änderung erzeugen - notes = (notes + " [auto-update]").strip() - payload: Dict[str, Any] = { "title": raw.get("title") or "", "summary": raw.get("Summary", "") or "", @@ -203,170 +153,31 @@ def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: b "category": category or "", "purpose": raw.get("Ziel", "") or "", "execution": raw.get("Durchführung", "") or "", - "notes": notes, + "notes": raw.get("Hinweise", "") or "", "preparation": raw.get("RefMethode", "") or "", "method": raw.get("method", "") or "", # falls im Wikitext vorhanden "equipment": equipment, "fullurl": fullurl or "", # optionales Feld - # Idempotenz-Felder (werden vom Backend ggf. ignoriert – API bleibt unverändert): - "external_id": f"mw:{raw.get('pageid')}", + # Idempotenz (optional nutzbar in exercise_router): + "external_id": f"wiki:{raw.get('pageid')}", "source": "MediaWiki" } - # Fingerprint ergänzen (nicht API-relevant, aber nützlich für Lookup-Entscheidung) - payload["fingerprint"] = compute_fingerprint(payload) return payload - -# NEW: Lookup gegen Exercise-API - -def lookup_by_external_id(external_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[int]]: - """Fragt /exercise/by-external-id?external_id=... ab. - Rückgabe (json, http_status). Bei 404 -> (None, 404)""" - url = f"{EXERCISE_API}/by-external-id" - try: - r = requests.get(url, params={"external_id": external_id}, timeout=REQUEST_TIMEOUT) - if r.status_code == 404: - return None, 404 - r.raise_for_status() - return r.json(), r.status_code - except requests.HTTPError as e: - return {"error": str(e), "status_code": getattr(e.response, "status_code", None)}, getattr(e.response, "status_code", None) - except Exception as e: - return {"error": str(e)}, None - - -# NEW: Upsert-Entscheidung (create/update/skip) - -def upsert_exercise(payload: Dict[str, Any], *, dry_run: bool = False) -> str: +def ingest_exercise(payload: Dict[str, Any]) -> None: title = payload.get("title", "") - ext_id = payload.get("external_id") - fp_new = payload.get("fingerprint") - found, status = lookup_by_external_id(ext_id) - - action = "create" - if status == 404 or found is None: - action = "create" - elif isinstance(found, dict): - fp_old = found.get("fingerprint") or found.get("payload", {}).get("fingerprint") - if fp_old == fp_new: - action = "skip" - else: - action = "update" - else: - action = "create" - - if dry_run: - print(f"[DryRun] {action.upper():6} '{title}' ({ext_id})") - return action - - if action == "create": - resp = requests.post(EXERCISE_API, json=payload, timeout=REQUEST_TIMEOUT) - if resp.status_code == 422: - print(f"[Create] '{title}' -> FAILED 422:\n{resp.text}") - try: - resp.raise_for_status() - except Exception: - pass - else: - resp.raise_for_status() - print(f"[Create] '{title}' -> OK") - elif action == "update": - # Spezifikation: "update (POST /exercise) + imported_at" - payload2 = dict(payload) - payload2["imported_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) - resp = requests.post(EXERCISE_API, json=payload2, timeout=REQUEST_TIMEOUT) - if resp.status_code == 422: - print(f"[Update] '{title}' -> FAILED 422:\n{resp.text}") - try: - resp.raise_for_status() - except Exception: - pass - else: - resp.raise_for_status() - print(f"[Update] '{title}' -> OK") - else: - print(f"[Skip] '{title}' (unverändert)") - return action - - -# CHANGED: Einzelverarbeitung nutzt Upsert - -def process_one(title: str, category: str, *, mutate: bool = False, dry_run: bool = False) -> str: - info = fetch_page_info(title) - pid = info.get("pageid") - fullurl = info.get("fullurl") or "" - if not pid: - print(f"[Error] pageid für '{title}' nicht gefunden.", file=sys.stderr) - return "failed" - raw = parse_exercise(title, pid) - payload = build_payload(raw, fullurl, category, mutate=mutate) - return upsert_exercise(payload, dry_run=dry_run) - - -# CHANGED: Batch mit Stats & Fehlertoleranz - -def process_all(category: str, *, dry_run: bool = False) -> Dict[str, int]: - stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0} - print(f"[Main] Lade Liste der Übungen aus Kategorie '{category}'…") - pages = fetch_all_pages(category) - print(f"[Main] {len(pages)} Seiten gefunden.") - - for title, entry in pages.items(): + resp = requests.post(EXERCISE_API, json=payload, timeout=60) + if resp.status_code == 422: + print(f"[Ingest] '{title}' -> FAILED 422:\n{resp.text}") try: - pid = entry.get("pageid") if isinstance(entry, dict) else None - fullurl = entry.get("fullurl") if isinstance(entry, dict) else None - if not pid: - info = fetch_page_info(title) - pid = info.get("pageid") - fullurl = fullurl or info.get("fullurl") - if not pid: - print(f"[Skip] '{title}' hat keine pageid") - stats["failed"] += 1 - continue - raw = parse_exercise(title, pid) - payload = build_payload(raw, fullurl or "", category) - act = upsert_exercise(payload, dry_run=dry_run) - if act == "create": - stats["created"] += 1 - elif act == "update": - stats["updated"] += 1 - elif act == "skip": - stats["skipped"] += 1 - except requests.HTTPError as e: - code = getattr(e, "response", None).status_code if getattr(e, "response", None) else None - if code == 404: - print(f"[Skip] '{title}': page not found (404)") - stats["failed"] += 1 - else: - print(f"[Error] '{title}': {e}") - stats["failed"] += 1 - except Exception as e: - print(f"[Error] '{title}': {e}") - stats["failed"] += 1 - return stats - - -# NEW: Smoke-Test (3 Läufe: create → skip → update) - -def run_smoke_test(title: str, category: str) -> None: - print("\n[SmokeTest] Lauf 1/3: CREATE (Erstimport)") - act1 = process_one(title, category, mutate=False) - print("[SmokeTest] Aktion:", act1) - - print("\n[SmokeTest] Lauf 2/3: SKIP (Wiederholung, unverändert)") - act2 = process_one(title, category, mutate=False) - print("[SmokeTest] Aktion:", act2) - - print("\n[SmokeTest] Lauf 3/3: UPDATE (simulierte Wiki-Änderung an 'notes')") - act3 = process_one(title, category, mutate=True) - print("[SmokeTest] Aktion:", act3) - - print("\n[SmokeTest] Zusammenfassung:") - print(json.dumps({"run1": act1, "run2": act2, "run3": act3}, ensure_ascii=False, indent=2)) - + resp.raise_for_status() + except Exception: + pass + return + resp.raise_for_status() + print(f"[Ingest] '{title}' -> OK") # ----- Main ----- - def main() -> None: parser = argparse.ArgumentParser(description="Import exercises from Wiki to Qdrant (via FastAPI wiki_router)") parser.add_argument("--all", action="store_true", help="Alle Übungen importieren (SMW-Ask)") @@ -375,8 +186,7 @@ def main() -> None: parser.add_argument("--username", type=str, default=os.getenv("WIKI_BOT_USER"), help="Wiki-Login Benutzer (überschreibt .env)") parser.add_argument("--password", type=str, default=os.getenv("WIKI_BOT_PASSWORD"), help="Wiki-Login Passwort (überschreibt .env)") parser.add_argument("--skip-login", action="store_true", help="Login-Schritt überspringen (falls Session schon aktiv)") - parser.add_argument("--dry-run", action="store_true", help="Kein Schreiben; nur Entscheidungen (create/update/skip) loggen") - parser.add_argument("--smoke-test", action="store_true", help="3 Durchläufe (create→skip→update) für --title") + args = parser.parse_args() # Sanity @@ -393,19 +203,36 @@ def main() -> None: print(str(e), file=sys.stderr) sys.exit(1) - if args.smoke_test: - run_smoke_test(args.title, args.category) - return - # Einzel- oder Vollimport if args.all: - stats = process_all(args.category, dry_run=args.dry_run) - print("\n[Stats] created={created} updated={updated} skipped={skipped} failed={failed}".format(**stats)) + print(f"[Main] Lade Liste der Übungen aus Kategorie '{args.category}'…") + pages = fetch_all_pages(args.category) + print(f"[Main] {len(pages)} Seiten gefunden.") + for title, entry in pages.items(): + pid = entry.get("pageid") + fullurl = entry.get("fullurl") + if not pid: + # Core-Info nachschlagen + info = fetch_page_info(title) + pid = info.get("pageid") + fullurl = fullurl or info.get("fullurl") + if not pid: + print(f"[Skip] '{title}' hat keine pageid") + continue + raw = parse_exercise(title, pid) + payload = build_payload(raw, fullurl or "", args.category) + ingest_exercise(payload) else: print(f"[Main] Import single exercise: {args.title}") - result = process_one(args.title, args.category, mutate=False, dry_run=args.dry_run) - print(f"[Result] {result}") - + info = fetch_page_info(args.title) + pid = info.get("pageid") + fullurl = info.get("fullurl") or "" + if not pid: + print(f"[Error] pageid für '{args.title}' nicht gefunden.", file=sys.stderr) + sys.exit(1) + raw = parse_exercise(args.title, pid) + payload = build_payload(raw, fullurl, args.category) + ingest_exercise(payload) if __name__ == "__main__": main()