#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module: wiki_importer.py

Description:
- Imports exercises from MediaWiki through the FastAPI wiki_router.
- Performs a login against /import/wiki/login before importing (unless
  disabled with --skip-login).
- Fetches the list of all exercise titles (SMW ask) via `/semantic/pages`.
- For every exercise:
    * Fetch page info (pageid, fullurl) via `/info` (only if not already
      delivered).
    * Parse the wikitext (templates: ÜbungInfoBox, Übungsbeschreibung,
      SkillDevelopment) via `/parsepage`.
    * Build the payload according to the Exercise data model.
    * Idempotent upsert: external_id="mw:{pageid}", fingerprint (sha256) over
      the core fields, lookup via `/exercise/by-external-id`, then
      create/update/skip including counters.
- Supports single import via `--title` (or ENV `WIKI_EXERCISE_TITLE`) and full
  import via `--all`.
- Optional: credentials via CLI (--username/--password) or `.env`
  (WIKI_BOT_USER / WIKI_BOT_PASSWORD).
- Smoke test (`--smoke-test`): 3 consecutive runs (create → skip → update),
  without changing API signatures.

Version: 2.3.2
Change: Regression fix in `process_all()` – instead of
`isinstance(entry, dict)` the entry is now accessed generically through
`getattr(entry, "get", None)` to read `pageid/fullurl` (supports mapping-like
types such as pydantic/OrderedDict/Mapping). Page ids delivered by
`/semantic/pages` are therefore used reliably again and unnecessary `/info`
calls are avoided.
"""

import argparse
import hashlib
import json
import os
import sys
import time
from collections.abc import Mapping
from typing import Any, Dict, Optional, Tuple

import mwparserfromhell
import requests
from dotenv import load_dotenv

# ----- Configuration / defaults -----
load_dotenv()  # load .env if present

# FastAPI wiki proxy
API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000/import/wiki")
# Exercise endpoint (base URL, no trailing slash)
EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise")
DEFAULT_CAT = os.getenv("WIKI_CATEGORY", "Übungen")
DEFAULT_TITLE = os.getenv("WIKI_EXERCISE_TITLE", "Affenklatschen")
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60"))

# Templates whose parameters are copied 1:1 into the raw exercise record.
_FLAT_TEMPLATES = ("ÜbungInfoBox", "Übungsbeschreibung")

# Maps the action returned by upsert_exercise() to its statistics counter.
_ACTION_TO_COUNTER = {"create": "created", "update": "updated", "skip": "skipped"}


# ---- Helper functions for the wiki router ----
def wiki_health() -> None:
    """Call the wiki router's `/health` endpoint.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    r = requests.get(f"{API_BASE_URL}/health", timeout=15)
    r.raise_for_status()
    print("[Sanity] Wiki health OK")


def wiki_login(username: str, password: str) -> None:
    """Log in against the wiki router.

    A response of ``{"status": "success"}`` is treated as success.

    Args:
        username: Wiki bot user name.
        password: Wiki bot password.

    Raises:
        requests.HTTPError: If the response is not JSON and carries an error
            status.
        RuntimeError: If the response does not report ``status == "success"``.
    """
    payload = {"username": username, "password": password}
    r = requests.post(f"{API_BASE_URL}/login", json=payload, timeout=30)
    try:
        data = r.json()
    except ValueError:
        print(f"[Login] HTTP {r.status_code}: {r.text}")
        r.raise_for_status()
        # A 2xx answer without a JSON body does not raise above; fall through
        # with an empty result so the RuntimeError below reports the failure.
        data = None
    if not isinstance(data, dict):
        data = {}
    if data.get("status") != "success":
        msg = data.get("message", "Login fehlgeschlagen")
        raise RuntimeError(f"[Login] {msg}")
    print("[Login] success")


