shinkan-jinkendo/backend/planning_exercise_profiles.py
Lars 04cc77d501
All checks were successful
Deploy Development / deploy (push) Successful in 44s
Test Suite / pytest-backend (push) Successful in 43s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 12s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m14s
Enhance Planning Exercise Profiles and Context Handling
- Introduced new functions to generate skill profiles from exercise IDs, improving the ability to summarize skills for both units and sections.
- Updated the planning target profile to incorporate section-specific exercise IDs, allowing for more granular skill tracking and context.
- Enhanced the ExercisePickerModal and related pages to support section context, including titles, guidance notes, and exercise counts.
- Implemented expectation mode handling in the planning target pipeline to differentiate between planning references and query-only scenarios.
- Incremented version to 0.8.174 and updated changelog to reflect these enhancements in planning AI capabilities.
2026-05-22 23:00:31 +02:00

499 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
ExerciseMatchProfile / PlanningTargetProfile — Phase-1-Vorselektion Planungs-Übungssuche.
Siehe .claude/docs/working/PLANNING_EXERCISE_SUGGEST_CONTEXT.md §12–§14
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple
from skill_scoring import (
ExerciseOccurrence,
collect_unit_exercise_occurrences,
fetch_exercise_skills_bulk,
profile_for_occurrences,
_skill_link_multiplier,
DEFAULT_ITEM_MINUTES,
)
def _ids_to_weights(ids: Sequence[int], primary_id: Optional[int] = None) -> Dict[int, float]:
    """Map catalog ids to match weights.

    Entries that cannot be converted to ``int`` and ids below 1 are skipped.
    The id equal to ``primary_id`` is weighted 1.0, every other id 0.85; if an
    id occurs more than once, the larger weight wins.
    """
    weights: Dict[int, float] = {}
    for candidate in ids or []:
        try:
            ident = int(candidate)
        except (TypeError, ValueError):
            continue
        if ident < 1:
            continue
        is_primary = primary_id is not None and ident == int(primary_id)
        if is_primary:
            weight = 1.0
        else:
            weight = 0.85
        weights[ident] = max(weights.get(ident, 0.0), weight)
    return weights
def _merge_weight_maps(*maps: Optional[Dict[int, float]], scale: float = 1.0) -> Dict[int, float]:
    """Merge several weight maps, keeping the largest scaled weight per id.

    Every weight is multiplied by ``scale`` before it is compared.  Falsy
    maps are ignored, as are entries whose key or value cannot be converted to
    a number, ids below 1 and scaled weights that are not positive.
    """
    merged: Dict[int, float] = {}
    for weight_map in maps:
        for raw_key, raw_weight in (weight_map or {}).items():
            try:
                ident, scaled = int(raw_key), float(raw_weight) * scale
            except (TypeError, ValueError):
                continue
            if ident < 1 or scaled <= 0:
                continue
            merged[ident] = max(merged.get(ident, 0.0), scaled)
    return merged
def _normalize_weight_map(m: Dict[int, float]) -> Dict[int, float]:
    """Scale a weight map so that its largest weight becomes 1.0.

    Returns an empty dict when the map is empty or its maximum is not
    positive; entries with a non-positive weight are dropped.
    """
    peak = max(m.values()) if m else 0
    if peak <= 0:
        return {}
    normalized: Dict[int, float] = {}
    for key, weight in m.items():
        if weight > 0:
            normalized[key] = weight / peak
    return normalized
def weighted_overlap(a: Dict[int, float], b: Dict[int, float]) -> float:
    """Return the weighted overlap of two weight maps (min-sum / max-sum).

    Over the union of keys, the per-key minima are summed and divided by the
    sum of the per-key maxima; a key missing from one map counts as 0.0 there.
    For non-negative weights the result lies in 0..1.  Returns 0.0 when either
    map is empty or the denominator is not positive.
    """
    if not (a and b):
        return 0.0
    pairs = [(a.get(key, 0.0), b.get(key, 0.0)) for key in set(a) | set(b)]
    shared = sum(min(left, right) for left, right in pairs)
    combined = sum(max(left, right) for left, right in pairs)
    if combined > 0:
        return shared / combined
    return 0.0
