shinkan-jinkendo/backend/routers/exercise_progression_graphs.py
Lars 5aee9c52fc
Some checks failed
Deploy Development / deploy (push) Successful in 37s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 6s
Test Suite / playwright-tests (push) Failing after 34s
feat: integrate tenant context across club-related APIs
- Refactored club join requests, memberships, and clubs routers to utilize TenantContext for authentication and authorization, enhancing security and consistency.
- Updated session handling to replace direct session dictionary access with TenantContext, improving code clarity and maintainability.
- Ensured proper role and profile ID retrieval from TenantContext in various endpoints, streamlining access control for club management functionalities.
2026-05-05 22:05:10 +02:00

598 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Progressionsgraph zwischen Übungen (Übung → Übung), Migration 032–034.
Optional Übungsvarianten als Knoten-Endpunkte; Sequenz-Bulk-Anlage.
AuthZ analog training_plan_templates (_template_access / _has_planning_role).
"""
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field, model_validator
from psycopg2 import IntegrityError
from db import get_db, get_cursor, r2d
from tenant_context import TenantContext, get_tenant_context, library_content_visibility_sql
from club_tenancy import (
assert_valid_governance_visibility,
exercise_visible_to_profile,
is_platform_admin,
)
from routers.training_planning import _has_planning_role
# Router for the progression-graph endpoints; every path below is mounted
# under the "/api" prefix and tagged "exercises".
router = APIRouter(prefix="/api", tags=["exercises"])
class ProgressionGraphCreate(BaseModel):
    """Request body for creating a progression graph."""

    # Required, 1-200 characters; the create handler additionally strips it
    # and rejects a whitespace-only value.
    name: str = Field(..., min_length=1, max_length=200)
    description: Optional[str] = None
    # Must be exactly "private", "club" or "official"; defaults to "private".
    visibility: str = Field(default="private", pattern="^(private|club|official)$")
    # Only persisted by the create handler when visibility is "club".
    club_id: Optional[int] = None
class ProgressionGraphUpdate(BaseModel):
    """Request body for a partial update of a progression graph.

    Every field is optional; the update handler dumps the model with
    ``exclude_unset=True`` and only looks at fields the client actually sent.
    """

    name: Optional[str] = Field(None, min_length=1, max_length=200)
    description: Optional[str] = None
    # When given, must be exactly "private", "club" or "official".
    visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
    club_id: Optional[int] = None
class ProgressionEdgeCreate(BaseModel):
    """Request body for creating a single edge between two exercises."""

    # Both endpoint exercise ids are required and must be positive.
    from_exercise_id: int = Field(..., gt=0)
    to_exercise_id: int = Field(..., gt=0)
    # Optional variant per endpoint; the create handler checks that a given
    # variant belongs to the exercise on the same side of the edge.
    from_exercise_variant_id: Optional[int] = Field(default=None)
    to_exercise_variant_id: Optional[int] = Field(default=None)
    # Free-form type label (1-50 characters), defaulting to "next_exercise".
    edge_type: str = Field(default="next_exercise", min_length=1, max_length=50)
    notes: Optional[str] = Field(None, max_length=4000)
class ProgressionEdgeUpdate(BaseModel):
    """Request body for updating an edge; notes is the only field it carries."""

    notes: Optional[str] = Field(None, max_length=4000)
class SequenceStep(BaseModel):
    """One node of a sequence: an exercise and, optionally, one of its variants."""

    exercise_id: int = Field(..., gt=0)
    variant_id: Optional[int] = Field(default=None)
class ProgressionSequenceCreate(BaseModel):
    """Request body for bulk-creating a chain of edges from an ordered step list."""

    # At least two steps are needed to form one edge.
    steps: List[SequenceStep] = Field(..., min_length=2)
    # Optional note per edge, i.e. per gap between two consecutive steps.
    # NOTE(review): unlike the single-edge models, the individual notes here
    # carry no max_length constraint - confirm whether that is intended.
    segment_notes: Optional[List[Optional[str]]] = None
    """Länge muss len(steps)-1 sein, wenn gesetzt; Notiz pro Kante Zwischen je zwei Schritten."""

    @model_validator(mode="after")
    def check_segment_notes_len(self):
        """Reject a segment_notes list whose length is not len(steps) - 1."""
        if self.segment_notes is None:
            return self
        if len(self.segment_notes) == len(self.steps) - 1:
            return self
        raise ValueError(
            f"segment_notes muss genau {len(self.steps) - 1} Einträge haben (len(steps)-1)"
        )
class EdgeIdsBatch(BaseModel):
    """Request body for batch deletion: a non-empty list of edge ids."""

    edge_ids: List[int] = Field(..., min_length=1)
# Shared SELECT for edge rows, enriched with the titles of both endpoint
# exercises and, via LEFT JOIN, the variant names when a variant is set.
# It ends without a WHERE clause: callers append " WHERE ..." (and optionally
# " ORDER BY ...") themselves.
_EDGE_SELECT = """
SELECT e.id, e.graph_id,
e.from_exercise_id, e.from_exercise_variant_id,
e.to_exercise_id, e.to_exercise_variant_id,
e.edge_type, e.notes, e.created_at,
ef.title AS from_exercise_title, et.title AS to_exercise_title,
vf.variant_name AS from_variant_name, vt.variant_name AS to_variant_name
FROM exercise_progression_edges e
JOIN exercises ef ON ef.id = e.from_exercise_id
JOIN exercises et ON et.id = e.to_exercise_id
LEFT JOIN exercise_variants vf ON vf.id = e.from_exercise_variant_id
LEFT JOIN exercise_variants vt ON vt.id = e.to_exercise_variant_id
"""
def _graph_row(cur, graph_id: int) -> dict:
    """Fetch one progression graph by id and return it as a dict.

    Raises:
        HTTPException: 404 when no graph with that id exists.
    """
    cur.execute(
        "SELECT * FROM exercise_progression_graphs WHERE id = %s",
        (graph_id,),
    )
    record = cur.fetchone()
    if record:
        return r2d(record)
    raise HTTPException(status_code=404, detail="Progressionsgraph nicht gefunden")
def _assert_graph_readable(cur, row: dict, profile_id: int, role: str) -> None:
    """Raise 403 unless the graph row is visible to the given profile.

    The decision is delegated to ``exercise_visible_to_profile``, fed with the
    row's normalised visibility (missing/empty counts as "private"), its club
    id and its creator id.
    """
    visibility = (row.get("visibility") or "private").strip().lower()

    raw_club = row.get("club_id")
    club_id = None if raw_club is None else int(raw_club)

    raw_creator = row.get("created_by")
    creator_id = None if raw_creator is None else int(raw_creator)

    allowed = exercise_visible_to_profile(
        cur, profile_id, visibility, club_id, creator_id, role
    )
    if not allowed:
        raise HTTPException(status_code=403, detail="Keine Berechtigung für diesen Progressionsgraph")
def _assert_graph_writable(cur, row: dict, profile_id: int, role: str) -> None:
    """Raise 403 unless the caller is a platform admin or the graph's creator.

    A row without ``created_by`` is writable by platform admins only.
    """
    if is_platform_admin(role):
        return
    raw_owner = row.get("created_by")
    owner_id = int(raw_owner) if raw_owner is not None else None
    if owner_id == profile_id:
        return
    raise HTTPException(status_code=403, detail="Keine Berechtigung für diesen Progressionsgraph")
def _require_graph_read(cur, graph_id: int, profile_id: int, role: str) -> dict:
    """Load the graph (404 if missing) and enforce read access (403 if denied)."""
    graph = _graph_row(cur, graph_id)
    _assert_graph_readable(cur, graph, profile_id, role)
    return graph
def _require_graph_write(cur, graph_id: int, profile_id: int, role: str) -> dict:
    """Load the graph (404 if missing) and enforce write access (403 if denied)."""
    graph = _graph_row(cur, graph_id)
    _assert_graph_writable(cur, graph, profile_id, role)
    return graph
def _assert_exercises_exist(cur, *exercise_ids: int) -> None:
unique_ids = list(dict.fromkeys(exercise_ids))
if not unique_ids:
return
cur.execute(
f"SELECT id FROM exercises WHERE id IN ({','.join(['%s'] * len(unique_ids))})",
tuple(unique_ids),
)
found = {r["id"] if isinstance(r, dict) else r[0] for r in cur.fetchall()}
missing = set(unique_ids) - found
if missing:
raise HTTPException(
status_code=400,
detail=f"Unbekannte Übung(en): {sorted(missing)}",
)
def _assert_variant_for_exercise(cur, exercise_id: int, variant_id: Optional[int]) -> None:
if variant_id is None:
return
cur.execute(
"SELECT exercise_id FROM exercise_variants WHERE id = %s",
(variant_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=400, detail=f"Unbekannte Variante: {variant_id}")
ev_ex = row["exercise_id"] if isinstance(row, dict) else row[0]
if ev_ex != exercise_id:
raise HTTPException(status_code=400, detail="Variante gehört nicht zur gewählten Übung")
def _insert_edge_row(
    cur,
    graph_id: int,
    from_exercise_id: int,
    from_variant_id: Optional[int],
    to_exercise_id: int,
    to_variant_id: Optional[int],
    edge_type: str,
    notes: Optional[str],
) -> dict:
    """Insert one edge and return it in the enriched ``_EDGE_SELECT`` shape.

    No commit happens here; the caller owns the transaction. Database errors
    raised by the INSERT propagate to the caller.
    """
    values = (
        graph_id,
        from_exercise_id,
        from_variant_id,
        to_exercise_id,
        to_variant_id,
        edge_type,
        notes,
    )
    cur.execute(
        """
        INSERT INTO exercise_progression_edges (
            graph_id,
            from_exercise_id, from_exercise_variant_id,
            to_exercise_id, to_exercise_variant_id,
            edge_type, notes
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        RETURNING id
        """,
        values,
    )
    inserted_id = cur.fetchone()["id"]

    # Re-read through the shared SELECT so the result carries titles/variant names.
    cur.execute(_EDGE_SELECT + " WHERE e.id = %s", (inserted_id,))
    return r2d(cur.fetchone())
@router.get("/exercise-progression-graphs")
def list_progression_graphs(tenant: TenantContext = Depends(get_tenant_context)):
    """List progression graphs with an ``edges_count`` per graph.

    Platform admins receive every graph. All other callers receive the rows
    matching the predicate built by ``library_content_visibility_sql`` for
    their profile, role and effective club. Newest ``updated_at`` first
    (NULLs last), then by name.
    """
    role = tenant.global_role
    with get_db() as conn:
        cur = get_cursor(conn)

        if is_platform_admin(role):
            cur.execute(
                """
                SELECT g.*,
                (SELECT COUNT(*) FROM exercise_progression_edges e WHERE e.graph_id = g.id) AS edges_count
                FROM exercise_progression_graphs g
                ORDER BY g.updated_at DESC NULLS LAST, g.name
                """
            )
            return [r2d(rec) for rec in cur.fetchall()]

        where_sql, where_params = library_content_visibility_sql(
            alias="g",
            profile_id=tenant.profile_id,
            role=role,
            effective_club_id=tenant.effective_club_id,
        )
        cur.execute(
            f"""
            SELECT g.*,
            (SELECT COUNT(*) FROM exercise_progression_edges e WHERE e.graph_id = g.id) AS edges_count
            FROM exercise_progression_graphs g
            WHERE ({where_sql})
            ORDER BY g.updated_at DESC NULLS LAST, g.name
            """,
            where_params,
        )
        return [r2d(rec) for rec in cur.fetchall()]
@router.get("/exercise-progression-graphs/{graph_id}")
def get_progression_graph(
    graph_id: int,
    include_edges: bool = Query(default=False),
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Return one graph the caller may read, optionally with its edges.

    With ``include_edges`` set, the result gains an ``edges`` list ordered by
    edge id. Other handlers in this module call this function directly and
    pass ``include_edges`` and ``tenant`` explicitly.

    Raises:
        HTTPException: 404 if the graph is missing, 403 if it is not readable.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        graph = _require_graph_read(cur, graph_id, tenant.profile_id, tenant.global_role)
        if not include_edges:
            return graph
        cur.execute(
            _EDGE_SELECT + " WHERE e.graph_id = %s ORDER BY e.id",
            (graph_id,),
        )
        graph["edges"] = [r2d(rec) for rec in cur.fetchall()]
        return graph
@router.post("/exercise-progression-graphs", status_code=201)
def create_progression_graph(
    body: ProgressionGraphCreate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Create a progression graph owned by the caller and return it.

    Requires a planning role. For visibility "club" the club comes from the
    body or, failing that, from the tenant's effective club; for any other
    visibility no club id is stored.

    Raises:
        HTTPException: 403 without a planning role; 400 for a blank name or a
            club graph without any club; whatever
            ``assert_valid_governance_visibility`` raises.
    """
    role = tenant.global_role
    if not _has_planning_role(role):
        raise HTTPException(status_code=403, detail="Keine Berechtigung zum Anlegen von Progressionsgraphen")

    graph_name = body.name.strip()
    if not graph_name:
        raise HTTPException(status_code=400, detail="name ist Pflicht")

    visibility = (body.visibility or "private").strip().lower()
    is_club_graph = visibility == "club"

    club_id = body.club_id
    if is_club_graph and club_id is None:
        club_id = tenant.effective_club_id
    if is_club_graph and club_id is None:
        raise HTTPException(
            status_code=400,
            detail="Vereins-Graph: club_id angeben oder aktiven Verein wählen (X-Active-Club-Id).",
        )

    # The club is only relevant (and only stored) for club-visible graphs.
    stored_club = club_id if is_club_graph else None

    with get_db() as conn:
        cur = get_cursor(conn)
        assert_valid_governance_visibility(cur, tenant.profile_id, role, visibility, stored_club)
        cur.execute(
            """
            INSERT INTO exercise_progression_graphs (name, description, visibility, club_id, created_by)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
            """,
            (graph_name, body.description, visibility, stored_club, tenant.profile_id),
        )
        new_graph_id = cur.fetchone()["id"]
        conn.commit()

    return get_progression_graph(new_graph_id, include_edges=False, tenant=tenant)
@router.put("/exercise-progression-graphs/{graph_id}")
def update_progression_graph(
    graph_id: int,
    body: ProgressionGraphUpdate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Partially update a graph the caller may write and return the result.

    Only fields actually present in the request are considered. The resulting
    visibility/club combination is validated through
    ``assert_valid_governance_visibility`` before anything is written.

    Raises:
        HTTPException: 400 for an empty body, a blank name, or a club graph
            without any club; 404/403 from the write-access check.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    # exclude_unset distinguishes "field omitted" from "field sent as null".
    original = body.model_dump(exclude_unset=True)
    if not original:
        raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
    with get_db() as conn:
        cur = get_cursor(conn)
        row = _require_graph_write(cur, graph_id, profile_id, role)
        # Current state of the graph, normalised the same way as incoming values.
        ex_vis = (row.get("visibility") or "private").strip().lower()
        ex_cid_raw = row.get("club_id")
        ex_cid = int(ex_cid_raw) if ex_cid_raw is not None else None
        # Target visibility: the sent value if it is non-null and non-blank,
        # otherwise the existing one.
        next_vis = ex_vis
        if "visibility" in original and original["visibility"] is not None:
            v_raw = str(original["visibility"]).strip().lower()
            if v_raw:
                next_vis = v_raw
        # Target club: the sent value (an explicit null clears it), otherwise
        # the existing one.
        next_club = ex_cid
        if "club_id" in original:
            raw_c = original["club_id"]
            if raw_c in (None, "", []):
                next_club = None
            else:
                next_club = int(raw_c)
        # A club graph needs a club: fall back to the tenant's effective club.
        if next_vis == "club":
            if next_club is None:
                next_club = tenant.effective_club_id
            if next_club is None:
                raise HTTPException(
                    status_code=400,
                    detail="Vereins-Graph: club_id angeben oder aktiven Verein wählen (X-Active-Club-Id).",
                )
        gov_club = next_club if next_vis == "club" else None
        assert_valid_governance_visibility(cur, profile_id, role, next_vis, gov_club)
        # Build the SET clause from the fields that were sent or that changed.
        fields: List[str] = []
        params: List[Any] = []
        if "name" in original:
            n = (original["name"] or "").strip()
            if not n:
                raise HTTPException(status_code=400, detail="name ist Pflicht")
            fields.append("name = %s")
            params.append(n)
        if "description" in original:
            fields.append("description = %s")
            params.append(original["description"])
        vis_changed = next_vis != ex_vis
        if "visibility" in original or vis_changed:
            fields.append("visibility = %s")
            params.append(next_vis)
        # club_id is rewritten on a visibility change too, so that a graph
        # leaving "club" visibility gets its club cleared.
        if "club_id" in original or vis_changed:
            fields.append("club_id = %s")
            params.append(next_club if next_vis == "club" else None)
        if not fields:
            return get_progression_graph(graph_id, include_edges=False, tenant=tenant)
        fields.append("updated_at = NOW()")
        params.append(graph_id)
        # Only fixed column fragments are interpolated; values stay parameters.
        cur.execute(
            f"UPDATE exercise_progression_graphs SET {', '.join(fields)} WHERE id = %s",
            tuple(params),
        )
        conn.commit()
    return get_progression_graph(graph_id, include_edges=False, tenant=tenant)
@router.delete("/exercise-progression-graphs/{graph_id}")
def delete_progression_graph(graph_id: int, tenant: TenantContext = Depends(get_tenant_context)):
    """Delete a graph the caller may write and confirm with ``{"ok": True}``.

    Raises:
        HTTPException: 404 if the graph is missing, 403 if it is not writable.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, tenant.profile_id, tenant.global_role)
        delete_sql = "DELETE FROM exercise_progression_graphs WHERE id = %s"
        cur.execute(delete_sql, (graph_id,))
        conn.commit()
    return {"ok": True}
@router.get("/exercise-progression-graphs/{graph_id}/edges")
def list_progression_edges(
    graph_id: int,
    from_exercise_id: Optional[int] = Query(default=None),
    to_exercise_id: Optional[int] = Query(default=None),
    tenant: TenantContext = Depends(get_tenant_context),
):
    """List a readable graph's edges, optionally filtered by endpoint exercise.

    Both filters are optional and combine with AND; results are ordered by
    edge id.

    Raises:
        HTTPException: 404 if the graph is missing, 403 if it is not readable.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_read(cur, graph_id, tenant.profile_id, tenant.global_role)

        conditions: List[str] = ["e.graph_id = %s"]
        args: List[Any] = [graph_id]
        if from_exercise_id is not None:
            conditions.append("e.from_exercise_id = %s")
            args.append(from_exercise_id)
        if to_exercise_id is not None:
            conditions.append("e.to_exercise_id = %s")
            args.append(to_exercise_id)

        sql = _EDGE_SELECT + " WHERE " + " AND ".join(conditions) + " ORDER BY e.id"
        cur.execute(sql, tuple(args))
        return [r2d(rec) for rec in cur.fetchall()]
@router.post("/exercise-progression-graphs/{graph_id}/edges", status_code=201)
def create_progression_edge(
    graph_id: int,
    body: ProgressionEdgeCreate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Create one edge in a writable graph and return the enriched edge row.

    Both exercises must exist and any given variant must belong to the
    exercise on its side. A blank edge type falls back to "next_exercise";
    blank notes are stored as NULL.

    Raises:
        HTTPException: 404/403 from the write check, 400 from the
            exercise/variant checks, 409 when the INSERT violates a
            database constraint.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, tenant.profile_id, tenant.global_role)
        _assert_exercises_exist(cur, body.from_exercise_id, body.to_exercise_id)

        src_variant = body.from_exercise_variant_id
        dst_variant = body.to_exercise_variant_id
        _assert_variant_for_exercise(cur, body.from_exercise_id, src_variant)
        _assert_variant_for_exercise(cur, body.to_exercise_id, dst_variant)

        kind = (body.edge_type or "next_exercise").strip()
        if not kind:
            kind = "next_exercise"
        note_text = (body.notes or "").strip()

        try:
            edge = _insert_edge_row(
                cur,
                graph_id,
                body.from_exercise_id,
                src_variant,
                body.to_exercise_id,
                dst_variant,
                kind,
                note_text or None,
            )
            conn.commit()
        except IntegrityError as e:
            conn.rollback()
            raise HTTPException(
                status_code=409,
                detail="Kante existiert bereits oder Endpunkte unzulässig (gleiche Übung ohne zwei Varianten)",
            ) from e
        return edge
@router.post("/exercise-progression-graphs/{graph_id}/edges/sequence", status_code=201)
def create_progression_sequence(
    graph_id: int,
    body: ProgressionSequenceCreate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Create the n-1 successor edges (next_exercise) for an ordered step list.

    Each pair of consecutive steps becomes one edge of type "next_exercise".
    When ``segment_notes`` is given, entry i is the note of the edge between
    step i and step i+1; blank notes are stored as NULL.

    All validation (graph write access, exercise existence, variant
    ownership) runs before the first INSERT, so a validation error can never
    leave edges written in the open transaction.

    Returns:
        ``{"created": [...], "count": n}`` with the enriched edge rows.

    Raises:
        HTTPException: 404/403 from the write check, 400 from the
            exercise/variant checks, 409 when any INSERT violates a database
            constraint (the transaction is rolled back, nothing is stored).
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    steps = body.steps
    seg_notes = body.segment_notes
    created: List[dict] = []
    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, profile_id, role)
        _assert_exercises_exist(cur, *(s.exercise_id for s in steps))
        # Validate every step exactly once and up front: interior steps are
        # both the target of one edge and the source of the next, so checking
        # per edge would query them twice and interleave checks with writes.
        for step in steps:
            _assert_variant_for_exercise(cur, step.exercise_id, step.variant_id)
        try:
            for idx, (src, dst) in enumerate(zip(steps, steps[1:])):
                raw_note = seg_notes[idx] if seg_notes is not None else None
                note = (raw_note or "").strip() or None
                row = _insert_edge_row(
                    cur,
                    graph_id,
                    src.exercise_id,
                    src.variant_id,
                    dst.exercise_id,
                    dst.variant_id,
                    "next_exercise",
                    note,
                )
                created.append(row)
            conn.commit()
        except IntegrityError as e:
            # All-or-nothing: discard edges inserted before the failing one.
            conn.rollback()
            raise HTTPException(
                status_code=409,
                detail="Sequenz konnte nicht vollständig angelegt werden (Duplikat oder ungültige Endpunkte). Keine Teilmenge gespeichert.",
            ) from e
    return {"created": created, "count": len(created)}
@router.put("/exercise-progression-graphs/{graph_id}/edges/{edge_id}")
def update_progression_edge(
    graph_id: int,
    edge_id: int,
    body: ProgressionEdgeUpdate,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Update the notes of one edge in a writable graph and return the edge.

    Notes are stripped; a null or blank value clears them.

    Raises:
        HTTPException: 400 for an empty body, 404/403 from the write check,
            404 when the edge does not belong to this graph.
    """
    changes = body.model_dump(exclude_unset=True)
    if not changes:
        raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")

    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, tenant.profile_id, tenant.global_role)

        cur.execute(
            "SELECT id FROM exercise_progression_edges WHERE id = %s AND graph_id = %s",
            (edge_id, graph_id),
        )
        existing = cur.fetchone()
        if not existing:
            raise HTTPException(status_code=404, detail="Kante nicht gefunden")

        if "notes" in changes:
            # None and whitespace-only input both end up as NULL.
            cleaned = (changes["notes"] or "").strip()
            cur.execute(
                "UPDATE exercise_progression_edges SET notes = %s WHERE id = %s AND graph_id = %s",
                (cleaned or None, edge_id, graph_id),
            )
        conn.commit()

        cur.execute(_EDGE_SELECT + " WHERE e.id = %s AND e.graph_id = %s", (edge_id, graph_id))
        return r2d(cur.fetchone())
@router.delete("/exercise-progression-graphs/{graph_id}/edges/{edge_id}")
def delete_progression_edge(
    graph_id: int,
    edge_id: int,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Delete one edge of a writable graph and confirm with ``{"ok": True}``.

    Raises:
        HTTPException: 404/403 from the write check, 404 when no edge with
            this id exists in the graph (nothing is committed in that case).
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, tenant.profile_id, tenant.global_role)
        cur.execute(
            """
            DELETE FROM exercise_progression_edges
            WHERE id = %s AND graph_id = %s
            RETURNING id
            """,
            (edge_id, graph_id),
        )
        # RETURNING yields a row only if something was actually deleted.
        removed = cur.fetchone()
        if not removed:
            raise HTTPException(status_code=404, detail="Kante nicht gefunden")
        conn.commit()
    return {"ok": True}
@router.post("/exercise-progression-graphs/{graph_id}/edges/delete-batch")
def delete_progression_edges_batch(
    graph_id: int,
    body: EdgeIdsBatch,
    tenant: TenantContext = Depends(get_tenant_context),
):
    """Delete several edges at once (e.g. one connected chain in a single step).

    Duplicate ids are ignored. Ids that do not belong to this graph are
    silently skipped; ``deleted`` reports the cursor's row count.

    Raises:
        HTTPException: 404 if the graph is missing, 403 if it is not writable.
    """
    # De-duplicate while keeping first-occurrence order.
    edge_ids = list(dict.fromkeys(body.edge_ids))
    placeholders = ",".join("%s" for _ in edge_ids)
    with get_db() as conn:
        cur = get_cursor(conn)
        _require_graph_write(cur, graph_id, tenant.profile_id, tenant.global_role)
        cur.execute(
            f"""
            DELETE FROM exercise_progression_edges
            WHERE graph_id = %s AND id IN ({placeholders})
            """,
            tuple([graph_id] + edge_ids),
        )
        removed_count = cur.rowcount
        conn.commit()
    return {"ok": True, "deleted": removed_count}