"""Planning AI phase E: path QA — detect gaps, propose bridges, run the LLM review."""

from __future__ import annotations

import json
import logging
import re
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Set, Tuple

from ai_prompt_runtime import AiPromptUnavailableError, load_and_render_ai_prompt
from exercise_ai import strip_html_to_plain
from openrouter_chat import (
    effective_openrouter_model_for_prompt_row,
    normalize_openrouter_env,
    openrouter_chat_completion,
)
from planning_exercise_semantics import (
    PlanningSemanticBrief,
    brief_to_summary_dict,
    exercise_passes_path_semantic_gate,
    score_exercise_semantic_relevance,
    step_phase_for_index,
)

_logger = logging.getLogger("shinkan.planning_exercise_path_qa")

# Below these similarity/relevance values a transition starts to count as a gap.
_GAP_SKILL_THRESHOLD = 0.10
_GAP_SEMANTIC_THRESHOLD = 0.28
# A transition whose combined gap score reaches this value is a "large" gap.
_LARGE_GAP_SCORE = 0.52
_MAX_BRIDGE_INSERTS = 4


def _extract_json_object(text: str) -> Dict[str, Any]:
    """Parse the outermost ``{...}`` span of an LLM reply as a JSON object.

    A leading Markdown code fence (and a matching trailing fence) is removed
    first; then everything from the first ``{`` to the last ``}`` is parsed.

    Raises:
        ValueError: if no brace pair is found, the span is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass), or the
            parsed value is not a dict.
    """
    s = (text or "").strip()
    if s.startswith("```"):
        s = re.sub(r"^```[a-zA-Z0-9]*\s*", "", s)
        if s.endswith("```"):
            s = s[:-3].strip()
    start = s.find("{")
    end = s.rfind("}")
    if start < 0 or end <= start:
        raise ValueError("Kein JSON-Objekt in LLM-Antwort")
    obj = json.loads(s[start : end + 1])
    if not isinstance(obj, dict):
        raise ValueError("LLM-Antwort ist kein JSON-Objekt")
    return obj


def _as_list(value: Any) -> List[Any]:
    """Coerce a loosely-typed LLM field to a list.

    Falsy values become ``[]``, lists/tuples are copied, and any other value
    (notably a single string) is wrapped in a one-element list instead of
    being iterated item by item / character by character.
    """
    if not value:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _skill_jaccard(a: Set[int], b: Set[int]) -> float:
    """Return the Jaccard similarity of two skill-id sets (0.0 if either is empty)."""
    if not a or not b:
        return 0.0
    inter = len(a & b)
    union = len(a | b)
    return inter / union if union else 0.0


def _load_exercise_skill_ids(cur, exercise_id: int) -> Set[int]:
    """Load the skill ids linked to an exercise from ``exercise_skills``.

    Rows are accessed by key (``r["skill_id"]`` / ``r.get``), so ``cur`` must
    yield mapping-like rows. NULL skill ids are dropped.
    """
    cur.execute(
        "SELECT skill_id FROM exercise_skills WHERE exercise_id = %s",
        (int(exercise_id),),
    )
    return {int(r["skill_id"]) for r in cur.fetchall() if r.get("skill_id") is not None}


def _load_exercise_text_bundle(cur, exercise_id: int) -> Dict[str, Any]:
    """Load title, summary, goal and up to 8 variant names of an exercise.

    Returns a dict with keys ``title``, ``summary``, ``goal`` (strings, never
    ``None``) and ``variant_names`` (list of strings). If the exercise row
    does not exist, all texts are empty and ``variant_names`` is ``[]``.
    """
    cur.execute(
        "SELECT id, title, summary, goal FROM exercises WHERE id = %s",
        (int(exercise_id),),
    )
    row = cur.fetchone()
    if not row:
        return {"title": "", "summary": "", "goal": "", "variant_names": []}
    cur.execute(
        """
        SELECT variant_name
        FROM exercise_variants
        WHERE exercise_id = %s
        ORDER BY sequence_order ASC NULLS LAST, id ASC
        LIMIT 8
        """,
        (int(exercise_id),),
    )
    variants = [str(r.get("variant_name") or "") for r in cur.fetchall()]
    return {
        "title": str(row.get("title") or ""),
        "summary": str(row.get("summary") or ""),
        "goal": str(row.get("goal") or ""),
        "variant_names": variants,
    }