def gap_coverage(gap: Dict[int, float], candidate: Dict[int, float]) -> float:
    """Return the share of the skill gap that the candidate covers (0..1).

    For every skill in ``gap`` the covered amount is the smaller of the gap
    weight and the candidate weight (0.0 if the candidate lacks the skill);
    the covered total is divided by the total gap.  Returns 0.0 when the gap
    is empty or its total is not positive.
    """
    gap_total = sum(gap.values()) if gap else 0
    if gap_total <= 0:
        return 0.0
    covered = sum(
        min(needed, candidate.get(skill_id, 0.0)) for skill_id, needed in gap.items()
    )
    return covered / gap_total
@dataclass
class ExerciseMatchProfile:
    """Match profile of a single exercise.

    Every ``*_ids`` field and ``skill_weights`` is a mapping from an id to a
    weight.
    """

    exercise_id: int
    # focus_area_id -> weight
    focus_area_ids: Dict[int, float] = field(default_factory=dict)
    # style_direction_id -> weight
    style_direction_ids: Dict[int, float] = field(default_factory=dict)
    # training_type_id -> weight
    training_type_ids: Dict[int, float] = field(default_factory=dict)
    # target_group_id -> weight
    target_group_ids: Dict[int, float] = field(default_factory=dict)
    # skill_id -> weight
    skill_weights: Dict[int, float] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the profile as a plain dict; the weight maps are not copied."""
        return dict(
            exercise_id=self.exercise_id,
            focus_area_ids=self.focus_area_ids,
            style_direction_ids=self.style_direction_ids,
            training_type_ids=self.training_type_ids,
            target_group_ids=self.target_group_ids,
            skill_weights=self.skill_weights,
        )
@dataclass
class PlanningTargetProfile:
    """Target profile that candidate exercises are scored against.

    All weight fields map an id to a weight; ``sources`` lists the labels of
    the inputs that contributed to the profile.
    """

    # focus_area_id -> weight
    focus_area_ids: Dict[int, float] = field(default_factory=dict)
    # style_direction_id -> weight
    style_direction_ids: Dict[int, float] = field(default_factory=dict)
    # training_type_id -> weight
    training_type_ids: Dict[int, float] = field(default_factory=dict)
    # target_group_id -> weight
    target_group_ids: Dict[int, float] = field(default_factory=dict)
    # skill_id -> target weight
    skill_weights: Dict[int, float] = field(default_factory=dict)
    # skill_id -> weight of the part of the target not yet covered by the plan
    skill_gap_weights: Dict[int, float] = field(default_factory=dict)
    # skill_id -> weight derived from the exercises already planned
    skill_plan_weights: Dict[int, float] = field(default_factory=dict)
    sources: List[str] = field(default_factory=list)

    def to_summary_dict(self, cur, limit_skills: int = 5) -> Dict[str, Any]:
        """Return a compact summary of the profile.

        Loads the labels of at most six focus areas and the names of the
        ``limit_skills`` highest-weighted skills through ``cur``.  A skill
        whose name is not found is shown as ``#<id>``; weights are rounded to
        two decimals.
        """
        focus_ids = list(self.focus_area_ids)[:6]
        focus_labels = _load_focus_labels(cur, focus_ids)

        ranked = sorted(self.skill_weights.items(), key=lambda item: item[1], reverse=True)
        top_skills = ranked[:limit_skills]
        skill_names = _load_skill_names(cur, [skill_id for skill_id, _ in top_skills])

        skill_entries = []
        for skill_id, weight in top_skills:
            skill_entries.append(
                {
                    "skill_id": skill_id,
                    "name": skill_names.get(skill_id, f"#{skill_id}"),
                    "weight": round(weight, 2),
                }
            )
        return {
            "sources": list(self.sources),
            "focus_areas": focus_labels,
            "top_skills": skill_entries,
            "has_skill_gap": bool(self.skill_gap_weights),
        }
def _load_focus_labels(cur, ids: Sequence[int]) -> List[str]:
    """Return the labels of the given focus areas, ordered by name.

    A focus area whose name is falsy is labelled with its id instead.  No
    query is issued when ``ids`` is empty.
    """
    if not ids:
        return []
    placeholders = ",".join(("%s",) * len(ids))
    cur.execute(
        f"SELECT id, name FROM focus_areas WHERE id IN ({placeholders}) ORDER BY name",
        list(ids),
    )
    labels: List[str] = []
    for record in cur.fetchall():
        labels.append(f"{record['name'] or record['id']}")
    return labels
def _load_skill_names(cur, ids: Sequence[int]) -> Dict[int, str]:
    """Return ``{skill_id: name}`` for the given skill ids.

    A falsy name is returned as an empty string.  No query is issued when
    ``ids`` is empty.
    """
    if not ids:
        return {}
    placeholders = ",".join(("%s",) * len(ids))
    cur.execute(f"SELECT id, name FROM skills WHERE id IN ({placeholders})", list(ids))
    names: Dict[int, str] = {}
    for record in cur.fetchall():
        names[int(record["id"])] = str(record["name"] or "")
    return names
def _skill_weights_from_profile(skills_out: Sequence[Dict[str, Any]]) -> Dict[int, float]:
    """Convert the skill rows of a profile into ``{skill_id: weight}``.

    The weight is read from ``"weight"``, falling back to ``"score"`` and then
    to 0.  Rows without a ``skill_id`` and rows with a non-positive weight are
    dropped; a later row for the same skill replaces an earlier one.
    """
    weights: Dict[int, float] = {}
    for entry in skills_out or []:
        skill_id = entry.get("skill_id")
        if skill_id is not None:
            value = float(entry.get("weight") or entry.get("score") or 0)
            if value > 0:
                weights[int(skill_id)] = value
    return weights
def _single_exercise_skill_weights(
    skill_rows: Sequence[Dict[str, Any]],
    *,
    minutes: float = DEFAULT_ITEM_MINUTES,
) -> Dict[int, float]:
    """Accumulate the skill weights of one exercise from its skill links.

    Each link contributes ``minutes`` times the multiplier that
    ``_skill_link_multiplier`` returns for its intensity, required level and
    target level.  Links without a ``skill_id`` and non-positive contributions
    are ignored; contributions for the same skill are added up.
    """
    totals: Dict[int, float] = {}
    for link in skill_rows or []:
        raw_id = link.get("skill_id")
        if raw_id is None:
            continue
        skill_id = int(raw_id)
        multiplier = _skill_link_multiplier(
            intensity=link.get("intensity"),
            required_level=link.get("required_level"),
            target_level=link.get("target_level"),
        )
        contribution = minutes * multiplier
        if contribution > 0:
            totals[skill_id] = totals.get(skill_id, 0.0) + contribution
    return totals
def _load_relation_maps_bulk(
    cur,
    exercise_ids: Sequence[int],
    table: str,
    id_column: str,
) -> Dict[int, Dict[int, float]]:
    """Load one exercise relation table for many exercises in a single query.

    Args:
        cur: Database cursor; rows are read by column name and via ``.get``.
        exercise_ids: Exercise ids to load; ids below 1 are dropped.
        table: Name of the relation table.  It is interpolated into the SQL
            text, so it must be a trusted constant, never user input.
        id_column: Name of the column holding the related id.  Interpolated
            into the SQL text as well; same restriction as ``table``.

    Returns:
        ``{exercise_id: {related_id: weight}}`` where a primary relation is
        weighted 1.0 and any other relation 0.85 (the larger weight wins for
        duplicates).  Every requested id is present, with an empty map if it
        has no rows.  Returns ``{}`` when no valid id was given.
    """
    ids = [int(x) for x in exercise_ids if int(x) > 0]
    if not ids:
        return {}
    ph = ",".join(["%s"] * len(ids))
    cur.execute(
        f"""
        SELECT exercise_id, {id_column} AS rel_id, is_primary
        FROM {table}
        WHERE exercise_id IN ({ph})
        """,
        ids,
    )
    out: Dict[int, Dict[int, float]] = {eid: {} for eid in ids}
    for row in cur.fetchall():
        eid = int(row["exercise_id"])
        rid = int(row["rel_id"])
        w = 1.0 if row.get("is_primary") else 0.85
        # Resolve the bucket before reading from it.  In a single assignment
        # the right-hand side is evaluated first, so `out[eid]` there would
        # raise KeyError before `setdefault` on the left could create the entry.
        bucket = out.setdefault(eid, {})
        bucket[rid] = max(bucket.get(rid, 0.0), w)
    return out
def load_exercise_match_profiles_bulk(cur, exercise_ids: Sequence[int]) -> Dict[int, ExerciseMatchProfile]:
    """Build an ``ExerciseMatchProfile`` for every distinct positive exercise id.

    The four relation tables (focus areas, style directions, training types,
    target groups) and the skill links are each loaded with one bulk call.
    Returns ``{exercise_id: profile}``, or ``{}`` when no valid id was given.
    """
    ids = sorted({int(x) for x in exercise_ids if int(x) > 0})
    if not ids:
        return {}

    # (profile attribute, relation table, related-id column), in query order.
    relation_sources = (
        ("focus_area_ids", "exercise_focus_areas", "focus_area_id"),
        ("style_direction_ids", "exercise_style_directions", "style_direction_id"),
        ("training_type_ids", "exercise_training_types", "training_type_id"),
        ("target_group_ids", "exercise_target_groups", "target_group_id"),
    )
    relation_maps = {
        attribute: _load_relation_maps_bulk(cur, ids, table, column)
        for attribute, table, column in relation_sources
    }
    skills_bulk = fetch_exercise_skills_bulk(cur, ids)

    return {
        eid: ExerciseMatchProfile(
            exercise_id=eid,
            skill_weights=_single_exercise_skill_weights(skills_bulk.get(eid, [])),
            **{
                attribute: by_exercise.get(eid, {})
                for attribute, by_exercise in relation_maps.items()
            },
        )
        for eid in ids
    }
def _resolve_framework_for_unit(cur, unit: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Look up the framework slot and program that a unit refers to.

    The slot id is taken from ``unit["framework_slot_id"]``, falling back to
    ``unit["origin_framework_slot_id"]``.  Returns the joined slot/program row
    as a dict, or ``None`` when the unit has no slot id or the slot does not
    exist.
    """
    slot_ref = unit.get("framework_slot_id") or unit.get("origin_framework_slot_id")
    if slot_ref:
        cur.execute(
            """
            SELECT s.id AS slot_id, s.framework_program_id, s.sort_order, s.title AS slot_title,
            fp.title AS framework_title, fp.focus_area_id AS header_focus_area_id
            FROM training_framework_slots s
            JOIN training_framework_programs fp ON fp.id = s.framework_program_id
            WHERE s.id = %s
            """,
            (int(slot_ref),),
        )
        record = cur.fetchone()
        if record:
            return dict(record)
    return None
