Trainer_LLM/llm-api/exercise_router.py
Lars 8302a7fecf
All checks were successful
Deploy Trainer_LLM to llm-node / deploy (push) Successful in 2s
llm-api/exercise_router.py aktualisiert
2025-08-11 06:42:59 +02:00

173 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Test comment, added to check that the Gitea setup works
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from uuid import uuid4
from datetime import datetime
from clients import model, qdrant
from qdrant_client.models import (
PointStruct,
VectorParams,
Distance,
PointIdsList,
# NEW: für Filter-Queries (Lookup via external_id)
Filter, FieldCondition, MatchValue,
)
import os
# Router that the endpoint decorators in this module register their routes on.
router = APIRouter()
# =========================
# Models
# =========================
class Exercise(BaseModel):
    """Schema of one exercise as accepted and returned by the /exercise endpoints.

    The POST endpoint stores the dict form of this model as the Qdrant point
    payload and embeds a subset of the text fields as the point vector.
    """

    # Defaults to a freshly generated UUID4 string when the caller sends no id.
    id: str = Field(default_factory=lambda: str(uuid4()))
    # NEW — optional, so existing POST calls keep working
    external_id: Optional[str] = None  # e.g. "mw:12345"
    fingerprint: Optional[str] = None  # sha256 over the core fields
    source: Optional[str] = None  # origin, e.g. "MediaWiki"
    imported_at: Optional[datetime] = None  # set by the import
    # Existing fields (unchanged)
    title: str
    summary: str
    short_description: str
    keywords: List[str] = []
    link: Optional[str] = None
    discipline: str
    group: Optional[str] = None
    age_group: str
    target_group: str
    min_participants: int
    duration_minutes: int
    capabilities: Dict[str, int] = {}
    category: str
    purpose: str
    execution: str
    notes: str
    preparation: str
    method: str
    equipment: List[str] = []
class DeleteResponse(BaseModel):
    """Response body returned by the delete endpoints of this module."""

    status: str  # human-readable outcome message
    count: int  # number of deleted points (the collection delete reports 0)
    collection: str  # name of the collection the request acted on
# =========================
# Helpers
# =========================
COLLECTION = os.getenv("EXERCISE_COLLECTION", "exercises")


# CHANGED: factored out so that create and update can share it
def _ensure_collection():
    """Create the exercise collection in Qdrant if it does not exist yet.

    The vector size is taken from the embedding model and cosine distance is
    used. Nothing happens when the collection is already present.

    ``create_collection`` is used rather than the deprecated
    ``recreate_collection``: the latter drops an existing collection before
    creating it, so a request that lost the race on the existence check could
    wipe data that another request had just written.
    """
    if qdrant.collection_exists(COLLECTION):
        return
    qdrant.create_collection(
        collection_name=COLLECTION,
        vectors_config=VectorParams(
            size=model.get_sentence_embedding_dimension(),
            distance=Distance.COSINE,
        ),
    )
# NEW: shared helper for the external_id lookup
def _lookup_by_external_id(external_id: str) -> Optional[Dict[str, Any]]:
    """Return the stored payload of the first point with this ``external_id``.

    The result is a copy of the point's payload; when the payload carries no
    ``id`` key, the point id (as a string) is filled in. Returns ``None`` if
    no point matches.
    """
    _ensure_collection()
    condition = FieldCondition(key="external_id", match=MatchValue(value=external_id))
    records, _next_offset = qdrant.scroll(
        collection_name=COLLECTION,
        scroll_filter=Filter(must=[condition]),
        limit=1,
    )
    if not records:
        return None
    hit = records[0]
    result = dict(hit.payload or {})
    result.setdefault("id", str(hit.id))
    return result
# NEW: consistent embedding generation
_def_embed_text_fields = ("title", "summary", "short_description", "purpose", "execution", "notes")


def _make_vector(ex: Exercise) -> List[float]:
    """Embed the exercise's non-empty text fields, joined with ". "."""
    parts = []
    for field_name in _def_embed_text_fields:
        value = getattr(ex, field_name, None)
        if value:
            parts.append(value)
    # Note: model.encode has to be synchronous here; adapt if it becomes async.
    return model.encode(". ".join(parts)).tolist()
