# -*- coding: utf-8 -*-
"""
exercise_router.py – v1.7.1 (Swagger enriched)

Added:
- Meaningful summary/description/response_description per endpoint
- Examples (x-codeSamples) for curl calls
- Pydantic fields with description + json_schema_extra (examples)
- No API signature/path changes, no prefix changes

Note:
- Endpoints stay under /exercise/* (the route strings already contain
  '/exercise/...').
- If you later want to set an APIRouter prefix, change the paths below from
  '/exercise/...' to relative paths, otherwise the prefix is duplicated.
"""
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from uuid import uuid4
from datetime import datetime
from clients import model, qdrant
from qdrant_client.models import (
    PointStruct,
    VectorParams,
    Distance,
    PointIdsList,
    Filter,
    FieldCondition,
    MatchValue,
    Range,
)
import logging
import os

logger = logging.getLogger("exercise_router")
logger.setLevel(logging.INFO)

# Router without prefix (the path strings already contain '/exercise/...')
router = APIRouter(tags=["exercise"])


# =========================
# Models
# =========================
# NOTE: the models are documented with '#' comments on purpose. A class
# docstring on a pydantic model is emitted as the schema description and would
# therefore change the generated OpenAPI document.

# Exercise: one training exercise as accepted by POST /exercise and as stored
# in the Qdrant payload (derived facet fields are stored additionally and are
# removed again by _response_strip_extras before responding).
class Exercise(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid4()), description="Interne UUID (Qdrant-Punkt-ID)")

    # Upsert metadata
    external_id: Optional[str] = Field(default=None, description="Upsert-Schlüssel (z. B. 'mw:{pageid}')")
    fingerprint: Optional[str] = Field(default=None, description="sha256 der Kernfelder für Idempotenz/Diff")
    source: Optional[str] = Field(default=None, description="Quelle (z. B. 'mediawiki', 'pdf-import', …)")
    imported_at: Optional[datetime] = Field(default=None, description="Zeitpunkt des Imports (ISO-8601)")

    # Domain fields
    title: str = Field(..., description="Übungstitel")
    summary: str = Field(..., description="Kurzbeschreibung/Ziel der Übung")
    short_description: str = Field(..., description="Alternative Kurzform / Teaser")
    keywords: List[str] = Field(default_factory=list, description="Freie Schlagworte (normalisiert)")
    link: Optional[str] = Field(default=None, description="Kanonsiche URL/Permalink zur Quelle")
    discipline: str = Field(..., description="Disziplin (z. B. Karate)")
    group: Optional[str] = Field(default=None, description="Optionale Gruppierung/Kategorie")
    age_group: str = Field(..., description="Altersgruppe (z. B. Kinder/Schüler/Teenager/Erwachsene)")
    target_group: str = Field(..., description="Zielgruppe (z. B. Breitensportler)")
    min_participants: int = Field(..., ge=0, description="Minimale Gruppenstärke")
    duration_minutes: int = Field(..., ge=0, description="Dauer in Minuten")
    capabilities: Dict[str, int] = Field(default_factory=dict, description="Fähigkeiten-Map: {Name: Level 1..5}")
    category: str = Field(..., description="Abschnitt / Kategorie (z. B. Aufwärmen, Grundschule, …)")
    purpose: str = Field(..., description="Zweck/Zielabsicht")
    execution: str = Field(..., description="Durchführungsschritte (Markdown/Wiki-ähnlich)")
    notes: str = Field(..., description="Hinweise/Coaching-Cues")
    preparation: str = Field(..., description="Vorbereitung/Material")
    method: str = Field(..., description="Methodik/Didaktik")
    equipment: List[str] = Field(default_factory=list, description="Benötigte Hilfsmittel")

    model_config = {
        "json_schema_extra": {
            "example": {
                "external_id": "mw:218",
                "title": "Affenklatschen",
                "summary": "Koordination & Aufmerksamkeit mit Ballwechseln",
                "short_description": "Ballgewöhnung im Stand/Gehen/Laufen",
                "keywords": ["Hand-Auge-Koordination", "Reaktion"],
                "link": "https://www.karatetrainer.de/index.php?title=Affenklatschen",
                "discipline": "Karate",
                "age_group": "Teenager",
                "target_group": "Breitensportler",
                "min_participants": 4,
                "duration_minutes": 8,
                "capabilities": {"Reaktionsfähigkeit": 2, "Kopplungsfähigkeit": 2},
                "category": "Aufwärmen",
                "purpose": "Aufmerksamkeit & Reaktionskette aktivieren",
                "execution": "* Paarweise aufstellen …",
                "notes": "* nicht zu lange werden lassen",
                "preparation": "* Bälle bereit halten",
                "method": "* klare Regeln/Strafrunde",
                "equipment": ["Bälle"]
            }
        }
    }


