""" MediaWiki Import Router Importiert Übungen, Fähigkeiten und Methoden aus Semantic MediaWiki direkt via API (kein XML-Export). Nur Super-Admin darf importieren. Wiki-URL: https://karatetrainer.net/api.php """ import asyncio import logging import os from typing import Optional from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, Query from pydantic import BaseModel from db import get_db, get_cursor, r2d from auth import require_auth from smw_client import SmwClient, SmwClientError from smw_mapper import map_wiki_to_exercise, map_wiki_to_skill, map_wiki_to_method, build_skill_assignments from routers.exercises import normalize_exercise_skill_intensity logger = logging.getLogger(__name__) router = APIRouter(prefix="/api", tags=["import"]) # Kategorie-Namen aus .env (echte Namen von karatetrainer.net) CATEGORY_EXERCISES = os.getenv("MEDIAWIKI_CATEGORY_EXERCISES", "Übungen") CATEGORY_SKILLS = os.getenv("MEDIAWIKI_CATEGORY_SKILLS", "Fähigkeitsbeschreibung") CATEGORY_METHODS = os.getenv("MEDIAWIKI_CATEGORY_METHODS", "Methodenbeschreibung") CATEGORY_MODELS = os.getenv("MEDIAWIKI_CATEGORY_MODELS", "Reifegradmodelle") def _wiki_category_or_default(category: Optional[str], import_type: str) -> str: """Leeres category ⇒ Standard je import_type.""" if (category or "").strip(): return category.strip() if import_type == "skill": return CATEGORY_SKILLS if import_type == "method": return CATEGORY_METHODS return CATEGORY_EXERCISES # ------------------------------------------------------------------ # # Pydantic Models # # ------------------------------------------------------------------ # class ImportExecuteRequest(BaseModel): category: str import_type: str = "exercise" # exercise | skill | method reimport_existing: bool = False dry_run: bool = False limit: Optional[int] = None # ------------------------------------------------------------------ # # Auth-Hilfsfunktion # # ------------------------------------------------------------------ # def 
require_admin(session: dict = Depends(require_auth)) -> dict: """Nur Super-Admins dürfen importieren.""" if session.get("role") not in ("admin", "superadmin"): raise HTTPException(status_code=403, detail="Nur Admins dürfen den Import ausführen") return session # ------------------------------------------------------------------ # # Preview Endpoint # # ------------------------------------------------------------------ # @router.get("/import/mediawiki/preview") async def preview_import( category: Optional[str] = Query(default=None), import_type: str = Query(default="exercise"), limit: int = Query(default=10, ge=1, le=500), session: dict = Depends(require_admin), ): """ Zeigt Vorschau: Welche Seiten würden importiert werden? Überprüft Duplikate und mapped Felder ohne zu speichern. """ resolved_category = _wiki_category_or_default(category, import_type) client = SmwClient() try: members = await client.get_category_members(resolved_category, limit=limit) except SmwClientError as e: raise HTTPException(status_code=502, detail=f"Wiki-API nicht erreichbar: {e}") preview = [] with get_db() as conn: cur = get_cursor(conn) for member in members[:limit]: page_title = member["title"] page_id = member.get("pageid") # Duplikat-Check cur.execute( "SELECT id, last_imported FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s", (page_title, import_type) ) existing_ref = r2d(cur.fetchone()) # SMW Properties abrufen mapped_fields = {} warnings = [] errors = [] try: smw_props = await client.browse_subject(page_title) if import_type == "exercise": mapped = map_wiki_to_exercise(page_title, page_id, smw_props) elif import_type == "skill": mapped = map_wiki_to_skill(page_title, page_id, smw_props) else: mapped = map_wiki_to_method(page_title, page_id, smw_props) warnings = mapped.pop("warnings", []) # Entferne interne Felder aus der Vorschau for k in ("wiki_page_id", "import_source", "import_id"): mapped.pop(k, None) mapped_fields = mapped # Warnung wenn 
Pflichtfelder fehlen (nur bei Übungen) if import_type == "exercise": if not mapped.get("goal") and not mapped.get("execution"): errors.append("Pflichtfelder 'Ziel' und 'Durchführung' fehlen") except SmwClientError as e: errors.append(f"Seite nicht abrufbar: {e}") preview.append({ "wiki_page_title": page_title, "wiki_page_id": page_id, "already_imported": existing_ref is not None, "last_imported_at": existing_ref["last_imported"].isoformat() if existing_ref and existing_ref.get("last_imported") is not None else None, "mapped_fields": mapped_fields, "warnings": warnings, "errors": errors, }) return { "category": resolved_category, "import_type": import_type, "total_found": len(members), "preview": preview, } # ------------------------------------------------------------------ # # Execute Import (async Background Task) # # ------------------------------------------------------------------ # @router.post("/import/mediawiki/execute", status_code=202) async def execute_import( body: ImportExecuteRequest, background_tasks: BackgroundTasks, session: dict = Depends(require_admin), ): """ Startet einen MediaWiki-Import als Background-Task. Gibt sofort log_id zurück – Status via GET /import/mediawiki/status/{log_id}. 
""" profile_id = session["profile_id"] resolved_category = _wiki_category_or_default(body.category, body.import_type) # Log-Eintrag anlegen with get_db() as conn: cur = get_cursor(conn) cur.execute( """INSERT INTO wiki_import_log (import_type, import_status, category, dry_run, reimport, imported_by) VALUES (%s, 'running', %s, %s, %s, %s) RETURNING id""", (body.import_type, resolved_category, body.dry_run, body.reimport_existing, profile_id) ) log_id = cur.fetchone()['id'] conn.commit() # Import asynchron starten background_tasks.add_task( _run_import, log_id=log_id, category=resolved_category, import_type=body.import_type, reimport=body.reimport_existing, dry_run=body.dry_run, limit=body.limit, imported_by=profile_id, ) return { "log_id": log_id, "status": "running", "message": f"Import gestartet. Status: GET /api/import/mediawiki/status/{log_id}", } # ------------------------------------------------------------------ # # Status & Logs # # ------------------------------------------------------------------ # @router.get("/import/mediawiki/status/{log_id}") def get_import_status( log_id: int, session: dict = Depends(require_admin), ): """Status eines laufenden oder abgeschlossenen Imports.""" with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT * FROM wiki_import_log WHERE id = %s", (log_id,)) row = r2d(cur.fetchone()) if not row: raise HTTPException(status_code=404, detail="Import-Log nicht gefunden") return row @router.get("/import/mediawiki/logs") def list_import_logs( session: dict = Depends(require_admin), ): """Alle Import-Logs (neueste zuerst, ohne error_log Details).""" with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT id, import_type, import_status, category, dry_run, reimport, items_total, items_imported, items_skipped, items_failed, imported_by, started_at, finished_at FROM wiki_import_log ORDER BY started_at DESC LIMIT 50""" ) rows = cur.fetchall() return [r2d(r) for r in rows] 
@router.delete("/import/mediawiki/references/{ref_id}", status_code=204)
def delete_import_reference(
    ref_id: int,
    session: dict = Depends(require_admin),
):
    """Delete an import reference.

    Without the reference the item is treated as new on the next import,
    which makes a re-import possible.

    Raises:
        HTTPException: 404 if no reference with this id exists.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("DELETE FROM wiki_import_references WHERE id = %s RETURNING id", (ref_id,))
        deleted = cur.fetchone()
        conn.commit()
        if not deleted:
            raise HTTPException(status_code=404, detail="Referenz nicht gefunden")


# ------------------------------------------------------------------ #
#  Schema discovery (debug aid for admins)                             #
# ------------------------------------------------------------------ #

@router.get("/import/mediawiki/discover/{page_title:path}")
async def discover_properties(
    page_title: str,
    session: dict = Depends(require_admin),
):
    """Return all SMW properties of a wiki page.

    Useful for finding property names when adjusting smw_mapper.py.

    Raises:
        HTTPException: 502 if the wiki client reports an error.
    """
    client = SmwClient()
    try:
        props = await client.discover_properties(page_title)
    except SmwClientError as e:
        raise HTTPException(status_code=502, detail=f"Wiki-API Fehler: {e}") from e
    return {"page_title": page_title, "properties": props}


# ------------------------------------------------------------------ #
#  Background Import Worker                                            #
# ------------------------------------------------------------------ #

async def _run_import(
    log_id: int,
    category: str,
    import_type: str,
    reimport: bool,
    dry_run: bool,
    limit: Optional[int],
    imported_by: int,
):
    """Background task: import all pages of one wiki category.

    Progress counters and per-item errors are written to the
    wiki_import_log row ``log_id`` after every page. Any exception that
    escapes the per-page handling marks the whole run as "failed".
    """
    client = SmwClient()
    stats = {"total": 0, "imported": 0, "skipped": 0, "failed": 0}
    errors = []

    try:
        # A missing (or zero) limit falls back to 500 pages.
        members = await client.get_category_members(category, limit=limit or 500)
        stats["total"] = len(members)
        _update_log(log_id, import_status="running", items_total=stats["total"])

        for member in members:
            page_title = member["title"]
            page_id = member.get("pageid")

            # Duplicate check: was the page imported before AND does the
            # referenced local record still exist?
            if not reimport:
                with get_db() as conn:
                    cur = get_cursor(conn)
                    cur.execute(
                        "SELECT local_id FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s",
                        (page_title, import_type)
                    )
                    ref = cur.fetchone()
                    if ref:
                        # Check that the referenced record really exists
                        if import_type == "exercise":
                            cur.execute("SELECT id FROM exercises WHERE id = %s", (ref['local_id'],))
                        elif import_type == "skill":
                            cur.execute("SELECT id FROM skills WHERE id = %s", (ref['local_id'],))
                        else:
                            cur.execute("SELECT id FROM training_methods WHERE id = %s", (ref['local_id'],))

                        if cur.fetchone():
                            # Reference AND record exist -> skip
                            stats["skipped"] += 1
                            _update_log(log_id, **stats)
                            continue
                        else:
                            # Reference exists but the record is gone -> drop
                            # the stale reference and import the page again
                            logger.warning("Orphaned reference found for '%s', re-importing", page_title)
                            cur.execute(
                                "DELETE FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s",
                                (page_title, import_type)
                            )
                            conn.commit()

            # Fetch SMW properties (with retry)
            try:
                smw_props = await _fetch_with_retry(client, page_title)
            except SmwClientError as e:
                errors.append({"item": page_title, "error": str(e)})
                stats["failed"] += 1
                _update_log(log_id, **stats, error_log=errors)
                continue

            # Field mapping; only exercises have required fields
            if import_type == "exercise":
                mapped = map_wiki_to_exercise(page_title, page_id, smw_props)
                required_ok = bool(mapped.get("goal") or mapped.get("execution"))
            elif import_type == "skill":
                mapped = map_wiki_to_skill(page_title, page_id, smw_props)
                required_ok = True
            else:
                mapped = map_wiki_to_method(page_title, page_id, smw_props)
                required_ok = True

            if not required_ok:
                errors.append({"item": page_title, "error": "Pflichtfelder fehlen (Ziel + Durchführung)"})
                stats["failed"] += 1
                _update_log(log_id, **stats, error_log=errors)
                continue

            if dry_run:
                # Dry run: count the page as imported without storing it
                stats["imported"] += 1
                _update_log(log_id, **stats)
                continue

            # Store
            try:
                if import_type == "exercise":
                    local_id = _upsert_exercise(mapped, reimport, imported_by)
                elif import_type == "skill":
                    local_id = _upsert_skill(mapped, reimport)
                else:
                    local_id = _upsert_method(mapped, reimport)

                if local_id:
                    _upsert_wiki_ref(page_title, page_id, import_type, local_id)
                    stats["imported"] += 1
                else:
                    stats["skipped"] += 1
            except Exception as e:
                # Per-item boundary: one broken page must not abort the run
                logger.exception("Fehler beim Speichern von '%s'", page_title)
                errors.append({"item": page_title, "error": str(e)})
                stats["failed"] += 1

            _update_log(log_id, **stats, error_log=errors)

        # Finish
        _update_log(
            log_id,
            import_status="completed",
            error_log=errors,
            finished=True,
            **stats,
        )

    except Exception as e:
        logger.exception("Import-Prozess fehlgeschlagen (log_id=%s)", log_id)
        _update_log(log_id, import_status="failed", error_log=[{"item": "global", "error": str(e)}], finished=True, **stats)


async def _fetch_with_retry(client: SmwClient, page_title: str, retries: int = 3) -> dict:
    """Fetch the SMW properties of a page, trying up to ``retries`` times.

    Waits 1s, 2s, 4s, ... between attempts. At least one attempt is always
    made, so ``retries <= 0`` can no longer fall through and return None.

    Raises:
        SmwClientError: if the last attempt fails.
    """
    attempts = max(1, retries)
    for attempt in range(attempts):
        try:
            return await client.browse_subject(page_title)
        except SmwClientError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(2 ** attempt)


# ------------------------------------------------------------------ #
#  DB helpers                                                          #
# ------------------------------------------------------------------ #

def _update_log(
    log_id: int,
    import_status: Optional[str] = None,
    items_total: int = 0,
    total: int = 0,
    imported: int = 0,
    skipped: int = 0,
    failed: int = 0,
    error_log: Optional[list] = None,
    finished: bool = False,
):
    """Write status and counters to one wiki_import_log row.

    The four counters are written on every call (0 when not passed).
    ``import_status`` and ``error_log`` are only written when given,
    ``finished_at`` only when ``finished`` is true. ``total`` is an alias
    for ``items_total`` so callers can pass ``**stats``.
    """
    import json as _json

    fields = []
    params = []

    if import_status:
        fields.append("import_status = %s")
        params.append(import_status)

    total_val = items_total or total
    fields += [
        "items_total = %s",
        "items_imported = %s",
        "items_skipped = %s",
        "items_failed = %s",
    ]
    params += [total_val, imported, skipped, failed]

    if error_log is not None:
        fields.append("error_log = %s")
        params.append(_json.dumps(error_log))

    if finished:
        fields.append("finished_at = NOW()")

    fields.append("updated_at = NOW()")
    params.append(log_id)

    with get_db() as conn:
        cur = get_cursor(conn)
        # Only fixed column fragments are interpolated; values stay parameters.
        cur.execute(
            f"UPDATE wiki_import_log SET {', '.join(fields)} WHERE id = %s",
            params
        )
        conn.commit()


def _upsert_exercise(mapped: dict, reimport: bool, created_by: int) -> Optional[int]:
    """Create an exercise, or update it when ``reimport`` is true.

    Returns the exercise id, or None when the exercise already exists and
    ``reimport`` is false (skipped).
    """
    import json as _json

    with get_db() as conn:
        cur = get_cursor(conn)

        # Does the exercise already exist (matched via import_id)?
        cur.execute(
            "SELECT id FROM exercises WHERE import_id = %s AND import_source = 'mediawiki'",
            (mapped["import_id"],)
        )
        existing = cur.fetchone()

        equipment = _json.dumps(mapped.get("equipment", [])) if mapped.get("equipment") else None

        # Column values shared by UPDATE and INSERT, in column order
        content_values = (
            mapped.get("title"), mapped.get("summary"), mapped.get("goal"),
            mapped.get("execution"), mapped.get("preparation"), mapped.get("trainer_notes"),
            mapped.get("duration_min"), mapped.get("duration_max"),
            mapped.get("group_size_min"), mapped.get("group_size_max"),
            equipment,
        )

        if existing and not reimport:
            return None  # skipped

        if existing:
            ex_id = existing['id']
            cur.execute(
                "UPDATE exercises SET "
                "title = %s, summary = %s, goal = %s, execution = %s, "
                "preparation = %s, trainer_notes = %s, "
                "duration_min = %s, duration_max = %s, "
                "group_size_min = %s, group_size_max = %s, "
                "equipment = %s, updated_at = NOW() "
                "WHERE id = %s",
                content_values + (ex_id,)
            )
        else:
            cur.execute(
                "INSERT INTO exercises "
                "(title, summary, goal, execution, preparation, trainer_notes, "
                "duration_min, duration_max, group_size_min, group_size_max, "
                "equipment, visibility, status, import_source, import_id, created_by) "
                "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) "
                "RETURNING id",
                content_values + (
                    mapped.get("visibility", "private"),
                    mapped.get("status", "draft"),
                    "mediawiki",
                    mapped.get("import_id"),
                    created_by,
                )
            )
            ex_id = cur.fetchone()['id']

        conn.commit()

        # Catalog assignments (focus areas, styles, target groups)
        _assign_exercise_catalogs(cur, conn, ex_id, mapped)

        # Skill assignments with levels
        skill_assignments = build_skill_assignments(mapped)
        _assign_exercise_skills(cur, conn, ex_id, skill_assignments)

        return ex_id


def _find_skill_id_by_label(cur, label: str) -> Optional[int]:
    """Find a catalog skill by wiki label.

    Tries, in order: case-insensitive match on the trimmed name, match with
    all whitespace removed on both sides (labels of 3+ characters only),
    and finally a substring match. Returns None if nothing matches.
    """
    if not label or not str(label).strip():
        return None
    raw = str(label).strip()

    cur.execute("SELECT id FROM skills WHERE TRIM(name) ILIKE %s", (raw,))
    row = cur.fetchone()
    if row:
        return row["id"]

    comp = "".join(raw.split())
    if len(comp) >= 3:
        # POSIX class instead of E'\s+': inside a PostgreSQL escape string
        # "\s" is just the letter "s", so the old pattern removed runs of
        # "s" from the name rather than whitespace.
        cur.execute(
            "SELECT id FROM skills WHERE regexp_replace(TRIM(name), '[[:space:]]+', '', 'g') ILIKE %s",
            (comp,),
        )
        row = cur.fetchone()
        if row:
            return row["id"]

    cur.execute("SELECT id FROM skills WHERE name ILIKE %s LIMIT 1", (f"%{raw}%",))
    row = cur.fetchone()
    return row["id"] if row else None


def _assign_exercise_catalogs(cur, conn, exercise_id: int, mapped: dict):
    """Create the M:N catalog assignments for an imported exercise.

    Names are looked up case-insensitively; existing assignments are left
    untouched (ON CONFLICT DO NOTHING). The first focus area and the first
    style are flagged as primary. Commits on ``conn`` at the end.
    """
    # Focus areas
    for idx, name in enumerate(mapped.get("focus_area_names", [])):
        cur.execute("SELECT id FROM focus_areas WHERE name ILIKE %s", (name,))
        row = cur.fetchone()
        if row:
            cur.execute(
                "INSERT INTO exercise_focus_areas (exercise_id, focus_area_id, is_primary) "
                "VALUES (%s, %s, %s) "
                "ON CONFLICT (exercise_id, focus_area_id) DO NOTHING",
                (exercise_id, row['id'], idx == 0)
            )
        else:
            logger.warning("Focus Area '%s' nicht im Katalog gefunden", name)

    # Style directions
    for idx, name in enumerate(mapped.get("style_names", [])):
        cur.execute("SELECT id FROM style_directions WHERE name ILIKE %s", (name.strip(),))
        row = cur.fetchone()
        if row:
            cur.execute(
                "INSERT INTO exercise_style_directions (exercise_id, style_direction_id, is_primary) "
                "VALUES (%s, %s, %s) "
                "ON CONFLICT (exercise_id, style_direction_id) DO NOTHING",
                (exercise_id, row["id"], idx == 0),
            )
        else:
            logger.warning("Stilrichtung '%s' nicht im Katalog gefunden", name)

    # Target groups (unknown names are ignored without a log entry)
    for name in mapped.get("target_group_names", []):
        cur.execute("SELECT id FROM target_groups WHERE name ILIKE %s", (name,))
        row = cur.fetchone()
        if row:
            cur.execute(
                "INSERT INTO exercise_target_groups (exercise_id, target_group_id) "
                "VALUES (%s, %s) "
                "ON CONFLICT DO NOTHING",
                (exercise_id, row['id'])
            )

    # Skills are assigned only by _assign_exercise_skills (with levels),
    # not a second time here.
    conn.commit()


def _assign_exercise_skills(cur, conn, exercise_id: int, skill_assignments: list):
    """Assign skills with levels to an exercise.

    Each assignment is resolved through _find_skill_id_by_label; unknown
    skills are logged and skipped. Existing rows get their levels and
    intensity overwritten. is_primary is always written as false. Commits
    on ``conn`` at the end.
    """
    for assignment in skill_assignments:
        sid = _find_skill_id_by_label(cur, assignment.get("skill_name") or "")
        if not sid:
            logger.warning("Skill '%s' nicht im Katalog gefunden", assignment.get("skill_name"))
            continue

        cur.execute(
            "INSERT INTO exercise_skills "
            "(exercise_id, skill_id, target_level, required_level, intensity, is_primary) "
            "VALUES (%s, %s, %s, %s, %s, %s) "
            "ON CONFLICT (exercise_id, skill_id) DO UPDATE SET "
            "target_level = EXCLUDED.target_level, "
            "required_level = EXCLUDED.required_level, "
            "intensity = EXCLUDED.intensity, "
            "is_primary = false",
            (
                exercise_id,
                sid,
                assignment.get("target_level"),
                assignment.get("required_level"),
                normalize_exercise_skill_intensity(assignment.get("intensity")),
                False,
            )
        )
    conn.commit()


def _upsert_skill(mapped: dict, _reimport: bool) -> Optional[int]:
    """Create a skill, or update the existing row on a name conflict.

    On conflict only non-NULL incoming values overwrite stored ones
    (COALESCE). ``_reimport`` is accepted for signature parity with the
    other upsert helpers and is not used. Returns the skill id.
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # Resolve the category (optional)
        category_id = None
        if mapped.get("category_name"):
            cur.execute("SELECT id FROM skill_categories WHERE name ILIKE %s", (mapped["category_name"],))
            row = cur.fetchone()
            if row:
                category_id = row["id"]

        cur.execute(
            "INSERT INTO skills (name, description, category_id, karate_relevance, relevance_level) "
            "VALUES (%s, %s, %s, %s, %s) "
            "ON CONFLICT (name) DO UPDATE SET "
            "description = COALESCE(EXCLUDED.description, skills.description), "
            "category_id = COALESCE(EXCLUDED.category_id, skills.category_id), "
            "karate_relevance = COALESCE(EXCLUDED.karate_relevance, skills.karate_relevance), "
            "relevance_level = COALESCE(EXCLUDED.relevance_level, skills.relevance_level), "
            "updated_at = NOW() "
            "RETURNING id",
            (
                mapped["name"],
                mapped.get("description"),
                category_id,
                mapped.get("karate_relevance"),
                mapped.get("relevance_level"),
            ),
        )
        skill_id = cur.fetchone()["id"]
        conn.commit()
        return skill_id


