Trainer_LLM/llm-api/exercise_router.py
Lars 6a4e97f4e4
All checks were successful
Deploy Trainer_LLM to llm-node / deploy (push) Successful in 2s
llm-api/exercise_router.py aktualisiert
2025-08-13 12:39:44 +02:00

583 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
exercise_router.py v1.7.1 (Swagger angereichert)
Ergänzt:
- Aussagekräftige summary/description/response_description je Endpoint
- Beispiele (x-codeSamples) für curl-Aufrufe
- Pydantic-Felder mit description + json_schema_extra (Beispiele)
- Keine API-Signatur-/Pfadänderungen, keine Prefix-Änderungen
Hinweis:
- Endpunkte bleiben weiterhin unter /exercise/* (weil die Routenstrings bereits /exercise/... enthalten).
- Falls du später einen APIRouter-Prefix setzen willst, dann bitte die Pfade unten von '/exercise/...' auf relative Pfade ändern,
sonst entstehen Doppelpfade.
"""
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from uuid import uuid4
from datetime import datetime
from clients import model, qdrant
from qdrant_client.models import (
PointStruct,
VectorParams,
Distance,
PointIdsList,
Filter,
FieldCondition,
MatchValue,
)
import logging
import os
# Module-level logger for this router; its level is set to INFO right here.
logger = logging.getLogger("exercise_router")
logger.setLevel(logging.INFO)
# Router without a prefix (the path strings already contain '/exercise/...')
router = APIRouter(tags=["exercise"])
# =========================
# Models
# =========================
# Pydantic model of one exercise. It is used as the request body and response
# model of the endpoints and, via model_dump(), as the Qdrant payload.
# NOTE: documentation is kept in '#' comments on purpose; a class docstring
# would be emitted as the schema "description" in the generated OpenAPI.
class Exercise(BaseModel):
    # Defaults to a fresh UUID4 string when the client does not send an id.
    id: str = Field(default_factory=lambda: str(uuid4()), description="Interne UUID (Qdrant-Punkt-ID)")

    # Upsert metadata
    external_id: Optional[str] = Field(default=None, description="Upsert-Schlüssel (z. B. 'mw:{pageid}')")
    fingerprint: Optional[str] = Field(default=None, description="sha256 der Kernfelder für Idempotenz/Diff")
    source: Optional[str] = Field(default=None, description="Quelle (z. B. 'mediawiki', 'pdf-import', …)")
    imported_at: Optional[datetime] = Field(default=None, description="Zeitpunkt des Imports (ISO-8601)")

    # Domain fields
    title: str = Field(..., description="Übungstitel")
    summary: str = Field(..., description="Kurzbeschreibung/Ziel der Übung")
    short_description: str = Field(..., description="Alternative Kurzform / Teaser")
    keywords: List[str] = Field(default_factory=list, description="Freie Schlagworte (normalisiert)")
    # Fixed typo in the published description: "Kanonsiche" -> "Kanonische".
    link: Optional[str] = Field(default=None, description="Kanonische URL/Permalink zur Quelle")
    discipline: str = Field(..., description="Disziplin (z. B. Karate)")
    group: Optional[str] = Field(default=None, description="Optionale Gruppierung/Kategorie")
    age_group: str = Field(..., description="Altersgruppe (z. B. Kinder/Schüler/Teenager/Erwachsene)")
    target_group: str = Field(..., description="Zielgruppe (z. B. Breitensportler)")
    min_participants: int = Field(..., ge=0, description="Minimale Gruppenstärke")
    duration_minutes: int = Field(..., ge=0, description="Dauer in Minuten")
    # Only the value type (int) is validated here; the 1..5 range named in
    # the description is not enforced by this field.
    capabilities: Dict[str, int] = Field(default_factory=dict, description="Fähigkeiten-Map: {Name: Level 1..5}")
    category: str = Field(..., description="Abschnitt / Kategorie (z. B. Aufwärmen, Grundschule, …)")
    purpose: str = Field(..., description="Zweck/Zielabsicht")
    execution: str = Field(..., description="Durchführungsschritte (Markdown/Wiki-ähnlich)")
    notes: str = Field(..., description="Hinweise/Coaching-Cues")
    preparation: str = Field(..., description="Vorbereitung/Material")
    method: str = Field(..., description="Methodik/Didaktik")
    equipment: List[str] = Field(default_factory=list, description="Benötigte Hilfsmittel")

    # Example shown in the generated schema; it is also read by
    # ExerciseSearchResponse to build its own example.
    model_config = {
        "json_schema_extra": {
            "example": {
                "external_id": "mw:218",
                "title": "Affenklatschen",
                "summary": "Koordination & Aufmerksamkeit mit Ballwechseln",
                "short_description": "Ballgewöhnung im Stand/Gehen/Laufen",
                "keywords": ["Hand-Auge-Koordination", "Reaktion"],
                "link": "https://www.karatetrainer.de/index.php?title=Affenklatschen",
                "discipline": "Karate",
                "age_group": "Teenager",
                "target_group": "Breitensportler",
                "min_participants": 4,
                "duration_minutes": 8,
                "capabilities": {"Reaktionsfähigkeit": 2, "Kopplungsfähigkeit": 2},
                "category": "Aufwärmen",
                "purpose": "Aufmerksamkeit & Reaktionskette aktivieren",
                "execution": "* Paarweise aufstellen …",
                "notes": "* nicht zu lange werden lassen",
                "preparation": "* Bälle bereit halten",
                "method": "* klare Regeln/Strafrunde",
                "equipment": ["Bälle"],
            }
        }
    }
# Response body shared by the delete endpoints.
class DeleteResponse(BaseModel):
    status: str = Field(
        ...,
        description="Statusmeldung",
    )
    count: int = Field(
        ...,
        ge=0,
        description="Anzahl betroffener Punkte",
    )
    collection: str = Field(
        ...,
        description="Qdrant-Collection-Name",
    )
# Request body of the search endpoint: optional free-text query plus payload filters.
class ExerciseSearchRequest(BaseModel):
    # Optional semantic query (vector search)
    query: Optional[str] = Field(
        default=None, description="Freitext für Vektor-Suche (optional)"
    )
    limit: int = Field(default=20, ge=1, le=200, description="Max. Treffer")
    offset: int = Field(default=0, ge=0, description="Offset/Pagination")

    # Simple equality / range filters
    discipline: Optional[str] = Field(default=None, description="z. B. Karate")
    target_group: Optional[str] = Field(
        default=None, description="z. B. Breitensportler"
    )
    age_group: Optional[str] = Field(default=None, description="z. B. Teenager")
    max_duration: Optional[int] = Field(
        default=None, ge=0, description="Obergrenze Minuten"
    )

    # List filters
    equipment_any: Optional[List[str]] = Field(
        default=None, description="Mind. eines muss passen"
    )
    equipment_all: Optional[List[str]] = Field(
        default=None, description="Alle müssen passen"
    )
    keywords_any: Optional[List[str]] = Field(
        default=None, description="Mind. eines muss passen"
    )
    keywords_all: Optional[List[str]] = Field(
        default=None, description="Alle müssen passen"
    )

    # Capabilities (names + level operator)
    capability_names: Optional[List[str]] = Field(
        default=None, description="Capability-Bezeichnungen"
    )
    capability_ge_level: Optional[int] = Field(
        default=None, ge=1, le=5, description="Level ≥ N"
    )
    capability_eq_level: Optional[int] = Field(
        default=None, ge=1, le=5, description="Level == N"
    )

    # Two request examples for the generated schema: filter-only and vector + filter.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "discipline": "Karate",
                    "max_duration": 12,
                    "equipment_any": ["Bälle"],
                    "capability_names": ["Reaktionsfähigkeit"],
                    "capability_ge_level": 2,
                    "limit": 5,
                },
                {
                    "query": "Aufwärmen Reaktionsfähigkeit 10min Teenager Bälle",
                    "discipline": "Karate",
                    "limit": 3,
                },
            ]
        }
    }
# One search result: point id, optional similarity score, and the exercise payload.
class ExerciseSearchHit(BaseModel):
    id: str = Field(
        ...,
        description="Qdrant-Punkt-ID",
    )
    score: Optional[float] = Field(
        default=None,
        description="Ähnlichkeitsscore (nur bei Vektor-Suche)",
    )
    payload: Exercise = Field(
        ...,
        description="Übungsdaten (Payload)",
    )
# Response body of the search endpoint: a list of hits.
class ExerciseSearchResponse(BaseModel):
    hits: List[ExerciseSearchHit] = Field(
        ...,
        description="Trefferliste",
    )

    # The example hit reuses the example payload declared on Exercise.
    model_config = {
        "json_schema_extra": {
            "example": {
                "hits": [
                    {
                        "id": "c1f1-…",
                        "score": 0.78,
                        "payload": Exercise.model_config["json_schema_extra"]["example"],
                    }
                ]
            }
        }
    }
# =========================
# Helpers
# =========================
# Name of the Qdrant collection; overridable via the EXERCISE_COLLECTION env var.
COLLECTION = os.getenv("EXERCISE_COLLECTION", "exercises")


def _ensure_collection() -> None:
    """Create the exercise collection in Qdrant if it does not exist yet.

    The vector size is taken from the embedding model
    (``model.get_sentence_embedding_dimension()``) and cosine distance is used.
    Does nothing when the collection is already present.
    """
    if qdrant.collection_exists(COLLECTION):
        return
    # create_collection instead of recreate_collection: the collection is known
    # to be missing at this point, so the drop-and-recreate variant (deprecated
    # in current qdrant-client releases) is unnecessary and would wipe data if
    # another request created the collection in the meantime.
    qdrant.create_collection(
        collection_name=COLLECTION,
        vectors_config=VectorParams(
            size=model.get_sentence_embedding_dimension(),
            distance=Distance.COSINE,
        ),
    )
def _lookup_by_external_id(external_id: str) -> Optional[Dict[str, Any]]:
    """Return the payload of the first point whose ``external_id`` matches.

    The collection is ensured first. The returned dict is a copy of the stored
    payload; an ``id`` entry is filled in from the point id when the payload
    has none. Returns ``None`` when no point matches.
    """
    _ensure_collection()
    by_external_id = Filter(
        must=[FieldCondition(key="external_id", match=MatchValue(value=external_id))]
    )
    points, _next_page = qdrant.scroll(
        collection_name=COLLECTION,
        scroll_filter=by_external_id,
        limit=1,
        with_payload=True,
    )
    if points:
        first = points[0]
        record = dict(first.payload or {})
        record.setdefault("id", str(first.id))
        return record
    return None
# Exercise attributes whose text is concatenated to form the embedding input.
_DEF_EMBED_FIELDS = (
    "title",
    "summary",
    "short_description",
    "purpose",
    "execution",
    "notes",
)


def _make_vector_from_exercise(ex: Exercise) -> List[float]:
    """Embed the non-empty core text fields of ``ex``, joined with ``". "``."""
    parts = []
    for field_name in _DEF_EMBED_FIELDS:
        value = getattr(ex, field_name, None)
        if value:
            parts.append(value)
    joined = ". ".join(parts)
    return model.encode(joined).tolist()
def _make_vector_from_query(query: str) -> List[float]:
    """Encode the free-text ``query`` with the embedding model and return it as a list."""
    embedding = model.encode(query)
    return embedding.tolist()
def _norm_list(xs: List[Any]) -> List[str]:
out = []
seen = set()
for x in xs or []:
s = str(x).strip()
if not s:
continue
key = s.casefold()
if key in seen:
continue
seen.add(key)
out.append(s)
return sorted(out, key=str.casefold)
def _facet_capabilities(caps: Dict[str, Any]) -> Dict[str, List[str]]:
"""
Leitet Facettenfelder aus der capabilities-Map ab:
- capability_keys: alle Namen
- capability_geN: Namen mit Level >= N (1..5)
- capability_eqN: Namen mit Level == N (1..5)
"""
caps = caps or {}
def names_where(pred) -> List[str]:
out = []
for k, v in caps.items():
try:
iv = int(v)
except Exception:
iv = 0
if pred(iv):
t = str(k).strip()
if t:
out.append(t)
return sorted({t for t in out}, key=str.casefold)
all_keys = sorted({str(k).strip() for k in caps.keys() if str(k).strip()}, key=str.casefold)
return {
"capability_keys": all_keys,
# >= N
"capability_ge1": names_where(lambda lv: lv >= 1),
"capability_ge2": names_where(lambda lv: lv >= 2),
"capability_ge3": names_where(lambda lv: lv >= 3),
"capability_ge4": names_where(lambda lv: lv >= 4),
"capability_ge5": names_where(lambda lv: lv >= 5),
# == N
"capability_eq1": names_where(lambda lv: lv == 1),
"capability_eq2": names_where(lambda lv: lv == 2),
"capability_eq3": names_where(lambda lv: lv == 3),
"capability_eq4": names_where(lambda lv: lv == 4),
"capability_eq5": names_where(lambda lv: lv == 5),
}
def _response_strip_extras(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``payload`` restricted to the fields declared on ``Exercise``.

    Keeps the API response clean: payload keys that are not Exercise fields
    (for example the derived ``capability_*`` facets) are dropped.
    """
    known_fields = Exercise.model_fields
    cleaned: Dict[str, Any] = {}
    for key, value in payload.items():
        if key in known_fields:
            cleaned[key] = value
    return cleaned
def _build_filter(req: ExerciseSearchRequest) -> Filter:
    """Translate a search request into a Qdrant ``Filter``.

    All parts are combined with AND (``must``):
    - ``discipline`` / ``target_group`` / ``age_group``: exact match
    - ``max_duration``: ``duration_minutes <= max_duration``
    - ``equipment_all`` / ``keywords_all``: every value must match
    - ``equipment_any`` / ``keywords_any``: at least one value of EACH given
      list must match. Each list becomes its own nested ``should`` group, so
      supplying both lists no longer degrades to "equipment OR keyword".
    - ``capability_names``: matched against the derived facet field
      ``capability_eqN`` (if ``capability_eq_level`` is set), else
      ``capability_geN`` (if ``capability_ge_level`` is set), else
      ``capability_ge1``.

    List values and capability names are stripped and blank entries ignored,
    because the stored lists and facets are written stripped by the upsert
    endpoint; an unstripped value could never match.
    """
    # Local import: Range is not in the module-level qdrant_client.models import.
    from qdrant_client.models import Range

    def cleaned(values: Optional[List[str]]) -> List[str]:
        return [v.strip() for v in values or [] if v and v.strip()]

    def equals(key: str, value: Any) -> FieldCondition:
        return FieldCondition(key=key, match=MatchValue(value=value))

    must: List[Any] = []

    for key in ("discipline", "target_group", "age_group"):
        value = getattr(req, key)
        if value:
            must.append(equals(key, value))

    if req.max_duration is not None:
        must.append(
            FieldCondition(key="duration_minutes", range=Range(lte=int(req.max_duration)))
        )

    for key in ("equipment", "keywords"):
        must.extend(equals(key, v) for v in cleaned(getattr(req, f"{key}_all")))
        any_of = [equals(key, v) for v in cleaned(getattr(req, f"{key}_any"))]
        if any_of:
            # Nested filter: OR within this list, AND with everything else.
            must.append(Filter(should=any_of))

    names = cleaned(req.capability_names)
    if names:
        if req.capability_eq_level is not None:
            facet = f"capability_eq{int(req.capability_eq_level)}"
        elif req.capability_ge_level is not None:
            facet = f"capability_ge{int(req.capability_ge_level)}"
        else:
            # Default: level >= 1, i.e. the capability is present at all.
            facet = "capability_ge1"
        must.extend(equals(facet, n) for n in names)

    return Filter(must=must)
# =========================
# Endpoints
# =========================
@router.get(
    "/exercise/by-external-id",
    summary="Übung per external_id abrufen",
    description=(
        "Liefert die Übung mit der gegebenen `external_id` (z. B. `mw:{pageid}`). "
        "Verwendet einen Qdrant-Filter auf dem Payload-Feld `external_id`."
    ),
    response_description="Vollständiger Exercise-Payload oder 404 bei Nichtfund.",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "bash",
                "label": "curl",
                "source": "curl -s 'http://localhost:8000/exercise/by-external-id?external_id=mw:218' | jq .",
            }
        ]
    },
)
def get_exercise_by_external_id(
    external_id: str = Query(..., min_length=3, description="Upsert-Schlüssel, z. B. 'mw:218'"),
):
    """Return the stored payload for ``external_id``.

    The payload dict from ``_lookup_by_external_id`` is returned as-is (no
    response model is applied). Raises ``HTTPException`` 404 when nothing
    is found.
    """
    record = _lookup_by_external_id(external_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="not found")
@router.post(
    "/exercise",
    response_model=Exercise,
    summary="Create/Update (idempotent per external_id)",
    description=(
        "Legt eine Übung an oder aktualisiert sie. Wenn `external_id` vorhanden und bereits in der Collection existiert, "
        "wird **Update** auf dem bestehenden Punkt ausgeführt (Upsert). `keywords`/`equipment` werden normalisiert, "
        "Capability-Facetten (`capability_ge1..5`, `capability_eq1..5`, `capability_keys`) automatisch abgeleitet. "
        "Der Vektor wird aus Kernfeldern (title/summary/short_description/purpose/execution/notes) berechnet."
    ),
    response_description="Gespeicherter Exercise-Datensatz (Payload-View).",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "bash",
                "label": "curl",
                "source": "curl -s -X POST http://localhost:8000/exercise -H 'Content-Type: application/json' -d @exercise.json | jq .",
            }
        ]
    },
)
def create_or_update_exercise(ex: Exercise):
    """Upsert one exercise into the collection.

    When ``ex.external_id`` is set and a point with that external id already
    exists, that point's id is reused so the existing point is overwritten;
    otherwise ``ex.id`` is used. The stored payload is the model dump with
    normalized ``keywords``/``equipment`` plus the derived capability facets.
    Returns the stored payload reduced to the Exercise fields.
    """
    _ensure_collection()

    # Reuse the id of an already stored point with the same external_id.
    existing = _lookup_by_external_id(ex.external_id) if ex.external_id else None
    target_id = str(existing.get("id", ex.id) if existing else ex.id)

    embedding = _make_vector_from_exercise(ex)

    record: Dict[str, Any] = ex.model_dump()
    record["id"] = target_id
    for list_field in ("keywords", "equipment"):
        record[list_field] = _norm_list(record.get(list_field) or [])
    record.update(_facet_capabilities(record.get("capabilities") or {}))

    point = PointStruct(id=target_id, vector=embedding, payload=record)
    qdrant.upsert(collection_name=COLLECTION, points=[point])

    return Exercise(**_response_strip_extras(record))
@router.get(
    "/exercise/{exercise_id}",
    response_model=Exercise,
    summary="Übung per interner ID (Qdrant-Punkt-ID) lesen",
    description="Scrollt nach `id` und gibt den Payload als Exercise zurück.",
    response_description="Exercise-Payload oder 404 bei Nichtfund.",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "bash",
                "label": "curl",
                "source": "curl -s 'http://localhost:8000/exercise/1234-uuid' | jq .",
            }
        ]
    },
)
def get_exercise(exercise_id: str):
    """Return the exercise whose payload field ``id`` equals ``exercise_id``.

    The lookup is a filtered scroll on the payload, not a point-id retrieve.
    Raises ``HTTPException`` 404 when no point matches.
    """
    _ensure_collection()
    by_id = Filter(must=[FieldCondition(key="id", match=MatchValue(value=exercise_id))])
    points, _next_page = qdrant.scroll(
        collection_name=COLLECTION,
        scroll_filter=by_id,
        limit=1,
        with_payload=True,
    )
    if not points:
        raise HTTPException(status_code=404, detail="not found")

    point = points[0]
    data = dict(point.payload or {})
    data.setdefault("id", str(point.id))
    return Exercise(**_response_strip_extras(data))
@router.post(
    "/exercise/search",
    response_model=ExerciseSearchResponse,
    summary="Suche Übungen (Filter + optional Vektor)",
    description=(
        "Kombinierbare Filter auf Payload-Feldern (`discipline`, `age_group`, `target_group`, `equipment`, `keywords`, "
        "`capability_geN/eqN`) und **optional** Vektor-Suche via `query`. "
        "`should`-Filter (equipment_any/keywords_any) wirken als OR (minimum_should_match=1). "
        "`max_duration` wird als Range (lte) angewandt. Ergebnis enthält bei Vektor-Suche `score`, sonst `null`."
    ),
    response_description="Trefferliste (payload + Score bei Vektor-Suche).",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "bash",
                "label": "Filter",
                "source": "curl -s -X POST http://localhost:8000/exercise/search -H 'Content-Type: application/json' -d '{\"discipline\":\"Karate\",\"max_duration\":12,\"equipment_any\":[\"Bälle\"],\"capability_names\":[\"Reaktionsfähigkeit\"],\"capability_ge_level\":2,\"limit\":5}' | jq .",
            },
            {
                "lang": "bash",
                "label": "Vektor + Filter",
                "source": "curl -s -X POST http://localhost:8000/exercise/search -H 'Content-Type: application/json' -d '{\"query\":\"Aufwärmen 10min Teenager Bälle\",\"discipline\":\"Karate\",\"limit\":3}' | jq .",
            },
        ]
    },
)
def search_exercises(req: ExerciseSearchRequest) -> ExerciseSearchResponse:
    """Search exercises by payload filter, optionally ranked by vector similarity.

    With ``req.query`` set, a vector search is run with the filter and the
    request's limit/offset; each hit carries its score. Without a query, the
    collection is scrolled page by page with the filter; the first
    ``req.offset`` matches are skipped and up to ``req.limit`` hits are
    returned with ``score=None``.
    """
    _ensure_collection()
    search_filter = _build_filter(req)

    def to_hit(point, score) -> ExerciseSearchHit:
        data = dict(point.payload or {})
        data.setdefault("id", str(point.id))
        return ExerciseSearchHit(
            id=str(point.id),
            score=score,
            payload=Exercise(**_response_strip_extras(data)),
        )

    if req.query:
        scored_points = qdrant.search(
            collection_name=COLLECTION,
            query_vector=_make_vector_from_query(req.query),
            limit=req.limit,
            offset=req.offset,
            query_filter=search_filter,
        )
        ranked = [to_hit(p, float(p.score or 0.0)) for p in scored_points]
        return ExerciseSearchResponse(hits=ranked)

    # Filter-only: scroll pagination, score stays None.
    results: List[ExerciseSearchHit] = []
    still_to_skip = req.offset
    cursor = None
    while len(results) < req.limit:
        # Ask for what is still missing plus what must still be skipped (1..256).
        wanted = req.limit - len(results) + still_to_skip
        batch, cursor = qdrant.scroll(
            collection_name=COLLECTION,
            scroll_filter=search_filter,
            offset=cursor,
            limit=max(1, min(256, wanted)),
            with_payload=True,
        )
        if not batch:
            break
        for point in batch:
            if still_to_skip > 0:
                still_to_skip -= 1
                continue
            results.append(to_hit(point, None))
            if len(results) >= req.limit:
                break
        if cursor is None:
            break
    return ExerciseSearchResponse(hits=results)
@router.delete(
    "/exercise/delete-by-external-id",
    response_model=DeleteResponse,
    summary="Löscht Punkte mit gegebener external_id",
    description=(
        "Scrollt nach `external_id` und löscht alle passenden Punkte. "
        "Idempotent: wenn nichts gefunden → count=0. Vorsicht: **löscht dauerhaft**."
    ),
    response_description="Status + Anzahl gelöschter Punkte.",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "bash",
                "label": "curl",
                # The route is DELETE-only; the sample must say so (-X DELETE).
                "source": "curl -s -X DELETE 'http://localhost:8000/exercise/delete-by-external-id?external_id=mw:9999' | jq .",
            }
        ]
    },
)
def delete_by_external_id(external_id: str = Query(..., description="Upsert-Schlüssel, z. B. 'mw:218'")):
    """Delete every point whose payload ``external_id`` equals the given value.

    The matching point ids are collected by scrolling through ALL result pages
    (not just the first one), then deleted in one call. Returns a
    ``DeleteResponse`` with the number of deleted points; ``count=0`` when
    nothing matched.
    """
    _ensure_collection()
    flt = Filter(must=[FieldCondition(key="external_id", match=MatchValue(value=external_id))])

    ids: List[str] = []
    cursor = None
    while True:
        page, cursor = qdrant.scroll(
            collection_name=COLLECTION,
            scroll_filter=flt,
            offset=cursor,
            limit=1000,
            with_payload=False,
        )
        ids.extend(str(p.id) for p in page)
        # A None cursor means there is no further page.
        if cursor is None or not page:
            break

    if not ids:
        return DeleteResponse(status="🔍 Keine Einträge gefunden.", count=0, collection=COLLECTION)
    qdrant.delete(collection_name=COLLECTION, points_selector=PointIdsList(points=ids))
    return DeleteResponse(status="🗑️ gelöscht", count=len(ids), collection=COLLECTION)
@router.delete(
    "/exercise/delete-collection",
    response_model=DeleteResponse,
    summary="Collection komplett löschen",
    description=(
        "Entfernt die gesamte Collection aus Qdrant. **Gefährlich** alle Übungen sind danach weg. "
        "Nutze nur in Testumgebungen oder für einen kompletten Neuaufbau."
    ),
    response_description="Status. count=0 (nicht relevant beim Drop).",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "bash",
                "label": "curl",
                # The route is DELETE-only; the sample must say so (-X DELETE).
                "source": "curl -s -X DELETE 'http://localhost:8000/exercise/delete-collection?collection=exercises' | jq .",
            }
        ]
    },
)
def delete_collection(collection: str = Query(default=COLLECTION, description="Collection-Name (Default: 'exercises')")):
    """Drop the named Qdrant collection.

    Raises ``HTTPException`` 404 when the collection does not exist. Returns a
    ``DeleteResponse`` with ``count=0``.
    """
    # NOTE(review): `collection` is caller-controlled, so this endpoint can drop
    # any collection on the Qdrant instance, not only the exercise collection.
    # Confirm the route is protected or restricted before exposing it.
    if not qdrant.collection_exists(collection):
        raise HTTPException(status_code=404, detail=f"Collection '{collection}' nicht gefunden.")
    qdrant.delete_collection(collection_name=collection)
    return DeleteResponse(status="🗑️ gelöscht", count=0, collection=collection)
# ---------------------------
# OPTIONAL: simple self-test (can also be used separately as a script)
# ---------------------------
# TEST_DOC is a plain string constant holding an example test script; nothing
# in it is executed when this module is imported.
TEST_DOC = """
Speicher als tests/test_exercise_search.py und mit pytest laufen lassen.
import os, requests
BASE = os.getenv("API_BASE", "http://localhost:8000")
# 1) Filter-only
r = requests.post(f"{BASE}/exercise/search", json={
"discipline": "Karate",
"max_duration": 12,
"equipment_any": ["Bälle"],
"capability_names": ["Reaktionsfähigkeit"],
"capability_ge_level": 2,
"limit": 5
})
r.raise_for_status()
js = r.json()
assert "hits" in js
for h in js["hits"]:
p = h["payload"]
assert p["discipline"] == "Karate"
assert p["duration_minutes"] <= 12
# 2) Vector + Filter
r = requests.post(f"{BASE}/exercise/search", json={
"query": "Aufwärmen 10min, Reaktionsfähigkeit, Teenager, Bälle",
"discipline": "Karate",
"limit": 3
})
r.raise_for_status()
js = r.json(); assert len(js["hits"]) <= 3
"""