def _framework_catalog_weights(cur, framework_id: int) -> Tuple[Dict[int, float], Dict[int, float], Dict[int, float], Dict[int, float]]:
    """Return the catalog weight maps of a framework program.

    The result is the tuple ``(focus, style, training_type, target_group)``.
    The program's header focus area is used as the primary focus id and is
    put first in the focus list if the link table does not already contain it.
    """

    def linked_ids(sql: str, column: str) -> List[int]:
        # Run one link-table query for this framework and collect the ids.
        cur.execute(sql, (framework_id,))
        return [int(record[column]) for record in cur.fetchall()]

    cur.execute(
        "SELECT focus_area_id FROM training_framework_programs WHERE id = %s",
        (framework_id,),
    )
    header_row = cur.fetchone()
    header_fa = None
    if header_row and header_row.get("focus_area_id"):
        header_fa = int(header_row["focus_area_id"])

    focus_ids = linked_ids(
        "SELECT focus_area_id FROM training_framework_program_focus_areas WHERE framework_program_id = %s",
        "focus_area_id",
    )
    if header_fa and header_fa not in focus_ids:
        focus_ids.insert(0, header_fa)
    focus = _ids_to_weights(focus_ids, primary_id=header_fa)

    style = _ids_to_weights(
        linked_ids(
            "SELECT style_direction_id FROM training_framework_program_style_directions WHERE framework_program_id = %s",
            "style_direction_id",
        )
    )
    training_types = _ids_to_weights(
        linked_ids(
            "SELECT training_type_id FROM training_framework_program_training_types WHERE framework_program_id = %s",
            "training_type_id",
        )
    )
    target_groups = _ids_to_weights(
        linked_ids(
            "SELECT target_group_id FROM training_framework_program_target_groups WHERE framework_program_id = %s",
            "target_group_id",
        )
    )
    return focus, style, training_types, target_groups
