"""
Planning AI P0: context pack + hybrid retrieval for exercise search in training planning.

See .claude/docs/working/PLANNING_EXERCISE_SUGGEST_CONTEXT.md
"""
from __future__ import annotations

import re
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple

from fastapi import HTTPException
from pydantic import BaseModel, Field

from tenant_context import TenantContext, library_content_visibility_sql
from planning_exercise_retrieval import run_multistage_planning_retrieval
from planning_exercise_llm_rank import try_llm_rerank_planning_hits
from planning_exercise_target_pipeline import (
    build_planning_target_with_query_pipeline,
    compose_retrieval_phase,
    should_run_llm_rank_pipeline,
)

# Planning permission + sections (existing implementation)
from routers.training_planning import (
    _assert_training_unit_permission,
    _fetch_sections,
    _has_planning_role,
)

INTENT_SUGGEST_NEXT = "suggest_next"
INTENT_PROGRESSION_NEXT = "progression_next"
INTENT_DEEPEN_EXERCISE = "deepen_exercise"
INTENT_CONTINUE_PLAN = "continue_plan_goal"
INTENT_FREE_SEARCH = "free_search"

VALID_INTENTS = {
    INTENT_SUGGEST_NEXT,
    INTENT_PROGRESSION_NEXT,
    INTENT_DEEPEN_EXERCISE,
    INTENT_CONTINUE_PLAN,
    INTENT_FREE_SEARCH,
}

# Minimum number of top hits handed to the LLM re-ranker (more if the caller asks for more).
_LLM_RERANK_PRE_LIMIT = 32

# Ordered keyword heuristics: the first group with a substring match decides the intent.
_INTENT_KEYWORDS: Tuple[Tuple[Tuple[str, ...], str], ...] = (
    (("nächste", "naechste", "vorschlag", "vorschlagen", "empfehl"), INTENT_SUGGEST_NEXT),
    (("vertief",), INTENT_DEEPEN_EXERCISE),
    (("progression", "graph", "pfad"), INTENT_PROGRESSION_NEXT),
    (("aufbau", "planung", "bisher"), INTENT_CONTINUE_PLAN),
)

# Default scoring weights; negative entries penalise repeated exercises.
_BASE_INTENT_WEIGHTS: Dict[str, float] = {
    "fulltext": 0.18,
    "progression": 0.18,
    "skill": 0.12,
    "plan": 0.08,
    "profile": 0.22,
    "repeat_unit": -0.30,
    "repeat_group": -0.15,
}

# Per-intent overrides layered on top of _BASE_INTENT_WEIGHTS.
_INTENT_WEIGHT_OVERRIDES: Dict[str, Dict[str, float]] = {
    INTENT_SUGGEST_NEXT: {
        "progression": 0.28,
        "skill": 0.12,
        "plan": 0.10,
        "profile": 0.25,
        "fulltext": 0.08,
    },
    INTENT_PROGRESSION_NEXT: {
        "progression": 0.42,
        "fulltext": 0.12,
        "skill": 0.10,
        "profile": 0.20,
    },
    INTENT_DEEPEN_EXERCISE: {
        "skill": 0.15,
        "profile": 0.35,
        "fulltext": 0.15,
        "progression": 0.10,
    },
    INTENT_CONTINUE_PLAN: {
        "plan": 0.12,
        "skill": 0.10,
        "profile": 0.30,
        "fulltext": 0.10,
        "progression": 0.08,
    },
    INTENT_FREE_SEARCH: {
        "fulltext": 0.45,
        "progression": 0.08,
        "skill": 0.08,
        "profile": 0.15,
    },
}


