shinkan-jinkendo/backend/routers/import_wiki.py
Lars 949a77fe38
All checks were successful
Deploy Development / deploy (push) Successful in 43s
Test Suite / pytest-backend (push) Successful in 38s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 15s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m11s
Enhance skill model and import functionality with karate relevance and relevance level
- Added `karate_relevance` and `relevance_level` fields to the SkillCreate and SkillResponse models, allowing for more detailed skill attributes.
- Updated the SMW property mapping to include these new fields, facilitating their integration during data import.
- Implemented parsing logic for relevance levels from Wiki data, ensuring proper handling of values between 1 and 3.
- Modified the upsert and create skill functions to support the new fields, ensuring they are correctly stored and updated in the database.
- Incremented app version to 0.8.143 and updated changelog to reflect these changes.
2026-05-16 10:56:15 +02:00

705 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MediaWiki Import Router
Importiert Übungen, Fähigkeiten und Methoden aus Semantic MediaWiki
direkt via API (kein XML-Export). Nur Super-Admin darf importieren.
Wiki-URL: https://karatetrainer.net/api.php
"""
import asyncio
import logging
import os
from typing import Literal, Optional

from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, Query
from pydantic import BaseModel

from db import get_db, get_cursor, r2d
from auth import require_auth
from smw_client import SmwClient, SmwClientError
from smw_mapper import map_wiki_to_exercise, map_wiki_to_skill, map_wiki_to_method, build_skill_assignments
logger = logging.getLogger(__name__)

router = APIRouter(tags=["import"], prefix="/api")

# Wiki category names, overridable via environment variables.
# The defaults are the real category names used on karatetrainer.net.
CATEGORY_EXERCISES = os.environ.get("MEDIAWIKI_CATEGORY_EXERCISES", "Übungen")
CATEGORY_SKILLS = os.environ.get("MEDIAWIKI_CATEGORY_SKILLS", "Fähigkeitsbeschreibung")
CATEGORY_METHODS = os.environ.get("MEDIAWIKI_CATEGORY_METHODS", "Methodenbeschreibung")
CATEGORY_MODELS = os.environ.get("MEDIAWIKI_CATEGORY_MODELS", "Reifegradmodelle")
# ------------------------------------------------------------------ #
# Pydantic Models                                                    #
# ------------------------------------------------------------------ #
class ImportExecuteRequest(BaseModel):
    """Request body for ``POST /import/mediawiki/execute``.

    Fields:
        category: Wiki category whose member pages are imported.
        import_type: Content type to import; restricted to the three types the
            worker knows how to map and store, so a typo is rejected by
            validation instead of being imported as a training method.
        reimport_existing: Re-import pages that already have an import reference.
        dry_run: Map pages and count them without writing content.
        limit: Maximum number of category members to fetch; the worker uses 500
            when this is None (or 0).
    """

    category: str
    import_type: Literal["exercise", "skill", "method"] = "exercise"
    reimport_existing: bool = False
    dry_run: bool = False
    limit: Optional[int] = None
# ------------------------------------------------------------------ #
# Auth-Hilfsfunktion                                                 #
# ------------------------------------------------------------------ #
def require_admin(session: dict = Depends(require_auth)) -> dict:
    """FastAPI dependency restricting the import endpoints to administrators.

    Sessions whose ``role`` is "admin" or "superadmin" pass through unchanged.

    Raises:
        HTTPException: 403 for any other (or missing) role.
    """
    role = session.get("role")
    if role in ("admin", "superadmin"):
        return session
    raise HTTPException(status_code=403, detail="Nur Admins dürfen den Import ausführen")
# ------------------------------------------------------------------ #
# Preview Endpoint                                                   #
# ------------------------------------------------------------------ #
def _load_preview_refs(page_titles: list, import_type: str) -> dict:
    """Return existing import references for ``page_titles``, keyed by page title.

    Uses a single query and releases the DB connection before the caller starts
    its (slow) Wiki requests, instead of holding a pooled connection open
    across them.
    """
    if not page_titles:
        return {}
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT wiki_page_title, id, last_imported FROM wiki_import_references "
            "WHERE wiki_page_title = ANY(%s) AND content_type = %s",
            (list(page_titles), import_type),
        )
        rows = [r2d(row) for row in cur.fetchall()]
    return {row["wiki_page_title"]: row for row in rows}


async def _preview_mapping(client: SmwClient, import_type: str, page_title: str, page_id) -> tuple:
    """Fetch and map one page for the preview.

    Per-page problems never raise; they are reported in the returned error list.

    Returns:
        ``(mapped_fields, warnings, errors)``. Internal fields (wiki_page_id,
        import_source, import_id) are removed from ``mapped_fields``.
    """
    try:
        smw_props = await client.browse_subject(page_title)
    except SmwClientError as e:
        return {}, [], [f"Seite nicht abrufbar: {e}"]

    try:
        if import_type == "exercise":
            mapped = map_wiki_to_exercise(page_title, page_id, smw_props)
        elif import_type == "skill":
            mapped = map_wiki_to_skill(page_title, page_id, smw_props)
        else:
            mapped = map_wiki_to_method(page_title, page_id, smw_props)
    except Exception as e:
        # A mapper bug on one page must not turn the whole preview into a 500.
        logger.exception("Mapping-Fehler in der Vorschau für '%s'", page_title)
        return {}, [], [f"Mapping fehlgeschlagen: {e}"]

    warnings = mapped.pop("warnings", [])
    for key in ("wiki_page_id", "import_source", "import_id"):
        mapped.pop(key, None)

    errors = []
    # Required-field check applies to exercises only.
    if import_type == "exercise" and not mapped.get("goal") and not mapped.get("execution"):
        errors.append("Pflichtfelder 'Ziel' und 'Durchführung' fehlen")
    return mapped, warnings, errors


@router.get("/import/mediawiki/preview")
async def preview_import(
    category: str = Query(default=CATEGORY_EXERCISES),
    import_type: Literal["exercise", "skill", "method"] = Query(default="exercise"),
    limit: int = Query(default=10, ge=1, le=500),
    session: dict = Depends(require_admin),
):
    """Preview which pages of ``category`` would be imported, without saving.

    For each of at most ``limit`` pages this reports whether it was already
    imported (and when), the mapped fields, mapper warnings and per-page errors.

    Raises:
        HTTPException: 502 if the category members cannot be fetched.
    """
    client = SmwClient()
    try:
        members = await client.get_category_members(category, limit=limit)
    except SmwClientError as e:
        raise HTTPException(status_code=502, detail=f"Wiki-API nicht erreichbar: {e}") from e

    selected = members[:limit]
    existing_refs = _load_preview_refs([m["title"] for m in selected], import_type)

    preview = []
    for member in selected:
        page_title = member["title"]
        page_id = member.get("pageid")
        existing_ref = existing_refs.get(page_title)
        mapped_fields, warnings, errors = await _preview_mapping(
            client, import_type, page_title, page_id
        )
        last_imported = existing_ref.get("last_imported") if existing_ref else None
        preview.append({
            "wiki_page_title": page_title,
            "wiki_page_id": page_id,
            "already_imported": existing_ref is not None,
            "last_imported_at": last_imported.isoformat() if last_imported is not None else None,
            "mapped_fields": mapped_fields,
            "warnings": warnings,
            "errors": errors,
        })

    return {
        "category": category,
        "import_type": import_type,
        "total_found": len(members),
        "preview": preview,
    }
# ------------------------------------------------------------------ #
# Execute Import (async Background Task)                             #
# ------------------------------------------------------------------ #
@router.post("/import/mediawiki/execute", status_code=202)
async def execute_import(
    body: ImportExecuteRequest,
    background_tasks: BackgroundTasks,
    session: dict = Depends(require_admin),
):
    """Queue a MediaWiki import as a background task and return immediately.

    A ``wiki_import_log`` row with status 'running' is created first; its id is
    returned so the caller can poll ``GET /api/import/mediawiki/status/{log_id}``.
    """
    requester_id = session["profile_id"]
    log_values = (
        body.import_type,
        body.category,
        body.dry_run,
        body.reimport_existing,
        requester_id,
    )

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """INSERT INTO wiki_import_log
                   (import_type, import_status, category, dry_run, reimport, imported_by)
               VALUES (%s, 'running', %s, %s, %s, %s)
               RETURNING id""",
            log_values,
        )
        new_log_id = cur.fetchone()["id"]
        conn.commit()

    # The actual work runs after the response has been sent.
    background_tasks.add_task(
        _run_import,
        log_id=new_log_id,
        category=body.category,
        import_type=body.import_type,
        reimport=body.reimport_existing,
        dry_run=body.dry_run,
        limit=body.limit,
        imported_by=requester_id,
    )

    status_hint = f"Import gestartet. Status: GET /api/import/mediawiki/status/{new_log_id}"
    return {"log_id": new_log_id, "status": "running", "message": status_hint}
# ------------------------------------------------------------------ #
# Status & Logs                                                      #
# ------------------------------------------------------------------ #
@router.get("/import/mediawiki/status/{log_id}")
def get_import_status(
    log_id: int,
    session: dict = Depends(require_admin),
):
    """Return the full ``wiki_import_log`` row of a running or finished import.

    Raises:
        HTTPException: 404 if no log row with ``log_id`` exists.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM wiki_import_log WHERE id = %s", (log_id,))
        log_entry = r2d(cur.fetchone())
        if log_entry:
            return log_entry
        raise HTTPException(status_code=404, detail="Import-Log nicht gefunden")
