#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Module: wiki_importer.py Beschreibung: - Importiert Übungen aus dem MediaWiki via FastAPI wiki_router - Login gegen /import/wiki/login (abschaltbar via --skip-login) - Titel-Liste via /semantic/pages, Parsing via /parsepage, Info via /info (nur wenn nötig) - Idempotentes Upsert: external_id="mw:{pageid}", Fingerprint (sha256) über Kernfelder - Lookup via /exercise/by-external-id, dann create/update/skip inkl. Zählern - Smoke-Test (--smoke-test): 3 Läufe (create → skip → update) v2.3.3 – Änderungen ggü. 2.3.2: - Stabilerer Fingerprint (Kanonisierung & Whitespace-Normalisierung): • Titel: _ zu Leerzeichen, Gedankenstriche → Bindestrich • summary/execution/notes: Whitespace kollabieren • keywords: dedupliziert (case-insensitiv) & sortiert • duration_minutes: sicher als int - Backcompat beim Update-Entscheid: zusätzlich Neu-Berechnung des Fingerprints aus dem gefundenen Payload (verhindert False-Positives bei Altbeständen ohne/mit abweichendem Fingerprint) - Diagnostik: Gründe im Log (not found / unchanged / changed) und Feld-Diff bei Update - Kein API-/CLI-Bruch """ import os import sys import argparse from typing import Dict, Any, Tuple, Optional from collections.abc import Mapping import requests import mwparserfromhell from dotenv import load_dotenv import hashlib import json import time # ----- Konfiguration / Defaults ----- load_dotenv() # .env laden, falls vorhanden API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000/import/wiki") # FastAPI-Wiki-Proxy EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise") # Exercise-Endpoint DEFAULT_CAT = os.getenv("WIKI_CATEGORY", "Übungen") DEFAULT_TITLE = os.getenv("WIKI_EXERCISE_TITLE", "Affenklatschen") REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60")) # ---- Hilfsfunktionen für Wiki-Router ---- def wiki_health() -> None: r = requests.get(f"{API_BASE_URL}/health", timeout=15) r.raise_for_status() print("[Sanity] Wiki health OK") def wiki_login(username: str, password: str) -> None: payload = {"username": username, "password": password} r = requests.post(f"{API_BASE_URL}/login", json=payload, timeout=30) try: data = r.json() except Exception: print(f"[Login] HTTP {r.status_code}: {r.text}") r.raise_for_status() status = (data or {}).get("status") if status != "success": msg = (data or {}).get("message", "Login fehlgeschlagen") raise RuntimeError(f"[Login] {msg}") print("[Login] success") def fetch_all_pages(category: str) -> Dict[str, Any]: resp = requests.get(f"{API_BASE_URL}/semantic/pages", params={"category": category}, timeout=REQUEST_TIMEOUT) resp.raise_for_status() return resp.json() def fetch_page_info(title: str) -> Dict[str, Any]: r = requests.get(f"{API_BASE_URL}/info", params={"title": title}, timeout=30) r.raise_for_status() info = r.json() return {"pageid": info.get("pageid"), "fullurl": info.get("fullurl")} def parse_exercise(title: str, pageid: int) -> Dict[str, Any]: print(f"[Parse] Lade '{title}' (ID={pageid})") resp = requests.get( f"{API_BASE_URL}/parsepage", params={"pageid": pageid, "title": title}, timeout=REQUEST_TIMEOUT ) resp.raise_for_status() wikitext = resp.json().get("wikitext", "") wikicode = mwparserfromhell.parse(wikitext) raw: Dict[str, Any] = {"title": title, "source": "MediaWiki", "pageid": pageid} for tpl in wikicode.filter_templates(): name = str(tpl.name).strip() if name == "ÜbungInfoBox": for p in tpl.params: raw[str(p.name).strip()] = str(p.value).strip() elif name == "Übungsbeschreibung": for p in tpl.params: raw[str(p.name).strip()] = str(p.value).strip() elif name == "SkillDevelopment": raw.setdefault("capabilities", []) try: cap = str(tpl.get("PrimaryCapability").value).strip() except Exception: cap = "" try: lvl = int(str(tpl.get("CapabilityLevel").value).strip()) except Exception: lvl = 0 if cap: raw["capabilities"].append({"capability": cap, "level": lvl}) raw["wikitext"] = wikitext return raw # ---- Fingerprint-Unterstützung (stabil) ---- def _normalize(v: Any) -> str: if v is None: return "" if isinstance(v, (list, tuple)): return ",".join(_normalize(x) for x in v) if isinstance(v, dict): return json.dumps(v, sort_keys=True, ensure_ascii=False) return str(v).strip() def _norm_text(s: str) -> str: if s is None: return "" s = str(s).replace("\u00a0", " ") # NBSP → Space s = s.strip() s = " ".join(s.split()) # Collapse whitespace return s def _canon_title(t: str) -> str: t = (t or "").strip().replace("_", " ") # Gedankenstriche vereinheitlichen return t.replace("–", "-").replace("—", "-") def compute_fingerprint(payload: Dict[str, Any]) -> str: # keywords stabilisieren: trim, dedupe (case-insensitiv), sort kws = payload.get("keywords") or [] kws = sorted({(k or "").strip() for k in kws if (k or "").strip()}, key=str.casefold) # dauer als int dur = payload.get("duration_minutes") or 0 try: dur = int(round(float(dur))) except Exception: dur = 0 fields = [ _canon_title(payload.get("title", "")), _norm_text(payload.get("summary", "")), _norm_text(payload.get("execution", "")), _norm_text(payload.get("notes", "")), dur, payload.get("capabilities", {}), kws, ] base = "|".join(_normalize(f) for f in fields) return hashlib.sha256(base.encode("utf-8")).hexdigest() def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: bool = False) -> Dict[str, Any]: # Exercise.capabilities erwartet Dict[str,int] caps_list = raw.get("capabilities", []) capabilities = {} for c in caps_list: cap = c.get("capability") lvl = c.get("level") if isinstance(cap, str) and cap: try: capabilities[cap] = int(lvl) except Exception: pass # Defaults/Fallbacks try: duration = float(raw.get("Dauer", 0) or 0) except Exception: duration = 0.0 keywords = [] kw_raw = raw.get("Schlüsselworte", "") if isinstance(kw_raw, str): keywords = [k.strip() for k in kw_raw.split(",") if k.strip()] equipment = [] eq_raw = raw.get("equipment", []) if isinstance(eq_raw, str): equipment = [e.strip() for e in eq_raw.split(",") if e.strip()] elif isinstance(eq_raw, list): equipment = [str(e).strip() for e in eq_raw if str(e).strip()] notes = raw.get("Hinweise", "") or "" if mutate: notes = (notes + " [auto-update]").strip() payload: Dict[str, Any] = { "title": raw.get("title") or "", "summary": raw.get("Summary", "") or "", "short_description": raw.get("Summary", "") or "", "keywords": keywords, "link": fullurl or "", "discipline": raw.get("Übungstyp", "") or "", "group": raw.get("Gruppengröße", "") or None, "age_group": raw.get("Altersgruppe", "") or "", "target_group": raw.get("Zielgruppe", "") or "", "min_participants": 1, "duration_minutes": int(round(duration)), "capabilities": capabilities, "category": category or "", "purpose": raw.get("Ziel", "") or "", "execution": raw.get("Durchführung", "") or "", "notes": notes, "preparation": raw.get("RefMethode", "") or "", "method": raw.get("method", "") or "", "equipment": equipment, "fullurl": fullurl or "", "external_id": f"mw:{raw.get('pageid')}", "source": "MediaWiki", } payload["fingerprint"] = compute_fingerprint(payload) return payload def lookup_by_external_id(external_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[int]]: url = f"{EXERCISE_API}/by-external-id" try: r = requests.get(url, params={"external_id": external_id}, timeout=REQUEST_TIMEOUT) if r.status_code == 404: return None, 404 r.raise_for_status() return r.json(), r.status_code except requests.HTTPError as e: return {"error": str(e), "status_code": getattr(e.response, "status_code", None)}, getattr(e.response, "status_code", None) except Exception as e: return {"error": str(e)}, None def _payload_subset_for_fp(p: Dict[str, Any]) -> Dict[str, Any]: return { "title": p.get("title"), "summary": p.get("summary"), "execution": p.get("execution"), "notes": p.get("notes"), "duration_minutes": p.get("duration_minutes"), "capabilities": p.get("capabilities") or {}, "keywords": p.get("keywords") or [], } def upsert_exercise(payload: Dict[str, Any], *, dry_run: bool = False) -> str: title = payload.get("title", "") ext_id = payload.get("external_id") fp_new = payload.get("fingerprint") found, status = lookup_by_external_id(ext_id) action = "create" reason = "not found (lookup 404)" found_payload = {} if not (status == 404 or found is None): if isinstance(found, dict): found_payload = found.get("payload", found) fp_old_stored = found.get("fingerprint") or found_payload.get("fingerprint") fp_old_recalc = compute_fingerprint(_payload_subset_for_fp(found_payload)) if fp_new == fp_old_stored or fp_new == fp_old_recalc: action, reason = "skip", "fingerprint unchanged" else: action, reason = "update", "fingerprint changed" else: action, reason = "create", "unexpected lookup type" if dry_run: print(f"[DryRun] {action.upper():6} '{title}' ({ext_id}) – {reason}") if action == "update": _print_diff(found_payload, payload) return action if action == "create": resp = requests.post(EXERCISE_API, json=payload, timeout=REQUEST_TIMEOUT) if resp.status_code == 422: print(f"[Create] '{title}' -> FAILED 422:\n{resp.text}") try: resp.raise_for_status() except Exception: pass else: resp.raise_for_status() print(f"[Create] '{title}' – {reason} -> OK") elif action == "update": payload2 = dict(payload) payload2["imported_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) resp = requests.post(EXERCISE_API, json=payload2, timeout=REQUEST_TIMEOUT) if resp.status_code == 422: print(f"[Update] '{title}' -> FAILED 422:\n{resp.text}") try: resp.raise_for_status() except Exception: pass else: resp.raise_for_status() print(f"[Update] '{title}' – {reason} -> OK") _print_diff(found_payload, payload) else: print(f"[Skip] '{title}' – {reason}") return action def _print_diff(before: Dict[str, Any], after: Dict[str, Any]) -> None: """Kleines Feld-Diff für die Hash-Felder (Diagnose).""" keys = ["title","summary","execution","notes","duration_minutes","capabilities","keywords"] b = {k: before.get(k) for k in keys} a = {k: after.get(k) for k in keys} # für bessere Lesbarkeit normalisieren wir die Textfelder b_norm = { "title": _canon_title(b.get("title")), "summary": _norm_text(b.get("summary")), "execution": _norm_text(b.get("execution")), "notes": _norm_text(b.get("notes")), "duration_minutes": b.get("duration_minutes"), "capabilities": b.get("capabilities"), "keywords": sorted({(k or "").strip() for k in (b.get("keywords") or [])}, key=str.casefold), } a_norm = { "title": _canon_title(a.get("title")), "summary": _norm_text(a.get("summary")), "execution": _norm_text(a.get("execution")), "notes": _norm_text(a.get("notes")), "duration_minutes": a.get("duration_minutes"), "capabilities": a.get("capabilities"), "keywords": sorted({(k or "").strip() for k in (a.get("keywords") or [])}, key=str.casefold), } diff = {k: (b_norm[k], a_norm[k]) for k in keys if b_norm.get(k) != a_norm.get(k)} if diff: print("[Diff] changes:", json.dumps(diff, ensure_ascii=False)) else: print("[Diff] (none in hash fields)") # ----- Orchestrierung ----- def process_one(title: str, category: str, *, mutate: bool = False, dry_run: bool = False) -> str: info = fetch_page_info(title) pid = info.get("pageid") fullurl = info.get("fullurl") or "" if not pid: print(f"[Error] pageid für '{title}' nicht gefunden.", file=sys.stderr) return "failed" raw = parse_exercise(title, pid) payload = build_payload(raw, fullurl, category, mutate=mutate) return upsert_exercise(payload, dry_run=dry_run) def process_all(category: str, *, dry_run: bool = False) -> Dict[str, int]: stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0} print(f"[Main] Lade Liste der Übungen aus Kategorie '{category}'…") pages = fetch_all_pages(category) print(f"[Main] {len(pages)} Seiten gefunden.") for title, entry in pages.items(): try: getter = getattr(entry, "get", None) if callable(getter): pid = getter("pageid") fullurl = getter("fullurl") else: pid = None fullurl = None if not pid: info = fetch_page_info(title) pid = info.get("pageid") fullurl = fullurl or info.get("fullurl") if not pid: print(f"[Skip] '{title}' hat keine pageid") stats["failed"] += 1 continue raw = parse_exercise(title, pid) payload = build_payload(raw, fullurl or "", category) act = upsert_exercise(payload, dry_run=dry_run) if act == "create": stats["created"] += 1 elif act == "update": stats["updated"] += 1 elif act == "skip": stats["skipped"] += 1 except requests.HTTPError as e: code = getattr(e, "response", None).status_code if getattr(e, "response", None) else None if code == 404: print(f"[Skip] '{title}': page not found (404)") stats["failed"] += 1 else: print(f"[Error] '{title}': {e}") stats["failed"] += 1 except Exception as e: print(f"[Error] '{title}': {e}") stats["failed"] += 1 return stats def run_smoke_test(title: str, category: str) -> None: print("\n[SmokeTest] Lauf 1/3: CREATE (Erstimport)") act1 = process_one(title, category, mutate=False) print("[SmokeTest] Aktion:", act1) print("\n[SmokeTest] Lauf 2/3: SKIP (Wiederholung, unverändert)") act2 = process_one(title, category, mutate=False) print("[SmokeTest] Aktion:", act2) print("\n[SmokeTest] Lauf 3/3: UPDATE (simulierte Wiki-Änderung an 'notes')") act3 = process_one(title, category, mutate=True) print("[SmokeTest] Aktion:", act3) print("\n[SmokeTest] Zusammenfassung:") print(json.dumps({"run1": act1, "run2": act2, "run3": act3}, ensure_ascii=False, indent=2)) # ----- Main ----- def main() -> None: parser = argparse.ArgumentParser(description="Import exercises from Wiki to Qdrant (via FastAPI wiki_router)") parser.add_argument("--all", action="store_true", help="Alle Übungen importieren (SMW-Ask)") parser.add_argument("--title", type=str, default=DEFAULT_TITLE, help="Einzelimport eines Übungstitels") parser.add_argument("--category", type=str, default=DEFAULT_CAT, help="Wiki-Kategorie (z.B. 'Übungen')") parser.add_argument("--username", type=str, default=os.getenv("WIKI_BOT_USER"), help="Wiki-Login Benutzer (überschreibt .env)") parser.add_argument("--password", type=str, default=os.getenv("WIKI_BOT_PASSWORD"), help="Wiki-Login Passwort (überschreibt .env)") parser.add_argument("--skip-login", action="store_true", help="Login-Schritt überspringen (falls Session schon aktiv)") parser.add_argument("--dry-run", action="store_true", help="Kein Schreiben; nur Entscheidungen (create/update/skip) + Gründe loggen") parser.add_argument("--smoke-test", action="store_true", help="3 Durchläufe (create→skip→update) für --title") args = parser.parse_args() # Sanity wiki_health() # Login (sofern nicht explizit übersprungen) if not args.skip_login: if not args.username or not args.password: print("[Login] Fehler: fehlende Credentials. Setze .env (WIKI_BOT_USER/WIKI_BOT_PASSWORD) oder CLI --username/--password.", file=sys.stderr) sys.exit(1) try: wiki_login(args.username, args.password) except Exception as e: print(str(e), file=sys.stderr) sys.exit(1) if args.smoke_test: run_smoke_test(args.title, args.category) return if args.all: stats = process_all(args.category, dry_run=args.dry_run) print("\n[Stats] created={created} updated={updated} skipped={skipped} failed={failed}".format(**stats)) else: print(f"[Main] Import single exercise: {args.title}") result = process_one(args.title, args.category, mutate=False, dry_run=args.dry_run) print(f"[Result] {result}") if __name__ == "__main__": main()