"""
Progression-graph resolution for the planning AI (phase C1).

Variant-aware successor edges (migration 034) and auto-matching of a visible
graph based on the anchor exercise when the client does not send a graph_id.
"""

from __future__ import annotations

from typing import Any, Dict, List, Mapping, Optional, Sequence, Set, Tuple

from tenant_context import TenantContext, library_content_visibility_sql

# (successor exercise ids, note per successor id, target variant per successor id)
ProgressionSuccessorBundle = Tuple[Set[int], Dict[int, str], Dict[int, Optional[int]]]


def edge_matches_anchor_from(
    edge: Mapping[str, Any],
    from_variant_id: Optional[int],
) -> bool:
    """Return whether *edge* counts as an outgoing edge of the anchor.

    An edge matches when it is generic (its ``from_exercise_variant_id`` is
    ``None``) or when its source variant equals *from_variant_id*. A
    variant-bound edge never matches an anchor without a variant, and values
    that cannot be converted to ``int`` are treated as a non-match.
    """
    edge_var = edge.get("from_exercise_variant_id")
    if edge_var is None:
        return True
    if from_variant_id is None:
        return False
    try:
        return int(edge_var) == int(from_variant_id)
    except (TypeError, ValueError):
        return False


def filter_outgoing_progression_edges(
    edges: Sequence[Mapping[str, Any]],
    *,
    from_variant_id: Optional[int],
) -> List[Mapping[str, Any]]:
    """Return the edges that ``edge_matches_anchor_from`` accepts, in input order."""
    return [e for e in edges if edge_matches_anchor_from(e, from_variant_id)]


def parse_successors_from_edges(
    edges: Sequence[Mapping[str, Any]],
) -> ProgressionSuccessorBundle:
    """Collapse edge rows into ``(successor ids, notes, target variants)``.

    Each row must provide ``to_exercise_id``; ``notes`` and
    ``to_exercise_variant_id`` are optional. Every successor id gets an entry
    in the variants mapping (``None`` when no edge names a target variant).
    Notes only get an entry when at least one edge carries a non-blank note.

    Raises:
        KeyError: if a row has no ``to_exercise_id`` key.
        TypeError, ValueError: if an id cannot be converted to ``int``.
    """
    ids: Set[int] = set()
    notes: Dict[int, str] = {}
    variants: Dict[int, Optional[int]] = {}
    for row in edges:
        tid = int(row["to_exercise_id"])
        ids.add(tid)

        note = (row.get("notes") or "").strip()
        if note:
            notes[tid] = note

        raw_variant = row.get("to_exercise_variant_id")
        if raw_variant is not None:
            variants[tid] = int(raw_variant)
        else:
            # Same rule as for notes: a row without a value must not erase a
            # value recorded by another edge to the same successor. Otherwise
            # the result would depend on the (unordered) row sequence.
            variants.setdefault(tid, None)
    return ids, notes, variants


def rank_progression_graph_rows(
    rows: Sequence[Mapping[str, Any]],
) -> Optional[Mapping[str, Any]]:
    """Pick the best candidate graph row, or ``None`` when *rows* is empty.

    Rows are ranked by ``variant_match_count``, then ``outgoing_count``, then
    ``id`` (all highest first); missing or falsy values count as 0.
    """
    if not rows:
        return None

    def _key(row: Mapping[str, Any]) -> Tuple[int, int, int]:
        var_match = int(row.get("variant_match_count") or 0)
        out_count = int(row.get("outgoing_count") or 0)
        gid = int(row.get("id") or 0)
        return (var_match, out_count, gid)

    return max(rows, key=_key)


