Trainer_LLM/scripts/wiki_importer.py
Lars 34320b46d9
All checks were successful
Deploy Trainer_LLM to llm-node / deploy (push) Successful in 1s
scripts/wiki_importer.py aktualisiert
2025-08-11 15:33:15 +02:00

548 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
wiki_importer.py v2.3.7
Fixes ggü. v2.3.6:
- **Keywords/EQUIPMENT/DISCIPLINE u.a. wurden teils nicht erkannt**: Bugfix in `_get_first()`
Kandidatenschlüssel werden jetzt ebenfalls normalisiert (`_norm_key(c)`), damit
`Schlüsselworte` (aus dem Wiki) zuverlässig matcht.
- `_get_first_fuzzy()` normalisiert die Such-Tokens.
- Kleine Bugfixes/Polish: `action.upper()` im Dry-Run, sanftere Keywords-Splittung.
Hinweis: Keine API-/CLI-Änderungen. Parser unterstützt weiterhin `{{Hilfsmittel}}`.
"""
import os
import sys
import argparse
from typing import Dict, Any, Tuple, Optional, List
from collections.abc import Mapping
import requests
import mwparserfromhell
from dotenv import load_dotenv
import hashlib
import json
import time
import unicodedata
# ----- Configuration / defaults (overridable via .env) -----
load_dotenv()
# Base URL of the FastAPI wiki import router.
API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000/import/wiki")
# Endpoint that receives exercise payloads (create/update).
EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise")
# Default wiki category and single-page title for imports.
DEFAULT_CAT = os.getenv("WIKI_CATEGORY", "Übungen")
DEFAULT_TITLE = os.getenv("WIKI_EXERCISE_TITLE", "Affenklatschen")
# Timeout in seconds for potentially slow requests (page lists, parsing).
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60"))
# ---- Unicode-/Key-Normalisierung ----
def _norm_unicode(s: str) -> str:
return unicodedata.normalize("NFKC", s)
def _strip_diacritics(s: str) -> str:
return "".join(ch for ch in unicodedata.normalize("NFD", s) if not unicodedata.combining(ch))
def _norm_key(s: str) -> str:
    """Canonicalize a mapping key: NFKC-normalize, strip accents, trim, casefold."""
    text = _strip_diacritics(_norm_unicode(s or ""))
    return text.strip().casefold()
def _norm_tpl(s: str) -> str:
    """Normalize a template name: _norm_key, then keep alphanumeric characters only."""
    key = _norm_key(s)
    return "".join(filter(str.isalnum, key))
# Template aliases (already normalized with _norm_tpl!)
TPL_UEBUNG_INFOBOX = {"ubunginfobox", "uebunginfobox", "ubunginfo", "uebunginfo"}
TPL_UEBUNGSBESCHREIBUNG = {"ubungsbeschreibung", "uebungsbeschreibung", "beschreibungubung", "beschreibunguebung"}
TPL_SKILLDEV = {"skilldevelopment"}
TPL_HILFSMITTEL = {"hilfsmittel"}
# Field-name synonyms; candidates are normalized again (_norm_key) before matching.
KEYS_SUMMARY = ["summary", "kurzbeschreibung", "beschreibung", "kurztext"]
KEYS_EXECUTION = ["durchführung", "durchfuehrung", "ausführung", "ausfuehrung", "execution", "ablauf", "vorgehen"]
KEYS_DURATION = ["dauer", "zeit", "dauer_minuten", "dauer (min)", "minuten"]
KEYS_KEYWORDS = ["schlüsselworte", "schluesselworte", "schlüsselwörter", "schluesselwoerter", "keywords", "stichworte", "schlagworte", "tags", "schluesselwort", "schlüsselwort"]
KEYS_EQUIPMENT = ["equipment", "geräte", "geraete", "gerät", "geraet", "material", "hilfsmittel", "gerate/material"]
KEYS_DISCIPLINE = ["übungstyp", "uebungstyp", "discipline", "disziplin", "schwerpunkt", "bereich", "thema", "technik"]
KEYS_GROUP = ["gruppengröße", "gruppengroesse", "group"]
KEYS_AGE_GROUP = ["altersgruppe"]
KEYS_TARGET_GROUP = ["zielgruppe", "target_group"]
KEYS_PURPOSE = ["ziel", "zweck", "purpose"]
KEYS_PREPARATION = ["refmethode", "vorbereitung", "preparation"]
KEYS_METHOD = ["method", "methode"]
KEYS_NOTES = ["hinweise", "notes"]
# ---- Wiki-Router Helpers ----
def wiki_health() -> None:
    """Ping the wiki router's health endpoint; raise HTTPError on failure."""
    response = requests.get(f"{API_BASE_URL}/health", timeout=15)
    response.raise_for_status()
    print("[Sanity] Wiki health OK")
