""" Szenario-Routing und Erwartungsprofil-Pipeline für Planungs-Übungssuche (P1). Ablauf: 1. Heuristik: Intent + Szenario-Klasse aus Query/Kontext 2. Optional LLM (planning_exercise_search_intent) bei komplexen Anfragen 3. Deterministisches Basis-Profil (Rahmen, Plan, Anker) 4. Query-Overlay mergen → PlanningTargetProfile für Vorselektion """ from __future__ import annotations import re from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple from planning_exercise_expectation import try_build_planning_expectation_from_context from planning_exercise_intent import ( PlanningQueryIntentParsed, resolve_query_intent_catalog_ids, try_parse_planning_query_intent, ) from planning_exercise_profiles import ( PlanningTargetProfile, _merge_weight_maps, _normalize_weight_map, build_planning_target_profile, ) SCENARIO_PRESET_NEXT = "preset_next" SCENARIO_PROGRESSION = "progression" SCENARIO_DEEPEN = "deepen" SCENARIO_CONTINUE_PLAN = "continue_plan" SCENARIO_ADDITIVE = "additive_constraint" SCENARIO_FREE_SEARCH = "free_search" _SIMPLE_PRESET_PATTERNS = ( r"^(schlage?\s+(mir\s+)?(die\s+)?(n[aä]chste|naechste)\s+(sinnvolle\s+)?(übung|uebung)\s*(vor)?\.?)$", r"^(n[aä]chste|naechste)\s+(übung|uebung)\s*(vorschlag|vorschlagen|empfehl\w*)?\.?$", r"^(vorschlag|vorschlagen|empfehl\w*)\s*(für|fuer)?\s*(die\s+)?(n[aä]chste|naechste)?\s*(übung|uebung)?\.?$", r"^n[aä]chste\s+übung$", r"^n[aä]chste\s+uebung$", r"^(n[aä]chste|naechste)\s+(übung|uebung)\s+planen\.?$", ) _ADDITIVE_MARKERS = ( "zusätzlich", "zusaetzlich", "auch ", " außerdem", " ausserdem", " dazu", " extra", " mehr ", " und dabei", " sowie ", ) def _normalize_query(q: Optional[str]) -> str: return re.sub(r"\s+", " ", (q or "").strip()) def is_simple_preset_query(query: Optional[str]) -> bool: q = _normalize_query(query).lower() if not q: return True for pat in _SIMPLE_PRESET_PATTERNS: if re.match(pat, q, flags=re.IGNORECASE): return True return False def classify_planning_scenario( query: Optional[str], heuristic_intent: str, ) -> str: q = _normalize_query(query).lower() if not q or is_simple_preset_query(q): return SCENARIO_PRESET_NEXT if heuristic_intent == "progression_next": return SCENARIO_PROGRESSION if heuristic_intent == "deepen_exercise": return SCENARIO_DEEPEN if any(m in f" {q} " for m in _ADDITIVE_MARKERS): return SCENARIO_ADDITIVE if heuristic_intent == "continue_plan_goal": return SCENARIO_CONTINUE_PLAN if heuristic_intent == "free_search": return SCENARIO_FREE_SEARCH if heuristic_intent == "suggest_next": return SCENARIO_CONTINUE_PLAN return SCENARIO_FREE_SEARCH def should_run_llm_expectation_pipeline( scenario: str, *, include_llm_intent: bool, has_planning_reference: bool, ) -> bool: """Preset/leere Anfrage mit Planungsbezug → LLM-Erwartungsprofil statt Query-Intent.""" if not include_llm_intent: return False if not has_planning_reference: return False return scenario == SCENARIO_PRESET_NEXT def should_run_llm_intent_pipeline( query: Optional[str], scenario: str, *, include_llm_intent: bool, ) -> bool: if not include_llm_intent: return False if scenario == SCENARIO_PRESET_NEXT: return False q = _normalize_query(query) if not q: return False # Kurze Stichwortsuche: Volltext + Profil reichen — kein Intent-LLM if scenario == SCENARIO_FREE_SEARCH and len(q) < 14: return False if scenario in (SCENARIO_CONTINUE_PLAN, SCENARIO_PROGRESSION) and len(q) < 18: return False return True def deterministic_rank_confident(hits: Sequence[Mapping[str, Any]], *, gap_threshold: float = 0.12) -> bool: """True wenn Hybrid-Ranking schon klar genug ist — LLM-Rerank sparen.""" if len(hits) < 4: return True top = float(hits[0].get("score") or 0.0) fourth = float(hits[3].get("score") or 0.0) return (top - fourth) >= gap_threshold def hybrid_ranking_ambiguous( hits: Sequence[Mapping[str, Any]], *, top_four_gap: float = 0.08, top_ten_gap: float = 0.055, ) -> bool: """True wenn Top-Kandidaten scores zu nah beieinander liegen — Rerank lohnt sich.""" if len(hits) < 3: return False top = float(hits[0].get("score") or 0.0) if len(hits) >= 4: fourth = float(hits[3].get("score") or 0.0) if (top - fourth) < top_four_gap: return True if len(hits) >= 10: tenth = float(hits[9].get("score") or 0.0) if (top - tenth) < top_ten_gap: return True elif len(hits) >= 2: tail = float(hits[min(len(hits) - 1, 9)].get("score") or 0.0) if (top - tail) < top_four_gap: return True return False def should_run_llm_rank_pipeline( query: Optional[str], scenario: str, *, include_llm_rank: bool, query_intent_applied: bool, llm_expectation_applied: bool = False, has_planning_reference: bool = True, hits: Sequence[Mapping[str, Any]], ) -> bool: """ Phase B2: Rerank bei unklarem Hybrid-Ranking — auch nach Erwartungs-/Intent-LLM. Budget: max. 2 LLM-Calls pro Suche (Profil-LLM + optional Rerank). """ if not include_llm_rank: return False if len(hits) < 3: return False if not hybrid_ranking_ambiguous(hits): return False q = _normalize_query(query) profile_llm = query_intent_applied or llm_expectation_applied if scenario == SCENARIO_PRESET_NEXT: return has_planning_reference if scenario == SCENARIO_FREE_SEARCH: if len(q) < 10 and not profile_llm: return False return True if scenario == SCENARIO_ADDITIVE: return len(q) >= 8 or profile_llm if profile_llm: return True return len(q) >= 14 def _recalculate_skill_gap(target: PlanningTargetProfile) -> PlanningTargetProfile: skill_target = _normalize_weight_map(dict(target.skill_weights)) skill_plan_norm = _normalize_weight_map(dict(target.skill_plan_weights)) skill_gap: Dict[int, float] = {} for sid, tw in skill_target.items(): pw = skill_plan_norm.get(sid, 0.0) gap = tw - pw * 0.85 if gap > 0.08: skill_gap[sid] = gap sources = list(target.sources) if skill_gap and "skill_gap_vs_plan" not in sources: sources.append("skill_gap_vs_plan") elif not skill_gap: sources = [s for s in sources if s != "skill_gap_vs_plan"] return PlanningTargetProfile( focus_area_ids=target.focus_area_ids, style_direction_ids=target.style_direction_ids, training_type_ids=target.training_type_ids, target_group_ids=target.target_group_ids, skill_weights=skill_target, skill_gap_weights=_normalize_weight_map(skill_gap) if skill_gap else {}, skill_plan_weights=target.skill_plan_weights, sources=sources, ) def merge_query_overlay_into_target( base: PlanningTargetProfile, *, focus: Dict[int, float], style: Dict[int, float], tt: Dict[int, float], tg: Dict[int, float], skills: Dict[int, float], emphasis: str = "additive", scenario: str, ) -> PlanningTargetProfile: sources = list(base.sources) if "query_intent" not in sources: sources.append("query_intent") if emphasis == "replace" or scenario == SCENARIO_FREE_SEARCH: skill_w = _merge_weight_maps({}, skills, scale=1.0) if skills: skill_w = _normalize_weight_map(_merge_weight_maps(base.skill_weights, skills, scale=0.55)) if emphasis == "replace": skill_w = _normalize_weight_map(skills) focus_w = _merge_weight_maps(base.focus_area_ids, focus, scale=0.5 if emphasis == "replace" else 0.85) style_w = _merge_weight_maps(base.style_direction_ids, style, scale=0.5) tt_w = _merge_weight_maps(base.training_type_ids, tt, scale=0.5) tg_w = _merge_weight_maps(base.target_group_ids, tg, scale=0.5) else: skill_scale = 1.0 if scenario == SCENARIO_ADDITIVE else 0.85 skill_w = _merge_weight_maps(base.skill_weights, skills, scale=skill_scale) focus_w = _merge_weight_maps(base.focus_area_ids, focus, scale=0.9) style_w = _merge_weight_maps(base.style_direction_ids, style, scale=0.75) tt_w = _merge_weight_maps(base.training_type_ids, tt, scale=0.75) tg_w = _merge_weight_maps(base.target_group_ids, tg, scale=0.75) out = PlanningTargetProfile( focus_area_ids=_normalize_weight_map(focus_w) if focus_w else focus_w, style_direction_ids=_normalize_weight_map(style_w) if style_w else style_w, training_type_ids=_normalize_weight_map(tt_w) if tt_w else tt_w, target_group_ids=_normalize_weight_map(tg_w) if tg_w else tg_w, skill_weights=_normalize_weight_map(skill_w) if skill_w else skill_w, skill_gap_weights=dict(base.skill_gap_weights), skill_plan_weights=dict(base.skill_plan_weights), sources=sources, ) return _recalculate_skill_gap(out) def build_planning_target_with_query_pipeline( cur, *, unit: Dict[str, Any], planned_exercise_ids: List[int], section_planned_exercise_ids: Optional[List[int]] = None, anchor_exercise_id: Optional[int], query: Optional[str], heuristic_intent: str, include_llm_intent: bool, context_summary: Mapping[str, Any], has_planning_reference: bool = True, ) -> Tuple[PlanningTargetProfile, str, str, Dict[str, Any]]: """ Returns: target_profile, resolved_intent, scenario_kind, query_intent_summary dict Ohne Planungsbezug (keine Übungen/Anker/Rahmen): Erwartungsprofil primär aus Suchtext (query_only). Mit Planungsbezug: hybrid aus Plan + optional Query-Overlay. """ scenario = classify_planning_scenario(query, heuristic_intent) resolved_intent = heuristic_intent llm_applied = False llm_expectation_applied = False parsed: Optional[PlanningQueryIntentParsed] = None expectation_parsed: Optional[PlanningQueryIntentParsed] = None resolved_skills: List[Dict[str, Any]] = [] if has_planning_reference: base = build_planning_target_profile( cur, unit=unit, planned_exercise_ids=planned_exercise_ids, section_planned_exercise_ids=section_planned_exercise_ids or [], anchor_exercise_id=anchor_exercise_id, intent=heuristic_intent, section_guidance_notes=(context_summary.get("section_guidance_notes") or None), section_title=(context_summary.get("section_title") or None), ) else: base = PlanningTargetProfile(sources=["query_only"]) base_summary = base.to_summary_dict(cur) target = base if should_run_llm_expectation_pipeline( scenario, include_llm_intent=include_llm_intent, has_planning_reference=has_planning_reference, ): expectation_parsed, llm_expectation_applied = try_build_planning_expectation_from_context( cur, heuristic_intent=heuristic_intent, context_summary=context_summary, target_profile_summary=base_summary, ) parsed = expectation_parsed if parsed and llm_expectation_applied: if parsed.intent in { "suggest_next", "progression_next", "deepen_exercise", "continue_plan_goal", "free_search", }: resolved_intent = parsed.intent focus, style, tt, tg, skills, resolved_skills = resolve_query_intent_catalog_ids(cur, parsed) if focus or style or tt or tg or skills or parsed.rationale: target = merge_query_overlay_into_target( base, focus=focus, style=style, tt=tt, tg=tg, skills=skills, emphasis=parsed.emphasis or "additive", scenario=SCENARIO_PRESET_NEXT, ) if "context_expectation" not in target.sources: target.sources.append("context_expectation") elif should_run_llm_intent_pipeline(query, scenario, include_llm_intent=include_llm_intent): parsed, llm_applied = try_parse_planning_query_intent( cur, query=_normalize_query(query), heuristic_intent=heuristic_intent, scenario_hint=scenario, context_summary=context_summary, target_profile_summary=base_summary, ) if parsed and llm_applied and not llm_expectation_applied: if parsed.intent in { "suggest_next", "progression_next", "deepen_exercise", "continue_plan_goal", "free_search", }: resolved_intent = parsed.intent if parsed.scenario in VALID_SCENARIOS_SET: scenario = parsed.scenario focus, style, tt, tg, skills, resolved_skills = resolve_query_intent_catalog_ids(cur, parsed) if focus or style or tt or tg or skills: overlay_scenario = scenario overlay_emphasis = parsed.emphasis if not has_planning_reference: overlay_scenario = SCENARIO_FREE_SEARCH overlay_emphasis = "replace" target = merge_query_overlay_into_target( base, focus=focus, style=style, tt=tt, tg=tg, skills=skills, emphasis=overlay_emphasis, scenario=overlay_scenario, ) elif not has_planning_reference and _normalize_query(query): # Kein LLM, aber Freitext: leichtes Profil bleibt leer — Retrieval nutzt Volltext target = PlanningTargetProfile(sources=["query_only"]) query_intent_summary: Dict[str, Any] = { "scenario": scenario, "intent": resolved_intent, "heuristic_intent": heuristic_intent, "llm_applied": llm_applied, "llm_expectation_applied": llm_expectation_applied, "profile_llm_applied": llm_applied or llm_expectation_applied, "emphasis": parsed.emphasis if parsed else None, "rationale": (parsed.rationale if parsed else None), "skill_hints_resolved": resolved_skills, "requires_partner": parsed.requires_partner if parsed else None, "expectation_mode": "planning_hybrid" if has_planning_reference else "query_only", } return target, resolved_intent, scenario, query_intent_summary VALID_SCENARIOS_SET = { SCENARIO_PRESET_NEXT, SCENARIO_PROGRESSION, SCENARIO_DEEPEN, SCENARIO_CONTINUE_PLAN, SCENARIO_ADDITIVE, SCENARIO_FREE_SEARCH, } def compose_retrieval_phase( *, full_library: bool = False, profile_preselect: bool = False, text_signals: bool = False, query_intent: bool = False, llm_expectation: bool = False, llm_rank: bool = False, semantics: bool = False, ) -> str: parts = ["profile_v1"] if full_library or profile_preselect: parts.append("full_library") if text_signals: parts.append("text_signals") if semantics: parts.append("semantics") if llm_expectation: parts.append("llm_expectation") elif query_intent: parts.append("query_intent") if llm_rank: parts.append("llm_rank") return "+".join(parts) __all__ = [ "SCENARIO_ADDITIVE", "SCENARIO_PRESET_NEXT", "build_planning_target_with_query_pipeline", "classify_planning_scenario", "compose_retrieval_phase", "is_simple_preset_query", "merge_query_overlay_into_target", "should_run_llm_expectation_pipeline", "should_run_llm_intent_pipeline", "should_run_llm_rank_pipeline", "deterministic_rank_confident", "hybrid_ranking_ambiguous", ]