shinkan-jinkendo/backend/tests/test_training_framework_phases_integration.py
Lars 88c4201f80
All checks were successful
Deploy Development / deploy (push) Successful in 39s
Test Suite / pytest-backend (push) Successful in 37s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 12s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m8s
Refactor training framework to improve phase management and user experience
- Enhanced phase handling in training unit hydration and insertion processes, ensuring better data integrity.
- Updated frontend components to support phase representation in training framework slots.
- Improved user interface controls for managing parallel phases, optimizing user experience during training program edits.
- Refactored payload building functions to accommodate phase adjustments, enhancing save functionality for training plans.
2026-05-16 07:27:18 +02:00

207 lines
6.5 KiB
Python

"""
PostgreSQL-Integration: Rahmenprogramm-Slot mit verschachtelten `phases` (Blueprint-Unit).
Aktivierung:
- Lokal: TRAINING_PLANNING_INTEGRATION=1
- CI: .gitea/workflows/test.yml setzt die Variable beim pytest-Lauf.
Prüft `_insert_slots_and_blueprints` → `_replace_unit_phases` wie beim API-PUT mit Slot-Payload.
"""
from __future__ import annotations
import os
import uuid
import pytest
from db import get_db, get_cursor
from routers.training_framework_programs import _insert_slots_and_blueprints
from routers.training_planning import _fetch_phases_nested
def _integration_enabled() -> bool:
    """Report whether the PostgreSQL integration tests were opted into.

    Reads the ``TRAINING_PLANNING_INTEGRATION`` environment variable. After
    trimming surrounding whitespace and lowercasing, the values ``1``,
    ``true`` and ``yes`` count as enabled; anything else (including an unset
    variable) counts as disabled.
    """
    raw_flag = os.getenv("TRAINING_PLANNING_INTEGRATION", "")
    normalized_flag = raw_flag.strip().lower()
    return normalized_flag in {"1", "true", "yes"}
# Module-level marker list (pytest reads the `pytestmark` global): tags the tests
# in this file as `integration` and skips them unless `_integration_enabled()`
# reports that the opt-in environment variable is set.
pytestmark = [
    pytest.mark.integration,
    pytest.mark.skipif(
        not _integration_enabled(),
        reason="TRAINING_PLANNING_INTEGRATION=1 und PostgreSQL (DB_*) erforderlich",
    ),
]
def _db_ping() -> bool:
    """Return True when a trivial ``SELECT 1`` round-trip to the database works.

    Any exception raised while connecting, querying or reading the row is
    treated as "database not available" and reported as False, so callers can
    use this as a best-effort reachability probe.
    """
    try:
        with get_db() as connection:
            cursor = get_cursor(connection)
            cursor.execute("SELECT 1 AS ok")
            probe_row = cursor.fetchone()
            if probe_row is None:
                return False
            # Rows are accessed by column name here (mapping-style `.get`).
            return probe_row.get("ok") == 1
    except Exception:
        # Deliberately broad: the probe must never fail the test session itself.
        return False
@pytest.fixture(scope="module")
def db_ready():
    """Module-scoped guard fixture: skip dependent tests if the database does not answer.

    Yields no value; tests request it only for the skip side effect.
    """
    if _db_ping():
        return
    pytest.skip("PostgreSQL nicht erreichbar (DB_HOST/DB_PORT/…)")
