"""
Progression graph between exercises (exercise -> exercise), migrations 032-034.

Exercise variants may optionally be used as node endpoints; sequences can be
created in bulk.
AuthZ mirrors training_plan_templates: editing/deleting works like exercises;
governance transitions are checked centrally.
"""
import json
from typing import Any, Dict, List, Mapping, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field, model_validator
from psycopg2 import IntegrityError
from psycopg2.extras import Json

from db import get_db, get_cursor, r2d
from progression_graph_planning_artifact import normalize_planning_roadmap_payload
from tenant_context import TenantContext, get_tenant_context, library_content_visibility_sql
from club_tenancy import (
    assert_library_content_deletable,
    assert_library_content_editable,
    assert_library_content_governance_transition,
    assert_valid_governance_visibility,
    is_platform_admin,
    library_content_visible_to_profile,
)
from routers.training_planning import _has_planning_role

router = APIRouter(prefix="/api", tags=["exercises"])


# Request body for creating a progression graph.
class ProgressionGraphCreate(BaseModel):
    name: str = Field(..., min_length=1, max_length=200)
    description: Optional[str] = None
    visibility: str = Field(default="private", pattern="^(private|club|official)$")
    club_id: Optional[int] = None


# Partial update of a graph; only fields present in the request are applied.
class ProgressionGraphUpdate(BaseModel):
    name: Optional[str] = Field(None, min_length=1, max_length=200)
    description: Optional[str] = None
    visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
    club_id: Optional[int] = None
    planning_roadmap: Optional[Dict[str, Any]] = None


# Request body for a single edge; variants are optional endpoint refinements.
class ProgressionEdgeCreate(BaseModel):
    from_exercise_id: int = Field(..., gt=0)
    to_exercise_id: int = Field(..., gt=0)
    from_exercise_variant_id: Optional[int] = Field(default=None)
    to_exercise_variant_id: Optional[int] = Field(default=None)
    edge_type: str = Field(default="next_exercise", min_length=1, max_length=50)
    notes: Optional[str] = Field(None, max_length=4000)


# Only the notes of an existing edge can be changed.
class ProgressionEdgeUpdate(BaseModel):
    notes: Optional[str] = Field(None, max_length=4000)


# One node of an ordered sequence (exercise plus optional variant).
class SequenceStep(BaseModel):
    exercise_id: int = Field(..., gt=0)
    variant_id: Optional[int] = Field(default=None)


# Ordered list of steps that is turned into len(steps) - 1 edges.
class ProgressionSequenceCreate(BaseModel):
    steps: List[SequenceStep] = Field(..., min_length=2)
    # When set, must have exactly len(steps) - 1 entries: one note per edge
    # between two consecutive steps (enforced by check_segment_notes_len).
    segment_notes: Optional[List[Optional[str]]] = None
    planning_roadmap: Optional[Dict[str, Any]] = None

    @model_validator(mode="after")
    def check_segment_notes_len(self):
        """Reject segment_notes whose length does not match the edge count."""
        if self.segment_notes is not None and len(self.segment_notes) != len(self.steps) - 1:
            raise ValueError(
                f"segment_notes muss genau {len(self.steps) - 1} Einträge haben (len(steps)-1)"
            )
        return self


# Request body for batch deletion of edges.
class EdgeIdsBatch(BaseModel):
    edge_ids: List[int] = Field(..., min_length=1)


# Base SELECT for edges incl. exercise titles and variant names; callers append
# their own WHERE / ORDER BY clause.
_EDGE_SELECT = (
    " SELECT e.id, e.graph_id,"
    " e.from_exercise_id, e.from_exercise_variant_id,"
    " e.to_exercise_id, e.to_exercise_variant_id,"
    " e.edge_type, e.notes, e.created_at,"
    " ef.title AS from_exercise_title,"
    " et.title AS to_exercise_title,"
    " vf.variant_name AS from_variant_name,"
    " vt.variant_name AS to_variant_name"
    " FROM exercise_progression_edges e"
    " JOIN exercises ef ON ef.id = e.from_exercise_id"
    " JOIN exercises et ON et.id = e.to_exercise_id"
    " LEFT JOIN exercise_variants vf ON vf.id = e.from_exercise_variant_id"
    " LEFT JOIN exercise_variants vt ON vt.id = e.to_exercise_variant_id "
)


