""" Phase 2 Planungs-Übungssuche: LLM-Rerank über Hybrid-Kandidaten. Prompt-Slug: planning_exercise_search_rank (Migration 072) """ from __future__ import annotations import json import logging import re from typing import Any, Dict, List, Mapping, Optional, Sequence, Set, Tuple from ai_prompt_runtime import AiPromptUnavailableError, load_and_render_ai_prompt from exercise_ai import strip_html_to_plain from openrouter_chat import ( effective_openrouter_model_for_prompt_row, normalize_openrouter_env, openrouter_chat_completion, ) _logger = logging.getLogger("shinkan.planning_exercise_llm_rank") _LLM_RERANK_POOL = 32 _MAX_GOAL_PLAIN = 480 _MAX_SUMMARY_PLAIN = 320 _MAX_REASON_LEN = 160 def _compact_json(obj: Any) -> str: return json.dumps(obj, ensure_ascii=False, separators=(",", ":")) def _extract_json_object(text: str) -> Dict[str, Any]: s = (text or "").strip() if s.startswith("```"): s = re.sub(r"^```[a-zA-Z0-9]*\s*", "", s) if s.endswith("```"): s = s[:-3].strip() start = s.find("{") end = s.rfind("}") if start < 0 or end <= start: raise ValueError("Kein JSON-Objekt in LLM-Antwort") obj = json.loads(s[start : end + 1]) if not isinstance(obj, dict): raise ValueError("LLM-Antwort ist kein JSON-Objekt") return obj def parse_planning_exercise_rank_response( text: str, allowed_ids: Set[int], ) -> Tuple[List[int], Dict[int, str]]: """ Validiert LLM-Ranking: nur erlaubte exercise_id, dedupliziert, Reihenfolge beibehalten. """ obj = _extract_json_object(text) ranked_raw = obj.get("ranked_ids") or obj.get("ranked") or obj.get("ids") if not isinstance(ranked_raw, list): raise ValueError("ranked_ids fehlt oder ist keine Liste") ranked: List[int] = [] seen: Set[int] = set() for raw in ranked_raw: try: eid = int(raw) except (TypeError, ValueError): continue if eid < 1 or eid not in allowed_ids or eid in seen: continue seen.add(eid) ranked.append(eid) reasons_out: Dict[int, str] = {} reasons_raw = obj.get("reasons") or obj.get("reasons_by_id") or {} if isinstance(reasons_raw, dict): for k, v in reasons_raw.items(): try: eid = int(k) except (TypeError, ValueError): continue if eid not in allowed_ids: continue txt = str(v or "").strip() if txt: reasons_out[eid] = txt[:_MAX_REASON_LEN] return ranked, reasons_out def _build_candidate_payload( hit: Mapping[str, Any], *, goal_plain: str, skill_names: Sequence[str], ) -> Dict[str, Any]: return { "id": int(hit["id"]), "title": str(hit.get("title") or "").strip()[:200], "summary": strip_html_to_plain(hit.get("summary"), max_len=_MAX_SUMMARY_PLAIN), "goal": goal_plain, "skills": list(skill_names)[:8], "retrieval_score": float(hit.get("score") or 0.0), } def _load_exercise_goals(cur, exercise_ids: Sequence[int]) -> Dict[int, str]: ids = [int(x) for x in exercise_ids if int(x) > 0] if not ids: return {} ph = ",".join(["%s"] * len(ids)) cur.execute( f"SELECT id, goal FROM exercises WHERE id IN ({ph})", ids, ) return {int(r["id"]): str(r.get("goal") or "") for r in cur.fetchall()} def _load_skill_names(cur, skill_ids: Sequence[int]) -> Dict[int, str]: ids = sorted({int(x) for x in skill_ids if int(x) > 0}) if not ids: return {} ph = ",".join(["%s"] * len(ids)) cur.execute(f"SELECT id, name FROM skills WHERE id IN ({ph})", ids) return {int(r["id"]): str(r.get("name") or "") for r in cur.fetchall()} def try_llm_rerank_planning_hits( cur, *, hits: List[Dict[str, Any]], skills_by_ex: Mapping[int, Set[int]], query: str, intent: str, context_summary: Mapping[str, Any], target_profile_summary: Mapping[str, Any], limit: int, ) -> Tuple[List[Dict[str, Any]], bool]: """ Optionaler LLM-Rerank der Top-Kandidaten. Bei Fehler: Original-Reihenfolge, llm_applied=False. """ if not hits: return hits, False api_key, _ = normalize_openrouter_env() if not api_key: return hits, False pool = hits[:_LLM_RERANK_POOL] allowed_ids = {int(h["id"]) for h in pool} goals = _load_exercise_goals(cur, list(allowed_ids)) all_skill_ids: Set[int] = set() for eid in allowed_ids: all_skill_ids.update(skills_by_ex.get(eid) or set()) skill_name_map = _load_skill_names(cur, list(all_skill_ids)) candidates: List[Dict[str, Any]] = [] for hit in pool: eid = int(hit["id"]) sk_ids = sorted(skills_by_ex.get(eid) or set()) sk_names = [skill_name_map.get(sid, f"#{sid}") for sid in sk_ids[:8]] goal_plain = strip_html_to_plain(goals.get(eid), max_len=_MAX_GOAL_PLAIN) candidates.append( _build_candidate_payload(hit, goal_plain=goal_plain, skill_names=sk_names) ) variables = { "search_query": query or "", "intent": intent or "", "planning_context_json": _compact_json(dict(context_summary or {})), "target_profile_json": _compact_json(dict(target_profile_summary or {})), "candidates_json": _compact_json(candidates), "result_limit": str(max(1, min(int(limit), 50))), } try: prow, rendered = load_and_render_ai_prompt(cur, "planning_exercise_search_rank", variables) model = effective_openrouter_model_for_prompt_row(prow) raw = openrouter_chat_completion( api_key=api_key, model=model, user_content=rendered.text, ) ranked_ids, llm_reasons = parse_planning_exercise_rank_response(raw, allowed_ids) except AiPromptUnavailableError: return hits, False except Exception as exc: _logger.warning("Planungs-LLM-Rerank fehlgeschlagen: %s", exc) return hits, False if not ranked_ids: return hits, False hit_by_id = {int(h["id"]): h for h in hits} reranked: List[Dict[str, Any]] = [] used: Set[int] = set() for eid in ranked_ids: hit = hit_by_id.get(eid) if not hit: continue used.add(eid) new_hit = dict(hit) reasons = list(hit.get("reasons") or []) llm_reason = llm_reasons.get(eid) if llm_reason and llm_reason not in reasons: reasons.insert(0, llm_reason) new_hit["reasons"] = reasons new_hit["llm_rank"] = len(reranked) + 1 reranked.append(new_hit) for hit in hits: eid = int(hit["id"]) if eid in used: continue reranked.append(dict(hit)) return reranked[: max(int(limit), len(reranked))], True __all__ = [ "parse_planning_exercise_rank_response", "try_llm_rerank_planning_hits", ]