mitai-jinkendo/backend/routers/insights.py
Lars 32d53b447d
All checks were successful
Deploy Development / deploy (push) Successful in 34s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
fix: pipeline typo and add features diagnostic script
- Fix NameError in insights.py pipeline endpoint (access -> access_calls)
- Add check_features.py diagnostic script for debugging

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 22:32:09 +01:00

509 lines
21 KiB
Python

"""
AI Insights Endpoints for Mitai Jinkendo
Handles AI analysis execution, prompt management, and usage tracking.
"""
import os
import json
import uuid
import logging
import httpx
from typing import Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException, Header, Depends
from db import get_db, get_cursor, r2d
from auth import require_auth, require_admin, check_feature_access, increment_feature_usage
from routers.profiles import get_pid
from feature_logger import log_feature_usage
# Router for every endpoint in this module; all routes are mounted under /api.
router = APIRouter(prefix="/api", tags=["insights"])
logger = logging.getLogger(__name__)
# Provider configuration, read from the environment once at import time.
# An empty key means the provider is not configured. The analysis endpoints
# check ANTHROPIC_KEY first and fall back to OPENROUTER_KEY.
OPENROUTER_KEY = os.getenv("OPENROUTER_API_KEY", "")
OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "anthropic/claude-sonnet-4")
ANTHROPIC_KEY = os.getenv("ANTHROPIC_API_KEY", "")
# ── Helper Functions ──────────────────────────────────────────────────────────
def check_ai_limit(pid: str):
    """Enforce the per-profile daily AI quota.

    Args:
        pid: Profile id whose quota is checked.

    Returns:
        A tuple ``(allowed, limit, used)``. ``allowed`` is always ``True``
        (the denied cases raise instead). ``limit`` is ``None`` and ``used``
        is ``0`` when the profile has no daily limit configured.

    Raises:
        HTTPException: 403 if the profile does not exist or has AI disabled,
            429 if today's call count has reached the limit.
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute("SELECT ai_enabled, ai_limit_day FROM profiles WHERE id=%s", (pid,))
        profile = cursor.fetchone()
        if not (profile and profile['ai_enabled']):
            raise HTTPException(403, "KI ist für dieses Profil deaktiviert")

        daily_limit = profile['ai_limit_day']
        if daily_limit is None:
            # No limit configured: usage is not looked up at all.
            return (True, None, 0)

        cursor.execute(
            "SELECT call_count FROM ai_usage WHERE profile_id=%s AND date=%s",
            (pid, datetime.now().date().isoformat()),
        )
        usage_row = cursor.fetchone()
        calls_today = usage_row['call_count'] if usage_row else 0
        if calls_today >= daily_limit:
            raise HTTPException(429, f"Tägliches KI-Limit erreicht ({daily_limit} Calls)")
        return (True, daily_limit, calls_today)
def inc_ai_usage(pid: str):
    """Increment today's AI call counter for a profile.

    The increment is done by the database (``call_count=call_count+1``)
    instead of reading the value, adding one in Python and writing it back.
    The read-modify-write variant loses increments when two requests for the
    same profile run concurrently.

    Args:
        pid: Profile id whose counter is incremented.
    """
    today = datetime.now().date().isoformat()
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "UPDATE ai_usage SET call_count=call_count+1 WHERE profile_id=%s AND date=%s",
            (pid, today),
        )
        # rowcount == 0 means there is no usage row for today yet: create it.
        if cur.rowcount == 0:
            cur.execute(
                "INSERT INTO ai_usage (id, profile_id, date, call_count) VALUES (%s,%s,%s,1)",
                (str(uuid.uuid4()), pid, today),
            )
def _get_profile_data(pid: str):
    """Load the profile row and its recent log entries for AI analysis.

    Args:
        pid: Profile id to load data for.

    Returns:
        A dict with the keys ``profile`` (single row converted by ``r2d``)
        and ``weight``, ``circumference``, ``caliper``, ``nutrition``,
        ``activity`` (lists of converted rows, newest first, capped by the
        LIMIT in the respective query).
    """
    # (result key, query) pairs, executed in this order after the profile row.
    history_queries = (
        ("weight", "SELECT * FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 90"),
        ("circumference", "SELECT * FROM circumference_log WHERE profile_id=%s ORDER BY date DESC LIMIT 30"),
        ("caliper", "SELECT * FROM caliper_log WHERE profile_id=%s ORDER BY date DESC LIMIT 30"),
        ("nutrition", "SELECT * FROM nutrition_log WHERE profile_id=%s ORDER BY date DESC LIMIT 90"),
        ("activity", "SELECT * FROM activity_log WHERE profile_id=%s ORDER BY date DESC LIMIT 90"),
    )
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        bundle = {"profile": r2d(cursor.fetchone())}
        for key, sql in history_queries:
            cursor.execute(sql, (pid,))
            bundle[key] = [r2d(row) for row in cursor.fetchall()]
    return bundle
def _render_template(template: str, data: dict) -> str:
"""Simple template variable replacement."""
result = template
for k, v in data.items():
result = result.replace(f"{{{{{k}}}}}", str(v) if v is not None else "")
return result
def _prepare_template_vars(data: dict) -> dict:
"""Prepare template variables from profile data."""
prof = data['profile']
weight = data['weight']
circ = data['circumference']
caliper = data['caliper']
nutrition = data['nutrition']
activity = data['activity']
vars = {
"name": prof.get('name', 'Nutzer'),
"geschlecht": "männlich" if prof.get('sex') == 'm' else "weiblich",
"height": prof.get('height', 178),
"goal_weight": float(prof.get('goal_weight')) if prof.get('goal_weight') else "nicht gesetzt",
"goal_bf_pct": float(prof.get('goal_bf_pct')) if prof.get('goal_bf_pct') else "nicht gesetzt",
"weight_aktuell": float(weight[0]['weight']) if weight else "keine Daten",
"kf_aktuell": float(caliper[0]['body_fat_pct']) if caliper and caliper[0].get('body_fat_pct') else "unbekannt",
}
# Calculate age from dob
if prof.get('dob'):
try:
from datetime import date
dob = datetime.strptime(prof['dob'], '%Y-%m-%d').date()
today = date.today()
age = today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))
vars['age'] = age
except:
vars['age'] = "unbekannt"
else:
vars['age'] = "unbekannt"
# Weight trend summary
if len(weight) >= 2:
recent = weight[:30]
delta = float(recent[0]['weight']) - float(recent[-1]['weight'])
vars['weight_trend'] = f"{len(recent)} Einträge, Δ30d: {delta:+.1f}kg"
else:
vars['weight_trend'] = "zu wenig Daten"
# Caliper summary
if caliper:
c = caliper[0]
bf = float(c.get('body_fat_pct')) if c.get('body_fat_pct') else '?'
vars['caliper_summary'] = f"KF: {bf}%, Methode: {c.get('sf_method','?')}"
else:
vars['caliper_summary'] = "keine Daten"
# Circumference summary
if circ:
c = circ[0]
parts = []
for k in ['c_waist', 'c_belly', 'c_hip']:
if c.get(k): parts.append(f"{k.split('_')[1]}: {float(c[k])}cm")
vars['circ_summary'] = ", ".join(parts) if parts else "keine Daten"
else:
vars['circ_summary'] = "keine Daten"
# Nutrition summary
if nutrition:
n = len(nutrition)
avg_kcal = sum(float(d.get('kcal',0) or 0) for d in nutrition) / n
avg_prot = sum(float(d.get('protein_g',0) or 0) for d in nutrition) / n
vars['nutrition_summary'] = f"{n} Tage, Ø {avg_kcal:.0f}kcal, {avg_prot:.0f}g Protein"
vars['nutrition_detail'] = vars['nutrition_summary']
vars['nutrition_days'] = n
vars['kcal_avg'] = round(avg_kcal)
vars['protein_avg'] = round(avg_prot,1)
vars['fat_avg'] = round(sum(float(d.get('fat_g',0) or 0) for d in nutrition) / n,1)
vars['carb_avg'] = round(sum(float(d.get('carbs_g',0) or 0) for d in nutrition) / n,1)
else:
vars['nutrition_summary'] = "keine Daten"
vars['nutrition_detail'] = "keine Daten"
vars['nutrition_days'] = 0
vars['kcal_avg'] = 0
vars['protein_avg'] = 0
vars['fat_avg'] = 0
vars['carb_avg'] = 0
# Protein targets
w = weight[0]['weight'] if weight else prof.get('height',178) - 100
w = float(w) # Convert Decimal to float for math operations
vars['protein_ziel_low'] = round(w * 1.6)
vars['protein_ziel_high'] = round(w * 2.2)
# Activity summary
if activity:
n = len(activity)
total_kcal = sum(float(a.get('kcal_active',0) or 0) for a in activity)
vars['activity_summary'] = f"{n} Trainings, {total_kcal:.0f}kcal gesamt"
vars['activity_detail'] = vars['activity_summary']
vars['activity_kcal_summary'] = f"Ø {total_kcal/n:.0f}kcal/Training"
else:
vars['activity_summary'] = "keine Daten"
vars['activity_detail'] = "keine Daten"
vars['activity_kcal_summary'] = "keine Daten"
return vars
# ── Endpoints ─────────────────────────────────────────────────────────────────
@router.get("/insights")
def get_all_insights(
    x_profile_id: Optional[str]=Header(default=None),
    session: dict=Depends(require_auth),
):
    """Return every stored AI insight of the profile, newest first."""
    profile_id = get_pid(x_profile_id)
    query = "SELECT * FROM ai_insights WHERE profile_id=%s ORDER BY created DESC"
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(query, (profile_id,))
        records = cursor.fetchall()
    return list(map(r2d, records))
@router.get("/insights/latest")
def get_latest_insights(
    x_profile_id: Optional[str]=Header(default=None),
    session: dict=Depends(require_auth),
):
    """Return the ten most recent AI insights of the profile, regardless of scope."""
    profile_id = get_pid(x_profile_id)
    query = "SELECT * FROM ai_insights WHERE profile_id=%s ORDER BY created DESC LIMIT 10"
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(query, (profile_id,))
        records = cursor.fetchall()
    return list(map(r2d, records))
@router.get("/ai/insights/{scope}")
def get_ai_insight(
    scope: str,
    x_profile_id: Optional[str]=Header(default=None),
    session: dict=Depends(require_auth),
):
    """Return the newest insight stored for *scope*, or ``None`` if there is none."""
    profile_id = get_pid(x_profile_id)
    query = "SELECT * FROM ai_insights WHERE profile_id=%s AND scope=%s ORDER BY created DESC LIMIT 1"
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(query, (profile_id, scope))
        newest = cursor.fetchone()
    return r2d(newest) if newest else None
@router.delete("/insights/{insight_id}")
def delete_insight_by_id(
    insight_id: str,
    x_profile_id: Optional[str]=Header(default=None),
    session: dict=Depends(require_auth),
):
    """Delete one insight by id.

    The DELETE is restricted to the requesting profile. ``{"ok": True}`` is
    returned whether or not a row matched.
    """
    profile_id = get_pid(x_profile_id)
    statement = "DELETE FROM ai_insights WHERE id=%s AND profile_id=%s"
    with get_db() as conn:
        get_cursor(conn).execute(statement, (insight_id, profile_id))
    return {"ok": True}
@router.delete("/ai/insights/{scope}")
def delete_ai_insight(
    scope: str,
    x_profile_id: Optional[str]=Header(default=None),
    session: dict=Depends(require_auth),
):
    """Delete all insights of the profile stored under *scope*.

    ``{"ok": True}`` is returned whether or not any row matched.
    """
    profile_id = get_pid(x_profile_id)
    statement = "DELETE FROM ai_insights WHERE profile_id=%s AND scope=%s"
    with get_db() as conn:
        get_cursor(conn).execute(statement, (profile_id, scope))
    return {"ok": True}
@router.post("/insights/run/{slug}")
async def analyze_with_prompt(slug: str, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Run one AI analysis using the active prompt template *slug*.

    The template is rendered with the profile's data, sent to the configured
    provider (Anthropic if ``ANTHROPIC_KEY`` is set, otherwise OpenRouter),
    and the reply is stored as a new ``ai_insights`` row with scope *slug*.

    Returns:
        ``{"scope": slug, "content": <reply text>}``.

    Raises:
        HTTPException: 404 if no active prompt with this slug exists, 500 if
            no provider is configured or OpenRouter answers with a non-200
            status. ``check_ai_limit`` may additionally raise 403/429.
    """
    pid = get_pid(x_profile_id)

    # Phase 2: Check feature access (non-blocking, log only)
    access = check_feature_access(pid, 'ai_calls')
    log_feature_usage(pid, 'ai_calls', access, 'analyze')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} would be blocked: "
            f"ai_calls {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        # NOTE: Phase 2 does NOT block - just logs!

    # Old check (keep for now, but will be replaced in Phase 4)
    check_ai_limit(pid)

    # Get prompt template
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM ai_prompts WHERE slug=%s AND active=true", (slug,))
        prompt_row = cur.fetchone()
    if not prompt_row:
        raise HTTPException(404, f"Prompt '{slug}' nicht gefunden")

    prompt_tmpl = prompt_row['template']
    data = _get_profile_data(pid)
    template_vars = _prepare_template_vars(data)
    final_prompt = _render_template(prompt_tmpl, template_vars)

    # Call AI
    if ANTHROPIC_KEY:
        # Use the async SDK client: the synchronous client would block the
        # event loop for the whole duration of the model call.
        import anthropic
        client = anthropic.AsyncAnthropic(api_key=ANTHROPIC_KEY)
        response = await client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2000,
            messages=[{"role": "user", "content": final_prompt}]
        )
        content = response.content[0].text
    elif OPENROUTER_KEY:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={"Authorization": f"Bearer {OPENROUTER_KEY}"},
                json={
                    "model": OPENROUTER_MODEL,
                    "messages": [{"role": "user", "content": final_prompt}],
                    "max_tokens": 2000
                },
                timeout=60.0
            )
        if resp.status_code != 200:
            raise HTTPException(500, f"KI-Fehler: {resp.text}")
        content = resp.json()['choices'][0]['message']['content']
    else:
        raise HTTPException(500, "Keine KI-API konfiguriert")

    # Save insight (with history - no DELETE)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "INSERT INTO ai_insights (id, profile_id, scope, content, created) VALUES (%s,%s,%s,%s,CURRENT_TIMESTAMP)",
            (str(uuid.uuid4()), pid, slug, content)
        )

    # Phase 2: Increment new feature usage counter
    increment_feature_usage(pid, 'ai_calls')
    # Old usage tracking (keep for now)
    inc_ai_usage(pid)

    return {"scope": slug, "content": content}
