shinkan-jinkendo/backend/routers/exercises.py
Lars cc4e621f95
All checks were successful
Deploy Development / deploy (push) Successful in 34s
Test Suite / pytest-backend (push) Successful in 25s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 7s
Test Suite / playwright-tests (push) Successful in 24s
feat: enhance media upload handling and MIME type resolution
- Expanded the allowed upload MIME types to include HEIC, HEIF, and QuickTime formats.
- Introduced a new function to resolve MIME types based on file content, filename extensions, and client-provided content types.
- Updated media upload logic in both the exercises and media assets routers to utilize the new MIME resolution function.
- Adjusted frontend file input accept attributes to allow broader media types, improving user experience.
- Enhanced media library components to handle HEIC/HEIF formats with appropriate fallback messaging for browser compatibility.
2026-05-07 21:55:22 +02:00

2777 lines
104 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Exercises Router - v2.0 (Clean-Room Rebuild)
Komplett neu gebaut nach EXERCISES_API_SPEC.md v1.2
KEIN Legacy-Code aus v1 - nur M:N Relations, keine JSONB-Felder für Kataloge
"""
import hashlib
import json
import logging
import os
import re
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple
from urllib.parse import quote
from fastapi import APIRouter, HTTPException, Depends, Query, Request, UploadFile, File, Form
from fastapi.responses import FileResponse, Response, StreamingResponse
from pydantic import BaseModel, Field, model_validator
from db import get_db, get_cursor, r2d
from club_tenancy import (
assert_valid_governance_visibility,
can_manage_club_org,
can_plan_in_club,
club_admin_shares_club_with_creator,
has_club_role,
is_platform_admin,
library_content_visible_to_profile,
)
from tenant_context import TenantContext, get_tenant_context, get_tenant_context_flexible, library_content_visibility_sql
from media_storage import get_effective_media_root, path_under_media_root
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api", tags=["exercises"])
def _coerce_json_str_list(val: Any) -> List[str]:
"""JSON-Aggregat oder JSON-String aus PG in eine saubere str-Liste für die Listen-API."""
if val is None:
return []
if isinstance(val, list):
return [str(x) for x in val if x is not None and str(x).strip()]
if isinstance(val, str):
try:
parsed = json.loads(val)
if isinstance(parsed, list):
return [str(x) for x in parsed if x is not None and str(x).strip()]
except Exception:
return []
return []
return []
# Kanonische Fähigkeitsstufen 15 (Übung ↔ Skill-Zeile), siehe Migration 029
_CANONICAL_SKILL_LEVELS = frozenset(
{"basis", "grundlagen", "aufbau", "fortgeschritten", "optimierung"}
)
_LEGACY_SKILL_LEVEL_SLUG = {
"einsteiger": "basis",
"experte": "optimierung",
"1": "basis",
"2": "grundlagen",
"3": "aufbau",
"4": "fortgeschritten",
"5": "optimierung",
}
# SQL: numerischer Rang aus target_level (fallback required_level) für Filter
_EXERCISE_SKILL_LEVEL_RANK_SQL = """
CASE COALESCE(
NULLIF(TRIM(LOWER(es.target_level::text)), ''),
NULLIF(TRIM(LOWER(es.required_level::text)), '')
)
WHEN 'basis' THEN 1
WHEN 'grundlagen' THEN 2
WHEN 'aufbau' THEN 3
WHEN 'fortgeschritten' THEN 4
WHEN 'optimierung' THEN 5
WHEN 'einsteiger' THEN 1
WHEN 'experte' THEN 5
WHEN '1' THEN 1
WHEN '2' THEN 2
WHEN '3' THEN 3
WHEN '4' THEN 4
WHEN '5' THEN 5
ELSE NULL END
""".strip()
def normalize_exercise_skill_level(value) -> Optional[str]:
"""Wandelt Legacy-/Zahlencodes in kanonische Slugs; ungültig → None."""
if value is None:
return None
s = str(value).strip().lower()
if not s:
return None
if s in _CANONICAL_SKILL_LEVELS:
return s
return _LEGACY_SKILL_LEVEL_SLUG.get(s)
MEDIA_ROOT = Path(os.getenv("MEDIA_ROOT", str(Path(__file__).resolve().parent.parent / "media")))
MAX_EXERCISE_MEDIA = 10
# Upload-Limits (Übungs-Medien): Trainer wie bisher kleiner; Admin/Superadmin höheres Limit für große Videos
_MAX_UPLOAD_MB_USER = max(1, int(os.getenv("EXERCISE_MEDIA_MAX_UPLOAD_MB", "50")))
_MAX_UPLOAD_MB_ADMIN = max(_MAX_UPLOAD_MB_USER, int(os.getenv("EXERCISE_MEDIA_ADMIN_MAX_UPLOAD_MB", "1024")))
MAX_UPLOAD_BYTES_USER = _MAX_UPLOAD_MB_USER * 1024 * 1024
MAX_UPLOAD_BYTES_ADMIN = _MAX_UPLOAD_MB_ADMIN * 1024 * 1024
ALLOWED_UPLOAD_MIMES = frozenset(
{
"image/jpeg",
"image/png",
"image/gif",
"image/heic",
"image/heif",
"video/mp4",
"video/quicktime",
"application/pdf",
}
)
# Dateiendung → MIME wenn der Client keinen sinnvollen Content-Type sendet (häufig mobil / iOS).
_UPLOAD_FILENAME_MIME_FALLBACK = {
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".png": "image/png",
".gif": "image/gif",
".heic": "image/heic",
".heif": "image/heif",
".mp4": "video/mp4",
".mov": "video/quicktime",
".pdf": "application/pdf",
}
def _sniff_allowed_upload_mime(raw: bytes) -> Optional[str]:
"""Erkennt erlaubte MIME-Typen anhand weniger Magic Bytes (ohne Pillow/python-magic)."""
if len(raw) < 12:
return None
if raw[:3] == b"\xff\xd8\xff":
return "image/jpeg"
if raw[:8] == b"\x89PNG\r\n\x1a\n":
return "image/png"
if raw[:6] in (b"GIF87a", b"GIF89a"):
return "image/gif"
if raw[:4] == b"%PDF":
return "application/pdf"
if raw[4:8] != b"ftyp":
return None
brand = raw[8:12]
# HEIC / HEIF (u. a. iPhone „高效率“)
if brand in (b"heic", b"heix", b"hevx", b"hevc", b"mif1", b"msf1"):
return "image/heic"
if brand in (b"isom", b"iso2", b"iso5", b"iso6", b"mp41", b"mp42", b"M4V ", b"dash", b"msdh"):
return "video/mp4"
# iPhone-Kamera / Fotos: MOV (QuickTime-Container)
if brand == b"qt ":
return "video/quicktime"
return None
def resolve_upload_mime_type(
raw: bytes,
content_type: Optional[str],
filename: Optional[str],
) -> str:
"""Ermittelt ein erlaubtes MIME (Client-Header, Magic Bytes oder Dateiendung)."""
ct = (content_type or "").split(";")[0].strip().lower()
if ct in ALLOWED_UPLOAD_MIMES:
return ct
guessed = _sniff_allowed_upload_mime(raw)
if guessed in ALLOWED_UPLOAD_MIMES:
return guessed
ext = Path(filename or "").suffix.lower()
fb = _UPLOAD_FILENAME_MIME_FALLBACK.get(ext)
if fb in ALLOWED_UPLOAD_MIMES:
return fb
raise ValueError(f"Dateityp nicht erlaubt: {ct or 'unbekannt'}")
def _upload_limit_bytes(tenant: TenantContext) -> int:
role = tenant.global_role or ""
if role in ("admin", "superadmin"):
return MAX_UPLOAD_BYTES_ADMIN
return MAX_UPLOAD_BYTES_USER
# ============================================================================
# Pydantic Models
# ============================================================================
class ExerciseCreate(BaseModel):
# Basis-Felder (goal/execution: DB-Constraint mind. eines; Wiki oft nur eines)
title: str = Field(..., min_length=3, max_length=300)
summary: Optional[str] = None
goal: Optional[str] = Field(None, max_length=5000)
execution: Optional[str] = Field(None, max_length=10000)
preparation: Optional[str] = None
trainer_notes: Optional[str] = None
# Dauer & Gruppengröße
duration_min: Optional[int] = None
duration_max: Optional[int] = None
group_size_min: Optional[int] = None
group_size_max: Optional[int] = None
# Equipment (Liste von Strings)
equipment: list[str] = []
# M:N Relations (Liste von {id: int, is_primary: bool})
focus_areas_multi: list[dict] = []
training_styles_multi: list[dict] = []
training_types_multi: list[dict] = []
target_groups_multi: list[dict] = []
age_groups: list[str] = [] # ["Kinder", "Teenager"] aus Katalog
# Skills (Liste von {skill_id: int, is_primary: bool, intensity: str, required_level: str, target_level: str})
skills: list[dict] = []
# Sichtbarkeit & Status
visibility: str = "private"
status: str = "draft"
club_id: Optional[int] = None
@model_validator(mode="after")
def normalize_goal_execution(self):
g = (self.goal or "").strip() or None
e = (self.execution or "").strip() or None
if not g and not e:
raise ValueError("Mindestens eines der Felder Ziel oder Durchführung ist erforderlich")
self.goal = g
self.execution = e
return self
class ExerciseUpdate(BaseModel):
# Alle Felder optional für Partial Update
title: Optional[str] = Field(None, min_length=3, max_length=300)
summary: Optional[str] = None
goal: Optional[str] = Field(None, max_length=5000)
execution: Optional[str] = Field(None, max_length=10000)
preparation: Optional[str] = None
trainer_notes: Optional[str] = None
duration_min: Optional[int] = None
duration_max: Optional[int] = None
group_size_min: Optional[int] = None
group_size_max: Optional[int] = None
equipment: Optional[list[str]] = None
focus_areas_multi: Optional[list[dict]] = None
training_styles_multi: Optional[list[dict]] = None
training_types_multi: Optional[list[dict]] = None
target_groups_multi: Optional[list[dict]] = None
age_groups: Optional[list[str]] = None
skills: Optional[list[dict]] = None
visibility: Optional[str] = None
status: Optional[str] = None
club_id: Optional[int] = None
# §4.2: Übung → official — angehängte Datei-Assets anheben + Copyright (nur mit expliziter Bestätigung)
promote_attached_media_for_official: Optional[bool] = None
default_official_media_copyright: Optional[str] = Field(default=None, max_length=2000)
@model_validator(mode="after")
def normalize_goal_execution(self):
if self.goal is not None:
self.goal = self.goal.strip() or None
if self.execution is not None:
self.execution = self.execution.strip() or None
return self
class ExerciseMediaUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
is_primary: Optional[bool] = None
context: Optional[str] = None
class ExerciseMediaReorder(BaseModel):
media_ids: list[int]
class ExerciseMediaFromAsset(BaseModel):
"""Bestehendes Archiv-Medium (media_assets) mit Übung verknüpfen — ohne erneuten Upload."""
media_asset_id: int = Field(..., ge=1)
title: Optional[str] = ""
description: Optional[str] = ""
context: str = "ablauf"
is_primary: bool = False
media_type: Optional[str] = None
class ExerciseVariantCreate(BaseModel):
variant_name: str = Field(..., min_length=3, max_length=200)
description: Optional[str] = None
execution_changes: Optional[str] = None
duration_min: Optional[int] = None
duration_max: Optional[int] = None
equipment_changes: Optional[list[str]] = None
difficulty_adjustment: Optional[str] = Field(None, max_length=50)
progression_level: int = Field(default=1, ge=1, le=10)
sequence_order: Optional[int] = None
prerequisite_variant_id: Optional[int] = None
class ExerciseVariantUpdate(BaseModel):
variant_name: Optional[str] = Field(None, min_length=3, max_length=200)
description: Optional[str] = None
execution_changes: Optional[str] = None
duration_min: Optional[int] = None
duration_max: Optional[int] = None
equipment_changes: Optional[list[str]] = None
difficulty_adjustment: Optional[str] = Field(None, max_length=50)
progression_level: Optional[int] = Field(None, ge=1, le=10)
sequence_order: Optional[int] = None
prerequisite_variant_id: Optional[int] = None
class ExerciseVariantsReorder(BaseModel):
variant_ids: list[int]
_VALID_EXERCISE_STATUS_BULK = frozenset({"draft", "in_review", "approved", "archived"})
_LIST_FILTER_VISIBILITY = frozenset({"private", "club", "official"})
_LIST_FILTER_STATUS = frozenset({"draft", "in_review", "approved", "archived"})
_MAX_BULK_METADATA_IDS = 500
_MAX_BULK_RELATION_IDS_PER_KIND = 80
class ExerciseBulkMetadataPatch(BaseModel):
"""Massenänderung: Sichtbarkeit/Status und/oder Zuordnungen (Kataloge)."""
exercise_ids: list[int] = Field(..., min_length=1, max_length=_MAX_BULK_METADATA_IDS)
visibility: Optional[str] = Field(None, pattern="^(private|club|official)$")
status: Optional[str] = None
club_id: Optional[int] = Field(default=None, ge=1)
focus_area_ids: Optional[list[int]] = Field(default=None, max_length=_MAX_BULK_RELATION_IDS_PER_KIND)
style_direction_ids: Optional[list[int]] = Field(default=None, max_length=_MAX_BULK_RELATION_IDS_PER_KIND)
training_type_ids: Optional[list[int]] = Field(default=None, max_length=_MAX_BULK_RELATION_IDS_PER_KIND)
target_group_ids: Optional[list[int]] = Field(default=None, max_length=_MAX_BULK_RELATION_IDS_PER_KIND)
promote_attached_media_for_official: bool = False
default_official_media_copyright: Optional[str] = Field(default=None, max_length=2000)
@model_validator(mode="after")
def at_least_one_patch_field(self):
if (
self.visibility is None
and self.status is None
and self.focus_area_ids is None
and self.style_direction_ids is None
and self.training_type_ids is None
and self.target_group_ids is None
):
raise ValueError(
"Mindestens eines der Felder visibility, status, focus_area_ids, style_direction_ids, "
"training_type_ids oder target_group_ids angeben"
)
return self
# ============================================================================
# Helper Functions
# ============================================================================
def _row_created_by(row) -> int:
if row is None:
return None
if isinstance(row, dict):
return row.get("created_by")
return row[0]
def _detect_embed_platform(url: str) -> Optional[str]:
if not url:
return None
u = url.lower()
if "youtube.com" in u or "youtu.be" in u:
return "youtube"
if "vimeo.com" in u:
return "vimeo"
if "instagram.com" in u:
return "instagram"
if "tiktok.com" in u:
return "tiktok"
return None
def _media_type_from_mime(mime: Optional[str]) -> str:
m = (mime or "").strip().lower()
if m.startswith("image/"):
return "image"
if m.startswith("video/"):
return "video"
if m == "application/pdf":
return "document"
return "document"
def _fetch_exercise_governance_row(cur, exercise_id: int) -> Optional[dict]:
cur.execute(
"SELECT id, visibility, club_id, created_by FROM exercises WHERE id = %s",
(exercise_id,),
)
row = cur.fetchone()
return r2d(row) if row else None
def _assert_can_view_exercise_media(
cur,
exercise_id: int,
tenant: TenantContext,
) -> dict:
"""403 wenn Übung für den Nutzer nicht lesbar (wie GET /exercises/{id})."""
ex = _fetch_exercise_governance_row(cur, exercise_id)
if not ex:
raise HTTPException(status_code=404, detail="Übung nicht gefunden")
if not library_content_visible_to_profile(
cur,
tenant.profile_id,
(ex.get("visibility") or "").strip().lower(),
ex.get("club_id"),
ex.get("created_by"),
tenant.global_role,
):
raise HTTPException(status_code=403, detail="Keine Berechtigung für diese Übung")
return ex
def _assert_can_edit_exercise(cur, exercise_id: int, tenant: TenantContext) -> None:
"""Übung inhaltlich bearbeiten: Ersteller, Plattform-Admin, oder Planungsberechtigter im Verein (club-Übungen)."""
profile_id = tenant.profile_id
role = tenant.global_role or ""
cur.execute(
"SELECT created_by, visibility, club_id FROM exercises WHERE id = %s",
(exercise_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Übung nicht gefunden")
rd = r2d(row)
owner = rd.get("created_by")
if owner is not None:
owner = int(owner)
if owner == profile_id:
return
if is_platform_admin(role):
return
ex_vis = (rd.get("visibility") or "private").strip().lower()
ex_cid_raw = rd.get("club_id")
ex_cid = int(ex_cid_raw) if ex_cid_raw is not None else None
if ex_vis == "club" and ex_cid is not None and can_plan_in_club(cur, profile_id, ex_cid, role):
return
raise HTTPException(status_code=403, detail="Keine Berechtigung für diese Übung")
def _variant_equipment_json(changes: Optional[list]) -> str:
return json.dumps(changes if changes else [])
def _normalize_variant_equipment_list(val) -> list:
if val is None:
return []
if isinstance(val, list):
return val
if isinstance(val, str):
try:
return json.loads(val)
except Exception:
return []
return []
def _validate_variant_prerequisite(cur, exercise_id: int, prereq_id: Optional[int]) -> None:
if prereq_id is None:
return
cur.execute(
"SELECT 1 FROM exercise_variants WHERE id = %s AND exercise_id = %s",
(prereq_id, exercise_id),
)
if not cur.fetchone():
raise HTTPException(status_code=400, detail="Voraussetzungs-Variante gehört nicht zu dieser Übung")
def _fetch_variant_row(cur, exercise_id: int, variant_id: int) -> dict:
cur.execute(
"""SELECT id, variant_name, description, execution_changes,
duration_min, duration_max, equipment_changes, difficulty_adjustment,
progression_level, sequence_order, prerequisite_variant_id, created_at
FROM exercise_variants WHERE id = %s AND exercise_id = %s""",
(variant_id, exercise_id),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Variante nicht gefunden")
return r2d(row)
def _count_exercise_media(cur, exercise_id: int) -> int:
cur.execute("SELECT COUNT(*) AS c FROM exercise_media WHERE exercise_id = %s", (exercise_id,))
r = cur.fetchone()
return int(r["c"] if isinstance(r, dict) else r[0])
_MIN_OFFICIAL_MEDIA_COPYRIGHT_LEN = 3
def _fetch_exercise_linked_file_assets(cur, exercise_id: int) -> List[Dict[str, Any]]:
cur.execute(
"""
SELECT ma.id, ma.visibility, ma.club_id, ma.lifecycle_state, ma.copyright_notice,
ma.original_filename
FROM exercise_media em
INNER JOIN media_assets ma ON ma.id = em.media_asset_id
WHERE em.exercise_id = %s
""",
(exercise_id,),
)
return [r2d(r) for r in cur.fetchall()]
def _normalize_media_copyright_notice(val: Any) -> str:
return (val or "").strip()
def apply_official_exercise_media_rules(
cur,
exercise_id: int,
next_visibility: str,
*,
promote_attached_media: bool,
default_official_media_copyright: Optional[str],
) -> None:
"""
§4.2 MEDIA_ASSETS_AND_ARCHIVE_SPEC: Offizielle Übung mit Datei-Assets —
nur aktive Assets, Sichtbarkeit official, Copyright mindestens konfiguriert.
Bei Zustimmung werden Sichtbarkeit und fehlende Copyrights gesetzt.
"""
nv = (next_visibility or "private").strip().lower()
if nv != "official":
return
rows = _fetch_exercise_linked_file_assets(cur, exercise_id)
if not rows:
return
blocking_lc: List[Dict[str, Any]] = []
need_promo: List[Dict[str, Any]] = []
need_cr_ids: List[int] = []
for r in rows:
aid = int(r["id"])
lc = (r.get("lifecycle_state") or "").strip().lower()
vis = (r.get("visibility") or "").strip().lower()
cr = _normalize_media_copyright_notice(r.get("copyright_notice"))
if lc != "active":
blocking_lc.append(
{
"media_asset_id": aid,
"lifecycle_state": lc,
"visibility": vis,
"original_filename": r.get("original_filename"),
}
)
continue
if vis != "official":
need_promo.append(
{
"media_asset_id": aid,
"visibility": vis,
"original_filename": r.get("original_filename"),
}
)
if len(cr) < _MIN_OFFICIAL_MEDIA_COPYRIGHT_LEN:
need_cr_ids.append(aid)
if blocking_lc:
raise HTTPException(
status_code=422,
detail={
"code": "OFFICIAL_MEDIA_LIFECYCLE",
"message": (
"Nicht aktive Archiv-Medien dürfen nicht an einer offiziellen Übung hängen "
"(Papierkorb/Recovery zuerst)."
),
"media_assets": blocking_lc,
},
)
default_cr = _normalize_media_copyright_notice(default_official_media_copyright)
if need_promo and not promote_attached_media:
raise HTTPException(
status_code=422,
detail={
"code": "OFFICIAL_MEDIA_CONFIRM_REQUIRED",
"message": "Zugeordnete Dateien sind noch nicht offiziell — Bestätigung erforderlich.",
"assets_need_visibility_promotion": need_promo,
"assets_missing_copyright": sorted(set(need_cr_ids)),
},
)
if need_cr_ids and len(default_cr) < _MIN_OFFICIAL_MEDIA_COPYRIGHT_LEN:
raise HTTPException(
status_code=422,
detail={
"code": "OFFICIAL_MEDIA_CONFIRM_REQUIRED",
"message": (
"Für offizielle Übungen ist ein Copyright-Vermerk pro Datei erforderlich "
f"(mind. {_MIN_OFFICIAL_MEDIA_COPYRIGHT_LEN} Zeichen)."
),
"assets_need_visibility_promotion": [],
"assets_missing_copyright": sorted(set(need_cr_ids)),
},
)
promo_ids = [int(x["media_asset_id"]) for x in need_promo] if promote_attached_media else []
if promo_ids:
ph = ",".join(["%s"] * len(promo_ids))
cur.execute(
f"""
UPDATE media_assets
SET visibility = 'official', club_id = NULL, updated_at = NOW()
WHERE id IN ({ph}) AND lower(trim(lifecycle_state)) = 'active'
""",
tuple(promo_ids),
)
if need_cr_ids and len(default_cr) >= _MIN_OFFICIAL_MEDIA_COPYRIGHT_LEN:
ph = ",".join(["%s"] * len(need_cr_ids))
cur.execute(
f"""
UPDATE media_assets
SET copyright_notice = %s, updated_at = NOW()
WHERE id IN ({ph})
AND (
copyright_notice IS NULL
OR LENGTH(TRIM(copyright_notice)) < %s
)
""",
(default_cr, *need_cr_ids, _MIN_OFFICIAL_MEDIA_COPYRIGHT_LEN),
)
def _abs_media_path(file_path_db: str, media_root: Path) -> Optional[Path]:
if not file_path_db or file_path_db.startswith("http"):
return None
rel = file_path_db.lstrip("/")
if rel.startswith("media/"):
rel = rel[len("media/") :]
p = (media_root / rel).resolve()
try:
p.relative_to(media_root.resolve())
except ValueError:
return None
return p
def _resolve_local_media_file(
media_root: Path,
file_path_db: Optional[str],
asset_storage_key: Optional[str],
) -> Optional[Path]:
if asset_storage_key:
return path_under_media_root(media_root, asset_storage_key)
return _abs_media_path(file_path_db or "", media_root) if file_path_db else None
def enrich_exercise_detail(exercise_id: int, cur) -> dict:
"""
Lädt alle M:N Relations für eine Übung und gibt ein vollständiges
Exercise-Objekt zurück (wie in API-Spec GET /exercises/{id}).
"""
# Basis-Exercise
cur.execute(
"""SELECT e.*, p.name as creator_name, c.name as club_name
FROM exercises e
LEFT JOIN profiles p ON e.created_by = p.id
LEFT JOIN clubs c ON e.club_id = c.id
WHERE e.id = %s""",
(exercise_id,)
)
row = cur.fetchone()
if not row:
return None
exercise = r2d(row)
# Equipment JSONB → List
if exercise.get("equipment"):
exercise["equipment"] = exercise["equipment"] if isinstance(exercise["equipment"], list) else []
else:
exercise["equipment"] = []
# Focus Areas (M:N)
cur.execute(
"""SELECT efa.id, efa.focus_area_id, fa.name, fa.abbreviation, fa.color, fa.icon, efa.is_primary
FROM exercise_focus_areas efa
JOIN focus_areas fa ON efa.focus_area_id = fa.id
WHERE efa.exercise_id = %s
ORDER BY efa.is_primary DESC, fa.name""",
(exercise_id,)
)
exercise["focus_areas"] = [r2d(r) for r in cur.fetchall()]
# Training Styles (M:N)
cur.execute(
"""SELECT ets.id, ets.style_direction_id as training_style_id, sd.name, sd.abbreviation, ets.is_primary
FROM exercise_style_directions ets
JOIN style_directions sd ON ets.style_direction_id = sd.id
WHERE ets.exercise_id = %s
ORDER BY ets.is_primary DESC, sd.name""",
(exercise_id,)
)
exercise["training_styles"] = [r2d(r) for r in cur.fetchall()]
# Trainingsstil (Breitensport / Leistungssport …) — exercise_training_types
cur.execute(
"""SELECT ett.id, ett.training_type_id, tt.name, tt.abbreviation, ett.is_primary
FROM exercise_training_types ett
JOIN training_types tt ON ett.training_type_id = tt.id
WHERE ett.exercise_id = %s
ORDER BY ett.is_primary DESC, tt.sort_order NULLS LAST, tt.name""",
(exercise_id,),
)
exercise["training_types"] = [r2d(r) for r in cur.fetchall()]
# Target Groups (M:N)
cur.execute(
"""SELECT etg.id, etg.target_group_id, tg.name, tg.description, etg.is_primary
FROM exercise_target_groups etg
JOIN target_groups tg ON etg.target_group_id = tg.id
WHERE etg.exercise_id = %s
ORDER BY etg.is_primary DESC, tg.name""",
(exercise_id,)
)
exercise["target_groups"] = [r2d(r) for r in cur.fetchall()]
# Age Groups (M:N) - direkt als VARCHAR gespeichert
cur.execute(
"""SELECT age_group
FROM exercise_age_groups
WHERE exercise_id = %s
ORDER BY age_group""",
(exercise_id,)
)
exercise["age_groups"] = [r["age_group"] for r in cur.fetchall()]
# Skills (M:N) mit Levels und Intensity
cur.execute(
"""SELECT es.id, es.skill_id, s.name as skill_name, s.category as skill_category,
es.is_primary, es.intensity, es.required_level, es.target_level, es.ai_suggested
FROM exercise_skills es
JOIN skills s ON es.skill_id = s.id
WHERE es.exercise_id = %s
ORDER BY es.is_primary DESC, s.name""",
(exercise_id,)
)
exercise["skills"] = [r2d(r) for r in cur.fetchall()]
for sk in exercise["skills"]:
sk["required_level"] = normalize_exercise_skill_level(sk.get("required_level"))
sk["target_level"] = normalize_exercise_skill_level(sk.get("target_level"))
# Variants (1:N) - mit Progression (Reihenfolge: sequence_order, dann progression_level)
cur.execute(
"""SELECT id, variant_name, description, execution_changes,
duration_min, duration_max, equipment_changes, difficulty_adjustment,
progression_level, sequence_order, prerequisite_variant_id, created_at
FROM exercise_variants
WHERE exercise_id = %s
ORDER BY sequence_order NULLS LAST, progression_level, id""",
(exercise_id,)
)
exercise["variants"] = [r2d(r) for r in cur.fetchall()]
# Media (1:N)
cur.execute(
"""SELECT em.id, em.media_type, em.file_path, em.file_size, em.mime_type, em.original_filename,
em.embed_url, em.embed_platform, em.title, em.description, em.sort_order, em.is_primary, em.context,
em.media_asset_id, ma.copyright_notice AS asset_copyright_notice,
ma.lifecycle_state AS asset_lifecycle_state
FROM exercise_media em
LEFT JOIN media_assets ma ON ma.id = em.media_asset_id
WHERE em.exercise_id = %s
ORDER BY em.sort_order, em.id""",
(exercise_id,)
)
exercise["media"] = [r2d(r) for r in cur.fetchall()]
return exercise
def assign_exercise_relations(
cur,
conn,
exercise_id: int,
data: dict,
*,
do_commit: bool = True,
):
"""
Weist M:N Relations für eine Übung zu.
Löscht alte Zuordnungen und legt neue an (REPLACE-Logik).
"""
# Focus Areas
if "focus_areas_multi" in data:
cur.execute("DELETE FROM exercise_focus_areas WHERE exercise_id = %s", (exercise_id,))
for fa in data["focus_areas_multi"]:
cur.execute(
"""INSERT INTO exercise_focus_areas (exercise_id, focus_area_id, is_primary)
VALUES (%s, %s, %s)""",
(exercise_id, fa["focus_area_id"], fa.get("is_primary", False))
)
# Training Styles (Stilrichtungen, z. B. Shotokan)
if "training_styles_multi" in data:
cur.execute("DELETE FROM exercise_style_directions WHERE exercise_id = %s", (exercise_id,))
for ts in data["training_styles_multi"]:
cur.execute(
"""INSERT INTO exercise_style_directions (exercise_id, style_direction_id, is_primary)
VALUES (%s, %s, %s)""",
(exercise_id, ts["training_style_id"], ts.get("is_primary", False))
)
# Trainingsstil (Breitensport, Leistungssport, …)
if "training_types_multi" in data:
cur.execute("DELETE FROM exercise_training_types WHERE exercise_id = %s", (exercise_id,))
for tt in data["training_types_multi"]:
cur.execute(
"""INSERT INTO exercise_training_types (exercise_id, training_type_id, is_primary)
VALUES (%s, %s, %s)""",
(exercise_id, tt["training_type_id"], tt.get("is_primary", False)),
)
# Target Groups
if "target_groups_multi" in data:
cur.execute("DELETE FROM exercise_target_groups WHERE exercise_id = %s", (exercise_id,))
for tg in data["target_groups_multi"]:
cur.execute(
"""INSERT INTO exercise_target_groups (exercise_id, target_group_id, is_primary)
VALUES (%s, %s, %s)""",
(exercise_id, tg["target_group_id"], tg.get("is_primary", False))
)
# Age Groups (direkt als VARCHAR, CHECK constraint validiert)
if "age_groups" in data:
cur.execute("DELETE FROM exercise_age_groups WHERE exercise_id = %s", (exercise_id,))
for age_group_name in data["age_groups"]:
try:
cur.execute(
"INSERT INTO exercise_age_groups (exercise_id, age_group) VALUES (%s, %s)",
(exercise_id, age_group_name)
)
except Exception as e:
logger.warning("Age Group '%s' ungültig: %s", age_group_name, e)
# Skills
if "skills" in data:
cur.execute("DELETE FROM exercise_skills WHERE exercise_id = %s", (exercise_id,))
for skill in data["skills"]:
cur.execute(
"""INSERT INTO exercise_skills
(exercise_id, skill_id, is_primary, intensity, required_level, target_level, ai_suggested)
VALUES (%s, %s, %s, %s, %s, %s, %s)""",
(
exercise_id,
skill["skill_id"],
skill.get("is_primary", False),
skill.get("intensity"),
normalize_exercise_skill_level(skill.get("required_level")),
normalize_exercise_skill_level(skill.get("target_level")),
skill.get("ai_suggested", False),
)
)
if do_commit:
conn.commit()
# ============================================================================
# Endpoints
# ============================================================================
def _normalize_bulk_id_list(raw: Optional[list]) -> list[int]:
"""Positive IDs, Reihenfolge beibehalten, Duplikate entfernen."""
if not raw:
return []
seen: set[int] = set()
out: list[int] = []
for x in raw:
try:
xi = int(x)
except (TypeError, ValueError):
continue
if xi < 1 or xi in seen:
continue
seen.add(xi)
out.append(xi)
return out
def _assert_catalog_ids_exist(cur, kind: str, ids: list[int]) -> None:
if not ids:
return
table_by_kind = {
"focus_areas": "focus_areas",
"style_directions": "style_directions",
"training_types": "training_types",
"target_groups": "target_groups",
}
table = table_by_kind.get(kind)
if not table:
raise HTTPException(status_code=500, detail="Interner Fehler: unbekannter Katalog")
ph = ",".join(["%s"] * len(ids))
cur.execute(f"SELECT id FROM {table} WHERE id IN ({ph})", tuple(ids))
found = {
int(r["id"]) if isinstance(r, dict) else int(r[0])
for r in cur.fetchall()
}
missing = [i for i in ids if i not in found]
if missing:
raise HTTPException(
status_code=400,
detail=f"Unbekannte {kind}-IDs (Beispiele): {missing[:12]}",
)
def _merge_ids(multi: list[int], single: Optional[int]) -> list[int]:
"""Liste aus wiederholten Query-Parametern plus optional einem Legacy-Einzelfilter (ohne Duplikate)."""
seen: set[int] = set()
out: list[int] = []
for x in list(multi or []):
xi = int(x)
if xi not in seen:
seen.add(xi)
out.append(xi)
if single is not None:
xi = int(single)
if xi not in seen:
out.append(xi)
return out
def _dedupe_positive_ids(ids: list[int]) -> list[int]:
seen: set[int] = set()
out: list[int] = []
for raw in ids or []:
try:
xi = int(raw)
except (TypeError, ValueError):
continue
if xi < 1 or xi in seen:
continue
seen.add(xi)
out.append(xi)
return out
def _merge_str_any(multi: list[str], single: Optional[str]) -> list[str]:
seen = set()
out = []
for x in list(multi or []):
s = str(x).strip()
if not s or s in seen:
continue
seen.add(s)
out.append(s)
if single is not None and str(single).strip():
s = str(single).strip()
if s not in seen:
out.append(s)
return out
def _normalize_choice_list(raw: list[str], allowed: frozenset, label: str) -> list[str]:
out = []
seen = set()
for x in raw or []:
s = str(x).strip().lower()
if not s or s in seen:
continue
if s not in allowed:
raise HTTPException(status_code=400, detail=f"Ungültiger Wert in {label}")
seen.add(s)
out.append(s)
return out
def _exercise_delete_usage_counts(cur, exercise_id: int) -> dict:
cur.execute(
"""
SELECT
(SELECT COUNT(*)::int FROM exercise_block_items WHERE exercise_id = %s) AS block_items,
(SELECT COUNT(*)::int FROM training_unit_section_items WHERE exercise_id = %s) AS section_items,
(SELECT COUNT(*)::int FROM exercise_progression_edges
WHERE from_exercise_id = %s OR to_exercise_id = %s) AS prog_edges
""",
(exercise_id, exercise_id, exercise_id, exercise_id),
)
row = cur.fetchone()
return dict(row) if row else {"block_items": 0, "section_items": 0, "prog_edges": 0}
def _exercise_delete_usage_message(counts: dict) -> str:
bi = int(counts.get("block_items") or 0)
si = int(counts.get("section_items") or 0)
pe = int(counts.get("prog_edges") or 0)
parts = []
if bi:
parts.append(f"{bi}× in Übungsblöcken")
if si:
parts.append(f"{si}× in Trainingsplänen oder Rahmenabläufen")
if pe:
parts.append(f"{pe}× in Progressionsgraphen (Kanten)")
if not parts:
return ""
return (
"Die Übung wird noch verwendet und kann nicht gelöscht werden. Bitte auf „archiviert“ setzen. "
"Verwendung: " + ", ".join(parts) + "."
)
def _assert_can_delete_exercise(cur, tenant: TenantContext, row: dict) -> None:
pid = tenant.profile_id
role = tenant.global_role
if is_platform_admin(role):
return
vis = str(row.get("visibility") or "private").strip().lower()
cid = row.get("club_id")
creator = row.get("created_by")
try:
creator_int = int(creator) if creator is not None else None
except (TypeError, ValueError):
creator_int = None
if vis == "official":
raise HTTPException(
status_code=403,
detail="Globale Übungen dürfen nur von Plattform-Admins gelöscht werden.",
)
if vis == "club":
try:
ex_club = int(cid) if cid is not None else None
except (TypeError, ValueError):
ex_club = None
if ex_club is None:
raise HTTPException(status_code=400, detail="Vereins-Übung ohne gültige Vereinszuordnung")
if not has_club_role(cur, pid, ex_club, "club_admin"):
raise HTTPException(
status_code=403,
detail="Nur Vereins-Admins dürfen Vereins-Übungen löschen.",
)
return
if creator_int is not None and creator_int == pid:
return
if creator_int is not None and club_admin_shares_club_with_creator(cur, pid, creator_int):
return
raise HTTPException(
status_code=403,
detail="Keine Berechtigung zum Löschen dieser Übung.",
)
@router.patch("/exercises/bulk-metadata")
def bulk_patch_exercises_metadata(
body: ExerciseBulkMetadataPatch,
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Ändert Sichtbarkeit, Status und/oder Katalog-Zuordnungen für viele Übungen auf einmal (REPLACE je Kategorie).
Zuordnung: Sind z. B. focus_area_ids im Body gesetzt, werden die Fokusbereiche bei den bearbeiteten
Übungen vollständig durch diese Liste ersetzt (leeres Array entfernt alle).
Berechtigt: Ersteller der jeweiligen Übung oder Plattform-Admin (admin/superadmin).
Zusätzlich: Vereinsorga (club_admin) darf **nur** bei reiner Sichtbarkeitsänderung auf ``club``
für den eigenen Verein (`club_id` / aktiver Verein) fremde Übungen freigeben — analog
Trainingseinheit-Speichern.
Governance wie bei Einzel-PUT (official nur Plattform-Admin; club mit Mitgliedschaft bzw. Admin).
"""
profile_id = tenant.profile_id
role = tenant.global_role
unique_ids = sorted({int(x) for x in body.exercise_ids if x is not None and int(x) > 0})
if not unique_ids:
raise HTTPException(status_code=400, detail="Keine gültigen Übungs-IDs")
if len(unique_ids) > _MAX_BULK_METADATA_IDS:
raise HTTPException(
status_code=400,
detail=f"Maximal {_MAX_BULK_METADATA_IDS} Übungen pro Anfrage",
)
status_val: Optional[str] = None
if body.status is not None:
st = str(body.status).strip().lower()
if st not in _VALID_EXERCISE_STATUS_BULK:
raise HTTPException(status_code=400, detail="Ungültiger Status")
status_val = st
patch_visibility = body.visibility is not None
patch_status = status_val is not None
patch_focus_areas = body.focus_area_ids is not None
fa_ids = _normalize_bulk_id_list(body.focus_area_ids or []) if patch_focus_areas else []
patch_style_dirs = body.style_direction_ids is not None
sd_ids = _normalize_bulk_id_list(body.style_direction_ids or []) if patch_style_dirs else []
patch_training_types = body.training_type_ids is not None
tt_ids = _normalize_bulk_id_list(body.training_type_ids or []) if patch_training_types else []
patch_target_groups = body.target_group_ids is not None
tg_ids = _normalize_bulk_id_list(body.target_group_ids or []) if patch_target_groups else []
relation_data: Dict[str, Any] = {}
if patch_focus_areas:
relation_data["focus_areas_multi"] = [
{"focus_area_id": i, "is_primary": idx == 0} for idx, i in enumerate(fa_ids)
]
if patch_style_dirs:
relation_data["training_styles_multi"] = [
{"training_style_id": i, "is_primary": idx == 0} for idx, i in enumerate(sd_ids)
]
if patch_training_types:
relation_data["training_types_multi"] = [
{"training_type_id": i, "is_primary": idx == 0} for idx, i in enumerate(tt_ids)
]
if patch_target_groups:
relation_data["target_groups_multi"] = [
{"target_group_id": i, "is_primary": idx == 0} for idx, i in enumerate(tg_ids)
]
updated: List[int] = []
failed: List[Dict[str, Any]] = []
def _fail_msg(he: HTTPException) -> str:
d = he.detail
return d if isinstance(d, str) else str(d)
with get_db() as conn:
cur = get_cursor(conn)
if patch_focus_areas:
_assert_catalog_ids_exist(cur, "focus_areas", fa_ids)
if patch_style_dirs:
_assert_catalog_ids_exist(cur, "style_directions", sd_ids)
if patch_training_types:
_assert_catalog_ids_exist(cur, "training_types", tt_ids)
if patch_target_groups:
_assert_catalog_ids_exist(cur, "target_groups", tg_ids)
for ex_id in unique_ids:
cur.execute(
"SELECT id, created_by, visibility, club_id FROM exercises WHERE id = %s",
(ex_id,),
)
row = cur.fetchone()
if not row:
failed.append({"id": ex_id, "detail": "Übung nicht gefunden"})
continue
rowd = r2d(row)
owner = rowd.get("created_by")
if owner is not None:
owner = int(owner)
ex_vis = (rowd.get("visibility") or "private").strip().lower()
ex_cid_raw = rowd.get("club_id")
ex_cid = int(ex_cid_raw) if ex_cid_raw is not None else None
next_vis = ex_vis
if patch_visibility:
next_vis = str(body.visibility).strip().lower()
next_club = ex_cid
if patch_visibility and body.club_id is not None:
next_club = int(body.club_id)
if patch_visibility and next_vis == "club" and next_club is None:
eff = tenant.effective_club_id
next_club = int(eff) if eff is not None else None
if patch_visibility and next_vis == "club" and next_club is None:
failed.append(
{
"id": ex_id,
"detail": "Vereins-Übung: club_id angeben oder aktiven Verein wählen (X-Active-Club-Id).",
}
)
continue
other_meta_patches = (
patch_status
or patch_focus_areas
or patch_style_dirs
or patch_training_types
or patch_target_groups
)
is_owner_or_platform = owner == profile_id or is_platform_admin(role)
if not is_owner_or_platform:
org_club_promo_only = (
patch_visibility
and not other_meta_patches
and next_vis == "club"
and next_club is not None
and can_manage_club_org(cur, profile_id, int(next_club), role)
)
if not org_club_promo_only:
failed.append(
{
"id": ex_id,
"detail": "Keine Berechtigung (Ersteller, Plattform-Admin oder Vereinsorga bei reiner Vereinsfreigabe).",
}
)
continue
if patch_visibility:
gov_club = next_club if next_vis == "club" else None
try:
assert_valid_governance_visibility(cur, profile_id, role, next_vis, gov_club)
except HTTPException as he:
failed.append({"id": ex_id, "detail": _fail_msg(he)})
continue
if (next_vis or "").strip().lower() == "official":
try:
apply_official_exercise_media_rules(
cur,
ex_id,
next_vis,
promote_attached_media=body.promote_attached_media_for_official is True,
default_official_media_copyright=body.default_official_media_copyright,
)
except HTTPException as he:
d = he.detail
entry: Dict[str, Any] = {"id": ex_id}
if isinstance(d, dict):
entry["detail"] = str(d.get("message") or d.get("code") or "Medien-Validierung fehlgeschlagen")
if "code" in d:
entry["code"] = d["code"]
if "media_assets" in d:
entry["media_assets"] = d["media_assets"]
if "assets_need_visibility_promotion" in d:
entry["assets_need_visibility_promotion"] = d["assets_need_visibility_promotion"]
if "assets_missing_copyright" in d:
entry["assets_missing_copyright"] = d["assets_missing_copyright"]
else:
entry["detail"] = _fail_msg(he)
failed.append(entry)
continue
sets: List[str] = []
vals: List[Any] = []
if patch_visibility:
sets.extend(["visibility = %s", "club_id = %s"])
cid_out = next_club if next_vis == "club" else None
vals.extend([next_vis, cid_out])
if patch_status:
sets.append("status = %s")
vals.append(status_val)
sets.append("updated_at = NOW()")
vals.append(ex_id)
cur.execute(
f"UPDATE exercises SET {', '.join(sets)} WHERE id = %s",
tuple(vals),
)
if relation_data:
assign_exercise_relations(cur, conn, ex_id, relation_data, do_commit=False)
updated.append(ex_id)
conn.commit()
return {
"updated": updated,
"failed": failed,
"updated_count": len(updated),
"failed_count": len(failed),
}
@router.get("/exercises")
def list_exercises(
focus_area_ids: list[int] = Query(default=[], description="ODER: mind. einer dieser Fokusbereiche"),
focus_area: Optional[int] = Query(default=None, description="Einzel-ID (Legacy), wird mit focus_area_ids kombiniert"),
visibility_any: list[str] = Query(default=[], description="ODER: eine dieser Sichtbarkeiten"),
visibility: Optional[str] = Query(default=None, description="Einzel (Legacy)"),
status_any: list[str] = Query(default=[], description="ODER: einer dieser Statuswerte"),
status: Optional[str] = Query(default=None, description="Einzel (Legacy)"),
skill_ids: list[int] = Query(default=[], description="ODER: mind. eine dieser Fähigkeiten"),
skill_id: Optional[int] = Query(default=None, description="Einzel (Legacy)"),
style_direction_ids: list[int] = Query(default=[], description="ODER: mind. eine Stilrichtung"),
style_direction_id: Optional[int] = Query(default=None, description="Einzel (Legacy)"),
training_type_ids: list[int] = Query(default=[], description="ODER: mind. ein Trainingsstil"),
training_type_id: Optional[int] = Query(default=None, description="Einzel (Legacy)"),
target_group_ids: list[int] = Query(default=[], description="ODER: mind. eine Zielgruppe"),
target_group_id: Optional[int] = Query(default=None, description="Einzel (Legacy)"),
skill_min_level: Optional[int] = Query(default=None, ge=1, le=5),
skill_max_level: Optional[int] = Query(default=None, ge=1, le=5),
search: Optional[str] = Query(default=None),
ai_search: Optional[str] = Query(
default=None,
description="Platzhalter KI-Suche: derzeit gleiche Volltextlogik wie search (später Embeddings/Reranking)",
),
limit: int = Query(default=50, ge=1, le=100),
offset: int = Query(default=0, ge=0),
include_variants: bool = Query(
default=False,
description="Wenn true: Feld variants[] pro Übung (id, variant_name, sequence_order) für Planung/UI",
),
visibility_exclude_any: list[str] = Query(
default=[], description="Keine dieser Sichtbarkeiten (Negativliste)"
),
status_exclude_any: list[str] = Query(
default=[], description="Keiner dieser Statuswerte (Negativliste)"
),
exclude_without_focus: bool = Query(
default=False,
description="Wenn true: nur Übungen mit mindestens einem Fokusbereich",
),
focus_only_without_focus_areas: bool = Query(
default=False,
description="Nur Übungen ohne einen einzigen Fokusbereich (M:N exercise_focus_areas leer)",
),
focus_area_must_include_ids: list[int] = Query(
default=[],
description="Alle genannten Fokusbereiche müssen gesetzt sein (UND / „+“)",
),
focus_area_must_exclude_ids: list[int] = Query(
default=[],
description="Keiner dieser Fokusbereiche darf gesetzt sein („−“)",
),
style_direction_must_include_ids: list[int] = Query(
default=[],
description="Alle genannten Stilrichtungen müssen der Übung zugeordnet sein (UND)",
),
style_direction_must_exclude_ids: list[int] = Query(
default=[],
description="Keine dieser Stilrichtungen darf zugeordnet sein",
),
training_type_must_include_ids: list[int] = Query(
default=[],
description="Alle genannten Trainingsstile müssen zugeordnet sein (UND)",
),
training_type_must_exclude_ids: list[int] = Query(
default=[],
description="Keiner dieser Trainingsstile darf zugeordnet sein",
),
target_group_must_include_ids: list[int] = Query(
default=[],
description="Alle genannten Zielgruppen müssen zugeordnet sein (UND)",
),
target_group_must_exclude_ids: list[int] = Query(
default=[],
description="Keine dieser Zielgruppen darf zugeordnet sein",
),
include_archived: bool = Query(
default=False,
description="Archivierte einbeziehen; Standard false (außer Statusfilter enthält archived)",
),
created_by_me: bool = Query(
default=False,
description="Nur Übungen, die vom aktuellen Profil angelegt wurden (created_by = Profil)",
),
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Liste aller Übungen mit Filtern.
Lightweight Response (ohne M:N Details, nur IDs und Namen).
Optional include_variants für Variantenauswahl in der Trainingsplanung.
"""
profile_id = tenant.profile_id
with get_db() as conn:
cur = get_cursor(conn)
# WHERE-Bedingungen
where = ["1=1"]
params = []
role = tenant.global_role
if not is_platform_admin(role):
vis_sql, vis_params = library_content_visibility_sql(
alias="e",
profile_id=profile_id,
role=role,
effective_club_id=tenant.effective_club_id,
)
where.append(vis_sql)
params.extend(vis_params)
if created_by_me:
where.append("e.created_by = %s")
params.append(profile_id)
vis_list = _merge_str_any(visibility_any, visibility)
if vis_list:
ph = ",".join(["%s"] * len(vis_list))
where.append(f"e.visibility IN ({ph})")
params.extend(vis_list)
st_list = _merge_str_any(status_any, status)
if st_list:
ph = ",".join(["%s"] * len(st_list))
where.append(f"e.status IN ({ph})")
params.extend(st_list)
includes_archived = any(str(x).strip().lower() == "archived" for x in st_list)
if not include_archived and not includes_archived:
where.append("COALESCE(e.status, '') <> %s")
params.append("archived")
vis_excl = _normalize_choice_list(
list(visibility_exclude_any),
_LIST_FILTER_VISIBILITY,
"visibility_exclude_any",
)
if vis_excl:
ph = ",".join(["%s"] * len(vis_excl))
where.append(f"(e.visibility IS NULL OR LOWER(TRIM(e.visibility::text)) NOT IN ({ph}))")
params.extend(vis_excl)
st_excl = _normalize_choice_list(
list(status_exclude_any),
_LIST_FILTER_STATUS,
"status_exclude_any",
)
if st_excl:
ph = ",".join(["%s"] * len(st_excl))
where.append(f"(e.status IS NULL OR LOWER(TRIM(e.status::text)) NOT IN ({ph}))")
params.extend(st_excl)
focus_only = focus_only_without_focus_areas
must_inc = _dedupe_positive_ids(list(focus_area_must_include_ids))
must_exc = _dedupe_positive_ids(list(focus_area_must_exclude_ids))
fa_or = _merge_ids(focus_area_ids, focus_area)
if focus_only:
if exclude_without_focus:
raise HTTPException(
status_code=400,
detail="focus_only_without_focus_areas schließt exclude_without_focus aus.",
)
if fa_or:
raise HTTPException(
status_code=400,
detail="focus_only_without_focus_areas darf nicht zusammen mit focus_area_ids (ODER-Liste) verwendet werden.",
)
if must_inc:
raise HTTPException(
status_code=400,
detail="focus_only_without_focus_areas darf nicht zusammen mit focus_area_must_include_ids verwendet werden.",
)
if must_exc:
raise HTTPException(
status_code=400,
detail="focus_only_without_focus_areas darf nicht zusammen mit focus_area_must_exclude_ids verwendet werden.",
)
where.append(
"NOT EXISTS (SELECT 1 FROM exercise_focus_areas efa WHERE efa.exercise_id = e.id)"
)
else:
if exclude_without_focus:
where.append(
"EXISTS (SELECT 1 FROM exercise_focus_areas efa WHERE efa.exercise_id = e.id)"
)
if fa_or:
ph = ",".join(["%s"] * len(fa_or))
where.append(
f"EXISTS (SELECT 1 FROM exercise_focus_areas efa WHERE efa.exercise_id = e.id AND efa.focus_area_id IN ({ph}))"
)
params.extend(fa_or)
for fid in must_inc:
where.append(
"EXISTS (SELECT 1 FROM exercise_focus_areas efa WHERE efa.exercise_id = e.id AND efa.focus_area_id = %s)"
)
params.append(fid)
if must_exc:
ph = ",".join(["%s"] * len(must_exc))
where.append(
f"NOT EXISTS (SELECT 1 FROM exercise_focus_areas efa "
f"WHERE efa.exercise_id = e.id AND efa.focus_area_id IN ({ph}))"
)
params.extend(must_exc)
sk_ids = _merge_ids(skill_ids, skill_id)
if sk_ids:
ph = ",".join(["%s"] * len(sk_ids))
where.append(
f"EXISTS (SELECT 1 FROM exercise_skills es WHERE es.exercise_id = e.id AND es.skill_id IN ({ph}))"
)
params.extend(sk_ids)
sd_or = _merge_ids(style_direction_ids, style_direction_id)
sd_inc = _dedupe_positive_ids(list(style_direction_must_include_ids))
sd_exc = _dedupe_positive_ids(list(style_direction_must_exclude_ids))
if sd_or:
ph = ",".join(["%s"] * len(sd_or))
where.append(
"EXISTS (SELECT 1 FROM exercise_style_directions esd "
f"WHERE esd.exercise_id = e.id AND esd.style_direction_id IN ({ph}))"
)
params.extend(sd_or)
for sid in sd_inc:
where.append(
"EXISTS (SELECT 1 FROM exercise_style_directions esd "
"WHERE esd.exercise_id = e.id AND esd.style_direction_id = %s)"
)
params.append(sid)
if sd_exc:
ph = ",".join(["%s"] * len(sd_exc))
where.append(
"NOT EXISTS (SELECT 1 FROM exercise_style_directions esd "
f"WHERE esd.exercise_id = e.id AND esd.style_direction_id IN ({ph}))"
)
params.extend(sd_exc)
tt_or = _merge_ids(training_type_ids, training_type_id)
tt_inc = _dedupe_positive_ids(list(training_type_must_include_ids))
tt_exc = _dedupe_positive_ids(list(training_type_must_exclude_ids))
if tt_or:
ph = ",".join(["%s"] * len(tt_or))
where.append(
"EXISTS (SELECT 1 FROM exercise_training_types ett "
f"WHERE ett.exercise_id = e.id AND ett.training_type_id IN ({ph}))"
)
params.extend(tt_or)
for tid in tt_inc:
where.append(
"EXISTS (SELECT 1 FROM exercise_training_types ett "
"WHERE ett.exercise_id = e.id AND ett.training_type_id = %s)"
)
params.append(tid)
if tt_exc:
ph = ",".join(["%s"] * len(tt_exc))
where.append(
"NOT EXISTS (SELECT 1 FROM exercise_training_types ett "
f"WHERE ett.exercise_id = e.id AND ett.training_type_id IN ({ph}))"
)
params.extend(tt_exc)
tg_or = _merge_ids(target_group_ids, target_group_id)
tg_inc = _dedupe_positive_ids(list(target_group_must_include_ids))
tg_exc = _dedupe_positive_ids(list(target_group_must_exclude_ids))
if tg_or:
ph = ",".join(["%s"] * len(tg_or))
where.append(
"EXISTS (SELECT 1 FROM exercise_target_groups etg "
f"WHERE etg.exercise_id = e.id AND etg.target_group_id IN ({ph}))"
)
params.extend(tg_or)
for gid in tg_inc:
where.append(
"EXISTS (SELECT 1 FROM exercise_target_groups etg "
"WHERE etg.exercise_id = e.id AND etg.target_group_id = %s)"
)
params.append(gid)
if tg_exc:
ph = ",".join(["%s"] * len(tg_exc))
where.append(
"NOT EXISTS (SELECT 1 FROM exercise_target_groups etg "
f"WHERE etg.exercise_id = e.id AND etg.target_group_id IN ({ph}))"
)
params.extend(tg_exc)
if skill_min_level is not None or skill_max_level is not None:
lo = skill_min_level if skill_min_level is not None else 1
hi = skill_max_level if skill_max_level is not None else 5
if lo > hi:
lo, hi = hi, lo
where.append(
"EXISTS (SELECT 1 FROM exercise_skills es WHERE es.exercise_id = e.id AND ("
+ _EXERCISE_SKILL_LEVEL_RANK_SQL
+ ") BETWEEN %s AND %s)"
)
params.extend([lo, hi])
# Volltext (tsvector); ai_search gleiche Engine, bei zwei Begriffen ODER-Verknüpfung
s1 = (search or "").strip()
s2 = (ai_search or "").strip()
if s1 and s2 and s1 != s2:
where.append(
"(e.search_vector @@ plainto_tsquery('german', %s) "
"OR e.search_vector @@ plainto_tsquery('german', %s))"
)
params.extend([s1, s2])
elif s1 or s2:
qtext = s1 or s2
where.append("e.search_vector @@ plainto_tsquery('german', %s)")
params.append(qtext)
variants_sql = ""
if include_variants:
variants_sql = """,
(
SELECT COALESCE(
json_agg(
json_build_object(
'id', ev.id,
'variant_name', ev.variant_name,
'sequence_order', ev.sequence_order
)
ORDER BY ev.sequence_order NULLS LAST, ev.progression_level, ev.id
),
'[]'::json
)
FROM exercise_variants ev
WHERE ev.exercise_id = e.id
) AS variants"""
# Query (primary_focus_name für Listen-Ansicht gemäß Spec-Beispiel „focus_area“-Label)
query = f"""
SELECT e.id, e.title, e.summary, e.visibility, e.status,
e.created_by, p.name as creator_name,
e.club_id, c.name as club_name,
e.created_at, e.updated_at,
(
SELECT fa.name FROM exercise_focus_areas efa
JOIN focus_areas fa ON fa.id = efa.focus_area_id
WHERE efa.exercise_id = e.id
ORDER BY efa.is_primary DESC NULLS LAST, fa.name ASC
LIMIT 1
) AS primary_focus_name,
(
SELECT COALESCE(
json_agg(fa.name ORDER BY efa.is_primary DESC NULLS LAST, fa.name ASC),
'[]'::json
)
FROM exercise_focus_areas efa
JOIN focus_areas fa ON fa.id = efa.focus_area_id
WHERE efa.exercise_id = e.id
) AS focus_area_names,
(
SELECT COALESCE(
json_agg(sd.name ORDER BY esd.is_primary DESC NULLS LAST, sd.name ASC),
'[]'::json
)
FROM exercise_style_directions esd
JOIN style_directions sd ON sd.id = esd.style_direction_id
WHERE esd.exercise_id = e.id
) AS style_direction_names,
(
SELECT COALESCE(
json_agg(tt.name ORDER BY ett.is_primary DESC NULLS LAST, tt.sort_order NULLS LAST, tt.name ASC),
'[]'::json
)
FROM exercise_training_types ett
JOIN training_types tt ON tt.id = ett.training_type_id
WHERE ett.exercise_id = e.id
) AS training_type_names
{variants_sql}
FROM exercises e
LEFT JOIN profiles p ON e.created_by = p.id
LEFT JOIN clubs c ON e.club_id = c.id
WHERE {' AND '.join(where)}
ORDER BY e.updated_at DESC
LIMIT %s OFFSET %s
"""
params.extend([limit, offset])
cur.execute(query, params)
rows = cur.fetchall()
out = []
for r in rows:
d = r2d(r)
pfn = d.get("primary_focus_name")
d["focus_area"] = pfn
d["focus_area_names"] = _coerce_json_str_list(d.get("focus_area_names"))
d["style_direction_names"] = _coerce_json_str_list(d.get("style_direction_names"))
d["training_type_names"] = _coerce_json_str_list(d.get("training_type_names"))
if include_variants:
v = d.get("variants")
if isinstance(v, str):
try:
d["variants"] = json.loads(v)
except Exception:
d["variants"] = []
elif v is None:
d["variants"] = []
out.append(d)
return out
@router.get("/exercises/{exercise_id}")
def get_exercise(
exercise_id: int,
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Exercise Detail mit allen M:N Relations (vollständig enriched).
"""
profile_id = tenant.profile_id
with get_db() as conn:
cur = get_cursor(conn)
exercise = enrich_exercise_detail(exercise_id, cur)
if not exercise:
raise HTTPException(status_code=404, detail="Übung nicht gefunden")
if not library_content_visible_to_profile(
cur,
profile_id,
exercise["visibility"],
exercise.get("club_id"),
exercise.get("created_by"),
tenant.global_role,
):
raise HTTPException(status_code=403, detail="Keine Berechtigung für diese Übung")
return exercise
@router.post("/exercises", status_code=201)
def create_exercise(
body: ExerciseCreate,
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Erstellt eine neue Übung mit allen M:N Relations.
"""
profile_id = tenant.profile_id
# Validierung
if body.status not in ("draft", "in_review", "approved", "archived"):
raise HTTPException(status_code=400, detail="Ungültiger Status")
if body.visibility not in ("private", "club", "official"):
raise HTTPException(status_code=400, detail="Ungültige Visibility")
club_id = body.club_id
if body.visibility == "club" and club_id is None:
club_id = tenant.effective_club_id
with get_db() as conn:
cur = get_cursor(conn)
assert_valid_governance_visibility(
cur, profile_id, tenant.global_role, body.visibility, club_id
)
# Equipment als JSONB
equipment_json = json.dumps(body.equipment) if body.equipment else None
# INSERT
cur.execute(
"""INSERT INTO exercises
(title, summary, goal, execution, preparation, trainer_notes,
duration_min, duration_max, group_size_min, group_size_max,
equipment, visibility, status, created_by, club_id)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
RETURNING id""",
(
body.title, body.summary, body.goal, body.execution,
body.preparation, body.trainer_notes,
body.duration_min, body.duration_max,
body.group_size_min, body.group_size_max,
equipment_json,
body.visibility, body.status, profile_id, club_id,
)
)
row = cur.fetchone()
exercise_id = row['id'] if isinstance(row, dict) else row[0]
conn.commit()
# M:N Relations zuweisen
data = body.dict()
assign_exercise_relations(cur, conn, exercise_id, data)
# Vollständiges Objekt zurückgeben
exercise = enrich_exercise_detail(exercise_id, cur)
return exercise
@router.put("/exercises/{exercise_id}")
def update_exercise(
exercise_id: int,
body: ExerciseUpdate,
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Aktualisiert eine Übung (Partial Update).
Berechtigt: Ersteller, Plattform-Admin oder Nutzer mit Planungsrecht im Verein (Vereins-Übungen).
"""
profile_id = tenant.profile_id
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT created_by, visibility, club_id FROM exercises WHERE id = %s",
(exercise_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Übung nicht gefunden")
_assert_can_edit_exercise(cur, exercise_id, tenant)
rd = r2d(row)
ex_vis = (rd.get("visibility") or "private").strip().lower()
ex_cid = rd.get("club_id")
if ex_cid is not None:
ex_cid = int(ex_cid)
data = body.dict(exclude_unset=True)
raw_promo = data.pop("promote_attached_media_for_official", None)
promote_media_flag = raw_promo is True
default_official_copy = data.pop("default_official_media_copyright", None)
next_vis = ex_vis
if "visibility" in data and data["visibility"] is not None:
v_raw = str(data["visibility"]).strip().lower()
if v_raw:
next_vis = v_raw
next_club = ex_cid
if "club_id" in data:
raw_c = data["club_id"]
if raw_c in (None, "", []):
next_club = None
else:
next_club = int(raw_c)
if next_vis == "club":
if next_club is None:
next_club = tenant.effective_club_id
if next_club is None:
raise HTTPException(
status_code=400,
detail="Vereins-Übung: club_id angeben oder aktiven Verein wählen (X-Active-Club-Id).",
)
data["club_id"] = next_club
if next_vis != ex_vis:
data["visibility"] = next_vis
gov_club = next_club if next_vis == "club" else None
assert_valid_governance_visibility(
cur, profile_id, tenant.global_role, next_vis, gov_club
)
apply_official_exercise_media_rules(
cur,
exercise_id,
next_vis,
promote_attached_media=promote_media_flag,
default_official_media_copyright=default_official_copy,
)
fields = []
params = []
for field in ["title", "summary", "goal", "execution", "preparation", "trainer_notes",
"duration_min", "duration_max", "group_size_min", "group_size_max",
"visibility", "status", "club_id"]:
if field in data and data[field] is not None:
fields.append(f"{field} = %s")
params.append(data[field])
if "equipment" in data:
fields.append("equipment = %s")
params.append(json.dumps(data["equipment"]) if data["equipment"] else None)
if fields:
fields.append("updated_at = NOW()")
params.append(exercise_id)
query = f"UPDATE exercises SET {', '.join(fields)} WHERE id = %s"
cur.execute(query, params)
assign_exercise_relations(cur, conn, exercise_id, data, do_commit=False)
conn.commit()
exercise = enrich_exercise_detail(exercise_id, cur)
return exercise
@router.delete("/exercises/{exercise_id}")
def delete_exercise(
exercise_id: int,
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Löscht eine Übung.
Berechtigung: Plattform-Admin (alle); Vereins-Admin Vereins-Übungen seines Vereins;
Ersteller nur eigene private Übungen; Vereins-Admin zusätzlich private Übungen von Mitgliedern,
mit denen er einen Verein teilt.
Bei Verwendung in Blöcken, Trainingsplänen oder Progressionsgraphen: 409 — bitte archivieren.
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT id, created_by, visibility, club_id FROM exercises WHERE id = %s",
(exercise_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Übung nicht gefunden")
ex = r2d(row)
_assert_can_delete_exercise(cur, tenant, ex)
counts = _exercise_delete_usage_counts(cur, exercise_id)
usage_msg = _exercise_delete_usage_message(counts)
if usage_msg:
raise HTTPException(status_code=409, detail=usage_msg)
cur.execute("DELETE FROM exercises WHERE id = %s", (exercise_id,))
conn.commit()
return {"ok": True}
# --- Übungsvarianten (EXERCISES_API_SPEC.md) ---
@router.put("/exercises/{exercise_id}/variants/reorder")
def reorder_exercise_variants(
exercise_id: int,
body: ExerciseVariantsReorder,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
if len(body.variant_ids) != len(set(body.variant_ids)):
raise HTTPException(status_code=400, detail="variant_ids dürfen keine Duplikate enthalten")
with get_db() as conn:
cur = get_cursor(conn)
_assert_can_edit_exercise(cur, exercise_id, tenant)
cur.execute(
"SELECT id FROM exercise_variants WHERE exercise_id = %s",
(exercise_id,),
)
existing = [r["id"] for r in cur.fetchall()]
if set(existing) != set(body.variant_ids):
raise HTTPException(
status_code=400,
detail="variant_ids müssen alle Varianten dieser Übung genau einmal enthalten",
)
for pos, vid in enumerate(body.variant_ids):
cur.execute(
"""UPDATE exercise_variants SET sequence_order = %s
WHERE id = %s AND exercise_id = %s""",
(pos + 1, vid, exercise_id),
)
conn.commit()
return {"ok": True, "reordered": len(body.variant_ids)}
@router.post("/exercises/{exercise_id}/variants", status_code=201)
def create_exercise_variant(
exercise_id: int,
body: ExerciseVariantCreate,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
with get_db() as conn:
cur = get_cursor(conn)
_assert_can_edit_exercise(cur, exercise_id, tenant)
_validate_variant_prerequisite(cur, exercise_id, body.prerequisite_variant_id)
eq_json = _variant_equipment_json(body.equipment_changes)
seq = body.sequence_order
if seq is None:
cur.execute(
"""SELECT COALESCE(MAX(sequence_order), 0) + 1 AS n
FROM exercise_variants WHERE exercise_id = %s""",
(exercise_id,),
)
seq = cur.fetchone()["n"]
desc = (body.description or "").strip() or None
exec_ch = (body.execution_changes or "").strip() or None
diff = (body.difficulty_adjustment or "").strip() or None
cur.execute(
"""INSERT INTO exercise_variants (
exercise_id, variant_name, description, execution_changes,
duration_min, duration_max, equipment_changes, difficulty_adjustment,
progression_level, sequence_order, prerequisite_variant_id
) VALUES (%s,%s,%s,%s,%s,%s,%s::jsonb,%s,%s,%s,%s)
RETURNING id""",
(
exercise_id,
body.variant_name.strip(),
desc,
exec_ch,
body.duration_min,
body.duration_max,
eq_json,
diff,
body.progression_level,
seq,
body.prerequisite_variant_id,
),
)
new_id = cur.fetchone()["id"]
row = _fetch_variant_row(cur, exercise_id, new_id)
conn.commit()
return row
@router.put("/exercises/{exercise_id}/variants/{variant_id}")
def update_exercise_variant(
exercise_id: int,
variant_id: int,
body: ExerciseVariantUpdate,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
data = body.dict(exclude_unset=True)
if not data:
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
with get_db() as conn:
cur = get_cursor(conn)
_assert_can_edit_exercise(cur, exercise_id, tenant)
old = _fetch_variant_row(cur, exercise_id, variant_id)
if "variant_name" in data and data["variant_name"] is not None:
old["variant_name"] = data["variant_name"].strip()
if "description" in data:
old["description"] = (data["description"] or "").strip() or None
if "execution_changes" in data:
old["execution_changes"] = (data["execution_changes"] or "").strip() or None
if "duration_min" in data:
old["duration_min"] = data["duration_min"]
if "duration_max" in data:
old["duration_max"] = data["duration_max"]
if "equipment_changes" in data:
old["equipment_changes"] = _normalize_variant_equipment_list(data["equipment_changes"])
if "difficulty_adjustment" in data:
old["difficulty_adjustment"] = (data["difficulty_adjustment"] or "").strip() or None
if "progression_level" in data and data["progression_level"] is not None:
old["progression_level"] = data["progression_level"]
if "sequence_order" in data:
old["sequence_order"] = data["sequence_order"]
if "prerequisite_variant_id" in data:
old["prerequisite_variant_id"] = data["prerequisite_variant_id"]
prereq = old.get("prerequisite_variant_id")
if prereq == variant_id:
raise HTTPException(status_code=400, detail="Variante kann nicht ihre eigene Voraussetzung sein")
_validate_variant_prerequisite(cur, exercise_id, prereq)
eq_db = _variant_equipment_json(_normalize_variant_equipment_list(old.get("equipment_changes")))
cur.execute(
"""UPDATE exercise_variants SET
variant_name = %s,
description = %s,
execution_changes = %s,
duration_min = %s,
duration_max = %s,
equipment_changes = %s::jsonb,
difficulty_adjustment = %s,
progression_level = %s,
sequence_order = %s,
prerequisite_variant_id = %s
WHERE id = %s AND exercise_id = %s""",
(
old["variant_name"],
old.get("description"),
old.get("execution_changes"),
old.get("duration_min"),
old.get("duration_max"),
eq_db,
old.get("difficulty_adjustment"),
old.get("progression_level"),
old.get("sequence_order"),
old.get("prerequisite_variant_id"),
variant_id,
exercise_id,
),
)
row = _fetch_variant_row(cur, exercise_id, variant_id)
conn.commit()
return row
@router.delete("/exercises/{exercise_id}/variants/{variant_id}")
def delete_exercise_variant(
exercise_id: int,
variant_id: int,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
with get_db() as conn:
cur = get_cursor(conn)
_assert_can_edit_exercise(cur, exercise_id, tenant)
_fetch_variant_row(cur, exercise_id, variant_id)
cur.execute(
"SELECT COUNT(*) AS c FROM exercise_variants WHERE prerequisite_variant_id = %s",
(variant_id,),
)
cnt = int(cur.fetchone()["c"])
if cnt > 0:
raise HTTPException(
status_code=409,
detail="Variante ist Voraussetzung anderer Varianten — zuerst dort ändern oder entfernen",
)
cur.execute(
"DELETE FROM exercise_variants WHERE id = %s AND exercise_id = %s",
(variant_id, exercise_id),
)
conn.commit()
return {"ok": True}
def _content_disposition_inline(filename: Optional[str]) -> str:
"""Inline-Darstellung im Browser (<video>/<img>), keine attachment-Download-Leiste."""
if not filename:
return "inline"
fn = str(filename).strip()
quoted = quote(fn, safe="")
escaped = fn.replace('"', "\\")
if quoted != fn:
return f"inline; filename*=utf-8''{quoted}"
return f'inline; filename="{escaped}"'
def _parse_bytes_range_single(range_header: str, file_size: int) -> Optional[Tuple[int, int]]:
"""
Aus Range-Header einen inklusiven (start, end)-Bereich; None = gesamte Datei.
Nur ein bytes=-Segment (erstes Segment bei mehreren, durch Komma getrennt).
"""
if not range_header:
return None
hdr = range_header.strip()
if "," in hdr:
hdr = hdr.split(",", 1)[0].strip()
m = re.match(r"^bytes=(\d*)-(\d*)$", hdr)
if not m:
return None
start_s, end_s = m.group(1), m.group(2)
if not start_s and not end_s:
return None
if file_size <= 0:
return None
last_idx = file_size - 1
if start_s == "" and end_s != "":
suffix = int(end_s)
if suffix <= 0:
return None
start = max(0, file_size - suffix)
end = last_idx
elif start_s != "" and end_s == "":
start = int(start_s)
end = last_idx
else:
start = int(start_s)
end = int(end_s)
end = min(end, last_idx)
if start > last_idx or start > end:
return None
return start, end
def _iter_file_chunks(path_str: str, start: int, length: int) -> Iterator[bytes]:
chunk_size = 64 * 1024
remaining = length
with open(path_str, "rb") as f:
f.seek(start)
while remaining > 0:
blob = f.read(min(chunk_size, remaining))
if not blob:
break
remaining -= len(blob)
yield blob
def _binary_media_response(
path_arg: Path, mime_type: str, download_name: Optional[str], request: Request
):
"""Vollständige oder Range-Download-Antwort; MP4/streaming brauchen oft 206 + Content-Range."""
path_str = str(path_arg.resolve())
stat_r = os.stat(path_str)
file_size = int(stat_r.st_size)
cd_val = _content_disposition_inline(download_name)
base_h = {"accept-ranges": "bytes", "content-disposition": cd_val}
if request.method.upper() == "HEAD":
return Response(
status_code=200,
headers={**base_h, "content-type": mime_type or "application/octet-stream", "content-length": str(file_size)},
)
rng = request.headers.get("range")
if rng and request.method.upper() != "HEAD":
rstrip = rng.strip()
if re.match(r"^bytes=(\d*)-(\d*)$", rstrip):
parsed = _parse_bytes_range_single(rng, file_size)
if parsed is None:
return Response(
status_code=416,
headers={
"accept-ranges": "bytes",
"content-range": f"bytes */{file_size}",
},
)
start, end_incl = parsed
content_length = end_incl - start + 1
hdrs = {
**base_h,
"content-type": mime_type or "application/octet-stream",
"content-range": f"bytes {start}-{end_incl}/{file_size}",
"content-length": str(content_length),
}
return StreamingResponse(
_iter_file_chunks(path_str, start, content_length),
status_code=206,
headers=hdrs,
media_type=mime_type or "application/octet-stream",
)
return FileResponse(
path_str,
media_type=mime_type or "application/octet-stream",
stat_result=stat_r,
headers=base_h,
)
# --- Medien (MEDIA_UPLOAD_SPEC.md / EXERCISES_API_SPEC.md) ---
def _fetch_media_row(cur, exercise_id: int, media_id: int) -> Optional[dict]:
cur.execute(
"""SELECT em.id, em.exercise_id, em.media_type, em.file_path, em.file_size, em.mime_type, em.original_filename,
em.embed_url, em.embed_platform, em.title, em.description, em.sort_order, em.is_primary, em.context, em.created_at,
em.media_asset_id, ma.storage_key AS asset_storage_key,
ma.lifecycle_state AS asset_lifecycle_state
FROM exercise_media em
LEFT JOIN media_assets ma ON ma.id = em.media_asset_id
WHERE em.id = %s AND em.exercise_id = %s""",
(media_id, exercise_id),
)
row = cur.fetchone()
return r2d(row) if row else None
@router.api_route("/exercises/{exercise_id}/media/{media_id}/file", methods=["GET", "HEAD"])
def download_exercise_media_file(
request: Request,
exercise_id: int,
media_id: int,
tenant: TenantContext = Depends(get_tenant_context_flexible),
):
"""
Dateiauslieferung mit Governance wie GET Übung — Auth via X-Auth-Token oder ?ssetoken (für <img>/<video>).
Unterstützt HTTP Range (206) für MP4-Streaming und Content-Disposition: inline (kein attachment).
"""
with get_db() as conn:
cur = get_cursor(conn)
_assert_can_view_exercise_media(cur, exercise_id, tenant)
media = _fetch_media_row(cur, exercise_id, media_id)
if not media:
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
if (media.get("embed_url") or "").strip():
raise HTTPException(status_code=400, detail="Embed-Medien haben keine Datei-URL")
lc = (media.get("asset_lifecycle_state") or "active").strip().lower()
if lc == "trash_hidden":
_assert_can_edit_exercise(cur, exercise_id, tenant)
elif lc not in ("active", "trash_soft", ""):
raise HTTPException(status_code=404, detail="Medium nicht verfügbar")
fp = media.get("file_path")
media_root = get_effective_media_root(cur)
abs_p = _resolve_local_media_file(
media_root,
fp,
media.get("asset_storage_key"),
)
if not abs_p or not abs_p.is_file():
raise HTTPException(status_code=404, detail="Datei nicht gefunden")
mime = media.get("mime_type") or "application/octet-stream"
fname = media.get("original_filename") or abs_p.name
return _binary_media_response(abs_p, mime, str(fname) if fname else None, request)
@router.post("/exercises/{exercise_id}/media", status_code=201)
async def upload_exercise_media(
exercise_id: int,
tenant: TenantContext = Depends(get_tenant_context),
file: Optional[UploadFile] = File(None),
embed_url: Optional[str] = Form(None),
media_type: str = Form(...),
title: str = Form(""),
description: str = Form(""),
context: str = Form("ablauf"),
is_primary: bool = Form(False),
):
profile_id = tenant.profile_id
if media_type not in ("image", "video", "document", "sketch"):
raise HTTPException(status_code=400, detail="Ungültiger media_type")
if context not in ("ablauf", "detail", "trainer_hint"):
raise HTTPException(status_code=400, detail="Ungültiger context")
emb = (embed_url or "").strip() or None
has_file = file is not None and bool(file.filename)
with get_db() as conn:
cur = get_cursor(conn)
_assert_can_edit_exercise(cur, exercise_id, tenant)
if _count_exercise_media(cur, exercise_id) >= MAX_EXERCISE_MEDIA:
raise HTTPException(
status_code=400,
detail=f"Maximal {MAX_EXERCISE_MEDIA} Medien pro Übung",
)
if has_file and emb:
raise HTTPException(status_code=400, detail="Entweder Datei oder embed_url, nicht beides")
if not has_file and not emb:
raise HTTPException(status_code=400, detail="Datei oder embed_url erforderlich")
sort_sql = (
"COALESCE((SELECT MAX(sort_order) + 1 FROM exercise_media WHERE exercise_id = %s), 1)"
)
if emb:
platform = _detect_embed_platform(emb)
if not platform:
raise HTTPException(
status_code=400,
detail="Ungültige Embed-URL (erlaubt: YouTube, Vimeo, Instagram, TikTok)",
)
cur.execute(
f"""INSERT INTO exercise_media (
exercise_id, media_type, file_path, file_size, mime_type, original_filename,
embed_url, embed_platform, title, description, context, is_primary, sort_order
) VALUES (
%s, %s, NULL, NULL, NULL, NULL, %s, %s, %s, %s, %s, %s, {sort_sql}
)
RETURNING id, exercise_id, media_type, file_path, file_size, mime_type, original_filename,
embed_url, embed_platform, title, description, sort_order, is_primary, context, created_at""",
(
exercise_id,
media_type,
emb,
platform,
title or None,
description or None,
context,
is_primary,
exercise_id,
),
)
else:
raw = await file.read()
max_upload = _upload_limit_bytes(tenant)
if len(raw) > max_upload:
raise HTTPException(
status_code=413,
detail=f"Datei zu groß (max. {max_upload // (1024 * 1024)} MB)",
)
try:
mime = resolve_upload_mime_type(raw, file.content_type, file.filename)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
ext = Path(file.filename or "").suffix[:12] if file.filename else ""
if not ext and mime == "image/jpeg":
ext = ".jpg"
elif not ext and mime == "image/png":
ext = ".png"
elif not ext and mime in ("image/heic", "image/heif"):
ext = ".heic"
elif not ext and mime == "video/mp4":
ext = ".mp4"
elif not ext and mime == "video/quicktime":
ext = ".mov"
cur.execute(
"SELECT visibility, club_id, created_by FROM exercises WHERE id = %s",
(exercise_id,),
)
ex_gov = cur.fetchone()
if not ex_gov:
raise HTTPException(status_code=404, detail="Übung nicht gefunden")
ex_vis = (r2d(ex_gov).get("visibility") or "private").strip().lower()
ex_club = r2d(ex_gov).get("club_id")
media_root = get_effective_media_root(cur)
full_sha = hashlib.sha256(raw).hexdigest()
cur.execute(
"""SELECT id, storage_key, byte_size, lifecycle_state, original_filename FROM media_assets
WHERE sha256 = %s AND lower(trim(visibility)) = %s
AND (club_id IS NOT DISTINCT FROM %s)
LIMIT 1""",
(full_sha, ex_vis, ex_club),
)
existing_asset = cur.fetchone()
if existing_asset:
ea = r2d(existing_asset)
lc = (ea.get("lifecycle_state") or "").strip().lower()
if lc == "active":
aid = ea["id"]
sk = ea["storage_key"]
sz = ea.get("byte_size") or len(raw)
db_path = f"/media/{sk}"
cur.execute(
f"""INSERT INTO exercise_media (
exercise_id, media_type, file_path, file_size, mime_type, original_filename,
embed_url, embed_platform, title, description, context, is_primary, sort_order,
media_asset_id
) VALUES (
%s, %s, %s, %s, %s, %s, NULL, NULL, %s, %s, %s, %s, {sort_sql}, %s
)
RETURNING id, exercise_id, media_type, file_path, file_size, mime_type, original_filename,
embed_url, embed_platform, title, description, sort_order, is_primary, context, created_at,
media_asset_id""",
(
exercise_id,
media_type,
db_path,
sz,
mime,
file.filename,
title or None,
description or None,
context,
is_primary,
exercise_id,
aid,
),
)
elif lc in ("trash_soft", "trash_hidden"):
logger.warning(
"exercise_media_upload_conflict MEDIA_ASSET_IN_TRASH exercise_id=%s profile_id=%s "
"media_asset_id=%s lifecycle_state=%s visibility=%s club_id=%s sha256_prefix=%s "
"upload_filename=%s stored_original_filename=%s",
exercise_id,
profile_id,
ea["id"],
lc,
ex_vis,
ex_club,
full_sha[:16],
(file.filename or "")[:200],
(ea.get("original_filename") or "")[:200],
)
raise HTTPException(
status_code=409,
detail={
"code": "MEDIA_ASSET_IN_TRASH",
"message": (
"Diese Datei ist inhaltsgleich (SHA-256) mit einem Archiv-Medium im Papierkorb. "
"Reaktivieren und verknüpfen ist möglich; ein zweites physisches Exemplar wird nicht angelegt."
),
"media_asset_id": ea["id"],
"lifecycle_state": lc,
"original_filename": ea.get("original_filename"),
},
)
else:
logger.warning(
"exercise_media_upload_conflict MEDIA_ASSET_UNAVAILABLE exercise_id=%s profile_id=%s "
"media_asset_id=%s lifecycle_state=%s visibility=%s club_id=%s sha256_prefix=%s",
exercise_id,
profile_id,
ea["id"],
lc,
ex_vis,
ex_club,
full_sha[:16],
)
raise HTTPException(
status_code=409,
detail={
"code": "MEDIA_ASSET_UNAVAILABLE",
"message": "Es existiert bereits ein Archiv-Eintrag zu dieser Datei in einem nicht nutzbaren Zustand.",
"media_asset_id": ea["id"],
"lifecycle_state": lc,
},
)
else:
storage_key = f"exercises/{full_sha}{ext}"
dest_path = path_under_media_root(media_root, storage_key)
if dest_path is None:
raise HTTPException(status_code=500, detail="Ungültiger Speicherpfad")
dest_path.parent.mkdir(parents=True, exist_ok=True)
if not dest_path.is_file():
dest_path.write_bytes(raw)
cur.execute(
"""INSERT INTO media_assets (
mime_type, byte_size, sha256, original_filename, visibility, club_id,
uploaded_by_profile_id, copyright_notice, storage_backend, storage_key, lifecycle_state
) VALUES (%s, %s, %s, %s, %s, %s, %s, NULL, 'local', %s, 'active')
RETURNING id""",
(
mime,
len(raw),
full_sha,
file.filename,
ex_vis,
ex_club,
profile_id,
storage_key,
),
)
ar = cur.fetchone()
aid = r2d(ar)["id"]
db_path = f"/media/{storage_key}"
cur.execute(
f"""INSERT INTO exercise_media (
exercise_id, media_type, file_path, file_size, mime_type, original_filename,
embed_url, embed_platform, title, description, context, is_primary, sort_order,
media_asset_id
) VALUES (
%s, %s, %s, %s, %s, %s, NULL, NULL, %s, %s, %s, %s, {sort_sql}, %s
)
RETURNING id, exercise_id, media_type, file_path, file_size, mime_type, original_filename,
embed_url, embed_platform, title, description, sort_order, is_primary, context, created_at,
media_asset_id""",
(
exercise_id,
media_type,
db_path,
len(raw),
mime,
file.filename,
title or None,
description or None,
context,
is_primary,
exercise_id,
aid,
),
)
row = cur.fetchone()
conn.commit()
return r2d(row)
@router.post("/exercises/{exercise_id}/media/from-asset", status_code=201)
def attach_exercise_media_from_asset(
exercise_id: int,
body: ExerciseMediaFromAsset,
tenant: TenantContext = Depends(get_tenant_context),
):
"""Archiv-Medium an Übung anhängen (Wiederverwendung, deduplizierte Datei)."""
profile_id = tenant.profile_id
ctx = (body.context or "ablauf").strip()
if ctx not in ("ablauf", "detail", "trainer_hint"):
raise HTTPException(status_code=400, detail="Ungültiger context")
mt_in = body.media_type
if mt_in is not None and mt_in not in ("image", "video", "document", "sketch"):
raise HTTPException(status_code=400, detail="Ungültiger media_type")
with get_db() as conn:
cur = get_cursor(conn)
_assert_can_edit_exercise(cur, exercise_id, tenant)
if _count_exercise_media(cur, exercise_id) >= MAX_EXERCISE_MEDIA:
raise HTTPException(
status_code=400,
detail=f"Maximal {MAX_EXERCISE_MEDIA} Medien pro Übung",
)
cur.execute(
"""SELECT id, mime_type, byte_size, original_filename, visibility, club_id,
uploaded_by_profile_id, storage_key, lifecycle_state
FROM media_assets WHERE id = %s""",
(body.media_asset_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Archiv-Medium nicht gefunden")
asset = r2d(row)
if (asset.get("lifecycle_state") or "").strip().lower() != "active":
raise HTTPException(
status_code=400,
detail="Nur aktive Archiv-Medien können verknüpft werden",
)
if not library_content_visible_to_profile(
cur,
profile_id,
(asset.get("visibility") or "").strip().lower(),
asset.get("club_id"),
asset.get("uploaded_by_profile_id"),
tenant.global_role,
):
raise HTTPException(status_code=403, detail="Keine Berechtigung für dieses Archiv-Medium")
cur.execute(
"SELECT 1 FROM exercise_media WHERE exercise_id = %s AND media_asset_id = %s",
(exercise_id, body.media_asset_id),
)
if cur.fetchone():
raise HTTPException(
status_code=400,
detail="Dieses Archiv-Medium ist bereits mit der Übung verknüpft",
)
sk = asset.get("storage_key")
if not sk:
raise HTTPException(status_code=400, detail="Archiv-Eintrag hat keine Datei")
media_type = mt_in or _media_type_from_mime(asset.get("mime_type"))
title = (body.title or "").strip() or None
if title is None and asset.get("original_filename"):
title = str(asset.get("original_filename") or "")[:300] or None
description = (body.description or "").strip() or None
db_path = f"/media/{sk}"
sort_sql = (
"COALESCE((SELECT MAX(sort_order) + 1 FROM exercise_media WHERE exercise_id = %s), 1)"
)
cur.execute(
f"""INSERT INTO exercise_media (
exercise_id, media_type, file_path, file_size, mime_type, original_filename,
embed_url, embed_platform, title, description, context, is_primary, sort_order,
media_asset_id
) VALUES (
%s, %s, %s, %s, %s, %s, NULL, NULL, %s, %s, %s, %s, {sort_sql}, %s
)
RETURNING id, exercise_id, media_type, file_path, file_size, mime_type, original_filename,
embed_url, embed_platform, title, description, sort_order, is_primary, context, created_at,
media_asset_id""",
(
exercise_id,
media_type,
db_path,
asset.get("byte_size"),
asset.get("mime_type"),
asset.get("original_filename"),
title,
description,
ctx,
body.is_primary,
exercise_id,
body.media_asset_id,
),
)
out = r2d(cur.fetchone())
conn.commit()
out["asset_lifecycle_state"] = "active"
return out
@router.put("/exercises/{exercise_id}/media/reorder")
def reorder_exercise_media(
exercise_id: int,
body: ExerciseMediaReorder,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
ids = body.media_ids
with get_db() as conn:
cur = get_cursor(conn)
_assert_can_edit_exercise(cur, exercise_id, tenant)
cur.execute(
"SELECT id FROM exercise_media WHERE exercise_id = %s ORDER BY sort_order, id",
(exercise_id,),
)
existing = [r["id"] if isinstance(r, dict) else r[0] for r in cur.fetchall()]
if set(ids) != set(existing) or len(ids) != len(existing):
raise HTTPException(
status_code=400,
detail="media_ids unvollständig oder gehören nicht zu dieser Übung",
)
for i, mid in enumerate(ids, start=1):
cur.execute(
"UPDATE exercise_media SET sort_order = %s, updated_at = NOW() WHERE id = %s AND exercise_id = %s",
(i, mid, exercise_id),
)
conn.commit()
return {"ok": True, "reordered": len(ids)}
@router.put("/exercises/{exercise_id}/media/{media_id}")
def update_exercise_media(
exercise_id: int,
media_id: int,
body: ExerciseMediaUpdate,
tenant: TenantContext = Depends(get_tenant_context),
):
profile_id = tenant.profile_id
data = body.dict(exclude_unset=True)
if not data:
raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren")
with get_db() as conn:
cur = get_cursor(conn)
_assert_can_edit_exercise(cur, exercise_id, tenant)
if not _fetch_media_row(cur, exercise_id, media_id):
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
if "context" in data and data["context"] not in ("ablauf", "detail", "trainer_hint", None):
raise HTTPException(status_code=400, detail="Ungültiger context")
fields = []
params = []
for k in ("title", "description", "is_primary", "context"):
if k in data:
fields.append(f"{k} = %s")
params.append(data[k])
if fields:
fields.append("updated_at = NOW()")
params.extend([media_id, exercise_id])
cur.execute(
f"UPDATE exercise_media SET {', '.join(fields)} WHERE id = %s AND exercise_id = %s",
params,
)
conn.commit()
cur.execute(
"""SELECT id, media_type, file_path, file_size, mime_type, original_filename,
embed_url, embed_platform, title, description, sort_order, is_primary, context, created_at,
media_asset_id
FROM exercise_media WHERE id = %s""",
(media_id,),
)
return r2d(cur.fetchone())
@router.delete("/exercises/{exercise_id}/media/{media_id}")
def delete_exercise_media(
exercise_id: int,
media_id: int,
tenant: TenantContext = Depends(get_tenant_context),
):
"""
Entfernt nur die Verknüpfung Übung ↔ exercise_media (bzw. Embed-Zeile).
Löscht keine Datei und keinen media_assets-Datensatz — optionales Papierkorb-Angebot nur clientseitig
über orphan_media_asset_id, wenn dies die letzte Referenz auf das Asset war.
"""
profile_id = tenant.profile_id
orphan_media_asset_id: Optional[int] = None
with get_db() as conn:
cur = get_cursor(conn)
_assert_can_edit_exercise(cur, exercise_id, tenant)
cur.execute(
"""SELECT em.media_asset_id
FROM exercise_media em
WHERE em.id = %s AND em.exercise_id = %s""",
(media_id, exercise_id),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="Medium nicht gefunden")
rec = r2d(row)
media_asset_id = rec.get("media_asset_id")
if media_asset_id:
cur.execute(
"SELECT COUNT(*) AS c FROM exercise_media WHERE media_asset_id = %s",
(media_asset_id,),
)
if int(r2d(cur.fetchone())["c"]) <= 1:
orphan_media_asset_id = int(media_asset_id)
cur.execute(
"DELETE FROM exercise_media WHERE id = %s AND exercise_id = %s",
(media_id, exercise_id),
)
conn.commit()
return {"ok": True, "orphan_media_asset_id": orphan_media_asset_id}