llm-api/plan_router.py aktualisiert
All checks were successful
Deploy Trainer_LLM to llm-node / deploy (push) Successful in 1s

This commit is contained in:
Lars 2025-08-13 07:06:08 +02:00
parent 597b94ff25
commit 7f821b5723

View File

@ -1,14 +1,13 @@
# -*- coding: utf-8 -*-
"""
plan_router.py v0.11.0 (WP-15)
plan_router.py v0.12.0 (WP-15)
Minimal-CRUD für Plan-Templates & Pläne (POST/GET) + Idempotenz via Fingerprint.
Erweiterungen:
- optionale Section-Felder ideal/supplement + materialisierte Facettenfelder
- Referenz-Validierung: template_id (pflicht, wenn gesetzt)
- Optionaler Strict-Mode: PLAN_STRICT_EXERCISES prüft exercise_external_id
NEU in v0.12.0:
- List-/Filter-Endpoints: GET /plan_templates, GET /plans
- Umfangreiche Swagger-Doku (summary/description, Query-Parameter mit Beschreibungen & Beispielen)
"""
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from uuid import uuid4
@ -37,8 +36,8 @@ class TemplateSection(BaseModel):
name: str
target_minutes: int
must_keywords: List[str] = []
ideal_keywords: List[str] = [] # optional
supplement_keywords: List[str] = [] # optional
ideal_keywords: List[str] = []
supplement_keywords: List[str] = []
forbid_keywords: List[str] = []
capability_targets: Dict[str, int] = {}
@ -82,12 +81,23 @@ class Plan(BaseModel):
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
source: str = "API"
class PlanTemplateList(BaseModel):
items: List[PlanTemplate]
limit: int
offset: int
count: int
class PlanList(BaseModel):
items: List[Plan]
limit: int
offset: int
count: int
# -----------------
# Helpers
# -----------------
def _ensure_collection(name: str):
# Legt Collection an, wenn sie fehlt (analog exercise_router)
if not qdrant.collection_exists(name):
qdrant.recreate_collection(
collection_name=name,
@ -125,15 +135,11 @@ def _embed(text: str):
def _fingerprint_for_plan(p: Plan) -> str:
"""sha256(title, total_minutes, sections.items.exercise_external_id, sections.items.duration)"""
core = {
"title": p.title,
"total_minutes": int(p.total_minutes),
"items": [
{
"exercise_external_id": it.exercise_external_id,
"duration": int(it.duration),
}
{"exercise_external_id": it.exercise_external_id, "duration": int(it.duration)}
for sec in p.sections
for it in (sec.items or [])
],
@ -153,7 +159,6 @@ def _get_by_field(collection: str, key: str, value: Any) -> Optional[Dict[str, A
def _as_model(model_cls, payload: Dict[str, Any]):
"""Filtert unbekannte Felder heraus (Pydantic v1/v2 kompatibel)."""
fields = getattr(model_cls, "model_fields", None) or getattr(model_cls, "__fields__", {})
allowed = set(fields.keys())
data = {k: payload[k] for k in payload.keys() if k in allowed}
@ -170,13 +175,22 @@ def _exists_in_collection(collection: str, key: str, value: Any) -> bool:
return bool(pts)
# -----------------
# Endpoints
# CRUD (bestehend)
# -----------------
@router.post("/plan_templates", response_model=PlanTemplate)
@router.post(
"/plan_templates",
response_model=PlanTemplate,
summary="Create a plan template",
description=(
"Erstellt ein Plan-Template (Strukturplanung).\n\n"
"• Mehrere Sections erlaubt.\n"
"• Section-Felder: must/ideal/supplement/forbid keywords + capability_targets.\n"
"• Materialisierte Facettenfelder (section_*) werden intern geschrieben, um Qdrant-Filter zu beschleunigen."
),
)
def create_plan_template(t: PlanTemplate):
_ensure_collection(PLAN_TEMPLATE_COLLECTION)
payload = t.model_dump()
# Normalisierung
payload["goals"] = _norm_list(payload.get("goals"))
sections = payload.get("sections", []) or []
for s in sections:
@ -185,7 +199,6 @@ def create_plan_template(t: PlanTemplate):
s["supplement_keywords"] = _norm_list(s.get("supplement_keywords") or [])
s["forbid_keywords"] = _norm_list(s.get("forbid_keywords") or [])
# Materialisierte Facettenfelder (stabile KEYWORD-Indizes in Qdrant)
payload["section_names"] = _norm_list([s.get("name", "") for s in sections])
payload["section_must_keywords"] = _norm_list([kw for s in sections for kw in (s.get("must_keywords") or [])])
payload["section_ideal_keywords"] = _norm_list([kw for s in sections for kw in (s.get("ideal_keywords") or [])])
@ -197,7 +210,12 @@ def create_plan_template(t: PlanTemplate):
return t
@router.get("/plan_templates/{tpl_id}", response_model=PlanTemplate)
@router.get(
"/plan_templates/{tpl_id}",
response_model=PlanTemplate,
summary="Read a plan template by id",
description="Liest ein Template anhand seiner ID und gibt nur die Schemafelder zurück (zusätzliche Payload wird herausgefiltert).",
)
def get_plan_template(tpl_id: str):
_ensure_collection(PLAN_TEMPLATE_COLLECTION)
found = _get_by_field(PLAN_TEMPLATE_COLLECTION, "id", tpl_id)
@ -206,16 +224,22 @@ def get_plan_template(tpl_id: str):
return _as_model(PlanTemplate, found)
@router.post("/plan", response_model=Plan)
@router.post(
"/plan",
response_model=Plan,
summary="Create a concrete training plan",
description=(
"Erstellt einen konkreten Trainingsplan.\n\n"
"Idempotenz: gleicher Fingerprint (title + items) → gleicher Plan (kein Duplikat).\n"
"Optional: Validierung von template_id und Exercises (Strict-Mode)."
),
)
def create_plan(p: Plan):
_ensure_collection(PLAN_COLLECTION)
# 1) Template-Referenz prüfen (falls gesetzt)
if p.template_id:
if not _exists_in_collection(PLAN_TEMPLATE_COLLECTION, "id", p.template_id):
raise HTTPException(status_code=422, detail=f"Unknown template_id: {p.template_id}")
# 2) Optional: Strict-Mode Exercises prüfen
if _truthy(os.getenv("PLAN_STRICT_EXERCISES")):
missing: List[str] = []
for sec in p.sections or []:
@ -224,21 +248,15 @@ def create_plan(p: Plan):
if exid and not _exists_in_collection(EXERCISE_COLLECTION, "external_id", exid):
missing.append(exid)
if missing:
raise HTTPException(status_code=422, detail={
"error": "unknown exercise_external_id",
"missing": sorted(set(missing))
})
raise HTTPException(status_code=422, detail={"error": "unknown exercise_external_id", "missing": sorted(set(missing))})
# 3) Fingerprint
fp = _fingerprint_for_plan(p)
p.fingerprint = p.fingerprint or fp
# 4) Idempotenz
existing = _get_by_field(PLAN_COLLECTION, "fingerprint", p.fingerprint)
if existing:
return _as_model(Plan, existing)
# 5) Normalisieren & upsert
p.goals = _norm_list(p.goals)
payload = p.model_dump()
if isinstance(payload.get("created_at"), datetime):
@ -249,7 +267,12 @@ def create_plan(p: Plan):
return p
@router.get("/plan/{plan_id}", response_model=Plan)
@router.get(
"/plan/{plan_id}",
response_model=Plan,
summary="Read a training plan by id",
description="Liest einen Plan anhand seiner ID. `created_at` wird (falls ISO-String) zu `datetime` geparst.",
)
def get_plan(plan_id: str):
_ensure_collection(PLAN_COLLECTION)
found = _get_by_field(PLAN_COLLECTION, "id", plan_id)
@ -261,3 +284,156 @@ def get_plan(plan_id: str):
except Exception:
pass
return _as_model(Plan, found)
# -----------------
# NEW: List-/Filter-Endpoints
# -----------------
@router.get(
"/plan_templates",
response_model=PlanTemplateList,
summary="List plan templates (filterable)",
description=(
"Listet Plan-Templates mit Filtern.\n\n"
"**Filter** (exakte Matches, KEYWORD-Felder):\n"
"- discipline, age_group, target_group\n"
"- section: Section-Name (nutzt materialisierte `section_names`)\n"
"- goal: Ziel (nutzt `goals`)\n"
"- keyword: trifft auf beliebige Section-Keyword-Felder (must/ideal/supplement/forbid).\n\n"
"**Pagination:** limit/offset. Feld `count` entspricht der Anzahl zurückgegebener Items (keine Gesamtsumme)."
),
)
def list_plan_templates(
discipline: Optional[str] = Query(None, description="Filter: Disziplin (exaktes KEYWORD-Match)", example="Karate"),
age_group: Optional[str] = Query(None, description="Filter: Altersgruppe", example="Teenager"),
target_group: Optional[str] = Query(None, description="Filter: Zielgruppe", example="Breitensport"),
section: Optional[str] = Query(None, description="Filter: Section-Name (materialisiert)", example="Warmup"),
goal: Optional[str] = Query(None, description="Filter: Trainingsziel", example="Technik"),
keyword: Optional[str] = Query(None, description="Filter: Keyword in must/ideal/supplement/forbid", example="Koordination"),
limit: int = Query(20, ge=1, le=200, description="Max. Anzahl Items"),
offset: int = Query(0, ge=0, description="Start-Offset für Paging"),
):
_ensure_collection(PLAN_TEMPLATE_COLLECTION)
must: List[Any] = []
should: List[Any] = []
if discipline:
must.append(FieldCondition(key="discipline", match=MatchValue(value=discipline)))
if age_group:
must.append(FieldCondition(key="age_group", match=MatchValue(value=age_group)))
if target_group:
must.append(FieldCondition(key="target_group", match=MatchValue(value=target_group)))
if section:
must.append(FieldCondition(key="section_names", match=MatchValue(value=section)))
if goal:
must.append(FieldCondition(key="goals", match=MatchValue(value=goal)))
if keyword:
for k in [
"section_must_keywords",
"section_ideal_keywords",
"section_supplement_keywords",
"section_forbid_keywords",
]:
should.append(FieldCondition(key=k, match=MatchValue(value=keyword)))
flt = None
if must or should:
flt = Filter(must=must or None, should=should or None)
# einfache Offset-Implementierung: wir holen (offset+limit) und slicen lokal
fetch_n = offset + limit
fetch_n = max(fetch_n, 1)
pts, _ = qdrant.scroll(collection_name=PLAN_TEMPLATE_COLLECTION, scroll_filter=flt, limit=fetch_n, with_payload=True)
items = []
for p in pts[offset:offset+limit]:
payload = dict(p.payload or {})
payload.setdefault("id", str(p.id))
items.append(_as_model(PlanTemplate, payload))
return PlanTemplateList(items=items, limit=limit, offset=offset, count=len(items))
@router.get(
"/plans",
response_model=PlanList,
summary="List training plans (filterable)",
description=(
"Listet Trainingspläne mit Filtern.\n\n"
"**Filter** (exakte Matches, KEYWORD-Felder):\n"
"- created_by, discipline, age_group, target_group, goal\n"
"- section: Section-Name (aktuell verschachteltes `sections.name`; optional später materialisieren als `plan_section_names`)\n"
"- created_from / created_to: ISO8601 Zeitfenster (lokal ausgewertet).\n\n"
"**Pagination:** limit/offset. Feld `count` entspricht der Anzahl zurückgegebener Items (keine Gesamtsumme)."
),
)
def list_plans(
created_by: Optional[str] = Query(None, description="Filter: Ersteller", example="tester"),
discipline: Optional[str] = Query(None, description="Filter: Disziplin", example="Karate"),
age_group: Optional[str] = Query(None, description="Filter: Altersgruppe", example="Teenager"),
target_group: Optional[str] = Query(None, description="Filter: Zielgruppe", example="Breitensport"),
goal: Optional[str] = Query(None, description="Filter: Trainingsziel", example="Technik"),
section: Optional[str] = Query(None, description="Filter: Section-Name", example="Warmup"),
created_from: Optional[str] = Query(None, description="Ab-Zeitpunkt (ISO 8601, z. B. 2025-08-12T00:00:00Z)", example="2025-08-12T00:00:00Z"),
created_to: Optional[str] = Query(None, description="Bis-Zeitpunkt (ISO 8601)", example="2025-08-13T00:00:00Z"),
limit: int = Query(20, ge=1, le=200, description="Max. Anzahl Items"),
offset: int = Query(0, ge=0, description="Start-Offset für Paging"),
):
_ensure_collection(PLAN_COLLECTION)
must: List[Any] = []
if created_by:
must.append(FieldCondition(key="created_by", match=MatchValue(value=created_by)))
if discipline:
must.append(FieldCondition(key="discipline", match=MatchValue(value=discipline)))
if age_group:
must.append(FieldCondition(key="age_group", match=MatchValue(value=age_group)))
if target_group:
must.append(FieldCondition(key="target_group", match=MatchValue(value=target_group)))
if goal:
must.append(FieldCondition(key="goals", match=MatchValue(value=goal)))
if section:
# Achtung: verschachtelt funktioniert in eurer Qdrant-Version; sonst Schritt 3 (Materialisierung)
must.append(FieldCondition(key="sections.name", match=MatchValue(value=section)))
flt = Filter(must=must or None) if must else None
fetch_n = offset + limit
fetch_n = max(fetch_n, 1)
pts, _ = qdrant.scroll(collection_name=PLAN_COLLECTION, scroll_filter=flt, limit=fetch_n, with_payload=True)
# optionales Zeitfenster lokal anwenden (created_at ist ISOString im Payload)
def _in_window(py):
if not (created_from or created_to):
return True
ts = py.get("created_at")
if isinstance(ts, dict) and ts.get("$date"):
ts = ts["$date"]
if isinstance(ts, str):
try:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
except Exception:
return False
elif isinstance(ts, datetime):
dt = ts
else:
return False
ok = True
if created_from:
try:
ok = ok and dt >= datetime.fromisoformat(created_from.replace("Z", "+00:00"))
except Exception:
pass
if created_to:
try:
ok = ok and dt <= datetime.fromisoformat(created_to.replace("Z", "+00:00"))
except Exception:
pass
return ok
payloads = []
for p in pts:
py = dict(p.payload or {})
py.setdefault("id", str(p.id))
if _in_window(py):
payloads.append(py)
sliced = payloads[offset:offset+limit]
items = [_as_model(Plan, x) for x in sliced]
return PlanList(items=items, limit=limit, offset=offset, count=len(items))