shinkan-jinkendo/backend/routers/training_planning.py
Lars f15aa7c415
All checks were successful
Deploy Development / deploy (push) Successful in 41s
Test Suite / pytest-backend (push) Successful in 37s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 12s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m7s
Update version to 0.8.148 and enhance training plan template functionality
- Incremented app version to 0.8.148 and updated changelog to reflect new features.
- Improved the training plan template structure by adding a preview of sections, including support for split sessions.
- Introduced a new editing page for training plan templates, allowing users to modify templates directly.
- Enhanced the TrainingPlanningPageRoot to include a description field when saving templates, improving user guidance.
- Updated permissions to allow editing of training plan templates based on user roles.
2026-05-19 10:13:26 +02:00

3287 lines
119 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Training Planning – Trainingseinheiten, strukturierter Ablauf (Sektionen + Übungen/Notizen)
und wiederverwendbare Trainingsvorlagen (Sektions-Gliederung).
Governance: Sichtbarkeit wie Übungen (private / club / official); Bearbeiten wie Übungen; Löschen nach Rolle (s. club_tenancy).
"""
from datetime import date, datetime, time as dt_time, timedelta
from typing import Any, Dict, List, Optional, Tuple
from fastapi import APIRouter, Depends, HTTPException, Query
from psycopg2.extras import Json as PsycopgJson
from fastapi_param_unwrap import unwrap_query_default
from db import get_db, get_cursor, r2d
from tenant_context import TenantContext, get_tenant_context, library_content_visibility_sql
from club_tenancy import (
assert_library_content_deletable,
assert_library_content_editable,
assert_library_content_governance_transition,
assert_valid_governance_visibility,
can_manage_club_org,
is_platform_admin,
library_content_visible_to_profile,
)
from routers.training_modules import load_training_module_for_apply
from routers.exercises import load_combination_slots_for_exercise
# FastAPI router of this module; it is created with the "/api" prefix and the
# "training_planning" tag.
router = APIRouter(prefix="/api", tags=["training_planning"])
def _has_planning_role(role: Optional[str]) -> bool:
"""Kann Trainingseinheiten/Vorlagen anlegen (bis Governance: auch einfacher Account)."""
return role in ("admin", "superadmin", "trainer", "user")
def _optional_positive_int(val, field_name: str) -> Optional[int]:
if val is None or val == "":
return None
try:
i = int(val)
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail=f"Ungültige {field_name}")
if i < 1:
raise HTTPException(status_code=400, detail=f"Ungültige {field_name}")
return i
def _parse_cursor_planned_date(raw: Optional[str]) -> date:
s = (raw or "").strip()
if not s:
raise HTTPException(status_code=400, detail="cursor_planned_date ungültig (YYYY-MM-DD)")
try:
return date.fromisoformat(s[:10])
except ValueError:
raise HTTPException(status_code=400, detail="cursor_planned_date ungültig (YYYY-MM-DD)")
def _parse_cursor_planned_time_optional(raw: Optional[str]) -> Optional[dt_time]:
s = (raw or "").strip()
if not s:
return None
for fmt in ("%H:%M:%S", "%H:%M"):
try:
return datetime.strptime(s, fmt).time()
except ValueError:
continue
raise HTTPException(
status_code=400,
detail="cursor_planned_time ungültig (HH:MM oder HH:MM:SS)",
)
def _training_units_keyset_sql(
order_dir: str,
cursor_date: date,
cursor_time_null: bool,
cursor_time: Optional[dt_time],
cursor_id: int,
) -> Tuple[str, List[Any]]:
"""WHERE-Zusatz für Keyset; sort=asc|desc muss zu order_dir passen."""
d = cursor_date
cid = cursor_id
if order_dir == "ASC":
if cursor_time_null:
frag = (
"(tu.planned_date > %s OR (tu.planned_date = %s AND "
"tu.planned_time_start IS NULL AND tu.id > %s))"
)
return frag, [d, d, cid]
assert cursor_time is not None
ct = cursor_time
frag = (
"(tu.planned_date > %s OR (tu.planned_date = %s AND ("
"(tu.planned_time_start IS NOT NULL AND (tu.planned_time_start > %s OR "
"(tu.planned_time_start = %s AND tu.id > %s))) OR "
"(tu.planned_time_start IS NULL)"
")))"
)
return frag, [d, d, ct, ct, cid]
if order_dir == "DESC":
if cursor_time_null:
frag = (
"(tu.planned_date < %s OR (tu.planned_date = %s AND "
"tu.planned_time_start IS NULL AND tu.id < %s))"
)
return frag, [d, d, cid]
assert cursor_time is not None
ct = cursor_time
frag = (
"(tu.planned_date < %s OR (tu.planned_date = %s AND ("
"(tu.planned_time_start IS NOT NULL AND (tu.planned_time_start < %s OR "
"(tu.planned_time_start = %s AND tu.id < %s))) OR "
"(tu.planned_time_start IS NULL)"
")))"
)
return frag, [d, d, ct, ct, cid]
raise HTTPException(status_code=400, detail="sort: nur asc oder desc")
def _validate_variant_for_exercise(cur, exercise_id: Optional[int], variant_id: Optional[int]):
if not exercise_id:
if variant_id:
raise HTTPException(
status_code=400, detail="exercise_variant_id nur zusammen mit exercise_id erlaubt"
)
return
cur.execute(
"SELECT COALESCE(exercise_kind, 'simple') AS exercise_kind FROM exercises WHERE id = %s",
(int(exercise_id),),
)
ek_row = cur.fetchone()
if not ek_row:
raise HTTPException(status_code=400, detail="Übung nicht gefunden")
if str(r2d(ek_row).get("exercise_kind") or "simple").strip().lower() == "combination":
if variant_id:
raise HTTPException(
status_code=400,
detail="Kombinationsübungen haben keine Varianten — bitte exercise_variant_id weglassen",
)
return
if not variant_id:
return
cur.execute(
"SELECT 1 FROM exercise_variants WHERE id = %s AND exercise_id = %s",
(variant_id, exercise_id),
)
if not cur.fetchone():
raise HTTPException(status_code=400, detail="Variante passt nicht zur gewählten Übung")
def _can_access_group_for_create(cur, group_id: int, profile_id: int, role: str) -> None:
    """Ensure the caller may create training units for the given group.

    Allowed are platform roles ``admin``/``superadmin``, the group's trainer,
    any of its co-trainers, and callers for whom ``can_manage_club_org``
    grants rights on the group's club.

    Raises:
        HTTPException: 404 when the group does not exist; 403 when the role
            is no planning role or the caller has no relation to the group.
    """
    cur.execute(
        "SELECT trainer_id, co_trainer_ids, club_id FROM training_groups WHERE id = %s",
        (group_id,),
    )
    group = cur.fetchone()
    if not group:
        raise HTTPException(status_code=404, detail="Trainingsgruppe nicht gefunden")
    if not _has_planning_role(role):
        raise HTTPException(status_code=403, detail="Nur Trainer dürfen Trainingseinheiten erstellen")
    if role in ("admin", "superadmin"):
        return

    co_trainers = group["co_trainer_ids"] or []
    if group["trainer_id"] == profile_id or profile_id in co_trainers:
        return

    # A group without club cannot grant club-level rights. Guarding here
    # avoids int(None) and yields the regular 403 instead of a server error,
    # matching the ``is not None`` guards used by the other permission helpers.
    club_id = group["club_id"]
    if club_id is not None and can_manage_club_org(cur, profile_id, int(club_id), role):
        return
    raise HTTPException(
        status_code=403, detail="Nur der zuständige Trainer darf für diese Gruppe planen"
    )
def _profile_active_in_club(cur, club_id: int, profile_id: int) -> bool:
cur.execute(
"""
SELECT 1 FROM club_members
WHERE club_id = %s AND profile_id = %s AND status = 'active'
LIMIT 1
""",
(club_id, profile_id),
)
return cur.fetchone() is not None
def _caller_may_assign_session_trainers(
    cur,
    group_row: Dict[str, Any],
    profile_id: int,
    role: str,
    unit_created_by: Optional[int],
) -> bool:
    """Decide whether the caller may assign lead/co-trainers for a session.

    True for platform admins, for callers with club-organisation rights on
    the group's club, for the creator of the unit, for the group's trainer
    and for the group's co-trainers; False otherwise.
    """
    if is_platform_admin(role):
        return True

    club_id = group_row.get("club_id")
    if club_id is not None and can_manage_club_org(cur, profile_id, int(club_id), role):
        return True

    is_creator = unit_created_by is not None and unit_created_by == profile_id
    if is_creator or group_row.get("trainer_id") == profile_id:
        return True

    group_co_trainers = group_row.get("co_trainer_ids") or []
    return profile_id in group_co_trainers
def _effective_co_trainer_ids_for_row(unit_row: Dict[str, Any]) -> List[int]:
"""Leseregel: Session-Co-Trainer überschreiben die Gruppe; NULL auf der Einheit = Gruppen-Standard."""
unit_asst = unit_row.get("assistant_trainer_profile_ids")
if unit_asst is not None:
src = unit_asst
else:
src = unit_row.get("co_trainer_ids") or []
seen: set = set()
out: List[int] = []
for x in src:
try:
i = int(x)
except (TypeError, ValueError):
continue
if i not in seen:
seen.add(i)
out.append(i)
return sorted(out)
def effective_co_trainer_profile_ids_for_merge(
    unit_assistant: Any, group_co: Any
) -> List[int]:
    """Pure helper (used by pytest) with the semantics of the row-based variant.

    A non-None *unit_assistant* wins over *group_co*; the chosen source is
    reduced to unique integers in ascending order, skipping entries that
    ``int()`` rejects.
    """

    def to_int(value: Any) -> Optional[int]:
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    if unit_assistant is None:
        source = group_co or []
    else:
        source = unit_assistant
    converted = (to_int(value) for value in source)
    return sorted({number for number in converted if number is not None})
def _training_unit_guard_row(cur, unit_id: int) -> Dict[str, Any]:
    """Load the columns needed for permission checks on one training unit.

    The row combines ownership/trainer columns of the unit, trainer columns
    and club of its group (``group_club_id``) and the creator of the
    framework program reached through the unit's framework slot
    (``framework_created_by``).

    Raises:
        HTTPException: 404 when no unit with *unit_id* exists.
    """
    guard_sql = """
        SELECT tu.id, tu.created_by, tu.group_id, tu.plan_template_id, tu.framework_slot_id,
               tu.lead_trainer_profile_id,
               tu.assistant_trainer_profile_ids,
               tg.trainer_id, tg.co_trainer_ids, tg.club_id AS group_club_id,
               fwp.created_by AS framework_created_by
        FROM training_units tu
        LEFT JOIN training_groups tg ON tu.group_id = tg.id
        LEFT JOIN training_framework_slots fs ON fs.id = tu.framework_slot_id
        LEFT JOIN training_framework_programs fwp ON fwp.id = fs.framework_program_id
        WHERE tu.id = %s
        """
    cur.execute(guard_sql, (unit_id,))
    guard_row = cur.fetchone()
    if guard_row:
        return r2d(guard_row)
    raise HTTPException(status_code=404, detail="Trainingseinheit nicht gefunden")
def _assert_training_unit_permission(
cur, unit_row: Dict[str, Any], profile_id: int, role: str
) -> None:
if unit_row.get("framework_slot_id"):
if role in ["admin", "superadmin"]:
return
if unit_row.get("created_by") == profile_id:
return
fw_by = unit_row.get("framework_created_by")
if fw_by is not None and fw_by == profile_id:
return
raise HTTPException(status_code=403, detail="Keine Berechtigung")
co_eff = _effective_co_trainer_ids_for_row(unit_row)
if role in ["admin", "superadmin"]:
return
gcid = unit_row.get("group_club_id")
if gcid is not None and can_manage_club_org(cur, profile_id, int(gcid), role):
return
if (
unit_row["created_by"] != profile_id
and unit_row["trainer_id"] != profile_id
and profile_id not in co_eff
and unit_row.get("lead_trainer_profile_id") != profile_id
):
raise HTTPException(status_code=403, detail="Keine Berechtigung")
def _assert_delete_training_unit(
cur,
role: str,
created_by: int,
profile_id: int,
group_club_id: Optional[int],
) -> None:
if role in ["admin", "superadmin"]:
return
if created_by == profile_id:
return
if group_club_id is not None and can_manage_club_org(cur, profile_id, int(group_club_id), role):
return
raise HTTPException(status_code=403, detail="Keine Berechtigung")
def _assert_club_visible_for_trainer(cur, club_id: int, profile_id: int, role: str) -> None:
"""Nicht-Admin: Vereinsbezug für Listen mit club_id (Mitglied genügt; Details filtert WHERE)."""
if role in ("admin", "superadmin"):
return
if can_manage_club_org(cur, profile_id, club_id, role):
return
cur.execute(
"""
SELECT 1 FROM club_members
WHERE club_id = %s AND profile_id = %s AND status = 'active'
LIMIT 1
""",
(club_id, profile_id),
)
if cur.fetchone():
return
cur.execute(
"""
SELECT 1 FROM training_groups g
WHERE g.club_id = %s AND g.status = 'active'
AND (
g.trainer_id = %s
OR (g.co_trainer_ids IS NOT NULL AND g.co_trainer_ids @> jsonb_build_array(%s::int))
)
LIMIT 1
""",
(club_id, profile_id, profile_id),
)
if not cur.fetchone():
raise HTTPException(status_code=403, detail="Kein Zugriff auf diesen Verein")
def _normalize_lead_trainer_profile_id(
cur,
group_id: int,
raw_lead: Any,
profile_id: int,
role: str,
unit_created_by: Optional[int],
) -> Optional[int]:
"""NULL = Standard (Gruppen-Haupttrainer); sonst gültiges Profil i.d.R. mit Vereinsbezug."""
if raw_lead is None:
return None
if raw_lead in ("", []):
return None
try:
nid = int(raw_lead)
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail="lead_trainer_profile_id ungültig")
if nid < 1:
raise HTTPException(status_code=400, detail="lead_trainer_profile_id ungültig")
cur.execute("SELECT 1 FROM profiles WHERE id = %s", (nid,))
if not cur.fetchone():
raise HTTPException(status_code=400, detail="Profil für Leitung nicht gefunden")
cur.execute(
"SELECT trainer_id, co_trainer_ids, club_id FROM training_groups WHERE id = %s",
(group_id,),
)
gr = cur.fetchone()
if not gr:
raise HTTPException(status_code=400, detail="Trainingsgruppe nicht gefunden")
grd = dict(gr)
cid = grd.get("club_id")
if cid is None:
raise HTTPException(status_code=400, detail="Trainingsgruppe nicht gefunden")
club_i = int(cid)
if is_platform_admin(role):
return nid
eligible = {grd["trainer_id"]} if grd.get("trainer_id") else set()
for x in grd.get("co_trainer_ids") or []:
try:
eligible.add(int(x))
except (TypeError, ValueError):
continue
if nid == profile_id:
if not _profile_active_in_club(cur, club_i, profile_id):
raise HTTPException(
status_code=403,
detail="Nur aktive Vereinsmitglieder können die Leitung dieser Einheit übernehmen",
)
return nid
if nid not in eligible:
if not _profile_active_in_club(cur, club_i, nid):
raise HTTPException(
status_code=400,
detail="Leitung nur für Profile mit aktiver Mitgliedschaft im Verein der Gruppe",
)
if not _caller_may_assign_session_trainers(cur, grd, profile_id, role, unit_created_by):
raise HTTPException(
status_code=403,
detail="Keine Berechtigung, die Leitung zuzuweisen",
)
return nid
if nid != profile_id and not _caller_may_assign_session_trainers(
cur, grd, profile_id, role, unit_created_by
):
raise HTTPException(status_code=403, detail="Keine Berechtigung, die Leitung anderen zuzuweisen")
return nid
def _normalize_assistant_trainer_profile_ids(
cur,
group_id: int,
raw_val: Any,
profile_id: int,
role: str,
unit_created_by: Optional[int],
lead_nid: Optional[int],
) -> Any:
"""
None = Vererbung aus training_groups.co_trainer_ids (SQL NULL);
Liste = Session-Co-Trainer (JSONB Array; leeres Array ausdrücklich ohne Co.)
"""
if raw_val is None:
return None
if not isinstance(raw_val, list):
raise HTTPException(status_code=400, detail="assistant_trainer_profile_ids muss Liste oder null sein")
ids_in: List[int] = []
for x in raw_val:
try:
i = int(x)
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail="assistant_trainer_profile_ids ungültig")
if i < 1:
raise HTTPException(status_code=400, detail="assistant_trainer_profile_ids ungültig")
ids_in.append(i)
uniq = sorted(set(ids_in))
cur.execute(
"SELECT trainer_id, co_trainer_ids, club_id FROM training_groups WHERE id = %s",
(group_id,),
)
gr = cur.fetchone()
if not gr:
raise HTTPException(status_code=400, detail="Trainingsgruppe nicht gefunden")
grd = dict(gr)
cid = grd.get("club_id")
if cid is None:
raise HTTPException(status_code=400, detail="Trainingsgruppe nicht gefunden")
club_i = int(cid)
if not is_platform_admin(role) and not _caller_may_assign_session_trainers(
cur, grd, profile_id, role, unit_created_by
):
raise HTTPException(status_code=403, detail="Keine Berechtigung, Co-Trainer zuzuweisen")
eligible = {grd["trainer_id"]} if grd.get("trainer_id") else set()
for x in grd.get("co_trainer_ids") or []:
try:
eligible.add(int(x))
except (TypeError, ValueError):
continue
eff_lead = lead_nid if lead_nid is not None else (grd.get("trainer_id") or None)
for nid in uniq:
cur.execute("SELECT 1 FROM profiles WHERE id = %s", (nid,))
if not cur.fetchone():
raise HTTPException(status_code=400, detail="Profil für Co-Trainer nicht gefunden")
if eff_lead is not None and nid == eff_lead:
raise HTTPException(status_code=400, detail="Leitung und Co-Trainer dürfen sich nicht überschneiden")
if is_platform_admin(role):
continue
if nid in eligible:
continue
if not _profile_active_in_club(cur, club_i, nid):
raise HTTPException(
status_code=400,
detail="Co-Trainer nur mit aktiver Mitgliedschaft im Verein dieser Gruppe",
)
return uniq
def _normalize_stream_assigned_trainer_profile_ids(
cur,
raw_val: Any,
*,
group_id: Optional[int],
profile_id: int,
role: str,
unit_created_by: Optional[int],
eff_lead_nid: Optional[int],
) -> Any:
"""
JSONB-Liste für training_unit_parallel_streams.assigned_trainer_profile_ids.
Ohne group_id (Rahmen-Blueprint): nur Profil-Existenz + keine Überschneidung mit Leitung.
Mit group_id: gleiche Vereins-/Zuweisungsregeln wie assistant_trainer_profile_ids.
"""
if raw_val is None:
return None
if not isinstance(raw_val, list):
raise HTTPException(
status_code=400,
detail="assigned_trainer_profile_ids (Stream) muss Liste oder null sein",
)
ids_in: List[int] = []
for x in raw_val:
try:
i = int(x)
except (TypeError, ValueError):
raise HTTPException(
status_code=400,
detail="assigned_trainer_profile_ids (Stream) ungültig",
)
if i < 1:
raise HTTPException(
status_code=400,
detail="assigned_trainer_profile_ids (Stream) ungültig",
)
ids_in.append(i)
uniq = sorted(set(ids_in))
for nid in uniq:
cur.execute("SELECT 1 FROM profiles WHERE id = %s", (nid,))
if not cur.fetchone():
raise HTTPException(
status_code=400,
detail="Profil für Stream-Co-Trainer nicht gefunden",
)
if eff_lead_nid is not None and nid == eff_lead_nid:
raise HTTPException(
status_code=400,
detail="Leitung und Stream-Co-Trainer dürfen sich nicht überschneiden",
)
if group_id is None:
return uniq
cur.execute(
"SELECT trainer_id, co_trainer_ids, club_id FROM training_groups WHERE id = %s",
(group_id,),
)
gr = cur.fetchone()
if not gr:
raise HTTPException(status_code=400, detail="Trainingsgruppe nicht gefunden")
grd = dict(gr)
cid = grd.get("club_id")
if cid is None:
raise HTTPException(status_code=400, detail="Trainingsgruppe nicht gefunden")
club_i = int(cid)
if not is_platform_admin(role) and not _caller_may_assign_session_trainers(
cur, grd, profile_id, role, unit_created_by
):
raise HTTPException(status_code=403, detail="Keine Berechtigung, Co-Trainer (Stream) zuzuweisen")
eligible = {grd["trainer_id"]} if grd.get("trainer_id") else set()
for x in grd.get("co_trainer_ids") or []:
try:
eligible.add(int(x))
except (TypeError, ValueError):
continue
for nid in uniq:
if is_platform_admin(role):
continue
if nid in eligible:
continue
if not _profile_active_in_club(cur, club_i, nid):
raise HTTPException(
status_code=400,
detail="Stream-Co-Trainer nur mit aktiver Mitgliedschaft im Verein dieser Gruppe",
)
return uniq
def _normalize_planning_method_profile_payload(raw) -> Optional[Dict[str, Any]]:
"""None = Katalog wirksam; Dict = Snapshot fuer diese Platzierung."""
if raw is None:
return None
if isinstance(raw, dict):
return dict(raw)
raise HTTPException(
status_code=400,
detail="planning_method_profile muss ein JSON-Objekt oder null sein",
)
# Traceability: units adopted from a framework program are linked via
# origin_framework_slot_id.
# JOIN fragment; it references the alias ``tu``, so the surrounding query must
# select from training_units under that alias.
_ORIGIN_LINEAGE_JOIN = """
LEFT JOIN training_framework_slots origin_slot ON origin_slot.id = tu.origin_framework_slot_id
LEFT JOIN training_framework_programs origin_fp ON origin_fp.id = origin_slot.framework_program_id
"""
# SELECT-list fragment matching the aliases introduced by _ORIGIN_LINEAGE_JOIN
# (origin_fp, origin_slot); the slot title is trimmed and NULL becomes ''.
_ORIGIN_LINEAGE_FIELDS = """
origin_fp.id AS origin_framework_program_id,
origin_fp.title AS origin_framework_program_title,
COALESCE(TRIM(origin_slot.title), '') AS origin_framework_slot_title,
origin_slot.sort_order AS origin_framework_slot_sort_order
"""
def _optional_source_training_module_id_payload(raw_val) -> Optional[int]:
"""Erlaubt None; sonst positives int (FK-Verletzung bei ungültigem Modul möglich)."""
if raw_val is None or raw_val == "":
return None
try:
i = int(raw_val)
except (TypeError, ValueError):
return None
if i < 1:
return None
return i
# ── Sektionen laden / ersetzen (Kernpfad Planungsinhalt) ──────────────────
# Hinweis: Pro Sektion ein Items-Query (N+1) — bewusst einfach; Batching später möglich.
def _clear_unit_plan_content(cur, unit_id: int) -> None:
"""Löscht alle Planungs-Phasen der Einheit (CASCADE: Streams, Sektionen, Items)."""
cur.execute("DELETE FROM training_unit_phases WHERE training_unit_id = %s", (unit_id,))
def _ensure_default_whole_group_phase(cur, unit_id: int, *, order_index: int = 0) -> int:
"""Legt bei Bedarf eine whole_group-Phase an; gibt phase.id zurück."""
cur.execute(
"""
SELECT id FROM training_unit_phases
WHERE training_unit_id = %s AND phase_kind = 'whole_group' AND order_index = %s
LIMIT 1
""",
(unit_id, order_index),
)
row = cur.fetchone()
if row:
return int(row["id"])
cur.execute(
"""
INSERT INTO training_unit_phases (training_unit_id, order_index, phase_kind, title, guidance_notes)
VALUES (%s, %s, 'whole_group', NULL, NULL)
RETURNING id
""",
(unit_id, order_index),
)
return int(cur.fetchone()["id"])
# All sections of one training unit (single parameter: training_unit_id).
# Ordering: by the order_index of the owning phase (taken directly from the
# section's phase or, failing that, from the phase of its parallel stream),
# then by stream order_index with non-stream sections first, then by the
# section's own order_index.
_SECTION_ROWS_SQL = """
SELECT tus.id, tus.training_unit_id, tus.order_index, tus.title, tus.guidance_notes,
tus.source_template_section_id, tus.phase_id, tus.parallel_stream_id
FROM training_unit_sections tus
LEFT JOIN training_unit_phases ph ON ph.id = tus.phase_id
LEFT JOIN training_unit_parallel_streams ps ON ps.id = tus.parallel_stream_id
LEFT JOIN training_unit_phases ph_s ON ph_s.id = ps.phase_id
WHERE tus.training_unit_id = %s
ORDER BY COALESCE(ph.order_index, ph_s.order_index) ASC,
ps.order_index ASC NULLS FIRST,
tus.order_index ASC
"""
# All items of one section (single parameter: section_id), ordered by
# order_index. Each item row is enriched with catalog data via LEFT JOINs:
# exercise title/kind/summary, the catalog method archetype and profile, one
# focus-area name (primary first, then alphabetical), the variant name and the
# title of the source training module.
_SECTION_ITEMS_ROWS_SQL = """
SELECT tusi.*,
e.title AS exercise_title,
e.exercise_kind AS exercise_kind,
e.summary AS exercise_summary,
e.method_archetype AS catalog_method_archetype,
e.method_profile AS catalog_method_profile,
(
SELECT fa.name FROM exercise_focus_areas efa
JOIN focus_areas fa ON fa.id = efa.focus_area_id
WHERE efa.exercise_id = e.id
ORDER BY efa.is_primary DESC NULLS LAST, fa.name ASC
LIMIT 1
) AS exercise_focus_area,
ev.variant_name AS exercise_variant_name,
tm.title AS source_module_title
FROM training_unit_section_items tusi
LEFT JOIN exercises e ON tusi.exercise_id = e.id
LEFT JOIN exercise_variants ev ON tusi.exercise_variant_id = ev.id
LEFT JOIN training_modules tm ON tm.id = tusi.source_training_module_id
WHERE tusi.section_id = %s
ORDER BY tusi.order_index
"""
def _hydrate_section_item_combination_slots(cur, it: Dict[str, Any]) -> None:
"""Setzt `combination_slots` für KombiÜbungen; sonst leere Liste."""
if it.get("item_type") != "exercise":
return
cmp_raw = it.get("catalog_method_profile")
if not isinstance(cmp_raw, dict):
it["catalog_method_profile"] = {}
else:
it["catalog_method_profile"] = dict(cmp_raw)
ek = str(it.get("exercise_kind") or "simple").strip().lower()
if ek == "combination" and it.get("exercise_id"):
try:
it["combination_slots"] = load_combination_slots_for_exercise(cur, int(it["exercise_id"]))
except (TypeError, ValueError):
it["combination_slots"] = []
else:
it["combination_slots"] = []
def _fetch_section_items_for_section(cur, section_id: int) -> List[Dict[str, Any]]:
    """Load the items of one section and enrich exercise items with catalog data."""
    cur.execute(_SECTION_ITEMS_ROWS_SQL, (section_id,))
    # Convert every row first; the enrichment below may issue further queries
    # on the same cursor.
    section_items = list(map(r2d, cur.fetchall()))
    for section_item in section_items:
        _hydrate_section_item_combination_slots(cur, section_item)
    return section_items
def _fetch_sections(cur, unit_id: int) -> List[Dict[str, Any]]:
    """Load every section of the unit, each with its enriched ``items`` list.

    Issues one item query per section (N+1).
    """
    cur.execute(_SECTION_ROWS_SQL, (unit_id,))
    section_rows = cur.fetchall()
    sections: List[Dict[str, Any]] = []
    for section_row in section_rows:
        section = r2d(section_row)
        section["items"] = _fetch_section_items_for_section(cur, section["id"])
        sections.append(section)
    return sections
def _fetch_phases_nested(cur, unit_id: int) -> List[Dict[str, Any]]:
"""Verschachtelte Phasen/Streams/Sektionen für GET (UI kann parallele Sp später nutzen)."""
cur.execute(
"""
SELECT id, training_unit_id, order_index, phase_kind, title, guidance_notes
FROM training_unit_phases
WHERE training_unit_id = %s
ORDER BY order_index
""",
(unit_id,),
)
out: List[Dict[str, Any]] = []
for prow in cur.fetchall():
p = r2d(prow)
pk = str(p.get("phase_kind") or "").strip().lower()
if pk == "whole_group":
cur.execute(
"""
SELECT id, training_unit_id, order_index, title, guidance_notes,
source_template_section_id, phase_id, parallel_stream_id
FROM training_unit_sections
WHERE phase_id = %s
ORDER BY order_index
""",
(p["id"],),
)
secs: List[Dict[str, Any]] = []
for srow in cur.fetchall():
sec = r2d(srow)
sec["items"] = _fetch_section_items_for_section(cur, sec["id"])
secs.append(sec)
p["sections"] = secs
p["streams"] = []
elif pk == "parallel":
p["sections"] = []
cur.execute(
"""
SELECT id, phase_id, order_index, title, notes, assigned_trainer_profile_ids
FROM training_unit_parallel_streams
WHERE phase_id = %s
ORDER BY order_index
""",
(p["id"],),
)
streams: List[Dict[str, Any]] = []
for st_row in cur.fetchall():
st = r2d(st_row)
cur.execute(
"""
SELECT id, training_unit_id, order_index, title, guidance_notes,
source_template_section_id, phase_id, parallel_stream_id
FROM training_unit_sections
WHERE parallel_stream_id = %s
ORDER BY order_index
""",
(st["id"],),
)
secs = []
for sec_row in cur.fetchall():
sec = r2d(sec_row)
sec["items"] = _fetch_section_items_for_section(cur, sec["id"])
secs.append(sec)
st["sections"] = secs
streams.append(st)
p["streams"] = streams
else:
p["sections"] = []
p["streams"] = []
out.append(p)
return out
def _clone_section_payload_dict(sec: Dict[str, Any]) -> Dict[str, Any]:
"""Sektion inkl. Items ohne DB-IDs (für phases-Payload / Kopie)."""
items_clean: List[Dict[str, Any]] = []
for it in sorted(sec.get("items", []), key=lambda x: x.get("order_index", 0)):
itype = it.get("item_type") or ("exercise" if it.get("exercise_id") else "note")
oix = it.get("order_index")
if itype == "note":
note_item = {
"item_type": "note",
"order_index": oix,
"note_body": it.get("note_body") or "",
}
sm = _optional_source_training_module_id_payload(it.get("source_training_module_id"))
if sm is not None:
note_item["source_training_module_id"] = sm
items_clean.append(note_item)
continue
if itype != "exercise" or not it.get("exercise_id"):
continue
ex_item = {
"item_type": "exercise",
"order_index": oix,
"exercise_id": it["exercise_id"],
"exercise_variant_id": it.get("exercise_variant_id"),
"planned_duration_min": it.get("planned_duration_min"),
"actual_duration_min": it.get("actual_duration_min"),
"notes": it.get("notes"),
"modifications": it.get("modifications"),
"planning_method_profile": it.get("planning_method_profile"),
}
sm = _optional_source_training_module_id_payload(it.get("source_training_module_id"))
if sm is not None:
ex_item["source_training_module_id"] = sm
items_clean.append(ex_item)
row: Dict[str, Any] = {
"title": sec.get("title"),
"order_index": sec.get("order_index"),
"guidance_notes": sec.get("guidance_notes"),
"items": items_clean,
}
stid = sec.get("source_template_section_id")
if stid is not None and stid != "":
try:
stid_i = int(stid)
if stid_i >= 1:
row["source_template_section_id"] = stid_i
except (TypeError, ValueError):
pass
return row
def _phases_clone_payload(cur, unit_id: int) -> List[Dict[str, Any]]:
    """Build a deep-copy payload of all phases/streams/sections without database ids.

    Phase kinds other than ``whole_group`` and ``parallel`` are emitted as
    ``whole_group``.
    """
    cloned_phases: List[Dict[str, Any]] = []
    for phase in _fetch_phases_nested(cur, unit_id):
        raw_kind = str(phase.get("phase_kind") or "").strip().lower()
        is_parallel = raw_kind == "parallel"
        phase_payload: Dict[str, Any] = {
            "order_index": phase.get("order_index"),
            "phase_kind": "parallel" if is_parallel else "whole_group",
            "title": phase.get("title"),
            "guidance_notes": phase.get("guidance_notes"),
        }
        if is_parallel:
            phase_payload["sections"] = []
            phase_payload["streams"] = [
                {
                    "order_index": stream.get("order_index"),
                    "title": stream.get("title"),
                    "notes": stream.get("notes"),
                    "assigned_trainer_profile_ids": stream.get("assigned_trainer_profile_ids"),
                    "sections": [
                        _clone_section_payload_dict(section)
                        for section in stream.get("sections") or []
                    ],
                }
                for stream in phase.get("streams") or []
            ]
        else:
            phase_payload["sections"] = [
                _clone_section_payload_dict(section) for section in phase.get("sections") or []
            ]
            phase_payload["streams"] = []
        cloned_phases.append(phase_payload)
    return cloned_phases
def _copy_scheduled_unit_plan_to_blueprint(
    cur,
    source_unit_id: int,
    blueprint_unit_id: int,
    profile_id: int,
    role: str,
) -> None:
    """Copy the plan of a scheduled unit into a framework blueprint unit.

    When the source has phases they are cloned as a whole. Otherwise its
    flat sections are copied; a source without any section yields a single
    empty section titled "Ablauf".
    """
    phases_payload = _phases_clone_payload(cur, source_unit_id)
    if phases_payload:
        _replace_unit_phases(cur, blueprint_unit_id, phases_payload, profile_id, role, profile_id)
        return

    sections_payload = [
        _clone_section_payload_dict(section) for section in _fetch_sections(cur, source_unit_id)
    ]
    if not sections_payload:
        sections_payload = [
            {"title": "Ablauf", "order_index": 0, "guidance_notes": None, "items": []}
        ]
    _replace_unit_sections(cur, blueprint_unit_id, sections_payload)
def _shift_framework_slots_sort_orders_from(cur, framework_program_id: int, from_sort_order: int) -> None:
cur.execute(
"""
UPDATE training_framework_slots
SET sort_order = sort_order + 1
WHERE framework_program_id = %s AND sort_order >= %s
""",
(framework_program_id, int(from_sort_order)),
)
def _insert_framework_slot_and_blueprint_unit(
cur,
framework_program_id: int,
sort_order: int,
title: Optional[str],
notes: Optional[Any],
profile_id: int,
) -> Tuple[int, int]:
"""Legt Slot-Zeile + Blueprint-`training_units`-Zeile an; gibt (slot_id, blueprint_unit_id) zurück."""
cur.execute(
"""
INSERT INTO training_framework_slots (
framework_program_id, sort_order, title, notes, training_unit_id
) VALUES (%s, %s, %s, %s, NULL)
RETURNING id
""",
(
framework_program_id,
int(sort_order),
title,
notes,
),
)
sid = int(cur.fetchone()["id"])
cur.execute(
"""
INSERT INTO training_units (
group_id, planned_date,
planned_time_start, planned_time_end, planned_focus,
status, notes, trainer_notes,
created_by, plan_template_id, framework_slot_id
) VALUES (
NULL, NULL,
NULL, NULL, NULL,
'planned', NULL, NULL,
%s, NULL, %s
)
RETURNING id
""",
(profile_id, sid),
)
bid = int(cur.fetchone()["id"])
return sid, bid
def _copy_blueprint_into_scheduled_unit(
    cur,
    blueprint_unit_id: int,
    group_id: int,
    planned_date: str,
    profile_id: int,
    origin_framework_slot_id: Optional[int],
    role: str,
) -> int:
    """Create a scheduled unit for a group from a framework blueprint unit.

    Planned times, focus, status (default ``planned``), notes and trainer
    notes are copied from the blueprint; actual date/times, attendance,
    plan template and ``framework_slot_id`` start as NULL. The new unit
    records *origin_framework_slot_id* and *profile_id* as creator. Only
    source rows with a ``framework_slot_id`` qualify as blueprint.
    Afterwards the blueprint's phases are cloned into the new unit.

    Returns:
        The id of the new unit as delivered by the cursor.

    Raises:
        HTTPException: 404 when no blueprint row matches.
    """
    insert_from_blueprint_sql = """
        INSERT INTO training_units (
            group_id,
            planned_date,
            planned_time_start,
            planned_time_end,
            planned_focus,
            actual_date,
            actual_time_start,
            actual_time_end,
            attendance_count,
            status,
            notes,
            trainer_notes,
            created_by,
            plan_template_id,
            origin_framework_slot_id,
            framework_slot_id
        )
        SELECT
            %s,
            %s,
            planned_time_start,
            planned_time_end,
            planned_focus,
            NULL::DATE,
            NULL::TIME WITHOUT TIME ZONE,
            NULL::TIME WITHOUT TIME ZONE,
            NULL::INT,
            COALESCE(status, 'planned'),
            notes,
            trainer_notes,
            %s,
            NULL::INT,
            %s,
            NULL::INT
        FROM training_units
        WHERE id = %s
          AND framework_slot_id IS NOT NULL
        RETURNING id
        """
    insert_params = (
        group_id,
        planned_date,
        profile_id,
        origin_framework_slot_id,
        blueprint_unit_id,
    )
    cur.execute(insert_from_blueprint_sql, insert_params)
    inserted = cur.fetchone()
    if not inserted:
        raise HTTPException(status_code=404, detail="Blueprint-Einheit nicht gefunden")

    new_unit_id = inserted["id"]
    blueprint_phases = _phases_clone_payload(cur, blueprint_unit_id)
    _replace_unit_phases(cur, new_unit_id, blueprint_phases, profile_id, role, profile_id)
    return new_unit_id
def _flatten_exercises_from_sections(unit: Dict[str, Any]) -> None:
flat: List[Dict[str, Any]] = []
for sec in sorted(unit.get("sections", []), key=lambda s: s.get("order_index", 0)):
for item in sorted(sec.get("items", []), key=lambda i: i.get("order_index", 0)):
if item.get("item_type") == "exercise":
flat.append(item)
unit["exercises"] = flat
def _hydrate_training_unit_payload(cur, unit: Dict[str, Any]) -> Dict[str, Any]:
    """Complete a unit dict for GET responses and return the same dict.

    Adds ``phases`` (nested), the flat ``sections`` list and the derived
    legacy list ``exercises``.
    """
    unit_id = unit["id"]
    nested_phases = _fetch_phases_nested(cur, unit_id)
    unit["phases"] = nested_phases
    flat_sections = _fetch_sections(cur, unit_id)
    unit["sections"] = flat_sections
    _flatten_exercises_from_sections(unit)
    return unit
def _resolve_training_unit_section_id(cur, unit_id: int, section_order_index: int) -> int:
"""Erste Sektion mit order_index in einer whole_group-Phase (Parallelstreams ausgeschlossen)."""
cur.execute(
"""
SELECT tus.id
FROM training_unit_sections tus
INNER JOIN training_unit_phases p ON p.id = tus.phase_id
WHERE tus.training_unit_id = %s
AND tus.order_index = %s
AND tus.parallel_stream_id IS NULL
AND LOWER(TRIM(p.phase_kind)) = 'whole_group'
ORDER BY p.order_index ASC, tus.id ASC
LIMIT 1
""",
(unit_id, section_order_index),
)
r = cur.fetchone()
if not r:
raise HTTPException(
status_code=400, detail="Abschnitt für diese Reihenfolge nicht gefunden"
)
return int(r["id"])
def _resolve_training_unit_section_id_for_apply(
cur,
unit_id: int,
section_order_index: int,
*,
phase_order_index: Optional[int],
parallel_stream_order_index: Optional[int],
) -> int:
"""Ziel-Abschnitt: ganzes Gruppen physisch (nur section_order_index) oder innerhalb eines Parallelstreams."""
if parallel_stream_order_index is None:
return _resolve_training_unit_section_id(cur, unit_id, section_order_index)
if phase_order_index is None:
raise HTTPException(
status_code=400,
detail="phase_order_index ist bei parallel_stream_order_index Pflicht",
)
cur.execute(
"""
SELECT tus.id
FROM training_unit_sections tus
INNER JOIN training_unit_parallel_streams st ON st.id = tus.parallel_stream_id
INNER JOIN training_unit_phases p ON p.id = st.phase_id
WHERE tus.training_unit_id = %s
AND tus.order_index = %s
AND st.order_index = %s
AND p.order_index = %s
AND LOWER(TRIM(p.phase_kind)) = 'parallel'
ORDER BY tus.id ASC
LIMIT 1
""",
(
unit_id,
section_order_index,
parallel_stream_order_index,
phase_order_index,
),
)
r = cur.fetchone()
if not r:
raise HTTPException(
status_code=400,
detail="Abschnitt im Parallelstream für diese Indizes nicht gefunden",
)
return int(r["id"])
def _append_copied_module_items_to_section(
cur,
section_id: int,
module_items: List[Dict[str, Any]],
source_training_module_id: int,
) -> None:
"""Hängt kopierte ModulItems ans Ende eines Abschnitts (section_order_index in API)."""
cur.execute(
"""
SELECT COALESCE(MAX(order_index), -1) AS mo
FROM training_unit_section_items
WHERE section_id = %s
""",
(section_id,),
)
row = cur.fetchone()
start = int(row["mo"]) + 1 if row and row["mo"] is not None else 0
for i, mi in enumerate(module_items):
oi = start + i
itype = mi.get("item_type")
if itype == "note":
body = mi.get("note_body")
if body is None:
body = ""
cur.execute(
"""
INSERT INTO training_unit_section_items (
section_id, order_index, item_type,
exercise_id, exercise_variant_id,
planned_duration_min, actual_duration_min,
notes, modifications, note_body, source_training_module_id
) VALUES (%s, %s, 'note',
NULL, NULL, NULL, NULL, NULL, NULL, %s, %s)
""",
(section_id, oi, body, source_training_module_id),
)
continue
eid = mi.get("exercise_id")
if not eid:
continue
eid = int(eid)
vid = mi.get("exercise_variant_id")
if vid is not None:
vid = int(vid)
else:
vid = None
_validate_variant_for_exercise(cur, eid, vid)
cur.execute(
"""
INSERT INTO training_unit_section_items (
section_id, order_index, item_type,
exercise_id, exercise_variant_id,
planned_duration_min, actual_duration_min,
notes, modifications, note_body,
source_training_module_id, planning_method_profile
) VALUES (%s, %s, 'exercise',
%s, %s, %s, NULL, %s, NULL, NULL, %s, NULL)
""",
(
section_id,
oi,
eid,
vid,
mi.get("planned_duration_min"),
mi.get("notes"),
source_training_module_id,
),
)
def _insert_section_items(cur, section_id: int, items_in: Optional[List[Any]], start_order: int = 0):
    """Insert note and exercise items into a section.

    For every entry:
      * ``item_type`` falls back to ``"exercise"`` when an ``exercise_id`` is
        present, otherwise to ``"note"``.
      * ``order_index`` falls back to ``start_order`` plus the list offset.
      * Notes store ``note_body`` (``None`` becomes an empty string).
      * Anything else is treated as an exercise; entries without
        ``exercise_id`` are skipped. ``planning_method_profile`` is only stored
        for exercises whose ``exercise_kind`` is ``"combination"``.
    """
    for offset, entry in enumerate(items_in if items_in is not None else []):
        kind = entry.get("item_type")
        if not kind:
            kind = "exercise" if entry.get("exercise_id") else "note"
        position = entry.get("order_index")
        if position is None:
            position = start_order + offset

        if kind == "note":
            note_text = entry.get("note_body")
            module_ref = _optional_source_training_module_id_payload(
                entry.get("source_training_module_id")
            )
            cur.execute(
                """