def _graph_row(cur, graph_id: int) -> dict:
    """Load one graph row as a dict; raise 404 if it does not exist."""
    cur.execute(
        "SELECT * FROM exercise_progression_graphs WHERE id = %s",
        (graph_id,),
    )
    r = cur.fetchone()
    if not r:
        raise HTTPException(status_code=404, detail="Progressionsgraph nicht gefunden")
    return r2d(r)


def _assert_graph_readable(cur, row: dict, profile_id: int, role: str) -> None:
    """Raise 403 unless the graph row is visible to the given profile."""
    vis = (row.get("visibility") or "private").strip().lower()
    cid = row.get("club_id")
    if cid is not None:
        cid = int(cid)
    cr = row.get("created_by")
    if cr is not None:
        cr = int(cr)
    if not library_content_visible_to_profile(cur, profile_id, vis, cid, cr, role):
        raise HTTPException(status_code=403, detail="Keine Berechtigung für diesen Progressionsgraph")


def _assert_graph_writable(cur, row: dict, profile_id: int, role: str) -> None:
    """Delegate the edit-permission check to the shared library-content rule."""
    assert_library_content_editable(cur, profile_id, role, row)


def _persist_graph_planning_roadmap(cur, graph_id: int, raw: Optional[Dict[str, Any]]) -> None:
    """Normalize the roadmap payload and store it on the graph (NULL when empty).

    Raises:
        HTTPException: 400 when normalization rejects the payload.
    """
    try:
        normalized = normalize_planning_roadmap_payload(raw)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    cur.execute(
        "UPDATE exercise_progression_graphs SET planning_roadmap = %s WHERE id = %s",
        (Json(normalized) if normalized is not None else None, graph_id),
    )


def _require_graph_read(cur, graph_id: int, profile_id: int, role: str) -> dict:
    """Return the graph row after the existence (404) and read (403) checks."""
    row = _graph_row(cur, graph_id)
    _assert_graph_readable(cur, row, profile_id, role)
    return row


def _require_graph_write(cur, graph_id: int, profile_id: int, role: str) -> dict:
    """Return the graph row after the existence (404) and edit-permission checks."""
    row = _graph_row(cur, graph_id)
    _assert_graph_writable(cur, row, profile_id, role)
    return row


def _assert_exercises_exist(cur, *exercise_ids: int) -> None:
    """Raise 400 listing every given exercise id that is not in the database."""
    unique_ids = list(dict.fromkeys(exercise_ids))
    if not unique_ids:
        return
    cur.execute(
        f"SELECT id FROM exercises WHERE id IN ({','.join(['%s'] * len(unique_ids))})",
        tuple(unique_ids),
    )
    # Rows may be dict-like or plain tuples depending on the cursor.
    found = {r["id"] if isinstance(r, dict) else r[0] for r in cur.fetchall()}
    missing = set(unique_ids) - found
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Unbekannte Übung(en): {sorted(missing)}",
        )


def _assert_variant_for_exercise(cur, exercise_id: int, variant_id: Optional[int]) -> None:
    """Raise 400 if the variant is unknown or belongs to a different exercise.

    A ``variant_id`` of ``None`` means "no variant" and is always accepted.
    """
    if variant_id is None:
        return
    cur.execute(
        "SELECT exercise_id FROM exercise_variants WHERE id = %s",
        (variant_id,),
    )
    row = cur.fetchone()
    if not row:
        raise HTTPException(status_code=400, detail=f"Unbekannte Variante: {variant_id}")
    ev_ex = row["exercise_id"] if isinstance(row, dict) else row[0]
    if ev_ex != exercise_id:
        raise HTTPException(status_code=400, detail="Variante gehört nicht zur gewählten Übung")


