shinkan-jinkendo/backend/routers/import_wiki.py
Lars a02df32ce2
Some checks failed
Deploy Development / deploy (push) Successful in 37s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 5s
Test Suite / playwright-tests (push) Failing after 1m58s
fix: set created_by automatically on MediaWiki import
- Added imported_by parameter to _run_import and _upsert_exercise
- Exercises now automatically get created_by set to importing user
- Fixes visibility issue where imported exercises were invisible

Related: Wiki import - exercise visibility fix
2026-04-27 09:52:18 +02:00

660 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MediaWiki Import Router
Importiert Übungen, Fähigkeiten und Methoden aus Semantic MediaWiki
direkt via API (kein XML-Export). Nur Super-Admin darf importieren.
Wiki-URL: https://karatetrainer.net/api.php
"""
import asyncio
import logging
import os
from typing import Literal, Optional

from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, Query
from pydantic import BaseModel

from db import get_db, get_cursor, r2d
from auth import require_auth
from smw_client import SmwClient, SmwClientError
from smw_mapper import map_wiki_to_exercise, map_wiki_to_skill, map_wiki_to_method, build_skill_assignments
logger = logging.getLogger(__name__)

# All endpoints of this router live below /api.
router = APIRouter(prefix="/api", tags=["import"])

# Wiki category names as they exist on karatetrainer.net. Each one can be
# overridden through the environment (.env).
CATEGORY_EXERCISES = os.environ.get("MEDIAWIKI_CATEGORY_EXERCISES", "Übungen")
CATEGORY_SKILLS = os.environ.get("MEDIAWIKI_CATEGORY_SKILLS", "Fähigkeitsbeschreibung")
CATEGORY_METHODS = os.environ.get("MEDIAWIKI_CATEGORY_METHODS", "Methodenbeschreibung")
CATEGORY_MODELS = os.environ.get("MEDIAWIKI_CATEGORY_MODELS", "Reifegradmodelle")
# ------------------------------------------------------------------ #
# Pydantic Models #
# ------------------------------------------------------------------ #
class ImportExecuteRequest(BaseModel):
    """Request body for ``POST /import/mediawiki/execute``."""

    # Wiki category whose member pages are imported.
    category: str
    # Only the three types the background worker handles. Before this was a
    # plain str, and any other value fell through to the "method" branches
    # and was imported into training_methods.
    import_type: Literal["exercise", "skill", "method"] = "exercise"
    # Overwrite items that were already imported instead of skipping them.
    reimport_existing: bool = False
    # Map and count pages without writing any exercises/skills/methods.
    dry_run: bool = False
    # Maximum number of category members; the worker uses 500 when unset.
    limit: Optional[int] = None
# ------------------------------------------------------------------ #
# Auth-Hilfsfunktion #
# ------------------------------------------------------------------ #
def require_admin(session: dict = Depends(require_auth)) -> dict:
    """
    FastAPI dependency that lets only admin sessions through.

    Accepts sessions whose ``role`` is ``"admin"`` or ``"superadmin"`` and
    returns the session unchanged; any other role gets HTTP 403.
    """
    allowed_roles = ("admin", "superadmin")
    if session.get("role") in allowed_roles:
        return session
    raise HTTPException(status_code=403, detail="Nur Admins dürfen den Import ausführen")
# ------------------------------------------------------------------ #
# Preview Endpoint #
# ------------------------------------------------------------------ #
@router.get("/import/mediawiki/preview")
async def preview_import(
    category: str = Query(default=CATEGORY_EXERCISES),
    import_type: str = Query(default="exercise"),
    limit: int = Query(default=10, ge=1, le=500),
    session: dict = Depends(require_admin),
):
    """
    Preview which wiki pages an import would touch, without saving anything.

    For up to ``limit`` members of ``category`` this reports:
    whether a ``wiki_import_references`` row already exists for the page,
    the mapped fields (with internal import keys removed), mapper warnings,
    and blocking errors (page not fetchable, mapping failed, or for
    exercises both 'Ziel' and 'Durchführung' missing).

    Raises:
        HTTPException 422: ``import_type`` is not exercise, skill or method.
        HTTPException 502: the category listing could not be fetched.
    """
    # Reject unknown types up front; previously they were silently mapped
    # as training methods.
    if import_type not in ("exercise", "skill", "method"):
        raise HTTPException(status_code=422, detail=f"Unbekannter import_type: {import_type}")

    client = SmwClient()
    try:
        members = await client.get_category_members(category, limit=limit)
    except SmwClientError as e:
        raise HTTPException(status_code=502, detail=f"Wiki-API nicht erreichbar: {e}")

    preview = []
    with get_db() as conn:
        cur = get_cursor(conn)
        for member in members[:limit]:
            page_title = member["title"]
            page_id = member.get("pageid")

            # Duplicate check against earlier imports of the same type.
            cur.execute(
                "SELECT id, last_imported FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s",
                (page_title, import_type)
            )
            existing_ref = r2d(cur.fetchone())
            # Guard against a NULL last_imported instead of crashing on .isoformat().
            last_imported = existing_ref["last_imported"] if existing_ref else None

            mapped_fields = {}
            warnings = []
            errors = []
            try:
                smw_props = await client.browse_subject(page_title)
                if import_type == "exercise":
                    mapped = map_wiki_to_exercise(page_title, page_id, smw_props)
                elif import_type == "skill":
                    mapped = map_wiki_to_skill(page_title, page_id, smw_props)
                else:
                    mapped = map_wiki_to_method(page_title, page_id, smw_props)
                warnings = mapped.pop("warnings", [])
                # Internal bookkeeping fields are not shown in the preview.
                for k in ("wiki_page_id", "import_source", "import_id"):
                    mapped.pop(k, None)
                mapped_fields = mapped
                # Same required-field rule as the import worker (exercises only).
                if import_type == "exercise":
                    if not mapped.get("goal") and not mapped.get("execution"):
                        errors.append("Pflichtfelder 'Ziel' und 'Durchführung' fehlen")
            except SmwClientError as e:
                errors.append(f"Seite nicht abrufbar: {e}")
            except Exception as e:
                # One malformed page must not turn the whole preview into a 500.
                logger.exception("Vorschau-Mapping fehlgeschlagen für '%s'", page_title)
                errors.append(f"Mapping fehlgeschlagen: {e}")

            preview.append({
                "wiki_page_title": page_title,
                "wiki_page_id": page_id,
                "already_imported": existing_ref is not None,
                "last_imported_at": last_imported.isoformat() if last_imported else None,
                "mapped_fields": mapped_fields,
                "warnings": warnings,
                "errors": errors,
            })

    return {
        "category": category,
        "import_type": import_type,
        "total_found": len(members),
        "preview": preview,
    }
# ------------------------------------------------------------------ #
# Execute Import (async Background Task) #
# ------------------------------------------------------------------ #
@router.post("/import/mediawiki/execute", status_code=202)
async def execute_import(
    body: ImportExecuteRequest,
    background_tasks: BackgroundTasks,
    session: dict = Depends(require_admin),
):
    """
    Queue a MediaWiki import as a background task and return at once.

    A ``wiki_import_log`` row with status ``running`` is created first, so
    the response already carries the ``log_id`` that can be polled via
    ``GET /import/mediawiki/status/{log_id}``. The importing profile is
    recorded in the log and passed on to the worker.
    """
    importer_id = session["profile_id"]
    log_values = (
        body.import_type,
        body.category,
        body.dry_run,
        body.reimport_existing,
        importer_id,
    )

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """INSERT INTO wiki_import_log
                   (import_type, import_status, category, dry_run, reimport, imported_by)
               VALUES (%s, 'running', %s, %s, %s, %s)
               RETURNING id""",
            log_values,
        )
        new_log_id = cur.fetchone()['id']
        conn.commit()

    worker_kwargs = dict(
        log_id=new_log_id,
        category=body.category,
        import_type=body.import_type,
        reimport=body.reimport_existing,
        dry_run=body.dry_run,
        limit=body.limit,
        imported_by=importer_id,
    )
    background_tasks.add_task(_run_import, **worker_kwargs)

    return {
        "log_id": new_log_id,
        "status": "running",
        "message": f"Import gestartet. Status: GET /api/import/mediawiki/status/{new_log_id}",
    }
# ------------------------------------------------------------------ #
# Status & Logs #
# ------------------------------------------------------------------ #
@router.get("/import/mediawiki/status/{log_id}")
def get_import_status(
    log_id: int,
    session: dict = Depends(require_admin),
):
    """Return the full ``wiki_import_log`` row of one import run (404 if unknown)."""
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM wiki_import_log WHERE id = %s", (log_id,))
        log_entry = r2d(cur.fetchone())
    if log_entry:
        return log_entry
    raise HTTPException(status_code=404, detail="Import-Log nicht gefunden")
@router.get("/import/mediawiki/logs")
def list_import_logs(
    session: dict = Depends(require_admin),
):
    """
    List the 50 most recent import runs, newest first.

    The ``error_log`` column is deliberately not selected, to keep the list light.
    """
    summary_query = """SELECT id, import_type, import_status, category, dry_run, reimport,
                              items_total, items_imported, items_skipped, items_failed,
                              imported_by, started_at, finished_at
                       FROM wiki_import_log
                       ORDER BY started_at DESC
                       LIMIT 50"""
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(summary_query)
        log_rows = cur.fetchall()
    return list(map(r2d, log_rows))
@router.delete("/import/mediawiki/references/{ref_id}", status_code=204)
def delete_import_reference(
    ref_id: int,
    session: dict = Depends(require_admin),
):
    """
    Delete one import reference so its wiki page is imported again.

    Without the reference, the next import run treats the page as new.
    Responds 404 when no reference with ``ref_id`` exists.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("DELETE FROM wiki_import_references WHERE id = %s RETURNING id", (ref_id,))
        was_deleted = cur.fetchone() is not None
        conn.commit()
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Referenz nicht gefunden")
# ------------------------------------------------------------------ #
# Schema-Discovery (Debug-Hilfsmittel für Admins) #
# ------------------------------------------------------------------ #
@router.get("/import/mediawiki/discover/{page_title:path}")
async def discover_properties(
    page_title: str,
    session: dict = Depends(require_admin),
):
    """
    Return every SMW property of one wiki page (admin debugging aid).

    Useful for finding the exact property names smw_mapper.py has to map.
    Wiki API errors are reported as HTTP 502.
    """
    smw = SmwClient()
    try:
        discovered = await smw.discover_properties(page_title)
    except SmwClientError as e:
        raise HTTPException(status_code=502, detail=f"Wiki-API Fehler: {e}")
    return {"page_title": page_title, "properties": discovered}