INSERT INTO training_unit_section_items (
section_id, order_index, item_type,
exercise_id, exercise_variant_id,
planned_duration_min, actual_duration_min,
notes, modifications, note_body, source_training_module_id
) VALUES (%s, %s, 'note',
NULL, NULL, NULL, NULL, NULL, NULL, %s, %s
)
""",
                (section_id, position, "" if note_text is None else note_text, module_ref),
            )
            continue

        raw_exercise_id = entry.get("exercise_id")
        if not raw_exercise_id:
            continue
        exercise_id = int(raw_exercise_id)
        variant_id = _optional_positive_int(entry.get("exercise_variant_id"), "exercise_variant_id")
        _validate_variant_for_exercise(cur, exercise_id, variant_id)

        # The exercise kind decides whether a planning method profile is kept.
        cur.execute(
            """SELECT COALESCE(exercise_kind, 'simple') AS k FROM exercises WHERE id = %s""",
            (exercise_id,),
        )
        kind_row = cur.fetchone()
        if kind_row and kind_row.get("k") is not None:
            exercise_kind = str(kind_row["k"])
        else:
            exercise_kind = "simple"
        exercise_kind = exercise_kind.strip().lower()

        method_profile = _normalize_planning_method_profile_payload(
            entry.get("planning_method_profile")
        )
        if exercise_kind != "combination":
            method_profile = None
        method_profile_db = None if method_profile is None else PsycopgJson(method_profile)
        module_ref = _optional_source_training_module_id_payload(
            entry.get("source_training_module_id")
        )
        cur.execute(
            """
