Trainer_LLM/llm-api/exercise_router.py
Lars 380b361e70
All checks were successful
Deploy Trainer_LLM to llm-node / deploy (push) Successful in 2s
llm-api/exercise_router.py aktualisiert
2025-08-11 18:59:27 +02:00

239 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
exercise_router.py v1.6.1
Änderungen ggü. v1.6.0:
- **Idempotenz-Fix:** Qdrant-Scroll liefert nun explizit den Payload (`WithPayloadSelector(enable=True)`)
für `/exercise/by-external-id` und `/exercise/{id}`. Dadurch kann der Importer den gespeicherten
Fingerprint korrekt gegen den Recalc-Hash prüfen (keine Phantom-Updates mehr).
- Capability-Facetten & Listen-Normalisierung wie in v1.6.0.
- Keine API-Signaturänderungen.
"""
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from uuid import uuid4
from datetime import datetime
from clients import model, qdrant
from qdrant_client.models import (
PointStruct,
VectorParams,
Distance,
PointIdsList,
Filter,
FieldCondition,
MatchValue,
WithPayloadSelector,
)
import os
# Router instance on which the exercise endpoints of this module are registered.
router = APIRouter()
# =========================
# Models
# =========================
class Exercise(BaseModel):
    # Request/response schema for a single exercise. The dumped model is also
    # what create_or_update_exercise stores as the Qdrant payload.

    # Identifier; a random UUID4 string is generated when none is supplied.
    id: str = Field(default_factory=lambda: str(uuid4()))

    # --- Upsert metadata ---
    external_id: Optional[str] = None  # e.g. "mw:12345"
    fingerprint: Optional[str] = None  # sha256 over the core fields
    source: Optional[str] = None  # origin, e.g. "MediaWiki"
    imported_at: Optional[datetime] = None  # set by the import (an ISO string is accepted)

    # --- Domain fields ---
    title: str
    summary: str
    short_description: str
    keywords: List[str] = []
    link: Optional[str] = None
    discipline: str
    group: Optional[str] = None
    age_group: str
    target_group: str
    min_participants: int
    duration_minutes: int
    capabilities: Dict[str, int] = {}  # capability name -> integer level
    category: str
    purpose: str
    execution: str
    notes: str
    preparation: str
    method: str
    equipment: List[str] = []
class DeleteResponse(BaseModel):
    # Result envelope returned by the delete endpoints of this router.
    status: str  # human-readable outcome message
    count: int  # number of deleted points (0 if nothing matched or a collection was dropped)
    collection: str  # name of the collection the operation targeted
# =========================
# Helpers
# =========================
# Name of the Qdrant collection used by the helpers and endpoints below.
# Read once at import time from EXERCISE_COLLECTION; falls back to "exercises".
COLLECTION = os.getenv("EXERCISE_COLLECTION", "exercises")
def _ensure_collection():
    """Create the exercise collection if it is missing; never drop an existing one.

    The vector size is taken from the embedding model and the distance metric
    is cosine.
    """
    if qdrant.collection_exists(COLLECTION):
        return
    # Use create_collection rather than recreate_collection: the latter deletes
    # an existing collection before creating it, so a collection that appeared
    # between the check above and this call (e.g. from a concurrent request)
    # would be wiped together with its data. recreate_collection is also
    # deprecated in qdrant-client in favour of collection_exists + create_collection.
    qdrant.create_collection(
        collection_name=COLLECTION,
        vectors_config=VectorParams(
            size=model.get_sentence_embedding_dimension(),
            distance=Distance.COSINE,
        ),
    )
def _lookup_by_external_id(external_id: str) -> Optional[Dict[str, Any]]:
    """Look up a point via a payload filter on ``external_id``.

    At most one point is fetched, with its payload explicitly requested.
    Returns a copy of that payload; if the payload has no ``id`` key, the
    point id (as a string) is filled in. Returns ``None`` when nothing matches.
    """
    _ensure_collection()
    condition = FieldCondition(key="external_id", match=MatchValue(value=external_id))
    hits, _next_offset = qdrant.scroll(
        collection_name=COLLECTION,
        scroll_filter=Filter(must=[condition]),
        limit=1,
        with_payload=WithPayloadSelector(enable=True),
    )
    if hits:
        first = hits[0]
        doc = dict(first.payload or {})
        doc.setdefault("id", str(first.id))
        return doc
    return None
# Exercise attributes whose non-empty values are joined, in this order, to form
# the text that _make_vector embeds.
_DEF_EMBED_FIELDS = ("title", "summary", "short_description", "purpose", "execution", "notes")
def _make_vector(ex: Exercise) -> List[float]:
    """Embed the exercise's text fields as one vector.

    The truthy values of the attributes named in ``_DEF_EMBED_FIELDS`` are
    joined with ". " and passed to the embedding model; the encoded result is
    converted to a plain list.
    """
    parts = []
    for field_name in _DEF_EMBED_FIELDS:
        value = getattr(ex, field_name, None)
        if value:
            parts.append(value)
    return model.encode(". ".join(parts)).tolist()
def _norm_list(xs: List[Any]) -> List[str]:
"""Trim + Duplikate entfernen + sortieren (stabil für Filter & Fingerprint)."""
out = []
seen = set()
for x in xs or []:
s = str(x).strip()
if not s:
continue
key = s.casefold()
if key in seen:
continue
seen.add(key)
out.append(s)
return sorted(out, key=str.casefold)
def _facet_capabilities(caps: Dict[str, Any]) -> Dict[str, List[str]]:
caps = caps or {}
def ge(n: int) -> List[str]:
out = []
for k, v in caps.items():
try:
if int(v) >= n:
out.append(str(k))
except Exception:
pass
return sorted({s.strip() for s in out if s.strip()}, key=str.casefold)
all_keys = sorted({str(k).strip() for k in caps.keys() if str(k).strip()}, key=str.casefold)
return {
"capability_keys": all_keys,
"capability_ge1": ge(1),
"capability_ge2": ge(2),
"capability_ge3": ge(3),
}
def _response_strip_extras(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return only the entries of ``payload`` whose key is an ``Exercise`` field.

    Extra keys are dropped from the returned copy only; ``payload`` itself is
    not modified.
    """
    known_fields = Exercise.model_fields  # Pydantic v2: field name -> field info
    return {name: payload[name] for name in payload if name in known_fields}