def wiki_login(username: str, password: str) -> None:
    """Log in to the wiki through the FastAPI router.

    Raises:
        RuntimeError: when the router reports a failed login, or when a
            2xx response does not contain JSON.
        requests.HTTPError: on non-2xx responses with a non-JSON body.
    """
    payload = {"username": username, "password": password}
    r = requests.post(f"{API_BASE_URL}/login", json=payload, timeout=30)
    try:
        data = r.json()
    except Exception:
        print(f"[Login] HTTP {r.status_code}: {r.text}")
        r.raise_for_status()
        # Bugfix: previously a 2xx response with a non-JSON body fell
        # through here and hit `data` unbound (NameError). Fail explicitly.
        raise RuntimeError(f"[Login] unexpected non-JSON response (HTTP {r.status_code})")
    status = (data or {}).get("status")
    if status != "success":
        msg = (data or {}).get("message", "Login fehlgeschlagen")
        raise RuntimeError(f"[Login] {msg}")
    print("[Login] success")
def fetch_all_pages(category: str) -> Dict[str, Any]:
    """Return the title -> page-entry mapping for *category* from the semantic endpoint."""
    response = requests.get(
        f"{API_BASE_URL}/semantic/pages",
        params={"category": category},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
def fetch_page_info(title: str) -> Dict[str, Any]:
    """Resolve *title* to its pageid and full URL via the /info endpoint."""
    response = requests.get(f"{API_BASE_URL}/info", params={"title": title}, timeout=30)
    response.raise_for_status()
    data = response.json()
    return {"pageid": data.get("pageid"), "fullurl": data.get("fullurl")}
# ---- Parser ----
def parse_exercise(title: str, pageid: int) -> Dict[str, Any]:
    """Fetch a page's raw wikitext and flatten its known templates into a dict.

    Infobox/description templates and {{Hilfsmittel}} contribute their
    parameters as flat key/value pairs; {{SkillDevelopment}} templates are
    collected into a 'capabilities' list of {capability, level} dicts.
    The original wikitext is kept under 'wikitext'.
    """
    print(f"[Parse] Lade '{title}' (ID={pageid})")
    resp = requests.get(
        f"{API_BASE_URL}/parsepage",
        params={"pageid": pageid, "title": title},
        timeout=REQUEST_TIMEOUT
    )
    resp.raise_for_status()
    wikitext = resp.json().get("wikitext", "")
    wikicode = mwparserfromhell.parse(wikitext)
    raw: Dict[str, Any] = {"title": title, "source": "MediaWiki", "pageid": pageid}
    for tpl in wikicode.filter_templates():
        # Template names are matched after aggressive normalization (_norm_tpl).
        name_norm = _norm_tpl(str(tpl.name))
        if name_norm in TPL_UEBUNG_INFOBOX or name_norm in TPL_UEBUNGSBESCHREIBUNG:
            # Flatten every template parameter into the raw dict.
            for p in tpl.params:
                raw[str(p.name).strip()] = str(p.value).strip()
        elif name_norm in TPL_SKILLDEV:
            raw.setdefault("capabilities", [])
            # Safe parameter accessor: missing params yield "" instead of raising.
            def _getp(t, k):
                try:
                    return str(t.get(k).value).strip()
                except Exception:
                    return ""
            cap = _getp(tpl, "PrimaryCapability")
            lvl = _getp(tpl, "CapabilityLevel")
            try:
                lvl_i = int(lvl)
            except Exception:
                lvl_i = 0  # non-numeric/missing levels default to 0
            if cap:
                raw["capabilities"].append({"capability": cap, "level": lvl_i})
        elif name_norm in TPL_HILFSMITTEL:
            for p in tpl.params:
                raw[str(p.name).strip()] = str(p.value).strip()
    raw["wikitext"] = wikitext
    return raw
# ---- Fingerprint (stabil) ----
def _normalize(v: Any) -> str:
if v is None:
return ""
if isinstance(v, (list, tuple)):
return ",".join(_normalize(x) for x in v)
if isinstance(v, dict):
return json.dumps(v, sort_keys=True, ensure_ascii=False)
return str(v).strip()
def _norm_text(s: str) -> str:
if s is None:
return ""
s = str(s).replace("\u00a0", " ")
s = s.strip()
s = " ".join(s.split())
return s
def _canon_title(t: str) -> str:
t = (t or "").strip().replace("_", " ")
return t.replace("", "-").replace("", "-")
def compute_fingerprint(payload: Dict[str, Any]) -> str:
    """Return a stable SHA-256 hex digest over the hash-relevant payload fields.

    Keywords are dash-normalized, trimmed, deduplicated and case-insensitively
    sorted; duration is coerced to a rounded int so formatting noise does not
    change the fingerprint.
    """
    keywords = [
        kw.replace("\u2013", "-").replace("\u2014", "-")
        for kw in (payload.get("keywords") or [])
    ]
    keywords = sorted(
        {(kw or "").strip() for kw in keywords if (kw or "").strip()},
        key=str.casefold,
    )
    try:
        minutes = int(round(float(payload.get("duration_minutes") or 0)))
    except Exception:
        minutes = 0
    parts = [
        _canon_title(payload.get("title", "")),
        _norm_text(payload.get("summary", "")),
        _norm_text(payload.get("execution", "")),
        _norm_text(payload.get("notes", "")),
        minutes,
        payload.get("capabilities", {}),
        keywords,
    ]
    joined = "|".join(_normalize(part) for part in parts)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()
# ---- Feldauflösung (Synonyme + Fuzzy) ----
def _norm_keymap(d: Dict[str, Any]) -> Dict[str, Any]:
    """Re-key *d* through _norm_key; non-string keys are dropped."""
    out: Dict[str, Any] = {}
    for key, value in d.items():
        if isinstance(key, str):
            out[_norm_key(key)] = value
    return out
def _get_first(d: Dict[str, Any], candidates: List[str]) -> Any:
    """Return the first non-empty value in *d* under any of the candidate keys.

    Both the dict keys and the candidates are normalized with _norm_key, so
    e.g. 'Schlüsselworte' (wiki spelling) matches 'schluesselworte'.
    """
    normalized = _norm_keymap(d)
    for candidate in candidates:
        value = normalized.get(_norm_key(candidate))
        if value not in (None, ""):
            return value
    return None
def _get_first_fuzzy(d: Dict[str, Any], tokens: List[str]) -> Any:
    """Return the first non-empty value whose normalized key contains all *tokens*."""
    needles = [_norm_key(t) for t in tokens]
    for key, value in _norm_keymap(d).items():
        if value in (None, ""):
            continue
        if all(needle in key for needle in needles):
            return value
    return None
# ---- Payload ----
def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: bool = False) -> Dict[str, Any]:
    """Map the parsed raw wiki fields onto the exercise-API payload schema.

    Field values are resolved via synonym lists (_get_first) with fuzzy
    fallbacks (_get_first_fuzzy). *mutate* appends a marker to 'notes' —
    used by the smoke test to force a fingerprint change (UPDATE path).
    The returned payload includes its own 'fingerprint'.
    """
    # Capabilities -> Dict[str, int]
    caps_list = raw.get("capabilities", [])
    capabilities: Dict[str, int] = {}
    for c in caps_list:
        cap = c.get("capability")
        lvl = c.get("level")
        if isinstance(cap, str) and cap:
            try:
                capabilities[cap] = int(lvl)
            except Exception:
                pass  # skip entries with non-numeric levels
    # summary / execution
    summary = _get_first(raw, KEYS_SUMMARY) or ""
    execution = _get_first(raw, KEYS_EXECUTION)
    if execution in (None, ""):
        execution = _get_first_fuzzy(raw, ["ablauf"]) or _get_first_fuzzy(raw, ["durchf"]) or ""
    # duration (minutes; non-numeric values fall back to 0)
    duration = _get_first(raw, KEYS_DURATION)
    try:
        duration_f = float(duration or 0)
    except Exception:
        duration_f = 0.0
    # keywords
    kw_raw = _get_first(raw, KEYS_KEYWORDS)
    if kw_raw in (None, ""):
        kw_raw = _get_first_fuzzy(raw, ["stich", "worte"]) or _get_first_fuzzy(raw, ["schlag", "worte"]) or ""
    keywords: List[str] = []
    if isinstance(kw_raw, str):
        # robust splitting; tolerates duplicate commas/newlines
        parts = [p.strip() for p in kw_raw.replace("\n", ",").split(",")]
        keywords = [p for p in parts if p]
    # equipment (string or list accepted)
    eq_raw = _get_first(raw, KEYS_EQUIPMENT)
    if eq_raw in (None, ""):
        eq_raw = _get_first_fuzzy(raw, ["gerate", "material"]) or _get_first_fuzzy(raw, ["hilfsmittel"]) or ""
    equipment: List[str] = []
    if isinstance(eq_raw, str):
        equipment = [e.strip() for e in eq_raw.replace("\n", ",").split(",") if e.strip()]
    elif isinstance(eq_raw, list):
        equipment = [str(e).strip() for e in eq_raw if str(e).strip()]
    notes = _get_first(raw, KEYS_NOTES) or ""
    if mutate:
        # smoke-test hook: alter 'notes' so the fingerprint differs
        notes = (str(notes) + " [auto-update]").strip()
    discipline = _get_first(raw, KEYS_DISCIPLINE) or ""
    if discipline in (None, ""):
        discipline = _get_first_fuzzy(raw, ["ubung", "typ"]) or _get_first_fuzzy(raw, ["schwerpunkt"]) or ""
    group = _get_first(raw, KEYS_GROUP) or None
    age_group = _get_first(raw, KEYS_AGE_GROUP) or ""
    target_group = _get_first(raw, KEYS_TARGET_GROUP) or ""
    purpose = _get_first(raw, KEYS_PURPOSE) or ""
    preparation = _get_first(raw, KEYS_PREPARATION) or ""
    method = _get_first(raw, KEYS_METHOD) or ""
    payload: Dict[str, Any] = {
        "title": raw.get("title") or "",
        "summary": str(summary) or "",
        "short_description": str(summary) or "",
        "keywords": keywords,
        "link": fullurl or "",
        "discipline": str(discipline) or "",
        "group": str(group) if group else None,
        "age_group": str(age_group) or "",
        "target_group": str(target_group) or "",
        "min_participants": 1,
        "duration_minutes": int(round(duration_f)),
        "capabilities": capabilities,
        "category": category or "",
        "purpose": str(purpose) or "",
        "execution": str(execution) or "",
        "notes": str(notes) or "",
        "preparation": str(preparation) or "",
        "method": str(method) or "",
        "equipment": equipment,
        "fullurl": fullurl or "",
        "external_id": f"mw:{raw.get('pageid')}",
        "source": "MediaWiki",
    }
    payload["fingerprint"] = compute_fingerprint(payload)
    return payload