@router.get("/import/mediawiki/logs")
def list_import_logs(
    session: dict = Depends(require_admin),
):
    """Return the 50 most recent import logs, newest first, without ``error_log``."""
    recent_logs_sql = """SELECT id, import_type, import_status, category, dry_run, reimport,
                               items_total, items_imported, items_skipped, items_failed,
                               imported_by, started_at, finished_at
                        FROM wiki_import_log
                        ORDER BY started_at DESC
                        LIMIT 50"""
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(recent_logs_sql)
        return list(map(r2d, cur.fetchall()))
@router.delete("/import/mediawiki/references/{ref_id}", status_code=204)
def delete_import_reference(
    ref_id: int,
    session: dict = Depends(require_admin),
):
    """Delete one import reference so the page is treated as new on the next import.

    Raises:
        HTTPException: 404 if no reference with ``ref_id`` exists.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("DELETE FROM wiki_import_references WHERE id = %s RETURNING id", (ref_id,))
        removed_row = cur.fetchone()
        conn.commit()
        if removed_row:
            return
        raise HTTPException(status_code=404, detail="Referenz nicht gefunden")
# ------------------------------------------------------------------ #
# Schema-Discovery (Debug-Hilfsmittel für Admins)                    #
# ------------------------------------------------------------------ #
@router.get("/import/mediawiki/discover/{page_title:path}")
async def discover_properties(
    page_title: str,
    session: dict = Depends(require_admin),
):
    """Return every SMW property of one Wiki page (debug aid).

    Useful for finding property names when adjusting ``smw_mapper.py``.

    Raises:
        HTTPException: 502 if the Wiki API call fails.
    """
    smw = SmwClient()
    try:
        properties = await smw.discover_properties(page_title)
    except SmwClientError as exc:
        raise HTTPException(status_code=502, detail=f"Wiki-API Fehler: {exc}")
    else:
        return {"page_title": page_title, "properties": properties}
# ------------------------------------------------------------------ #
# Background Import Worker                                           #
# ------------------------------------------------------------------ #
def _has_current_reference(page_title: str, import_type: str) -> bool:
    """Return True if the page was already imported and its local record still exists.

    If a reference exists but its local record was deleted (orphaned reference),
    the reference is removed so the page is imported again, and False is returned.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT local_id FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s",
            (page_title, import_type),
        )
        ref = cur.fetchone()
        if not ref:
            return False
        if import_type == "exercise":
            cur.execute("SELECT id FROM exercises WHERE id = %s", (ref["local_id"],))
        elif import_type == "skill":
            cur.execute("SELECT id FROM skills WHERE id = %s", (ref["local_id"],))
        else:
            cur.execute("SELECT id FROM training_methods WHERE id = %s", (ref["local_id"],))
        if cur.fetchone():
            return True
        logger.warning("Orphaned reference found for '%s', re-importing", page_title)
        cur.execute(
            "DELETE FROM wiki_import_references WHERE wiki_page_title = %s AND content_type = %s",
            (page_title, import_type),
        )
        conn.commit()
        return False