def _profile_from_unit_occurrences(cur, unit_id: int) -> Dict[int, float]:
    """Return ``{skill_id: weight}`` for the exercise occurrences of a unit.

    Returns ``{}`` when the unit has no exercise occurrences.
    """
    occurrences = collect_unit_exercise_occurrences(cur, int(unit_id))
    if occurrences:
        profile = profile_for_occurrences(cur, occurrences, reference_max_by_skill=None)
        return _skill_weights_from_profile(profile.get("skills") or [])
    return {}
def _profile_from_exercise_ids(cur, exercise_ids: Sequence[int]) -> Dict[int, float]:
    """Return ``{skill_id: weight}`` for a list of exercise ids.

    Ids below 1 are dropped; each remaining id becomes one exercise
    occurrence.  Returns ``{}`` when no valid id is left.
    """
    valid_ids = [int(x) for x in exercise_ids if int(x) > 0]
    if valid_ids:
        occurrences = [ExerciseOccurrence(exercise_id=valid_id) for valid_id in valid_ids]
        profile = profile_for_occurrences(cur, occurrences, reference_max_by_skill=None)
        return _skill_weights_from_profile(profile.get("skills") or [])
    return {}
def skill_profile_summary_from_exercise_ids(
    cur,
    exercise_ids: Sequence[int],
    *,
    limit_skills: int = 8,
) -> Dict[str, Any]:
    """Return a compact skill profile for LLM context and UI.

    The result has ``exercise_count`` (number of valid ids, i.e. ids >= 1)
    and ``skills``: the ``limit_skills`` highest-weighted skills with id, name
    and weight rounded to three decimals.  A skill whose name is not found is
    shown as ``#<id>``.
    """
    valid_ids = [int(x) for x in exercise_ids if int(x) > 0]
    if not valid_ids:
        return {"exercise_count": 0, "skills": []}

    def row_weight(row: Dict[str, Any]) -> float:
        # "weight" wins over "score"; a row with neither counts as 0.
        return float(row.get("weight") or row.get("score") or 0)

    occurrences = [ExerciseOccurrence(exercise_id=valid_id) for valid_id in valid_ids]
    profile = profile_for_occurrences(cur, occurrences, reference_max_by_skill=None)
    ranked = sorted(profile.get("skills") or [], key=row_weight, reverse=True)
    top_rows = [row for row in ranked[:limit_skills] if row.get("skill_id") is not None]
    names = _load_skill_names(cur, [int(row["skill_id"]) for row in top_rows])

    skills = []
    for row in top_rows:
        skill_id = int(row["skill_id"])
        skills.append(
            {
                "skill_id": skill_id,
                "name": names.get(skill_id, f"#{row['skill_id']}"),
                "weight": round(row_weight(row), 3),
            }
        )
    return {"exercise_count": len(valid_ids), "skills": skills}