# Request body for the planning exercise suggestion endpoint.
# (Documented with comments on purpose: a class docstring would change the
# description pydantic emits into the generated schema.)
class PlanningExerciseSuggestRequest(BaseModel):
    # Persisted training unit; when missing, the client-only context is used.
    unit_id: Optional[int] = Field(default=None, ge=1)
    group_id: Optional[int] = Field(default=None, ge=1)
    section_order_index: Optional[int] = Field(default=None, ge=0)
    phase_order_index: Optional[int] = Field(default=None, ge=0)
    parallel_stream_order_index: Optional[int] = Field(default=None, ge=0)
    # Explicit anchor exercise; falls back to the last planned exercise.
    anchor_exercise_id: Optional[int] = Field(default=None, ge=1)
    progression_graph_id: Optional[int] = Field(default=None, ge=1)
    query: Optional[str] = ""
    intent_hint: Optional[str] = None
    # Unsaved plan from the client form; takes precedence over the stored plan.
    planned_exercise_ids: Optional[List[int]] = None
    include_llm_intent: bool = True
    include_llm_rank: bool = False
    limit: int = Field(default=20, ge=1, le=50)
    exercise_kind_any: Optional[List[str]] = None


def resolve_planning_exercise_intent(query: Optional[str], intent_hint: Optional[str]) -> str:
    """Resolve the search intent from an explicit hint or from query keywords.

    A valid ``intent_hint`` always wins. An empty query means "suggest next";
    otherwise the first matching keyword group in ``_INTENT_KEYWORDS`` decides,
    and free search is the fallback.
    """
    hint = (intent_hint or "").strip().lower()
    if hint in VALID_INTENTS:
        return hint

    q = (query or "").strip().lower()
    if not q:
        return INTENT_SUGGEST_NEXT

    for keywords, intent in _INTENT_KEYWORDS:
        if any(word in q for word in keywords):
            return intent
    return INTENT_FREE_SEARCH


def _intent_weights(intent: str) -> Dict[str, float]:
    """Return a fresh weight dict for ``intent`` (base weights for unknown intents)."""
    return {**_BASE_INTENT_WEIGHTS, **_INTENT_WEIGHT_OVERRIDES.get(intent, {})}


def _normalize_exercise_ids(raw_ids: Optional[Iterable[Any]]) -> List[int]:
    """Convert raw ids to unique positive ints, keeping first-seen order.

    Values that cannot be converted to ``int`` (including ``None``) and values
    below 1 are dropped.
    """
    out: List[int] = []
    seen: Set[int] = set()
    for raw in raw_ids or ():
        try:
            eid = int(raw)
        except (TypeError, ValueError):
            continue
        if eid < 1 or eid in seen:
            continue
        seen.add(eid)
        out.append(eid)
    return out


def _collect_planned_exercise_ids(sections: Sequence[Dict[str, Any]]) -> List[int]:
    """Collect the unique exercise ids of a plan in section/item order, skipping notes."""

    def _raw_ids() -> Iterable[Any]:
        for sec in sorted(sections, key=lambda s: int(s.get("order_index") or 0)):
            items = sec.get("items") or []
            for it in sorted(items, key=lambda x: int(x.get("order_index") or 0)):
                if str(it.get("item_type") or "").strip().lower() == "note":
                    continue
                yield it.get("exercise_id")

    return _normalize_exercise_ids(_raw_ids())


def _resolve_anchor_from_plan(
    planned_ids: Sequence[int],
    anchor_exercise_id: Optional[int],
) -> Optional[int]:
    """Pick the anchor: the explicit id if positive, else the last planned id, else ``None``."""
    if anchor_exercise_id and int(anchor_exercise_id) > 0:
        return int(anchor_exercise_id)
    if planned_ids:
        return int(planned_ids[-1])
    return None


def _load_exercise_titles(cur, exercise_ids: Sequence[int]) -> Dict[int, str]:
    """Load ``{exercise_id: title}`` for the given ids.

    Returns an empty dict without querying when no positive id remains, because
    an empty ``IN ()`` list is a SQL syntax error.
    """
    if not exercise_ids:
        return {}
    ids = list(
        dict.fromkeys(int(x) for x in exercise_ids if x is not None and int(x) > 0)
    )
    if not ids:
        return {}
    # Only "%s" placeholders are interpolated; the ids travel as bound parameters.
    ph = ",".join(["%s"] * len(ids))
    cur.execute(
        f"SELECT id, title FROM exercises WHERE id IN ({ph})",
        ids,
    )
    return {int(r["id"]): str(r["title"] or "").strip() for r in cur.fetchall()}


