All checks were successful
Deploy Development / deploy (push) Successful in 39s
Test Suite / pytest-backend (push) Successful in 36s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 12s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m9s
- Bumped APP_VERSION to 0.8.138 and updated the changelog to reflect recent changes. - Enhanced training unit planning with support for POST/PUT requests including phases and parallel streams. - Fixed role assignment validation for stream co-trainers and added integration tests for phase handling. - Updated the training planning API to improve data structure and retrieval for nested phases and sections.
291 lines
9.7 KiB
Python
291 lines
9.7 KiB
Python
"""
PostgreSQL integration: round trip of _replace_unit_sections / _replace_unit_phases ↔ fetch helpers.

Activation:
- Local: TRAINING_PLANNING_INTEGRATION=1
- CI: .gitea/workflows/test.yml sets the variable for the pytest run (deployed backend container + PostgreSQL).

Prerequisite: migrated DB, DB_* as in Docker Compose.
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import uuid
|
|
|
|
import pytest
|
|
|
|
from db import get_db, get_cursor
|
|
from routers.training_planning import (
|
|
_fetch_phases_nested,
|
|
_fetch_sections,
|
|
_replace_unit_phases,
|
|
_replace_unit_sections,
|
|
)
|
|
|
|
|
|
def _integration_enabled() -> bool:
    """Return True when the integration tests are switched on via the environment.

    Reads ``TRAINING_PLANNING_INTEGRATION`` and treats ``1``, ``true`` and
    ``yes`` (case-insensitive, surrounding whitespace ignored) as enabled.
    An unset variable or any other value leaves the tests disabled.
    """
    flag = os.getenv("TRAINING_PLANNING_INTEGRATION", "")
    return flag.strip().lower() in ("1", "true", "yes")
|
|
|
|
|
|
# Module-wide markers: tag every test in this file as an integration test and
# skip all of them unless the opt-in environment flag is set.
pytestmark = [
    pytest.mark.integration,
    pytest.mark.skipif(
        not _integration_enabled(),
        reason="TRAINING_PLANNING_INTEGRATION=1 und PostgreSQL (DB_*) erforderlich",
    ),
]
|
|
|
|
|
|
def _db_ping() -> bool:
    """Check whether the database answers a trivial query.

    Returns:
        True if ``SELECT 1 AS ok`` yields a row whose ``ok`` value is 1.
        False if no such row comes back or if any exception is raised while
        connecting or querying.
    """
    try:
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute("SELECT 1 AS ok")
            row = cur.fetchone()
            return row is not None and row.get("ok") == 1
    except Exception:
        # Deliberately broad: any failure simply counts as "database not
        # usable" and is reported as False instead of propagating.
        return False
|
|
|
|
|
|
@pytest.fixture(scope="module")
def db_ready():
    """Skip the requesting tests when the database ping fails.

    Module-scoped, so the connectivity check runs once for this file.
    """
    if not _db_ping():
        pytest.skip("PostgreSQL nicht erreichbar (DB_HOST/DB_PORT/…)")
|
|
|
|
|
|
def test_replace_sections_roundtrip(db_ready):
    """Insert helper rows, replace the unit's sections, then fetch them back.

    Same semantics as the production PUT path: what ``_replace_unit_sections``
    writes is read back via ``_fetch_sections`` and compared field by field.
    The helper rows created here are deleted again in the ``finally`` block.
    """
    # Random suffix so the helper rows get names that do not collide with
    # rows from other runs.
    suffix = uuid.uuid4().hex[:12]
    club_name = f"tpl_it_club_{suffix}"
    email = f"tpl_it_{suffix}@test.local"

    from auth import hash_pin

    with get_db() as conn:
        cur = get_cursor(conn)

        # --- helper rows: club -> profile -> group -> exercise -> unit ---
        cur.execute(
            "INSERT INTO clubs (name, abbreviation, status) VALUES (%s, %s, %s) RETURNING id",
            (club_name, "T", "active"),
        )
        club_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO profiles (email, pin_hash, name, role, active_club_id)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
            """,
            (email, hash_pin("x"), f"TP {suffix}", "trainer", club_id),
        )
        profile_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO training_groups (club_id, name, trainer_id, status)
            VALUES (%s, %s, %s, %s)
            RETURNING id
            """,
            (club_id, f"Gruppe {suffix}", profile_id, "active"),
        )
        group_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO exercises (title, goal, execution, visibility, status, created_by)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id
            """,
            (f"Übung {suffix}", "Ziel", "Ablauf", "private", "draft", profile_id),
        )
        ex_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO training_units (
                group_id, planned_date, status, created_by
            ) VALUES (%s, %s, %s, %s)
            RETURNING id
            """,
            (group_id, "2026-06-01", "planned", profile_id),
        )
        unit_id = int(cur.fetchone()["id"])

        # --- payload: one section with a note and an exercise, one empty section ---
        sections_in = [
            {
                "title": "A1",
                "order_index": 0,
                "guidance_notes": "gn",
                "items": [
                    {
                        "item_type": "note",
                        "order_index": 0,
                        "note_body": "Hinweis",
                    },
                    {
                        "item_type": "exercise",
                        "order_index": 1,
                        "exercise_id": ex_id,
                        "planned_duration_min": 5,
                        "notes": "n1",
                    },
                ],
            },
            {
                "title": "B2",
                "order_index": 1,
                "items": [],
            },
        ]

        # Write and read back on the same cursor, then commit.
        _replace_unit_sections(cur, unit_id, sections_in)
        loaded = _fetch_sections(cur, unit_id)
        conn.commit()

    try:
        assert len(loaded) == 2

        # First section: metadata plus both items in order.
        assert loaded[0]["title"] == "A1"
        assert loaded[0]["guidance_notes"] == "gn"
        assert loaded[0]["order_index"] == 0
        assert len(loaded[0]["items"]) == 2
        assert loaded[0]["items"][0]["item_type"] == "note"
        assert loaded[0]["items"][0]["note_body"] == "Hinweis"
        assert loaded[0]["items"][1]["item_type"] == "exercise"
        assert int(loaded[0]["items"][1]["exercise_id"]) == ex_id
        assert loaded[0]["items"][1]["planned_duration_min"] == 5

        # Second section: present but without items.
        assert loaded[1]["title"] == "B2"
        assert loaded[1]["items"] == []
    finally:
        # Remove the committed helper rows on a fresh connection, whether or
        # not the assertions passed. Order is the reverse of creation —
        # presumably so dependent rows go before the rows they reference.
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute("DELETE FROM training_units WHERE id = %s", (unit_id,))
            cur.execute("DELETE FROM exercises WHERE id = %s", (ex_id,))
            cur.execute("DELETE FROM training_groups WHERE id = %s", (group_id,))
            cur.execute("DELETE FROM profiles WHERE id = %s", (profile_id,))
            cur.execute("DELETE FROM clubs WHERE id = %s", (club_id,))
            conn.commit()
|
|
|
|
|
|
def test_replace_phases_roundtrip_parallel_stream(db_ready):
    """Replace phases, including parallel-stream sections, and load them again.

    Writes one ``whole_group`` phase and one ``parallel`` phase with a single
    stream via ``_replace_unit_phases``. It then checks the nested result of
    ``_fetch_phases_nested`` and the section count from ``_fetch_sections``.
    The helper rows created here are deleted again in the ``finally`` block.
    """
    # Random suffix so the helper rows get names that do not collide with
    # rows from other runs.
    suffix = uuid.uuid4().hex[:12]
    club_name = f"ph_it_club_{suffix}"
    email = f"ph_it_{suffix}@test.local"

    from auth import hash_pin

    with get_db() as conn:
        cur = get_cursor(conn)

        # --- helper rows: club -> profile -> group -> exercise -> unit ---
        cur.execute(
            "INSERT INTO clubs (name, abbreviation, status) VALUES (%s, %s, %s) RETURNING id",
            (club_name, "P", "active"),
        )
        club_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO profiles (email, pin_hash, name, role, active_club_id)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
            """,
            (email, hash_pin("x"), f"PH {suffix}", "trainer", club_id),
        )
        profile_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO training_groups (club_id, name, trainer_id, status)
            VALUES (%s, %s, %s, %s)
            RETURNING id
            """,
            (club_id, f"Gruppe PH {suffix}", profile_id, "active"),
        )
        group_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO exercises (title, goal, execution, visibility, status, created_by)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id
            """,
            (f"Übung PH {suffix}", "Ziel", "Ablauf", "private", "draft", profile_id),
        )
        ex_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO training_units (
                group_id, planned_date, status, created_by
            ) VALUES (%s, %s, %s, %s)
            RETURNING id
            """,
            (group_id, "2026-06-02", "planned", profile_id),
        )
        unit_id = int(cur.fetchone()["id"])

        # --- payload: a whole-group phase, then a parallel phase with one stream ---
        phases_in = [
            {
                "phase_kind": "whole_group",
                "order_index": 0,
                "title": "Aufwärmen",
                "sections": [
                    {
                        "title": "Gemeinsam",
                        "order_index": 0,
                        "items": [
                            {"item_type": "note", "order_index": 0, "note_body": "Los"},
                        ],
                    },
                ],
            },
            {
                "phase_kind": "parallel",
                "order_index": 1,
                "title": "Breakout",
                "streams": [
                    {
                        "order_index": 0,
                        "title": "Matte A",
                        "sections": [
                            {
                                "title": "Technik A",
                                "order_index": 0,
                                "items": [
                                    {
                                        "item_type": "exercise",
                                        "order_index": 0,
                                        "exercise_id": ex_id,
                                        "planned_duration_min": 10,
                                    },
                                ],
                            },
                        ],
                    },
                ],
            },
        ]

        # NOTE(review): the meaning of the trailing arguments (profile id,
        # "trainer", profile id) is defined by _replace_unit_phases — confirm
        # against its signature.
        _replace_unit_phases(cur, unit_id, phases_in, profile_id, "trainer", profile_id)
        nested = _fetch_phases_nested(cur, unit_id)
        flat_sec = _fetch_sections(cur, unit_id)
        conn.commit()

    try:
        assert len(nested) == 2

        # Phase 0: whole group with its single section.
        assert nested[0]["phase_kind"] == "whole_group"
        assert len(nested[0].get("sections") or []) == 1

        # Phase 1: parallel, one stream carrying one section with one exercise.
        assert nested[1]["phase_kind"] == "parallel"
        streams = nested[1].get("streams") or []
        assert len(streams) == 1
        assert len(streams[0].get("sections") or []) == 1
        assert streams[0]["sections"][0]["title"] == "Technik A"
        assert len(streams[0]["sections"][0].get("items") or []) == 1
        assert int(streams[0]["sections"][0]["items"][0]["exercise_id"]) == ex_id

        # Two sections were written in total (one per phase above).
        assert len(flat_sec) == 2
    finally:
        # Remove the committed helper rows on a fresh connection, whether or
        # not the assertions passed. Order is the reverse of creation —
        # presumably so dependent rows go before the rows they reference.
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute("DELETE FROM training_units WHERE id = %s", (unit_id,))
            cur.execute("DELETE FROM exercises WHERE id = %s", (ex_id,))
            cur.execute("DELETE FROM training_groups WHERE id = %s", (group_id,))
            cur.execute("DELETE FROM profiles WHERE id = %s", (profile_id,))
            cur.execute("DELETE FROM clubs WHERE id = %s", (club_id,))
            conn.commit()
|