shinkan-jinkendo/backend/routers/import_wiki.py
Lars 623af621b4
All checks were successful
Deploy Development / deploy (push) Successful in 38s
Test Suite / pytest-backend (push) Successful in 40s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 12s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m17s
Test Suite / pytest-backend (pull_request) Successful in 35s
Test Suite / lint-backend (pull_request) Successful in 0s
Test Suite / build-frontend (pull_request) Successful in 12s
Test Suite / k6 /health Baseline (pull_request) Successful in 33s
Test Suite / playwright-tests (pull_request) Successful in 1m8s
Enhance MediaWiki import functionality with category normalization and skill attributes
- Introduced `_normalize_mw_category` function to clean category names for API calls, ensuring consistent handling of category prefixes.
- Updated `SmwClient` methods to utilize normalized category names, improving data retrieval accuracy.
- Added `_wiki_category_or_default` function to provide default categories based on import type, enhancing user experience during imports.
- Integrated new fields `karate_relevance` and `relevance_level` into various admin components, allowing for better skill management.
- Incremented app version to 0.8.145 and updated changelog to reflect these changes.
2026-05-16 11:05:15 +02:00

718 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MediaWiki Import Router
Importiert Übungen, Fähigkeiten und Methoden aus Semantic MediaWiki
direkt via API (kein XML-Export). Nur Super-Admin darf importieren.
Wiki-URL: https://karatetrainer.net/api.php
"""
import asyncio
import logging
import os
from typing import Optional
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, Query
from pydantic import BaseModel
from db import get_db, get_cursor, r2d
from auth import require_auth
from smw_client import SmwClient, SmwClientError
from smw_mapper import map_wiki_to_exercise, map_wiki_to_skill, map_wiki_to_method, build_skill_assignments
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api", tags=["import"])
# Kategorie-Namen aus .env (echte Namen von karatetrainer.net)
CATEGORY_EXERCISES = os.getenv("MEDIAWIKI_CATEGORY_EXERCISES", "Übungen")
CATEGORY_SKILLS = os.getenv("MEDIAWIKI_CATEGORY_SKILLS", "Fähigkeitsbeschreibung")
CATEGORY_METHODS = os.getenv("MEDIAWIKI_CATEGORY_METHODS", "Methodenbeschreibung")
CATEGORY_MODELS = os.getenv("MEDIAWIKI_CATEGORY_MODELS", "Reifegradmodelle")
def _wiki_category_or_default(category: Optional[str], import_type: str) -> str:
"""Leeres category ⇒ Standard je import_type."""
if (category or "").strip():
return category.strip()
if import_type == "skill":
return CATEGORY_SKILLS
if import_type == "method":
return CATEGORY_METHODS
return CATEGORY_EXERCISES
# ------------------------------------------------------------------ #
# Pydantic Models #
# ------------------------------------------------------------------ #
class ImportExecuteRequest(BaseModel):
category: str
import_type: str = "exercise" # exercise | skill | method
reimport_existing: bool = False
dry_run: bool = False
limit: Optional[int] = None
# ------------------------------------------------------------------ #
# Auth-Hilfsfunktion #
# ------------------------------------------------------------------ #
def require_admin(session: dict = Depends(require_auth)) -> dict:
"""Nur Super-Admins dürfen importieren."""
if session.get("role") not in ("admin", "superadmin"):
raise HTTPException(status_code=403, detail="Nur Admins dürfen den Import ausführen")
return session
# ------------------------------------------------------------------ #
# Preview Endpoint #
# ------------------------------------------------------------------ #
@router.get("/import/mediawiki/preview")
async def preview_import(
category: Optional[str] = Query(default=None),
import_type: str = Query(default="exercise"),
limit: int = Query(default=10, ge=1, le=500),
session: dict = Depends(require_admin),
):
"""
Zeigt Vorschau: Welche Seiten würden importiert werden?
Überprüft Duplikate und mapped Felder ohne zu speichern.
"""
resolved_category = _wiki_category_or_default(category, import_type)
client = SmwClient()
try:
members = await client.get_category_members(resolved_category, limit=limit)
except SmwClientError as e:
raise HTTPException(status_code=502, detail=f"Wiki-API nicht erreichbar: {e}")
preview = []
with get_db() as conn:
cur = get_cursor(conn)
for member in members[:limit]:
page_title = member["title"]
page_id = member.get("pageid")
# Duplikat-Check
cur.execute(
"SELECT id, last_imported FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s",
(page_title, import_type)
)
existing_ref = r2d(cur.fetchone())
# SMW Properties abrufen
mapped_fields = {}
warnings = []
errors = []
try:
smw_props = await client.browse_subject(page_title)
if import_type == "exercise":
mapped = map_wiki_to_exercise(page_title, page_id, smw_props)
elif import_type == "skill":
mapped = map_wiki_to_skill(page_title, page_id, smw_props)
else:
mapped = map_wiki_to_method(page_title, page_id, smw_props)
warnings = mapped.pop("warnings", [])
# Entferne interne Felder aus der Vorschau
for k in ("wiki_page_id", "import_source", "import_id"):
mapped.pop(k, None)
mapped_fields = mapped
# Warnung wenn Pflichtfelder fehlen (nur bei Übungen)
if import_type == "exercise":
if not mapped.get("goal") and not mapped.get("execution"):
errors.append("Pflichtfelder 'Ziel' und 'Durchführung' fehlen")
except SmwClientError as e:
errors.append(f"Seite nicht abrufbar: {e}")
preview.append({
"wiki_page_title": page_title,
"wiki_page_id": page_id,
"already_imported": existing_ref is not None,
"last_imported_at": existing_ref["last_imported"].isoformat()
if existing_ref and existing_ref.get("last_imported") is not None
else None,
"mapped_fields": mapped_fields,
"warnings": warnings,
"errors": errors,
})
return {
"category": resolved_category,
"import_type": import_type,
"total_found": len(members),
"preview": preview,
}
# ------------------------------------------------------------------ #
# Execute Import (async Background Task) #
# ------------------------------------------------------------------ #
@router.post("/import/mediawiki/execute", status_code=202)
async def execute_import(
body: ImportExecuteRequest,
background_tasks: BackgroundTasks,
session: dict = Depends(require_admin),
):
"""
Startet einen MediaWiki-Import als Background-Task.
Gibt sofort log_id zurück Status via GET /import/mediawiki/status/{log_id}.
"""
profile_id = session["profile_id"]
resolved_category = _wiki_category_or_default(body.category, body.import_type)
# Log-Eintrag anlegen
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""INSERT INTO wiki_import_log
(import_type, import_status, category, dry_run, reimport, imported_by)
VALUES (%s, 'running', %s, %s, %s, %s)
RETURNING id""",
(body.import_type, resolved_category, body.dry_run, body.reimport_existing, profile_id)
)
log_id = cur.fetchone()['id']
conn.commit()
# Import asynchron starten
background_tasks.add_task(
_run_import,
log_id=log_id,
category=resolved_category,
import_type=body.import_type,
reimport=body.reimport_existing,
dry_run=body.dry_run,
limit=body.limit,
imported_by=profile_id,
)
return {
"log_id": log_id,
"status": "running",
"message": f"Import gestartet. Status: GET /api/import/mediawiki/status/{log_id}",
}
# ------------------------------------------------------------------ #
# Status & Logs #
# ------------------------------------------------------------------ #
@router.get("/import/mediawiki/status/{log_id}")
def get_import_status(
log_id: int,
session: dict = Depends(require_admin),
):
"""Status eines laufenden oder abgeschlossenen Imports."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT * FROM wiki_import_log WHERE id = %s", (log_id,))
row = r2d(cur.fetchone())
if not row:
raise HTTPException(status_code=404, detail="Import-Log nicht gefunden")
return row
@router.get("/import/mediawiki/logs")
def list_import_logs(
session: dict = Depends(require_admin),
):
"""Alle Import-Logs (neueste zuerst, ohne error_log Details)."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""SELECT id, import_type, import_status, category, dry_run, reimport,
items_total, items_imported, items_skipped, items_failed,
imported_by, started_at, finished_at
FROM wiki_import_log
ORDER BY started_at DESC
LIMIT 50"""
)
rows = cur.fetchall()
return [r2d(r) for r in rows]
@router.delete("/import/mediawiki/references/{ref_id}", status_code=204)
def delete_import_reference(
ref_id: int,
session: dict = Depends(require_admin),
):
"""
Löscht eine Import-Referenz, damit das Item bei nächstem Import
als neu behandelt wird (Re-Import ermöglichen).
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("DELETE FROM wiki_import_references WHERE id = %s RETURNING id", (ref_id,))
deleted = cur.fetchone()
conn.commit()
if not deleted:
raise HTTPException(status_code=404, detail="Referenz nicht gefunden")
# ------------------------------------------------------------------ #
# Schema-Discovery (Debug-Hilfsmittel für Admins) #
# ------------------------------------------------------------------ #
@router.get("/import/mediawiki/discover/{page_title:path}")
async def discover_properties(
page_title: str,
session: dict = Depends(require_admin),
):
"""
Gibt alle SMW-Properties einer Wiki-Seite zurück.
Nützlich um Property-Namen zu ermitteln und smw_mapper.py anzupassen.
"""
client = SmwClient()
try:
props = await client.discover_properties(page_title)
except SmwClientError as e:
raise HTTPException(status_code=502, detail=f"Wiki-API Fehler: {e}")
return {"page_title": page_title, "properties": props}
# ------------------------------------------------------------------ #
# Background Import Worker #
# ------------------------------------------------------------------ #
async def _run_import(
log_id: int,
category: str,
import_type: str,
reimport: bool,
dry_run: bool,
limit: Optional[int],
imported_by: int,
):
"""Hintergrund-Task: Importiert alle Seiten einer Kategorie."""
client = SmwClient()
stats = {"total": 0, "imported": 0, "skipped": 0, "failed": 0}
errors = []
try:
members = await client.get_category_members(category, limit=limit or 500)
stats["total"] = len(members)
_update_log(log_id, import_status="running", items_total=stats["total"])
for member in members:
page_title = member["title"]
page_id = member.get("pageid")
# Duplikat-Check: Prüfe ob bereits importiert UND ob tatsächlich existiert
if not reimport:
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT local_id FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s",
(page_title, import_type)
)
ref = cur.fetchone()
if ref:
# Prüfe ob die Übung tatsächlich existiert
if import_type == "exercise":
cur.execute("SELECT id FROM exercises WHERE id = %s", (ref['local_id'],))
elif import_type == "skill":
cur.execute("SELECT id FROM skills WHERE id = %s", (ref['local_id'],))
else:
cur.execute("SELECT id FROM training_methods WHERE id = %s", (ref['local_id'],))
if cur.fetchone():
# Referenz UND Übung existieren → Skip
stats["skipped"] += 1
_update_log(log_id, **stats)
continue
else:
# Referenz existiert, aber Übung fehlt → Lösche fehlerhafte Referenz
logger.warning("Orphaned reference found for '%s', re-importing", page_title)
cur.execute(
"DELETE FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s",
(page_title, import_type)
)
conn.commit()
# SMW Properties abrufen (mit Retry)
try:
smw_props = await _fetch_with_retry(client, page_title)
except SmwClientError as e:
errors.append({"item": page_title, "error": str(e)})
stats["failed"] += 1
_update_log(log_id, **stats, error_log=errors)
continue
# Field-Mapping
if import_type == "exercise":
mapped = map_wiki_to_exercise(page_title, page_id, smw_props)
required_ok = bool(mapped.get("goal") or mapped.get("execution"))
elif import_type == "skill":
mapped = map_wiki_to_skill(page_title, page_id, smw_props)
required_ok = True
else:
mapped = map_wiki_to_method(page_title, page_id, smw_props)
required_ok = True
if not required_ok:
errors.append({"item": page_title, "error": "Pflichtfelder fehlen (Ziel + Durchführung)"})
stats["failed"] += 1
_update_log(log_id, **stats, error_log=errors)
continue
if dry_run:
stats["imported"] += 1
_update_log(log_id, **stats)
continue
# Speichern
try:
if import_type == "exercise":
local_id = _upsert_exercise(mapped, reimport, imported_by)
elif import_type == "skill":
local_id = _upsert_skill(mapped, reimport)
else:
local_id = _upsert_method(mapped, reimport)
if local_id:
_upsert_wiki_ref(page_title, page_id, import_type, local_id)
stats["imported"] += 1
else:
stats["skipped"] += 1
except Exception as e:
logger.exception("Fehler beim Speichern von '%s'", page_title)
errors.append({"item": page_title, "error": str(e)})
stats["failed"] += 1
_update_log(log_id, **stats, error_log=errors)
# Abschluss
_update_log(
log_id,
import_status="completed",
error_log=errors,
finished=True,
**stats,
)
except Exception as e:
logger.exception("Import-Prozess fehlgeschlagen (log_id=%s)", log_id)
_update_log(log_id, import_status="failed", error_log=[{"item": "global", "error": str(e)}], finished=True, **stats)
async def _fetch_with_retry(client: SmwClient, page_title: str, retries: int = 3) -> dict:
"""Ruft SMW-Properties mit bis zu 3 Versuchen ab."""
for attempt in range(retries):
try:
return await client.browse_subject(page_title)
except SmwClientError:
if attempt == retries - 1:
raise
await asyncio.sleep(2 ** attempt)
# ------------------------------------------------------------------ #
# DB-Helper #
# ------------------------------------------------------------------ #
def _update_log(
log_id: int,
import_status: Optional[str] = None,
items_total: int = 0,
total: int = 0,
imported: int = 0,
skipped: int = 0,
failed: int = 0,
error_log: Optional[list] = None,
finished: bool = False,
):
import json as _json
with get_db() as conn:
cur = get_cursor(conn)
fields = []
params = []
if import_status:
fields.append("import_status = %s")
params.append(import_status)
total_val = items_total or total
fields += [
"items_total = %s",
"items_imported = %s",
"items_skipped = %s",
"items_failed = %s",
]
params += [total_val, imported, skipped, failed]
if error_log is not None:
fields.append("error_log = %s")
params.append(_json.dumps(error_log))
if finished:
fields.append("finished_at = NOW()")
fields.append("updated_at = NOW()")
params.append(log_id)
cur.execute(
f"UPDATE wiki_import_log SET {', '.join(fields)} WHERE id = %s",
params
)
conn.commit()
def _upsert_exercise(mapped: dict, reimport: bool, created_by: int) -> Optional[int]:
"""Legt Übung an oder aktualisiert sie (wenn reimport=True)."""
import json as _json
with get_db() as conn:
cur = get_cursor(conn)
# Existiert die Übung bereits (über import_id)?
cur.execute(
"SELECT id FROM exercises WHERE import_id = %s AND import_source = 'mediawiki'",
(mapped["import_id"],)
)
existing = cur.fetchone()
equipment = _json.dumps(mapped.get("equipment", [])) if mapped.get("equipment") else None
if existing and reimport:
ex_id = existing['id']
cur.execute(
"""UPDATE exercises SET
title = %s, summary = %s, goal = %s, execution = %s,
preparation = %s, trainer_notes = %s,
duration_min = %s, duration_max = %s,
group_size_min = %s, group_size_max = %s,
equipment = %s, updated_at = NOW()
WHERE id = %s""",
(
mapped.get("title"), mapped.get("summary"), mapped.get("goal"),
mapped.get("execution"), mapped.get("preparation"), mapped.get("trainer_notes"),
mapped.get("duration_min"), mapped.get("duration_max"),
mapped.get("group_size_min"), mapped.get("group_size_max"),
equipment, ex_id
)
)
elif existing and not reimport:
return None # Übersprungen
else:
cur.execute(
"""INSERT INTO exercises
(title, summary, goal, execution, preparation, trainer_notes,
duration_min, duration_max, group_size_min, group_size_max,
equipment, visibility, status, import_source, import_id, created_by)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
RETURNING id""",
(
mapped.get("title"), mapped.get("summary"), mapped.get("goal"),
mapped.get("execution"), mapped.get("preparation"), mapped.get("trainer_notes"),
mapped.get("duration_min"), mapped.get("duration_max"),
mapped.get("group_size_min"), mapped.get("group_size_max"),
equipment,
mapped.get("visibility", "private"),
mapped.get("status", "draft"),
"mediawiki",
mapped.get("import_id"),
created_by,
)
)
ex_id = cur.fetchone()['id']
conn.commit()
# Katalog-Zuordnungen (Focus Areas, Skills, etc.)
_assign_exercise_catalogs(cur, conn, ex_id, mapped)
# Skill-Zuordnungen mit Levels
skill_assignments = build_skill_assignments(mapped)
_assign_exercise_skills(cur, conn, ex_id, skill_assignments)
return ex_id
def _find_skill_id_by_label(cur, label: str) -> Optional[int]:
"""Katalog-Skill anhand Wiki-Label; exakt, ohne Leerzeichen, zuletzt Teilstring."""
if not label or not str(label).strip():
return None
raw = str(label).strip()
cur.execute("SELECT id FROM skills WHERE TRIM(name) ILIKE %s", (raw,))
row = cur.fetchone()
if row:
return row["id"]
comp = "".join(raw.split())
if len(comp) >= 3:
cur.execute(
"SELECT id FROM skills WHERE regexp_replace(TRIM(name), E'\\s+', '', 'g') ILIKE %s",
(comp,),
)
row = cur.fetchone()
if row:
return row["id"]
cur.execute("SELECT id FROM skills WHERE name ILIKE %s LIMIT 1", (f"%{raw}%",))
row = cur.fetchone()
return row["id"] if row else None
def _assign_exercise_catalogs(cur, conn, exercise_id: int, mapped: dict):
"""Weist M:N Katalog-Zuordnungen für eine importierte Übung zu."""
# Focus Areas
for idx, name in enumerate(mapped.get("focus_area_names", [])):
cur.execute("SELECT id FROM focus_areas WHERE name ILIKE %s", (name,))
row = cur.fetchone()
if row:
cur.execute(
"""INSERT INTO exercise_focus_areas (exercise_id, focus_area_id, is_primary)
VALUES (%s, %s, %s)
ON CONFLICT (exercise_id, focus_area_id) DO NOTHING""",
(exercise_id, row['id'], idx == 0)
)
else:
logger.warning("Focus Area '%s' nicht im Katalog gefunden", name)
# Stilrichtungen
for idx, name in enumerate(mapped.get("style_names", [])):
cur.execute("SELECT id FROM style_directions WHERE name ILIKE %s", (name.strip(),))
row = cur.fetchone()
if row:
cur.execute(
"""INSERT INTO exercise_style_directions (exercise_id, style_direction_id, is_primary)
VALUES (%s, %s, %s) ON CONFLICT (exercise_id, style_direction_id) DO NOTHING""",
(exercise_id, row["id"], idx == 0),
)
else:
logger.warning("Stilrichtung '%s' nicht im Katalog gefunden", name)
# Target Groups
for name in mapped.get("target_group_names", []):
cur.execute("SELECT id FROM target_groups WHERE name ILIKE %s", (name,))
row = cur.fetchone()
if row:
cur.execute(
"""INSERT INTO exercise_target_groups (exercise_id, target_group_id)
VALUES (%s, %s) ON CONFLICT DO NOTHING""",
(exercise_id, row['id'])
)
# Fähigkeiten: nur über _assign_exercise_skills (Levels + is_primary), nicht doppelt hier
conn.commit()
def _assign_exercise_skills(cur, conn, exercise_id: int, skill_assignments: list):
"""Weist Skill-Zuordnungen mit Levels einer Übung zu."""
for assignment in skill_assignments:
sid = _find_skill_id_by_label(cur, assignment.get("skill_name") or "")
if not sid:
logger.warning("Skill '%s' nicht im Katalog gefunden", assignment.get("skill_name"))
continue
cur.execute(
"""INSERT INTO exercise_skills
(exercise_id, skill_id, target_level, required_level, intensity, is_primary)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (exercise_id, skill_id) DO UPDATE SET
target_level = EXCLUDED.target_level,
is_primary = EXCLUDED.is_primary""",
(
exercise_id,
sid,
assignment.get("target_level"),
assignment.get("required_level"),
assignment.get("intensity"),
assignment.get("is_primary", False),
)
)
conn.commit()
def _upsert_skill(mapped: dict, _reimport: bool) -> Optional[int]:
"""Legt Skill an oder aktualisiert bestehenden Datensatz bei Namenskonflikt (Wiki-Reimport)."""
with get_db() as conn:
cur = get_cursor(conn)
# Kategorie auflösen (optional)
category_id = None
if mapped.get("category_name"):
cur.execute("SELECT id FROM skill_categories WHERE name ILIKE %s", (mapped["category_name"],))
row = cur.fetchone()
if row:
category_id = row["id"]
cur.execute(
"""INSERT INTO skills (name, description, category_id, karate_relevance, relevance_level)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (name) DO UPDATE SET
description = COALESCE(EXCLUDED.description, skills.description),
category_id = COALESCE(EXCLUDED.category_id, skills.category_id),
karate_relevance = COALESCE(EXCLUDED.karate_relevance, skills.karate_relevance),
relevance_level = COALESCE(EXCLUDED.relevance_level, skills.relevance_level),
updated_at = NOW()
RETURNING id""",
(
mapped["name"],
mapped.get("description"),
category_id,
mapped.get("karate_relevance"),
mapped.get("relevance_level"),
),
)
skill_id = cur.fetchone()["id"]
conn.commit()
return skill_id
def _upsert_method(mapped: dict, _reimport: bool) -> Optional[int]:
"""Legt Trainingsmethode an oder aktualisiert Beschreibung nach Namen.
training_methods.name hat keinen UNIQUE-Constraint — daher kein ON CONFLICT (name).
"""
name = (mapped.get("name") or "").strip()
desc = mapped.get("description")
if not name:
return None
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT id FROM training_methods WHERE TRIM(name) ILIKE TRIM(%s)",
(name,),
)
row = cur.fetchone()
if row:
mid = row["id"]
cur.execute(
"UPDATE training_methods SET description = %s, updated_at = NOW() WHERE id = %s",
(desc, mid),
)
conn.commit()
return mid
cur.execute(
"INSERT INTO training_methods (name, description) VALUES (%s, %s) RETURNING id",
(name, desc),
)
mid = cur.fetchone()["id"]
conn.commit()
return mid
def _upsert_wiki_ref(
page_title: str, page_id: Optional[int], content_type: str, local_id: int
):
"""Legt Import-Referenz an oder aktualisiert last_imported."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""INSERT INTO wiki_import_references
(wiki_page_title, wiki_page_id, content_type, local_id, last_imported)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (wiki_page_title, content_type) DO UPDATE SET
wiki_page_id = EXCLUDED.wiki_page_id,
local_id = EXCLUDED.local_id,
last_imported = NOW()""",
(page_title, page_id, content_type, local_id)
)
conn.commit()