shinkan-jinkendo/backend/planning_exercise_suggest.py
Lars 45e3b5f4f6
All checks were successful
Deploy Development / deploy (push) Successful in 43s
Test Suite / pytest-backend (push) Successful in 43s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 34s
Test Suite / playwright-tests (push) Successful in 1m14s
Implement Phase 1 of Planning Exercise Suggestion with Scenario Pipeline and LLM Intent Overlay
- Introduced the Scenario Pipeline for planning exercises, allowing for more nuanced query handling and exercise suggestions based on user intent.
- Enhanced the `suggestPlanningExercises` API to include `include_llm_intent`, `scenario_kind`, and `query_intent_summary`, improving the context provided to the frontend.
- Updated the `ExercisePickerModal` to display new information related to query intent and scenario classification, enhancing user experience during exercise selection.
- Incremented application version to 0.8.171 and updated changelog to document the new features and improvements in the planning AI capabilities.
2026-05-22 22:15:19 +02:00

618 lines
22 KiB
Python

"""
Planungs-KI P0: Kontext-Pack + Hybrid-Retrieval für Übungssuche in der Trainingsplanung.
Siehe .claude/docs/working/PLANNING_EXERCISE_SUGGEST_CONTEXT.md
"""
from __future__ import annotations
import re
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple
from fastapi import HTTPException
from pydantic import BaseModel, Field
from tenant_context import TenantContext, library_content_visibility_sql
from planning_exercise_profiles import (
load_exercise_match_profiles_bulk,
score_exercise_against_target,
)
from planning_exercise_llm_rank import try_llm_rerank_planning_hits
from planning_exercise_target_pipeline import (
build_planning_target_with_query_pipeline,
compose_retrieval_phase,
)
# Planungs-Berechtigung + Sektionen (bestehende Implementierung)
from routers.training_planning import (
_assert_training_unit_permission,
_fetch_sections,
_has_planning_role,
)
# Intent identifiers. They are accepted verbatim as `intent_hint` and are the
# possible return values of `resolve_planning_exercise_intent`.
INTENT_SUGGEST_NEXT = "suggest_next"
INTENT_PROGRESSION_NEXT = "progression_next"
INTENT_DEEPEN_EXERCISE = "deepen_exercise"
INTENT_CONTINUE_PLAN = "continue_plan_goal"
INTENT_FREE_SEARCH = "free_search"
# Closed set of known intents; an `intent_hint` outside this set is ignored
# and the keyword heuristic on the query text is used instead.
VALID_INTENTS = {
    INTENT_SUGGEST_NEXT,
    INTENT_PROGRESSION_NEXT,
    INTENT_DEEPEN_EXERCISE,
    INTENT_CONTINUE_PLAN,
    INTENT_FREE_SEARCH,
}
# Maximum number of candidate rows fetched from `exercises` before scoring.
_CANDIDATE_POOL_LIMIT = 400
# Lower bound for the number of top-scored hits handed to the LLM re-ranker
# (the actual slice is max(request limit, this value)).
_LLM_RERANK_PRE_LIMIT = 32
class PlanningExerciseSuggestRequest(BaseModel):
    """Request body for exercise suggestions in the context of one training unit."""

    # Training unit whose plan supplies the context (required, >= 1).
    unit_id: int = Field(..., ge=1)
    # Order index of the section being edited; used to look up the section title.
    section_order_index: Optional[int] = Field(default=None, ge=0)
    # NOTE(review): not read by the code visible in this module - presumably
    # consumed by the client or a later phase; confirm.
    phase_order_index: Optional[int] = Field(default=None, ge=0)
    # NOTE(review): not read by the code visible in this module; confirm usage.
    parallel_stream_order_index: Optional[int] = Field(default=None, ge=0)
    # Explicit anchor exercise; when absent the last planned exercise is used.
    anchor_exercise_id: Optional[int] = Field(default=None, ge=1)
    # Progression graph in which successors of the anchor are looked up.
    progression_graph_id: Optional[int] = Field(default=None, ge=1)
    # Free-text query; whitespace is normalised before use.
    query: Optional[str] = ""
    # Optional explicit intent; only honoured when it is one of VALID_INTENTS.
    intent_hint: Optional[str] = None
    # Unsaved client-side plan; when non-empty it replaces the stored plan.
    planned_exercise_ids: Optional[List[int]] = None
    # Forwarded to the target/query pipeline as `include_llm_intent`.
    include_llm_intent: bool = True
    # When true, the top hits are passed to the LLM re-ranker.
    include_llm_rank: bool = False
    # Maximum number of hits returned (1..50).
    limit: int = Field(default=20, ge=1, le=50)
    # Optional filter; only the values "simple" and "combination" are honoured.
    exercise_kind_any: Optional[List[str]] = None