# =========================
# Endpoints
# =========================
@router.get("/exercise/by-external-id")
def get_exercise_by_external_id(external_id: str = Query(..., min_length=3)):
    """Idempotency lookup for the importer.

    Returns the stored payload of the exercise with the given ``external_id``
    and responds with 404 when there is none.
    """
    stored = _lookup_by_external_id(external_id)
    if stored:
        return stored
    raise HTTPException(status_code=404, detail="not found")
@router.post("/exercise", response_model=Exercise)
def create_or_update_exercise(ex: Exercise):
    """Create or update an exercise (upsert semantics).

    If ``external_id`` is set and a point with that value is already stored,
    the existing point id is reused so the point is overwritten (a real
    update). Otherwise a new entry is written under ``ex.id``. The API
    signature is unchanged (POST /exercise, body = Exercise).
    """
    _ensure_collection()

    # Reuse the stored point id when this external_id is already known.
    existing = _lookup_by_external_id(ex.external_id) if ex.external_id else None
    target_id = str(existing.get("id", ex.id) if existing else ex.id)

    # Embedding
    vector = _make_vector(ex)

    # Stabilise the payload: canonical id plus normalised list fields.
    payload: Dict[str, Any] = ex.model_dump()
    payload["id"] = target_id
    for list_field in ("keywords", "equipment"):
        payload[list_field] = _norm_list(payload.get(list_field) or [])

    # Facet keys are extra fields: stored in the payload (for filtering) but
    # stripped from the response below.
    payload.update(_facet_capabilities(payload.get("capabilities") or {}))

    point = PointStruct(id=target_id, vector=vector, payload=payload)
    qdrant.upsert(collection_name=COLLECTION, points=[point])

    return Exercise(**_response_strip_extras(payload))
@router.get("/exercise/{exercise_id}", response_model=Exercise)
def get_exercise(exercise_id: str):
    """Return the exercise whose payload ``id`` equals ``exercise_id``.

    The payload is requested explicitly from Qdrant. Responds with 404 when
    no point matches.
    """
    _ensure_collection()
    by_id = Filter(must=[FieldCondition(key="id", match=MatchValue(value=exercise_id))])
    matches, _next_offset = qdrant.scroll(
        collection_name=COLLECTION,
        scroll_filter=by_id,
        limit=1,
        with_payload=WithPayloadSelector(enable=True),
    )
    if not matches:
        raise HTTPException(status_code=404, detail="not found")

    point = matches[0]
    payload = dict(point.payload or {})
    # Fall back to the point id only when the payload carries no "id" itself.
    payload.setdefault("id", str(point.id))
    return Exercise(**_response_strip_extras(payload))
@router.delete("/exercise/delete-by-external-id", response_model=DeleteResponse)
def delete_by_external_id(external_id: str = Query(...)):
    """Delete all points whose payload ``external_id`` matches.

    Matching points are collected with a single scroll call (limit 10000,
    payload not requested) and then deleted by id. The response reports how
    many points were deleted; a count of 0 means nothing matched.
    """
    _ensure_collection()
    matching = Filter(must=[FieldCondition(key="external_id", match=MatchValue(value=external_id))])
    found, _next_offset = qdrant.scroll(
        collection_name=COLLECTION,
        scroll_filter=matching,
        limit=10000,
        with_payload=WithPayloadSelector(enable=False),
    )
    point_ids = [str(point.id) for point in found]
    if point_ids:
        qdrant.delete(collection_name=COLLECTION, points_selector=PointIdsList(points=point_ids))
        return DeleteResponse(status="🗑️ gelöscht", count=len(point_ids), collection=COLLECTION)
    return DeleteResponse(status="🔍 Keine Einträge gefunden.", count=0, collection=COLLECTION)
@router.delete("/exercise/delete-collection", response_model=DeleteResponse)
def delete_collection(collection: str = Query(default=COLLECTION)):
    """Drop an entire Qdrant collection.

    ``collection`` defaults to the module's exercise collection. Responds with
    404 when the named collection does not exist.
    """
    if qdrant.collection_exists(collection):
        qdrant.delete_collection(collection_name=collection)
        return DeleteResponse(status="🗑️ gelöscht", count=0, collection=collection)
    raise HTTPException(status_code=404, detail=f"Collection '{collection}' nicht gefunden.")