INSERT INTO training_unit_section_items (
section_id, order_index, item_type,
exercise_id, exercise_variant_id,
planned_duration_min, actual_duration_min,
notes, modifications, note_body,
source_training_module_id, planning_method_profile
) VALUES (%s, %s, 'exercise',
%s, %s, %s, %s, %s, %s, NULL, %s, %s)
""",
            (
                section_id,
                position,
                exercise_id,
                variant_id,
                entry.get("planned_duration_min"),
                entry.get("actual_duration_min"),
                entry.get("notes"),
                entry.get("modifications"),
                module_ref,
                method_profile_db,
            ),
        )
def _insert_one_replacement_section(
    cur,
    unit_id: int,
    sec: Any,
    enumeration_index: int,
    *,
    phase_id: Optional[int] = None,
    parallel_stream_id: Optional[int] = None,
) -> None:
    """Insert one section together with its items.

    Exactly one of ``phase_id`` / ``parallel_stream_id`` must be given. The
    title falls back to ``"Abschnitt"`` and ``order_index`` to
    ``enumeration_index`` when the payload does not supply them.

    Raises:
        HTTPException: 500 when neither or both parent ids are set.
    """
    parents_given = sum(ref is not None for ref in (phase_id, parallel_stream_id))
    if parents_given != 1:
        raise HTTPException(
            status_code=500, detail="Intern: Sektion braucht phase_id oder parallel_stream_id"
        )

    section_title = (sec.get("title") or "").strip() or "Abschnitt"
    position = sec.get("order_index")
    if position is None:
        position = enumeration_index
    template_section_ref = _optional_positive_int(
        sec.get("source_template_section_id"), "source_template_section_id"
    )
    cur.execute(
        """