# ------------------------------------------------------------------ #
# Background Import Worker #
# ------------------------------------------------------------------ #
# Local table that holds the imported row for each import type. Used only to
# check whether a referenced row still exists. The values are fixed literals,
# never user input.
_IMPORT_TARGET_TABLES = {
    "exercise": "exercises",
    "skill": "skills",
    "method": "training_methods",
}


def _reference_is_live(page_title: str, import_type: str) -> bool:
    """
    Return True if the page was imported before and its local row still exists.

    An orphaned reference (its local row was deleted) is removed, and False
    is returned so that the page is imported again.
    """
    # Any type other than exercise/skill falls back to training_methods,
    # matching the mapping/storing fallbacks.
    table = _IMPORT_TARGET_TABLES.get(import_type, "training_methods")
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT local_id FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s",
            (page_title, import_type)
        )
        ref = cur.fetchone()
        if not ref:
            return False
        cur.execute(f"SELECT id FROM {table} WHERE id = %s", (ref['local_id'],))
        if cur.fetchone():
            return True
        logger.warning("Orphaned reference found for '%s', re-importing", page_title)
        cur.execute(
            "DELETE FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s",
            (page_title, import_type)
        )
        conn.commit()
        return False


def _map_page(import_type: str, page_title: str, page_id, smw_props: dict):
    """Map SMW properties to a local record; return ``(mapped, required_fields_ok)``."""
    if import_type == "exercise":
        mapped = map_wiki_to_exercise(page_title, page_id, smw_props)
        # An exercise needs at least a goal or an execution description.
        return mapped, bool(mapped.get("goal") or mapped.get("execution"))
    if import_type == "skill":
        return map_wiki_to_skill(page_title, page_id, smw_props), True
    return map_wiki_to_method(page_title, page_id, smw_props), True


