""" Planungs-KI Phase E: Pfad-QA — Lücken erkennen, Brücken vorschlagen, LLM-Prüfung. """ from __future__ import annotations import json import logging import re from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Set, Tuple from ai_prompt_runtime import AiPromptUnavailableError, load_and_render_ai_prompt from exercise_ai import strip_html_to_plain from openrouter_chat import ( effective_openrouter_model_for_prompt_row, normalize_openrouter_env, openrouter_chat_completion, ) from planning_exercise_semantics import ( PlanningSemanticBrief, brief_to_summary_dict, score_exercise_semantic_relevance, step_phase_for_index, ) _logger = logging.getLogger("shinkan.planning_exercise_path_qa") _GAP_SKILL_THRESHOLD = 0.10 _GAP_SEMANTIC_THRESHOLD = 0.28 _LARGE_GAP_SCORE = 0.52 _MAX_BRIDGE_INSERTS = 4 def _extract_json_object(text: str) -> Dict[str, Any]: s = (text or "").strip() if s.startswith("```"): s = re.sub(r"^```[a-zA-Z0-9]*\s*", "", s) if s.endswith("```"): s = s[:-3].strip() start = s.find("{") end = s.rfind("}") if start < 0 or end <= start: raise ValueError("Kein JSON-Objekt in LLM-Antwort") obj = json.loads(s[start : end + 1]) if not isinstance(obj, dict): raise ValueError("LLM-Antwort ist kein JSON-Objekt") return obj def _skill_jaccard(a: Set[int], b: Set[int]) -> float: if not a or not b: return 0.0 inter = len(a & b) union = len(a | b) return inter / union if union else 0.0 def _load_exercise_skill_ids(cur, exercise_id: int) -> Set[int]: cur.execute( "SELECT skill_id FROM exercise_skills WHERE exercise_id = %s", (int(exercise_id),), ) return {int(r["skill_id"]) for r in cur.fetchall() if r.get("skill_id") is not None} def _load_exercise_text_bundle(cur, exercise_id: int) -> Dict[str, Any]: cur.execute( "SELECT id, title, summary, goal FROM exercises WHERE id = %s", (int(exercise_id),), ) row = cur.fetchone() if not row: return {"title": "", "summary": "", "goal": "", "variant_names": []} cur.execute( """ SELECT variant_name FROM exercise_variants WHERE exercise_id = %s ORDER BY sequence_order ASC NULLS LAST, id ASC LIMIT 8 """, (int(exercise_id),), ) variants = [str(r.get("variant_name") or "") for r in cur.fetchall()] return { "title": str(row.get("title") or ""), "summary": str(row.get("summary") or ""), "goal": str(row.get("goal") or ""), "variant_names": variants, } def measure_step_transition_gap( cur, step_a: Mapping[str, Any], step_b: Mapping[str, Any], *, brief: PlanningSemanticBrief, segment_index: int, total_segments: int, ) -> Dict[str, Any]: eid_a = int(step_a["exercise_id"]) eid_b = int(step_b["exercise_id"]) skills_a = _load_exercise_skill_ids(cur, eid_a) skills_b = _load_exercise_skill_ids(cur, eid_b) skill_sim = _skill_jaccard(skills_a, skills_b) bundle_b = _load_exercise_text_bundle(cur, eid_b) mid_phase = step_phase_for_index(brief, segment_index + 1, total_segments + 1) sem_b, sem_reasons = score_exercise_semantic_relevance( title=bundle_b["title"], summary=bundle_b["summary"], goal=bundle_b["goal"], variant_names=bundle_b["variant_names"], brief=brief, step_phase=mid_phase, ) gap_score = 0.0 if skill_sim < _GAP_SKILL_THRESHOLD: gap_score += 0.45 * (1.0 - skill_sim / max(_GAP_SKILL_THRESHOLD, 0.01)) if sem_b < _GAP_SEMANTIC_THRESHOLD: gap_score += 0.35 * (1.0 - sem_b / max(_GAP_SEMANTIC_THRESHOLD, 0.01)) if brief.semantic_strength >= 0.5 and sem_b < 0.15: gap_score += 0.2 gap_score = min(1.0, round(gap_score, 4)) is_large = gap_score >= _LARGE_GAP_SCORE return { "from_exercise_id": eid_a, "to_exercise_id": eid_b, "from_title": step_a.get("title"), "to_title": step_b.get("title"), "skill_similarity": round(skill_sim, 4), "semantic_score_to": sem_b, "gap_score": gap_score, "is_large_gap": is_large, "expected_phase": mid_phase, "reasons": sem_reasons, } def detect_path_gaps( cur, steps: Sequence[Mapping[str, Any]], *, brief: PlanningSemanticBrief, ) -> List[Dict[str, Any]]: if len(steps) < 2: return [] gaps: List[Dict[str, Any]] = [] total_segments = len(steps) - 1 for i in range(total_segments): gap = measure_step_transition_gap( cur, steps[i], steps[i + 1], brief=brief, segment_index=i, total_segments=total_segments, ) if gap.get("is_large_gap"): gaps.append(gap) return gaps def _pick_bridge_hit( hits: Sequence[Mapping[str, Any]], *, used_ids: Set[int], step_a_id: int, step_b_id: int, ) -> Optional[Dict[str, Any]]: for hit in hits: eid = int(hit["id"]) if eid in used_ids or eid in {step_a_id, step_b_id}: continue return dict(hit) return None def insert_bridge_exercises( cur, steps: List[Dict[str, Any]], gaps: Sequence[Mapping[str, Any]], *, brief: PlanningSemanticBrief, bridge_search_fn: Callable[..., List[Dict[str, Any]]], max_inserts: int = _MAX_BRIDGE_INSERTS, ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: """ Fügt zwischen großen Lücken Brücken-Übungen ein. bridge_search_fn(from_step, to_step, gap) -> hits Returns: (steps, bridge_inserts, unfilled_gaps) """ if not gaps: return steps, [], [] used_ids = {int(s["exercise_id"]) for s in steps if s.get("exercise_id") is not None} inserts: List[Dict[str, Any]] = [] unfilled: List[Dict[str, Any]] = [] out = list(steps) gap_by_pair = { (int(g["from_exercise_id"]), int(g["to_exercise_id"])): g for g in gaps } i = 0 while i < len(out) - 1 and len(inserts) < max_inserts: a = out[i] b = out[i + 1] if a.get("exercise_id") is None or b.get("exercise_id") is None: i += 1 continue key = (int(a["exercise_id"]), int(b["exercise_id"])) gap = gap_by_pair.get(key) if not gap: i += 1 continue hits = bridge_search_fn(a, b, gap) bridge_hit = _pick_bridge_hit( hits, used_ids=used_ids, step_a_id=int(a["exercise_id"]), step_b_id=int(b["exercise_id"]), ) if not bridge_hit: unfilled.append(gap) i += 1 continue bridge_step = { "exercise_id": int(bridge_hit["id"]), "variant_id": bridge_hit.get("suggested_variant_id"), "title": bridge_hit.get("title"), "summary": bridge_hit.get("summary"), "score": bridge_hit.get("score"), "reasons": list(bridge_hit.get("reasons") or []) + ["Brücken-Übung (Lückenfüller)"], "variants": bridge_hit.get("variants") or [], "suggested_variant_id": bridge_hit.get("suggested_variant_id"), "suggested_variant_name": bridge_hit.get("suggested_variant_name"), "is_bridge": True, "bridge_for_gap": { "from_exercise_id": int(a["exercise_id"]), "to_exercise_id": int(b["exercise_id"]), "gap_score": gap.get("gap_score"), }, } out.insert(i + 1, bridge_step) used_ids.add(int(bridge_step["exercise_id"])) inserts.append( { "inserted_after_index": i, "bridge_exercise_id": int(bridge_step["exercise_id"]), "bridge_title": bridge_step.get("title"), "gap": gap, } ) i += 2 return out, inserts, unfilled def try_llm_qa_progression_path( cur, *, goal_query: str, brief: PlanningSemanticBrief, steps: Sequence[Mapping[str, Any]], gaps: Sequence[Mapping[str, Any]], bridge_inserts: Sequence[Mapping[str, Any]], ) -> Tuple[Optional[Dict[str, Any]], bool]: api_key, _ = normalize_openrouter_env() if not api_key or len(steps) < 2: return None, False step_payload = [] for idx, step in enumerate(steps): if step.get("is_ai_proposal") or step.get("exercise_id") is None: step_payload.append( { "index": idx + 1, "proposal_key": step.get("proposal_key"), "title": step.get("title"), "summary": strip_html_to_plain(step.get("summary"), max_len=400), "is_bridge": bool(step.get("is_bridge")), "is_ai_proposal": True, "reasons": list(step.get("reasons") or [])[:3], } ) continue bundle = _load_exercise_text_bundle(cur, int(step["exercise_id"])) step_payload.append( { "index": idx + 1, "exercise_id": int(step["exercise_id"]), "proposal_key": step.get("proposal_key"), "title": step.get("title") or bundle["title"], "goal": strip_html_to_plain(bundle["goal"], max_len=400), "is_bridge": bool(step.get("is_bridge")), "is_ai_proposal": False, "reasons": list(step.get("reasons") or [])[:3], } ) variables = { "goal_query": goal_query or "", "semantic_brief_json": json.dumps(brief_to_summary_dict(brief), ensure_ascii=False), "steps_json": json.dumps(step_payload, ensure_ascii=False), "gaps_json": json.dumps(list(gaps), ensure_ascii=False), "bridge_inserts_json": json.dumps(list(bridge_inserts), ensure_ascii=False), } try: prow, rendered = load_and_render_ai_prompt(cur, "planning_exercise_path_qa", variables) model = effective_openrouter_model_for_prompt_row(prow) raw = openrouter_chat_completion(api_key=api_key, model=model, user_content=rendered.text) obj = _extract_json_object(raw) return obj, True except AiPromptUnavailableError: return None, False except Exception as exc: _logger.warning("Pfad-QA-LLM fehlgeschlagen: %s", exc) return None, False def apply_llm_path_reorder( steps: List[Dict[str, Any]], llm_qa: Mapping[str, Any], ) -> Tuple[List[Dict[str, Any]], bool, List[str]]: """ Wendet LLM-Neuordnung an (ordered_step_indices = Permutation der aktuellen Indizes). """ raw = llm_qa.get("ordered_step_indices") if not isinstance(raw, list) or len(raw) != len(steps): return steps, False, [] try: indices = [int(x) for x in raw] except (TypeError, ValueError): return steps, False, ["Neuordnung: ungültige Indizes"] if sorted(indices) != list(range(len(steps))): return steps, False, ["Neuordnung: keine gültige Permutation — ignoriert"] if indices == list(range(len(steps))): return steps, False, [] notes = [str(n) for n in (llm_qa.get("sequence_notes") or []) if str(n).strip()] return [steps[i] for i in indices], True, notes def build_path_qa_summary( *, gaps: Sequence[Mapping[str, Any]], bridge_inserts: Sequence[Mapping[str, Any]], ai_proposals: Sequence[Mapping[str, Any]], llm_qa: Optional[Mapping[str, Any]], llm_applied: bool, reorder_applied: bool = False, reorder_notes: Optional[Sequence[str]] = None, ) -> Dict[str, Any]: summary: Dict[str, Any] = { "gap_count": len(gaps), "large_gaps": list(gaps), "bridge_insert_count": len(bridge_inserts), "bridge_inserts": list(bridge_inserts), "ai_proposal_count": len(ai_proposals), "ai_proposals": list(ai_proposals), "llm_qa_applied": llm_applied, "reorder_applied": reorder_applied, "reorder_notes": list(reorder_notes or []), } if llm_qa: summary["overall_ok"] = bool(llm_qa.get("overall_ok", True)) summary["quality_score"] = llm_qa.get("quality_score") summary["issues"] = list(llm_qa.get("issues") or []) summary["sequence_notes"] = list(llm_qa.get("sequence_notes") or []) summary["topic_coverage"] = llm_qa.get("topic_coverage") summary["recommendations"] = list(llm_qa.get("recommendations") or []) else: summary["overall_ok"] = len(gaps) == 0 summary["issues"] = [ f"Lücke zwischen „{g.get('from_title')}“ und „{g.get('to_title')}“ (Score {g.get('gap_score')})" for g in gaps ] if gaps else [] return summary __all__ = [ "apply_llm_path_reorder", "build_path_qa_summary", "detect_path_gaps", "insert_bridge_exercises", "measure_step_transition_gap", "try_llm_qa_progression_path", ]