def fetch_all_pages(category: str) -> Dict[str, Any]:
    """Return the decoded JSON answer of `/semantic/pages` for *category*.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    resp = requests.get(
        f"{API_BASE_URL}/semantic/pages",
        params={"category": category},
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    return resp.json()


def fetch_page_info(title: str) -> Dict[str, Any]:
    """Return ``{"pageid": ..., "fullurl": ...}`` for *title* via `/info`.

    Missing keys in the answer are returned as ``None``.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    r = requests.get(f"{API_BASE_URL}/info", params={"title": title}, timeout=30)
    r.raise_for_status()
    info = r.json()
    return {"pageid": info.get("pageid"), "fullurl": info.get("fullurl")}


def parse_exercise(title: str, pageid: int) -> Dict[str, Any]:
    """Fetch the wikitext of a page and extract the exercise templates.

    Parameters of the ``ÜbungInfoBox`` and ``Übungsbeschreibung`` templates are
    copied into the result under their own names. Every ``SkillDevelopment``
    template with a non-empty ``PrimaryCapability`` adds an entry
    ``{"capability": str, "level": int}`` to ``result["capabilities"]``; a
    missing or non-numeric ``CapabilityLevel`` becomes level 0.

    Args:
        title: Page title.
        pageid: MediaWiki page id.

    Returns:
        The raw record, including ``title``, ``source``, ``pageid`` and the
        unparsed ``wikitext``.

    Raises:
        requests.HTTPError: If `/parsepage` answers with an error status.
    """
    print(f"[Parse] Lade '{title}' (ID={pageid})")
    resp = requests.get(
        f"{API_BASE_URL}/parsepage",
        params={"pageid": pageid, "title": title},
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    wikitext = resp.json().get("wikitext", "")
    wikicode = mwparserfromhell.parse(wikitext)

    raw: Dict[str, Any] = {"title": title, "source": "MediaWiki", "pageid": pageid}
    for tpl in wikicode.filter_templates():
        name = str(tpl.name).strip()
        if name in _FLAT_TEMPLATES:
            for p in tpl.params:
                raw[str(p.name).strip()] = str(p.value).strip()
        elif name == "SkillDevelopment":
            raw.setdefault("capabilities", [])
            # Template.get() raises ValueError when the parameter is absent.
            try:
                cap = str(tpl.get("PrimaryCapability").value).strip()
            except ValueError:
                cap = ""
            # ValueError covers both a missing parameter and a non-numeric level.
            try:
                lvl = int(str(tpl.get("CapabilityLevel").value).strip())
            except ValueError:
                lvl = 0
            if cap:
                raw["capabilities"].append({"capability": cap, "level": lvl})
    raw["wikitext"] = wikitext
    return raw


# ---- Fingerprint support ----
def _normalize(v: Any) -> str:
    """Turn *v* into the canonical string used for fingerprinting.

    ``None`` becomes ``""``, lists/tuples are joined with commas, dicts are
    dumped as JSON with sorted keys, everything else is ``str(v).strip()``.
    """
    if v is None:
        return ""
    if isinstance(v, (list, tuple)):
        return ",".join(_normalize(x) for x in v)
    if isinstance(v, dict):
        return json.dumps(v, sort_keys=True, ensure_ascii=False)
    return str(v).strip()


def compute_fingerprint(payload: Dict[str, Any]) -> str:
    """Return the sha256 hex digest over the core fields of *payload*.

    Core fields: title, summary, execution, notes, duration_minutes,
    capabilities, keywords (normalized and joined with ``|``).
    """
    fields = [
        payload.get("title", ""),
        payload.get("summary", ""),
        payload.get("execution", ""),
        payload.get("notes", ""),
        payload.get("duration_minutes", 0),
        payload.get("capabilities", {}),
        payload.get("keywords", []),
    ]
    base = "|".join(_normalize(f) for f in fields)
    return hashlib.sha256(base.encode("utf-8")).hexdigest()


def _parse_duration_minutes(value: Any) -> int:
    """Convert a raw duration value to whole minutes; unusable values give 0."""
    try:
        # OverflowError/ValueError also cover "inf" and "nan", which float()
        # accepts but round() cannot convert to an int.
        return int(round(float(value or 0)))
    except (TypeError, ValueError, OverflowError):
        return 0


def _split_csv(text: str) -> list:
    """Split a comma-separated string into stripped, non-empty items."""
    return [item.strip() for item in text.split(",") if item.strip()]


def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: bool = False) -> Dict[str, Any]:
    """Build the exercise payload from a parsed wiki record.

    Args:
        raw: Record as returned by :func:`parse_exercise`.
        fullurl: Full URL of the wiki page (``None``/empty becomes ``""``).
        category: Wiki category the exercise was read from.
        mutate: If true, append ``" [auto-update]"`` to the notes so the
            fingerprint changes (used by the third smoke-test run).

    Returns:
        The payload dict including its ``fingerprint``.
    """
    # The payload carries capabilities as a name -> level mapping.
    capabilities: Dict[str, int] = {}
    for c in raw.get("capabilities", []):
        if not isinstance(c, Mapping):
            continue
        cap = c.get("capability")
        if isinstance(cap, str) and cap:
            try:
                capabilities[cap] = int(c.get("level"))
            except (TypeError, ValueError):
                # Entries without a usable level are left out.
                pass

    kw_raw = raw.get("Schlüsselworte", "")
    keywords = _split_csv(kw_raw) if isinstance(kw_raw, str) else []

    eq_raw = raw.get("equipment", [])
    if isinstance(eq_raw, str):
        equipment = _split_csv(eq_raw)
    elif isinstance(eq_raw, list):
        equipment = [str(e).strip() for e in eq_raw if str(e).strip()]
    else:
        equipment = []

    notes = raw.get("Hinweise", "") or ""
    if mutate:
        # Smoke test (3rd run): produce a small change.
        notes = (notes + " [auto-update]").strip()

    payload: Dict[str, Any] = {
        "title": raw.get("title") or "",
        "summary": raw.get("Summary", "") or "",
        "short_description": raw.get("Summary", "") or "",
        "keywords": keywords,
        "link": fullurl or "",
        "discipline": raw.get("Übungstyp", "") or "",
        "group": raw.get("Gruppengröße", "") or None,
        "age_group": raw.get("Altersgruppe", "") or "",
        "target_group": raw.get("Zielgruppe", "") or "",
        "min_participants": 1,
        "duration_minutes": _parse_duration_minutes(raw.get("Dauer", 0)),
        "capabilities": capabilities,
        "category": category or "",
        "purpose": raw.get("Ziel", "") or "",
        "execution": raw.get("Durchführung", "") or "",
        "notes": notes,
        "preparation": raw.get("RefMethode", "") or "",
        "method": raw.get("method", "") or "",
        "equipment": equipment,
        "fullurl": fullurl or "",
        "external_id": f"mw:{raw.get('pageid')}",
        "source": "MediaWiki",
    }
    payload["fingerprint"] = compute_fingerprint(payload)
    return payload


