""" Fähigkeiten-Profile und Vorschläge (Phase 3) für Planungsartefakte. GET …/skill-profile — gewichtetes Profil aus verknüpften Übungen. GET /api/skill-discovery/suggestions — Rahmenprogramme, Module, Progressionsgraphen nach Fähigkeiten. """ from typing import Any, Dict, List, Optional from fastapi import APIRouter, Depends, HTTPException, Query from db import get_db, get_cursor, r2d from tenant_context import TenantContext, get_tenant_context, library_content_visibility_sql from skill_scoring import ( GRAPH_DEFAULT_ITEM_MINUTES, ExerciseOccurrence, batch_compute_profiles, batch_framework_occurrences_by_id, batch_module_occurrences_by_id, collect_module_exercise_occurrences, collect_progression_graph_exercise_occurrences, collect_unit_exercise_occurrences, compact_profile_summary, compute_planning_corpus_by_type, compute_club_corpus_reference, compute_corpus_skill_max_weights, compute_skill_profile, corpus_for_artifact_type, fetch_exercise_skills_bulk, match_score_for_skill_ids, profile_for_occurrences, reference_scale_meta, top_categories_summary, ) from routers.training_framework_programs import _framework_access from routers.training_modules import _module_access from routers.exercise_progression_graphs import _require_graph_read router = APIRouter(prefix="/api", tags=["skill_profiles"]) def _parse_skill_ids_param(raw: Optional[str]) -> List[int]: if not raw or not str(raw).strip(): return [] out: List[int] = [] for part in str(raw).split(","): part = part.strip() if not part: continue try: n = int(part) except ValueError: raise HTTPException(status_code=400, detail="skill_ids: ungültige ID") from None if n > 0 and n not in out: out.append(n) return out @router.get("/training-framework-programs/{framework_id}/skill-profile") def framework_program_skill_profile( framework_id: int, tenant: TenantContext = Depends(get_tenant_context), ): profile_id = tenant.profile_id role = tenant.global_role with get_db() as conn: cur = get_cursor(conn) row = _framework_access(cur, framework_id, profile_id, role) cur.execute( """ SELECT s.id, s.sort_order, s.title, tu.id AS blueprint_unit_id FROM training_framework_slots s LEFT JOIN training_units tu ON tu.framework_slot_id = s.id WHERE s.framework_program_id = %s ORDER BY s.sort_order """, (framework_id,), ) slots_raw = [r2d(r) for r in cur.fetchall()] bundle = _load_planning_corpus(cur, tenant) tc = corpus_for_artifact_type(bundle, "framework_program") ref_max = tc["max_by_skill"] ref_by_skill = tc["ref_by_skill"] all_occurrences: List[ExerciseOccurrence] = [] slot_profiles: List[Dict[str, Any]] = [] for slot in slots_raw: uid = slot.get("blueprint_unit_id") slot_occ: List[ExerciseOccurrence] = [] slot_label = (slot.get("title") or "").strip() or f"Session {(slot.get('sort_order') or 0) + 1}" if uid: raw_occ = collect_unit_exercise_occurrences(cur, int(uid)) slot_occ = [ ExerciseOccurrence( exercise_id=o.exercise_id, planned_duration_min=o.planned_duration_min, context_label=slot_label, ) for o in raw_occ ] all_occurrences.extend(slot_occ) else: slot_occ = [] slot_profile = ( profile_for_occurrences(cur, slot_occ, reference_max_by_skill=ref_max) if slot_occ else _empty_profile() ) slot_profiles.append( { "slot_id": slot["id"], "slot_title": slot.get("title"), "sort_order": slot.get("sort_order"), "blueprint_training_unit_id": uid, "exercise_occurrence_count": len(slot_occ), "profile": slot_profile, } ) overall = ( profile_for_occurrences(cur, all_occurrences, reference_max_by_skill=ref_max) if all_occurrences else _empty_profile() ) _enrich_profile_club_best(overall, ref_by_skill, "framework_program", framework_id) for slot in slot_profiles: _enrich_profile_club_best( slot.get("profile") or {}, ref_by_skill, "framework_program", framework_id, ) return { "artifact_type": "framework_program", "artifact_id": framework_id, "artifact_title": row.get("title"), "reference_scale": reference_scale_meta( tc, "framework_program", effective_club_id=tenant.effective_club_id ), "club_best_by_skill": { str(k): v for k, v in ref_by_skill.items() }, "overall": overall, "slots": slot_profiles, } @router.get("/training-modules/{module_id}/skill-profile") def training_module_skill_profile( module_id: int, tenant: TenantContext = Depends(get_tenant_context), ): profile_id = tenant.profile_id role = tenant.global_role with get_db() as conn: cur = get_cursor(conn) row = _module_access(cur, module_id, profile_id, role) bundle = _load_planning_corpus(cur, tenant) tc = corpus_for_artifact_type(bundle, "training_module") ref_max = tc["max_by_skill"] ref_by_skill = tc["ref_by_skill"] occurrences = collect_module_exercise_occurrences(cur, module_id) overall = ( profile_for_occurrences(cur, occurrences, reference_max_by_skill=ref_max) if occurrences else _empty_profile() ) _enrich_profile_club_best(overall, ref_by_skill, "training_module", module_id) return { "artifact_type": "training_module", "artifact_id": module_id, "artifact_title": row.get("title"), "reference_scale": reference_scale_meta( tc, "training_module", effective_club_id=tenant.effective_club_id ), "club_best_by_skill": { str(k): v for k, v in ref_by_skill.items() }, "overall": overall, } @router.get("/exercise-progression-graphs/{graph_id}/skill-profile") def progression_graph_skill_profile( graph_id: int, tenant: TenantContext = Depends(get_tenant_context), ): profile_id = tenant.profile_id role = tenant.global_role with get_db() as conn: cur = get_cursor(conn) row = _require_graph_read(cur, graph_id, profile_id, role) bundle = _load_planning_corpus(cur, tenant) tc = corpus_for_artifact_type(bundle, "progression_graph") ref_max = tc["max_by_skill"] ref_by_skill = tc["ref_by_skill"] occurrences = collect_progression_graph_exercise_occurrences(cur, graph_id) overall = ( profile_for_occurrences( cur, occurrences, default_item_minutes=GRAPH_DEFAULT_ITEM_MINUTES, reference_max_by_skill=ref_max, ) if occurrences else _empty_profile() ) _enrich_profile_club_best(overall, ref_by_skill, "progression_graph", graph_id) return { "artifact_type": "progression_graph", "artifact_id": graph_id, "artifact_title": row.get("name"), "reference_scale": reference_scale_meta( tc, "progression_graph", effective_club_id=tenant.effective_club_id ), "club_best_by_skill": { str(k): v for k, v in ref_by_skill.items() }, "overall": overall, } @router.post("/skill-profiles/batch-summaries") def batch_skill_profile_summaries( data: dict, tenant: TenantContext = Depends(get_tenant_context), ): """ Kompakte Fähigkeiten-Profile für Listen (ein Corpus-Scan, Batch-SQL). Body: { framework_program_ids?: number[], training_module_ids?: number[] } """ fp_ids = _parse_id_list(data.get("framework_program_ids")) mod_ids = _parse_id_list(data.get("training_module_ids")) if not fp_ids and not mod_ids: raise HTTPException( status_code=400, detail="framework_program_ids oder training_module_ids erforderlich", ) profile_id = tenant.profile_id role = tenant.global_role summaries: Dict[str, Dict[str, Any]] = {} with get_db() as conn: cur = get_cursor(conn) bundle = compute_planning_corpus_by_type( cur, profile_id=tenant.profile_id, role=role, effective_club_id=tenant.effective_club_id, include_artifact_summaries=True, ) allowed_fp: List[int] = [] if fp_ids: for fid in fp_ids: try: _framework_access(cur, fid, profile_id, role) allowed_fp.append(fid) except HTTPException: pass allowed_mod: List[int] = [] if mod_ids: for mid in mod_ids: try: _module_access(cur, mid, profile_id, role) allowed_mod.append(mid) except HTTPException: pass summaries = _merge_batch_summaries( cur, bundle=bundle, allowed_fp=allowed_fp, allowed_mod=allowed_mod, ) ref_by_skill = {} for t in ("framework_program", "training_module", "progression_graph"): ref_by_skill.update(corpus_for_artifact_type(bundle, t).get("ref_by_skill") or {}) skill_ids_seen: set[int] = set() for summary in summaries.values(): for sk in summary.get("skills") or []: if sk.get("skill_id") is not None: skill_ids_seen.add(int(sk["skill_id"])) club_best_subset = { str(sid): ref_by_skill[sid] for sid in skill_ids_seen if sid in ref_by_skill } return { "reference_scale_by_type": { t: reference_scale_meta( corpus_for_artifact_type(bundle, t), t, effective_club_id=tenant.effective_club_id, ) for t in ("framework_program", "training_module", "progression_graph") }, "club_best_by_skill": club_best_subset, "summaries": summaries, } @router.get("/skill-discovery/suggestions") def skill_discovery_suggestions( skill_ids: str = Query(..., description="Komma-getrennte skill-IDs"), types: Optional[str] = Query( default="framework_program,training_module,progression_graph", description="Artefakttypen, komma-getrennt", ), limit: int = Query(default=20, ge=1, le=50), tenant: TenantContext = Depends(get_tenant_context), ): """ Findet Bibliotheksartefakte, deren Übungs-Fähigkeiten-Profil die gewünschten Fähigkeiten stark abdeckt. """ wanted = _parse_skill_ids_param(skill_ids) if not wanted: raise HTTPException(status_code=400, detail="skill_ids ist Pflicht (mindestens eine ID)") type_set = {t.strip() for t in (types or "").split(",") if t.strip()} profile_id = tenant.profile_id role = tenant.global_role results: List[Dict[str, Any]] = [] with get_db() as conn: cur = get_cursor(conn) planning_bundle = _load_planning_corpus(cur, tenant) fw_ref = corpus_for_artifact_type(planning_bundle, "framework_program")["max_by_skill"] mod_ref = corpus_for_artifact_type(planning_bundle, "training_module")["max_by_skill"] graph_ref = corpus_for_artifact_type(planning_bundle, "progression_graph")["max_by_skill"] if "framework_program" in type_set: vis_clause, vis_params = library_content_visibility_sql( alias="fp", profile_id=profile_id, role=role, effective_club_id=tenant.effective_club_id, ) cur.execute( f""" SELECT fp.id, fp.title FROM training_framework_programs fp WHERE ({vis_clause}) ORDER BY fp.updated_at DESC NULLS LAST LIMIT 80 """, vis_params, ) for fp_row in cur.fetchall(): fid = int(fp_row["id"]) try: _framework_access(cur, fid, profile_id, role) except HTTPException: continue cur.execute( """ SELECT tu.id FROM training_framework_slots s INNER JOIN training_units tu ON tu.framework_slot_id = s.id WHERE s.framework_program_id = %s """, (fid,), ) occ: List[ExerciseOccurrence] = [] for u in cur.fetchall(): occ.extend(collect_unit_exercise_occurrences(cur, int(u["id"]))) if not occ: continue prof = profile_for_occurrences(cur, occ, reference_max_by_skill=fw_ref) match = match_score_for_skill_ids(prof, wanted) if match["match_weight"] <= 0: continue results.append( { "artifact_type": "framework_program", "artifact_id": fid, "artifact_title": fp_row["title"], "path": f"/planning/framework-programs/{fid}", "match": match, "skill_profile_summary": { "total_score": prof.get("total_score"), "top_by_category": top_categories_summary(prof), }, } ) if "training_module" in type_set: vis_clause, vis_params = library_content_visibility_sql( alias="m", profile_id=profile_id, role=role, effective_club_id=tenant.effective_club_id, ) cur.execute( f""" SELECT m.id, m.title FROM training_modules m WHERE ({vis_clause}) ORDER BY m.updated_at DESC NULLS LAST LIMIT 80 """, vis_params, ) for m_row in cur.fetchall(): mid = int(m_row["id"]) try: _module_access(cur, mid, profile_id, role) except HTTPException: continue occ = collect_module_exercise_occurrences(cur, mid) if not occ: continue prof = profile_for_occurrences(cur, occ, reference_max_by_skill=mod_ref) match = match_score_for_skill_ids(prof, wanted) if match["match_weight"] <= 0: continue results.append( { "artifact_type": "training_module", "artifact_id": mid, "artifact_title": m_row["title"], "path": f"/planning/training-modules/{mid}", "match": match, "skill_profile_summary": { "total_score": prof.get("total_score"), "top_by_category": top_categories_summary(prof), }, } ) if "progression_graph" in type_set: vis_clause, vis_params = library_content_visibility_sql( alias="g", profile_id=profile_id, role=role, effective_club_id=tenant.effective_club_id, ) cur.execute( f""" SELECT g.id, g.name FROM exercise_progression_graphs g WHERE ({vis_clause}) ORDER BY g.updated_at DESC NULLS LAST LIMIT 80 """, vis_params, ) for g_row in cur.fetchall(): gid = int(g_row["id"]) try: _require_graph_read(cur, gid, profile_id, role) except HTTPException: continue occ = collect_progression_graph_exercise_occurrences(cur, gid) if not occ: continue prof = profile_for_occurrences( cur, occ, default_item_minutes=GRAPH_DEFAULT_ITEM_MINUTES, reference_max_by_skill=graph_ref, ) match = match_score_for_skill_ids(prof, wanted) if match["match_weight"] <= 0: continue results.append( { "artifact_type": "progression_graph", "artifact_id": gid, "artifact_title": g_row["name"], "path": None, "match": match, "skill_profile_summary": { "total_score": prof.get("total_score"), "top_by_category": top_categories_summary(prof), }, } ) results.sort( key=lambda x: -float(x.get("match", {}).get("match_score") or x.get("match", {}).get("match_weight") or 0), ) return { "skill_ids": wanted, "types": sorted(type_set), "suggestions": results[:limit], } def _top_categories_summary(profile: Dict[str, Any], limit: int = 6) -> List[Dict[str, Any]]: """Kurzliste Top-Fähigkeit je Unterkategorie für Discovery-Treffer.""" out: List[Dict[str, Any]] = [] for mc in profile.get("by_main_category") or []: for cat in mc.get("categories") or []: top = cat.get("top_skill") if not top: continue out.append( { "main_category_name": mc.get("main_category_name"), "category_name": cat.get("category_name"), "skill_id": top.get("skill_id"), "skill_name": top.get("skill_name"), "score": top.get("score") or top.get("weight"), } ) if len(out) >= limit: return out return out def _parse_id_list(raw: Any, *, max_count: int = 120) -> List[int]: if not raw: return [] if not isinstance(raw, list): raise HTTPException(status_code=400, detail="ID-Listen müssen Arrays sein") out: List[int] = [] for item in raw: try: n = int(item) except (TypeError, ValueError): raise HTTPException(status_code=400, detail="Ungültige ID in Liste") from None if n > 0 and n not in out: out.append(n) if len(out) >= max_count: break return out def _load_planning_corpus(cur, tenant: TenantContext) -> Dict[str, Any]: return compute_planning_corpus_by_type( cur, profile_id=tenant.profile_id, role=tenant.global_role, effective_club_id=tenant.effective_club_id, ) def _enrich_profile_club_best( profile: Dict[str, Any], ref_by_skill: Dict[int, Dict[str, Any]], artifact_type: Optional[str] = None, artifact_id: Optional[int] = None, ) -> None: """Hängt Vereins-Referenz-Artefakt an Fähigkeiten an (wenn nicht selbst Spitze).""" if not profile or not ref_by_skill: return def attach(sk: Optional[Dict[str, Any]]) -> None: if not sk or sk.get("skill_id") is None: return sid = int(sk["skill_id"]) ref = ref_by_skill.get(sid) if not ref: return if ( artifact_type and artifact_id is not None and ref.get("artifact_type") == artifact_type and int(ref.get("artifact_id") or 0) == int(artifact_id) ): return w = float(sk.get("weight") or 0) if w < float(ref.get("weight") or 0) - 0.01: sk["club_best"] = ref for sk in profile.get("skills") or []: attach(sk) for mc in profile.get("by_main_category") or []: for cat in mc.get("categories") or []: attach(cat.get("top_skill")) def _empty_profile() -> Dict[str, Any]: return compute_skill_profile([], {}) def _summarize_framework_program( cur, framework_id: int, ref_max: Dict[int, float], ref_by_skill: Dict[int, Dict[str, Any]], ) -> Dict[str, Any]: cur.execute( """ SELECT tu.id FROM training_framework_slots s INNER JOIN training_units tu ON tu.framework_slot_id = s.id WHERE s.framework_program_id = %s """, (int(framework_id),), ) occ: List[ExerciseOccurrence] = [] for u in cur.fetchall(): occ.extend(collect_unit_exercise_occurrences(cur, int(u["id"]))) prof = ( profile_for_occurrences(cur, occ, reference_max_by_skill=ref_max) if occ else _empty_profile() ) _enrich_profile_club_best(prof, ref_by_skill, "framework_program", int(framework_id)) return compact_profile_summary(prof, ref_by_skill) def _summarize_training_module( cur, module_id: int, ref_max: Dict[int, float], ref_by_skill: Dict[int, Dict[str, Any]], ) -> Dict[str, Any]: occ = collect_module_exercise_occurrences(cur, int(module_id)) prof = ( profile_for_occurrences(cur, occ, reference_max_by_skill=ref_max) if occ else _empty_profile() ) _enrich_profile_club_best(prof, ref_by_skill, "training_module", int(module_id)) return compact_profile_summary(prof, ref_by_skill) def _merge_batch_summaries( cur, *, bundle: Dict[str, Any], allowed_fp: List[int], allowed_mod: List[int], ) -> Dict[str, Dict[str, Any]]: """Summaries für angeforderte IDs — Referenz je Planungs-Kontext (Typ getrennt).""" fw_tc = corpus_for_artifact_type(bundle, "framework_program") mod_tc = corpus_for_artifact_type(bundle, "training_module") fw_cached = fw_tc.get("artifact_summaries") or {} mod_cached = mod_tc.get("artifact_summaries") or {} out: Dict[str, Dict[str, Any]] = {} fw_ref_max = fw_tc["max_by_skill"] fw_ref_by = fw_tc["ref_by_skill"] missing_fp = [fid for fid in allowed_fp if f"framework_program:{fid}" not in fw_cached] if missing_fp: occ_map = batch_framework_occurrences_by_id(cur, missing_fp) all_eids = {o.exercise_id for occs in occ_map.values() for o in occs} skills_map = fetch_exercise_skills_bulk(cur, all_eids) if all_eids else {} profiles = batch_compute_profiles( occ_map, skills_map, reference_max_by_skill=fw_ref_max ) for fid in missing_fp: key = f"framework_program:{fid}" prof = profiles.get(fid) or _empty_profile() _enrich_profile_club_best(prof, fw_ref_by, "framework_program", fid) out[key] = compact_profile_summary(prof, fw_ref_by) mod_ref_max = mod_tc["max_by_skill"] mod_ref_by = mod_tc["ref_by_skill"] missing_mod = [mid for mid in allowed_mod if f"training_module:{mid}" not in mod_cached] if missing_mod: occ_map = batch_module_occurrences_by_id(cur, missing_mod) all_eids = {o.exercise_id for occs in occ_map.values() for o in occs} skills_map = fetch_exercise_skills_bulk(cur, all_eids) if all_eids else {} profiles = batch_compute_profiles( occ_map, skills_map, reference_max_by_skill=mod_ref_max ) for mid in missing_mod: key = f"training_module:{mid}" prof = profiles.get(mid) or _empty_profile() _enrich_profile_club_best(prof, mod_ref_by, "training_module", mid) out[key] = compact_profile_summary(prof, mod_ref_by) for fid in allowed_fp: key = f"framework_program:{fid}" if key in fw_cached: out[key] = fw_cached[key] elif key not in out: out[key] = _summarize_framework_program(cur, fid, fw_ref_max, fw_ref_by) for mid in allowed_mod: key = f"training_module:{mid}" if key in mod_cached: out[key] = mod_cached[key] elif key not in out: out[key] = _summarize_training_module(cur, mid, mod_ref_max, mod_ref_by) return out