def _store_mapped(import_type: str, mapped: dict, reimport: bool, imported_by: int) -> Optional[int]:
    """Persist a mapped record; return its local id, or None when it was skipped."""
    if import_type == "exercise":
        return _upsert_exercise(mapped, reimport, imported_by)
    if import_type == "skill":
        return _upsert_skill(mapped, reimport)
    return _upsert_method(mapped, reimport)


async def _run_import(
    log_id: int,
    category: str,
    import_type: str,
    reimport: bool,
    dry_run: bool,
    limit: Optional[int],
    imported_by: int,
):
    """
    Background task that imports every member page of a wiki category.

    Progress counters (total/imported/skipped/failed) and per-item errors are
    written to the ``wiki_import_log`` row ``log_id`` as the run proceeds.
    Problems with a single page are recorded as failed items: fetch errors,
    mapper exceptions, missing required fields and save errors. Any other
    exception ends the run with status ``failed``, and the per-item errors
    collected up to that point are kept.

    Args:
        log_id: Id of the ``wiki_import_log`` row to update.
        category: Wiki category whose members are imported.
        import_type: ``exercise``, ``skill`` or ``method``.
        reimport: Re-import pages that already have a live import reference.
        dry_run: Fetch and map, but do not write any records.
        limit: Maximum number of category members (500 when falsy).
        imported_by: Profile id stored as owner of imported exercises.
    """
    client = SmwClient()
    stats = {"total": 0, "imported": 0, "skipped": 0, "failed": 0}
    errors = []

    def _fail(page_title: str, message: str):
        # Count one failed item and persist the error list immediately.
        errors.append({"item": page_title, "error": message})
        stats["failed"] += 1
        _update_log(log_id, **stats, error_log=errors)

    try:
        members = await client.get_category_members(category, limit=limit or 500)
        stats["total"] = len(members)
        _update_log(log_id, import_status="running", items_total=stats["total"])

        for member in members:
            page_title = member["title"]
            page_id = member.get("pageid")

            # Skip only when the reference AND the referenced local row exist.
            if not reimport and _reference_is_live(page_title, import_type):
                stats["skipped"] += 1
                _update_log(log_id, **stats)
                continue

            try:
                smw_props = await _fetch_with_retry(client, page_title)
            except SmwClientError as e:
                _fail(page_title, str(e))
                continue

            # Mapper bugs on one page must not abort the remaining pages.
            try:
                mapped, required_ok = _map_page(import_type, page_title, page_id, smw_props)
            except Exception as e:
                logger.exception("Fehler beim Mapping von '%s'", page_title)
                _fail(page_title, str(e))
                continue

            if not required_ok:
                _fail(page_title, "Pflichtfelder fehlen (Ziel + Durchführung)")
                continue

            if dry_run:
                stats["imported"] += 1
                _update_log(log_id, **stats)
                continue

            try:
                local_id = _store_mapped(import_type, mapped, reimport, imported_by)
                if local_id:
                    _upsert_wiki_ref(page_title, page_id, import_type, local_id)
            except Exception as e:
                logger.exception("Fehler beim Speichern von '%s'", page_title)
                _fail(page_title, str(e))
                continue

            if local_id:
                stats["imported"] += 1
            else:
                stats["skipped"] += 1
            # Report progress after every stored item as well.
            _update_log(log_id, **stats)

        _update_log(
            log_id,
            import_status="completed",
            error_log=errors,
            finished=True,
            **stats,
        )
    except Exception as e:
        logger.exception("Import-Prozess fehlgeschlagen (log_id=%s)", log_id)
        # Append the global error instead of replacing the per-item errors.
        _update_log(
            log_id,
            import_status="failed",
            error_log=[*errors, {"item": "global", "error": str(e)}],
            finished=True,
            **stats,
        )