def lookup_by_external_id(external_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[int]]:
    """Look up an exercise via `/by-external-id`.

    Returns:
        ``(record, status_code)`` on success, ``(None, 404)`` if the endpoint
        answers 404, ``({"error": ..., "status_code": ...}, status_code)`` for
        other HTTP errors and ``({"error": ...}, None)`` for any other failure
        (connection error, undecodable body, ...). Never raises.
    """
    url = f"{EXERCISE_API}/by-external-id"
    try:
        r = requests.get(url, params={"external_id": external_id}, timeout=REQUEST_TIMEOUT)
        if r.status_code == 404:
            return None, 404
        r.raise_for_status()
        return r.json(), r.status_code
    except requests.HTTPError as e:
        status_code = getattr(e.response, "status_code", None)
        return {"error": str(e), "status_code": status_code}, status_code
    except Exception as e:  # deliberate best-effort boundary: report, do not raise
        return {"error": str(e)}, None


def _post_exercise(label: str, title: str, payload: Dict[str, Any]) -> bool:
    """POST *payload* to the exercise API and log the outcome under *label*.

    Returns:
        True on success, False if the API rejected the payload with HTTP 422.

    Raises:
        requests.HTTPError: For any other error status.
    """
    resp = requests.post(EXERCISE_API, json=payload, timeout=REQUEST_TIMEOUT)
    if resp.status_code == 422:
        print(f"[{label}] '{title}' -> FAILED 422:\n{resp.text}")
        return False
    resp.raise_for_status()
    print(f"[{label}] '{title}' -> OK")
    return True