def _map_page(import_type: str, page_title: str, page_id: Optional[int], smw_props: dict) -> dict:
    """Map SMW properties with the mapper for ``import_type`` (default: method)."""
    if import_type == "exercise":
        return map_wiki_to_exercise(page_title, page_id, smw_props)
    if import_type == "skill":
        return map_wiki_to_skill(page_title, page_id, smw_props)
    return map_wiki_to_method(page_title, page_id, smw_props)


def _save_mapped(import_type: str, mapped: dict, reimport: bool, imported_by: int) -> Optional[int]:
    """Persist a mapped page; returns the local id, or None if it was skipped."""
    if import_type == "exercise":
        return _upsert_exercise(mapped, reimport, imported_by)
    if import_type == "skill":
        return _upsert_skill(mapped, reimport)
    return _upsert_method(mapped, reimport)


async def _run_import(
    log_id: int,
    category: str,
    import_type: str,
    reimport: bool,
    dry_run: bool,
    limit: Optional[int],
    imported_by: int,
):
    """Background task: import every page of a Wiki category.

    Progress (total/imported/skipped/failed) and per-page errors are written to
    ``wiki_import_log`` row ``log_id`` as the import proceeds. A failure on one
    page (fetch, mapping, missing required fields, save) is recorded and the
    import continues. Only an unexpected error outside per-page handling marks
    the whole import as 'failed'; errors collected up to that point are kept.
    """
    client = SmwClient()
    stats = {"total": 0, "imported": 0, "skipped": 0, "failed": 0}
    errors = []

    def record_failure(page_title: str, message: str) -> None:
        # Count the page as failed and persist the error list right away.
        errors.append({"item": page_title, "error": message})
        stats["failed"] += 1
        _update_log(log_id, **stats, error_log=errors)

    try:
        members = await client.get_category_members(category, limit=limit or 500)
        stats["total"] = len(members)
        _update_log(log_id, import_status="running", items_total=stats["total"])

        for member in members:
            page_title = member["title"]
            page_id = member.get("pageid")

            # Duplicate check: skip only if reference AND local record exist.
            if not reimport and _has_current_reference(page_title, import_type):
                stats["skipped"] += 1
                _update_log(log_id, **stats)
                continue

            try:
                smw_props = await _fetch_with_retry(client, page_title)
            except SmwClientError as e:
                record_failure(page_title, str(e))
                continue

            # A mapper error on a single page must not abort the whole import.
            try:
                mapped = _map_page(import_type, page_title, page_id, smw_props)
            except Exception as e:
                logger.exception("Fehler beim Mapping von '%s'", page_title)
                record_failure(page_title, f"Mapping fehlgeschlagen: {e}")
                continue

            if import_type == "exercise" and not (mapped.get("goal") or mapped.get("execution")):
                record_failure(page_title, "Pflichtfelder fehlen (Ziel + Durchführung)")
                continue

            if dry_run:
                stats["imported"] += 1
                _update_log(log_id, **stats)
                continue

            try:
                local_id = _save_mapped(import_type, mapped, reimport, imported_by)
                if local_id:
                    _upsert_wiki_ref(page_title, page_id, import_type, local_id)
                    stats["imported"] += 1
                else:
                    stats["skipped"] += 1
            except Exception as e:
                logger.exception("Fehler beim Speichern von '%s'", page_title)
                record_failure(page_title, str(e))
                continue
            # Keep the status endpoint current for saved pages too.
            _update_log(log_id, **stats)

        _update_log(
            log_id,
            import_status="completed",
            error_log=errors,
            finished=True,
            **stats,
        )
    except Exception as e:
        logger.exception("Import-Prozess fehlgeschlagen (log_id=%s)", log_id)
        _update_log(
            log_id,
            import_status="failed",
            error_log=errors + [{"item": "global", "error": str(e)}],
            finished=True,
            **stats,
        )