# ---- Lookup/Upsert ----
def lookup_by_external_id(external_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[int]]:
    """Look up an exercise by its external id.

    Returns (body, status): (None, 404) when not found; on HTTP/transport
    errors an error dict is returned together with a possibly-None status.
    """
    try:
        response = requests.get(
            f"{EXERCISE_API}/by-external-id",
            params={"external_id": external_id},
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code == 404:
            return None, 404
        response.raise_for_status()
        return response.json(), response.status_code
    except requests.HTTPError as e:
        code = getattr(e.response, "status_code", None)
        return {"error": str(e), "status_code": code}, code
    except Exception as e:
        return {"error": str(e)}, None
def _payload_subset_for_fp(p: Dict[str, Any]) -> Dict[str, Any]:
return {
"title": p.get("title"),
"summary": p.get("summary"),
"execution": p.get("execution"),
"notes": p.get("notes"),
"duration_minutes": p.get("duration_minutes"),
"capabilities": p.get("capabilities") or {},
"keywords": p.get("keywords") or [],
}
def _print_diff(before: Dict[str, Any], after: Dict[str, Any]) -> None:
    """Log which hash-relevant fields differ between two payloads (normalized view)."""
    keys = ["title", "summary", "execution", "notes", "duration_minutes", "capabilities", "keywords"]

    def _kws(values):
        # same dedup/sort as the fingerprint's keyword handling
        return sorted({(v or "").strip() for v in (values or [])}, key=str.casefold)

    def _project(p: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "title": _canon_title(p.get("title")),
            "summary": _norm_text(p.get("summary")),
            "execution": _norm_text(p.get("execution")),
            "notes": _norm_text(p.get("notes")),
            "duration_minutes": p.get("duration_minutes"),
            "capabilities": p.get("capabilities"),
            "keywords": _kws(p.get("keywords")),
        }

    b_norm = _project({k: before.get(k) for k in keys})
    a_norm = _project({k: after.get(k) for k in keys})
    diff = {k: (b_norm[k], a_norm[k]) for k in keys if b_norm.get(k) != a_norm.get(k)}
    if diff:
        print("[Diff] changes:", json.dumps(diff, ensure_ascii=False))
    else:
        print("[Diff] (none in hash fields)")
def _now_iso() -> str:
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def _post_exercise(payload: Dict[str, Any], label: str, title: str, reason: str) -> None:
    """POST *payload* (stamped with imported_at) to the exercise API.

    422 validation errors are logged in full and swallowed (best-effort);
    any other HTTP error is raised.
    """
    body = dict(payload)
    body["imported_at"] = _now_iso()
    resp = requests.post(EXERCISE_API, json=body, timeout=REQUEST_TIMEOUT)
    if resp.status_code == 422:
        print(f"[{label}] '{title}' -> FAILED 422:\n{resp.text}")
        try:
            resp.raise_for_status()
        except Exception:
            pass  # deliberate: keep going after logging the validation error
    else:
        resp.raise_for_status()
        print(f"[{label}] '{title}' {reason} -> OK")


def upsert_exercise(payload: Dict[str, Any], *, dry_run: bool = False) -> str:
    """Create, update or skip *payload* based on fingerprint comparison.

    Looks up the existing record by external_id and compares both the stored
    fingerprint and a recomputed one (tolerates older records without a
    stored fingerprint). Returns the action taken: 'create' | 'update' | 'skip'.
    Refactor: the previously duplicated create/update POST logic now lives
    in _post_exercise.
    """
    title = payload.get("title", "<ohne Titel>")
    ext_id = payload.get("external_id")
    fp_new = payload.get("fingerprint")
    found, status = lookup_by_external_id(ext_id)
    action = "create"
    reason = "not found (lookup 404)"
    found_payload: Dict[str, Any] = {}
    if not (status == 404 or found is None):
        if isinstance(found, dict):
            found_payload = found.get("payload", found)
            fp_old_stored = found.get("fingerprint") or found_payload.get("fingerprint")
            fp_old_recalc = compute_fingerprint(_payload_subset_for_fp(found_payload))
            if fp_new == fp_old_stored or fp_new == fp_old_recalc:
                action, reason = "skip", "fingerprint unchanged"
            else:
                action, reason = "update", "fingerprint changed"
        else:
            action, reason = "create", "unexpected lookup type"
    if dry_run:
        print(f"[DryRun] {action.upper():6} '{title}' ({ext_id}) {reason}")
        if action == "update":
            _print_diff(found_payload, payload)
        return action
    if action == "create":
        _post_exercise(payload, "Create", title, reason)
    elif action == "update":
        _post_exercise(payload, "Update", title, reason)
        _print_diff(found_payload, payload)
    else:
        print(f"[Skip] '{title}' {reason}")
    return action
# ----- Orchestrierung -----
def process_one(title: str, category: str, *, mutate: bool = False, dry_run: bool = False) -> str:
    """Import one exercise page end-to-end; returns the upsert action or 'failed'."""
    info = fetch_page_info(title)
    pageid = info.get("pageid")
    fullurl = info.get("fullurl") or ""
    if not pageid:
        print(f"[Error] pageid für '{title}' nicht gefunden.", file=sys.stderr)
        return "failed"
    raw = parse_exercise(title, pageid)
    payload = build_payload(raw, fullurl, category, mutate=mutate)
    return upsert_exercise(payload, dry_run=dry_run)
def process_all(category: str, *, dry_run: bool = False) -> Dict[str, int]:
    """Import every page of *category*; returns created/updated/skipped/failed counters.

    Fixes vs. the previous version:
    - Uses the (previously unused) ``Mapping`` import instead of probing for a
      callable ``.get`` attribute on each entry.
    - 404 detection: ``if getattr(e, "response", None)`` relied on
      ``requests.Response.__bool__``, which is False for 4xx/5xx responses —
      so a real 404 response produced ``code = None`` and was logged as a
      generic error. We now test ``is not None``.
    """
    stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0}
    print(f"[Main] Lade Liste der Übungen aus Kategorie '{category}'")
    pages = fetch_all_pages(category)
    print(f"[Main] {len(pages)} Seiten gefunden.")
    for title, entry in pages.items():
        try:
            if isinstance(entry, Mapping):
                pid = entry.get("pageid")
                fullurl = entry.get("fullurl")
            else:
                pid = None
                fullurl = None
            if not pid:
                # entry carried no pageid: fall back to a per-title lookup
                info = fetch_page_info(title)
                pid = info.get("pageid")
                fullurl = fullurl or info.get("fullurl")
            if not pid:
                print(f"[Skip] '{title}' hat keine pageid")
                stats["failed"] += 1
                continue
            raw = parse_exercise(title, pid)
            payload = build_payload(raw, fullurl or "", category)
            act = upsert_exercise(payload, dry_run=dry_run)
            if act == "create":
                stats["created"] += 1
            elif act == "update":
                stats["updated"] += 1
            elif act == "skip":
                stats["skipped"] += 1
        except requests.HTTPError as e:
            resp = getattr(e, "response", None)
            code = resp.status_code if resp is not None else None
            if code == 404:
                print(f"[Skip] '{title}': page not found (404)")
            else:
                print(f"[Error] '{title}': {e}")
            stats["failed"] += 1
        except Exception as e:
            print(f"[Error] '{title}': {e}")
            stats["failed"] += 1
    return stats
