shinkan-jinkendo/backend/exercise_ai.py
Lars 9be69ace5c
All checks were successful
Deploy Development / deploy (push) Successful in 42s
Test Suite / pytest-backend (push) Successful in 38s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 12s
Test Suite / k6 /health Baseline (push) Successful in 33s
Test Suite / playwright-tests (push) Successful in 1m16s
Enhance exercise_ai module with skill input sanitization and version update
- Introduced a new constant `_MAX_SANITIZE_SKILL_INPUT_ROWS` to limit the number of skill entries processed, improving performance and preventing issues with excessively long skill arrays.
- Updated the `_extract_json_array` and `_sanitize_skill_entries` functions to enforce this limit, ensuring that only a maximum of 250 skill entries are handled and that processing stops after 5 valid entries.
- Incremented the application version to 0.8.155 and updated the changelog to reflect these changes, including a note on the improvements made to the AI endpoint for skill arrays.
2026-05-22 09:59:56 +02:00

719 lines
23 KiB
Python

"""
KI-Vorschlaege fuer Uebungsformular: Laedt Prompts aus ai_prompts, ruft OpenRouter auf.
Keine persistente Aenderung an exercises — nur Response-DTO fuer das Frontend.
Skill-Katalog fuer Prompts: priorisierte Auswahl (ai_skill_retrieval_profiles, Fallback-Heuristik).
"""
from __future__ import annotations
import copy
import json
import math
import re
from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Sequence, Tuple
from fastapi import HTTPException
from openrouter_chat import OpenRouterError, normalize_openrouter_env, openrouter_chat_completion
# Canonical skill-level slugs accepted for required/target levels.
_CANONICAL_SKILL_LEVELS = frozenset(
    ("basis", "grundlagen", "aufbau", "fortgeschritten", "optimierung")
)

# Older spellings (two names and the digits "1".."5") mapped to canonical slugs.
_LEGACY_SKILL_LEVEL_SLUG = {
    "einsteiger": "basis",
    "experte": "optimierung",
    "1": "basis",
    "2": "grundlagen",
    "3": "aufbau",
    "4": "fortgeschritten",
    "5": "optimierung",
}

# Intensity values that are passed through unchanged by the intensity normalizer.
_ALLOWED_SKILL_INTENSITY = frozenset(("niedrig", "mittel", "hoch"))

# Anything between "<" and ">" is treated as a tag when stripping HTML.
_TAG_RE = re.compile(r"<[^>]+>", re.IGNORECASE)
# Word-like tokens: ASCII letters, German umlauts/eszett and digits.
_TOKEN_FIND = re.compile(r"[a-zäöüß0-9]+", re.IGNORECASE)

# Default length cap of strip_html_to_plain().
_MAX_PLAIN_FIELD = 28_000
# Maximum number of skill rows picked for the prompt catalog.
_MAX_SKILLS_CATALOG_LINES = 240
# Maximum length of the AI summary text placed in the response.
_MAX_SUMMARY_CHARS = 220
# Maximum number of parsed AI skill rows that are looked at.
_MAX_SANITIZE_SKILL_INPUT_ROWS = 250

# Retrieval settings used when no profile table/row is available; also the
# base that loaded profiles are merged onto.
_FALLBACK_RETRIEVAL_CONFIG: Dict[str, Any] = {
    "version": 1,
    "importance_multiplier": 1.0,
    "text_overlap_bonus": 2.0,
    "main_slug_weights": {"karate": 1.0, "allgemeine": 1.0},
    "category_slug_weights": {},
    "category_max_share": {"kondition": 0.38, "koordination": 0.35},
    "main_min_share": {},
    "description_plain_max_len": 160,
    "karate_relevance_max_len": 72,
    "keyword_overrides": [],
}
def _normalize_exercise_skill_level(value) -> Optional[str]:
    """Map a raw level value onto a canonical level slug.

    The value is stringified, trimmed and lower-cased. Canonical slugs are
    returned as-is, legacy spellings are translated, and everything else
    (including ``None`` and blank input) yields ``None``.
    """
    slug = "" if value is None else str(value).strip().lower()
    if not slug:
        return None
    if slug in _CANONICAL_SKILL_LEVELS:
        return slug
    return _LEGACY_SKILL_LEVEL_SLUG.get(slug)
