# Test comment used to verify that the Gitea integration works.
"""FastAPI routes for storing, looking up and deleting exercises in Qdrant."""

import os
from datetime import datetime
from typing import List, Optional, Dict, Any
from uuid import uuid4

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from qdrant_client.models import (
    PointStruct,
    VectorParams,
    Distance,
    PointIdsList,
    # NEW: needed for filter queries (lookup via external_id)
    Filter,
    FieldCondition,
    MatchValue,
)

from clients import model, qdrant

router = APIRouter()


# =========================
# Models
# =========================
class Exercise(BaseModel):
    """Request/response schema of an exercise; also stored as the Qdrant payload."""

    id: str = Field(default_factory=lambda: str(uuid4()))

    # NEW - optional, so existing POST calls keep working
    external_id: Optional[str] = None  # e.g. "mw:12345"
    fingerprint: Optional[str] = None  # sha256 over the core fields
    source: Optional[str] = None  # origin, e.g. "MediaWiki"
    imported_at: Optional[datetime] = None  # set by the importer

    # Pre-existing fields (unchanged)
    title: str
    summary: str
    short_description: str
    keywords: List[str] = []
    link: Optional[str] = None
    discipline: str
    group: Optional[str] = None
    age_group: str
    target_group: str
    min_participants: int
    duration_minutes: int
    capabilities: Dict[str, int] = {}
    category: str
    purpose: str
    execution: str
    notes: str
    preparation: str
    method: str
    equipment: List[str] = []


class DeleteResponse(BaseModel):
    """Result of a delete operation: status text, affected count, collection name."""

    status: str
    count: int
    collection: str


# =========================
# Helpers
# =========================
COLLECTION = os.getenv("EXERCISE_COLLECTION", "exercises")


# CHANGED: factored out so that create and update share it
def _ensure_collection():
    """Create the exercise collection if it does not exist yet.

    The vector size is taken from the embedding model and cosine distance
    is used.
    """
    if qdrant.collection_exists(COLLECTION):
        return
    # create_collection instead of the deprecated recreate_collection:
    # recreate_collection drops an existing collection first, so a collection
    # created concurrently between the check above and this call would be
    # wiped. create_collection never deletes data.
    qdrant.create_collection(
        collection_name=COLLECTION,
        vectors_config=VectorParams(
            size=model.get_sentence_embedding_dimension(),
            distance=Distance.COSINE,
        ),
    )


# NEW: shared helper for the external_id lookup
def _lookup_by_external_id(external_id: str) -> Optional[Dict[str, Any]]:
    """Return the payload of the first point whose ``external_id`` matches.

    Args:
        external_id: Value matched against the payload field ``external_id``.

    Returns:
        A copy of the stored payload, with ``id`` filled in from the point ID
        when the payload has no ``id`` key, or ``None`` if nothing matches.
    """
    _ensure_collection()
    flt = Filter(
        must=[FieldCondition(key="external_id", match=MatchValue(value=external_id))]
    )
    pts, _ = qdrant.scroll(collection_name=COLLECTION, scroll_filter=flt, limit=1)
    if not pts:
        return None
    # Copy the payload so the returned dict can be modified safely, then make
    # sure it carries an id.
    doc = dict(pts[0].payload or {})
    doc.setdefault("id", str(pts[0].id))
    return doc


# NEW: consistent embedding generation
_def_embed_text_fields = (
    "title",
    "summary",
    "short_description",
    "purpose",
    "execution",
    "notes",
)


def _make_vector(ex: Exercise) -> List[float]:
    """Build the embedding vector for an exercise.

    The non-empty text fields listed in ``_def_embed_text_fields`` are joined
    with ". " and passed to ``model.encode``.
    """
    text = ". ".join(
        [getattr(ex, f, "") for f in _def_embed_text_fields if getattr(ex, f, None)]
    )
    # Caution: model.encode must be synchronous; otherwise adapt this for async.
    vec = model.encode(text).tolist()
    return vec


