All checks were successful
Test Suite / pytest-backend (push) Successful in 41s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m18s
Deploy Development / deploy (push) Successful in 44s
- Updated `suggest_progression_path` to include AI-generated gap-fill offers when exercises are missing, improving the relevance of suggested paths.
- Introduced a match summary that reports library matches and gap-fill offers, improving user feedback in the `ProgressionGraphEditor`.
- Refined the `pick_best_path_hit` function so that roadmap stage matches are handled correctly based on primary topics.
- Added tests that validate the new gap-fill offer logic and the match summary, making the path suggestion features more robust.
1349 lines
44 KiB
Python
1349 lines
44 KiB
Python
"""
|
|
Planungs-KI Phase E: Semantik-Schicht für Anfrage-Verständnis und Retrieval.
|
|
|
|
Trennt anfrage-spezifische Semantik (Technik, Phrasen, Entwicklungsbogen) vom
|
|
Katalog-Profil-Overlay (Fokus/Skills). Wird in Hybrid-Retrieval und Pfad-QA genutzt.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple
|
|
|
|
from pydantic import BaseModel, Field, field_validator
|
|
|
|
from ai_prompt_runtime import AiPromptUnavailableError, load_and_render_ai_prompt
|
|
from exercise_ai import strip_html_to_plain
|
|
from openrouter_chat import (
|
|
effective_openrouter_model_for_prompt_row,
|
|
normalize_openrouter_env,
|
|
openrouter_chat_completion,
|
|
)
|
|
|
|
_logger = logging.getLogger("shinkan.planning_exercise_semantics")
|
|
|
|
_GERI_TECHNIQUES: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
|
|
("mae geri", ("mawashi geri", "yoko geri", "ushiro geri", "sakuto geri", "mikazuki geri")),
|
|
("mawashi geri", ("mae geri", "yoko geri", "ushiro geri", "sakuto geri")),
|
|
("yoko geri", ("mae geri", "mawashi geri", "ushiro geri", "sakuto geri")),
|
|
("ushiro geri", ("mae geri", "mawashi geri", "yoko geri", "sakuto geri")),
|
|
("sakuto geri", ("mae geri", "mawashi geri", "yoko geri", "mikazuki geri")),
|
|
("mikazuki geri", ("mae geri", "mawashi geri", "sakuto geri")),
|
|
)
|
|
|
|
_OTHER_TECHNIQUE_PATTERNS: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
|
|
("oi zuki", ("gyaku zuki", "age uke", "gedan barai")),
|
|
("gyaku zuki", ("oi zuki", "mae geri")),
|
|
("age uke", ("gedan barai", "soto uke")),
|
|
("gedan barai", ("age uke", "soto uke")),
|
|
)
|
|
|
|
_TECHNIQUE_EXPECTED_SKILLS: Dict[str, Tuple[str, ...]] = {
|
|
"mae geri": ("Geri Waza", "Koordination", "Gleichgewicht", "Kime"),
|
|
"mawashi geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
|
|
"yoko geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
|
|
"ushiro geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
|
|
"sakuto geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
|
|
"mikazuki geri": ("Geri Waza", "Koordination", "Gleichgewicht"),
|
|
}
|
|
|
|
_DEFAULT_TECHNIQUE_SKILLS: Tuple[str, ...] = ("Geri Waza", "Koordination", "Gleichgewicht")
|
|
|
|
_ARC_PHASES: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
|
|
("einstieg", ("einstieg", "erlernen", "lernen", "anfänger", "anfaenger", "beginn", "grund")),
|
|
("grundlage", ("grundlage", "fundament", "basis", "basic")),
|
|
("vertiefung", ("vertief", "festigung", "übung", "uebung", "wiederhol")),
|
|
("anwendung", ("anwend", "partner", "kampf", "kumite", "reaktion")),
|
|
("perfektion", ("perfekt", "meisterschaft", "höchst", "hoechst", "kime", "sauber")),
|
|
)
|
|
|
|
_PHASE_QUERY_HINTS: Dict[str, str] = {
|
|
"einstieg": "einstieg grundübung einfach",
|
|
"grundlage": "grundtechnik festigung",
|
|
"vertiefung": "vertiefung technik übung",
|
|
"anwendung": "anwendung partner variante",
|
|
"perfektion": "perfektion kontrolle kime höchste stufe",
|
|
}
|
|
|
|
_QUERY_STOPWORDS = frozenset(
|
|
{
|
|
"von",
|
|
"bis",
|
|
"zur",
|
|
"zum",
|
|
"der",
|
|
"die",
|
|
"das",
|
|
"des",
|
|
"den",
|
|
"dem",
|
|
"ein",
|
|
"eine",
|
|
"einer",
|
|
"eines",
|
|
"und",
|
|
"oder",
|
|
"mit",
|
|
"für",
|
|
"fuer",
|
|
"im",
|
|
"in",
|
|
"am",
|
|
"an",
|
|
"auf",
|
|
"aus",
|
|
"beim",
|
|
"nach",
|
|
"vor",
|
|
"über",
|
|
"ueber",
|
|
"unter",
|
|
"wie",
|
|
"was",
|
|
"wo",
|
|
"wir",
|
|
"soll",
|
|
"sollen",
|
|
"bitte",
|
|
"schlage",
|
|
"vorschlag",
|
|
"übung",
|
|
"uebung",
|
|
"übungen",
|
|
"uebungen",
|
|
}
|
|
)
|
|
|
|
|
|
class PlanningSemanticBrief(BaseModel):
|
|
primary_topic: Optional[str] = Field(default=None, max_length=120)
|
|
topic_type: str = Field(default="general", max_length=40)
|
|
must_phrases: List[str] = Field(default_factory=list)
|
|
exclude_phrases: List[str] = Field(default_factory=list)
|
|
development_arc: List[str] = Field(default_factory=list)
|
|
retrieval_query: str = Field(default="", max_length=500)
|
|
semantic_strength: float = Field(default=0.0, ge=0.0, le=1.0)
|
|
rationale: Optional[str] = Field(default=None, max_length=400)
|
|
|
|
@field_validator("topic_type")
|
|
@classmethod
|
|
def _topic_type(cls, v: str) -> str:
|
|
s = (v or "general").strip().lower()
|
|
return s if s in {"general", "technique", "focus", "method", "skill"} else "general"
|
|
|
|
@field_validator("must_phrases", "exclude_phrases", "development_arc", mode="before")
|
|
@classmethod
|
|
def _norm_phrase_list(cls, v: Any) -> List[str]:
|
|
if not v:
|
|
return []
|
|
if isinstance(v, str):
|
|
s = _normalize_phrase(v)
|
|
return [s] if s else []
|
|
out: List[str] = []
|
|
for item in v:
|
|
s = _normalize_phrase(str(item or ""))
|
|
if s and s not in out:
|
|
out.append(s[:120])
|
|
return out[:12]
|
|
|
|
|
|
def _normalize_phrase(text: str) -> str:
|
|
return re.sub(r"\s+", " ", (text or "").strip().lower())
|
|
|
|
|
|
def _normalize_query(text: str) -> str:
|
|
return re.sub(r"\s+", " ", (text or "").strip())
|
|
|
|
|
|
def _extract_json_object(text: str) -> Dict[str, Any]:
|
|
s = (text or "").strip()
|
|
if s.startswith("```"):
|
|
s = re.sub(r"^```[a-zA-Z0-9]*\s*", "", s)
|
|
if s.endswith("```"):
|
|
s = s[:-3].strip()
|
|
start = s.find("{")
|
|
end = s.rfind("}")
|
|
if start < 0 or end <= start:
|
|
raise ValueError("Kein JSON-Objekt in LLM-Antwort")
|
|
obj = json.loads(s[start : end + 1])
|
|
if not isinstance(obj, dict):
|
|
raise ValueError("LLM-Antwort ist kein JSON-Objekt")
|
|
return obj
|
|
|
|
|
|
def _find_technique_in_text(q_lower: str) -> Optional[Tuple[str, Tuple[str, ...]]]:
|
|
for primary, excludes in _GERI_TECHNIQUES + _OTHER_TECHNIQUE_PATTERNS:
|
|
if primary in q_lower:
|
|
return primary, excludes
|
|
return None
|
|
|
|
|
|
def resolve_path_primary_topic(
|
|
goal_query: str,
|
|
semantic_brief: Optional[PlanningSemanticBrief] = None,
|
|
*,
|
|
stage_learning_goal: Optional[str] = None,
|
|
extra_context: Optional[str] = None,
|
|
) -> Optional[str]:
|
|
"""
|
|
Haupttechnik aus Anfrage, Kontext oder Stufen-Lernziel — nicht nur aus goal_query.
|
|
"""
|
|
if semantic_brief:
|
|
primary = (semantic_brief.primary_topic or "").strip()
|
|
if primary:
|
|
return primary
|
|
parts = [goal_query or "", extra_context or "", stage_learning_goal or ""]
|
|
combined = _normalize_phrase(" ".join(p for p in parts if p))
|
|
if not combined:
|
|
return None
|
|
hit = _find_technique_in_text(combined.lower())
|
|
return hit[0] if hit else None
|
|
|
|
|
|
def technique_sibling_excludes(primary_topic: str) -> List[str]:
|
|
"""Andere Techniken derselben Familie (z. B. Mae/Yoko bei Mawashi) — aus Katalog."""
|
|
topic = _normalize_phrase(primary_topic)
|
|
if not topic:
|
|
return []
|
|
hit = _find_technique_in_text(topic)
|
|
if not hit:
|
|
return []
|
|
out: List[str] = []
|
|
for raw in hit[1]:
|
|
for expanded in _expand_stage_exclude_phrase(raw):
|
|
if expanded and expanded not in out:
|
|
out.append(expanded)
|
|
return out[:16]
|
|
|
|
|
|
def exercise_passes_technique_path_scope(
|
|
*,
|
|
primary_topic: str,
|
|
title: str,
|
|
summary: str = "",
|
|
goal: str = "",
|
|
learning_goal: str = "",
|
|
sibling_excludes: Optional[Sequence[str]] = None,
|
|
relaxed: bool = False,
|
|
) -> bool:
|
|
"""
|
|
Technik-Pfad: keine Geschwister-Technik; Haupttechnik muss im Übungstext vorkommen.
|
|
|
|
Das Stufen-Lernziel allein reicht nicht — sonst würden themenfremde Übungen (z. B. Kumite)
|
|
nur wegen „Mawashi Geri“ im Lernziel durch das Gate rutschen.
|
|
"""
|
|
primary = _normalize_phrase(primary_topic)
|
|
if not primary:
|
|
return True
|
|
|
|
blob = _blob_from_fields(title, summary, goal, [])
|
|
excludes = list(sibling_excludes or technique_sibling_excludes(primary))
|
|
if excludes and _blob_matches_stage_excludes(blob, excludes):
|
|
return False
|
|
|
|
if _phrase_in_blob(primary, blob):
|
|
return True
|
|
|
|
if relaxed:
|
|
parts = [p for p in primary.split() if len(p) >= 4]
|
|
if parts and any(_phrase_in_blob(part, blob) for part in parts):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _detect_development_arc(q_lower: str) -> List[str]:
|
|
found: List[str] = []
|
|
for phase, markers in _ARC_PHASES:
|
|
if any(m in q_lower for m in markers):
|
|
if phase not in found:
|
|
found.append(phase)
|
|
if not found and ("von" in q_lower and "bis" in q_lower):
|
|
found = ["einstieg", "perfektion"]
|
|
return found
|
|
|
|
|
|
def _keyword_phrases_from_query(query: str) -> List[str]:
|
|
q = _normalize_query(query).lower()
|
|
tokens = re.findall(r"[a-zäöüß]{3,}", q, flags=re.IGNORECASE)
|
|
phrases: List[str] = []
|
|
for i, tok in enumerate(tokens):
|
|
low = tok.lower()
|
|
if low in _QUERY_STOPWORDS:
|
|
continue
|
|
if i + 1 < len(tokens):
|
|
nxt = tokens[i + 1].lower()
|
|
if nxt not in _QUERY_STOPWORDS:
|
|
pair = _normalize_phrase(f"{low} {nxt}")
|
|
if len(pair) >= 5 and pair not in phrases:
|
|
phrases.append(pair)
|
|
if len(low) >= 4 and low not in phrases:
|
|
phrases.append(low)
|
|
return phrases[:6]
|
|
|
|
|
|
def build_semantic_brief(query: Optional[str]) -> PlanningSemanticBrief:
|
|
"""Deterministisches Anfrage-Verständnis — ohne LLM."""
|
|
q = _normalize_query(query)
|
|
if not q:
|
|
return PlanningSemanticBrief(retrieval_query="", semantic_strength=0.0)
|
|
|
|
q_lower = q.lower()
|
|
must: List[str] = []
|
|
exclude: List[str] = []
|
|
topic_type = "general"
|
|
primary: Optional[str] = None
|
|
strength = 0.25
|
|
|
|
technique = _find_technique_in_text(q_lower)
|
|
if technique:
|
|
primary, ex = technique
|
|
must.append(primary)
|
|
exclude.extend(list(ex))
|
|
topic_type = "technique"
|
|
strength = max(strength, 0.82)
|
|
|
|
arc = _detect_development_arc(q_lower)
|
|
if arc:
|
|
strength = max(strength, 0.55 if technique else 0.45)
|
|
|
|
# Keine generischen Stichwörter in must_phrases — sonst verwässert das Scoring.
|
|
retrieval_parts = list(must)
|
|
if primary:
|
|
retrieval_parts.append(primary)
|
|
if arc:
|
|
retrieval_parts.extend(arc[:2])
|
|
retrieval = " ".join(dict.fromkeys(retrieval_parts))[:500] if retrieval_parts else q
|
|
|
|
if len(q) >= 24 and not technique:
|
|
strength = max(strength, 0.4)
|
|
|
|
path_constraints = parse_stage_goal_constraints(q)
|
|
for item in path_constraints.exclude_phrases:
|
|
if item not in exclude:
|
|
exclude.append(item)
|
|
|
|
return PlanningSemanticBrief(
|
|
primary_topic=primary,
|
|
topic_type=topic_type,
|
|
must_phrases=must[:8],
|
|
exclude_phrases=exclude[:10],
|
|
development_arc=arc[:5],
|
|
retrieval_query=retrieval[:500],
|
|
semantic_strength=min(1.0, round(strength, 3)),
|
|
rationale=None,
|
|
)
|
|
|
|
|
|
def merge_semantic_brief_llm(
|
|
base: PlanningSemanticBrief,
|
|
llm_obj: Mapping[str, Any],
|
|
) -> PlanningSemanticBrief:
|
|
"""LLM-Enrichment in deterministisches Brief mergen (LLM ergänzt, ersetzt nicht harte Technik-Regeln)."""
|
|
data = base.model_dump()
|
|
for key in ("primary_topic", "topic_type", "rationale"):
|
|
val = llm_obj.get(key)
|
|
if val:
|
|
data[key] = val
|
|
|
|
for key in ("must_phrases", "exclude_phrases", "development_arc"):
|
|
extra = llm_obj.get(key) or []
|
|
merged = list(data.get(key) or [])
|
|
for item in extra:
|
|
s = _normalize_phrase(str(item or ""))
|
|
if s and s not in merged:
|
|
merged.append(s)
|
|
data[key] = merged[:12]
|
|
|
|
llm_strength = llm_obj.get("semantic_strength")
|
|
if llm_strength is not None:
|
|
try:
|
|
data["semantic_strength"] = min(
|
|
1.0,
|
|
max(float(data["semantic_strength"]), float(llm_strength)),
|
|
)
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
if data.get("must_phrases"):
|
|
core = semantic_core_phrases(PlanningSemanticBrief.model_validate(data))
|
|
data["retrieval_query"] = " ".join(core[:4])[:500] if core else data.get("retrieval_query", "")
|
|
out = PlanningSemanticBrief.model_validate(data)
|
|
if out.primary_topic and out.topic_type == "general":
|
|
out = out.model_copy(update={"topic_type": "technique"})
|
|
return out
|
|
|
|
|
|
def try_enrich_semantic_brief_with_llm(
|
|
cur,
|
|
query: str,
|
|
base: PlanningSemanticBrief,
|
|
) -> Tuple[PlanningSemanticBrief, bool]:
|
|
api_key, _ = normalize_openrouter_env()
|
|
if not api_key or base.semantic_strength < 0.35:
|
|
return base, False
|
|
if not (query or "").strip():
|
|
return base, False
|
|
|
|
variables = {
|
|
"search_query": (query or "").strip(),
|
|
"semantic_brief_json": json.dumps(brief_to_summary_dict(base), ensure_ascii=False),
|
|
}
|
|
try:
|
|
prow, rendered = load_and_render_ai_prompt(cur, "planning_exercise_query_semantics", variables)
|
|
model = effective_openrouter_model_for_prompt_row(prow)
|
|
raw = openrouter_chat_completion(api_key=api_key, model=model, user_content=rendered.text)
|
|
obj = _extract_json_object(raw)
|
|
return merge_semantic_brief_llm(base, obj), True
|
|
except AiPromptUnavailableError:
|
|
return base, False
|
|
except Exception as exc:
|
|
_logger.warning("Semantik-LLM fehlgeschlagen: %s", exc)
|
|
return base, False
|
|
|
|
|
|
def brief_to_summary_dict(brief: PlanningSemanticBrief) -> Dict[str, Any]:
|
|
return {
|
|
"primary_topic": brief.primary_topic,
|
|
"topic_type": brief.topic_type,
|
|
"must_phrases": list(brief.must_phrases),
|
|
"exclude_phrases": list(brief.exclude_phrases),
|
|
"development_arc": list(brief.development_arc),
|
|
"retrieval_query": brief.retrieval_query,
|
|
"semantic_strength": brief.semantic_strength,
|
|
"rationale": brief.rationale,
|
|
}
|
|
|
|
|
|
def step_phase_for_index(brief: PlanningSemanticBrief, step_index: int, max_steps: int) -> Optional[str]:
|
|
arc = list(brief.development_arc or [])
|
|
if not arc:
|
|
if max_steps <= 1:
|
|
return None
|
|
default_arc = ["einstieg", "grundlage", "vertiefung", "anwendung", "perfektion"]
|
|
arc = default_arc[:max_steps] if brief.semantic_strength >= 0.5 else []
|
|
if not arc:
|
|
return None
|
|
if len(arc) == 1:
|
|
return arc[0]
|
|
pos = step_index / max(max_steps - 1, 1)
|
|
idx = min(len(arc) - 1, int(round(pos * (len(arc) - 1))))
|
|
return arc[idx]
|
|
|
|
|
|
def step_retrieval_query(
|
|
brief: PlanningSemanticBrief,
|
|
goal_query: str,
|
|
step_index: int,
|
|
max_steps: int,
|
|
) -> str:
|
|
phase = step_phase_for_index(brief, step_index, max_steps)
|
|
parts: List[str] = []
|
|
if brief.primary_topic:
|
|
parts.append(brief.primary_topic)
|
|
elif brief.retrieval_query:
|
|
parts.append(brief.retrieval_query.split()[0] if brief.retrieval_query else "")
|
|
if phase:
|
|
parts.append(phase)
|
|
if not parts and brief.retrieval_query:
|
|
parts.append(brief.retrieval_query)
|
|
elif not parts and goal_query:
|
|
parts.append(goal_query)
|
|
return _normalize_query(" ".join(p for p in parts if p)) or _normalize_query(goal_query)
|
|
|
|
|
|
def apply_dynamic_retrieval_weights(
|
|
base_weights: Mapping[str, float],
|
|
brief: PlanningSemanticBrief,
|
|
*,
|
|
scenario: str,
|
|
has_planning_reference: bool,
|
|
) -> Dict[str, float]:
|
|
"""Semantik-Kanal dynamisch gegen Profil/Plan abwägen."""
|
|
out = dict(base_weights)
|
|
sem = float(brief.semantic_strength or 0.0)
|
|
if sem <= 0.05:
|
|
out.setdefault("semantic", 0.0)
|
|
return out
|
|
|
|
query_driven = scenario == "free_search" or not has_planning_reference
|
|
sem_weight = 0.12 + sem * (0.38 if query_driven else 0.22)
|
|
out["semantic"] = round(sem_weight, 4)
|
|
|
|
if query_driven:
|
|
scale = 1.0 - sem * 0.35
|
|
out["fulltext"] = round(float(out.get("fulltext", 0.18)) * scale, 4)
|
|
out["profile"] = round(float(out.get("profile", 0.22)) * (1.0 - sem * 0.25), 4)
|
|
else:
|
|
out["fulltext"] = round(float(out.get("fulltext", 0.18)) * (1.0 - sem * 0.15), 4)
|
|
|
|
total = sum(v for k, v in out.items() if k not in {"repeat_unit", "repeat_group"} and v > 0)
|
|
if total > 0.92:
|
|
factor = 0.88 / total
|
|
for k in list(out.keys()):
|
|
if k in {"repeat_unit", "repeat_group"}:
|
|
continue
|
|
if out[k] > 0:
|
|
out[k] = round(out[k] * factor, 4)
|
|
return out
|
|
|
|
|
|
def _blob_from_fields(
|
|
title: str,
|
|
summary: str,
|
|
goal: str,
|
|
variant_names: Sequence[str],
|
|
) -> str:
|
|
parts = [title or "", strip_html_to_plain(summary, max_len=600), strip_html_to_plain(goal, max_len=800)]
|
|
parts.extend(variant_names or [])
|
|
return " ".join(p for p in parts if p).lower()
|
|
|
|
|
|
def _compact_alpha(text: str) -> str:
|
|
return re.sub(r"[^a-z0-9äöüß]+", "", (text or "").lower())
|
|
|
|
|
|
def _phrase_in_blob(phrase: str, blob: str) -> bool:
|
|
ph = _normalize_phrase(phrase)
|
|
if not ph or not blob:
|
|
return False
|
|
low = blob.lower()
|
|
if ph in low:
|
|
return True
|
|
if _compact_alpha(ph) and _compact_alpha(ph) in _compact_alpha(low):
|
|
return True
|
|
if " " not in ph:
|
|
return bool(re.search(rf"\b{re.escape(ph)}\b", low))
|
|
return ph in low
|
|
|
|
|
|
def score_exercise_semantic_relevance(
|
|
*,
|
|
title: str,
|
|
summary: str,
|
|
goal: str,
|
|
variant_names: Sequence[str],
|
|
brief: PlanningSemanticBrief,
|
|
step_phase: Optional[str] = None,
|
|
) -> Tuple[float, List[str]]:
|
|
if brief.semantic_strength <= 0.05:
|
|
return 0.0, []
|
|
|
|
blob = _blob_from_fields(title, summary, goal, variant_names)
|
|
if not blob.strip():
|
|
return 0.0, []
|
|
|
|
reasons: List[str] = []
|
|
must = list(brief.must_phrases or [])
|
|
exclude = list(brief.exclude_phrases or [])
|
|
core = semantic_core_phrases(brief)
|
|
|
|
core_hits = sum(1 for ph in core if _phrase_in_blob(ph, blob))
|
|
must_hits = sum(1 for ph in must if _phrase_in_blob(ph, blob))
|
|
exclude_hits = sum(1 for ph in exclude if _phrase_excluded_in_blob(ph, blob))
|
|
|
|
score = 0.0
|
|
if core:
|
|
core_ratio = core_hits / len(core)
|
|
score += 0.62 * core_ratio
|
|
if core_hits == len(core):
|
|
reasons.append("Kern-Thema der Anfrage im Übungstext")
|
|
elif core_hits > 0:
|
|
reasons.append("Teilweise passend zum Kern-Thema")
|
|
elif brief.primary_topic and _phrase_in_blob(brief.primary_topic, blob):
|
|
score += 0.55
|
|
reasons.append(f"Thema „{brief.primary_topic}“ im Übungstext")
|
|
|
|
if must and core != must:
|
|
extra_ratio = must_hits / len(must)
|
|
score += 0.12 * extra_ratio
|
|
|
|
primary_ok = bool(core_hits) or (
|
|
brief.primary_topic and _phrase_in_blob(brief.primary_topic, blob)
|
|
)
|
|
if exclude_hits > 0 and not primary_ok:
|
|
penalty = min(0.65, 0.22 * exclude_hits)
|
|
score -= penalty
|
|
reasons.append("Enthält ausgeschlossene Nebenthemen")
|
|
elif exclude_hits > 0 and primary_ok:
|
|
score -= min(0.12, 0.06 * exclude_hits)
|
|
|
|
if step_phase and step_phase in _PHASE_QUERY_HINTS:
|
|
phase_markers = next((markers for phase, markers in _ARC_PHASES if phase == step_phase), ())
|
|
if any(m in blob for m in phase_markers) or step_phase in blob:
|
|
score += 0.12
|
|
reasons.append(f"Passt zur Pfad-Phase „{step_phase}“")
|
|
|
|
if brief.development_arc and not step_phase:
|
|
arc_hits = sum(1 for phase in brief.development_arc if phase in blob)
|
|
if arc_hits:
|
|
score += min(0.15, 0.05 * arc_hits)
|
|
|
|
return max(0.0, min(1.0, round(score, 4))), reasons[:4]
|
|
|
|
|
|
def semantic_core_phrases(brief: PlanningSemanticBrief) -> List[str]:
|
|
"""Harte Kernphrasen fürs Matching."""
|
|
if brief.primary_topic:
|
|
return [_normalize_phrase(brief.primary_topic)]
|
|
core = [_normalize_phrase(p) for p in (brief.must_phrases or [])[:2] if p]
|
|
return [p for p in core if p]
|
|
|
|
|
|
def resolve_semantic_skill_weights(cur, brief: PlanningSemanticBrief) -> Dict[int, float]:
|
|
"""Deterministisches Fähigkeitserwartungsprofil aus Technik-Thema."""
|
|
topic = _normalize_phrase(brief.primary_topic or "")
|
|
if topic in _TECHNIQUE_EXPECTED_SKILLS:
|
|
names = list(_TECHNIQUE_EXPECTED_SKILLS[topic])
|
|
elif brief.topic_type == "technique" or "geri" in topic:
|
|
names = list(_DEFAULT_TECHNIQUE_SKILLS)
|
|
else:
|
|
return {}
|
|
|
|
weights: Dict[int, float] = {}
|
|
for name in names[:6]:
|
|
cur.execute(
|
|
"""
|
|
SELECT id, name FROM skills
|
|
WHERE (status IS NULL OR status = 'active')
|
|
AND LOWER(name) LIKE %s
|
|
ORDER BY CASE WHEN LOWER(name) = %s THEN 0 WHEN LOWER(name) LIKE %s THEN 1 ELSE 2 END,
|
|
LENGTH(name) ASC
|
|
LIMIT 1
|
|
""",
|
|
(f"%{name.lower()}%", name.lower(), f"{name.lower()}%"),
|
|
)
|
|
row = cur.fetchone()
|
|
if row:
|
|
sid = int(row["id"])
|
|
weights[sid] = max(weights.get(sid, 0.0), 1.0)
|
|
return weights
|
|
|
|
|
|
def enrich_target_with_semantic_expectations(
|
|
target,
|
|
*,
|
|
skill_weights: Dict[int, float],
|
|
):
|
|
from planning_exercise_profiles import PlanningTargetProfile, _merge_weight_maps, _normalize_weight_map
|
|
|
|
if not skill_weights:
|
|
return target
|
|
merged = _normalize_weight_map(_merge_weight_maps(dict(target.skill_weights), skill_weights, scale=1.0))
|
|
sources = list(target.sources)
|
|
if "semantic_expectation" not in sources:
|
|
sources.append("semantic_expectation")
|
|
return PlanningTargetProfile(
|
|
focus_area_ids=dict(target.focus_area_ids),
|
|
style_direction_ids=dict(target.style_direction_ids),
|
|
training_type_ids=dict(target.training_type_ids),
|
|
target_group_ids=dict(target.target_group_ids),
|
|
skill_weights=merged,
|
|
skill_gap_weights=dict(target.skill_gap_weights),
|
|
skill_plan_weights=dict(target.skill_plan_weights),
|
|
sources=sources,
|
|
)
|
|
|
|
|
|
def apply_path_retrieval_weights(brief: PlanningSemanticBrief) -> Dict[str, float]:
|
|
"""Pfad-Builder: Semantik + Profil dominieren."""
|
|
sem = float(brief.semantic_strength or 0.0)
|
|
if sem >= 0.65:
|
|
return {
|
|
"semantic": 0.50,
|
|
"fulltext": 0.16,
|
|
"profile": 0.26,
|
|
"progression": 0.04,
|
|
"skill": 0.04,
|
|
"plan": 0.0,
|
|
"repeat_unit": -0.40,
|
|
"repeat_group": -0.15,
|
|
}
|
|
if sem >= 0.35:
|
|
return {
|
|
"semantic": 0.38,
|
|
"fulltext": 0.18,
|
|
"profile": 0.28,
|
|
"progression": 0.06,
|
|
"skill": 0.06,
|
|
"plan": 0.04,
|
|
"repeat_unit": -0.35,
|
|
"repeat_group": -0.15,
|
|
}
|
|
return {
|
|
"semantic": 0.22,
|
|
"fulltext": 0.22,
|
|
"profile": 0.28,
|
|
"progression": 0.10,
|
|
"skill": 0.10,
|
|
"plan": 0.08,
|
|
"repeat_unit": -0.30,
|
|
"repeat_group": -0.15,
|
|
}
|
|
|
|
|
|
_STAGE_GOAL_STOPWORDS = _QUERY_STOPWORDS | frozenset(
|
|
{
|
|
"stufe",
|
|
"phase",
|
|
"lernziel",
|
|
"grundlage",
|
|
"vertiefung",
|
|
"anwendung",
|
|
"perfektion",
|
|
"einstieg",
|
|
"sicher",
|
|
"sauber",
|
|
"korrekt",
|
|
"technik",
|
|
"training",
|
|
}
|
|
)
|
|
|
|
|
|
_STAGE_NEGATION_PATTERNS = (
|
|
r"\bohne\s+([^,.;]+)",
|
|
r"\bkein(?:e|en|er|em)?\s+([^,.;]+)",
|
|
r"\bnicht\s+([^,.;]+)",
|
|
)
|
|
|
|
# Aus „ohne Tritttechnik“ etc. — erweiterte Treffer im Übungstext
|
|
_STAGE_EXCLUDE_ALIASES: Dict[str, Tuple[str, ...]] = {
|
|
"tritttechnik": (
|
|
"tritttechnik",
|
|
"trittpraezision",
|
|
"trittpräzision",
|
|
"tritt praesision",
|
|
"tritt-präzision",
|
|
"kicktechnik",
|
|
"tritt ausführung",
|
|
"tritt ausfuehrung",
|
|
),
|
|
"kumite": ("kumite", "partnerkampf", "freikampf", "jiyu kumite"),
|
|
"kraftuebung": ("kraftuebung", "kraftübung", "krafttraining", "kraftübungen"),
|
|
"anwendung": ("kumite anwendung", "kampfanwendung"),
|
|
}
|
|
|
|
_STAGE_FOCUS_TOKENS = frozenset(
|
|
{
|
|
"koordination",
|
|
"absprung",
|
|
"beinhebung",
|
|
"landung",
|
|
"sprung",
|
|
"sprungphase",
|
|
"balance",
|
|
"gleichgewicht",
|
|
"timing",
|
|
"vorbereitung",
|
|
"athletik",
|
|
"mobilitaet",
|
|
"mobilität",
|
|
"stabilisation",
|
|
"stabilisierung",
|
|
}
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class StageGoalConstraints:
|
|
positive_tokens: List[str] = field(default_factory=list)
|
|
exclude_phrases: List[str] = field(default_factory=list)
|
|
has_negation: bool = False
|
|
strict_positive: bool = False
|
|
|
|
|
|
def _expand_stage_exclude_phrase(phrase: str) -> List[str]:
|
|
norm = _normalize_phrase(phrase)
|
|
if not norm:
|
|
return []
|
|
out: List[str] = [norm]
|
|
compact = norm.replace(" ", "")
|
|
if compact and compact not in out:
|
|
out.append(compact)
|
|
for key, aliases in _STAGE_EXCLUDE_ALIASES.items():
|
|
if key in norm or norm in key:
|
|
for alias in aliases:
|
|
a = _normalize_phrase(alias)
|
|
if a and a not in out:
|
|
out.append(a)
|
|
return out[:12]
|
|
|
|
|
|
def _significant_stage_tokens(learning_goal: str, *, strip_negated: bool = True) -> List[str]:
|
|
"""Wörter aus Stufen-Lernziel für Text-Match (ohne Füllwörter, ohne Negationssegmente)."""
|
|
text = _normalize_phrase(learning_goal)
|
|
if strip_negated:
|
|
for pat in _STAGE_NEGATION_PATTERNS:
|
|
text = re.sub(pat, " ", text)
|
|
raw = re.findall(r"[a-zäöüß]{4,}", text, flags=re.IGNORECASE)
|
|
out: List[str] = []
|
|
for w in raw:
|
|
low = w.lower().replace("ä", "ae").replace("ö", "oe").replace("ü", "ue")
|
|
if low in _STAGE_GOAL_STOPWORDS:
|
|
continue
|
|
if low not in out:
|
|
out.append(low)
|
|
return out[:10]
|
|
|
|
|
|
def parse_stage_goal_constraints(
|
|
learning_goal: str,
|
|
anti_patterns: Optional[Sequence[str]] = None,
|
|
) -> StageGoalConstraints:
|
|
"""Positiv/Negativ aus Stufen-Lernziel + anti_patterns (Roadmap-Stufe)."""
|
|
lg = (learning_goal or "").strip()
|
|
if len(lg) < 3:
|
|
return StageGoalConstraints()
|
|
|
|
norm = _normalize_phrase(lg)
|
|
exclude: List[str] = []
|
|
has_negation = False
|
|
for pat in _STAGE_NEGATION_PATTERNS:
|
|
for m in re.finditer(pat, norm):
|
|
has_negation = True
|
|
chunk = (m.group(1) or "").strip()
|
|
if chunk:
|
|
exclude.extend(_expand_stage_exclude_phrase(chunk))
|
|
|
|
for raw in anti_patterns or []:
|
|
s = _normalize_phrase(str(raw or ""))
|
|
if s:
|
|
exclude.extend(_expand_stage_exclude_phrase(s))
|
|
|
|
positive = _significant_stage_tokens(lg, strip_negated=True)
|
|
focus_hits = [t for t in positive if t in _STAGE_FOCUS_TOKENS]
|
|
strict_positive = bool(focus_hits) or has_negation
|
|
|
|
dedup_exclude: List[str] = []
|
|
for item in exclude:
|
|
if item and item not in dedup_exclude:
|
|
dedup_exclude.append(item)
|
|
|
|
return StageGoalConstraints(
|
|
positive_tokens=positive,
|
|
exclude_phrases=dedup_exclude[:16],
|
|
has_negation=has_negation,
|
|
strict_positive=strict_positive,
|
|
)
|
|
|
|
|
|
def _phrase_excluded_in_blob(phrase: str, blob: str) -> bool:
|
|
"""Treffer nur wenn das Ausschluss-Thema nicht selbst negiert beschrieben ist."""
|
|
if not phrase or not blob:
|
|
return False
|
|
if not _phrase_in_blob(phrase, blob):
|
|
return False
|
|
norm = _normalize_phrase(phrase)
|
|
for pat in _STAGE_NEGATION_PATTERNS:
|
|
for m in re.finditer(pat, blob):
|
|
chunk = _normalize_phrase(m.group(1) or "")
|
|
if not chunk:
|
|
continue
|
|
if norm in chunk or chunk in norm or _phrase_in_blob(norm, chunk):
|
|
return False
|
|
return True
|
|
|
|
|
|
def _blob_matches_stage_excludes(blob: str, exclude_phrases: Sequence[str]) -> bool:
|
|
for phrase in exclude_phrases:
|
|
if _phrase_excluded_in_blob(phrase, blob):
|
|
return True
|
|
return False
|
|
|
|
|
|
def resolve_path_anti_patterns(
|
|
goal_query: str,
|
|
*,
|
|
semantic_brief: Optional[PlanningSemanticBrief] = None,
|
|
extra_context: Optional[str] = None,
|
|
) -> List[str]:
|
|
"""
|
|
Pfadweite Ausschlüsse — nur aus expliziten Quellen, kein Themen-Raten.
|
|
|
|
Quellen (in dieser Reihenfolge):
|
|
1. Negationen in Anfrage/Kontext (ohne/kein/nicht …) via parse_stage_goal_constraints
|
|
2. exclude_phrases im Semantic Brief (inkl. LLM/Technik-Regeln)
|
|
3. stage_specs.anti_patterns (Roadmap-Stufe, vom Trainer oder LLM)
|
|
|
|
Keine stillen Ausschlüsse aus dem Hauptthema (z. B. „Mawashi“ → kein Kumite).
|
|
"""
|
|
parts = [str(goal_query or "").strip(), str(extra_context or "").strip()]
|
|
combined = " ".join(p for p in parts if p)
|
|
if not combined and not semantic_brief:
|
|
return []
|
|
|
|
constraints = parse_stage_goal_constraints(combined) if combined else StageGoalConstraints()
|
|
out: List[str] = []
|
|
for item in constraints.exclude_phrases:
|
|
if item and item not in out:
|
|
out.append(item)
|
|
|
|
if semantic_brief:
|
|
for raw in semantic_brief.exclude_phrases or []:
|
|
for expanded in _expand_stage_exclude_phrase(str(raw or "")):
|
|
if expanded and expanded not in out:
|
|
out.append(expanded)
|
|
|
|
return out[:24]
|
|
|
|
|
|
def enrich_brief_with_path_constraints(
|
|
brief: PlanningSemanticBrief,
|
|
goal_query: str,
|
|
*,
|
|
extra_context: Optional[str] = None,
|
|
) -> PlanningSemanticBrief:
|
|
"""Negationen/Ausschlüsse aus der Gesamtanfrage in den Semantic Brief übernehmen."""
|
|
anti = resolve_path_anti_patterns(
|
|
goal_query,
|
|
semantic_brief=brief,
|
|
extra_context=extra_context,
|
|
)
|
|
if not anti:
|
|
return brief
|
|
exclude = list(brief.exclude_phrases or [])
|
|
for item in anti:
|
|
if item not in exclude:
|
|
exclude.append(item)
|
|
return brief.model_copy(update={"exclude_phrases": exclude[:16]})
|
|
|
|
|
|
_MIN_STAGE_FIT_SEMANTIC = 0.30
|
|
_MIN_STAGE_FIT_RELAXED = 0.20
|
|
|
|
|
|
def build_stage_match_brief(
|
|
*,
|
|
learning_goal: str,
|
|
anti_patterns: Optional[Sequence[str]] = None,
|
|
success_criteria: Optional[Sequence[str]] = None,
|
|
load_profile: Optional[Sequence[str]] = None,
|
|
phase: Optional[str] = None,
|
|
path_context_note: Optional[str] = None,
|
|
path_anti_patterns: Optional[Sequence[str]] = None,
|
|
path_primary_topic: Optional[str] = None,
|
|
path_technique_excludes: Optional[Sequence[str]] = None,
|
|
stage_start_state: Optional[str] = None,
|
|
stage_target_state: Optional[str] = None,
|
|
path_target_state: Optional[str] = None,
|
|
contextualized_learning_goal: Optional[str] = None,
|
|
) -> PlanningSemanticBrief:
|
|
"""
|
|
Stufen-zentrierter Semantik-Brief — unabhängig vom Gesamt-Pfad-Thema.
|
|
|
|
Primär für Roadmap-Match: Bewertung gegen Titel + Kurzbeschreibung + Übungsziel.
|
|
"""
|
|
lg = (contextualized_learning_goal or learning_goal or "").strip()
|
|
if len(lg) < 3:
|
|
return PlanningSemanticBrief(semantic_strength=0.0)
|
|
|
|
merged_anti: List[str] = []
|
|
for raw in list(anti_patterns or []) + list(path_anti_patterns or []):
|
|
s = str(raw or "").strip()
|
|
if s and s not in merged_anti:
|
|
merged_anti.append(s)
|
|
primary_path = _normalize_phrase(path_primary_topic or "")
|
|
if primary_path:
|
|
for item in technique_sibling_excludes(primary_path):
|
|
if item not in merged_anti:
|
|
merged_anti.append(item)
|
|
for raw in path_technique_excludes or []:
|
|
for expanded in _expand_stage_exclude_phrase(str(raw or "")):
|
|
if expanded and expanded not in merged_anti:
|
|
merged_anti.append(expanded)
|
|
constraints = parse_stage_goal_constraints(lg, merged_anti)
|
|
must: List[str] = []
|
|
norm_lg = _normalize_phrase(lg)
|
|
if primary_path and primary_path not in must:
|
|
must.insert(0, primary_path[:120])
|
|
for token in constraints.positive_tokens:
|
|
if token not in must:
|
|
must.append(token)
|
|
if norm_lg and norm_lg not in must:
|
|
must.append(norm_lg[:120])
|
|
for raw in success_criteria or []:
|
|
s = _normalize_phrase(str(raw or ""))
|
|
if s and s not in must:
|
|
must.append(s[:100])
|
|
for raw in load_profile or []:
|
|
s = _normalize_phrase(str(raw or ""))
|
|
if s and s not in must:
|
|
must.append(s[:60])
|
|
|
|
retrieval_parts = [norm_lg]
|
|
for raw in (stage_start_state, stage_target_state, path_target_state):
|
|
s = _normalize_phrase(str(raw or ""))[:200]
|
|
if s and s not in retrieval_parts:
|
|
retrieval_parts.append(s)
|
|
if path_context_note:
|
|
note = _normalize_phrase(path_context_note)[:200]
|
|
if note:
|
|
retrieval_parts.append(note)
|
|
|
|
arc: List[str] = []
|
|
ph = (phase or "").strip().lower()
|
|
if ph:
|
|
arc.append(ph)
|
|
|
|
return PlanningSemanticBrief(
|
|
primary_topic="",
|
|
topic_type="focus",
|
|
must_phrases=must[:12],
|
|
exclude_phrases=list(constraints.exclude_phrases)[:12],
|
|
development_arc=arc[:4],
|
|
retrieval_query=" ".join(p for p in retrieval_parts if p)[:500],
|
|
semantic_strength=0.78,
|
|
rationale="stage_match_brief",
|
|
)
|
|
|
|
|
|
def score_exercise_stage_fit(
    *,
    title: str,
    summary: str,
    goal: str,
    stage_brief: PlanningSemanticBrief,
    variant_names: Optional[Sequence[str]] = None,
    step_phase: Optional[str] = None,
) -> Tuple[float, List[str]]:
    """Score an exercise against a stage learning goal (title + summary + goal).

    The base score and reasons come from ``score_exercise_semantic_relevance``
    evaluated against ``stage_brief``. On top of that a bonus of at most 0.28
    is added, proportional to the share of focus tokens that
    ``_phrase_in_blob`` finds in the exercise text.

    Args:
        title: Exercise title.
        summary: Exercise summary text.
        goal: Exercise goal text.
        stage_brief: Brief describing the stage; its ``must_phrases`` supply
            the focus tokens.
        variant_names: Optional variant names, included in the text blob.
        step_phase: Passed through unchanged to the base scorer.

    Returns:
        ``(score, reasons)``: the score clamped to ``[0.0, 1.0]`` and rounded
        to four decimals, plus at most four reasons.
    """
    base_score, base_reasons = score_exercise_semantic_relevance(
        title=title,
        summary=summary,
        goal=goal,
        variant_names=variant_names or [],
        brief=stage_brief,
        step_phase=step_phase,
    )
    text = _blob_from_fields(title, summary, goal, variant_names or [])

    # Focus tokens: single-word must-phrases with at least four characters;
    # only the first six of them are considered.
    focus: List[str] = []
    for phrase in stage_brief.must_phrases or []:
        if phrase and " " not in phrase and len(phrase) >= 4:
            focus.append(phrase)
    focus = focus[:6]

    final_score = base_score
    final_reasons = base_reasons
    if focus:
        matched = sum(1 for phrase in focus if _phrase_in_blob(phrase, text))
        if matched:
            final_score = min(1.0, base_score + 0.28 * (matched / len(focus)))
            # The extra reason is only reported once at least half of the
            # focus tokens (and at least one) were found.
            if matched >= max(1, len(focus) // 2):
                final_reasons = ["Stufen-Schwerpunkte im Übungstext", *base_reasons]

    clamped = max(0.0, min(1.0, round(final_score, 4)))
    return clamped, final_reasons[:4]
|
|
|
|
|
|
def exercise_passes_stage_fit(
    *,
    learning_goal: str,
    title: str,
    summary: str = "",
    goal: str = "",
    stage_brief: Optional[PlanningSemanticBrief] = None,
    stage_semantic_score: Optional[float] = None,
    anti_patterns: Optional[Sequence[str]] = None,
    step_phase: Optional[str] = None,
    path_primary_topic: Optional[str] = None,
    path_technique_excludes: Optional[Sequence[str]] = None,
    min_stage_semantic: float = _MIN_STAGE_FIT_SEMANTIC,
    relaxed: bool = False,
) -> bool:
    """General stage-fit gate: full exercise text versus the stage brief.

    Checks run in this order; the first failing one rejects the exercise:

    1. With a learning goal shorter than three characters and no
       ``path_primary_topic`` there is nothing to check and the exercise passes.
    2. Exclude phrases derived from the learning goal and ``anti_patterns``
       must not match the exercise text (title + summary + goal; variant
       names are not part of the text here).
    3. If a primary technique is known, either from ``path_primary_topic`` or
       detected in the learning goal, the exercise must pass
       ``exercise_passes_technique_path_scope`` with the sibling excludes of
       that technique added to ``path_technique_excludes``.
    4. The stage semantic score must reach the threshold.

    Args:
        learning_goal: Learning goal of the stage.
        title: Exercise title.
        summary: Exercise summary text.
        goal: Exercise goal text.
        stage_brief: Pre-built stage brief; only used when the score has to be
            computed here.
        stage_semantic_score: Pre-computed stage score. When given, no brief
            is built and no scoring is done.
        anti_patterns: Extra phrases handed to ``parse_stage_goal_constraints``.
        step_phase: Passed through to ``score_exercise_stage_fit``.
        path_primary_topic: Primary technique of the whole path, if any.
        path_technique_excludes: Extra technique phrases to exclude.
        min_stage_semantic: Threshold used when ``relaxed`` is false.
        relaxed: Use ``_MIN_STAGE_FIT_RELAXED`` as threshold (this ignores
            ``min_stage_semantic``) and forward the flag to the technique
            scope check.

    Returns:
        True when the exercise passes all checks.
    """
    lg = (learning_goal or "").strip()
    if len(lg) < 3 and not (path_primary_topic or "").strip():
        return True

    blob = _blob_from_fields(title, summary, goal, [])
    constraints = parse_stage_goal_constraints(lg, anti_patterns)
    if constraints.exclude_phrases and _blob_matches_stage_excludes(blob, constraints.exclude_phrases):
        return False

    primary_path = (path_primary_topic or "").strip()
    if not primary_path and lg:
        # No explicit path topic: fall back to a technique found in the goal.
        hit = _find_technique_in_text(_normalize_phrase(lg))
        if hit:
            primary_path = hit[0]

    tech_excludes = list(path_technique_excludes or [])
    if primary_path:
        for item in technique_sibling_excludes(primary_path):
            if item not in tech_excludes:
                tech_excludes.append(item)
        if not exercise_passes_technique_path_scope(
            primary_topic=primary_path,
            title=title,
            summary=summary,
            goal=goal,
            learning_goal=lg,
            sibling_excludes=tech_excludes,
            relaxed=relaxed,
        ):
            return False

    stage_sem = stage_semantic_score
    if stage_sem is None:
        # Build the fallback brief only when a score must be computed; callers
        # that already pass a score never needed it.
        brief = stage_brief or build_stage_match_brief(
            learning_goal=lg,
            anti_patterns=anti_patterns,
        )
        stage_sem, _ = score_exercise_stage_fit(
            title=title,
            summary=summary,
            goal=goal,
            stage_brief=brief,
            step_phase=step_phase,
        )

    threshold = _MIN_STAGE_FIT_RELAXED if relaxed else min_stage_semantic
    return float(stage_sem or 0.0) >= threshold
|
|
|
|
|
|
def apply_stage_match_retrieval_weights(brief: PlanningSemanticBrief) -> Dict[str, float]:
    """Return the fixed retrieval weights for roadmap-stage matching.

    The semantic signal has the largest weight (0.58) and the two ``repeat_*``
    entries are negative. ``brief`` is not read; every call returns a new dict
    with the same values.
    """
    signal_weights: Dict[str, float] = {
        "semantic": 0.58,
        "fulltext": 0.14,
        "profile": 0.18,
        "progression": 0.04,
        "skill": 0.04,
        "plan": 0.02,
    }
    repeat_weights: Dict[str, float] = {
        "repeat_unit": -0.40,
        "repeat_group": -0.15,
    }
    return {**signal_weights, **repeat_weights}
|
|
|
|
|
|
def semantic_brief_for_stage(
    brief: PlanningSemanticBrief,
    *,
    learning_goal: str,
    phase: Optional[str] = None,
    anti_patterns: Optional[Sequence[str]] = None,
) -> PlanningSemanticBrief:
    """Legacy: enrich the global brief with one stage's learning goal.

    For roadmap matching ``build_stage_match_brief`` is the preferred entry
    point. This function returns a copy of ``brief`` in which

    * the normalized learning goal (cut to 120 characters) is put first in
      ``must_phrases``, followed by the existing phrases and up to four
      positive tokens from ``parse_stage_goal_constraints``;
    * the parsed exclude phrases are appended to ``exclude_phrases``;
    * the lower-cased ``phase`` is put first in ``development_arc``;
    * ``semantic_strength`` is raised to at least 0.58 and capped at 1.0.

    Duplicates are skipped; the lists are cut to 12, 12 and 8 entries.

    Args:
        brief: Brief to enrich; it is not modified.
        learning_goal: Stage learning goal. When it normalizes to an empty
            string, ``brief`` itself is returned unchanged.
        phase: Optional phase label for the development arc.
        anti_patterns: Extra phrases handed to ``parse_stage_goal_constraints``.

    Returns:
        The enriched copy, or ``brief`` itself when there is no learning goal.
    """
    lg = _normalize_phrase(learning_goal)
    if not lg:
        return brief

    constraints = parse_stage_goal_constraints(learning_goal, anti_patterns)

    must = list(brief.must_phrases or [])
    for token in constraints.positive_tokens[:4]:
        if token not in must:
            must.append(token)
    # Compare the phrase that is actually stored. Checking the untruncated
    # goal would insert the same 120-character prefix again each time an
    # already enriched brief is enriched with a long goal.
    goal_phrase = lg[:120]
    if goal_phrase not in must:
        must.insert(0, goal_phrase)

    exclude = list(brief.exclude_phrases or [])
    for item in constraints.exclude_phrases:
        if item not in exclude:
            exclude.append(item)

    arc = list(brief.development_arc or [])
    ph = (phase or "").strip().lower()
    if ph and ph not in arc:
        arc = [ph, *arc]

    strength = max(float(brief.semantic_strength or 0.0), 0.58)
    return brief.model_copy(
        update={
            "must_phrases": must[:12],
            "exclude_phrases": exclude[:12],
            "development_arc": arc[:8],
            "semantic_strength": min(1.0, strength),
        }
    )
|
|
|
|
|
|
def exercise_passes_stage_learning_goal_gate(
    *,
    learning_goal: str,
    title: str,
    summary: str = "",
    goal: str = "",
    semantic_score: float = 0.0,
    min_semantic: float = 0.20,
    relaxed: bool = False,
    anti_patterns: Optional[Sequence[str]] = None,
    stage_brief: Optional[PlanningSemanticBrief] = None,
    stage_semantic_score: Optional[float] = None,
    step_phase: Optional[str] = None,
) -> bool:
    """Roadmap stage gate: delegates to ``exercise_passes_stage_fit``.

    All arguments except ``semantic_score`` and ``min_semantic`` are forwarded
    unchanged. Those two are accepted but never used (presumably kept so that
    existing callers do not break; verify before removing them).

    Returns:
        The result of ``exercise_passes_stage_fit``.
    """
    # Intentionally unused; see the docstring.
    _ = (semantic_score, min_semantic)

    forwarded = {
        "learning_goal": learning_goal,
        "title": title,
        "summary": summary,
        "goal": goal,
        "stage_brief": stage_brief,
        "stage_semantic_score": stage_semantic_score,
        "anti_patterns": anti_patterns,
        "step_phase": step_phase,
        "relaxed": relaxed,
    }
    return exercise_passes_stage_fit(**forwarded)
|
|
|
|
|
|
def exercise_passes_path_semantic_gate(
    *,
    semantic_score: float,
    title: str,
    brief: PlanningSemanticBrief,
    summary: str = "",
    goal: str = "",
    strict: bool = True,
) -> bool:
    """Decide whether an exercise is semantically close enough to the path brief.

    The exercise passes when any of the following holds:

    * ``brief.semantic_strength`` is below 0.55 (the gate is then inactive);
    * ``semantic_score`` reaches 0.18 (strict) or 0.06 (relaxed);
    * ``brief.primary_topic`` is found in the exercise text
      (title + summary + goal);
    * relaxed mode only: the topic has at least two words and every word is
      found in the text on its own.

    Args:
        semantic_score: Pre-computed semantic score of the exercise.
        title: Exercise title.
        brief: Path brief supplying ``semantic_strength`` and ``primary_topic``.
        summary: Exercise summary text.
        goal: Exercise goal text.
        strict: Use the higher score threshold and skip the per-word check.

    Returns:
        True when the exercise passes the gate.
    """
    if brief.semantic_strength < 0.55:
        return True

    min_score = 0.18 if strict else 0.06
    if semantic_score >= min_score:
        return True

    topic = brief.primary_topic or ""
    if not topic:
        # Without a topic none of the text checks below can succeed.
        return False

    # The text is only needed for the topic checks, so build it this late.
    blob = _blob_from_fields(title, summary, goal, [])
    if _phrase_in_blob(topic, blob):
        return True
    if strict:
        return False

    # Relaxed: the topic (e.g. "mae geri") often appears spread through the
    # body text instead of as one phrase, so accept it when each word occurs.
    parts = topic.split()
    return len(parts) >= 2 and all(_phrase_in_blob(p, blob) for p in parts)
|
|
|
|
|
|
def pick_best_path_hit(
    hits: List[Dict[str, Any]],
    used_exercise_ids: set[int],
    *,
    semantic_brief: Optional[PlanningSemanticBrief] = None,
    stage_learning_goal: Optional[str] = None,
    stage_anti_patterns: Optional[Sequence[str]] = None,
    roadmap_stage_match: bool = False,
    stage_match_brief: Optional[PlanningSemanticBrief] = None,
    path_primary_topic: Optional[str] = None,
    path_technique_excludes: Optional[Sequence[str]] = None,
) -> Optional[Dict[str, Any]]:
    """Tiered selection: strict, then relaxed, then an optional emergency fallback.

    Hits whose ``id`` is in ``used_exercise_ids`` are always skipped. Among the
    hits that pass the gate, the one with the highest
    ``(semantic score, hit["score"])`` pair wins; on ties the earliest hit is
    kept.

    Gate per hit:

    * Stage mode (``roadmap_stage_match`` and a non-empty
      ``stage_learning_goal``): ``exercise_passes_stage_fit`` decides, and the
      ranking uses the hit's ``stage_semantic_score`` (falling back to
      ``semantic_score`` only when that key is missing or None).
    * Otherwise: ``exercise_passes_path_semantic_gate`` decides when
      ``semantic_brief`` is given; without a brief every unused hit passes.

    Tiers:

    1. Strict scan.
    2. With ``roadmap_stage_match``: if ``path_primary_topic`` is set, return
       None without a relaxed retry (presumably so the caller can handle the
       missing exercise itself); otherwise return the relaxed scan result,
       which may be None. The emergency fallback is never used here.
    3. Without ``roadmap_stage_match``: relaxed scan, then the emergency
       fallback. The fallback takes the best unused hit, but drops hits with
       a semantic score <= 0 unless ``semantic_brief.primary_topic`` is found
       in their lower-cased title + summary.

    Args:
        hits: Candidate hits; each needs an ``id`` and may carry ``title``,
            ``summary``, ``goal``/``exercise_goal``, ``semantic_score``,
            ``stage_semantic_score`` and ``score``.
        used_exercise_ids: Ids of exercises that were already chosen.
        semantic_brief: Path brief for the non-stage gate and the fallback.
        stage_learning_goal: Learning goal of the stage being matched.
        stage_anti_patterns: Anti-patterns of the stage.
        roadmap_stage_match: Switch on the roadmap-stage behaviour.
        stage_match_brief: Pre-built stage brief; built here when missing.
        path_primary_topic: Primary technique of the whole path, if any.
        path_technique_excludes: Extra technique phrases to exclude.

    Returns:
        The chosen hit dict, or None when nothing qualifies.

    Raises:
        KeyError: If an examined hit has no ``id`` key.
    """
    if not hits:
        return None

    stage_goal = (stage_learning_goal or "").strip()
    use_stage_gate = bool(roadmap_stage_match and stage_goal)

    stage_brief: Optional[PlanningSemanticBrief] = stage_match_brief
    if use_stage_gate and stage_brief is None:
        stage_brief = build_stage_match_brief(
            learning_goal=stage_goal,
            anti_patterns=stage_anti_patterns,
        )

    def _scan(*, strict: bool) -> Optional[Dict[str, Any]]:
        best: Optional[Dict[str, Any]] = None
        best_key: Tuple[float, float] = (-1.0, -1.0)
        for hit in hits:
            eid = int(hit["id"])
            if eid in used_exercise_ids:
                continue
            title = str(hit.get("title") or "")
            summary = str(hit.get("summary") or "")
            goal_text = str(hit.get("goal") or hit.get("exercise_goal") or "")
            sem = float(hit.get("semantic_score") or 0.0)
            # Only a missing stage score falls back to the global score. An
            # explicit 0.0 is a real result and must not be replaced, or a
            # zero-scored exercise would pass the stage gate on its global score.
            raw_stage_sem = hit.get("stage_semantic_score")
            stage_sem = sem if raw_stage_sem is None else float(raw_stage_sem)

            if use_stage_gate:
                if not exercise_passes_stage_fit(
                    learning_goal=stage_goal,
                    title=title,
                    summary=summary,
                    goal=goal_text,
                    stage_brief=stage_brief,
                    stage_semantic_score=stage_sem,
                    anti_patterns=stage_anti_patterns,
                    path_primary_topic=path_primary_topic,
                    path_technique_excludes=path_technique_excludes,
                    relaxed=not strict,
                ):
                    continue
            elif semantic_brief and not exercise_passes_path_semantic_gate(
                semantic_score=sem,
                title=title,
                summary=summary,
                goal=goal_text,
                brief=semantic_brief,
                strict=strict,
            ):
                continue

            score = float(hit.get("score") or 0.0)
            rank_sem = stage_sem if use_stage_gate else sem
            key = (rank_sem, score)
            if key > best_key:
                best_key = key
                best = hit
        return best

    chosen = _scan(strict=True)
    if chosen:
        return chosen

    if roadmap_stage_match:
        # With a fixed path topic a relaxed retry is not attempted.
        if (path_primary_topic or "").strip():
            return None
        return _scan(strict=False)

    chosen = _scan(strict=False)
    if chosen:
        return chosen

    # Emergency (retrieval-first / bridges only): best remaining hit.
    fallback: Optional[Dict[str, Any]] = None
    fallback_key: Tuple[float, float] = (-1.0, -1.0)
    for hit in hits:
        eid = int(hit["id"])
        if eid in used_exercise_ids:
            continue
        sem = float(hit.get("semantic_score") or 0.0)
        score = float(hit.get("score") or 0.0)
        if sem <= 0 and semantic_brief and semantic_brief.primary_topic:
            # A hit without any semantic score must at least mention the topic.
            topic = semantic_brief.primary_topic
            blob = (str(hit.get("title") or "") + " " + str(hit.get("summary") or "")).lower()
            if not _phrase_in_blob(topic, blob):
                continue
        key = (sem, score)
        if key > fallback_key:
            fallback_key = key
            fallback = hit
    return fallback
|
|
|
|
|
|
# Public API of this module: the names bound by ``from <module> import *``.
# Every entry must be defined at module level, otherwise a star-import fails.
__all__ = [
    "PlanningSemanticBrief",
    "apply_dynamic_retrieval_weights",
    "apply_path_retrieval_weights",
    "brief_to_summary_dict",
    "build_semantic_brief",
    "enrich_target_with_semantic_expectations",
    "exercise_passes_path_semantic_gate",
    "StageGoalConstraints",
    "apply_stage_match_retrieval_weights",
    "build_stage_match_brief",
    "enrich_brief_with_path_constraints",
    "exercise_passes_stage_fit",
    "resolve_path_primary_topic",
    "resolve_path_anti_patterns",
    "exercise_passes_stage_learning_goal_gate",
    "merge_semantic_brief_llm",
    "parse_stage_goal_constraints",
    "pick_best_path_hit",
    "exercise_passes_technique_path_scope",
    "score_exercise_stage_fit",
    "semantic_brief_for_stage",
    "technique_sibling_excludes",
    "resolve_semantic_skill_weights",
    "score_exercise_semantic_relevance",
    "semantic_core_phrases",
    "step_phase_for_index",
    "step_retrieval_query",
    "try_enrich_semantic_brief_with_llm",
]
|