shinkan-jinkendo/backend/planning_exercise_retrieval.py
Lars ca2adbd55e
All checks were successful
Deploy Development / deploy (push) Successful in 44s
Test Suite / pytest-backend (push) Successful in 48s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 14s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m23s
Enhance Exercise Retrieval and Path Handling Logic
- Introduced new functions for handling exercise visibility and retrieval based on progression graph context, including `fetch_exercise_rows_by_ids_for_graph`.
- Updated `_load_supplemental_exercise_rows` to incorporate graph visibility rules, improving the accuracy of exercise retrieval.
- Enhanced `_run_path_step_retrieval` to utilize preloaded supplemental exercise rows, optimizing performance and clarity in path step processing.
- Added `exercise_title_equivalent_to_stage_goal` function to improve title matching against learning goals, enhancing exercise relevance.
- Updated tests to validate new retrieval logic and title equivalence functionality, ensuring robustness in exercise selection processes.
2026-06-11 12:33:02 +02:00

618 lines
22 KiB
Python

"""
Multi-stage retrieval for the planning exercise search (phase A).

Stages:
    S1b-0  Entire visible library (governance + hard filters, no profile OR pool)
    S1b-1  Deterministic hybrid score over all candidates, then sorted
"""
from __future__ import annotations
from typing import Any, Dict, List, Mapping, Optional, Sequence, Set, Tuple
from planning_exercise_profiles import (
PlanningTargetProfile,
load_exercise_match_profiles_bulk,
score_exercise_against_target,
)
from exercise_ai import strip_html_to_plain
from planning_exercise_semantics import (
PlanningSemanticBrief,
build_stage_match_brief,
exercise_passes_path_semantic_gate,
exercise_passes_stage_fit,
score_exercise_semantic_relevance,
score_exercise_stage_fit,
)
# Default row cap (SQL LIMIT) used by fetch_all_visible_exercise_rows.
_MAX_LIBRARY_ROWS = 8000
# Default number of exercise ids per query in the _load_*_chunked helpers.
_PROFILE_LOAD_BATCH = 400
_PARTNER_TEXT_MARKERS = ("partner", "paar", "paarweise", "zu zweit")
def _exercise_looks_partner_related(row: Mapping[str, Any]) -> bool:
parts = [
str(row.get("method_archetype") or ""),
str(row.get("title") or ""),
str(row.get("summary") or ""),
]
blob = " ".join(parts).lower()
return any(m in blob for m in _PARTNER_TEXT_MARKERS)
def _skill_jaccard(a: Set[int], b: Set[int]) -> float:
if not a or not b:
return 0.0
inter = len(a & b)
union = len(a | b)
return inter / union if union else 0.0
def _normalize_exercise_kind_filter(exercise_kind_any: Optional[List[str]]) -> List[str]:
out: List[str] = []
if not exercise_kind_any:
return out
for raw in exercise_kind_any:
s = str(raw or "").strip().lower()
if s in ("simple", "combination") and s not in out:
out.append(s)
return out
# Shared "SELECT ... FROM exercises e" prefix (no WHERE clause) used by
# fetch_exercise_rows_by_ids and fetch_exercise_rows_by_ids_for_graph, which
# append their own WHERE conditions. primary_focus_name is the first focus-area
# name ordered by is_primary (descending) and then name; ft_rank is a constant
# 0.0 here because these by-id lookups do not run a full-text query.
_EXERCISE_ROW_SELECT = """
SELECT e.id, e.title, e.summary, e.method_archetype,
e.visibility, e.club_id, e.created_by,
(
SELECT fa.name FROM exercise_focus_areas efa
JOIN focus_areas fa ON fa.id = efa.focus_area_id
WHERE efa.exercise_id = e.id
ORDER BY efa.is_primary DESC NULLS LAST, fa.name ASC
LIMIT 1
) AS primary_focus_name,
0.0::float AS ft_rank
FROM exercises e
"""
def fetch_exercise_rows_by_ids(
    cur,
    exercise_ids: Sequence[int],
    *,
    vis_sql: str,
    vis_params: Sequence[Any],
) -> List[Dict[str, Any]]:
    """Fetch specific exercises by id, for exercises anchored in a graph/slot.

    Ids are coerced to ``int``, de-duplicated and sorted; non-positive ids are
    dropped. Rows must satisfy the caller-supplied visibility predicate
    ``vis_sql`` (bound with ``vis_params``) and must not have status
    ``'archived'``.

    Returns one plain dict per fetched row, or an empty list (without touching
    the cursor) when no usable id remains.
    """
    wanted = set()
    for raw_id in exercise_ids:
        numeric_id = int(raw_id)
        if numeric_id > 0:
            wanted.add(numeric_id)
    if not wanted:
        return []
    ids = sorted(wanted)
    ph = ",".join("%s" for _ in ids)
    sql = f"""
{_EXERCISE_ROW_SELECT.strip()}
WHERE e.id IN ({ph})
AND ({vis_sql})
AND COALESCE(e.status, '') <> %s
"""
    # Parameter order follows the placeholders: ids, visibility, status.
    cur.execute(sql, [*ids, *vis_params, "archived"])
    return list(map(dict, cur.fetchall()))