def _load_skill_ids_for_exercise(cur, exercise_id: Optional[int]) -> Set[int]:
    """Return the skill ids linked to ``exercise_id`` (empty set without an exercise)."""
    if not exercise_id:
        return set()
    cur.execute(
        "SELECT skill_id FROM exercise_skills WHERE exercise_id = %s",
        (int(exercise_id),),
    )
    return {int(r["skill_id"]) for r in cur.fetchall() if r.get("skill_id")}


def _load_progression_successors(
    cur,
    graph_id: Optional[int],
    from_exercise_id: Optional[int],
) -> Tuple[Set[int], Dict[int, str]]:
    """Load the ``next_exercise`` successors of an exercise within a progression graph.

    Returns ``(successor_ids, notes_by_successor_id)``; both are empty when the
    graph or the source exercise is missing. Only non-empty notes are kept.
    """
    if not graph_id or not from_exercise_id:
        return set(), {}
    cur.execute(
        """
        SELECT to_exercise_id, notes
        FROM exercise_progression_edges
        WHERE graph_id = %s
          AND from_exercise_id = %s
          AND LOWER(TRIM(edge_type)) = 'next_exercise'
        """,
        (int(graph_id), int(from_exercise_id)),
    )
    ids: Set[int] = set()
    notes: Dict[int, str] = {}
    for row in cur.fetchall():
        tid = int(row["to_exercise_id"])
        ids.add(tid)
        n = (row.get("notes") or "").strip()
        if n:
            notes[tid] = n
    return ids, notes


def _load_group_recent_exercise_ids(
    cur,
    group_id: Optional[int],
    exclude_unit_id: Optional[int] = None,
    limit: int = 40,
) -> Set[int]:
    """Return exercise ids recently used in non-cancelled units of a group.

    The query reads at most 200 item rows, newest unit first; collection stops
    once ``limit`` distinct ids are gathered. ``exclude_unit_id`` leaves out the
    unit currently being planned.
    """
    if not group_id:
        return set()

    params: List[int] = [int(group_id)]
    exclude_clause = ""
    if exclude_unit_id is not None:
        exclude_clause = "AND tu.id <> %s"
        params.append(int(exclude_unit_id))

    # Only the static clause above is interpolated; all values are bound parameters.
    cur.execute(
        f"""
        SELECT tusi.exercise_id AS eid
        FROM training_units tu
        INNER JOIN training_unit_sections tus ON tus.training_unit_id = tu.id
        INNER JOIN training_unit_section_items tusi ON tusi.section_id = tus.id
        WHERE tu.group_id = %s
          {exclude_clause}
          AND tusi.exercise_id IS NOT NULL
          AND COALESCE(tu.status, '') <> 'cancelled'
        ORDER BY tu.planned_date DESC NULLS LAST, tu.id DESC, tusi.order_index DESC
        LIMIT 200
        """,
        tuple(params),
    )
    out: Set[int] = set()
    for r in cur.fetchall():
        if r.get("eid") is None:
            continue
        out.add(int(r["eid"]))
        if len(out) >= limit:
            break
    return out


def _section_title_for_index(
    sections: Sequence[Dict[str, Any]],
    section_order_index: Optional[int],
) -> Optional[str]:
    """Return the stripped title of the first section with the given order index.

    ``None`` is returned when no index is given, no section matches, or the
    matching section has an empty title.
    """
    if section_order_index is None:
        return None
    target = int(section_order_index)
    for sec in sections:
        raw_index = sec.get("order_index")
        # Test for None explicitly: a falsy check would turn the valid index 0
        # into "no index", so the first section could never match.
        if raw_index is None:
            continue
        if int(raw_index) == target:
            t = (sec.get("title") or "").strip()
            return t or None
    return None


def _normalize_query(query: Optional[str]) -> str:
    """Trim the query and collapse every whitespace run into a single space."""
    return re.sub(r"\s+", " ", (query or "").strip())


def _require_planning_role(role: Any) -> None:
    """Raise HTTP 403 unless ``role`` passes ``_has_planning_role``."""
    if not _has_planning_role(role):
        raise HTTPException(status_code=403, detail="Nur Trainer dürfen Planungs-Vorschläge abrufen")