async def _fetch_with_retry(client: SmwClient, page_title: str, retries: int = 3) -> dict:
    """
    Fetch the SMW properties of ``page_title``, retrying on ``SmwClientError``.

    Makes up to ``retries`` attempts and waits 1 s, 2 s, 4 s, … between them.
    The error from the final attempt is re-raised.
    """
    attempt = 0
    while attempt < retries:
        try:
            return await client.browse_subject(page_title)
        except SmwClientError:
            out_of_attempts = attempt == retries - 1
            if out_of_attempts:
                raise
            await asyncio.sleep(2 ** attempt)
        attempt += 1
# ------------------------------------------------------------------ #
# DB-Helper #
# ------------------------------------------------------------------ #
def _update_log(
    log_id: int,
    import_status: Optional[str] = None,
    items_total: int = 0,
    total: int = 0,
    imported: int = 0,
    skipped: int = 0,
    failed: int = 0,
    error_log: Optional[list] = None,
    finished: bool = False,
):
    """
    Write counters (and optionally status, errors, finish time) to an import log row.

    ``items_total`` takes precedence over ``total``. ``total`` exists so a
    caller can pass its stats dict (total/imported/skipped/failed) as
    ``**stats``. The four counters are always written; ``import_status``
    only when truthy; ``error_log`` (stored as JSON) only when not None.
    ``finished`` stamps ``finished_at``, and ``updated_at`` is always refreshed.
    """
    import json as _json

    assignments = []
    values = []
    if import_status:
        assignments.append("import_status = %s")
        values.append(import_status)

    counter_columns = (
        ("items_total", items_total or total),
        ("items_imported", imported),
        ("items_skipped", skipped),
        ("items_failed", failed),
    )
    for column, value in counter_columns:
        assignments.append(f"{column} = %s")
        values.append(value)

    if error_log is not None:
        assignments.append("error_log = %s")
        values.append(_json.dumps(error_log))
    if finished:
        assignments.append("finished_at = NOW()")
    assignments.append("updated_at = NOW()")
    values.append(log_id)

    # Column names are fixed literals above; only values are parameterized.
    statement = "UPDATE wiki_import_log SET " + ", ".join(assignments) + " WHERE id = %s"
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(statement, values)
        conn.commit()