def _exercise_allowed_in_progression_graph(
    exercise_row: Mapping[str, Any],
    *,
    graph_visibility: str,
    graph_club_id: Optional[int],
    profile_id: int,
    role: str,
) -> bool:
    """Check whether an exercise fits the visibility of the progression graph.

    Rules implemented here:
      * private graph: official and club exercises are accepted; a private
        exercise only for platform admins or its creator.
      * club graph: official exercises are accepted; club exercises need a
        ``club_id`` that matches the graph's club (any club when the graph has
        no ``club_id``); private exercises are rejected.
      * any other graph visibility (i.e. official): only official exercises.
    """
    ex_vis = (exercise_row.get("visibility") or "private").strip().lower()
    gvis = (graph_visibility or "private").strip().lower()

    if gvis == "private":
        if ex_vis in ("official", "club"):
            return True
        if ex_vis == "private":
            if is_platform_admin(role):
                return True
            try:
                return int(exercise_row.get("created_by") or 0) == int(profile_id)
            except (TypeError, ValueError):
                return False
        return False

    if gvis == "club":
        if ex_vis == "official":
            return True
        if ex_vis != "club":
            return False
        ex_club = exercise_row.get("club_id")
        if ex_club is None:
            return False
        if graph_club_id is None:
            return True
        return int(ex_club) == int(graph_club_id)

    return ex_vis == "official"


def _assert_exercises_allowed_in_graph(
    cur,
    graph_id: int,
    profile_id: int,
    role: str,
    *exercise_ids: int,
) -> None:
    """Raise 400 when an exercise does not fit the graph's visibility.

    Ids that are not found in ``exercises`` are skipped here; existence is
    checked separately by ``_assert_exercises_exist``.
    """
    row = _graph_row(cur, graph_id)
    gvis = (row.get("visibility") or "private").strip().lower()
    gclub_raw = row.get("club_id")
    gclub = int(gclub_raw) if gclub_raw is not None else None

    unique = list(dict.fromkeys(exercise_ids))
    if not unique:
        return
    ph = ",".join(["%s"] * len(unique))
    cur.execute(
        f"SELECT id, title, visibility, club_id, created_by FROM exercises WHERE id IN ({ph})",
        tuple(unique),
    )
    # Convert each row once instead of twice per row.
    by_id: Dict[int, dict] = {}
    for raw_row in cur.fetchall():
        ex_row = r2d(raw_row)
        by_id[int(ex_row["id"])] = ex_row

    for eid in unique:
        ex = by_id.get(int(eid))
        if not ex:
            continue
        if _exercise_allowed_in_progression_graph(
            ex,
            graph_visibility=gvis,
            graph_club_id=gclub,
            profile_id=profile_id,
            role=role,
        ):
            continue
        title = (ex.get("title") or "").strip() or f"#{eid}"
        raise HTTPException(
            status_code=400,
            detail=(
                f"Übung „{title}“ (Sichtbarkeit: {ex.get('visibility') or 'private'}) "
                f"passt nicht zum Progressionsgraphen ({gvis})."
            ),
        )


def _insert_edge_row(
    cur,
    graph_id: int,
    from_exercise_id: int,
    from_variant_id: Optional[int],
    to_exercise_id: int,
    to_variant_id: Optional[int],
    edge_type: str,
    notes: Optional[str],
) -> dict:
    """Insert one edge and return it in the enriched ``_EDGE_SELECT`` shape.

    Does not commit; the caller owns the transaction.
    """
    cur.execute(
        " INSERT INTO exercise_progression_edges ("
        " graph_id, from_exercise_id, from_exercise_variant_id,"
        " to_exercise_id, to_exercise_variant_id, edge_type, notes"
        " ) VALUES (%s, %s, %s, %s, %s, %s, %s)"
        " RETURNING id ",
        (
            graph_id,
            from_exercise_id,
            from_variant_id,
            to_exercise_id,
            to_variant_id,
            edge_type,
            notes,
        ),
    )
    new_id = cur.fetchone()["id"]
    cur.execute(_EDGE_SELECT + " WHERE e.id = %s", (new_id,))
    return r2d(cur.fetchone())