def _load_anchor_details(
    cur,
    anchor_id: Optional[int],
    progression_graph_id: Optional[int],
) -> Dict[str, Any]:
    """Load everything derived from the anchor exercise.

    Returns a dict with ``anchor_title``, ``anchor_skill_ids`` (sorted),
    ``progression_successor_ids`` (sorted) and ``progression_edge_notes``.
    Without an anchor, all values are empty / ``None`` and nothing is queried.
    """
    skill_ids = _load_skill_ids_for_exercise(cur, anchor_id)
    successor_ids, edge_notes = _load_progression_successors(cur, progression_graph_id, anchor_id)
    title = _load_exercise_titles(cur, [anchor_id]).get(anchor_id) if anchor_id else None
    return {
        "anchor_title": title,
        "anchor_skill_ids": sorted(skill_ids),
        "progression_successor_ids": sorted(successor_ids),
        "progression_edge_notes": edge_notes,
    }


def _apply_client_planned_override(
    cur,
    pack: Dict[str, Any],
    body: PlanningExerciseSuggestRequest,
) -> Dict[str, Any]:
    """Let the client plan (unsaved form state) override the stored plan.

    Replaces ``planned_exercise_ids`` in ``pack`` (mutated in place and
    returned). If the request has no explicit anchor, the anchor becomes the
    last client-planned exercise; when that changes the anchor, title, skills
    and progression successors are all reloaded so they stay consistent.
    """
    planned_ids = _normalize_exercise_ids(body.planned_exercise_ids)
    if not planned_ids:
        return pack

    pack["planned_exercise_ids"] = planned_ids
    if body.anchor_exercise_id:
        # An explicit anchor was already resolved when the pack was built.
        return pack

    anchor_id = _resolve_anchor_from_plan(planned_ids, None)
    if anchor_id == pack.get("anchor_exercise_id"):
        # Same anchor as before: the derived data in the pack is still valid.
        return pack

    pack["anchor_exercise_id"] = anchor_id
    # Reload successors as well; otherwise they would still belong to the old anchor.
    pack.update(_load_anchor_details(cur, anchor_id, pack.get("progression_graph_id")))
    return pack


def _assert_unit_access(cur, unit: Dict[str, Any], profile_id: Any, role: Any) -> None:
    """Check that the caller may plan ``unit``; raise an HTTPException otherwise.

    Framework-slot units: admins and superadmins pass; everyone else must have
    created the unit or the framework program the slot belongs to (403).
    Other units: must have a group (404) and pass the regular permission check.
    """
    if unit.get("framework_slot_id"):
        if role in ("admin", "superadmin"):
            return
        cur.execute(
            """
            SELECT fp.created_by
            FROM training_framework_slots s
            JOIN training_framework_programs fp ON fp.id = s.framework_program_id
            WHERE s.id = %s
            """,
            (unit["framework_slot_id"],),
        )
        fr = cur.fetchone()
        program_creator = fr["created_by"] if fr else None
        if unit.get("created_by") != profile_id and program_creator != profile_id:
            raise HTTPException(status_code=403, detail="Keine Berechtigung")
        return

    if not unit.get("group_id"):
        raise HTTPException(status_code=404, detail="Trainingseinheit nicht gefunden")
    _assert_training_unit_permission(cur, unit, profile_id, role)