INSERT INTO training_unit_sections (
training_unit_id, phase_id, parallel_stream_id, order_index, title, guidance_notes, source_template_section_id
) VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
        (
            unit_id,
            phase_id,
            parallel_stream_id,
            position,
            section_title,
            sec.get("guidance_notes"),
            template_section_ref,
        ),
    )
    new_section_id = cur.fetchone()["id"]
    _insert_section_items(cur, new_section_id, sec.get("items"))
def _replace_unit_sections(cur, unit_id: int, sections_in: List[Any]):
    """Replace the whole plan (legacy shape): one whole_group phase plus sections."""
    _clear_unit_plan_content(cur, unit_id)
    default_phase_id = _ensure_default_whole_group_phase(cur, unit_id, order_index=0)
    for position, section in enumerate(sections_in):
        _insert_one_replacement_section(
            cur,
            unit_id,
            section,
            position,
            phase_id=default_phase_id,
            parallel_stream_id=None,
        )
def _replace_unit_phases(
    cur,
    unit_id: int,
    phases_in: List[Any],
    profile_id: int,
    role: str,
    unit_created_by: Optional[int],
) -> None:
    """Replace the full plan of a unit: phases, parallel streams and sections.

    The existing plan content is cleared first. ``whole_group`` phases carry
    ``sections`` directly; ``parallel`` phases carry ``streams``, each with its
    own ``sections`` and optional ``assigned_trainer_profile_ids``.

    Raises:
        HTTPException: 400 when ``phases_in`` / ``sections`` / ``streams`` is
            not a list or when ``phase_kind`` is neither ``whole_group`` nor
            ``parallel``.
    """
    if not isinstance(phases_in, list):
        raise HTTPException(status_code=400, detail="phases muss eine Liste sein")

    # Group and effective lead trainer are passed on to the stream trainer
    # normalisation below.
    cur.execute(
        """
SELECT tu.group_id,
COALESCE(tu.lead_trainer_profile_id, tg.trainer_id) AS eff_lead
FROM training_units tu
LEFT JOIN training_groups tg ON tg.id = tu.group_id
WHERE tu.id = %s
""",
        (unit_id,),
    )
    unit_row = cur.fetchone()
    group_id_opt = None
    eff_lead_nid = None
    if unit_row:
        if unit_row.get("group_id") is not None:
            group_id_opt = int(unit_row["group_id"])
        lead_raw = unit_row.get("eff_lead")
        if lead_raw is not None:
            eff_lead_nid = int(lead_raw)

    _clear_unit_plan_content(cur, unit_id)

    for phase_position, phase in enumerate(phases_in):
        kind = str(phase.get("phase_kind") or "").strip().lower()
        if kind not in ("whole_group", "parallel"):
            raise HTTPException(
                status_code=400,
                detail="phase_kind muss whole_group oder parallel sein",
            )
        # The list position is authoritative for the phase order: this avoids
        # UNIQUE(training_unit, order_index) collisions when the client sends
        # the same phase order index more than once.
        cur.execute(
            """
INSERT INTO training_unit_phases (training_unit_id, order_index, phase_kind, title, guidance_notes)
VALUES (%s, %s, %s, %s, %s)
RETURNING id
""",
            (
                unit_id,
                int(phase_position),
                kind,
                phase.get("title"),
                phase.get("guidance_notes"),
            ),
        )
        phase_id = int(cur.fetchone()["id"])

        if kind == "whole_group":
            sections = phase.get("sections")
            sections = [] if sections is None else sections
            if not isinstance(sections, list):
                raise HTTPException(status_code=400, detail="sections muss Liste sein")
            for section_position, section in enumerate(sections):
                _insert_one_replacement_section(
                    cur,
                    unit_id,
                    section,
                    section_position,
                    phase_id=phase_id,
                    parallel_stream_id=None,
                )
            continue

        streams = phase.get("streams")
        streams = [] if streams is None else streams
        if not isinstance(streams, list):
            raise HTTPException(status_code=400, detail="streams muss Liste sein")
        for stream_position, stream in enumerate(streams):
            trainer_ids = _normalize_stream_assigned_trainer_profile_ids(
                cur,
                stream.get("assigned_trainer_profile_ids"),
                group_id=group_id_opt,
                profile_id=profile_id,
                role=role,
                unit_created_by=unit_created_by,
                eff_lead_nid=eff_lead_nid,
            )
            trainer_ids_db = None if trainer_ids is None else PsycopgJson(trainer_ids)
            stream_order = stream.get("order_index")
            if stream_order is None:
                stream_order = stream_position
            cur.execute(
                """
INSERT INTO training_unit_parallel_streams (
phase_id, order_index, title, notes, assigned_trainer_profile_ids
) VALUES (%s, %s, %s, %s, %s)
RETURNING id
""",
                (
                    phase_id,
                    int(stream_order),
                    stream.get("title"),
                    stream.get("notes"),
                    trainer_ids_db,
                ),
            )
            stream_id = int(cur.fetchone()["id"])

            stream_sections = stream.get("sections")
            stream_sections = [] if stream_sections is None else stream_sections
            if not isinstance(stream_sections, list):
                raise HTTPException(
                    status_code=400,
                    detail="sections (Stream) muss Liste sein",
                )
            for section_position, section in enumerate(stream_sections):
                _insert_one_replacement_section(
                    cur,
                    unit_id,
                    section,
                    section_position,
                    phase_id=None,
                    parallel_stream_id=stream_id,
                )
def _assert_single_plan_content_key_create(data: dict) -> None:
"""Höchstens ein Plan-Inhalt: phases | sections | exercises (Non-None)."""
n = sum(1 for k in ("phases", "sections", "exercises") if data.get(k) is not None)
if n > 1:
raise HTTPException(
status_code=400,
detail="Nur eines von phases, sections oder exercises angeben",
)
def _assert_single_plan_content_key_update(data: dict) -> None:
"""PUT: höchstens einer der Keys phases | sections | exercises."""
keys = [k for k in ("phases", "sections", "exercises") if k in data]
if len(keys) > 1:
raise HTTPException(
status_code=400,
detail="Nur eines von phases, sections oder exercises im Body gleichzeitig",
)
def _distinct_exercise_ids_in_unit(cur, unit_id: int) -> List[int]:
cur.execute(
"""
SELECT DISTINCT tusi.exercise_id
FROM training_unit_section_items tusi
INNER JOIN training_unit_sections tus ON tusi.section_id = tus.id
WHERE tus.training_unit_id = %s
AND tusi.item_type = 'exercise'
AND tusi.exercise_id IS NOT NULL
""",
(unit_id,),
)
rows = cur.fetchall() or []
out: List[int] = []
for r in rows:
try:
out.append(int(r["exercise_id"]))
except (TypeError, ValueError, KeyError):
continue
return out
def _group_club_id_for_scheduled_unit(cur, unit_id: int) -> Optional[int]:
"""Nur echte Gruppentermine (keine Rahmen-Blueprints ohne Gruppe)."""
cur.execute(
"""
SELECT tg.club_id
FROM training_units tu
INNER JOIN training_groups tg ON tu.group_id = tg.id
WHERE tu.id = %s AND tu.framework_slot_id IS NULL
""",
(unit_id,),
)
r = cur.fetchone()
if not r or r.get("club_id") is None:
return None
return int(r["club_id"])
def _exercise_needs_club_visibility_for_target(ex: Dict[str, Any], target_club_id: int) -> bool:
"""Übung für Mitglieder des Ziel-Vereins in der Durchführung sichtbar machen (Dashboard/Queue)."""
if str(ex.get("status") or "").strip().lower() == "archived":
return False
vis = (ex.get("visibility") or "private").strip().lower()
if vis == "official":
return False
if vis == "private":
return True
if vis == "club":
raw = ex.get("club_id")
if raw is None:
return True
try:
return int(raw) != int(target_club_id)
except (TypeError, ValueError):
return True
return False
def _caller_may_promote_exercise_to_club(
    cur,
    exercise_created_by: Optional[int],
    profile_id: int,
    role: str,
    target_club_id: int,
) -> bool:
    """Tell whether the caller may switch an exercise to club visibility.

    Allowed for platform admins, for the exercise's creator, and for callers
    who may manage the target club's organisation. The checks run in that
    order and stop at the first that succeeds.
    """
    if (
        is_platform_admin(role)
        or (exercise_created_by is not None and int(exercise_created_by) == profile_id)
        or can_manage_club_org(cur, profile_id, target_club_id, role)
    ):
        return True
    return False
def _promote_private_exercises_used_in_unit(cur, unit_id: int, profile_id: int, role: str) -> None:
    """Switch private exercises used in a unit to club visibility.

    The target club is the club of the unit's training group. Nothing happens
    when the unit has no such club or when the caller is neither a platform
    admin, an active member of that club, nor allowed to manage it. Archived
    exercises and exercises that are not private are left untouched, as are
    exercises the caller may not promote.
    """
    target_club_id = _group_club_id_for_scheduled_unit(cur, unit_id)
    if not target_club_id:
        return

    caller_allowed = (
        is_platform_admin(role)
        or _profile_active_in_club(cur, target_club_id, profile_id)
        or can_manage_club_org(cur, profile_id, target_club_id, role)
    )
    if not caller_allowed:
        return

    for exercise_id in _distinct_exercise_ids_in_unit(cur, unit_id):
        cur.execute(
            """
SELECT id, created_by, visibility, club_id, COALESCE(status, '') AS status
FROM exercises WHERE id = %s
""",
            (exercise_id,),
        )
        exercise = cur.fetchone()
        if not exercise:
            continue
        if str(exercise.get("status") or "").strip().lower() == "archived":
            continue
        # Only private exercises are promoted; official, club and unknown
        # visibilities are all skipped.
        if (exercise.get("visibility") or "private").strip().lower() != "private":
            continue
        if not _caller_may_promote_exercise_to_club(
            cur, exercise.get("created_by"), profile_id, role, target_club_id
        ):
            continue
        # The WHERE clause re-checks the visibility so a row that is no longer
        # private is not overwritten.
        cur.execute(
            """
UPDATE exercises
SET visibility = 'club', club_id = %s, updated_at = NOW()
WHERE id = %s AND LOWER(COALESCE(visibility, 'private')) = 'private'
""",
            (target_club_id, exercise_id),
        )
def _insert_sections_from_legacy_exercises(cur, unit_id: int, exercises_in: List[Any]):
    """Store a flat legacy exercise list as one section titled "Übungen".

    Does nothing for an empty list. Otherwise the default whole_group phase is
    ensured, a single section is created and every entry that has an
    ``exercise_id`` becomes an exercise item, numbered consecutively from 0.
    """
    if not exercises_in:
        return

    default_phase_id = _ensure_default_whole_group_phase(cur, unit_id, order_index=0)
    cur.execute(
        """
INSERT INTO training_unit_sections (
training_unit_id, phase_id, parallel_stream_id, order_index, title, guidance_notes
)
VALUES (%s, %s, NULL, 0, %s, NULL)
RETURNING id
""",
        (unit_id, default_phase_id, "Übungen"),
    )
    section_id = cur.fetchone()["id"]

    items: List[Dict[str, Any]] = []
    for legacy in exercises_in:
        raw_exercise_id = legacy.get("exercise_id")
        if not raw_exercise_id:
            continue
        exercise_id = int(raw_exercise_id)
        variant_id = _optional_positive_int(legacy.get("exercise_variant_id"), "exercise_variant_id")
        _validate_variant_for_exercise(cur, exercise_id, variant_id)
        items.append(
            {
                "item_type": "exercise",
                # Skipped entries do not consume a slot.
                "order_index": len(items),
                "exercise_id": exercise_id,
                "exercise_variant_id": variant_id,
                "planned_duration_min": legacy.get("planned_duration_min"),
                "actual_duration_min": legacy.get("actual_duration_min"),
                "notes": legacy.get("notes"),
                "modifications": legacy.get("modifications"),
            }
        )
    _insert_section_items(cur, section_id, items, start_order=0)
