All checks were successful
Deploy Development / deploy (push) Successful in 39s
Test Suite / pytest-backend (push) Successful in 36s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 12s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m9s
- Updated the backend to improve the fetching and insertion of training unit sections, including a new function for handling section items.
- Added documentation notes regarding the unique constraint on `training_unit_sections` and the implications for parallel training streams.
- Updated frontend components and utility functions to reflect changes in the training planning API and to prepare for future enhancements related to parallel streams.
160 lines
5.1 KiB
Python
160 lines
5.1 KiB
Python
"""
|
|
PostgreSQL integration: round trip _replace_unit_sections ↔ _fetch_sections.

Activation (local, analogous to test_access_layer_integration):
  set TRAINING_PLANNING_INTEGRATION=1
  pytest tests/test_training_planning_sections_integration.py -v -m integration

Prerequisite: migrated DB, DB_* as in Docker Compose.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import uuid
|
|
|
|
import pytest
|
|
|
|
from db import get_db, get_cursor
|
|
from routers.training_planning import _fetch_sections, _replace_unit_sections
|
|
|
|
|
|
def _integration_enabled() -> bool:
    """Return True when the opt-in switch for these integration tests is set.

    Reads the ``TRAINING_PLANNING_INTEGRATION`` environment variable. Leading
    and trailing whitespace and letter case are ignored; the values ``1``,
    ``true`` and ``yes`` enable the suite. An unset variable is treated as the
    empty string and therefore disables it.
    """
    raw = os.getenv("TRAINING_PLANNING_INTEGRATION", "")
    return raw.strip().lower() in ("1", "true", "yes")
|
|
|
|
|
|
# Module-wide markers: tag every test in this file as ``integration`` and skip
# the whole module unless the opt-in environment switch is set. The condition
# is evaluated once, when the module is imported.
pytestmark = [
    pytest.mark.integration,
    pytest.mark.skipif(
        not _integration_enabled(),
        reason="TRAINING_PLANNING_INTEGRATION=1 und PostgreSQL (DB_*) erforderlich",
    ),
]
|
|
|
|
|
|
def _db_ping() -> bool:
    """Return True if a trivial query against the configured database succeeds.

    Opens a connection through ``get_db()``, executes ``SELECT 1 AS ok`` and
    checks that the fetched row carries ``ok == 1``. Any exception raised along
    the way is reported as ``False`` instead of propagating.
    """
    try:
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute("SELECT 1 AS ok")
            row = cur.fetchone()
            # The row is read by column name via ``.get``, so the cursor is
            # expected to yield mapping-like rows.
            return row is not None and row.get("ok") == 1
    except Exception:
        # Deliberately broad: this is only a reachability probe whose result
        # decides whether the tests are skipped, so every failure means "no".
        return False
|
|
|
|
|
|
@pytest.fixture(scope="module")
def db_ready() -> None:
    """Skip the requesting tests when the database probe fails.

    Module-scoped, so ``_db_ping()`` runs once for all tests in this file.
    The fixture provides no value; it exists only for its skip side effect.
    """
    if not _db_ping():
        pytest.skip("PostgreSQL nicht erreichbar (DB_HOST/DB_PORT/…)")
|
|
|
|
|
|
def test_replace_sections_roundtrip(db_ready):
    """Insert helper data, replace, fetch — same semantics as the production PUT path.

    Steps:
      1. Insert a club, profile, training group, exercise and training unit,
         each with a random suffix so that repeated runs do not collide.
      2. Write two sections (one with a note item and an exercise item, one
         empty) via ``_replace_unit_sections`` and read them back with
         ``_fetch_sections`` on the same cursor, then commit.
      3. Assert on the fetched structure.
      4. Delete the helper rows again, whether or not the assertions passed.
    """
    suffix = uuid.uuid4().hex[:12]
    club_name = f"tpl_it_club_{suffix}"
    email = f"tpl_it_{suffix}@test.local"

    # Imported inside the test rather than at module level, as in the original.
    from auth import hash_pin

    with get_db() as conn:
        cur = get_cursor(conn)

        cur.execute(
            "INSERT INTO clubs (name, abbreviation, status) VALUES (%s, %s, %s) RETURNING id",
            (club_name, "T", "active"),
        )
        club_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO profiles (email, pin_hash, name, role, active_club_id)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
            """,
            (email, hash_pin("x"), f"TP {suffix}", "trainer", club_id),
        )
        profile_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO training_groups (club_id, name, trainer_id, status)
            VALUES (%s, %s, %s, %s)
            RETURNING id
            """,
            (club_id, f"Gruppe {suffix}", profile_id, "active"),
        )
        group_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO exercises (title, goal, execution, visibility, status, created_by)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id
            """,
            (f"Übung {suffix}", "Ziel", "Ablauf", "private", "draft", profile_id),
        )
        ex_id = int(cur.fetchone()["id"])

        cur.execute(
            """
            INSERT INTO training_units (
                group_id, planned_date, status, created_by
            ) VALUES (%s, %s, %s, %s)
            RETURNING id
            """,
            (group_id, "2026-06-01", "planned", profile_id),
        )
        unit_id = int(cur.fetchone()["id"])

        # Payload under test: one section with a note and an exercise item,
        # and one section without items.
        sections_in = [
            {
                "title": "A1",
                "order_index": 0,
                "guidance_notes": "gn",
                "items": [
                    {
                        "item_type": "note",
                        "order_index": 0,
                        "note_body": "Hinweis",
                    },
                    {
                        "item_type": "exercise",
                        "order_index": 1,
                        "exercise_id": ex_id,
                        "planned_duration_min": 5,
                        "notes": "n1",
                    },
                ],
            },
            {
                "title": "B2",
                "order_index": 1,
                "items": [],
            },
        ]
        _replace_unit_sections(cur, unit_id, sections_in)
        # Read back on the same cursor, i.e. before the commit below.
        loaded = _fetch_sections(cur, unit_id)
        conn.commit()

    try:
        assert len(loaded) == 2
        assert loaded[0]["title"] == "A1"
        assert loaded[0]["guidance_notes"] == "gn"
        assert loaded[0]["order_index"] == 0
        assert len(loaded[0]["items"]) == 2
        assert loaded[0]["items"][0]["item_type"] == "note"
        assert loaded[0]["items"][0]["note_body"] == "Hinweis"
        assert loaded[0]["items"][1]["item_type"] == "exercise"
        assert int(loaded[0]["items"][1]["exercise_id"]) == ex_id
        assert loaded[0]["items"][1]["planned_duration_min"] == 5
        assert loaded[1]["title"] == "B2"
        assert loaded[1]["items"] == []
    finally:
        # Clean up on a fresh connection, deleting in reverse order of creation.
        # NOTE(review): section/item rows are presumably removed together with
        # the training unit (cascade) — confirm against the schema.
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute("DELETE FROM training_units WHERE id = %s", (unit_id,))
            cur.execute("DELETE FROM exercises WHERE id = %s", (ex_id,))
            cur.execute("DELETE FROM training_groups WHERE id = %s", (group_id,))
            cur.execute("DELETE FROM profiles WHERE id = %s", (profile_id,))
            cur.execute("DELETE FROM clubs WHERE id = %s", (club_id,))
            conn.commit()
|