def build_planning_target_profile(
    cur,
    *,
    unit: Dict[str, Any],
    planned_exercise_ids: Sequence[int],
    section_planned_exercise_ids: Optional[Sequence[int]] = None,
    anchor_exercise_id: Optional[int],
    intent: str,
) -> PlanningTargetProfile:
    """Assemble the target profile that candidate exercises are scored against.

    Inputs are merged in this order, and each one that contributes appends a
    label to ``sources``:

    1. Framework of the unit (if it references a framework slot): catalog
       weights, then the skill profile of one training unit attached to that
       slot, or - if that yields nothing - of all units attached to any slot
       of the framework (scaled by 0.85).
    2. ``planned_exercise_ids``: skill profile of the current plan.
    3. ``section_planned_exercise_ids``: skill profile of the current section,
       merged into the skill target.
    4. ``anchor_exercise_id``: match profile of the anchor exercise.

    Args:
        cur: Database cursor used for all lookups.
        unit: Unit row; only its framework slot reference is read here.
        planned_exercise_ids: Exercise ids already planned in the unit.
        section_planned_exercise_ids: Exercise ids planned in the current
            section; ids below 1 are ignored.
        anchor_exercise_id: Exercise the search is anchored on, if any.
        intent: Search intent; it decides whether the anchor's skill weights
            are merged into the skill target.

    Returns:
        A ``PlanningTargetProfile`` whose weight maps are normalized to a
        maximum of 1.0, including the skill gap between target and plan.

    NOTE(review): the indentation of this function was lost in the extracted
    source and has been reconstructed from the data dependencies between the
    statements - confirm the nesting against the repository.
    """
    sources: List[str] = []
    focus: Dict[int, float] = {}
    style: Dict[int, float] = {}
    tt: Dict[int, float] = {}
    tg: Dict[int, float] = {}
    skill_target: Dict[int, float] = {}
    skill_plan: Dict[int, float] = {}
    # --- 1. Framework context -------------------------------------------------
    fw = _resolve_framework_for_unit(cur, unit)
    if fw:
        fid = int(fw["framework_program_id"])
        f_focus, f_style, f_tt, f_tg = _framework_catalog_weights(cur, fid)
        focus = _merge_weight_maps(focus, f_focus)
        style = _merge_weight_maps(style, f_style)
        tt = _merge_weight_maps(tt, f_tt)
        tg = _merge_weight_maps(tg, f_tg)
        sources.append("framework_catalog")
        slot_id = fw.get("slot_id")
        # Pick one training unit attached to the slot; there is no ORDER BY,
        # so which unit is returned when several exist is up to the database.
        cur.execute(
            "SELECT id FROM training_units WHERE framework_slot_id = %s LIMIT 1",
            (int(slot_id),),
        )
        bp = cur.fetchone()
        if bp and bp.get("id"):
            slot_skills = _profile_from_unit_occurrences(cur, int(bp["id"]))
            if slot_skills:
                skill_target = _merge_weight_maps(skill_target, slot_skills, scale=1.0)
                sources.append("framework_slot_skill_profile")
        # Fallback: no slot-level skills, so profile every unit attached to any
        # slot of the framework.  Placed inside `if fw:` because it reads `fid`.
        if not skill_target:
            cur.execute(
                """
                SELECT tu.id FROM training_framework_slots s
                LEFT JOIN training_units tu ON tu.framework_slot_id = s.id
                WHERE s.framework_program_id = %s AND tu.id IS NOT NULL
                """,
                (fid,),
            )
            all_occ: List[ExerciseOccurrence] = []
            for r in cur.fetchall():
                all_occ.extend(collect_unit_exercise_occurrences(cur, int(r["id"])))
            if all_occ:
                prof = profile_for_occurrences(cur, all_occ, reference_max_by_skill=None)
                # The framework-wide profile is scaled by 0.85, the slot profile by 1.0.
                skill_target = _merge_weight_maps(
                    skill_target, _skill_weights_from_profile(prof.get("skills") or []), scale=0.85
                )
                sources.append("framework_overall_skill_profile")
    # --- 2. Current plan of the unit -------------------------------------------
    if planned_exercise_ids:
        # Unlike the section ids below, these ids are not filtered for > 0.
        occ = [ExerciseOccurrence(exercise_id=int(eid)) for eid in planned_exercise_ids]
        prof = profile_for_occurrences(cur, occ, reference_max_by_skill=None)
        skill_plan = _skill_weights_from_profile(prof.get("skills") or [])
        if skill_plan:
            sources.append("current_unit_plan")
    # --- 3. Current section ----------------------------------------------------
    section_ids = [int(x) for x in (section_planned_exercise_ids or []) if int(x) > 0]
    if section_ids:
        section_skills = _profile_from_exercise_ids(cur, section_ids)
        if section_skills:
            skill_target = _merge_weight_maps(skill_target, section_skills, scale=1.0)
            sources.append("current_section_plan")
    # --- 4. Anchor exercise ----------------------------------------------------
    if anchor_exercise_id:
        anchor_profiles = load_exercise_match_profiles_bulk(cur, [int(anchor_exercise_id)])
        ap = anchor_profiles.get(int(anchor_exercise_id))
        if ap:
            # NOTE(review): only the skill merge is assumed to depend on the
            # intent; the catalog merges below are assumed to apply to every
            # intent - confirm against the repository.
            if intent in ("deepen_exercise", "suggest_next", "progression_next", "continue_plan_goal"):
                skill_target = _merge_weight_maps(skill_target, ap.skill_weights, scale=1.0)
            focus = _merge_weight_maps(focus, ap.focus_area_ids, scale=0.9)
            style = _merge_weight_maps(style, ap.style_direction_ids, scale=0.75)
            tt = _merge_weight_maps(tt, ap.training_type_ids, scale=0.75)
            tg = _merge_weight_maps(tg, ap.target_group_ids, scale=0.75)
            sources.append("anchor_exercise")
    # --- Skill gap: target minus what the plan already covers ------------------
    skill_target = _normalize_weight_map(skill_target)
    skill_plan_norm = _normalize_weight_map(skill_plan)
    skill_gap: Dict[int, float] = {}
    for sid, tw in skill_target.items():
        pw = skill_plan_norm.get(sid, 0.0)
        # The plan weight is damped by 0.85 and gaps of 0.08 or less are dropped.
        gap = tw - pw * 0.85
        if gap > 0.08:
            skill_gap[sid] = gap
    if skill_gap:
        sources.append("skill_gap_vs_plan")
    return PlanningTargetProfile(
        focus_area_ids=_normalize_weight_map(focus) if focus else focus,
        style_direction_ids=_normalize_weight_map(style) if style else style,
        training_type_ids=_normalize_weight_map(tt) if tt else tt,
        target_group_ids=_normalize_weight_map(tg) if tg else tg,
        skill_weights=skill_target,
        skill_gap_weights=_normalize_weight_map(skill_gap) if skill_gap else skill_gap,
        skill_plan_weights=skill_plan_norm,
        sources=sources,
    )