# =========================
# Endpoints
# =========================
@router.get("/exercise/by-external-id")  # NEW
def get_exercise_by_external_id(external_id: str = Query(..., min_length=3)):
    """Lookup used by the importer for idempotency. Returns 404 if not present."""
    found = _lookup_by_external_id(external_id)
    if not found:
        raise HTTPException(status_code=404, detail="not found")
    return found


@router.post("/exercise", response_model=Exercise)
def create_or_update_exercise(ex: Exercise):
    """
    CHANGED: upsert semantics. If `external_id` is set and already found in
    Qdrant, the same point ID is overwritten (a real update). Otherwise a new
    entry is created. The API signature stays identical
    (POST /exercise, body = Exercise).
    """
    _ensure_collection()

    # Default: new point ID taken from the Exercise object
    point_id = ex.id

    # If external_id is set, check for an existing entry and reuse its ID
    if ex.external_id:
        prior = _lookup_by_external_id(ex.external_id)
        if prior:
            point_id = prior.get("id", point_id)

    # Compute the embedding
    vector = _make_vector(ex)

    # Keep the payload in sync with the point (payload id == point_id)
    payload = ex.dict()
    payload["id"] = str(point_id)

    # Upsert into Qdrant
    qdrant.upsert(
        collection_name=COLLECTION,
        points=[PointStruct(id=str(point_id), vector=vector, payload=payload)],
    )
    return Exercise(**payload)


# (Optional) single fetch by ID (unchanged if it already existed)
@router.get("/exercise/{exercise_id}", response_model=Exercise)
def get_exercise(exercise_id: str):
    """Return the exercise whose payload field ``id`` equals ``exercise_id``.

    Raises:
        HTTPException: 404 if no point matches.
    """
    _ensure_collection()
    pts, _ = qdrant.scroll(
        collection_name=COLLECTION,
        scroll_filter=Filter(
            must=[FieldCondition(key="id", match=MatchValue(value=exercise_id))]
        ),
        limit=1,
    )
    if not pts:
        raise HTTPException(status_code=404, detail="not found")
    payload = dict(pts[0].payload or {})
    payload.setdefault("id", str(pts[0].id))
    return Exercise(**payload)


# Existing admin utilities (delete by filter / whole collection) - unchanged
# apart from using the constants
@router.delete("/exercise/delete-by-external-id", response_model=DeleteResponse)
def delete_by_external_id(external_id: str = Query(...)):
    """Delete the points whose payload ``external_id`` matches.

    At most 10000 matching points are fetched (one scroll page) and deleted.
    Returns a response with ``count=0`` when nothing matches.
    """
    _ensure_collection()
    flt = Filter(
        must=[FieldCondition(key="external_id", match=MatchValue(value=external_id))]
    )
    pts, _ = qdrant.scroll(collection_name=COLLECTION, scroll_filter=flt, limit=10000)
    ids = [str(p.id) for p in pts]
    if not ids:
        return DeleteResponse(
            status="🔍 Keine Einträge gefunden.", count=0, collection=COLLECTION
        )
    qdrant.delete(collection_name=COLLECTION, points_selector=PointIdsList(points=ids))
    return DeleteResponse(status="🗑️ gelöscht", count=len(ids), collection=COLLECTION)


@router.delete("/exercise/delete-collection", response_model=DeleteResponse)
def delete_collection(collection: str = Query(default=COLLECTION)):
    """Delete a whole collection (defaults to the exercise collection).

    Raises:
        HTTPException: 404 if the collection does not exist.
    """
    if not qdrant.collection_exists(collection):
        raise HTTPException(
            status_code=404, detail=f"Collection '{collection}' nicht gefunden."
        )
    qdrant.delete_collection(collection_name=collection)
    return DeleteResponse(status="🗑️ gelöscht", count=0, collection=collection)