async def _fetch_with_retry(
    client: SmwClient, page_title: str, retries: int = 3, base_delay: float = 1.0
) -> dict:
    """Fetch the SMW properties of ``page_title``, retrying on SmwClientError.

    Args:
        client: Wiki client whose ``browse_subject`` is called.
        page_title: Wiki page to fetch.
        retries: Total number of attempts. Values below 1 are treated as 1, so the
            function always either returns a result or raises (previously a
            non-positive value silently returned None).
        base_delay: Seconds to wait before the second attempt; the wait doubles
            after each further failure (1 s, 2 s, ... with the default).

    Raises:
        SmwClientError: from the final attempt once all attempts have failed.
    """
    attempts = max(1, retries)
    for attempt in range(attempts):
        try:
            return await client.browse_subject(page_title)
        except SmwClientError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)
# ------------------------------------------------------------------ #
# DB-Helper                                                          #
# ------------------------------------------------------------------ #
def _update_log(
    log_id: int,
    import_status: Optional[str] = None,
    items_total: int = 0,
    total: int = 0,
    imported: int = 0,
    skipped: int = 0,
    failed: int = 0,
    error_log: Optional[list] = None,
    finished: bool = False,
):
    """Update status, counters and errors of one ``wiki_import_log`` row and commit.

    ``items_total`` and ``total`` are alternative names for the same counter;
    ``items_total`` is used when non-zero. The four item counters are always
    written. ``import_status`` is only written when truthy, ``error_log`` only
    when not None (JSON-encoded), and ``finished_at`` only when ``finished``.
    ``updated_at`` is always refreshed.
    """
    import json as _json

    assignments = []
    values = []
    if import_status:
        assignments.append("import_status = %s")
        values.append(import_status)

    assignments.extend((
        "items_total = %s",
        "items_imported = %s",
        "items_skipped = %s",
        "items_failed = %s",
    ))
    values.extend((items_total or total, imported, skipped, failed))

    if error_log is not None:
        assignments.append("error_log = %s")
        values.append(_json.dumps(error_log))
    if finished:
        assignments.append("finished_at = NOW()")
    assignments.append("updated_at = NOW()")
    values.append(log_id)

    # Column names come from the fixed list above, never from input.
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            f"UPDATE wiki_import_log SET {', '.join(assignments)} WHERE id = %s",
            values,
        )
        conn.commit()