def upsert_exercise(payload: Dict[str, Any], *, dry_run: bool = False) -> str:
    """Create, update or skip an exercise depending on its stored fingerprint.

    Args:
        payload: Payload as produced by :func:`build_payload`.
        dry_run: If true, only log the decision and write nothing.

    Returns:
        ``"create"``, ``"update"`` or ``"skip"`` for the action taken (or, in
        dry-run mode, decided), or ``"failed"`` if the lookup failed or the
        API rejected the payload with HTTP 422.

    Raises:
        requests.HTTPError: If the write request fails with a status other
            than 422.
    """
    title = payload.get("title", "")
    ext_id = payload.get("external_id")
    fp_new = payload.get("fingerprint")

    found, status = lookup_by_external_id(ext_id)

    if status == 404 or found is None:
        action = "create"
    elif status is None or status >= 400:
        # The lookup itself failed. Its error dict is not a stored record, so
        # comparing fingerprints against it would always force an "update".
        detail = found.get("error") if isinstance(found, dict) else found
        print(f"[Error] Lookup für '{title}' ({ext_id}) fehlgeschlagen: {detail}", file=sys.stderr)
        return "failed"
    elif isinstance(found, dict):
        stored = found.get("payload")
        nested_fp = stored.get("fingerprint") if isinstance(stored, dict) else None
        fp_old = found.get("fingerprint") or nested_fp
        action = "skip" if fp_old == fp_new else "update"
    else:
        action = "create"

    if dry_run:
        print(f"[DryRun] {action.upper():6} '{title}' ({ext_id})")
        return action

    if action == "create":
        ok = _post_exercise("Create", title, payload)
    elif action == "update":
        # Updates go through the same POST endpoint, stamped with the import time.
        stamped = dict(payload)
        stamped["imported_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        ok = _post_exercise("Update", title, stamped)
    else:
        print(f"[Skip] '{title}' (unverändert)")
        ok = True
    return action if ok else "failed"


def process_one(title: str, category: str, *, mutate: bool = False, dry_run: bool = False) -> str:
    """Import a single exercise by title.

    Returns:
        The action reported by :func:`upsert_exercise`, or ``"failed"`` if no
        page id could be determined for *title*.
    """
    info = fetch_page_info(title)
    pid = info.get("pageid")
    fullurl = info.get("fullurl") or ""
    if not pid:
        print(f"[Error] pageid für '{title}' nicht gefunden.", file=sys.stderr)
        return "failed"
    raw = parse_exercise(title, pid)
    payload = build_payload(raw, fullurl, category, mutate=mutate)
    return upsert_exercise(payload, dry_run=dry_run)


def process_all(category: str, *, dry_run: bool = False) -> Dict[str, int]:
    """Import every exercise listed for *category*.

    Errors on individual pages are logged and counted; they do not abort the
    run.

    Returns:
        Counters ``created``, ``updated``, ``skipped`` and ``failed``.

    Raises:
        requests.HTTPError: If the page list itself cannot be fetched.
    """
    stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0}
    print(f"[Main] Lade Liste der Übungen aus Kategorie '{category}'…")
    pages = fetch_all_pages(category)
    print(f"[Main] {len(pages)} Seiten gefunden.")

    for title, entry in pages.items():
        try:
            # Regression fix: generic access so mapping-like entries work too.
            getter = getattr(entry, "get", None)
            if callable(getter):
                pid = getter("pageid")
                fullurl = getter("fullurl")
            else:
                pid = None
                fullurl = None

            if not pid:
                info = fetch_page_info(title)
                pid = info.get("pageid")
                fullurl = fullurl or info.get("fullurl")
            if not pid:
                print(f"[Skip] '{title}' hat keine pageid")
                stats["failed"] += 1
                continue

            raw = parse_exercise(title, pid)
            payload = build_payload(raw, fullurl or "", category)
            act = upsert_exercise(payload, dry_run=dry_run)
            # Anything that is not create/update/skip counts as a failure.
            stats[_ACTION_TO_COUNTER.get(act, "failed")] += 1
        except requests.HTTPError as e:
            # Compare against None explicitly: a Response is falsy for every
            # 4xx/5xx status, so a truthiness test would always yield None.
            response = getattr(e, "response", None)
            code = response.status_code if response is not None else None
            if code == 404:
                print(f"[Skip] '{title}': page not found (404)")
            else:
                print(f"[Error] '{title}': {e}")
            stats["failed"] += 1
        except Exception as e:  # per-page boundary: log, count, keep going
            print(f"[Error] '{title}': {e}")
            stats["failed"] += 1
    return stats


