#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
wiki_importer.py – v2.3.8

Goal of this patch: the fields `discipline`, `execution`, `keywords`,
`equipment`, `duration_minutes` etc. sometimes arrive empty.  The cause is
over-aggressive normalisation/matching.

Fix (conservative & robust):
- The parser now reads the known templates **specifically** and **without**
  over-normalisation:
    * `{{ÜbungInfoBox}}` / `{{UebungInfoBox}}`
    * `{{Übungsbeschreibung}}` / `{{Uebungsbeschreibung}}`
    * `{{Hilfsmittel}}`
    * `{{SkillDevelopment}}`
- Field extraction uses the **exact wiki parameter names first**
  (German / with umlauts) and only then narrow synonym fallbacks.  This
  ensures that e.g. `Schlüsselworte=` really ends up in `keywords`.
- `imported_at` is set on **create and update**.
- Optional debugging: `--debug-raw` prints the raw keys that were found
  (simple, easy to follow).

Existing API endpoints remain unchanged.
"""
import os
import re
import sys
import argparse
from typing import Dict, Any, Tuple, Optional, List

import requests
import mwparserfromhell
from dotenv import load_dotenv

import hashlib
import json
import time

# ----- Configuration / defaults -----
load_dotenv()
API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000/import/wiki")
EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise")
DEFAULT_CAT = os.getenv("WIKI_CATEGORY", "Übungen")
DEFAULT_TITLE = os.getenv("WIKI_EXERCISE_TITLE", "Affenklatschen")
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60"))


# ----- Helpers for the wiki router -----

def wiki_health() -> None:
    """Ping the wiki router's health endpoint; raises requests.HTTPError on failure."""
    r = requests.get(f"{API_BASE_URL}/health", timeout=15)
    r.raise_for_status()
    print("[Sanity] Wiki health OK")


def wiki_login(username: str, password: str) -> None:
    """Log in against the wiki router.

    Raises:
        RuntimeError: if the router reports anything but status == "success",
            or if a 2xx response carries a non-JSON body.
        requests.HTTPError: for non-2xx responses with non-JSON bodies.
    """
    payload = {"username": username, "password": password}
    r = requests.post(f"{API_BASE_URL}/login", json=payload, timeout=30)
    try:
        data = r.json()
    except Exception:
        print(f"[Login] HTTP {r.status_code}: {r.text}")
        r.raise_for_status()
        # BUG FIX: raise_for_status() does not raise on 2xx, so `data` used to
        # remain unbound here and the code below crashed with a NameError.
        raise RuntimeError("[Login] Login fehlgeschlagen")
    status = (data or {}).get("status")
    if status != "success":
        msg = (data or {}).get("message", "Login fehlgeschlagen")
        raise RuntimeError(f"[Login] {msg}")
    print("[Login] success")