def _upsert_exercise(mapped: dict, reimport: bool, created_by: int) -> Optional[int]:
    """Create an exercise from mapped Wiki data, or update it on re-import.

    The existing exercise is found by ``import_id`` with ``import_source =
    'mediawiki'``. If it exists and ``reimport`` is False, nothing is written
    and None is returned. Otherwise the row is updated (existing) or inserted
    (new; visibility defaults to 'private', status to 'draft'). The row is
    committed, and then the catalog and skill assignments are written.

    Returns:
        The exercise id, or None if the exercise was skipped.
    """
    import json as _json

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id FROM exercises WHERE import_id = %s AND import_source = 'mediawiki'",
            (mapped["import_id"],),
        )
        found = cur.fetchone()

        equipment_list = mapped.get("equipment")
        equipment_json = _json.dumps(equipment_list) if equipment_list else None

        if found and not reimport:
            return None  # skipped: already imported, no re-import requested

        # Content columns shared by UPDATE and INSERT, in the order both list them.
        content = (
            mapped.get("title"), mapped.get("summary"), mapped.get("goal"),
            mapped.get("execution"), mapped.get("preparation"), mapped.get("trainer_notes"),
            mapped.get("duration_min"), mapped.get("duration_max"),
            mapped.get("group_size_min"), mapped.get("group_size_max"),
            equipment_json,
        )

        if found:
            exercise_id = found["id"]
            cur.execute(
                """UPDATE exercises SET
                       title = %s, summary = %s, goal = %s, execution = %s,
                       preparation = %s, trainer_notes = %s,
                       duration_min = %s, duration_max = %s,
                       group_size_min = %s, group_size_max = %s,
                       equipment = %s, updated_at = NOW()
                   WHERE id = %s""",
                content + (exercise_id,),
            )
        else:
            cur.execute(
                """INSERT INTO exercises
                       (title, summary, goal, execution, preparation, trainer_notes,
                        duration_min, duration_max, group_size_min, group_size_max,
                        equipment, visibility, status, import_source, import_id, created_by)
                   VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                   RETURNING id""",
                content + (
                    mapped.get("visibility", "private"),
                    mapped.get("status", "draft"),
                    "mediawiki",
                    mapped.get("import_id"),
                    created_by,
                ),
            )
            exercise_id = cur.fetchone()["id"]
        conn.commit()

        # Catalog links (focus areas, styles, target groups), then skills with levels.
        _assign_exercise_catalogs(cur, conn, exercise_id, mapped)
        _assign_exercise_skills(cur, conn, exercise_id, build_skill_assignments(mapped))
        return exercise_id
