Trainer_LLM/scripts/wiki_importer.py
Lars 7b383f0778
All checks were successful
Deploy Trainer_LLM to llm-node / deploy (push) Successful in 1s
scripts/wiki_importer.py aktualisiert
2025-08-11 15:40:41 +02:00

471 lines
19 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
wiki_importer.py v2.3.8
Ziel dieses Patches: Die Felder `discipline`, `execution`, `keywords`, `equipment`, `duration_minutes` usw.
kommen bei dir teilweise leer an. Ursache sind zu aggressive Normalisierungen/Matcher.
Fix (konservativ & robust):
- Parser liest jetzt **gezielt** die bekannten Templates **ohne** Over-Normalisierung:
• `{{ÜbungInfoBox}}` / `{{UebungInfoBox}}`
• `{{Übungsbeschreibung}}` / `{{Uebungsbeschreibung}}`
• `{{Hilfsmittel}}`
• `{{SkillDevelopment}}`
- Feld-Extraktion nutzt **zuerst die exakten Wiki-Parameternamen** (deutsch/mit Umlauten),
erst danach schmale Synonym-Fallbacks. Das stellt sicher, dass z.B. `Schlüsselworte=`
wirklich in `keywords` landet.
- `imported_at` wird bei **Create und Update** gesetzt.
- Optionales Debugging: `--debug-raw` druckt die gefundenen Raw-Keys (einfach, nachvollziehbar).
Bestehende API-Endpunkte bleiben unverändert.
"""
import argparse
import hashlib
import json
import os
import re
import sys
import time
from typing import Dict, Any, Tuple, Optional, List

import mwparserfromhell
import requests
from dotenv import load_dotenv
# ----- Configuration / defaults -----
load_dotenv()  # read .env first so the os.getenv() calls below see local overrides
# Wiki-router base (health/login/parse endpoints) and exercise upsert endpoint.
API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000/import/wiki")
EXERCISE_API = os.getenv("EXERCISE_API_URL", "http://localhost:8000/exercise")
DEFAULT_CAT = os.getenv("WIKI_CATEGORY", "Übungen")  # default wiki category for --all
DEFAULT_TITLE = os.getenv("WIKI_EXERCISE_TITLE", "Affenklatschen")  # default single-import title
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60"))  # seconds; list/parse calls can be slow
# ----- Helpers für Wiki-Router -----
def wiki_health() -> None:
    """Ping the wiki router's health endpoint; raise for any HTTP error."""
    response = requests.get(f"{API_BASE_URL}/health", timeout=15)
    response.raise_for_status()
    print("[Sanity] Wiki health OK")
def wiki_login(username: str, password: str) -> None:
    """Log in to the wiki via the router's /login endpoint.

    Raises requests.HTTPError on transport-level failure and RuntimeError when
    the router reports a non-success status or returns a non-JSON body.
    """
    payload = {"username": username, "password": password}
    r = requests.post(f"{API_BASE_URL}/login", json=payload, timeout=30)
    try:
        data = r.json()
    except Exception:
        print(f"[Login] HTTP {r.status_code}: {r.text}")
        r.raise_for_status()
        # Bug fix: a 2xx response with a non-JSON body previously fell through
        # to the status check with `data` unbound, raising NameError. Fail
        # explicitly instead.
        raise RuntimeError("[Login] non-JSON response from wiki router")
    status = (data or {}).get("status")
    if status != "success":
        msg = (data or {}).get("message", "Login fehlgeschlagen")
        raise RuntimeError(f"[Login] {msg}")
    print("[Login] success")