def _normalize_exercise_skill_intensity(value) -> str:
    """Return one of ``niedrig`` / ``mittel`` / ``hoch`` for a raw intensity value.

    English ``low`` / ``medium`` / ``high`` are translated; allowed German
    values pass through; ``None`` and anything unknown become ``mittel``.
    """
    if value is None:
        return "mittel"
    key = str(value).strip().lower()
    english = {"low": "niedrig", "medium": "mittel", "high": "hoch"}
    translated = english.get(key)
    if translated is not None:
        return translated
    return key if key in _ALLOWED_SKILL_INTENSITY else "mittel"
def strip_html_to_plain(html: Optional[str], *, max_len: int = _MAX_PLAIN_FIELD) -> str:
    """Reduce an HTML fragment to single-spaced plain text.

    Tags are replaced by spaces, whitespace runs are collapsed to one space
    and the result is trimmed. HTML entities are left untouched.

    Args:
        html: Markup (or plain text); falsy input yields ``""``.
        max_len: Maximum length of the returned string.

    Returns:
        The plain text. Text longer than ``max_len`` is cut to
        ``max_len - 1`` characters and terminated with an ellipsis, so the
        result never exceeds ``max_len``.
    """
    if not html:
        return ""
    text = _TAG_RE.sub(" ", str(html))
    text = re.sub(r"\s+", " ", text).strip()
    if len(text) <= max_len:
        return text
    if max_len < 1:
        # A non-positive limit would turn the slice below into a negative
        # index and return almost the whole text instead of nothing.
        return ""
    # One character is reserved for the truncation marker (U+2026).
    return text[: max_len - 1].rstrip() + "\u2026"
def _corpus_tokens(*parts: str) -> frozenset:
    """Collect the distinct lower-cased word tokens of all given text parts.

    Empty/blank parts are ignored; tokens shorter than two characters are
    dropped.
    """
    cleaned = [piece for piece in (p.strip() for p in parts if p) if piece]
    joined = " ".join(cleaned)
    lowered = {match.lower() for match in _TOKEN_FIND.findall(joined)}
    return frozenset(token for token in lowered if len(token) > 1)
def _ai_profiles_table_ready(cur) -> bool:
    """Tell whether ``public.ai_skill_retrieval_profiles`` resolves via ``to_regclass``.

    Works with cursors that return dict rows as well as sequence rows.
    """
    cur.execute("SELECT to_regclass(%s)::text AS t", ("public.ai_skill_retrieval_profiles",))
    found = cur.fetchone()
    if found is None:
        return False
    if isinstance(found, dict):
        regclass = found["t"]
    else:
        regclass = found[0]
    if regclass is None:
        return False
    return bool(str(regclass).strip())
def _average_float_dict(dicts: Sequence[Mapping[str, Any]], *, fallback: float) -> Dict[str, float]:
    """Average the numeric values of several mappings key by key.

    For every key that occurs in any mapping, the mean of all values that
    convert to ``float`` is stored. Missing, ``None`` and non-numeric values
    are ignored; a key without any usable value gets ``fallback``.
    """
    all_keys: set = set()
    for mapping in dicts:
        all_keys.update(mapping.keys())

    averaged: Dict[str, float] = {}
    for key in all_keys:
        numbers: List[float] = []
        for mapping in dicts:
            candidate = mapping.get(key)
            if candidate is None:
                continue
            try:
                numbers.append(float(candidate))
            except (TypeError, ValueError):
                continue
        if numbers:
            averaged[key] = sum(numbers) / len(numbers)
        else:
            averaged[key] = fallback
    return averaged