def run_smoke_test(title: str, category: str) -> None:
    """Exercise the importer three times for *title*: expected create -> skip -> update."""
    runs = [
        ("\n[SmokeTest] Lauf 1/3: CREATE (Erstimport)", False),
        ("\n[SmokeTest] Lauf 2/3: SKIP (Wiederholung, unverändert)", False),
        ("\n[SmokeTest] Lauf 3/3: UPDATE (simulierte Wiki-Änderung an 'notes')", True),
    ]
    results: Dict[str, str] = {}
    for idx, (banner, mutate) in enumerate(runs, start=1):
        print(banner)
        action = process_one(title, category, mutate=mutate)
        print("[SmokeTest] Aktion:", action)
        results[f"run{idx}"] = action
    print("\n[SmokeTest] Zusammenfassung:")
    print(json.dumps(results, ensure_ascii=False, indent=2))
# ----- Main -----
def main() -> None:
    """CLI entry point: parse args, health-check, login, then dispatch the import mode."""
    parser = argparse.ArgumentParser(description="Import exercises from Wiki to Qdrant (via FastAPI wiki_router)")
    parser.add_argument("--all", action="store_true", help="Alle Übungen importieren (SMW-Ask)")
    parser.add_argument("--title", type=str, default=DEFAULT_TITLE, help="Einzelimport eines Übungstitels")
    parser.add_argument("--category", type=str, default=DEFAULT_CAT, help="Wiki-Kategorie (z.B. 'Übungen')")
    parser.add_argument("--username", type=str, default=os.getenv("WIKI_BOT_USER"), help="Wiki-Login Benutzer (überschreibt .env)")
    parser.add_argument("--password", type=str, default=os.getenv("WIKI_BOT_PASSWORD"), help="Wiki-Login Passwort (überschreibt .env)")
    parser.add_argument("--skip-login", action="store_true", help="Login-Schritt überspringen (falls Session schon aktiv)")
    parser.add_argument("--dry-run", action="store_true", help="Kein Schreiben; nur Entscheidungen (create/update/skip) + Gründe loggen")
    parser.add_argument("--smoke-test", action="store_true", help="3 Durchläufe (create→skip→update) für --title")
    args = parser.parse_args()

    wiki_health()

    if not args.skip_login:
        if not (args.username and args.password):
            print("[Login] Fehler: fehlende Credentials. Setze .env (WIKI_BOT_USER/WIKI_BOT_PASSWORD) oder CLI --username/--password.", file=sys.stderr)
            sys.exit(1)
        try:
            wiki_login(args.username, args.password)
        except Exception as e:
            print(str(e), file=sys.stderr)
            sys.exit(1)

    if args.smoke_test:
        run_smoke_test(args.title, args.category)
    elif args.all:
        stats = process_all(args.category, dry_run=args.dry_run)
        print("\n[Stats] created={created} updated={updated} skipped={skipped} failed={failed}".format(**stats))
    else:
        print(f"[Main] Import single exercise: {args.title}")
        result = process_one(args.title, args.category, mutate=False, dry_run=args.dry_run)
        print(f"[Result] {result}")


if __name__ == "__main__":
    main()