async def _run_pipeline_prompt(prompt: str, max_tokens: int) -> str:
    """Send one prompt to the configured AI provider and return the reply text.

    Anthropic is used when ``ANTHROPIC_KEY`` is set, otherwise OpenRouter.

    Raises:
        HTTPException: 500 if no provider is configured or OpenRouter
            answers with a non-200 status.
    """
    if ANTHROPIC_KEY:
        # Async SDK client: the synchronous one would block the event loop.
        import anthropic
        client = anthropic.AsyncAnthropic(api_key=ANTHROPIC_KEY)
        response = await client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.content[0].text
    if OPENROUTER_KEY:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={"Authorization": f"Bearer {OPENROUTER_KEY}"},
                json={
                    "model": OPENROUTER_MODEL,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": max_tokens
                },
                timeout=60.0
            )
        # Same check as the single-prompt endpoint; without it an error
        # response only surfaces later as a KeyError on 'choices'.
        if resp.status_code != 200:
            raise HTTPException(500, f"KI-Fehler: {resp.text}")
        return resp.json()['choices'][0]['message']['content']
    raise HTTPException(500, "Keine KI-API konfiguriert")


@router.post("/insights/pipeline")
async def analyze_pipeline(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Run the 3-stage pipeline analysis for the profile.

    Stage 1 runs every active ``pipeline_*`` prompt except synthesis/goals
    and collects the replies (parsed as JSON when possible). Stage 2 renders
    the synthesis prompt with the stage-1 results. Stage 3 runs the goals
    prompt, but only if the profile has a goal weight or goal body-fat
    percentage and an active goals prompt exists. The combined text is
    stored as a new ``ai_insights`` row with scope ``pipeline``.

    Returns:
        ``{"scope": "pipeline", "content": <text>, "stage1": <stage-1 results>}``.

    Raises:
        HTTPException: 500 if the synthesis prompt is missing, no provider
            is configured, or OpenRouter answers with a non-200 status.
            ``check_ai_limit`` may additionally raise 403/429.
    """
    pid = get_pid(x_profile_id)

    # Phase 2: Check pipeline feature access (boolean - enabled/disabled)
    access_pipeline = check_feature_access(pid, 'ai_pipeline')
    log_feature_usage(pid, 'ai_pipeline', access_pipeline, 'pipeline')
    if not access_pipeline['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} would be blocked: "
            f"ai_pipeline {access_pipeline['reason']}"
        )
        # NOTE: Phase 2 does NOT block - just logs!

    # Also check ai_calls (pipeline uses API calls too)
    access_calls = check_feature_access(pid, 'ai_calls')
    log_feature_usage(pid, 'ai_calls', access_calls, 'pipeline_calls')
    if not access_calls['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} would be blocked: "
            f"ai_calls {access_calls['reason']} (used: {access_calls['used']}, limit: {access_calls['limit']})"
        )

    # Old check (keep for now)
    check_ai_limit(pid)

    data = _get_profile_data(pid)
    template_vars = _prepare_template_vars(data)

    # Stage 1: independent JSON analyses (executed one after another)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT slug, template FROM ai_prompts WHERE slug LIKE 'pipeline_%' AND slug NOT IN ('pipeline_synthesis','pipeline_goals') AND active=true")
        stage1_prompts = [r2d(r) for r in cur.fetchall()]

    stage1_results = {}
    for p in stage1_prompts:
        slug = p['slug']
        final_prompt = _render_template(p['template'], template_vars)
        content = (await _run_pipeline_prompt(final_prompt, 1000)).strip()
        # Try to parse JSON, fallback to raw text
        try:
            stage1_results[slug] = json.loads(content)
        except ValueError:  # json.JSONDecodeError is a ValueError
            stage1_results[slug] = content

    # Stage 2: Synthesis
    template_vars['stage1_body'] = json.dumps(stage1_results.get('pipeline_body', {}), ensure_ascii=False)
    template_vars['stage1_nutrition'] = json.dumps(stage1_results.get('pipeline_nutrition', {}), ensure_ascii=False)
    template_vars['stage1_activity'] = json.dumps(stage1_results.get('pipeline_activity', {}), ensure_ascii=False)

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT template FROM ai_prompts WHERE slug='pipeline_synthesis' AND active=true")
        synth_row = cur.fetchone()
    if not synth_row:
        raise HTTPException(500, "Pipeline synthesis prompt not found")

    synth_prompt = _render_template(synth_row['template'], template_vars)
    synthesis = await _run_pipeline_prompt(synth_prompt, 2000)

    # Stage 3: Goals (only if goals are set)
    goals_text = None
    prof = data['profile']
    if prof.get('goal_weight') or prof.get('goal_bf_pct'):
        with get_db() as conn:
            cur = get_cursor(conn)
            cur.execute("SELECT template FROM ai_prompts WHERE slug='pipeline_goals' AND active=true")
            goals_row = cur.fetchone()
        if goals_row:
            goals_prompt = _render_template(goals_row['template'], template_vars)
            goals_text = await _run_pipeline_prompt(goals_prompt, 800)

    # Combine synthesis + goals
    final_content = synthesis
    if goals_text:
        final_content += "\n\n" + goals_text

    # Save as 'pipeline' scope (with history - no DELETE)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "INSERT INTO ai_insights (id, profile_id, scope, content, created) VALUES (%s,%s,'pipeline',%s,CURRENT_TIMESTAMP)",
            (str(uuid.uuid4()), pid, final_content)
        )

    # Phase 2: Increment ai_calls usage (pipeline uses multiple API calls)
    # Note: We increment once per pipeline run, not per individual call
    increment_feature_usage(pid, 'ai_calls')
    # Old usage tracking (keep for now)
    inc_ai_usage(pid)

    return {"scope": "pipeline", "content": final_content, "stage1": stage1_results}
@router.get("/ai/usage")
def get_ai_usage(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Return today's AI usage for the current profile.

    Returns:
        ``{"limit": ..., "used": ..., "remaining": ...}``. ``limit`` and
        ``remaining`` are ``None`` when the profile has no daily limit (or
        the profile row does not exist). A limit of 0 is a real limit and
        yields a numeric ``remaining``, matching ``check_ai_limit``, which
        treats only ``None`` as unlimited.
    """
    pid = get_pid(x_profile_id)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT ai_limit_day FROM profiles WHERE id=%s", (pid,))
        prof = cur.fetchone()
        limit = prof['ai_limit_day'] if prof else None

        today = datetime.now().date().isoformat()
        cur.execute("SELECT call_count FROM ai_usage WHERE profile_id=%s AND date=%s", (pid, today))
        usage = cur.fetchone()
        used = usage['call_count'] if usage else 0

    # `is not None` instead of truthiness: 0 must not be reported as unlimited.
    remaining = (limit - used) if limit is not None else None
    return {"limit": limit, "used": used, "remaining": remaining}