def fetch_all_pages(category: str) -> Dict[str, Any]:
    """Return the SMW page listing for `category` (title -> page entry)."""
    resp = requests.get(f"{API_BASE_URL}/semantic/pages", params={"category": category}, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    return resp.json()


def fetch_page_info(title: str) -> Dict[str, Any]:
    """Resolve a page title to its `pageid` and `fullurl` via the router."""
    r = requests.get(f"{API_BASE_URL}/info", params={"title": title}, timeout=30)
    r.raise_for_status()
    info = r.json()
    return {"pageid": info.get("pageid"), "fullurl": info.get("fullurl")}


# ----- Parser (conservative) -----
T_INFOS = {"ÜbungInfoBox", "UebungInfoBox"}
T_BESCHR = {"Übungsbeschreibung", "Uebungsbeschreibung"}
T_HILFS = {"Hilfsmittel"}
T_SKILL = {"SkillDevelopment"}

# Keys in `raw` that carry importer metadata; a template parameter of the same
# name must never be allowed to clobber them.
_RESERVED_RAW_KEYS = {"title", "source", "pageid", "wikitext", "capabilities"}


def parse_exercise(title: str, pageid: int) -> Dict[str, Any]:
    """Fetch a page's wikitext and extract raw template parameters.

    Parameters of the info/description/equipment templates are copied verbatim
    into the returned dict under their original (German) names; the
    SkillDevelopment templates are collected as a list under "capabilities".
    """
    print(f"[Parse] Lade '{title}' (ID={pageid})")
    resp = requests.get(
        f"{API_BASE_URL}/parsepage",
        params={"pageid": pageid, "title": title},
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    wikitext = resp.json().get("wikitext", "")
    wikicode = mwparserfromhell.parse(wikitext)

    raw: Dict[str, Any] = {"title": title, "source": "MediaWiki", "pageid": pageid, "wikitext": wikitext}

    for tpl in wikicode.filter_templates():
        name = str(tpl.name).strip()
        if name in T_INFOS or name in T_BESCHR or name in T_HILFS:
            for p in tpl.params:
                key = str(p.name).strip()
                val = str(p.value).strip()
                # FIX: skip parameters that would overwrite importer metadata.
                if key in _RESERVED_RAW_KEYS:
                    continue
                raw[key] = val
        elif name in T_SKILL:
            raw.setdefault("capabilities", [])

            def _getp(t, k):
                # Template.get raises when the parameter is missing.
                try:
                    return str(t.get(k).value).strip()
                except Exception:
                    return ""

            cap = _getp(tpl, "PrimaryCapability")
            lvl = _getp(tpl, "CapabilityLevel")
            try:
                lvl_i = int(lvl)
            except Exception:
                lvl_i = 0
            if cap:
                raw["capabilities"].append({"capability": cap, "level": lvl_i})
    return raw


# ----- Fingerprint (stable, as before) -----

def _normalize(v: Any) -> str:
    """Flatten a value into a stable string for fingerprint hashing."""
    if v is None:
        return ""
    if isinstance(v, (list, tuple)):
        return ",".join(_normalize(x) for x in v)
    if isinstance(v, dict):
        return json.dumps(v, sort_keys=True, ensure_ascii=False)
    return str(v).strip()


def _norm_text(s: str) -> str:
    """Collapse whitespace (incl. NBSP) so formatting noise does not change the hash."""
    if s is None:
        return ""
    s = str(s).replace("\u00a0", " ")
    s = " ".join(s.split())
    return s.strip()


def _canon_title(t: str) -> str:
    """Canonicalise a page title: underscores to spaces, en/em dashes to '-'."""
    t = (t or "").strip().replace("_", " ")
    return t.replace("–", "-").replace("—", "-")


def compute_fingerprint(payload: Dict[str, Any]) -> str:
    """Compute a stable SHA-256 fingerprint over the hash-relevant fields."""
    kws = payload.get("keywords") or []
    # str() guards against non-string entries that would crash .replace().
    kws = [str(k).replace("\u2013", "-").replace("\u2014", "-") for k in kws]
    kws = sorted({(k or "").strip() for k in kws if (k or "").strip()}, key=str.casefold)
    dur = payload.get("duration_minutes") or 0
    try:
        dur = int(round(float(dur)))
    except Exception:
        dur = 0
    fields = [
        _canon_title(payload.get("title", "")),
        _norm_text(payload.get("summary", "")),
        _norm_text(payload.get("execution", "")),
        _norm_text(payload.get("notes", "")),
        dur,
        # FIX: `.get("capabilities", {})` returned None when the key existed
        # with value None, while _payload_subset_for_fp maps None -> {}; the
        # two fingerprints then disagreed ("" vs "{}") and forced updates.
        payload.get("capabilities") or {},
        kws,
    ]
    base = "|".join(_normalize(f) for f in fields)
    return hashlib.sha256(base.encode("utf-8")).hexdigest()


# ----- Payload (exact DE keys first, then narrow fallbacks) -----
EXACT_KEYS = {
    "summary": ["Summary", "Kurzbeschreibung"],
    "execution": ["Durchführung", "Durchfuehrung", "Ablauf"],
    "duration": ["Dauer", "Zeit"],
    "keywords": ["Schlüsselworte", "Schlüsselwörter", "Schluesselworte", "Schluesselwoerter", "Keywords", "Tags"],
    "equipment_prim": ["Hilfsmittel"],
    "equipment_alt": ["Geräte", "Geraete", "Gerät", "Geraet", "Material"],
    "discipline": ["Übungstyp", "Uebungstyp", "Disziplin"],
    "group": ["Gruppengröße", "Gruppengroesse", "Group"],
    "age_group": ["Altersgruppe"],
    "target_group": ["Zielgruppe"],
    "purpose": ["Ziel", "Zweck"],
    "notes": ["Hinweise", "Notes"],
    "preparation": ["Vorbereitung", "RefMethode"],
    "method": ["Methode", "Method"],
}


def _first_any(raw: Dict[str, Any], keys: List[str]) -> Optional[str]:
    """Return the first non-empty string value of `raw` among `keys`, else None."""
    for k in keys:
        v = raw.get(k)
        if isinstance(v, str) and v.strip():
            return v.strip()
    return None


def _split_csv(text: str) -> List[str]:
    """Split a comma- or newline-separated wiki value into trimmed items."""
    return [p.strip() for p in text.replace("\n", ",").split(",") if p.strip()]


def _parse_duration_minutes(duration: str) -> float:
    """Parse a wiki duration value into minutes.

    FIX: values such as "10 min" or "ca. 15" previously failed float() and
    silently collapsed to 0 — the reported "duration_minutes is empty"
    symptom.  Fall back to the first number found in the string.
    """
    try:
        return float(duration or 0)
    except Exception:
        m = re.search(r"\d+(?:[.,]\d+)?", str(duration))
        return float(m.group(0).replace(",", ".")) if m else 0.0


def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: bool = False) -> Dict[str, Any]:
    """Map raw wiki parameters to the exercise API payload.

    `mutate=True` appends " [auto-update]" to notes (used by the smoke test
    to simulate a wiki change).
    """
    # Capabilities -> Dict[str, int]
    capabilities: Dict[str, int] = {}
    for c in raw.get("capabilities", []) or []:
        cap = c.get("capability")
        lvl = c.get("level")
        if isinstance(cap, str) and cap:
            try:
                capabilities[cap] = int(lvl)
            except Exception:
                pass

    # Exact keys first
    summary = _first_any(raw, EXACT_KEYS["summary"]) or ""
    execution = _first_any(raw, EXACT_KEYS["execution"]) or ""
    duration = _first_any(raw, EXACT_KEYS["duration"]) or "0"

    kw_raw = _first_any(raw, EXACT_KEYS["keywords"]) or ""
    keywords = _split_csv(kw_raw) if kw_raw else []

    eq_raw = _first_any(raw, EXACT_KEYS["equipment_prim"]) or _first_any(raw, EXACT_KEYS["equipment_alt"]) or ""
    equipment = _split_csv(eq_raw) if eq_raw else []

    notes = _first_any(raw, EXACT_KEYS["notes"]) or ""
    discipline = _first_any(raw, EXACT_KEYS["discipline"]) or ""
    group = _first_any(raw, EXACT_KEYS["group"]) or None
    age_group = _first_any(raw, EXACT_KEYS["age_group"]) or ""
    target_group = _first_any(raw, EXACT_KEYS["target_group"]) or ""
    purpose = _first_any(raw, EXACT_KEYS["purpose"]) or ""
    preparation = _first_any(raw, EXACT_KEYS["preparation"]) or ""
    method = _first_any(raw, EXACT_KEYS["method"]) or ""

    duration_f = _parse_duration_minutes(duration)

    payload: Dict[str, Any] = {
        "title": raw.get("title") or "",
        "summary": summary,
        "short_description": summary,
        "keywords": keywords,
        "link": fullurl or "",
        "discipline": discipline,
        "group": group,
        "age_group": age_group,
        "target_group": target_group,
        "min_participants": 1,
        "duration_minutes": int(round(duration_f)),
        "capabilities": capabilities,
        "category": category or "",
        "purpose": purpose,
        "execution": execution,
        "notes": (notes + (" [auto-update]" if mutate else "")).strip(),
        "preparation": preparation,
        "method": method,
        "equipment": equipment,
        "fullurl": fullurl or "",
        "external_id": f"mw:{raw.get('pageid')}",
        "source": "MediaWiki",
    }
    payload["fingerprint"] = compute_fingerprint(payload)
    return payload


# ----- Lookup / upsert -----

def lookup_by_external_id(external_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[int]]:
    """Look up an exercise by its external id.

    Returns (record, status_code); (None, 404) when not found, or an
    {"error": ...} dict on transport/HTTP failures.
    """
    url = f"{EXERCISE_API}/by-external-id"
    try:
        r = requests.get(url, params={"external_id": external_id}, timeout=REQUEST_TIMEOUT)
        if r.status_code == 404:
            return None, 404
        r.raise_for_status()
        return r.json(), r.status_code
    except requests.HTTPError as e:
        return {"error": str(e), "status_code": getattr(e.response, "status_code", None)}, getattr(e.response, "status_code", None)
    except Exception as e:
        return {"error": str(e)}, None


def _payload_subset_for_fp(p: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce a stored record to the fields that enter the fingerprint."""
    return {
        "title": p.get("title"),
        "summary": p.get("summary"),
        "execution": p.get("execution"),
        "notes": p.get("notes"),
        "duration_minutes": p.get("duration_minutes"),
        "capabilities": p.get("capabilities") or {},
        "keywords": p.get("keywords") or [],
    }


def _print_diff(before: Dict[str, Any], after: Dict[str, Any]) -> None:
    """Print the differences in hash-relevant fields between two payloads."""
    keys = ["title", "summary", "execution", "notes", "duration_minutes", "capabilities", "keywords"]
    b = {k: before.get(k) for k in keys}
    a = {k: after.get(k) for k in keys}

    def _kws(x):
        return sorted({(k or "").strip() for k in (x or [])}, key=str.casefold)

    def _norm(side):
        # Same normalisation as the fingerprint, so the diff matches the hash.
        return {
            "title": _canon_title(side.get("title")),
            "summary": _norm_text(side.get("summary")),
            "execution": _norm_text(side.get("execution")),
            "notes": _norm_text(side.get("notes")),
            "duration_minutes": side.get("duration_minutes"),
            "capabilities": side.get("capabilities"),
            "keywords": _kws(side.get("keywords")),
        }

    b_norm = _norm(b)
    a_norm = _norm(a)
    diff = {k: (b_norm[k], a_norm[k]) for k in keys if b_norm.get(k) != a_norm.get(k)}
    if diff:
        print("[Diff] changes:", json.dumps(diff, ensure_ascii=False))
    else:
        print("[Diff] (none in hash fields)")


def _now_iso() -> str:
    """Current UTC time as an ISO-8601 'Z' timestamp."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


def upsert_exercise(payload: Dict[str, Any], *, dry_run: bool = False) -> str:
    """Create/update/skip one exercise based on fingerprint comparison.

    Returns the action taken: "create", "update" or "skip".
    With `dry_run=True` only the decision is logged, nothing is written.
    """
    title = payload.get("title", "")
    ext_id = payload.get("external_id")
    fp_new = payload.get("fingerprint")

    found, status = lookup_by_external_id(ext_id)

    action = "create"
    reason = "not found (lookup 404)"
    found_payload: Dict[str, Any] = {}
    if not (status == 404 or found is None):
        if isinstance(found, dict):
            # NOTE(review): a lookup error dict ({"error": ...}) also lands
            # here and is treated as an existing record — confirm whether the
            # API can return non-404 errors in practice.
            found_payload = found.get("payload", found)
            fp_old_stored = found.get("fingerprint") or found_payload.get("fingerprint")
            fp_old_recalc = compute_fingerprint(_payload_subset_for_fp(found_payload))
            if fp_new == fp_old_stored or fp_new == fp_old_recalc:
                action, reason = "skip", "fingerprint unchanged"
            else:
                action, reason = "update", "fingerprint changed"
        else:
            action, reason = "create", "unexpected lookup type"

    if dry_run:
        print(f"[DryRun] {action.upper():6} '{title}' ({ext_id}) – {reason}")
        if action == "update":
            _print_diff(found_payload, payload)
        return action

    if action in ("create", "update"):
        label = "Create" if action == "create" else "Update"
        body = dict(payload)
        body["imported_at"] = _now_iso()  # set on create AND update
        resp = requests.post(EXERCISE_API, json=body, timeout=REQUEST_TIMEOUT)
        if resp.status_code == 422:
            # Deliberately do not raise on validation errors: report the
            # details and let a batch run continue with the next page.
            print(f"[{label}] '{title}' -> FAILED 422:\n{resp.text}")
        else:
            resp.raise_for_status()
            print(f"[{label}] '{title}' – {reason} -> OK")
            if action == "update":
                _print_diff(found_payload, payload)
    else:
        print(f"[Skip] '{title}' – {reason}")
    return action
# ----- Orchestration -----

def process_one(title: str, category: str, *, mutate: bool = False, dry_run: bool = False, debug_raw: bool = False) -> str:
    """Import a single exercise page; returns the upsert action or "failed"."""
    info = fetch_page_info(title)
    pid = info.get("pageid")
    fullurl = info.get("fullurl") or ""
    if not pid:
        print(f"[Error] pageid für '{title}' nicht gefunden.", file=sys.stderr)
        return "failed"
    raw = parse_exercise(title, pid)
    if debug_raw:
        print("[Debug] Raw-Keys:", sorted([k for k in raw.keys() if k not in {"wikitext"}]))
    payload = build_payload(raw, fullurl, category, mutate=mutate)
    return upsert_exercise(payload, dry_run=dry_run)


def process_all(category: str, *, dry_run: bool = False, debug_raw: bool = False) -> Dict[str, int]:
    """Import every exercise in `category`; returns created/updated/skipped/failed counts."""
    stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0}
    print(f"[Main] Lade Liste der Übungen aus Kategorie '{category}'…")
    pages = fetch_all_pages(category)
    print(f"[Main] {len(pages)} Seiten gefunden.")
    for idx, (title, entry) in enumerate(pages.items(), 1):
        try:
            # `entry` may be a dict carrying pageid/fullurl already; fall back
            # to an /info round-trip when it does not.
            getter = getattr(entry, "get", None)
            pid = getter("pageid") if callable(getter) else None
            fullurl = getter("fullurl") if callable(getter) else None
            if not pid:
                info = fetch_page_info(title)
                pid = info.get("pageid")
                fullurl = fullurl or info.get("fullurl")
            if not pid:
                print(f"[Skip] '{title}' hat keine pageid")
                stats["failed"] += 1
                continue
            raw = parse_exercise(title, pid)
            if debug_raw and idx <= 5:  # keep debug output bounded to the first pages
                print(f"[Debug] #{idx} '{title}' Raw-Keys:", sorted([k for k in raw.keys() if k not in {"wikitext"}]))
            payload = build_payload(raw, fullurl or "", category)
            act = upsert_exercise(payload, dry_run=dry_run)
            stats["created" if act == "create" else "updated" if act == "update" else "skipped"] += 1
        except requests.HTTPError as e:
            resp = getattr(e, "response", None)
            code = resp.status_code if resp is not None else None
            if code == 404:
                print(f"[Skip] '{title}': page not found (404)")
                # FIX: a 404 was logged as "[Skip]" but counted as failed;
                # count it as skipped so the stats match the log.
                stats["skipped"] += 1
            else:
                print(f"[Error] '{title}': {e}")
                stats["failed"] += 1
        except Exception as e:
            print(f"[Error] '{title}': {e}")
            stats["failed"] += 1
    return stats


def run_smoke_test(title: str, category: str, *, debug_raw: bool = False) -> None:
    """Three consecutive runs of one title to verify create -> skip -> update."""
    print("\n[SmokeTest] Lauf 1/3: CREATE (Erstimport)")
    act1 = process_one(title, category, mutate=False, debug_raw=debug_raw)
    print("[SmokeTest] Aktion:", act1)

    print("\n[SmokeTest] Lauf 2/3: SKIP (Wiederholung, unverändert)")
    act2 = process_one(title, category, mutate=False, debug_raw=debug_raw)
    print("[SmokeTest] Aktion:", act2)

    print("\n[SmokeTest] Lauf 3/3: UPDATE (simulierte Wiki-Änderung an 'notes')")
    act3 = process_one(title, category, mutate=True, debug_raw=debug_raw)
    print("[SmokeTest] Aktion:", act3)

    print("\n[SmokeTest] Zusammenfassung:")
    print(json.dumps({"run1": act1, "run2": act2, "run3": act3}, ensure_ascii=False, indent=2))


# ----- Main -----

def main() -> None:
    """CLI entry point: parse arguments, log in, dispatch to single/all/smoke-test mode."""
    parser = argparse.ArgumentParser(description="Import exercises from Wiki to Qdrant (via FastAPI wiki_router)")
    parser.add_argument("--all", action="store_true", help="Alle Übungen importieren (SMW-Ask)")
    parser.add_argument("--title", type=str, default=DEFAULT_TITLE, help="Einzelimport eines Übungstitels")
    parser.add_argument("--category", type=str, default=DEFAULT_CAT, help="Wiki-Kategorie (z.B. 'Übungen')")
    parser.add_argument("--username", type=str, default=os.getenv("WIKI_BOT_USER"), help="Wiki-Login Benutzer (überschreibt .env)")
    parser.add_argument("--password", type=str, default=os.getenv("WIKI_BOT_PASSWORD"), help="Wiki-Login Passwort (überschreibt .env)")
    parser.add_argument("--skip-login", action="store_true", help="Login-Schritt überspringen (falls Session schon aktiv)")
    parser.add_argument("--dry-run", action="store_true", help="Kein Schreiben; nur Entscheidungen (create/update/skip) + Gründe loggen")
    parser.add_argument("--smoke-test", action="store_true", help="3 Durchläufe (create→skip→update) für --title")
    parser.add_argument("--debug-raw", action="store_true", help="Zeigt die aus dem Wiki gelesenen Roh-Keys je Seite")
    args = parser.parse_args()

    wiki_health()

    if not args.skip_login:
        if not args.username or not args.password:
            print("[Login] Fehler: fehlende Credentials. Setze .env (WIKI_BOT_USER/WIKI_BOT_PASSWORD) oder CLI --username/--password.", file=sys.stderr)
            sys.exit(1)
        try:
            wiki_login(args.username, args.password)
        except Exception as e:
            print(str(e), file=sys.stderr)
            sys.exit(1)

    if args.smoke_test:
        run_smoke_test(args.title, args.category, debug_raw=args.debug_raw)
        return

    if args.all:
        stats = process_all(args.category, dry_run=args.dry_run, debug_raw=args.debug_raw)
        print("\n[Stats] created={created} updated={updated} skipped={skipped} failed={failed}".format(**stats))
    else:
        print(f"[Main] Import single exercise: {args.title}")
        result = process_one(args.title, args.category, mutate=False, dry_run=args.dry_run, debug_raw=args.debug_raw)
        print(f"[Result] {result}")


if __name__ == "__main__":
    main()