# DeleteResponse: result envelope shared by both DELETE endpoints.
class DeleteResponse(BaseModel):
    status: str = Field(..., description="Statusmeldung")
    count: int = Field(..., ge=0, description="Anzahl betroffener Punkte")
    collection: str = Field(..., description="Qdrant-Collection-Name")


# ExerciseSearchRequest: body of POST /exercise/search. All filters are
# optional; `query` switches from filter-only scrolling to vector search.
class ExerciseSearchRequest(BaseModel):
    # Optional semantic query (vector)
    query: Optional[str] = Field(default=None, description="Freitext für Vektor-Suche (optional)")
    limit: int = Field(default=20, ge=1, le=200, description="Max. Treffer")
    offset: int = Field(default=0, ge=0, description="Offset/Pagination")

    # Simple filters
    discipline: Optional[str] = Field(default=None, description="z. B. Karate")
    target_group: Optional[str] = Field(default=None, description="z. B. Breitensportler")
    age_group: Optional[str] = Field(default=None, description="z. B. Teenager")
    max_duration: Optional[int] = Field(default=None, ge=0, description="Obergrenze Minuten")

    # List filters
    equipment_any: Optional[List[str]] = Field(default=None, description="Mind. eines muss passen")
    equipment_all: Optional[List[str]] = Field(default=None, description="Alle müssen passen")
    keywords_any: Optional[List[str]] = Field(default=None, description="Mind. eines muss passen")
    keywords_all: Optional[List[str]] = Field(default=None, description="Alle müssen passen")

    # Capabilities (names + level operator)
    capability_names: Optional[List[str]] = Field(default=None, description="Capability-Bezeichnungen")
    capability_ge_level: Optional[int] = Field(default=None, ge=1, le=5, description="Level ≥ N")
    capability_eq_level: Optional[int] = Field(default=None, ge=1, le=5, description="Level == N")

    model_config = {
        "json_schema_extra": {
            "examples": [{
                "discipline": "Karate",
                "max_duration": 12,
                "equipment_any": ["Bälle"],
                "capability_names": ["Reaktionsfähigkeit"],
                "capability_ge_level": 2,
                "limit": 5
            }, {
                "query": "Aufwärmen Reaktionsfähigkeit 10min Teenager Bälle",
                "discipline": "Karate",
                "limit": 3
            }]
        }
    }


# ExerciseSearchHit: a single search result (point id, optional score, payload).
class ExerciseSearchHit(BaseModel):
    id: str = Field(..., description="Qdrant-Punkt-ID")
    score: Optional[float] = Field(default=None, description="Ähnlichkeitsscore (nur bei Vektor-Suche)")
    payload: Exercise = Field(..., description="Übungsdaten (Payload)")


# ExerciseSearchResponse: response envelope of POST /exercise/search.
class ExerciseSearchResponse(BaseModel):
    hits: List[ExerciseSearchHit] = Field(..., description="Trefferliste")

    model_config = {
        "json_schema_extra": {
            "example": {
                "hits": [{
                    "id": "c1f1-…",
                    "score": 0.78,
                    "payload": Exercise.model_config["json_schema_extra"]["example"]
                }]
            }
        }
    }


# =========================
# Helpers
# =========================
COLLECTION = os.getenv("EXERCISE_COLLECTION", "exercises")


def _ensure_collection():
    """Create the exercise collection if it does not exist yet.

    The vector size is taken from the embedding model and cosine distance is
    used. `create_collection` is used instead of `recreate_collection`
    because the latter drops an existing collection first; if another worker
    created the collection between the existence check and the call, its data
    would be wiped.
    """
    if qdrant.collection_exists(COLLECTION):
        return
    qdrant.create_collection(
        collection_name=COLLECTION,
        vectors_config=VectorParams(
            size=model.get_sentence_embedding_dimension(),
            distance=Distance.COSINE,
        ),
    )