def _find_skill_id_by_label(cur, label: str) -> Optional[int]:
    """Resolve a Wiki skill label to a catalog skill id.

    Matching strategies, in order (all case-insensitive):
      1. exact match on the trimmed name;
      2. match with all whitespace removed on both sides (only for labels with
         at least 3 non-whitespace characters);
      3. substring match, preferring the shortest matching name.

    LIKE wildcards (``%``, ``_``) and backslashes in the label are escaped so
    they match literally.

    Args:
        cur: DB cursor whose rows support ``row["id"]``.
        label: Skill label from the Wiki; may be empty or None.

    Returns:
        The skill id, or None if the label is blank or nothing matches.
    """
    if not label or not str(label).strip():
        return None
    raw = str(label).strip()

    def _escape_like(text: str) -> str:
        # Backslash is PostgreSQL's default LIKE escape character.
        return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    cur.execute(
        "SELECT id FROM skills WHERE TRIM(name) ILIKE %s ORDER BY id LIMIT 1",
        (_escape_like(raw),),
    )
    row = cur.fetchone()
    if row:
        return row["id"]

    compact = "".join(raw.split())
    if len(compact) >= 3:
        # POSIX class instead of E'\s+': in an E-string "\s" is not a valid escape
        # and collapses to "s", so the old pattern deleted letters "s", not whitespace.
        cur.execute(
            "SELECT id FROM skills "
            "WHERE regexp_replace(TRIM(name), '[[:space:]]+', '', 'g') ILIKE %s "
            "ORDER BY id LIMIT 1",
            (_escape_like(compact),),
        )
        row = cur.fetchone()
        if row:
            return row["id"]

    # Shortest name first, so the fuzzy fallback is deterministic and picks the closest match.
    cur.execute(
        "SELECT id FROM skills WHERE name ILIKE %s ORDER BY LENGTH(name), id LIMIT 1",
        (f"%{_escape_like(raw)}%",),
    )
    row = cur.fetchone()
    return row["id"] if row else None
