shinkan-jinkendo/backend/routers/exercise_progression_graphs.py
Lars 4b9374765b
All checks were successful
Deploy Development / deploy (push) Successful in 41s
Test Suite / pytest-backend (push) Successful in 43s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 34s
Test Suite / playwright-tests (push) Successful in 1m21s
Enhance Progression Graph Management with F15 Features and Evaluation Improvements
- Updated `PROJECT_STATUS.md` to reflect the implementation of F15 features, including the unified slot review and handling of `findings_stale`.
- Enhanced `PROGRESSION_GRAPH_SLOT_EDITOR_SPEC.md` with detailed descriptions of new functionalities related to the match dialog and path quality assessments.
- Introduced new functions in `exercise_progression_graphs.py` to validate exercise visibility against progression graph settings, ensuring proper governance.
- Improved frontend components to support new governance parameters (visibility and club_id) in exercise creation workflows.
- Updated documentation in `HANDOVER.md` and `PLANNING_KI_ROADMAP.md` to outline the latest developments and validation results for the F15 features.
- Enhanced utility functions for exercise creation to incorporate governance settings, improving the overall user experience in the path builder and editor.
2026-06-14 06:44:12 +02:00

827 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Progressionsgraph zwischen Übungen (Übung → Übung), Migration 032034.
Optional Übungsvarianten als Knoten-Endpunkte; Sequenz-Bulk-Anlage.
AuthZ analog training_plan_templates: Bearbeiten/Löschen wie Übungen; Governance-Übergänge zentral.
"""
import json
from typing import Any, Dict, List, Mapping, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field, model_validator
from psycopg2 import IntegrityError
from psycopg2.extras import Json
from db import get_db, get_cursor, r2d
from progression_graph_planning_artifact import normalize_planning_roadmap_payload
from tenant_context import TenantContext, get_tenant_context, library_content_visibility_sql
from club_tenancy import (
assert_library_content_deletable,
assert_library_content_editable,
assert_library_content_governance_transition,
assert_valid_governance_visibility,
is_platform_admin,
library_content_visible_to_profile,
)
from routers.training_planning import _has_planning_role
router = APIRouter(prefix="/api", tags=["exercises"])
class ProgressionGraphCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=200)
description: Optional[str] = None
visibility: str = Field(default="private", pattern="^(private|club|official)$")
club_id: Optional[int] = None
class ProgressionGraphUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=200)
description: Optional[str] = None
visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
club_id: Optional[int] = None
planning_roadmap: Optional[Dict[str, Any]] = None
class ProgressionEdgeCreate(BaseModel):
from_exercise_id: int = Field(..., gt=0)
to_exercise_id: int = Field(..., gt=0)
from_exercise_variant_id: Optional[int] = Field(default=None)
to_exercise_variant_id: Optional[int] = Field(default=None)
edge_type: str = Field(default="next_exercise", min_length=1, max_length=50)
notes: Optional[str] = Field(None, max_length=4000)
class ProgressionEdgeUpdate(BaseModel):
notes: Optional[str] = Field(None, max_length=4000)
class SequenceStep(BaseModel):
exercise_id: int = Field(..., gt=0)
variant_id: Optional[int] = Field(default=None)
class ProgressionSequenceCreate(BaseModel):
steps: List[SequenceStep] = Field(..., min_length=2)
segment_notes: Optional[List[Optional[str]]] = None
planning_roadmap: Optional[Dict[str, Any]] = None
"""Länge muss len(steps)-1 sein, wenn gesetzt; Notiz pro Kante Zwischen je zwei Schritten."""
@model_validator(mode="after")
def check_segment_notes_len(self):
if self.segment_notes is not None and len(self.segment_notes) != len(self.steps) - 1:
raise ValueError(
f"segment_notes muss genau {len(self.steps) - 1} Einträge haben (len(steps)-1)"
)
return self
class EdgeIdsBatch(BaseModel):
edge_ids: List[int] = Field(..., min_length=1)
_EDGE_SELECT = """
SELECT e.id, e.graph_id,
e.from_exercise_id, e.from_exercise_variant_id,
e.to_exercise_id, e.to_exercise_variant_id,
e.edge_type, e.notes, e.created_at,
ef.title AS from_exercise_title, et.title AS to_exercise_title,
vf.variant_name AS from_variant_name, vt.variant_name AS to_variant_name
FROM exercise_progression_edges e
JOIN exercises ef ON ef.id = e.from_exercise_id
JOIN exercises et ON et.id = e.to_exercise_id
LEFT JOIN exercise_variants vf ON vf.id = e.from_exercise_variant_id
LEFT JOIN exercise_variants vt ON vt.id = e.to_exercise_variant_id
"""
def _graph_row(cur, graph_id: int) -> dict:
cur.execute(
"SELECT * FROM exercise_progression_graphs WHERE id = %s",
(graph_id,),
)
r = cur.fetchone()
if not r:
raise HTTPException(status_code=404, detail="Progressionsgraph nicht gefunden")
return r2d(r)
def _assert_graph_readable(cur, row: dict, profile_id: int, role: str) -> None:
vis = (row.get("visibility") or "private").strip().lower()
cid = row.get("club_id")
if cid is not None:
cid = int(cid)
cr = row.get("created_by")
if cr is not None:
cr = int(cr)
if not library_content_visible_to_profile(cur, profile_id, vis, cid, cr, role):
raise HTTPException(status_code=403, detail="Keine Berechtigung für diesen Progressionsgraph")
def _assert_graph_writable(cur, row: dict, profile_id: int, role: str) -> None:
assert_library_content_editable(cur, profile_id, role, row)
def _persist_graph_planning_roadmap(cur, graph_id: int, raw: Optional[Dict[str, Any]]) -> None:
try:
normalized = normalize_planning_roadmap_payload(raw)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
cur.execute(
"UPDATE exercise_progression_graphs SET planning_roadmap = %s WHERE id = %s",
(Json(normalized) if normalized is not None else None, graph_id),
)
def _require_graph_read(cur, graph_id: int, profile_id: int, role: str) -> dict:
row = _graph_row(cur, graph_id)
_assert_graph_readable(cur, row, profile_id, role)
return row
def _require_graph_write(cur, graph_id: int, profile_id: int, role: str) -> dict:
row = _graph_row(cur, graph_id)
_assert_graph_writable(cur, row, profile_id, role)
return row
def _assert_exercises_exist(cur, *exercise_ids: int) -> None:
unique_ids = list(dict.fromkeys(exercise_ids))
if not unique_ids:
return
cur.execute(
f"SELECT id FROM exercises WHERE id IN ({','.join(['%s'] * len(unique_ids))})",
tuple(unique_ids),
)
found = {r["id"] if isinstance(r, dict) else r[0] for r in cur.fetchall()}
missing = set(unique_ids) - found
if missing:
raise HTTPException(
status_code=400,
detail=f"Unbekannte Übung(en): {sorted(missing)}",
)
def _assert_variant_for_exercise(cur, exercise_id: int, variant_id: Optional[int]) -> None:
if variant_id is None:
return
cur.execute(
"SELECT exercise_id FROM exercise_variants WHERE id = %s",
(variant_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=400, detail=f"Unbekannte Variante: {variant_id}")
ev_ex = row["exercise_id"] if isinstance(row, dict) else row[0]
if ev_ex != exercise_id:
raise HTTPException(status_code=400, detail="Variante gehört nicht zur gewählten Übung")
def _exercise_allowed_in_progression_graph(
exercise_row: Mapping[str, Any],
*,
graph_visibility: str,
graph_club_id: Optional[int],
profile_id: int,
role: str,
) -> bool:
"""Prüft, ob eine Übung zur Sichtbarkeit des Progressionsgraphen passt."""
ex_vis = (exercise_row.get("visibility") or "private").strip().lower()
gvis = (graph_visibility or "private").strip().lower()
if gvis == "private":
if ex_vis == "official":
return True
if ex_vis == "club":
return True
if ex_vis == "private":
if is_platform_admin(role):
return True
try:
return int(exercise_row.get("created_by") or 0) == int(profile_id)
except (TypeError, ValueError):
return False
return False
if gvis == "club":
if ex_vis == "official":
return True
if ex_vis != "club":
return False
ex_club = exercise_row.get("club_id")
if ex_club is None:
return False
if graph_club_id is None:
return True
return int(ex_club) == int(graph_club_id)
return ex_vis == "official"
def _assert_exercises_allowed_in_graph(
cur,
graph_id: int,
profile_id: int,
role: str,
*exercise_ids: int,
) -> None:
"""400 wenn eine Übung nicht zur Graph-Sichtbarkeit passt."""
row = _graph_row(cur, graph_id)
gvis = (row.get("visibility") or "private").strip().lower()
gclub_raw = row.get("club_id")
gclub = int(gclub_raw) if gclub_raw is not None else None
unique = list(dict.fromkeys(exercise_ids))
if not unique:
return
ph = ",".join(["%s"] * len(unique))
cur.execute(
f"SELECT id, title, visibility, club_id, created_by FROM exercises WHERE id IN ({ph})",
tuple(unique),
)
by_id = {int(r2d(r)["id"]): r2d(r) for r in cur.fetchall()}
for eid in unique:
ex = by_id.get(int(eid))
if not ex:
continue
if _exercise_allowed_in_progression_graph(
ex,
graph_visibility=gvis,
graph_club_id=gclub,
profile_id=profile_id,
role=role,
):
continue
title = (ex.get("title") or "").strip() or f"#{eid}"
raise HTTPException(
status_code=400,
detail=(
f"Übung „{title}“ (Sichtbarkeit: {ex.get('visibility') or 'private'}) "
f"passt nicht zum Progressionsgraphen ({gvis})."
),
)
def _insert_edge_row(
cur,
graph_id: int,
from_exercise_id: int,
from_variant_id: Optional[int],
to_exercise_id: int,
to_variant_id: Optional[int],
edge_type: str,
notes: Optional[str],
) -> dict:
cur.execute(
"""
INSERT INTO exercise_progression_edges (
graph_id,
from_exercise_id, from_exercise_variant_id,
to_exercise_id, to_exercise_variant_id,
edge_type, notes
)
VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
graph_id,
from_exercise_id,
from_variant_id,
to_exercise_id,
to_variant_id,
edge_type,
notes,
),
)
new_id = cur.fetchone()["id"]
cur.execute(_EDGE_SELECT + " WHERE e.id = %s", (new_id,))
return r2d(cur.fetchone())
@router.get("/exercise-progression-graphs")
def list_progression_graphs(tenant: TenantContext = Depends(get_tenant_context)):
profile_id = tenant.profile_id
role = tenant.global_role
with get_db() as conn:
cur = get_cursor(conn)
vis_sql, vis_params = library_content_visibility_sql(
alias="g",
profile_id=profile_id,
role=role,
effective_club_id=tenant.effective_club_id,
)
cur.execute(
f"""
SELECT g.*,
(SELECT COUNT(*) FROM exercise_progression_edges e WHERE e.graph_id = g.id) AS edges_count
FROM exercise_progression_graphs g
WHERE ({vis_sql})
ORDER BY g.updated_at DESC NULLS LAST, g.name
""",
vis_params,
)
return [r2d(r) for r in cur.fetchall()]
@router.get("/exercise-progression-graphs/{graph_id}")
def get_progression_graph(
graph_id: int,
include_edges: bool = Query(default=False),
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
role = tenant.global_role
with get_db() as conn:
cur = get_cursor(conn)
row = _require_graph_read(cur, graph_id, profile_id, role)
if include_edges:
cur.execute(
_EDGE_SELECT + " WHERE e.graph_id = %s ORDER BY e.id",
(graph_id,),
)
row["edges"] = [r2d(r) for r in cur.fetchall()]
return row
def _exercise_ids_from_planning_roadmap(artifact: Optional[Dict[str, Any]]) -> set[int]:
ids: set[int] = set()
if not artifact or not isinstance(artifact, dict):
return ids
for slot in artifact.get("slot_contents") or []:
if not isinstance(slot, dict):
continue
primary = slot.get("primary") if isinstance(slot.get("primary"), dict) else {}
if primary.get("kind") == "library" and primary.get("exercise_id") is not None:
try:
ids.add(int(primary["exercise_id"]))
except (TypeError, ValueError):
pass
for sib in slot.get("siblings") or []:
if not isinstance(sib, dict):
continue
if sib.get("kind") == "library" and sib.get("exercise_id") is not None:
try:
ids.add(int(sib["exercise_id"]))
except (TypeError, ValueError):
pass
return ids
def _collect_graph_referenced_exercise_ids(cur, graph_id: int) -> set[int]:
ids: set[int] = set()
cur.execute(
"""
SELECT from_exercise_id, to_exercise_id
FROM exercise_progression_edges
WHERE graph_id = %s
""",
(graph_id,),
)
for row in cur.fetchall():
for key in ("from_exercise_id", "to_exercise_id"):
raw = row.get(key)
if raw is not None:
ids.add(int(raw))
cur.execute(
"SELECT planning_roadmap FROM exercise_progression_graphs WHERE id = %s",
(graph_id,),
)
prow = cur.fetchone()
if prow and prow.get("planning_roadmap"):
art = prow["planning_roadmap"]
if isinstance(art, str):
try:
art = json.loads(art)
except json.JSONDecodeError:
art = None
ids |= _exercise_ids_from_planning_roadmap(art if isinstance(art, dict) else None)
return ids
@router.get("/exercise-progression-graphs/{graph_id}/visibility-promotion-candidates")
def list_visibility_promotion_candidates(
graph_id: int,
target_visibility: str = Query(default="club", pattern="^(club|official)$"),
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Private Übungen im Graph, die bei Promotion des Graphen mit angehoben werden müssten.
"""
profile_id = tenant.profile_id
role = tenant.global_role
with get_db() as conn:
cur = get_cursor(conn)
row = _require_graph_read(cur, graph_id, profile_id, role)
graph_vis = (row.get("visibility") or "private").strip().lower()
if graph_vis != "private" or target_visibility != "club":
return {
"graph_id": graph_id,
"graph_visibility": graph_vis,
"target_visibility": target_visibility,
"exercises": [],
}
ref_ids = _collect_graph_referenced_exercise_ids(cur, graph_id)
if not ref_ids:
return {
"graph_id": graph_id,
"graph_visibility": graph_vis,
"target_visibility": target_visibility,
"exercises": [],
}
ph = ",".join(["%s"] * len(ref_ids))
cur.execute(
f"""
SELECT id, title, visibility, club_id, created_by
FROM exercises
WHERE id IN ({ph})
AND LOWER(TRIM(COALESCE(visibility, ''))) = 'private'
ORDER BY title
""",
list(ref_ids),
)
exercises = []
for ex in cur.fetchall():
exd = r2d(ex)
if not library_content_visible_to_profile(
cur,
profile_id,
(exd.get("visibility") or "private").strip().lower(),
exd.get("club_id"),
exd.get("created_by"),
role,
):
continue
exercises.append(
{
"id": exd["id"],
"title": exd.get("title"),
"visibility": exd.get("visibility"),
}
)
return {
"graph_id": graph_id,
"graph_visibility": graph_vis,
"target_visibility": target_visibility,
"exercises": exercises,
}
@router.post("/exercise-progression-graphs", status_code=201)
def create_progression_graph(
body: ProgressionGraphCreate,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
role = tenant.global_role
if not _has_planning_role(role):
raise HTTPException(status_code=403, detail="Keine Berechtigung zum Anlegen von Progressionsgraphen")
name = body.name.strip()
if not name:
raise HTTPException(status_code=400, detail="name ist Pflicht")
vis = (body.visibility or "private").strip().lower()
cid = body.club_id
if vis == "club":
if cid is None:
cid = tenant.effective_club_id
if cid is None:
raise HTTPException(
status_code=400,
detail="Vereins-Graph: club_id angeben oder aktiven Verein wählen (X-Active-Club-Id).",
)
gov_club = cid if vis == "club" else None
with get_db() as conn:
cur = get_cursor(conn)
assert_valid_governance_visibility(cur, profile_id, role, vis, gov_club)
cur.execute(
"""
INSERT INTO exercise_progression_graphs (name, description, visibility, club_id, created_by)
VALUES (%s, %s, %s, %s, %s)
RETURNING id
""",
(name, body.description, vis, cid if vis == "club" else None, profile_id),
)
gid = cur.fetchone()["id"]
conn.commit()
return get_progression_graph(gid, include_edges=False, tenant=tenant)
@router.put("/exercise-progression-graphs/{graph_id}")
def update_progression_graph(
graph_id: int,
body: ProgressionGraphUpdate,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
role = tenant.global_role
original = body.model_dump(exclude_unset=True)
if not original:
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
with get_db() as conn:
cur = get_cursor(conn)
row = _require_graph_write(cur, graph_id, profile_id, role)
ex_vis = (row.get("visibility") or "private").strip().lower()
ex_cid_raw = row.get("club_id")
ex_cid = int(ex_cid_raw) if ex_cid_raw is not None else None
next_vis = ex_vis
if "visibility" in original and original["visibility"] is not None:
v_raw = str(original["visibility"]).strip().lower()
if v_raw:
next_vis = v_raw
next_club = ex_cid
if "club_id" in original:
raw_c = original["club_id"]
if raw_c in (None, "", []):
next_club = None
else:
next_club = int(raw_c)
if next_vis == "club":
if next_club is None:
next_club = tenant.effective_club_id
if next_club is None:
raise HTTPException(
status_code=400,
detail="Vereins-Graph: club_id angeben oder aktiven Verein wählen (X-Active-Club-Id).",
)
gov_club = next_club if next_vis == "club" else None
assert_library_content_governance_transition(cur, profile_id, role, row, next_vis, gov_club)
assert_valid_governance_visibility(cur, profile_id, role, next_vis, gov_club)
fields: List[str] = []
params: List[Any] = []
if "name" in original:
n = (original["name"] or "").strip()
if not n:
raise HTTPException(status_code=400, detail="name ist Pflicht")
fields.append("name = %s")
params.append(n)
if "description" in original:
fields.append("description = %s")
params.append(original["description"])
vis_changed = next_vis != ex_vis
if "visibility" in original or vis_changed:
fields.append("visibility = %s")
params.append(next_vis)
if "club_id" in original or vis_changed:
fields.append("club_id = %s")
params.append(next_club if next_vis == "club" else None)
if "planning_roadmap" in original:
_persist_graph_planning_roadmap(cur, graph_id, original.get("planning_roadmap"))
if not fields and "planning_roadmap" not in original:
return get_progression_graph(graph_id, include_edges=False, tenant=tenant)
if fields:
fields.append("updated_at = NOW()")
params.append(graph_id)
cur.execute(
f"UPDATE exercise_progression_graphs SET {', '.join(fields)} WHERE id = %s",
tuple(params),
)
elif "planning_roadmap" in original:
cur.execute(
"UPDATE exercise_progression_graphs SET updated_at = NOW() WHERE id = %s",
(graph_id,),
)
conn.commit()
return get_progression_graph(graph_id, include_edges=False, tenant=tenant)
@router.delete("/exercise-progression-graphs/{graph_id}")
def delete_progression_graph(graph_id: int, tenant: TenantContext = Depends(get_tenant_context)):
profile_id = tenant.profile_id
role = tenant.global_role
with get_db() as conn:
cur = get_cursor(conn)
row = _graph_row(cur, graph_id)
_assert_graph_readable(cur, row, profile_id, role)
assert_library_content_deletable(cur, profile_id, role, row)
cur.execute("DELETE FROM exercise_progression_graphs WHERE id = %s", (graph_id,))
conn.commit()
return {"ok": True}
@router.get("/exercise-progression-graphs/{graph_id}/edges")
def list_progression_edges(
graph_id: int,
from_exercise_id: Optional[int] = Query(default=None),
to_exercise_id: Optional[int] = Query(default=None),
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
role = tenant.global_role
with get_db() as conn:
cur = get_cursor(conn)
_require_graph_read(cur, graph_id, profile_id, role)
q = _EDGE_SELECT + " WHERE e.graph_id = %s"
params: List[Any] = [graph_id]
if from_exercise_id is not None:
q += " AND e.from_exercise_id = %s"
params.append(from_exercise_id)
if to_exercise_id is not None:
q += " AND e.to_exercise_id = %s"
params.append(to_exercise_id)
q += " ORDER BY e.id"
cur.execute(q, tuple(params))
return [r2d(r) for r in cur.fetchall()]
@router.post("/exercise-progression-graphs/{graph_id}/edges", status_code=201)
def create_progression_edge(
graph_id: int,
body: ProgressionEdgeCreate,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
role = tenant.global_role
with get_db() as conn:
cur = get_cursor(conn)
_require_graph_write(cur, graph_id, profile_id, role)
_assert_exercises_exist(cur, body.from_exercise_id, body.to_exercise_id)
_assert_exercises_allowed_in_graph(
cur, graph_id, profile_id, role, body.from_exercise_id, body.to_exercise_id
)
fv = body.from_exercise_variant_id
tv = body.to_exercise_variant_id
_assert_variant_for_exercise(cur, body.from_exercise_id, fv)
_assert_variant_for_exercise(cur, body.to_exercise_id, tv)
et = (body.edge_type or "next_exercise").strip() or "next_exercise"
notes = (body.notes or "").strip() or None
try:
row = _insert_edge_row(
cur,
graph_id,
body.from_exercise_id,
fv,
body.to_exercise_id,
tv,
et,
notes,
)
conn.commit()
except IntegrityError as e:
conn.rollback()
raise HTTPException(
status_code=409,
detail="Kante existiert bereits oder Endpunkte unzulässig (gleiche Übung ohne zwei Varianten)",
) from e
return row
@router.post("/exercise-progression-graphs/{graph_id}/edges/sequence", status_code=201)
def create_progression_sequence(
graph_id: int,
body: ProgressionSequenceCreate,
tenant: TenantContext = Depends(get_tenant_context),
):
"""Legt n1 Nachfolger-Kanten (next_exercise) für eine geordnete Schrittliste an."""
profile_id = tenant.profile_id
role = tenant.global_role
steps = body.steps
n_seg = len(steps) - 1
seg_notes = body.segment_notes
created: List[dict] = []
with get_db() as conn:
cur = get_cursor(conn)
_require_graph_write(cur, graph_id, profile_id, role)
ex_ids = [s.exercise_id for s in steps]
_assert_exercises_exist(cur, *ex_ids)
_assert_exercises_allowed_in_graph(cur, graph_id, profile_id, role, *ex_ids)
try:
for i in range(n_seg):
a, b = steps[i], steps[i + 1]
_assert_variant_for_exercise(cur, a.exercise_id, a.variant_id)
_assert_variant_for_exercise(cur, b.exercise_id, b.variant_id)
note = None
if seg_notes is not None:
raw = seg_notes[i]
note = (raw or "").strip() or None if raw is not None else None
row = _insert_edge_row(
cur,
graph_id,
a.exercise_id,
a.variant_id,
b.exercise_id,
b.variant_id,
"next_exercise",
note,
)
created.append(row)
if body.planning_roadmap is not None:
_persist_graph_planning_roadmap(cur, graph_id, body.planning_roadmap)
cur.execute(
"UPDATE exercise_progression_graphs SET updated_at = NOW() WHERE id = %s",
(graph_id,),
)
conn.commit()
except IntegrityError as e:
conn.rollback()
raise HTTPException(
status_code=409,
detail="Sequenz konnte nicht vollständig angelegt werden (Duplikat oder ungültige Endpunkte). Keine Teilmenge gespeichert.",
) from e
return {"created": created, "count": len(created)}
@router.put("/exercise-progression-graphs/{graph_id}/edges/{edge_id}")
def update_progression_edge(
graph_id: int,
edge_id: int,
body: ProgressionEdgeUpdate,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
role = tenant.global_role
data = body.model_dump(exclude_unset=True)
if not data:
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
with get_db() as conn:
cur = get_cursor(conn)
_require_graph_write(cur, graph_id, profile_id, role)
cur.execute(
"SELECT id FROM exercise_progression_edges WHERE id = %s AND graph_id = %s",
(edge_id, graph_id),
)
if not cur.fetchone():
raise HTTPException(status_code=404, detail="Kante nicht gefunden")
if "notes" in data:
n = data["notes"]
notes_val = (n or "").strip() or None if n is not None else None
cur.execute(
"UPDATE exercise_progression_edges SET notes = %s WHERE id = %s AND graph_id = %s",
(notes_val, edge_id, graph_id),
)
conn.commit()
cur.execute(_EDGE_SELECT + " WHERE e.id = %s AND e.graph_id = %s", (edge_id, graph_id))
return r2d(cur.fetchone())
@router.delete("/exercise-progression-graphs/{graph_id}/edges/{edge_id}")
def delete_progression_edge(
graph_id: int,
edge_id: int,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
role = tenant.global_role
with get_db() as conn:
cur = get_cursor(conn)
_require_graph_write(cur, graph_id, profile_id, role)
cur.execute(
"""
DELETE FROM exercise_progression_edges
WHERE id = %s AND graph_id = %s
RETURNING id
""",
(edge_id, graph_id),
)
if not cur.fetchone():
raise HTTPException(status_code=404, detail="Kante nicht gefunden")
conn.commit()
return {"ok": True}
@router.post("/exercise-progression-graphs/{graph_id}/edges/delete-batch")
def delete_progression_edges_batch(
graph_id: int,
body: EdgeIdsBatch,
tenant: TenantContext = Depends(get_tenant_context),
):
"""Löscht mehrere Kanten (z. B. eine zusammenhängende Kette in einem Schritt)."""
profile_id = tenant.profile_id
role = tenant.global_role
ids = body.edge_ids
clean_ids = list(dict.fromkeys(ids))
with get_db() as conn:
cur = get_cursor(conn)
_require_graph_write(cur, graph_id, profile_id, role)
cur.execute(
f"""
DELETE FROM exercise_progression_edges
WHERE graph_id = %s AND id IN ({",".join(["%s"] * len(clean_ids))})
""",
(graph_id, *clean_ids),
)
deleted = cur.rowcount
conn.commit()
return {"ok": True, "deleted": deleted}