""" Planungs-KI P0: Kontext-Pack + Hybrid-Retrieval für Übungssuche in der Trainingsplanung. Siehe .claude/docs/working/PLANNING_EXERCISE_SUGGEST_CONTEXT.md """ from __future__ import annotations import re from typing import Any, Dict, List, Optional, Sequence, Set, Tuple from fastapi import HTTPException from pydantic import BaseModel, Field from tenant_context import TenantContext, library_content_visibility_sql from planning_exercise_profiles import ( load_exercise_match_profiles_bulk, score_exercise_against_target, ) from planning_exercise_llm_rank import try_llm_rerank_planning_hits from planning_exercise_target_pipeline import ( build_planning_target_with_query_pipeline, compose_retrieval_phase, ) # Planungs-Berechtigung + Sektionen (bestehende Implementierung) from routers.training_planning import ( _assert_training_unit_permission, _fetch_sections, _has_planning_role, ) INTENT_SUGGEST_NEXT = "suggest_next" INTENT_PROGRESSION_NEXT = "progression_next" INTENT_DEEPEN_EXERCISE = "deepen_exercise" INTENT_CONTINUE_PLAN = "continue_plan_goal" INTENT_FREE_SEARCH = "free_search" VALID_INTENTS = { INTENT_SUGGEST_NEXT, INTENT_PROGRESSION_NEXT, INTENT_DEEPEN_EXERCISE, INTENT_CONTINUE_PLAN, INTENT_FREE_SEARCH, } _CANDIDATE_POOL_LIMIT = 400 _LLM_RERANK_PRE_LIMIT = 32 class PlanningExerciseSuggestRequest(BaseModel): unit_id: int = Field(..., ge=1) section_order_index: Optional[int] = Field(default=None, ge=0) phase_order_index: Optional[int] = Field(default=None, ge=0) parallel_stream_order_index: Optional[int] = Field(default=None, ge=0) anchor_exercise_id: Optional[int] = Field(default=None, ge=1) progression_graph_id: Optional[int] = Field(default=None, ge=1) query: Optional[str] = "" intent_hint: Optional[str] = None planned_exercise_ids: Optional[List[int]] = None include_llm_intent: bool = True include_llm_rank: bool = False limit: int = Field(default=20, ge=1, le=50) exercise_kind_any: Optional[List[str]] = None def resolve_planning_exercise_intent(query: Optional[str], intent_hint: Optional[str]) -> str: hint = (intent_hint or "").strip().lower() if hint in VALID_INTENTS: return hint q = (query or "").strip().lower() if not q: return INTENT_SUGGEST_NEXT if any(w in q for w in ("nächste", "naechste", "vorschlag", "vorschlagen", "empfehl")): return INTENT_SUGGEST_NEXT if "vertief" in q: return INTENT_DEEPEN_EXERCISE if "progression" in q or "graph" in q or "pfad" in q: return INTENT_PROGRESSION_NEXT if "aufbau" in q or "planung" in q or "bisher" in q: return INTENT_CONTINUE_PLAN return INTENT_FREE_SEARCH def _intent_weights(intent: str) -> Dict[str, float]: base = { "fulltext": 0.18, "progression": 0.18, "skill": 0.12, "plan": 0.08, "profile": 0.22, "repeat_unit": -0.30, "repeat_group": -0.15, } if intent == INTENT_SUGGEST_NEXT: return { **base, "progression": 0.28, "skill": 0.12, "plan": 0.10, "profile": 0.25, "fulltext": 0.08, } if intent == INTENT_PROGRESSION_NEXT: return {**base, "progression": 0.42, "fulltext": 0.12, "skill": 0.10, "profile": 0.20} if intent == INTENT_DEEPEN_EXERCISE: return {**base, "skill": 0.15, "profile": 0.35, "fulltext": 0.15, "progression": 0.10} if intent == INTENT_CONTINUE_PLAN: return {**base, "plan": 0.12, "skill": 0.10, "profile": 0.30, "fulltext": 0.10, "progression": 0.08} if intent == INTENT_FREE_SEARCH: return {**base, "fulltext": 0.45, "progression": 0.08, "skill": 0.08, "profile": 0.15} return base def _collect_planned_exercise_ids(sections: Sequence[Dict[str, Any]]) -> List[int]: out: List[int] = [] seen: Set[int] = set() for sec in sorted(sections, key=lambda s: int(s.get("order_index") or 0)): items = sec.get("items") or [] for it in sorted(items, key=lambda x: int(x.get("order_index") or 0)): if str(it.get("item_type") or "").strip().lower() == "note": continue raw = it.get("exercise_id") if raw is None: continue try: eid = int(raw) except (TypeError, ValueError): continue if eid < 1 or eid in seen: continue seen.add(eid) out.append(eid) return out def _resolve_anchor_from_plan( planned_ids: Sequence[int], anchor_exercise_id: Optional[int], ) -> Optional[int]: if anchor_exercise_id and int(anchor_exercise_id) > 0: return int(anchor_exercise_id) if planned_ids: return int(planned_ids[-1]) return None def _load_exercise_titles(cur, exercise_ids: Sequence[int]) -> Dict[int, str]: if not exercise_ids: return {} ids = list(dict.fromkeys(int(x) for x in exercise_ids if int(x) > 0)) ph = ",".join(["%s"] * len(ids)) cur.execute( f"SELECT id, title FROM exercises WHERE id IN ({ph})", ids, ) return {int(r["id"]): str(r["title"] or "").strip() for r in cur.fetchall()} def _load_skill_ids_for_exercise(cur, exercise_id: Optional[int]) -> Set[int]: if not exercise_id: return set() cur.execute( "SELECT skill_id FROM exercise_skills WHERE exercise_id = %s", (int(exercise_id),), ) return {int(r["skill_id"]) for r in cur.fetchall() if r.get("skill_id")} def _load_progression_successors( cur, graph_id: Optional[int], from_exercise_id: Optional[int], ) -> Tuple[Set[int], Dict[int, str]]: if not graph_id or not from_exercise_id: return set(), {} cur.execute( """ SELECT to_exercise_id, notes FROM exercise_progression_edges WHERE graph_id = %s AND from_exercise_id = %s AND LOWER(TRIM(edge_type)) = 'next_exercise' """, (int(graph_id), int(from_exercise_id)), ) ids: Set[int] = set() notes: Dict[int, str] = {} for row in cur.fetchall(): tid = int(row["to_exercise_id"]) ids.add(tid) n = (row.get("notes") or "").strip() if n: notes[tid] = n return ids, notes def _load_group_recent_exercise_ids( cur, group_id: Optional[int], exclude_unit_id: int, limit: int = 40, ) -> Set[int]: if not group_id: return set() cur.execute( """ SELECT tusi.exercise_id AS eid FROM training_units tu INNER JOIN training_unit_sections tus ON tus.training_unit_id = tu.id INNER JOIN training_unit_section_items tusi ON tusi.section_id = tus.id WHERE tu.group_id = %s AND tu.id <> %s AND tusi.exercise_id IS NOT NULL AND COALESCE(tu.status, '') <> 'cancelled' ORDER BY tu.planned_date DESC NULLS LAST, tu.id DESC, tusi.order_index DESC LIMIT 200 """, (int(group_id), int(exclude_unit_id)), ) out: Set[int] = set() for r in cur.fetchall(): if r.get("eid") is None: continue out.add(int(r["eid"])) if len(out) >= limit: break return out def _section_title_for_index(sections: Sequence[Dict[str, Any]], section_order_index: Optional[int]) -> Optional[str]: if section_order_index is None: return None for sec in sections: if int(sec.get("order_index") or -1) == int(section_order_index): t = (sec.get("title") or "").strip() return t or None return None def _normalize_query(query: Optional[str]) -> str: return re.sub(r"\s+", " ", (query or "").strip()) def _skill_jaccard(a: Set[int], b: Set[int]) -> float: if not a or not b: return 0.0 inter = len(a & b) union = len(a | b) return inter / union if union else 0.0 def _apply_client_planned_override( cur, pack: Dict[str, Any], body: PlanningExerciseSuggestRequest, ) -> Dict[str, Any]: """Client-Plan (ungespeichertes Formular) überschreibt DB-Stand.""" if not body.planned_exercise_ids: return pack planned_ids: List[int] = [] seen: Set[int] = set() for raw in body.planned_exercise_ids: try: eid = int(raw) except (TypeError, ValueError): continue if eid < 1 or eid in seen: continue seen.add(eid) planned_ids.append(eid) if not planned_ids: return pack pack["planned_exercise_ids"] = planned_ids if not body.anchor_exercise_id: anchor_id = _resolve_anchor_from_plan(planned_ids, None) pack["anchor_exercise_id"] = anchor_id if anchor_id: titles = _load_exercise_titles(cur, [anchor_id]) pack["anchor_title"] = titles.get(anchor_id) pack["anchor_skill_ids"] = sorted(_load_skill_ids_for_exercise(cur, anchor_id)) else: pack["anchor_title"] = None pack["anchor_skill_ids"] = [] return pack def build_planning_exercise_context_pack( cur, *, tenant: TenantContext, body: PlanningExerciseSuggestRequest, ) -> Dict[str, Any]: profile_id = tenant.profile_id role = tenant.global_role if not _has_planning_role(role): raise HTTPException(status_code=403, detail="Nur Trainer dürfen Planungs-Vorschläge abrufen") cur.execute( """ SELECT tu.*, tg.name AS group_name FROM training_units tu LEFT JOIN training_groups tg ON tg.id = tu.group_id WHERE tu.id = %s """, (body.unit_id,), ) unit_row = cur.fetchone() if not unit_row: raise HTTPException(status_code=404, detail="Trainingseinheit nicht gefunden") unit = dict(unit_row) if unit.get("framework_slot_id"): if role not in ("admin", "superadmin"): cur.execute( """ SELECT fp.created_by FROM training_framework_slots s JOIN training_framework_programs fp ON fp.id = s.framework_program_id WHERE s.id = %s """, (unit["framework_slot_id"],), ) fr = cur.fetchone() cb = fr["created_by"] if fr else None if unit.get("created_by") != profile_id and cb != profile_id: raise HTTPException(status_code=403, detail="Keine Berechtigung") else: if not unit.get("group_id"): raise HTTPException(status_code=404, detail="Trainingseinheit nicht gefunden") _assert_training_unit_permission(cur, unit, profile_id, role) sections = _fetch_sections(cur, int(body.unit_id)) planned_ids = _collect_planned_exercise_ids(sections) anchor_id = _resolve_anchor_from_plan(planned_ids, body.anchor_exercise_id) anchor_skills = _load_skill_ids_for_exercise(cur, anchor_id) progression_ids, progression_notes = _load_progression_successors( cur, body.progression_graph_id, anchor_id ) group_recent = _load_group_recent_exercise_ids(cur, unit.get("group_id"), int(body.unit_id)) titles = _load_exercise_titles(cur, [x for x in [anchor_id] if x]) anchor_title = titles.get(anchor_id) if anchor_id else None return { "unit_id": int(body.unit_id), "unit": { "id": int(body.unit_id), "framework_slot_id": unit.get("framework_slot_id"), "origin_framework_slot_id": unit.get("origin_framework_slot_id"), }, "unit_title": (unit.get("title") or unit.get("planned_focus") or "").strip() or None, "group_id": unit.get("group_id"), "group_name": (unit.get("group_name") or "").strip() or None, "section_order_index": body.section_order_index, "section_title": _section_title_for_index(sections, body.section_order_index), "planned_exercise_ids": planned_ids, "anchor_exercise_id": anchor_id, "anchor_title": anchor_title, "anchor_skill_ids": sorted(anchor_skills), "progression_graph_id": body.progression_graph_id, "progression_successor_ids": sorted(progression_ids), "progression_edge_notes": progression_notes, "group_recent_exercise_ids": sorted(group_recent), } def suggest_planning_exercises( cur, *, tenant: TenantContext, body: PlanningExerciseSuggestRequest, ) -> Dict[str, Any]: pack = build_planning_exercise_context_pack(cur, tenant=tenant, body=body) pack = _apply_client_planned_override(cur, pack, body) query = _normalize_query(body.query) heuristic_intent = resolve_planning_exercise_intent(query, body.intent_hint) pipeline_context = { "unit_title": pack.get("unit_title"), "group_name": pack.get("group_name"), "section_title": pack.get("section_title"), "planned_count": len(pack.get("planned_exercise_ids") or []), "anchor_title": pack.get("anchor_title"), "anchor_exercise_id": pack.get("anchor_exercise_id"), "progression_graph_id": pack.get("progression_graph_id"), } target_profile, intent, scenario_kind, query_intent_summary = build_planning_target_with_query_pipeline( cur, unit=pack["unit"], planned_exercise_ids=pack["planned_exercise_ids"], anchor_exercise_id=pack.get("anchor_exercise_id"), query=query, heuristic_intent=heuristic_intent, include_llm_intent=body.include_llm_intent, context_summary=pipeline_context, ) weights = _intent_weights(intent) target_profile_summary = target_profile.to_summary_dict(cur) query_intent_applied = bool(query_intent_summary.get("llm_applied")) profile_id = tenant.profile_id role = tenant.global_role vis_sql, vis_params = library_content_visibility_sql( alias="e", profile_id=profile_id, role=role, effective_club_id=tenant.effective_club_id, ) where = [vis_sql, "COALESCE(e.status, '') <> %s"] params: List[Any] = [] if query: ft_select = "ts_rank_cd(e.search_vector, plainto_tsquery('german', %s)) AS ft_rank" params.append(query) else: ft_select = "0.0::float AS ft_rank" params.extend(list(vis_params)) params.append("archived") ek_filtered: List[str] = [] if body.exercise_kind_any: for raw in body.exercise_kind_any: s = str(raw or "").strip().lower() if s in ("simple", "combination") and s not in ek_filtered: ek_filtered.append(s) if ek_filtered: ph = ",".join(["%s"] * len(ek_filtered)) where.append(f"(LOWER(TRIM(COALESCE(e.exercise_kind::text,''))) IN ({ph}))") params.extend(ek_filtered) sql = f""" SELECT e.id, e.title, e.summary, ( SELECT fa.name FROM exercise_focus_areas efa JOIN focus_areas fa ON fa.id = efa.focus_area_id WHERE efa.exercise_id = e.id ORDER BY efa.is_primary DESC NULLS LAST, fa.name ASC LIMIT 1 ) AS primary_focus_name, {ft_select} FROM exercises e WHERE {' AND '.join(where)} ORDER BY e.updated_at DESC, e.id DESC LIMIT %s """ params.append(_CANDIDATE_POOL_LIMIT) cur.execute(sql, params) rows = cur.fetchall() planned_set = set(pack["planned_exercise_ids"]) group_recent_set = set(pack["group_recent_exercise_ids"]) progression_set = set(pack["progression_successor_ids"]) anchor_skills = set(pack["anchor_skill_ids"]) anchor_id = pack.get("anchor_exercise_id") progression_notes = pack.get("progression_edge_notes") or {} last_planned_skills: Set[int] = set() if pack["planned_exercise_ids"]: last_planned_skills = _load_skill_ids_for_exercise(cur, pack["planned_exercise_ids"][-1]) # Skill-IDs + ExerciseMatchProfile pro Kandidat (Batch) cand_ids = [int(r["id"]) for r in rows] skills_by_ex: Dict[int, Set[int]] = {cid: set() for cid in cand_ids} match_profiles = load_exercise_match_profiles_bulk(cur, cand_ids) if cand_ids: ph = ",".join(["%s"] * len(cand_ids)) cur.execute( f"SELECT exercise_id, skill_id FROM exercise_skills WHERE exercise_id IN ({ph})", cand_ids, ) for r in cur.fetchall(): skills_by_ex.setdefault(int(r["exercise_id"]), set()).add(int(r["skill_id"])) max_ft = 0.0 scored: List[Dict[str, Any]] = [] for row in rows: eid = int(row["id"]) if anchor_id and eid == int(anchor_id): continue ft = float(row.get("ft_rank") or 0.0) if ft > max_ft: max_ft = ft scored.append( { "row": row, "eid": eid, "ft": ft, "skills": skills_by_ex.get(eid, set()), } ) hits: List[Dict[str, Any]] = [] for item in scored: eid = item["eid"] row = item["row"] ft_norm = (item["ft"] / max_ft) if max_ft > 0 else 0.0 prog_hit = 1.0 if eid in progression_set else 0.0 skill_sim = _skill_jaccard(anchor_skills, item["skills"]) if anchor_skills else 0.0 plan_aff = 0.0 if last_planned_skills and item["skills"]: plan_aff = _skill_jaccard(last_planned_skills, item["skills"]) repeat_unit = 1.0 if eid in planned_set else 0.0 repeat_group = 1.0 if eid in group_recent_set else 0.0 profile_score = 0.0 profile_reasons: List[str] = [] emp = match_profiles.get(eid) if emp: profile_score, profile_reasons = score_exercise_against_target( emp, target_profile, intent=intent ) score = ( weights["fulltext"] * ft_norm + weights["progression"] * prog_hit + weights["skill"] * skill_sim + weights["plan"] * plan_aff + weights["profile"] * profile_score + weights["repeat_unit"] * repeat_unit + weights["repeat_group"] * repeat_group ) reasons: List[str] = [] if query and ft_norm >= 0.35: reasons.append("Volltext-Treffer") if prog_hit > 0: note = progression_notes.get(eid) reasons.append( f"Nachfolger im Progressionsgraph{f': {note}' if note else ''}" ) if skill_sim >= 0.2 and anchor_id: reasons.append("Fähigkeiten passen zur Anker-Übung") if plan_aff >= 0.25: reasons.append("Schließt an Skills der letzten geplanten Übung an") if repeat_unit > 0: reasons.append("Bereits in dieser Einheit eingeplant") if repeat_group > 0 and repeat_unit <= 0: reasons.append("Kürzlich in der Gruppe verwendet") for pr in profile_reasons: if pr not in reasons: reasons.append(pr) if score <= 0 and not reasons and not query: # Leere Query: trotzdem schwache Kandidaten mit Skill/Progression if prog_hit or skill_sim or plan_aff or profile_score: score = 0.05 + prog_hit * 0.3 + skill_sim * 0.2 + profile_score * 0.25 hits.append( { "id": eid, "title": row.get("title"), "summary": row.get("summary"), "focus_area": row.get("primary_focus_name"), "score": round(max(0.0, min(1.0, score)), 4), "reasons": reasons, } ) hits.sort(key=lambda h: (-h["score"], h.get("title") or "")) llm_applied = False retrieval_phase = compose_retrieval_phase(query_intent=query_intent_applied, llm_rank=False) if body.include_llm_rank: pre_limit = max(int(body.limit), _LLM_RERANK_PRE_LIMIT) pool_hits = hits[:pre_limit] pool_hits, llm_applied = try_llm_rerank_planning_hits( cur, hits=pool_hits, skills_by_ex=skills_by_ex, query=query, intent=intent, context_summary={ "unit_title": pack.get("unit_title"), "group_name": pack.get("group_name"), "section_title": pack.get("section_title"), "planned_count": len(planned_set), "anchor_title": pack.get("anchor_title"), "intent": intent, }, target_profile_summary=target_profile_summary, limit=int(body.limit), ) if llm_applied: retrieval_phase = compose_retrieval_phase( query_intent=query_intent_applied, llm_rank=True, ) tail = hits[pre_limit:] hits = pool_hits + tail else: hits = pool_hits[: int(body.limit)] else: hits = hits[: int(body.limit)] hits = hits[: int(body.limit)] context_summary = { "unit_title": pack.get("unit_title"), "group_name": pack.get("group_name"), "section_title": pack.get("section_title"), "planned_count": len(planned_set), "anchor_title": pack.get("anchor_title"), "anchor_exercise_id": pack.get("anchor_exercise_id"), "progression_graph_id": pack.get("progression_graph_id"), } return { "context_summary": context_summary, "target_profile_summary": target_profile_summary, "scenario_kind": scenario_kind, "query_intent_summary": query_intent_summary, "retrieval_phase": retrieval_phase, "llm_rank_applied": llm_applied, "intent_resolved": intent, "intent_heuristic": heuristic_intent, "query_normalized": query or None, "hits": hits, }