""" Vollständiger Export/Import: Fähigkeitskatalog (Haupt-/Unterkategorien, Skills, Level-Definitionen), Reifegradmodelle inkl. Kontext-M:N und Kontext-Bindings. Ziel: Test → Prod; IDs werden über Slugs/Namen aufgelöst, nicht 1:1 übernommen. """ from __future__ import annotations import json import re import uuid from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple from fastapi import APIRouter, Body, Depends, HTTPException from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse from auth import require_auth from db import get_db, get_cursor, r2d router = APIRouter(prefix="/api/admin/matrix-stack", tags=["admin_matrix_stack"]) KIND_V1 = "shinkan.matrix_stack.v1" def _require_admin(session: dict) -> None: role = session.get("role") if role not in ("admin", "superadmin"): raise HTTPException(403, "Nur Administratoren") def _keywords_param(raw: Any) -> Any: if raw is None: return None if isinstance(raw, str): return raw return json.dumps(raw) def _slugify_label(text: str) -> str: t = (text or "").strip().lower() t = re.sub(r"[^a-z0-9äöüß]+", "_", t, flags=re.IGNORECASE) t = re.sub(r"_+", "_", t).strip("_") return (t[:48] or "gruppe") def _jsonable(val: Any) -> Any: if val is None: return None if hasattr(val, "isoformat"): try: return val.isoformat() except Exception: return str(val) return val def _sort_categories_topo(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: by_id = {int(r["id"]): r for r in rows} children: Dict[Optional[int], List[int]] = {} for r in rows: pid = r.get("parent_category_id") pk = int(pid) if pid is not None else None children.setdefault(pk, []).append(int(r["id"])) out: List[Dict[str, Any]] = [] seen = set() def visit(cid: int) -> None: if cid in seen: return seen.add(cid) out.append(by_id[cid]) for ch in sorted(children.get(cid, [])): visit(ch) roots = sorted(children.get(None, [])) for root_id in roots: visit(root_id) # Zyklen / verwaiste Knoten for r in rows: rid = int(r["id"]) if rid not in seen: visit(rid) return out def _catalog_name_maps(cur) -> Tuple[Dict[str, int], Dict[str, int], Dict[str, int], Dict[str, int]]: cur.execute("SELECT id, name FROM focus_areas") fa = {r["name"].strip(): int(r["id"]) for r in cur.fetchall() if r.get("name")} cur.execute("SELECT id, name FROM style_directions") sd = {r["name"].strip(): int(r["id"]) for r in cur.fetchall() if r.get("name")} cur.execute("SELECT id, name FROM training_types") tt = {r["name"].strip(): int(r["id"]) for r in cur.fetchall() if r.get("name")} cur.execute("SELECT id, name FROM target_groups") tg = {r["name"].strip(): int(r["id"]) for r in cur.fetchall() if r.get("name")} return fa, sd, tt, tg def export_matrix_stack_v1(session: dict = Depends(require_auth)) -> JSONResponse: _require_admin(session) export_uid = str(uuid.uuid4()) with get_db() as conn: cur = get_cursor(conn) cur.execute( """ SELECT * FROM skill_main_categories ORDER BY sort_order NULLS LAST, name """ ) main_cats = [r2d(r) for r in cur.fetchall()] cur.execute("SELECT * FROM skill_categories ORDER BY sort_order NULLS LAST, name") raw_cats = [r2d(r) for r in cur.fetchall()] skill_categories = _sort_categories_topo(raw_cats) cur.execute( """ SELECT s.*, mc.slug AS _export_main_category_slug, sc.slug AS _export_category_slug, pfa.name AS _export_primary_focus_area_name FROM skills s LEFT JOIN skill_main_categories mc ON s.main_category_id = mc.id LEFT JOIN skill_categories sc ON s.category_id = sc.id LEFT JOIN focus_areas pfa ON s.primary_focus_area_id = pfa.id ORDER BY mc.sort_order NULLS LAST, sc.sort_order NULLS LAST, s.sort_order NULLS LAST, s.name """ ) skills = [r2d(r) for r in cur.fetchall()] cur.execute( """ SELECT sld.* FROM skill_level_definitions sld JOIN skills s ON s.id = sld.skill_id ORDER BY s.name, sld.level """ ) skill_level_definitions = [r2d(r) for r in cur.fetchall()] cur.execute("SELECT * FROM maturity_models ORDER BY id") maturity_models_raw = [r2d(r) for r in cur.fetchall()] maturity_blocks: List[Dict[str, Any]] = [] for m in maturity_models_raw: mid = int(m["id"]) cur.execute( """ SELECT fa.name, mfa.is_primary FROM maturity_model_focus_areas mfa JOIN focus_areas fa ON fa.id = mfa.focus_area_id WHERE mfa.maturity_model_id = %s ORDER BY mfa.is_primary DESC NULLS LAST, fa.sort_order, fa.name """, (mid,), ) fa_rows = [r2d(r) for r in cur.fetchall()] cur.execute( """ SELECT sd.name, msd.is_primary FROM maturity_model_style_directions msd JOIN style_directions sd ON sd.id = msd.style_direction_id WHERE msd.maturity_model_id = %s ORDER BY msd.is_primary DESC NULLS LAST, sd.name """, (mid,), ) sd_rows = [r2d(r) for r in cur.fetchall()] cur.execute( """ SELECT tg.name, mtg.is_primary FROM maturity_model_target_groups mtg JOIN target_groups tg ON tg.id = mtg.target_group_id WHERE mtg.maturity_model_id = %s ORDER BY mtg.is_primary DESC NULLS LAST, tg.name """, (mid,), ) tg_rows = [r2d(r) for r in cur.fetchall()] cur.execute( """ SELECT * FROM model_levels WHERE maturity_model_id = %s ORDER BY sort_order ASC, level_number ASC """, (mid,), ) levels = [r2d(r) for r in cur.fetchall()] cur.execute( """ SELECT ms.skill_id, ms.sort_order, ms.relevance FROM model_skills ms WHERE ms.maturity_model_id = %s ORDER BY ms.sort_order ASC, ms.id ASC """, (mid,), ) model_skills = [r2d(r) for r in cur.fetchall()] cur.execute( """ SELECT msl.skill_id, msl.level_number, msl.description, msl.observable_criteria, msl.example_exercise_hints, msl.ai_generated FROM model_skill_levels msl WHERE msl.maturity_model_id = %s ORDER BY msl.skill_id, msl.level_number """, (mid,), ) skill_levels = [r2d(r) for r in cur.fetchall()] row = {k: _jsonable(v) for k, v in m.items()} for drop in ("created_at", "updated_at"): row.pop(drop, None) maturity_blocks.append( { "source_id": mid, "model": row, "legacy_focus_areas": fa_rows, "legacy_style_directions": sd_rows, "legacy_target_groups": tg_rows, "levels": levels, "model_skills": model_skills, "skill_levels": skill_levels, } ) cur.execute( """ SELECT b.maturity_model_id AS maturity_model_source_id, fa.name AS focus_area_name, sd.name AS style_direction_name, tt.name AS training_type_name FROM maturity_model_context_bindings b JOIN focus_areas fa ON fa.id = b.focus_area_id LEFT JOIN style_directions sd ON sd.id = b.style_direction_id LEFT JOIN training_types tt ON tt.id = b.training_type_id ORDER BY fa.sort_order, fa.name, sd.name NULLS LAST, tt.name NULLS LAST """ ) context_bindings = [r2d(r) for r in cur.fetchall()] bundle = { "kind": KIND_V1, "export_version": 1, "bundle_export_id": export_uid, "exported_at": datetime.now(timezone.utc).isoformat(), "skill_main_categories": main_cats, "skill_categories": skill_categories, "skills": skills, "skill_level_definitions": skill_level_definitions, "maturity_models": maturity_blocks, "context_bindings": context_bindings, } return JSONResponse( content=jsonable_encoder(bundle), headers={ "Content-Disposition": f'attachment; filename="matrix-stack-{export_uid[:8]}.json"' }, ) def _upsert_main_category(cur, row: Dict[str, Any]) -> int: name = (row.get("name") or "").strip() slug = (row.get("slug") or "").strip() or _slugify_label(name) if not name: raise HTTPException(400, "skill_main_categories: name fehlt") cur.execute( """ INSERT INTO skill_main_categories (name, slug, description, sort_order) VALUES (%s, %s, %s, %s) ON CONFLICT (slug) DO UPDATE SET name = EXCLUDED.name, description = EXCLUDED.description, sort_order = EXCLUDED.sort_order, updated_at = NOW() RETURNING id """, (name, slug, row.get("description"), row.get("sort_order")), ) return int(cur.fetchone()["id"]) def _upsert_skill_category( cur, row: Dict[str, Any], main_id_map: Dict[int, int], cat_id_map: Dict[int, int], ) -> Tuple[int, int]: """Returns (old_id, new_id).""" old_id = int(row["id"]) name = (row.get("name") or "").strip() slug = (row.get("slug") or "").strip() or _slugify_label(name) if not name: raise HTTPException(400, f"skill_categories id={old_id}: name fehlt") old_main = row.get("main_category_id") new_main = main_id_map.get(int(old_main)) if old_main is not None else None old_parent = row.get("parent_category_id") new_parent = cat_id_map.get(int(old_parent)) if old_parent is not None else None cur.execute( """ INSERT INTO skill_categories ( name, slug, description, parent_category_id, main_category_id, sort_order, status ) VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (slug) DO UPDATE SET name = EXCLUDED.name, description = EXCLUDED.description, parent_category_id = EXCLUDED.parent_category_id, main_category_id = EXCLUDED.main_category_id, sort_order = EXCLUDED.sort_order, status = EXCLUDED.status, updated_at = NOW() RETURNING id """, ( name, slug, row.get("description"), new_parent, new_main, row.get("sort_order"), row.get("status") or "active", ), ) new_id = int(cur.fetchone()["id"]) return old_id, new_id def import_matrix_stack_v1( data: Dict[str, Any] = Body(...), session: dict = Depends(require_auth), ) -> Dict[str, Any]: _require_admin(session) if data.get("kind") != KIND_V1: raise HTTPException(400, f"kind muss {KIND_V1} sein") replace_all = bool(data.get("replace_all_maturity_models")) confirm = (data.get("confirm_replace_all") or "").strip() if replace_all and confirm != "DELETE_MATURITY_STACK": raise HTTPException( 400, 'replace_all_maturity_models erfordert confirm_replace_all: "DELETE_MATURITY_STACK"', ) if replace_all and session.get("role") != "superadmin": raise HTTPException(403, "replace_all_maturity_models nur für Superadmins") profile_id = session.get("profile_id") warnings: List[str] = [] main_rows = data.get("skill_main_categories") or [] cat_rows = _sort_categories_topo(list(data.get("skill_categories") or [])) skill_rows = data.get("skills") or [] sld_rows = data.get("skill_level_definitions") or [] model_blocks = data.get("maturity_models") or [] bind_rows = data.get("context_bindings") or [] skill_id_map: Dict[int, int] = {} model_id_map: Dict[int, int] = {} with get_db() as conn: cur = get_cursor(conn) fa_by_name, sd_by_name, tt_by_name, tg_by_name = _catalog_name_maps(cur) # ── Katalog: Hauptkategorien ── main_id_map: Dict[int, int] = {} for mc in main_rows: old = int(mc["id"]) main_id_map[old] = _upsert_main_category(cur, mc) # ── Katalog: Unterkategorien (mehrere Durchläufe für Parent-Kette) ── cat_id_map: Dict[int, int] = {} remaining = list(cat_rows) guard = 0 while remaining and guard < len(cat_rows) + 5: guard += 1 next_pass: List[Dict[str, Any]] = [] for row in remaining: old_parent = row.get("parent_category_id") if old_parent is not None and int(old_parent) not in cat_id_map: next_pass.append(row) continue old_id, new_id = _upsert_skill_category(cur, row, main_id_map, cat_id_map) cat_id_map[old_id] = new_id remaining = next_pass if remaining: raise HTTPException(400, "skill_categories: Parent-Auflösung fehlgeschlagen (Zyklus?)") # ── Skills ── slug_to_cat_id = {} cur.execute("SELECT id, slug FROM skill_categories WHERE slug IS NOT NULL") for r in cur.fetchall(): slug_to_cat_id[r["slug"]] = int(r["id"]) main_slug_to_id = {} cur.execute("SELECT id, slug FROM skill_main_categories") for r in cur.fetchall(): main_slug_to_id[r["slug"]] = int(r["id"]) for s in skill_rows: old_sid = int(s["id"]) name = (s.get("name") or "").strip() if not name: raise HTTPException(400, f"Skill id={old_sid}: name fehlt") cat_slug = s.get("_export_category_slug") main_slug = s.get("_export_main_category_slug") cat_id = None main_id = None if cat_slug: cat_id = slug_to_cat_id.get(cat_slug) if cat_id is None and s.get("category_id") is not None: cat_id = cat_id_map.get(int(s["category_id"])) if main_slug: main_id = main_slug_to_id.get(main_slug) if main_id is None and s.get("main_category_id") is not None: main_id = main_id_map.get(int(s["main_category_id"])) pfa_name = s.get("_export_primary_focus_area_name") pfa_id = None if pfa_name: pfa_id = fa_by_name.get(str(pfa_name).strip()) if pfa_id is None: warnings.append(f"primary_focus_area „{pfa_name}“ für Skill „{name}“ nicht gefunden") focus_json = s.get("focus_areas") if isinstance(focus_json, (dict, list)): focus_json = json.dumps(focus_json) elif focus_json is None: focus_json = "[]" cur.execute( """ SELECT id FROM skills WHERE category_id IS NOT DISTINCT FROM %s AND name = %s LIMIT 1 """, (cat_id, name), ) ex = cur.fetchone() if ex: new_sid = int(ex["id"]) cur.execute( """ UPDATE skills SET category = %s, description = %s, importance = %s, keywords = %s, status = COALESCE(%s, status), main_category_id = COALESCE(%s, main_category_id), category_id = COALESCE(%s, category_id), focus_areas = %s::jsonb, sort_order = COALESCE(%s, sort_order), primary_focus_area_id = COALESCE(%s, primary_focus_area_id), is_cross_domain = COALESCE(%s, is_cross_domain), level = COALESCE(%s, level), parent_skill_id = COALESCE(%s, parent_skill_id), updated_at = NOW() WHERE id = %s """, ( s.get("category"), s.get("description"), s.get("importance"), _keywords_param(s.get("keywords")), s.get("status"), main_id, cat_id, focus_json, s.get("sort_order"), pfa_id, s.get("is_cross_domain"), s.get("level"), None, new_sid, ), ) else: cur.execute( """ INSERT INTO skills ( name, category, description, importance, keywords, status, category_id, main_category_id, focus_areas, sort_order, primary_focus_area_id, is_cross_domain, level ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s::jsonb,%s,%s,%s,%s) RETURNING id """, ( name, s.get("category"), s.get("description"), s.get("importance"), _keywords_param(s.get("keywords")), s.get("status") or "active", cat_id, main_id, focus_json, s.get("sort_order"), pfa_id, s.get("is_cross_domain"), s.get("level"), ), ) new_sid = int(cur.fetchone()["id"]) skill_id_map[old_sid] = new_sid # parent_skill_id zweite Runde for s in skill_rows: old_sid = int(s["id"]) ps = s.get("parent_skill_id") if ps is None: continue new_sid = skill_id_map.get(old_sid) new_parent = skill_id_map.get(int(ps)) if new_sid and new_parent: cur.execute( "UPDATE skills SET parent_skill_id = %s WHERE id = %s", (new_parent, new_sid), ) # skill_level_definitions for sld in sld_rows: old_sk = int(sld["skill_id"]) new_sk = skill_id_map.get(old_sk) if not new_sk: warnings.append(f"skill_level_definitions: Skill {old_sk} nicht gemappt, übersprungen") continue lvl = int(sld["level"]) cur.execute( """ INSERT INTO skill_level_definitions (skill_id, level, description) VALUES (%s, %s, %s) ON CONFLICT (skill_id, level) DO UPDATE SET description = EXCLUDED.description, updated_at = NOW() """, (new_sk, lvl, sld.get("description") or ""), ) if replace_all: cur.execute("DELETE FROM maturity_models") for block in model_blocks: src_mid = int(block["source_id"]) mrow = dict(block.get("model") or {}) for drop in ("id", "created_at", "updated_at"): mrow.pop(drop, None) lc = int(mrow.get("level_count") or 5) if lc < 3 or lc > 10: raise HTTPException(400, "level_count ungültig") cur.execute( """ INSERT INTO maturity_models ( name, description, level_count, status, version, created_by, import_source, import_id, club_id ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s) RETURNING id """, ( (mrow.get("name") or "").strip() or "Importiertes Modell", mrow.get("description"), lc, mrow.get("status") or "draft", mrow.get("version") or "1.0", profile_id, mrow.get("import_source") or "matrix_stack_v1", mrow.get("import_id") or f"stack:{data.get('bundle_export_id') or 'na'}:{src_mid}", None, ), ) new_mid = int(cur.fetchone()["id"]) model_id_map[src_mid] = new_mid for fa in block.get("legacy_focus_areas") or []: nm = (fa.get("name") or "").strip() fid = fa_by_name.get(nm) if fid is None: warnings.append(f"Modell {mrow.get('name')}: Fokus „{nm}“ unbekannt") continue cur.execute( """ INSERT INTO maturity_model_focus_areas (maturity_model_id, focus_area_id, is_primary) VALUES (%s, %s, %s) ON CONFLICT (maturity_model_id, focus_area_id) DO NOTHING """, (new_mid, fid, bool(fa.get("is_primary"))), ) for sd in block.get("legacy_style_directions") or []: nm = (sd.get("name") or "").strip() sid = sd_by_name.get(nm) if sid is None: warnings.append(f"Modell {mrow.get('name')}: Stilrichtung „{nm}“ unbekannt") continue cur.execute( """ INSERT INTO maturity_model_style_directions (maturity_model_id, style_direction_id, is_primary) VALUES (%s, %s, %s) ON CONFLICT (maturity_model_id, style_direction_id) DO NOTHING """, (new_mid, sid, bool(sd.get("is_primary"))), ) for tg in block.get("legacy_target_groups") or []: nm = (tg.get("name") or "").strip() tid = tg_by_name.get(nm) if tid is None: warnings.append(f"Modell {mrow.get('name')}: Zielgruppe „{nm}“ unbekannt") continue cur.execute( """ INSERT INTO maturity_model_target_groups (maturity_model_id, target_group_id, is_primary) VALUES (%s, %s, %s) ON CONFLICT (maturity_model_id, target_group_id) DO NOTHING """, (new_mid, tid, bool(tg.get("is_primary"))), ) for lev in block.get("levels") or []: cur.execute( """ INSERT INTO model_levels (maturity_model_id, level_number, name, description, sort_order) VALUES (%s, %s, %s, %s, %s) """, ( new_mid, int(lev["level_number"]), (lev.get("name") or f"Stufe {lev['level_number']}").strip(), lev.get("description"), int(lev.get("sort_order") or lev["level_number"]), ), ) for ms in block.get("model_skills") or []: old_sk = int(ms["skill_id"]) new_sk = skill_id_map.get(old_sk) if not new_sk: raise HTTPException(400, f"Modell {mrow.get('name')}: unbekannte skill_id {old_sk}") cur.execute( """ INSERT INTO model_skills (maturity_model_id, skill_id, sort_order, relevance) VALUES (%s, %s, %s, %s) ON CONFLICT (maturity_model_id, skill_id) DO UPDATE SET sort_order = EXCLUDED.sort_order, relevance = EXCLUDED.relevance """, (new_mid, new_sk, int(ms.get("sort_order") or 0), ms.get("relevance")), ) for sl in block.get("skill_levels") or []: old_sk = int(sl["skill_id"]) new_sk = skill_id_map.get(old_sk) if not new_sk: continue ln = int(sl["level_number"]) desc = (sl.get("description") or "").strip() if not desc: continue cur.execute( """ INSERT INTO model_skill_levels ( maturity_model_id, skill_id, level_number, description, observable_criteria, example_exercise_hints, ai_generated ) VALUES (%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (maturity_model_id, skill_id, level_number) DO UPDATE SET description = EXCLUDED.description, observable_criteria = EXCLUDED.observable_criteria, example_exercise_hints = EXCLUDED.example_exercise_hints, ai_generated = EXCLUDED.ai_generated, updated_at = NOW() """, ( new_mid, new_sk, ln, desc, sl.get("observable_criteria"), sl.get("example_exercise_hints"), sl.get("ai_generated"), ), ) for b in bind_rows: src_m = int(b["maturity_model_source_id"]) new_m = model_id_map.get(src_m) if not new_m: warnings.append(f"Binding: Modell-Quell-ID {src_m} nicht gefunden, übersprungen") continue fa_n = (b.get("focus_area_name") or "").strip() fa_id = fa_by_name.get(fa_n) if fa_id is None: warnings.append(f"Binding: Fokus „{fa_n}“ unbekannt") continue sd_n = b.get("style_direction_name") sd_id = None if sd_n: sd_id = sd_by_name.get(str(sd_n).strip()) if sd_id is None: warnings.append(f"Binding: Stil „{sd_n}“ unbekannt") continue tt_n = b.get("training_type_name") tt_id = None if tt_n: tt_id = tt_by_name.get(str(tt_n).strip()) if tt_id is None: warnings.append(f"Binding: Trainingsstil „{tt_n}“ unbekannt") continue if sd_id is None and tt_id is None: cur.execute( """ DELETE FROM maturity_model_context_bindings WHERE focus_area_id = %s AND style_direction_id IS NULL AND training_type_id IS NULL """, (fa_id,), ) elif sd_id is not None and tt_id is None: cur.execute( """ DELETE FROM maturity_model_context_bindings WHERE focus_area_id = %s AND style_direction_id = %s AND training_type_id IS NULL """, (fa_id, sd_id), ) elif sd_id is None and tt_id is not None: cur.execute( """ DELETE FROM maturity_model_context_bindings WHERE focus_area_id = %s AND style_direction_id IS NULL AND training_type_id = %s """, (fa_id, tt_id), ) else: cur.execute( """ DELETE FROM maturity_model_context_bindings WHERE focus_area_id = %s AND style_direction_id = %s AND training_type_id = %s """, (fa_id, sd_id, tt_id), ) cur.execute( """ INSERT INTO maturity_model_context_bindings ( maturity_model_id, focus_area_id, style_direction_id, training_type_id ) VALUES (%s, %s, %s, %s) """, (new_m, fa_id, sd_id, tt_id), ) return { "ok": True, "skill_id_map": {str(k): v for k, v in skill_id_map.items()}, "model_id_map": {str(k): v for k, v in model_id_map.items()}, "warnings": warnings, } router.add_api_route("/export", export_matrix_stack_v1, methods=["GET"]) router.add_api_route("/import", import_matrix_stack_v1, methods=["POST"])