def test_framework_blueprint_slot_phases_roundtrip(db_ready):
    """A slot carrying `phases` yields a blueprint unit with the same phase structure.

    Seeds a club, a trainer profile, an exercise and a framework program, then
    stores one slot through `_insert_slots_and_blueprints` and reads the
    blueprint unit's phases back with `_fetch_phases_nested`. The checks cover
    the phase kinds, the number of sections and streams, the title of the
    stream section and the exercise id of its item. Afterwards the program,
    exercise, profile and club rows are deleted, whether or not the checks pass.
    """
    from auth import hash_pin

    # Random tag keeps the seeded names/emails unique across test runs.
    tag = uuid.uuid4().hex[:12]
    club_label = f"fw_ph_club_{tag}"
    trainer_email = f"fw_ph_{tag}@test.local"

    with get_db() as connection:
        cursor = get_cursor(connection)

        def returned_id() -> int:
            """Read the `id` column of the row produced by the last RETURNING statement."""
            return int(cursor.fetchone()["id"])

        # --- Seed the rows the framework program depends on -----------------
        cursor.execute(
            "INSERT INTO clubs (name, abbreviation, status) VALUES (%s, %s, %s) RETURNING id",
            (club_label, "F", "active"),
        )
        club_id = returned_id()

        cursor.execute(
            """
            INSERT INTO profiles (email, pin_hash, name, role, active_club_id)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
            """,
            (trainer_email, hash_pin("x"), f"FWPH {tag}", "trainer", club_id),
        )
        profile_id = returned_id()

        cursor.execute(
            """
            INSERT INTO exercises (title, goal, execution, visibility, status, created_by)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id
            """,
            (f"Übung FWPH {tag}", "Ziel", "Ablauf", "private", "draft", profile_id),
        )
        ex_id = returned_id()

        cursor.execute(
            """
            INSERT INTO training_framework_programs (
                title, description,
                planned_period_start, planned_period_end,
                visibility, club_id, created_by,
                focus_area_id, style_direction_id
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            RETURNING id
            """,
            (
                f"Rahmen FWPH {tag}",  # title
                None,  # description
                None,  # planned_period_start
                None,  # planned_period_end
                "private",  # visibility
                club_id,
                profile_id,  # created_by
                None,  # focus_area_id
                None,  # style_direction_id
            ),
        )
        fw_id = returned_id()

        # --- Slot payload: one whole-group phase, one parallel phase --------
        warmup_phase = {
            "phase_kind": "whole_group",
            "order_index": 0,
            "title": "Aufwärmen",
            "sections": [
                {
                    "title": "Gemeinsam",
                    "order_index": 0,
                    "items": [
                        {"item_type": "note", "order_index": 0, "note_body": "Los"},
                    ],
                },
            ],
        }
        exercise_item = {
            "item_type": "exercise",
            "order_index": 0,
            "exercise_id": ex_id,
            "planned_duration_min": 10,
        }
        mat_a_stream = {
            "order_index": 0,
            "title": "Matte A",
            "sections": [
                {
                    "title": "Technik A",
                    "order_index": 0,
                    "items": [exercise_item],
                },
            ],
        }
        breakout_phase = {
            "phase_kind": "parallel",
            "order_index": 1,
            "title": "Breakout",
            "streams": [mat_a_stream],
        }
        slot_payload = [
            {
                "sort_order": 0,
                "title": "Session 1",
                "notes": None,
                "phases": [warmup_phase, breakout_phase],
            },
        ]

        _insert_slots_and_blueprints(cursor, fw_id, slot_payload, profile_id, "trainer")

        # --- Locate the slot and the blueprint unit created for it ----------
        cursor.execute(
            """
            SELECT id FROM training_framework_slots
            WHERE framework_program_id = %s
            ORDER BY sort_order
            LIMIT 1
            """,
            (fw_id,),
        )
        slot_record = cursor.fetchone()
        assert slot_record is not None
        slot_id = int(slot_record["id"])

        cursor.execute(
            "SELECT id FROM training_units WHERE framework_slot_id = %s",
            (slot_id,),
        )
        unit_record = cursor.fetchone()
        assert unit_record is not None
        blueprint_unit_id = int(unit_record["id"])

        nested = _fetch_phases_nested(cursor, blueprint_unit_id)
        connection.commit()

    try:
        assert len(nested) == 2

        whole_group_phase = nested[0]
        assert whole_group_phase["phase_kind"] == "whole_group"
        assert len(whole_group_phase.get("sections") or []) == 1

        parallel_phase = nested[1]
        assert parallel_phase["phase_kind"] == "parallel"
        streams = parallel_phase.get("streams") or []
        assert len(streams) == 1

        stream_sections = streams[0].get("sections") or []
        assert len(stream_sections) == 1
        assert stream_sections[0]["title"] == "Technik A"
        assert int(stream_sections[0]["items"][0]["exercise_id"]) == ex_id
    finally:
        # Delete in dependency order: program first, then the rows it referenced.
        # NOTE(review): slots and blueprint units are presumably removed by
        # cascading foreign keys when the program row goes - confirm in the schema.
        cleanup_statements = (
            ("DELETE FROM training_framework_programs WHERE id = %s", fw_id),
            ("DELETE FROM exercises WHERE id = %s", ex_id),
            ("DELETE FROM profiles WHERE id = %s", profile_id),
            ("DELETE FROM clubs WHERE id = %s", club_id),
        )
        with get_db() as cleanup_connection:
            cleanup_cursor = get_cursor(cleanup_connection)
            for statement, row_id in cleanup_statements:
                cleanup_cursor.execute(statement, (row_id,))
            cleanup_connection.commit()