shinkan-jinkendo/backend/tests/test_training_planning_sections_integration.py
Lars 220a16429c
All checks were successful
Deploy Development / deploy (push) Successful in 39s
Test Suite / pytest-backend (push) Successful in 36s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 12s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m9s
Enhance training unit sections handling and documentation for parallel training streams
- Updated the backend to improve the fetching and insertion of training unit sections, including a new function for handling section items.
- Added documentation notes regarding the unique constraint on `training_unit_sections` and the implications for parallel training streams.
- Updated frontend components and utility functions to reflect changes in the training planning API and to prepare for future enhancements related to parallel streams.
2026-05-14 22:24:55 +02:00

160 lines
5.1 KiB
Python

"""
PostgreSQL-Integration: Roundtrip _replace_unit_sections ↔ _fetch_sections.
Aktivierung (lokal, analog zu test_access_layer_integration):
set TRAINING_PLANNING_INTEGRATION=1
pytest tests/test_training_planning_sections_integration.py -v -m integration
Voraussetzung: migrierte DB, DB_* wie Docker-Compose.
"""
from __future__ import annotations
import os
import uuid
import pytest
from db import get_db, get_cursor
from routers.training_planning import _fetch_sections, _replace_unit_sections
def _integration_enabled() -> bool:
return os.getenv("TRAINING_PLANNING_INTEGRATION", "").strip().lower() in ("1", "true", "yes")
pytestmark = [
pytest.mark.integration,
pytest.mark.skipif(
not _integration_enabled(),
reason="TRAINING_PLANNING_INTEGRATION=1 und PostgreSQL (DB_*) erforderlich",
),
]
def _db_ping() -> bool:
try:
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT 1 AS ok")
row = cur.fetchone()
return row is not None and row.get("ok") == 1
except Exception:
return False
@pytest.fixture(scope="module")
def db_ready():
if not _db_ping():
pytest.skip("PostgreSQL nicht erreichbar (DB_HOST/DB_PORT/…)")
def test_replace_sections_roundtrip(db_ready):
"""INSERT-Hilfsdaten, replace, fetch — gleiche Semantik wie produktiver PUT-Pfad."""
suffix = uuid.uuid4().hex[:12]
club_name = f"tpl_it_club_{suffix}"
email = f"tpl_it_{suffix}@test.local"
from auth import hash_pin
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"INSERT INTO clubs (name, abbreviation, status) VALUES (%s, %s, %s) RETURNING id",
(club_name, "T", "active"),
)
club_id = int(cur.fetchone()["id"])
cur.execute(
"""
INSERT INTO profiles (email, pin_hash, name, role, active_club_id)
VALUES (%s, %s, %s, %s, %s)
RETURNING id
""",
(email, hash_pin("x"), f"TP {suffix}", "trainer", club_id),
)
profile_id = int(cur.fetchone()["id"])
cur.execute(
"""
INSERT INTO training_groups (club_id, name, trainer_id, status)
VALUES (%s, %s, %s, %s)
RETURNING id
""",
(club_id, f"Gruppe {suffix}", profile_id, "active"),
)
group_id = int(cur.fetchone()["id"])
cur.execute(
"""
INSERT INTO exercises (title, goal, execution, visibility, status, created_by)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""",
(f"Übung {suffix}", "Ziel", "Ablauf", "private", "draft", profile_id),
)
ex_id = int(cur.fetchone()["id"])
cur.execute(
"""
INSERT INTO training_units (
group_id, planned_date, status, created_by
) VALUES (%s, %s, %s, %s)
RETURNING id
""",
(group_id, "2026-06-01", "planned", profile_id),
)
unit_id = int(cur.fetchone()["id"])
sections_in = [
{
"title": "A1",
"order_index": 0,
"guidance_notes": "gn",
"items": [
{
"item_type": "note",
"order_index": 0,
"note_body": "Hinweis",
},
{
"item_type": "exercise",
"order_index": 1,
"exercise_id": ex_id,
"planned_duration_min": 5,
"notes": "n1",
},
],
},
{
"title": "B2",
"order_index": 1,
"items": [],
},
]
_replace_unit_sections(cur, unit_id, sections_in)
loaded = _fetch_sections(cur, unit_id)
conn.commit()
try:
assert len(loaded) == 2
assert loaded[0]["title"] == "A1"
assert loaded[0]["guidance_notes"] == "gn"
assert loaded[0]["order_index"] == 0
assert len(loaded[0]["items"]) == 2
assert loaded[0]["items"][0]["item_type"] == "note"
assert loaded[0]["items"][0]["note_body"] == "Hinweis"
assert loaded[0]["items"][1]["item_type"] == "exercise"
assert int(loaded[0]["items"][1]["exercise_id"]) == ex_id
assert loaded[0]["items"][1]["planned_duration_min"] == 5
assert loaded[1]["title"] == "B2"
assert loaded[1]["items"] == []
finally:
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("DELETE FROM training_units WHERE id = %s", (unit_id,))
cur.execute("DELETE FROM exercises WHERE id = %s", (ex_id,))
cur.execute("DELETE FROM training_groups WHERE id = %s", (group_id,))
cur.execute("DELETE FROM profiles WHERE id = %s", (profile_id,))
cur.execute("DELETE FROM clubs WHERE id = %s", (club_id,))
conn.commit()