Trainer_LLM/scripts/wiki_importer.py
Lars 97bc283ce1
All checks were successful
Deploy Trainer_LLM to llm-node / deploy (push) Successful in 2s
scripts/wiki_importer.py aktualisiert
2025-08-11 11:12:19 +02:00

412 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module: wiki_importer.py
Beschreibung:
- Importiert Übungen aus dem MediaWiki via FastAPI wiki_router
- Führt vor dem Import einen Login gegen /import/wiki/login durch (falls nicht via --skip-login deaktiviert)
- Holt Liste aller Übungs-Titel (SMW-Ask) via `/semantic/pages`
- Für jede Übung:
* Fetch pageinfo (pageid, fullurl) via `/info`
* Parse Wikitext (Templates: ÜbungInfoBox, Übungsbeschreibung, SkillDevelopment) via `/parsepage`
* Baut Payload entsprechend Exercise-Datenmodell
* Idempotentes Upsert: external_id="mw:{pageid}", Fingerprint (sha256) über Kernfelder,
Lookup via `/exercise/by-external-id`, dann create/update/skip inkl. Zählern.
- Unterstützt Single-Import via `--title` (oder ENV `WIKI_EXERCISE_TITLE`) und Full-Import via `--all`
- Optional: Credentials via CLI (--username/--password) oder `.env` (WIKI_BOT_USER / WIKI_BOT_PASSWORD)
- Smoke-Test (`--smoke-test`): 3 Läufe nacheinander (create → skip → update), ohne API-Signaturen zu ändern.
Version: 2.3.1
"""
import os
import sys
import argparse
from typing import Dict, Any, Tuple, Optional
import requests
import mwparserfromhell
from dotenv import load_dotenv
import hashlib
import json
import time
# ----- Configuration / defaults -----
load_dotenv() # load .env if present
# CHANGED: base URLs cleanly separated (wiki proxy vs. exercise API)
API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000/import/wiki") # FastAPI wiki proxy
EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise") # Exercise endpoint (base, without trailing slash)
DEFAULT_CAT = os.getenv("WIKI_CATEGORY", "Übungen")  # wiki category to import from
DEFAULT_TITLE = os.getenv("WIKI_EXERCISE_TITLE", "Affenklatschen")  # default title for single import
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60"))  # seconds; wiki calls can be slow
# ---- Helper functions for the wiki router ----
def wiki_health() -> None:
    """Ping the wiki proxy's /health endpoint; raise on HTTP error."""
    response = requests.get(f"{API_BASE_URL}/health", timeout=15)
    response.raise_for_status()
    print("[Sanity] Wiki health OK")
def wiki_login(username: str, password: str) -> None:
    """
    Perform a login against the wiki_router.

    Expects {"status": "success"} on success.

    Args:
        username: wiki bot user name.
        password: wiki bot password.

    Raises:
        requests.HTTPError: non-2xx response without a usable JSON body.
        RuntimeError: 2xx response that is not JSON, or a JSON body whose
            status is not "success".
    """
    payload = {"username": username, "password": password}
    r = requests.post(f"{API_BASE_URL}/login", json=payload, timeout=30)
    # no raise_for_status() up front: we want to surface JSON error details
    try:
        data = r.json()
    except ValueError:
        print(f"[Login] HTTP {r.status_code}: {r.text}")
        r.raise_for_status()
        # BUGFIX: on a 2xx non-JSON response raise_for_status() does NOT
        # raise; the original then hit an unbound `data` (NameError).
        # Fail explicitly instead.
        raise RuntimeError(f"[Login] HTTP {r.status_code}: unexpected non-JSON response")
    status = (data or {}).get("status")
    if status != "success":
        msg = (data or {}).get("message", "Login fehlgeschlagen")
        raise RuntimeError(f"[Login] {msg}")
    print("[Login] success")