def _upsert_exercise(mapped: dict, reimport: bool, created_by: int) -> Optional[int]:
    """
    Insert an imported exercise, or update it when ``reimport`` is True.

    The exercise is matched by ``mapped["import_id"]`` with
    ``import_source = 'mediawiki'``. After the row is written, catalog links
    (focus areas, styles, target groups, skills) and leveled skill
    assignments are added.

    Args:
        mapped: Output of ``map_wiki_to_exercise``; must contain ``import_id``.
        reimport: Update an already imported exercise instead of skipping it.
        created_by: Profile id of the importing user. It becomes the owner of
            new exercises. On re-import it back-fills rows that have no owner
            yet; existing owners are kept.

    Returns:
        The exercise id, or None when the exercise exists and ``reimport`` is False.
    """
    import json as _json
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id FROM exercises WHERE import_id = %s AND import_source = 'mediawiki'",
            (mapped["import_id"],)
        )
        existing = cur.fetchone()
        equipment = _json.dumps(mapped.get("equipment", [])) if mapped.get("equipment") else None

        if existing and not reimport:
            return None  # already imported -> skipped

        if existing:
            ex_id = existing['id']
            # COALESCE back-fills created_by on exercises imported before the
            # owner was set. Without an owner they stay invisible, even after
            # a re-import.
            cur.execute(
                """UPDATE exercises SET
                       title = %s, summary = %s, goal = %s, execution = %s,
                       preparation = %s, trainer_notes = %s,
                       duration_min = %s, duration_max = %s,
                       group_size_min = %s, group_size_max = %s,
                       equipment = %s,
                       created_by = COALESCE(created_by, %s),
                       updated_at = NOW()
                   WHERE id = %s""",
                (
                    mapped.get("title"), mapped.get("summary"), mapped.get("goal"),
                    mapped.get("execution"), mapped.get("preparation"), mapped.get("trainer_notes"),
                    mapped.get("duration_min"), mapped.get("duration_max"),
                    mapped.get("group_size_min"), mapped.get("group_size_max"),
                    equipment, created_by, ex_id
                )
            )
        else:
            cur.execute(
                """INSERT INTO exercises
                       (title, summary, goal, execution, preparation, trainer_notes,
                        duration_min, duration_max, group_size_min, group_size_max,
                        equipment, visibility, status, import_source, import_id, created_by)
                   VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                   RETURNING id""",
                (
                    mapped.get("title"), mapped.get("summary"), mapped.get("goal"),
                    mapped.get("execution"), mapped.get("preparation"), mapped.get("trainer_notes"),
                    mapped.get("duration_min"), mapped.get("duration_max"),
                    mapped.get("group_size_min"), mapped.get("group_size_max"),
                    equipment,
                    mapped.get("visibility", "private"),
                    mapped.get("status", "draft"),
                    "mediawiki",
                    mapped.get("import_id"),
                    created_by,
                )
            )
            ex_id = cur.fetchone()['id']
        conn.commit()

        # M:N catalog links (focus areas, styles, target groups, skills).
        _assign_exercise_catalogs(cur, conn, ex_id, mapped)
        # Skill links with level information.
        skill_assignments = build_skill_assignments(mapped)
        _assign_exercise_skills(cur, conn, ex_id, skill_assignments)
    return ex_id