@router.get("/exercise-progression-graphs")
def list_progression_graphs(tenant: TenantContext = Depends(get_tenant_context)):
    """List all graphs visible to the caller, each with its edge count."""
    profile_id = tenant.profile_id
    role = tenant.global_role
    with get_db() as conn:
        cur = get_cursor(conn)
        vis_sql, vis_params = library_content_visibility_sql(
            alias="g",
            profile_id=profile_id,
            role=role,
            effective_club_id=tenant.effective_club_id,
        )
        cur.execute(
            " SELECT g.*,"
            " (SELECT COUNT(*) FROM exercise_progression_edges e WHERE e.graph_id = g.id) AS edges_count"
            " FROM exercise_progression_graphs g"
            f" WHERE ({vis_sql})"
            " ORDER BY g.updated_at DESC NULLS LAST, g.name ",
            vis_params,
        )
        return [r2d(r) for r in cur.fetchall()]


@router.get("/exercise-progression-graphs/{graph_id}")
def get_progression_graph(
    graph_id: int,
    include_edges: bool = Query(default=False),
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Return one graph; with ``include_edges`` its edges are added under ``edges``."""
    profile_id = tenant.profile_id
    role = tenant.global_role
    with get_db() as conn:
        cur = get_cursor(conn)
        row = _require_graph_read(cur, graph_id, profile_id, role)
        if include_edges:
            cur.execute(
                _EDGE_SELECT + " WHERE e.graph_id = %s ORDER BY e.id",
                (graph_id,),
            )
            row["edges"] = [r2d(r) for r in cur.fetchall()]
        return row


def _library_exercise_id(entry: Any) -> Optional[int]:
    """Return the exercise id of a roadmap entry of kind ``library``, else None."""
    if not isinstance(entry, dict) or entry.get("kind") != "library":
        return None
    raw = entry.get("exercise_id")
    if raw is None:
        return None
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


def _exercise_ids_from_planning_roadmap(artifact: Optional[Dict[str, Any]]) -> set[int]:
    """Collect library exercise ids from the ``slot_contents`` of a roadmap.

    Both the ``primary`` entry and all ``siblings`` of every slot are
    considered. Malformed parts (wrong container types, non-numeric ids) are
    ignored instead of raising.
    """
    ids: set[int] = set()
    if not artifact or not isinstance(artifact, dict):
        return ids
    slots = artifact.get("slot_contents")
    if not isinstance(slots, (list, tuple)):
        return ids
    for slot in slots:
        if not isinstance(slot, dict):
            continue
        entries = [slot.get("primary")]
        siblings = slot.get("siblings")
        if isinstance(siblings, (list, tuple)):
            entries.extend(siblings)
        for entry in entries:
            exercise_id = _library_exercise_id(entry)
            if exercise_id is not None:
                ids.add(exercise_id)
    return ids


def _collect_graph_referenced_exercise_ids(cur, graph_id: int) -> set[int]:
    """Return all exercise ids referenced by the graph's edges and its roadmap."""
    ids: set[int] = set()
    cur.execute(
        " SELECT from_exercise_id, to_exercise_id"
        " FROM exercise_progression_edges"
        " WHERE graph_id = %s ",
        (graph_id,),
    )
    for raw_row in cur.fetchall():
        # Go through r2d like the rest of the module instead of assuming a dict row.
        edge = r2d(raw_row)
        for key in ("from_exercise_id", "to_exercise_id"):
            raw = edge.get(key)
            if raw is not None:
                ids.add(int(raw))

    cur.execute(
        "SELECT planning_roadmap FROM exercise_progression_graphs WHERE id = %s",
        (graph_id,),
    )
    fetched = cur.fetchone()
    prow = r2d(fetched) if fetched else None
    if prow and prow.get("planning_roadmap"):
        art = prow["planning_roadmap"]
        # The column may come back as raw JSON text; undecodable text is ignored.
        if isinstance(art, str):
            try:
                art = json.loads(art)
            except json.JSONDecodeError:
                art = None
        ids |= _exercise_ids_from_planning_roadmap(art if isinstance(art, dict) else None)
    return ids


def _graph_promotion_transition(graph_visibility: str, target_visibility: str) -> Optional[tuple[str, ...]]:
    """Map an allowed graph promotion to the exercise visibilities to raise.

    Returns the exercise visibilities that would have to be promoted together
    with the graph, or ``None`` when the transition needs no exercise
    promotion hint.
    """
    gvis = (graph_visibility or "private").strip().lower()
    tvis = (target_visibility or "").strip().lower()
    transitions: Dict[tuple[str, str], tuple[str, ...]] = {
        ("private", "club"): ("private",),
        ("private", "official"): ("private", "club"),
        ("club", "official"): ("private", "club"),
    }
    return transitions.get((gvis, tvis))


@router.get("/exercise-progression-graphs/{graph_id}/visibility-promotion-candidates")
def list_visibility_promotion_candidates(
    graph_id: int,
    target_visibility: str = Query(default="club", pattern="^(club|official)$"),
    tenant: TenantContext = Depends(get_tenant_context),
):
    """
    List exercises of the graph that would have to be promoted along with it.

    Supported transitions: private->club, private->official, club->official.
    Only exercises visible to the caller are returned.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    with get_db() as conn:
        cur = get_cursor(conn)
        row = _require_graph_read(cur, graph_id, profile_id, role)
        graph_vis = (row.get("visibility") or "private").strip().lower()
        target_vis = (target_visibility or "club").strip().lower()

        def _response(exercises: List[dict]) -> dict:
            return {
                "graph_id": graph_id,
                "graph_visibility": graph_vis,
                "target_visibility": target_vis,
                "exercises": exercises,
            }

        need_vis = _graph_promotion_transition(graph_vis, target_vis)
        if not need_vis:
            return _response([])
        ref_ids = _collect_graph_referenced_exercise_ids(cur, graph_id)
        if not ref_ids:
            return _response([])

        vis_placeholders = ",".join(["%s"] * len(need_vis))
        ph = ",".join(["%s"] * len(ref_ids))
        cur.execute(
            " SELECT id, title, visibility, club_id, created_by"
            " FROM exercises"
            f" WHERE id IN ({ph})"
            f" AND LOWER(TRIM(COALESCE(visibility, ''))) IN ({vis_placeholders})"
            " ORDER BY title ",
            # sorted() only makes the bind order deterministic; the IN set is unchanged.
            sorted(ref_ids) + list(need_vis),
        )
        exercises: List[dict] = []
        for ex in cur.fetchall():
            exd = r2d(ex)
            if not library_content_visible_to_profile(
                cur,
                profile_id,
                (exd.get("visibility") or "private").strip().lower(),
                exd.get("club_id"),
                exd.get("created_by"),
                role,
            ):
                continue
            exercises.append(
                {
                    "id": exd["id"],
                    "title": exd.get("title"),
                    "visibility": exd.get("visibility"),
                }
            )
        return _response(exercises)


@router.post("/exercise-progression-graphs", status_code=201)
def create_progression_graph(
    body: ProgressionGraphCreate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Create a graph; club graphs fall back to the caller's active club."""
    profile_id = tenant.profile_id
    role = tenant.global_role
    if not _has_planning_role(role):
        raise HTTPException(status_code=403, detail="Keine Berechtigung zum Anlegen von Progressionsgraphen")
    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=400, detail="name ist Pflicht")
    vis = (body.visibility or "private").strip().lower()
    cid = body.club_id
    if vis == "club":
        if cid is None:
            cid = tenant.effective_club_id
        if cid is None:
            raise HTTPException(
                status_code=400,
                detail="Vereins-Graph: club_id angeben oder aktiven Verein wählen (X-Active-Club-Id).",
            )
    # club_id is only meaningful (and only stored) for club visibility.
    gov_club = cid if vis == "club" else None
    with get_db() as conn:
        cur = get_cursor(conn)
        assert_valid_governance_visibility(cur, profile_id, role, vis, gov_club)
        cur.execute(
            " INSERT INTO exercise_progression_graphs"
            " (name, description, visibility, club_id, created_by)"
            " VALUES (%s, %s, %s, %s, %s)"
            " RETURNING id ",
            (name, body.description, vis, cid if vis == "club" else None, profile_id),
        )
        gid = cur.fetchone()["id"]
        conn.commit()
    return get_progression_graph(gid, include_edges=False, tenant=tenant)