def fetch_all_pages(category: str) -> Dict[str, Any]:
    """Fetch the SMW-Ask page list for *category* via /semantic/pages."""
    response = requests.get(
        f"{API_BASE_URL}/semantic/pages",
        params={"category": category},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
def fetch_page_info(title: str) -> Dict[str, Any]:
    """Resolve a page title to {"pageid": ..., "fullurl": ...} via /info."""
    response = requests.get(f"{API_BASE_URL}/info", params={"title": title}, timeout=30)
    response.raise_for_status()
    data = response.json()
    return {"pageid": data.get("pageid"), "fullurl": data.get("fullurl")}
def parse_exercise(title: str, pageid: int) -> Dict[str, Any]:
    """Fetch a page's wikitext via /parsepage and flatten its templates.

    Parameters of the templates "ÜbungInfoBox" and "Übungsbeschreibung"
    are copied verbatim into the result dict; each "SkillDevelopment"
    template contributes one {"capability", "level"} entry to the
    "capabilities" list. The raw wikitext is kept under "wikitext".
    """
    print(f"[Parse] Lade '{title}' (ID={pageid})")
    response = requests.get(
        f"{API_BASE_URL}/parsepage",
        params={"pageid": pageid, "title": title},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    wikitext = response.json().get("wikitext", "")
    raw: Dict[str, Any] = {"title": title, "source": "MediaWiki", "pageid": pageid}
    for template in mwparserfromhell.parse(wikitext).filter_templates():
        tpl_name = str(template.name).strip()
        if tpl_name in ("ÜbungInfoBox", "Übungsbeschreibung"):
            # both templates are flattened into plain key/value pairs
            for param in template.params:
                raw[str(param.name).strip()] = str(param.value).strip()
        elif tpl_name == "SkillDevelopment":
            raw.setdefault("capabilities", [])
            try:
                capability = str(template.get("PrimaryCapability").value).strip()
            except Exception:
                capability = ""
            try:
                level = int(str(template.get("CapabilityLevel").value).strip())
            except Exception:
                level = 0  # non-numeric or missing level falls back to 0
            if capability:
                raw["capabilities"].append({"capability": capability, "level": level})
    raw["wikitext"] = wikitext
    return raw
# NEW: Stabile Normalisierung für Hash-Bildung
def _normalize(v: Any) -> str:
if v is None:
return ""
if isinstance(v, (list, tuple)):
return ",".join(_normalize(x) for x in v)
if isinstance(v, dict):
# sort by key for stable hash
return json.dumps(v, sort_keys=True, ensure_ascii=False)
return str(v).strip()
# NEW: Fingerprint über Kernfelder
def compute_fingerprint(payload: Dict[str, Any]) -> str:
"""sha256 über Kernfelder: title, summary, execution, notes, duration_minutes, capabilities, keywords"""
fields = [
payload.get("title", ""),
payload.get("summary", ""),
payload.get("execution", ""),
payload.get("notes", ""),
payload.get("duration_minutes", 0),
payload.get("capabilities", {}),
payload.get("keywords", []),
]
base = "|".join(_normalize(f) for f in fields)
return hashlib.sha256(base.encode("utf-8")).hexdigest()
# CHANGED: payload includes external_id (mw:{pageid}) + fingerprint
def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: bool = False) -> Dict[str, Any]:
    """Map parsed wiki fields onto the Exercise data model.

    Adds idempotency fields: external_id = "mw:{pageid}" and a sha256
    fingerprint over the core fields.

    Args:
        raw: parsed template fields plus "title"/"pageid" from the wiki.
        fullurl: canonical page URL (may be empty).
        category: wiki category the page was imported from.
        mutate: when True, append " [auto-update]" to notes so the smoke
            test's third run is detected as an update.
    """
    # Exercise.capabilities expects Dict[str, int]
    capabilities: Dict[str, int] = {}
    for entry in raw.get("capabilities", []):
        cap_name = entry.get("capability")
        if not (isinstance(cap_name, str) and cap_name):
            continue
        try:
            capabilities[cap_name] = int(entry.get("level"))
        except Exception:
            pass  # non-numeric level: drop the capability entry
    # defaults / fallbacks: non-numeric "Dauer" becomes 0.0
    try:
        duration = float(raw.get("Dauer", 0) or 0)
    except Exception:
        duration = 0.0
    kw_raw = raw.get("Schlüsselworte", "")
    keywords = [k.strip() for k in kw_raw.split(",") if k.strip()] if isinstance(kw_raw, str) else []
    eq_raw = raw.get("equipment", [])
    if isinstance(eq_raw, str):
        equipment = [e.strip() for e in eq_raw.split(",") if e.strip()]
    elif isinstance(eq_raw, list):
        equipment = [str(e).strip() for e in eq_raw if str(e).strip()]
    else:
        equipment = []
    notes = raw.get("Hinweise", "") or ""
    if mutate:
        # for the smoke test (3rd run): create a small change on purpose
        notes = (notes + " [auto-update]").strip()
    payload: Dict[str, Any] = {
        "title": raw.get("title") or "",
        "summary": raw.get("Summary", "") or "",
        "short_description": raw.get("Summary", "") or "",
        "keywords": keywords,
        "link": fullurl or "",
        "discipline": raw.get("Übungstyp", "") or "",
        "group": raw.get("Gruppengröße", "") or None,
        "age_group": raw.get("Altersgruppe", "") or "",
        "target_group": raw.get("Zielgruppe", "") or "",
        "min_participants": 1,
        "duration_minutes": int(round(duration)),  # Exercise expects int
        "capabilities": capabilities,
        "category": category or "",
        "purpose": raw.get("Ziel", "") or "",
        "execution": raw.get("Durchführung", "") or "",
        "notes": notes,
        "preparation": raw.get("RefMethode", "") or "",
        "method": raw.get("method", "") or "",  # if present in the wikitext
        "equipment": equipment,
        "fullurl": fullurl or "",  # optional field
        # idempotency fields (backend may ignore them; API stays unchanged):
        "external_id": f"mw:{raw.get('pageid')}",
        "source": "MediaWiki",
    }
    # add the fingerprint (not API-relevant, but used for the upsert decision)
    payload["fingerprint"] = compute_fingerprint(payload)
    return payload
# NEW: lookup against the Exercise API
def lookup_by_external_id(external_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[int]]:
    """Query /exercise/by-external-id?external_id=...

    Returns (json, http_status); a 404 yields (None, 404). Network and
    HTTP errors are reported as ({"error": ...}, status_or_None) rather
    than raised, so callers can fall back to a "create" decision.
    """
    url = f"{EXERCISE_API}/by-external-id"
    try:
        response = requests.get(url, params={"external_id": external_id}, timeout=REQUEST_TIMEOUT)
        if response.status_code == 404:
            return None, 404
        response.raise_for_status()
        return response.json(), response.status_code
    except requests.HTTPError as exc:
        # exc.response may be None; getattr keeps this None-safe
        code = getattr(exc.response, "status_code", None)
        return {"error": str(exc), "status_code": code}, code
    except Exception as exc:
        return {"error": str(exc)}, None
# NEW: upsert decision (create/update/skip)
def _post_exercise(label: str, title: str, payload: Dict[str, Any]) -> None:
    """POST a payload to the Exercise API and log the outcome.

    A 422 validation error is logged (including the response body) but NOT
    raised — this preserves the original best-effort behavior; any other
    HTTP error raises requests.HTTPError.
    """
    resp = requests.post(EXERCISE_API, json=payload, timeout=REQUEST_TIMEOUT)
    if resp.status_code == 422:
        print(f"[{label}] '{title}' -> FAILED 422:\n{resp.text}")
        return
    resp.raise_for_status()
    print(f"[{label}] '{title}' -> OK")
def upsert_exercise(payload: Dict[str, Any], *, dry_run: bool = False) -> str:
    """Idempotent upsert: decide and perform create/update/skip.

    Looks the exercise up by external_id and compares the stored
    fingerprint (top-level, or nested under "payload") with the freshly
    computed one.

    Args:
        payload: exercise payload incl. "external_id" and "fingerprint".
        dry_run: when True, only log the decision; perform no writes.

    Returns:
        The action taken: "create", "update", or "skip".
    """
    title = payload.get("title", "<ohne Titel>")
    ext_id = payload.get("external_id")
    fp_new = payload.get("fingerprint")
    found, status = lookup_by_external_id(ext_id)
    if status != 404 and isinstance(found, dict):
        fp_old = found.get("fingerprint") or found.get("payload", {}).get("fingerprint")
        action = "skip" if fp_old == fp_new else "update"
    else:
        # not found (404) or lookup returned nothing usable -> create
        action = "create"
    if dry_run:
        print(f"[DryRun] {action.upper():6} '{title}' ({ext_id})")
        return action
    if action == "create":
        _post_exercise("Create", title, payload)
    elif action == "update":
        # Specification: "update (POST /exercise) + imported_at"
        payload2 = dict(payload)
        payload2["imported_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        _post_exercise("Update", title, payload2)
    else:
        print(f"[Skip] '{title}' (unverändert)")
    return action
# CHANGED: single-item processing goes through the upsert path
def process_one(title: str, category: str, *, mutate: bool = False, dry_run: bool = False) -> str:
    """Import one exercise by title; return the action taken or "failed"."""
    info = fetch_page_info(title)
    page_id = info.get("pageid")
    page_url = info.get("fullurl") or ""
    if not page_id:
        print(f"[Error] pageid für '{title}' nicht gefunden.", file=sys.stderr)
        return "failed"
    parsed = parse_exercise(title, page_id)
    payload = build_payload(parsed, page_url, category, mutate=mutate)
    return upsert_exercise(payload, dry_run=dry_run)
# CHANGED: batch run with stats & fault tolerance
def process_all(category: str, *, dry_run: bool = False) -> Dict[str, int]:
    """Import every page of *category*; failures only affect that page.

    Returns:
        Counters: {"created", "updated", "skipped", "failed"}.
    """
    stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0}
    action_counter = {"create": "created", "update": "updated", "skip": "skipped"}
    print(f"[Main] Lade Liste der Übungen aus Kategorie '{category}'")
    pages = fetch_all_pages(category)
    print(f"[Main] {len(pages)} Seiten gefunden.")
    for title, entry in pages.items():
        try:
            page_id = entry.get("pageid") if isinstance(entry, dict) else None
            page_url = entry.get("fullurl") if isinstance(entry, dict) else None
            if not page_id:
                # fall back to a direct /info lookup when the list entry is bare
                info = fetch_page_info(title)
                page_id = info.get("pageid")
                page_url = page_url or info.get("fullurl")
            if not page_id:
                print(f"[Skip] '{title}' hat keine pageid")
                stats["failed"] += 1
                continue
            payload = build_payload(parse_exercise(title, page_id), page_url or "", category)
            action = upsert_exercise(payload, dry_run=dry_run)
            if action in action_counter:
                stats[action_counter[action]] += 1
        except requests.HTTPError as exc:
            response = getattr(exc, "response", None)
            if response is not None and response.status_code == 404:
                print(f"[Skip] '{title}': page not found (404)")
            else:
                print(f"[Error] '{title}': {exc}")
            stats["failed"] += 1
        except Exception as exc:
            print(f"[Error] '{title}': {exc}")
            stats["failed"] += 1
    return stats
# NEW: smoke test (3 runs: create → skip → update)
def run_smoke_test(title: str, category: str) -> None:
    """Run the importer three times to exercise create, skip, and update."""
    runs = (
        ("run1", "\n[SmokeTest] Lauf 1/3: CREATE (Erstimport)", False),
        ("run2", "\n[SmokeTest] Lauf 2/3: SKIP (Wiederholung, unverändert)", False),
        ("run3", "\n[SmokeTest] Lauf 3/3: UPDATE (simulierte Wiki-Änderung an 'notes')", True),
    )
    results: Dict[str, str] = {}
    for key, banner, mutate in runs:
        print(banner)
        results[key] = process_one(title, category, mutate=mutate)
        print("[SmokeTest] Aktion:", results[key])
    print("\n[SmokeTest] Zusammenfassung:")
    print(json.dumps(results, ensure_ascii=False, indent=2))
# ----- Main -----
def main() -> None:
    """CLI entry point: parse arguments, health-check, login, then import."""
    cli = argparse.ArgumentParser(description="Import exercises from Wiki to Qdrant (via FastAPI wiki_router)")
    cli.add_argument("--all", action="store_true", help="Alle Übungen importieren (SMW-Ask)")
    cli.add_argument("--title", type=str, default=DEFAULT_TITLE, help="Einzelimport eines Übungstitels")
    cli.add_argument("--category", type=str, default=DEFAULT_CAT, help="Wiki-Kategorie (z.B. 'Übungen')")
    cli.add_argument("--username", type=str, default=os.getenv("WIKI_BOT_USER"), help="Wiki-Login Benutzer (überschreibt .env)")
    cli.add_argument("--password", type=str, default=os.getenv("WIKI_BOT_PASSWORD"), help="Wiki-Login Passwort (überschreibt .env)")
    cli.add_argument("--skip-login", action="store_true", help="Login-Schritt überspringen (falls Session schon aktiv)")
    cli.add_argument("--dry-run", action="store_true", help="Kein Schreiben; nur Entscheidungen (create/update/skip) loggen")
    cli.add_argument("--smoke-test", action="store_true", help="3 Durchläufe (create→skip→update) für --title")
    opts = cli.parse_args()
    # sanity check against the wiki proxy
    wiki_health()
    # login unless explicitly skipped
    if not opts.skip_login:
        if not opts.username or not opts.password:
            print("[Login] Fehler: fehlende Credentials. Setze .env (WIKI_BOT_USER/WIKI_BOT_PASSWORD) oder CLI --username/--password.", file=sys.stderr)
            sys.exit(1)
        try:
            wiki_login(opts.username, opts.password)
        except Exception as exc:
            print(str(exc), file=sys.stderr)
            sys.exit(1)
    if opts.smoke_test:
        run_smoke_test(opts.title, opts.category)
        return
    # full import vs. single import
    if opts.all:
        counters = process_all(opts.category, dry_run=opts.dry_run)
        print("\n[Stats] created={created} updated={updated} skipped={skipped} failed={failed}".format(**counters))
    else:
        print(f"[Main] Import single exercise: {opts.title}")
        outcome = process_one(opts.title, opts.category, mutate=False, dry_run=opts.dry_run)
        print(f"[Result] {outcome}")
if __name__ == "__main__":
    main()