def _merge_retrieval_configs(configs: Sequence[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge several retrieval profiles into one effective configuration.

    Starts from a deep copy of ``_FALLBACK_RETRIEVAL_CONFIG``. Weight and
    share dictionaries are averaged per key, scalar settings are averaged
    over the profiles that provide a usable value, and ``keyword_overrides``
    lists are concatenated in profile order.

    Args:
        configs: Profile dictionaries; an empty sequence yields the fallback.

    Returns:
        A new configuration dictionary; the inputs are not modified.
    """
    base = copy.deepcopy(_FALLBACK_RETRIEVAL_CONFIG)
    if not configs:
        return base

    def _sections(key: str) -> List[Dict[str, Any]]:
        # A malformed (non-dict) section counts as "no entries" rather than
        # crashing the merge.
        found: List[Dict[str, Any]] = []
        for c in configs:
            value = c.get(key)
            found.append(value if isinstance(value, dict) else {})
        return found

    def _scalars(key: str, cast: Any) -> List[Any]:
        # Each setting is collected independently: a bad value in one field
        # must not discard the other fields of the same profile.
        values: List[Any] = []
        for c in configs:
            raw = c.get(key)
            if raw is None:
                continue
            try:
                values.append(cast(raw))
            except (TypeError, ValueError, OverflowError):
                continue
        return values

    base["main_slug_weights"] = _average_float_dict(_sections("main_slug_weights"), fallback=1.0)
    for slug in ("karate", "allgemeine"):
        base["main_slug_weights"].setdefault(slug, 1.0)
    base["category_slug_weights"] = _average_float_dict(_sections("category_slug_weights"), fallback=1.0)
    base["category_max_share"] = _average_float_dict(_sections("category_max_share"), fallback=1.0)
    base["main_min_share"] = _average_float_dict(_sections("main_min_share"), fallback=0.0)

    for key in ("importance_multiplier", "text_overlap_bonus"):
        floats = _scalars(key, float)
        if floats:
            base[key] = sum(floats) / len(floats)
    for key in ("description_plain_max_len", "karate_relevance_max_len"):
        ints = _scalars(key, int)
        if ints:
            base[key] = int(round(sum(ints) / len(ints)))

    overrides: List[Any] = []
    for c in configs:
        extra = c.get("keyword_overrides")
        if isinstance(extra, (list, tuple)):
            overrides.extend(extra)
    base["keyword_overrides"] = overrides
    return base
def _mul_weight_dict(target: MutableMapping[str, float], patch: Mapping[str, Any]) -> None:
    """Multiply the weights in ``target`` by the factors in ``patch`` in place.

    Keys missing from ``target`` start at 1.0; factors that do not convert
    to ``float`` are skipped.
    """
    for name in patch:
        try:
            factor = float(patch[name])
        except (TypeError, ValueError):
            continue
        current = float(target.get(name, 1.0))
        target[name] = current * factor
def _apply_keyword_overrides(cfg: Dict[str, Any], corpus_lower: str) -> None:
    """Apply the keyword-triggered patches of ``cfg`` in place.

    An override fires when any of its ``keywords_any`` occurs as a substring
    of the (case-insensitively compared) corpus. Its patch multiplies
    ``category_slug_weights`` / ``main_slug_weights`` and can only lower
    ``category_max_share`` caps, never raise them.

    Malformed entries (non-dict overrides or patches, non-list keywords) are
    skipped instead of raising, because the configuration is free-form JSON.

    Args:
        cfg: Merged retrieval configuration; modified in place.
        corpus_lower: Text the keywords are searched in.
    """
    caps = cfg.setdefault("category_max_share", {})
    hay = (corpus_lower or "").lower()
    if not hay.strip():
        # Nothing to match against: no override can fire.
        return

    def _dict_or_empty(value: Any) -> Dict[str, Any]:
        return value if isinstance(value, dict) else {}

    for ov in cfg.get("keyword_overrides") or []:
        if not isinstance(ov, dict):
            continue
        keywords = ov.get("keywords_any") or []
        if isinstance(keywords, str):
            # A bare string is one keyword, not a sequence of characters.
            keywords = [keywords]
        elif not isinstance(keywords, (list, tuple)):
            continue
        needles = (str(kw or "").strip().lower() for kw in keywords)
        if not any(needle and needle in hay for needle in needles):
            continue

        patch = _dict_or_empty(ov.get("patch"))
        _mul_weight_dict(
            cfg.setdefault("category_slug_weights", {}),
            _dict_or_empty(patch.get("category_slug_weights")),
        )
        _mul_weight_dict(
            cfg.setdefault("main_slug_weights", {}),
            _dict_or_empty(patch.get("main_slug_weights")),
        )
        for slug, mx in _dict_or_empty(patch.get("category_max_share")).items():
            try:
                limit = float(mx)
                current = float(caps.get(slug, 1.0))
            except (TypeError, ValueError):
                continue
            caps[slug] = min(current, limit)
def _ordered_focus_ids(focus_ctx: Optional[Sequence[Tuple[int, bool]]]) -> List[int]:
    """Return the distinct focus-area IDs: primary entries first, then by ID.

    Entries whose ID does not convert to an integer or is below 1 are
    dropped; repeated IDs keep only their first (best-ranked) occurrence.
    """
    if not focus_ctx:
        return []
    result: List[int] = []
    taken: set = set()
    ranked = sorted(focus_ctx, key=lambda pair: (not pair[1], pair[0]))
    for raw_id, _is_primary in ranked:
        try:
            focus_id = int(raw_id)
        except (TypeError, ValueError):
            continue
        if focus_id >= 1 and focus_id not in taken:
            taken.add(focus_id)
            result.append(focus_id)
    return result
def _load_merged_retrieval_config(
    cur, focus_ctx: Optional[Sequence[Tuple[int, bool]]]
) -> Dict[str, Any]:
    """Load the retrieval profiles for the given focus areas and merge them.

    Returns a copy of the fallback configuration when the profile table is
    not available. Otherwise one active profile per focus area is read (in
    the order produced by ``_ordered_focus_ids``); if none could be loaded,
    the active default profile is used. Rows whose config is neither a dict
    nor a JSON string decoding to a dict are ignored.
    """
    if not _ai_profiles_table_ready(cur):
        return copy.deepcopy(_FALLBACK_RETRIEVAL_CONFIG)

    def _config_of(row: Any) -> Optional[Dict[str, Any]]:
        # Accepts dict rows and sequence rows; JSON text is decoded first.
        if not row:
            return None
        payload = row["config"] if isinstance(row, dict) else row[0]
        if isinstance(payload, str):
            try:
                payload = json.loads(payload)
            except json.JSONDecodeError:
                return None
        return payload if isinstance(payload, dict) else None

    by_focus_sql = """
SELECT config
FROM ai_skill_retrieval_profiles
WHERE active = true AND focus_area_id = %s
LIMIT 1
"""
    default_sql = """
SELECT config
FROM ai_skill_retrieval_profiles
WHERE active = true AND is_default = true
LIMIT 1
"""

    profiles: List[Dict[str, Any]] = []
    for focus_id in _ordered_focus_ids(focus_ctx):
        cur.execute(by_focus_sql, (focus_id,))
        profile = _config_of(cur.fetchone())
        if profile is not None:
            profiles.append(profile)

    if not profiles:
        cur.execute(default_sql)
        profile = _config_of(cur.fetchone())
        if profile is not None:
            profiles.append(profile)

    return _merge_retrieval_configs(profiles)
def _fetch_all_active_skills_for_catalog(cur) -> List[Dict[str, Any]]:
    """Read all skills whose status is NULL or ``'active'`` as plain dicts.

    Each row carries the skill columns plus the main-category slug, the
    category slug (both ``''`` when not joined) and the sub-category name.
    """
    sql = """
SELECT s.id,
s.name,
s.category,
s.description,
s.karate_relevance,
s.relevance_level,
s.importance,
COALESCE(m.slug, '') AS main_slug,
COALESCE(c.slug, '') AS category_slug,
c.name AS subcategory_name
FROM skills s
LEFT JOIN skill_main_categories m ON m.id = s.main_category_id
LEFT JOIN skill_categories c ON c.id = s.category_id
WHERE (s.status IS NULL OR s.status = 'active')
"""
    cur.execute(sql)
    fetched = cur.fetchall()
    return list(map(dict, fetched))
def _score_skill_row(
    row: Mapping[str, Any],
    cfg: Mapping[str, Any],
    corpus_tokens: frozenset,
) -> float:
    """Compute the relevance score of one skill row.

    The base score is importance (clamped to 1..5, default 3) times the
    importance multiplier times the main-category and category weights
    (each floored at 0.05). Every corpus token found as a substring of the
    skill's searchable text adds ``text_overlap_bonus``.
    """
    main_slug = str(row.get("main_slug") or "").strip().lower()
    cat_slug = str(row.get("category_slug") or "").strip().lower()

    main_weights = cfg.get("main_slug_weights") or {}
    cat_weights = cfg.get("category_slug_weights") or {}
    main_w = float(main_weights.get(main_slug, 1.0))
    cat_w = float(cat_weights.get(cat_slug, 1.0))

    raw_importance = row.get("importance")
    if raw_importance is None:
        importance = 3
    else:
        try:
            importance = int(raw_importance)
        except (TypeError, ValueError):
            importance = 3
    importance = min(5, max(1, importance))

    multiplier = float(cfg.get("importance_multiplier") or 1.0)
    base = float(importance) * multiplier * max(main_w, 0.05) * max(cat_w, 0.05)

    searchable = [
        strip_html_to_plain(row.get("name"), max_len=400),
        strip_html_to_plain(row.get("description"), max_len=520),
        cat_slug.replace("_", " "),
        str(row.get("category") or ""),
        str(row.get("subcategory_name") or ""),
    ]
    blob = " ".join(searchable).lower()

    hits = len([token for token in corpus_tokens if token and token in blob])
    bonus = float(cfg.get("text_overlap_bonus") or 0.0)
    return base + hits * bonus
def _category_cap_limits(cfg: Mapping[str, Any], n_max: int) -> Dict[str, int]:
    """Translate ``category_max_share`` fractions into absolute row limits.

    Args:
        cfg: Retrieval configuration; only ``category_max_share`` is read.
        n_max: Total number of rows the shares refer to.

    Returns:
        Mapping of lower-cased category slug to the maximum number of rows.
        A share in (0, 1) becomes ``floor(share * n_max)`` but at least 1; a
        share >= 1 becomes a limit above ``n_max`` (effectively uncapped).
        Blank slugs, non-numeric shares and shares <= 0 are left out.
    """
    shares = cfg.get("category_max_share") or {}
    if not isinstance(shares, dict):
        return {}

    limits: Dict[str, int] = {}
    for slug, raw in shares.items():
        # Lower-case the key: the picker looks limits up by lower-cased
        # category slug, so a mixed-case key would otherwise never match.
        key = str(slug or "").strip().lower()
        if not key:
            continue
        try:
            share = float(raw)
        except (TypeError, ValueError):
            continue
        if 0 < share < 1.0:
            limit = max(1, int(math.floor(share * n_max)))
        elif share >= 1.0:
            limit = n_max + 99999
        else:
            continue
        # If two spellings collapse onto one slug, the stricter limit wins.
        limits[key] = min(limit, limits.get(key, limit))
    return limits
def _pick_catalog_rows(rows_scored: List[Tuple[float, Dict[str, Any]]], cfg: Mapping[str, Any]) -> List[Dict[str, Any]]:
    """Select at most ``_MAX_SKILLS_CATALOG_LINES`` rows by descending score.

    ``rows_scored`` holds ``(score, row_dict)`` pairs in any order. Ties are
    broken by skill name. The first sweep honours the per-category limits;
    a second sweep fills remaining slots with the best rows left over,
    ignoring the limits. Rows are de-duplicated by their ``id``.
    """
    limits = _category_cap_limits(cfg, _MAX_SKILLS_CATALOG_LINES)
    ranked = sorted(rows_scored, key=lambda pair: (-pair[0], str(pair[1].get("name") or "")))

    chosen: List[Dict[str, Any]] = []
    chosen_ids: set = set()
    per_category: Dict[str, int] = {}

    # Sweep 1 enforces the category limits, sweep 2 only fills up.
    for enforce_limits in (True, False):
        for _score, row in ranked:
            if len(chosen) >= _MAX_SKILLS_CATALOG_LINES:
                break
            row_id = row["id"]
            if row_id in chosen_ids:
                continue
            if enforce_limits:
                slug = str(row.get("category_slug") or "").strip().lower()
                if slug and slug in limits and per_category.get(slug, 0) >= limits[slug]:
                    continue
                if slug:
                    per_category[slug] = per_category.get(slug, 0) + 1
            chosen.append(row)
            chosen_ids.add(row_id)

    return chosen[:_MAX_SKILLS_CATALOG_LINES]
def _format_skill_catalog_line(row: Mapping[str, Any], cfg: Mapping[str, Any]) -> str:
    """Render one skill row as a single catalog line for the prompt.

    The line contains id, name, a combined category label and a shortened
    plain-text description. The karate-relevance part is appended only when
    ``karate_relevance_max_len`` is positive and the row has a relevance
    text or relevance level.
    """
    rid = int(row["id"])
    nm = (row.get("name") or "").strip() or f"Skill #{rid}"

    main_slug = str(row.get("main_slug") or "").strip()
    label_parts = [
        main_slug.upper() if main_slug else "",
        str(row.get("category") or "").strip(),
        str(row.get("subcategory_name") or "").strip(),
    ]
    cats = " / ".join(filter(None, label_parts))

    desc_limit = int(cfg.get("description_plain_max_len") or 160)
    dsc = strip_html_to_plain(row.get("description"), max_len=max(40, min(400, desc_limit)))

    krmax = int(cfg.get("karate_relevance_max_len") or 0)
    if krmax > 0:
        kr = strip_html_to_plain(row.get("karate_relevance"), max_len=min(280, krmax))
    else:
        kr = ""

    rel = row.get("relevance_level")
    rel_s = "" if rel is None else str(rel).strip()

    line = f"- id={rid} | name={nm}"
    line += f" | kategorie={cats or '-'}"
    line += f" | beschreibung={dsc or '-'}"
    if krmax > 0 and (kr.strip() or rel_s):
        line += f" | karate_relevanz={kr or '-'} | relevanz_stufe={rel_s or '-'}"
    return line
def _safe_int_importance(value: Any) -> int:
    """Return the importance clamped to 1..5, or 0 when it is unusable or zero."""
    try:
        number = int(value)
    except (TypeError, ValueError):
        return 0
    if not number:
        return 0
    return min(5, max(1, number))
def build_contextual_skills_catalog_block(
    cur,
    *,
    title: Optional[str],
    goal_plain: str,
    execution_plain: str,
    focus_hint: Optional[str],
    focus_ctx: Optional[Sequence[Tuple[int, bool]]],
) -> str:
    """Build the skill catalog text that is inserted into the skills prompt.

    Loads the merged retrieval configuration, applies keyword overrides for
    the exercise texts, scores all active skills, picks the best rows and
    lists them ordered by importance (descending) and name. Returns a
    placeholder line when no skill was picked.
    """
    cfg = _load_merged_retrieval_config(cur, focus_ctx)

    corpus_parts = [title or "", goal_plain or "", execution_plain or "", focus_hint or ""]
    _apply_keyword_overrides(cfg, " ".join(corpus_parts).lower())

    tokens = _corpus_tokens(title or "", goal_plain, execution_plain, focus_hint or "")
    scored: List[Tuple[float, Dict[str, Any]]] = [
        (_score_skill_row(skill, cfg, tokens), skill)
        for skill in _fetch_all_active_skills_for_catalog(cur)
    ]

    def _display_order(skill: Dict[str, Any]) -> Tuple[int, str]:
        return (-_safe_int_importance(skill.get("importance")), str(skill.get("name") or "").lower())

    ordered = sorted(_pick_catalog_rows(scored, cfg), key=_display_order)
    if not ordered:
        return "(keine aktiven Skills im Katalog)"
    return "\n".join(_format_skill_catalog_line(skill, cfg) for skill in ordered)
def _load_prompt_row(cur, slug: str) -> Optional[Dict[str, Any]]:
    """Fetch the ``ai_prompts`` row for ``slug`` as a dict.

    Returns ``None`` when no row exists or the row's ``active`` flag is
    falsy (a missing flag counts as active).
    """
    sql = """
SELECT slug, display_name, template, output_format, active
FROM ai_prompts
WHERE slug = %s
"""
    cur.execute(sql, (slug,))
    found = cur.fetchone()
    if not found:
        return None
    record = dict(found)
    return record if record.get("active", True) else None
def _render_template(template: str, ctx: Dict[str, str]) -> str:
    """Fill ``{{key}}`` placeholders of ``template`` with the values of ``ctx``.

    The template is scanned exactly once, so placeholder-like text inside an
    inserted value (for example user-written exercise text) is never
    expanded by another key. Unknown placeholders are left as they are.

    Args:
        template: Prompt template; ``None``/empty yields ``""``.
        ctx: Placeholder names mapped to their replacement; ``None`` values
            are inserted as an empty string.

    Returns:
        The rendered text.
    """
    out = template or ""
    if not out or not ctx:
        return out
    replacements = {
        "{{" + key + "}}": "" if val is None else str(val)
        for key, val in ctx.items()
    }
    # Longest placeholder first so the alternation never prefers a shorter
    # alternative at the same position.
    alternatives = sorted(replacements, key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(ph) for ph in alternatives))
    # A function replacement keeps backslashes in values literal.
    return pattern.sub(lambda match: replacements[match.group(0)], out)
def _extract_json_array(text: str) -> Any:
    """Parse the model reply and return the contained JSON value.

    A leading Markdown code fence is removed. A reply starting with ``[`` is
    parsed up to its last ``]``; a reply starting with ``{`` is parsed up to
    its last ``}`` and must contain a list under ``skills``, ``items`` or
    ``data``. Anything else is handed to ``json.loads`` unchanged. Lists are
    cut to ``_MAX_SANITIZE_SKILL_INPUT_ROWS`` entries.

    Args:
        text: Raw reply text.

    Returns:
        The parsed value (normally a list; other JSON scalars are returned
        as parsed).

    Raises:
        ValueError: If ``text`` is not a string or a JSON object carries no
            list under the known keys.
        json.JSONDecodeError: If the text is not valid JSON (a subclass of
            ``ValueError``).
    """
    if not isinstance(text, str):
        # Without this check a missing reply would surface as AttributeError,
        # which callers catching ValueError do not handle.
        raise ValueError("KI-Antwort ist kein Text")

    def _capped(value: Any) -> Any:
        if isinstance(value, list) and len(value) > _MAX_SANITIZE_SKILL_INPUT_ROWS:
            return value[:_MAX_SANITIZE_SKILL_INPUT_ROWS]
        return value

    s = text.strip()
    if s.startswith("```"):
        s = re.sub(r"^```[a-zA-Z0-9]*\s*", "", s)
        if s.endswith("```"):
            s = s[:-3].strip()

    if s.startswith("["):
        end = s.rfind("]")
        if end > 0:
            # Drop anything after the array, e.g. a closing fence plus prose.
            s = s[: end + 1]
        return _capped(json.loads(s))

    if s.startswith("{"):
        end = s.rfind("}")
        if end > 0:
            s = s[: end + 1]
        obj = json.loads(s)
        if isinstance(obj, dict):
            for k in ("skills", "items", "data"):
                v = obj.get(k)
                if isinstance(v, list):
                    return _capped(v)
        raise ValueError("JSON-Objekt ohne Skills-Liste")

    return _capped(json.loads(s))
def _sanitize_skill_entries(cur, rows: Any) -> List[Dict[str, Any]]:
    """Turn parsed AI skill rows into at most five validated suggestions.

    Only the first ``_MAX_SANITIZE_SKILL_INPUT_ROWS`` rows are inspected and
    processing stops after five accepted entries. A row is accepted when it
    is a dict whose ``skill_id`` is an integer-like value referring to a
    skill with status NULL or ``'active'``; each skill is accepted once.
    Levels and intensity are normalized, ``is_primary`` defaults to true for
    the first accepted entry, and ``confidence`` is kept only when it is a
    finite number.

    Args:
        cur: DB cursor returning dict-like rows.
        rows: Parsed model output; anything but a list yields ``[]``.

    Returns:
        List of suggestion dicts for the response.
    """
    if not isinstance(rows, list):
        return []

    out: List[Dict[str, Any]] = []
    seen_ids: set = set()
    for raw in rows[:_MAX_SANITIZE_SKILL_INPUT_ROWS]:
        if len(out) >= 5:
            break
        if not isinstance(raw, dict):
            continue
        sid = raw.get("skill_id")
        if isinstance(sid, bool):
            # JSON true/false would otherwise be read as skill 1/0.
            continue
        try:
            skill_id = int(sid)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: json.loads accepts Infinity, int(inf) raises it.
            continue
        if skill_id in seen_ids:
            # Already accepted or already known to be missing.
            continue
        seen_ids.add(skill_id)

        cur.execute(
            """
SELECT s.id, s.name, s.category,
sc.name AS subcategory_name
FROM skills s
LEFT JOIN skill_categories sc ON s.category_id = sc.id
WHERE s.id = %s AND (s.status IS NULL OR s.status = 'active')
""",
            (skill_id,),
        )
        sk = cur.fetchone()
        if not sk:
            continue

        # The normalizer already resolves legacy spellings; the membership
        # checks only guard against a non-canonical mapping target.
        req = _normalize_exercise_skill_level(raw.get("required_level")) or "grundlagen"
        if req not in _CANONICAL_SKILL_LEVELS:
            req = "grundlagen"
        tgt = _normalize_exercise_skill_level(raw.get("target_level")) or req
        if tgt not in _CANONICAL_SKILL_LEVELS:
            tgt = req

        inten = _normalize_exercise_skill_intensity(raw.get("intensity"))
        flag = raw.get("is_primary")
        is_primary = bool(flag) if flag is not None else len(out) == 0

        cat = (sk.get("category") or "").strip()
        sub = (sk.get("subcategory_name") or "").strip()
        skill_category = " / ".join(x for x in (cat, sub) if x) or None

        conf = raw.get("confidence")
        try:
            conf_f = float(conf) if conf is not None else None
        except (TypeError, ValueError):
            conf_f = None
        if conf_f is not None and not math.isfinite(conf_f):
            # NaN/Infinity cannot be represented in standard JSON.
            conf_f = None

        item: Dict[str, Any] = {
            "skill_id": skill_id,
            "skill_name": (sk.get("name") or "").strip() or f"Skill #{skill_id}",
            "required_level": req,
            "target_level": tgt,
            "intensity": inten,
            "is_primary": is_primary,
        }
        if skill_category:
            item["skill_category"] = skill_category
        if conf_f is not None:
            item["confidence"] = conf_f
        out.append(item)
    return out
def _require_openrouter() -> Tuple[str, str]:
    """Return ``(api_key, model)`` from ``normalize_openrouter_env()``.

    Raises:
        HTTPException: 503 when no API key is configured.
    """
    key, model = normalize_openrouter_env()
    if key:
        return key, model
    raise HTTPException(
        status_code=503,
        detail="KI nicht konfiguriert (OPENROUTER_API_KEY fehlt).",
    )
def _openrouter_or_502(**kwargs: Any) -> Any:
    """Call ``openrouter_chat_completion`` and map ``OpenRouterError`` to HTTP 502."""
    try:
        return openrouter_chat_completion(**kwargs)
    except OpenRouterError as e:
        raise HTTPException(status_code=502, detail=f"OpenRouter: {e}") from e


def run_exercise_ai_suggestion(
    cur,
    *,
    title: Optional[str],
    goal: Optional[str],
    execution: Optional[str],
    focus_area_hint: Optional[str],
    focus_areas_context: Optional[Sequence[Tuple[int, bool]]] = None,
    want_summary: bool,
    want_skills: bool,
) -> Dict[str, Any]:
    """Produce AI suggestions (summary and/or skills) for an exercise form.

    Nothing is persisted; the result is a response DTO.

    Args:
        cur: DB cursor used to load prompts and the skill catalog.
        title: Exercise title.
        goal: Goal text, may contain HTML.
        execution: Execution text, may contain HTML.
        focus_area_hint: Free-text focus area inserted into the prompts.
        focus_areas_context: ``(focus_area_id, is_primary)`` pairs used to
            select retrieval profiles.
        want_summary: Request a summary text.
        want_skills: Request skill suggestions.

    Returns:
        Dict with ``model`` and, depending on the flags, ``summary`` and
        ``skills``.

    Raises:
        HTTPException: 503 if the API key or a required prompt is missing,
            400 if goal and execution are both empty after HTML stripping,
            502 on OpenRouter errors or an unusable skills reply.
    """
    key, model = _require_openrouter()
    g_plain = strip_html_to_plain(goal)
    e_plain = strip_html_to_plain(execution)
    if not (g_plain.strip() or e_plain.strip()):
        raise HTTPException(
            status_code=400,
            detail="Mindestens Ziel oder Durchfuehrung muss Inhalt liefern (nach Entfernen von leerem HTML).",
        )
    t_title = (title or "").strip()
    focus = (focus_area_hint or "").strip()

    # Placeholders shared by both prompts.
    base_ctx = {
        "exercise_title": t_title or "-",
        "exercise_focus_area": focus or "-",
        "exercise_goal": g_plain or "-",
        "exercise_execution": e_plain or "-",
    }
    result: Dict[str, Any] = {"model": model}

    if want_summary:
        prow = _load_prompt_row(cur, "exercise_summary")
        if not prow:
            raise HTTPException(status_code=503, detail="Prompt exercise_summary nicht aktiv oder fehlt in DB.")
        prompt = _render_template(str(prow["template"]), base_ctx)
        raw = _openrouter_or_502(api_key=key, model=model, user_content=prompt)
        text = (raw or "").strip()
        if len(text) > _MAX_SUMMARY_CHARS:
            # One character is reserved for the truncation marker (U+2026).
            text = text[: _MAX_SUMMARY_CHARS - 1].rstrip() + "\u2026"
        result["summary"] = {"text": text, "ai_generated": True, "model": model}

    if want_skills:
        srow = _load_prompt_row(cur, "exercise_skill_suggestions")
        if not srow:
            raise HTTPException(
                status_code=503,
                detail="Prompt exercise_skill_suggestions nicht aktiv oder fehlt in DB.",
            )
        catalog = build_contextual_skills_catalog_block(
            cur,
            title=t_title,
            goal_plain=g_plain,
            execution_plain=e_plain,
            focus_hint=focus or None,
            focus_ctx=focus_areas_context,
        )
        ctx = {**base_ctx, "skills_catalog": catalog}
        prompt = _render_template(str(srow["template"]), ctx)
        sys_hint = (
            "Du antwortest nur mit validem JSON (Array). Keine Kommentare, keine Erklaerungen ausserhalb des JSON."
        )
        raw = _openrouter_or_502(
            api_key=key,
            model=model,
            user_content=prompt,
            system_content=sys_hint,
            temperature=0.15,
        )
        try:
            # An empty reply is normalized to "" so it fails as invalid JSON
            # (502) instead of raising an unhandled AttributeError.
            parsed = _extract_json_array(raw or "")
        except (json.JSONDecodeError, ValueError) as e:
            raise HTTPException(
                status_code=502,
                detail="KI lieferte kein verwertbares JSON fuer Skills.",
            ) from e
        result["skills"] = _sanitize_skill_entries(cur, parsed)

    return result
# Names exported by ``from exercise_ai import *``; everything else in this
# module is underscore-prefixed and internal.
__all__ = [
    "build_contextual_skills_catalog_block",
    "run_exercise_ai_suggestion",
    "strip_html_to_plain",
]