diff --git a/backend/tests/test_training_framework_phases_integration.py b/backend/tests/test_training_framework_phases_integration.py new file mode 100644 index 0000000..fd9a333 --- /dev/null +++ b/backend/tests/test_training_framework_phases_integration.py @@ -0,0 +1,206 @@ +""" +PostgreSQL-Integration: Rahmenprogramm-Slot mit verschachtelten `phases` (Blueprint-Unit). + +Aktivierung: + - Lokal: TRAINING_PLANNING_INTEGRATION=1 + - CI: .gitea/workflows/test.yml setzt die Variable beim pytest-Lauf. + +Prüft `_insert_slots_and_blueprints` → `_replace_unit_phases` wie beim API-PUT mit Slot-Payload. +""" +from __future__ import annotations + +import os +import uuid + +import pytest + +from db import get_db, get_cursor +from routers.training_framework_programs import _insert_slots_and_blueprints +from routers.training_planning import _fetch_phases_nested + + +def _integration_enabled() -> bool: + return os.getenv("TRAINING_PLANNING_INTEGRATION", "").strip().lower() in ("1", "true", "yes") + + +pytestmark = [ + pytest.mark.integration, + pytest.mark.skipif( + not _integration_enabled(), + reason="TRAINING_PLANNING_INTEGRATION=1 und PostgreSQL (DB_*) erforderlich", + ), +] + + +def _db_ping() -> bool: + try: + with get_db() as conn: + cur = get_cursor(conn) + cur.execute("SELECT 1 AS ok") + row = cur.fetchone() + return row is not None and row.get("ok") == 1 + except Exception: + return False + + +@pytest.fixture(scope="module") +def db_ready(): + if not _db_ping(): + pytest.skip("PostgreSQL nicht erreichbar (DB_HOST/DB_PORT/…)") + + +def test_framework_blueprint_slot_phases_roundtrip(db_ready): + """Ein Slot mit `phases` erzeugt eine Blueprint-Unit mit identischer Phasenstruktur.""" + suffix = uuid.uuid4().hex[:12] + club_name = f"fw_ph_club_{suffix}" + email = f"fw_ph_{suffix}@test.local" + + from auth import hash_pin + + with get_db() as conn: + cur = get_cursor(conn) + cur.execute( + "INSERT INTO clubs (name, abbreviation, status) VALUES (%s, %s, %s) RETURNING id", + (club_name, "F", "active"), + ) + club_id = int(cur.fetchone()["id"]) + + cur.execute( + """ + INSERT INTO profiles (email, pin_hash, name, role, active_club_id) + VALUES (%s, %s, %s, %s, %s) + RETURNING id + """, + (email, hash_pin("x"), f"FWPH {suffix}", "trainer", club_id), + ) + profile_id = int(cur.fetchone()["id"]) + + cur.execute( + """ + INSERT INTO exercises (title, goal, execution, visibility, status, created_by) + VALUES (%s, %s, %s, %s, %s, %s) + RETURNING id + """, + (f"Übung FWPH {suffix}", "Ziel", "Ablauf", "private", "draft", profile_id), + ) + ex_id = int(cur.fetchone()["id"]) + + cur.execute( + """ + INSERT INTO training_framework_programs ( + title, description, + planned_period_start, planned_period_end, + visibility, club_id, created_by, + focus_area_id, style_direction_id + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + RETURNING id + """, + ( + f"Rahmen FWPH {suffix}", + None, + None, + None, + "private", + club_id, + profile_id, + None, + None, + ), + ) + fw_id = int(cur.fetchone()["id"]) + + phases_in = [ + { + "phase_kind": "whole_group", + "order_index": 0, + "title": "Aufwärmen", + "sections": [ + { + "title": "Gemeinsam", + "order_index": 0, + "items": [ + {"item_type": "note", "order_index": 0, "note_body": "Los"}, + ], + }, + ], + }, + { + "phase_kind": "parallel", + "order_index": 1, + "title": "Breakout", + "streams": [ + { + "order_index": 0, + "title": "Matte A", + "sections": [ + { + "title": "Technik A", + "order_index": 0, + "items": [ + { + "item_type": "exercise", + "order_index": 0, + "exercise_id": ex_id, + "planned_duration_min": 10, + }, + ], + }, + ], + }, + ], + }, + ] + + slots_in = [ + { + "sort_order": 0, + "title": "Session 1", + "notes": None, + "phases": phases_in, + }, + ] + + _insert_slots_and_blueprints(cur, fw_id, slots_in, profile_id, "trainer") + + cur.execute( + """ + SELECT id FROM training_framework_slots + WHERE framework_program_id = %s + ORDER BY sort_order + LIMIT 1 + """, + (fw_id,), + ) + slot_row = cur.fetchone() + assert slot_row is not None + slot_id = int(slot_row["id"]) + + cur.execute( + "SELECT id FROM training_units WHERE framework_slot_id = %s", + (slot_id,), + ) + bu_row = cur.fetchone() + assert bu_row is not None + blueprint_unit_id = int(bu_row["id"]) + + nested = _fetch_phases_nested(cur, blueprint_unit_id) + conn.commit() + + try: + assert len(nested) == 2 + assert nested[0]["phase_kind"] == "whole_group" + assert len(nested[0].get("sections") or []) == 1 + assert nested[1]["phase_kind"] == "parallel" + streams = nested[1].get("streams") or [] + assert len(streams) == 1 + assert len(streams[0].get("sections") or []) == 1 + assert streams[0]["sections"][0]["title"] == "Technik A" + assert int(streams[0]["sections"][0]["items"][0]["exercise_id"]) == ex_id + finally: + with get_db() as conn: + cur = get_cursor(conn) + cur.execute("DELETE FROM training_framework_programs WHERE id = %s", (fw_id,)) + cur.execute("DELETE FROM exercises WHERE id = %s", (ex_id,)) + cur.execute("DELETE FROM profiles WHERE id = %s", (profile_id,)) + cur.execute("DELETE FROM clubs WHERE id = %s", (club_id,)) + conn.commit()