shinkan-jinkendo/backend/tests/test_training_planning_sections_integration.py
Lars 0d34c0a73d
All checks were successful
Deploy Development / deploy (push) Successful in 38s
Test Suite / pytest-backend (push) Successful in 37s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 13s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m8s
Update pytest configuration and documentation for training planning integration
- Enhanced the pytest workflow in `.gitea/workflows/test.yml` to include `TRAINING_PLANNING_INTEGRATION` for improved testing against the PostgreSQL database.
- Updated `pytest.ini` to clarify integration marker usage, specifying both `ACCESS_LAYER_INTEGRATION` and `TRAINING_PLANNING_INTEGRATION`.
- Revised documentation in `test_training_planning_sections_integration.py` to provide clearer activation instructions for local and CI environments.
2026-05-14 22:35:12 +02:00

160 lines
5.0 KiB
Python

"""
PostgreSQL-Integration: Roundtrip _replace_unit_sections ↔ _fetch_sections.
Aktivierung:
- Lokal: TRAINING_PLANNING_INTEGRATION=1
- CI: .gitea/workflows/test.yml setzt die Variable beim pytest-Lauf (deployter Backend-Container + PostgreSQL).
Voraussetzung: migrierte DB, DB_* wie Docker-Compose.
"""
from __future__ import annotations
import os
import uuid
import pytest
from db import get_db, get_cursor
from routers.training_planning import _fetch_sections, _replace_unit_sections
def _integration_enabled() -> bool:
return os.getenv("TRAINING_PLANNING_INTEGRATION", "").strip().lower() in ("1", "true", "yes")
# Module-wide pytest markers: tag every test in this file as "integration"
# and skip them all unless _integration_enabled() reports the opt-in flag.
# The condition is evaluated once, at import/collection time.
_opt_in_required = pytest.mark.skipif(
    not _integration_enabled(),
    reason="TRAINING_PLANNING_INTEGRATION=1 und PostgreSQL (DB_*) erforderlich",
)
pytestmark = [pytest.mark.integration, _opt_in_required]
def _db_ping() -> bool:
    """Probe the database with a trivial query.

    Opens a connection via get_db(), runs ``SELECT 1 AS ok`` and returns
    True only when a row comes back whose "ok" column equals 1. Any
    exception raised along the way (connecting, querying, reading the row,
    leaving the context manager) is treated as "not reachable" and yields
    False instead of propagating.
    """
    try:
        with get_db() as connection:
            cursor = get_cursor(connection)
            cursor.execute("SELECT 1 AS ok")
            probe_row = cursor.fetchone()
            if probe_row is None:
                return False
            # Rows are read by column name, so the cursor must yield mappings.
            return probe_row.get("ok") == 1
    except Exception:
        # Deliberate best-effort: callers only need a yes/no answer.
        return False
@pytest.fixture(scope="module")
def db_ready():
    """Module-scoped guard fixture: skip dependent tests when the DB probe fails.

    Yields no value; tests request it only for its side effect of calling
    pytest.skip() when _db_ping() returns False.
    """
    database_reachable = _db_ping()
    if database_reachable:
        return
    pytest.skip("PostgreSQL nicht erreichbar (DB_HOST/DB_PORT/…)")
