""" ExerciseMatchProfile / PlanningTargetProfile — Phase-1-Vorselektion Planungs-Übungssuche. Siehe .claude/docs/working/PLANNING_EXERCISE_SUGGEST_CONTEXT.md §12–§14 """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Sequence, Set, Tuple from skill_scoring import ( ExerciseOccurrence, collect_unit_exercise_occurrences, fetch_exercise_skills_bulk, profile_for_occurrences, _skill_link_multiplier, DEFAULT_ITEM_MINUTES, ) def _ids_to_weights(ids: Sequence[int], primary_id: Optional[int] = None) -> Dict[int, float]: out: Dict[int, float] = {} for raw in ids or []: try: fid = int(raw) except (TypeError, ValueError): continue if fid < 1: continue w = 1.0 if primary_id is not None and fid == int(primary_id) else 0.85 out[fid] = max(out.get(fid, 0.0), w) return out def _merge_weight_maps(*maps: Optional[Dict[int, float]], scale: float = 1.0) -> Dict[int, float]: out: Dict[int, float] = {} for m in maps: if not m: continue for k, v in m.items(): try: kid = int(k) val = float(v) * scale except (TypeError, ValueError): continue if kid < 1 or val <= 0: continue out[kid] = max(out.get(kid, 0.0), val) return out def _normalize_weight_map(m: Dict[int, float]) -> Dict[int, float]: if not m: return {} mx = max(m.values()) if mx <= 0: return {} return {k: v / mx for k, v in m.items() if v > 0} def weighted_overlap(a: Dict[int, float], b: Dict[int, float]) -> float: """Gewichtete Überlappung 0..1 (min-Summe / max-Summe).""" if not a or not b: return 0.0 keys = set(a) | set(b) num = sum(min(a.get(k, 0.0), b.get(k, 0.0)) for k in keys) den = sum(max(a.get(k, 0.0), b.get(k, 0.0)) for k in keys) return num / den if den > 0 else 0.0 def gap_coverage(gap: Dict[int, float], candidate: Dict[int, float]) -> float: """Anteil der Skill-Lücke, den der Kandidat abdeckt (0..1).""" if not gap: return 0.0 total_gap = sum(gap.values()) if total_gap <= 0: return 0.0 covered = sum(min(gap.get(k, 0.0), candidate.get(k, 0.0)) for k in gap) return covered / total_gap @dataclass class ExerciseMatchProfile: exercise_id: int focus_area_ids: Dict[int, float] = field(default_factory=dict) style_direction_ids: Dict[int, float] = field(default_factory=dict) training_type_ids: Dict[int, float] = field(default_factory=dict) target_group_ids: Dict[int, float] = field(default_factory=dict) skill_weights: Dict[int, float] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: return { "exercise_id": self.exercise_id, "focus_area_ids": self.focus_area_ids, "style_direction_ids": self.style_direction_ids, "training_type_ids": self.training_type_ids, "target_group_ids": self.target_group_ids, "skill_weights": self.skill_weights, } @dataclass class PlanningTargetProfile: focus_area_ids: Dict[int, float] = field(default_factory=dict) style_direction_ids: Dict[int, float] = field(default_factory=dict) training_type_ids: Dict[int, float] = field(default_factory=dict) target_group_ids: Dict[int, float] = field(default_factory=dict) skill_weights: Dict[int, float] = field(default_factory=dict) skill_gap_weights: Dict[int, float] = field(default_factory=dict) skill_plan_weights: Dict[int, float] = field(default_factory=dict) sources: List[str] = field(default_factory=list) def to_summary_dict(self, cur, limit_skills: int = 5) -> Dict[str, Any]: focus_labels = _load_focus_labels(cur, list(self.focus_area_ids.keys())[:6]) top_skills = sorted(self.skill_weights.items(), key=lambda x: -x[1])[:limit_skills] skill_names = _load_skill_names(cur, [s[0] for s in top_skills]) return { "sources": list(self.sources), "focus_areas": focus_labels, "top_skills": [ {"skill_id": sid, "name": skill_names.get(sid, f"#{sid}"), "weight": round(w, 2)} for sid, w in top_skills ], "has_skill_gap": bool(self.skill_gap_weights), } def _load_focus_labels(cur, ids: Sequence[int]) -> List[str]: if not ids: return [] ph = ",".join(["%s"] * len(ids)) cur.execute( f"SELECT id, name FROM focus_areas WHERE id IN ({ph}) ORDER BY name", list(ids), ) return [f"{r['name'] or r['id']}" for r in cur.fetchall()] def _load_skill_names(cur, ids: Sequence[int]) -> Dict[int, str]: if not ids: return {} ph = ",".join(["%s"] * len(ids)) cur.execute(f"SELECT id, name FROM skills WHERE id IN ({ph})", list(ids)) return {int(r["id"]): str(r["name"] or "") for r in cur.fetchall()} def _skill_weights_from_profile(skills_out: Sequence[Dict[str, Any]]) -> Dict[int, float]: out: Dict[int, float] = {} for row in skills_out or []: sid = row.get("skill_id") if sid is None: continue w = float(row.get("weight") or row.get("score") or 0) if w > 0: out[int(sid)] = w return out def _single_exercise_skill_weights( skill_rows: Sequence[Dict[str, Any]], *, minutes: float = DEFAULT_ITEM_MINUTES, ) -> Dict[int, float]: out: Dict[int, float] = {} for link in skill_rows or []: sid = link.get("skill_id") if sid is None: continue sid = int(sid) mult = _skill_link_multiplier( intensity=link.get("intensity"), required_level=link.get("required_level"), target_level=link.get("target_level"), ) w = minutes * mult if w > 0: out[sid] = out.get(sid, 0.0) + w return out def _load_relation_maps_bulk( cur, exercise_ids: Sequence[int], table: str, id_column: str, ) -> Dict[int, Dict[int, float]]: ids = [int(x) for x in exercise_ids if int(x) > 0] if not ids: return {} ph = ",".join(["%s"] * len(ids)) cur.execute( f""" SELECT exercise_id, {id_column} AS rel_id, is_primary FROM {table} WHERE exercise_id IN ({ph}) """, ids, ) out: Dict[int, Dict[int, float]] = {eid: {} for eid in ids} for row in cur.fetchall(): eid = int(row["exercise_id"]) rid = int(row["rel_id"]) w = 1.0 if row.get("is_primary") else 0.85 out.setdefault(eid, {})[rid] = max(out[eid].get(rid, 0.0), w) return out def load_exercise_match_profiles_bulk(cur, exercise_ids: Sequence[int]) -> Dict[int, ExerciseMatchProfile]: ids = sorted({int(x) for x in exercise_ids if int(x) > 0}) if not ids: return {} focus_map = _load_relation_maps_bulk(cur, ids, "exercise_focus_areas", "focus_area_id") style_map = _load_relation_maps_bulk(cur, ids, "exercise_style_directions", "style_direction_id") type_map = _load_relation_maps_bulk(cur, ids, "exercise_training_types", "training_type_id") tg_map = _load_relation_maps_bulk(cur, ids, "exercise_target_groups", "target_group_id") skills_bulk = fetch_exercise_skills_bulk(cur, ids) profiles: Dict[int, ExerciseMatchProfile] = {} for eid in ids: profiles[eid] = ExerciseMatchProfile( exercise_id=eid, focus_area_ids=focus_map.get(eid, {}), style_direction_ids=style_map.get(eid, {}), training_type_ids=type_map.get(eid, {}), target_group_ids=tg_map.get(eid, {}), skill_weights=_single_exercise_skill_weights(skills_bulk.get(eid, [])), ) return profiles def _resolve_framework_for_unit(cur, unit: Dict[str, Any]) -> Optional[Dict[str, Any]]: slot_id = unit.get("framework_slot_id") or unit.get("origin_framework_slot_id") if not slot_id: return None cur.execute( """ SELECT s.id AS slot_id, s.framework_program_id, s.sort_order, s.title AS slot_title, fp.title AS framework_title, fp.focus_area_id AS header_focus_area_id FROM training_framework_slots s JOIN training_framework_programs fp ON fp.id = s.framework_program_id WHERE s.id = %s """, (int(slot_id),), ) row = cur.fetchone() return dict(row) if row else None def _framework_catalog_weights(cur, framework_id: int) -> Tuple[Dict[int, float], Dict[int, float], Dict[int, float], Dict[int, float]]: cur.execute( "SELECT focus_area_id FROM training_framework_programs WHERE id = %s", (framework_id,), ) hdr = cur.fetchone() header_fa = int(hdr["focus_area_id"]) if hdr and hdr.get("focus_area_id") else None cur.execute( "SELECT focus_area_id FROM training_framework_program_focus_areas WHERE framework_program_id = %s", (framework_id,), ) fa_ids = [int(r["focus_area_id"]) for r in cur.fetchall()] if header_fa and header_fa not in fa_ids: fa_ids.insert(0, header_fa) focus = _ids_to_weights(fa_ids, primary_id=header_fa) cur.execute( "SELECT style_direction_id FROM training_framework_program_style_directions WHERE framework_program_id = %s", (framework_id,), ) style = _ids_to_weights([int(r["style_direction_id"]) for r in cur.fetchall()]) cur.execute( "SELECT training_type_id FROM training_framework_program_training_types WHERE framework_program_id = %s", (framework_id,), ) tt = _ids_to_weights([int(r["training_type_id"]) for r in cur.fetchall()]) cur.execute( "SELECT target_group_id FROM training_framework_program_target_groups WHERE framework_program_id = %s", (framework_id,), ) tg = _ids_to_weights([int(r["target_group_id"]) for r in cur.fetchall()]) return focus, style, tt, tg def _profile_from_unit_occurrences(cur, unit_id: int) -> Dict[int, float]: occ = collect_unit_exercise_occurrences(cur, int(unit_id)) if not occ: return {} prof = profile_for_occurrences(cur, occ, reference_max_by_skill=None) return _skill_weights_from_profile(prof.get("skills") or []) def _profile_from_exercise_ids(cur, exercise_ids: Sequence[int]) -> Dict[int, float]: ids = [int(x) for x in exercise_ids if int(x) > 0] if not ids: return {} occ = [ExerciseOccurrence(exercise_id=eid) for eid in ids] prof = profile_for_occurrences(cur, occ, reference_max_by_skill=None) return _skill_weights_from_profile(prof.get("skills") or []) def skill_profile_summary_from_exercise_ids( cur, exercise_ids: Sequence[int], *, limit_skills: int = 8, ) -> Dict[str, Any]: """Kompaktes Fähigkeitenprofil für LLM-Kontext und UI.""" ids = [int(x) for x in exercise_ids if int(x) > 0] if not ids: return {"exercise_count": 0, "skills": []} occ = [ExerciseOccurrence(exercise_id=eid) for eid in ids] prof = profile_for_occurrences(cur, occ, reference_max_by_skill=None) skills_out = prof.get("skills") or [] top = sorted(skills_out, key=lambda s: -float(s.get("weight") or s.get("score") or 0))[:limit_skills] names = _load_skill_names(cur, [int(s["skill_id"]) for s in top if s.get("skill_id") is not None]) return { "exercise_count": len(ids), "skills": [ { "skill_id": int(s["skill_id"]), "name": names.get(int(s["skill_id"]), f"#{s['skill_id']}"), "weight": round(float(s.get("weight") or s.get("score") or 0), 3), } for s in top if s.get("skill_id") is not None ], } def build_planning_target_profile( cur, *, unit: Dict[str, Any], planned_exercise_ids: Sequence[int], section_planned_exercise_ids: Optional[Sequence[int]] = None, anchor_exercise_id: Optional[int], intent: str, ) -> PlanningTargetProfile: sources: List[str] = [] focus: Dict[int, float] = {} style: Dict[int, float] = {} tt: Dict[int, float] = {} tg: Dict[int, float] = {} skill_target: Dict[int, float] = {} skill_plan: Dict[int, float] = {} fw = _resolve_framework_for_unit(cur, unit) if fw: fid = int(fw["framework_program_id"]) f_focus, f_style, f_tt, f_tg = _framework_catalog_weights(cur, fid) focus = _merge_weight_maps(focus, f_focus) style = _merge_weight_maps(style, f_style) tt = _merge_weight_maps(tt, f_tt) tg = _merge_weight_maps(tg, f_tg) sources.append("framework_catalog") slot_id = fw.get("slot_id") cur.execute( "SELECT id FROM training_units WHERE framework_slot_id = %s LIMIT 1", (int(slot_id),), ) bp = cur.fetchone() if bp and bp.get("id"): slot_skills = _profile_from_unit_occurrences(cur, int(bp["id"])) if slot_skills: skill_target = _merge_weight_maps(skill_target, slot_skills, scale=1.0) sources.append("framework_slot_skill_profile") if not skill_target: cur.execute( """ SELECT tu.id FROM training_framework_slots s LEFT JOIN training_units tu ON tu.framework_slot_id = s.id WHERE s.framework_program_id = %s AND tu.id IS NOT NULL """, (fid,), ) all_occ: List[ExerciseOccurrence] = [] for r in cur.fetchall(): all_occ.extend(collect_unit_exercise_occurrences(cur, int(r["id"]))) if all_occ: prof = profile_for_occurrences(cur, all_occ, reference_max_by_skill=None) skill_target = _merge_weight_maps( skill_target, _skill_weights_from_profile(prof.get("skills") or []), scale=0.85 ) sources.append("framework_overall_skill_profile") if planned_exercise_ids: occ = [ExerciseOccurrence(exercise_id=int(eid)) for eid in planned_exercise_ids] prof = profile_for_occurrences(cur, occ, reference_max_by_skill=None) skill_plan = _skill_weights_from_profile(prof.get("skills") or []) if skill_plan: sources.append("current_unit_plan") section_ids = [int(x) for x in (section_planned_exercise_ids or []) if int(x) > 0] if section_ids: section_skills = _profile_from_exercise_ids(cur, section_ids) if section_skills: skill_target = _merge_weight_maps(skill_target, section_skills, scale=1.0) sources.append("current_section_plan") if anchor_exercise_id: anchor_profiles = load_exercise_match_profiles_bulk(cur, [int(anchor_exercise_id)]) ap = anchor_profiles.get(int(anchor_exercise_id)) if ap: if intent in ("deepen_exercise", "suggest_next", "progression_next", "continue_plan_goal"): skill_target = _merge_weight_maps(skill_target, ap.skill_weights, scale=1.0) focus = _merge_weight_maps(focus, ap.focus_area_ids, scale=0.9) style = _merge_weight_maps(style, ap.style_direction_ids, scale=0.75) tt = _merge_weight_maps(tt, ap.training_type_ids, scale=0.75) tg = _merge_weight_maps(tg, ap.target_group_ids, scale=0.75) sources.append("anchor_exercise") skill_target = _normalize_weight_map(skill_target) skill_plan_norm = _normalize_weight_map(skill_plan) skill_gap: Dict[int, float] = {} for sid, tw in skill_target.items(): pw = skill_plan_norm.get(sid, 0.0) gap = tw - pw * 0.85 if gap > 0.08: skill_gap[sid] = gap if skill_gap: sources.append("skill_gap_vs_plan") return PlanningTargetProfile( focus_area_ids=_normalize_weight_map(focus) if focus else focus, style_direction_ids=_normalize_weight_map(style) if style else style, training_type_ids=_normalize_weight_map(tt) if tt else tt, target_group_ids=_normalize_weight_map(tg) if tg else tg, skill_weights=skill_target, skill_gap_weights=_normalize_weight_map(skill_gap) if skill_gap else skill_gap, skill_plan_weights=skill_plan_norm, sources=sources, ) def score_exercise_against_target( exercise: ExerciseMatchProfile, target: PlanningTargetProfile, *, intent: str, ) -> Tuple[float, List[str]]: """Profil-Match 0..1 + deutschsprachige Gründe.""" reasons: List[str] = [] focus_sim = weighted_overlap(exercise.focus_area_ids, target.focus_area_ids) style_sim = weighted_overlap(exercise.style_direction_ids, target.style_direction_ids) tt_sim = weighted_overlap(exercise.training_type_ids, target.training_type_ids) tg_sim = weighted_overlap(exercise.target_group_ids, target.target_group_ids) skill_sim = weighted_overlap( _normalize_weight_map(exercise.skill_weights), target.skill_weights, ) gap_sim = gap_coverage(target.skill_gap_weights, _normalize_weight_map(exercise.skill_weights)) if focus_sim >= 0.5 and target.focus_area_ids: reasons.append("Fokusbereich passend zum Planungsziel") if style_sim >= 0.5 and target.style_direction_ids: reasons.append("Stilrichtung passend") if tt_sim >= 0.5 and target.training_type_ids: reasons.append("Trainingsstil passend") if tg_sim >= 0.5 and target.target_group_ids: reasons.append("Zielgruppe passend") if skill_sim >= 0.35 and target.skill_weights: reasons.append("Fähigkeiten-Schwerpunkt passend (Profilmetrik)") if gap_sim >= 0.25 and target.skill_gap_weights: reasons.append("Deckt Skill-Lücke im bisherigen Plan") if "query_intent" in (target.sources or []): reasons.append("Passt zur KI-interpretierten Suchanfrage") # Intent-gewichtete Dimensionen (Summe = 1.0) if intent == INTENT_FREE_SEARCH: weights = {"focus": 0.15, "style": 0.10, "tt": 0.10, "tg": 0.10, "skill": 0.25, "gap": 0.30} elif intent == INTENT_DEEPEN_EXERCISE: weights = {"focus": 0.15, "style": 0.10, "tt": 0.10, "tg": 0.05, "skill": 0.45, "gap": 0.15} elif intent == INTENT_PROGRESSION_NEXT: weights = {"focus": 0.20, "style": 0.10, "tt": 0.10, "tg": 0.05, "skill": 0.35, "gap": 0.20} else: weights = {"focus": 0.20, "style": 0.10, "tt": 0.10, "tg": 0.10, "skill": 0.30, "gap": 0.20} score = ( weights["focus"] * focus_sim + weights["style"] * style_sim + weights["tt"] * tt_sim + weights["tg"] * tg_sim + weights["skill"] * skill_sim + weights["gap"] * gap_sim ) return max(0.0, min(1.0, score)), reasons # Re-export intent constants for typing (avoid circular import at runtime in suggest module) INTENT_FREE_SEARCH = "free_search" INTENT_DEEPEN_EXERCISE = "deepen_exercise" INTENT_PROGRESSION_NEXT = "progression_next"