def run_smoke_test(title: str, category: str) -> None:
    """Run the three smoke-test imports (create → skip → update) for *title*."""
    print("\n[SmokeTest] Lauf 1/3: CREATE (Erstimport)")
    act1 = process_one(title, category, mutate=False)
    print("[SmokeTest] Aktion:", act1)

    print("\n[SmokeTest] Lauf 2/3: SKIP (Wiederholung, unverändert)")
    act2 = process_one(title, category, mutate=False)
    print("[SmokeTest] Aktion:", act2)

    print("\n[SmokeTest] Lauf 3/3: UPDATE (simulierte Wiki-Änderung an 'notes')")
    act3 = process_one(title, category, mutate=True)
    print("[SmokeTest] Aktion:", act3)

    print("\n[SmokeTest] Zusammenfassung:")
    print(json.dumps({"run1": act1, "run2": act2, "run3": act3}, ensure_ascii=False, indent=2))


# ----- Main -----
def main() -> None:
    """Parse the command line, check health, log in and run the import."""
    parser = argparse.ArgumentParser(description="Import exercises from Wiki to Qdrant (via FastAPI wiki_router)")
    parser.add_argument("--all", action="store_true", help="Alle Übungen importieren (SMW-Ask)")
    parser.add_argument("--title", type=str, default=DEFAULT_TITLE, help="Einzelimport eines Übungstitels")
    parser.add_argument("--category", type=str, default=DEFAULT_CAT, help="Wiki-Kategorie (z.B. 'Übungen')")
    parser.add_argument("--username", type=str, default=os.getenv("WIKI_BOT_USER"), help="Wiki-Login Benutzer (überschreibt .env)")
    parser.add_argument("--password", type=str, default=os.getenv("WIKI_BOT_PASSWORD"), help="Wiki-Login Passwort (überschreibt .env)")
    parser.add_argument("--skip-login", action="store_true", help="Login-Schritt überspringen (falls Session schon aktiv)")
    parser.add_argument("--dry-run", action="store_true", help="Kein Schreiben; nur Entscheidungen (create/update/skip) loggen")
    parser.add_argument("--smoke-test", action="store_true", help="3 Durchläufe (create→skip→update) für --title")
    args = parser.parse_args()

    # Sanity check
    wiki_health()

    # Login (unless explicitly skipped)
    if not args.skip_login:
        if not args.username or not args.password:
            print("[Login] Fehler: fehlende Credentials. Setze .env (WIKI_BOT_USER/WIKI_BOT_PASSWORD) oder CLI --username/--password.", file=sys.stderr)
            sys.exit(1)
        try:
            wiki_login(args.username, args.password)
        except Exception as e:  # top-level boundary: report and exit non-zero
            print(str(e), file=sys.stderr)
            sys.exit(1)

    if args.smoke_test:
        run_smoke_test(args.title, args.category)
        return

    if args.all:
        stats = process_all(args.category, dry_run=args.dry_run)
        print("\n[Stats] created={created} updated={updated} skipped={skipped} failed={failed}".format(**stats))
    else:
        print(f"[Main] Import single exercise: {args.title}")
        result = process_one(args.title, args.category, mutate=False, dry_run=args.dry_run)
        print(f"[Result] {result}")


if __name__ == "__main__":
    main()