def _assign_exercise_catalogs(cur, conn, exercise_id: int, mapped: dict):
"""Weist M:N Katalog-Zuordnungen für eine importierte Übung zu."""
# Focus Areas
for idx, name in enumerate(mapped.get("focus_area_names", [])):
cur.execute("SELECT id FROM focus_areas WHERE name ILIKE %s", (name,))
row = cur.fetchone()
if row:
cur.execute(
"""INSERT INTO exercise_focus_areas (exercise_id, focus_area_id, is_primary)
VALUES (%s, %s, %s)
ON CONFLICT (exercise_id, focus_area_id) DO NOTHING""",
(exercise_id, row['id'], idx == 0)
)
else:
logger.warning("Focus Area '%s' nicht im Katalog gefunden", name)
# Style Directions
for name in mapped.get("style_names", []):
cur.execute("SELECT id FROM style_directions WHERE name ILIKE %s", (name,))
row = cur.fetchone()
if row:
cur.execute(
"""INSERT INTO exercise_training_styles (exercise_id, style_direction_id)
VALUES (%s, %s) ON CONFLICT DO NOTHING""",
(exercise_id, row['id'])
)
# Target Groups
for name in mapped.get("target_group_names", []):
cur.execute("SELECT id FROM target_groups WHERE name ILIKE %s", (name,))
row = cur.fetchone()
if row:
cur.execute(
"""INSERT INTO exercise_target_groups (exercise_id, target_group_id)
VALUES (%s, %s) ON CONFLICT DO NOTHING""",
(exercise_id, row['id'])
)
# Skills
for name in mapped.get("skill_names", []):
cur.execute("SELECT id FROM skills WHERE name ILIKE %s", (name,))
row = cur.fetchone()
if row:
cur.execute(
"""INSERT INTO exercise_skills (exercise_id, skill_id)
VALUES (%s, %s) ON CONFLICT DO NOTHING""",
(exercise_id, row['id'])
)
conn.commit()
def _assign_exercise_skills(cur, conn, exercise_id: int, skill_assignments: list):
"""Weist Skill-Zuordnungen mit Levels einer Übung zu."""
for assignment in skill_assignments:
cur.execute("SELECT id FROM skills WHERE name ILIKE %s", (assignment["skill_name"],))
row = cur.fetchone()
if not row:
logger.warning("Skill '%s' nicht im Katalog gefunden", assignment["skill_name"])
continue
cur.execute(
"""INSERT INTO exercise_skills
(exercise_id, skill_id, target_level, required_level, intensity, is_primary)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (exercise_id, skill_id) DO UPDATE SET
target_level = EXCLUDED.target_level,
is_primary = EXCLUDED.is_primary""",
(
exercise_id, row['id'],
assignment.get("target_level"),
assignment.get("required_level"),
assignment.get("intensity"),
assignment.get("is_primary", False),
)
)
conn.commit()
def _upsert_skill(mapped: dict, reimport: bool) -> Optional[int]:
    """
    Insert a skill or, if one with the same name exists, refresh its description.

    The optional ``category_name`` is resolved case-insensitively against
    ``skill_categories``; an unknown category leaves ``category_id`` NULL.
    On a name conflict only ``description`` is updated; the category is left
    as it is. ``reimport`` is accepted for signature parity with the other
    upsert helpers but is not used: the row is always upserted.

    Returns:
        The id of the inserted or updated skill.
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        category_id = None
        category_name = mapped.get("category_name")
        if category_name:
            cur.execute("SELECT id FROM skill_categories WHERE name ILIKE %s", (category_name,))
            category_row = cur.fetchone()
            category_id = category_row['id'] if category_row else None

        cur.execute(
            """INSERT INTO skills (name, description, category_id)
               VALUES (%s, %s, %s)
               ON CONFLICT (name) DO UPDATE SET
                   description = EXCLUDED.description
               RETURNING id""",
            (mapped["name"], mapped.get("description"), category_id)
        )
        upserted_id = cur.fetchone()['id']
        conn.commit()
    return upserted_id
def _upsert_method(mapped: dict, reimport: bool) -> Optional[int]:
    """
    Insert a training method or refresh the description of an existing one.

    Methods are matched by ``name``. ``reimport`` is accepted for signature
    parity with the other upsert helpers but is not used.

    Returns:
        The id of the inserted or updated training method.
    """
    method_values = (mapped["name"], mapped.get("description"))
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """INSERT INTO training_methods (name, description)
               VALUES (%s, %s)
               ON CONFLICT (name) DO UPDATE SET
                   description = EXCLUDED.description
               RETURNING id""",
            method_values
        )
        upserted_id = cur.fetchone()['id']
        conn.commit()
    return upserted_id
def _upsert_wiki_ref(
    page_title: str, page_id: Optional[int], content_type: str, local_id: int
):
    """
    Record which local row a wiki page was imported into.

    Keyed by (wiki_page_title, content_type). An existing reference gets the
    new page id and local id, and ``last_imported`` is set to NOW() either way.
    """
    reference_values = (page_title, page_id, content_type, local_id)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """INSERT INTO wiki_import_references
                   (wiki_page_title, wiki_page_id, content_type, local_id, last_imported)
               VALUES (%s, %s, %s, %s, NOW())
               ON CONFLICT (wiki_page_title, content_type) DO UPDATE SET
                   wiki_page_id = EXCLUDED.wiki_page_id,
                   local_id = EXCLUDED.local_id,
                   last_imported = NOW()""",
            reference_values
        )
        conn.commit()