#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module: wiki_importer.py

Description:
- Imports exercises from the MediaWiki via the FastAPI wiki_router
- Login against /import/wiki/login (can be disabled via --skip-login)
- Title list via /semantic/pages, parsing via /parsepage, info via /info (only when needed)
- Idempotent upsert: external_id="mw:{pageid}", fingerprint (sha256) over core fields
- Lookup via /exercise/by-external-id, then create/update/skip including counters
- Smoke test (--smoke-test): 3 runs (create -> skip -> update)

v2.3.5 - changes vs. 2.3.4:
- Bug fix: field-synonym candidates are now normalized with the same
  diacritic-insensitive key normalization as the raw parameter names
  (previously umlaut candidates like "durchführung" could never match).
- Bug fix: wiki_login() no longer raises NameError when the server
  returns a 2xx response with a non-JSON body.
- Bug fix: a failed (non-404) lookup no longer masquerades as an "update".

v2.3.4 - changes vs. 2.3.3:
- Robust template detection: names are unicode-normalized and compared
  diacritic-insensitively (e.g. "ÜbungInfoBox" == "UebungInfoBox" == "uebunginfobox").
- Field synonyms & key normalization: "summary/execution/duration/keywords/..."
  are resolved via several possible parameter names (e.g. Durchführung/Durchfuehrung/Ablauf).
- Goal: prevents empty fields on the 2nd run and hence spurious updates.
"""

import os
import sys
import argparse
from typing import Dict, Any, Tuple, Optional, List
from collections.abc import Mapping

import requests
import mwparserfromhell
from dotenv import load_dotenv

import hashlib
import json
import time
import unicodedata

# ----- Configuration / defaults -----
load_dotenv()  # load .env if present

API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000/import/wiki")  # FastAPI wiki proxy
EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise")  # exercise endpoint
DEFAULT_CAT = os.getenv("WIKI_CATEGORY", "Übungen")
DEFAULT_TITLE = os.getenv("WIKI_EXERCISE_TITLE", "Affenklatschen")
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60"))


# ---- Unicode / key normalization ----

def _norm_unicode(s: str) -> str:
    """Return *s* in NFKC normal form (folds compatibility characters)."""
    return unicodedata.normalize("NFKC", s)


def _strip_diacritics(s: str) -> str:
    """Drop combining marks so e.g. 'ü' compares equal to 'u'."""
    return "".join(
        ch for ch in unicodedata.normalize("NFD", s) if not unicodedata.combining(ch)
    )


def _norm_key(s: str) -> str:
    """Normalize a template or parameter name for comparison.

    NFKC-normalize, strip diacritics, trim whitespace, and casefold
    (casefold also maps 'ß' -> 'ss').
    """
    s = _norm_unicode(s or "")
    s = _strip_diacritics(s)
    s = s.strip().casefold()
    return s


# Template aliases (normalized names, i.e. as produced by _norm_key)
TPL_UEBUNG_INFOBOX = {"ubunginfobox", "uebunginfobox"}
TPL_UEBUNGSBESCHREIBUNG = {"ubungsbeschreibung", "uebungsbeschreibung"}
TPL_SKILLDEV = {"skilldevelopment"}

# Synonyms for parameters.  Candidates are passed through _norm_key at
# lookup time (see _get_first), so umlaut spellings here are fine.
KEYS_SUMMARY = ["summary", "kurzbeschreibung", "beschreibung", "kurztext"]
KEYS_EXECUTION = ["durchführung", "durchfuehrung", "ausführung", "ausfuehrung", "execution", "ablauf"]
KEYS_DURATION = ["dauer", "zeit", "dauer_minuten", "dauer (min)", "minuten"]
KEYS_KEYWORDS = ["schlüsselworte", "schluesselworte", "keywords", "tags"]
KEYS_EQUIPMENT = ["equipment", "geräte", "geraete", "material"]
KEYS_DISCIPLINE = ["übungstyp", "uebungstyp", "discipline"]
KEYS_GROUP = ["gruppengröße", "gruppengroesse", "group"]
KEYS_AGE_GROUP = ["altersgruppe"]
KEYS_TARGET_GROUP = ["zielgruppe", "target_group"]
KEYS_PURPOSE = ["ziel", "zweck", "purpose"]
KEYS_PREPARATION = ["refmethode", "vorbereitung", "preparation"]
KEYS_METHOD = ["method", "methode"]
KEYS_NOTES = ["hinweise", "notes"]


# ---- Helper functions ----

def wiki_health() -> None:
    """Ping the wiki proxy's /health endpoint; raise on failure."""
    r = requests.get(f"{API_BASE_URL}/health", timeout=15)
    r.raise_for_status()
    print("[Sanity] Wiki health OK")


