All checks were successful
Deploy Development / deploy (push) Successful in 49s
Test Suite / pytest-backend (push) Successful in 44s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 14s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m12s
- Updated `_annotate_roadmap_step` to change the condition for setting `slot_status` based on `roadmap_match_source`, improving clarity in slot assignment handling. - Removed the `_try_reconcile_slot_assignment` function to streamline the slot assignment process, as its logic is now integrated into the main flow. - Enhanced `_match_roadmap_slot` to conditionally preserve slot assignments based on exercise ID, ensuring better handling of existing assignments. - Improved the handling of semantic scores in `rank_visible_library_hits` to prioritize the best semantic fit, enhancing exercise retrieval accuracy. - Added tests to validate the new logic for title equivalence and semantic scoring, ensuring robustness in exercise selection processes.
1402 lines
46 KiB
Python
1402 lines
46 KiB
Python
"""
|
|
Planungs-KI Phase E: Semantik-Schicht für Anfrage-Verständnis und Retrieval.
|
|
|
|
Trennt anfrage-spezifische Semantik (Technik, Phrasen, Entwicklungsbogen) vom
|
|
Katalog-Profil-Overlay (Fokus/Skills). Wird in Hybrid-Retrieval und Pfad-QA genutzt.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple
|
|
|
|
from pydantic import BaseModel, Field, field_validator
|
|
|
|
from ai_prompt_runtime import AiPromptUnavailableError, load_and_render_ai_prompt
|
|
from exercise_ai import strip_html_to_plain
|
|
from openrouter_chat import (
|
|
effective_openrouter_model_for_prompt_row,
|
|
normalize_openrouter_env,
|
|
openrouter_chat_completion,
|
|
)
|
|
|
|
# Module logger registered under a fixed, explicit name rather than ``__name__``.
_logger = logging.getLogger("shinkan.planning_exercise_semantics")
|
|
|
|
# Kick ("geri") techniques. Each entry is ``(primary phrase, sibling phrases)``:
# when the primary phrase is found in a query, its siblings are used as exclude
# phrases (see ``build_semantic_brief`` and ``technique_sibling_excludes``).
# Order matters: ``_find_technique_in_text`` returns the first entry whose
# primary phrase occurs in the text.
_GERI_TECHNIQUES: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
    ("mae geri", ("mawashi geri", "yoko geri", "ushiro geri", "sakuto geri", "mikazuki geri")),
    ("mawashi geri", ("mae geri", "yoko geri", "ushiro geri", "sakuto geri")),
    ("yoko geri", ("mae geri", "mawashi geri", "ushiro geri", "sakuto geri")),
    ("ushiro geri", ("mae geri", "mawashi geri", "yoko geri", "sakuto geri")),
    ("sakuto geri", ("mae geri", "mawashi geri", "yoko geri", "mikazuki geri")),
    ("mikazuki geri", ("mae geri", "mawashi geri", "sakuto geri")),
)
|
|
|
|
# Non-kick techniques in the same ``(primary phrase, sibling phrases)`` shape as
# ``_GERI_TECHNIQUES``. ``_find_technique_in_text`` searches these after the
# kick techniques.
_OTHER_TECHNIQUE_PATTERNS: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
    ("oi zuki", ("gyaku zuki", "age uke", "gedan barai")),
    ("gyaku zuki", ("oi zuki", "mae geri")),
    ("age uke", ("gedan barai", "soto uke")),
    ("gedan barai", ("age uke", "soto uke")),
)
|
|
|
|
# Skill names expected for a given (normalized) technique topic. The names are
# looked up in the ``skills`` table by ``resolve_semantic_skill_weights``.
_TECHNIQUE_EXPECTED_SKILLS: Dict[str, Tuple[str, ...]] = {
    "mae geri": ("Geri Waza", "Koordination", "Gleichgewicht", "Kime"),
    "mawashi geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
    "yoko geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
    "ushiro geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
    "sakuto geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
    "mikazuki geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
}

# Fallback skill names used by ``resolve_semantic_skill_weights`` when the topic
# has no dedicated entry but the brief is a technique topic or mentions "geri".
_DEFAULT_TECHNIQUE_SKILLS: Tuple[str, ...] = ("Geri Waza", "Koordination", "Gleichgewicht")
|
|
|
|
# Development-arc phases in progression order, each with the marker substrings
# that indicate the phase. Markers are matched with ``in`` (substring), so stems
# such as "vertief" or "anwend" also hit inflected or compound words.
_ARC_PHASES: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
    ("einstieg", ("einstieg", "erlernen", "lernen", "anfänger", "anfaenger", "beginn", "grund")),
    ("grundlage", ("grundlage", "fundament", "basis", "basic")),
    ("vertiefung", ("vertief", "festigung", "übung", "uebung", "wiederhol")),
    ("anwendung", ("anwend", "partner", "kampf", "kumite", "reaktion")),
    ("perfektion", ("perfekt", "meisterschaft", "höchst", "hoechst", "kime", "sauber")),
)
|
|
|
|
# Per-phase query hint text. In the visible part of this module only the keys
# are consulted (phase membership in ``score_exercise_semantic_relevance``);
# the hint strings are presumably consumed elsewhere — TODO confirm.
_PHASE_QUERY_HINTS: Dict[str, str] = {
    "einstieg": "einstieg grundübung einfach",
    "grundlage": "grundtechnik festigung",
    "vertiefung": "vertiefung technik übung",
    "anwendung": "anwendung partner variante",
    "perfektion": "perfektion kontrolle kime höchste stufe",
}
|
|
|
|
# German filler words (articles, prepositions, request verbs and the generic
# "Übung" forms, each in umlaut and ASCII spelling where applicable) that are
# skipped when keyword phrases are derived from a query.
_QUERY_STOPWORDS = frozenset(
    (
        "von bis zur zum der die das des den dem "
        "ein eine einer eines und oder mit für fuer "
        "im in am an auf aus beim nach vor über ueber unter "
        "wie was wo wir soll sollen bitte schlage vorschlag "
        "übung uebung übungen uebungen"
    ).split()
)
|
|
|
|
|
|
class PlanningSemanticBrief(BaseModel):
    """Structured, query-specific semantics used for retrieval and scoring.

    Phrase lists are normalized by ``_norm_phrase_list`` before validation;
    ``topic_type`` is restricted to a fixed vocabulary by ``_topic_type``.
    """

    # Main subject of the query (e.g. a technique phrase), if one was identified.
    primary_topic: Optional[str] = Field(default=None, max_length=120)
    # One of "general", "technique", "focus", "method", "skill".
    topic_type: str = Field(default="general", max_length=40)
    # Phrases that should occur in a matching exercise text.
    must_phrases: List[str] = Field(default_factory=list)
    # Phrases whose presence counts against an exercise.
    exclude_phrases: List[str] = Field(default_factory=list)
    # Ordered development phases (names as used in ``_ARC_PHASES``).
    development_arc: List[str] = Field(default_factory=list)
    retrieval_query: str = Field(default="", max_length=500)
    # 0.0 .. 1.0; values <= 0.05 switch the semantic channel off in the scorers.
    semantic_strength: float = Field(default=0.0, ge=0.0, le=1.0)
    rationale: Optional[str] = Field(default=None, max_length=400)

    @field_validator("topic_type")
    @classmethod
    def _topic_type(cls, v: str) -> str:
        """Lower-case the topic type; unknown values fall back to ``"general"``."""
        s = (v or "general").strip().lower()
        return s if s in {"general", "technique", "focus", "method", "skill"} else "general"

    @field_validator("must_phrases", "exclude_phrases", "development_arc", mode="before")
    @classmethod
    def _norm_phrase_list(cls, v: Any) -> List[str]:
        """Normalize a phrase list before validation.

        Accepts a single string or an iterable of items. Every phrase is
        normalized via ``_normalize_phrase``, truncated to 120 characters,
        de-duplicated in order, and the list is capped at 12 entries.
        """
        if not v:
            return []
        items = [v] if isinstance(v, str) else v
        out: List[str] = []
        for item in items:
            # Truncate *before* the duplicate check so two long phrases that
            # only differ beyond the limit cannot both end up in the list.
            s = _normalize_phrase(str(item or ""))[:120].rstrip()
            if s and s not in out:
                out.append(s)
        return out[:12]
|
|
|
|
|
|
def _normalize_phrase(text: str) -> str:
    """Return *text* trimmed, lower-cased, with whitespace runs collapsed to one space.

    Falsy input (``None``, ``""``) yields the empty string.
    """
    lowered = (text or "").strip().lower()
    return re.sub(r"\s+", " ", lowered)
|
|
|
|
|
|
# Filler words ignored when exercise titles are compared with stage learning
# goals. "für" is listed in all three spellings that occur in practice: with
# umlaut, diacritic-stripped ("fur") and transliterated ("fuer", the form the
# query stop-word list of this module uses as well).
_STAGE_TITLE_STOP = frozenset(
    {
        "für", "fur", "fuer",
        "und", "der", "die", "das", "mit",
        "im", "in", "am", "an", "zur", "zum",
        "den", "dem", "des",
    }
)
|
|
|
|
|
|
def _stage_title_tokens(text: str) -> List[str]:
    """Split normalized *text* into words, dropping stop words and one-letter tokens."""
    kept: List[str] = []
    for word in _normalize_phrase(text).split():
        if len(word) <= 1 or word in _STAGE_TITLE_STOP:
            continue
        kept.append(word)
    return kept
|
|
|
|
|
|
def exercise_title_equivalent_to_stage_goal(title: str, learning_goal: str) -> bool:
    """Tell whether an exercise title is (nearly) identical to a stage learning goal.

    Covers graph slots whose exercise was created specifically for the learning
    goal, so the path's main technique need not appear in the exercise text.

    A match is reported when, after normalization, one of these holds:
      * both strings are equal;
      * the shorter string (>= 8 chars) is contained in the longer one and
        covers at least 72 % of its length;
      * both have the same token sequence (>= 2 tokens, stop words removed);
      * the token sets share >= 2 tokens and the overlap is >= 85 % of the
        larger set.
    Strings shorter than 3 characters never match.
    """
    title_norm = _normalize_phrase(title)
    goal_norm = _normalize_phrase(learning_goal)
    if min(len(title_norm), len(goal_norm)) < 3:
        return False
    if title_norm == goal_norm:
        return True

    # Containment check; ``sorted`` is stable, so ties keep the title first.
    shorter, longer = sorted((title_norm, goal_norm), key=len)
    if (
        len(shorter) >= 8
        and shorter in longer
        and len(shorter) / max(len(longer), 1) >= 0.72
    ):
        return True

    title_tokens = _stage_title_tokens(title)
    goal_tokens = _stage_title_tokens(learning_goal)
    if len(title_tokens) < 2:
        return False
    if title_tokens == goal_tokens:
        return True
    if len(goal_tokens) < 2:
        return False

    title_set, goal_set = set(title_tokens), set(goal_tokens)
    shared = len(title_set & goal_set)
    return shared >= 2 and shared / max(len(title_set), len(goal_set)) >= 0.85
|
|
|
|
|
|
def _normalize_query(text: str) -> str:
    """Return *text* trimmed with whitespace runs collapsed; case is preserved.

    Falsy input (``None``, ``""``) yields the empty string.
    """
    trimmed = (text or "").strip()
    return re.sub(r"\s+", " ", trimmed)
|
|
|
|
|
|
def _extract_json_object(text: str) -> Dict[str, Any]:
    """Parse the JSON object embedded in an LLM reply.

    A leading Markdown code fence (optionally with a language tag) and a
    trailing fence are removed; then the span from the first ``{`` to the last
    ``}`` is parsed.

    Raises:
        ValueError: if no ``{...}`` span exists, the span is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError``), or the parsed value
            is not a dict.
    """
    payload = (text or "").strip()
    if payload.startswith("```"):
        payload = re.sub(r"^```[a-zA-Z0-9]*\s*", "", payload)
        if payload.endswith("```"):
            payload = payload[:-3].strip()

    first_brace = payload.find("{")
    last_brace = payload.rfind("}")
    if first_brace < 0 or last_brace <= first_brace:
        raise ValueError("Kein JSON-Objekt in LLM-Antwort")

    parsed = json.loads(payload[first_brace : last_brace + 1])
    if isinstance(parsed, dict):
        return parsed
    raise ValueError("LLM-Antwort ist kein JSON-Objekt")
|
|
|
|
|
|
def _find_technique_in_text(q_lower: str) -> Optional[Tuple[str, Tuple[str, ...]]]:
    """Return the first catalogued technique whose primary phrase occurs in *q_lower*.

    Kick techniques are checked before the other techniques, each in catalogue
    order. The result is ``(primary, sibling_excludes)`` or ``None``.
    """
    catalogue = _GERI_TECHNIQUES + _OTHER_TECHNIQUE_PATTERNS
    return next(
        ((primary, excludes) for primary, excludes in catalogue if primary in q_lower),
        None,
    )
|
|
|
|
|
|
def resolve_path_primary_topic(
    goal_query: str,
    semantic_brief: Optional[PlanningSemanticBrief] = None,
    *,
    stage_learning_goal: Optional[str] = None,
    extra_context: Optional[str] = None,
) -> Optional[str]:
    """Determine the main technique of a path.

    A non-blank ``primary_topic`` on *semantic_brief* wins (returned stripped).
    Otherwise the goal query, extra context and stage learning goal are joined
    and searched for a catalogued technique — so the topic can come from the
    context or the learning goal, not only from the goal query.

    Returns:
        The topic phrase, or ``None`` when nothing is found.
    """
    if semantic_brief:
        from_brief = (semantic_brief.primary_topic or "").strip()
        if from_brief:
            return from_brief

    sources = (goal_query, extra_context, stage_learning_goal)
    haystack = _normalize_phrase(" ".join(part for part in sources if part))
    if not haystack:
        return None

    match = _find_technique_in_text(haystack.lower())
    if not match:
        return None
    return match[0]
|
|
|
|
|
|
def technique_sibling_excludes(primary_topic: str) -> List[str]:
    """List sibling techniques of *primary_topic* as exclude phrases.

    The siblings come from the technique catalogue (e.g. mae/yoko geri for
    mawashi geri); each one is expanded via ``_expand_stage_exclude_phrase``.
    The result is de-duplicated in order and capped at 16 entries. An empty or
    unknown topic yields ``[]``.
    """
    topic = _normalize_phrase(primary_topic)
    match = _find_technique_in_text(topic) if topic else None
    if not match:
        return []

    _, siblings = match
    collected: List[str] = []
    for sibling in siblings:
        for variant in _expand_stage_exclude_phrase(sibling):
            if variant and variant not in collected:
                collected.append(variant)
    return collected[:16]
|
|
|
|
|
|
def exercise_passes_technique_path_scope(
    *,
    primary_topic: str,
    title: str,
    summary: str = "",
    goal: str = "",
    learning_goal: str = "",
    sibling_excludes: Optional[Sequence[str]] = None,
    relaxed: bool = False,
) -> bool:
    """Gate for technique paths: no sibling technique, main technique must be in the text.

    The exercise text is built from title, summary and goal only. The stage
    learning goal is deliberately not part of it (``learning_goal`` is accepted
    but unused): otherwise off-topic exercises (e.g. kumite) would pass just
    because the learning goal mentions the technique.

    Args:
        primary_topic: Main technique of the path; an empty topic disables the gate.
        sibling_excludes: Exclude phrases; when falsy they are derived from
            *primary_topic* via ``technique_sibling_excludes``.
        relaxed: If true, a single word of the topic with at least four
            characters occurring in the text is sufficient.

    Returns:
        ``True`` if the exercise may be used on the path.
    """
    primary = _normalize_phrase(primary_topic)
    if not primary:
        return True

    blob = _blob_from_fields(title, summary, goal, [])

    # A non-negated mention of a sibling technique rejects the exercise outright.
    excludes = list(sibling_excludes or technique_sibling_excludes(primary))
    if excludes and _blob_matches_stage_excludes(blob, excludes):
        return False

    if _phrase_in_blob(primary, blob):
        return True
    if not relaxed:
        return False
    return any(_phrase_in_blob(word, blob) for word in primary.split() if len(word) >= 4)
|
|
|
|
|
|
def _detect_development_arc(q_lower: str) -> List[str]:
    """Detect development phases mentioned in a lower-cased query.

    Every phase of ``_ARC_PHASES`` with at least one marker occurring as a
    substring of *q_lower* is reported, in progression order. If no phase is
    found but the query contains both "von" and "bis" as whole words (a
    "from … to" range), the arc defaults to ``["einstieg", "perfektion"]``.

    The range fallback uses word boundaries so that words merely containing the
    letters (e.g. "davon", "bisher") do not trigger it.
    """
    found: List[str] = []
    for phase, markers in _ARC_PHASES:
        if phase not in found and any(marker in q_lower for marker in markers):
            found.append(phase)

    if not found and re.search(r"\bvon\b", q_lower) and re.search(r"\bbis\b", q_lower):
        found = ["einstieg", "perfektion"]
    return found
|
|
|
|
|
|
def _keyword_phrases_from_query(query: str) -> List[str]:
    """Derive up to six keyword phrases (bigrams and single words) from a query.

    Words are runs of at least three letters. For every non-stop-word the
    bigram with its successor is emitted first (if the successor is no stop
    word and the bigram has >= 5 characters), then the word itself when it has
    >= 4 characters. Duplicates are skipped.
    """
    lowered_query = _normalize_query(query).lower()
    words = [w.lower() for w in re.findall(r"[a-zäöüß]{3,}", lowered_query, flags=re.IGNORECASE)]

    phrases: List[str] = []
    # Pair each word with its successor; the last word gets ``None``.
    for word, follower in zip(words, words[1:] + [None]):
        if word in _QUERY_STOPWORDS:
            continue
        if follower is not None and follower not in _QUERY_STOPWORDS:
            bigram = _normalize_phrase(f"{word} {follower}")
            if len(bigram) >= 5 and bigram not in phrases:
                phrases.append(bigram)
        if len(word) >= 4 and word not in phrases:
            phrases.append(word)
    return phrases[:6]
|
|
|
|
|
|
def build_semantic_brief(query: Optional[str]) -> PlanningSemanticBrief:
    """Build a deterministic (LLM-free) semantic brief for a free-text query.

    * A catalogued technique in the query becomes primary topic and must
      phrase; its siblings become exclude phrases; strength is at least 0.82.
    * Detected development phases raise the strength to at least 0.55 (with a
      technique) or 0.45 (without).
    * Without a technique, queries of 24+ characters get at least 0.4.
    * Negations in the query ("ohne …", "kein …", "nicht …") add further
      exclude phrases.

    An empty query yields a brief with strength 0.0.
    """
    q = _normalize_query(query)
    if not q:
        return PlanningSemanticBrief(retrieval_query="", semantic_strength=0.0)

    lowered = q.lower()
    technique = _find_technique_in_text(lowered)
    if technique:
        primary, siblings = technique
        must, exclude = [primary], list(siblings)
        topic_type, strength = "technique", 0.82
    else:
        primary = None
        must, exclude = [], []
        topic_type, strength = "general", 0.25

    arc = _detect_development_arc(lowered)
    if arc:
        strength = max(strength, 0.55 if technique else 0.45)
    if not technique and len(q) >= 24:
        strength = max(strength, 0.4)

    # Deliberately no generic keywords in must_phrases — they would dilute the scoring.
    query_terms = list(dict.fromkeys([*must, *arc[:2]]))
    retrieval = " ".join(query_terms)[:500] if query_terms else q

    for phrase in parse_stage_goal_constraints(q).exclude_phrases:
        if phrase not in exclude:
            exclude.append(phrase)

    return PlanningSemanticBrief(
        primary_topic=primary,
        topic_type=topic_type,
        must_phrases=must[:8],
        exclude_phrases=exclude[:10],
        development_arc=arc[:5],
        retrieval_query=retrieval[:500],
        semantic_strength=min(1.0, round(strength, 3)),
        rationale=None,
    )
|
|
|
|
|
|
def merge_semantic_brief_llm(
    base: PlanningSemanticBrief,
    llm_obj: Mapping[str, Any],
) -> PlanningSemanticBrief:
    """Merge an LLM enrichment object into the deterministic brief.

    * ``primary_topic``, ``topic_type`` and ``rationale`` are taken from the
      LLM when it supplies a non-blank string (trimmed and cut to the model's
      length limit so over-long output cannot fail validation).
    * Phrase lists are extended, never replaced: LLM phrases are appended to
      the deterministic ones (de-duplicated, at most 12 per list). A single
      string is treated as one phrase; non-iterable values are ignored.
    * ``semantic_strength`` can only be raised by the LLM, never lowered.
    * With must phrases present, ``retrieval_query`` is rebuilt from the core
      phrases.
    * A brief that ends up with a primary topic but type ``"general"`` is
      re-typed as ``"technique"``.

    NOTE(review): an LLM-supplied ``primary_topic`` overrides a
    deterministically detected one — confirm this is intended.
    """
    data = base.model_dump()

    # Length limits mirror the ``max_length`` constraints of the model fields.
    scalar_limits = {"primary_topic": 120, "topic_type": 40, "rationale": 400}
    for key, limit in scalar_limits.items():
        val = llm_obj.get(key)
        if isinstance(val, str) and val.strip():
            data[key] = val.strip()[:limit]

    for key in ("must_phrases", "exclude_phrases", "development_arc"):
        extra = llm_obj.get(key) or []
        if isinstance(extra, str):
            # Iterating a bare string would add its characters one by one.
            extra = [extra]
        elif not hasattr(extra, "__iter__"):
            extra = []
        merged = list(data.get(key) or [])
        for item in extra:
            s = _normalize_phrase(str(item or ""))
            if s and s not in merged:
                merged.append(s)
        data[key] = merged[:12]

    llm_strength = llm_obj.get("semantic_strength")
    if llm_strength is not None:
        try:
            data["semantic_strength"] = min(
                1.0,
                max(float(data["semantic_strength"]), float(llm_strength)),
            )
        except (TypeError, ValueError):
            # Unusable strength value: keep the deterministic one.
            pass

    if data.get("must_phrases"):
        core = semantic_core_phrases(PlanningSemanticBrief.model_validate(data))
        data["retrieval_query"] = " ".join(core[:4])[:500] if core else data.get("retrieval_query", "")

    out = PlanningSemanticBrief.model_validate(data)
    if out.primary_topic and out.topic_type == "general":
        out = out.model_copy(update={"topic_type": "technique"})
    return out
|
|
|
|
|
|
def try_enrich_semantic_brief_with_llm(
    cur,
    query: str,
    base: PlanningSemanticBrief,
) -> Tuple[PlanningSemanticBrief, bool]:
    """Best-effort LLM enrichment of a deterministic brief.

    Skipped when no API key is configured, the base strength is below 0.35 or
    the query is blank. Any failure (prompt unavailable, request error,
    unparsable reply, validation error) falls back to *base*; failures other
    than an unavailable prompt are logged as a warning.

    Returns:
        ``(brief, enriched)`` where ``enriched`` tells whether the LLM result
        was merged.
    """
    api_key, _ = normalize_openrouter_env()
    search_query = (query or "").strip()
    if not api_key or base.semantic_strength < 0.35 or not search_query:
        return base, False

    prompt_vars = {
        "search_query": search_query,
        "semantic_brief_json": json.dumps(brief_to_summary_dict(base), ensure_ascii=False),
    }
    try:
        prompt_row, rendered = load_and_render_ai_prompt(
            cur, "planning_exercise_query_semantics", prompt_vars
        )
        reply = openrouter_chat_completion(
            api_key=api_key,
            model=effective_openrouter_model_for_prompt_row(prompt_row),
            user_content=rendered.text,
        )
        enriched = merge_semantic_brief_llm(base, _extract_json_object(reply))
    except AiPromptUnavailableError:
        return base, False
    except Exception as exc:
        # Enrichment is optional: never let an LLM problem break planning.
        _logger.warning("Semantik-LLM fehlgeschlagen: %s", exc)
        return base, False
    return enriched, True
|
|
|
|
|
|
def brief_to_summary_dict(brief: PlanningSemanticBrief) -> Dict[str, Any]:
    """Return the brief as a plain dict (fixed key order, list fields copied)."""
    list_fields = {"must_phrases", "exclude_phrases", "development_arc"}
    field_order = (
        "primary_topic",
        "topic_type",
        "must_phrases",
        "exclude_phrases",
        "development_arc",
        "retrieval_query",
        "semantic_strength",
        "rationale",
    )
    summary: Dict[str, Any] = {}
    for name in field_order:
        value = getattr(brief, name)
        summary[name] = list(value) if name in list_fields else value
    return summary
|
|
|
|
|
|
def step_phase_for_index(brief: PlanningSemanticBrief, step_index: int, max_steps: int) -> Optional[str]:
    """Map a path step to a development phase.

    The brief's ``development_arc`` is used if present. Otherwise, for paths
    with more than one step and a semantic strength of at least 0.5, the first
    ``max_steps`` phases of the default arc are used. The step's relative
    position (``step_index / (max_steps - 1)``) is projected onto the arc.

    Args:
        brief: Brief providing ``development_arc`` and ``semantic_strength``.
        step_index: Zero-based step position; out-of-range values are clamped
            to the first/last phase instead of wrapping around or raising.
        max_steps: Total number of steps on the path.

    Returns:
        The phase name, or ``None`` when no arc applies.
    """
    arc = list(brief.development_arc or [])
    if not arc:
        if max_steps <= 1:
            return None
        default_arc = ["einstieg", "grundlage", "vertiefung", "anwendung", "perfektion"]
        arc = default_arc[:max_steps] if brief.semantic_strength >= 0.5 else []
    if not arc:
        return None
    if len(arc) == 1:
        return arc[0]

    last = len(arc) - 1
    pos = step_index / max(max_steps - 1, 1)
    # Clamp on both sides: a negative index would otherwise count from the end
    # of the arc (or raise IndexError when far enough below zero).
    idx = max(0, min(last, int(round(pos * last))))
    return arc[idx]
|
|
|
|
|
|
def step_retrieval_query(
    brief: PlanningSemanticBrief,
    goal_query: str,
    step_index: int,
    max_steps: int,
) -> str:
    """Build the retrieval query for one path step.

    The query consists of the topic (the brief's primary topic, else the first
    word of its retrieval query) followed by the step's phase, if any. When
    neither yields text, the brief's full retrieval query and finally the
    normalized *goal_query* are used.
    """
    phase = step_phase_for_index(brief, step_index, max_steps)

    parts: List[str] = []
    if brief.primary_topic:
        parts.append(brief.primary_topic)
    elif brief.retrieval_query:
        # ``split()`` is empty for a whitespace-only query; indexing it
        # unconditionally would raise IndexError.
        words = brief.retrieval_query.split()
        if words:
            parts.append(words[0])
    if phase:
        parts.append(phase)

    if not parts:
        if brief.retrieval_query:
            parts.append(brief.retrieval_query)
        elif goal_query:
            parts.append(goal_query)

    return _normalize_query(" ".join(p for p in parts if p)) or _normalize_query(goal_query)
|
|
|
|
|
|
def apply_dynamic_retrieval_weights(
    base_weights: Mapping[str, float],
    brief: PlanningSemanticBrief,
    *,
    scenario: str,
    has_planning_reference: bool,
) -> Dict[str, float]:
    """Balance the semantic channel against the other retrieval channels.

    With a strength of at most 0.05 the base weights are returned (copied) with
    ``"semantic"`` defaulting to 0.0. Otherwise the semantic weight grows with
    the strength — more steeply for query-driven retrieval (free search or no
    planning reference) — while full-text (and, when query-driven, profile)
    weights are scaled down. If the positive weights, ignoring the
    ``repeat_*`` keys, sum to more than 0.92 they are rescaled to sum to 0.88.
    """
    penalty_keys = {"repeat_unit", "repeat_group"}
    weights = dict(base_weights)
    sem = float(brief.semantic_strength or 0.0)
    if sem <= 0.05:
        weights.setdefault("semantic", 0.0)
        return weights

    query_driven = scenario == "free_search" or not has_planning_reference
    weights["semantic"] = round(0.12 + sem * (0.38 if query_driven else 0.22), 4)

    fulltext = float(weights.get("fulltext", 0.18))
    if query_driven:
        scale = 1.0 - sem * 0.35
        weights["fulltext"] = round(fulltext * scale, 4)
        weights["profile"] = round(float(weights.get("profile", 0.22)) * (1.0 - sem * 0.25), 4)
    else:
        weights["fulltext"] = round(fulltext * (1.0 - sem * 0.15), 4)

    total = sum(v for k, v in weights.items() if k not in penalty_keys and v > 0)
    if total > 0.92:
        factor = 0.88 / total
        for key in tuple(weights):
            if key not in penalty_keys and weights[key] > 0:
                weights[key] = round(weights[key] * factor, 4)
    return weights
|
|
|
|
|
|
def _blob_from_fields(
    title: str,
    summary: str,
    goal: str,
    variant_names: Sequence[str],
) -> str:
    """Join an exercise's text fields into one lower-cased search blob.

    Summary and goal are passed through ``strip_html_to_plain`` (limited to 600
    and 800 characters respectively); empty parts are skipped.
    """
    pieces = [
        title or "",
        strip_html_to_plain(summary, max_len=600),
        strip_html_to_plain(goal, max_len=800),
        *(variant_names or []),
    ]
    return " ".join(piece for piece in pieces if piece).lower()
|
|
|
|
|
|
def _compact_alpha(text: str) -> str:
    """Lower-case *text* and drop everything except a-z, 0-9, ä, ö, ü and ß."""
    lowered = (text or "").lower()
    return re.sub(r"[^a-z0-9äöüß]+", "", lowered)
|
|
|
|
|
|
def _phrase_in_blob(phrase: str, blob: str) -> bool:
    """Tell whether *phrase* occurs in *blob*.

    Matching is case-insensitive and succeeds on either
      * a plain substring match of the normalized phrase, or
      * a match of the compacted forms (punctuation and spaces removed), so
        that "mae geri" also matches "Mae-Geri" or "maegeri".

    An empty phrase or blob never matches.

    NOTE(review): earlier code followed these two checks with a word-boundary
    regex for single words and a second substring test. Both could only run
    after the substring test had already failed, so they never matched and were
    removed without changing results. If whole-word matching for single tokens
    is wanted, it has to replace the substring test, not follow it.
    """
    ph = _normalize_phrase(phrase)
    if not ph or not blob:
        return False

    low = blob.lower()
    if ph in low:
        return True

    compact_phrase = _compact_alpha(ph)
    return bool(compact_phrase) and compact_phrase in _compact_alpha(low)
|
|
|
|
|
|
def score_exercise_semantic_relevance(
    *,
    title: str,
    summary: str,
    goal: str,
    variant_names: Sequence[str],
    brief: PlanningSemanticBrief,
    step_phase: Optional[str] = None,
) -> Tuple[float, List[str]]:
    """Score how well an exercise's text matches a semantic brief.

    The exercise text is the blob built by ``_blob_from_fields`` from title,
    summary, goal and variant names.

    Args:
        brief: Brief supplying core/must/exclude phrases and the development arc.
        step_phase: Optional phase of the current path step; when given, the
            phase bonus replaces the generic development-arc bonus.

    Returns:
        ``(score, reasons)``: the score is clamped to ``[0.0, 1.0]`` and rounded
        to four decimals; ``reasons`` holds at most four user-facing (German)
        explanations. ``(0.0, [])`` is returned when the brief's strength is at
        most 0.05 or the exercise has no text.
    """
    if brief.semantic_strength <= 0.05:
        return 0.0, []

    blob = _blob_from_fields(title, summary, goal, variant_names)
    if not blob.strip():
        return 0.0, []

    reasons: List[str] = []
    must = list(brief.must_phrases or [])
    exclude = list(brief.exclude_phrases or [])
    core = semantic_core_phrases(brief)

    core_hits = sum(1 for ph in core if _phrase_in_blob(ph, blob))
    must_hits = sum(1 for ph in must if _phrase_in_blob(ph, blob))
    # Exclude phrases only count when they are not themselves negated in the text.
    exclude_hits = sum(1 for ph in exclude if _phrase_excluded_in_blob(ph, blob))

    score = 0.0
    if core:
        # Main component: share of core phrases found, worth up to 0.62.
        core_ratio = core_hits / len(core)
        score += 0.62 * core_ratio
        if core_hits == len(core):
            reasons.append("Kern-Thema der Anfrage im Übungstext")
        elif core_hits > 0:
            reasons.append("Teilweise passend zum Kern-Thema")
    # NOTE(review): the nesting of this branch was reconstructed from a
    # whitespace-mangled source; it is assumed to pair with ``if core:``.
    # ``semantic_core_phrases`` returns a non-empty list whenever
    # ``primary_topic`` is truthy, so this branch is presumably never taken —
    # confirm.
    elif brief.primary_topic and _phrase_in_blob(brief.primary_topic, blob):
        score += 0.55
        reasons.append(f"Thema „{brief.primary_topic}“ im Übungstext")

    # Small bonus for must phrases when they differ from the core phrases.
    if must and core != must:
        extra_ratio = must_hits / len(must)
        score += 0.12 * extra_ratio

    primary_ok = bool(core_hits) or (
        brief.primary_topic and _phrase_in_blob(brief.primary_topic, blob)
    )
    if exclude_hits > 0 and not primary_ok:
        # Off-topic and mentions excluded themes: strong penalty (capped at 0.65).
        penalty = min(0.65, 0.22 * exclude_hits)
        score -= penalty
        reasons.append("Enthält ausgeschlossene Nebenthemen")
    elif exclude_hits > 0 and primary_ok:
        # On-topic but also mentions excluded themes: mild penalty (capped at 0.12).
        score -= min(0.12, 0.06 * exclude_hits)

    if step_phase and step_phase in _PHASE_QUERY_HINTS:
        phase_markers = next((markers for phase, markers in _ARC_PHASES if phase == step_phase), ())
        if any(m in blob for m in phase_markers) or step_phase in blob:
            score += 0.12
            reasons.append(f"Passt zur Pfad-Phase „{step_phase}“")

    # Without a concrete step phase, reward any phase name of the arc found in the text.
    if brief.development_arc and not step_phase:
        arc_hits = sum(1 for phase in brief.development_arc if phase in blob)
        if arc_hits:
            score += min(0.15, 0.05 * arc_hits)

    return max(0.0, min(1.0, round(score, 4))), reasons[:4]
|
|
|
|
|
|
def semantic_core_phrases(brief: PlanningSemanticBrief) -> List[str]:
    """Return the hard core phrases used for matching.

    This is the normalized primary topic if the brief has one, otherwise the
    first two must phrases (normalized, empty entries dropped).
    """
    topic = brief.primary_topic
    if topic:
        return [_normalize_phrase(topic)]

    phrases: List[str] = []
    for raw in (brief.must_phrases or [])[:2]:
        if not raw:
            continue
        normalized = _normalize_phrase(raw)
        if normalized:
            phrases.append(normalized)
    return phrases
|
|
|
|
|
|
def resolve_semantic_skill_weights(cur, brief: PlanningSemanticBrief) -> Dict[int, float]:
    """Resolve the deterministic expected-skill profile for a technique topic.

    Skill names come from ``_TECHNIQUE_EXPECTED_SKILLS`` for a known topic, or
    from ``_DEFAULT_TECHNIQUE_SKILLS`` for any other technique topic or topic
    containing "geri". Each name is looked up among active skills (exact match
    preferred, then prefix match, then shortest containing name).

    Args:
        cur: DB cursor; rows must support ``row["id"]`` access.

    Returns:
        Mapping of skill id to weight 1.0; empty if the brief is no technique
        topic or no skill was found.
    """
    topic = _normalize_phrase(brief.primary_topic or "")
    expected = _TECHNIQUE_EXPECTED_SKILLS.get(topic)
    if expected is not None:
        names = list(expected)
    elif brief.topic_type == "technique" or "geri" in topic:
        names = list(_DEFAULT_TECHNIQUE_SKILLS)
    else:
        return {}

    lookup_sql = """
            SELECT id, name FROM skills
            WHERE (status IS NULL OR status = 'active')
            AND LOWER(name) LIKE %s
            ORDER BY CASE WHEN LOWER(name) = %s THEN 0 WHEN LOWER(name) LIKE %s THEN 1 ELSE 2 END,
            LENGTH(name) ASC
            LIMIT 1
            """
    weights: Dict[int, float] = {}
    for name in names[:6]:
        needle = name.lower()
        cur.execute(lookup_sql, (f"%{needle}%", needle, f"{needle}%"))
        row = cur.fetchone()
        if not row:
            continue
        # Every expected skill carries full weight; repeated ids stay at 1.0.
        weights[int(row["id"])] = 1.0
    return weights
|
|
|
|
|
|
def enrich_target_with_semantic_expectations(
    target,
    *,
    skill_weights: Dict[int, float],
):
    """Merge semantically expected skill weights into a planning target profile.

    Returns *target* unchanged when *skill_weights* is empty. Otherwise a new
    ``PlanningTargetProfile`` is built: its skill weights are the normalized
    merge of the target's and the expected weights, all other weight maps are
    copied, and ``"semantic_expectation"`` is added to the sources once.
    """
    # Imported at call time, as in the rest of this function's history —
    # presumably to avoid an import cycle; TODO confirm.
    from planning_exercise_profiles import PlanningTargetProfile, _merge_weight_maps, _normalize_weight_map

    if not skill_weights:
        return target

    combined_skills = _normalize_weight_map(
        _merge_weight_maps(dict(target.skill_weights), skill_weights, scale=1.0)
    )
    sources = list(target.sources)
    if "semantic_expectation" not in sources:
        sources.append("semantic_expectation")

    copied_fields = (
        "focus_area_ids",
        "style_direction_ids",
        "training_type_ids",
        "target_group_ids",
        "skill_gap_weights",
        "skill_plan_weights",
    )
    copies = {name: dict(getattr(target, name)) for name in copied_fields}
    return PlanningTargetProfile(skill_weights=combined_skills, sources=sources, **copies)
|
|
|
|
|
|
def apply_path_retrieval_weights(brief: PlanningSemanticBrief) -> Dict[str, float]:
    """Return the retrieval weights for the path builder.

    Three tiers are selected by the brief's semantic strength (>= 0.65,
    >= 0.35, below); the stronger the semantics, the more the semantic channel
    dominates. ``repeat_*`` weights are negative. A fresh dict is returned on
    every call.
    """
    channels = (
        "semantic",
        "fulltext",
        "profile",
        "progression",
        "skill",
        "plan",
        "repeat_unit",
        "repeat_group",
    )
    sem = float(brief.semantic_strength or 0.0)
    if sem >= 0.65:
        tier = (0.50, 0.16, 0.26, 0.04, 0.04, 0.0, -0.40, -0.15)
    elif sem >= 0.35:
        tier = (0.38, 0.18, 0.28, 0.06, 0.06, 0.04, -0.35, -0.15)
    else:
        tier = (0.22, 0.22, 0.28, 0.10, 0.10, 0.08, -0.30, -0.15)
    return dict(zip(channels, tier))
|
|
|
|
|
|
# Words ignored when significant tokens are extracted from a stage learning
# goal: the query stop words plus generic stage/phase vocabulary.
# ``_significant_stage_tokens`` compares ASCII-folded tokens (ä -> ae, …)
# against this set.
_STAGE_GOAL_STOPWORDS = _QUERY_STOPWORDS | frozenset(
    {
        "stufe",
        "phase",
        "lernziel",
        "grundlage",
        "vertiefung",
        "anwendung",
        "perfektion",
        "einstieg",
        "sicher",
        "sauber",
        "korrekt",
        "technik",
        "training",
    }
)
|
|
|
|
|
|
# Regexes for German negations ("ohne …", "kein/keine/keinen/keiner/keinem …",
# "nicht …"). Group 1 captures the negated segment, i.e. everything up to the
# next comma, full stop or semicolon.
_STAGE_NEGATION_PATTERNS = (
    r"\bohne\s+([^,.;]+)",
    r"\bkein(?:e|en|er|em)?\s+([^,.;]+)",
    r"\bnicht\s+([^,.;]+)",
)
|
|
|
|
# Alias phrases for exclusions such as "ohne Tritttechnik": an exclude phrase
# related to a key is expanded to all of the key's aliases so that more
# spellings are caught in the exercise text (see ``_expand_stage_exclude_phrase``).
_STAGE_EXCLUDE_ALIASES: Dict[str, Tuple[str, ...]] = {
    "tritttechnik": (
        "tritttechnik",
        "trittpraezision",
        "trittpräzision",
        # Previously misspelled as "tritt praesision".
        "tritt praezision",
        "tritt-präzision",
        "kicktechnik",
        "tritt ausführung",
        "tritt ausfuehrung",
    ),
    "kumite": ("kumite", "partnerkampf", "freikampf", "jiyu kumite"),
    "kraftuebung": ("kraftuebung", "kraftübung", "krafttraining", "kraftübungen"),
    "anwendung": ("kumite anwendung", "kampfanwendung"),
}
|
|
|
|
# Learning-goal tokens that mark a narrowly focused stage. If any positive
# token of a learning goal is in this set, ``parse_stage_goal_constraints``
# sets ``strict_positive``.
_STAGE_FOCUS_TOKENS = frozenset(
    {
        "koordination",
        "absprung",
        "beinhebung",
        "landung",
        "sprung",
        "sprungphase",
        "balance",
        "gleichgewicht",
        "timing",
        "vorbereitung",
        "athletik",
        "mobilitaet",
        "mobilität",
        "stabilisation",
        "stabilisierung",
    }
)
|
|
|
|
|
|
@dataclass
class StageGoalConstraints:
    """Positive and negative constraints parsed from a stage learning goal."""

    # Significant words of the learning goal (negated segments removed).
    positive_tokens: List[str] = field(default_factory=list)
    # Expanded phrases from negations and anti-patterns.
    exclude_phrases: List[str] = field(default_factory=list)
    # True if the learning goal contains a negation ("ohne", "kein…", "nicht").
    has_negation: bool = False
    # True if a focus token was found or the goal contains a negation.
    strict_positive: bool = False
|
|
|
|
|
|
def _expand_stage_exclude_phrase(phrase: str) -> List[str]:
    """Expand an exclude phrase into the spellings to look for in exercise texts.

    The result starts with the normalized phrase, followed by its space-free
    form (if different) and the aliases of every ``_STAGE_EXCLUDE_ALIASES`` key
    that contains the phrase or is contained in it. Alias keys are written in
    ASCII transliteration ("kraftuebung"), so the phrase is compared both as
    given and with umlauts folded (ä -> ae, ö -> oe, ü -> ue); otherwise
    "kraftübung" would never pick up its aliases.

    Returns:
        At most 12 unique phrases; ``[]`` for an empty phrase.
    """
    norm = _normalize_phrase(phrase)
    if not norm:
        return []

    out: List[str] = [norm]
    compact = norm.replace(" ", "")
    if compact and compact not in out:
        out.append(compact)

    folded = norm.replace("ä", "ae").replace("ö", "oe").replace("ü", "ue")
    candidates = (norm,) if folded == norm else (norm, folded)
    for key, aliases in _STAGE_EXCLUDE_ALIASES.items():
        if not any(key in cand or cand in key for cand in candidates):
            continue
        for alias in aliases:
            a = _normalize_phrase(alias)
            if a and a not in out:
                out.append(a)
    return out[:12]
|
|
|
|
|
|
def _significant_stage_tokens(learning_goal: str, *, strip_negated: bool = True) -> List[str]:
    """Extract the significant words of a stage learning goal for text matching.

    Words are runs of at least four letters, ASCII-folded (ä -> ae, ö -> oe,
    ü -> ue), without stop words and without duplicates. With *strip_negated*
    the negated segments ("ohne …", "kein …", "nicht …") are removed first.

    Returns:
        At most ten tokens in order of appearance.
    """
    text = _normalize_phrase(learning_goal)
    if strip_negated:
        for pattern in _STAGE_NEGATION_PATTERNS:
            text = re.sub(pattern, " ", text)

    umlaut_folding = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue"})
    tokens: List[str] = []
    for word in re.findall(r"[a-zäöüß]{4,}", text, flags=re.IGNORECASE):
        folded = word.lower().translate(umlaut_folding)
        if folded in _STAGE_GOAL_STOPWORDS or folded in tokens:
            continue
        tokens.append(folded)
    return tokens[:10]
|
|
|
|
|
|
def parse_stage_goal_constraints(
    learning_goal: str,
    anti_patterns: Optional[Sequence[str]] = None,
) -> StageGoalConstraints:
    """Derive positive and negative constraints from a stage learning goal.

    Negated segments of the learning goal and the given *anti_patterns*
    (roadmap stage) are expanded into exclude phrases (unique, at most 16).
    Positive tokens are the significant words outside negated segments.
    ``strict_positive`` is set when a focus token occurs or a negation exists.

    A learning goal shorter than three characters yields empty constraints.
    """
    goal_text = (learning_goal or "").strip()
    if len(goal_text) < 3:
        return StageGoalConstraints()

    normalized_goal = _normalize_phrase(goal_text)
    collected: List[str] = []
    negated = False
    for pattern in _STAGE_NEGATION_PATTERNS:
        for match in re.finditer(pattern, normalized_goal):
            negated = True
            segment = (match.group(1) or "").strip()
            if segment:
                collected += _expand_stage_exclude_phrase(segment)

    for raw in anti_patterns or []:
        cleaned = _normalize_phrase(str(raw or ""))
        if cleaned:
            collected += _expand_stage_exclude_phrase(cleaned)

    positive = _significant_stage_tokens(goal_text, strip_negated=True)
    has_focus = any(token in _STAGE_FOCUS_TOKENS for token in positive)

    # Order-preserving de-duplication; empty entries are dropped.
    unique_excludes = list(dict.fromkeys(item for item in collected if item))

    return StageGoalConstraints(
        positive_tokens=positive,
        exclude_phrases=unique_excludes[:16],
        has_negation=negated,
        strict_positive=has_focus or negated,
    )
|
|
|
|
|
|
def _phrase_excluded_in_blob(phrase: str, blob: str) -> bool:
    """Tell whether an excluded phrase really occurs in *blob*.

    The phrase only counts as a hit when the text does not itself describe it
    in negated form: if any negated segment of the blob ("ohne …", "kein …",
    "nicht …") overlaps with the phrase, the phrase is treated as not present.
    """
    if not phrase or not blob:
        return False
    if not _phrase_in_blob(phrase, blob):
        return False

    wanted = _normalize_phrase(phrase)
    negated_segments = (
        _normalize_phrase(match.group(1) or "")
        for pattern in _STAGE_NEGATION_PATTERNS
        for match in re.finditer(pattern, blob)
    )
    for segment in negated_segments:
        if segment and (wanted in segment or segment in wanted or _phrase_in_blob(wanted, segment)):
            return False
    return True
|
|
|
|
|
|
def _blob_matches_stage_excludes(blob: str, exclude_phrases: Sequence[str]) -> bool:
    """Return ``True`` if any exclude phrase occurs (non-negated) in *blob*."""
    return any(_phrase_excluded_in_blob(candidate, blob) for candidate in exclude_phrases)
|
|
|
|
|
|
def resolve_path_anti_patterns(
    goal_query: str,
    *,
    semantic_brief: Optional[PlanningSemanticBrief] = None,
    extra_context: Optional[str] = None,
) -> List[str]:
    """Collect path-wide exclusions from explicit sources only — no topic guessing.

    Sources, in this order:
      1. Negations in query/context ("ohne/kein/nicht …") via
         ``parse_stage_goal_constraints``.
      2. ``exclude_phrases`` of the semantic brief (incl. LLM/technique rules),
         each expanded via ``_expand_stage_exclude_phrase``.

    No silent exclusions are derived from the main topic itself (e.g.
    "mawashi" does not imply "no kumite").

    Returns:
        Unique phrases in source order, at most 24.
    """
    query_text = str(goal_query or "").strip()
    context_text = str(extra_context or "").strip()
    combined = " ".join(part for part in (query_text, context_text) if part)
    if not combined and not semantic_brief:
        return []

    collected: List[str] = []

    def _remember(candidate: str) -> None:
        # Keep first occurrence only; skip empty phrases.
        if candidate and candidate not in collected:
            collected.append(candidate)

    if combined:
        for phrase in parse_stage_goal_constraints(combined).exclude_phrases:
            _remember(phrase)

    if semantic_brief:
        for raw in semantic_brief.exclude_phrases or []:
            for variant in _expand_stage_exclude_phrase(str(raw or "")):
                _remember(variant)

    return collected[:24]
|
|
|
|
|
|
def enrich_brief_with_path_constraints(
    brief: PlanningSemanticBrief,
    goal_query: str,
    *,
    extra_context: Optional[str] = None,
) -> PlanningSemanticBrief:
    """Carry negations/exclusions of the overall request into the semantic brief.

    Returns *brief* itself when there is nothing to add; otherwise a copy whose
    ``exclude_phrases`` are the existing ones followed by the new path-wide
    exclusions (at most 16 in total).
    """
    path_excludes = resolve_path_anti_patterns(
        goal_query,
        semantic_brief=brief,
        extra_context=extra_context,
    )
    if not path_excludes:
        return brief

    combined = list(brief.exclude_phrases or [])
    for phrase in path_excludes:
        if phrase in combined:
            continue
        combined.append(phrase)
    return brief.model_copy(update={"exclude_phrases": combined[:16]})
|
|
|
|
|
|
# Semantic-score thresholds for stage matching. Their consumers are outside
# the visible part of this module; judging by the names they are the minimum
# scores for a regular stage fit, a relaxed fit and a title-equivalent match —
# TODO confirm against the call sites.
_MIN_STAGE_FIT_SEMANTIC = 0.30
_MIN_STAGE_FIT_RELAXED = 0.20
_MIN_TITLE_EQUIV_SEMANTIC = 0.15
|
|
|
|
|
|
def build_stage_match_brief(
    *,
    learning_goal: str,
    anti_patterns: Optional[Sequence[str]] = None,
    success_criteria: Optional[Sequence[str]] = None,
    load_profile: Optional[Sequence[str]] = None,
    phase: Optional[str] = None,
    path_context_note: Optional[str] = None,
    path_anti_patterns: Optional[Sequence[str]] = None,
    path_primary_topic: Optional[str] = None,
    path_technique_excludes: Optional[Sequence[str]] = None,
    stage_start_state: Optional[str] = None,
    stage_target_state: Optional[str] = None,
    path_target_state: Optional[str] = None,
    contextualized_learning_goal: Optional[str] = None,
) -> PlanningSemanticBrief:
    """
    Build a stage-centred semantic brief for matching exercises to one roadmap stage.

    The brief is meant for scoring an exercise's title, summary and goal text
    against the stage; its ``primary_topic`` is deliberately left empty.

    Args:
        learning_goal: Learning goal of the stage.
        anti_patterns: Stage-level exclusion phrases.
        success_criteria: Phrases added (normalized, max 100 chars each) to
            the must-phrases.
        load_profile: Phrases added (normalized, max 60 chars each) to the
            must-phrases.
        phase: Stage phase; stored lower-cased as the only ``development_arc``
            entry when non-empty.
        path_context_note: Optional note appended (normalized, max 200 chars)
            to the retrieval query.
        path_anti_patterns: Path-wide exclusion phrases, merged after
            ``anti_patterns``.
        path_primary_topic: Main topic of the path. When set it becomes the
            first must-phrase and its ``technique_sibling_excludes`` are added
            to the exclusions.
        path_technique_excludes: Further exclusion phrases, each expanded via
            ``_expand_stage_exclude_phrase``.
        stage_start_state, stage_target_state, path_target_state: Optional
            state descriptions appended (normalized, max 200 chars each) to
            the retrieval query.
        contextualized_learning_goal: Takes precedence over ``learning_goal``
            when non-empty.

    Returns:
        A ``PlanningSemanticBrief`` with ``semantic_strength`` 0.78, or a brief
        with ``semantic_strength`` 0.0 when the effective learning goal has
        fewer than three characters.
    """
    lg = (contextualized_learning_goal or learning_goal or "").strip()
    if len(lg) < 3:
        return PlanningSemanticBrief(semantic_strength=0.0)

    def _add_capped(bucket: List[str], phrase: str, limit: int) -> None:
        # Compare the truncated form that is actually stored; checking only
        # the untruncated phrase lets long phrases slip in twice.
        capped = phrase[:limit]
        if capped and phrase not in bucket and capped not in bucket:
            bucket.append(capped)

    # --- exclusions: stage, path, technique siblings, explicit technique excludes
    merged_anti: List[str] = []
    for raw in [*(anti_patterns or []), *(path_anti_patterns or [])]:
        s = str(raw or "").strip()
        if s and s not in merged_anti:
            merged_anti.append(s)
    primary_path = _normalize_phrase(path_primary_topic or "")
    if primary_path:
        for item in technique_sibling_excludes(primary_path):
            if item not in merged_anti:
                merged_anti.append(item)
    for raw in path_technique_excludes or []:
        for expanded in _expand_stage_exclude_phrase(str(raw or "")):
            if expanded and expanded not in merged_anti:
                merged_anti.append(expanded)
    constraints = parse_stage_goal_constraints(lg, merged_anti)

    # --- must-phrases: path topic first, then goal tokens, goal, criteria, load
    must: List[str] = []
    norm_lg = _normalize_phrase(lg)
    if primary_path:
        must.append(primary_path[:120])
    for token in constraints.positive_tokens:
        if token not in must:
            must.append(token)
    _add_capped(must, norm_lg, 120)
    for raw in success_criteria or []:
        _add_capped(must, _normalize_phrase(str(raw or "")), 100)
    for raw in load_profile or []:
        _add_capped(must, _normalize_phrase(str(raw or "")), 60)

    # --- retrieval query: goal plus state descriptions and optional note
    retrieval_parts = [norm_lg]
    for raw in (stage_start_state, stage_target_state, path_target_state):
        s = _normalize_phrase(str(raw or ""))[:200]
        if s and s not in retrieval_parts:
            retrieval_parts.append(s)
    if path_context_note:
        note = _normalize_phrase(path_context_note)[:200]
        # Same duplicate guard as for the state fields above.
        if note and note not in retrieval_parts:
            retrieval_parts.append(note)

    arc: List[str] = []
    ph = (phase or "").strip().lower()
    if ph:
        arc.append(ph)

    return PlanningSemanticBrief(
        primary_topic="",
        topic_type="focus",
        must_phrases=must[:12],
        exclude_phrases=list(constraints.exclude_phrases)[:12],
        development_arc=arc[:4],
        retrieval_query=" ".join(p for p in retrieval_parts if p)[:500],
        semantic_strength=0.78,
        rationale="stage_match_brief",
    )
|
|
|
|
|
|
def score_exercise_stage_fit(
    *,
    title: str,
    summary: str,
    goal: str,
    stage_brief: PlanningSemanticBrief,
    variant_names: Optional[Sequence[str]] = None,
    step_phase: Optional[str] = None,
) -> Tuple[float, List[str]]:
    """
    Score how well an exercise (title, summary, goal) fits a stage brief.

    The base score comes from ``score_exercise_semantic_relevance``. On top
    of that, up to six single-word must-phrases of at least four characters
    act as focus words: the share of them found in the exercise text adds a
    bonus of at most 0.28.

    Returns:
        ``(score, reasons)`` with the score rounded to four decimals and
        clamped to ``[0.0, 1.0]``, and at most four reasons.
    """
    final_score, final_reasons = score_exercise_semantic_relevance(
        title=title,
        summary=summary,
        goal=goal,
        variant_names=variant_names or [],
        brief=stage_brief,
        step_phase=step_phase,
    )
    text_blob = _blob_from_fields(title, summary, goal, variant_names or [])

    focus_words: List[str] = []
    for phrase in stage_brief.must_phrases or []:
        if phrase and " " not in phrase and len(phrase) >= 4:
            focus_words.append(phrase)
    focus_words = focus_words[:6]

    if focus_words:
        matched = len([word for word in focus_words if _phrase_in_blob(word, text_blob)])
        bonus = 0.28 * (matched / len(focus_words))
        if bonus > 0:
            final_score = min(1.0, final_score + bonus)
        # Mention the focus match first once at least half of the words hit.
        if matched >= max(1, len(focus_words) // 2):
            final_reasons = ["Stufen-Schwerpunkte im Übungstext", *final_reasons]

    clamped = max(0.0, min(1.0, round(final_score, 4)))
    return clamped, final_reasons[:4]
|
|
|
|
|
|
def exercise_passes_stage_fit(
    *,
    learning_goal: str,
    title: str,
    summary: str = "",
    goal: str = "",
    stage_brief: Optional[PlanningSemanticBrief] = None,
    stage_semantic_score: Optional[float] = None,
    anti_patterns: Optional[Sequence[str]] = None,
    step_phase: Optional[str] = None,
    path_primary_topic: Optional[str] = None,
    path_technique_excludes: Optional[Sequence[str]] = None,
    min_stage_semantic: float = _MIN_STAGE_FIT_SEMANTIC,
    relaxed: bool = False,
) -> bool:
    """
    General stage-fit gate: full exercise text versus the stage brief.

    Checks, in order:
      1. Without a usable learning goal (< 3 chars) and without a path topic
         every exercise passes.
      2. The exercise is rejected when its text matches an exclusion phrase
         derived from the learning goal and ``anti_patterns``.
      3. If a path topic is known (given, or detected in the learning goal)
         and the title is not equivalent to the stage goal, the exercise must
         pass ``exercise_passes_technique_path_scope``.
      4. The stage semantic score must reach the threshold.

    Threshold: starts at ``min_stage_semantic`` and is lowered to
    ``_MIN_TITLE_EQUIV_SEMANTIC`` for title-equivalent exercises and to
    ``_MIN_STAGE_FIT_RELAXED`` in relaxed mode. Taking the minimum keeps the
    relaxed pass from ever being stricter than the strict one.

    Args:
        learning_goal: Learning goal of the stage.
        title, summary, goal: Exercise text fields.
        stage_brief: Pre-built stage brief; built on demand when the score
            has to be computed and no brief was supplied.
        stage_semantic_score: Pre-computed stage score; computed via
            ``score_exercise_stage_fit`` when ``None``.
        anti_patterns: Stage exclusion phrases.
        step_phase: Phase forwarded to the scorer.
        path_primary_topic: Main topic of the path, if any.
        path_technique_excludes: Additional sibling-technique exclusions.
        min_stage_semantic: Base threshold for the strict pass.
        relaxed: Use the relaxed threshold and relaxed technique scope check.

    Returns:
        ``True`` when the exercise may be used for the stage.
    """
    lg = (learning_goal or "").strip()
    if len(lg) < 3 and not (path_primary_topic or "").strip():
        return True

    blob = _blob_from_fields(title, summary, goal, [])
    constraints = parse_stage_goal_constraints(lg, anti_patterns)
    if constraints.exclude_phrases and _blob_matches_stage_excludes(blob, constraints.exclude_phrases):
        return False

    title_equiv = exercise_title_equivalent_to_stage_goal(title, learning_goal or lg)

    primary_path = (path_primary_topic or "").strip()
    if not primary_path and lg:
        # No explicit path topic: fall back to a technique named in the goal.
        hit = _find_technique_in_text(_normalize_phrase(lg))
        if hit:
            primary_path = hit[0]
    tech_excludes = list(path_technique_excludes or [])
    if primary_path:
        for item in technique_sibling_excludes(primary_path):
            if item not in tech_excludes:
                tech_excludes.append(item)
    if primary_path and not title_equiv and not exercise_passes_technique_path_scope(
        primary_topic=primary_path,
        title=title,
        summary=summary,
        goal=goal,
        learning_goal=lg,
        sibling_excludes=tech_excludes,
        relaxed=relaxed,
    ):
        return False

    stage_sem = stage_semantic_score
    if stage_sem is None:
        # Build the fallback brief only when a score actually has to be computed.
        brief = stage_brief or build_stage_match_brief(
            learning_goal=lg,
            anti_patterns=anti_patterns,
        )
        stage_sem, _ = score_exercise_stage_fit(
            title=title,
            summary=summary,
            goal=goal,
            stage_brief=brief,
            step_phase=step_phase,
        )

    threshold = min_stage_semantic
    if title_equiv:
        threshold = min(threshold, _MIN_TITLE_EQUIV_SEMANTIC)
    if relaxed:
        threshold = min(threshold, _MIN_STAGE_FIT_RELAXED)
    return float(stage_sem or 0.0) >= threshold
|
|
|
|
|
|
def apply_stage_match_retrieval_weights(brief: PlanningSemanticBrief) -> Dict[str, float]:
    """
    Return the retrieval weights used for roadmap-stage matching.

    The stage semantics signal dominates; the two ``repeat_*`` entries are
    negative. The weights are fixed: ``brief`` is accepted but not inspected.
    A fresh dict is returned on every call.
    """
    weight_table = (
        ("semantic", 0.58),
        ("fulltext", 0.14),
        ("profile", 0.18),
        ("progression", 0.04),
        ("skill", 0.04),
        ("plan", 0.02),
        ("repeat_unit", -0.40),
        ("repeat_group", -0.15),
    )
    return dict(weight_table)
|
|
|
|
|
|
def semantic_brief_for_stage(
    brief: PlanningSemanticBrief,
    *,
    learning_goal: str,
    phase: Optional[str] = None,
    anti_patterns: Optional[Sequence[str]] = None,
) -> PlanningSemanticBrief:
    """
    Legacy helper: enrich a global brief with the constraints of one stage.

    ``build_stage_match_brief`` is the preferred entry point for roadmap
    matching.

    Args:
        brief: Global brief; it is not modified in place.
        learning_goal: Learning goal of the stage.
        phase: Stage phase; put lower-cased at the front of
            ``development_arc`` when not already listed.
        anti_patterns: Stage exclusion phrases.

    Returns:
        ``brief`` itself when the normalized learning goal is empty, otherwise
        a copy with the goal (max 120 chars) as first must-phrase, up to four
        positive goal tokens added, the stage exclusions appended and
        ``semantic_strength`` raised to at least 0.58. Must- and exclude-
        phrases are capped at 12 entries, the arc at 8.
    """
    lg = _normalize_phrase(learning_goal)
    if not lg:
        return brief
    constraints = parse_stage_goal_constraints(learning_goal, anti_patterns)

    must = list(brief.must_phrases or [])
    for token in constraints.positive_tokens[:4]:
        if token not in must:
            must.append(token)
    # Compare the truncated form that is stored; otherwise enriching the same
    # brief twice with a goal longer than 120 chars duplicates the entry.
    goal_phrase = lg[:120]
    if lg not in must and goal_phrase not in must:
        must.insert(0, goal_phrase)

    exclude = list(brief.exclude_phrases or [])
    for item in constraints.exclude_phrases:
        if item not in exclude:
            exclude.append(item)

    arc = list(brief.development_arc or [])
    ph = (phase or "").strip().lower()
    if ph and ph not in arc:
        arc = [ph, *arc]

    strength = max(float(brief.semantic_strength or 0.0), 0.58)
    return brief.model_copy(
        update={
            "must_phrases": must[:12],
            "exclude_phrases": exclude[:12],
            "development_arc": arc[:8],
            "semantic_strength": min(1.0, strength),
        }
    )
|
|
|
|
|
|
def exercise_passes_stage_learning_goal_gate(
    *,
    learning_goal: str,
    title: str,
    summary: str = "",
    goal: str = "",
    semantic_score: float = 0.0,
    min_semantic: float = 0.20,
    relaxed: bool = False,
    anti_patterns: Optional[Sequence[str]] = None,
    stage_brief: Optional[PlanningSemanticBrief] = None,
    stage_semantic_score: Optional[float] = None,
    step_phase: Optional[str] = None,
) -> bool:
    """
    Roadmap-stage gate that delegates to ``exercise_passes_stage_fit``.

    ``semantic_score`` and ``min_semantic`` are accepted but ignored; every
    other argument is forwarded unchanged.
    """
    # The two legacy knobs take no part in the decision.
    del semantic_score, min_semantic

    forwarded = {
        "learning_goal": learning_goal,
        "title": title,
        "summary": summary,
        "goal": goal,
        "stage_brief": stage_brief,
        "stage_semantic_score": stage_semantic_score,
        "anti_patterns": anti_patterns,
        "step_phase": step_phase,
        "relaxed": relaxed,
    }
    return exercise_passes_stage_fit(**forwarded)
|
|
|
|
|
|
def exercise_passes_path_semantic_gate(
    *,
    semantic_score: float,
    title: str,
    brief: PlanningSemanticBrief,
    summary: str = "",
    goal: str = "",
    strict: bool = True,
) -> bool:
    """
    Decide whether an exercise is semantically close enough to the path topic.

    Rules, in order:
      1. Briefs with ``semantic_strength`` below 0.55 do not gate at all.
      2. A semantic score of at least 0.18 (strict) or 0.06 (non-strict) passes.
      3. Otherwise the brief's primary topic must occur in the exercise text
         (title, summary, goal).
      4. Non-strict only: a topic of two or more words also passes when every
         single word occurs in the text, because a technique name is often
         spread over the running text instead of standing in the title.

    Args:
        semantic_score: Pre-computed semantic score of the exercise.
        title, summary, goal: Exercise text fields.
        brief: Path-level semantic brief.
        strict: Selects the higher score threshold and disables rule 4.

    Returns:
        ``True`` when the exercise passes the gate.
    """
    if brief.semantic_strength < 0.55:
        return True

    min_score = 0.18 if strict else 0.06
    if semantic_score >= min_score:
        return True

    topic = brief.primary_topic or ""
    if not topic:
        # Without a topic there is nothing left that could rescue a low score.
        return False

    blob = _blob_from_fields(title, summary, goal, [])
    if _phrase_in_blob(topic, blob):
        return True
    if strict:
        return False

    parts = topic.split()
    return len(parts) >= 2 and all(_phrase_in_blob(p, blob) for p in parts)
|
|
|
|
|
|
def pick_best_path_hit(
    hits: List[Dict[str, Any]],
    # Builtin generic: the file's typing import does not provide ``Set``.
    used_exercise_ids: set[int],
    *,
    semantic_brief: Optional[PlanningSemanticBrief] = None,
    stage_learning_goal: Optional[str] = None,
    stage_anti_patterns: Optional[Sequence[str]] = None,
    roadmap_stage_match: bool = False,
    stage_match_brief: Optional[PlanningSemanticBrief] = None,
    path_primary_topic: Optional[str] = None,
    path_technique_excludes: Optional[Sequence[str]] = None,
) -> Optional[Dict[str, Any]]:
    """
    Pick the best unused retrieval hit in stages: strict, relaxed, emergency.

    Each pass ranks the surviving hits by ``(semantic score, hit["score"])``
    and keeps the first hit with the highest key.

    * Roadmap-stage mode (``roadmap_stage_match`` with a non-empty stage
      goal): hits are gated by ``exercise_passes_stage_fit`` and ranked by
      their stage score.
    * Otherwise: hits are gated by ``exercise_passes_path_semantic_gate``
      (only when ``semantic_brief`` is given) and ranked by ``semantic_score``.

    With ``roadmap_stage_match`` set, the result of the relaxed pass is
    final, even if it is ``None``. Without it, a last emergency pass takes
    the best remaining hit; hits with a non-positive semantic score are only
    eligible there if the brief's primary topic occurs in title or summary.

    Args:
        hits: Retrieval hits; each needs an ``"id"`` key.
        used_exercise_ids: Exercise ids that must not be picked again.
        semantic_brief: Path-level brief for the non-stage gate.
        stage_learning_goal: Learning goal of the roadmap stage.
        stage_anti_patterns: Stage exclusion phrases.
        roadmap_stage_match: Enables roadmap-stage mode and disables the
            emergency pass.
        stage_match_brief: Pre-built stage brief; built from the stage goal
            when missing in roadmap-stage mode.
        path_primary_topic: Main topic of the path, forwarded to the stage gate.
        path_technique_excludes: Sibling-technique exclusions, forwarded to
            the stage gate.

    Returns:
        The chosen hit dict, or ``None`` when nothing qualifies.

    Raises:
        KeyError: If a hit has no ``"id"`` key.
    """
    if not hits:
        return None

    stage_goal = (stage_learning_goal or "").strip()
    use_stage_gate = bool(roadmap_stage_match and stage_goal)

    stage_brief: Optional[PlanningSemanticBrief] = stage_match_brief
    if use_stage_gate and stage_brief is None:
        stage_brief = build_stage_match_brief(
            learning_goal=stage_goal,
            anti_patterns=stage_anti_patterns,
        )

    # Filter used exercises once instead of in every pass.
    candidates = [hit for hit in hits if int(hit["id"]) not in used_exercise_ids]

    def _scan(*, strict: bool) -> Optional[Dict[str, Any]]:
        best: Optional[Dict[str, Any]] = None
        best_key: Tuple[float, float] = (-1.0, -1.0)
        for hit in candidates:
            title = str(hit.get("title") or "")
            summary = str(hit.get("summary") or "")
            goal_text = str(hit.get("goal") or hit.get("exercise_goal") or "")
            sem = float(hit.get("semantic_score") or 0.0)
            # NOTE(review): the `or` chain treats an explicit 0.0 stage score
            # like a missing one and falls through to the next source.
            # Confirm that this is intended.
            stage_sem = float(
                hit.get("stage_rank_semantic")
                or hit.get("stage_semantic_score")
                or sem
            )

            if use_stage_gate:
                if not exercise_passes_stage_fit(
                    learning_goal=stage_goal,
                    title=title,
                    summary=summary,
                    goal=goal_text,
                    stage_brief=stage_brief,
                    stage_semantic_score=stage_sem,
                    anti_patterns=stage_anti_patterns,
                    path_primary_topic=path_primary_topic,
                    path_technique_excludes=path_technique_excludes,
                    relaxed=not strict,
                ):
                    continue
            elif semantic_brief and not exercise_passes_path_semantic_gate(
                semantic_score=sem,
                title=title,
                summary=summary,
                goal=goal_text,
                brief=semantic_brief,
                strict=strict,
            ):
                continue

            score = float(hit.get("score") or 0.0)
            key = (stage_sem if use_stage_gate else sem, score)
            if key > best_key:
                best_key = key
                best = hit
        return best

    chosen = _scan(strict=True)
    if chosen:
        return chosen

    chosen = _scan(strict=False)
    if chosen or roadmap_stage_match:
        # Roadmap matching never falls through to the emergency pass.
        return chosen

    # Emergency (retrieval-first / bridges only): best remaining hit.
    fallback: Optional[Dict[str, Any]] = None
    fallback_key: Tuple[float, float] = (-1.0, -1.0)
    for hit in candidates:
        sem = float(hit.get("semantic_score") or 0.0)
        score = float(hit.get("score") or 0.0)
        if sem <= 0 and semantic_brief and semantic_brief.primary_topic:
            topic = semantic_brief.primary_topic
            blob = (str(hit.get("title") or "") + " " + str(hit.get("summary") or "")).lower()
            if not _phrase_in_blob(topic, blob):
                continue
        key = (sem, score)
        if key > fallback_key:
            fallback_key = key
            fallback = hit
    return fallback
|
|
|
|
|
|
# Public API of this module (names exported by ``from ... import *``).
__all__ = [
    "PlanningSemanticBrief",
    "apply_dynamic_retrieval_weights",
    "apply_path_retrieval_weights",
    "brief_to_summary_dict",
    "build_semantic_brief",
    "enrich_target_with_semantic_expectations",
    "exercise_passes_path_semantic_gate",
    "StageGoalConstraints",
    "apply_stage_match_retrieval_weights",
    "build_stage_match_brief",
    "enrich_brief_with_path_constraints",
    "exercise_passes_stage_fit",
    "exercise_title_equivalent_to_stage_goal",
    "resolve_path_primary_topic",
    "resolve_path_anti_patterns",
    "exercise_passes_stage_learning_goal_gate",
    "merge_semantic_brief_llm",
    "parse_stage_goal_constraints",
    "pick_best_path_hit",
    "exercise_passes_technique_path_scope",
    "score_exercise_stage_fit",
    "semantic_brief_for_stage",
    "technique_sibling_excludes",
    "resolve_semantic_skill_weights",
    "score_exercise_semantic_relevance",
    "semantic_core_phrases",
    "step_phase_for_index",
    "step_retrieval_query",
    "try_enrich_semantic_brief_with_llm",
]
|