def _assign_exercise_catalogs(cur, conn, exercise_id: int, mapped: dict):
    """Link an imported exercise to focus areas, style directions and target groups.

    Names come from ``mapped["focus_area_names"]``, ``mapped["style_names"]`` and
    ``mapped["target_group_names"]`` (missing or None means none). Each name is
    stripped; blank or non-string entries are ignored. Catalog rows are looked
    up case-insensitively (ILIKE). The first usable focus area / style direction
    is marked primary. Existing links are kept (ON CONFLICT DO NOTHING). Names
    without a catalog entry are logged and skipped. Skills are handled only by
    ``_assign_exercise_skills``. Commits once at the end.
    """

    def _clean_names(names) -> list:
        # Strip whitespace and drop blanks/non-strings so lookups are consistent
        # across catalogs and is_primary lands on the first usable name.
        return [n.strip() for n in (names or []) if isinstance(n, str) and n.strip()]

    # Focus areas
    for idx, name in enumerate(_clean_names(mapped.get("focus_area_names"))):
        cur.execute("SELECT id FROM focus_areas WHERE name ILIKE %s", (name,))
        row = cur.fetchone()
        if not row:
            logger.warning("Focus Area '%s' nicht im Katalog gefunden", name)
            continue
        cur.execute(
            """INSERT INTO exercise_focus_areas (exercise_id, focus_area_id, is_primary)
               VALUES (%s, %s, %s)
               ON CONFLICT (exercise_id, focus_area_id) DO NOTHING""",
            (exercise_id, row["id"], idx == 0),
        )

    # Style directions
    for idx, name in enumerate(_clean_names(mapped.get("style_names"))):
        cur.execute("SELECT id FROM style_directions WHERE name ILIKE %s", (name,))
        row = cur.fetchone()
        if not row:
            logger.warning("Stilrichtung '%s' nicht im Katalog gefunden", name)
            continue
        cur.execute(
            """INSERT INTO exercise_style_directions (exercise_id, style_direction_id, is_primary)
               VALUES (%s, %s, %s) ON CONFLICT (exercise_id, style_direction_id) DO NOTHING""",
            (exercise_id, row["id"], idx == 0),
        )

    # Target groups
    for name in _clean_names(mapped.get("target_group_names")):
        cur.execute("SELECT id FROM target_groups WHERE name ILIKE %s", (name,))
        row = cur.fetchone()
        if not row:
            logger.warning("Zielgruppe '%s' nicht im Katalog gefunden", name)
            continue
        cur.execute(
            """INSERT INTO exercise_target_groups (exercise_id, target_group_id)
               VALUES (%s, %s) ON CONFLICT DO NOTHING""",
            (exercise_id, row["id"]),
        )

    # Skills: only via _assign_exercise_skills (levels + is_primary), not duplicated here.
    conn.commit()
def _assign_exercise_skills(cur, conn, exercise_id: int, skill_assignments: list):
    """Write skill assignments (with levels) for an exercise, then commit.

    Each assignment's ``skill_name`` is resolved with ``_find_skill_id_by_label``;
    unresolved names are logged and skipped. On an existing (exercise, skill)
    pair, ``target_level`` and ``is_primary`` are overwritten, while
    ``required_level`` and ``intensity`` are refreshed only when the new value
    is not NULL. Previously these two were never updated on re-import.
    ``skill_assignments`` may be empty or None.
    """
    for assignment in skill_assignments or []:
        skill_name = assignment.get("skill_name")
        skill_id = _find_skill_id_by_label(cur, skill_name or "")
        if not skill_id:
            logger.warning("Skill '%s' nicht im Katalog gefunden", skill_name)
            continue
        cur.execute(
            """INSERT INTO exercise_skills
                   (exercise_id, skill_id, target_level, required_level, intensity, is_primary)
               VALUES (%s, %s, %s, %s, %s, %s)
               ON CONFLICT (exercise_id, skill_id) DO UPDATE SET
                   target_level = EXCLUDED.target_level,
                   required_level = COALESCE(EXCLUDED.required_level, exercise_skills.required_level),
                   intensity = COALESCE(EXCLUDED.intensity, exercise_skills.intensity),
                   is_primary = EXCLUDED.is_primary""",
            (
                exercise_id,
                skill_id,
                assignment.get("target_level"),
                assignment.get("required_level"),
                assignment.get("intensity"),
                assignment.get("is_primary", False),
            ),
        )
    conn.commit()