def fetch_exercise_rows_by_ids_for_graph(
    cur,
    exercise_ids: Sequence[int],
    *,
    graph_visibility: str,
    graph_club_id: Optional[int],
    profile_id: int,
    role: str,
    exercise_allowed_fn,
) -> List[Dict[str, Any]]:
    """Fetch exercises by id and filter them with graph visibility rules.

    Unlike ``fetch_exercise_rows_by_ids`` no library visibility SQL is
    applied. Every non-archived row with a matching id is loaded and then kept
    only if ``exercise_allowed_fn(row, graph_visibility=..., graph_club_id=...,
    profile_id=..., role=...)`` returns a truthy value. The original author
    notes this allows re-matching exercises anchored in a graph, e.g. own
    private exercises on club graphs.

    Ids are coerced to ``int``, de-duplicated and sorted; non-positive ids are
    dropped. Returns plain dict copies of the accepted rows, or an empty list
    (without touching the cursor) when no usable id remains.
    """
    ids = sorted({int(raw_id) for raw_id in exercise_ids if int(raw_id) > 0})
    if not ids:
        return []
    ph = ",".join("%s" for _ in ids)
    sql = f"""
{_EXERCISE_ROW_SELECT.strip()}
WHERE e.id IN ({ph})
AND COALESCE(e.status, '') <> %s
"""
    cur.execute(sql, [*ids, "archived"])
    fetched = cur.fetchall() or []
    return [
        dict(candidate)
        for candidate in fetched
        if exercise_allowed_fn(
            candidate,
            graph_visibility=graph_visibility,
            graph_club_id=graph_club_id,
            profile_id=profile_id,
            role=role,
        )
    ]
def trim_hits_preserving_priority_ids(
    hits: Sequence[Mapping[str, Any]],
    priority_ids: Optional[Sequence[int]],
    *,
    limit: int = 48,
) -> List[Dict[str, Any]]:
    """Trim a hit list while keeping prioritised graph/slot exercises in the pool.

    Args:
        hits: Ranked hits; each is expected to carry an ``"id"`` entry.
        priority_ids: Exercise ids that must survive trimming. Non-positive
            ids are ignored.
        limit: Maximum result length. It is exceeded only when more than
            ``limit`` priority hits are present, so no priority hit is cut.

    Returns:
        Without usable priority ids: the first ``limit`` hits, returned as-is
        (not copied). Otherwise: dict copies of the priority hits, ordered by
        ascending id, followed by the remaining hits in their original order,
        truncated to ``max(limit, number_of_priority_hits)``.

    Hits whose id is missing or cannot be converted to ``int`` are never
    treated as priority hits; they stay among the remaining hits instead of
    raising.
    """
    priority_set = {int(x) for x in (priority_ids or []) if int(x) > 0}
    if not priority_set:
        return list(hits)[:limit]

    by_id: Dict[int, Dict[str, Any]] = {}
    rest: List[Dict[str, Any]] = []
    for hit in hits:
        # Parse the id once, guarded, so a malformed id cannot raise later.
        try:
            hit_id: Optional[int] = int(hit["id"])
        except (TypeError, ValueError, KeyError):
            hit_id = None
        if hit_id is not None and hit_id in priority_set:
            # For duplicate ids the last occurrence wins.
            by_id[hit_id] = dict(hit)
        else:
            rest.append(dict(hit))

    priority_hits = [by_id[eid] for eid in sorted(priority_set) if eid in by_id]
    merged = priority_hits + rest
    return merged[: max(limit, len(priority_hits))]