@router.put("/exercise-progression-graphs/{graph_id}")
def update_progression_graph(
    graph_id: int,
    body: ProgressionGraphUpdate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Apply a partial update, including governance checks for visibility changes."""
    profile_id = tenant.profile_id
    role = tenant.global_role
    original = body.model_dump(exclude_unset=True)
    if not original:
        raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")

    with get_db() as conn:
        cur = get_cursor(conn)
        row = _require_graph_write(cur, graph_id, profile_id, role)
        ex_vis = (row.get("visibility") or "private").strip().lower()
        ex_cid_raw = row.get("club_id")
        ex_cid = int(ex_cid_raw) if ex_cid_raw is not None else None

        # Resolve the target visibility/club; absent or null values keep the stored ones.
        next_vis = ex_vis
        if "visibility" in original and original["visibility"] is not None:
            v_raw = str(original["visibility"]).strip().lower()
            if v_raw:
                next_vis = v_raw

        next_club = ex_cid
        if "club_id" in original:
            raw_c = original["club_id"]
            if raw_c in (None, "", []):
                next_club = None
            else:
                next_club = int(raw_c)

        if next_vis == "club":
            if next_club is None:
                next_club = tenant.effective_club_id
            if next_club is None:
                raise HTTPException(
                    status_code=400,
                    detail="Vereins-Graph: club_id angeben oder aktiven Verein wählen (X-Active-Club-Id).",
                )

        gov_club = next_club if next_vis == "club" else None
        assert_library_content_governance_transition(cur, profile_id, role, row, next_vis, gov_club)
        assert_valid_governance_visibility(cur, profile_id, role, next_vis, gov_club)

        fields: List[str] = []
        params: List[Any] = []
        if "name" in original:
            n = (original["name"] or "").strip()
            if not n:
                raise HTTPException(status_code=400, detail="name ist Pflicht")
            fields.append("name = %s")
            params.append(n)
        if "description" in original:
            fields.append("description = %s")
            params.append(original["description"])
        vis_changed = next_vis != ex_vis
        if "visibility" in original or vis_changed:
            fields.append("visibility = %s")
            params.append(next_vis)
        if "club_id" in original or vis_changed:
            # Leaving club visibility clears the stored club_id.
            fields.append("club_id = %s")
            params.append(next_club if next_vis == "club" else None)

        if "planning_roadmap" in original:
            _persist_graph_planning_roadmap(cur, graph_id, original.get("planning_roadmap"))

        if not fields and "planning_roadmap" not in original:
            return get_progression_graph(graph_id, include_edges=False, tenant=tenant)

        if fields:
            fields.append("updated_at = NOW()")
            params.append(graph_id)
            cur.execute(
                f"UPDATE exercise_progression_graphs SET {', '.join(fields)} WHERE id = %s",
                tuple(params),
            )
        elif "planning_roadmap" in original:
            # Roadmap-only update: still bump the modification timestamp.
            cur.execute(
                "UPDATE exercise_progression_graphs SET updated_at = NOW() WHERE id = %s",
                (graph_id,),
            )
        conn.commit()
    return get_progression_graph(graph_id, include_edges=False, tenant=tenant)


@router.delete("/exercise-progression-graphs/{graph_id}")
def delete_progression_graph(graph_id: int, tenant: TenantContext = Depends(get_tenant_context)):
    """Delete a graph after the read and delete-permission checks."""
    profile_id = tenant.profile_id
    role = tenant.global_role
    with get_db() as conn:
        cur = get_cursor(conn)
        row = _graph_row(cur, graph_id)
        _assert_graph_readable(cur, row, profile_id, role)
        assert_library_content_deletable(cur, profile_id, role, row)
        cur.execute("DELETE FROM exercise_progression_graphs WHERE id = %s", (graph_id,))
        conn.commit()
    return {"ok": True}


@router.get("/exercise-progression-graphs/{graph_id}/edges")
def list_progression_edges(
    graph_id: int,
    from_exercise_id: Optional[int] = Query(default=None),
    to_exercise_id: Optional[int] = Query(default=None),
    tenant: TenantContext = Depends(get_tenant_context),
):
    """List the edges of a graph, optionally filtered by source/target exercise."""
    profile_id = tenant.profile_id
    role = tenant.global_role
    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_read(cur, graph_id, profile_id, role)
        q = _EDGE_SELECT + " WHERE e.graph_id = %s"
        params: List[Any] = [graph_id]
        if from_exercise_id is not None:
            q += " AND e.from_exercise_id = %s"
            params.append(from_exercise_id)
        if to_exercise_id is not None:
            q += " AND e.to_exercise_id = %s"
            params.append(to_exercise_id)
        q += " ORDER BY e.id"
        cur.execute(q, tuple(params))
        return [r2d(r) for r in cur.fetchall()]


@router.post("/exercise-progression-graphs/{graph_id}/edges", status_code=201)
def create_progression_edge(
    graph_id: int,
    body: ProgressionEdgeCreate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Create a single edge; database constraint violations become a 409."""
    profile_id = tenant.profile_id
    role = tenant.global_role
    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, profile_id, role)
        _assert_exercises_exist(cur, body.from_exercise_id, body.to_exercise_id)
        _assert_exercises_allowed_in_graph(
            cur, graph_id, profile_id, role, body.from_exercise_id, body.to_exercise_id
        )
        fv = body.from_exercise_variant_id
        tv = body.to_exercise_variant_id
        _assert_variant_for_exercise(cur, body.from_exercise_id, fv)
        _assert_variant_for_exercise(cur, body.to_exercise_id, tv)
        et = (body.edge_type or "next_exercise").strip() or "next_exercise"
        notes = (body.notes or "").strip() or None
        try:
            row = _insert_edge_row(
                cur,
                graph_id,
                body.from_exercise_id,
                fv,
                body.to_exercise_id,
                tv,
                et,
                notes,
            )
            conn.commit()
        except IntegrityError as e:
            conn.rollback()
            raise HTTPException(
                status_code=409,
                detail="Kante existiert bereits oder Endpunkte unzulässig (gleiche Übung ohne zwei Varianten)",
            ) from e
    return row