# =========================
# Endpoints
# =========================
@router.get("/exercise/by-external-id")  # NEW
def get_exercise_by_external_id(external_id: str = Query(..., min_length=3)):
    """Idempotency lookup for the importer; responds with 404 if nothing matches."""
    exercise = _lookup_by_external_id(external_id)
    if exercise:
        return exercise
    raise HTTPException(status_code=404, detail="not found")
@router.post("/exercise", response_model=Exercise)
def create_or_update_exercise(ex: Exercise):
    """Create an exercise, or overwrite the stored one (upsert).

    CHANGED: upsert semantics. If ``external_id`` is set and a point with that
    value is already found in Qdrant, that point's id is reused so the entry
    is overwritten (a real update). Otherwise a new entry is written under
    ``ex.id``. The API signature is unchanged (POST /exercise, body = Exercise).
    """
    _ensure_collection()

    # Reuse the id of an already stored point with the same external_id;
    # otherwise fall back to the id carried by the incoming exercise.
    existing = _lookup_by_external_id(ex.external_id) if ex.external_id else None
    target_id = existing.get("id", ex.id) if existing else ex.id

    embedding = _make_vector(ex)

    # Keep the id inside the payload in sync with the point id.
    payload = ex.dict()
    payload["id"] = str(target_id)

    point = PointStruct(id=str(target_id), vector=embedding, payload=payload)
    qdrant.upsert(collection_name=COLLECTION, points=[point])
    return Exercise(**payload)
# (Optional) single fetch by id (if already present, unchanged)
@router.get("/exercise/{exercise_id}", response_model=Exercise)
def get_exercise(exercise_id: str):
    """Return the exercise whose payload ``id`` equals ``exercise_id``; 404 if none."""
    _ensure_collection()
    id_filter = Filter(
        must=[FieldCondition(key="id", match=MatchValue(value=exercise_id))]
    )
    records, _next_offset = qdrant.scroll(
        collection_name=COLLECTION,
        scroll_filter=id_filter,
        limit=1,
    )
    if not records:
        raise HTTPException(status_code=404, detail="not found")
    hit = records[0]
    data = dict(hit.payload or {})
    data.setdefault("id", str(hit.id))
    return Exercise(**data)
# Existing admin utilities (delete by filter / whole collection), unchanged
# apart from using the constants
@router.delete("/exercise/delete-by-external-id", response_model=DeleteResponse)
def delete_by_external_id(external_id: str = Query(...)):
    """Delete every point whose ``external_id`` payload field matches.

    Returns a ``DeleteResponse`` whose ``count`` is the number of deleted
    points. When nothing matches, no delete is issued and ``count`` is 0.
    """
    _ensure_collection()
    flt = Filter(must=[FieldCondition(key="external_id", match=MatchValue(value=external_id))])

    # Page through all matches: scroll returns at most `limit` records per
    # call together with the offset of the next page (None on the last page).
    ids = []
    offset = None
    while True:
        pts, offset = qdrant.scroll(
            collection_name=COLLECTION,
            scroll_filter=flt,
            limit=10000,
            offset=offset,
            with_payload=False,  # only the ids are needed
            with_vectors=False,
        )
        # Keep ids in their native type; stringifying an integer id would
        # produce a value Qdrant does not accept as a point id.
        ids.extend(p.id for p in pts)
        if offset is None:
            break

    if not ids:
        return DeleteResponse(status="🔍 Keine Einträge gefunden.", count=0, collection=COLLECTION)
    qdrant.delete(collection_name=COLLECTION, points_selector=PointIdsList(points=ids))
    return DeleteResponse(status="🗑️ gelöscht", count=len(ids), collection=COLLECTION)
@router.delete("/exercise/delete-collection", response_model=DeleteResponse)
def delete_collection(collection: str = Query(default=COLLECTION)):
    """Drop the named collection (default: the exercise collection); 404 if it is missing."""
    if qdrant.collection_exists(collection):
        qdrant.delete_collection(collection_name=collection)
        return DeleteResponse(status="🗑️ gelöscht", count=0, collection=collection)
    raise HTTPException(status_code=404, detail=f"Collection '{collection}' nicht gefunden.")