def resolve_planning_exercise_intent(query: Optional[str], intent_hint: Optional[str]) -> str:
    """Derive the planning intent from an explicit hint or from query keywords.

    A hint that (after trimming and lower-casing) is one of ``VALID_INTENTS``
    always wins. Otherwise the lower-cased query is scanned for German
    keyword fragments; the first matching rule decides. An empty query maps
    to ``INTENT_SUGGEST_NEXT`` and a query without any keyword to
    ``INTENT_FREE_SEARCH``.
    """
    normalized_hint = (intent_hint or "").strip().lower()
    if normalized_hint in VALID_INTENTS:
        return normalized_hint

    text = (query or "").strip().lower()
    if not text:
        return INTENT_SUGGEST_NEXT

    # Rules are evaluated top to bottom; order matters because a query may
    # contain fragments of several rules.
    keyword_rules = (
        (("nächste", "naechste", "vorschlag", "vorschlagen", "empfehl"), INTENT_SUGGEST_NEXT),
        (("vertief",), INTENT_DEEPEN_EXERCISE),
        (("progression", "graph", "pfad"), INTENT_PROGRESSION_NEXT),
        (("aufbau", "planung", "bisher"), INTENT_CONTINUE_PLAN),
    )
    for fragments, resolved in keyword_rules:
        for fragment in fragments:
            if fragment in text:
                return resolved
    return INTENT_FREE_SEARCH
def _intent_weights(intent: str) -> Dict[str, float]:
    """Return the scoring weights for ``intent``.

    Starts from a default weight set and overlays the intent-specific
    overrides. Unknown intents receive the defaults unchanged. A new dict is
    returned on every call. The ``repeat_*`` weights are negative, i.e. they
    act as penalties in the weighted sum.
    """
    defaults: Dict[str, float] = {
        "fulltext": 0.18,
        "progression": 0.18,
        "skill": 0.12,
        "plan": 0.08,
        "profile": 0.22,
        "repeat_unit": -0.30,
        "repeat_group": -0.15,
    }
    overrides_by_intent: Dict[str, Dict[str, float]] = {
        INTENT_SUGGEST_NEXT: {
            "progression": 0.28,
            "skill": 0.12,
            "plan": 0.10,
            "profile": 0.25,
            "fulltext": 0.08,
        },
        INTENT_PROGRESSION_NEXT: {
            "progression": 0.42,
            "fulltext": 0.12,
            "skill": 0.10,
            "profile": 0.20,
        },
        INTENT_DEEPEN_EXERCISE: {
            "skill": 0.15,
            "profile": 0.35,
            "fulltext": 0.15,
            "progression": 0.10,
        },
        INTENT_CONTINUE_PLAN: {
            "plan": 0.12,
            "skill": 0.10,
            "profile": 0.30,
            "fulltext": 0.10,
            "progression": 0.08,
        },
        INTENT_FREE_SEARCH: {
            "fulltext": 0.45,
            "progression": 0.08,
            "skill": 0.08,
            "profile": 0.15,
        },
    }
    merged = dict(defaults)
    merged.update(overrides_by_intent.get(intent, {}))
    return merged