@router.post("/exercise-progression-graphs/{graph_id}/edges/sequence", status_code=201)
def create_progression_sequence(
    graph_id: int,
    body: ProgressionSequenceCreate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Create the n-1 successor edges (next_exercise) for an ordered step list.

    All validation (exercises, variants, planning roadmap) runs before the
    first edge is inserted, so a 4xx validation error never leaves pending
    edge inserts in the transaction. A constraint violation while inserting
    rolls back the whole sequence and yields a 409.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    steps = body.steps
    seg_notes = body.segment_notes
    created: List[dict] = []

    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, profile_id, role)
        ex_ids = [s.exercise_id for s in steps]
        _assert_exercises_exist(cur, *ex_ids)
        _assert_exercises_allowed_in_graph(cur, graph_id, profile_id, role, *ex_ids)
        # Validate every step's variant exactly once, before any write.
        for step in steps:
            _assert_variant_for_exercise(cur, step.exercise_id, step.variant_id)

        try:
            # Roadmap first: its payload validation may raise 400 and must not
            # happen after edges were already inserted.
            if body.planning_roadmap is not None:
                _persist_graph_planning_roadmap(cur, graph_id, body.planning_roadmap)
            for i, (a, b) in enumerate(zip(steps, steps[1:])):
                note = None
                if seg_notes is not None:
                    # Blank or missing notes are stored as NULL.
                    note = (seg_notes[i] or "").strip() or None
                row = _insert_edge_row(
                    cur,
                    graph_id,
                    a.exercise_id,
                    a.variant_id,
                    b.exercise_id,
                    b.variant_id,
                    "next_exercise",
                    note,
                )
                created.append(row)
            cur.execute(
                "UPDATE exercise_progression_graphs SET updated_at = NOW() WHERE id = %s",
                (graph_id,),
            )
            conn.commit()
        except IntegrityError as e:
            conn.rollback()
            raise HTTPException(
                status_code=409,
                detail=(
                    "Sequenz konnte nicht vollständig angelegt werden "
                    "(Duplikat oder ungültige Endpunkte). Keine Teilmenge gespeichert."
                ),
            ) from e
    return {"created": created, "count": len(created)}