def _lookup_by_external_id(external_id: str) -> Optional[Dict[str, Any]]:
    """Return the payload of the first point whose `external_id` matches.

    The returned dict is a copy of the stored payload; `id` is filled in from
    the point id when the payload does not carry one. Returns None when no
    point matches.
    """
    _ensure_collection()
    flt = Filter(must=[FieldCondition(key="external_id", match=MatchValue(value=external_id))])
    pts, _ = qdrant.scroll(
        collection_name=COLLECTION,
        scroll_filter=flt,
        limit=1,
        with_payload=True,
    )
    if not pts:
        return None
    doc = dict(pts[0].payload or {})
    doc.setdefault("id", str(pts[0].id))
    return doc


# Text fields that are concatenated to build the embedding input.
_DEF_EMBED_FIELDS = ("title", "summary", "short_description", "purpose", "execution", "notes")


def _make_vector_from_exercise(ex: Exercise) -> List[float]:
    """Embed the non-empty core text fields of *ex*, joined with '. '."""
    parts = [getattr(ex, f, "") for f in _DEF_EMBED_FIELDS if getattr(ex, f, None)]
    return model.encode(". ".join(parts)).tolist()


def _make_vector_from_query(query: str) -> List[float]:
    """Embed a free-text search query."""
    return model.encode(query).tolist()


def _norm_list(xs: List[Any]) -> List[str]:
    """Normalise a list of tags.

    Values are stringified and stripped, blanks are dropped, duplicates are
    removed case-insensitively (first spelling wins) and the result is sorted
    case-insensitively.
    """
    out = []
    seen = set()
    for x in xs or []:
        s = str(x).strip()
        if not s:
            continue
        key = s.casefold()
        if key in seen:
            continue
        seen.add(key)
        out.append(s)
    return sorted(out, key=str.casefold)


def _clean_terms(values: Optional[List[str]]) -> List[str]:
    """Strip filter terms and drop blank ones.

    Stored keywords/equipment/capability names are stripped on write, so an
    unstripped search term could never match.
    """
    return [v.strip() for v in values or [] if v and v.strip()]


def _facet_capabilities(caps: Dict[str, Any]) -> Dict[str, List[str]]:
    """Derive facet fields from the capabilities map.

    Returned keys (in this order):
    - capability_keys: all non-blank names
    - capability_ge1..capability_ge5: names with level >= N
    - capability_eq1..capability_eq5: names with level == N

    Levels that cannot be converted to int count as 0. Every list is
    de-duplicated and sorted case-insensitively.
    """
    pairs = []
    for raw_name, raw_level in (caps or {}).items():
        name = str(raw_name).strip()
        if not name:
            continue
        try:
            level = int(raw_level)
        except (TypeError, ValueError, OverflowError):
            level = 0
        pairs.append((name, level))

    def _sorted_names(names) -> List[str]:
        return sorted(set(names), key=str.casefold)

    facets: Dict[str, List[str]] = {
        "capability_keys": _sorted_names(name for name, _ in pairs),
    }
    for n in range(1, 6):
        facets[f"capability_ge{n}"] = _sorted_names(name for name, lv in pairs if lv >= n)
    for n in range(1, 6):
        facets[f"capability_eq{n}"] = _sorted_names(name for name, lv in pairs if lv == n)
    return facets