def resolve_progression_graph_for_planning(
    cur,
    tenant: TenantContext,
    *,
    from_exercise_id: Optional[int],
    from_variant_id: Optional[int],
    explicit_graph_id: Optional[int],
) -> Tuple[Optional[int], Optional[str], bool]:
    """Return ``(graph_id, graph_name, auto_resolved)`` for the planning request.

    With a positive *explicit_graph_id* only its visibility is checked and no
    auto-matching happens; an invisible or unknown graph yields
    ``(None, None, False)``. Otherwise the visible graph with matching
    ``next_exercise`` edges leaving the anchor exercise is selected via
    ``rank_progression_graph_rows``.

    Args:
        cur: DB cursor; rows are read with ``.get(...)`` / ``[...]``, so they
            must be mapping-like.
        tenant: Supplies ``profile_id``, ``global_role`` and
            ``effective_club_id`` for the visibility filter.
        from_exercise_id: Anchor exercise; auto-matching is skipped when it is
            missing or smaller than 1.
        from_variant_id: Anchor variant, or ``None`` for no variant.
        explicit_graph_id: Graph requested by the client, if any.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    # vis_sql is interpolated into the statement text; it comes from the
    # project helper, its values travel separately in vis_params.
    vis_sql, vis_params = library_content_visibility_sql(
        alias="g",
        profile_id=profile_id,
        role=role,
        effective_club_id=tenant.effective_club_id,
    )

    if explicit_graph_id and int(explicit_graph_id) > 0:
        gid = int(explicit_graph_id)
        cur.execute(
            f"""
            SELECT g.id, g.name
            FROM exercise_progression_graphs g
            WHERE g.id = %s AND ({vis_sql})
            """,
            [gid, *vis_params],
        )
        row = cur.fetchone()
        if not row:
            return None, None, False
        name = (row.get("name") or "").strip() or None
        return gid, name, False

    if not from_exercise_id or int(from_exercise_id) < 1:
        return None, None, False

    anchor_var = int(from_variant_id) if from_variant_id is not None else None
    # The edge predicate must mirror edge_matches_anchor_from: generic edges
    # always count, variant-bound edges only for the same anchor variant.
    # Without an anchor variant the comparison against NULL is never true, so
    # only generic edges remain. Counting variant-bound edges in that case
    # could select a graph whose edges the successor loader discards again.
    cur.execute(
        f"""
        SELECT g.id, g.name,
               COUNT(*)::int AS outgoing_count,
               COUNT(*) FILTER (
                   WHERE e.from_exercise_variant_id IS NOT NULL
                     AND (%s IS NOT NULL)
                     AND e.from_exercise_variant_id = %s
               )::int AS variant_match_count
        FROM exercise_progression_edges e
        INNER JOIN exercise_progression_graphs g ON g.id = e.graph_id
        WHERE e.from_exercise_id = %s
          AND LOWER(TRIM(e.edge_type)) = 'next_exercise'
          AND ({vis_sql})
          AND (
              e.from_exercise_variant_id IS NULL
              OR e.from_exercise_variant_id = %s
          )
        GROUP BY g.id, g.name
        """,
        # Order follows the placeholders: FILTER (2x), anchor exercise,
        # visibility fragment, WHERE variant comparison.
        [anchor_var, anchor_var, int(from_exercise_id), *vis_params, anchor_var],
    )
    picked = rank_progression_graph_rows(cur.fetchall())
    if not picked:
        return None, None, False
    gid = int(picked["id"])
    name = (picked.get("name") or "").strip() or None
    return gid, name, True


def load_progression_successors_for_anchor(
    cur,
    *,
    graph_id: Optional[int],
    from_exercise_id: Optional[int],
    from_variant_id: Optional[int],
) -> ProgressionSuccessorBundle:
    """Load the ``next_exercise`` successors of the anchor within one graph.

    Returns an empty bundle when *graph_id* or *from_exercise_id* is missing.
    Variant filtering is done in Python via
    ``filter_outgoing_progression_edges`` after fetching all edges that leave
    the anchor exercise.
    """
    if not graph_id or not from_exercise_id:
        return set(), {}, {}
    cur.execute(
        """
        SELECT to_exercise_id, to_exercise_variant_id, notes, from_exercise_variant_id
        FROM exercise_progression_edges
        WHERE graph_id = %s
          AND from_exercise_id = %s
          AND LOWER(TRIM(edge_type)) = 'next_exercise'
        """,
        (int(graph_id), int(from_exercise_id)),
    )
    rows = [dict(r) for r in cur.fetchall()]
    filtered = filter_outgoing_progression_edges(rows, from_variant_id=from_variant_id)
    return parse_successors_from_edges(filtered)


def apply_progression_context_to_pack(
    cur,
    tenant: TenantContext,
    pack: Dict[str, Any],
    *,
    explicit_graph_id: Optional[int],
    anchor_variant_id: Optional[int],
) -> Dict[str, Any]:
    """Enrich *pack* in place with the resolved graph and its successors.

    Reads ``anchor_exercise_id`` from *pack* and writes the keys
    ``anchor_exercise_variant_id``, ``progression_graph_id``,
    ``progression_graph_name``, ``progression_graph_auto_resolved``,
    ``progression_successor_ids`` (sorted list), ``progression_edge_notes``
    and ``progression_successor_variants``. Returns the same *pack* object.
    """
    anchor_id = pack.get("anchor_exercise_id")
    pack["anchor_exercise_variant_id"] = anchor_variant_id

    graph_id, graph_name, auto_resolved = resolve_progression_graph_for_planning(
        cur,
        tenant,
        from_exercise_id=anchor_id,
        from_variant_id=anchor_variant_id,
        explicit_graph_id=explicit_graph_id,
    )
    pack["progression_graph_id"] = graph_id
    pack["progression_graph_name"] = graph_name
    pack["progression_graph_auto_resolved"] = bool(auto_resolved)

    succ_ids, notes, succ_variants = load_progression_successors_for_anchor(
        cur,
        graph_id=graph_id,
        from_exercise_id=anchor_id,
        from_variant_id=anchor_variant_id,
    )
    pack["progression_successor_ids"] = sorted(succ_ids)
    pack["progression_edge_notes"] = notes
    pack["progression_successor_variants"] = succ_variants
    return pack


__all__ = [
    "apply_progression_context_to_pack",
    "edge_matches_anchor_from",
    "filter_outgoing_progression_edges",
    "load_progression_successors_for_anchor",
    "parse_successors_from_edges",
    "rank_progression_graph_rows",
    "resolve_progression_graph_for_planning",
]