@router.put("/exercise-progression-graphs/{graph_id}/edges/{edge_id}")
def update_progression_edge(
    graph_id: int,
    edge_id: int,
    body: ProgressionEdgeUpdate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Update the notes of one edge and return the refreshed edge."""
    profile_id = tenant.profile_id
    role = tenant.global_role
    data = body.model_dump(exclude_unset=True)
    if not data:
        raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, profile_id, role)
        cur.execute(
            "SELECT id FROM exercise_progression_edges WHERE id = %s AND graph_id = %s",
            (edge_id, graph_id),
        )
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail="Kante nicht gefunden")
        if "notes" in data:
            n = data["notes"]
            # Null or blank notes are stored as NULL.
            notes_val = (n or "").strip() or None if n is not None else None
            cur.execute(
                "UPDATE exercise_progression_edges SET notes = %s WHERE id = %s AND graph_id = %s",
                (notes_val, edge_id, graph_id),
            )
        conn.commit()
        cur.execute(_EDGE_SELECT + " WHERE e.id = %s AND e.graph_id = %s", (edge_id, graph_id))
        return r2d(cur.fetchone())


@router.delete("/exercise-progression-graphs/{graph_id}/edges/{edge_id}")
def delete_progression_edge(
    graph_id: int,
    edge_id: int,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Delete one edge of the graph; 404 when it does not belong to the graph."""
    profile_id = tenant.profile_id
    role = tenant.global_role
    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, profile_id, role)
        cur.execute(
            " DELETE FROM exercise_progression_edges"
            " WHERE id = %s AND graph_id = %s"
            " RETURNING id ",
            (edge_id, graph_id),
        )
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail="Kante nicht gefunden")
        conn.commit()
    return {"ok": True}
@router.post("/exercise-progression-graphs/{graph_id}/edges/delete-batch")
def delete_progression_edges_batch(
    graph_id: int,
    body: EdgeIdsBatch,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Delete several edges of one graph in a single step (e.g. a connected chain).

    Requires write access to the graph. Ids that do not belong to the graph
    simply match nothing; the response reports how many rows were removed.
    """
    # Drop repeated ids (first occurrence wins) so each id is bound only once.
    unique_edge_ids = list(dict.fromkeys(body.edge_ids))
    placeholders = ",".join(["%s"] * len(unique_edge_ids))
    statement = (
        " DELETE FROM exercise_progression_edges"
        " WHERE graph_id = %s"
        f" AND id IN ({placeholders}) "
    )
    bind_values = (graph_id, *unique_edge_ids)

    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, tenant.profile_id, tenant.global_role)
        cur.execute(statement, bind_values)
        removed = cur.rowcount
        conn.commit()
    return {"ok": True, "deleted": removed}