def score_exercise_against_target(
    exercise: ExerciseMatchProfile,
    target: PlanningTargetProfile,
    *,
    intent: str,
) -> Tuple[float, List[str]]:
    """Return the profile match (0..1) and the German-language reasons for it.

    Six similarities are computed (focus areas, style directions, training
    types, target groups, skills, skill-gap coverage) and combined with
    weights that depend on ``intent``; every weight set sums to 1.0.  A reason
    is added for each dimension that reaches its threshold while the target
    has data for it, plus one when ``"query_intent"`` is among the target's
    sources.  The score is clamped to the range 0..1.
    """
    exercise_skills = _normalize_weight_map(exercise.skill_weights)
    # Insertion order matters: the score below is summed in this order.
    similarity = {
        "focus": weighted_overlap(exercise.focus_area_ids, target.focus_area_ids),
        "style": weighted_overlap(exercise.style_direction_ids, target.style_direction_ids),
        "tt": weighted_overlap(exercise.training_type_ids, target.training_type_ids),
        "tg": weighted_overlap(exercise.target_group_ids, target.target_group_ids),
        "skill": weighted_overlap(exercise_skills, target.skill_weights),
        "gap": gap_coverage(target.skill_gap_weights, exercise_skills),
    }

    # (dimension, threshold, target data that must be present, reason text)
    reason_rules = (
        ("focus", 0.5, target.focus_area_ids, "Fokusbereich passend zum Planungsziel"),
        ("style", 0.5, target.style_direction_ids, "Stilrichtung passend"),
        ("tt", 0.5, target.training_type_ids, "Trainingsstil passend"),
        ("tg", 0.5, target.target_group_ids, "Zielgruppe passend"),
        ("skill", 0.35, target.skill_weights, "Fähigkeiten-Schwerpunkt passend (Profilmetrik)"),
        ("gap", 0.25, target.skill_gap_weights, "Deckt Skill-Lücke im bisherigen Plan"),
    )
    reasons: List[str] = [
        text
        for dimension, threshold, target_data, text in reason_rules
        if similarity[dimension] >= threshold and target_data
    ]
    if "query_intent" in (target.sources or []):
        reasons.append("Passt zur KI-interpretierten Suchanfrage")

    # Intent-specific dimension weights (each set sums to 1.0).
    default_weights = {"focus": 0.20, "style": 0.10, "tt": 0.10, "tg": 0.10, "skill": 0.30, "gap": 0.20}
    weights = {
        INTENT_FREE_SEARCH: {"focus": 0.15, "style": 0.10, "tt": 0.10, "tg": 0.10, "skill": 0.25, "gap": 0.30},
        INTENT_DEEPEN_EXERCISE: {"focus": 0.15, "style": 0.10, "tt": 0.10, "tg": 0.05, "skill": 0.45, "gap": 0.15},
        INTENT_PROGRESSION_NEXT: {"focus": 0.20, "style": 0.10, "tt": 0.10, "tg": 0.05, "skill": 0.35, "gap": 0.20},
    }.get(intent, default_weights)

    score = 0.0
    for dimension, value in similarity.items():
        score += weights[dimension] * value
    return max(0.0, min(1.0, score)), reasons
# Intent constants used by score_exercise_against_target above (module-level
# names are looked up at call time, so defining them after the function is fine).
# They are defined here rather than imported to avoid a circular import with the
# suggest module at runtime.
# NOTE(review): presumably these must match the values in the suggest module - confirm.
INTENT_FREE_SEARCH = "free_search"
INTENT_DEEPEN_EXERCISE = "deepen_exercise"
INTENT_PROGRESSION_NEXT = "progression_next"