def merge_supplemental_exercise_rows(
    rows: Sequence[Dict[str, Any]],
    supplemental: Sequence[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Append supplemental rows whose id is not already present in ``rows``.

    The existing rows are returned first, as the same objects and in their
    original order. Each supplemental row with a new id is appended as a dict
    copy; duplicates within ``supplemental`` are added only once. Ids are
    compared after ``int`` conversion.
    """
    known_ids = set()
    for existing in rows:
        if existing.get("id") is not None:
            known_ids.add(int(existing["id"]))
    merged = [*rows]
    for candidate in supplemental:
        candidate_id = int(candidate["id"])
        if candidate_id in known_ids:
            continue
        known_ids.add(candidate_id)
        merged.append(dict(candidate))
    return merged
def fetch_all_visible_exercise_rows(
    cur,
    *,
    vis_sql: str,
    vis_params: Sequence[Any],
    query: str,
    exercise_kind_any: Optional[List[str]],
    max_rows: int = _MAX_LIBRARY_ROWS,
) -> List[Dict[str, Any]]:
    """S1b-0: fetch every visible exercise, without profile/full-text pre-selection.

    Hard filters: the caller-supplied visibility predicate (governance), not
    archived, and optionally ``exercise_kind``. The full-text rank is only
    selected as a scoring signal (``ft_rank``); it is never a WHERE filter.

    Args:
        cur: DB cursor offering ``execute(sql, params)`` and ``fetchall()``.
        vis_sql: SQL boolean expression over alias ``e`` with ``%s`` placeholders.
        vis_params: Values bound to the placeholders in ``vis_sql``.
        query: Full-text query; when empty, ``ft_rank`` is a constant 0.0.
        exercise_kind_any: Optional kind filter; only recognised kinds are used.
        max_rows: Upper bound on returned rows (SQL LIMIT).

    Returns:
        One dict per row, ordered by exercise id.
    """
    # Parenthesise the caller-supplied predicate, as fetch_exercise_rows_by_ids
    # does: AND binds tighter than OR, so a top-level OR in vis_sql would
    # otherwise bypass the archived / exercise_kind filters.
    where = [f"({vis_sql})", "COALESCE(e.status, '') <> %s"]
    # Parameters must follow the textual order of the %s placeholders:
    # SELECT list (full-text query) first, then the WHERE clause, then LIMIT.
    params: List[Any] = []
    if query:
        ft_select = "ts_rank_cd(e.search_vector, plainto_tsquery('german', %s)) AS ft_rank"
        params.append(query)
    else:
        ft_select = "0.0::float AS ft_rank"
    params.extend(vis_params)
    params.append("archived")
    ek_filtered = _normalize_exercise_kind_filter(exercise_kind_any)
    if ek_filtered:
        ph = ",".join(["%s"] * len(ek_filtered))
        where.append(f"(LOWER(TRIM(COALESCE(e.exercise_kind::text,''))) IN ({ph}))")
        params.extend(ek_filtered)
    sql = f"""
SELECT e.id, e.title, e.summary, e.method_archetype,
(
SELECT fa.name FROM exercise_focus_areas efa
JOIN focus_areas fa ON fa.id = efa.focus_area_id
WHERE efa.exercise_id = e.id
ORDER BY efa.is_primary DESC NULLS LAST, fa.name ASC
LIMIT 1
) AS primary_focus_name,
{ft_select}
FROM exercises e
WHERE {' AND '.join(where)}
ORDER BY e.id ASC
LIMIT %s
"""
    params.append(int(max_rows))
    cur.execute(sql, params)
    return [dict(r) for r in cur.fetchall()]
def _load_match_profiles_chunked(cur, exercise_ids: Sequence[int], *, batch: int = _PROFILE_LOAD_BATCH):
    """Load match profiles for many exercises in batches of ``batch`` ids.

    Ids are coerced to ``int``, de-duplicated and sorted; non-positive ids are
    dropped. Each batch is passed to ``load_exercise_match_profiles_bulk`` and
    the per-batch results are merged into one dict, which is returned (empty
    when no usable id remains).
    """
    wanted = sorted({int(raw_id) for raw_id in exercise_ids if int(raw_id) > 0})
    if not wanted:
        return {}
    profiles: Dict[int, Any] = {}
    batches = [wanted[start : start + batch] for start in range(0, len(wanted), batch)]
    for id_batch in batches:
        profiles.update(load_exercise_match_profiles_bulk(cur, id_batch))
    return profiles
def _load_skill_sets_chunked(cur, exercise_ids: Sequence[int], *, batch: int = _PROFILE_LOAD_BATCH) -> Dict[int, Set[int]]:
    """Load the skill-id set of each exercise, querying in batches of ``batch`` ids.

    Ids are coerced to ``int``, de-duplicated and sorted; non-positive ids are
    dropped. Every requested id is present in the result, mapped to an empty
    set when ``exercise_skills`` holds no row for it. Rows with a NULL
    ``skill_id`` are ignored.
    """
    wanted = sorted({int(raw_id) for raw_id in exercise_ids if int(raw_id) > 0})
    skill_sets: Dict[int, Set[int]] = {}
    for exercise_id in wanted:
        skill_sets[exercise_id] = set()
    if not wanted:
        return skill_sets
    for start in range(0, len(wanted), batch):
        chunk = wanted[start : start + batch]
        ph = ",".join("%s" for _ in chunk)
        cur.execute(
            f"SELECT exercise_id, skill_id FROM exercise_skills WHERE exercise_id IN ({ph})",
            chunk,
        )
        for record in cur.fetchall():
            owner_id = int(record["exercise_id"])
            skill_id = record.get("skill_id")
            if skill_id is None:
                continue
            skill_sets.setdefault(owner_id, set()).add(int(skill_id))
    return skill_sets
def _load_exercise_goals_chunked(cur, exercise_ids: Sequence[int], *, batch: int = _PROFILE_LOAD_BATCH) -> Dict[int, str]:
    """Load each exercise's ``goal`` text, querying in batches of ``batch`` ids.

    Ids are coerced to ``int``, de-duplicated and sorted; non-positive ids are
    dropped. Each goal is passed through ``strip_html_to_plain`` with
    ``max_len=1200``. Only exercises returned by the query appear in the result.
    """
    wanted = sorted({int(raw_id) for raw_id in exercise_ids if int(raw_id) > 0})
    goals: Dict[int, str] = {}
    if not wanted:
        return goals
    for start in range(0, len(wanted), batch):
        chunk = wanted[start : start + batch]
        ph = ",".join("%s" for _ in chunk)
        cur.execute(f"SELECT id, goal FROM exercises WHERE id IN ({ph})", chunk)
        for record in cur.fetchall():
            plain_goal = strip_html_to_plain(record.get("goal"), max_len=1200)
            goals[int(record["id"])] = plain_goal
    return goals
def _load_variant_names_chunked(cur, exercise_ids: Sequence[int], *, batch: int = _PROFILE_LOAD_BATCH) -> Dict[int, List[str]]:
    """Load the variant names of each exercise, querying in batches of ``batch`` ids.

    Ids are coerced to ``int``, de-duplicated and sorted; non-positive ids are
    dropped. Every requested id is present in the result, mapped to an empty
    list when it has no named variant. Names are stripped, blank names are
    skipped, and each name is cut to 80 characters. Within one exercise the
    names keep the query order (``sequence_order``, then ``id``).
    """
    wanted = sorted({int(raw_id) for raw_id in exercise_ids if int(raw_id) > 0})
    names_by_exercise: Dict[int, List[str]] = {}
    for exercise_id in wanted:
        names_by_exercise[exercise_id] = []
    if not wanted:
        return names_by_exercise
    for start in range(0, len(wanted), batch):
        chunk = wanted[start : start + batch]
        ph = ",".join("%s" for _ in chunk)
        cur.execute(
            f"""
SELECT exercise_id, variant_name FROM exercise_variants
WHERE exercise_id IN ({ph})
ORDER BY sequence_order ASC NULLS LAST, id ASC
""",
            chunk,
        )
        for record in cur.fetchall():
            owner_id = int(record["exercise_id"])
            variant_name = str(record.get("variant_name") or "").strip()
            if not variant_name:
                continue
            names_by_exercise.setdefault(owner_id, []).append(variant_name[:80])
    return names_by_exercise
def rank_visible_library_hits(
    cur,
    rows: Sequence[Dict[str, Any]],
    *,
    query: str,
    intent: str,
    intent_weights: Mapping[str, float],
    target: PlanningTargetProfile,
    pack: Mapping[str, Any],
) -> Tuple[List[Dict[str, Any]], Dict[int, Set[int]]]:
    """S1b-1: compute a hybrid score for every row of the visible library.

    Args:
        cur: DB cursor offering ``execute(sql, params)`` and ``fetchall()``.
        rows: Candidate exercise rows; each must carry ``"id"``. ``"title"``,
            ``"summary"``, ``"method_archetype"``, ``"primary_focus_name"`` and
            ``"ft_rank"`` are read when present.
        query: User query. Here it only controls the "Volltext-Treffer" reason
            and the fallback score for otherwise zero-scored hits.
        intent: Passed through to ``score_exercise_against_target``.
        intent_weights: Score weights. The keys ``"fulltext"``,
            ``"progression"``, ``"skill"``, ``"plan"``, ``"profile"``,
            ``"repeat_unit"`` and ``"repeat_group"`` are required (a missing key
            raises ``KeyError``); ``"semantic"`` is optional and defaults to 0.0.
        target: Target profile passed to ``score_exercise_against_target``.
        pack: Planning context; see the ``pack.get(...)`` reads below for the
            keys that are consulted.

    Returns:
        ``(hits, skills_by_ex)``. ``hits`` holds one dict per remaining
        candidate, sorted by descending score and then by title.
        ``skills_by_ex`` maps each candidate id to its set of skill ids.
    """
    # Context read from the planning pack; missing entries become empty.
    planned_set = set(pack.get("planned_exercise_ids") or [])
    group_recent_set = set(pack.get("group_recent_exercise_ids") or [])
    progression_set = set(pack.get("progression_successor_ids") or [])
    anchor_skills = set(pack.get("anchor_skill_ids") or [])
    anchor_id = pack.get("anchor_exercise_id")
    progression_notes = pack.get("progression_edge_notes") or {}
    requires_partner = pack.get("requires_partner")
    # The semantic brief is only used when it has the expected type.
    semantic_brief_raw = pack.get("semantic_brief")
    semantic_brief: Optional[PlanningSemanticBrief] = None
    if isinstance(semantic_brief_raw, PlanningSemanticBrief):
        semantic_brief = semantic_brief_raw
    step_phase = pack.get("path_step_phase")
    path_mode = pack.get("context_mode") == "progression_path"
    stage_learning_goal = (pack.get("stage_learning_goal") or "").strip()
    roadmap_stage_match = bool(pack.get("roadmap_stage_match"))
    # Stage brief: take the prebuilt one if supplied, otherwise build it from
    # the stage fields when roadmap stage matching is on and a goal is set.
    stage_match_brief_raw = pack.get("stage_match_brief")
    stage_match_brief: Optional[PlanningSemanticBrief] = None
    if isinstance(stage_match_brief_raw, PlanningSemanticBrief):
        stage_match_brief = stage_match_brief_raw
    elif roadmap_stage_match and stage_learning_goal:
        stage_match_brief = build_stage_match_brief(
            learning_goal=stage_learning_goal,
            anti_patterns=pack.get("stage_anti_patterns"),
            success_criteria=pack.get("stage_success_criteria"),
            load_profile=pack.get("stage_load_profile"),
            phase=step_phase,
            path_context_note=pack.get("path_context_note"),
        )
    # Skills of the last entry in planned_exercise_ids feed the plan affinity.
    last_planned_skills: Set[int] = set()
    planned_ids = pack.get("planned_exercise_ids") or []
    if planned_ids:
        cur.execute(
            "SELECT skill_id FROM exercise_skills WHERE exercise_id = %s",
            (int(planned_ids[-1]),),
        )
        last_planned_skills = {int(r["skill_id"]) for r in cur.fetchall() if r.get("skill_id")}
    # Hard filters: drop the anchor exercise itself, and apply the partner
    # filter only when requires_partner is explicitly True or False.
    cand_rows: List[Dict[str, Any]] = []
    for row in rows:
        eid = int(row["id"])
        if anchor_id and eid == int(anchor_id):
            continue
        if requires_partner is True and not _exercise_looks_partner_related(row):
            continue
        if requires_partner is False and _exercise_looks_partner_related(row):
            continue
        cand_rows.append(row)
    cand_ids = [int(r["id"]) for r in cand_rows]
    match_profiles = _load_match_profiles_chunked(cur, cand_ids)
    skills_by_ex = _load_skill_sets_chunked(cur, cand_ids)
    goals_by_ex: Dict[int, str] = {}
    variants_by_ex: Dict[int, List[str]] = {}
    # Goal texts and variant names are loaded only when at least one brief has
    # a semantic strength above 0.05; otherwise both stay empty.
    need_exercise_semantic_text = (
        (semantic_brief and semantic_brief.semantic_strength > 0.05)
        or (stage_match_brief and stage_match_brief.semantic_strength > 0.05)
    )
    if need_exercise_semantic_text:
        goals_by_ex = _load_exercise_goals_chunked(cur, cand_ids)
        variants_by_ex = _load_variant_names_chunked(cur, cand_ids)
    # First pass: collect the full-text rank per candidate and track the
    # maximum, which is used to normalise the rank in the second pass.
    max_ft = 0.0
    scored_items: List[Dict[str, Any]] = []
    for row in cand_rows:
        eid = int(row["id"])
        ft = float(row.get("ft_rank") or 0.0)
        if ft > max_ft:
            max_ft = ft
        scored_items.append(
            {
                "row": row,
                "eid": eid,
                "ft": ft,
                "skills": skills_by_ex.get(eid, set()),
            }
        )
    weights = dict(intent_weights)
    hits: List[Dict[str, Any]] = []
    # Second pass: compute the individual signals and combine them.
    for item in scored_items:
        eid = item["eid"]
        row = item["row"]
        ft_norm = (item["ft"] / max_ft) if max_ft > 0 else 0.0
        prog_hit = 1.0 if eid in progression_set else 0.0
        skill_sim = _skill_jaccard(anchor_skills, item["skills"]) if anchor_skills else 0.0
        plan_aff = 0.0
        if last_planned_skills and item["skills"]:
            plan_aff = _skill_jaccard(last_planned_skills, item["skills"])
        repeat_unit = 1.0 if eid in planned_set else 0.0
        repeat_group = 1.0 if eid in group_recent_set else 0.0
        profile_score = 0.0
        profile_reasons: List[str] = []
        emp = match_profiles.get(eid)
        if emp:
            profile_score, profile_reasons = score_exercise_against_target(
                emp, target, intent=intent
            )
        title_s = str(row.get("title") or "")
        summary_s = str(row.get("summary") or "")
        goal_s = goals_by_ex.get(eid, "")
        semantic_score = 0.0
        semantic_reasons: List[str] = []
        if semantic_brief and semantic_brief.semantic_strength > 0.05:
            semantic_score, semantic_reasons = score_exercise_semantic_relevance(
                title=title_s,
                summary=summary_s,
                goal=goal_s,
                variant_names=variants_by_ex.get(eid, []),
                brief=semantic_brief,
                step_phase=step_phase,
            )
        stage_semantic_score = 0.0
        stage_semantic_reasons: List[str] = []
        if stage_match_brief and stage_match_brief.semantic_strength > 0.05:
            stage_semantic_score, stage_semantic_reasons = score_exercise_stage_fit(
                title=title_s,
                summary=summary_s,
                goal=goal_s,
                variant_names=variants_by_ex.get(eid, []),
                stage_brief=stage_match_brief,
                step_phase=step_phase,
            )
        # With roadmap stage matching and a stage brief, the stage-fit score
        # takes the place of the general semantic score in the weighted sum.
        effective_semantic = (
            stage_semantic_score
            if roadmap_stage_match and stage_match_brief
            else semantic_score
        )
        score_penalty = 0.0
        stage_match_reason: Optional[str] = None
        # Path mode without roadmap stage matching: a strong brief (>= 0.55)
        # penalises exercises that fail the strict path semantic gate.
        if (
            path_mode
            and not roadmap_stage_match
            and semantic_brief
            and semantic_brief.semantic_strength >= 0.55
            and not exercise_passes_path_semantic_gate(
                semantic_score=semantic_score,
                title=title_s,
                summary=summary_s,
                goal=goal_s,
                brief=semantic_brief,
                strict=True,
            )
        ):
            score_penalty = 0.42
        # Roadmap stage matching: passing the stage fit lowers the penalty by
        # 0.10 (never below zero) and adds a reason; failing adds 0.48.
        if roadmap_stage_match and stage_learning_goal:
            if exercise_passes_stage_fit(
                learning_goal=stage_learning_goal,
                title=title_s,
                summary=summary_s,
                goal=goal_s,
                stage_brief=stage_match_brief,
                stage_semantic_score=stage_semantic_score,
                anti_patterns=pack.get("stage_anti_patterns"),
                step_phase=step_phase,
                path_primary_topic=pack.get("path_primary_topic"),
                path_technique_excludes=pack.get("path_technique_excludes"),
            ):
                score_penalty = max(0.0, score_penalty - 0.10)
                stage_match_reason = "Passt zum Stufen-Lernziel"
            else:
                score_penalty += 0.48
        # Weighted sum of all signals minus the penalty. Only "semantic" has a
        # default; every other weight key must be present in intent_weights.
        score = (
            weights.get("semantic", 0.0) * effective_semantic
            + weights["fulltext"] * ft_norm
            + weights["progression"] * prog_hit
            + weights["skill"] * skill_sim
            + weights["plan"] * plan_aff
            + weights["profile"] * profile_score
            + weights["repeat_unit"] * repeat_unit
            + weights["repeat_group"] * repeat_group
            - score_penalty
        )
        # Collect the reason strings attached to the hit, without duplicates.
        reasons: List[str] = []
        if stage_match_reason:
            reasons.append(stage_match_reason)
        if roadmap_stage_match and stage_semantic_score >= 0.30 and stage_semantic_reasons:
            for sr in stage_semantic_reasons:
                if sr not in reasons:
                    reasons.append(sr)
        elif semantic_score >= 0.35 and semantic_reasons:
            for sr in semantic_reasons:
                if sr not in reasons:
                    reasons.append(sr)
        if query and ft_norm >= 0.35:
            reasons.append("Volltext-Treffer")
        if prog_hit > 0:
            note = progression_notes.get(eid)
            reasons.append(
                f"Nachfolger im Progressionsgraph{f': {note}' if note else ''}"
            )
        if skill_sim >= 0.2 and anchor_id:
            reasons.append("Fähigkeiten passen zur Anker-Übung")
        if plan_aff >= 0.25:
            reasons.append("Schließt an Skills der letzten geplanten Übung an")
        if repeat_unit > 0:
            reasons.append("Bereits in dieser Einheit eingeplant")
        if repeat_group > 0 and repeat_unit <= 0:
            reasons.append("Kürzlich in der Gruppe verwendet")
        for pr in profile_reasons:
            if pr not in reasons:
                reasons.append(pr)
        # Fallback: with no query, no reasons and a non-positive score, give a
        # small score if any context signal is non-zero. plan_aff is part of
        # the condition but not of the fallback formula.
        if score <= 0 and not reasons and not query:
            if prog_hit or skill_sim or plan_aff or profile_score:
                score = 0.05 + prog_hit * 0.3 + skill_sim * 0.2 + profile_score * 0.25
        # The emitted score is clamped to [0, 1] and rounded to 4 decimals.
        hits.append(
            {
                "id": eid,
                "title": row.get("title"),
                "summary": row.get("summary"),
                "focus_area": row.get("primary_focus_name"),
                "score": round(max(0.0, min(1.0, score)), 4),
                "reasons": reasons,
                "semantic_score": round(semantic_score, 4),
                "stage_semantic_score": round(stage_semantic_score, 4),
                "goal": goal_s,
            }
        )
        # Attach the suggested variant id when the pack provides one for this
        # exercise.
        succ_variants = pack.get("progression_successor_variants") or {}
        suggested_vid = succ_variants.get(eid)
        if suggested_vid:
            hits[-1]["suggested_variant_id"] = int(suggested_vid)
    # Highest score first; ties are ordered by title.
    hits.sort(key=lambda h: (-h["score"], h.get("title") or ""))
    return hits, skills_by_ex
def run_multistage_planning_retrieval(
    cur,
    *,
    vis_sql: str,
    vis_params: Sequence[Any],
    query: str,
    exercise_kind_any: Optional[List[str]],
    target: PlanningTargetProfile,
    intent: str,
    intent_weights: Mapping[str, float],
    pack: Mapping[str, Any],
    supplemental_exercise_ids: Optional[Sequence[int]] = None,
    supplemental_rows_preloaded: Optional[Sequence[Dict[str, Any]]] = None,
) -> Tuple[List[Dict[str, Any]], Dict[int, Set[int]], bool]:
    """Orchestrate S1b-0 followed by S1b-1 (full-library ranking).

    The library is fetched with ``pack["retrieval_query"]`` when that is set,
    otherwise with ``query``; ranking always receives ``query``. Supplemental
    rows are merged in before ranking: preloaded rows take precedence, and
    otherwise ``supplemental_exercise_ids`` are fetched with the library
    visibility predicate.

    Returns:
        ``(hits, skills_by_ex, full_library_ranked)``, where the flag is True
        when at least one row was available for ranking.
    """
    retrieval_query = pack.get("retrieval_query") or query
    library_rows = fetch_all_visible_exercise_rows(
        cur,
        vis_sql=vis_sql,
        vis_params=vis_params,
        query=retrieval_query,
        exercise_kind_any=exercise_kind_any,
    )

    # Decide where the supplemental rows come from, then merge them once.
    extra_rows = None
    if supplemental_rows_preloaded:
        extra_rows = supplemental_rows_preloaded
    elif supplemental_exercise_ids:
        extra_rows = fetch_exercise_rows_by_ids(
            cur,
            supplemental_exercise_ids,
            vis_sql=vis_sql,
            vis_params=vis_params,
        )
    if extra_rows is not None:
        library_rows = merge_supplemental_exercise_rows(library_rows, extra_rows)

    ranked_hits, skill_sets = rank_visible_library_hits(
        cur,
        library_rows,
        query=query,
        intent=intent,
        intent_weights=intent_weights,
        target=target,
        pack=pack,
    )
    return ranked_hits, skill_sets, len(library_rows) > 0
# Legacy aliases for tests / external imports
fetch_retrieval_candidate_rows = fetch_all_visible_exercise_rows
hybrid_score_planning_hits = rank_visible_library_hits
def profile_preselect_rows(
    cur,
    rows: Sequence[Dict[str, Any]],
    *,
    target: PlanningTargetProfile,
    intent: str,
    progression_successor_ids: Set[int],
    query: str,
    preselect_limit: int = 160,
) -> Tuple[List[Dict[str, Any]], bool]:
    """Deprecated: phase A ranks the full library, so nothing is pre-selected.

    Every argument except ``rows`` is ignored. Returns a new list holding all
    rows unchanged, together with ``False``.
    """
    # Parameters are kept only for signature compatibility.
    del cur, target, intent, progression_successor_ids, query, preselect_limit
    return [*rows], False
# Public API of this module (controls what `from <module> import *` exports).
# Lists every public function plus the legacy aliases; underscore-prefixed
# helpers are deliberately left out.
__all__ = [
    "fetch_all_visible_exercise_rows",
    "fetch_exercise_rows_by_ids",
    "fetch_exercise_rows_by_ids_for_graph",
    "fetch_retrieval_candidate_rows",
    "hybrid_score_planning_hits",
    "merge_supplemental_exercise_rows",
    "profile_preselect_rows",
    "rank_visible_library_hits",
    "run_multistage_planning_retrieval",
    "trim_hits_preserving_priority_ids",
]