def wiki_login(username: str, password: str) -> None:
    """Log in against the wiki proxy.

    Raises RuntimeError when the server rejects the credentials or
    returns an unusable (non-JSON) body, and requests.HTTPError for
    non-2xx responses without a JSON body.
    """
    payload = {"username": username, "password": password}
    r = requests.post(f"{API_BASE_URL}/login", json=payload, timeout=30)
    try:
        data = r.json()
    except Exception:
        print(f"[Login] HTTP {r.status_code}: {r.text}")
        r.raise_for_status()
        # Bug fix: a 2xx response without a JSON body previously fell
        # through to a NameError on `data`; fail explicitly instead.
        raise RuntimeError("[Login] unexpected non-JSON response")
    status = (data or {}).get("status")
    if status != "success":
        msg = (data or {}).get("message", "Login fehlgeschlagen")
        raise RuntimeError(f"[Login] {msg}")
    print("[Login] success")


def fetch_all_pages(category: str) -> Dict[str, Any]:
    """Return the mapping of page titles in *category* (SMW ask via proxy)."""
    resp = requests.get(
        f"{API_BASE_URL}/semantic/pages",
        params={"category": category},
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    return resp.json()


def fetch_page_info(title: str) -> Dict[str, Any]:
    """Resolve *title* to its pageid and full URL via the /info endpoint."""
    r = requests.get(f"{API_BASE_URL}/info", params={"title": title}, timeout=30)
    r.raise_for_status()
    info = r.json()
    return {"pageid": info.get("pageid"), "fullurl": info.get("fullurl")}


def parse_exercise(title: str, pageid: int) -> Dict[str, Any]:
    """Fetch and parse the wikitext of one exercise page.

    Returns a flat dict of raw template parameters plus bookkeeping keys
    ("title", "source", "pageid", "wikitext") and, when present, a
    "capabilities" list of {"capability": str, "level": int} entries.
    """
    print(f"[Parse] Lade '{title}' (ID={pageid})")
    resp = requests.get(
        f"{API_BASE_URL}/parsepage",
        params={"pageid": pageid, "title": title},
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    wikitext = resp.json().get("wikitext", "")
    wikicode = mwparserfromhell.parse(wikitext)

    raw: Dict[str, Any] = {"title": title, "source": "MediaWiki", "pageid": pageid}

    # Collect templates (robust against name variants)
    for tpl in wikicode.filter_templates():
        name_raw = str(tpl.name)
        name_norm = _norm_key(name_raw)
        if name_norm in TPL_UEBUNG_INFOBOX:
            for p in tpl.params:
                raw[str(p.name).strip()] = str(p.value).strip()
        elif name_norm in TPL_UEBUNGSBESCHREIBUNG:
            for p in tpl.params:
                raw[str(p.name).strip()] = str(p.value).strip()
        elif name_norm in TPL_SKILLDEV:
            raw.setdefault("capabilities", [])

            # Standard keys (English template)
            def _getp(t, k):
                try:
                    return str(t.get(k).value).strip()
                except Exception:
                    return ""

            cap = _getp(tpl, "PrimaryCapability")
            lvl = _getp(tpl, "CapabilityLevel")
            try:
                lvl_i = int(lvl)
            except Exception:
                lvl_i = 0
            if cap:
                raw["capabilities"].append({"capability": cap, "level": lvl_i})

    raw["wikitext"] = wikitext
    return raw


# ---- Fingerprint support (stable) ----

def _normalize(v: Any) -> str:
    """Serialize *v* into a stable string for fingerprint hashing."""
    if v is None:
        return ""
    if isinstance(v, (list, tuple)):
        return ",".join(_normalize(x) for x in v)
    if isinstance(v, dict):
        return json.dumps(v, sort_keys=True, ensure_ascii=False)
    return str(v).strip()


def _norm_text(s: str) -> str:
    """Whitespace-normalize free text (NBSP -> space, collapse runs)."""
    if s is None:
        return ""
    s = str(s).replace("\u00a0", " ")  # NBSP -> space
    s = s.strip()
    s = " ".join(s.split())  # collapse whitespace
    return s


def _canon_title(t: str) -> str:
    """Canonicalize a page title: '_' -> ' ', unify en/em dashes to '-'."""
    t = (t or "").strip().replace("_", " ")
    return t.replace("–", "-").replace("—", "-")


def compute_fingerprint(payload: Dict[str, Any]) -> str:
    """Compute a sha256 fingerprint over the core comparison fields.

    Keywords are deduplicated and sorted case-insensitively and the
    duration is rounded to an int so insignificant formatting changes do
    not flip the hash.
    """
    kws = payload.get("keywords") or []
    kws = sorted({(k or "").strip() for k in kws if (k or "").strip()}, key=str.casefold)
    dur = payload.get("duration_minutes") or 0
    try:
        dur = int(round(float(dur)))
    except Exception:
        dur = 0
    fields = [
        _canon_title(payload.get("title", "")),
        _norm_text(payload.get("summary", "")),
        _norm_text(payload.get("execution", "")),
        _norm_text(payload.get("notes", "")),
        dur,
        payload.get("capabilities", {}),
        kws,
    ]
    base = "|".join(_normalize(f) for f in fields)
    return hashlib.sha256(base.encode("utf-8")).hexdigest()


# ---- Field resolution (synonyms) ----

def _norm_keymap(d: Dict[str, Any]) -> Dict[str, Any]:
    """Map normalized key -> value for all string keys of *d*."""
    return {_norm_key(k): v for k, v in d.items() if isinstance(k, str)}


def _get_first(d: Dict[str, Any], candidates: List[str]) -> Any:
    """Return the first non-empty value in *d* for any synonym in *candidates*.

    Bug fix: candidates are normalized with _norm_key before lookup, so
    umlaut spellings ("durchführung", "geräte", ...) match keys that were
    diacritic-stripped by _norm_keymap.
    """
    m = _norm_keymap(d)
    for c in candidates:
        v = m.get(_norm_key(c))
        if v not in (None, ""):
            return v
    return None


def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: bool = False) -> Dict[str, Any]:
    """Build the exercise API payload from raw template parameters.

    *mutate* appends a marker to "notes" (used by the smoke test to force
    an update).  The fingerprint over the core fields is added last.
    """
    # Exercise.capabilities expects Dict[str, int]
    caps_list = raw.get("capabilities", [])
    capabilities: Dict[str, int] = {}
    for c in caps_list:
        cap = c.get("capability")
        lvl = c.get("level")
        if isinstance(cap, str) and cap:
            try:
                capabilities[cap] = int(lvl)
            except Exception:
                pass

    # Defaults/fallbacks via synonyms
    # summary / execution
    summary = _get_first(raw, KEYS_SUMMARY) or ""
    execution = _get_first(raw, KEYS_EXECUTION) or ""

    # duration
    duration = _get_first(raw, KEYS_DURATION)
    try:
        duration_f = float(duration or 0)
    except Exception:
        duration_f = 0.0

    # keywords
    kw_raw = _get_first(raw, KEYS_KEYWORDS)
    keywords: List[str] = []
    if isinstance(kw_raw, str):
        keywords = [k.strip() for k in kw_raw.split(",") if k.strip()]

    # equipment
    eq_raw = _get_first(raw, KEYS_EQUIPMENT)
    equipment: List[str] = []
    if isinstance(eq_raw, str):
        equipment = [e.strip() for e in eq_raw.split(",") if e.strip()]
    elif isinstance(eq_raw, list):
        equipment = [str(e).strip() for e in eq_raw if str(e).strip()]

    notes = _get_first(raw, KEYS_NOTES) or ""
    if mutate:
        notes = (str(notes) + " [auto-update]").strip()

    discipline = _get_first(raw, KEYS_DISCIPLINE) or ""
    group = _get_first(raw, KEYS_GROUP) or None
    age_group = _get_first(raw, KEYS_AGE_GROUP) or ""
    target_group = _get_first(raw, KEYS_TARGET_GROUP) or ""
    purpose = _get_first(raw, KEYS_PURPOSE) or ""
    preparation = _get_first(raw, KEYS_PREPARATION) or ""
    method = _get_first(raw, KEYS_METHOD) or ""

    payload: Dict[str, Any] = {
        "title": raw.get("title") or "",
        "summary": str(summary) or "",
        "short_description": str(summary) or "",
        "keywords": keywords,
        "link": fullurl or "",
        "discipline": str(discipline) or "",
        "group": str(group) if group else None,
        "age_group": str(age_group) or "",
        "target_group": str(target_group) or "",
        "min_participants": 1,
        "duration_minutes": int(round(duration_f)),
        "capabilities": capabilities,
        "category": category or "",
        "purpose": str(purpose) or "",
        "execution": str(execution) or "",
        "notes": str(notes) or "",
        "preparation": str(preparation) or "",
        "method": str(method) or "",
        "equipment": equipment,
        "fullurl": fullurl or "",
        "external_id": f"mw:{raw.get('pageid')}",
        "source": "MediaWiki",
    }
    payload["fingerprint"] = compute_fingerprint(payload)
    return payload


# ---- Lookup / upsert ----

def lookup_by_external_id(external_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[int]]:
    """Look up an exercise by external id.

    Returns (record, status_code) on success, (None, 404) when absent,
    and ({"error": ...}, status_or_None) on any other failure.
    """
    url = f"{EXERCISE_API}/by-external-id"
    try:
        r = requests.get(url, params={"external_id": external_id}, timeout=REQUEST_TIMEOUT)
        if r.status_code == 404:
            return None, 404
        r.raise_for_status()
        return r.json(), r.status_code
    except requests.HTTPError as e:
        code = getattr(e.response, "status_code", None)
        return {"error": str(e), "status_code": code}, code
    except Exception as e:
        return {"error": str(e)}, None


def _payload_subset_for_fp(p: Dict[str, Any]) -> Dict[str, Any]:
    """Project *p* down to the fields that participate in the fingerprint."""
    return {
        "title": p.get("title"),
        "summary": p.get("summary"),
        "execution": p.get("execution"),
        "notes": p.get("notes"),
        "duration_minutes": p.get("duration_minutes"),
        "capabilities": p.get("capabilities") or {},
        "keywords": p.get("keywords") or [],
    }


def _print_diff(before: Dict[str, Any], after: Dict[str, Any]) -> None:
    """Print which hash-relevant fields differ between two payloads."""
    keys = ["title", "summary", "execution", "notes", "duration_minutes", "capabilities", "keywords"]
    b = {k: before.get(k) for k in keys}
    a = {k: after.get(k) for k in keys}

    def _kws(x):
        return sorted({(k or "").strip() for k in (x or [])}, key=str.casefold)

    b_norm = {
        "title": _canon_title(b.get("title")),
        "summary": _norm_text(b.get("summary")),
        "execution": _norm_text(b.get("execution")),
        "notes": _norm_text(b.get("notes")),
        "duration_minutes": b.get("duration_minutes"),
        "capabilities": b.get("capabilities"),
        "keywords": _kws(b.get("keywords")),
    }
    a_norm = {
        "title": _canon_title(a.get("title")),
        "summary": _norm_text(a.get("summary")),
        "execution": _norm_text(a.get("execution")),
        "notes": _norm_text(a.get("notes")),
        "duration_minutes": a.get("duration_minutes"),
        "capabilities": a.get("capabilities"),
        "keywords": _kws(a.get("keywords")),
    }
    diff = {k: (b_norm[k], a_norm[k]) for k in keys if b_norm.get(k) != a_norm.get(k)}
    if diff:
        print("[Diff] changes:", json.dumps(diff, ensure_ascii=False))
    else:
        print("[Diff] (none in hash fields)")


def upsert_exercise(payload: Dict[str, Any], *, dry_run: bool = False) -> str:
    """Create, update or skip one exercise based on its fingerprint.

    Returns the chosen action: "create", "update" or "skip".
    """
    title = payload.get("title", "")
    ext_id = payload.get("external_id")
    fp_new = payload.get("fingerprint")
    found, status = lookup_by_external_id(ext_id)

    action = "create"
    reason = "not found (lookup 404)"
    found_payload = {}
    lookup_failed = (
        isinstance(found, dict) and "error" in found and (status is None or status >= 400)
    )
    if lookup_failed:
        # Bug fix: a non-404 lookup failure used to be treated as a found
        # record, turning transient errors into spurious "update" actions.
        action, reason = "create", "lookup failed"
    elif not (status == 404 or found is None):
        if isinstance(found, dict):
            found_payload = found.get("payload", found)
            fp_old_stored = found.get("fingerprint") or found_payload.get("fingerprint")
            # Also recompute in case the stored fingerprint is missing/stale.
            fp_old_recalc = compute_fingerprint(_payload_subset_for_fp(found_payload))
            if fp_new == fp_old_stored or fp_new == fp_old_recalc:
                action, reason = "skip", "fingerprint unchanged"
            else:
                action, reason = "update", "fingerprint changed"
        else:
            action, reason = "create", "unexpected lookup type"

    if dry_run:
        print(f"[DryRun] {action.upper():6} '{title}' ({ext_id}) - {reason}")
        if action == "update":
            _print_diff(found_payload, payload)
        return action

    if action == "create":
        resp = requests.post(EXERCISE_API, json=payload, timeout=REQUEST_TIMEOUT)
        if resp.status_code == 422:
            print(f"[Create] '{title}' -> FAILED 422:\n{resp.text}")
            try:
                resp.raise_for_status()
            except Exception:
                pass
        else:
            resp.raise_for_status()
            print(f"[Create] '{title}' – {reason} -> OK")
    elif action == "update":
        payload2 = dict(payload)
        payload2["imported_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        resp = requests.post(EXERCISE_API, json=payload2, timeout=REQUEST_TIMEOUT)
        if resp.status_code == 422:
            print(f"[Update] '{title}' -> FAILED 422:\n{resp.text}")
            try:
                resp.raise_for_status()
            except Exception:
                pass
        else:
            resp.raise_for_status()
            print(f"[Update] '{title}' - {reason} -> OK")
            _print_diff(found_payload, payload)
    else:
        print(f"[Skip] '{title}' - {reason}")
    return action


# ----- Orchestration -----

def process_one(title: str, category: str, *, mutate: bool = False, dry_run: bool = False) -> str:
    """Import a single exercise; returns the upsert action or "failed"."""
    info = fetch_page_info(title)
    pid = info.get("pageid")
    fullurl = info.get("fullurl") or ""
    if not pid:
        print(f"[Error] pageid für '{title}' nicht gefunden.", file=sys.stderr)
        return "failed"
    raw = parse_exercise(title, pid)
    payload = build_payload(raw, fullurl, category, mutate=mutate)
    return upsert_exercise(payload, dry_run=dry_run)


def process_all(category: str, *, dry_run: bool = False) -> Dict[str, int]:
    """Import every exercise of *category*; returns per-action counters."""
    stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0}
    print(f"[Main] Lade Liste der Übungen aus Kategorie '{category}'…")
    pages = fetch_all_pages(category)
    print(f"[Main] {len(pages)} Seiten gefunden.")
    for title, entry in pages.items():
        try:
            # entry may be a mapping with pageid/fullurl or an opaque value
            getter = getattr(entry, "get", None)
            if callable(getter):
                pid = getter("pageid")
                fullurl = getter("fullurl")
            else:
                pid = None
                fullurl = None
            if not pid:
                info = fetch_page_info(title)
                pid = info.get("pageid")
                fullurl = fullurl or info.get("fullurl")
            if not pid:
                print(f"[Skip] '{title}' hat keine pageid")
                stats["failed"] += 1
                continue
            raw = parse_exercise(title, pid)
            payload = build_payload(raw, fullurl or "", category)
            act = upsert_exercise(payload, dry_run=dry_run)
            if act == "create":
                stats["created"] += 1
            elif act == "update":
                stats["updated"] += 1
            elif act == "skip":
                stats["skipped"] += 1
        except requests.HTTPError as e:
            resp = getattr(e, "response", None)
            code = resp.status_code if resp else None
            if code == 404:
                print(f"[Skip] '{title}': page not found (404)")
                stats["failed"] += 1
            else:
                print(f"[Error] '{title}': {e}")
                stats["failed"] += 1
        except Exception as e:
            print(f"[Error] '{title}': {e}")
            stats["failed"] += 1
    return stats


def run_smoke_test(title: str, category: str) -> None:
    """Run the create -> skip -> update cycle for one title and summarize."""
    print("\n[SmokeTest] Lauf 1/3: CREATE (Erstimport)")
    act1 = process_one(title, category, mutate=False)
    print("[SmokeTest] Aktion:", act1)

    print("\n[SmokeTest] Lauf 2/3: SKIP (Wiederholung, unverändert)")
    act2 = process_one(title, category, mutate=False)
    print("[SmokeTest] Aktion:", act2)

    print("\n[SmokeTest] Lauf 3/3: UPDATE (simulierte Wiki-Änderung an 'notes')")
    act3 = process_one(title, category, mutate=True)
    print("[SmokeTest] Aktion:", act3)

    print("\n[SmokeTest] Zusammenfassung:")
    print(json.dumps({"run1": act1, "run2": act2, "run3": act3}, ensure_ascii=False, indent=2))


# ----- Main -----

def main() -> None:
    """CLI entry point: sanity check, optional login, then import."""
    parser = argparse.ArgumentParser(description="Import exercises from Wiki to Qdrant (via FastAPI wiki_router)")
    parser.add_argument("--all", action="store_true", help="Alle Übungen importieren (SMW-Ask)")
    parser.add_argument("--title", type=str, default=DEFAULT_TITLE, help="Einzelimport eines Übungstitels")
    parser.add_argument("--category", type=str, default=DEFAULT_CAT, help="Wiki-Kategorie (z.B. 'Übungen')")
    parser.add_argument("--username", type=str, default=os.getenv("WIKI_BOT_USER"), help="Wiki-Login Benutzer (überschreibt .env)")
    parser.add_argument("--password", type=str, default=os.getenv("WIKI_BOT_PASSWORD"), help="Wiki-Login Passwort (überschreibt .env)")
    parser.add_argument("--skip-login", action="store_true", help="Login-Schritt überspringen (falls Session schon aktiv)")
    parser.add_argument("--dry-run", action="store_true", help="Kein Schreiben; nur Entscheidungen (create/update/skip) + Gründe loggen")
    parser.add_argument("--smoke-test", action="store_true", help="3 Durchläufe (create→skip→update) für --title")
    args = parser.parse_args()

    # Sanity
    wiki_health()

    # Login (unless explicitly skipped)
    if not args.skip_login:
        if not args.username or not args.password:
            print("[Login] Fehler: fehlende Credentials. Setze .env (WIKI_BOT_USER/WIKI_BOT_PASSWORD) oder CLI --username/--password.", file=sys.stderr)
            sys.exit(1)
        try:
            wiki_login(args.username, args.password)
        except Exception as e:
            print(str(e), file=sys.stderr)
            sys.exit(1)

    if args.smoke_test:
        run_smoke_test(args.title, args.category)
        return

    if args.all:
        stats = process_all(args.category, dry_run=args.dry_run)
        print("\n[Stats] created={created} updated={updated} skipped={skipped} failed={failed}".format(**stats))
    else:
        print(f"[Main] Import single exercise: {args.title}")
        result = process_one(args.title, args.category, mutate=False, dry_run=args.dry_run)
        print(f"[Result] {result}")


if __name__ == "__main__":
    main()