def _upsert_skill(mapped: dict, _reimport: bool) -> Optional[int]:
    """Insert a skill, or merge into the existing skill with the same name.

    The category is resolved case-insensitively from ``mapped["category_name"]``
    (optional). On a name conflict, description, category, karate_relevance and
    relevance_level are only overwritten by non-NULL new values (COALESCE).
    ``_reimport`` is accepted for signature parity and is not used.

    Returns:
        The skill id, or None (nothing written) if the mapped name is missing or
        blank. This matches ``_upsert_method`` and avoids a KeyError or a skill
        named "".
    """
    name = (mapped.get("name") or "").strip()
    if not name:
        return None

    with get_db() as conn:
        cur = get_cursor(conn)
        category_id = None
        if mapped.get("category_name"):
            cur.execute("SELECT id FROM skill_categories WHERE name ILIKE %s", (mapped["category_name"],))
            row = cur.fetchone()
            if row:
                category_id = row["id"]
        cur.execute(
            """INSERT INTO skills (name, description, category_id, karate_relevance, relevance_level)
               VALUES (%s, %s, %s, %s, %s)
               ON CONFLICT (name) DO UPDATE SET
                   description = COALESCE(EXCLUDED.description, skills.description),
                   category_id = COALESCE(EXCLUDED.category_id, skills.category_id),
                   karate_relevance = COALESCE(EXCLUDED.karate_relevance, skills.karate_relevance),
                   relevance_level = COALESCE(EXCLUDED.relevance_level, skills.relevance_level),
                   updated_at = NOW()
               RETURNING id""",
            (
                name,
                mapped.get("description"),
                category_id,
                mapped.get("karate_relevance"),
                mapped.get("relevance_level"),
            ),
        )
        skill_id = cur.fetchone()["id"]
        conn.commit()
    return skill_id
def _upsert_method(mapped: dict, _reimport: bool) -> Optional[int]:
    """Insert a training method, or update the description of an existing one.

    ``training_methods.name`` has no UNIQUE constraint, so ``ON CONFLICT (name)``
    cannot be used. Instead the existing row is looked up by case-insensitive
    name, with LIKE wildcards escaped; if several rows match, the lowest id wins.
    A None description keeps the stored one (COALESCE), consistent with
    ``_upsert_skill``. ``_reimport`` is accepted for signature parity and is
    not used.

    Returns:
        The method id, or None if the mapped name is missing or blank.
    """
    name = (mapped.get("name") or "").strip()
    desc = mapped.get("description")
    if not name:
        return None

    # Escape LIKE wildcards so e.g. "Zirkel_Training" cannot match "Zirkel-Training".
    pattern = name.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "SELECT id FROM training_methods WHERE TRIM(name) ILIKE %s ORDER BY id LIMIT 1",
            (pattern,),
        )
        row = cur.fetchone()
        if row:
            method_id = row["id"]
            cur.execute(
                "UPDATE training_methods SET description = COALESCE(%s, description), "
                "updated_at = NOW() WHERE id = %s",
                (desc, method_id),
            )
        else:
            cur.execute(
                "INSERT INTO training_methods (name, description) VALUES (%s, %s) RETURNING id",
                (name, desc),
            )
            method_id = cur.fetchone()["id"]
        conn.commit()
    return method_id
def _upsert_wiki_ref(
    page_title: str, page_id: Optional[int], content_type: str, local_id: int
):
    """Create or refresh the import reference for (page_title, content_type).

    On conflict, the stored Wiki page id and local id are overwritten and
    ``last_imported`` is set to NOW(). Commits immediately.
    """
    upsert_sql = """INSERT INTO wiki_import_references
                        (wiki_page_title, wiki_page_id, content_type, local_id, last_imported)
                    VALUES (%s, %s, %s, %s, NOW())
                    ON CONFLICT (wiki_page_title, content_type) DO UPDATE SET
                        wiki_page_id = EXCLUDED.wiki_page_id,
                        local_id = EXCLUDED.local_id,
                        last_imported = NOW()"""
    ref_values = (page_title, page_id, content_type, local_id)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(upsert_sql, ref_values)
        conn.commit()