def build_planning_exercise_context_pack(
    cur,
    *,
    tenant: TenantContext,
    body: PlanningExerciseSuggestRequest,
) -> Dict[str, Any]:
    """Build the context pack for a persisted training unit (``body.unit_id``).

    Raises:
        HTTPException: 403 without a planning role or unit permission,
            404 when the unit does not exist (or has no group and no slot).
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    _require_planning_role(role)

    cur.execute(
        """
        SELECT tu.*, tg.name AS group_name
        FROM training_units tu
        LEFT JOIN training_groups tg ON tg.id = tu.group_id
        WHERE tu.id = %s
        """,
        (body.unit_id,),
    )
    unit_row = cur.fetchone()
    if not unit_row:
        raise HTTPException(status_code=404, detail="Trainingseinheit nicht gefunden")
    unit = dict(unit_row)
    _assert_unit_access(cur, unit, profile_id, role)

    unit_id = int(body.unit_id)
    sections = _fetch_sections(cur, unit_id)
    planned_ids = _collect_planned_exercise_ids(sections)
    anchor_id = _resolve_anchor_from_plan(planned_ids, body.anchor_exercise_id)
    anchor = _load_anchor_details(cur, anchor_id, body.progression_graph_id)
    # Exclude the current unit so its own items do not count as "recent".
    group_recent = _load_group_recent_exercise_ids(cur, unit.get("group_id"), unit_id)

    return {
        "unit_id": unit_id,
        "unit": {
            "id": unit_id,
            "framework_slot_id": unit.get("framework_slot_id"),
            "origin_framework_slot_id": unit.get("origin_framework_slot_id"),
        },
        "unit_title": (unit.get("title") or unit.get("planned_focus") or "").strip() or None,
        "group_id": unit.get("group_id"),
        "group_name": (unit.get("group_name") or "").strip() or None,
        "section_order_index": body.section_order_index,
        "section_title": _section_title_for_index(sections, body.section_order_index),
        "planned_exercise_ids": planned_ids,
        "anchor_exercise_id": anchor_id,
        "anchor_title": anchor["anchor_title"],
        "anchor_skill_ids": anchor["anchor_skill_ids"],
        "progression_graph_id": body.progression_graph_id,
        "progression_successor_ids": anchor["progression_successor_ids"],
        "progression_edge_notes": anchor["progression_edge_notes"],
        "group_recent_exercise_ids": sorted(group_recent),
    }


def build_client_planning_context_pack(
    cur,
    *,
    tenant: TenantContext,
    body: PlanningExerciseSuggestRequest,
) -> Dict[str, Any]:
    """Build the context pack without a persisted ``training_units.id``.

    Used for free / client-context search (form state, framework slot): the
    plan comes from ``body.planned_exercise_ids`` and the group from
    ``body.group_id``.

    Raises:
        HTTPException: 403 when the caller has no planning role.
    """
    _require_planning_role(tenant.global_role)

    planned_ids = _normalize_exercise_ids(body.planned_exercise_ids)
    anchor_id = _resolve_anchor_from_plan(planned_ids, body.anchor_exercise_id)
    anchor = _load_anchor_details(cur, anchor_id, body.progression_graph_id)

    group_id = body.group_id
    group_name = None
    if group_id:
        cur.execute("SELECT name FROM training_groups WHERE id = %s", (int(group_id),))
        gr = cur.fetchone()
        if gr:
            group_name = (gr.get("name") or "").strip() or None
    group_recent = _load_group_recent_exercise_ids(cur, group_id, exclude_unit_id=None)

    return {
        "unit_id": None,
        "unit": {
            "id": None,
            "framework_slot_id": None,
            "origin_framework_slot_id": None,
        },
        "unit_title": None,
        "group_id": group_id,
        "group_name": group_name,
        "section_order_index": body.section_order_index,
        "section_title": None,
        "planned_exercise_ids": planned_ids,
        "anchor_exercise_id": anchor_id,
        "anchor_title": anchor["anchor_title"],
        "anchor_skill_ids": anchor["anchor_skill_ids"],
        "progression_graph_id": body.progression_graph_id,
        "progression_successor_ids": anchor["progression_successor_ids"],
        "progression_edge_notes": anchor["progression_edge_notes"],
        "group_recent_exercise_ids": sorted(group_recent),
        "context_mode": "client_free",
    }


def _pack_summary(pack: Dict[str, Any]) -> Dict[str, Any]:
    """Return a fresh dict with the summary fields shared by all context summaries."""
    return {
        "unit_title": pack.get("unit_title"),
        "group_name": pack.get("group_name"),
        "section_title": pack.get("section_title"),
        "planned_count": len(pack.get("planned_exercise_ids") or []),
        "anchor_title": pack.get("anchor_title"),
    }


def suggest_planning_exercises(
    cur,
    *,
    tenant: TenantContext,
    body: PlanningExerciseSuggestRequest,
) -> Dict[str, Any]:
    """Run the full suggestion pipeline and return hits plus diagnostics.

    Steps: build the context pack (persisted unit or client-only), apply the
    client plan override, resolve the intent, run the multistage retrieval and
    optionally re-rank the top hits with the LLM. The result is cut to
    ``body.limit`` hits.

    Raises:
        HTTPException: propagated from the context pack builders (403 / 404).
    """
    if body.unit_id:
        pack = build_planning_exercise_context_pack(cur, tenant=tenant, body=body)
    else:
        pack = build_client_planning_context_pack(cur, tenant=tenant, body=body)
    pack = _apply_client_planned_override(cur, pack, body)

    limit = int(body.limit)
    query = _normalize_query(body.query)
    heuristic_intent = resolve_planning_exercise_intent(query, body.intent_hint)

    pipeline_context = {
        **_pack_summary(pack),
        "anchor_exercise_id": pack.get("anchor_exercise_id"),
        "progression_graph_id": pack.get("progression_graph_id"),
    }
    target_profile, intent, scenario_kind, query_intent_summary = build_planning_target_with_query_pipeline(
        cur,
        unit=pack["unit"],
        planned_exercise_ids=pack["planned_exercise_ids"],
        anchor_exercise_id=pack.get("anchor_exercise_id"),
        query=query,
        heuristic_intent=heuristic_intent,
        include_llm_intent=body.include_llm_intent,
        context_summary=pipeline_context,
    )
    weights = _intent_weights(intent)
    target_profile_summary = target_profile.to_summary_dict(cur)
    query_intent_applied = bool(query_intent_summary.get("llm_applied"))

    vis_sql, vis_params = library_content_visibility_sql(
        alias="e",
        profile_id=tenant.profile_id,
        role=tenant.global_role,
        effective_club_id=tenant.effective_club_id,
    )
    hits, skills_by_ex, profile_preselect_applied = run_multistage_planning_retrieval(
        cur,
        vis_sql=vis_sql,
        vis_params=vis_params,
        query=query,
        exercise_kind_any=body.exercise_kind_any,
        target=target_profile,
        intent=intent,
        intent_weights=weights,
        pack=pack,
    )

    llm_rank_applied = False
    retrieval_phase = compose_retrieval_phase(
        profile_preselect=profile_preselect_applied,
        query_intent=query_intent_applied,
        llm_rank=False,
    )
    run_llm_rank = should_run_llm_rank_pipeline(
        query,
        scenario_kind,
        include_llm_rank=body.include_llm_rank,
        query_intent_applied=query_intent_applied,
        hits=hits,
    )
    if run_llm_rank:
        # Give the re-ranker a pool at least as large as the requested limit.
        pre_limit = max(limit, _LLM_RERANK_PRE_LIMIT)
        pool_hits, llm_rank_applied = try_llm_rerank_planning_hits(
            cur,
            hits=hits[:pre_limit],
            skills_by_ex=skills_by_ex,
            query=query,
            intent=intent,
            context_summary={**_pack_summary(pack), "intent": intent},
            target_profile_summary=target_profile_summary,
            limit=limit,
        )
        if llm_rank_applied:
            retrieval_phase = compose_retrieval_phase(
                profile_preselect=profile_preselect_applied,
                query_intent=query_intent_applied,
                llm_rank=True,
            )
            # Keep the unranked tail so the result can still be filled up to
            # the limit if the re-ranker returned fewer hits.
            hits = pool_hits + hits[pre_limit:]
        else:
            hits = pool_hits
    hits = hits[:limit]

    context_summary = {
        **_pack_summary(pack),
        "anchor_exercise_id": pack.get("anchor_exercise_id"),
        "progression_graph_id": pack.get("progression_graph_id"),
        "context_mode": pack.get("context_mode") or ("unit" if pack.get("unit_id") else "client_free"),
    }
    return {
        "context_summary": context_summary,
        "target_profile_summary": target_profile_summary,
        "scenario_kind": scenario_kind,
        "query_intent_summary": query_intent_summary,
        "retrieval_phase": retrieval_phase,
        "profile_preselect_applied": profile_preselect_applied,
        "llm_rank_applied": llm_rank_applied,
        "llm_intent_applied": query_intent_applied,
        "intent_resolved": intent,
        "intent_heuristic": heuristic_intent,
        "query_normalized": query or None,
        "hits": hits,
    }