def _normalize_training_plan_template_section_payload(sec: Any, si: int) -> Dict[str, Any]:
title = (sec.get("title") or "").strip() or f"Abschnitt {si + 1}"
order_ix = sec.get("order_index")
if order_ix is None:
order_ix = si
try:
order_ix = int(order_ix)
except (TypeError, ValueError):
order_ix = si
pk = str(sec.get("phase_kind") or "whole_group").strip().lower()
if pk not in ("whole_group", "parallel"):
pk = "whole_group"
try:
p_oi = int(sec.get("phase_order_index") if sec.get("phase_order_index") is not None else 0)
except (TypeError, ValueError):
p_oi = 0
p_so: Optional[int] = None
if pk == "parallel":
raw_so = sec.get("parallel_stream_order_index")
try:
p_so = int(raw_so) if raw_so is not None and raw_so != "" else 0
except (TypeError, ValueError):
p_so = 0
return {
"title": title,
"order_index": order_ix,
"guidance_text": sec.get("guidance_text"),
"phase_kind": pk,
"phase_order_index": p_oi,
"parallel_stream_order_index": p_so,
}
def _insert_training_plan_template_sections(cur, template_id: int, sections_in: List[Any]) -> None:
    """Insert the given sections for a template, one row per payload entry.

    Each entry is normalised with
    ``_normalize_training_plan_template_section_payload`` before it is stored.
    """
    for position, payload in enumerate(sections_in):
        normalized = _normalize_training_plan_template_section_payload(payload, position)
        values = (
            template_id,
            normalized["order_index"],
            normalized["title"],
            normalized["guidance_text"],
            normalized["phase_kind"],
            normalized["phase_order_index"],
            normalized["parallel_stream_order_index"],
        )
        cur.execute(
            """
INSERT INTO training_plan_template_sections (
template_id, order_index, title, guidance_text,
phase_kind, phase_order_index, parallel_stream_order_index
) VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
            values,
        )
def _template_rows_to_phases_payload(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Flache Vorlagen-Sektionen → `phases`-Liste wie beim Training-Unit PUT (nur Gliederung, leere items)."""
if not rows:
return []
phases_out: List[Dict[str, Any]] = []
i = 0
n = len(rows)
while i < n:
r0 = rows[i]
pk0 = str(r0.get("phase_kind") or "whole_group").strip().lower()
if pk0 not in ("whole_group", "parallel"):
pk0 = "whole_group"
try:
p_oix0 = int(r0.get("phase_order_index") if r0.get("phase_order_index") is not None else 0)
except (TypeError, ValueError):
p_oix0 = 0
run: List[Dict[str, Any]] = []
while i < n:
r = rows[i]
pk = str(r.get("phase_kind") or "whole_group").strip().lower()
if pk not in ("whole_group", "parallel"):
pk = "whole_group"
try:
p_oix = int(r.get("phase_order_index") if r.get("phase_order_index") is not None else 0)
except (TypeError, ValueError):
p_oix = 0
if pk != pk0 or p_oix != p_oix0:
break
run.append(r)
i += 1
if pk0 == "whole_group":
secs = []
for j, rr in enumerate(run):
tid = rr.get("id")
secs.append(
{
"title": rr.get("title"),
"order_index": j,
"guidance_notes": rr.get("guidance_text"),
"items": [],
**(
{"source_template_section_id": int(tid)}
if tid is not None
else {}
),
}
)
phases_out.append(
{
"phase_kind": "whole_group",
"order_index": p_oix0,
"title": None,
"guidance_notes": None,
"sections": secs,
}
)
else:
by_stream: Dict[int, List[Dict[str, Any]]] = {}
for rr in run:
raw_so = rr.get("parallel_stream_order_index")
try:
so = int(raw_so) if raw_so is not None and raw_so != "" else 0
except (TypeError, ValueError):
so = 0
by_stream.setdefault(so, []).append(rr)
stream_order = sorted(by_stream.keys())
streams = []
for so in stream_order:
bucket = by_stream[so]
st: Dict[str, Any] = {
"order_index": so,
"title": None,
"notes": None,
"sections": [],
}
for j, rr in enumerate(bucket):
tid = rr.get("id")
st["sections"].append(
{
"title": rr.get("title"),
"order_index": j,
"guidance_notes": rr.get("guidance_text"),
"items": [],
**(
{"source_template_section_id": int(tid)}
if tid is not None
else {}
),
}
)
streams.append(st)
phases_out.append(
{
"phase_kind": "parallel",
"order_index": p_oix0,
"title": None,
"guidance_notes": None,
"streams": streams,
}
)
return phases_out
def _instantiate_from_template(
    cur,
    unit_id: int,
    template_id: int,
    *,
    profile_id: int,
    role: str,
    unit_created_by: int,
) -> None:
    """Replace a unit's plan with the outline of a training plan template.

    The template's sections are loaded in ``order_index`` order, converted to a
    phases payload and written through ``_replace_unit_phases``. A template
    without sections yields a single whole_group phase with one section titled
    "Hauptteil".
    """
    cur.execute(
        """
SELECT id, title, guidance_text, order_index, phase_kind, phase_order_index, parallel_stream_order_index
FROM training_plan_template_sections
WHERE template_id = %s
ORDER BY order_index
""",
        (template_id,),
    )
    template_rows = [r2d(record) for record in cur.fetchall()]

    if template_rows:
        outline = _template_rows_to_phases_payload(template_rows)
        # NOTE: _replace_unit_phases clears the plan content again itself.
        _clear_unit_plan_content(cur, unit_id)
        _replace_unit_phases(
            cur,
            unit_id,
            outline,
            profile_id,
            role,
            unit_created_by,
        )
        return

    # Empty template: fall back to a minimal default plan.
    _clear_unit_plan_content(cur, unit_id)
    default_phase_id = _ensure_default_whole_group_phase(cur, unit_id, order_index=0)
    cur.execute(
        """
INSERT INTO training_unit_sections (
training_unit_id, phase_id, parallel_stream_id, order_index, title, guidance_notes
) VALUES (%s, %s, NULL, 0, 'Hauptteil', NULL)
""",
        (unit_id, default_phase_id),
    )
def _fetch_training_plan_template_row(cur, tid: int) -> Dict[str, Any]:
    """Load one training plan template row as a dict.

    Raises:
        HTTPException: 404 when no template with this id exists.
    """
    cur.execute("SELECT * FROM training_plan_templates WHERE id = %s", (tid,))
    record = cur.fetchone()
    if record:
        return r2d(record)
    raise HTTPException(status_code=404, detail="Trainingsvorlage nicht gefunden")
def _template_assert_readable(cur, row: Dict[str, Any], profile_id: int, role: Optional[str]) -> None:
    """Ensure the caller may read the given template row.

    Platform admins always pass. Everyone else is checked with
    ``library_content_visible_to_profile``; a missing visibility is treated as
    ``"club"``.

    Raises:
        HTTPException: 403 when the template is not visible to the caller.
    """
    if is_platform_admin(role):
        return
    visible = library_content_visible_to_profile(
        cur,
        profile_id,
        row.get("visibility") or "club",
        row.get("club_id"),
        row.get("created_by"),
        role,
    )
    if visible:
        return
    raise HTTPException(status_code=403, detail="Keine Berechtigung für diese Vorlage")
def _template_access(cur, tid: int, profile_id: int, role: str) -> Dict[str, Any]:
    """Load a template and verify that the caller has read access to it."""
    template = _fetch_training_plan_template_row(cur, tid)
    _template_assert_readable(cur, template, profile_id, role)
    return template
# ── Vorlagen ────────────────────────────────────────────────────────────
@router.get("/training-plan-templates")
def list_training_plan_templates(tenant: TenantContext = Depends(get_tenant_context)):
    # Lists every template visible to the caller, newest update first, each with
    # a section count and a preview array of its sections.
    # Kept as comments rather than a docstring: FastAPI publishes handler
    # docstrings as the OpenAPI description.
    with get_db() as conn:
        cur = get_cursor(conn)
        # The visibility clause and its parameters come from the shared helper.
        vis_clause, vis_params = library_content_visibility_sql(
            alias="t",
            profile_id=tenant.profile_id,
            role=tenant.global_role,
            effective_club_id=tenant.effective_club_id,
        )
        listing_sql = f"""
SELECT t.*,
(SELECT COUNT(*) FROM training_plan_template_sections s WHERE s.template_id = t.id)
AS sections_count,
COALESCE(
(
SELECT json_agg(
json_build_object(
'id', s.id,
'order_index', s.order_index,
'title', s.title,
'guidance_text', s.guidance_text,
'phase_kind', s.phase_kind,
'phase_order_index', s.phase_order_index,
'parallel_stream_order_index', s.parallel_stream_order_index
)
ORDER BY s.order_index
)
FROM training_plan_template_sections s
WHERE s.template_id = t.id
),
'[]'::json
) AS sections
FROM training_plan_templates t
WHERE ({vis_clause})
ORDER BY t.updated_at DESC NULLS LAST, t.name
"""
        cur.execute(listing_sql, vis_params)
        templates = []
        for record in cur.fetchall():
            templates.append(r2d(record))
        return templates
@router.get("/training-plan-templates/{template_id}")
def get_training_plan_template(template_id: int, tenant: TenantContext = Depends(get_tenant_context)):
    # Returns one template the caller may read, with its sections ordered by
    # order_index under the "sections" key.
    with get_db() as conn:
        cur = get_cursor(conn)
        template = _template_access(cur, template_id, tenant.profile_id, tenant.global_role)
        cur.execute(
            """
SELECT *
FROM training_plan_template_sections
WHERE template_id = %s
ORDER BY order_index
""",
            (template_id,),
        )
        section_rows = cur.fetchall()
        template["sections"] = [r2d(section) for section in section_rows]
        return template
@router.post("/training-plan-templates")
def create_training_plan_template(data: dict, tenant: TenantContext = Depends(get_tenant_context)):
    # Creates a training plan template with its sections and returns the stored
    # template.
    #
    # Body keys read here: name (required), description, visibility (defaults
    # to "club"), club_id (defaults to the tenant's effective club for club
    # visibility), sections (optional list of section objects).
    #
    # Responds 403 when the caller has no planning role and 400 when the name
    # is missing or "sections" is not a list of objects.
    profile_id = tenant.profile_id
    role = tenant.global_role
    if not _has_planning_role(role):
        raise HTTPException(status_code=403, detail="Nur Trainer dürfen Vorlagen anlegen")
    name = (data.get("name") or "").strip()
    if not name:
        raise HTTPException(status_code=400, detail="name ist Pflicht")
    vis_raw = data.get("visibility")
    visibility = (vis_raw if isinstance(vis_raw, str) else "club").strip() or "club"
    club_id = data.get("club_id")
    if club_id in ("", []):
        club_id = None
    if visibility == "club" and club_id is None:
        club_id = tenant.effective_club_id
    sections_in = data.get("sections") or []
    # The section payloads are read with .get() further down; reject anything
    # that is not a list of objects here so a malformed body gives a 400
    # instead of an unhandled AttributeError/TypeError.
    if not isinstance(sections_in, list) or not all(isinstance(s, dict) for s in sections_in):
        raise HTTPException(status_code=400, detail="sections muss eine Liste von Objekten sein")
    with get_db() as conn:
        cur = get_cursor(conn)
        assert_valid_governance_visibility(cur, profile_id, role, visibility, club_id)
        cur.execute(
            """
INSERT INTO training_plan_templates (club_id, created_by, name, description, visibility)
VALUES (%s, %s, %s, %s, %s)
RETURNING id
""",
            (club_id, profile_id, name, data.get("description"), visibility),
        )
        tid = cur.fetchone()["id"]
        _insert_training_plan_template_sections(cur, tid, sections_in)
        conn.commit()
    return get_training_plan_template(tid, tenant)
@router.put("/training-plan-templates/{template_id}")
def update_training_plan_template(template_id: int, data: dict, tenant: TenantContext = Depends(get_tenant_context)):
    # Partially updates a template; only keys present in the body are changed.
    # When "sections" is present, the template's sections are replaced as a
    # whole. Returns the stored template.
    #
    # Responds 400 for an invalid visibility, an empty name or a "sections"
    # value that is not a list of objects; the editability and governance
    # checks are delegated to the assert_* helpers.
    profile_id = tenant.profile_id
    role = tenant.global_role
    with get_db() as conn:
        cur = get_cursor(conn)
        row_prev = _fetch_training_plan_template_row(cur, template_id)
        assert_library_content_editable(cur, profile_id, role, row_prev)

        # Merge the requested governance fields over the stored ones.
        merged_vis = row_prev.get("visibility") or "club"
        merged_club = row_prev.get("club_id")
        if "visibility" in data:
            v_in = data.get("visibility")
            if not isinstance(v_in, str) or v_in not in ("private", "club", "official"):
                raise HTTPException(status_code=400, detail="visibility ungültig")
            merged_vis = v_in
        if "club_id" in data:
            merged_club = data.get("club_id")
            if merged_club in ("", []):
                merged_club = None
        if merged_vis == "club" and merged_club is None:
            merged_club = tenant.effective_club_id
        governance_touched = "visibility" in data or "club_id" in data
        if governance_touched:
            assert_library_content_governance_transition(
                cur, profile_id, role, row_prev, merged_vis, merged_club
            )
            assert_valid_governance_visibility(cur, profile_id, role, merged_vis, merged_club)

        fields = []
        params: List[Any] = []
        if "name" in data:
            name = data.get("name")
            name = name.strip() if isinstance(name, str) else ""
            if not name:
                raise HTTPException(status_code=400, detail="name ist Pflicht")
            fields.append("name = %s")
            params.append(name)
        if "description" in data:
            fields.append("description = %s")
            params.append(data.get("description"))
        if governance_touched:
            # Persist exactly the club that was validated above. Writing it
            # only when "club_id" is in the body would leave club_id unchanged
            # when a visibility-only request defaulted the club to the
            # tenant's effective club.
            fields.append("club_id = %s")
            params.append(merged_club)
        if "visibility" in data:
            fields.append("visibility = %s")
            params.append(merged_vis)

        sections_in = None
        if "sections" in data:
            sections_in = data["sections"] or []
            # Validate before any write so a malformed body gives a 400
            # instead of an unhandled AttributeError/TypeError.
            if not isinstance(sections_in, list) or not all(
                isinstance(s, dict) for s in sections_in
            ):
                raise HTTPException(
                    status_code=400, detail="sections muss eine Liste von Objekten sein"
                )

        # updated_at is always written, so the SET list is never empty.
        fields.append("updated_at = NOW()")
        params.append(template_id)
        cur.execute(
            f"""
UPDATE training_plan_templates SET {", ".join(fields)}
WHERE id = %s
""",
            tuple(params),
        )
        if sections_in is not None:
            cur.execute(
                "DELETE FROM training_plan_template_sections WHERE template_id = %s", (template_id,)
            )
            _insert_training_plan_template_sections(cur, template_id, sections_in)
        conn.commit()
    return get_training_plan_template(template_id, tenant)
@router.delete("/training-plan-templates/{template_id}")
def delete_training_plan_template(template_id: int, tenant: TenantContext = Depends(get_tenant_context)):
    # Deletes a template after the deletability check passed; a missing
    # template surfaces as the 404 raised by the row fetch.
    with get_db() as conn:
        cur = get_cursor(conn)
        existing = _fetch_training_plan_template_row(cur, template_id)
        assert_library_content_deletable(cur, tenant.profile_id, tenant.global_role, existing)
        cur.execute("DELETE FROM training_plan_templates WHERE id = %s", (template_id,))
        conn.commit()
    return {"ok": True}
# ── Einheiten ─────────────────────────────────────────────────────────────
@router.get("/training-units")
def list_training_units(
    group_id: Optional[int] = Query(default=None),
    club_id: Optional[int] = Query(default=None),
    start_date: Optional[str] = Query(default=None),
    end_date: Optional[str] = Query(default=None),
    status: Optional[str] = Query(default=None),
    assigned_to_me: bool = Query(default=False),
    debrief_pending: bool = Query(
        default=False,
        description="Nur abgeschlossene Einheiten ohne gesetzte Rückschau (debrief_completed_at IS NULL)",
    ),
    sort: str = Query(default="desc"),
    limit: Optional[int] = Query(default=None),
    cursor_planned_date: Optional[str] = Query(
        default=None,
        description="Keyset: YYYY-MM-DD der letzten Zeile (mit cursor_id)",
    ),
    cursor_planned_time: Optional[str] = Query(
        default=None,
        description="Keyset: HH:MM oder HH:MM:SS; weglassen/leer wenn planned_time_start NULL",
    ),
    cursor_id: Optional[int] = Query(
        default=None,
        ge=1,
        description="Keyset: id der letzten Zeile (mit cursor_planned_date)",
    ),
    tenant: TenantContext = Depends(get_tenant_context),
):
    # Lists scheduled training units (rows with framework_slot_id IS NULL) with
    # group, club, creator and lead-trainer names joined in.
    #
    # Filters: group_id or club_id (mutually exclusive), start_date/end_date on
    # planned_date, status, assigned_to_me, debrief_pending (which takes
    # precedence over status). Ordering is by planned_date, then
    # planned_time_start (rows without a time last), then id; `sort` picks the
    # direction. Optional keyset pagination via cursor_planned_date +
    # cursor_id (+ cursor_planned_time) requires `limit`, which is capped at 250.
    #
    # Responds 400 for inconsistent parameters, 404 for an unknown or inactive
    # group and 403 when a non-admin caller has no relation to the group.
    #
    # NOTE(review): unwrap_query_default presumably resolves Query(...) default
    # objects when this handler is called directly from Python instead of via
    # FastAPI — confirm against its definition.
    group_id = unwrap_query_default(group_id)
    club_id = unwrap_query_default(club_id)
    start_date = unwrap_query_default(start_date)
    end_date = unwrap_query_default(end_date)
    status = unwrap_query_default(status)
    assigned_to_me = unwrap_query_default(assigned_to_me)
    debrief_pending = unwrap_query_default(debrief_pending)
    sort = unwrap_query_default(sort)
    limit = unwrap_query_default(limit)
    cursor_planned_date = unwrap_query_default(cursor_planned_date)
    cursor_planned_time = unwrap_query_default(cursor_planned_time)
    cursor_id = unwrap_query_default(cursor_id)
    profile_id = tenant.profile_id
    role = tenant.global_role
    gid = _optional_positive_int(group_id, "group_id") if group_id else None
    cid = _optional_positive_int(club_id, "club_id") if club_id else None
    if gid and cid:
        raise HTTPException(status_code=400, detail="Nur eines der Parameter group_id oder club_id angeben")
    # Anything other than "asc" (case-insensitive) sorts descending.
    order_dir = "ASC" if (sort or "").strip().lower() == "asc" else "DESC"
    lim: Optional[int] = None
    if limit is not None:
        try:
            lim = int(limit)
        except (TypeError, ValueError):
            raise HTTPException(status_code=400, detail="limit ungültig")
        if lim < 1:
            raise HTTPException(status_code=400, detail="limit ungültig")
        # Hard upper bound for the page size.
        lim = min(lim, 250)
    c_id_q = cursor_id
    c_date_raw = (cursor_planned_date or "").strip() or None
    time_nonempty = (cursor_planned_time or "").strip() != ""
    # The cursor is only valid when date and id are given together; a time
    # without an id is rejected as well.
    has_cursor_partial = (
        (c_id_q is not None) != (c_date_raw is not None) or (time_nonempty and c_id_q is None)
    )
    if has_cursor_partial:
        raise HTTPException(
            status_code=400,
            detail="cursor_planned_date und cursor_id müssen zusammen gesetzt werden",
        )
    use_keyset = c_id_q is not None
    if use_keyset and lim is None:
        raise HTTPException(status_code=400, detail="Keyset: Parameter limit ist erforderlich")
    cursor_d: Optional[date] = None
    cursor_t: Optional[dt_time] = None
    cursor_t_null = False
    if use_keyset:
        assert c_id_q is not None and c_date_raw is not None
        cursor_d = _parse_cursor_planned_date(c_date_raw)
        cursor_t = _parse_cursor_planned_time_optional(cursor_planned_time)
        # No parsed time means the cursor row had no planned_time_start.
        cursor_t_null = cursor_t is None
    with get_db() as conn:
        cur = get_cursor(conn)
        # Access checks apply to everyone except admin/superadmin.
        if cid and role not in ["admin", "superadmin"]:
            _assert_club_visible_for_trainer(cur, cid, profile_id, role)
        if gid and role not in ["admin", "superadmin"]:
            cur.execute(
                "SELECT trainer_id, co_trainer_ids, club_id FROM training_groups WHERE id = %s AND status = 'active'",
                (gid,),
            )
            gr = cur.fetchone()
            if not gr:
                raise HTTPException(status_code=404, detail="Trainingsgruppe nicht gefunden")
            gd = dict(gr)
            cob = gd.get("co_trainer_ids") or []
            # Group staff, club organisers and active club members may list
            # the group's units.
            ok_staff = gd.get("trainer_id") == profile_id or profile_id in cob
            ok_org = can_manage_club_org(cur, profile_id, int(gd["club_id"]), role)
            ok_member = _profile_active_in_club(cur, int(gd["club_id"]), profile_id)
            if not (ok_staff or ok_org or ok_member):
                raise HTTPException(status_code=403, detail="Keine Berechtigung für diese Gruppe")
        # trainer_name and creator_name are both read from the creator's
        # profile (join on tu.created_by).
        query = """
SELECT tu.*,
tg.name as group_name,
tg.weekday as group_weekday,
tg.club_id AS group_club_id,
c.name as club_name,
p.name as trainer_name,
p.name as creator_name,
COALESCE(tu.lead_trainer_profile_id, tg.trainer_id) AS effective_lead_trainer_profile_id,
COALESCE(tu.assistant_trainer_profile_ids, tg.co_trainer_ids, '[]'::jsonb)
AS effective_assistant_trainer_profile_ids,
leadp.name AS lead_trainer_name
"""
        query += "," + _ORIGIN_LINEAGE_FIELDS
        query += """
FROM training_units tu
LEFT JOIN training_groups tg ON tu.group_id = tg.id
LEFT JOIN clubs c ON tg.club_id = c.id
LEFT JOIN profiles p ON tu.created_by = p.id
LEFT JOIN profiles leadp ON leadp.id = COALESCE(tu.lead_trainer_profile_id, tg.trainer_id)
"""
        query += _ORIGIN_LINEAGE_JOIN
        # `where` clauses and `params` are appended in lockstep; the order of
        # params must match the order of the %s placeholders.
        where = []
        params = []
        # Admins, and callers who may manage the requested club (or the club
        # of the requested group), see all units; everyone else only units
        # they are involved in.
        skip_involvement_filter = role in ("admin", "superadmin")
        if not skip_involvement_filter and cid is not None:
            if can_manage_club_org(cur, profile_id, cid, role):
                skip_involvement_filter = True
        if not skip_involvement_filter and gid is not None:
            cur.execute(
                "SELECT club_id FROM training_groups WHERE id = %s AND status = 'active'",
                (gid,),
            )
            gcx = cur.fetchone()
            if gcx and gcx.get("club_id") is not None:
                if can_manage_club_org(cur, profile_id, int(gcx["club_id"]), role):
                    skip_involvement_filter = True
        if not skip_involvement_filter:
            # Involved = creator, group trainer, unit lead trainer or one of
            # the effective assistant trainers.
            where.append(
                "(tu.created_by = %s OR tg.trainer_id = %s OR tu.lead_trainer_profile_id = %s OR "
                "COALESCE(tu.assistant_trainer_profile_ids, tg.co_trainer_ids, '[]'::jsonb) "
                "@> jsonb_build_array(%s::int))"
            )
            params.extend([profile_id, profile_id, profile_id, profile_id])
        # Units attached to a framework slot are never listed here.
        where.append("tu.framework_slot_id IS NULL")
        if gid:
            where.append("tu.group_id = %s")
            params.append(gid)
        if cid:
            where.append("tg.club_id = %s")
            params.append(cid)
        if assigned_to_me:
            where.append(
                "(COALESCE(tu.lead_trainer_profile_id, tg.trainer_id) = %s OR "
                "COALESCE(tu.assistant_trainer_profile_ids, tg.co_trainer_ids, '[]'::jsonb) "
                "@> jsonb_build_array(%s::int))"
            )
            params.extend([profile_id, profile_id])
        if start_date:
            where.append("tu.planned_date >= %s")
            params.append(start_date)
        if end_date:
            where.append("tu.planned_date <= %s")
            params.append(end_date)
        if debrief_pending:
            # debrief_pending implies status 'completed' and overrides `status`.
            where.append("tu.status = %s")
            params.append("completed")
            where.append("tu.debrief_completed_at IS NULL")
        elif status:
            where.append("tu.status = %s")
            params.append(status)
        if use_keyset:
            assert cursor_d is not None and c_id_q is not None
            ks_sql, ks_params = _training_units_keyset_sql(
                order_dir,
                cursor_d,
                cursor_t_null,
                cursor_t,
                int(c_id_q),
            )
            where.append(ks_sql)
            params.extend(ks_params)
        if where:
            query += " WHERE " + " AND ".join(where)
        # order_dir is one of the two literals chosen above, never client text.
        query += (
            f" ORDER BY tu.planned_date {order_dir}, (tu.planned_time_start IS NULL) ASC, "
            f"tu.planned_time_start {order_dir} NULLS LAST, tu.id {order_dir}"
        )
        if lim is not None:
            query += " LIMIT %s"
            params.append(lim)
        cur.execute(query, params)
        rows = cur.fetchall()
        return [r2d(r) for r in rows]
@router.get("/training-units/exercises-club-visibility-queue")
def exercises_club_visibility_queue(
    start_date: Optional[str] = Query(default=None),
    end_date: Optional[str] = Query(default=None),
    assigned_to_me: bool = Query(default=True),
    limit_units: int = Query(default=80, ge=1, le=150),
    tenant: TenantContext = Depends(get_tenant_context),
):
    """
    Exercises in your training units (within the time window) that are not yet
    club-wide visible for the club of the unit's group - used by the dashboard
    and the release workflow.

    The window defaults to 45 days back and 365 days ahead of today. The units
    are taken from ``list_training_units`` (ascending, capped by
    ``limit_units``). The response is ``{"items": [...]}`` with one entry per
    (exercise, target club) pair, each listing the units that use the exercise
    and a ``can_promote`` flag for the caller.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role

    window_start = start_date
    if window_start is None:
        window_start = (date.today() - timedelta(days=45)).isoformat()
    window_end = end_date
    if window_end is None:
        window_end = (date.today() + timedelta(days=365)).isoformat()

    units = list_training_units(
        group_id=None,
        club_id=None,
        start_date=window_start,
        end_date=window_end,
        status=None,
        assigned_to_me=assigned_to_me,
        debrief_pending=False,
        sort="asc",
        limit=limit_units,
        tenant=tenant,
    )

    unit_ids: List[int] = []
    for unit in units:
        if unit.get("id") is not None:
            unit_ids.append(int(unit["id"]))
    if not unit_ids:
        return {"items": []}

    unit_marks = ",".join(["%s"] * len(unit_ids))
    items: List[Dict[str, Any]] = []

    with get_db() as conn:
        cur = get_cursor(conn)

        # Every (unit, exercise) usage inside the selected units; blueprint
        # units (framework_slot_id set) are excluded.
        cur.execute(
            f"""
            SELECT DISTINCT tu.id AS unit_id,
            tu.planned_date,
            tg.name AS group_name,
            tg.club_id AS target_club_id,
            c.name AS target_club_name,
            tusi.exercise_id AS exercise_id
            FROM training_units tu
            INNER JOIN training_groups tg ON tu.group_id = tg.id
            LEFT JOIN clubs c ON c.id = tg.club_id
            INNER JOIN training_unit_sections tus ON tus.training_unit_id = tu.id
            INNER JOIN training_unit_section_items tusi ON tusi.section_id = tus.id
            WHERE tu.id IN ({unit_marks})
            AND tu.framework_slot_id IS NULL
            AND tusi.item_type = 'exercise'
            AND tusi.exercise_id IS NOT NULL
            """,
            tuple(unit_ids),
        )
        usages = [r2d(row) for row in cur.fetchall()]
        if not usages:
            return {"items": []}

        distinct_exercise_ids = set()
        for usage in usages:
            if usage.get("exercise_id") is not None:
                distinct_exercise_ids.add(int(usage["exercise_id"]))
        ex_ids = sorted(distinct_exercise_ids)
        if not ex_ids:
            return {"items": []}

        exercise_marks = ",".join(["%s"] * len(ex_ids))
        cur.execute(
            f"""
            SELECT id, title, visibility, club_id, created_by, status
            FROM exercises
            WHERE id IN ({exercise_marks})
            """,
            tuple(ex_ids),
        )
        exercises_map: Dict[int, Dict[str, Any]] = {}
        for row in cur.fetchall():
            exercise_row = r2d(row)
            exercises_map[int(exercise_row["id"])] = exercise_row

        # Group the usages by (exercise, club of the unit's group).
        grouped: Dict[tuple, Dict[str, Any]] = {}
        for usage in usages:
            try:
                exercise_id = int(usage["exercise_id"])
            except (TypeError, ValueError):
                continue
            raw_club = usage.get("target_club_id")
            if raw_club is None:
                continue
            club_id = int(raw_club)
            bucket_key = (exercise_id, club_id)
            bucket = grouped.get(bucket_key)
            if bucket is None:
                bucket = {
                    "exercise_id": exercise_id,
                    "target_club_id": club_id,
                    "target_club_name": (usage.get("target_club_name") or "").strip(),
                    "units": [],
                }
                grouped[bucket_key] = bucket
            unit_ref = usage.get("unit_id")
            if unit_ref is not None:
                planned = usage.get("planned_date")
                bucket["units"].append(
                    {
                        "id": int(unit_ref),
                        "planned_date": str(usage["planned_date"]) if planned is not None else "",
                        "group_name": (usage.get("group_name") or "").strip(),
                    }
                )

        for bucket in grouped.values():
            exercise_id = bucket["exercise_id"]
            club_id = bucket["target_club_id"]
            exercise = exercises_map.get(exercise_id)
            if not exercise or not _exercise_needs_club_visibility_for_target(exercise, club_id):
                continue

            # One entry per unit id (the last duplicate wins), ordered by date then id.
            units_by_id: Dict[int, Dict[str, Any]] = {}
            for unit_entry in bucket["units"]:
                units_by_id[unit_entry["id"]] = unit_entry
            ordered_units = list(units_by_id.values())
            ordered_units.sort(key=lambda entry: (entry.get("planned_date") or "", entry.get("id")))

            creator = exercise.get("created_by")
            creator_id = int(creator) if creator is not None else None
            can_promote = _caller_may_promote_exercise_to_club(
                cur, creator_id, profile_id, role, club_id
            )
            visibility = (exercise.get("visibility") or "private").strip().lower()
            status_value = (exercise.get("status") or "draft").strip().lower()
            exercise_club = exercise.get("club_id")
            fallback_title = f"Übung #{exercise_id}"
            items.append(
                {
                    "exercise_id": exercise_id,
                    "title": (exercise.get("title") or fallback_title).strip() or fallback_title,
                    "visibility": visibility,
                    "status": status_value,
                    "club_id": int(exercise_club) if exercise_club is not None else None,
                    "created_by": creator_id,
                    "target_club_id": club_id,
                    "target_club_name": bucket.get("target_club_name") or "",
                    "can_promote": can_promote,
                    "units": ordered_units,
                }
            )

    def _earliest_unit_then_title(item: Dict[str, Any]):
        # Items without units sort first via the empty-string date.
        first_date = item["units"][0].get("planned_date") if item["units"] else ""
        return (first_date, item["title"])

    items.sort(key=_earliest_unit_then_title)
    return {"items": items}
@router.get("/training-units/{unit_id}")
def get_training_unit(unit_id: int, tenant: TenantContext = Depends(get_tenant_context)):
    """
    Return one training unit with group, club and trainer details.

    Raises 404 when the unit does not exist or when a non-blueprint unit has no
    group. Blueprint units (``framework_slot_id`` set) are readable by admins,
    by the unit's creator and by the creator of the owning framework program;
    everyone else gets 403. All other units are checked via
    ``_assert_training_unit_permission``. The row is passed through
    ``_hydrate_training_unit_payload`` before it is returned.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role

    # NOTE(review): trainer_name and creator_name are both read from the
    # creator's profile (alias p) - confirm this is intended.
    detail_sql = (
        """
        SELECT tu.*,
        tg.name as group_name,
        tg.weekday as group_weekday,
        tg.time_start as group_time_start,
        tg.time_end as group_time_end,
        tg.location as group_location,
        c.name as club_name,
        p.name as trainer_name,
        p.name as creator_name,
        tg.trainer_id AS trainer_id,
        tg.co_trainer_ids AS co_trainer_ids,
        tg.club_id AS group_club_id,
        COALESCE(tu.lead_trainer_profile_id, tg.trainer_id) AS effective_lead_trainer_profile_id,
        COALESCE(tu.assistant_trainer_profile_ids, tg.co_trainer_ids, '[]'::jsonb)
        AS effective_assistant_trainer_profile_ids,
        leadp.name AS lead_trainer_name,
        """
        + _ORIGIN_LINEAGE_FIELDS.strip()
        + """
        FROM training_units tu
        LEFT JOIN training_groups tg ON tu.group_id = tg.id
        LEFT JOIN clubs c ON tg.club_id = c.id
        LEFT JOIN profiles p ON tu.created_by = p.id
        LEFT JOIN profiles leadp ON leadp.id = COALESCE(tu.lead_trainer_profile_id, tg.trainer_id)
        """
        + _ORIGIN_LINEAGE_JOIN.strip()
        + """
        WHERE tu.id = %s
        """
    )

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(detail_sql, (unit_id,))
        found = cur.fetchone()
        if not found:
            raise HTTPException(status_code=404, detail="Trainingseinheit nicht gefunden")
        unit = r2d(found)

        if not unit.get("framework_slot_id"):
            # Regular (scheduled) unit: must belong to a group.
            if not unit.get("group_id"):
                raise HTTPException(status_code=404, detail="Trainingseinheit nicht gefunden")
            _assert_training_unit_permission(cur, unit, profile_id, role)
        elif role not in ["admin", "superadmin"]:
            # Blueprint unit: access for the unit creator or the program creator.
            cur.execute(
                """
                SELECT fp.created_by FROM training_framework_slots s
                JOIN training_framework_programs fp ON fp.id = s.framework_program_id
                WHERE s.id = %s
                """,
                (unit["framework_slot_id"],),
            )
            program_row = cur.fetchone()
            program_creator = program_row["created_by"] if program_row else None
            is_owner = unit["created_by"] == profile_id or program_creator == profile_id
            if not is_owner:
                raise HTTPException(status_code=403, detail="Keine Berechtigung")

        _hydrate_training_unit_payload(cur, unit)
    return unit
def _apply_module_optional_index(data: dict, key: str) -> Optional[int]:
    """Read an optional non-negative integer ``key`` from the request body.

    Returns ``None`` when the key is absent, ``None`` or an empty string.
    Raises HTTP 400 (``"<key> ungültig"``) when the value is not an integer or
    is negative.
    """
    raw = data.get(key)
    if raw is None or raw == "":
        return None
    try:
        value = int(raw)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail=f"{key} ungültig") from None
    if value < 0:
        raise HTTPException(status_code=400, detail=f"{key} ungültig")
    return value


