diff --git a/scripts/wiki_importer.py b/scripts/wiki_importer.py index 7b81ec8..0feb11b 100644 --- a/scripts/wiki_importer.py +++ b/scripts/wiki_importer.py @@ -7,38 +7,51 @@ Beschreibung: - Führt vor dem Import einen Login gegen /import/wiki/login durch (falls nicht via --skip-login deaktiviert) - Holt Liste aller Übungs-Titel (SMW-Ask) via `/semantic/pages` - Für jede Übung: - * Fetch pageinfo (pageid, fullurl) via `/info` + * Fetch pageinfo (pageid, fullurl) via `/info` (nur wenn nicht bereits geliefert) * Parse Wikitext (Templates: ÜbungInfoBox, Übungsbeschreibung, SkillDevelopment) via `/parsepage` * Baut Payload entsprechend Exercise-Datenmodell - * POST an `/exercise` Endpoint (exercise_router) + * Idempotentes Upsert: external_id="mw:{pageid}", Fingerprint (sha256) über Kernfelder, + Lookup via `/exercise/by-external-id`, dann create/update/skip inkl. Zählern. - Unterstützt Single-Import via `--title` (oder ENV `WIKI_EXERCISE_TITLE`) und Full-Import via `--all` - Optional: Credentials via CLI (--username/--password) oder `.env` (WIKI_BOT_USER / WIKI_BOT_PASSWORD) +- Smoke-Test (`--smoke-test`): 3 Läufe nacheinander (create → skip → update), ohne API-Signaturen zu ändern. -Version: 2.1.0 +Version: 2.3.2 +Änderung: Regressionsfix in `process_all()` – statt `isinstance(entry, dict)` wird nun generisch über + `getattr(entry, "get", None)` auf `pageid/fullurl` zugegriffen (unterstützt Mapping‑ähnliche Typen + wie pydantic/OrderedDict/Mapping). So werden vorhandene pageids aus `/semantic/pages` wieder zuverlässig + genutzt und unnötige `/info`‑Aufrufe vermieden. """ import os import sys import argparse -from typing import Dict, Any +from typing import Dict, Any, Tuple, Optional +from collections.abc import Mapping import requests import mwparserfromhell from dotenv import load_dotenv +import hashlib +import json +import time # ----- Konfiguration / Defaults ----- load_dotenv() # .env laden, falls vorhanden API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000/import/wiki") # FastAPI-Wiki-Proxy -EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise") # Exercise-Endpoint +EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise") # Exercise-Endpoint (Basis, ohne Slash am Ende) DEFAULT_CAT = os.getenv("WIKI_CATEGORY", "Übungen") DEFAULT_TITLE = os.getenv("WIKI_EXERCISE_TITLE", "Affenklatschen") +REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60")) # ---- Hilfsfunktionen für Wiki-Router ---- + def wiki_health() -> None: r = requests.get(f"{API_BASE_URL}/health", timeout=15) r.raise_for_status() print("[Sanity] Wiki health OK") + def wiki_login(username: str, password: str) -> None: """ Führt einen Login gegen den wiki_router durch. @@ -46,7 +59,6 @@ def wiki_login(username: str, password: str) -> None: """ payload = {"username": username, "password": password} r = requests.post(f"{API_BASE_URL}/login", json=payload, timeout=30) - # kein raise_for_status(), wir wollen die JSON-Fehler sauber ausgeben try: data = r.json() except Exception: @@ -59,23 +71,26 @@ def wiki_login(username: str, password: str) -> None: raise RuntimeError(f"[Login] {msg}") print("[Login] success") + def fetch_all_pages(category: str) -> Dict[str, Any]: - resp = requests.get(f"{API_BASE_URL}/semantic/pages", params={"category": category}, timeout=60) + resp = requests.get(f"{API_BASE_URL}/semantic/pages", params={"category": category}, timeout=REQUEST_TIMEOUT) resp.raise_for_status() return resp.json() + def fetch_page_info(title: str) -> Dict[str, Any]: r = requests.get(f"{API_BASE_URL}/info", params={"title": title}, timeout=30) r.raise_for_status() info = r.json() return {"pageid": info.get("pageid"), "fullurl": info.get("fullurl")} + def parse_exercise(title: str, pageid: int) -> Dict[str, Any]: print(f"[Parse] Lade '{title}' (ID={pageid})") resp = requests.get( f"{API_BASE_URL}/parsepage", params={"pageid": pageid, "title": title}, - timeout=60 + timeout=REQUEST_TIMEOUT ) resp.raise_for_status() wikitext = resp.json().get("wikitext", "") @@ -105,7 +120,35 @@ def parse_exercise(title: str, pageid: int) -> Dict[str, Any]: raw["wikitext"] = wikitext return raw -def build_payload(raw: Dict[str, Any], fullurl: str, category: str) -> Dict[str, Any]: + +# ---- Fingerprint-Unterstützung ---- + +def _normalize(v: Any) -> str: + if v is None: + return "" + if isinstance(v, (list, tuple)): + return ",".join(_normalize(x) for x in v) + if isinstance(v, dict): + return json.dumps(v, sort_keys=True, ensure_ascii=False) + return str(v).strip() + + +def compute_fingerprint(payload: Dict[str, Any]) -> str: + """sha256 über Kernfelder: title, summary, execution, notes, duration_minutes, capabilities, keywords""" + fields = [ + payload.get("title", ""), + payload.get("summary", ""), + payload.get("execution", ""), + payload.get("notes", ""), + payload.get("duration_minutes", 0), + payload.get("capabilities", {}), + payload.get("keywords", []), + ] + base = "|".join(_normalize(f) for f in fields) + return hashlib.sha256(base.encode("utf-8")).hexdigest() + + +def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: bool = False) -> Dict[str, Any]: # Exercise.capabilities erwartet Dict[str,int] caps_list = raw.get("capabilities", []) capabilities = {} @@ -137,6 +180,11 @@ def build_payload(raw: Dict[str, Any], fullurl: str, category: str) -> Dict[str, elif isinstance(eq_raw, list): equipment = [str(e).strip() for e in eq_raw if str(e).strip()] + notes = raw.get("Hinweise", "") or "" + if mutate: + # Für Smoke-Test (3. Lauf) geringfügige Änderung erzeugen + notes = (notes + " [auto-update]").strip() + payload: Dict[str, Any] = { "title": raw.get("title") or "", "summary": raw.get("Summary", "") or "", @@ -148,36 +196,167 @@ def build_payload(raw: Dict[str, Any], fullurl: str, category: str) -> Dict[str, "age_group": raw.get("Altersgruppe", "") or "", "target_group": raw.get("Zielgruppe", "") or "", "min_participants": 1, - "duration_minutes": int(round(duration)), # Exercise erwartet int + "duration_minutes": int(round(duration)), "capabilities": capabilities, "category": category or "", "purpose": raw.get("Ziel", "") or "", "execution": raw.get("Durchführung", "") or "", - "notes": raw.get("Hinweise", "") or "", + "notes": notes, "preparation": raw.get("RefMethode", "") or "", - "method": raw.get("method", "") or "", # falls im Wikitext vorhanden + "method": raw.get("method", "") or "", "equipment": equipment, - "fullurl": fullurl or "", # optionales Feld - # Idempotenz (optional nutzbar in exercise_router): - "external_id": f"wiki:{raw.get('pageid')}", - "source": "MediaWiki" + "fullurl": fullurl or "", + "external_id": f"mw:{raw.get('pageid')}", + "source": "MediaWiki", } + payload["fingerprint"] = compute_fingerprint(payload) return payload -def ingest_exercise(payload: Dict[str, Any]) -> None: + +def lookup_by_external_id(external_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[int]]: + url = f"{EXERCISE_API}/by-external-id" + try: + r = requests.get(url, params={"external_id": external_id}, timeout=REQUEST_TIMEOUT) + if r.status_code == 404: + return None, 404 + r.raise_for_status() + return r.json(), r.status_code + except requests.HTTPError as e: + return {"error": str(e), "status_code": getattr(e.response, "status_code", None)}, getattr(e.response, "status_code", None) + except Exception as e: + return {"error": str(e)}, None + + +def upsert_exercise(payload: Dict[str, Any], *, dry_run: bool = False) -> str: title = payload.get("title", "") - resp = requests.post(EXERCISE_API, json=payload, timeout=60) - if resp.status_code == 422: - print(f"[Ingest] '{title}' -> FAILED 422:\n{resp.text}") - try: + ext_id = payload.get("external_id") + fp_new = payload.get("fingerprint") + found, status = lookup_by_external_id(ext_id) + + action = "create" + if status == 404 or found is None: + action = "create" + elif isinstance(found, dict): + fp_old = found.get("fingerprint") or found.get("payload", {}).get("fingerprint") + if fp_old == fp_new: + action = "skip" + else: + action = "update" + else: + action = "create" + + if dry_run: + print(f"[DryRun] {action.upper():6} '{title}' ({ext_id})") + return action + + if action == "create": + resp = requests.post(EXERCISE_API, json=payload, timeout=REQUEST_TIMEOUT) + if resp.status_code == 422: + print(f"[Create] '{title}' -> FAILED 422:\n{resp.text}") + try: + resp.raise_for_status() + except Exception: + pass + else: resp.raise_for_status() - except Exception: - pass - return - resp.raise_for_status() - print(f"[Ingest] '{title}' -> OK") + print(f"[Create] '{title}' -> OK") + elif action == "update": + payload2 = dict(payload) + payload2["imported_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + resp = requests.post(EXERCISE_API, json=payload2, timeout=REQUEST_TIMEOUT) + if resp.status_code == 422: + print(f"[Update] '{title}' -> FAILED 422:\n{resp.text}") + try: + resp.raise_for_status() + except Exception: + pass + else: + resp.raise_for_status() + print(f"[Update] '{title}' -> OK") + else: + print(f"[Skip] '{title}' (unverändert)") + return action + + +def process_one(title: str, category: str, *, mutate: bool = False, dry_run: bool = False) -> str: + info = fetch_page_info(title) + pid = info.get("pageid") + fullurl = info.get("fullurl") or "" + if not pid: + print(f"[Error] pageid für '{title}' nicht gefunden.", file=sys.stderr) + return "failed" + raw = parse_exercise(title, pid) + payload = build_payload(raw, fullurl, category, mutate=mutate) + return upsert_exercise(payload, dry_run=dry_run) + + +def process_all(category: str, *, dry_run: bool = False) -> Dict[str, int]: + stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0} + print(f"[Main] Lade Liste der Übungen aus Kategorie '{category}'…") + pages = fetch_all_pages(category) + print(f"[Main] {len(pages)} Seiten gefunden.") + + for title, entry in pages.items(): + try: + # Regressionsfix: generischer Zugriff auf Mapping‑ähnliche Einträge + getter = getattr(entry, "get", None) + if callable(getter): + pid = getter("pageid") + fullurl = getter("fullurl") + else: + pid = None + fullurl = None + + if not pid: + info = fetch_page_info(title) + pid = info.get("pageid") + fullurl = fullurl or info.get("fullurl") + if not pid: + print(f"[Skip] '{title}' hat keine pageid") + stats["failed"] += 1 + continue + raw = parse_exercise(title, pid) + payload = build_payload(raw, fullurl or "", category) + act = upsert_exercise(payload, dry_run=dry_run) + if act == "create": + stats["created"] += 1 + elif act == "update": + stats["updated"] += 1 + elif act == "skip": + stats["skipped"] += 1 + except requests.HTTPError as e: + code = getattr(e, "response", None).status_code if getattr(e, "response", None) else None + if code == 404: + print(f"[Skip] '{title}': page not found (404)") + stats["failed"] += 1 + else: + print(f"[Error] '{title}': {e}") + stats["failed"] += 1 + except Exception as e: + print(f"[Error] '{title}': {e}") + stats["failed"] += 1 + return stats + + +def run_smoke_test(title: str, category: str) -> None: + print("\n[SmokeTest] Lauf 1/3: CREATE (Erstimport)") + act1 = process_one(title, category, mutate=False) + print("[SmokeTest] Aktion:", act1) + + print("\n[SmokeTest] Lauf 2/3: SKIP (Wiederholung, unverändert)") + act2 = process_one(title, category, mutate=False) + print("[SmokeTest] Aktion:", act2) + + print("\n[SmokeTest] Lauf 3/3: UPDATE (simulierte Wiki-Änderung an 'notes')") + act3 = process_one(title, category, mutate=True) + print("[SmokeTest] Aktion:", act3) + + print("\n[SmokeTest] Zusammenfassung:") + print(json.dumps({"run1": act1, "run2": act2, "run3": act3}, ensure_ascii=False, indent=2)) + # ----- Main ----- + def main() -> None: parser = argparse.ArgumentParser(description="Import exercises from Wiki to Qdrant (via FastAPI wiki_router)") parser.add_argument("--all", action="store_true", help="Alle Übungen importieren (SMW-Ask)") @@ -186,7 +365,8 @@ def main() -> None: parser.add_argument("--username", type=str, default=os.getenv("WIKI_BOT_USER"), help="Wiki-Login Benutzer (überschreibt .env)") parser.add_argument("--password", type=str, default=os.getenv("WIKI_BOT_PASSWORD"), help="Wiki-Login Passwort (überschreibt .env)") parser.add_argument("--skip-login", action="store_true", help="Login-Schritt überspringen (falls Session schon aktiv)") - + parser.add_argument("--dry-run", action="store_true", help="Kein Schreiben; nur Entscheidungen (create/update/skip) loggen") + parser.add_argument("--smoke-test", action="store_true", help="3 Durchläufe (create→skip→update) für --title") args = parser.parse_args() # Sanity @@ -203,36 +383,18 @@ def main() -> None: print(str(e), file=sys.stderr) sys.exit(1) - # Einzel- oder Vollimport + if args.smoke_test: + run_smoke_test(args.title, args.category) + return + if args.all: - print(f"[Main] Lade Liste der Übungen aus Kategorie '{args.category}'…") - pages = fetch_all_pages(args.category) - print(f"[Main] {len(pages)} Seiten gefunden.") - for title, entry in pages.items(): - pid = entry.get("pageid") - fullurl = entry.get("fullurl") - if not pid: - # Core-Info nachschlagen - info = fetch_page_info(title) - pid = info.get("pageid") - fullurl = fullurl or info.get("fullurl") - if not pid: - print(f"[Skip] '{title}' hat keine pageid") - continue - raw = parse_exercise(title, pid) - payload = build_payload(raw, fullurl or "", args.category) - ingest_exercise(payload) + stats = process_all(args.category, dry_run=args.dry_run) + print("\n[Stats] created={created} updated={updated} skipped={skipped} failed={failed}".format(**stats)) else: print(f"[Main] Import single exercise: {args.title}") - info = fetch_page_info(args.title) - pid = info.get("pageid") - fullurl = info.get("fullurl") or "" - if not pid: - print(f"[Error] pageid für '{args.title}' nicht gefunden.", file=sys.stderr) - sys.exit(1) - raw = parse_exercise(args.title, pid) - payload = build_payload(raw, fullurl, args.category) - ingest_exercise(payload) + result = process_one(args.title, args.category, mutate=False, dry_run=args.dry_run) + print(f"[Result] {result}") + if __name__ == "__main__": main()