def _collect_planned_exercise_ids(sections: Sequence[Dict[str, Any]]) -> List[int]:
out: List[int] = []
seen: Set[int] = set()
for sec in sorted(sections, key=lambda s: int(s.get("order_index") or 0)):
items = sec.get("items") or []
for it in sorted(items, key=lambda x: int(x.get("order_index") or 0)):
if str(it.get("item_type") or "").strip().lower() == "note":
continue
raw = it.get("exercise_id")
if raw is None:
continue
try:
eid = int(raw)
except (TypeError, ValueError):
continue
if eid < 1 or eid in seen:
continue
seen.add(eid)
out.append(eid)
return out
def _resolve_anchor_from_plan(
planned_ids: Sequence[int],
anchor_exercise_id: Optional[int],
) -> Optional[int]:
if anchor_exercise_id and int(anchor_exercise_id) > 0:
return int(anchor_exercise_id)
if planned_ids:
return int(planned_ids[-1])
return None
def _load_exercise_titles(cur, exercise_ids: Sequence[int]) -> Dict[int, str]:
if not exercise_ids:
return {}
ids = list(dict.fromkeys(int(x) for x in exercise_ids if int(x) > 0))
ph = ",".join(["%s"] * len(ids))
cur.execute(
f"SELECT id, title FROM exercises WHERE id IN ({ph})",
ids,
)
return {int(r["id"]): str(r["title"] or "").strip() for r in cur.fetchall()}
def _load_skill_ids_for_exercise(cur, exercise_id: Optional[int]) -> Set[int]:
if not exercise_id:
return set()
cur.execute(
"SELECT skill_id FROM exercise_skills WHERE exercise_id = %s",
(int(exercise_id),),
)
return {int(r["skill_id"]) for r in cur.fetchall() if r.get("skill_id")}
def _load_progression_successors(
cur,
graph_id: Optional[int],
from_exercise_id: Optional[int],
) -> Tuple[Set[int], Dict[int, str]]:
if not graph_id or not from_exercise_id:
return set(), {}
cur.execute(
"""
SELECT to_exercise_id, notes
FROM exercise_progression_edges
WHERE graph_id = %s AND from_exercise_id = %s
AND LOWER(TRIM(edge_type)) = 'next_exercise'
""",
(int(graph_id), int(from_exercise_id)),
)
ids: Set[int] = set()
notes: Dict[int, str] = {}
for row in cur.fetchall():
tid = int(row["to_exercise_id"])
ids.add(tid)
n = (row.get("notes") or "").strip()
if n:
notes[tid] = n
return ids, notes
def _load_group_recent_exercise_ids(
cur,
group_id: Optional[int],
exclude_unit_id: int,
limit: int = 40,
) -> Set[int]:
if not group_id:
return set()
cur.execute(
"""
SELECT tusi.exercise_id AS eid
FROM training_units tu
INNER JOIN training_unit_sections tus ON tus.training_unit_id = tu.id
INNER JOIN training_unit_section_items tusi ON tusi.section_id = tus.id
WHERE tu.group_id = %s
AND tu.id <> %s
AND tusi.exercise_id IS NOT NULL
AND COALESCE(tu.status, '') <> 'cancelled'
ORDER BY tu.planned_date DESC NULLS LAST, tu.id DESC, tusi.order_index DESC
LIMIT 200
""",
(int(group_id), int(exclude_unit_id)),
)
out: Set[int] = set()
for r in cur.fetchall():
if r.get("eid") is None:
continue
out.add(int(r["eid"]))
if len(out) >= limit:
break
return out
def _section_title_for_index(sections: Sequence[Dict[str, Any]], section_order_index: Optional[int]) -> Optional[str]:
if section_order_index is None:
return None
for sec in sections:
if int(sec.get("order_index") or -1) == int(section_order_index):
t = (sec.get("title") or "").strip()
return t or None
return None
def _normalize_query(query: Optional[str]) -> str:
return re.sub(r"\s+", " ", (query or "").strip())
def _skill_jaccard(a: Set[int], b: Set[int]) -> float:
if not a or not b:
return 0.0
inter = len(a & b)
union = len(a | b)
return inter / union if union else 0.0
def _apply_client_planned_override(
    cur,
    pack: Dict[str, Any],
    body: PlanningExerciseSuggestRequest,
) -> Dict[str, Any]:
    """Let the client-side plan (unsaved form state) override the stored plan.

    When the request carries ``planned_exercise_ids`` with at least one valid
    id (integer >= 1, duplicates dropped, order kept), the pack's planned ids
    are replaced. If the request has no explicit ``anchor_exercise_id``, the
    anchor becomes the last client-planned exercise and every anchor-derived
    field (title, skill ids, progression successors and edge notes) is
    refreshed so they describe the same exercise.

    Args:
        cur: Database cursor used for the refresh lookups.
        pack: Context pack as built by ``build_planning_exercise_context_pack``;
            it is modified in place.
        body: The suggestion request.

    Returns:
        The same ``pack`` object.
    """
    if not body.planned_exercise_ids:
        return pack
    planned_ids: List[int] = []
    seen: Set[int] = set()
    for raw in body.planned_exercise_ids:
        try:
            eid = int(raw)
        except (TypeError, ValueError):
            continue
        if eid < 1 or eid in seen:
            continue
        seen.add(eid)
        planned_ids.append(eid)
    if not planned_ids:
        return pack
    pack["planned_exercise_ids"] = planned_ids
    if body.anchor_exercise_id:
        # An explicit anchor was already applied when the pack was built and
        # does not depend on the plan contents.
        return pack
    previous_anchor = pack.get("anchor_exercise_id")
    anchor_id = _resolve_anchor_from_plan(planned_ids, None)
    pack["anchor_exercise_id"] = anchor_id
    if anchor_id:
        titles = _load_exercise_titles(cur, [anchor_id])
        pack["anchor_title"] = titles.get(anchor_id)
        pack["anchor_skill_ids"] = sorted(_load_skill_ids_for_exercise(cur, anchor_id))
    else:
        pack["anchor_title"] = None
        pack["anchor_skill_ids"] = []
    if anchor_id != previous_anchor:
        # The successors stored in the pack belong to the previous anchor;
        # reload them so progression scoring matches the new anchor.
        successor_ids, edge_notes = _load_progression_successors(
            cur, pack.get("progression_graph_id"), anchor_id
        )
        pack["progression_successor_ids"] = sorted(successor_ids)
        pack["progression_edge_notes"] = edge_notes
    return pack