@router.post("/training-units/{unit_id}/apply-training-module")
def apply_training_module_to_training_unit(
    unit_id: int, data: dict, tenant: TenantContext = Depends(get_tenant_context)
):
    """Copy the positions of a training module to the end of a section.

    Target: ``section_order_index`` in a whole_group phase (default), or
    additionally ``phase_order_index`` + ``parallel_stream_order_index`` for a
    stream.

    Request body keys:
        module_id: required integer.
        section_order_index: required integer >= 0.
        parallel_stream_order_index: optional integer >= 0.
        phase_order_index: optional integer >= 0; only allowed together with
            ``parallel_stream_order_index``.

    Raises HTTP 403 for callers without a planning role and HTTP 400 for an
    invalid body. Returns the refreshed unit via ``get_training_unit``.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    if not _has_planning_role(role):
        raise HTTPException(status_code=403, detail="Nur Trainer dürfen Module übernehmen")

    module_id_raw = data.get("module_id")
    if module_id_raw is None or module_id_raw == "":
        raise HTTPException(status_code=400, detail="module_id ist Pflicht")
    try:
        module_id = int(module_id_raw)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="module_id ungültig") from None

    # A missing or empty section_order_index fails int() and is reported as "required".
    try:
        section_order_index = int(data.get("section_order_index"))
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=400, detail="section_order_index ist Pflicht (Ganzzahl)"
        ) from None
    if section_order_index < 0:
        raise HTTPException(status_code=400, detail="section_order_index ungültig")

    parallel_stream_oi = _apply_module_optional_index(data, "parallel_stream_order_index")
    phase_oi = _apply_module_optional_index(data, "phase_order_index")
    if phase_oi is not None and parallel_stream_oi is None:
        raise HTTPException(
            status_code=400,
            detail="phase_order_index nur zusammen mit parallel_stream_order_index",
        )

    with get_db() as conn:
        cur = get_cursor(conn)
        unit_row = _training_unit_guard_row(cur, unit_id)
        _assert_training_unit_permission(cur, unit_row, profile_id, role)
        section_id = _resolve_training_unit_section_id_for_apply(
            cur,
            unit_id,
            section_order_index,
            phase_order_index=phase_oi,
            parallel_stream_order_index=parallel_stream_oi,
        )
        mod_items, src_mid = load_training_module_for_apply(cur, module_id, profile_id, role)
        _append_copied_module_items_to_section(cur, section_id, mod_items, src_mid)
        _promote_private_exercises_used_in_unit(cur, unit_id, profile_id, role)
        conn.commit()
    return get_training_unit(unit_id, tenant)
@router.post("/training-units")
def create_training_unit(data: dict, tenant: TenantContext = Depends(get_tenant_context)):
    """Create a training unit for a group and optionally fill its plan content.

    Required body keys: ``group_id`` and ``planned_date``. Optional keys include
    the planned times/focus, ``status`` (defaults to ``"planned"``), notes,
    ``plan_template_id``, ``lead_trainer_profile_id`` and
    ``assistant_trainer_profile_ids``.

    Plan content is taken from the first of these that applies: ``phases``,
    ``sections``, the plan template, legacy ``exercises``.

    Raises HTTP 400 when required fields are missing or ``group_id`` is not an
    integer; further errors come from the access/normalisation helpers.
    Returns the created unit via ``get_training_unit``.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    raw_group_id = data.get("group_id")
    planned_date = data.get("planned_date")
    if not raw_group_id or not planned_date:
        raise HTTPException(status_code=400, detail="group_id und planned_date sind Pflichtfelder")
    # Reject a non-numeric group_id with 400 instead of an uncaught ValueError later on.
    try:
        group_id = int(raw_group_id)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="group_id ist ungültig") from None
    plan_template_id = _optional_positive_int(data.get("plan_template_id"), "plan_template_id")

    with get_db() as conn:
        cur = get_cursor(conn)
        _can_access_group_for_create(cur, group_id, profile_id, role)
        tpl_id_safe = None
        if plan_template_id:
            _template_access(cur, plan_template_id, profile_id, role)
            tpl_id_safe = plan_template_id

        cur.execute(
            "SELECT trainer_id FROM training_groups WHERE id = %s",
            (group_id,),
        )
        g0 = cur.fetchone()
        default_group_trainer = g0["trainer_id"] if g0 else None

        lead_ins: Optional[int] = None
        if "lead_trainer_profile_id" in data:
            lead_ins = _normalize_lead_trainer_profile_id(
                cur,
                group_id,
                data.get("lead_trainer_profile_id"),
                profile_id,
                role,
                profile_id,
            )

        assistant_val: Any = None
        assistant_set = "assistant_trainer_profile_ids" in data
        if assistant_set:
            # Assistants are validated against the effective lead trainer.
            eff_lead_for_co = lead_ins if lead_ins is not None else default_group_trainer
            assistant_val = _normalize_assistant_trainer_profile_ids(
                cur,
                group_id,
                data.get("assistant_trainer_profile_ids"),
                profile_id,
                role,
                profile_id,
                eff_lead_for_co,
            )

        # Validate the content keys before the first write so an invalid
        # payload never leaves an uncommitted INSERT on the connection.
        _assert_single_plan_content_key_create(data)

        columns: List[str] = [
            "group_id",
            "planned_date",
            "planned_time_start",
            "planned_time_end",
            "planned_focus",
            "status",
            "notes",
            "trainer_notes",
            "created_by",
            "plan_template_id",
            "lead_trainer_profile_id",
        ]
        values: List[Any] = [
            group_id,
            planned_date,
            data.get("planned_time_start"),
            data.get("planned_time_end"),
            data.get("planned_focus"),
            data.get("status", "planned"),
            data.get("notes"),
            data.get("trainer_notes"),
            profile_id,
            tpl_id_safe,
            lead_ins,
        ]
        # The assistants column is only written when the caller sent the key;
        # otherwise the column keeps its database default.
        if assistant_set:
            columns.append("assistant_trainer_profile_ids")
            values.append(None if assistant_val is None else PsycopgJson(assistant_val))

        placeholders = ", ".join(["%s"] * len(columns))
        cur.execute(
            f"""
            INSERT INTO training_units ({", ".join(columns)})
            VALUES ({placeholders})
            RETURNING id
            """,
            tuple(values),
        )
        unit_id = cur.fetchone()["id"]

        phases_in = data.get("phases")
        sections_in = data.get("sections")
        exercises_in = data.get("exercises")
        if phases_in is not None:
            _replace_unit_phases(cur, unit_id, phases_in, profile_id, role, profile_id)
        elif sections_in is not None:
            _replace_unit_sections(cur, unit_id, sections_in)
        elif tpl_id_safe:
            _instantiate_from_template(
                cur,
                unit_id,
                tpl_id_safe,
                profile_id=profile_id,
                role=role,
                unit_created_by=profile_id,
            )
        elif exercises_in is not None:
            _insert_sections_from_legacy_exercises(cur, unit_id, exercises_in)

        _promote_private_exercises_used_in_unit(cur, unit_id, profile_id, role)
        conn.commit()
    return get_training_unit(unit_id, tenant)