def _response_strip_extras(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only keys that are declared Exercise fields (drops derived facets)."""
    allowed = set(Exercise.model_fields.keys())
    return {k: v for k, v in payload.items() if k in allowed}


def _build_filter(req: ExerciseSearchRequest) -> Filter:
    """Translate a search request into a Qdrant payload filter.

    All conditions are combined with AND:
    - discipline / target_group / age_group: exact match
    - max_duration: duration_minutes <= value
    - *_all lists: every term must be present
    - *_any lists: at least one term per list must be present. Each list
      becomes its own nested `should` filter so that `equipment_any` and
      `keywords_any` are evaluated independently instead of being merged into
      one OR group.
    - capability_names: matched against the derived facet field selected by
      capability_eq_level, else capability_ge_level, else `capability_ge1`.
    """
    must: List[Any] = []

    def _match(key: str, value: str) -> FieldCondition:
        return FieldCondition(key=key, match=MatchValue(value=value))

    if req.discipline:
        must.append(_match("discipline", req.discipline))
    if req.target_group:
        must.append(_match("target_group", req.target_group))
    if req.age_group:
        must.append(_match("age_group", req.age_group))
    if req.max_duration is not None:
        must.append(FieldCondition(key="duration_minutes", range=Range(lte=int(req.max_duration))))

    # equipment / keywords
    for field_name, all_terms, any_terms in (
        ("equipment", req.equipment_all, req.equipment_any),
        ("keywords", req.keywords_all, req.keywords_any),
    ):
        must.extend(_match(field_name, term) for term in _clean_terms(all_terms))
        any_conditions = [_match(field_name, term) for term in _clean_terms(any_terms)]
        if any_conditions:
            must.append(Filter(should=any_conditions))

    # capabilities (eq takes precedence over ge; default is level >= 1)
    names = _clean_terms(req.capability_names)
    if names:
        if req.capability_eq_level:
            facet_key = f"capability_eq{int(req.capability_eq_level)}"
        elif req.capability_ge_level:
            facet_key = f"capability_ge{int(req.capability_ge_level)}"
        else:
            facet_key = "capability_ge1"
        must.extend(_match(facet_key, name) for name in names)

    return Filter(must=must)


def _hit_from_point(point: Any, score: Optional[float]) -> ExerciseSearchHit:
    """Build a search hit from a Qdrant point (scored or plain record)."""
    payload = dict(point.payload or {})
    payload.setdefault("id", str(point.id))
    return ExerciseSearchHit(
        id=str(point.id),
        score=score,
        payload=Exercise(**_response_strip_extras(payload)),
    )


# =========================
# Endpoints
# =========================
@router.get(
    "/exercise/by-external-id",
    summary="Übung per external_id abrufen",
    description=(
        "Liefert die Übung mit der gegebenen `external_id` (z. B. `mw:{pageid}`). "
        "Verwendet einen Qdrant-Filter auf dem Payload-Feld `external_id`."
    ),
    response_description="Vollständiger Exercise-Payload oder 404 bei Nichtfund.",
    openapi_extra={
        "x-codeSamples": [{
            "lang": "bash",
            "label": "curl",
            "source": "curl -s 'http://localhost:8000/exercise/by-external-id?external_id=mw:218' | jq ."
        }]
    }
)
def get_exercise_by_external_id(external_id: str = Query(..., min_length=3, description="Upsert-Schlüssel, z. B. 'mw:218'")):
    """Return the raw stored payload for *external_id*.

    Unlike the other read endpoints this returns the payload unfiltered, i.e.
    including the derived capability facet fields.

    Raises:
        HTTPException: 404 when no point carries this external_id.
    """
    found = _lookup_by_external_id(external_id)
    if not found:
        raise HTTPException(status_code=404, detail="not found")
    return found


@router.post(
    "/exercise",
    response_model=Exercise,
    summary="Create/Update (idempotent per external_id)",
    description=(
        "Legt eine Übung an oder aktualisiert sie. Wenn `external_id` vorhanden und bereits in der Collection existiert, "
        "wird **Update** auf dem bestehenden Punkt ausgeführt (Upsert). `keywords`/`equipment` werden normalisiert, "
        "Capability-Facetten (`capability_ge1..5`, `capability_eq1..5`, `capability_keys`) automatisch abgeleitet. "
        "Der Vektor wird aus Kernfeldern (title/summary/short_description/purpose/execution/notes) berechnet."
    ),
    response_description="Gespeicherter Exercise-Datensatz (Payload-View).",
    openapi_extra={
        "x-codeSamples": [{
            "lang": "bash",
            "label": "curl",
            "source": "curl -s -X POST http://localhost:8000/exercise -H 'Content-Type: application/json' -d @exercise.json | jq ."
        }]
    }
)
def create_or_update_exercise(ex: Exercise):
    """Upsert an exercise.

    When `external_id` is set and already stored, the existing point id is
    reused so the point is overwritten instead of duplicated. Keywords and
    equipment are normalised and capability facets are added to the stored
    payload; the response contains only declared Exercise fields.
    """
    _ensure_collection()

    point_id = ex.id
    if ex.external_id:
        prior = _lookup_by_external_id(ex.external_id)
        if prior:
            point_id = prior.get("id", point_id)

    vector = _make_vector_from_exercise(ex)

    payload: Dict[str, Any] = ex.model_dump()
    payload["id"] = str(point_id)
    payload["keywords"] = _norm_list(payload.get("keywords") or [])
    payload["equipment"] = _norm_list(payload.get("equipment") or [])
    payload.update(_facet_capabilities(payload.get("capabilities") or {}))

    qdrant.upsert(
        collection_name=COLLECTION,
        points=[PointStruct(id=str(point_id), vector=vector, payload=payload)],
    )
    return Exercise(**_response_strip_extras(payload))


@router.get(
    "/exercise/{exercise_id}",
    response_model=Exercise,
    summary="Übung per interner ID (Qdrant-Punkt-ID) lesen",
    description="Scrollt nach `id` und gibt den Payload als Exercise zurück.",
    response_description="Exercise-Payload oder 404 bei Nichtfund.",
    openapi_extra={
        "x-codeSamples": [{
            "lang": "bash",
            "label": "curl",
            "source": "curl -s 'http://localhost:8000/exercise/1234-uuid' | jq ."
        }]
    }
)
def get_exercise(exercise_id: str):
    """Return the exercise whose payload field `id` equals *exercise_id*.

    Raises:
        HTTPException: 404 when no point matches.
    """
    _ensure_collection()
    pts, _ = qdrant.scroll(
        collection_name=COLLECTION,
        scroll_filter=Filter(must=[FieldCondition(key="id", match=MatchValue(value=exercise_id))]),
        limit=1,
        with_payload=True,
    )
    if not pts:
        raise HTTPException(status_code=404, detail="not found")
    payload = dict(pts[0].payload or {})
    payload.setdefault("id", str(pts[0].id))
    return Exercise(**_response_strip_extras(payload))


@router.post(
    "/exercise/search",
    response_model=ExerciseSearchResponse,
    summary="Suche Übungen (Filter + optional Vektor)",
    description=(
        "Kombinierbare Filter auf Payload-Feldern (`discipline`, `age_group`, `target_group`, `equipment`, `keywords`, "
        "`capability_geN/eqN`) und **optional** Vektor-Suche via `query`. "
        "`should`-Filter (equipment_any/keywords_any) wirken als OR (minimum_should_match=1). "
        "`max_duration` wird als Range (lte) angewandt. Ergebnis enthält bei Vektor-Suche `score`, sonst `null`."
    ),
    response_description="Trefferliste (payload + Score bei Vektor-Suche).",
    openapi_extra={
        "x-codeSamples": [
            {
                "lang": "bash",
                "label": "Filter",
                "source": "curl -s -X POST http://localhost:8000/exercise/search -H 'Content-Type: application/json' -d '{\"discipline\":\"Karate\",\"max_duration\":12,\"equipment_any\":[\"Bälle\"],\"capability_names\":[\"Reaktionsfähigkeit\"],\"capability_ge_level\":2,\"limit\":5}' | jq ."
            },
            {
                "lang": "bash",
                "label": "Vektor + Filter",
                "source": "curl -s -X POST http://localhost:8000/exercise/search -H 'Content-Type: application/json' -d '{\"query\":\"Aufwärmen 10min Teenager Bälle\",\"discipline\":\"Karate\",\"limit\":3}' | jq ."
            }
        ]
    }
)
def search_exercises(req: ExerciseSearchRequest) -> ExerciseSearchResponse:
    """Search exercises by payload filter, optionally ranked by vector similarity.

    With `query` set, a filtered vector search is executed and every hit
    carries a score. Without it, the collection is scrolled with the filter,
    the first `offset` matches are skipped client-side and hits carry
    score=None.
    """
    _ensure_collection()
    flt = _build_filter(req)

    hits: List[ExerciseSearchHit] = []
    if req.query:
        vec = _make_vector_from_query(req.query)
        res = qdrant.search(
            collection_name=COLLECTION,
            query_vector=vec,
            limit=req.limit,
            offset=req.offset,
            query_filter=flt,
        )
        for h in res:
            hits.append(_hit_from_point(h, float(h.score or 0.0)))
    else:
        # Filter-only: scroll pagination, score=None
        collected = 0
        skipped = 0
        next_offset = None
        while collected < req.limit:
            # Ask only for what is still needed (remaining skip + remaining
            # hits), capped at 256 points per page.
            page, next_offset = qdrant.scroll(
                collection_name=COLLECTION,
                scroll_filter=flt,
                offset=next_offset,
                limit=max(1, min(256, req.limit - collected + req.offset - skipped)),
                with_payload=True,
            )
            if not page:
                break
            for pt in page:
                if skipped < req.offset:
                    skipped += 1
                    continue
                hits.append(_hit_from_point(pt, None))
                collected += 1
                if collected >= req.limit:
                    break
            if next_offset is None:
                break

    return ExerciseSearchResponse(hits=hits)


@router.delete(
    "/exercise/delete-by-external-id",
    response_model=DeleteResponse,
    summary="Löscht Punkte mit gegebener external_id",
    description=(
        "Scrollt nach `external_id` und löscht alle passenden Punkte. "
        "Idempotent: wenn nichts gefunden → count=0. Vorsicht: **löscht dauerhaft**."
    ),
    response_description="Status + Anzahl gelöschter Punkte.",
    openapi_extra={
        "x-codeSamples": [{
            "lang": "bash",
            "label": "curl",
            "source": "curl -s -X DELETE 'http://localhost:8000/exercise/delete-by-external-id?external_id=mw:9999' | jq ."
        }]
    }
)
def delete_by_external_id(external_id: str = Query(..., description="Upsert-Schlüssel, z. B. 'mw:218'")):
    """Delete the points whose `external_id` matches.

    Only the first scroll page (up to 10000 points) is considered. Returns
    count=0 without calling delete when nothing matches.
    """
    _ensure_collection()
    flt = Filter(must=[FieldCondition(key="external_id", match=MatchValue(value=external_id))])
    pts, _ = qdrant.scroll(collection_name=COLLECTION, scroll_filter=flt, limit=10000, with_payload=False)
    ids = [str(p.id) for p in pts]
    if not ids:
        return DeleteResponse(status="🔍 Keine Einträge gefunden.", count=0, collection=COLLECTION)
    qdrant.delete(collection_name=COLLECTION, points_selector=PointIdsList(points=ids))
    return DeleteResponse(status="🗑️ gelöscht", count=len(ids), collection=COLLECTION)


@router.delete(
    "/exercise/delete-collection",
    response_model=DeleteResponse,
    summary="Collection komplett löschen",
    description=(
        "Entfernt die gesamte Collection aus Qdrant. **Gefährlich** – alle Übungen sind danach weg. "
        "Nutze nur in Testumgebungen oder für einen kompletten Neuaufbau."
    ),
    response_description="Status. count=0 (nicht relevant beim Drop).",
    openapi_extra={
        "x-codeSamples": [{
            "lang": "bash",
            "label": "curl",
            "source": "curl -s -X DELETE 'http://localhost:8000/exercise/delete-collection?collection=exercises' | jq ."
        }]
    }
)
def delete_collection(collection: str = Query(default=COLLECTION, description="Collection-Name (Default: 'exercises')")):
    """Drop a whole Qdrant collection.

    Raises:
        HTTPException: 404 when the collection does not exist.
    """
    if not qdrant.collection_exists(collection):
        raise HTTPException(status_code=404, detail=f"Collection '{collection}' nicht gefunden.")
    qdrant.delete_collection(collection_name=collection)
    return DeleteResponse(status="🗑️ gelöscht", count=0, collection=collection)


# ---------------------------
# OPTIONAL: simple self-test (can also be used separately as a script)
# ---------------------------
TEST_DOC = """
Speicher als tests/test_exercise_search.py und mit pytest laufen lassen.

import os, requests
BASE = os.getenv("API_BASE", "http://localhost:8000")

# 1) Filter-only
r = requests.post(f"{BASE}/exercise/search", json={
    "discipline": "Karate",
    "max_duration": 12,
    "equipment_any": ["Bälle"],
    "capability_names": ["Reaktionsfähigkeit"],
    "capability_ge_level": 2,
    "limit": 5
})
r.raise_for_status()
js = r.json()
assert "hits" in js
for h in js["hits"]:
    p = h["payload"]
    assert p["discipline"] == "Karate"
    assert p["duration_minutes"] <= 12

# 2) Vector + Filter
r = requests.post(f"{BASE}/exercise/search", json={
    "query": "Aufwärmen 10min, Reaktionsfähigkeit, Teenager, Bälle",
    "discipline": "Karate",
    "limit": 3
})
r.raise_for_status()
js = r.json(); assert len(js["hits"]) <= 3
"""