def fetch_all_pages(category: str) -> Dict[str, Any]:
    """Return the title -> metadata mapping for *category* (SMW ask via router)."""
    response = requests.get(
        f"{API_BASE_URL}/semantic/pages",
        params={"category": category},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
def fetch_page_info(title: str) -> Dict[str, Any]:
    """Resolve *title* to its MediaWiki pageid and canonical full URL."""
    response = requests.get(f"{API_BASE_URL}/info", params={"title": title}, timeout=30)
    response.raise_for_status()
    body = response.json()
    return {"pageid": body.get("pageid"), "fullurl": body.get("fullurl")}
# ----- Parser (conservative) -----
# Known template names (with and without umlauts). The parser reads ONLY these
# and copies their parameters verbatim — no over-normalization (v2.3.8 fix).
T_INFOS = {"ÜbungInfoBox", "UebungInfoBox"}  # exercise info box
T_BESCHR = {"Übungsbeschreibung", "Uebungsbeschreibung"}  # exercise description
T_HILFS = {"Hilfsmittel"}  # equipment list
T_SKILL = {"SkillDevelopment"}  # capability/level pairs
def parse_exercise(title: str, pageid: int) -> Dict[str, Any]:
    """Fetch a page's wikitext via the router and extract known templates.

    Returns a flat dict: page metadata ("title", "source", "pageid",
    "wikitext") plus every parameter of the info-box / description /
    equipment templates under its EXACT wiki parameter name (German keys,
    umlauts intact), and a "capabilities" list of {"capability", "level"}
    dicts from SkillDevelopment templates.
    """
    print(f"[Parse] Lade '{title}' (ID={pageid})")
    resp = requests.get(
        f"{API_BASE_URL}/parsepage",
        params={"pageid": pageid, "title": title},
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    wikitext = resp.json().get("wikitext", "")
    wikicode = mwparserfromhell.parse(wikitext)
    raw: Dict[str, Any] = {"title": title, "source": "MediaWiki", "pageid": pageid, "wikitext": wikitext}

    def _getp(t, k):
        # Safe template-parameter read: missing parameter -> "".
        try:
            return str(t.get(k).value).strip()
        except Exception:
            return ""

    # Fix: the helper above was previously re-defined inside the loop body on
    # every SkillDevelopment occurrence; hoisted once per call.
    flat_templates = T_INFOS | T_BESCHR | T_HILFS
    for tpl in wikicode.filter_templates():
        name = str(tpl.name).strip()
        if name in flat_templates:
            # Copy parameters verbatim so exact keys like `Schlüsselworte` survive.
            for p in tpl.params:
                raw[str(p.name).strip()] = str(p.value).strip()
        elif name in T_SKILL:
            raw.setdefault("capabilities", [])
            cap = _getp(tpl, "PrimaryCapability")
            lvl = _getp(tpl, "CapabilityLevel")
            try:
                lvl_i = int(lvl)
            except Exception:
                lvl_i = 0  # non-numeric level -> 0, keep the capability anyway
            if cap:
                raw["capabilities"].append({"capability": cap, "level": lvl_i})
    return raw
# ----- Fingerprint (stabil, wie zuvor) -----
def _normalize(v: Any) -> str:
if v is None:
return ""
if isinstance(v, (list, tuple)):
return ",".join(_normalize(x) for x in v)
if isinstance(v, dict):
return json.dumps(v, sort_keys=True, ensure_ascii=False)
return str(v).strip()
def _norm_text(s: str) -> str:
if s is None:
return ""
s = str(s).replace("\u00a0", " ")
s = " ".join(s.split())
return s.strip()
def _canon_title(t: str) -> str:
t = (t or "").strip().replace("_", " ")
return t.replace("", "-").replace("", "-")
def compute_fingerprint(payload: Dict[str, Any]) -> str:
    """SHA-256 over the normalized hash-relevant fields of *payload*.

    Field order is fixed (title, summary, execution, notes, duration,
    capabilities, keywords) so the hash is stable across runs; keywords are
    dash-normalized, deduplicated and case-insensitively sorted.
    """
    keywords = payload.get("keywords") or []
    keywords = [kw.replace("\u2013", "-").replace("\u2014", "-") for kw in keywords]
    keywords = sorted(
        {(kw or "").strip() for kw in keywords if (kw or "").strip()},
        key=str.casefold,
    )
    try:
        minutes = int(round(float(payload.get("duration_minutes") or 0)))
    except Exception:
        minutes = 0  # unparseable durations never break the hash
    fields = [
        _canon_title(payload.get("title", "")),
        _norm_text(payload.get("summary", "")),
        _norm_text(payload.get("execution", "")),
        _norm_text(payload.get("notes", "")),
        minutes,
        payload.get("capabilities", {}),
        keywords,
    ]
    base = "|".join(_normalize(field) for field in fields)
    return hashlib.sha256(base.encode("utf-8")).hexdigest()
# ----- Payload (exact German keys first, then narrow fallbacks) -----
# Maps each payload field to an ordered list of wiki parameter names to try.
# Exact umlaut spellings come first so e.g. `Schlüsselworte=` really lands in
# `keywords`; ASCII transliterations and English synonyms follow (v2.3.8 fix).
EXACT_KEYS: Dict[str, List[str]] = {
    "summary": ["Summary", "Kurzbeschreibung"],
    "execution": ["Durchführung", "Durchfuehrung", "Ablauf"],
    "duration": ["Dauer", "Zeit"],
    "keywords": ["Schlüsselworte", "Schlüsselwörter", "Schluesselworte", "Schluesselwoerter", "Keywords", "Tags"],
    "equipment_prim": ["Hilfsmittel"],
    "equipment_alt": ["Geräte", "Geraete", "Gerät", "Geraet", "Material"],
    "discipline": ["Übungstyp", "Uebungstyp", "Disziplin"],
    "group": ["Gruppengröße", "Gruppengroesse", "Group"],
    "age_group": ["Altersgruppe"],
    "target_group": ["Zielgruppe"],
    "purpose": ["Ziel", "Zweck"],
    "notes": ["Hinweise", "Notes"],
    "preparation": ["Vorbereitung", "RefMethode"],
    "method": ["Methode", "Method"],
}
def _first_any(raw: Dict[str, Any], keys: List[str]) -> Optional[str]:
for k in keys:
v = raw.get(k)
if isinstance(v, str) and v.strip():
return v.strip()
return None
def build_payload(raw: Dict[str, Any], fullurl: str, category: str, *, mutate: bool = False) -> Dict[str, Any]:
    """Assemble the exercise-API payload from the raw template fields.

    Exact German parameter names are tried first (EXACT_KEYS), then narrow
    fallbacks. `mutate=True` appends " [auto-update]" to notes (used by the
    smoke test to simulate a wiki edit). The fingerprint is computed last,
    over the finished payload.
    """
    # Capabilities list -> Dict[str, int]; non-numeric levels are dropped.
    capabilities: Dict[str, int] = {}
    for c in raw.get("capabilities", []) or []:
        cap = c.get("capability")
        lvl = c.get("level")
        if isinstance(cap, str) and cap:
            try:
                capabilities[cap] = int(lvl)
            except Exception:
                pass

    def _csv_list(text: str) -> List[str]:
        # Wiki lists are often line-based, so newlines count as separators too.
        return [part.strip() for part in text.replace("\n", ",").split(",") if part.strip()]

    # Exact keys first (see EXACT_KEYS ordering).
    summary = _first_any(raw, EXACT_KEYS["summary"]) or ""
    execution = _first_any(raw, EXACT_KEYS["execution"]) or ""
    keywords = _csv_list(_first_any(raw, EXACT_KEYS["keywords"]) or "")
    eq_raw = _first_any(raw, EXACT_KEYS["equipment_prim"]) or _first_any(raw, EXACT_KEYS["equipment_alt"]) or ""
    equipment = _csv_list(eq_raw)
    notes = _first_any(raw, EXACT_KEYS["notes"]) or ""
    discipline = _first_any(raw, EXACT_KEYS["discipline"]) or ""
    group = _first_any(raw, EXACT_KEYS["group"]) or None
    age_group = _first_any(raw, EXACT_KEYS["age_group"]) or ""
    target_group = _first_any(raw, EXACT_KEYS["target_group"]) or ""
    purpose = _first_any(raw, EXACT_KEYS["purpose"]) or ""
    preparation = _first_any(raw, EXACT_KEYS["preparation"]) or ""
    method = _first_any(raw, EXACT_KEYS["method"]) or ""

    duration = _first_any(raw, EXACT_KEYS["duration"]) or "0"
    try:
        duration_f = float(duration)
    except Exception:
        # Robustness fix: values like "10 Minuten" or "10-15" previously
        # collapsed to 0 (the empty-duration symptom this patch targets);
        # salvage the first number in the string, comma decimals included.
        m = re.search(r"\d+(?:[.,]\d+)?", duration)
        duration_f = float(m.group(0).replace(",", ".")) if m else 0.0

    payload: Dict[str, Any] = {
        "title": raw.get("title") or "",
        "summary": summary,
        "short_description": summary,
        "keywords": keywords,
        "link": fullurl or "",
        "discipline": discipline,
        "group": group,
        "age_group": age_group,
        "target_group": target_group,
        "min_participants": 1,
        "duration_minutes": int(round(duration_f)),
        "capabilities": capabilities,
        "category": category or "",
        "purpose": purpose,
        "execution": execution,
        "notes": (notes + (" [auto-update]" if mutate else "")).strip(),
        "preparation": preparation,
        "method": method,
        "equipment": equipment,
        "fullurl": fullurl or "",
        "external_id": f"mw:{raw.get('pageid')}",
        "source": "MediaWiki",
    }
    payload["fingerprint"] = compute_fingerprint(payload)
    return payload
# ----- Lookup/Upsert -----
def lookup_by_external_id(external_id: str) -> Tuple[Optional[Dict[str, Any]], Optional[int]]:
    """Look up an existing exercise record by its external id.

    Returns (record, status_code); (None, 404) when absent, and an
    {"error": ...} dict on transport or HTTP failures.
    """
    try:
        response = requests.get(
            f"{EXERCISE_API}/by-external-id",
            params={"external_id": external_id},
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code == 404:
            return None, 404
        response.raise_for_status()
        return response.json(), response.status_code
    except requests.HTTPError as exc:
        code = getattr(exc.response, "status_code", None)
        return {"error": str(exc), "status_code": code}, code
    except Exception as exc:
        return {"error": str(exc)}, None
def _payload_subset_for_fp(p: Dict[str, Any]) -> Dict[str, Any]:
return {
"title": p.get("title"),
"summary": p.get("summary"),
"execution": p.get("execution"),
"notes": p.get("notes"),
"duration_minutes": p.get("duration_minutes"),
"capabilities": p.get("capabilities") or {},
"keywords": p.get("keywords") or [],
}
def _print_diff(before: Dict[str, Any], after: Dict[str, Any]) -> None:
    """Print the normalized differences in the hash-relevant fields.

    Both sides are normalized exactly like compute_fingerprint's inputs so
    the printed diff mirrors why the fingerprint changed. Prints
    "(none in hash fields)" when the normalized sides are equal.
    """
    keys = ["title", "summary", "execution", "notes", "duration_minutes", "capabilities", "keywords"]

    def _norm_side(src: Dict[str, Any]) -> Dict[str, Any]:
        # Fix: this normalization was copy-pasted for `before` and `after`;
        # deduplicated into one helper so the two sides cannot drift apart.
        return {
            "title": _canon_title(src.get("title")),
            "summary": _norm_text(src.get("summary")),
            "execution": _norm_text(src.get("execution")),
            "notes": _norm_text(src.get("notes")),
            "duration_minutes": src.get("duration_minutes"),
            "capabilities": src.get("capabilities"),
            "keywords": sorted({(k or "").strip() for k in (src.get("keywords") or [])}, key=str.casefold),
        }

    b_norm = _norm_side(before)
    a_norm = _norm_side(after)
    diff = {k: (b_norm[k], a_norm[k]) for k in keys if b_norm.get(k) != a_norm.get(k)}
    if diff:
        print("[Diff] changes:", json.dumps(diff, ensure_ascii=False))
    else:
        print("[Diff] (none in hash fields)")
def _now_iso() -> str:
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def upsert_exercise(payload: Dict[str, Any], *, dry_run: bool = False) -> str:
    """Create, update, or skip one exercise; returns "create"|"update"|"skip".

    Decision: look up by external_id; 404/None -> create. Otherwise compare
    the new fingerprint against both the stored one and one recomputed from
    the stored hash fields (tolerates records saved before fingerprints were
    persisted). `imported_at` is set on BOTH create and update (v2.3.8).
    With dry_run, only the decision (and diff for updates) is printed.
    """
    title = payload.get("title", "<ohne Titel>")
    ext_id = payload.get("external_id")
    fp_new = payload.get("fingerprint")
    found, status = lookup_by_external_id(ext_id)
    action, reason, found_payload = "create", "not found (lookup 404)", {}
    if not (status == 404 or found is None):
        if isinstance(found, dict):
            # NOTE(review): transport errors also arrive here as {"error": ...}
            # dicts and then diff as "update" — confirm that is intended.
            found_payload = found.get("payload", found)
            fp_old_stored = found.get("fingerprint") or found_payload.get("fingerprint")
            fp_old_recalc = compute_fingerprint(_payload_subset_for_fp(found_payload))
            if fp_new == fp_old_stored or fp_new == fp_old_recalc:
                action, reason = "skip", "fingerprint unchanged"
            else:
                action, reason = "update", "fingerprint changed"
        else:
            action, reason = "create", "unexpected lookup type"
    if dry_run:
        print(f"[DryRun] {action.upper():6} '{title}' ({ext_id}) {reason}")
        if action == "update":
            _print_diff(found_payload, payload)
        return action
    if action == "skip":
        print(f"[Skip] '{title}' {reason}")
        return action
    # Fix: create and update were byte-identical duplicated branches (same
    # POST, same 422 handling); merged into one path. The server upserts on
    # external_id, so both actions POST to the same endpoint.
    label = "Create" if action == "create" else "Update"
    body = dict(payload)
    body["imported_at"] = _now_iso()
    resp = requests.post(EXERCISE_API, json=body, timeout=REQUEST_TIMEOUT)
    if resp.status_code == 422:
        print(f"[{label}] '{title}' -> FAILED 422:\n{resp.text}")
        try:
            resp.raise_for_status()
        except Exception:
            pass  # 422 is reported but deliberately non-fatal for the batch
    else:
        resp.raise_for_status()
        print(f"[{label}] '{title}' {reason} -> OK")
        if action == "update":
            _print_diff(found_payload, payload)
    return action
# ----- Orchestrierung -----
def process_one(title: str, category: str, *, mutate: bool = False, dry_run: bool = False, debug_raw: bool = False) -> str:
    """Import a single wiki page end-to-end; returns the upsert action or "failed"."""
    info = fetch_page_info(title)
    pid = info.get("pageid")
    fullurl = info.get("fullurl") or ""
    if not pid:
        print(f"[Error] pageid für '{title}' nicht gefunden.", file=sys.stderr)
        return "failed"
    raw = parse_exercise(title, pid)
    if debug_raw:
        # wikitext is bulky and not a template parameter — exclude it.
        print("[Debug] Raw-Keys:", sorted(k for k in raw if k != "wikitext"))
    payload = build_payload(raw, fullurl, category, mutate=mutate)
    return upsert_exercise(payload, dry_run=dry_run)
def process_all(category: str, *, dry_run: bool = False, debug_raw: bool = False) -> Dict[str, int]:
    """Import every page of *category*; returns per-outcome counters."""
    stats = {"created": 0, "updated": 0, "skipped": 0, "failed": 0}
    print(f"[Main] Lade Liste der Übungen aus Kategorie '{category}'")
    pages = fetch_all_pages(category)
    print(f"[Main] {len(pages)} Seiten gefunden.")
    outcome_key = {"create": "created", "update": "updated"}
    for idx, (title, entry) in enumerate(pages.items(), 1):
        try:
            getter = getattr(entry, "get", None)
            pid = getter("pageid") if callable(getter) else None
            fullurl = getter("fullurl") if callable(getter) else None
            if not pid:
                # Entry carried no pageid: fall back to a per-title lookup.
                info = fetch_page_info(title)
                pid = info.get("pageid")
                fullurl = fullurl or info.get("fullurl")
            if not pid:
                print(f"[Skip] '{title}' hat keine pageid")
                stats["failed"] += 1
                continue
            raw = parse_exercise(title, pid)
            if debug_raw and idx <= 5:
                print(f"[Debug] #{idx} '{title}' Raw-Keys:", sorted(k for k in raw if k != "wikitext"))
            payload = build_payload(raw, fullurl or "", category)
            act = upsert_exercise(payload, dry_run=dry_run)
            stats[outcome_key.get(act, "skipped")] += 1
        except requests.HTTPError as exc:
            response = getattr(exc, "response", None)
            code = response.status_code if response else None
            if code == 404:
                print(f"[Skip] '{title}': page not found (404)")
            else:
                print(f"[Error] '{title}': {exc}")
            stats["failed"] += 1
        except Exception as exc:
            print(f"[Error] '{title}': {exc}")
            stats["failed"] += 1
    return stats
def run_smoke_test(title: str, category: str, *, debug_raw: bool = False) -> None:
    """Three-pass smoke test for one title: expect create -> skip -> update."""
    print("\n[SmokeTest] Lauf 1/3: CREATE (Erstimport)")
    act1 = process_one(title, category, mutate=False, debug_raw=debug_raw)
    print("[SmokeTest] Aktion:", act1)
    print("\n[SmokeTest] Lauf 2/3: SKIP (Wiederholung, unverändert)")
    act2 = process_one(title, category, mutate=False, debug_raw=debug_raw)
    print("[SmokeTest] Aktion:", act2)
    print("\n[SmokeTest] Lauf 3/3: UPDATE (simulierte Wiki-Änderung an 'notes')")
    act3 = process_one(title, category, mutate=True, debug_raw=debug_raw)
    print("[SmokeTest] Aktion:", act3)
    print("\n[SmokeTest] Zusammenfassung:")
    print(json.dumps({"run1": act1, "run2": act2, "run3": act3}, ensure_ascii=False, indent=2))
# ----- Main -----
def _build_parser() -> argparse.ArgumentParser:
    """Construct the CLI argument parser (env vars provide the defaults)."""
    parser = argparse.ArgumentParser(description="Import exercises from Wiki to Qdrant (via FastAPI wiki_router)")
    parser.add_argument("--all", action="store_true", help="Alle Übungen importieren (SMW-Ask)")
    parser.add_argument("--title", type=str, default=DEFAULT_TITLE, help="Einzelimport eines Übungstitels")
    parser.add_argument("--category", type=str, default=DEFAULT_CAT, help="Wiki-Kategorie (z.B. 'Übungen')")
    parser.add_argument("--username", type=str, default=os.getenv("WIKI_BOT_USER"), help="Wiki-Login Benutzer (überschreibt .env)")
    parser.add_argument("--password", type=str, default=os.getenv("WIKI_BOT_PASSWORD"), help="Wiki-Login Passwort (überschreibt .env)")
    parser.add_argument("--skip-login", action="store_true", help="Login-Schritt überspringen (falls Session schon aktiv)")
    parser.add_argument("--dry-run", action="store_true", help="Kein Schreiben; nur Entscheidungen (create/update/skip) + Gründe loggen")
    parser.add_argument("--smoke-test", action="store_true", help="3 Durchläufe (create→skip→update) für --title")
    parser.add_argument("--debug-raw", action="store_true", help="Zeigt die aus dem Wiki gelesenen Roh-Keys je Seite")
    return parser


def main() -> None:
    """CLI entry point: health check, optional login, then smoke/all/single import."""
    args = _build_parser().parse_args()
    wiki_health()
    if not args.skip_login:
        if not args.username or not args.password:
            print("[Login] Fehler: fehlende Credentials. Setze .env (WIKI_BOT_USER/WIKI_BOT_PASSWORD) oder CLI --username/--password.", file=sys.stderr)
            sys.exit(1)
        try:
            wiki_login(args.username, args.password)
        except Exception as exc:
            print(str(exc), file=sys.stderr)
            sys.exit(1)
    if args.smoke_test:
        run_smoke_test(args.title, args.category, debug_raw=args.debug_raw)
        return
    if args.all:
        stats = process_all(args.category, dry_run=args.dry_run, debug_raw=args.debug_raw)
        print("\n[Stats] created={created} updated={updated} skipped={skipped} failed={failed}".format(**stats))
    else:
        print(f"[Main] Import single exercise: {args.title}")
        result = process_one(args.title, args.category, mutate=False, dry_run=args.dry_run, debug_raw=args.debug_raw)
        print(f"[Result] {result}")


if __name__ == "__main__":
    main()