shinkan-jinkendo/backend/planning_exercise_semantics.py
Lars 5b73d1a1f5
All checks were successful
Deploy Development / deploy (push) Successful in 42s
Test Suite / pytest-backend (push) Successful in 41s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m23s
Enhance Planning Exercise Path Builder and Retrieval Logic
- Updated the path selection logic to incorporate semantic gating, ensuring only relevant exercises are considered based on semantic scores.
- Introduced new functions for building path target profiles and resolving semantic skill weights, enhancing the contextual understanding of exercise suggestions.
- Improved the retrieval process by applying dynamic retrieval weights based on semantic strength, refining the accuracy of exercise recommendations.
- Incremented version to 0.8.188 and updated changelog to document these enhancements in planning AI functionality.
2026-05-23 12:38:38 +02:00

632 lines
21 KiB
Python

"""
Planungs-KI Phase E: Semantik-Schicht für Anfrage-Verständnis und Retrieval.
Trennt anfrage-spezifische Semantik (Technik, Phrasen, Entwicklungsbogen) vom
Katalog-Profil-Overlay (Fokus/Skills). Wird in Hybrid-Retrieval und Pfad-QA genutzt.
"""
from __future__ import annotations
import json
import logging
import re
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple
from pydantic import BaseModel, Field, field_validator
from ai_prompt_runtime import AiPromptUnavailableError, load_and_render_ai_prompt
from exercise_ai import strip_html_to_plain
from openrouter_chat import (
effective_openrouter_model_for_prompt_row,
normalize_openrouter_env,
openrouter_chat_completion,
)
_logger = logging.getLogger("shinkan.planning_exercise_semantics")
_GERI_TECHNIQUES: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
("mae geri", ("mawashi geri", "yoko geri", "ushiro geri", "sakuto geri", "mikazuki geri")),
("mawashi geri", ("mae geri", "yoko geri", "ushiro geri", "sakuto geri")),
("yoko geri", ("mae geri", "mawashi geri", "ushiro geri", "sakuto geri")),
("ushiro geri", ("mae geri", "mawashi geri", "yoko geri", "sakuto geri")),
("sakuto geri", ("mae geri", "mawashi geri", "yoko geri", "mikazuki geri")),
("mikazuki geri", ("mae geri", "mawashi geri", "sakuto geri")),
)
_OTHER_TECHNIQUE_PATTERNS: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
("oi zuki", ("gyaku zuki", "age uke", "gedan barai")),
("gyaku zuki", ("oi zuki", "mae geri")),
("age uke", ("gedan barai", "soto uke")),
("gedan barai", ("age uke", "soto uke")),
)
_TECHNIQUE_EXPECTED_SKILLS: Dict[str, Tuple[str, ...]] = {
"mae geri": ("Geri Waza", "Koordination", "Gleichgewicht", "Kime"),
"mawashi geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
"yoko geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
"ushiro geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
"sakuto geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
"mikazuki geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
}
_DEFAULT_TECHNIQUE_SKILLS: Tuple[str, ...] = ("Geri Waza", "Koordination", "Gleichgewicht")
_ARC_PHASES: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
("einstieg", ("einstieg", "erlernen", "lernen", "anfänger", "anfaenger", "beginn", "grund")),
("grundlage", ("grundlage", "fundament", "basis", "basic")),
("vertiefung", ("vertief", "festigung", "übung", "uebung", "wiederhol")),
("anwendung", ("anwend", "partner", "kampf", "kumite", "reaktion")),
("perfektion", ("perfekt", "meisterschaft", "höchst", "hoechst", "kime", "sauber")),
)
_PHASE_QUERY_HINTS: Dict[str, str] = {
"einstieg": "einstieg grundübung einfach",
"grundlage": "grundtechnik festigung",
"vertiefung": "vertiefung technik übung",
"anwendung": "anwendung partner variante",
"perfektion": "perfektion kontrolle kime höchste stufe",
}
_QUERY_STOPWORDS = frozenset(
{
"von",
"bis",
"zur",
"zum",
"der",
"die",
"das",
"des",
"den",
"dem",
"ein",
"eine",
"einer",
"eines",
"und",
"oder",
"mit",
"für",
"fuer",
"im",
"in",
"am",
"an",
"auf",
"aus",
"beim",
"nach",
"vor",
"über",
"ueber",
"unter",
"wie",
"was",
"wo",
"wir",
"soll",
"sollen",
"bitte",
"schlage",
"vorschlag",
"übung",
"uebung",
"übungen",
"uebungen",
}
)
class PlanningSemanticBrief(BaseModel):
primary_topic: Optional[str] = Field(default=None, max_length=120)
topic_type: str = Field(default="general", max_length=40)
must_phrases: List[str] = Field(default_factory=list)
exclude_phrases: List[str] = Field(default_factory=list)
development_arc: List[str] = Field(default_factory=list)
retrieval_query: str = Field(default="", max_length=500)
semantic_strength: float = Field(default=0.0, ge=0.0, le=1.0)
rationale: Optional[str] = Field(default=None, max_length=400)
@field_validator("topic_type")
@classmethod
def _topic_type(cls, v: str) -> str:
s = (v or "general").strip().lower()
return s if s in {"general", "technique", "focus", "method", "skill"} else "general"
@field_validator("must_phrases", "exclude_phrases", "development_arc", mode="before")
@classmethod
def _norm_phrase_list(cls, v: Any) -> List[str]:
if not v:
return []
if isinstance(v, str):
s = _normalize_phrase(v)
return [s] if s else []
out: List[str] = []
for item in v:
s = _normalize_phrase(str(item or ""))
if s and s not in out:
out.append(s[:120])
return out[:12]
def _normalize_phrase(text: str) -> str:
return re.sub(r"\s+", " ", (text or "").strip().lower())
def _normalize_query(text: str) -> str:
return re.sub(r"\s+", " ", (text or "").strip())
def _extract_json_object(text: str) -> Dict[str, Any]:
s = (text or "").strip()
if s.startswith("```"):
s = re.sub(r"^```[a-zA-Z0-9]*\s*", "", s)
if s.endswith("```"):
s = s[:-3].strip()
start = s.find("{")
end = s.rfind("}")
if start < 0 or end <= start:
raise ValueError("Kein JSON-Objekt in LLM-Antwort")
obj = json.loads(s[start : end + 1])
if not isinstance(obj, dict):
raise ValueError("LLM-Antwort ist kein JSON-Objekt")
return obj
def _find_technique_in_text(q_lower: str) -> Optional[Tuple[str, Tuple[str, ...]]]:
for primary, excludes in _GERI_TECHNIQUES + _OTHER_TECHNIQUE_PATTERNS:
if primary in q_lower:
return primary, excludes
return None
def _detect_development_arc(q_lower: str) -> List[str]:
found: List[str] = []
for phase, markers in _ARC_PHASES:
if any(m in q_lower for m in markers):
if phase not in found:
found.append(phase)
if not found and ("von" in q_lower and "bis" in q_lower):
found = ["einstieg", "perfektion"]
return found
def _keyword_phrases_from_query(query: str) -> List[str]:
q = _normalize_query(query).lower()
tokens = re.findall(r"[a-zäöüß]{3,}", q, flags=re.IGNORECASE)
phrases: List[str] = []
for i, tok in enumerate(tokens):
low = tok.lower()
if low in _QUERY_STOPWORDS:
continue
if i + 1 < len(tokens):
nxt = tokens[i + 1].lower()
if nxt not in _QUERY_STOPWORDS:
pair = _normalize_phrase(f"{low} {nxt}")
if len(pair) >= 5 and pair not in phrases:
phrases.append(pair)
if len(low) >= 4 and low not in phrases:
phrases.append(low)
return phrases[:6]
def build_semantic_brief(query: Optional[str]) -> PlanningSemanticBrief:
"""Deterministisches Anfrage-Verständnis — ohne LLM."""
q = _normalize_query(query)
if not q:
return PlanningSemanticBrief(retrieval_query="", semantic_strength=0.0)
q_lower = q.lower()
must: List[str] = []
exclude: List[str] = []
topic_type = "general"
primary: Optional[str] = None
strength = 0.25
technique = _find_technique_in_text(q_lower)
if technique:
primary, ex = technique
must.append(primary)
exclude.extend(list(ex))
topic_type = "technique"
strength = max(strength, 0.82)
arc = _detect_development_arc(q_lower)
if arc:
strength = max(strength, 0.55 if technique else 0.45)
# Keine generischen Stichwörter in must_phrases — sonst verwässert das Scoring.
retrieval_parts = list(must)
if primary:
retrieval_parts.append(primary)
if arc:
retrieval_parts.extend(arc[:2])
retrieval = " ".join(dict.fromkeys(retrieval_parts))[:500] if retrieval_parts else q
if len(q) >= 24 and not technique:
strength = max(strength, 0.4)
return PlanningSemanticBrief(
primary_topic=primary,
topic_type=topic_type,
must_phrases=must[:8],
exclude_phrases=exclude[:10],
development_arc=arc[:5],
retrieval_query=retrieval[:500],
semantic_strength=min(1.0, round(strength, 3)),
rationale=None,
)
def merge_semantic_brief_llm(
base: PlanningSemanticBrief,
llm_obj: Mapping[str, Any],
) -> PlanningSemanticBrief:
"""LLM-Enrichment in deterministisches Brief mergen (LLM ergänzt, ersetzt nicht harte Technik-Regeln)."""
data = base.model_dump()
for key in ("primary_topic", "topic_type", "rationale"):
val = llm_obj.get(key)
if val:
data[key] = val
for key in ("must_phrases", "exclude_phrases", "development_arc"):
extra = llm_obj.get(key) or []
merged = list(data.get(key) or [])
for item in extra:
s = _normalize_phrase(str(item or ""))
if s and s not in merged:
merged.append(s)
data[key] = merged[:12]
llm_strength = llm_obj.get("semantic_strength")
if llm_strength is not None:
try:
data["semantic_strength"] = min(
1.0,
max(float(data["semantic_strength"]), float(llm_strength)),
)
except (TypeError, ValueError):
pass
if data.get("must_phrases"):
core = semantic_core_phrases(PlanningSemanticBrief.model_validate(data))
data["retrieval_query"] = " ".join(core[:4])[:500] if core else data.get("retrieval_query", "")
out = PlanningSemanticBrief.model_validate(data)
if out.primary_topic and out.topic_type == "general":
out = out.model_copy(update={"topic_type": "technique"})
return out
def try_enrich_semantic_brief_with_llm(
cur,
query: str,
base: PlanningSemanticBrief,
) -> Tuple[PlanningSemanticBrief, bool]:
api_key, _ = normalize_openrouter_env()
if not api_key or base.semantic_strength < 0.35:
return base, False
if not (query or "").strip():
return base, False
variables = {
"search_query": (query or "").strip(),
"semantic_brief_json": json.dumps(brief_to_summary_dict(base), ensure_ascii=False),
}
try:
prow, rendered = load_and_render_ai_prompt(cur, "planning_exercise_query_semantics", variables)
model = effective_openrouter_model_for_prompt_row(prow)
raw = openrouter_chat_completion(api_key=api_key, model=model, user_content=rendered.text)
obj = _extract_json_object(raw)
return merge_semantic_brief_llm(base, obj), True
except AiPromptUnavailableError:
return base, False
except Exception as exc:
_logger.warning("Semantik-LLM fehlgeschlagen: %s", exc)
return base, False
def brief_to_summary_dict(brief: PlanningSemanticBrief) -> Dict[str, Any]:
return {
"primary_topic": brief.primary_topic,
"topic_type": brief.topic_type,
"must_phrases": list(brief.must_phrases),
"exclude_phrases": list(brief.exclude_phrases),
"development_arc": list(brief.development_arc),
"retrieval_query": brief.retrieval_query,
"semantic_strength": brief.semantic_strength,
"rationale": brief.rationale,
}
def step_phase_for_index(brief: PlanningSemanticBrief, step_index: int, max_steps: int) -> Optional[str]:
arc = list(brief.development_arc or [])
if not arc:
if max_steps <= 1:
return None
default_arc = ["einstieg", "grundlage", "vertiefung", "anwendung", "perfektion"]
arc = default_arc[:max_steps] if brief.semantic_strength >= 0.5 else []
if not arc:
return None
if len(arc) == 1:
return arc[0]
pos = step_index / max(max_steps - 1, 1)
idx = min(len(arc) - 1, int(round(pos * (len(arc) - 1))))
return arc[idx]
def step_retrieval_query(
brief: PlanningSemanticBrief,
goal_query: str,
step_index: int,
max_steps: int,
) -> str:
phase = step_phase_for_index(brief, step_index, max_steps)
parts: List[str] = []
if brief.primary_topic:
parts.append(brief.primary_topic)
elif brief.retrieval_query:
parts.append(brief.retrieval_query.split()[0] if brief.retrieval_query else "")
if phase:
parts.append(phase)
if not parts and brief.retrieval_query:
parts.append(brief.retrieval_query)
elif not parts and goal_query:
parts.append(goal_query)
return _normalize_query(" ".join(p for p in parts if p)) or _normalize_query(goal_query)
def apply_dynamic_retrieval_weights(
base_weights: Mapping[str, float],
brief: PlanningSemanticBrief,
*,
scenario: str,
has_planning_reference: bool,
) -> Dict[str, float]:
"""Semantik-Kanal dynamisch gegen Profil/Plan abwägen."""
out = dict(base_weights)
sem = float(brief.semantic_strength or 0.0)
if sem <= 0.05:
out.setdefault("semantic", 0.0)
return out
query_driven = scenario == "free_search" or not has_planning_reference
sem_weight = 0.12 + sem * (0.38 if query_driven else 0.22)
out["semantic"] = round(sem_weight, 4)
if query_driven:
scale = 1.0 - sem * 0.35
out["fulltext"] = round(float(out.get("fulltext", 0.18)) * scale, 4)
out["profile"] = round(float(out.get("profile", 0.22)) * (1.0 - sem * 0.25), 4)
else:
out["fulltext"] = round(float(out.get("fulltext", 0.18)) * (1.0 - sem * 0.15), 4)
total = sum(v for k, v in out.items() if k not in {"repeat_unit", "repeat_group"} and v > 0)
if total > 0.92:
factor = 0.88 / total
for k in list(out.keys()):
if k in {"repeat_unit", "repeat_group"}:
continue
if out[k] > 0:
out[k] = round(out[k] * factor, 4)
return out
def _blob_from_fields(
title: str,
summary: str,
goal: str,
variant_names: Sequence[str],
) -> str:
parts = [title or "", strip_html_to_plain(summary, max_len=600), strip_html_to_plain(goal, max_len=800)]
parts.extend(variant_names or [])
return " ".join(p for p in parts if p).lower()
def _phrase_in_blob(phrase: str, blob: str) -> bool:
ph = _normalize_phrase(phrase)
if not ph or not blob:
return False
if ph in blob:
return True
if " " not in ph:
return bool(re.search(rf"\b{re.escape(ph)}\b", blob))
return ph in blob
def score_exercise_semantic_relevance(
*,
title: str,
summary: str,
goal: str,
variant_names: Sequence[str],
brief: PlanningSemanticBrief,
step_phase: Optional[str] = None,
) -> Tuple[float, List[str]]:
if brief.semantic_strength <= 0.05:
return 0.0, []
blob = _blob_from_fields(title, summary, goal, variant_names)
if not blob.strip():
return 0.0, []
reasons: List[str] = []
must = list(brief.must_phrases or [])
exclude = list(brief.exclude_phrases or [])
core = semantic_core_phrases(brief)
core_hits = sum(1 for ph in core if _phrase_in_blob(ph, blob))
must_hits = sum(1 for ph in must if _phrase_in_blob(ph, blob))
exclude_hits = sum(1 for ph in exclude if _phrase_in_blob(ph, blob))
score = 0.0
if core:
core_ratio = core_hits / len(core)
score += 0.62 * core_ratio
if core_hits == len(core):
reasons.append("Kern-Thema der Anfrage im Übungstext")
elif core_hits > 0:
reasons.append("Teilweise passend zum Kern-Thema")
elif brief.primary_topic and _phrase_in_blob(brief.primary_topic, blob):
score += 0.55
reasons.append(f"Thema „{brief.primary_topic}“ im Übungstext")
if must and core != must:
extra_ratio = must_hits / len(must)
score += 0.12 * extra_ratio
primary_ok = bool(core_hits) or (
brief.primary_topic and _phrase_in_blob(brief.primary_topic, blob)
)
if exclude_hits > 0 and not primary_ok:
penalty = min(0.65, 0.22 * exclude_hits)
score -= penalty
reasons.append("Enthält ausgeschlossene Nebenthemen")
elif exclude_hits > 0 and primary_ok:
score -= min(0.12, 0.06 * exclude_hits)
if step_phase and step_phase in _PHASE_QUERY_HINTS:
phase_markers = next((markers for phase, markers in _ARC_PHASES if phase == step_phase), ())
if any(m in blob for m in phase_markers) or step_phase in blob:
score += 0.12
reasons.append(f"Passt zur Pfad-Phase „{step_phase}")
if brief.development_arc and not step_phase:
arc_hits = sum(1 for phase in brief.development_arc if phase in blob)
if arc_hits:
score += min(0.15, 0.05 * arc_hits)
return max(0.0, min(1.0, round(score, 4))), reasons[:4]
def semantic_core_phrases(brief: PlanningSemanticBrief) -> List[str]:
"""Harte Kernphrasen fürs Matching."""
if brief.primary_topic:
return [_normalize_phrase(brief.primary_topic)]
core = [_normalize_phrase(p) for p in (brief.must_phrases or [])[:2] if p]
return [p for p in core if p]
def resolve_semantic_skill_weights(cur, brief: PlanningSemanticBrief) -> Dict[int, float]:
"""Deterministisches Fähigkeitserwartungsprofil aus Technik-Thema."""
topic = _normalize_phrase(brief.primary_topic or "")
if topic in _TECHNIQUE_EXPECTED_SKILLS:
names = list(_TECHNIQUE_EXPECTED_SKILLS[topic])
elif brief.topic_type == "technique" or "geri" in topic:
names = list(_DEFAULT_TECHNIQUE_SKILLS)
else:
return {}
weights: Dict[int, float] = {}
for name in names[:6]:
cur.execute(
"""
SELECT id, name FROM skills
WHERE (status IS NULL OR status = 'active')
AND LOWER(name) LIKE %s
ORDER BY CASE WHEN LOWER(name) = %s THEN 0 WHEN LOWER(name) LIKE %s THEN 1 ELSE 2 END,
LENGTH(name) ASC
LIMIT 1
""",
(f"%{name.lower()}%", name.lower(), f"{name.lower()}%"),
)
row = cur.fetchone()
if row:
sid = int(row["id"])
weights[sid] = max(weights.get(sid, 0.0), 1.0)
return weights
def enrich_target_with_semantic_expectations(
target,
*,
skill_weights: Dict[int, float],
):
from planning_exercise_profiles import PlanningTargetProfile, _merge_weight_maps, _normalize_weight_map
if not skill_weights:
return target
merged = _normalize_weight_map(_merge_weight_maps(dict(target.skill_weights), skill_weights, scale=1.0))
sources = list(target.sources)
if "semantic_expectation" not in sources:
sources.append("semantic_expectation")
return PlanningTargetProfile(
focus_area_ids=dict(target.focus_area_ids),
style_direction_ids=dict(target.style_direction_ids),
training_type_ids=dict(target.training_type_ids),
target_group_ids=dict(target.target_group_ids),
skill_weights=merged,
skill_gap_weights=dict(target.skill_gap_weights),
skill_plan_weights=dict(target.skill_plan_weights),
sources=sources,
)
def apply_path_retrieval_weights(brief: PlanningSemanticBrief) -> Dict[str, float]:
"""Pfad-Builder: Semantik + Profil dominieren."""
sem = float(brief.semantic_strength or 0.0)
if sem >= 0.65:
return {
"semantic": 0.50,
"fulltext": 0.16,
"profile": 0.26,
"progression": 0.04,
"skill": 0.04,
"plan": 0.0,
"repeat_unit": -0.40,
"repeat_group": -0.15,
}
if sem >= 0.35:
return {
"semantic": 0.38,
"fulltext": 0.18,
"profile": 0.28,
"progression": 0.06,
"skill": 0.06,
"plan": 0.04,
"repeat_unit": -0.35,
"repeat_group": -0.15,
}
return {
"semantic": 0.22,
"fulltext": 0.22,
"profile": 0.28,
"progression": 0.10,
"skill": 0.10,
"plan": 0.08,
"repeat_unit": -0.30,
"repeat_group": -0.15,
}
def exercise_passes_path_semantic_gate(
*,
semantic_score: float,
title: str,
brief: PlanningSemanticBrief,
) -> bool:
if brief.semantic_strength < 0.55:
return True
if semantic_score >= 0.20:
return True
topic = brief.primary_topic or ""
if topic and _phrase_in_blob(topic, (title or "").lower()):
return True
return False
__all__ = [
"PlanningSemanticBrief",
"apply_dynamic_retrieval_weights",
"apply_path_retrieval_weights",
"brief_to_summary_dict",
"build_semantic_brief",
"enrich_target_with_semantic_expectations",
"exercise_passes_path_semantic_gate",
"merge_semantic_brief_llm",
"resolve_semantic_skill_weights",
"score_exercise_semantic_relevance",
"semantic_core_phrases",
"step_phase_for_index",
"step_retrieval_query",
"try_enrich_semantic_brief_with_llm",
]