def measure_step_transition_gap(
    cur,
    step_a: Mapping[str, Any],
    step_b: Mapping[str, Any],
    *,
    brief: PlanningSemanticBrief,
    segment_index: int,
    total_segments: int,
) -> Dict[str, Any]:
    """Score how large the jump from ``step_a`` to ``step_b`` is.

    The gap score (clamped to ``[0, 1]``, rounded to 4 places) is the sum of:

    * up to 0.45 when the skill-set Jaccard similarity of both exercises is
      below ``_GAP_SKILL_THRESHOLD`` (scaled linearly, full weight at 0);
    * up to 0.35 when the semantic relevance of ``step_b`` is below
      ``_GAP_SEMANTIC_THRESHOLD`` (scaled linearly, full weight at 0);
    * a flat 0.2 when ``brief.semantic_strength >= 0.5`` and the semantic
      relevance of ``step_b`` is below 0.15.

    Both steps must carry a usable ``exercise_id``; a missing or ``None`` id
    raises ``KeyError`` / ``TypeError`` from the ``int(...)`` conversion.

    Returns:
        A dict describing the transition, including ``gap_score`` and
        ``is_large_gap`` (``gap_score >= _LARGE_GAP_SCORE``).
    """
    eid_a = int(step_a["exercise_id"])
    eid_b = int(step_b["exercise_id"])
    skills_a = _load_exercise_skill_ids(cur, eid_a)
    skills_b = _load_exercise_skill_ids(cur, eid_b)
    skill_sim = _skill_jaccard(skills_a, skills_b)

    bundle_b = _load_exercise_text_bundle(cur, eid_b)
    # The target step of segment i sits at position i + 1 of total_segments + 1 steps.
    mid_phase = step_phase_for_index(brief, segment_index + 1, total_segments + 1)
    sem_b, sem_reasons = score_exercise_semantic_relevance(
        title=bundle_b["title"],
        summary=bundle_b["summary"],
        goal=bundle_b["goal"],
        variant_names=bundle_b["variant_names"],
        brief=brief,
        step_phase=mid_phase,
    )

    gap_score = 0.0
    if skill_sim < _GAP_SKILL_THRESHOLD:
        gap_score += 0.45 * (1.0 - skill_sim / max(_GAP_SKILL_THRESHOLD, 0.01))
    if sem_b < _GAP_SEMANTIC_THRESHOLD:
        gap_score += 0.35 * (1.0 - sem_b / max(_GAP_SEMANTIC_THRESHOLD, 0.01))
    if brief.semantic_strength >= 0.5 and sem_b < 0.15:
        gap_score += 0.2
    gap_score = min(1.0, round(gap_score, 4))
    is_large = gap_score >= _LARGE_GAP_SCORE

    return {
        "from_exercise_id": eid_a,
        "to_exercise_id": eid_b,
        "from_title": step_a.get("title"),
        "to_title": step_b.get("title"),
        "skill_similarity": round(skill_sim, 4),
        "semantic_score_to": sem_b,
        "gap_score": gap_score,
        "is_large_gap": is_large,
        "expected_phase": mid_phase,
        "reasons": sem_reasons,
    }


def detect_path_gaps(
    cur,
    steps: Sequence[Mapping[str, Any]],
    *,
    brief: PlanningSemanticBrief,
) -> List[Dict[str, Any]]:
    """Return the large gaps between consecutive steps of a path.

    Each adjacent pair is scored with ``measure_step_transition_gap``; only
    transitions flagged ``is_large_gap`` are returned, in path order. Pairs in
    which either step has no ``exercise_id`` cannot be measured against the
    database and are skipped, matching how the other helpers in this module
    treat such steps.
    """
    if len(steps) < 2:
        return []
    gaps: List[Dict[str, Any]] = []
    total_segments = len(steps) - 1
    for i in range(total_segments):
        step_a = steps[i]
        step_b = steps[i + 1]
        if step_a.get("exercise_id") is None or step_b.get("exercise_id") is None:
            continue
        gap = measure_step_transition_gap(
            cur,
            step_a,
            step_b,
            brief=brief,
            segment_index=i,
            total_segments=total_segments,
        )
        if gap.get("is_large_gap"):
            gaps.append(gap)
    return gaps