def test_replace_sections_roundtrip(db_ready):
    """Round-trip sections through _replace_unit_sections and _fetch_sections.

    Inserts throwaway helper rows (club, profile, training group, exercise,
    training unit), replaces the unit's sections, reads them back on the same
    cursor and commits. The loaded structure is then compared with what was
    written. Per the original author's note, this is meant to follow the same
    semantics as the production PUT path. The helper rows are deleted in a
    ``finally`` block so they are removed whether or not the assertions pass.
    """

    def insert_returning_id(cursor, statement, params):
        # Every setup INSERT ends in "RETURNING id"; the row is read by name.
        cursor.execute(statement, params)
        return int(cursor.fetchone()["id"])

    # A random tag keeps names and e-mail addresses unique across runs.
    unique_tag = uuid.uuid4().hex[:12]
    club_name = f"tpl_it_club_{unique_tag}"
    email = f"tpl_it_{unique_tag}@test.local"

    # Function-local import kept from the original; presumably so that the
    # module can be collected without importing auth -- TODO confirm.
    from auth import hash_pin

    with get_db() as connection:
        cursor = get_cursor(connection)

        club_id = insert_returning_id(
            cursor,
            "INSERT INTO clubs (name, abbreviation, status) VALUES (%s, %s, %s) RETURNING id",
            (club_name, "T", "active"),
        )
        profile_id = insert_returning_id(
            cursor,
            (
                "\n"
                "INSERT INTO profiles (email, pin_hash, name, role, active_club_id)\n"
                "VALUES (%s, %s, %s, %s, %s)\n"
                "RETURNING id\n"
            ),
            (email, hash_pin("x"), f"TP {unique_tag}", "trainer", club_id),
        )
        group_id = insert_returning_id(
            cursor,
            (
                "\n"
                "INSERT INTO training_groups (club_id, name, trainer_id, status)\n"
                "VALUES (%s, %s, %s, %s)\n"
                "RETURNING id\n"
            ),
            (club_id, f"Gruppe {unique_tag}", profile_id, "active"),
        )
        ex_id = insert_returning_id(
            cursor,
            (
                "\n"
                "INSERT INTO exercises (title, goal, execution, visibility, status, created_by)\n"
                "VALUES (%s, %s, %s, %s, %s, %s)\n"
                "RETURNING id\n"
            ),
            (f"Übung {unique_tag}", "Ziel", "Ablauf", "private", "draft", profile_id),
        )
        unit_id = insert_returning_id(
            cursor,
            (
                "\n"
                "INSERT INTO training_units (\n"
                "group_id, planned_date, status, created_by\n"
                ") VALUES (%s, %s, %s, %s)\n"
                "RETURNING id\n"
            ),
            (group_id, "2026-06-01", "planned", profile_id),
        )

        # Payload: one section holding a note item and an exercise item,
        # followed by a second section without items.
        note_item = {
            "item_type": "note",
            "order_index": 0,
            "note_body": "Hinweis",
        }
        exercise_item = {
            "item_type": "exercise",
            "order_index": 1,
            "exercise_id": ex_id,
            "planned_duration_min": 5,
            "notes": "n1",
        }
        sections_in = [
            {
                "title": "A1",
                "order_index": 0,
                "guidance_notes": "gn",
                "items": [note_item, exercise_item],
            },
            {
                "title": "B2",
                "order_index": 1,
                "items": [],
            },
        ]

        _replace_unit_sections(cursor, unit_id, sections_in)
        loaded = _fetch_sections(cursor, unit_id)
        connection.commit()

    try:
        assert len(loaded) == 2

        first_section = loaded[0]
        assert first_section["title"] == "A1"
        assert first_section["guidance_notes"] == "gn"
        assert first_section["order_index"] == 0
        assert len(first_section["items"]) == 2

        loaded_note = first_section["items"][0]
        assert loaded_note["item_type"] == "note"
        assert loaded_note["note_body"] == "Hinweis"

        loaded_exercise = first_section["items"][1]
        assert loaded_exercise["item_type"] == "exercise"
        assert int(loaded_exercise["exercise_id"]) == ex_id
        assert loaded_exercise["planned_duration_min"] == 5

        second_section = loaded[1]
        assert second_section["title"] == "B2"
        assert second_section["items"] == []
    finally:
        # Remove helper rows in reverse order of creation. Section and item
        # rows are not deleted explicitly here; presumably they go away with
        # the training unit -- TODO confirm against the schema.
        cleanup_steps = (
            ("DELETE FROM training_units WHERE id = %s", unit_id),
            ("DELETE FROM exercises WHERE id = %s", ex_id),
            ("DELETE FROM training_groups WHERE id = %s", group_id),
            ("DELETE FROM profiles WHERE id = %s", profile_id),
            ("DELETE FROM clubs WHERE id = %s", club_id),
        )
        with get_db() as connection:
            cursor = get_cursor(connection)
            for statement, row_id in cleanup_steps:
                cursor.execute(statement, (row_id,))
            connection.commit()