""" Planungs-KI Phase C3/E/F: Pfad-Vorschläge für Progressionsgraphen. Legacy: retrieval-first. Phase F: optional Roadmap-Preview (A→B→C) parallel — siehe planning_progression_roadmap.py und PLANNING_PROGRESSION_ROADMAP_SPEC.md. """ from __future__ import annotations from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Set, Tuple from fastapi import HTTPException from pydantic import BaseModel, Field from tenant_context import ( TenantContext, library_content_visibility_for_progression_graph_sql, library_content_visibility_sql, ) from planning_exercise_profiles import PlanningTargetProfile from planning_path_qa_pipeline import run_multistage_path_qa from planning_path_rematch import ( collect_rematch_slot_indices, prune_stripped_after_rematch, rematch_roadmap_slots, ) from planning_path_refine_stage import apply_stage_spec_refinements, collect_refine_stage_targets from planning_stage_context import build_contextualized_stage_goal, resolve_path_start_target from planning_exercise_path_qa import ( apply_llm_path_reorder, build_path_qa_summary, detect_off_topic_steps, detect_path_gaps, insert_bridge_exercises, parse_llm_suggested_new_exercises, strip_off_topic_steps_from_path, try_llm_qa_progression_path, ) from planning_exercise_path_ai_fill import ( apply_gap_fill_after_qa, build_gap_fill_offer, collect_gap_fill_specs, ) from planning_exercise_retrieval import run_multistage_planning_retrieval from planning_exercise_semantics import ( PlanningSemanticBrief, apply_path_retrieval_weights, apply_stage_match_retrieval_weights, brief_to_summary_dict, build_semantic_brief, build_stage_match_brief, enrich_brief_with_path_constraints, enrich_target_with_semantic_expectations, resolve_path_anti_patterns, resolve_path_primary_topic, exercise_passes_path_semantic_gate, exercise_passes_stage_fit, exercise_title_matches_peer_stage_goal, pick_best_path_hit, resolve_semantic_skill_weights, step_phase_for_index, step_retrieval_query, try_enrich_semantic_brief_with_llm, ) from planning_exercise_target_pipeline import build_planning_target_with_query_pipeline from planning_exercise_progression import apply_progression_context_to_pack from planning_exercise_suggest import ( _enrich_planning_hits_with_variant_meta, _load_skill_ids_for_exercise, _normalize_query, resolve_planning_exercise_intent, ) from planning_exercise_form_context import build_progression_gap_snapshot from planning_skill_expectations import ( apply_expectations_to_target, build_planning_skill_expectations, expectation_input_from_progression_path, expectation_input_from_progression_stage, ) from planning_progression_roadmap import ( MajorStep, ProgressionRoadmapContext, RoadmapOverridePayload, RoadmapStructuredInput, StageSpecArtifact, build_roadmap_unfilled_gap_specs, progression_roadmap_to_api_dict, resolve_step_exercise_kind_filter, roadmap_context_from_override, run_progression_roadmap_pipeline, run_start_target_resolve_only, stage_spec_retrieval_query, ) from routers.training_planning import _has_planning_role class EvaluateStepPayload(BaseModel): exercise_id: Optional[int] = Field(default=None, ge=1) variant_id: Optional[int] = Field(default=None, ge=1) title: Optional[str] = Field(default=None, max_length=500) is_ai_proposal: bool = False ai_suggestion: Optional[Dict[str, Any]] = None proposal_key: Optional[str] = Field(default=None, max_length=120) roadmap_major_step_index: Optional[int] = Field(default=None, ge=0, le=20) roadmap_phase: Optional[str] = Field(default=None, max_length=80) roadmap_learning_goal: Optional[str] = Field(default=None, max_length=2000) class ProgressionPathSuggestRequest(BaseModel): query: str = Field(..., min_length=3, max_length=2000) max_steps: int = Field(default=5, ge=2, le=10) include_llm_intent: bool = True include_path_qa: bool = True auto_rematch_after_qa: bool = True auto_refine_stage_spec: bool = True max_rematch_rounds: int = Field(default=3, ge=0, le=4) include_llm_path_qa: bool = True include_path_reorder: bool = True include_ai_gap_fill: bool = True include_roadmap_preview: bool = False include_llm_roadmap: bool = True include_llm_start_target: bool = True roadmap_first: bool = False roadmap_only: bool = False start_target_only: bool = False evaluate_only: bool = False evaluate_steps: Optional[List[EvaluateStepPayload]] = None slot_assignments: Optional[List[EvaluateStepPayload]] = None preserve_slot_assignments: bool = False retrieval_boost_exercise_ids: Optional[List[int]] = None roadmap_override: Optional[RoadmapOverridePayload] = None start_situation: Optional[str] = Field(default=None, max_length=2000) target_state: Optional[str] = Field(default=None, max_length=2000) roadmap_notes: Optional[str] = Field(default=None, max_length=2000) progression_graph_id: Optional[int] = Field(default=None, ge=1) exercise_kind_any: Optional[List[str]] = None def _roadmap_gap_snapshot_for_spec( cur, roadmap_ctx: Optional[ProgressionRoadmapContext], spec: Mapping[str, Any], *, goal_query: str, semantic_brief: PlanningSemanticBrief, ) -> Dict[str, Any]: """Roadmap-Kontext für KI-Lücken-Übung (Start, Ziel, Stufenspec, Fähigkeiten).""" major_idx = spec.get("roadmap_major_step_index") stage_spec_dict: Optional[Dict[str, Any]] = None major_dict: Optional[Dict[str, Any]] = None if roadmap_ctx and major_idx is not None: for s in roadmap_ctx.stage_specs or []: if int(s.major_step_index) == int(major_idx): stage_spec_dict = s.model_dump() if roadmap_ctx.roadmap: for m in roadmap_ctx.roadmap.major_steps: if m.index == int(major_idx): stage_spec_dict["phase"] = m.phase major_dict = m.model_dump() break break ga = roadmap_ctx.goal_analysis.model_dump() if roadmap_ctx and roadmap_ctx.goal_analysis else None rs = ( roadmap_ctx.resolved_structured.model_dump() if roadmap_ctx and roadmap_ctx.resolved_structured else None ) brief_summary = ( roadmap_ctx.semantic_brief if roadmap_ctx and roadmap_ctx.semantic_brief else brief_to_summary_dict(semantic_brief) ) snap = build_progression_gap_snapshot( goal_analysis=ga, resolved_structured=rs, stage_spec=stage_spec_dict, semantic_brief=brief_summary, ) inp = expectation_input_from_progression_stage( goal_query=goal_query, goal_analysis=ga, resolved_structured=rs, stage_spec=stage_spec_dict, semantic_brief_summary=brief_summary, major_step=major_dict, ) exp = build_planning_skill_expectations(cur, inp, semantic_brief=semantic_brief) if exp.items: snap["expected_skills"] = exp.to_api_dict()["expected_skills"] snap["skill_expectation_sources"] = exp.sources return snap def _roadmap_structured_from_body(body: ProgressionPathSuggestRequest) -> Optional[RoadmapStructuredInput]: start = (body.start_situation or "").strip() or None target = (body.target_state or "").strip() or None notes = (body.roadmap_notes or "").strip() or None if not any([start, target, notes]): return None return RoadmapStructuredInput( start_situation=start, target_state=target, roadmap_notes=notes, ) def _peer_stage_learning_goals( roadmap_ctx: ProgressionRoadmapContext, *, current_major_index: int, ) -> List[str]: goals: List[str] = [] for spec in roadmap_ctx.stage_specs or []: if int(spec.major_step_index) == int(current_major_index): continue lg = (spec.learning_goal or "").strip() if lg and lg not in goals: goals.append(lg) return goals def _filter_learning_goal_candidate_ids( cur, *, tenant: TenantContext, progression_graph_id: Optional[int], candidate_ids: Sequence[int], stage_goal: str, stage_match_brief: PlanningSemanticBrief, stage_anti: Optional[List[str]], path_primary: str, path_tech_excludes: Optional[List[str]], peer_learning_goals: Sequence[str], ) -> List[int]: """Learning-Goal-Kandidaten nur, wenn sie Stufen-Gate und Peer-Check bestehen.""" if not candidate_ids: return [] vis_sql, vis_params = _planning_visibility_sql(cur, tenant, progression_graph_id) rows = _load_supplemental_exercise_rows( cur, tenant=tenant, progression_graph_id=progression_graph_id, exercise_ids=list(candidate_ids), vis_sql=vis_sql, vis_params=vis_params, ) out: List[int] = [] for row in rows: try: eid = int(row.get("id") or 0) except (TypeError, ValueError): continue if eid <= 0: continue title = str(row.get("title") or "") if peer_learning_goals and exercise_title_matches_peer_stage_goal( title, current_learning_goal=stage_goal, peer_learning_goals=peer_learning_goals, ): continue summary = str(row.get("summary") or "") goal_text = str(row.get("goal") or row.get("exercise_goal") or "") if exercise_passes_stage_fit( learning_goal=stage_goal, title=title, summary=summary, goal=goal_text, stage_brief=stage_match_brief, anti_patterns=stage_anti, path_primary_topic=path_primary or None, path_technique_excludes=path_tech_excludes, relaxed=False, ): out.append(eid) return out def _pick_best_path_hit( hits: List[Dict[str, Any]], used_exercise_ids: Set[int], *, semantic_brief: Optional[PlanningSemanticBrief] = None, stage_learning_goal: Optional[str] = None, stage_anti_patterns: Optional[List[str]] = None, roadmap_stage_match: bool = False, stage_match_brief: Optional[PlanningSemanticBrief] = None, path_primary_topic: Optional[str] = None, path_technique_excludes: Optional[List[str]] = None, peer_learning_goals: Optional[List[str]] = None, ) -> Optional[Dict[str, Any]]: return pick_best_path_hit( hits, used_exercise_ids, semantic_brief=semantic_brief, stage_learning_goal=stage_learning_goal, stage_anti_patterns=stage_anti_patterns, roadmap_stage_match=roadmap_stage_match, stage_match_brief=stage_match_brief, path_primary_topic=path_primary_topic, path_technique_excludes=path_technique_excludes, peer_learning_goals=peer_learning_goals, ) def _build_path_target_profile( cur, *, goal_query: str, semantic_brief: PlanningSemanticBrief, include_llm_intent: bool, ) -> Tuple[PlanningTargetProfile, Dict[str, Any], str]: """Einmaliges Erwartungsprofil für den gesamten Pfad (Query + Semantik + Skills).""" empty_unit = { "id": None, "framework_slot_id": None, "origin_framework_slot_id": None, } pipeline_context = { "unit_title": None, "group_name": None, "section_title": None, "section_guidance_notes": goal_query, "section_exercise_count": 0, "planned_count": 0, "anchor_title": None, "anchor_exercise_id": None, "last_section_exercise_title": None, "progression_graph_id": None, "unit_skill_profile": None, "section_skill_profile": None, "has_planning_reference": False, "expectation_mode": "query_only", } target, intent, _scenario, query_intent_summary = build_planning_target_with_query_pipeline( cur, unit=empty_unit, planned_exercise_ids=[], section_planned_exercise_ids=[], anchor_exercise_id=None, query=goal_query, heuristic_intent=resolve_planning_exercise_intent(goal_query, "free_search"), include_llm_intent=include_llm_intent, context_summary=pipeline_context, has_planning_reference=False, ) skill_weights = resolve_semantic_skill_weights(cur, semantic_brief) target = enrich_target_with_semantic_expectations(target, skill_weights=skill_weights) return target, query_intent_summary, intent def _graph_edge_exercise_ids(cur, graph_id: Optional[int]) -> List[int]: """Übungs-IDs aus gespeicherten Graph-Kanten (für Re-Match-Boost).""" if not graph_id or int(graph_id) < 1: return [] cur.execute( """ SELECT from_exercise_id AS eid FROM exercise_progression_edges WHERE graph_id = %s AND from_exercise_id IS NOT NULL UNION SELECT to_exercise_id AS eid FROM exercise_progression_edges WHERE graph_id = %s AND to_exercise_id IS NOT NULL """, (int(graph_id), int(graph_id)), ) out: List[int] = [] for row in cur.fetchall() or []: try: eid = int(row.get("eid") or 0) except (TypeError, ValueError): continue if eid > 0: out.append(eid) return out def _supplemental_exercise_ids_from_body( cur, body: ProgressionPathSuggestRequest, ) -> List[int]: """Kandidatenpool erweitern (Graph-Kanten, Boost, Slot-Zuordnungen).""" ids: List[int] = [] for raw in body.evaluate_steps or []: if raw.exercise_id is not None: try: eid = int(raw.exercise_id) except (TypeError, ValueError): continue if eid > 0: ids.append(eid) for raw in body.slot_assignments or []: if raw.exercise_id is not None: try: eid = int(raw.exercise_id) except (TypeError, ValueError): continue if eid > 0: ids.append(eid) for eid in body.retrieval_boost_exercise_ids or []: try: val = int(eid) except (TypeError, ValueError): continue if val > 0: ids.append(val) ids.extend(_graph_edge_exercise_ids(cur, body.progression_graph_id)) return list(dict.fromkeys(ids)) def _graph_visibility_context( cur, progression_graph_id: Optional[int], ) -> Tuple[str, Optional[int]]: if not progression_graph_id or int(progression_graph_id) < 1: return "private", None cur.execute( "SELECT visibility, club_id FROM exercise_progression_graphs WHERE id = %s", (int(progression_graph_id),), ) row = cur.fetchone() if not row: return "private", None g_club = row.get("club_id") return ( str(row.get("visibility") or "private"), int(g_club) if g_club is not None else None, ) def _safe_tsquery_fragment(text: str) -> str: import re cleaned = re.sub(r"[^\w\säöüßÄÖÜ]", " ", text or "", flags=re.UNICODE) words = [w for w in cleaned.split() if len(w) >= 2][:10] return " ".join(words) if words else (text or "")[:60].strip() def _fetch_learning_goal_library_candidate_ids( cur, *, tenant: TenantContext, progression_graph_id: Optional[int], learning_goal: str, limit: int = 24, ) -> List[int]: """Sichtbare Übungen mit exakt passendem Titel oder Volltext-Treffer (kein breites LIKE).""" lg = (learning_goal or "").strip() if len(lg) < 3: return [] vis_sql, vis_params = _planning_visibility_sql(cur, tenant, progression_graph_id) tsq = _safe_tsquery_fragment(lg) try: cur.execute( f""" SELECT e.id FROM exercises e WHERE ({vis_sql}) AND COALESCE(e.status, '') <> %s AND ( lower(trim(e.title)) = lower(trim(%s)) OR (%s <> '' AND e.search_vector @@ plainto_tsquery('german', %s)) ) ORDER BY CASE WHEN lower(trim(e.title)) = lower(trim(%s)) THEN 0 ELSE 1 END, CASE WHEN %s <> '' THEN ts_rank_cd(e.search_vector, plainto_tsquery('german', %s)) ELSE 0 END DESC, e.id ASC LIMIT %s """, [ *vis_params, "archived", lg, tsq, tsq, lg, tsq, tsq, int(limit), ], ) except Exception: cur.execute( f""" SELECT e.id FROM exercises e WHERE ({vis_sql}) AND COALESCE(e.status, '') <> %s AND lower(trim(e.title)) = lower(trim(%s)) ORDER BY e.id ASC LIMIT %s """, [*vis_params, "archived", lg, int(limit)], ) out: List[int] = [] for row in cur.fetchall() or []: try: eid = int(row.get("id") or 0) except (TypeError, ValueError): continue if eid > 0: out.append(eid) return out def _load_supplemental_exercise_rows( cur, *, tenant: TenantContext, progression_graph_id: Optional[int], exercise_ids: Optional[Sequence[int]], vis_sql: str, vis_params: Sequence[Any], ) -> List[Dict[str, Any]]: """Supplemental-Übungen mit Graph-Sichtbarkeit, Fallback Library-vis_sql.""" ids: List[int] = [] for raw in exercise_ids or []: if raw is None: continue try: eid = int(raw) except (TypeError, ValueError): continue if eid > 0: ids.append(eid) ids = list(dict.fromkeys(ids)) if not ids: return [] if progression_graph_id and int(progression_graph_id) > 0: from planning_exercise_retrieval import fetch_exercise_rows_by_ids_for_graph gvis, gclub = _graph_visibility_context(cur, progression_graph_id) graph_rows = fetch_exercise_rows_by_ids_for_graph( cur, ids, graph_visibility=gvis, graph_club_id=gclub, profile_id=tenant.profile_id, role=tenant.global_role, exercise_allowed_fn=_exercise_allowed_in_progression_graph, ) if graph_rows: return graph_rows from planning_exercise_retrieval import fetch_exercise_rows_by_ids return fetch_exercise_rows_by_ids( cur, ids, vis_sql=vis_sql, vis_params=vis_params, ) def _planning_visibility_sql( cur, tenant: TenantContext, progression_graph_id: Optional[int], ) -> Tuple[str, List[Any]]: if progression_graph_id and int(progression_graph_id) > 0: cur.execute( "SELECT visibility, club_id FROM exercise_progression_graphs WHERE id = %s", (int(progression_graph_id),), ) grow = cur.fetchone() if grow: g_club = grow.get("club_id") return library_content_visibility_for_progression_graph_sql( alias="e", profile_id=tenant.profile_id, role=tenant.global_role, effective_club_id=tenant.effective_club_id, graph_visibility=str(grow.get("visibility") or "private"), graph_club_id=int(g_club) if g_club is not None else None, ) return library_content_visibility_sql( alias="e", profile_id=tenant.profile_id, role=tenant.global_role, effective_club_id=tenant.effective_club_id, ) def _exercise_allowed_in_progression_graph( exercise_row: Mapping[str, Any], *, graph_visibility: str, graph_club_id: Optional[int], profile_id: int, role: str, ) -> bool: """Prüft, ob eine Übung zur Sichtbarkeit des Progressionsgraphen passt.""" from club_tenancy import is_platform_admin ex_vis = (exercise_row.get("visibility") or "private").strip().lower() gvis = (graph_visibility or "private").strip().lower() if gvis == "private": if ex_vis == "official": return True if ex_vis == "club": return True if ex_vis == "private": if is_platform_admin(role): return True try: return int(exercise_row.get("created_by") or 0) == int(profile_id) except (TypeError, ValueError): return False return False if gvis == "club": if ex_vis == "official": return True if ex_vis != "club": return False ex_club = exercise_row.get("club_id") if ex_club is None: return False if graph_club_id is None: return True return int(ex_club) == int(graph_club_id) return ex_vis == "official" def _slot_assignments_by_major_index( assignments: Optional[List[EvaluateStepPayload]], ) -> Dict[int, EvaluateStepPayload]: out: Dict[int, EvaluateStepPayload] = {} for raw in assignments or []: if raw.exercise_id is None or raw.roadmap_major_step_index is None: continue out[int(raw.roadmap_major_step_index)] = raw return out def _path_step_from_slot_assignment( cur, *, assignment: EvaluateStepPayload, stage_spec: StageSpecArtifact, major_step: Optional[MajorStep], tenant: Optional[TenantContext] = None, progression_graph_id: Optional[int] = None, ) -> Optional[Dict[str, Any]]: """Bestehende Slot-Zuordnung aus dem Graph-Editor (nach KI-Anlage) übernehmen.""" eid = int(assignment.exercise_id) cur.execute( "SELECT id, title, summary, visibility, club_id, created_by FROM exercises WHERE id = %s", (eid,), ) row = cur.fetchone() if not row: return None if tenant and progression_graph_id: cur.execute( "SELECT visibility, club_id FROM exercise_progression_graphs WHERE id = %s", (int(progression_graph_id),), ) grow = cur.fetchone() if grow and not _exercise_allowed_in_progression_graph( row, graph_visibility=str(grow.get("visibility") or "private"), graph_club_id=int(grow["club_id"]) if grow.get("club_id") is not None else None, profile_id=tenant.profile_id, role=tenant.global_role, ): return None title = (assignment.title or row.get("title") or "").strip() or str(row.get("title") or "") step = { "exercise_id": eid, "variant_id": assignment.variant_id, "title": title, "summary": row.get("summary"), "score": None, "semantic_score": None, "reasons": ["Bestehende Slot-Zuordnung (Graph-Editor)"], "variants": [], "slot_assignment": True, } return _annotate_roadmap_step( step, stage_spec=stage_spec, major_step=major_step, ) def _hit_to_path_step(hit: Dict[str, Any], *, is_bridge: bool = False) -> Dict[str, Any]: raw_vid = hit.get("suggested_variant_id") variant_id: Optional[int] = None if raw_vid is not None: try: vid = int(raw_vid) if vid > 0: variant_id = vid except (TypeError, ValueError): variant_id = None step = { "exercise_id": int(hit["id"]), "variant_id": variant_id, "title": hit.get("title"), "summary": hit.get("summary"), "score": hit.get("score"), "semantic_score": hit.get("semantic_score"), "reasons": list(hit.get("reasons") or []), "variants": hit.get("variants") or [], "suggested_variant_id": hit.get("suggested_variant_id"), "suggested_variant_name": hit.get("suggested_variant_name"), } if is_bridge: step["is_bridge"] = True return step def _run_path_step_retrieval( cur, *, tenant: TenantContext, goal_query: str, step_index: int, max_steps: int, planned_ids: List[int], anchor_id: Optional[int], anchor_variant_id: Optional[int], progression_graph_id: Optional[int], include_llm_intent: bool, exercise_kind_any: Optional[List[str]], semantic_brief: PlanningSemanticBrief, bridge_mode: bool = False, step_a: Optional[Dict[str, Any]] = None, step_b: Optional[Dict[str, Any]] = None, path_target_profile: Optional[PlanningTargetProfile] = None, path_intent: Optional[str] = None, step_query_override: Optional[str] = None, step_phase_override: Optional[str] = None, step_target_profile_override: Optional[PlanningTargetProfile] = None, stage_learning_goal: Optional[str] = None, stage_anti_patterns: Optional[List[str]] = None, stage_match_brief: Optional[PlanningSemanticBrief] = None, stage_success_criteria: Optional[List[str]] = None, stage_load_profile: Optional[List[str]] = None, path_context_note: Optional[str] = None, path_primary_topic: Optional[str] = None, path_technique_excludes: Optional[List[str]] = None, supplemental_exercise_ids: Optional[List[int]] = None, priority_exercise_ids: Optional[List[int]] = None, ) -> Tuple[List[Dict[str, Any]], PlanningTargetProfile, Dict[str, Any], str]: step_query = step_query_override or step_retrieval_query( semantic_brief, goal_query, step_index, max_steps ) if bridge_mode and step_a and step_b: phase = step_phase_for_index(semantic_brief, step_index, max_steps) parts = [semantic_brief.primary_topic or semantic_brief.retrieval_query or goal_query] if phase: parts.append(phase) step_query = _normalize_query(" ".join(p for p in parts if p) + " brücke") pack: Dict[str, Any] = { "unit_id": None, "unit": { "id": None, "framework_slot_id": None, "origin_framework_slot_id": None, }, "unit_title": None, "group_id": None, "group_name": None, "section_order_index": None, "section_title": None, "section_guidance_notes": goal_query if step_index == 0 and not bridge_mode else step_query, "planned_exercise_ids": list(planned_ids), "anchor_exercise_id": anchor_id, "anchor_title": None, "anchor_skill_ids": sorted(_load_skill_ids_for_exercise(cur, anchor_id)), "group_recent_exercise_ids": [], "context_mode": "progression_path", "has_planning_reference": bool(planned_ids or anchor_id or bridge_mode), "semantic_brief": semantic_brief, "retrieval_query": step_query, "path_step_phase": step_phase_override or step_phase_for_index(semantic_brief, step_index, max_steps), "stage_learning_goal": (stage_learning_goal or "").strip() or None, "stage_anti_patterns": list(stage_anti_patterns or []), "roadmap_stage_match": bool((stage_learning_goal or "").strip()), "stage_match_brief": stage_match_brief, "stage_success_criteria": list(stage_success_criteria or []), "stage_load_profile": list(stage_load_profile or []), "path_context_note": (path_context_note or "").strip() or None, "path_primary_topic": (path_primary_topic or "").strip() or None, "path_technique_excludes": list(path_technique_excludes or []), } pack = apply_progression_context_to_pack( cur, tenant, pack, explicit_graph_id=progression_graph_id, anchor_variant_id=anchor_variant_id, ) if step_index == 0 and not bridge_mode: heuristic_intent = resolve_planning_exercise_intent(goal_query, "free_search") else: heuristic_intent = path_intent or resolve_planning_exercise_intent(goal_query, "free_search") has_plan_ref = bool(pack.get("has_planning_reference")) pipeline_context = { "unit_title": None, "group_name": None, "section_title": pack.get("section_title"), "section_guidance_notes": pack.get("section_guidance_notes"), "section_exercise_count": len(planned_ids), "planned_count": len(planned_ids), "anchor_title": pack.get("anchor_title"), "anchor_exercise_id": pack.get("anchor_exercise_id"), "last_section_exercise_title": None, "progression_graph_id": pack.get("progression_graph_id"), "unit_skill_profile": None, "section_skill_profile": None, "has_planning_reference": has_plan_ref, "expectation_mode": "query_only" if step_index == 0 and not planned_ids else "planning_hybrid", } if step_target_profile_override is not None: target_profile = step_target_profile_override intent = path_intent or resolve_planning_exercise_intent(goal_query, "free_search") query_intent_summary = {} elif path_target_profile is not None: target_profile = path_target_profile intent = path_intent or resolve_planning_exercise_intent(goal_query, "free_search") query_intent_summary = {} else: target_profile, intent, _scenario, query_intent_summary = build_planning_target_with_query_pipeline( cur, unit=pack["unit"], planned_exercise_ids=pack["planned_exercise_ids"], section_planned_exercise_ids=[], anchor_exercise_id=pack.get("anchor_exercise_id"), query=goal_query if step_index == 0 and not bridge_mode else step_query, heuristic_intent=heuristic_intent, include_llm_intent=include_llm_intent and step_index == 0 and not bridge_mode, context_summary=pipeline_context, has_planning_reference=has_plan_ref, ) if pack.get("roadmap_stage_match"): weights = apply_stage_match_retrieval_weights(semantic_brief) else: weights = apply_path_retrieval_weights(semantic_brief) vis_sql, vis_params = _planning_visibility_sql( cur, tenant, progression_graph_id, ) supplemental_rows = _load_supplemental_exercise_rows( cur, tenant=tenant, progression_graph_id=progression_graph_id, exercise_ids=supplemental_exercise_ids, vis_sql=vis_sql, vis_params=vis_params, ) hits, _skills_by_ex, _full_lib = run_multistage_planning_retrieval( cur, vis_sql=vis_sql, vis_params=vis_params, query=step_query, exercise_kind_any=exercise_kind_any, target=target_profile, intent=intent, intent_weights=weights, pack=pack, supplemental_rows_preloaded=supplemental_rows, ) from planning_exercise_retrieval import trim_hits_preserving_priority_ids priority_ids = list( dict.fromkeys( int(x) for x in (priority_exercise_ids or supplemental_exercise_ids or []) if int(x) > 0 ) ) hits = trim_hits_preserving_priority_ids(hits, priority_ids, limit=48) hits = _enrich_planning_hits_with_variant_meta(cur, hits) return hits, target_profile, query_intent_summary, intent def _make_bridge_search_fn( cur, *, tenant: TenantContext, goal_query: str, max_steps: int, progression_graph_id: Optional[int], include_llm_intent: bool, exercise_kind_any: Optional[List[str]], semantic_brief: PlanningSemanticBrief, planned_ids: List[int], path_target_profile: PlanningTargetProfile, path_intent: str, supplemental_exercise_ids: Optional[List[int]] = None, ) -> Callable[..., List[Dict[str, Any]]]: def _bridge_search( step_a: Dict[str, Any], step_b: Dict[str, Any], _gap: Dict[str, Any], ) -> List[Dict[str, Any]]: hits, _, _, _ = _run_path_step_retrieval( cur, tenant=tenant, goal_query=goal_query, step_index=1, max_steps=max_steps, planned_ids=list(planned_ids) + [int(step_a["exercise_id"])], anchor_id=int(step_a["exercise_id"]), anchor_variant_id=step_a.get("variant_id"), progression_graph_id=progression_graph_id, include_llm_intent=include_llm_intent, exercise_kind_any=exercise_kind_any, semantic_brief=semantic_brief, bridge_mode=True, step_a=step_a, step_b=step_b, path_target_profile=path_target_profile, supplemental_exercise_ids=supplemental_exercise_ids, path_intent=path_intent, ) gated = [ h for h in hits if exercise_passes_path_semantic_gate( semantic_score=float(h.get("semantic_score") or 0.0), title=str(h.get("title") or ""), summary=str(h.get("summary") or ""), brief=semantic_brief, strict=False, ) ] return gated or hits[:12] return _bridge_search def _annotate_roadmap_step( step: Dict[str, Any], *, stage_spec: StageSpecArtifact, major_step: Optional[MajorStep], skill_expectations: Optional[Dict[str, Any]] = None, anti_patterns_override: Optional[List[str]] = None, ) -> Dict[str, Any]: reasons = list(step.get("reasons") or []) learning_goal = (stage_spec.learning_goal or "").strip() if learning_goal: roadmap_reason = f"Roadmap: {learning_goal[:120]}" if roadmap_reason not in reasons: reasons.insert(0, roadmap_reason) if skill_expectations and skill_expectations.get("expected_skills"): names = [ str(s.get("skill_name") or "").strip() for s in skill_expectations["expected_skills"][:3] if str(s.get("skill_name") or "").strip() ] if names: skill_reason = f"Fähigkeiten: {', '.join(names)}" if skill_reason not in reasons: reasons.append(skill_reason) step["reasons"] = reasons[:4] step["roadmap_major_step_index"] = stage_spec.major_step_index step["roadmap_phase"] = major_step.phase if major_step else None step["roadmap_learning_goal"] = learning_goal or None anti = list(anti_patterns_override or stage_spec.anti_patterns or []) if anti: step["roadmap_anti_patterns"] = anti if (stage_spec.start_state or "").strip(): step["roadmap_start_state"] = stage_spec.start_state.strip() if (stage_spec.target_state or "").strip(): step["roadmap_target_state"] = stage_spec.target_state.strip() if stage_spec.success_criteria: step["success_criteria"] = list(stage_spec.success_criteria) step["stage_success_criteria"] = list(stage_spec.success_criteria) if not step.get("roadmap_match_source"): step["roadmap_match_source"] = "stage_spec" if step.get("exercise_id") is not None: step["slot_status"] = step.get("slot_status") or ( "preserved" if step.get("roadmap_match_source") == "slot_best_match" else "matched" ) else: step["slot_status"] = step.get("slot_status") or "unfilled" if skill_expectations: step["skill_expectations"] = skill_expectations return step def _stage_validation_context_for_spec( cur, *, body: ProgressionPathSuggestRequest, goal_query: str, semantic_brief: PlanningSemanticBrief, path_target_profile: PlanningTargetProfile, roadmap_ctx: ProgressionRoadmapContext, stage_spec: StageSpecArtifact, step_index: int, stage_count: int, major: Optional[MajorStep], ) -> Dict[str, Any]: """Gemeinsamer Kontext für Reconcile + Match eines Roadmap-Slots.""" ga_dump = ( roadmap_ctx.goal_analysis.model_dump() if roadmap_ctx.goal_analysis else None ) rs_dump = ( roadmap_ctx.resolved_structured.model_dump() if roadmap_ctx.resolved_structured else None ) path_start, path_target = resolve_path_start_target( structured=roadmap_ctx.resolved_structured, goal_analysis=roadmap_ctx.goal_analysis, ) stage_goal = (stage_spec.learning_goal or "").strip() stage_start = (stage_spec.start_state or "").strip() stage_target = (stage_spec.target_state or "").strip() contextual_goal = build_contextualized_stage_goal( learning_goal=stage_goal, start_state=stage_start, target_state=stage_target, path_target_state=path_target, path_start_state=path_start, stage_index=step_index, stage_count=stage_count, ) path_context_note = None if rs_dump: ctx_parts = [ str(rs_dump.get("start_situation") or "").strip()[:120], str(rs_dump.get("target_state") or "").strip()[:120], str(rs_dump.get("roadmap_notes") or "").strip()[:120], ] path_context_note = " ".join(p for p in ctx_parts if p)[:240] or None path_anti = resolve_path_anti_patterns( goal_query, semantic_brief=semantic_brief, extra_context=path_context_note, ) stage_anti = list(dict.fromkeys([*(stage_spec.anti_patterns or []), *path_anti])) path_primary = ( resolve_path_primary_topic( goal_query, semantic_brief, stage_learning_goal=stage_goal, extra_context=path_context_note, ) or "" ).strip() path_tech_excludes = list(semantic_brief.exclude_phrases or []) if path_primary: from planning_exercise_semantics import technique_sibling_excludes for item in technique_sibling_excludes(path_primary): if item not in path_tech_excludes: path_tech_excludes.append(item) stage_match_brief = build_stage_match_brief( learning_goal=stage_goal, anti_patterns=stage_anti, success_criteria=list(stage_spec.success_criteria or []), load_profile=list(stage_spec.load_profile or []), phase=major.phase if major else None, path_context_note=path_context_note, path_anti_patterns=path_anti, path_primary_topic=path_primary or None, path_technique_excludes=path_tech_excludes or None, stage_start_state=stage_start or None, stage_target_state=stage_target or None, path_target_state=path_target or None, contextualized_learning_goal=contextual_goal or None, ) return { "stage_goal": stage_goal, "stage_anti": stage_anti, "path_primary": path_primary, "path_tech_excludes": path_tech_excludes, "stage_match_brief": stage_match_brief, "path_context_note": path_context_note, "path_anti": path_anti, "path_start": path_start, "path_target": path_target, "ga_dump": ga_dump, "rs_dump": rs_dump, } def _match_roadmap_slot( cur, *, tenant: TenantContext, body: ProgressionPathSuggestRequest, goal_query: str, max_steps: int, semantic_brief: PlanningSemanticBrief, path_target_profile: PlanningTargetProfile, path_intent: str, roadmap_ctx: ProgressionRoadmapContext, stage_spec: StageSpecArtifact, step_index: int, stage_count: int, planned_ids: List[int], anchor_id: Optional[int], anchor_variant_id: Optional[int], used: Set[int], slot_priority_exercise_id: Optional[int] = None, ) -> Tuple[Optional[Dict[str, Any]], Optional[StageSpecArtifact]]: """Einzelnen Roadmap-Slot matchen (Initial-Build und Auto-Rematch).""" major_by_index: Dict[int, MajorStep] = {} if roadmap_ctx.roadmap: major_by_index = {m.index: m for m in roadmap_ctx.roadmap.major_steps} major = major_by_index.get(stage_spec.major_step_index) ctx = _stage_validation_context_for_spec( cur, body=body, goal_query=goal_query, semantic_brief=semantic_brief, path_target_profile=path_target_profile, roadmap_ctx=roadmap_ctx, stage_spec=stage_spec, step_index=step_index, stage_count=stage_count, major=major, ) stage_goal = ctx["stage_goal"] stage_anti = ctx["stage_anti"] path_primary = ctx["path_primary"] path_tech_excludes = ctx["path_tech_excludes"] stage_match_brief = ctx["stage_match_brief"] path_context_note = ctx["path_context_note"] ga_dump = ctx["ga_dump"] rs_dump = ctx["rs_dump"] brief_summary = ( roadmap_ctx.semantic_brief if roadmap_ctx.semantic_brief else brief_to_summary_dict(semantic_brief) ) stage_spec_dict = stage_spec.model_dump() if major: stage_spec_dict["phase"] = major.phase stage_inp = expectation_input_from_progression_stage( goal_query=goal_query, goal_analysis=ga_dump, resolved_structured=rs_dump, stage_spec=stage_spec_dict, semantic_brief_summary=brief_summary, major_step=major.model_dump() if major else None, ) stage_exp = build_planning_skill_expectations(cur, stage_inp, semantic_brief=semantic_brief) step_target = apply_expectations_to_target(path_target_profile, stage_exp) skill_exp_api = stage_exp.to_api_dict() if stage_exp.items else None step_query = stage_spec_retrieval_query( semantic_brief=semantic_brief, goal_query=goal_query, stage_spec=stage_spec, major_step=major, ) step_kind = resolve_step_exercise_kind_filter(stage_spec, body.exercise_kind_any) peer_goals = _peer_stage_learning_goals( roadmap_ctx, current_major_index=int(stage_spec.major_step_index), ) supplemental_ids = _supplemental_exercise_ids_from_body(cur, body) lg_candidates_raw = _fetch_learning_goal_library_candidate_ids( cur, tenant=tenant, progression_graph_id=body.progression_graph_id, learning_goal=stage_goal, ) lg_candidates = _filter_learning_goal_candidate_ids( cur, tenant=tenant, progression_graph_id=body.progression_graph_id, candidate_ids=lg_candidates_raw, stage_goal=stage_goal, stage_match_brief=stage_match_brief, stage_anti=stage_anti, path_primary=path_primary, path_tech_excludes=path_tech_excludes, peer_learning_goals=peer_goals, ) supplemental_ids = list( dict.fromkeys( int(x) for x in [ *supplemental_ids, *lg_candidates, slot_priority_exercise_id, ] if x is not None and int(x) > 0 ) ) priority_ids = list( dict.fromkeys( int(x) for x in [ slot_priority_exercise_id, *(body.retrieval_boost_exercise_ids or []), *lg_candidates[:8], ] if x is not None and int(x) > 0 ) ) hits, _, _, _ = _run_path_step_retrieval( cur, tenant=tenant, goal_query=goal_query, step_index=step_index, max_steps=max_steps, planned_ids=planned_ids, anchor_id=anchor_id, anchor_variant_id=anchor_variant_id, progression_graph_id=body.progression_graph_id, include_llm_intent=body.include_llm_intent and step_index == 0, exercise_kind_any=step_kind, semantic_brief=stage_match_brief, path_target_profile=path_target_profile, path_intent=path_intent, step_query_override=step_query, step_phase_override=major.phase if major else None, step_target_profile_override=step_target, stage_learning_goal=stage_goal or None, stage_anti_patterns=stage_anti or None, stage_match_brief=stage_match_brief, stage_success_criteria=list(stage_spec.success_criteria or []), stage_load_profile=list(stage_spec.load_profile or []), path_context_note=path_context_note, path_primary_topic=path_primary or None, path_technique_excludes=path_tech_excludes or None, supplemental_exercise_ids=supplemental_ids, priority_exercise_ids=priority_ids, ) hit = _pick_best_path_hit( hits, used, semantic_brief=stage_match_brief, stage_learning_goal=stage_goal or None, stage_anti_patterns=stage_anti or None, roadmap_stage_match=True, stage_match_brief=stage_match_brief, path_primary_topic=path_primary or None, path_technique_excludes=path_tech_excludes or None, peer_learning_goals=peer_goals, ) if not hit: return None, stage_spec step = _annotate_roadmap_step( _hit_to_path_step(hit), stage_spec=stage_spec, major_step=major, skill_expectations=skill_exp_api, anti_patterns_override=stage_anti, ) if ( slot_priority_exercise_id is not None and int(step["exercise_id"]) == int(slot_priority_exercise_id) ): step["slot_status"] = "preserved" step["roadmap_match_source"] = "slot_best_match" step["reasons"] = ["Bester Treffer (bestehende Zuordnung)"] + list(step.get("reasons") or [])[:2] else: step["slot_status"] = "matched" step["roadmap_match_source"] = "stage_spec" if step.get("roadmap_match_source") != "slot_best_match" and not _roadmap_step_passes_post_match_gate( cur, step, goal_query=goal_query, semantic_brief=semantic_brief, ): return None, stage_spec return step, None def _roadmap_step_passes_post_match_gate( cur, step: Dict[str, Any], *, goal_query: str, semantic_brief: PlanningSemanticBrief, ) -> bool: """Abgleich mit Pfad-QA — kein Rematch-Treffer, der sofort wieder stage_mismatch wäre.""" if step.get("exercise_id") is None: return False issues = detect_off_topic_steps( cur, [step], brief=semantic_brief, goal_query=goal_query, ) return not issues def _normalize_roadmap_steps_coverage( steps: List[Dict[str, Any]], *, roadmap_ctx: ProgressionRoadmapContext, max_steps: int, ) -> List[Dict[str, Any]]: """Ein Eintrag pro Roadmap-Major-Step — fehlende Slots als leere Platzhalter.""" stage_specs = list(roadmap_ctx.stage_specs or [])[:max_steps] if not stage_specs: return steps major_by_index: Dict[int, MajorStep] = {} if roadmap_ctx.roadmap: major_by_index = {m.index: m for m in roadmap_ctx.roadmap.major_steps} by_major: Dict[int, Dict[str, Any]] = {} for raw in steps: step = dict(raw) midx = step.get("roadmap_major_step_index") if midx is not None: by_major[int(midx)] = step out: List[Dict[str, Any]] = [] for spec in sorted(stage_specs, key=lambda s: s.major_step_index): midx = int(spec.major_step_index) if midx in by_major: out.append(by_major[midx]) continue major = major_by_index.get(midx) goal = (spec.learning_goal or "").strip() out.append( { "exercise_id": None, "variant_id": None, "title": goal or f"Slot {midx + 1}", "is_ai_proposal": False, "roadmap_major_step_index": midx, "roadmap_phase": major.phase if major else None, "roadmap_learning_goal": goal or None, "roadmap_match_source": "unfilled", "slot_status": "unfilled", "reasons": [], } ) return out def _purge_stage_mismatch_roadmap_slots( cur, *, steps: List[Dict[str, Any]], roadmap_ctx: ProgressionRoadmapContext, goal_query: str, semantic_brief: PlanningSemanticBrief, ) -> Tuple[List[Dict[str, Any]], List[Tuple[int, StageSpecArtifact]]]: """Leert Slots mit persistentem stage_mismatch — KI-Gap statt schlechter Bibliotheks-Übung.""" issues = detect_off_topic_steps( cur, steps, brief=semantic_brief, goal_query=goal_query, ) purge_majors: Set[int] = set() for item in issues: if str(item.get("issue") or "") != "stage_mismatch": continue midx = item.get("roadmap_major_step_index") if midx is None: continue try: purge_majors.add(int(midx)) except (TypeError, ValueError): continue if not purge_majors: return steps, [] stage_specs = list(roadmap_ctx.stage_specs or []) spec_by_major = {int(s.major_step_index): s for s in stage_specs} major_by_index: Dict[int, MajorStep] = {} if roadmap_ctx.roadmap: major_by_index = {m.index: m for m in roadmap_ctx.roadmap.major_steps} new_unfilled: List[Tuple[int, StageSpecArtifact]] = [] out: List[Dict[str, Any]] = [] for raw in steps: step = dict(raw) midx = step.get("roadmap_major_step_index") if midx is None or int(midx) not in purge_majors: out.append(step) continue major_idx = int(midx) spec = spec_by_major.get(major_idx) if spec is None: out.append(step) continue step_index = next( (i for i, sp in enumerate(stage_specs) if int(sp.major_step_index) == major_idx), major_idx, ) major = major_by_index.get(major_idx) goal = (spec.learning_goal or step.get("roadmap_learning_goal") or "").strip() out.append( { "exercise_id": None, "variant_id": None, "title": goal or f"Slot {major_idx + 1}", "is_ai_proposal": False, "roadmap_major_step_index": major_idx, "roadmap_phase": major.phase if major else step.get("roadmap_phase"), "roadmap_learning_goal": goal or None, "roadmap_match_source": "unfilled", "slot_status": "unfilled", "reasons": ["Keine passende Bibliotheks-Übung für Stufen-Lernziel"], } ) new_unfilled.append((step_index, spec)) return out, new_unfilled def _enrich_roadmap_unfilled_gap_offers( cur, *, steps: List[Dict[str, Any]], gap_fill_offers: List[Dict[str, Any]], body: ProgressionPathSuggestRequest, roadmap_ctx: ProgressionRoadmapContext, goal_query: str, semantic_brief: PlanningSemanticBrief, ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: """KI-Lücken-Angebote für alle leeren Roadmap-Slots (nach Rematch/Normalize).""" if not body.include_ai_gap_fill: return steps, gap_fill_offers seen_offer_ids = {o.get("offer_id") for o in gap_fill_offers if o.get("offer_id")} out_steps: List[Dict[str, Any]] = [] offers = list(gap_fill_offers) for raw in steps: step = dict(raw) if step.get("exercise_id") is not None: out_steps.append(step) continue try: major_idx = int(step["roadmap_major_step_index"]) except (TypeError, ValueError, KeyError): out_steps.append(step) continue if step.get("gap_offer") and step.get("proposal_key"): oid = step["gap_offer"].get("offer_id") if oid and oid not in seen_offer_ids: offers.append(dict(step["gap_offer"])) seen_offer_ids.add(oid) out_steps.append(step) continue stage_spec = next( ( s for s in (roadmap_ctx.stage_specs or []) if int(s.major_step_index) == major_idx ), None, ) learning_goal = ( (stage_spec.learning_goal if stage_spec else None) or step.get("roadmap_learning_goal") or step.get("title") or "" ).strip() spec = { "source": "roadmap_unfilled", "insert_after_index": max(major_idx - 1, -1), "roadmap_major_step_index": major_idx, "phase": (step.get("roadmap_phase") or "vertiefung").strip().lower(), "title_hint": (learning_goal or f"Slot {major_idx + 1}")[:120], "sketch": learning_goal, "rationale": ( f"Slot {major_idx + 1} — keine passende Bibliotheks-Übung; " "KI-Entwurf für diese Stufe." ), } offer = build_gap_fill_offer( spec=spec, steps=steps, goal_query=goal_query, brief=semantic_brief, proposal=None, roadmap_snapshot=_roadmap_gap_snapshot_for_spec( cur, roadmap_ctx, spec, goal_query=goal_query, semantic_brief=semantic_brief, ), ) step["gap_offer"] = offer step["proposal_key"] = offer.get("offer_id") step["slot_status"] = "unfilled" if offer.get("offer_id") and offer.get("offer_id") not in seen_offer_ids: offers.append(offer) seen_offer_ids.add(offer.get("offer_id")) out_steps.append(step) return out_steps, offers def _merge_rematch_unfilled( roadmap_unfilled: List[Tuple[int, StageSpecArtifact]], rematch_new_unfilled: List[Tuple[int, StageSpecArtifact]], ) -> List[Tuple[int, StageSpecArtifact]]: if not rematch_new_unfilled: return roadmap_unfilled remapped = {sp.major_step_index for _, sp in rematch_new_unfilled} kept = [item for item in roadmap_unfilled if item[1].major_step_index not in remapped] kept.extend(rematch_new_unfilled) return kept def _run_roadmap_rematch_loop( cur, *, tenant: TenantContext, body: ProgressionPathSuggestRequest, goal_query: str, max_steps: int, semantic_brief: PlanningSemanticBrief, path_target_profile: PlanningTargetProfile, path_intent: str, roadmap_ctx: ProgressionRoadmapContext, steps: List[Dict[str, Any]], stripped_off_topic: List[Dict[str, Any]], off_topic_before_strip: List[Dict[str, Any]], roadmap_unfilled: List[Tuple[int, StageSpecArtifact]], gaps: List[Dict[str, Any]], ) -> Tuple[ List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]], int, List[Tuple[int, StageSpecArtifact]], List[Dict[str, Any]], ]: """Phase A/B/C: Rematch-Schleife mit optionaler Stufen-Spec-Verfeinerung.""" rematch_log: List[Dict[str, Any]] = [] refine_log: List[Dict[str, Any]] = [] rematch_rounds = 0 max_rounds = int(body.max_rematch_rounds or 0) if not body.auto_rematch_after_qa or max_rounds <= 0 or not roadmap_ctx.stage_specs: off_topic_steps = detect_off_topic_steps( cur, steps, brief=semantic_brief, goal_query=goal_query, ) return ( steps, rematch_log, stripped_off_topic, off_topic_steps, rematch_rounds, roadmap_unfilled, refine_log, ) current_stripped = list(stripped_off_topic or []) use_initial_off_topic = not current_stripped off_topic_steps: List[Dict[str, Any]] = [] rejected_by_major: Dict[int, Set[int]] = {} def _track_rejected(items: Sequence[Mapping[str, Any]]) -> None: for item in items or []: if not isinstance(item, dict): continue eid = item.get("exercise_id") midx = item.get("roadmap_major_step_index") if eid is None or midx is None: continue try: rejected_by_major.setdefault(int(midx), set()).add(int(eid)) except (TypeError, ValueError): continue _track_rejected(off_topic_before_strip) _track_rejected(current_stripped) slot_assignment_history: Dict[int, Set[int]] = {} for raw in steps: midx = raw.get("roadmap_major_step_index") eid = raw.get("exercise_id") if midx is None or eid is None: continue try: slot_assignment_history.setdefault(int(midx), set()).add(int(eid)) except (TypeError, ValueError): continue for round_idx in range(max_rounds): mini_qa = run_multistage_path_qa( off_topic_steps=off_topic_steps if round_idx > 0 else [], stripped_off_topic=current_stripped if round_idx == 0 else [], gaps=gaps if round_idx == 0 else [], llm_qa=None, llm_applied=False, roadmap_unfilled=roadmap_unfilled, ) optimization_hints = list(mini_qa.get("optimization_hints") or []) if body.auto_refine_stage_spec: _, round_refine = apply_stage_spec_refinements( roadmap_ctx, optimization_hints=optimization_hints, off_topic_steps=off_topic_steps if round_idx > 0 else off_topic_before_strip, goal_query=goal_query, semantic_brief=semantic_brief, ) if round_refine: for entry in round_refine: tagged = dict(entry) tagged["round"] = rematch_rounds + 1 refine_log.append(tagged) slot_indices, rematch_reasons = collect_rematch_slot_indices( stripped_off_topic=current_stripped if round_idx == 0 else [], off_topic_steps=off_topic_before_strip if round_idx == 0 and use_initial_off_topic else [], optimization_hints=optimization_hints, stage_specs=roadmap_ctx.stage_specs, roadmap_unfilled=roadmap_unfilled, ) if body.auto_refine_stage_spec: refine_targets = collect_refine_stage_targets( optimization_hints=optimization_hints, off_topic_steps=off_topic_steps if round_idx > 0 else off_topic_before_strip, stage_specs=roadmap_ctx.stage_specs, ) for midx in refine_targets: slot_indices.add(int(midx)) if int(midx) not in rematch_reasons: rematch_reasons[int(midx)] = "refine_stage_spec" if not slot_indices: break steps, round_log, rematch_new_unfilled = rematch_roadmap_slots( cur, tenant=tenant, body=body, goal_query=goal_query, max_steps=max_steps, semantic_brief=semantic_brief, path_target_profile=path_target_profile, path_intent=path_intent, roadmap_ctx=roadmap_ctx, steps=steps, slot_indices=slot_indices, rematch_reasons=rematch_reasons, match_slot_fn=_match_roadmap_slot, rejected_by_major=rejected_by_major, slot_assignment_history=slot_assignment_history, ) rematch_rounds += 1 for entry in round_log: tagged = dict(entry) tagged["round"] = rematch_rounds rematch_log.append(tagged) rid = entry.get("replaced_exercise_id") midx = entry.get("roadmap_major_step_index") if rid is not None and midx is not None: try: rejected_by_major.setdefault(int(midx), set()).add(int(rid)) except (TypeError, ValueError): pass new_eid = entry.get("new_exercise_id") if ( str(entry.get("action") or "") == "replaced" and new_eid is not None and midx is not None ): try: slot_assignment_history.setdefault(int(midx), set()).add(int(new_eid)) except (TypeError, ValueError): pass current_stripped = prune_stripped_after_rematch(current_stripped, round_log) roadmap_unfilled = _merge_rematch_unfilled(roadmap_unfilled, rematch_new_unfilled) use_initial_off_topic = False off_topic_steps = detect_off_topic_steps( cur, steps, brief=semantic_brief, goal_query=goal_query, ) _track_rejected(off_topic_steps) if round_idx + 1 >= max_rounds: break if not off_topic_steps and not roadmap_unfilled: break if not off_topic_steps: off_topic_steps = detect_off_topic_steps( cur, steps, brief=semantic_brief, goal_query=goal_query, ) steps, purged_unfilled = _purge_stage_mismatch_roadmap_slots( cur, steps=steps, roadmap_ctx=roadmap_ctx, goal_query=goal_query, semantic_brief=semantic_brief, ) if purged_unfilled: roadmap_unfilled = _merge_rematch_unfilled(roadmap_unfilled, purged_unfilled) off_topic_steps = detect_off_topic_steps( cur, steps, brief=semantic_brief, goal_query=goal_query, ) return ( steps, rematch_log, current_stripped, off_topic_steps, rematch_rounds, roadmap_unfilled, refine_log, ) def _build_steps_roadmap_first( cur, *, tenant: TenantContext, body: ProgressionPathSuggestRequest, goal_query: str, max_steps: int, semantic_brief: PlanningSemanticBrief, path_target_profile: PlanningTargetProfile, path_intent: str, roadmap_ctx: ProgressionRoadmapContext, ) -> Tuple[List[Dict[str, Any]], List[Tuple[int, StageSpecArtifact]]]: """Retrieval pro stage_spec statt iterativem Pfad-Bau (Phase F3).""" stage_specs = list(roadmap_ctx.stage_specs or [])[:max_steps] if not stage_specs and roadmap_ctx.roadmap: stage_specs = [ StageSpecArtifact( major_step_index=m.index, learning_goal=m.learning_goal, ) for m in roadmap_ctx.roadmap.major_steps[:max_steps] ] used: Set[int] = set() steps: List[Dict[str, Any]] = [] planned_ids: List[int] = [] anchor_id: Optional[int] = None anchor_variant_id: Optional[int] = None unfilled: List[Tuple[int, StageSpecArtifact]] = [] stage_count = len(stage_specs) assignments = _slot_assignments_by_major_index(body.slot_assignments) majors_by_index: Dict[int, MajorStep] = {} if roadmap_ctx.roadmap: majors_by_index = {m.index: m for m in roadmap_ctx.roadmap.major_steps} for step_index, stage_spec in enumerate(stage_specs): major_idx = stage_spec.major_step_index major = majors_by_index.get(major_idx) slot_priority_id: Optional[int] = None if major_idx in assignments: try: slot_priority_id = int(assignments[major_idx].exercise_id) except (TypeError, ValueError): slot_priority_id = None step, unfilled_spec = _match_roadmap_slot( cur, tenant=tenant, body=body, goal_query=goal_query, max_steps=max_steps, semantic_brief=semantic_brief, path_target_profile=path_target_profile, path_intent=path_intent, roadmap_ctx=roadmap_ctx, stage_spec=stage_spec, step_index=step_index, stage_count=stage_count, planned_ids=planned_ids, anchor_id=anchor_id, anchor_variant_id=anchor_variant_id, used=used, slot_priority_exercise_id=slot_priority_id, ) if not step: unfilled.append((step_index, unfilled_spec or stage_spec)) continue steps.append(step) eid = int(step["exercise_id"]) used.add(eid) planned_ids.append(eid) anchor_id = eid anchor_variant_id = step.get("variant_id") return steps, unfilled def _evaluate_steps_from_payload( cur, payloads: List[EvaluateStepPayload], ) -> List[Dict[str, Any]]: steps: List[Dict[str, Any]] = [] for raw in payloads: is_proposal = bool(raw.is_ai_proposal) or raw.exercise_id is None title = (raw.title or "").strip() or None if is_proposal: steps.append( { "exercise_id": None, "variant_id": None, "title": title or "KI-Vorschlag", "is_ai_proposal": True, "ai_suggestion": raw.ai_suggestion, "proposal_key": raw.proposal_key, "roadmap_major_step_index": raw.roadmap_major_step_index, "roadmap_phase": raw.roadmap_phase, "roadmap_learning_goal": raw.roadmap_learning_goal, "reasons": [], } ) continue eid = int(raw.exercise_id) cur.execute( "SELECT id, title, summary FROM exercises WHERE id = %s", (eid,), ) row = cur.fetchone() if not row: raise HTTPException(status_code=400, detail=f"Übung {eid} nicht gefunden") steps.append( { "exercise_id": eid, "variant_id": raw.variant_id, "title": title or row.get("title"), "summary": row.get("summary"), "is_ai_proposal": False, "roadmap_major_step_index": raw.roadmap_major_step_index, "roadmap_phase": raw.roadmap_phase, "roadmap_learning_goal": raw.roadmap_learning_goal, "reasons": [], } ) return steps def _build_evaluate_empty_slot_gap_specs( steps: List[Dict[str, Any]], *, goal_query: str, ) -> List[Dict[str, Any]]: """Gap-Angebote für leere Roadmap-Slots im evaluate_only-Modus.""" specs: List[Dict[str, Any]] = [] for step in steps: if step.get("exercise_id") is not None: continue major_idx = step.get("roadmap_major_step_index") if major_idx is None: continue try: roadmap_idx = int(major_idx) except (TypeError, ValueError): continue phase = (step.get("roadmap_phase") or "vertiefung").strip().lower() learning_goal = (step.get("roadmap_learning_goal") or step.get("title") or "").strip() title_hint = learning_goal[:120] if learning_goal else f"Slot {roadmap_idx + 1}" specs.append( { "source": "roadmap_unfilled", "insert_after_index": max(roadmap_idx - 1, -1), "gap": { "expected_phase": phase, "roadmap_major_step_index": roadmap_idx, "learning_goal": learning_goal, }, "phase": phase, "title_hint": title_hint, "sketch": learning_goal or title_hint, "rationale": ( f"Slot {roadmap_idx + 1} ohne Übung — KI-Entwurf für diese Roadmap-Stufe." ), "roadmap_major_step_index": roadmap_idx, } ) return specs[:8] def _run_evaluate_only_path_qa( cur, *, body: ProgressionPathSuggestRequest, goal_query: str, semantic_brief: PlanningSemanticBrief, steps: List[Dict[str, Any]], roadmap_ctx: Optional[ProgressionRoadmapContext], ) -> Dict[str, Any]: roadmap_first = roadmap_ctx is not None gaps: List[Dict[str, Any]] = [] bridge_inserts: List[Dict[str, Any]] = [] unfilled_gaps: List[Dict[str, Any]] = [] llm_qa: Optional[Dict[str, Any]] = None llm_qa_applied = False off_topic_steps: List[Dict[str, Any]] = [] stripped_off_topic: List[Dict[str, Any]] = [] ai_proposals: List[Dict[str, Any]] = [] gap_fill_offers: List[Dict[str, Any]] = [] roadmap_qa_mode: Optional[str] = None if body.include_path_qa: if roadmap_first: roadmap_qa_mode = "roadmap_first_lite" gaps = detect_path_gaps( cur, steps, brief=semantic_brief, roadmap_first=roadmap_first, ) if gaps and roadmap_first: unfilled_gaps = list(gaps) if body.include_llm_path_qa: llm_qa, llm_qa_applied = try_llm_qa_progression_path( cur, goal_query=goal_query, brief=semantic_brief, steps=steps, gaps=gaps, bridge_inserts=bridge_inserts, ) off_topic_steps = detect_off_topic_steps( cur, steps, brief=semantic_brief, goal_query=goal_query, ) llm_gap_specs = parse_llm_suggested_new_exercises( llm_qa, brief=semantic_brief, step_count=len(steps), ) if body.include_ai_gap_fill: fresh_large_gaps = [g for g in gaps if g.get("is_large_gap")] gap_specs = collect_gap_fill_specs( steps=steps, unfilled_gaps=fresh_large_gaps or unfilled_gaps, off_topic_steps=off_topic_steps, llm_specs=llm_gap_specs, brief=semantic_brief, goal_query=goal_query, ) empty_slot_specs = _build_evaluate_empty_slot_gap_specs( steps, goal_query=goal_query, ) seen_spec_keys = { ( s.get("source"), s.get("roadmap_major_step_index"), s.get("insert_after_index"), ) for s in gap_specs } for spec in empty_slot_specs: key = ( spec.get("source"), spec.get("roadmap_major_step_index"), spec.get("insert_after_index"), ) if key not in seen_spec_keys: gap_specs.append(spec) seen_spec_keys.add(key) path_roadmap_snapshot = None if roadmap_ctx: path_roadmap_snapshot = build_progression_gap_snapshot( goal_analysis=( roadmap_ctx.goal_analysis.model_dump() if roadmap_ctx.goal_analysis else None ), resolved_structured=( roadmap_ctx.resolved_structured.model_dump() if roadmap_ctx.resolved_structured else None ), semantic_brief=roadmap_ctx.semantic_brief or brief_to_summary_dict(semantic_brief), ) _, ai_proposals, gap_fill_offers = apply_gap_fill_after_qa( cur, steps, gap_specs, goal_query=goal_query, brief=semantic_brief, include_ai_calls=False, max_ai_proposals=0, auto_insert_proposals=False, roadmap_snapshot=path_roadmap_snapshot, ) multistage_qa = run_multistage_path_qa( off_topic_steps=off_topic_steps, stripped_off_topic=stripped_off_topic, gaps=gaps, llm_qa=llm_qa, llm_applied=llm_qa_applied, ) path_qa = build_path_qa_summary( gaps=gaps, bridge_inserts=bridge_inserts, ai_proposals=ai_proposals, gap_fill_offers=gap_fill_offers, off_topic_steps=off_topic_steps, stripped_off_topic=stripped_off_topic, llm_qa=llm_qa, llm_applied=llm_qa_applied, reorder_applied=False, reorder_notes=[], roadmap_qa_mode=roadmap_qa_mode, multistage_qa=multistage_qa, ) return { "path_qa": path_qa, "gap_fill_offers": gap_fill_offers, "steps": steps, } def suggest_progression_path( cur, *, tenant: TenantContext, body: ProgressionPathSuggestRequest, ) -> Dict[str, Any]: role = tenant.global_role if not _has_planning_role(role): raise HTTPException(status_code=403, detail="Nur Trainer dürfen Pfad-Vorschläge abrufen") goal_query = _normalize_query(body.query) if len(goal_query) < 3: raise HTTPException(status_code=400, detail="Ziel-Anfrage: mindestens 3 Zeichen") max_steps = int(body.max_steps) semantic_brief = build_semantic_brief(goal_query) semantic_llm_applied = False if body.include_llm_intent and semantic_brief.semantic_strength >= 0.35: semantic_brief, semantic_llm_applied = try_enrich_semantic_brief_with_llm( cur, goal_query, semantic_brief ) extra_path_ctx = " ".join( p for p in ( (body.start_situation or "").strip(), (body.target_state or "").strip(), (body.roadmap_notes or "").strip(), ) if p ) semantic_brief = enrich_brief_with_path_constraints( semantic_brief, goal_query, extra_context=extra_path_ctx or None, ) roadmap_first = bool(body.roadmap_first) roadmap_only = bool(body.roadmap_only) start_target_only = bool(body.start_target_only) evaluate_only = bool(body.evaluate_only) include_roadmap = ( roadmap_first or body.include_roadmap_preview or roadmap_only or start_target_only ) progression_roadmap: Optional[Dict[str, Any]] = None roadmap_ctx: Optional[ProgressionRoadmapContext] = None roadmap_edited = False roadmap_structured = _roadmap_structured_from_body(body) if body.roadmap_override is not None: try: roadmap_ctx = roadmap_context_from_override( goal_query, max_steps=max_steps, semantic_brief=semantic_brief, override=body.roadmap_override, structured=roadmap_structured, ) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc progression_roadmap = progression_roadmap_to_api_dict(roadmap_ctx) progression_roadmap["roadmap_edited"] = True roadmap_edited = True max_steps = int(roadmap_ctx.max_steps) roadmap_first = True elif start_target_only: roadmap_ctx = run_start_target_resolve_only( goal_query, semantic_brief=semantic_brief, cur=cur, include_llm_start_target=body.include_llm_start_target, structured=roadmap_structured, ) progression_roadmap = progression_roadmap_to_api_dict(roadmap_ctx) elif include_roadmap: roadmap_ctx = run_progression_roadmap_pipeline( goal_query, max_steps=max_steps, semantic_brief=semantic_brief, cur=cur, include_llm_roadmap=body.include_llm_roadmap, include_llm_start_target=body.include_llm_start_target, structured=roadmap_structured, ) progression_roadmap = progression_roadmap_to_api_dict(roadmap_ctx) if start_target_only: return { "goal_query": goal_query, "max_steps_requested": max_steps, "steps": [], "step_count": 0, "target_profile_summary": None, "semantic_brief_summary": brief_to_summary_dict(semantic_brief), "semantic_llm_applied": semantic_llm_applied, "query_intent_summary": {}, "progression_graph_id": body.progression_graph_id, "path_qa": None, "gap_fill_offers": [], "progression_roadmap": progression_roadmap, "roadmap_first": False, "roadmap_only": False, "start_target_only": True, "roadmap_edited": False, "roadmap_unfilled_count": 0, "retrieval_phase": "start_target_only", } if roadmap_only: return { "goal_query": goal_query, "max_steps_requested": max_steps, "steps": [], "step_count": 0, "target_profile_summary": None, "semantic_brief_summary": brief_to_summary_dict(semantic_brief), "semantic_llm_applied": semantic_llm_applied, "query_intent_summary": {}, "progression_graph_id": body.progression_graph_id, "path_qa": None, "gap_fill_offers": [], "progression_roadmap": progression_roadmap, "roadmap_first": False, "roadmap_only": True, "roadmap_edited": roadmap_edited, "roadmap_unfilled_count": 0, "retrieval_phase": "roadmap_only", } if evaluate_only: if not body.evaluate_steps: raise HTTPException( status_code=400, detail="evaluate_only erfordert evaluate_steps", ) eval_steps = _evaluate_steps_from_payload(cur, body.evaluate_steps) qa_pack = _run_evaluate_only_path_qa( cur, body=body, goal_query=goal_query, semantic_brief=semantic_brief, steps=eval_steps, roadmap_ctx=roadmap_ctx, ) return { "goal_query": goal_query, "max_steps_requested": max_steps, "steps": qa_pack["steps"], "step_count": len(qa_pack["steps"]), "target_profile_summary": None, "semantic_brief_summary": brief_to_summary_dict(semantic_brief), "semantic_llm_applied": semantic_llm_applied, "query_intent_summary": {}, "progression_graph_id": body.progression_graph_id, "path_qa": qa_pack["path_qa"], "gap_fill_offers": qa_pack["gap_fill_offers"], "progression_roadmap": progression_roadmap, "roadmap_first": bool(roadmap_ctx), "roadmap_only": False, "roadmap_edited": roadmap_edited, "roadmap_unfilled_count": 0, "path_skill_expectations": None, "retrieval_phase": "evaluate_only", } path_target_profile, first_intent_summary, path_intent = _build_path_target_profile( cur, goal_query=goal_query, semantic_brief=semantic_brief, include_llm_intent=body.include_llm_intent, ) path_skill_expectations: Optional[Dict[str, Any]] = None if roadmap_ctx and roadmap_ctx.goal_analysis: path_inp = expectation_input_from_progression_path( goal_query=goal_query, goal_analysis=roadmap_ctx.goal_analysis.model_dump(), resolved_structured=( roadmap_ctx.resolved_structured.model_dump() if roadmap_ctx.resolved_structured else None ), semantic_brief_summary=( roadmap_ctx.semantic_brief if roadmap_ctx.semantic_brief else brief_to_summary_dict(semantic_brief) ), ) path_exp = build_planning_skill_expectations(cur, path_inp, semantic_brief=semantic_brief) if path_exp.items: path_target_profile = apply_expectations_to_target(path_target_profile, path_exp) path_skill_expectations = path_exp.to_api_dict() roadmap_unfilled: List[Tuple[int, StageSpecArtifact]] = [] roadmap_gap_offers: List[Dict[str, Any]] = [] used: Set[int] = set() steps: List[Dict[str, Any]] = [] planned_ids: List[int] = [] anchor_id: Optional[int] = None anchor_variant_id: Optional[int] = None if roadmap_first and roadmap_ctx is not None: steps, roadmap_unfilled = _build_steps_roadmap_first( cur, tenant=tenant, body=body, goal_query=goal_query, max_steps=max_steps, semantic_brief=semantic_brief, path_target_profile=path_target_profile, path_intent=path_intent, roadmap_ctx=roadmap_ctx, ) planned_ids = [int(s["exercise_id"]) for s in steps if s.get("exercise_id") is not None] if planned_ids: anchor_id = planned_ids[-1] anchor_variant_id = steps[-1].get("variant_id") if body.include_ai_gap_fill and roadmap_unfilled: major_by_index = ( {m.index: m for m in roadmap_ctx.roadmap.major_steps} if roadmap_ctx.roadmap else {} ) roadmap_gap_specs = build_roadmap_unfilled_gap_specs( unfilled_specs=roadmap_unfilled, major_steps_by_index=major_by_index, steps=steps, brief=semantic_brief, goal_query=goal_query, goal_analysis=roadmap_ctx.goal_analysis if roadmap_ctx else None, resolved_structured=roadmap_ctx.resolved_structured if roadmap_ctx else None, ) for spec in roadmap_gap_specs: roadmap_gap_offers.append( build_gap_fill_offer( spec=spec, steps=steps, goal_query=goal_query, brief=semantic_brief, proposal=None, roadmap_snapshot=_roadmap_gap_snapshot_for_spec( cur, roadmap_ctx, spec, goal_query=goal_query, semantic_brief=semantic_brief, ), ) ) else: for step_index in range(max_steps): hits, _tp, _qis, _intent = _run_path_step_retrieval( cur, tenant=tenant, goal_query=goal_query, step_index=step_index, max_steps=max_steps, planned_ids=planned_ids, anchor_id=anchor_id, anchor_variant_id=anchor_variant_id, progression_graph_id=body.progression_graph_id, include_llm_intent=body.include_llm_intent, exercise_kind_any=body.exercise_kind_any, semantic_brief=semantic_brief, path_target_profile=path_target_profile, path_intent=path_intent, supplemental_exercise_ids=_supplemental_exercise_ids_from_body(cur, body), ) hit = _pick_best_path_hit(hits, used, semantic_brief=semantic_brief) if not hit: break step = _hit_to_path_step(hit) steps.append(step) eid = int(step["exercise_id"]) used.add(eid) planned_ids.append(eid) anchor_id = eid anchor_variant_id = step.get("variant_id") stage_spec_count = len(roadmap_ctx.stage_specs or []) if roadmap_ctx else 0 if roadmap_first and stage_spec_count >= 2: pass elif len(steps) < 2: raise HTTPException( status_code=422, detail="Zu wenig passende Übungen für einen Pfad (mindestens 2 Schritte). Ziel präzisieren oder max_steps senken.", ) gaps: List[Dict[str, Any]] = [] bridge_inserts: List[Dict[str, Any]] = [] ai_proposals: List[Dict[str, Any]] = [] gap_fill_offers: List[Dict[str, Any]] = [] off_topic_steps: List[Dict[str, Any]] = [] stripped_off_topic: List[Dict[str, Any]] = [] rematch_log: List[Dict[str, Any]] = [] refine_log: List[Dict[str, Any]] = [] rematch_rounds = 0 llm_qa: Optional[Dict[str, Any]] = None llm_qa_applied = False reorder_applied = False reorder_notes: List[str] = [] roadmap_qa_mode: Optional[str] = None if body.include_path_qa: if roadmap_first: roadmap_qa_mode = "roadmap_first_lite" gaps = detect_path_gaps( cur, steps, brief=semantic_brief, roadmap_first=roadmap_first, ) unfilled_gaps: List[Dict[str, Any]] = [] if gaps and not roadmap_first: bridge_fn = _make_bridge_search_fn( cur, tenant=tenant, goal_query=goal_query, max_steps=max_steps, progression_graph_id=body.progression_graph_id, include_llm_intent=body.include_llm_intent, exercise_kind_any=body.exercise_kind_any, semantic_brief=semantic_brief, planned_ids=planned_ids, path_target_profile=path_target_profile, path_intent=path_intent, supplemental_exercise_ids=_supplemental_exercise_ids_from_body(cur, body), ) steps, bridge_inserts, unfilled_gaps = insert_bridge_exercises( cur, steps, gaps, brief=semantic_brief, bridge_search_fn=bridge_fn, ) elif gaps and roadmap_first: unfilled_gaps = list(gaps) if body.include_llm_path_qa and not roadmap_first: llm_qa, llm_qa_applied = try_llm_qa_progression_path( cur, goal_query=goal_query, brief=semantic_brief, steps=steps, gaps=gaps, bridge_inserts=bridge_inserts, ) if ( body.include_path_reorder and not roadmap_first and llm_qa_applied and llm_qa ): q_score = llm_qa.get("quality_score") try: q_val = float(q_score) if q_score is not None else None except (TypeError, ValueError): q_val = None if llm_qa.get("overall_ok") or (q_val is not None and q_val >= 0.45): steps, reorder_applied, reorder_notes = apply_llm_path_reorder(steps, llm_qa) off_topic_steps = detect_off_topic_steps( cur, steps, brief=semantic_brief, goal_query=goal_query, ) off_topic_before_strip = list(off_topic_steps) steps, stripped_off_topic = strip_off_topic_steps_from_path( steps, off_topic_steps, min_remaining=0 if roadmap_first else 2, ) if stripped_off_topic: off_topic_steps = [] gaps = detect_path_gaps( cur, steps, brief=semantic_brief, roadmap_first=roadmap_first, ) if roadmap_first and roadmap_ctx is not None: ( steps, rematch_log, stripped_off_topic, rematch_off_topic, rematch_rounds, roadmap_unfilled, refine_log, ) = _run_roadmap_rematch_loop( cur, tenant=tenant, body=body, goal_query=goal_query, max_steps=max_steps, semantic_brief=semantic_brief, path_target_profile=path_target_profile, path_intent=path_intent, roadmap_ctx=roadmap_ctx, steps=steps, stripped_off_topic=stripped_off_topic, off_topic_before_strip=off_topic_before_strip, roadmap_unfilled=roadmap_unfilled, gaps=gaps, ) if rematch_off_topic: off_topic_steps = rematch_off_topic gaps = detect_path_gaps( cur, steps, brief=semantic_brief, roadmap_first=roadmap_first, ) if body.include_llm_path_qa and roadmap_first: gaps = detect_path_gaps( cur, steps, brief=semantic_brief, roadmap_first=roadmap_first, ) llm_qa, llm_qa_applied = try_llm_qa_progression_path( cur, goal_query=goal_query, brief=semantic_brief, steps=steps, gaps=gaps, bridge_inserts=bridge_inserts, ) llm_gap_specs = parse_llm_suggested_new_exercises( llm_qa, brief=semantic_brief, step_count=len(steps), ) if body.include_ai_gap_fill: fresh_large_gaps = [g for g in gaps if g.get("is_large_gap")] gap_specs = collect_gap_fill_specs( steps=steps, unfilled_gaps=fresh_large_gaps or unfilled_gaps, off_topic_steps=off_topic_steps, llm_specs=llm_gap_specs, brief=semantic_brief, goal_query=goal_query, ) path_roadmap_snapshot = None if roadmap_ctx: path_roadmap_snapshot = build_progression_gap_snapshot( goal_analysis=( roadmap_ctx.goal_analysis.model_dump() if roadmap_ctx.goal_analysis else None ), resolved_structured=( roadmap_ctx.resolved_structured.model_dump() if roadmap_ctx.resolved_structured else None ), semantic_brief=roadmap_ctx.semantic_brief or brief_to_summary_dict(semantic_brief), ) steps, ai_proposals, gap_fill_offers = apply_gap_fill_after_qa( cur, steps, gap_specs, goal_query=goal_query, brief=semantic_brief, include_ai_calls=False, max_ai_proposals=0, auto_insert_proposals=False, roadmap_snapshot=path_roadmap_snapshot, ) if roadmap_gap_offers: seen_offer_ids = {o.get("offer_id") for o in gap_fill_offers} for offer in roadmap_gap_offers: if offer.get("offer_id") not in seen_offer_ids: gap_fill_offers.append(offer) if roadmap_first and roadmap_ctx is not None: steps = _normalize_roadmap_steps_coverage( steps, roadmap_ctx=roadmap_ctx, max_steps=max_steps, ) steps, gap_fill_offers = _enrich_roadmap_unfilled_gap_offers( cur, steps=steps, gap_fill_offers=gap_fill_offers, body=body, roadmap_ctx=roadmap_ctx, goal_query=goal_query, semantic_brief=semantic_brief, ) multistage_qa = run_multistage_path_qa( off_topic_steps=off_topic_steps, stripped_off_topic=stripped_off_topic, gaps=gaps, llm_qa=llm_qa, llm_applied=llm_qa_applied, roadmap_unfilled=roadmap_unfilled if roadmap_first else None, ) path_qa = build_path_qa_summary( gaps=gaps, bridge_inserts=bridge_inserts, ai_proposals=ai_proposals, gap_fill_offers=gap_fill_offers, off_topic_steps=off_topic_steps, stripped_off_topic=stripped_off_topic, llm_qa=llm_qa, llm_applied=llm_qa_applied, reorder_applied=reorder_applied, reorder_notes=reorder_notes, roadmap_qa_mode=roadmap_qa_mode, multistage_qa=multistage_qa, ) if rematch_log: path_qa["rematch_applied"] = True path_qa["rematch_log"] = rematch_log path_qa["rematch_rounds"] = rematch_rounds if refine_log: path_qa["refine_applied"] = True path_qa["refine_log"] = refine_log path_qa["refine_count"] = len(refine_log) filled_library_steps = sum(1 for s in steps if s.get("exercise_id") is not None) match_summary = { "roadmap_first": roadmap_first, "library_matches": filled_library_steps, "slot_count": len(steps), "gap_fill_offer_count": len(gap_fill_offers), "roadmap_unfilled_count": len(roadmap_unfilled), } target_profile_summary = path_target_profile.to_summary_dict(cur) retrieval_parts = ["profile_v1", "full_library", "path_builder", "semantics"] if roadmap_first: retrieval_parts.append("roadmap_first") if roadmap_qa_mode: retrieval_parts.append(roadmap_qa_mode) if body.include_path_qa: retrieval_parts.append("path_qa") if llm_qa_applied: retrieval_parts.append("llm_path_qa") if reorder_applied: retrieval_parts.append("path_reorder") if ai_proposals: retrieval_parts.append("ai_gap_fill") if gap_fill_offers: retrieval_parts.append("gap_fill_offers") if include_roadmap: retrieval_parts.append("roadmap_preview") if roadmap_edited: retrieval_parts.append("roadmap_edited") if roadmap_unfilled: retrieval_parts.append("roadmap_unfilled") if rematch_log: retrieval_parts.append("path_rematch") if refine_log: retrieval_parts.append("stage_spec_refine") return { "goal_query": goal_query, "max_steps_requested": max_steps, "steps": steps, "step_count": len(steps), "target_profile_summary": target_profile_summary, "semantic_brief_summary": brief_to_summary_dict(semantic_brief), "semantic_llm_applied": semantic_llm_applied, "query_intent_summary": first_intent_summary, "progression_graph_id": body.progression_graph_id, "path_qa": path_qa, "gap_fill_offers": gap_fill_offers, "progression_roadmap": progression_roadmap, "roadmap_first": roadmap_first, "roadmap_only": False, "roadmap_edited": roadmap_edited, "roadmap_unfilled_count": len(roadmap_unfilled), "path_skill_expectations": path_skill_expectations, "match_summary": match_summary, "retrieval_phase": "+".join(retrieval_parts), } __all__ = [ "EvaluateStepPayload", "ProgressionPathSuggestRequest", "suggest_progression_path", "_pick_best_path_hit", "_pick_next_path_hit", ] # Legacy-Alias für Tests _pick_next_path_hit = _pick_best_path_hit