def build_planning_exercise_context_pack(
    cur,
    *,
    tenant: TenantContext,
    body: PlanningExerciseSuggestRequest,
) -> Dict[str, Any]:
    """Check access to the training unit and assemble the suggestion context.

    Loads the unit (with its group name), enforces the permission rules
    below, reads the stored plan sections and derives anchor exercise,
    anchor skills, progression successors and recently used exercises of the
    group.

    Raises:
        HTTPException: 403 when the caller has no planning role or no access
            to the unit; 404 when the unit does not exist or is a
            non-framework unit without a group.

    Returns:
        A dict ("context pack") with the keys listed in the return statement.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    if not _has_planning_role(role):
        raise HTTPException(status_code=403, detail="Nur Trainer dürfen Planungs-Vorschläge abrufen")
    cur.execute(
        """
        SELECT tu.*, tg.name AS group_name
        FROM training_units tu
        LEFT JOIN training_groups tg ON tg.id = tu.group_id
        WHERE tu.id = %s
        """,
        (body.unit_id,),
    )
    unit_row = cur.fetchone()
    if not unit_row:
        raise HTTPException(status_code=404, detail="Trainingseinheit nicht gefunden")
    unit = dict(unit_row)
    # NOTE(review): the nesting below is reconstructed from a source whose
    # indentation was lost; the `else` is assumed to pair with the
    # framework_slot_id check (group units go through the shared permission
    # helper) - confirm against the repository.
    if unit.get("framework_slot_id"):
        # Framework-slot units: admins pass; everyone else must have created
        # either the unit itself or the owning framework program.
        if role not in ("admin", "superadmin"):
            cur.execute(
                """
                SELECT fp.created_by FROM training_framework_slots s
                JOIN training_framework_programs fp ON fp.id = s.framework_program_id
                WHERE s.id = %s
                """,
                (unit["framework_slot_id"],),
            )
            fr = cur.fetchone()
            cb = fr["created_by"] if fr else None
            if unit.get("created_by") != profile_id and cb != profile_id:
                raise HTTPException(status_code=403, detail="Keine Berechtigung")
    else:
        # Regular units must belong to a group; access is decided by the
        # existing planning permission helper.
        if not unit.get("group_id"):
            raise HTTPException(status_code=404, detail="Trainingseinheit nicht gefunden")
        _assert_training_unit_permission(cur, unit, profile_id, role)
    sections = _fetch_sections(cur, int(body.unit_id))
    planned_ids = _collect_planned_exercise_ids(sections)
    # Explicit anchor from the request wins; otherwise the last planned exercise.
    anchor_id = _resolve_anchor_from_plan(planned_ids, body.anchor_exercise_id)
    anchor_skills = _load_skill_ids_for_exercise(cur, anchor_id)
    progression_ids, progression_notes = _load_progression_successors(
        cur, body.progression_graph_id, anchor_id
    )
    group_recent = _load_group_recent_exercise_ids(cur, unit.get("group_id"), int(body.unit_id))
    titles = _load_exercise_titles(cur, [x for x in [anchor_id] if x])
    anchor_title = titles.get(anchor_id) if anchor_id else None
    return {
        "unit_id": int(body.unit_id),
        "unit": {
            "id": int(body.unit_id),
            "framework_slot_id": unit.get("framework_slot_id"),
            "origin_framework_slot_id": unit.get("origin_framework_slot_id"),
        },
        # Falls back to the planned focus when the unit has no title.
        "unit_title": (unit.get("title") or unit.get("planned_focus") or "").strip() or None,
        "group_id": unit.get("group_id"),
        "group_name": (unit.get("group_name") or "").strip() or None,
        "section_order_index": body.section_order_index,
        "section_title": _section_title_for_index(sections, body.section_order_index),
        "planned_exercise_ids": planned_ids,
        "anchor_exercise_id": anchor_id,
        "anchor_title": anchor_title,
        # Sets are emitted as sorted lists.
        "anchor_skill_ids": sorted(anchor_skills),
        "progression_graph_id": body.progression_graph_id,
        "progression_successor_ids": sorted(progression_ids),
        "progression_edge_notes": progression_notes,
        "group_recent_exercise_ids": sorted(group_recent),
    }
def suggest_planning_exercises(
    cur,
    *,
    tenant: TenantContext,
    body: PlanningExerciseSuggestRequest,
) -> Dict[str, Any]:
    """Suggest exercises for a training unit using hybrid scoring.

    Steps:
      1. Build the context pack (includes the permission checks) and apply
         the client-side plan override.
      2. Resolve the intent heuristically, then let the target/query pipeline
         produce the target profile, final intent and scenario kind.
      3. Fetch up to ``_CANDIDATE_POOL_LIMIT`` visible, non-archived
         exercises, optionally filtered by exercise kind.
      4. Score each candidate as a weighted sum of normalised full-text
         rank, progression hit, skill similarity to the anchor, affinity to
         the last planned exercise, profile match, and penalties for
         repetition within the unit or the group.
      5. Optionally let the LLM re-rank the top hits; truncate to
         ``body.limit``.

    Args:
        cur: Database cursor with mapping-style rows.
        tenant: Tenant context of the caller.
        body: The suggestion request.

    Returns:
        Dict with ``hits`` (id, title, summary, focus_area, score in [0, 1],
        reasons) plus context/diagnostic fields (context summary, target
        profile summary, scenario kind, intents, retrieval phase).

    Raises:
        HTTPException: Propagated from the context-pack permission checks.
    """
    pack = build_planning_exercise_context_pack(cur, tenant=tenant, body=body)
    pack = _apply_client_planned_override(cur, pack, body)
    query = _normalize_query(body.query)
    heuristic_intent = resolve_planning_exercise_intent(query, body.intent_hint)
    pipeline_context = {
        "unit_title": pack.get("unit_title"),
        "group_name": pack.get("group_name"),
        "section_title": pack.get("section_title"),
        "planned_count": len(pack.get("planned_exercise_ids") or []),
        "anchor_title": pack.get("anchor_title"),
        "anchor_exercise_id": pack.get("anchor_exercise_id"),
        "progression_graph_id": pack.get("progression_graph_id"),
    }
    target_profile, intent, scenario_kind, query_intent_summary = build_planning_target_with_query_pipeline(
        cur,
        unit=pack["unit"],
        planned_exercise_ids=pack["planned_exercise_ids"],
        anchor_exercise_id=pack.get("anchor_exercise_id"),
        query=query,
        heuristic_intent=heuristic_intent,
        include_llm_intent=body.include_llm_intent,
        context_summary=pipeline_context,
    )
    weights = _intent_weights(intent)
    target_profile_summary = target_profile.to_summary_dict(cur)
    query_intent_applied = bool(query_intent_summary.get("llm_applied"))

    # --- Candidate retrieval -------------------------------------------
    profile_id = tenant.profile_id
    role = tenant.global_role
    vis_sql, vis_params = library_content_visibility_sql(
        alias="e",
        profile_id=profile_id,
        role=role,
        effective_club_id=tenant.effective_club_id,
    )
    where = [vis_sql, "COALESCE(e.status, '') <> %s"]
    # Parameters must follow placeholder order in the statement: the
    # full-text placeholder sits in the SELECT list and therefore comes first.
    params: List[Any] = []
    if query:
        ft_select = "ts_rank_cd(e.search_vector, plainto_tsquery('german', %s)) AS ft_rank"
        params.append(query)
        # Rank-first ordering keeps the best textual matches inside the
        # capped pool even when they were not updated recently. NULLS LAST
        # because a NULL search_vector yields a NULL rank, which DESC would
        # otherwise sort first.
        order_by = "ft_rank DESC NULLS LAST, e.updated_at DESC, e.id DESC"
    else:
        ft_select = "0.0::float AS ft_rank"
        order_by = "e.updated_at DESC, e.id DESC"
    params.extend(list(vis_params))
    params.append("archived")
    ek_filtered: List[str] = []
    if body.exercise_kind_any:
        for raw in body.exercise_kind_any:
            s = str(raw or "").strip().lower()
            if s in ("simple", "combination") and s not in ek_filtered:
                ek_filtered.append(s)
    if ek_filtered:
        ph = ",".join(["%s"] * len(ek_filtered))
        where.append(f"(LOWER(TRIM(COALESCE(e.exercise_kind::text,''))) IN ({ph}))")
        params.extend(ek_filtered)
    sql = f"""
        SELECT e.id, e.title, e.summary,
        (
            SELECT fa.name FROM exercise_focus_areas efa
            JOIN focus_areas fa ON fa.id = efa.focus_area_id
            WHERE efa.exercise_id = e.id
            ORDER BY efa.is_primary DESC NULLS LAST, fa.name ASC
            LIMIT 1
        ) AS primary_focus_name,
        {ft_select}
        FROM exercises e
        WHERE {' AND '.join(where)}
        ORDER BY {order_by}
        LIMIT %s
    """
    params.append(_CANDIDATE_POOL_LIMIT)
    cur.execute(sql, params)
    rows = cur.fetchall()

    # --- Scoring inputs -------------------------------------------------
    planned_set = set(pack["planned_exercise_ids"])
    group_recent_set = set(pack["group_recent_exercise_ids"])
    progression_set = set(pack["progression_successor_ids"])
    anchor_skills = set(pack["anchor_skill_ids"])
    anchor_id = pack.get("anchor_exercise_id")
    progression_notes = pack.get("progression_edge_notes") or {}
    last_planned_skills: Set[int] = set()
    if pack["planned_exercise_ids"]:
        last_planned_skills = _load_skill_ids_for_exercise(cur, pack["planned_exercise_ids"][-1])
    # Skill ids and exercise match profiles per candidate (loaded in bulk).
    cand_ids = [int(r["id"]) for r in rows]
    skills_by_ex: Dict[int, Set[int]] = {cid: set() for cid in cand_ids}
    match_profiles = load_exercise_match_profiles_bulk(cur, cand_ids)
    if cand_ids:
        ph = ",".join(["%s"] * len(cand_ids))
        cur.execute(
            f"SELECT exercise_id, skill_id FROM exercise_skills WHERE exercise_id IN ({ph})",
            cand_ids,
        )
        for r in cur.fetchall():
            skills_by_ex.setdefault(int(r["exercise_id"]), set()).add(int(r["skill_id"]))

    # First pass: drop the anchor itself and find the maximum full-text rank
    # used for normalisation.
    max_ft = 0.0
    scored: List[Dict[str, Any]] = []
    for row in rows:
        eid = int(row["id"])
        if anchor_id and eid == int(anchor_id):
            continue
        ft = float(row.get("ft_rank") or 0.0)
        if ft > max_ft:
            max_ft = ft
        scored.append(
            {
                "row": row,
                "eid": eid,
                "ft": ft,
                "skills": skills_by_ex.get(eid, set()),
            }
        )

    # Second pass: weighted score and human-readable reasons per candidate.
    hits: List[Dict[str, Any]] = []
    for item in scored:
        eid = item["eid"]
        row = item["row"]
        ft_norm = (item["ft"] / max_ft) if max_ft > 0 else 0.0
        prog_hit = 1.0 if eid in progression_set else 0.0
        skill_sim = _skill_jaccard(anchor_skills, item["skills"]) if anchor_skills else 0.0
        plan_aff = 0.0
        if last_planned_skills and item["skills"]:
            plan_aff = _skill_jaccard(last_planned_skills, item["skills"])
        repeat_unit = 1.0 if eid in planned_set else 0.0
        repeat_group = 1.0 if eid in group_recent_set else 0.0
        profile_score = 0.0
        profile_reasons: List[str] = []
        emp = match_profiles.get(eid)
        if emp:
            profile_score, profile_reasons = score_exercise_against_target(
                emp, target_profile, intent=intent
            )
        score = (
            weights["fulltext"] * ft_norm
            + weights["progression"] * prog_hit
            + weights["skill"] * skill_sim
            + weights["plan"] * plan_aff
            + weights["profile"] * profile_score
            + weights["repeat_unit"] * repeat_unit
            + weights["repeat_group"] * repeat_group
        )
        reasons: List[str] = []
        if query and ft_norm >= 0.35:
            reasons.append("Volltext-Treffer")
        if prog_hit > 0:
            note = progression_notes.get(eid)
            reasons.append(
                f"Nachfolger im Progressionsgraph{f': {note}' if note else ''}"
            )
        if skill_sim >= 0.2 and anchor_id:
            reasons.append("Fähigkeiten passen zur Anker-Übung")
        if plan_aff >= 0.25:
            reasons.append("Schließt an Skills der letzten geplanten Übung an")
        if repeat_unit > 0:
            reasons.append("Bereits in dieser Einheit eingeplant")
        if repeat_group > 0 and repeat_unit <= 0:
            reasons.append("Kürzlich in der Gruppe verwendet")
        for pr in profile_reasons:
            if pr not in reasons:
                reasons.append(pr)
        if score <= 0 and not reasons and not query:
            # Empty query: still surface weak candidates that have some
            # skill/progression/profile signal.
            if prog_hit or skill_sim or plan_aff or profile_score:
                score = 0.05 + prog_hit * 0.3 + skill_sim * 0.2 + profile_score * 0.25
        hits.append(
            {
                "id": eid,
                "title": row.get("title"),
                "summary": row.get("summary"),
                "focus_area": row.get("primary_focus_name"),
                # Clamp to [0, 1]; penalties can push the raw sum below zero.
                "score": round(max(0.0, min(1.0, score)), 4),
                "reasons": reasons,
            }
        )
    hits.sort(key=lambda h: (-h["score"], h.get("title") or ""))

    # --- Optional LLM re-ranking ---------------------------------------
    llm_applied = False
    retrieval_phase = compose_retrieval_phase(query_intent=query_intent_applied, llm_rank=False)
    if body.include_llm_rank:
        pre_limit = max(int(body.limit), _LLM_RERANK_PRE_LIMIT)
        pool_hits = hits[:pre_limit]
        pool_hits, llm_applied = try_llm_rerank_planning_hits(
            cur,
            hits=pool_hits,
            skills_by_ex=skills_by_ex,
            query=query,
            intent=intent,
            context_summary={
                "unit_title": pack.get("unit_title"),
                "group_name": pack.get("group_name"),
                "section_title": pack.get("section_title"),
                "planned_count": len(planned_set),
                "anchor_title": pack.get("anchor_title"),
                "intent": intent,
            },
            target_profile_summary=target_profile_summary,
            limit=int(body.limit),
        )
        if llm_applied:
            retrieval_phase = compose_retrieval_phase(
                query_intent=query_intent_applied,
                llm_rank=True,
            )
            # Re-ranked pool first, the untouched remainder after it.
            hits = pool_hits + hits[pre_limit:]
        else:
            hits = pool_hits
    # Single truncation point for every path above.
    hits = hits[: int(body.limit)]

    context_summary = {
        "unit_title": pack.get("unit_title"),
        "group_name": pack.get("group_name"),
        "section_title": pack.get("section_title"),
        "planned_count": len(planned_set),
        "anchor_title": pack.get("anchor_title"),
        "anchor_exercise_id": pack.get("anchor_exercise_id"),
        "progression_graph_id": pack.get("progression_graph_id"),
    }
    return {
        "context_summary": context_summary,
        "target_profile_summary": target_profile_summary,
        "scenario_kind": scenario_kind,
        "query_intent_summary": query_intent_summary,
        "retrieval_phase": retrieval_phase,
        "llm_rank_applied": llm_applied,
        "intent_resolved": intent,
        "intent_heuristic": heuristic_intent,
        "query_normalized": query or None,
        "hits": hits,
    }