""" Planungs-KI Phase E: Semantik-Schicht für Anfrage-Verständnis und Retrieval. Trennt anfrage-spezifische Semantik (Technik, Phrasen, Entwicklungsbogen) vom Katalog-Profil-Overlay (Fokus/Skills). Wird in Hybrid-Retrieval und Pfad-QA genutzt. """ from __future__ import annotations import json import logging import re from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple from pydantic import BaseModel, Field, field_validator from ai_prompt_runtime import AiPromptUnavailableError, load_and_render_ai_prompt from exercise_ai import strip_html_to_plain from openrouter_chat import ( effective_openrouter_model_for_prompt_row, normalize_openrouter_env, openrouter_chat_completion, ) _logger = logging.getLogger("shinkan.planning_exercise_semantics") _GERI_TECHNIQUES: Tuple[Tuple[str, Tuple[str, ...]], ...] = ( ("mae geri", ("mawashi geri", "yoko geri", "ushiro geri", "sakuto geri", "mikazuki geri")), ("mawashi geri", ("mae geri", "yoko geri", "ushiro geri", "sakuto geri")), ("yoko geri", ("mae geri", "mawashi geri", "ushiro geri", "sakuto geri")), ("ushiro geri", ("mae geri", "mawashi geri", "yoko geri", "sakuto geri")), ("sakuto geri", ("mae geri", "mawashi geri", "yoko geri", "mikazuki geri")), ("mikazuki geri", ("mae geri", "mawashi geri", "sakuto geri")), ) _OTHER_TECHNIQUE_PATTERNS: Tuple[Tuple[str, Tuple[str, ...]], ...] = ( ("oi zuki", ("gyaku zuki", "age uke", "gedan barai")), ("gyaku zuki", ("oi zuki", "mae geri")), ("age uke", ("gedan barai", "soto uke")), ("gedan barai", ("age uke", "soto uke")), ) _ARC_PHASES: Tuple[Tuple[str, Tuple[str, ...]], ...] = ( ("einstieg", ("einstieg", "erlernen", "lernen", "anfänger", "anfaenger", "beginn", "grund")), ("grundlage", ("grundlage", "fundament", "basis", "basic")), ("vertiefung", ("vertief", "festigung", "übung", "uebung", "wiederhol")), ("anwendung", ("anwend", "partner", "kampf", "kumite", "reaktion")), ("perfektion", ("perfekt", "meisterschaft", "höchst", "hoechst", "kime", "sauber")), ) _PHASE_QUERY_HINTS: Dict[str, str] = { "einstieg": "einstieg grundübung einfach", "grundlage": "grundtechnik festigung", "vertiefung": "vertiefung technik übung", "anwendung": "anwendung partner variante", "perfektion": "perfektion kontrolle kime höchste stufe", } _QUERY_STOPWORDS = frozenset( { "von", "bis", "zur", "zum", "der", "die", "das", "des", "den", "dem", "ein", "eine", "einer", "eines", "und", "oder", "mit", "für", "fuer", "im", "in", "am", "an", "auf", "aus", "beim", "nach", "vor", "über", "ueber", "unter", "wie", "was", "wo", "wir", "soll", "sollen", "bitte", "schlage", "vorschlag", "übung", "uebung", "übungen", "uebungen", } ) class PlanningSemanticBrief(BaseModel): primary_topic: Optional[str] = Field(default=None, max_length=120) topic_type: str = Field(default="general", max_length=40) must_phrases: List[str] = Field(default_factory=list) exclude_phrases: List[str] = Field(default_factory=list) development_arc: List[str] = Field(default_factory=list) retrieval_query: str = Field(default="", max_length=500) semantic_strength: float = Field(default=0.0, ge=0.0, le=1.0) rationale: Optional[str] = Field(default=None, max_length=400) @field_validator("topic_type") @classmethod def _topic_type(cls, v: str) -> str: s = (v or "general").strip().lower() return s if s in {"general", "technique", "focus", "method", "skill"} else "general" @field_validator("must_phrases", "exclude_phrases", "development_arc", mode="before") @classmethod def _norm_phrase_list(cls, v: Any) -> List[str]: if not v: return [] if isinstance(v, str): s = _normalize_phrase(v) return [s] if s else [] out: List[str] = [] for item in v: s = _normalize_phrase(str(item or "")) if s and s not in out: out.append(s[:120]) return out[:12] def _normalize_phrase(text: str) -> str: return re.sub(r"\s+", " ", (text or "").strip().lower()) def _normalize_query(text: str) -> str: return re.sub(r"\s+", " ", (text or "").strip()) def _extract_json_object(text: str) -> Dict[str, Any]: s = (text or "").strip() if s.startswith("```"): s = re.sub(r"^```[a-zA-Z0-9]*\s*", "", s) if s.endswith("```"): s = s[:-3].strip() start = s.find("{") end = s.rfind("}") if start < 0 or end <= start: raise ValueError("Kein JSON-Objekt in LLM-Antwort") obj = json.loads(s[start : end + 1]) if not isinstance(obj, dict): raise ValueError("LLM-Antwort ist kein JSON-Objekt") return obj def _find_technique_in_text(q_lower: str) -> Optional[Tuple[str, Tuple[str, ...]]]: for primary, excludes in _GERI_TECHNIQUES + _OTHER_TECHNIQUE_PATTERNS: if primary in q_lower: return primary, excludes return None def _detect_development_arc(q_lower: str) -> List[str]: found: List[str] = [] for phase, markers in _ARC_PHASES: if any(m in q_lower for m in markers): if phase not in found: found.append(phase) if not found and ("von" in q_lower and "bis" in q_lower): found = ["einstieg", "perfektion"] return found def _keyword_phrases_from_query(query: str) -> List[str]: q = _normalize_query(query).lower() tokens = re.findall(r"[a-zäöüß]{3,}", q, flags=re.IGNORECASE) phrases: List[str] = [] for i, tok in enumerate(tokens): low = tok.lower() if low in _QUERY_STOPWORDS: continue if i + 1 < len(tokens): nxt = tokens[i + 1].lower() if nxt not in _QUERY_STOPWORDS: pair = _normalize_phrase(f"{low} {nxt}") if len(pair) >= 5 and pair not in phrases: phrases.append(pair) if len(low) >= 4 and low not in phrases: phrases.append(low) return phrases[:6] def build_semantic_brief(query: Optional[str]) -> PlanningSemanticBrief: """Deterministisches Anfrage-Verständnis — ohne LLM.""" q = _normalize_query(query) if not q: return PlanningSemanticBrief(retrieval_query="", semantic_strength=0.0) q_lower = q.lower() must: List[str] = [] exclude: List[str] = [] topic_type = "general" primary: Optional[str] = None strength = 0.25 technique = _find_technique_in_text(q_lower) if technique: primary, ex = technique must.append(primary) exclude.extend(list(ex)) topic_type = "technique" strength = max(strength, 0.82) arc = _detect_development_arc(q_lower) if arc: strength = max(strength, 0.55 if technique else 0.45) extra_phrases = _keyword_phrases_from_query(q) for ph in extra_phrases: if ph not in must and not any(ph in m or m in ph for m in must): if len(ph) >= 5: must.append(ph) if len(q) >= 24 and not technique: strength = max(strength, 0.4) retrieval = " ".join(must[:4]) if must else q if arc and primary: retrieval = f"{primary} {' '.join(arc[:2])}" return PlanningSemanticBrief( primary_topic=primary, topic_type=topic_type, must_phrases=must[:8], exclude_phrases=exclude[:10], development_arc=arc[:5], retrieval_query=retrieval[:500], semantic_strength=min(1.0, round(strength, 3)), rationale=None, ) def merge_semantic_brief_llm( base: PlanningSemanticBrief, llm_obj: Mapping[str, Any], ) -> PlanningSemanticBrief: """LLM-Enrichment in deterministisches Brief mergen (LLM ergänzt, ersetzt nicht harte Technik-Regeln).""" data = base.model_dump() for key in ("primary_topic", "topic_type", "rationale"): val = llm_obj.get(key) if val: data[key] = val for key in ("must_phrases", "exclude_phrases", "development_arc"): extra = llm_obj.get(key) or [] merged = list(data.get(key) or []) for item in extra: s = _normalize_phrase(str(item or "")) if s and s not in merged: merged.append(s) data[key] = merged[:12] llm_strength = llm_obj.get("semantic_strength") if llm_strength is not None: try: data["semantic_strength"] = min( 1.0, max(float(data["semantic_strength"]), float(llm_strength)), ) except (TypeError, ValueError): pass if data.get("must_phrases"): data["retrieval_query"] = " ".join(data["must_phrases"][:4])[:500] out = PlanningSemanticBrief.model_validate(data) if out.primary_topic and out.topic_type == "general": out = out.model_copy(update={"topic_type": "technique"}) return out def try_enrich_semantic_brief_with_llm( cur, query: str, base: PlanningSemanticBrief, ) -> Tuple[PlanningSemanticBrief, bool]: api_key, _ = normalize_openrouter_env() if not api_key or base.semantic_strength < 0.35: return base, False if not (query or "").strip(): return base, False variables = { "search_query": (query or "").strip(), "semantic_brief_json": json.dumps(brief_to_summary_dict(base), ensure_ascii=False), } try: prow, rendered = load_and_render_ai_prompt(cur, "planning_exercise_query_semantics", variables) model = effective_openrouter_model_for_prompt_row(prow) raw = openrouter_chat_completion(api_key=api_key, model=model, user_content=rendered.text) obj = _extract_json_object(raw) return merge_semantic_brief_llm(base, obj), True except AiPromptUnavailableError: return base, False except Exception as exc: _logger.warning("Semantik-LLM fehlgeschlagen: %s", exc) return base, False def brief_to_summary_dict(brief: PlanningSemanticBrief) -> Dict[str, Any]: return { "primary_topic": brief.primary_topic, "topic_type": brief.topic_type, "must_phrases": list(brief.must_phrases), "exclude_phrases": list(brief.exclude_phrases), "development_arc": list(brief.development_arc), "retrieval_query": brief.retrieval_query, "semantic_strength": brief.semantic_strength, "rationale": brief.rationale, } def step_phase_for_index(brief: PlanningSemanticBrief, step_index: int, max_steps: int) -> Optional[str]: arc = list(brief.development_arc or []) if not arc: if max_steps <= 1: return None default_arc = ["einstieg", "grundlage", "vertiefung", "anwendung", "perfektion"] arc = default_arc[:max_steps] if brief.semantic_strength >= 0.5 else [] if not arc: return None if len(arc) == 1: return arc[0] pos = step_index / max(max_steps - 1, 1) idx = min(len(arc) - 1, int(round(pos * (len(arc) - 1)))) return arc[idx] def step_retrieval_query( brief: PlanningSemanticBrief, goal_query: str, step_index: int, max_steps: int, ) -> str: phase = step_phase_for_index(brief, step_index, max_steps) parts: List[str] = [] if brief.retrieval_query: parts.append(brief.retrieval_query) elif goal_query: parts.append(goal_query) if brief.primary_topic and brief.primary_topic not in " ".join(parts).lower(): parts.append(brief.primary_topic) if phase: hint = _PHASE_QUERY_HINTS.get(phase, phase) parts.append(hint) return _normalize_query(" ".join(parts)) or _normalize_query(goal_query) def apply_dynamic_retrieval_weights( base_weights: Mapping[str, float], brief: PlanningSemanticBrief, *, scenario: str, has_planning_reference: bool, ) -> Dict[str, float]: """Semantik-Kanal dynamisch gegen Profil/Plan abwägen.""" out = dict(base_weights) sem = float(brief.semantic_strength or 0.0) if sem <= 0.05: out.setdefault("semantic", 0.0) return out query_driven = scenario == "free_search" or not has_planning_reference sem_weight = 0.12 + sem * (0.38 if query_driven else 0.22) out["semantic"] = round(sem_weight, 4) if query_driven: scale = 1.0 - sem * 0.35 out["fulltext"] = round(float(out.get("fulltext", 0.18)) * scale, 4) out["profile"] = round(float(out.get("profile", 0.22)) * (1.0 - sem * 0.25), 4) else: out["fulltext"] = round(float(out.get("fulltext", 0.18)) * (1.0 - sem * 0.15), 4) total = sum(v for k, v in out.items() if k not in {"repeat_unit", "repeat_group"} and v > 0) if total > 0.92: factor = 0.88 / total for k in list(out.keys()): if k in {"repeat_unit", "repeat_group"}: continue if out[k] > 0: out[k] = round(out[k] * factor, 4) return out def _blob_from_fields( title: str, summary: str, goal: str, variant_names: Sequence[str], ) -> str: parts = [title or "", strip_html_to_plain(summary, max_len=600), strip_html_to_plain(goal, max_len=800)] parts.extend(variant_names or []) return " ".join(p for p in parts if p).lower() def _phrase_in_blob(phrase: str, blob: str) -> bool: ph = _normalize_phrase(phrase) if not ph or not blob: return False if ph in blob: return True if " " not in ph: return bool(re.search(rf"\b{re.escape(ph)}\b", blob)) return ph in blob def score_exercise_semantic_relevance( *, title: str, summary: str, goal: str, variant_names: Sequence[str], brief: PlanningSemanticBrief, step_phase: Optional[str] = None, ) -> Tuple[float, List[str]]: if brief.semantic_strength <= 0.05: return 0.0, [] blob = _blob_from_fields(title, summary, goal, variant_names) if not blob.strip(): return 0.0, [] reasons: List[str] = [] must = list(brief.must_phrases or []) exclude = list(brief.exclude_phrases or []) must_hits = sum(1 for ph in must if _phrase_in_blob(ph, blob)) exclude_hits = sum(1 for ph in exclude if _phrase_in_blob(ph, blob)) score = 0.0 if must: must_ratio = must_hits / len(must) score += 0.55 * must_ratio if must_hits == len(must): reasons.append("Alle Kernbegriffe der Anfrage im Übungstext") elif must_hits > 0: reasons.append("Teilweise passende Kernbegriffe") elif brief.primary_topic and _phrase_in_blob(brief.primary_topic, blob): score += 0.45 reasons.append(f"Thema „{brief.primary_topic}“ im Übungstext") elif brief.primary_topic and _phrase_in_blob(brief.primary_topic, blob): score += 0.5 reasons.append(f"Thema „{brief.primary_topic}“ im Übungstext") if exclude_hits > 0: penalty = min(0.55, 0.18 * exclude_hits) if must_hits == 0 or exclude_hits >= must_hits: score -= penalty reasons.append("Enthält ausgeschlossene Nebenthemen") if step_phase and step_phase in _PHASE_QUERY_HINTS: phase_markers = next((markers for phase, markers in _ARC_PHASES if phase == step_phase), ()) if any(m in blob for m in phase_markers) or step_phase in blob: score += 0.12 reasons.append(f"Passt zur Pfad-Phase „{step_phase}“") if brief.development_arc and not step_phase: arc_hits = sum(1 for phase in brief.development_arc if phase in blob) if arc_hits: score += min(0.15, 0.05 * arc_hits) return max(0.0, min(1.0, round(score, 4))), reasons[:4] __all__ = [ "PlanningSemanticBrief", "apply_dynamic_retrieval_weights", "brief_to_summary_dict", "build_semantic_brief", "merge_semantic_brief_llm", "score_exercise_semantic_relevance", "step_phase_for_index", "step_retrieval_query", "try_enrich_semantic_brief_with_llm", ]