def _pick_bridge_hit(
    hits: Sequence[Mapping[str, Any]],
    *,
    used_ids: Set[int],
    step_a_id: int,
    step_b_id: int,
) -> Optional[Dict[str, Any]]:
    """Return a copy of the first hit that is neither already used nor an endpoint."""
    for hit in hits:
        eid = int(hit["id"])
        if eid in used_ids or eid in {step_a_id, step_b_id}:
            continue
        return dict(hit)
    return None


def insert_bridge_exercises(
    cur,
    steps: List[Dict[str, Any]],
    gaps: Sequence[Mapping[str, Any]],
    *,
    brief: PlanningSemanticBrief,
    bridge_search_fn: Callable[..., List[Dict[str, Any]]],
    max_inserts: int = _MAX_BRIDGE_INSERTS,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Insert bridge exercises into large gaps of the path.

    ``bridge_search_fn(from_step, to_step, gap) -> hits`` supplies the
    candidates; the first hit not already on the path is used. When
    ``brief.semantic_strength >= 0.55`` the candidate must also pass the
    strict semantic gate, otherwise the gap is reported as unfilled with
    ``weak_bridge_rejected`` set.

    The walk stops once ``max_inserts`` bridges were inserted; gaps after
    that point are neither filled nor listed as unfilled. ``cur`` is not
    used by this function.

    Returns:
        ``(steps, bridge_inserts, unfilled_gaps)``. With no gaps the input
        list itself is returned; otherwise a new list is returned and the
        input list is left unmodified.
    """
    if not gaps:
        return steps, [], []

    used_ids = {int(s["exercise_id"]) for s in steps if s.get("exercise_id") is not None}
    inserts: List[Dict[str, Any]] = []
    unfilled: List[Dict[str, Any]] = []
    out = list(steps)
    gap_by_pair = {
        (int(g["from_exercise_id"]), int(g["to_exercise_id"])): g for g in gaps
    }

    i = 0
    while i < len(out) - 1 and len(inserts) < max_inserts:
        a = out[i]
        b = out[i + 1]
        if a.get("exercise_id") is None or b.get("exercise_id") is None:
            i += 1
            continue
        key = (int(a["exercise_id"]), int(b["exercise_id"]))
        gap = gap_by_pair.get(key)
        if not gap:
            i += 1
            continue

        hits = bridge_search_fn(a, b, gap)
        bridge_hit = _pick_bridge_hit(
            hits,
            used_ids=used_ids,
            step_a_id=int(a["exercise_id"]),
            step_b_id=int(b["exercise_id"]),
        )
        if not bridge_hit:
            unfilled.append(gap)
            i += 1
            continue

        bridge_sem = float(bridge_hit.get("semantic_score") or 0.0)
        if brief.semantic_strength >= 0.55 and not exercise_passes_path_semantic_gate(
            semantic_score=bridge_sem,
            title=str(bridge_hit.get("title") or ""),
            summary=str(bridge_hit.get("summary") or ""),
            brief=brief,
            strict=True,
        ):
            unfilled.append(
                {**gap, "weak_bridge_rejected": True, "bridge_title": bridge_hit.get("title")}
            )
            i += 1
            continue

        bridge_step = {
            "exercise_id": int(bridge_hit["id"]),
            "variant_id": bridge_hit.get("suggested_variant_id"),
            "title": bridge_hit.get("title"),
            "summary": bridge_hit.get("summary"),
            "score": bridge_hit.get("score"),
            "reasons": list(bridge_hit.get("reasons") or []) + ["Brücken-Übung (Lückenfüller)"],
            "variants": bridge_hit.get("variants") or [],
            "suggested_variant_id": bridge_hit.get("suggested_variant_id"),
            "suggested_variant_name": bridge_hit.get("suggested_variant_name"),
            "is_bridge": True,
            "bridge_for_gap": {
                "from_exercise_id": int(a["exercise_id"]),
                "to_exercise_id": int(b["exercise_id"]),
                "gap_score": gap.get("gap_score"),
            },
        }
        out.insert(i + 1, bridge_step)
        used_ids.add(int(bridge_step["exercise_id"]))
        inserts.append(
            {
                # Index within the growing output list at insertion time.
                "inserted_after_index": i,
                "bridge_exercise_id": int(bridge_step["exercise_id"]),
                "bridge_title": bridge_step.get("title"),
                "gap": gap,
            }
        )
        # Skip the freshly inserted bridge; continue with the pair starting at b.
        i += 2

    return out, inserts, unfilled


def try_llm_qa_progression_path(
    cur,
    *,
    goal_query: str,
    brief: PlanningSemanticBrief,
    steps: Sequence[Mapping[str, Any]],
    gaps: Sequence[Mapping[str, Any]],
    bridge_inserts: Sequence[Mapping[str, Any]],
) -> Tuple[Optional[Dict[str, Any]], bool]:
    """Ask the LLM to review the path; best effort.

    Steps are sent with a 1-based ``index``. AI proposals and steps without
    an ``exercise_id`` are described by their own title/summary; all other
    steps get their goal text loaded from the database.

    Returns:
        ``(parsed_json_object, True)`` on success, ``(None, False)`` when no
        API key is configured, fewer than two steps are given, the prompt is
        unavailable, or the LLM call / JSON parsing fails (failures are
        logged as warnings). Building the payload (database reads, JSON
        serialisation) happens outside the guarded section and may raise.
    """
    api_key, _ = normalize_openrouter_env()
    if not api_key or len(steps) < 2:
        return None, False

    step_payload = []
    for idx, step in enumerate(steps):
        if step.get("is_ai_proposal") or step.get("exercise_id") is None:
            step_payload.append(
                {
                    "index": idx + 1,
                    "proposal_key": step.get("proposal_key"),
                    "title": step.get("title"),
                    "summary": strip_html_to_plain(step.get("summary"), max_len=400),
                    "is_bridge": bool(step.get("is_bridge")),
                    "is_ai_proposal": True,
                    "reasons": list(step.get("reasons") or [])[:3],
                }
            )
            continue
        bundle = _load_exercise_text_bundle(cur, int(step["exercise_id"]))
        step_payload.append(
            {
                "index": idx + 1,
                "exercise_id": int(step["exercise_id"]),
                "proposal_key": step.get("proposal_key"),
                "title": step.get("title") or bundle["title"],
                "goal": strip_html_to_plain(bundle["goal"], max_len=400),
                "is_bridge": bool(step.get("is_bridge")),
                "is_ai_proposal": False,
                "reasons": list(step.get("reasons") or [])[:3],
            }
        )

    variables = {
        "goal_query": goal_query or "",
        "semantic_brief_json": json.dumps(brief_to_summary_dict(brief), ensure_ascii=False),
        "steps_json": json.dumps(step_payload, ensure_ascii=False),
        "gaps_json": json.dumps(list(gaps), ensure_ascii=False),
        "bridge_inserts_json": json.dumps(list(bridge_inserts), ensure_ascii=False),
    }
    try:
        prow, rendered = load_and_render_ai_prompt(cur, "planning_exercise_path_qa", variables)
        model = effective_openrouter_model_for_prompt_row(prow)
        raw = openrouter_chat_completion(api_key=api_key, model=model, user_content=rendered.text)
        obj = _extract_json_object(raw)
        return obj, True
    except AiPromptUnavailableError:
        return None, False
    except Exception as exc:
        # Deliberate best effort: path QA must not fail because the LLM did.
        _logger.warning("Pfad-QA-LLM fehlgeschlagen: %s", exc)
        return None, False


def apply_llm_path_reorder(
    steps: List[Dict[str, Any]],
    llm_qa: Mapping[str, Any],
) -> Tuple[List[Dict[str, Any]], bool, List[str]]:
    """Apply the step order proposed by the LLM.

    ``llm_qa["ordered_step_indices"]`` must be a permutation of the current
    step positions. Both 0-based (``0..n-1``) and 1-based (``1..n``)
    permutations are accepted: the steps are presented to the LLM with a
    1-based ``index``, and the two forms cannot be confused because only the
    0-based one contains 0 and only the 1-based one contains ``n``.

    Returns:
        ``(steps, applied, notes)``. ``applied`` is ``False`` and the input
        list is returned unchanged when the field is missing, has the wrong
        length, is not a valid permutation (``notes`` then explains why), or
        is the identity. On success a new list is returned together with
        the non-blank ``sequence_notes`` from the LLM.
    """
    raw = llm_qa.get("ordered_step_indices")
    if not isinstance(raw, list) or len(raw) != len(steps):
        return steps, False, []
    try:
        indices = [int(x) for x in raw]
    except (TypeError, ValueError, OverflowError):
        # OverflowError: json.loads accepts ``Infinity`` and int(inf) overflows.
        return steps, False, ["Neuordnung: ungültige Indizes"]

    count = len(steps)
    identity = list(range(count))
    ordered = sorted(indices)
    if ordered != identity:
        if ordered != list(range(1, count + 1)):
            return steps, False, ["Neuordnung: keine gültige Permutation — ignoriert"]
        # 1-based permutation: shift to list positions.
        indices = [i - 1 for i in indices]
    if indices == identity:
        return steps, False, []

    notes = [str(n) for n in _as_list(llm_qa.get("sequence_notes")) if str(n).strip()]
    return [steps[i] for i in indices], True, notes


_OFF_TOPIC_SEMANTIC_MAX = 0.10


def detect_off_topic_steps(
    cur,
    steps: Sequence[Mapping[str, Any]],
    *,
    brief: PlanningSemanticBrief,
) -> List[Dict[str, Any]]:
    """Find steps unrelated to the path topic (e.g. pure strength drills on a Mae Geri path).

    Only runs when ``brief.semantic_strength >= 0.55`` and the path has at
    least two steps. AI proposals and steps without ``exercise_id`` are
    skipped. A step is reported when it fails the strict semantic gate *and*
    its semantic score is at most ``_OFF_TOPIC_SEMANTIC_MAX``.
    """
    if brief.semantic_strength < 0.55 or len(steps) < 2:
        return []
    off_topic: List[Dict[str, Any]] = []
    total = len(steps)
    for idx, step in enumerate(steps):
        if step.get("is_ai_proposal") or step.get("exercise_id") is None:
            continue
        bundle = _load_exercise_text_bundle(cur, int(step["exercise_id"]))
        phase = step_phase_for_index(brief, idx, total)
        sem, sem_reasons = score_exercise_semantic_relevance(
            title=bundle["title"],
            summary=bundle["summary"],
            goal=bundle["goal"],
            variant_names=bundle["variant_names"],
            brief=brief,
            step_phase=phase,
        )
        if exercise_passes_path_semantic_gate(
            semantic_score=sem,
            title=bundle["title"],
            summary=bundle["summary"],
            goal=bundle["goal"],
            brief=brief,
            strict=True,
        ):
            continue
        if sem > _OFF_TOPIC_SEMANTIC_MAX:
            continue
        off_topic.append(
            {
                "step_index": idx,
                "exercise_id": int(step["exercise_id"]),
                "title": step.get("title") or bundle["title"],
                "semantic_score": round(sem, 4),
                "expected_phase": phase,
                "issue": "off_topic",
                "reasons": sem_reasons[:3],
            }
        )
    return off_topic


def parse_llm_suggested_new_exercises(
    llm_qa: Optional[Mapping[str, Any]],
    *,
    brief: PlanningSemanticBrief,
    step_count: int,
) -> List[Dict[str, Any]]:
    """Normalise the LLM's "create a new exercise" suggestions.

    At most the first five entries of ``llm_qa["suggested_new_exercises"]``
    are considered; non-dict entries are dropped. Missing or too-short titles
    fall back to ``"<topic> — Zwischenschritt"``. The insert position is read
    from ``insert_after_step_index`` / ``insert_after_index``, defaults to
    roughly the middle of the path when absent or unparsable, and is clamped
    to ``[0, step_count - 2]`` (0 for paths shorter than two steps).
    """
    if not llm_qa:
        return []
    raw = llm_qa.get("suggested_new_exercises")
    if not isinstance(raw, list):
        return []

    topic = (brief.primary_topic or "Technik").strip()
    default_idx = max(0, step_count // 2 - 1)
    out: List[Dict[str, Any]] = []
    for item in raw[:5]:
        if not isinstance(item, dict):
            continue
        title_hint = str(item.get("title_hint") or item.get("title") or "").strip()
        if len(title_hint) < 3:
            title_hint = f"{topic} — Zwischenschritt"
        sketch = str(
            item.get("sketch") or item.get("goal_hint") or item.get("rationale") or ""
        ).strip()
        phase = str(item.get("phase") or item.get("expected_phase") or "vertiefung").strip()
        rationale = str(item.get("rationale") or "").strip()

        insert_after = item.get("insert_after_step_index")
        if insert_after is None:
            insert_after = item.get("insert_after_index")
        try:
            insert_idx = int(insert_after) if insert_after is not None else default_idx
        except (TypeError, ValueError, OverflowError):
            # OverflowError: json.loads accepts ``Infinity`` and int(inf) overflows.
            insert_idx = default_idx
        insert_idx = max(0, min(step_count - 2, insert_idx))

        out.append(
            {
                "source": "llm_suggested",
                "insert_after_index": insert_idx,
                "title_hint": title_hint[:280],
                "sketch": sketch[:1200],
                "phase": phase,
                "rationale": rationale[:500],
            }
        )
    return out


def find_step_pair_index(
    steps: Sequence[Mapping[str, Any]],
    from_exercise_id: int,
    to_exercise_id: int,
) -> Optional[int]:
    """Return the index of the first adjacent pair ``(from, to)``, or ``None``.

    Pairs in which either step has no ``exercise_id`` are ignored.
    """
    for i in range(len(steps) - 1):
        a = steps[i]
        b = steps[i + 1]
        if a.get("exercise_id") is None or b.get("exercise_id") is None:
            continue
        if int(a["exercise_id"]) == int(from_exercise_id) and int(b["exercise_id"]) == int(
            to_exercise_id
        ):
            return i
    return None


def build_path_qa_summary(
    *,
    gaps: Sequence[Mapping[str, Any]],
    bridge_inserts: Sequence[Mapping[str, Any]],
    ai_proposals: Sequence[Mapping[str, Any]],
    gap_fill_offers: Optional[Sequence[Mapping[str, Any]]] = None,
    off_topic_steps: Optional[Sequence[Mapping[str, Any]]] = None,
    llm_qa: Optional[Mapping[str, Any]],
    llm_applied: bool,
    reorder_applied: bool = False,
    reorder_notes: Optional[Sequence[str]] = None,
) -> Dict[str, Any]:
    """Assemble the path-QA summary dict.

    With an LLM result, ``overall_ok``, ``quality_score``, ``issues``,
    ``sequence_notes``, ``topic_coverage``, ``recommendations`` and
    ``suggested_new_exercises`` are taken from it; list-valued fields are
    coerced so that a single string from the LLM stays one entry. Without an
    LLM result, ``overall_ok`` is true only when there are neither gaps nor
    off-topic steps, and ``issues`` lists one message per gap. Off-topic
    steps are appended to ``issues`` in both cases.
    """
    offers = list(gap_fill_offers or [])
    off_topic = list(off_topic_steps or [])
    summary: Dict[str, Any] = {
        "gap_count": len(gaps),
        "large_gaps": list(gaps),
        "bridge_insert_count": len(bridge_inserts),
        "bridge_inserts": list(bridge_inserts),
        "ai_proposal_count": len(ai_proposals),
        "ai_proposals": list(ai_proposals),
        "gap_fill_offer_count": len(offers),
        "gap_fill_offers": offers,
        "off_topic_count": len(off_topic),
        "off_topic_steps": off_topic,
        "llm_qa_applied": llm_applied,
        "reorder_applied": reorder_applied,
        "reorder_notes": list(reorder_notes or []),
    }
    if llm_qa:
        summary["overall_ok"] = bool(llm_qa.get("overall_ok", True))
        summary["quality_score"] = llm_qa.get("quality_score")
        summary["issues"] = _as_list(llm_qa.get("issues"))
        summary["sequence_notes"] = _as_list(llm_qa.get("sequence_notes"))
        summary["topic_coverage"] = llm_qa.get("topic_coverage")
        summary["recommendations"] = _as_list(llm_qa.get("recommendations"))
        summary["suggested_new_exercises"] = _as_list(llm_qa.get("suggested_new_exercises"))
    else:
        summary["overall_ok"] = len(gaps) == 0 and len(off_topic) == 0
        summary["issues"] = [
            f"Lücke zwischen „{g.get('from_title')}“ und „{g.get('to_title')}“ (Score {g.get('gap_score')})"
            for g in gaps
        ]
    if off_topic:
        summary["issues"] = list(summary["issues"]) + [
            f"Schritt „{o.get('title')}“ passt nicht zum Pfad-Thema" for o in off_topic
        ]
    return summary


__all__ = [
    "apply_llm_path_reorder",
    "build_path_qa_summary",
    "detect_off_topic_steps",
    "detect_path_gaps",
    "find_step_pair_index",
    "insert_bridge_exercises",
    "measure_step_transition_gap",
    "parse_llm_suggested_new_exercises",
    "try_llm_qa_progression_path",
]