shinkan-jinkendo/backend/planning_exercise_path_builder.py
Lars f0e581a9f5
All checks were successful
Deploy Development / deploy (push) Successful in 42s
Test Suite / pytest-backend (push) Successful in 43s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m34s
Implement Off-Topic Slot Gap Specification and Unified Slot Review Enhancements
- Introduced `_build_off_topic_slot_gap_spec` to generate specifications for off-topic slots, improving the handling of filled but thematically inappropriate slots.
- Added `_build_unified_slot_review_entry` to streamline the review process for slots, incorporating various parameters for better evaluation and suggestions.
- Enhanced existing logic in slot management to improve the robustness of path evaluations and user feedback.
- Added tests for the new off-topic slot gap specification to ensure functionality and correctness.
2026-06-13 12:43:59 +02:00

4419 lines
158 KiB
Python

"""
Planungs-KI Phase C3/E/F: Pfad-Vorschläge für Progressionsgraphen.
Legacy: retrieval-first. Phase F: optional Roadmap-Preview (A→B→C) parallel — siehe
planning_progression_roadmap.py und PLANNING_PROGRESSION_ROADMAP_SPEC.md.
"""
from __future__ import annotations
import re
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Set, Tuple
from fastapi import HTTPException
from pydantic import BaseModel, Field
from tenant_context import (
TenantContext,
library_content_visibility_for_progression_graph_sql,
library_content_visibility_sql,
)
from planning_catalog_context import (
ProgressionPlanningCatalogContext,
catalog_context_has_items,
enrich_target_from_planning_text_blobs,
load_catalog_context_from_graph_row,
merge_catalog_context_into_target,
)
from planning_exercise_profiles import PlanningTargetProfile
from planning_path_qa_pipeline import run_multistage_path_qa
from planning_path_rematch import (
collect_rematch_slot_indices,
filter_rematch_slot_indices,
prune_stripped_after_rematch,
rematch_roadmap_slots,
)
from planning_path_refine_stage import apply_stage_spec_refinements, collect_refine_stage_targets
from planning_stage_context import build_contextualized_stage_goal, resolve_path_start_target
from planning_exercise_path_qa import (
_load_exercise_text_bundle,
apply_llm_path_reorder,
build_path_qa_summary,
compute_deterministic_path_quality_score,
detect_off_topic_steps,
detect_path_gaps,
insert_bridge_exercises,
parse_llm_suggested_new_exercises,
strip_off_topic_steps_from_path,
try_llm_qa_progression_path,
)
from planning_exercise_path_ai_fill import (
apply_gap_fill_after_qa,
build_gap_fill_offer,
collect_gap_fill_specs,
)
from planning_exercise_retrieval import run_multistage_planning_retrieval
from planning_exercise_semantics import (
PlanningSemanticBrief,
apply_path_retrieval_weights,
apply_stage_match_retrieval_weights,
brief_to_summary_dict,
build_semantic_brief,
build_stage_match_brief,
enrich_brief_with_path_constraints,
enrich_target_with_semantic_expectations,
resolve_path_anti_patterns,
resolve_path_primary_topic,
exercise_passes_path_semantic_gate,
exercise_passes_stage_fit,
exercise_title_matches_peer_stage_goal,
pick_best_path_hit,
score_exercise_stage_fit,
resolve_semantic_skill_weights,
step_phase_for_index,
step_retrieval_query,
try_enrich_semantic_brief_with_llm,
)
from planning_exercise_target_pipeline import build_planning_target_with_query_pipeline
from planning_exercise_progression import apply_progression_context_to_pack
from planning_exercise_suggest import (
_enrich_planning_hits_with_variant_meta,
_load_skill_ids_for_exercise,
_normalize_query,
resolve_planning_exercise_intent,
)
from planning_exercise_form_context import build_progression_gap_snapshot
from planning_skill_expectations import (
apply_expectations_to_target,
build_planning_skill_expectations,
expectation_input_from_progression_path,
expectation_input_from_progression_stage,
)
from planning_progression_roadmap import (
MajorStep,
ProgressionRoadmapContext,
RoadmapOverridePayload,
RoadmapStructuredInput,
StageSpecArtifact,
build_roadmap_unfilled_gap_specs,
progression_roadmap_to_api_dict,
resolve_step_exercise_kind_filter,
roadmap_context_from_override,
run_progression_roadmap_pipeline,
run_start_target_resolve_only,
stage_spec_retrieval_query,
)
from routers.training_planning import _has_planning_role
class EvaluateStepPayload(BaseModel):
exercise_id: Optional[int] = Field(default=None, ge=1)
variant_id: Optional[int] = Field(default=None, ge=1)
title: Optional[str] = Field(default=None, max_length=500)
is_ai_proposal: bool = False
ai_suggestion: Optional[Dict[str, Any]] = None
proposal_key: Optional[str] = Field(default=None, max_length=120)
roadmap_major_step_index: Optional[int] = Field(default=None, ge=0, le=20)
roadmap_phase: Optional[str] = Field(default=None, max_length=80)
roadmap_learning_goal: Optional[str] = Field(default=None, max_length=2000)
class ProgressionPathSuggestRequest(BaseModel):
query: str = Field(..., min_length=3, max_length=2000)
max_steps: int = Field(default=5, ge=2, le=10)
include_llm_intent: bool = True
include_path_qa: bool = True
auto_rematch_after_qa: bool = True
auto_refine_stage_spec: bool = True
max_rematch_rounds: int = Field(default=3, ge=0, le=4)
include_llm_path_qa: bool = True
include_path_reorder: bool = True
include_ai_gap_fill: bool = True
include_roadmap_preview: bool = False
include_llm_roadmap: bool = True
include_llm_start_target: bool = True
roadmap_first: bool = False
roadmap_only: bool = False
start_target_only: bool = False
evaluate_only: bool = False
evaluate_steps: Optional[List[EvaluateStepPayload]] = None
slot_assignments: Optional[List[EvaluateStepPayload]] = None
preserve_slot_assignments: bool = False
retrieval_boost_exercise_ids: Optional[List[int]] = None
roadmap_override: Optional[RoadmapOverridePayload] = None
start_situation: Optional[str] = Field(default=None, max_length=2000)
target_state: Optional[str] = Field(default=None, max_length=2000)
roadmap_notes: Optional[str] = Field(default=None, max_length=2000)
progression_graph_id: Optional[int] = Field(default=None, ge=1)
exercise_kind_any: Optional[List[str]] = None
compare_with_assignments: bool = False
planning_catalog_context: Optional[ProgressionPlanningCatalogContext] = None
# Für Match-Vergleich: Baseline aus evaluate_only (Schritt 1) — inkrementelles QS-Scoring je Diff
baseline_evaluate_steps: Optional[List[EvaluateStepPayload]] = None
baseline_quality_score: Optional[float] = Field(default=None, ge=0.0, le=1.0)
include_incremental_diff_scoring: bool = False
unified_slot_review: bool = False
baseline_path_qa_snapshot: Optional[Dict[str, Any]] = None
def _resolve_planning_catalog_context(
cur,
body: ProgressionPathSuggestRequest,
) -> Optional[ProgressionPlanningCatalogContext]:
"""Request-Kontext oder gespeichertes Graph-Artefakt."""
if body.planning_catalog_context and catalog_context_has_items(body.planning_catalog_context):
return body.planning_catalog_context
gid = body.progression_graph_id
if not gid or int(gid) < 1:
return None
cur.execute(
"SELECT planning_roadmap FROM exercise_progression_graphs WHERE id = %s",
(int(gid),),
)
row = cur.fetchone()
if not row:
return None
return load_catalog_context_from_graph_row(row.get("planning_roadmap"))
def _roadmap_gap_snapshot_for_spec(
cur,
roadmap_ctx: Optional[ProgressionRoadmapContext],
spec: Mapping[str, Any],
*,
goal_query: str,
semantic_brief: PlanningSemanticBrief,
) -> Dict[str, Any]:
"""Roadmap-Kontext für KI-Lücken-Übung (Start, Ziel, Stufenspec, Fähigkeiten)."""
major_idx = spec.get("roadmap_major_step_index")
stage_spec_dict: Optional[Dict[str, Any]] = None
major_dict: Optional[Dict[str, Any]] = None
if roadmap_ctx and major_idx is not None:
for s in roadmap_ctx.stage_specs or []:
if int(s.major_step_index) == int(major_idx):
stage_spec_dict = s.model_dump()
if roadmap_ctx.roadmap:
for m in roadmap_ctx.roadmap.major_steps:
if m.index == int(major_idx):
stage_spec_dict["phase"] = m.phase
major_dict = m.model_dump()
break
break
ga = roadmap_ctx.goal_analysis.model_dump() if roadmap_ctx and roadmap_ctx.goal_analysis else None
rs = (
roadmap_ctx.resolved_structured.model_dump()
if roadmap_ctx and roadmap_ctx.resolved_structured
else None
)
brief_summary = (
roadmap_ctx.semantic_brief
if roadmap_ctx and roadmap_ctx.semantic_brief
else brief_to_summary_dict(semantic_brief)
)
snap = build_progression_gap_snapshot(
goal_analysis=ga,
resolved_structured=rs,
stage_spec=stage_spec_dict,
semantic_brief=brief_summary,
)
inp = expectation_input_from_progression_stage(
goal_query=goal_query,
goal_analysis=ga,
resolved_structured=rs,
stage_spec=stage_spec_dict,
semantic_brief_summary=brief_summary,
major_step=major_dict,
)
exp = build_planning_skill_expectations(cur, inp, semantic_brief=semantic_brief)
if exp.items:
snap["expected_skills"] = exp.to_api_dict()["expected_skills"]
snap["skill_expectation_sources"] = exp.sources
return snap
def _roadmap_structured_from_body(body: ProgressionPathSuggestRequest) -> Optional[RoadmapStructuredInput]:
start = (body.start_situation or "").strip() or None
target = (body.target_state or "").strip() or None
notes = (body.roadmap_notes or "").strip() or None
if not any([start, target, notes]):
return None
return RoadmapStructuredInput(
start_situation=start,
target_state=target,
roadmap_notes=notes,
)
def _peer_stage_learning_goals(
roadmap_ctx: ProgressionRoadmapContext,
*,
current_major_index: int,
) -> List[str]:
goals: List[str] = []
for spec in roadmap_ctx.stage_specs or []:
if int(spec.major_step_index) == int(current_major_index):
continue
lg = (spec.learning_goal or "").strip()
if lg and lg not in goals:
goals.append(lg)
return goals
def _filter_learning_goal_candidate_ids(
cur,
*,
tenant: TenantContext,
progression_graph_id: Optional[int],
candidate_ids: Sequence[int],
stage_goal: str,
stage_match_brief: PlanningSemanticBrief,
stage_anti: Optional[List[str]],
path_primary: str,
path_tech_excludes: Optional[List[str]],
peer_learning_goals: Sequence[str],
) -> List[int]:
"""Learning-Goal-Kandidaten nur, wenn sie Stufen-Gate und Peer-Check bestehen."""
if not candidate_ids:
return []
vis_sql, vis_params = _planning_visibility_sql(cur, tenant, progression_graph_id)
rows = _load_supplemental_exercise_rows(
cur,
tenant=tenant,
progression_graph_id=progression_graph_id,
exercise_ids=list(candidate_ids),
vis_sql=vis_sql,
vis_params=vis_params,
)
out: List[int] = []
for row in rows:
try:
eid = int(row.get("id") or 0)
except (TypeError, ValueError):
continue
if eid <= 0:
continue
title = str(row.get("title") or "")
if peer_learning_goals and exercise_title_matches_peer_stage_goal(
title,
current_learning_goal=stage_goal,
peer_learning_goals=peer_learning_goals,
):
continue
summary = str(row.get("summary") or "")
goal_text = str(row.get("goal") or row.get("exercise_goal") or "")
if exercise_passes_stage_fit(
learning_goal=stage_goal,
title=title,
summary=summary,
goal=goal_text,
stage_brief=stage_match_brief,
anti_patterns=stage_anti,
path_primary_topic=path_primary or None,
path_technique_excludes=path_tech_excludes,
relaxed=False,
):
out.append(eid)
return out
def _pick_best_path_hit(
hits: List[Dict[str, Any]],
used_exercise_ids: Set[int],
*,
semantic_brief: Optional[PlanningSemanticBrief] = None,
stage_learning_goal: Optional[str] = None,
stage_anti_patterns: Optional[List[str]] = None,
roadmap_stage_match: bool = False,
stage_match_brief: Optional[PlanningSemanticBrief] = None,
path_primary_topic: Optional[str] = None,
path_technique_excludes: Optional[List[str]] = None,
peer_learning_goals: Optional[List[str]] = None,
) -> Optional[Dict[str, Any]]:
return pick_best_path_hit(
hits,
used_exercise_ids,
semantic_brief=semantic_brief,
stage_learning_goal=stage_learning_goal,
stage_anti_patterns=stage_anti_patterns,
roadmap_stage_match=roadmap_stage_match,
stage_match_brief=stage_match_brief,
path_primary_topic=path_primary_topic,
path_technique_excludes=path_technique_excludes,
peer_learning_goals=peer_learning_goals,
)
def _build_path_target_profile(
cur,
*,
goal_query: str,
semantic_brief: PlanningSemanticBrief,
include_llm_intent: bool,
start_situation: Optional[str] = None,
target_state: Optional[str] = None,
roadmap_notes: Optional[str] = None,
catalog_context: Optional[ProgressionPlanningCatalogContext] = None,
) -> Tuple[PlanningTargetProfile, Dict[str, Any], str]:
"""Einmaliges Erwartungsprofil für den gesamten Pfad (Query + Semantik + Katalog)."""
empty_unit = {
"id": None,
"framework_slot_id": None,
"origin_framework_slot_id": None,
}
pipeline_context = {
"unit_title": None,
"group_name": None,
"section_title": None,
"section_guidance_notes": goal_query,
"section_exercise_count": 0,
"planned_count": 0,
"anchor_title": None,
"anchor_exercise_id": None,
"last_section_exercise_title": None,
"progression_graph_id": None,
"unit_skill_profile": None,
"section_skill_profile": None,
"has_planning_reference": False,
"expectation_mode": "query_only",
}
target, intent, _scenario, query_intent_summary = build_planning_target_with_query_pipeline(
cur,
unit=empty_unit,
planned_exercise_ids=[],
section_planned_exercise_ids=[],
anchor_exercise_id=None,
query=goal_query,
heuristic_intent=resolve_planning_exercise_intent(goal_query, "free_search"),
include_llm_intent=include_llm_intent,
context_summary=pipeline_context,
has_planning_reference=False,
)
skill_weights = resolve_semantic_skill_weights(cur, semantic_brief)
target = enrich_target_with_semantic_expectations(target, skill_weights=skill_weights)
target = enrich_target_from_planning_text_blobs(
cur,
target,
goal_query,
start_situation,
target_state,
roadmap_notes,
)
if catalog_context and catalog_context_has_items(catalog_context):
target = merge_catalog_context_into_target(
target,
catalog_context,
emphasis="replace",
)
return target, query_intent_summary, intent
def _graph_edge_exercise_ids(cur, graph_id: Optional[int]) -> List[int]:
"""Übungs-IDs aus gespeicherten Graph-Kanten (für Re-Match-Boost)."""
if not graph_id or int(graph_id) < 1:
return []
cur.execute(
"""
SELECT from_exercise_id AS eid FROM exercise_progression_edges
WHERE graph_id = %s AND from_exercise_id IS NOT NULL
UNION
SELECT to_exercise_id AS eid FROM exercise_progression_edges
WHERE graph_id = %s AND to_exercise_id IS NOT NULL
""",
(int(graph_id), int(graph_id)),
)
out: List[int] = []
for row in cur.fetchall() or []:
try:
eid = int(row.get("eid") or 0)
except (TypeError, ValueError):
continue
if eid > 0:
out.append(eid)
return out
def _supplemental_exercise_ids_from_body(
cur,
body: ProgressionPathSuggestRequest,
) -> List[int]:
"""Kandidatenpool erweitern (Graph-Kanten, Boost, Slot-Zuordnungen)."""
ids: List[int] = []
for raw in body.evaluate_steps or []:
if raw.exercise_id is not None:
try:
eid = int(raw.exercise_id)
except (TypeError, ValueError):
continue
if eid > 0:
ids.append(eid)
for raw in body.slot_assignments or []:
if raw.exercise_id is not None:
try:
eid = int(raw.exercise_id)
except (TypeError, ValueError):
continue
if eid > 0:
ids.append(eid)
for eid in body.retrieval_boost_exercise_ids or []:
try:
val = int(eid)
except (TypeError, ValueError):
continue
if val > 0:
ids.append(val)
ids.extend(_graph_edge_exercise_ids(cur, body.progression_graph_id))
return list(dict.fromkeys(ids))
def _graph_visibility_context(
cur,
progression_graph_id: Optional[int],
) -> Tuple[str, Optional[int]]:
if not progression_graph_id or int(progression_graph_id) < 1:
return "private", None
cur.execute(
"SELECT visibility, club_id FROM exercise_progression_graphs WHERE id = %s",
(int(progression_graph_id),),
)
row = cur.fetchone()
if not row:
return "private", None
g_club = row.get("club_id")
return (
str(row.get("visibility") or "private"),
int(g_club) if g_club is not None else None,
)
def _safe_tsquery_fragment(text: str) -> str:
import re
cleaned = re.sub(r"[^\w\säöüßÄÖÜ]", " ", text or "", flags=re.UNICODE)
words = [w for w in cleaned.split() if len(w) >= 2][:10]
return " ".join(words) if words else (text or "")[:60].strip()
def _fetch_learning_goal_library_candidate_ids(
cur,
*,
tenant: TenantContext,
progression_graph_id: Optional[int],
learning_goal: str,
limit: int = 24,
) -> List[int]:
"""Sichtbare Übungen mit exakt passendem Titel oder Volltext-Treffer (kein breites LIKE)."""
lg = (learning_goal or "").strip()
if len(lg) < 3:
return []
vis_sql, vis_params = _planning_visibility_sql(cur, tenant, progression_graph_id)
tsq = _safe_tsquery_fragment(lg)
try:
cur.execute(
f"""
SELECT e.id
FROM exercises e
WHERE ({vis_sql})
AND COALESCE(e.status, '') <> %s
AND (
lower(trim(e.title)) = lower(trim(%s))
OR (%s <> '' AND e.search_vector @@ plainto_tsquery('german', %s))
)
ORDER BY
CASE WHEN lower(trim(e.title)) = lower(trim(%s)) THEN 0 ELSE 1 END,
CASE WHEN %s <> '' THEN ts_rank_cd(e.search_vector, plainto_tsquery('german', %s)) ELSE 0 END DESC,
e.id ASC
LIMIT %s
""",
[
*vis_params,
"archived",
lg,
tsq,
tsq,
lg,
tsq,
tsq,
int(limit),
],
)
except Exception:
cur.execute(
f"""
SELECT e.id
FROM exercises e
WHERE ({vis_sql})
AND COALESCE(e.status, '') <> %s
AND lower(trim(e.title)) = lower(trim(%s))
ORDER BY e.id ASC
LIMIT %s
""",
[*vis_params, "archived", lg, int(limit)],
)
out: List[int] = []
for row in cur.fetchall() or []:
try:
eid = int(row.get("id") or 0)
except (TypeError, ValueError):
continue
if eid > 0:
out.append(eid)
return out
def _load_supplemental_exercise_rows(
cur,
*,
tenant: TenantContext,
progression_graph_id: Optional[int],
exercise_ids: Optional[Sequence[int]],
vis_sql: str,
vis_params: Sequence[Any],
) -> List[Dict[str, Any]]:
"""Supplemental-Übungen mit Graph-Sichtbarkeit, Fallback Library-vis_sql."""
ids: List[int] = []
for raw in exercise_ids or []:
if raw is None:
continue
try:
eid = int(raw)
except (TypeError, ValueError):
continue
if eid > 0:
ids.append(eid)
ids = list(dict.fromkeys(ids))
if not ids:
return []
if progression_graph_id and int(progression_graph_id) > 0:
from planning_exercise_retrieval import fetch_exercise_rows_by_ids_for_graph
gvis, gclub = _graph_visibility_context(cur, progression_graph_id)
graph_rows = fetch_exercise_rows_by_ids_for_graph(
cur,
ids,
graph_visibility=gvis,
graph_club_id=gclub,
profile_id=tenant.profile_id,
role=tenant.global_role,
exercise_allowed_fn=_exercise_allowed_in_progression_graph,
)
if graph_rows:
return graph_rows
from planning_exercise_retrieval import fetch_exercise_rows_by_ids
return fetch_exercise_rows_by_ids(
cur,
ids,
vis_sql=vis_sql,
vis_params=vis_params,
)
def _planning_visibility_sql(
cur,
tenant: TenantContext,
progression_graph_id: Optional[int],
) -> Tuple[str, List[Any]]:
if progression_graph_id and int(progression_graph_id) > 0:
cur.execute(
"SELECT visibility, club_id FROM exercise_progression_graphs WHERE id = %s",
(int(progression_graph_id),),
)
grow = cur.fetchone()
if grow:
g_club = grow.get("club_id")
return library_content_visibility_for_progression_graph_sql(
alias="e",
profile_id=tenant.profile_id,
role=tenant.global_role,
effective_club_id=tenant.effective_club_id,
graph_visibility=str(grow.get("visibility") or "private"),
graph_club_id=int(g_club) if g_club is not None else None,
)
return library_content_visibility_sql(
alias="e",
profile_id=tenant.profile_id,
role=tenant.global_role,
effective_club_id=tenant.effective_club_id,
)
def _exercise_allowed_in_progression_graph(
exercise_row: Mapping[str, Any],
*,
graph_visibility: str,
graph_club_id: Optional[int],
profile_id: int,
role: str,
) -> bool:
"""Prüft, ob eine Übung zur Sichtbarkeit des Progressionsgraphen passt."""
from club_tenancy import is_platform_admin
ex_vis = (exercise_row.get("visibility") or "private").strip().lower()
gvis = (graph_visibility or "private").strip().lower()
if gvis == "private":
if ex_vis == "official":
return True
if ex_vis == "club":
return True
if ex_vis == "private":
if is_platform_admin(role):
return True
try:
return int(exercise_row.get("created_by") or 0) == int(profile_id)
except (TypeError, ValueError):
return False
return False
if gvis == "club":
if ex_vis == "official":
return True
if ex_vis != "club":
return False
ex_club = exercise_row.get("club_id")
if ex_club is None:
return False
if graph_club_id is None:
return True
return int(ex_club) == int(graph_club_id)
return ex_vis == "official"
def _slot_assignments_by_major_index(
assignments: Optional[List[EvaluateStepPayload]],
) -> Dict[int, EvaluateStepPayload]:
out: Dict[int, EvaluateStepPayload] = {}
for raw in assignments or []:
if raw.exercise_id is None or raw.roadmap_major_step_index is None:
continue
out[int(raw.roadmap_major_step_index)] = raw
return out
def _assignment_preservation_active(body: ProgressionPathSuggestRequest) -> bool:
"""Trainer-Pfad schützen — nur bei explizitem Flag (Frontend entscheidet pro Aktion)."""
return bool(body.preserve_slot_assignments)
def _path_step_from_slot_assignment(
cur,
*,
assignment: EvaluateStepPayload,
stage_spec: StageSpecArtifact,
major_step: Optional[MajorStep],
tenant: Optional[TenantContext] = None,
progression_graph_id: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
"""Bestehende Slot-Zuordnung aus dem Graph-Editor (nach KI-Anlage) übernehmen."""
eid = int(assignment.exercise_id)
cur.execute(
"SELECT id, title, summary, visibility, club_id, created_by FROM exercises WHERE id = %s",
(eid,),
)
row = cur.fetchone()
if not row:
return None
if tenant and progression_graph_id:
cur.execute(
"SELECT visibility, club_id FROM exercise_progression_graphs WHERE id = %s",
(int(progression_graph_id),),
)
grow = cur.fetchone()
if grow and not _exercise_allowed_in_progression_graph(
row,
graph_visibility=str(grow.get("visibility") or "private"),
graph_club_id=int(grow["club_id"]) if grow.get("club_id") is not None else None,
profile_id=tenant.profile_id,
role=tenant.global_role,
):
return None
title = (assignment.title or row.get("title") or "").strip() or str(row.get("title") or "")
step = {
"exercise_id": eid,
"variant_id": assignment.variant_id,
"title": title,
"summary": row.get("summary"),
"score": None,
"semantic_score": None,
"reasons": ["Bestehende Slot-Zuordnung (Graph-Editor)"],
"variants": [],
"slot_assignment": True,
}
return _annotate_roadmap_step(
step,
stage_spec=stage_spec,
major_step=major_step,
)
def _hit_to_path_step(hit: Dict[str, Any], *, is_bridge: bool = False) -> Dict[str, Any]:
raw_vid = hit.get("suggested_variant_id")
variant_id: Optional[int] = None
if raw_vid is not None:
try:
vid = int(raw_vid)
if vid > 0:
variant_id = vid
except (TypeError, ValueError):
variant_id = None
step = {
"exercise_id": int(hit["id"]),
"variant_id": variant_id,
"title": hit.get("title"),
"summary": hit.get("summary"),
"score": hit.get("score"),
"semantic_score": hit.get("semantic_score"),
"reasons": list(hit.get("reasons") or []),
"variants": hit.get("variants") or [],
"suggested_variant_id": hit.get("suggested_variant_id"),
"suggested_variant_name": hit.get("suggested_variant_name"),
}
if is_bridge:
step["is_bridge"] = True
return step
def _run_path_step_retrieval(
cur,
*,
tenant: TenantContext,
goal_query: str,
step_index: int,
max_steps: int,
planned_ids: List[int],
anchor_id: Optional[int],
anchor_variant_id: Optional[int],
progression_graph_id: Optional[int],
include_llm_intent: bool,
exercise_kind_any: Optional[List[str]],
semantic_brief: PlanningSemanticBrief,
bridge_mode: bool = False,
step_a: Optional[Dict[str, Any]] = None,
step_b: Optional[Dict[str, Any]] = None,
path_target_profile: Optional[PlanningTargetProfile] = None,
path_intent: Optional[str] = None,
step_query_override: Optional[str] = None,
step_phase_override: Optional[str] = None,
step_target_profile_override: Optional[PlanningTargetProfile] = None,
stage_learning_goal: Optional[str] = None,
stage_anti_patterns: Optional[List[str]] = None,
stage_match_brief: Optional[PlanningSemanticBrief] = None,
stage_success_criteria: Optional[List[str]] = None,
stage_load_profile: Optional[List[str]] = None,
path_context_note: Optional[str] = None,
path_primary_topic: Optional[str] = None,
path_technique_excludes: Optional[List[str]] = None,
supplemental_exercise_ids: Optional[List[int]] = None,
priority_exercise_ids: Optional[List[int]] = None,
) -> Tuple[List[Dict[str, Any]], PlanningTargetProfile, Dict[str, Any], str]:
step_query = step_query_override or step_retrieval_query(
semantic_brief, goal_query, step_index, max_steps
)
if bridge_mode and step_a and step_b:
phase = step_phase_for_index(semantic_brief, step_index, max_steps)
parts = [semantic_brief.primary_topic or semantic_brief.retrieval_query or goal_query]
if phase:
parts.append(phase)
step_query = _normalize_query(" ".join(p for p in parts if p) + " brücke")
pack: Dict[str, Any] = {
"unit_id": None,
"unit": {
"id": None,
"framework_slot_id": None,
"origin_framework_slot_id": None,
},
"unit_title": None,
"group_id": None,
"group_name": None,
"section_order_index": None,
"section_title": None,
"section_guidance_notes": goal_query if step_index == 0 and not bridge_mode else step_query,
"planned_exercise_ids": list(planned_ids),
"anchor_exercise_id": anchor_id,
"anchor_title": None,
"anchor_skill_ids": sorted(_load_skill_ids_for_exercise(cur, anchor_id)),
"group_recent_exercise_ids": [],
"context_mode": "progression_path",
"has_planning_reference": bool(planned_ids or anchor_id or bridge_mode),
"semantic_brief": semantic_brief,
"retrieval_query": step_query,
"path_step_phase": step_phase_override
or step_phase_for_index(semantic_brief, step_index, max_steps),
"stage_learning_goal": (stage_learning_goal or "").strip() or None,
"stage_anti_patterns": list(stage_anti_patterns or []),
"roadmap_stage_match": bool((stage_learning_goal or "").strip()),
"stage_match_brief": stage_match_brief,
"stage_success_criteria": list(stage_success_criteria or []),
"stage_load_profile": list(stage_load_profile or []),
"path_context_note": (path_context_note or "").strip() or None,
"path_primary_topic": (path_primary_topic or "").strip() or None,
"path_technique_excludes": list(path_technique_excludes or []),
}
pack = apply_progression_context_to_pack(
cur,
tenant,
pack,
explicit_graph_id=progression_graph_id,
anchor_variant_id=anchor_variant_id,
)
if step_index == 0 and not bridge_mode:
heuristic_intent = resolve_planning_exercise_intent(goal_query, "free_search")
else:
heuristic_intent = path_intent or resolve_planning_exercise_intent(goal_query, "free_search")
has_plan_ref = bool(pack.get("has_planning_reference"))
pipeline_context = {
"unit_title": None,
"group_name": None,
"section_title": pack.get("section_title"),
"section_guidance_notes": pack.get("section_guidance_notes"),
"section_exercise_count": len(planned_ids),
"planned_count": len(planned_ids),
"anchor_title": pack.get("anchor_title"),
"anchor_exercise_id": pack.get("anchor_exercise_id"),
"last_section_exercise_title": None,
"progression_graph_id": pack.get("progression_graph_id"),
"unit_skill_profile": None,
"section_skill_profile": None,
"has_planning_reference": has_plan_ref,
"expectation_mode": "query_only" if step_index == 0 and not planned_ids else "planning_hybrid",
}
if step_target_profile_override is not None:
target_profile = step_target_profile_override
intent = path_intent or resolve_planning_exercise_intent(goal_query, "free_search")
query_intent_summary = {}
elif path_target_profile is not None:
target_profile = path_target_profile
intent = path_intent or resolve_planning_exercise_intent(goal_query, "free_search")
query_intent_summary = {}
else:
target_profile, intent, _scenario, query_intent_summary = build_planning_target_with_query_pipeline(
cur,
unit=pack["unit"],
planned_exercise_ids=pack["planned_exercise_ids"],
section_planned_exercise_ids=[],
anchor_exercise_id=pack.get("anchor_exercise_id"),
query=goal_query if step_index == 0 and not bridge_mode else step_query,
heuristic_intent=heuristic_intent,
include_llm_intent=include_llm_intent and step_index == 0 and not bridge_mode,
context_summary=pipeline_context,
has_planning_reference=has_plan_ref,
)
if pack.get("roadmap_stage_match"):
weights = apply_stage_match_retrieval_weights(semantic_brief)
else:
weights = apply_path_retrieval_weights(semantic_brief)
vis_sql, vis_params = _planning_visibility_sql(
cur,
tenant,
progression_graph_id,
)
supplemental_rows = _load_supplemental_exercise_rows(
cur,
tenant=tenant,
progression_graph_id=progression_graph_id,
exercise_ids=supplemental_exercise_ids,
vis_sql=vis_sql,
vis_params=vis_params,
)
hits, _skills_by_ex, _full_lib = run_multistage_planning_retrieval(
cur,
vis_sql=vis_sql,
vis_params=vis_params,
query=step_query,
exercise_kind_any=exercise_kind_any,
target=target_profile,
intent=intent,
intent_weights=weights,
pack=pack,
supplemental_rows_preloaded=supplemental_rows,
)
from planning_exercise_retrieval import trim_hits_preserving_priority_ids
priority_ids = list(
dict.fromkeys(
int(x)
for x in (priority_exercise_ids or supplemental_exercise_ids or [])
if int(x) > 0
)
)
hits = trim_hits_preserving_priority_ids(hits, priority_ids, limit=48)
hits = _enrich_planning_hits_with_variant_meta(cur, hits)
return hits, target_profile, query_intent_summary, intent
def _make_bridge_search_fn(
cur,
*,
tenant: TenantContext,
goal_query: str,
max_steps: int,
progression_graph_id: Optional[int],
include_llm_intent: bool,
exercise_kind_any: Optional[List[str]],
semantic_brief: PlanningSemanticBrief,
planned_ids: List[int],
path_target_profile: PlanningTargetProfile,
path_intent: str,
supplemental_exercise_ids: Optional[List[int]] = None,
) -> Callable[..., List[Dict[str, Any]]]:
def _bridge_search(
step_a: Dict[str, Any],
step_b: Dict[str, Any],
_gap: Dict[str, Any],
) -> List[Dict[str, Any]]:
hits, _, _, _ = _run_path_step_retrieval(
cur,
tenant=tenant,
goal_query=goal_query,
step_index=1,
max_steps=max_steps,
planned_ids=list(planned_ids) + [int(step_a["exercise_id"])],
anchor_id=int(step_a["exercise_id"]),
anchor_variant_id=step_a.get("variant_id"),
progression_graph_id=progression_graph_id,
include_llm_intent=include_llm_intent,
exercise_kind_any=exercise_kind_any,
semantic_brief=semantic_brief,
bridge_mode=True,
step_a=step_a,
step_b=step_b,
path_target_profile=path_target_profile,
supplemental_exercise_ids=supplemental_exercise_ids,
path_intent=path_intent,
)
gated = [
h
for h in hits
if exercise_passes_path_semantic_gate(
semantic_score=float(h.get("semantic_score") or 0.0),
title=str(h.get("title") or ""),
summary=str(h.get("summary") or ""),
brief=semantic_brief,
strict=False,
)
]
return gated or hits[:12]
return _bridge_search
def _annotate_roadmap_step(
step: Dict[str, Any],
*,
stage_spec: StageSpecArtifact,
major_step: Optional[MajorStep],
skill_expectations: Optional[Dict[str, Any]] = None,
anti_patterns_override: Optional[List[str]] = None,
) -> Dict[str, Any]:
reasons = list(step.get("reasons") or [])
learning_goal = (stage_spec.learning_goal or "").strip()
if learning_goal:
roadmap_reason = f"Roadmap: {learning_goal[:120]}"
if roadmap_reason not in reasons:
reasons.insert(0, roadmap_reason)
if skill_expectations and skill_expectations.get("expected_skills"):
names = [
str(s.get("skill_name") or "").strip()
for s in skill_expectations["expected_skills"][:3]
if str(s.get("skill_name") or "").strip()
]
if names:
skill_reason = f"Fähigkeiten: {', '.join(names)}"
if skill_reason not in reasons:
reasons.append(skill_reason)
step["reasons"] = reasons[:4]
step["roadmap_major_step_index"] = stage_spec.major_step_index
step["roadmap_phase"] = major_step.phase if major_step else None
step["roadmap_learning_goal"] = learning_goal or None
anti = list(anti_patterns_override or stage_spec.anti_patterns or [])
if anti:
step["roadmap_anti_patterns"] = anti
if (stage_spec.start_state or "").strip():
step["roadmap_start_state"] = stage_spec.start_state.strip()
if (stage_spec.target_state or "").strip():
step["roadmap_target_state"] = stage_spec.target_state.strip()
if stage_spec.success_criteria:
step["success_criteria"] = list(stage_spec.success_criteria)
step["stage_success_criteria"] = list(stage_spec.success_criteria)
if not step.get("roadmap_match_source"):
step["roadmap_match_source"] = "stage_spec"
if step.get("exercise_id") is not None:
step["slot_status"] = step.get("slot_status") or (
"preserved" if step.get("roadmap_match_source") == "slot_best_match" else "matched"
)
else:
step["slot_status"] = step.get("slot_status") or "unfilled"
if skill_expectations:
step["skill_expectations"] = skill_expectations
return step
def _stage_validation_context_for_spec(
cur,
*,
body: ProgressionPathSuggestRequest,
goal_query: str,
semantic_brief: PlanningSemanticBrief,
path_target_profile: PlanningTargetProfile,
roadmap_ctx: ProgressionRoadmapContext,
stage_spec: StageSpecArtifact,
step_index: int,
stage_count: int,
major: Optional[MajorStep],
) -> Dict[str, Any]:
"""Gemeinsamer Kontext für Reconcile + Match eines Roadmap-Slots."""
ga_dump = (
roadmap_ctx.goal_analysis.model_dump() if roadmap_ctx.goal_analysis else None
)
rs_dump = (
roadmap_ctx.resolved_structured.model_dump()
if roadmap_ctx.resolved_structured
else None
)
path_start, path_target = resolve_path_start_target(
structured=roadmap_ctx.resolved_structured,
goal_analysis=roadmap_ctx.goal_analysis,
)
stage_goal = (stage_spec.learning_goal or "").strip()
stage_start = (stage_spec.start_state or "").strip()
stage_target = (stage_spec.target_state or "").strip()
contextual_goal = build_contextualized_stage_goal(
learning_goal=stage_goal,
start_state=stage_start,
target_state=stage_target,
path_target_state=path_target,
path_start_state=path_start,
stage_index=step_index,
stage_count=stage_count,
)
path_context_note = None
if rs_dump:
ctx_parts = [
str(rs_dump.get("start_situation") or "").strip()[:120],
str(rs_dump.get("target_state") or "").strip()[:120],
str(rs_dump.get("roadmap_notes") or "").strip()[:120],
]
path_context_note = " ".join(p for p in ctx_parts if p)[:240] or None
path_anti = resolve_path_anti_patterns(
goal_query,
semantic_brief=semantic_brief,
extra_context=path_context_note,
)
stage_anti = list(dict.fromkeys([*(stage_spec.anti_patterns or []), *path_anti]))
path_primary = (
resolve_path_primary_topic(
goal_query,
semantic_brief,
stage_learning_goal=stage_goal,
extra_context=path_context_note,
)
or ""
).strip()
path_tech_excludes = list(semantic_brief.exclude_phrases or [])
if path_primary and semantic_brief.topic_type == "technique":
from planning_exercise_semantics import technique_sibling_excludes
for item in technique_sibling_excludes(path_primary):
if item not in path_tech_excludes:
path_tech_excludes.append(item)
stage_match_brief = build_stage_match_brief(
learning_goal=stage_goal,
anti_patterns=stage_anti,
success_criteria=list(stage_spec.success_criteria or []),
load_profile=list(stage_spec.load_profile or []),
phase=major.phase if major else None,
path_context_note=path_context_note,
path_anti_patterns=path_anti,
path_primary_topic=path_primary or None,
path_technique_excludes=path_tech_excludes or None,
stage_start_state=stage_start or None,
stage_target_state=stage_target or None,
path_target_state=path_target or None,
contextualized_learning_goal=contextual_goal or None,
)
return {
"stage_goal": stage_goal,
"stage_anti": stage_anti,
"path_primary": path_primary,
"path_tech_excludes": path_tech_excludes,
"stage_match_brief": stage_match_brief,
"path_context_note": path_context_note,
"path_anti": path_anti,
"path_start": path_start,
"path_target": path_target,
"ga_dump": ga_dump,
"rs_dump": rs_dump,
}
def _match_roadmap_slot(
cur,
*,
tenant: TenantContext,
body: ProgressionPathSuggestRequest,
goal_query: str,
max_steps: int,
semantic_brief: PlanningSemanticBrief,
path_target_profile: PlanningTargetProfile,
path_intent: str,
roadmap_ctx: ProgressionRoadmapContext,
stage_spec: StageSpecArtifact,
step_index: int,
stage_count: int,
planned_ids: List[int],
anchor_id: Optional[int],
anchor_variant_id: Optional[int],
used: Set[int],
slot_priority_exercise_id: Optional[int] = None,
skip_post_match_gate: bool = False,
) -> Tuple[Optional[Dict[str, Any]], Optional[StageSpecArtifact]]:
"""Einzelnen Roadmap-Slot matchen (Initial-Build und Auto-Rematch)."""
major_by_index: Dict[int, MajorStep] = {}
if roadmap_ctx.roadmap:
major_by_index = {m.index: m for m in roadmap_ctx.roadmap.major_steps}
major = major_by_index.get(stage_spec.major_step_index)
ctx = _stage_validation_context_for_spec(
cur,
body=body,
goal_query=goal_query,
semantic_brief=semantic_brief,
path_target_profile=path_target_profile,
roadmap_ctx=roadmap_ctx,
stage_spec=stage_spec,
step_index=step_index,
stage_count=stage_count,
major=major,
)
stage_goal = ctx["stage_goal"]
stage_anti = ctx["stage_anti"]
path_primary = ctx["path_primary"]
path_tech_excludes = ctx["path_tech_excludes"]
stage_match_brief = ctx["stage_match_brief"]
path_context_note = ctx["path_context_note"]
ga_dump = ctx["ga_dump"]
rs_dump = ctx["rs_dump"]
brief_summary = (
roadmap_ctx.semantic_brief
if roadmap_ctx.semantic_brief
else brief_to_summary_dict(semantic_brief)
)
stage_spec_dict = stage_spec.model_dump()
if major:
stage_spec_dict["phase"] = major.phase
stage_inp = expectation_input_from_progression_stage(
goal_query=goal_query,
goal_analysis=ga_dump,
resolved_structured=rs_dump,
stage_spec=stage_spec_dict,
semantic_brief_summary=brief_summary,
major_step=major.model_dump() if major else None,
)
stage_exp = build_planning_skill_expectations(cur, stage_inp, semantic_brief=semantic_brief)
step_target = apply_expectations_to_target(path_target_profile, stage_exp)
skill_exp_api = stage_exp.to_api_dict() if stage_exp.items else None
step_query = stage_spec_retrieval_query(
semantic_brief=semantic_brief,
goal_query=goal_query,
stage_spec=stage_spec,
major_step=major,
)
step_kind = resolve_step_exercise_kind_filter(stage_spec, body.exercise_kind_any)
peer_goals = _peer_stage_learning_goals(
roadmap_ctx,
current_major_index=int(stage_spec.major_step_index),
)
supplemental_ids = _supplemental_exercise_ids_from_body(cur, body)
lg_candidates_raw = _fetch_learning_goal_library_candidate_ids(
cur,
tenant=tenant,
progression_graph_id=body.progression_graph_id,
learning_goal=stage_goal,
)
lg_candidates = _filter_learning_goal_candidate_ids(
cur,
tenant=tenant,
progression_graph_id=body.progression_graph_id,
candidate_ids=lg_candidates_raw,
stage_goal=stage_goal,
stage_match_brief=stage_match_brief,
stage_anti=stage_anti,
path_primary=path_primary,
path_tech_excludes=path_tech_excludes,
peer_learning_goals=peer_goals,
)
supplemental_ids = list(
dict.fromkeys(
int(x)
for x in [
*supplemental_ids,
*lg_candidates,
slot_priority_exercise_id,
]
if x is not None and int(x) > 0
)
)
priority_ids = list(
dict.fromkeys(
int(x)
for x in [
slot_priority_exercise_id,
*(body.retrieval_boost_exercise_ids or []),
*lg_candidates[:8],
]
if x is not None and int(x) > 0
)
)
hits, _, _, _ = _run_path_step_retrieval(
cur,
tenant=tenant,
goal_query=goal_query,
step_index=step_index,
max_steps=max_steps,
planned_ids=planned_ids,
anchor_id=anchor_id,
anchor_variant_id=anchor_variant_id,
progression_graph_id=body.progression_graph_id,
include_llm_intent=body.include_llm_intent and step_index == 0,
exercise_kind_any=step_kind,
semantic_brief=stage_match_brief,
path_target_profile=path_target_profile,
path_intent=path_intent,
step_query_override=step_query,
step_phase_override=major.phase if major else None,
step_target_profile_override=step_target,
stage_learning_goal=stage_goal or None,
stage_anti_patterns=stage_anti or None,
stage_match_brief=stage_match_brief,
stage_success_criteria=list(stage_spec.success_criteria or []),
stage_load_profile=list(stage_spec.load_profile or []),
path_context_note=path_context_note,
path_primary_topic=path_primary or None,
path_technique_excludes=path_tech_excludes or None,
supplemental_exercise_ids=supplemental_ids,
priority_exercise_ids=priority_ids,
)
hit = _pick_best_path_hit(
hits,
used,
semantic_brief=stage_match_brief,
stage_learning_goal=stage_goal or None,
stage_anti_patterns=stage_anti or None,
roadmap_stage_match=True,
stage_match_brief=stage_match_brief,
path_primary_topic=path_primary or None,
path_technique_excludes=path_tech_excludes or None,
peer_learning_goals=peer_goals,
)
if not hit:
return None, stage_spec
step = _annotate_roadmap_step(
_hit_to_path_step(hit),
stage_spec=stage_spec,
major_step=major,
skill_expectations=skill_exp_api,
anti_patterns_override=stage_anti,
)
if (
slot_priority_exercise_id is not None
and int(step["exercise_id"]) == int(slot_priority_exercise_id)
):
step["slot_status"] = "preserved"
step["roadmap_match_source"] = "slot_best_match"
step["reasons"] = ["Bester Treffer (bestehende Zuordnung)"] + list(step.get("reasons") or [])[:2]
else:
step["slot_status"] = "matched"
step["roadmap_match_source"] = "stage_spec"
if (
not skip_post_match_gate
and step.get("roadmap_match_source") != "slot_best_match"
and not _roadmap_step_passes_post_match_gate(
cur,
step,
goal_query=goal_query,
semantic_brief=semantic_brief,
)
):
return None, stage_spec
return step, None
def _roadmap_step_passes_post_match_gate(
cur,
step: Dict[str, Any],
*,
goal_query: str,
semantic_brief: PlanningSemanticBrief,
) -> bool:
"""Abgleich mit Pfad-QA — kein Rematch-Treffer, der sofort wieder stage_mismatch wäre."""
if step.get("exercise_id") is None:
return False
issues = detect_off_topic_steps(
cur,
[step],
brief=semantic_brief,
goal_query=goal_query,
)
return not issues
def _normalize_roadmap_steps_coverage(
steps: List[Dict[str, Any]],
*,
roadmap_ctx: ProgressionRoadmapContext,
max_steps: int,
) -> List[Dict[str, Any]]:
"""Ein Eintrag pro Roadmap-Major-Step — fehlende Slots als leere Platzhalter."""
stage_specs = list(roadmap_ctx.stage_specs or [])[:max_steps]
if not stage_specs:
return steps
major_by_index: Dict[int, MajorStep] = {}
if roadmap_ctx.roadmap:
major_by_index = {m.index: m for m in roadmap_ctx.roadmap.major_steps}
by_major: Dict[int, Dict[str, Any]] = {}
for raw in steps:
step = dict(raw)
midx = step.get("roadmap_major_step_index")
if midx is not None:
by_major[int(midx)] = step
out: List[Dict[str, Any]] = []
for spec in sorted(stage_specs, key=lambda s: s.major_step_index):
midx = int(spec.major_step_index)
if midx in by_major:
out.append(by_major[midx])
continue
major = major_by_index.get(midx)
goal = (spec.learning_goal or "").strip()
out.append(
{
"exercise_id": None,
"variant_id": None,
"title": goal or f"Slot {midx + 1}",
"is_ai_proposal": False,
"roadmap_major_step_index": midx,
"roadmap_phase": major.phase if major else None,
"roadmap_learning_goal": goal or None,
"roadmap_match_source": "unfilled",
"slot_status": "unfilled",
"reasons": [],
}
)
return out
def _purge_stage_mismatch_roadmap_slots(
cur,
*,
steps: List[Dict[str, Any]],
roadmap_ctx: ProgressionRoadmapContext,
goal_query: str,
semantic_brief: PlanningSemanticBrief,
) -> Tuple[List[Dict[str, Any]], List[Tuple[int, StageSpecArtifact]]]:
"""Leert Slots mit persistentem stage_mismatch — KI-Gap statt schlechter Bibliotheks-Übung."""
issues = detect_off_topic_steps(
cur,
steps,
brief=semantic_brief,
goal_query=goal_query,
)
purge_majors: Set[int] = set()
for item in issues:
if str(item.get("issue") or "") != "stage_mismatch":
continue
midx = item.get("roadmap_major_step_index")
if midx is None:
continue
try:
purge_majors.add(int(midx))
except (TypeError, ValueError):
continue
if not purge_majors:
return steps, []
stage_specs = list(roadmap_ctx.stage_specs or [])
spec_by_major = {int(s.major_step_index): s for s in stage_specs}
major_by_index: Dict[int, MajorStep] = {}
if roadmap_ctx.roadmap:
major_by_index = {m.index: m for m in roadmap_ctx.roadmap.major_steps}
new_unfilled: List[Tuple[int, StageSpecArtifact]] = []
out: List[Dict[str, Any]] = []
for raw in steps:
step = dict(raw)
midx = step.get("roadmap_major_step_index")
if midx is None or int(midx) not in purge_majors:
out.append(step)
continue
major_idx = int(midx)
spec = spec_by_major.get(major_idx)
if spec is None:
out.append(step)
continue
step_index = next(
(i for i, sp in enumerate(stage_specs) if int(sp.major_step_index) == major_idx),
major_idx,
)
major = major_by_index.get(major_idx)
goal = (spec.learning_goal or step.get("roadmap_learning_goal") or "").strip()
out.append(
{
"exercise_id": None,
"variant_id": None,
"title": goal or f"Slot {major_idx + 1}",
"is_ai_proposal": False,
"roadmap_major_step_index": major_idx,
"roadmap_phase": major.phase if major else step.get("roadmap_phase"),
"roadmap_learning_goal": goal or None,
"roadmap_match_source": "unfilled",
"slot_status": "unfilled",
"reasons": ["Keine passende Bibliotheks-Übung für Stufen-Lernziel"],
}
)
new_unfilled.append((step_index, spec))
return out, new_unfilled
def _enrich_roadmap_unfilled_gap_offers(
cur,
*,
steps: List[Dict[str, Any]],
gap_fill_offers: List[Dict[str, Any]],
body: ProgressionPathSuggestRequest,
roadmap_ctx: ProgressionRoadmapContext,
goal_query: str,
semantic_brief: PlanningSemanticBrief,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""KI-Lücken-Angebote für alle leeren Roadmap-Slots (nach Rematch/Normalize)."""
if not body.include_ai_gap_fill:
return steps, gap_fill_offers
seen_offer_ids = {o.get("offer_id") for o in gap_fill_offers if o.get("offer_id")}
out_steps: List[Dict[str, Any]] = []
offers = list(gap_fill_offers)
for raw in steps:
step = dict(raw)
if step.get("exercise_id") is not None:
out_steps.append(step)
continue
try:
major_idx = int(step["roadmap_major_step_index"])
except (TypeError, ValueError, KeyError):
out_steps.append(step)
continue
if step.get("gap_offer") and step.get("proposal_key"):
oid = step["gap_offer"].get("offer_id")
if oid and oid not in seen_offer_ids:
offers.append(dict(step["gap_offer"]))
seen_offer_ids.add(oid)
out_steps.append(step)
continue
stage_spec = next(
(
s
for s in (roadmap_ctx.stage_specs or [])
if int(s.major_step_index) == major_idx
),
None,
)
learning_goal = (
(stage_spec.learning_goal if stage_spec else None)
or step.get("roadmap_learning_goal")
or step.get("title")
or ""
).strip()
spec = {
"source": "roadmap_unfilled",
"insert_after_index": max(major_idx - 1, -1),
"roadmap_major_step_index": major_idx,
"phase": (step.get("roadmap_phase") or "vertiefung").strip().lower(),
"title_hint": (learning_goal or f"Slot {major_idx + 1}")[:120],
"sketch": learning_goal,
"rationale": (
f"Slot {major_idx + 1} — keine passende Bibliotheks-Übung; "
"KI-Entwurf für diese Stufe."
),
}
offer = build_gap_fill_offer(
spec=spec,
steps=steps,
goal_query=goal_query,
brief=semantic_brief,
proposal=None,
roadmap_snapshot=_roadmap_gap_snapshot_for_spec(
cur,
roadmap_ctx,
spec,
goal_query=goal_query,
semantic_brief=semantic_brief,
),
)
step["gap_offer"] = offer
step["proposal_key"] = offer.get("offer_id")
step["slot_status"] = "unfilled"
if offer.get("offer_id") and offer.get("offer_id") not in seen_offer_ids:
offers.append(offer)
seen_offer_ids.add(offer.get("offer_id"))
out_steps.append(step)
return out_steps, offers
def _merge_rematch_unfilled(
roadmap_unfilled: List[Tuple[int, StageSpecArtifact]],
rematch_new_unfilled: List[Tuple[int, StageSpecArtifact]],
) -> List[Tuple[int, StageSpecArtifact]]:
if not rematch_new_unfilled:
return roadmap_unfilled
remapped = {sp.major_step_index for _, sp in rematch_new_unfilled}
kept = [item for item in roadmap_unfilled if item[1].major_step_index not in remapped]
kept.extend(rematch_new_unfilled)
return kept
def _prune_filled_from_roadmap_unfilled(
steps: Sequence[Mapping[str, Any]],
roadmap_unfilled: List[Tuple[int, StageSpecArtifact]],
) -> List[Tuple[int, StageSpecArtifact]]:
"""Entfernt Stufen mit Bibliotheks-Treffer — verhindert veraltete roadmap_unfilled-Hinweise."""
filled_majors: Set[int] = set()
for raw in steps:
if raw.get("exercise_id") is None:
continue
midx = raw.get("roadmap_major_step_index")
if midx is None:
continue
try:
filled_majors.add(int(midx))
except (TypeError, ValueError):
continue
if not filled_majors:
return roadmap_unfilled
return [
item
for item in roadmap_unfilled
if int(item[1].major_step_index) not in filled_majors
]
def _run_roadmap_rematch_loop(
cur,
*,
tenant: TenantContext,
body: ProgressionPathSuggestRequest,
goal_query: str,
max_steps: int,
semantic_brief: PlanningSemanticBrief,
path_target_profile: PlanningTargetProfile,
path_intent: str,
roadmap_ctx: ProgressionRoadmapContext,
steps: List[Dict[str, Any]],
stripped_off_topic: List[Dict[str, Any]],
off_topic_before_strip: List[Dict[str, Any]],
roadmap_unfilled: List[Tuple[int, StageSpecArtifact]],
gaps: List[Dict[str, Any]],
) -> Tuple[
List[Dict[str, Any]],
List[Dict[str, Any]],
List[Dict[str, Any]],
List[Dict[str, Any]],
int,
List[Tuple[int, StageSpecArtifact]],
List[Dict[str, Any]],
]:
"""Phase A/B/C: Rematch-Schleife mit optionaler Stufen-Spec-Verfeinerung."""
rematch_log: List[Dict[str, Any]] = []
refine_log: List[Dict[str, Any]] = []
rematch_rounds = 0
max_rounds = int(body.max_rematch_rounds or 0)
if not body.auto_rematch_after_qa or max_rounds <= 0 or not roadmap_ctx.stage_specs:
off_topic_steps = detect_off_topic_steps(
cur,
steps,
brief=semantic_brief,
goal_query=goal_query,
)
return (
steps,
rematch_log,
stripped_off_topic,
off_topic_steps,
rematch_rounds,
roadmap_unfilled,
refine_log,
)
current_stripped = list(stripped_off_topic or [])
use_initial_off_topic = not current_stripped
off_topic_steps: List[Dict[str, Any]] = []
rejected_by_major: Dict[int, Set[int]] = {}
def _track_rejected(items: Sequence[Mapping[str, Any]]) -> None:
for item in items or []:
if not isinstance(item, dict):
continue
eid = item.get("exercise_id")
midx = item.get("roadmap_major_step_index")
if eid is None or midx is None:
continue
try:
rejected_by_major.setdefault(int(midx), set()).add(int(eid))
except (TypeError, ValueError):
continue
_track_rejected(off_topic_before_strip)
_track_rejected(current_stripped)
slot_assignment_history: Dict[int, Set[int]] = {}
for raw in steps:
midx = raw.get("roadmap_major_step_index")
eid = raw.get("exercise_id")
if midx is None or eid is None:
continue
try:
slot_assignment_history.setdefault(int(midx), set()).add(int(eid))
except (TypeError, ValueError):
continue
for round_idx in range(max_rounds):
mini_qa = run_multistage_path_qa(
off_topic_steps=off_topic_steps if round_idx > 0 else [],
stripped_off_topic=current_stripped if round_idx == 0 else [],
gaps=gaps if round_idx == 0 else [],
llm_qa=None,
llm_applied=False,
roadmap_unfilled=roadmap_unfilled,
)
optimization_hints = list(mini_qa.get("optimization_hints") or [])
if body.auto_refine_stage_spec:
_, round_refine = apply_stage_spec_refinements(
roadmap_ctx,
optimization_hints=optimization_hints,
off_topic_steps=off_topic_steps if round_idx > 0 else off_topic_before_strip,
goal_query=goal_query,
semantic_brief=semantic_brief,
)
if round_refine:
for entry in round_refine:
tagged = dict(entry)
tagged["round"] = rematch_rounds + 1
refine_log.append(tagged)
slot_indices, rematch_reasons = collect_rematch_slot_indices(
stripped_off_topic=current_stripped if round_idx == 0 else [],
off_topic_steps=off_topic_before_strip if round_idx == 0 and use_initial_off_topic else [],
optimization_hints=optimization_hints,
stage_specs=roadmap_ctx.stage_specs,
roadmap_unfilled=roadmap_unfilled,
)
if body.auto_refine_stage_spec:
refine_targets = collect_refine_stage_targets(
optimization_hints=optimization_hints,
off_topic_steps=off_topic_steps if round_idx > 0 else off_topic_before_strip,
stage_specs=roadmap_ctx.stage_specs,
)
for midx in refine_targets:
slot_indices.add(int(midx))
if int(midx) not in rematch_reasons:
rematch_reasons[int(midx)] = "refine_stage_spec"
slot_indices = filter_rematch_slot_indices(
steps,
slot_indices,
stripped_off_topic=current_stripped if round_idx == 0 else [],
off_topic_steps=off_topic_before_strip if round_idx == 0 and use_initial_off_topic else [],
)
if not slot_indices:
break
steps, round_log, rematch_new_unfilled = rematch_roadmap_slots(
cur,
tenant=tenant,
body=body,
goal_query=goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
path_target_profile=path_target_profile,
path_intent=path_intent,
roadmap_ctx=roadmap_ctx,
steps=steps,
slot_indices=slot_indices,
rematch_reasons=rematch_reasons,
match_slot_fn=_match_roadmap_slot,
rejected_by_major=rejected_by_major,
slot_assignment_history=slot_assignment_history,
)
rematch_rounds += 1
for entry in round_log:
tagged = dict(entry)
tagged["round"] = rematch_rounds
rematch_log.append(tagged)
rid = entry.get("replaced_exercise_id")
midx = entry.get("roadmap_major_step_index")
if rid is not None and midx is not None:
try:
rejected_by_major.setdefault(int(midx), set()).add(int(rid))
except (TypeError, ValueError):
pass
new_eid = entry.get("new_exercise_id")
if (
str(entry.get("action") or "") == "replaced"
and new_eid is not None
and midx is not None
):
try:
slot_assignment_history.setdefault(int(midx), set()).add(int(new_eid))
except (TypeError, ValueError):
pass
current_stripped = prune_stripped_after_rematch(current_stripped, round_log)
roadmap_unfilled = _merge_rematch_unfilled(roadmap_unfilled, rematch_new_unfilled)
roadmap_unfilled = _prune_filled_from_roadmap_unfilled(steps, roadmap_unfilled)
use_initial_off_topic = False
off_topic_steps = detect_off_topic_steps(
cur,
steps,
brief=semantic_brief,
goal_query=goal_query,
)
_track_rejected(off_topic_steps)
if round_idx + 1 >= max_rounds:
break
if not off_topic_steps and not roadmap_unfilled:
break
if not off_topic_steps:
off_topic_steps = detect_off_topic_steps(
cur,
steps,
brief=semantic_brief,
goal_query=goal_query,
)
steps, purged_unfilled = _purge_stage_mismatch_roadmap_slots(
cur,
steps=steps,
roadmap_ctx=roadmap_ctx,
goal_query=goal_query,
semantic_brief=semantic_brief,
)
if purged_unfilled:
roadmap_unfilled = _merge_rematch_unfilled(roadmap_unfilled, purged_unfilled)
off_topic_steps = detect_off_topic_steps(
cur,
steps,
brief=semantic_brief,
goal_query=goal_query,
)
roadmap_unfilled = _prune_filled_from_roadmap_unfilled(steps, roadmap_unfilled)
return (
steps,
rematch_log,
current_stripped,
off_topic_steps,
rematch_rounds,
roadmap_unfilled,
refine_log,
)
def _build_steps_roadmap_first(
cur,
*,
tenant: TenantContext,
body: ProgressionPathSuggestRequest,
goal_query: str,
max_steps: int,
semantic_brief: PlanningSemanticBrief,
path_target_profile: PlanningTargetProfile,
path_intent: str,
roadmap_ctx: ProgressionRoadmapContext,
) -> Tuple[List[Dict[str, Any]], List[Tuple[int, StageSpecArtifact]]]:
"""Retrieval pro stage_spec statt iterativem Pfad-Bau (Phase F3)."""
stage_specs = list(roadmap_ctx.stage_specs or [])[:max_steps]
if not stage_specs and roadmap_ctx.roadmap:
stage_specs = [
StageSpecArtifact(
major_step_index=m.index,
learning_goal=m.learning_goal,
)
for m in roadmap_ctx.roadmap.major_steps[:max_steps]
]
used: Set[int] = set()
steps: List[Dict[str, Any]] = []
planned_ids: List[int] = []
anchor_id: Optional[int] = None
anchor_variant_id: Optional[int] = None
unfilled: List[Tuple[int, StageSpecArtifact]] = []
stage_count = len(stage_specs)
assignments = _slot_assignments_by_major_index(body.slot_assignments)
majors_by_index: Dict[int, MajorStep] = {}
if roadmap_ctx.roadmap:
majors_by_index = {m.index: m for m in roadmap_ctx.roadmap.major_steps}
preserve_assignments = _assignment_preservation_active(body)
for step_index, stage_spec in enumerate(stage_specs):
major_idx = stage_spec.major_step_index
major = majors_by_index.get(major_idx)
slot_priority_id: Optional[int] = None
if preserve_assignments and major_idx in assignments:
direct = _path_step_from_slot_assignment(
cur,
assignment=assignments[major_idx],
stage_spec=stage_spec,
major_step=major,
tenant=tenant,
progression_graph_id=body.progression_graph_id,
)
if direct:
direct["slot_status"] = "preserved"
direct["roadmap_match_source"] = "slot_best_match"
steps.append(direct)
eid = int(direct["exercise_id"])
used.add(eid)
planned_ids.append(eid)
anchor_id = eid
anchor_variant_id = direct.get("variant_id")
continue
if major_idx in assignments:
try:
slot_priority_id = int(assignments[major_idx].exercise_id)
except (TypeError, ValueError):
slot_priority_id = None
step, unfilled_spec = _match_roadmap_slot(
cur,
tenant=tenant,
body=body,
goal_query=goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
path_target_profile=path_target_profile,
path_intent=path_intent,
roadmap_ctx=roadmap_ctx,
stage_spec=stage_spec,
step_index=step_index,
stage_count=stage_count,
planned_ids=planned_ids,
anchor_id=anchor_id,
anchor_variant_id=anchor_variant_id,
used=used,
slot_priority_exercise_id=slot_priority_id,
)
if not step:
unfilled.append((step_index, unfilled_spec or stage_spec))
continue
steps.append(step)
eid = int(step["exercise_id"])
used.add(eid)
planned_ids.append(eid)
anchor_id = eid
anchor_variant_id = step.get("variant_id")
return steps, unfilled
def _evaluate_steps_from_payload(
cur,
payloads: List[EvaluateStepPayload],
) -> List[Dict[str, Any]]:
steps: List[Dict[str, Any]] = []
for raw in payloads:
is_proposal = bool(raw.is_ai_proposal) or raw.exercise_id is None
title = (raw.title or "").strip() or None
if is_proposal:
steps.append(
{
"exercise_id": None,
"variant_id": None,
"title": title or "KI-Vorschlag",
"is_ai_proposal": True,
"ai_suggestion": raw.ai_suggestion,
"proposal_key": raw.proposal_key,
"roadmap_major_step_index": raw.roadmap_major_step_index,
"roadmap_phase": raw.roadmap_phase,
"roadmap_learning_goal": raw.roadmap_learning_goal,
"reasons": [],
}
)
continue
eid = int(raw.exercise_id)
cur.execute(
"SELECT id, title, summary FROM exercises WHERE id = %s",
(eid,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=400, detail=f"Übung {eid} nicht gefunden")
steps.append(
{
"exercise_id": eid,
"variant_id": raw.variant_id,
"title": title or row.get("title"),
"summary": row.get("summary"),
"is_ai_proposal": False,
"roadmap_major_step_index": raw.roadmap_major_step_index,
"roadmap_phase": raw.roadmap_phase,
"roadmap_learning_goal": raw.roadmap_learning_goal,
"reasons": [],
}
)
return steps
def _build_evaluate_empty_slot_gap_specs(
steps: List[Dict[str, Any]],
*,
goal_query: str,
) -> List[Dict[str, Any]]:
"""Gap-Angebote für leere Roadmap-Slots im evaluate_only-Modus."""
specs: List[Dict[str, Any]] = []
for step in steps:
if step.get("exercise_id") is not None:
continue
major_idx = step.get("roadmap_major_step_index")
if major_idx is None:
continue
try:
roadmap_idx = int(major_idx)
except (TypeError, ValueError):
continue
phase = (step.get("roadmap_phase") or "vertiefung").strip().lower()
learning_goal = (step.get("roadmap_learning_goal") or step.get("title") or "").strip()
title_hint = learning_goal[:120] if learning_goal else f"Slot {roadmap_idx + 1}"
specs.append(
{
"source": "roadmap_unfilled",
"insert_after_index": max(roadmap_idx - 1, -1),
"gap": {
"expected_phase": phase,
"roadmap_major_step_index": roadmap_idx,
"learning_goal": learning_goal,
},
"phase": phase,
"title_hint": title_hint,
"sketch": learning_goal or title_hint,
"rationale": (
f"Slot {roadmap_idx + 1} ohne Übung — KI-Entwurf für diese Roadmap-Stufe."
),
"roadmap_major_step_index": roadmap_idx,
}
)
return specs[:8]
def _build_off_topic_slot_gap_spec(
step: Mapping[str, Any],
*,
goal_query: str = "",
) -> Optional[Dict[str, Any]]:
"""KI-Angebot für belegten, aber themenfremden Slot (Ersatz statt Leerstelle)."""
del goal_query
major_idx = step.get("roadmap_major_step_index")
if major_idx is None:
return None
try:
roadmap_idx = int(major_idx)
except (TypeError, ValueError):
return None
phase = (step.get("roadmap_phase") or "vertiefung").strip().lower()
learning_goal = (step.get("roadmap_learning_goal") or step.get("title") or "").strip()
rejected_title = (step.get("title") or "").strip()
title_hint = learning_goal[:120] if learning_goal else f"Slot {roadmap_idx + 1}"
rationale = (
f"Slot {roadmap_idx + 1}: Ersatz für „{rejected_title}“ — passende Übung per KI."
if rejected_title
else f"Slot {roadmap_idx + 1} — KI-Entwurf für diese Roadmap-Stufe."
)
return {
"source": "off_topic",
"insert_after_index": max(roadmap_idx - 1, -1),
"replace_step_index": roadmap_idx,
"gap": {
"expected_phase": phase,
"roadmap_major_step_index": roadmap_idx,
"learning_goal": learning_goal,
},
"phase": phase,
"title_hint": title_hint,
"sketch": learning_goal or title_hint,
"rationale": rationale[:400],
"roadmap_major_step_index": roadmap_idx,
}
def _gap_offer_major_index(offer: Mapping[str, Any]) -> Optional[int]:
raw = offer.get("roadmap_major_step_index")
if raw is None:
return None
try:
return int(raw)
except (TypeError, ValueError):
return None
def _run_evaluate_only_path_qa(
cur,
*,
body: ProgressionPathSuggestRequest,
goal_query: str,
semantic_brief: PlanningSemanticBrief,
steps: List[Dict[str, Any]],
roadmap_ctx: Optional[ProgressionRoadmapContext],
) -> Dict[str, Any]:
roadmap_first = roadmap_ctx is not None
gaps: List[Dict[str, Any]] = []
bridge_inserts: List[Dict[str, Any]] = []
unfilled_gaps: List[Dict[str, Any]] = []
llm_qa: Optional[Dict[str, Any]] = None
llm_qa_applied = False
off_topic_steps: List[Dict[str, Any]] = []
stripped_off_topic: List[Dict[str, Any]] = []
ai_proposals: List[Dict[str, Any]] = []
gap_fill_offers: List[Dict[str, Any]] = []
roadmap_qa_mode: Optional[str] = None
if body.include_path_qa:
if roadmap_first:
roadmap_qa_mode = "roadmap_first_lite"
gaps = detect_path_gaps(
cur,
steps,
brief=semantic_brief,
roadmap_first=roadmap_first,
)
if gaps and roadmap_first:
unfilled_gaps = list(gaps)
if body.include_llm_path_qa:
llm_qa, llm_qa_applied = try_llm_qa_progression_path(
cur,
goal_query=goal_query,
brief=semantic_brief,
steps=steps,
gaps=gaps,
bridge_inserts=bridge_inserts,
)
off_topic_steps = detect_off_topic_steps(
cur,
steps,
brief=semantic_brief,
goal_query=goal_query,
)
llm_gap_specs = parse_llm_suggested_new_exercises(
llm_qa,
brief=semantic_brief,
step_count=len(steps),
)
if body.include_ai_gap_fill:
fresh_large_gaps = [g for g in gaps if g.get("is_large_gap")]
gap_specs = collect_gap_fill_specs(
steps=steps,
unfilled_gaps=fresh_large_gaps or unfilled_gaps,
off_topic_steps=off_topic_steps,
llm_specs=llm_gap_specs,
brief=semantic_brief,
goal_query=goal_query,
)
empty_slot_specs = _build_evaluate_empty_slot_gap_specs(
steps,
goal_query=goal_query,
)
seen_spec_keys = {
(
s.get("source"),
s.get("roadmap_major_step_index"),
s.get("insert_after_index"),
)
for s in gap_specs
}
for spec in empty_slot_specs:
key = (
spec.get("source"),
spec.get("roadmap_major_step_index"),
spec.get("insert_after_index"),
)
if key not in seen_spec_keys:
gap_specs.append(spec)
seen_spec_keys.add(key)
path_roadmap_snapshot = None
if roadmap_ctx:
path_roadmap_snapshot = build_progression_gap_snapshot(
goal_analysis=(
roadmap_ctx.goal_analysis.model_dump()
if roadmap_ctx.goal_analysis
else None
),
resolved_structured=(
roadmap_ctx.resolved_structured.model_dump()
if roadmap_ctx.resolved_structured
else None
),
semantic_brief=roadmap_ctx.semantic_brief
or brief_to_summary_dict(semantic_brief),
)
_, ai_proposals, gap_fill_offers = apply_gap_fill_after_qa(
cur,
steps,
gap_specs,
goal_query=goal_query,
brief=semantic_brief,
include_ai_calls=False,
max_ai_proposals=0,
auto_insert_proposals=False,
roadmap_snapshot=path_roadmap_snapshot,
)
multistage_qa = run_multistage_path_qa(
off_topic_steps=off_topic_steps,
stripped_off_topic=stripped_off_topic,
gaps=gaps,
llm_qa=llm_qa,
llm_applied=llm_qa_applied,
)
path_qa = build_path_qa_summary(
gaps=gaps,
bridge_inserts=bridge_inserts,
ai_proposals=ai_proposals,
gap_fill_offers=gap_fill_offers,
off_topic_steps=off_topic_steps,
stripped_off_topic=stripped_off_topic,
llm_qa=llm_qa,
llm_applied=llm_qa_applied,
reorder_applied=False,
reorder_notes=[],
roadmap_qa_mode=roadmap_qa_mode,
multistage_qa=multistage_qa,
)
return {
"path_qa": path_qa,
"gap_fill_offers": gap_fill_offers,
"steps": steps,
}
def _path_qa_quality_score(path_qa: Optional[Mapping[str, Any]]) -> Optional[float]:
if not path_qa:
return None
raw = path_qa.get("quality_score")
try:
return float(raw) if raw is not None else None
except (TypeError, ValueError):
return None
def _steps_by_major_index(steps: Sequence[Mapping[str, Any]]) -> Dict[int, Dict[str, Any]]:
out: Dict[int, Dict[str, Any]] = {}
for raw in steps or []:
if not isinstance(raw, dict):
continue
midx = raw.get("roadmap_major_step_index")
if midx is None:
continue
try:
out[int(midx)] = dict(raw)
except (TypeError, ValueError):
continue
return out
def _steps_to_evaluate_payloads(steps: Sequence[Mapping[str, Any]]) -> List[EvaluateStepPayload]:
"""Pfad-Schritte → evaluate_steps (für faire QS auf dem End-Stand)."""
payloads: List[EvaluateStepPayload] = []
for step in steps or []:
if not isinstance(step, dict):
continue
midx = step.get("roadmap_major_step_index")
if midx is None:
continue
eid = step.get("exercise_id")
is_proposal = bool(step.get("is_ai_proposal")) or eid is None
payloads.append(
EvaluateStepPayload(
exercise_id=int(eid) if eid is not None and not is_proposal else None,
variant_id=step.get("variant_id"),
title=step.get("title"),
is_ai_proposal=is_proposal,
ai_suggestion=step.get("ai_suggestion") if isinstance(step.get("ai_suggestion"), dict) else None,
proposal_key=step.get("proposal_key"),
roadmap_major_step_index=int(midx),
roadmap_phase=step.get("roadmap_phase"),
roadmap_learning_goal=step.get("roadmap_learning_goal"),
)
)
payloads.sort(key=lambda p: int(p.roadmap_major_step_index or 0))
return payloads
def _normalize_slot_title(title: Optional[str]) -> str:
return (title or "").strip().casefold()
def _annotate_slot_diffs(
diffs: Sequence[Mapping[str, Any]],
) -> List[Dict[str, Any]]:
"""Kennzeichnet reine ID-Tausche (gleicher Titel) — bleiben sichtbar, zählen aber nicht als inhaltlich."""
out: List[Dict[str, Any]] = []
for raw in diffs or []:
if not isinstance(raw, dict):
continue
entry = dict(raw)
bt = _normalize_slot_title(entry.get("baseline_title"))
pt = _normalize_slot_title(entry.get("proposed_title"))
entry["trivial_id_swap"] = bool(bt and pt and bt == pt)
out.append(entry)
return out
def _actionable_slot_diffs(diffs: Sequence[Mapping[str, Any]]) -> List[Dict[str, Any]]:
return [d for d in diffs if not d.get("trivial_id_swap")]
def _last_rematch_replacements_by_slot(
rematch_log: Sequence[Mapping[str, Any]],
) -> Dict[int, Mapping[str, Any]]:
"""Letzter erfolgreicher Replace je Slot (Multi-Runden-Rematch)."""
out: Dict[int, Mapping[str, Any]] = {}
for entry in rematch_log or []:
if not isinstance(entry, dict):
continue
if str(entry.get("action") or "") != "replaced":
continue
if entry.get("new_exercise_id") is None:
continue
midx = entry.get("roadmap_major_step_index")
if midx is None:
continue
out[int(midx)] = entry
return out
def _baseline_slot_accepts_rematch_suggestion(base: Mapping[str, Any]) -> bool:
"""Rematch-Protokoll nur für leere oder explizit ungültige Slots — nicht kuratierte Zuordnungen ersetzen."""
if not base:
return True
base_id = base.get("exercise_id")
status = str(base.get("slot_status") or "").strip().lower()
if base_id is None:
return True
if status in {"unfilled", "stripped", "gap", "off_topic"}:
return True
return False
def _build_rematch_suggestion_diffs(
baseline_steps: Sequence[Mapping[str, Any]],
rematch_log: Sequence[Mapping[str, Any]],
) -> List[Dict[str, Any]]:
"""Vorschläge aus Rematch-Protokoll, wenn End-Pfad vs. Baseline identisch wirkt."""
base_by = _steps_by_major_index(baseline_steps)
replacements = _last_rematch_replacements_by_slot(rematch_log)
diffs: List[Dict[str, Any]] = []
for midx, entry in sorted(replacements.items()):
base = base_by.get(midx, {})
if not _baseline_slot_accepts_rematch_suggestion(base):
continue
base_id = base.get("exercise_id")
new_id = entry.get("new_exercise_id")
base_title = (base.get("title") or "").strip() or None
new_title = (entry.get("new_title") or "").strip() or None
same_id = False
if base_id is not None and new_id is not None:
try:
same_id = int(base_id) == int(new_id)
except (TypeError, ValueError):
same_id = False
if same_id:
bt = _normalize_slot_title(base_title)
pt = _normalize_slot_title(new_title)
if bt and pt and bt == pt:
continue
diffs.append(
{
"roadmap_major_step_index": midx,
"baseline_exercise_id": int(base_id) if base_id is not None else None,
"baseline_title": base_title,
"proposed_exercise_id": int(new_id) if new_id is not None else None,
"proposed_title": new_title,
"baseline_slot_status": base.get("slot_status"),
"proposed_slot_status": "matched",
"changed": True,
"from_rematch_log": True,
}
)
return diffs
def _overlay_rematch_suggestions_on_steps(
proposed_steps: Sequence[Mapping[str, Any]],
suggestion_diffs: Sequence[Mapping[str, Any]],
) -> List[Dict[str, Any]]:
"""Ergänzt proposed_steps um Rematch-Kandidaten (für selektive Übernahme)."""
if not suggestion_diffs:
return list(proposed_steps or [])
prop_by = _steps_by_major_index(proposed_steps)
for diff in suggestion_diffs:
if not isinstance(diff, dict) or not diff.get("from_rematch_log"):
continue
midx = diff.get("roadmap_major_step_index")
new_id = diff.get("proposed_exercise_id")
if midx is None or new_id is None:
continue
existing = dict(prop_by.get(int(midx), {}))
existing.update(
{
"exercise_id": int(new_id),
"title": diff.get("proposed_title") or existing.get("title"),
"variant_id": existing.get("variant_id"),
"roadmap_major_step_index": int(midx),
"is_ai_proposal": False,
"slot_status": "matched",
"roadmap_match_source": "rematch_suggestion",
}
)
prop_by[int(midx)] = existing
ordered: List[Dict[str, Any]] = []
for midx in sorted(prop_by.keys()):
ordered.append(prop_by[midx])
return ordered
def _build_progression_slot_diffs(
baseline_steps: Sequence[Mapping[str, Any]],
proposed_steps: Sequence[Mapping[str, Any]],
) -> List[Dict[str, Any]]:
"""Gegenüberstellung je Roadmap-Slot — nur geänderte oder neu leere Slots."""
base_by = _steps_by_major_index(baseline_steps)
prop_by = _steps_by_major_index(proposed_steps)
diffs: List[Dict[str, Any]] = []
for midx in sorted(set(base_by.keys()) | set(prop_by.keys())):
base = base_by.get(midx, {})
prop = prop_by.get(midx, {})
base_id = base.get("exercise_id")
prop_id = prop.get("exercise_id")
base_title = (base.get("title") or "").strip() or None
prop_title = (prop.get("title") or "").strip() or None
if base_id is not None and prop_id is not None and int(base_id) == int(prop_id):
continue
diffs.append(
{
"roadmap_major_step_index": midx,
"baseline_exercise_id": int(base_id) if base_id is not None else None,
"baseline_title": base_title,
"proposed_exercise_id": int(prop_id) if prop_id is not None else None,
"proposed_title": prop_title,
"baseline_slot_status": base.get("slot_status"),
"proposed_slot_status": prop.get("slot_status"),
"changed": base_id != prop_id or base_title != prop_title,
}
)
return diffs
def _evaluate_steps_for_compare_qa(
cur,
*,
tenant: TenantContext,
body: ProgressionPathSuggestRequest,
steps: Sequence[Mapping[str, Any]],
) -> Optional[Dict[str, Any]]:
"""Evaluate-only auf konkretem Schritt-Stand (gleiche Pipeline wie Graph bewerten)."""
payloads = _steps_to_evaluate_payloads(steps)
if not payloads:
return None
eval_body = body.model_copy(
update={
"evaluate_only": True,
"evaluate_steps": payloads,
"compare_with_assignments": False,
"preserve_slot_assignments": False,
"include_llm_intent": False,
"auto_rematch_after_qa": False,
"include_roadmap_preview": False,
}
)
return suggest_progression_path(cur, tenant=tenant, body=eval_body)
def _quick_evaluate_steps_qa(
cur,
*,
goal_query: str,
semantic_brief: PlanningSemanticBrief,
steps: Sequence[Mapping[str, Any]],
roadmap_ctx: Optional[ProgressionRoadmapContext],
) -> Dict[str, Any]:
"""Schnelle Pfad-QS ohne rekursiven API-Lauf — für Slot-Vergleiche."""
roadmap_first = roadmap_ctx is not None
steps_list = list(steps or [])
gaps = detect_path_gaps(
cur,
steps_list,
brief=semantic_brief,
roadmap_first=roadmap_first,
)
off_topic_steps = detect_off_topic_steps(
cur,
steps_list,
brief=semantic_brief,
goal_query=goal_query,
)
multistage_qa = run_multistage_path_qa(
off_topic_steps=off_topic_steps,
stripped_off_topic=[],
gaps=gaps,
llm_qa=None,
llm_applied=False,
)
path_qa = build_path_qa_summary(
gaps=gaps,
bridge_inserts=[],
ai_proposals=[],
gap_fill_offers=[],
off_topic_steps=off_topic_steps,
stripped_off_topic=[],
llm_qa=None,
llm_applied=False,
roadmap_qa_mode="roadmap_first_lite" if roadmap_first else None,
multistage_qa=multistage_qa,
)
if path_qa.get("quality_score") is None:
path_qa["quality_score"] = compute_deterministic_path_quality_score(
gaps=gaps,
off_topic_steps=off_topic_steps,
steps=steps_list,
multistage_qa=multistage_qa,
)
return path_qa
def _off_topic_slot_indices(path_qa: Optional[Mapping[str, Any]]) -> Set[int]:
return set(_off_topic_reasons_by_slot((path_qa or {}).get("off_topic_steps") or []).keys())
def _resolve_hint_major_index(
hint: Mapping[str, Any],
stage_specs: Sequence[StageSpecArtifact],
) -> Optional[int]:
raw = hint.get("roadmap_major_step_index")
if raw is not None:
try:
return int(raw)
except (TypeError, ValueError):
return None
step_index = hint.get("step_index")
if step_index is None:
return None
try:
pos = int(step_index)
except (TypeError, ValueError):
return None
if 0 <= pos < len(stage_specs):
return int(stage_specs[pos].major_step_index)
return pos if pos >= 0 else None
def _parse_slot_refs_from_text(text: str) -> Set[int]:
"""„Schritt 8“ / „Slot 8“ / „Stufe 8“ → 0-basierter major_step_index (7)."""
found: Set[int] = set()
if not text:
return found
for match in re.finditer(r"(?:schritt|slot|stufe)\s*(\d+)", text.lower()):
try:
n = int(match.group(1))
except (TypeError, ValueError):
continue
if n >= 1:
found.add(n - 1)
return found
def _problematic_slots_from_path_qa(
baseline_qa: Optional[Mapping[str, Any]],
baseline_steps: Sequence[Mapping[str, Any]],
stage_specs: Sequence[StageSpecArtifact],
) -> Dict[int, List[str]]:
"""Schachstellen aus derselben QS wie „Graph bewerten“ — Basis für Match-Vorschläge."""
problems: Dict[int, List[str]] = {}
def _add(midx: int, reason: str) -> None:
text = (reason or "").strip()
if not text:
return
bucket = problems.setdefault(int(midx), [])
if text not in bucket:
bucket.append(text[:400])
for midx, reasons in _off_topic_reasons_by_slot(
(baseline_qa or {}).get("off_topic_steps") or [],
).items():
for reason in reasons:
_add(midx, reason)
for hint in (baseline_qa or {}).get("optimization_hints") or []:
if not isinstance(hint, dict):
continue
action = str(hint.get("action") or "").strip().lower()
if action == "review_roadmap":
continue
midx = _resolve_hint_major_index(hint, stage_specs)
if midx is None:
title = str(hint.get("title") or "")
for ref in _parse_slot_refs_from_text(
" ".join(
str(hint.get(k) or "")
for k in ("reason", "issue", "title", "roadmap_learning_goal")
)
):
midx = ref
break
if title:
for step in baseline_steps or []:
if not isinstance(step, dict):
continue
st = str(step.get("title") or "").strip()
smidx = step.get("roadmap_major_step_index")
if st and title.lower() in st.lower() and smidx is not None:
midx = int(smidx)
break
if midx is None:
continue
_add(
int(midx),
str(
hint.get("reason")
or hint.get("issue")
or hint.get("title")
or action
),
)
llm_text_parts: List[str] = []
for key in ("topic_coverage",):
raw = (baseline_qa or {}).get(key)
if raw:
llm_text_parts.append(str(raw))
for key in ("issues", "recommendations", "sequence_notes"):
for raw in (baseline_qa or {}).get(key) or []:
llm_text_parts.append(str(raw or ""))
combined = "\n".join(llm_text_parts)
for midx in _parse_slot_refs_from_text(combined):
_add(midx, "In Pfad-Bewertung als Schachstelle genannt")
for raw in (baseline_qa or {}).get("issues") or []:
text = str(raw or "").strip()
if not text:
continue
for step in baseline_steps or []:
if not isinstance(step, dict):
continue
midx = step.get("roadmap_major_step_index")
if midx is None:
continue
try:
slot_no = int(midx) + 1
except (TypeError, ValueError):
continue
title = str(step.get("title") or "").strip()
if (
f"schritt {slot_no}" in text.lower()
or f"slot {slot_no}" in text.lower()
or f"stufe {slot_no}" in text.lower()
or (title and title.lower() in text.lower())
):
_add(int(midx), text)
for step in baseline_steps or []:
if not isinstance(step, dict):
continue
midx = step.get("roadmap_major_step_index")
if midx is None:
continue
try:
major_idx = int(midx)
except (TypeError, ValueError):
continue
if step.get("exercise_id") is None and not step.get("is_ai_proposal"):
_add(major_idx, "Leerer Slot ohne Bibliotheks-Übung")
return problems
def _slot_suggestion_accepted(
*,
baseline_qa: Optional[Mapping[str, Any]],
projected_qa: Optional[Mapping[str, Any]],
baseline_score: Optional[float],
projected_score: Optional[float],
diff: Mapping[str, Any],
off_topic: bool,
major_idx: int,
slot_problem: bool = False,
stage_specs: Optional[Sequence[StageSpecArtifact]] = None,
baseline_steps: Optional[Sequence[Mapping[str, Any]]] = None,
projected_steps: Optional[Sequence[Mapping[str, Any]]] = None,
) -> bool:
"""Entscheidet, ob ein Slot-Vorschlag in die Liste kommt."""
base_id = diff.get("baseline_exercise_id")
prop_id = diff.get("proposed_exercise_id")
base_off = _off_topic_slot_indices(baseline_qa)
proj_off = _off_topic_slot_indices(projected_qa)
delta = _quality_delta(baseline_score, projected_score)
if prop_id is not None and base_id is not None and int(base_id) == int(prop_id):
return False
if slot_problem and prop_id is not None:
if major_idx in base_off and major_idx not in proj_off:
return True
if delta is not None and delta >= -0.001:
return True
if stage_specs is not None:
proj_problems = _problematic_slots_from_path_qa(
projected_qa,
projected_steps or baseline_steps or [],
stage_specs,
)
if major_idx not in proj_problems:
return True
return True
if off_topic and base_id is not None:
if major_idx in base_off and major_idx not in proj_off:
return True
if prop_id is not None:
return _slot_diff_improves_path(diff, delta, off_topic=True)
if base_id is None and prop_id is not None:
return _slot_diff_improves_path(diff, delta, off_topic=False)
if base_id is not None and prop_id is not None:
return _slot_diff_improves_path(diff, delta, off_topic=False)
if base_id is None and prop_id is None and diff.get("proposed_is_ai_proposal"):
return _slot_diff_improves_path(
diff,
delta,
off_topic=off_topic or major_idx in base_off or slot_problem,
)
return False
def _quality_delta(
baseline_score: Optional[float],
projected_score: Optional[float],
) -> Optional[float]:
if baseline_score is None or projected_score is None:
return None
return round(float(projected_score) - float(baseline_score), 4)
def _apply_slot_diff_to_steps(
baseline_steps: Sequence[Mapping[str, Any]],
diff: Mapping[str, Any],
proposed_steps: Sequence[Mapping[str, Any]],
) -> List[Dict[str, Any]]:
"""Einzeländerung auf Baseline-Pfad legen (für faire QS pro Vorschlag)."""
base_by = _steps_by_major_index(baseline_steps)
prop_by = _steps_by_major_index(proposed_steps)
try:
midx = int(diff.get("roadmap_major_step_index"))
except (TypeError, ValueError):
return [dict(s) for s in baseline_steps or []]
out_by: Dict[int, Dict[str, Any]] = {i: dict(s) for i, s in base_by.items()}
prop_step = prop_by.get(midx)
if isinstance(prop_step, dict):
merged = dict(out_by.get(midx, {}))
merged.update(prop_step)
merged["roadmap_major_step_index"] = midx
out_by[midx] = merged
elif diff.get("proposed_exercise_id") is not None:
merged = dict(out_by.get(midx, {}))
merged["exercise_id"] = int(diff["proposed_exercise_id"])
if diff.get("proposed_title"):
merged["title"] = diff.get("proposed_title")
merged["roadmap_major_step_index"] = midx
merged["slot_status"] = diff.get("proposed_slot_status") or "matched"
out_by[midx] = merged
elif diff.get("baseline_exercise_id") is not None and diff.get("proposed_exercise_id") is None:
merged = dict(out_by.get(midx, {}))
merged["exercise_id"] = None
merged["roadmap_major_step_index"] = midx
out_by[midx] = merged
return [out_by[i] for i in sorted(out_by.keys())]
def _slot_diff_improves_path(
diff: Mapping[str, Any],
quality_delta: Optional[float],
*,
off_topic: bool = False,
) -> bool:
"""Nur Vorschläge mit messbarer Pfad-Verbesserung (Lücken/off-topic: neutral oder besser)."""
if quality_delta is None:
return False
try:
delta = float(quality_delta)
except (TypeError, ValueError):
return False
base_id = diff.get("baseline_exercise_id")
prop_id = diff.get("proposed_exercise_id")
if off_topic and base_id is not None:
return delta >= -0.001
if base_id is None and prop_id is not None:
return delta >= -0.001
if base_id is not None and prop_id is not None:
return delta > 0.005
return False
def _score_incremental_slot_diffs(
cur,
*,
tenant: TenantContext,
body: ProgressionPathSuggestRequest,
baseline_steps: Sequence[Mapping[str, Any]],
proposed_steps: Sequence[Mapping[str, Any]],
baseline_path_qa: Optional[Mapping[str, Any]],
raw_diffs: Sequence[Mapping[str, Any]],
) -> Dict[str, Any]:
"""Bewertet jeden Slot-Diff isoliert gegen die Baseline-QS — filtert Verschlechterungen."""
baseline_score = _path_qa_quality_score(baseline_path_qa)
if baseline_score is None and baseline_steps:
baseline_eval = _evaluate_steps_for_compare_qa(
cur,
tenant=tenant,
body=body,
steps=baseline_steps,
)
if isinstance(baseline_eval, dict):
baseline_score = _path_qa_quality_score(baseline_eval.get("path_qa"))
annotated = _annotate_slot_diffs(list(raw_diffs or []))
candidates = _actionable_slot_diffs(annotated)
# Lücken zuerst, dann Ersetzungen — harte Obergrenze gegen Timeouts
candidates.sort(
key=lambda d: (
0 if d.get("baseline_exercise_id") is None else 1,
int(d.get("roadmap_major_step_index") or 0),
)
)
candidates = candidates[:10]
scored: List[Dict[str, Any]] = []
improving: List[Dict[str, Any]] = []
rejected: List[Dict[str, Any]] = []
for diff in candidates:
merged_steps = _apply_slot_diff_to_steps(baseline_steps, diff, proposed_steps)
eval_res = _evaluate_steps_for_compare_qa(
cur,
tenant=tenant,
body=body,
steps=merged_steps,
)
projected_qa = (
eval_res.get("path_qa")
if isinstance(eval_res, dict) and isinstance(eval_res.get("path_qa"), dict)
else None
)
projected_score = _path_qa_quality_score(projected_qa)
delta: Optional[float] = None
if baseline_score is not None and projected_score is not None:
delta = round(projected_score - baseline_score, 4)
entry = {
**diff,
"projected_path_qa": projected_qa,
"projected_quality_score": projected_score,
"baseline_quality_score": baseline_score,
"quality_delta": delta,
"improves_path": _slot_diff_improves_path(diff, delta),
}
scored.append(entry)
if entry["improves_path"]:
improving.append(entry)
else:
rejected.append(entry)
return {
"baseline_quality_score": baseline_score,
"scored_diffs": scored,
"improvement_diffs": improving,
"rejected_diffs": rejected,
"improvement_count": len(improving),
"rejected_count": len(rejected),
}
def _off_topic_reasons_by_slot(
off_topic_steps: Sequence[Mapping[str, Any]],
) -> Dict[int, List[str]]:
out: Dict[int, List[str]] = {}
for item in off_topic_steps or []:
if not isinstance(item, dict):
continue
midx = item.get("roadmap_major_step_index")
if midx is None:
continue
try:
key = int(midx)
except (TypeError, ValueError):
continue
issue = str(item.get("issue") or "off_topic")
reasons = item.get("reasons") or [issue]
for raw in reasons:
text = str(raw or "").strip()
if text and text not in out.setdefault(key, []):
out[key].append(text[:400])
return out
def _slot_issues_from_path_qa(
path_qa: Optional[Mapping[str, Any]],
major_idx: int,
) -> List[str]:
texts: List[str] = []
if not isinstance(path_qa, dict):
return texts
for key in ("issues", "recommendations"):
for raw in path_qa.get(key) or []:
text = str(raw or "").strip()
if not text:
continue
if f"slot {major_idx + 1}" in text.lower() or f"stufe {major_idx + 1}" in text.lower():
if text not in texts:
texts.append(text[:400])
for hint in path_qa.get("optimization_hints") or []:
if not isinstance(hint, dict):
continue
hint_idx = hint.get("roadmap_major_step_index")
if hint_idx is None:
continue
try:
if int(hint_idx) != int(major_idx):
continue
except (TypeError, ValueError):
continue
text = str(hint.get("reason") or hint.get("issue") or "").strip()
if text and text not in texts:
texts.append(text[:400])
return texts
def _build_slot_pro_contra(
*,
current_step: Mapping[str, Any],
proposed_step: Optional[Mapping[str, Any]],
suggestion_type: str,
baseline_qa: Optional[Mapping[str, Any]],
projected_qa: Optional[Mapping[str, Any]],
quality_delta: Optional[float],
off_topic_reasons: Sequence[str],
candidate_reasons: Sequence[str],
gap_offer: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
current_pro: List[str] = []
current_contra: List[str] = list(off_topic_reasons or [])[:4]
proposed_pro: List[str] = [str(r) for r in (candidate_reasons or []) if str(r or "").strip()][:4]
proposed_contra: List[str] = []
if current_step.get("exercise_id") is not None and not current_contra:
current_pro.append("Bestehende Zuordnung im Graph")
if current_step.get("is_ai_proposal"):
sketch = (current_step.get("title") or "KI-Entwurf").strip()
current_pro.append(f"KI-Entwurf: {sketch[:120]}")
major_idx = current_step.get("roadmap_major_step_index")
if major_idx is not None:
for text in _slot_issues_from_path_qa(baseline_qa, int(major_idx)):
if text not in current_contra:
current_contra.append(text)
if quality_delta is not None and quality_delta > 0:
proposed_pro.append(f"Pfad-QS +{round(float(quality_delta) * 100)} Prozentpunkte")
elif suggestion_type in {"library_fill", "remove_and_replace", "ai_gap"} and not current_contra:
proposed_pro.append("Schließt Lücke bzw. passt besser zur Stufe")
if isinstance(gap_offer, dict):
sketch = str(gap_offer.get("sketch") or gap_offer.get("title_hint") or "").strip()
if sketch:
proposed_pro.append(f"KI-Entwurf: {sketch[:160]}")
rationale = str(gap_offer.get("rationale") or "").strip()
if rationale:
proposed_pro.append(rationale[:200])
if isinstance(projected_qa, dict):
for text in _slot_issues_from_path_qa(projected_qa, int(major_idx or 0)):
if text not in proposed_contra:
proposed_contra.append(text)
if proposed_step and proposed_step.get("exercise_id") is not None and not proposed_pro:
proposed_pro.append("Bibliotheks-Treffer für Stufen-Lernziel")
return {
"current_pro": current_pro[:6],
"current_contra": current_contra[:6],
"proposed_pro": proposed_pro[:6],
"proposed_contra": proposed_contra[:6],
}
def _roadmap_slot_library_candidates(
cur,
*,
tenant: TenantContext,
body: ProgressionPathSuggestRequest,
goal_query: str,
max_steps: int,
semantic_brief: PlanningSemanticBrief,
path_target_profile: PlanningTargetProfile,
path_intent: str,
roadmap_ctx: ProgressionRoadmapContext,
stage_spec: StageSpecArtifact,
step_index: int,
stage_count: int,
planned_ids: List[int],
anchor_id: Optional[int],
anchor_variant_id: Optional[int],
used: Set[int],
exclude_exercise_id: Optional[int] = None,
max_candidates: int = 5,
skip_post_match_gate: bool = False,
) -> List[Dict[str, Any]]:
"""Mehrere Bibliotheks-Kandidaten je Slot (beste zuerst, aktuelle optional ausgeschlossen)."""
pick_used = set(used)
if exclude_exercise_id is not None:
try:
pick_used.add(int(exclude_exercise_id))
except (TypeError, ValueError):
pass
candidates: List[Dict[str, Any]] = []
seen_ids: Set[int] = set()
for _ in range(max(1, max_candidates)):
step, _unfilled = _match_roadmap_slot(
cur,
tenant=tenant,
body=body,
goal_query=goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
path_target_profile=path_target_profile,
path_intent=path_intent,
roadmap_ctx=roadmap_ctx,
stage_spec=stage_spec,
step_index=step_index,
stage_count=stage_count,
planned_ids=planned_ids,
anchor_id=anchor_id,
anchor_variant_id=anchor_variant_id,
used=pick_used,
slot_priority_exercise_id=None,
skip_post_match_gate=skip_post_match_gate,
)
if not step or step.get("exercise_id") is None:
break
try:
eid = int(step["exercise_id"])
except (TypeError, ValueError):
break
if eid in seen_ids:
break
seen_ids.add(eid)
candidates.append(step)
pick_used.add(eid)
return candidates
def _suggestion_as_slot_diff(entry: Mapping[str, Any]) -> Dict[str, Any]:
return {
"roadmap_major_step_index": entry.get("roadmap_major_step_index"),
"baseline_exercise_id": entry.get("baseline_exercise_id"),
"baseline_title": entry.get("baseline_title"),
"proposed_exercise_id": entry.get("proposed_exercise_id"),
"proposed_title": entry.get("proposed_title"),
"baseline_slot_status": entry.get("baseline_slot_status"),
"proposed_slot_status": entry.get("proposed_slot_status"),
"changed": True,
"suggestion_type": entry.get("suggestion_type"),
"quality_delta": entry.get("quality_delta"),
"projected_quality_score": entry.get("projected_quality_score"),
"baseline_quality_score": entry.get("baseline_quality_score"),
"projected_path_qa": entry.get("projected_path_qa"),
"pro_contra": entry.get("pro_contra"),
"improves_path": entry.get("improves_path"),
"off_topic": entry.get("off_topic"),
"gap_offer": entry.get("gap_offer"),
"proposed_is_ai_proposal": entry.get("proposed_is_ai_proposal"),
}
_SLOT_FIT_POOR_THRESHOLD = 0.30
def _off_topic_semantic_scores_by_slot(
off_topic_steps: Sequence[Mapping[str, Any]],
) -> Dict[int, float]:
scores: Dict[int, float] = {}
for item in off_topic_steps or []:
if not isinstance(item, dict):
continue
midx = item.get("roadmap_major_step_index")
if midx is None:
continue
try:
key = int(midx)
raw = item.get("semantic_score")
if raw is not None:
scores[key] = round(float(raw), 4)
except (TypeError, ValueError):
continue
return scores
def _score_exercise_stage_fit_for_spec(
cur,
*,
exercise_id: int,
step: Mapping[str, Any],
stage_spec: StageSpecArtifact,
semantic_brief: PlanningSemanticBrief,
step_index: int,
stage_count: int,
) -> Optional[float]:
try:
eid = int(exercise_id)
except (TypeError, ValueError):
return None
if eid < 1:
return None
bundle = _load_exercise_text_bundle(cur, eid)
stage_goal = (stage_spec.learning_goal or step.get("roadmap_learning_goal") or "").strip()
phase = (
(step.get("roadmap_phase") or "").strip().lower()
or step_phase_for_index(semantic_brief, step_index, stage_count)
)
stage_anti = list(stage_spec.anti_patterns or step.get("roadmap_anti_patterns") or [])
stage_match_brief = (
build_stage_match_brief(
learning_goal=stage_goal,
anti_patterns=stage_anti or None,
phase=phase or None,
)
if stage_goal
else None
)
if not stage_match_brief:
return None
score, _ = score_exercise_stage_fit(
title=bundle["title"],
summary=bundle["summary"],
goal=bundle["goal"],
variant_names=bundle["variant_names"],
stage_brief=stage_match_brief,
step_phase=phase,
)
return round(float(score), 4)
def _slot_auto_select_library(
*,
baseline_slot_score: Optional[float],
proposed_slot_score: Optional[float],
baseline_exercise_id: Optional[int],
proposed_exercise_id: Optional[int],
) -> bool:
if proposed_exercise_id is None:
return False
if baseline_exercise_id is not None and int(baseline_exercise_id) == int(proposed_exercise_id):
return False
if proposed_slot_score is None:
return False
if baseline_slot_score is None:
return True
return float(proposed_slot_score) > float(baseline_slot_score) + 0.001
def _build_unified_slot_review_entry(
cur,
*,
tenant: TenantContext,
body: ProgressionPathSuggestRequest,
goal_query: str,
max_steps: int,
semantic_brief: PlanningSemanticBrief,
path_target_profile: PlanningTargetProfile,
path_intent: str,
roadmap_ctx: ProgressionRoadmapContext,
stage_spec: StageSpecArtifact,
step_index: int,
stage_count: int,
major_idx: int,
current: Mapping[str, Any],
baseline_steps: Sequence[Mapping[str, Any]],
baseline_qa: Mapping[str, Any],
baseline_score: Optional[float],
steps_by_major: Mapping[int, Mapping[str, Any]],
problem_slots: Mapping[int, Sequence[str]],
off_topic_map: Mapping[int, Sequence[str]],
off_topic_scores: Mapping[int, float],
gap_fill_offers: List[Dict[str, Any]],
suggestions: List[Dict[str, Any]],
rejected: List[Dict[str, Any]],
) -> Dict[str, Any]:
current = dict(current or {})
current.setdefault("roadmap_major_step_index", major_idx)
current.setdefault("roadmap_learning_goal", stage_spec.learning_goal)
current_id = current.get("exercise_id")
slot_problem = major_idx in problem_slots
off_topic = slot_problem or major_idx in off_topic_map or bool(
current.get("slot_status") in {"off_topic", "stripped"}
)
off_reasons = list(problem_slots.get(major_idx, [])) + list(off_topic_map.get(major_idx, []))
baseline_slot_score: Optional[float] = off_topic_scores.get(major_idx)
if baseline_slot_score is None and current_id is not None and not current.get("is_ai_proposal"):
baseline_slot_score = _score_exercise_stage_fit_for_spec(
cur,
exercise_id=int(current_id),
step=current,
stage_spec=stage_spec,
semantic_brief=semantic_brief,
step_index=step_index,
stage_count=stage_count,
)
planned_ids = [
int(s["exercise_id"])
for midx, s in sorted(steps_by_major.items())
if midx != major_idx and s.get("exercise_id") is not None
]
anchor_id: Optional[int] = None
anchor_variant_id: Optional[int] = None
used_other: Set[int] = set(planned_ids)
for midx in sorted(steps_by_major):
if midx >= major_idx:
break
step = steps_by_major[midx]
eid = step.get("exercise_id")
if eid is not None:
anchor_id = int(eid)
vid = step.get("variant_id")
anchor_variant_id = int(vid) if vid is not None else None
candidates = _roadmap_slot_library_candidates(
cur,
tenant=tenant,
body=body,
goal_query=goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
path_target_profile=path_target_profile,
path_intent=path_intent,
roadmap_ctx=roadmap_ctx,
stage_spec=stage_spec,
step_index=step_index,
stage_count=stage_count,
planned_ids=planned_ids,
anchor_id=anchor_id,
anchor_variant_id=anchor_variant_id,
used=used_other,
exclude_exercise_id=int(current_id) if current_id is not None else None,
max_candidates=5,
skip_post_match_gate=True,
)
best_candidate: Optional[Dict[str, Any]] = None
for candidate in candidates:
try:
cand_id = int(candidate.get("exercise_id"))
except (TypeError, ValueError):
continue
if current_id is not None and int(current_id) == cand_id:
continue
best_candidate = candidate
break
library_alt: Optional[Dict[str, Any]] = None
if best_candidate is not None:
try:
cand_id = int(best_candidate.get("exercise_id"))
except (TypeError, ValueError):
cand_id = None
if cand_id is not None:
proposed_slot_score = _score_exercise_stage_fit_for_spec(
cur,
exercise_id=cand_id,
step={**current, **best_candidate, "roadmap_major_step_index": major_idx},
stage_spec=stage_spec,
semantic_brief=semantic_brief,
step_index=step_index,
stage_count=stage_count,
)
suggestion_type = (
"remove_and_replace"
if (off_topic or slot_problem) and current_id is not None
else ("library_fill" if current_id is None else "library_improvement")
)
auto_select = _slot_auto_select_library(
baseline_slot_score=baseline_slot_score,
proposed_slot_score=proposed_slot_score,
baseline_exercise_id=int(current_id) if current_id is not None else None,
proposed_exercise_id=cand_id,
)
slot_score_delta = (
round(float(proposed_slot_score) - float(baseline_slot_score), 4)
if proposed_slot_score is not None and baseline_slot_score is not None
else None
)
pro_contra = _build_slot_pro_contra(
current_step=current,
proposed_step=best_candidate,
suggestion_type=suggestion_type,
baseline_qa=baseline_qa,
projected_qa=None,
quality_delta=None,
off_topic_reasons=off_reasons,
candidate_reasons=best_candidate.get("reasons") or [],
)
if slot_score_delta is not None and slot_score_delta > 0:
fit_msg = f"Stufen-Fit +{round(slot_score_delta * 100)} Prozentpunkte"
if fit_msg not in pro_contra["proposed_pro"]:
pro_contra["proposed_pro"].insert(0, fit_msg)
library_alt = {
"exercise_id": cand_id,
"title": (best_candidate.get("title") or "").strip() or None,
"slot_score": proposed_slot_score,
"slot_score_delta": slot_score_delta,
"quality_delta": None,
"auto_select": auto_select,
"suggestion_type": suggestion_type,
"reasons": list(best_candidate.get("reasons") or [])[:4],
"pro_contra": pro_contra,
}
lib_entry = {
"roadmap_major_step_index": major_idx,
"baseline_exercise_id": int(current_id) if current_id is not None else None,
"baseline_title": (current.get("title") or "").strip() or None,
"proposed_exercise_id": cand_id,
"proposed_title": library_alt["title"],
"baseline_slot_status": current.get("slot_status"),
"proposed_slot_status": best_candidate.get("slot_status") or "matched",
"suggestion_type": suggestion_type,
"quality_delta": None,
"baseline_slot_score": baseline_slot_score,
"proposed_slot_score": proposed_slot_score,
"slot_score_delta": slot_score_delta,
"auto_select": auto_select,
"baseline_quality_score": baseline_score,
"improves_path": auto_select,
"off_topic": off_topic,
"slot_problem": slot_problem,
"problem_reasons": off_reasons[:6],
"proposed_is_ai_proposal": False,
"pro_contra": pro_contra,
}
if auto_select:
suggestions.append(lib_entry)
else:
rejected.append(lib_entry)
show_ai_option = bool(
body.include_ai_gap_fill
and (
current_id is None
or off_topic
or slot_problem
or bool(current.get("is_ai_proposal"))
or (
baseline_slot_score is not None
and baseline_slot_score < _SLOT_FIT_POOR_THRESHOLD
)
)
)
ai_alt: Optional[Dict[str, Any]] = None
if show_ai_option:
slot_offer = next(
(
o
for o in gap_fill_offers
if isinstance(o, dict) and _gap_offer_major_index(o) == major_idx
),
None,
)
if not slot_offer:
gap_spec: Optional[Dict[str, Any]] = None
if current_id is None:
empty_specs = _build_evaluate_empty_slot_gap_specs(
[current],
goal_query=goal_query,
)
gap_spec = empty_specs[0] if empty_specs else None
elif off_topic or slot_problem:
gap_spec = _build_off_topic_slot_gap_spec(current, goal_query=goal_query)
if gap_spec:
slot_offer = build_gap_fill_offer(
spec=gap_spec,
steps=baseline_steps,
goal_query=goal_query,
brief=semantic_brief,
proposal=None,
roadmap_snapshot=_roadmap_gap_snapshot_for_spec(
cur,
roadmap_ctx,
gap_spec,
goal_query=goal_query,
semantic_brief=semantic_brief,
),
)
gap_fill_offers.append(slot_offer)
if slot_offer:
ai_alt = {
"title_hint": slot_offer.get("title_hint") or f"Slot {major_idx + 1}",
"gap_offer": slot_offer,
"auto_select": False,
}
return {
"roadmap_major_step_index": major_idx,
"roadmap_learning_goal": (stage_spec.learning_goal or "").strip() or None,
"baseline_exercise_id": int(current_id) if current_id is not None else None,
"baseline_title": (current.get("title") or "").strip() or None,
"baseline_slot_score": baseline_slot_score,
"baseline_slot_status": current.get("slot_status"),
"slot_problem": slot_problem,
"off_topic": off_topic,
"problem_reasons": off_reasons[:6],
"library_alternative": library_alt,
"ai_alternative": ai_alt,
}
def _run_unified_slot_improvement_review(
cur,
*,
tenant: TenantContext,
body: ProgressionPathSuggestRequest,
goal_query: str,
max_steps: int,
semantic_brief: PlanningSemanticBrief,
semantic_llm_applied: bool,
path_target_profile: PlanningTargetProfile,
path_intent: str,
first_intent_summary: Mapping[str, Any],
roadmap_ctx: ProgressionRoadmapContext,
progression_roadmap: Optional[Dict[str, Any]],
roadmap_edited: bool,
) -> Dict[str, Any]:
"""
Ein Workflow: Pfad bewerten → je Slot Alternativen suchen → Stufen-Fit vergleichen.
"""
if not body.baseline_evaluate_steps:
raise HTTPException(
status_code=400,
detail="unified_slot_review erfordert baseline_evaluate_steps",
)
if roadmap_ctx is None or not roadmap_ctx.stage_specs:
raise HTTPException(
status_code=400,
detail="unified_slot_review erfordert Roadmap (roadmap_override / roadmap_first)",
)
try:
return _run_unified_slot_improvement_review_core(
cur,
tenant=tenant,
body=body,
goal_query=goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
semantic_llm_applied=semantic_llm_applied,
path_target_profile=path_target_profile,
path_intent=path_intent,
first_intent_summary=first_intent_summary,
roadmap_ctx=roadmap_ctx,
progression_roadmap=progression_roadmap,
roadmap_edited=roadmap_edited,
)
except HTTPException:
raise
except Exception as exc:
raise HTTPException(
status_code=500,
detail=f"unified_slot_review fehlgeschlagen: {exc}",
) from exc
def _run_unified_slot_improvement_review_core(
cur,
*,
tenant: TenantContext,
body: ProgressionPathSuggestRequest,
goal_query: str,
max_steps: int,
semantic_brief: PlanningSemanticBrief,
semantic_llm_applied: bool,
path_target_profile: PlanningTargetProfile,
path_intent: str,
first_intent_summary: Mapping[str, Any],
roadmap_ctx: ProgressionRoadmapContext,
progression_roadmap: Optional[Dict[str, Any]],
roadmap_edited: bool,
) -> Dict[str, Any]:
if not body.baseline_evaluate_steps:
raise HTTPException(status_code=400, detail="baseline_evaluate_steps fehlt")
if not roadmap_ctx.stage_specs:
raise HTTPException(status_code=400, detail="roadmap stage_specs fehlt")
baseline_steps = _evaluate_steps_from_payload(cur, body.baseline_evaluate_steps)
snapshot = (
dict(body.baseline_path_qa_snapshot)
if isinstance(body.baseline_path_qa_snapshot, dict)
else None
)
if snapshot:
baseline_qa = snapshot
if baseline_qa.get("quality_score") is None:
baseline_qa["quality_score"] = compute_deterministic_path_quality_score(
gaps=baseline_qa.get("large_gaps") or [],
off_topic_steps=baseline_qa.get("off_topic_steps") or [],
steps=baseline_steps,
multistage_qa=baseline_qa,
)
baseline_score = (
float(body.baseline_quality_score)
if body.baseline_quality_score is not None
else _path_qa_quality_score(baseline_qa)
)
gap_fill_offers: List[Dict[str, Any]] = []
else:
eval_body = body.model_copy(
update={
"include_llm_path_qa": body.include_llm_path_qa,
"include_ai_gap_fill": body.include_ai_gap_fill,
"auto_rematch_after_qa": False,
}
)
qa_pack = _run_evaluate_only_path_qa(
cur,
body=eval_body,
goal_query=goal_query,
semantic_brief=semantic_brief,
steps=list(baseline_steps),
roadmap_ctx=roadmap_ctx,
)
baseline_steps = list(qa_pack.get("steps") or baseline_steps)
baseline_qa = qa_pack.get("path_qa") if isinstance(qa_pack.get("path_qa"), dict) else {}
if baseline_qa.get("quality_score") is None:
baseline_qa = dict(baseline_qa)
baseline_qa["quality_score"] = compute_deterministic_path_quality_score(
gaps=baseline_qa.get("large_gaps") or [],
off_topic_steps=baseline_qa.get("off_topic_steps") or [],
steps=baseline_steps,
multistage_qa=baseline_qa,
)
baseline_score = _path_qa_quality_score(baseline_qa)
gap_fill_offers = list(qa_pack.get("gap_fill_offers") or [])
off_topic_map = _off_topic_reasons_by_slot(baseline_qa.get("off_topic_steps") or [])
problem_slots = _problematic_slots_from_path_qa(
baseline_qa,
baseline_steps,
roadmap_ctx.stage_specs,
)
steps_by_major = _steps_by_major_index(baseline_steps)
spec_by_major = {int(s.major_step_index): s for s in roadmap_ctx.stage_specs}
stage_count = len(roadmap_ctx.stage_specs)
off_topic_scores = _off_topic_semantic_scores_by_slot(
baseline_qa.get("off_topic_steps") or [],
)
slot_reviews: List[Dict[str, Any]] = []
suggestions: List[Dict[str, Any]] = []
rejected: List[Dict[str, Any]] = []
for step_index, stage_spec in enumerate(roadmap_ctx.stage_specs):
major_idx = int(stage_spec.major_step_index)
try:
slot_review = _build_unified_slot_review_entry(
cur,
tenant=tenant,
body=body,
goal_query=goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
path_target_profile=path_target_profile,
path_intent=path_intent,
roadmap_ctx=roadmap_ctx,
stage_spec=stage_spec,
step_index=step_index,
stage_count=stage_count,
major_idx=major_idx,
current=steps_by_major.get(major_idx, {}),
baseline_steps=baseline_steps,
baseline_qa=baseline_qa,
baseline_score=baseline_score,
steps_by_major=steps_by_major,
problem_slots=problem_slots,
off_topic_map=off_topic_map,
off_topic_scores=off_topic_scores,
gap_fill_offers=gap_fill_offers,
suggestions=suggestions,
rejected=rejected,
)
except Exception as exc:
slot_review = {
"roadmap_major_step_index": major_idx,
"roadmap_learning_goal": (stage_spec.learning_goal or "").strip() or None,
"baseline_exercise_id": None,
"baseline_title": None,
"baseline_slot_score": None,
"baseline_slot_status": None,
"slot_problem": major_idx in problem_slots,
"off_topic": major_idx in off_topic_map,
"problem_reasons": [f"Slot-Review fehlgeschlagen: {exc}"[:300]],
"library_alternative": None,
"ai_alternative": None,
"review_error": str(exc)[:300],
}
slot_reviews.append(slot_review)
improvement_diffs = [_suggestion_as_slot_diff(s) for s in suggestions]
problem_slot_payload = {
str(k): v for k, v in sorted(problem_slots.items(), key=lambda x: x[0])
}
slot_diff_scoring = {
"baseline_quality_score": baseline_score,
"scored_diffs": improvement_diffs + [_suggestion_as_slot_diff(r) for r in rejected],
"improvement_diffs": improvement_diffs,
"rejected_diffs": [_suggestion_as_slot_diff(r) for r in rejected],
"improvement_count": len(improvement_diffs),
"rejected_count": len(rejected),
}
try:
target_summary = path_target_profile.to_summary_dict(cur)
except Exception:
target_summary = {}
return {
"goal_query": goal_query,
"max_steps_requested": max_steps,
"steps": baseline_steps,
"step_count": len(baseline_steps),
"target_profile_summary": target_summary,
"semantic_brief_summary": brief_to_summary_dict(semantic_brief),
"semantic_llm_applied": semantic_llm_applied,
"query_intent_summary": first_intent_summary,
"progression_graph_id": body.progression_graph_id,
"path_qa": baseline_qa,
"baseline_path_qa": baseline_qa,
"baseline_steps": baseline_steps,
"gap_fill_offers": gap_fill_offers,
"progression_roadmap": progression_roadmap,
"roadmap_first": True,
"roadmap_only": False,
"roadmap_edited": roadmap_edited,
"roadmap_unfilled_count": 0,
"path_skill_expectations": None,
"match_summary": {
"unified_slot_review": True,
"suggestion_count": len(suggestions),
"rejected_count": len(rejected),
"problem_slot_count": len(problem_slots),
"slot_review_count": len(slot_reviews),
},
"retrieval_phase": "unified_slot_review",
"unified_slot_review": True,
"slot_reviews": slot_reviews,
"problem_slots": problem_slot_payload,
"slot_suggestions": suggestions,
"slot_diff_scoring": slot_diff_scoring,
"comparison_mode": True,
}
def _merge_gap_fill_offers_from_steps(
steps: Sequence[Mapping[str, Any]],
offers: Sequence[Mapping[str, Any]],
) -> List[Dict[str, Any]]:
"""Gap-Angebote aus Schritt-gap_offer + Top-Level-Liste vereinigen."""
merged: List[Dict[str, Any]] = [dict(o) for o in offers or [] if isinstance(o, dict)]
seen = {o.get("offer_id") for o in merged if o.get("offer_id")}
for raw in steps or []:
if not isinstance(raw, dict):
continue
go = raw.get("gap_offer")
if not isinstance(go, dict):
continue
oid = go.get("offer_id")
if oid and oid in seen:
continue
if oid:
seen.add(oid)
merged.append(dict(go))
return merged
def _build_progression_compare_response(
baseline: Mapping[str, Any],
proposed: Mapping[str, Any],
*,
proposed_eval: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
baseline_steps = list(baseline.get("steps") or [])
proposed_steps = list(proposed.get("steps") or [])
baseline_qa = baseline.get("path_qa") if isinstance(baseline.get("path_qa"), dict) else {}
pipeline_qa = proposed.get("path_qa") if isinstance(proposed.get("path_qa"), dict) else {}
fair_qa = (
proposed_eval.get("path_qa")
if isinstance(proposed_eval, dict) and isinstance(proposed_eval.get("path_qa"), dict)
else pipeline_qa
)
slot_diffs = _annotate_slot_diffs(
_build_progression_slot_diffs(baseline_steps, proposed_steps),
)
actionable_diffs = _actionable_slot_diffs(slot_diffs)
apply_steps = list(proposed_steps)
gap_fill_offers = _merge_gap_fill_offers_from_steps(
apply_steps,
proposed.get("gap_fill_offers") or [],
)
return {
**dict(proposed),
"comparison_mode": True,
"baseline_steps": baseline_steps,
"baseline_path_qa": baseline_qa,
"proposed_steps": apply_steps,
"proposed_steps_pipeline": proposed_steps,
"proposed_path_qa": fair_qa,
"proposed_path_qa_pipeline": pipeline_qa,
"gap_fill_offers": gap_fill_offers,
"slot_diffs": slot_diffs,
"slot_diffs_actionable": actionable_diffs,
"slot_diff_count": len(actionable_diffs),
"slot_diff_count_including_trivial": len(slot_diffs),
"slot_diffs_source": "steps",
"optimization_actionable": len(actionable_diffs) > 0,
"baseline_quality_score": _path_qa_quality_score(baseline_qa),
"proposed_quality_score": _path_qa_quality_score(fair_qa),
"proposed_pipeline_quality_score": _path_qa_quality_score(pipeline_qa),
"path_qa": fair_qa,
"steps": apply_steps,
}
def suggest_progression_path(
cur,
*,
tenant: TenantContext,
body: ProgressionPathSuggestRequest,
) -> Dict[str, Any]:
role = tenant.global_role
if not _has_planning_role(role):
raise HTTPException(status_code=403, detail="Nur Trainer dürfen Pfad-Vorschläge abrufen")
if body.compare_with_assignments:
eval_source = list(body.evaluate_steps or body.slot_assignments or [])
if len(eval_source) < 1:
raise HTTPException(
status_code=400,
detail="compare_with_assignments erfordert evaluate_steps",
)
baseline_body = body.model_copy(
update={
"evaluate_only": True,
"evaluate_steps": eval_source,
"compare_with_assignments": False,
"preserve_slot_assignments": False,
# Gleiche QS-Pipeline wie „Graph bewerten“ (kein Match/Rematch-Schönung)
"include_llm_intent": False,
"include_llm_path_qa": False,
"auto_rematch_after_qa": False,
"include_roadmap_preview": False,
}
)
baseline = suggest_progression_path(cur, tenant=tenant, body=baseline_body)
proposed_body = body.model_copy(
update={
"compare_with_assignments": False,
"preserve_slot_assignments": False,
"evaluate_only": False,
# Vergleich: deterministische QS + Rematch — kein zusätzlicher Ganzpfad-LLM-Lauf (Timeout)
"include_llm_path_qa": False,
}
)
proposed = suggest_progression_path(cur, tenant=tenant, body=proposed_body)
result = _build_progression_compare_response(baseline, proposed, proposed_eval=None)
if result.get("slot_diff_count", 0) == 0 and isinstance(baseline.get("path_qa"), dict):
fair = baseline["path_qa"]
result["proposed_path_qa"] = fair
result["path_qa"] = fair
result["proposed_quality_score"] = _path_qa_quality_score(fair)
return result
goal_query = _normalize_query(body.query)
if len(goal_query) < 3:
raise HTTPException(status_code=400, detail="Ziel-Anfrage: mindestens 3 Zeichen")
max_steps = int(body.max_steps)
semantic_brief = build_semantic_brief(goal_query)
semantic_llm_applied = False
if body.include_llm_intent and semantic_brief.semantic_strength >= 0.35:
semantic_brief, semantic_llm_applied = try_enrich_semantic_brief_with_llm(
cur, goal_query, semantic_brief
)
extra_path_ctx = " ".join(
p
for p in (
(body.start_situation or "").strip(),
(body.target_state or "").strip(),
(body.roadmap_notes or "").strip(),
)
if p
)
semantic_brief = enrich_brief_with_path_constraints(
semantic_brief,
goal_query,
extra_context=extra_path_ctx or None,
)
roadmap_first = bool(body.roadmap_first)
roadmap_only = bool(body.roadmap_only)
start_target_only = bool(body.start_target_only)
evaluate_only = bool(body.evaluate_only)
include_roadmap = (
roadmap_first or body.include_roadmap_preview or roadmap_only or start_target_only
)
progression_roadmap: Optional[Dict[str, Any]] = None
roadmap_ctx: Optional[ProgressionRoadmapContext] = None
roadmap_edited = False
roadmap_structured = _roadmap_structured_from_body(body)
if body.roadmap_override is not None:
try:
roadmap_ctx = roadmap_context_from_override(
goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
override=body.roadmap_override,
structured=roadmap_structured,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
progression_roadmap = progression_roadmap_to_api_dict(roadmap_ctx)
progression_roadmap["roadmap_edited"] = True
roadmap_edited = True
max_steps = int(roadmap_ctx.max_steps)
roadmap_first = True
elif start_target_only:
roadmap_ctx = run_start_target_resolve_only(
goal_query,
semantic_brief=semantic_brief,
cur=cur,
include_llm_start_target=body.include_llm_start_target,
structured=roadmap_structured,
)
progression_roadmap = progression_roadmap_to_api_dict(roadmap_ctx)
elif include_roadmap:
roadmap_ctx = run_progression_roadmap_pipeline(
goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
cur=cur,
include_llm_roadmap=body.include_llm_roadmap,
include_llm_start_target=body.include_llm_start_target,
structured=roadmap_structured,
)
progression_roadmap = progression_roadmap_to_api_dict(roadmap_ctx)
if start_target_only:
return {
"goal_query": goal_query,
"max_steps_requested": max_steps,
"steps": [],
"step_count": 0,
"target_profile_summary": None,
"semantic_brief_summary": brief_to_summary_dict(semantic_brief),
"semantic_llm_applied": semantic_llm_applied,
"query_intent_summary": {},
"progression_graph_id": body.progression_graph_id,
"path_qa": None,
"gap_fill_offers": [],
"progression_roadmap": progression_roadmap,
"roadmap_first": False,
"roadmap_only": False,
"start_target_only": True,
"roadmap_edited": False,
"roadmap_unfilled_count": 0,
"retrieval_phase": "start_target_only",
}
if roadmap_only:
return {
"goal_query": goal_query,
"max_steps_requested": max_steps,
"steps": [],
"step_count": 0,
"target_profile_summary": None,
"semantic_brief_summary": brief_to_summary_dict(semantic_brief),
"semantic_llm_applied": semantic_llm_applied,
"query_intent_summary": {},
"progression_graph_id": body.progression_graph_id,
"path_qa": None,
"gap_fill_offers": [],
"progression_roadmap": progression_roadmap,
"roadmap_first": False,
"roadmap_only": True,
"roadmap_edited": roadmap_edited,
"roadmap_unfilled_count": 0,
"retrieval_phase": "roadmap_only",
}
if evaluate_only:
if not body.evaluate_steps:
raise HTTPException(
status_code=400,
detail="evaluate_only erfordert evaluate_steps",
)
eval_steps = _evaluate_steps_from_payload(cur, body.evaluate_steps)
qa_pack = _run_evaluate_only_path_qa(
cur,
body=body,
goal_query=goal_query,
semantic_brief=semantic_brief,
steps=eval_steps,
roadmap_ctx=roadmap_ctx,
)
return {
"goal_query": goal_query,
"max_steps_requested": max_steps,
"steps": qa_pack["steps"],
"step_count": len(qa_pack["steps"]),
"target_profile_summary": None,
"semantic_brief_summary": brief_to_summary_dict(semantic_brief),
"semantic_llm_applied": semantic_llm_applied,
"query_intent_summary": {},
"progression_graph_id": body.progression_graph_id,
"path_qa": qa_pack["path_qa"],
"gap_fill_offers": qa_pack["gap_fill_offers"],
"progression_roadmap": progression_roadmap,
"roadmap_first": bool(roadmap_ctx),
"roadmap_only": False,
"roadmap_edited": roadmap_edited,
"roadmap_unfilled_count": 0,
"path_skill_expectations": None,
"retrieval_phase": "evaluate_only",
}
path_target_profile, first_intent_summary, path_intent = _build_path_target_profile(
cur,
goal_query=goal_query,
semantic_brief=semantic_brief,
include_llm_intent=body.include_llm_intent,
start_situation=body.start_situation,
target_state=body.target_state,
roadmap_notes=body.roadmap_notes,
catalog_context=_resolve_planning_catalog_context(cur, body),
)
path_skill_expectations: Optional[Dict[str, Any]] = None
if roadmap_ctx and roadmap_ctx.goal_analysis:
path_inp = expectation_input_from_progression_path(
goal_query=goal_query,
goal_analysis=roadmap_ctx.goal_analysis.model_dump(),
resolved_structured=(
roadmap_ctx.resolved_structured.model_dump()
if roadmap_ctx.resolved_structured
else None
),
semantic_brief_summary=(
roadmap_ctx.semantic_brief
if roadmap_ctx.semantic_brief
else brief_to_summary_dict(semantic_brief)
),
)
path_exp = build_planning_skill_expectations(cur, path_inp, semantic_brief=semantic_brief)
if path_exp.items:
path_target_profile = apply_expectations_to_target(path_target_profile, path_exp)
path_skill_expectations = path_exp.to_api_dict()
if body.unified_slot_review:
return _run_unified_slot_improvement_review(
cur,
tenant=tenant,
body=body,
goal_query=goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
semantic_llm_applied=semantic_llm_applied,
path_target_profile=path_target_profile,
path_intent=path_intent,
first_intent_summary=first_intent_summary,
roadmap_ctx=roadmap_ctx,
progression_roadmap=progression_roadmap,
roadmap_edited=roadmap_edited,
)
roadmap_unfilled: List[Tuple[int, StageSpecArtifact]] = []
roadmap_gap_offers: List[Dict[str, Any]] = []
used: Set[int] = set()
steps: List[Dict[str, Any]] = []
planned_ids: List[int] = []
anchor_id: Optional[int] = None
anchor_variant_id: Optional[int] = None
if roadmap_first and roadmap_ctx is not None:
steps, roadmap_unfilled = _build_steps_roadmap_first(
cur,
tenant=tenant,
body=body,
goal_query=goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
path_target_profile=path_target_profile,
path_intent=path_intent,
roadmap_ctx=roadmap_ctx,
)
planned_ids = [int(s["exercise_id"]) for s in steps if s.get("exercise_id") is not None]
if planned_ids:
anchor_id = planned_ids[-1]
anchor_variant_id = steps[-1].get("variant_id")
if body.include_ai_gap_fill and roadmap_unfilled:
major_by_index = (
{m.index: m for m in roadmap_ctx.roadmap.major_steps}
if roadmap_ctx.roadmap
else {}
)
roadmap_gap_specs = build_roadmap_unfilled_gap_specs(
unfilled_specs=roadmap_unfilled,
major_steps_by_index=major_by_index,
steps=steps,
brief=semantic_brief,
goal_query=goal_query,
goal_analysis=roadmap_ctx.goal_analysis if roadmap_ctx else None,
resolved_structured=roadmap_ctx.resolved_structured if roadmap_ctx else None,
)
for spec in roadmap_gap_specs:
roadmap_gap_offers.append(
build_gap_fill_offer(
spec=spec,
steps=steps,
goal_query=goal_query,
brief=semantic_brief,
proposal=None,
roadmap_snapshot=_roadmap_gap_snapshot_for_spec(
cur,
roadmap_ctx,
spec,
goal_query=goal_query,
semantic_brief=semantic_brief,
),
)
)
else:
for step_index in range(max_steps):
hits, _tp, _qis, _intent = _run_path_step_retrieval(
cur,
tenant=tenant,
goal_query=goal_query,
step_index=step_index,
max_steps=max_steps,
planned_ids=planned_ids,
anchor_id=anchor_id,
anchor_variant_id=anchor_variant_id,
progression_graph_id=body.progression_graph_id,
include_llm_intent=body.include_llm_intent,
exercise_kind_any=body.exercise_kind_any,
semantic_brief=semantic_brief,
path_target_profile=path_target_profile,
path_intent=path_intent,
supplemental_exercise_ids=_supplemental_exercise_ids_from_body(cur, body),
)
hit = _pick_best_path_hit(hits, used, semantic_brief=semantic_brief)
if not hit:
break
step = _hit_to_path_step(hit)
steps.append(step)
eid = int(step["exercise_id"])
used.add(eid)
planned_ids.append(eid)
anchor_id = eid
anchor_variant_id = step.get("variant_id")
stage_spec_count = len(roadmap_ctx.stage_specs or []) if roadmap_ctx else 0
if roadmap_first and stage_spec_count >= 2:
pass
elif len(steps) < 2:
raise HTTPException(
status_code=422,
detail="Zu wenig passende Übungen für einen Pfad (mindestens 2 Schritte). Ziel präzisieren oder max_steps senken.",
)
gaps: List[Dict[str, Any]] = []
bridge_inserts: List[Dict[str, Any]] = []
ai_proposals: List[Dict[str, Any]] = []
gap_fill_offers: List[Dict[str, Any]] = []
off_topic_steps: List[Dict[str, Any]] = []
stripped_off_topic: List[Dict[str, Any]] = []
rematch_log: List[Dict[str, Any]] = []
refine_log: List[Dict[str, Any]] = []
rematch_rounds = 0
llm_qa: Optional[Dict[str, Any]] = None
llm_qa_applied = False
reorder_applied = False
reorder_notes: List[str] = []
roadmap_qa_mode: Optional[str] = None
preserve_assignments = _assignment_preservation_active(body)
if body.include_path_qa:
if roadmap_first:
roadmap_qa_mode = "roadmap_first_lite"
gaps = detect_path_gaps(
cur,
steps,
brief=semantic_brief,
roadmap_first=roadmap_first,
)
unfilled_gaps: List[Dict[str, Any]] = []
if gaps and not roadmap_first:
bridge_fn = _make_bridge_search_fn(
cur,
tenant=tenant,
goal_query=goal_query,
max_steps=max_steps,
progression_graph_id=body.progression_graph_id,
include_llm_intent=body.include_llm_intent,
exercise_kind_any=body.exercise_kind_any,
semantic_brief=semantic_brief,
planned_ids=planned_ids,
path_target_profile=path_target_profile,
path_intent=path_intent,
supplemental_exercise_ids=_supplemental_exercise_ids_from_body(cur, body),
)
steps, bridge_inserts, unfilled_gaps = insert_bridge_exercises(
cur,
steps,
gaps,
brief=semantic_brief,
bridge_search_fn=bridge_fn,
)
elif gaps and roadmap_first:
unfilled_gaps = list(gaps)
if body.include_llm_path_qa and (
not roadmap_first or preserve_assignments
):
llm_qa, llm_qa_applied = try_llm_qa_progression_path(
cur,
goal_query=goal_query,
brief=semantic_brief,
steps=steps,
gaps=gaps,
bridge_inserts=bridge_inserts,
)
if (
body.include_path_reorder
and not roadmap_first
and llm_qa_applied
and llm_qa
):
q_score = llm_qa.get("quality_score")
try:
q_val = float(q_score) if q_score is not None else None
except (TypeError, ValueError):
q_val = None
if llm_qa.get("overall_ok") or (q_val is not None and q_val >= 0.45):
steps, reorder_applied, reorder_notes = apply_llm_path_reorder(steps, llm_qa)
off_topic_steps = detect_off_topic_steps(
cur,
steps,
brief=semantic_brief,
goal_query=goal_query,
)
off_topic_before_strip = list(off_topic_steps)
if preserve_assignments:
stripped_off_topic = []
else:
steps, stripped_off_topic = strip_off_topic_steps_from_path(
steps,
off_topic_steps,
min_remaining=0 if roadmap_first else 2,
)
if stripped_off_topic:
off_topic_steps = []
gaps = detect_path_gaps(
cur,
steps,
brief=semantic_brief,
roadmap_first=roadmap_first,
)
if roadmap_first and roadmap_ctx is not None and not preserve_assignments:
(
steps,
rematch_log,
stripped_off_topic,
rematch_off_topic,
rematch_rounds,
roadmap_unfilled,
refine_log,
) = _run_roadmap_rematch_loop(
cur,
tenant=tenant,
body=body,
goal_query=goal_query,
max_steps=max_steps,
semantic_brief=semantic_brief,
path_target_profile=path_target_profile,
path_intent=path_intent,
roadmap_ctx=roadmap_ctx,
steps=steps,
stripped_off_topic=stripped_off_topic,
off_topic_before_strip=off_topic_before_strip,
roadmap_unfilled=roadmap_unfilled,
gaps=gaps,
)
if rematch_off_topic:
off_topic_steps = rematch_off_topic
gaps = detect_path_gaps(
cur,
steps,
brief=semantic_brief,
roadmap_first=roadmap_first,
)
if body.include_llm_path_qa and roadmap_first and not preserve_assignments:
gaps = detect_path_gaps(
cur,
steps,
brief=semantic_brief,
roadmap_first=roadmap_first,
)
llm_qa, llm_qa_applied = try_llm_qa_progression_path(
cur,
goal_query=goal_query,
brief=semantic_brief,
steps=steps,
gaps=gaps,
bridge_inserts=bridge_inserts,
)
llm_gap_specs = parse_llm_suggested_new_exercises(
llm_qa,
brief=semantic_brief,
step_count=len(steps),
)
if body.include_ai_gap_fill:
fresh_large_gaps = [g for g in gaps if g.get("is_large_gap")]
gap_specs = collect_gap_fill_specs(
steps=steps,
unfilled_gaps=fresh_large_gaps or unfilled_gaps,
off_topic_steps=off_topic_steps,
llm_specs=llm_gap_specs,
brief=semantic_brief,
goal_query=goal_query,
)
path_roadmap_snapshot = None
if roadmap_ctx:
path_roadmap_snapshot = build_progression_gap_snapshot(
goal_analysis=(
roadmap_ctx.goal_analysis.model_dump()
if roadmap_ctx.goal_analysis
else None
),
resolved_structured=(
roadmap_ctx.resolved_structured.model_dump()
if roadmap_ctx.resolved_structured
else None
),
semantic_brief=roadmap_ctx.semantic_brief or brief_to_summary_dict(semantic_brief),
)
steps, ai_proposals, gap_fill_offers = apply_gap_fill_after_qa(
cur,
steps,
gap_specs,
goal_query=goal_query,
brief=semantic_brief,
include_ai_calls=False,
max_ai_proposals=0,
auto_insert_proposals=False,
roadmap_snapshot=path_roadmap_snapshot,
)
if roadmap_gap_offers:
seen_offer_ids = {o.get("offer_id") for o in gap_fill_offers}
for offer in roadmap_gap_offers:
if offer.get("offer_id") not in seen_offer_ids:
gap_fill_offers.append(offer)
if roadmap_first and roadmap_ctx is not None:
steps = _normalize_roadmap_steps_coverage(
steps,
roadmap_ctx=roadmap_ctx,
max_steps=max_steps,
)
steps, gap_fill_offers = _enrich_roadmap_unfilled_gap_offers(
cur,
steps=steps,
gap_fill_offers=gap_fill_offers,
body=body,
roadmap_ctx=roadmap_ctx,
goal_query=goal_query,
semantic_brief=semantic_brief,
)
multistage_qa = run_multistage_path_qa(
off_topic_steps=off_topic_steps,
stripped_off_topic=stripped_off_topic,
gaps=gaps,
llm_qa=llm_qa,
llm_applied=llm_qa_applied,
roadmap_unfilled=roadmap_unfilled if roadmap_first else None,
)
path_qa = build_path_qa_summary(
gaps=gaps,
bridge_inserts=bridge_inserts,
ai_proposals=ai_proposals,
gap_fill_offers=gap_fill_offers,
off_topic_steps=off_topic_steps,
stripped_off_topic=stripped_off_topic,
llm_qa=llm_qa,
llm_applied=llm_qa_applied,
reorder_applied=reorder_applied,
reorder_notes=reorder_notes,
roadmap_qa_mode=roadmap_qa_mode,
multistage_qa=multistage_qa,
)
if rematch_log:
path_qa["rematch_applied"] = True
path_qa["rematch_log"] = rematch_log
path_qa["rematch_rounds"] = rematch_rounds
if refine_log:
path_qa["refine_applied"] = True
path_qa["refine_log"] = refine_log
path_qa["refine_count"] = len(refine_log)
if preserve_assignments:
path_qa["assignments_preserved"] = True
filled_library_steps = sum(1 for s in steps if s.get("exercise_id") is not None)
match_summary = {
"roadmap_first": roadmap_first,
"library_matches": filled_library_steps,
"slot_count": len(steps),
"gap_fill_offer_count": len(gap_fill_offers),
"roadmap_unfilled_count": len(roadmap_unfilled),
}
target_profile_summary = path_target_profile.to_summary_dict(cur)
retrieval_parts = ["profile_v1", "full_library", "path_builder", "semantics"]
if roadmap_first:
retrieval_parts.append("roadmap_first")
if roadmap_qa_mode:
retrieval_parts.append(roadmap_qa_mode)
if body.include_path_qa:
retrieval_parts.append("path_qa")
if llm_qa_applied:
retrieval_parts.append("llm_path_qa")
if reorder_applied:
retrieval_parts.append("path_reorder")
if ai_proposals:
retrieval_parts.append("ai_gap_fill")
if gap_fill_offers:
retrieval_parts.append("gap_fill_offers")
if include_roadmap:
retrieval_parts.append("roadmap_preview")
if roadmap_edited:
retrieval_parts.append("roadmap_edited")
if roadmap_unfilled:
retrieval_parts.append("roadmap_unfilled")
if rematch_log:
retrieval_parts.append("path_rematch")
if refine_log:
retrieval_parts.append("stage_spec_refine")
slot_diff_scoring: Optional[Dict[str, Any]] = None
if (
body.include_incremental_diff_scoring
and body.baseline_evaluate_steps
and not evaluate_only
and not body.compare_with_assignments
):
baseline_steps_for_scoring = _evaluate_steps_from_payload(cur, body.baseline_evaluate_steps)
raw_diffs = _build_progression_slot_diffs(baseline_steps_for_scoring, steps)
baseline_qa_for_scoring: Optional[Dict[str, Any]] = None
if body.baseline_quality_score is not None:
baseline_qa_for_scoring = {"quality_score": float(body.baseline_quality_score)}
slot_diff_scoring = _score_incremental_slot_diffs(
cur,
tenant=tenant,
body=body,
baseline_steps=baseline_steps_for_scoring,
proposed_steps=steps,
baseline_path_qa=baseline_qa_for_scoring,
raw_diffs=raw_diffs,
)
return {
"goal_query": goal_query,
"max_steps_requested": max_steps,
"steps": steps,
"step_count": len(steps),
"target_profile_summary": target_profile_summary,
"semantic_brief_summary": brief_to_summary_dict(semantic_brief),
"semantic_llm_applied": semantic_llm_applied,
"query_intent_summary": first_intent_summary,
"progression_graph_id": body.progression_graph_id,
"path_qa": path_qa,
"gap_fill_offers": gap_fill_offers,
"progression_roadmap": progression_roadmap,
"roadmap_first": roadmap_first,
"roadmap_only": False,
"roadmap_edited": roadmap_edited,
"roadmap_unfilled_count": len(roadmap_unfilled),
"path_skill_expectations": path_skill_expectations,
"match_summary": match_summary,
"retrieval_phase": "+".join(retrieval_parts),
"slot_diff_scoring": slot_diff_scoring,
}
__all__ = [
"EvaluateStepPayload",
"ProgressionPathSuggestRequest",
"suggest_progression_path",
"_pick_best_path_hit",
"_pick_next_path_hit",
]
# Legacy-Alias für Tests
_pick_next_path_hit = _pick_best_path_hit