def _upsert_method(mapped: dict, _reimport: bool) -> Optional[int]:
    """Create a training method, or update its description by name.

    training_methods.name has no UNIQUE constraint, hence a manual lookup
    instead of ON CONFLICT (name). ``_reimport`` is not used. Returns the
    method id, or None when the mapped name is empty.
    """
    name = (mapped.get("name") or "").strip()
    desc = mapped.get("description")
    if not name:
        return None

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id FROM training_methods WHERE TRIM(name) ILIKE TRIM(%s)",
            (name,),
        )
        row = cur.fetchone()
        if row:
            mid = row["id"]
            cur.execute(
                "UPDATE training_methods SET description = %s, updated_at = NOW() WHERE id = %s",
                (desc, mid),
            )
            conn.commit()
            return mid

        cur.execute(
            "INSERT INTO training_methods (name, description) VALUES (%s, %s) RETURNING id",
            (name, desc),
        )
        mid = cur.fetchone()["id"]
        conn.commit()
        return mid


def _upsert_wiki_ref(
    page_title: str,
    page_id: Optional[int],
    content_type: str,
    local_id: int
):
    """Create the import reference for a page, or refresh an existing one.

    On conflict of (wiki_page_title, content_type) the page id, local id
    and last_imported timestamp are overwritten.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "INSERT INTO wiki_import_references "
            "(wiki_page_title, wiki_page_id, content_type, local_id, last_imported) "
            "VALUES (%s, %s, %s, %s, NOW()) "
            "ON CONFLICT (wiki_page_title, content_type) DO UPDATE SET "
            "wiki_page_id = EXCLUDED.wiki_page_id, "
            "local_id = EXCLUDED.local_id, "
            "last_imported = NOW()",
            (page_title, page_id, content_type, local_id)
        )
        conn.commit()