@router.put("/training-units/{unit_id}")
def update_training_unit(unit_id: int, data: dict, tenant: TenantContext = Depends(get_tenant_context)):
    """Update a training unit and, optionally, replace its plan content.

    Blueprint units (``framework_slot_id`` set) only accept ``planned_focus``,
    ``planned_time_start``, ``planned_time_end``, ``notes`` and
    ``trainer_notes``; ``reset_from_template`` and ``plan_template_id`` are
    rejected with HTTP 400.

    For all other units the scalar fields are written from the body as sent
    (absent keys become NULL), except:
      * ``planned_date`` and ``plan_template_id`` keep their value when absent,
      * ``trainer_notes`` keeps its stored value when the key is absent,
      * lead/assistant trainers and ``debrief_completed`` are only touched when
        their key is present.

    Plan content: ``reset_from_template`` re-instantiates the template;
    otherwise ``phases``, ``sections`` or legacy ``exercises`` replace the
    current content. Returns the refreshed unit via ``get_training_unit``.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    with get_db() as conn:
        cur = get_cursor(conn)
        unit_row = _training_unit_guard_row(cur, unit_id)
        _assert_training_unit_permission(cur, unit_row, profile_id, role)
        is_blueprint = unit_row.get("framework_slot_id") is not None

        tpl_upd = data.get("plan_template_id")
        tpl_id_val = None
        if tpl_upd not in (None, ""):
            tid = _optional_positive_int(tpl_upd, "plan_template_id")
            if tid:
                _template_access(cur, tid, profile_id, role)
                tpl_id_val = tid

        # Keep the stored trainer notes when the caller did not send the key.
        trainer_notes_val = None
        if "trainer_notes" not in data:
            cur.execute(
                "SELECT trainer_notes FROM training_units WHERE id = %s",
                (unit_id,),
            )
            row_tn = cur.fetchone()
            trainer_notes_val = row_tn["trainer_notes"] if row_tn else None
        else:
            trainer_notes_val = data.get("trainer_notes")

        # Validate the content keys before the first write so an invalid
        # payload is rejected without leaving uncommitted changes behind.
        _assert_single_plan_content_key_update(data)

        if is_blueprint:
            if data.get("reset_from_template"):
                raise HTTPException(
                    status_code=400,
                    detail="Rahmen-Blueprints können nicht aus einer Vorlage zurückgesetzt werden",
                )
            if tpl_upd not in (None, ""):
                raise HTTPException(
                    status_code=400,
                    detail="plan_template_id ist bei Rahmen-Blueprints nicht zulässig",
                )
            # Only the keys that were actually sent are updated; trainer_notes
            # is always written (with its stored value when not sent).
            blueprint_fields = []
            blueprint_params: List[Any] = []
            if "planned_focus" in data:
                blueprint_fields.append("planned_focus = %s")
                blueprint_params.append(data.get("planned_focus"))
            if "planned_time_start" in data:
                blueprint_fields.append("planned_time_start = %s")
                blueprint_params.append(data.get("planned_time_start"))
            if "planned_time_end" in data:
                blueprint_fields.append("planned_time_end = %s")
                blueprint_params.append(data.get("planned_time_end"))
            if "notes" in data:
                blueprint_fields.append("notes = %s")
                blueprint_params.append(data.get("notes"))
            blueprint_fields.append("trainer_notes = %s")
            blueprint_params.append(trainer_notes_val)
            blueprint_params.append(unit_id)
            cur.execute(
                f"""
                UPDATE training_units SET
                {", ".join(blueprint_fields)},
                updated_at = NOW()
                WHERE id = %s
                """,
                tuple(blueprint_params),
            )
        else:
            cur_lead = unit_row.get("lead_trainer_profile_id")
            base_tr = unit_row.get("trainer_id")
            lead_sql = ""
            lead_params: List[Any] = []
            assist_sql = ""
            assist_params: List[Any] = []
            nl: Optional[int]
            if "lead_trainer_profile_id" in data:
                nl = _normalize_lead_trainer_profile_id(
                    cur,
                    unit_row["group_id"],
                    data.get("lead_trainer_profile_id"),
                    profile_id,
                    role,
                    unit_row.get("created_by"),
                )
                lead_sql = ", lead_trainer_profile_id = %s"
                lead_params.append(nl)
                eff_lead_for_co = nl if nl is not None else base_tr
            else:
                # No change requested: fall back to the stored lead, then the group trainer.
                nl = cur_lead if cur_lead is not None else base_tr
                eff_lead_for_co = nl
            if "assistant_trainer_profile_ids" in data:
                na = _normalize_assistant_trainer_profile_ids(
                    cur,
                    unit_row["group_id"],
                    data.get("assistant_trainer_profile_ids"),
                    profile_id,
                    role,
                    unit_row.get("created_by"),
                    eff_lead_for_co,
                )
                assist_sql = ", assistant_trainer_profile_ids = %s"
                assist_params.append(None if na is None else PsycopgJson(na))

            # Only the literal True marks the debrief as completed; any other
            # value sent under the key clears the timestamp.
            debrief_frag = ""
            if "debrief_completed" in data:
                if data.get("debrief_completed") is True:
                    debrief_frag = ", debrief_completed_at = NOW()"
                else:
                    debrief_frag = ", debrief_completed_at = NULL"

            # The optional fragments are appended in the same order as their
            # parameters below: lead, assistants, then the unit id.
            cur.execute(
                f"""
                UPDATE training_units SET
                planned_date = COALESCE(%s, planned_date),
                planned_time_start = %s,
                planned_time_end = %s,
                planned_focus = %s,
                actual_date = %s,
                actual_time_start = %s,
                actual_time_end = %s,
                attendance_count = %s,
                status = %s,
                notes = %s,
                trainer_notes = %s,
                plan_template_id = COALESCE(%s, plan_template_id),
                updated_at = NOW()
                {lead_sql}
                {assist_sql}
                {debrief_frag}
                WHERE id = %s
                """,
                (
                    data.get("planned_date"),
                    data.get("planned_time_start"),
                    data.get("planned_time_end"),
                    data.get("planned_focus"),
                    data.get("actual_date"),
                    data.get("actual_time_start"),
                    data.get("actual_time_end"),
                    data.get("attendance_count"),
                    data.get("status"),
                    data.get("notes"),
                    trainer_notes_val,
                    tpl_id_val,
                )
                + tuple(lead_params)
                + tuple(assist_params)
                + (unit_id,),
            )

        content_handled = False
        if not is_blueprint and data.get("reset_from_template"):
            tid = tpl_id_val or unit_row.get("plan_template_id")
            if not tid:
                raise HTTPException(
                    status_code=400,
                    detail="reset_from_template erfordert plan_template_id auf der Einheit oder im Request",
                )
            _template_access(cur, tid, profile_id, role)
            cur.execute(
                "UPDATE training_units SET plan_template_id = %s WHERE id = %s", (tid, unit_id)
            )
            _instantiate_from_template(
                cur,
                unit_id,
                tid,
                profile_id=profile_id,
                role=role,
                unit_created_by=int(unit_row.get("created_by") or profile_id),
            )
            content_handled = True

        if not content_handled and "phases" in data:
            _replace_unit_phases(
                cur,
                unit_id,
                data.get("phases") or [],
                profile_id,
                role,
                unit_row.get("created_by"),
            )
        elif not content_handled and "sections" in data:
            _replace_unit_sections(cur, unit_id, data["sections"] or [])
        elif not content_handled and "exercises" in data:
            _clear_unit_plan_content(cur, unit_id)
            _insert_sections_from_legacy_exercises(cur, unit_id, data["exercises"] or [])

        if content_handled or any(k in data for k in ("phases", "sections", "exercises")):
            _promote_private_exercises_used_in_unit(cur, unit_id, profile_id, role)
        conn.commit()
    return get_training_unit(unit_id, tenant)
@router.delete("/training-units/{unit_id}")
def delete_training_unit(unit_id: int, tenant: TenantContext = Depends(get_tenant_context)):
    """
    Delete a training unit.

    Raises 404 when the unit does not exist and 400 for blueprint units
    (``framework_slot_id`` set). The delete permission itself is decided by
    ``_assert_delete_training_unit``. Returns ``{"ok": True}``.
    """
    caller_id = tenant.profile_id
    caller_role = tenant.global_role
    lookup_sql = """
        SELECT tu.created_by, tu.framework_slot_id, tg.club_id AS group_club_id
        FROM training_units tu
        LEFT JOIN training_groups tg ON tu.group_id = tg.id
        WHERE tu.id = %s
        """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(lookup_sql, (unit_id,))
        target = cur.fetchone()
        if not target:
            raise HTTPException(status_code=404, detail="Trainingseinheit nicht gefunden")
        if target.get("framework_slot_id"):
            raise HTTPException(
                status_code=400,
                detail="Blueprint-Einheiten werden über das Rahmenprogramm verwaltet, nicht hier gelöscht.",
            )
        owner_id = target["created_by"]
        club_of_group = target.get("group_club_id")
        _assert_delete_training_unit(cur, caller_role, owner_id, caller_id, club_of_group)
        cur.execute("DELETE FROM training_units WHERE id = %s", (unit_id,))
        conn.commit()
    return {"ok": True}
@router.post("/training-units/from-framework-slot")
def create_training_unit_from_framework_slot(data: dict, tenant: TenantContext = Depends(get_tenant_context)):
    """Copy a framework slot's blueprint into a scheduled unit (lineage via origin_framework_slot_id).

    Required body keys: ``framework_slot_id`` (integer >= 1), ``group_id``
    (integer) and ``planned_date``.

    Raises HTTP 403 for callers without a planning role or, for non-admins,
    when the framework program was created by someone else; HTTP 400 for an
    invalid body; HTTP 404 when the slot or its blueprint unit is missing.
    Returns the new unit via ``get_training_unit``.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    if not _has_planning_role(role):
        raise HTTPException(status_code=403, detail="Nur Planungsberechtigte dürfen Trainingseinheiten erstellen")

    try:
        slot_id = int(data.get("framework_slot_id"))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="framework_slot_id ist ungültig") from None
    if slot_id < 1:
        raise HTTPException(status_code=400, detail="framework_slot_id ist ungültig")

    raw_group_id = data.get("group_id")
    planned_date = data.get("planned_date")
    if not raw_group_id or not planned_date:
        raise HTTPException(status_code=400, detail="group_id und planned_date sind Pflichtfelder")
    # Reject a non-numeric group_id with 400 instead of an uncaught ValueError.
    try:
        group_id = int(raw_group_id)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="group_id ist ungültig") from None

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT fp.created_by FROM training_framework_slots s
            JOIN training_framework_programs fp ON fp.id = s.framework_program_id
            WHERE s.id = %s
            """,
            (slot_id,),
        )
        fw_row = cur.fetchone()
        if not fw_row:
            raise HTTPException(status_code=404, detail="Rahmen-Slot nicht gefunden")
        if role not in ["admin", "superadmin"]:
            # Programs without a recorded creator are not restricted here.
            if fw_row["created_by"] is not None and fw_row["created_by"] != profile_id:
                raise HTTPException(
                    status_code=403,
                    detail="Keine Berechtigung für dieses Rahmenprogramm",
                )

        cur.execute(
            "SELECT id FROM training_units WHERE framework_slot_id = %s",
            (slot_id,),
        )
        blueprint = cur.fetchone()
        if not blueprint:
            raise HTTPException(status_code=404, detail="Keine Blueprint-Einheit für diesen Slot")

        _can_access_group_for_create(cur, group_id, profile_id, role)
        new_id = _copy_blueprint_into_scheduled_unit(
            cur,
            int(blueprint["id"]),
            group_id,
            str(planned_date),
            profile_id,
            slot_id,
            role,
        )
        _promote_private_exercises_used_in_unit(cur, new_id, profile_id, role)
        conn.commit()
    return get_training_unit(new_id, tenant)
@router.post("/training-units/{unit_id}/publish-to-framework")
def publish_training_unit_to_framework(
    unit_id: int, data: dict, tenant: TenantContext = Depends(get_tenant_context)
):
    """Copy the planned content of a unit into a framework program as a session blueprint.

    The target program is either created from ``new_framework`` or referenced by
    ``framework_program_id`` (exactly one of the two). ``mode`` selects the slot:

    * ``new_slot``: a new slot is inserted, at ``insert_at_index`` when given
      (clamped to the current slot count) or appended otherwise.
    * ``existing_slot``: the blueprint of ``framework_slot_id`` is overwritten;
      ``slot_title`` / ``slot_notes`` update the slot when present in the body.

    Raises HTTP 403 without a planning role, HTTP 400 for an invalid body or a
    blueprint source unit, HTTP 404 for a missing slot or blueprint unit.
    Returns the framework detail via ``_response_framework_detail``.
    """
    from routers.training_framework_programs import (  # noqa: WPS433 — circular import
        _assert_visibility,
        _fetch_framework_row,
        _insert_goal_rows,
        _parse_positive_int_ids,
        _replace_target_groups,
        _replace_training_types,
        _response_framework_detail,
    )
    profile_id = tenant.profile_id
    role = tenant.global_role
    if not _has_planning_role(role):
        raise HTTPException(status_code=403, detail="Nur Planungsberechtigte dürfen Rahmenprogramme bearbeiten")
    mode = (data.get("mode") or "").strip().lower()
    if mode not in ("new_slot", "existing_slot"):
        raise HTTPException(status_code=400, detail="mode muss new_slot oder existing_slot sein")
    fw_new = data.get("new_framework")
    fw_id_raw = data.get("framework_program_id")
    has_new = isinstance(fw_new, dict) and len(fw_new) > 0
    has_id = fw_id_raw not in (None, "")
    # Exactly one of the two sources must be given (both or neither is an error).
    if has_new == has_id:
        raise HTTPException(
            status_code=400,
            detail="Entweder new_framework ODER framework_program_id angeben (nicht beides, nicht keines)",
        )
    framework_id: int = 0
    slot_title_o: Optional[str] = None
    notes_o = data.get("slot_notes")
    # Normalised slot title for the new_slot path; blank titles become None.
    st = data.get("slot_title")
    if st is not None:
        st = str(st).strip()
        slot_title_o = st or None
    with get_db() as conn:
        cur = get_cursor(conn)
        unit_row = _training_unit_guard_row(cur, unit_id)
        # Only scheduled units can be published; blueprint units are rejected.
        if unit_row.get("framework_slot_id"):
            raise HTTPException(
                status_code=400,
                detail="Nur geplante Einheiten (Kalender) können in einen Rahmen übernommen werden, keine Blueprint-Einheit",
            )
        _assert_training_unit_permission(cur, unit_row, profile_id, role)
        if has_new:
            # Create the framework program from the request body.
            title = (fw_new.get("title") or "").strip()
            if not title:
                raise HTTPException(status_code=400, detail="new_framework.title ist Pflicht")
            vis = _assert_visibility(fw_new.get("visibility") or "private")
            club_nf = fw_new.get("club_id")
            if club_nf in ("", []):
                club_nf = None
            # Club visibility without an explicit club falls back to the tenant's club.
            if vis == "club" and club_nf is None:
                club_nf = tenant.effective_club_id
            goals_in = fw_new.get("goals")
            if not isinstance(goals_in, list) or not goals_in:
                raise HTTPException(
                    status_code=400,
                    detail="new_framework.goals als Liste mit mindestens einem Eintrag ist Pflicht",
                )
            fa_id = _optional_positive_int(fw_new.get("focus_area_id"), "focus_area_id")
            sd_id = _optional_positive_int(fw_new.get("style_direction_id"), "style_direction_id")
            tt_ids = _parse_positive_int_ids(fw_new.get("training_type_ids"), "training_type_ids")
            tg_ids = _parse_positive_int_ids(fw_new.get("target_group_ids"), "target_group_ids")
            assert_valid_governance_visibility(cur, profile_id, role, vis, club_nf)
            cur.execute(
                """
                INSERT INTO training_framework_programs (
                title, description,
                planned_period_start, planned_period_end,
                visibility, club_id, created_by,
                focus_area_id, style_direction_id
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                RETURNING id
                """,
                (
                    title[:200],
                    fw_new.get("description"),
                    fw_new.get("planned_period_start"),
                    fw_new.get("planned_period_end"),
                    vis,
                    club_nf,
                    profile_id,
                    fa_id,
                    sd_id,
                ),
            )
            framework_id = int(cur.fetchone()["id"])
            _insert_goal_rows(cur, framework_id, goals_in)
            _replace_training_types(cur, framework_id, tt_ids)
            _replace_target_groups(cur, framework_id, tg_ids)
        else:
            # Use an existing program; the caller must be allowed to edit it.
            try:
                framework_id = int(fw_id_raw)
            except (TypeError, ValueError):
                raise HTTPException(status_code=400, detail="framework_program_id ungültig") from None
            if framework_id < 1:
                raise HTTPException(status_code=400, detail="framework_program_id ungültig")
            row_fw = _fetch_framework_row(cur, framework_id)
            assert_library_content_editable(cur, profile_id, role, row_fw)
        ins_raw = data.get("insert_at_index")
        slot_id_out: int = 0
        if mode == "new_slot":
            pos: Optional[int] = None
            if ins_raw is not None and ins_raw != "":
                try:
                    pos = int(ins_raw)
                except (TypeError, ValueError):
                    raise HTTPException(status_code=400, detail="insert_at_index ungültig") from None
                if pos < 0:
                    raise HTTPException(status_code=400, detail="insert_at_index ungültig")
            if pos is None:
                # No position requested: append after the highest sort_order.
                cur.execute(
                    """
                    SELECT COALESCE(MAX(sort_order), -1) + 1 AS n
                    FROM training_framework_slots
                    WHERE framework_program_id = %s
                    """,
                    (framework_id,),
                )
                pos = int(cur.fetchone()["n"])
            else:
                # Clamp the requested position to the slot count, then shift
                # the existing slots to make room.
                # NOTE(review): clamping uses COUNT(*) while appending uses
                # MAX(sort_order) + 1 - these differ if sort_order has gaps; confirm.
                cur.execute(
                    "SELECT COUNT(*)::int AS c FROM training_framework_slots WHERE framework_program_id = %s",
                    (framework_id,),
                )
                cmax = int(cur.fetchone()["c"])
                if pos > cmax:
                    pos = cmax
                _shift_framework_slots_sort_orders_from(cur, framework_id, pos)
            sid, bid = _insert_framework_slot_and_blueprint_unit(
                cur,
                framework_id,
                pos,
                slot_title_o,
                notes_o,
                profile_id,
            )
            slot_id_out = sid
            _copy_scheduled_unit_plan_to_blueprint(cur, unit_id, bid, profile_id, role)
            _promote_private_exercises_used_in_unit(cur, bid, profile_id, role)
        else:
            # existing_slot: overwrite the blueprint of an existing slot.
            # NOTE(review): combined with new_framework this branch cannot
            # succeed, because a freshly created program has no slots yet.
            raw_existing = data.get("framework_slot_id")
            try:
                slot_id_out = int(raw_existing)
            except (TypeError, ValueError):
                raise HTTPException(status_code=400, detail="framework_slot_id ist Pflicht und muss eine Zahl sein") from None
            if slot_id_out < 1:
                raise HTTPException(status_code=400, detail="framework_slot_id ist ungültig")
            cur.execute(
                """
                SELECT id, framework_program_id, sort_order, title, notes
                FROM training_framework_slots
                WHERE id = %s
                """,
                (slot_id_out,),
            )
            slot_row = cur.fetchone()
            if not slot_row:
                raise HTTPException(status_code=404, detail="Rahmen-Slot nicht gefunden")
            if int(slot_row["framework_program_id"]) != framework_id:
                raise HTTPException(status_code=400, detail="Slot gehört nicht zu diesem Rahmenprogramm")
            cur.execute(
                "SELECT id FROM training_units WHERE framework_slot_id = %s",
                (slot_id_out,),
            )
            bp = cur.fetchone()
            if not bp:
                raise HTTPException(status_code=404, detail="Keine Blueprint-Einheit für diesen Slot")
            # Slot title/notes are only updated for keys present in the body.
            meta_fields: List[str] = []
            meta_params: List[Any] = []
            if "slot_title" in data:
                stn = data.get("slot_title")
                title_v = (str(stn).strip() or None) if stn is not None else None
                meta_fields.append("title = %s")
                meta_params.append(title_v)
            if "slot_notes" in data:
                meta_fields.append("notes = %s")
                meta_params.append(data.get("slot_notes"))
            if meta_fields:
                meta_params.append(slot_id_out)
                cur.execute(
                    f"""
                    UPDATE training_framework_slots
                    SET {", ".join(meta_fields)}
                    WHERE id = %s
                    """,
                    tuple(meta_params),
                )
            bid = int(bp["id"])
            _copy_scheduled_unit_plan_to_blueprint(cur, unit_id, bid, profile_id, role)
            _promote_private_exercises_used_in_unit(cur, bid, profile_id, role)
        # Touch the program so its updated_at reflects the new blueprint content.
        cur.execute(
            "UPDATE training_framework_programs SET updated_at = NOW() WHERE id = %s",
            (framework_id,),
        )
        conn.commit()
    return _response_framework_detail(framework_id, profile_id, role)
@router.post("/training-units/quick-create")
def quick_create_training_unit(data: dict, tenant: TenantContext = Depends(get_tenant_context)):
    """Create a planned unit for a group using the group's default times.

    Required body keys: ``group_id`` and ``planned_date``; ``plan_template_id``
    is optional and, when given, the unit is filled from that template.

    Raises HTTP 403 for callers without a planning role and for non-admins who
    are neither the group's trainer nor one of its co-trainers; HTTP 400 when
    required fields are missing; HTTP 404 when the group does not exist.
    Returns the created unit via ``get_training_unit``.
    """
    profile_id = tenant.profile_id
    role = tenant.global_role
    # Check the role before any lookup so callers without planning rights can
    # neither probe for existing groups nor cost a query.
    if not _has_planning_role(role):
        raise HTTPException(status_code=403, detail="Nur Trainer dürfen Trainingseinheiten erstellen")

    group_id = data.get("group_id")
    planned_date = data.get("planned_date")
    plan_template_id = _optional_positive_int(data.get("plan_template_id"), "plan_template_id")
    if not group_id or not planned_date:
        raise HTTPException(status_code=400, detail="group_id und planned_date sind Pflichtfelder")

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT weekday, time_start, time_end, trainer_id, co_trainer_ids
            FROM training_groups
            WHERE id = %s
            """,
            (group_id,),
        )
        group = cur.fetchone()
        if not group:
            raise HTTPException(status_code=404, detail="Trainingsgruppe nicht gefunden")

        co_trainers = group["co_trainer_ids"] or []
        if role not in ["admin", "superadmin"]:
            if group["trainer_id"] != profile_id and profile_id not in co_trainers:
                raise HTTPException(status_code=403, detail="Keine Berechtigung für diese Gruppe")

        tpl_id_safe = None
        if plan_template_id:
            _template_access(cur, plan_template_id, profile_id, role)
            tpl_id_safe = plan_template_id

        # The planned times are copied from the group's time_start/time_end.
        cur.execute(
            """
            INSERT INTO training_units (
            group_id, planned_date,
            planned_time_start, planned_time_end,
            status, created_by, plan_template_id
            ) VALUES (%s, %s, %s, %s, %s, %s, %s)
            RETURNING id
            """,
            (
                group_id,
                planned_date,
                group["time_start"],
                group["time_end"],
                "planned",
                profile_id,
                tpl_id_safe,
            ),
        )
        unit_id = cur.fetchone()["id"]
        if tpl_id_safe:
            _instantiate_from_template(
                cur,
                unit_id,
                tpl_id_safe,
                profile_id=profile_id,
                role=role,
                unit_created_by=profile_id,
            )
            _promote_private_exercises_used_in_unit(cur, unit_id, profile_id, role)
        conn.commit()
    return get_training_unit(unit_id, tenant)