mitai-jinkendo/backend/routers/insights.py
Lars 6627b5eee7
All checks were successful
Deploy Development / deploy (push) Successful in 43s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 13s
feat: Pipeline-System - Backend Infrastructure (Issue #28, Phase 1)
Implementiert konfigurierbare mehrstufige Analysen. Admins können
mehrere Pipeline-Konfigurationen erstellen mit unterschiedlichen
Modulen, Zeiträumen und Prompts.

**Backend:**
- Migration 019: pipeline_configs Tabelle + ai_prompts erweitert
- Pipeline-Config Models: PipelineConfigCreate, PipelineConfigUpdate
- Pipeline-Executor: refactored für config-basierte Ausführung
- CRUD-Endpoints: /api/prompts/pipeline-configs (list, create, update, delete, set-default)
- Reset-to-Default: /api/prompts/{id}/reset-to-default für System-Prompts

**Features:**
- 3 Seed-Configs: "Alltags-Check" (default), "Schlaf & Erholung", "Wettkampf-Analyse"
- Dynamische Platzhalter: {{stage1_<slug>}} für alle Stage-1-Ergebnisse
- Backward-compatible: /api/insights/pipeline ohne config_id nutzt default

**Dateien:**
- backend/migrations/019_pipeline_system.sql
- backend/models.py (PipelineConfigCreate, PipelineConfigUpdate)
- backend/routers/insights.py (analyze_pipeline refactored)
- backend/routers/prompts.py (Pipeline-Config CRUD + Reset-to-Default)

**Nächste Schritte:**
- Frontend: Pipeline-Config Dialog + Admin-UI
- Design: Mobile-Responsive + Icons

Issue #28 Progress: Backend 3/3 ✅ | Frontend 0/3 🔲 | Design 0/3 🔲

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 09:42:28 +01:00

672 lines
28 KiB
Python

"""
AI Insights Endpoints for Mitai Jinkendo
Handles AI analysis execution, prompt management, and usage tracking.
"""
import asyncio
import json
import logging
import os
import uuid
from datetime import datetime
from typing import Optional

import httpx
from fastapi import APIRouter, HTTPException, Header, Depends

from auth import require_auth, require_admin, check_feature_access, increment_feature_usage
from db import get_db, get_cursor, r2d
from feature_logger import log_feature_usage
from quality_filter import get_quality_filter_sql
from routers.profiles import get_pid
router = APIRouter(prefix="/api", tags=["insights"])
logger = logging.getLogger(__name__)
# AI backend configuration. Every call site below checks ANTHROPIC_KEY first,
# so Anthropic takes precedence over OpenRouter when both keys are set.
OPENROUTER_KEY = os.getenv("OPENROUTER_API_KEY", "")
OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "anthropic/claude-sonnet-4")
ANTHROPIC_KEY = os.getenv("ANTHROPIC_API_KEY", "")
# ── Helper Functions ──────────────────────────────────────────────────────────
def check_ai_limit(pid: str):
    """Check whether the profile may make another AI call today.

    Returns (allowed, limit, used). Raises HTTPException 403 when AI is
    disabled for the profile and 429 when today's limit is exhausted.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT ai_enabled, ai_limit_day FROM profiles WHERE id=%s", (pid,))
        profile_row = cur.fetchone()
        if not profile_row or not profile_row['ai_enabled']:
            raise HTTPException(403, "KI ist für dieses Profil deaktiviert")
        daily_limit = profile_row['ai_limit_day']
        if daily_limit is None:
            # NULL limit means unlimited usage.
            return (True, None, 0)
        today_iso = datetime.now().date().isoformat()
        cur.execute("SELECT call_count FROM ai_usage WHERE profile_id=%s AND date=%s", (pid, today_iso))
        usage_row = cur.fetchone()
        calls_today = usage_row['call_count'] if usage_row else 0
        if calls_today >= daily_limit:
            raise HTTPException(429, f"Tägliches KI-Limit erreicht ({daily_limit} Calls)")
        return (True, daily_limit, calls_today)
def inc_ai_usage(pid: str):
    """Increment the AI usage counter for today's date.

    Fix: the original SELECT-then-UPDATE read-modify-write could lose
    increments under concurrent requests. A relative UPDATE
    (call_count = call_count + 1) makes the increment atomic on the DB side;
    the INSERT fallback only runs for the first call of the day.
    """
    today = datetime.now().date().isoformat()
    with get_db() as conn:
        cur = get_cursor(conn)
        # Atomic in-place increment; rowcount tells us whether a row existed.
        cur.execute("UPDATE ai_usage SET call_count=call_count+1 WHERE profile_id=%s AND date=%s",
                    (pid, today))
        if cur.rowcount == 0:
            # First call today — create the counter row.
            # NOTE(review): two concurrent "first" calls could still race on this
            # INSERT; an ON CONFLICT upsert would close that window if the table
            # has a (profile_id, date) unique constraint — verify schema.
            cur.execute("INSERT INTO ai_usage (id, profile_id, date, call_count) VALUES (%s,%s,%s,1)",
                        (str(uuid.uuid4()), pid, today))
def _get_profile_data(pid: str):
    """Fetch all data needed for an AI analysis of one profile.

    Returns a dict with the profile row plus recent log entries (weight,
    circumference, caliper, nutrition, activity, sleep, rest days, vitals
    baseline and blood pressure), each newest-first and capped per table.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
        prof = r2d(cur.fetchone())
        # Issue #31: global quality filter derived from the profile's settings
        quality_filter = get_quality_filter_sql(prof)

        def fetch_rows(sql):
            """Run a profile-scoped query and return dict rows."""
            cur.execute(sql, (pid,))
            return [r2d(row) for row in cur.fetchall()]

        return {
            "profile": prof,
            "weight": fetch_rows("SELECT * FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 90"),
            "circumference": fetch_rows("SELECT * FROM circumference_log WHERE profile_id=%s ORDER BY date DESC LIMIT 30"),
            "caliper": fetch_rows("SELECT * FROM caliper_log WHERE profile_id=%s ORDER BY date DESC LIMIT 30"),
            "nutrition": fetch_rows("SELECT * FROM nutrition_log WHERE profile_id=%s ORDER BY date DESC LIMIT 90"),
            # Issue #31: activity is the only table the quality filter applies to
            "activity": fetch_rows(f"""
                SELECT * FROM activity_log
                WHERE profile_id=%s
                {quality_filter}
                ORDER BY date DESC LIMIT 90
            """),
            # v9d Phase 2: Sleep, Rest Days, Vitals
            "sleep": fetch_rows("SELECT * FROM sleep_log WHERE profile_id=%s ORDER BY date DESC LIMIT 30"),
            "rest_days": fetch_rows("SELECT * FROM rest_days WHERE profile_id=%s ORDER BY date DESC LIMIT 30"),
            # v9d Phase 2d Refactored: separate baseline and BP tables
            "vitals_baseline": fetch_rows("SELECT * FROM vitals_baseline WHERE profile_id=%s ORDER BY date DESC LIMIT 30"),
            "blood_pressure": fetch_rows("SELECT * FROM blood_pressure_log WHERE profile_id=%s ORDER BY measured_at DESC LIMIT 90"),
        }
def _render_template(template: str, data: dict) -> str:
"""Simple template variable replacement."""
result = template
for k, v in data.items():
result = result.replace(f"{{{{{k}}}}}", str(v) if v is not None else "")
return result
def _prepare_template_vars(data: dict) -> dict:
"""Prepare template variables from profile data."""
prof = data['profile']
weight = data['weight']
circ = data['circumference']
caliper = data['caliper']
nutrition = data['nutrition']
activity = data['activity']
sleep = data.get('sleep', [])
rest_days = data.get('rest_days', [])
vitals_baseline = data.get('vitals_baseline', [])
blood_pressure = data.get('blood_pressure', [])
vars = {
"name": prof.get('name', 'Nutzer'),
"geschlecht": "männlich" if prof.get('sex') == 'm' else "weiblich",
"height": prof.get('height', 178),
"goal_weight": float(prof.get('goal_weight')) if prof.get('goal_weight') else "nicht gesetzt",
"goal_bf_pct": float(prof.get('goal_bf_pct')) if prof.get('goal_bf_pct') else "nicht gesetzt",
"weight_aktuell": float(weight[0]['weight']) if weight else "keine Daten",
"kf_aktuell": float(caliper[0]['body_fat_pct']) if caliper and caliper[0].get('body_fat_pct') else "unbekannt",
}
# Calculate age from dob
if prof.get('dob'):
try:
from datetime import date
dob = datetime.strptime(prof['dob'], '%Y-%m-%d').date()
today = date.today()
age = today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))
vars['age'] = age
except:
vars['age'] = "unbekannt"
else:
vars['age'] = "unbekannt"
# Weight trend summary
if len(weight) >= 2:
recent = weight[:30]
delta = float(recent[0]['weight']) - float(recent[-1]['weight'])
vars['weight_trend'] = f"{len(recent)} Einträge, Δ30d: {delta:+.1f}kg"
else:
vars['weight_trend'] = "zu wenig Daten"
# Caliper summary
if caliper:
c = caliper[0]
bf = float(c.get('body_fat_pct')) if c.get('body_fat_pct') else '?'
vars['caliper_summary'] = f"KF: {bf}%, Methode: {c.get('sf_method','?')}"
else:
vars['caliper_summary'] = "keine Daten"
# Circumference summary
if circ:
c = circ[0]
parts = []
for k in ['c_waist', 'c_belly', 'c_hip']:
if c.get(k): parts.append(f"{k.split('_')[1]}: {float(c[k])}cm")
vars['circ_summary'] = ", ".join(parts) if parts else "keine Daten"
else:
vars['circ_summary'] = "keine Daten"
# Nutrition summary
if nutrition:
n = len(nutrition)
avg_kcal = sum(float(d.get('kcal',0) or 0) for d in nutrition) / n
avg_prot = sum(float(d.get('protein_g',0) or 0) for d in nutrition) / n
vars['nutrition_summary'] = f"{n} Tage, Ø {avg_kcal:.0f}kcal, {avg_prot:.0f}g Protein"
vars['nutrition_detail'] = vars['nutrition_summary']
vars['nutrition_days'] = n
vars['kcal_avg'] = round(avg_kcal)
vars['protein_avg'] = round(avg_prot,1)
vars['fat_avg'] = round(sum(float(d.get('fat_g',0) or 0) for d in nutrition) / n,1)
vars['carb_avg'] = round(sum(float(d.get('carbs_g',0) or 0) for d in nutrition) / n,1)
else:
vars['nutrition_summary'] = "keine Daten"
vars['nutrition_detail'] = "keine Daten"
vars['nutrition_days'] = 0
vars['kcal_avg'] = 0
vars['protein_avg'] = 0
vars['fat_avg'] = 0
vars['carb_avg'] = 0
# Protein targets
w = weight[0]['weight'] if weight else prof.get('height',178) - 100
w = float(w) # Convert Decimal to float for math operations
vars['protein_ziel_low'] = round(w * 1.6)
vars['protein_ziel_high'] = round(w * 2.2)
# Activity summary
if activity:
n = len(activity)
total_kcal = sum(float(a.get('kcal_active',0) or 0) for a in activity)
vars['activity_summary'] = f"{n} Trainings, {total_kcal:.0f}kcal gesamt"
vars['activity_detail'] = vars['activity_summary']
vars['activity_kcal_summary'] = f"Ø {total_kcal/n:.0f}kcal/Training"
else:
vars['activity_summary'] = "keine Daten"
vars['activity_detail'] = "keine Daten"
vars['activity_kcal_summary'] = "keine Daten"
# Sleep summary (v9d Phase 2b)
if sleep:
n = len(sleep)
avg_duration = sum(float(s.get('duration_minutes',0) or 0) for s in sleep) / n
avg_quality = sum(int(s.get('quality',0) or 0) for s in sleep if s.get('quality')) / max(sum(1 for s in sleep if s.get('quality')), 1)
deep_data = [s for s in sleep if s.get('deep_minutes')]
avg_deep = sum(float(s.get('deep_minutes',0)) for s in deep_data) / len(deep_data) if deep_data else 0
vars['sleep_summary'] = f"{n} Nächte, Ø {avg_duration/60:.1f}h Schlafdauer, Qualität {avg_quality:.1f}/5"
vars['sleep_detail'] = f"Ø {avg_duration:.0f}min gesamt, {avg_deep:.0f}min Tiefschlaf"
vars['sleep_avg_duration'] = round(avg_duration)
vars['sleep_avg_quality'] = round(avg_quality, 1)
vars['sleep_nights'] = n
else:
vars['sleep_summary'] = "keine Daten"
vars['sleep_detail'] = "keine Daten"
vars['sleep_avg_duration'] = 0
vars['sleep_avg_quality'] = 0
vars['sleep_nights'] = 0
# Rest Days summary (v9d Phase 2a)
if rest_days:
n = len(rest_days)
types = {}
for rd in rest_days:
rt = rd.get('rest_type', 'unknown')
types[rt] = types.get(rt, 0) + 1
type_summary = ", ".join([f"{k}: {v}x" for k, v in types.items()])
vars['rest_days_summary'] = f"{n} Ruhetage (letzte 30d): {type_summary}"
vars['rest_days_count'] = n
vars['rest_days_types'] = type_summary
else:
vars['rest_days_summary'] = "keine Daten"
vars['rest_days_count'] = 0
vars['rest_days_types'] = "keine"
# Vitals Baseline summary (v9d Phase 2d Refactored)
if vitals_baseline:
n = len(vitals_baseline)
hr_data = [v for v in vitals_baseline if v.get('resting_hr')]
hrv_data = [v for v in vitals_baseline if v.get('hrv')]
vo2_data = [v for v in vitals_baseline if v.get('vo2_max')]
avg_hr = sum(int(v.get('resting_hr')) for v in hr_data) / len(hr_data) if hr_data else 0
avg_hrv = sum(int(v.get('hrv')) for v in hrv_data) / len(hrv_data) if hrv_data else 0
latest_vo2 = float(vo2_data[0].get('vo2_max')) if vo2_data else 0
parts = []
if avg_hr: parts.append(f"Ruhepuls Ø {avg_hr:.0f}bpm")
if avg_hrv: parts.append(f"HRV Ø {avg_hrv:.0f}ms")
if latest_vo2: parts.append(f"VO2 Max {latest_vo2:.1f}")
vars['vitals_summary'] = f"{n} Messungen: " + ", ".join(parts) if parts else "keine verwertbaren Daten"
vars['vitals_detail'] = vars['vitals_summary']
vars['vitals_avg_hr'] = round(avg_hr)
vars['vitals_avg_hrv'] = round(avg_hrv)
vars['vitals_vo2_max'] = round(latest_vo2, 1) if latest_vo2 else "k.A."
else:
vars['vitals_summary'] = "keine Daten"
vars['vitals_detail'] = "keine Daten"
vars['vitals_avg_hr'] = 0
vars['vitals_avg_hrv'] = 0
vars['vitals_vo2_max'] = "k.A."
# Blood Pressure summary (v9d Phase 2d Refactored)
if blood_pressure:
n = len(blood_pressure)
bp_data = [bp for bp in blood_pressure if bp.get('systolic') and bp.get('diastolic')]
avg_bp_sys = sum(int(bp.get('systolic')) for bp in bp_data) / len(bp_data) if bp_data else 0
avg_bp_dia = sum(int(bp.get('diastolic')) for bp in bp_data) / len(bp_data) if bp_data else 0
vars['vitals_avg_bp'] = f"{round(avg_bp_sys)}/{round(avg_bp_dia)}" if avg_bp_sys else "k.A."
vars['bp_summary'] = f"{n} Messungen, Ø {avg_bp_sys:.0f}/{avg_bp_dia:.0f} mmHg" if avg_bp_sys else "keine Daten"
else:
vars['vitals_avg_bp'] = "k.A."
vars['bp_summary'] = "keine Daten"
return vars
# ── Endpoints ─────────────────────────────────────────────────────────────────
@router.get("/insights")
def get_all_insights(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Return every stored AI insight for the active profile, newest first."""
    pid = get_pid(x_profile_id)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM ai_insights WHERE profile_id=%s ORDER BY created DESC", (pid,))
        return [r2d(row) for row in cur.fetchall()]
@router.get("/insights/latest")
def get_latest_insights(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Return the 10 most recent AI insights across all scopes."""
    pid = get_pid(x_profile_id)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM ai_insights WHERE profile_id=%s ORDER BY created DESC LIMIT 10", (pid,))
        return [r2d(row) for row in cur.fetchall()]
@router.get("/ai/insights/{scope}")
def get_ai_insight(scope: str, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Return the newest insight for one scope, or None if there is none."""
    pid = get_pid(x_profile_id)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM ai_insights WHERE profile_id=%s AND scope=%s ORDER BY created DESC LIMIT 1", (pid,scope))
        row = cur.fetchone()
        return r2d(row) if row else None
@router.delete("/insights/{insight_id}")
def delete_insight_by_id(insight_id: str, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Delete one insight by ID, scoped to the caller's profile (foreign IDs are no-ops)."""
    pid = get_pid(x_profile_id)
    with get_db() as conn:
        db_cursor = get_cursor(conn)
        db_cursor.execute("DELETE FROM ai_insights WHERE id=%s AND profile_id=%s", (insight_id, pid))
    return {"ok":True}
@router.delete("/ai/insights/{scope}")
def delete_ai_insight(scope: str, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Delete every insight of the given scope for the active profile."""
    pid = get_pid(x_profile_id)
    with get_db() as conn:
        db_cursor = get_cursor(conn)
        db_cursor.execute("DELETE FROM ai_insights WHERE profile_id=%s AND scope=%s", (pid,scope))
    return {"ok":True}
@router.post("/insights/run/{slug}")
async def analyze_with_prompt(slug: str, x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Run AI analysis with the specified prompt template.

    Enforces the 'ai_calls' feature limit, renders the template with the
    profile's data, calls the configured AI backend (Anthropic preferred,
    OpenRouter fallback) and stores the result as a new ai_insights row.

    Fix: the synchronous Anthropic SDK call used to run directly inside this
    ``async def`` and blocked the event loop; it now runs via
    ``asyncio.to_thread`` so other requests keep being served.

    Raises:
        HTTPException 403 if the feature limit is exhausted,
        404 if the prompt slug is unknown, 500 on AI backend errors.
    """
    pid = get_pid(x_profile_id)
    # Phase 4: Check feature access and ENFORCE
    access = check_feature_access(pid, 'ai_calls')
    log_feature_usage(pid, 'ai_calls', access, 'analyze')
    if not access['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"ai_calls {access['reason']} (used: {access['used']}, limit: {access['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für KI-Analysen überschritten ({access['used']}/{access['limit']}). "
            f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )
    # Get prompt template
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT * FROM ai_prompts WHERE slug=%s AND active=true", (slug,))
        prompt_row = cur.fetchone()
        if not prompt_row:
            raise HTTPException(404, f"Prompt '{slug}' nicht gefunden")
        prompt_tmpl = prompt_row['template']
    data = _get_profile_data(pid)
    template_vars = _prepare_template_vars(data)
    final_prompt = _render_template(prompt_tmpl, template_vars)
    # Call AI (Anthropic SDK preferred, OpenRouter fallback)
    if ANTHROPIC_KEY:
        import anthropic
        client = anthropic.Anthropic(api_key=ANTHROPIC_KEY)
        # SDK call is synchronous — off-load to a worker thread so the
        # event loop is not blocked for the duration of the HTTP request.
        response = await asyncio.to_thread(
            client.messages.create,
            model="claude-sonnet-4-20250514",
            max_tokens=2000,
            messages=[{"role": "user", "content": final_prompt}]
        )
        content = response.content[0].text
    elif OPENROUTER_KEY:
        async with httpx.AsyncClient() as client:
            resp = await client.post("https://openrouter.ai/api/v1/chat/completions",
                headers={"Authorization": f"Bearer {OPENROUTER_KEY}"},
                json={
                    "model": OPENROUTER_MODEL,
                    "messages": [{"role": "user", "content": final_prompt}],
                    "max_tokens": 2000
                },
                timeout=60.0
            )
        if resp.status_code != 200:
            raise HTTPException(500, f"KI-Fehler: {resp.text}")
        content = resp.json()['choices'][0]['message']['content']
    else:
        raise HTTPException(500, "Keine KI-API konfiguriert")
    # Save insight (with history - no DELETE)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("INSERT INTO ai_insights (id, profile_id, scope, content, created) VALUES (%s,%s,%s,%s,CURRENT_TIMESTAMP)",
                    (str(uuid.uuid4()), pid, slug, content))
    # Phase 2: Increment new feature usage counter
    increment_feature_usage(pid, 'ai_calls')
    # Old usage tracking (keep for now)
    inc_ai_usage(pid)
    return {"scope": slug, "content": content}
@router.post("/insights/pipeline")
async def analyze_pipeline(
    config_id: Optional[str] = None,
    x_profile_id: Optional[str] = Header(default=None),
    session: dict = Depends(require_auth)
):
    """
    Run configurable multi-stage pipeline analysis.

    Stage 1 executes every prompt listed in the pipeline config and collects
    the results (parsed as JSON when possible, raw text otherwise). Stage 2
    renders the config's synthesis prompt, with each stage-1 result injected
    as a {{stage1_<slug>}} placeholder. Stage 3 is optional (e.g. goal
    feedback) and its output is appended to the synthesis. The combined text
    is stored as a new ai_insights row whose scope is derived from the
    config name.

    Args:
        config_id: Pipeline config ID (optional, uses default if not specified)
    """
    pid = get_pid(x_profile_id)
    # Phase 4: Check pipeline feature access (boolean - enabled/disabled)
    access_pipeline = check_feature_access(pid, 'ai_pipeline')
    log_feature_usage(pid, 'ai_pipeline', access_pipeline, 'pipeline')
    if not access_pipeline['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"ai_pipeline {access_pipeline['reason']}"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Pipeline-Analyse ist nicht verfügbar. Bitte kontaktiere den Admin."
        )
    # Also check ai_calls (pipeline uses API calls too)
    access_calls = check_feature_access(pid, 'ai_calls')
    log_feature_usage(pid, 'ai_calls', access_calls, 'pipeline_calls')
    if not access_calls['allowed']:
        logger.warning(
            f"[FEATURE-LIMIT] User {pid} blocked: "
            f"ai_calls {access_calls['reason']} (used: {access_calls['used']}, limit: {access_calls['limit']})"
        )
        raise HTTPException(
            status_code=403,
            detail=f"Limit erreicht: Du hast das Kontingent für KI-Analysen überschritten ({access_calls['used']}/{access_calls['limit']}). "
            f"Bitte kontaktiere den Admin oder warte bis zum nächsten Reset."
        )
    # Load pipeline config: explicit id, or the row flagged is_default
    with get_db() as conn:
        cur = get_cursor(conn)
        if config_id:
            cur.execute("SELECT * FROM pipeline_configs WHERE id=%s AND active=true", (config_id,))
        else:
            cur.execute("SELECT * FROM pipeline_configs WHERE is_default=true AND active=true")
        config = r2d(cur.fetchone())
    if not config:
        raise HTTPException(404, "Pipeline-Konfiguration nicht gefunden")
    logger.info(f"[PIPELINE] Using config '{config['name']}' (id={config['id']})")
    data = _get_profile_data(pid)
    vars = _prepare_template_vars(data)
    # Stage 1: Load and execute prompts from config
    stage1_prompts = []
    with get_db() as conn:
        cur = get_cursor(conn)
        for slug in config['stage1_prompts']:
            cur.execute("SELECT slug, template FROM ai_prompts WHERE slug=%s AND active=true", (slug,))
            prompt = r2d(cur.fetchone())
            if prompt:
                stage1_prompts.append(prompt)
            else:
                # Missing/inactive prompts are skipped, not fatal — the pipeline
                # continues with the remaining ones.
                logger.warning(f"[PIPELINE] Stage 1 prompt '{slug}' not found or inactive")
    stage1_results = {}
    for p in stage1_prompts:
        slug = p['slug']
        final_prompt = _render_template(p['template'], vars)
        if ANTHROPIC_KEY:
            # NOTE(review): the Anthropic SDK call is synchronous and blocks the
            # event loop while it runs inside this async endpoint — consider
            # asyncio.to_thread.
            import anthropic
            client = anthropic.Anthropic(api_key=ANTHROPIC_KEY)
            response = client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=1000,
                messages=[{"role": "user", "content": final_prompt}]
            )
            content = response.content[0].text.strip()
        elif OPENROUTER_KEY:
            async with httpx.AsyncClient() as client:
                resp = await client.post("https://openrouter.ai/api/v1/chat/completions",
                    headers={"Authorization": f"Bearer {OPENROUTER_KEY}"},
                    json={
                        "model": OPENROUTER_MODEL,
                        "messages": [{"role": "user", "content": final_prompt}],
                        "max_tokens": 1000
                    },
                    timeout=60.0
                )
                # NOTE(review): unlike the single-prompt endpoint, resp.status_code
                # is not checked here — a non-200 response would surface as a
                # KeyError on 'choices'; consider verifying.
                content = resp.json()['choices'][0]['message']['content'].strip()
        else:
            raise HTTPException(500, "Keine KI-API konfiguriert")
        # Try to parse JSON, fallback to raw text
        try:
            stage1_results[slug] = json.loads(content)
        except:
            # Best-effort by design: any parse failure keeps the raw model text.
            stage1_results[slug] = content
    # Stage 2: Synthesis with dynamic placeholders
    # Inject all stage1 results as {{stage1_<slug>}} placeholders
    for slug, result in stage1_results.items():
        # Convert slug like "pipeline_body" to placeholder name "stage1_body"
        placeholder_name = slug.replace('pipeline_', 'stage1_')
        vars[placeholder_name] = json.dumps(result, ensure_ascii=False) if isinstance(result, dict) else str(result)
    # Load stage 2 prompt from config
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT template FROM ai_prompts WHERE slug=%s AND active=true", (config['stage2_prompt'],))
        synth_row = cur.fetchone()
    if not synth_row:
        raise HTTPException(500, f"Pipeline synthesis prompt '{config['stage2_prompt']}' not found")
    synth_prompt = _render_template(synth_row['template'], vars)
    if ANTHROPIC_KEY:
        import anthropic
        client = anthropic.Anthropic(api_key=ANTHROPIC_KEY)
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2000,
            messages=[{"role": "user", "content": synth_prompt}]
        )
        synthesis = response.content[0].text
    elif OPENROUTER_KEY:
        async with httpx.AsyncClient() as client:
            resp = await client.post("https://openrouter.ai/api/v1/chat/completions",
                headers={"Authorization": f"Bearer {OPENROUTER_KEY}"},
                json={
                    "model": OPENROUTER_MODEL,
                    "messages": [{"role": "user", "content": synth_prompt}],
                    "max_tokens": 2000
                },
                timeout=60.0
            )
            synthesis = resp.json()['choices'][0]['message']['content']
    else:
        raise HTTPException(500, "Keine KI-API konfiguriert")
    # Stage 3: Optional (e.g., Goals)
    goals_text = None
    if config.get('stage3_prompt'):
        # Check if conditions are met (for backwards compatibility with goals check)
        prof = data['profile']
        should_run_stage3 = True
        # Special case: goals prompt only runs if goals are set
        if config['stage3_prompt'] == 'pipeline_goals':
            should_run_stage3 = bool(prof.get('goal_weight') or prof.get('goal_bf_pct'))
        if should_run_stage3:
            with get_db() as conn:
                cur = get_cursor(conn)
                cur.execute("SELECT template FROM ai_prompts WHERE slug=%s AND active=true", (config['stage3_prompt'],))
                goals_row = cur.fetchone()
            # A missing stage-3 prompt silently skips the stage (goals_text stays None).
            if goals_row:
                goals_prompt = _render_template(goals_row['template'], vars)
                if ANTHROPIC_KEY:
                    import anthropic
                    client = anthropic.Anthropic(api_key=ANTHROPIC_KEY)
                    response = client.messages.create(
                        model="claude-sonnet-4-20250514",
                        max_tokens=800,
                        messages=[{"role": "user", "content": goals_prompt}]
                    )
                    goals_text = response.content[0].text
                elif OPENROUTER_KEY:
                    async with httpx.AsyncClient() as client:
                        resp = await client.post("https://openrouter.ai/api/v1/chat/completions",
                            headers={"Authorization": f"Bearer {OPENROUTER_KEY}"},
                            json={
                                "model": OPENROUTER_MODEL,
                                "messages": [{"role": "user", "content": goals_prompt}],
                                "max_tokens": 800
                            },
                            timeout=60.0
                        )
                        goals_text = resp.json()['choices'][0]['message']['content']
    # Combine synthesis + goals
    final_content = synthesis
    if goals_text:
        final_content += "\n\n" + goals_text
    # Save with config-specific scope (with history - no DELETE)
    scope = f"pipeline_{config['name'].lower().replace(' ', '_')}"
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("INSERT INTO ai_insights (id, profile_id, scope, content, created) VALUES (%s,%s,%s,%s,CURRENT_TIMESTAMP)",
                    (str(uuid.uuid4()), pid, scope, final_content))
    logger.info(f"[PIPELINE] Completed '{config['name']}' - saved as scope='{scope}'")
    # Phase 2: Increment ai_calls usage (pipeline uses multiple API calls)
    # Note: We increment once per pipeline run, not per individual call
    increment_feature_usage(pid, 'ai_calls')
    # Old usage tracking (keep for now)
    inc_ai_usage(pid)
    return {
        "scope": scope,
        "content": final_content,
        "stage1": stage1_results,
        "config": {
            "id": config['id'],
            "name": config['name']
        }
    }
@router.get("/ai/usage")
def get_ai_usage(x_profile_id: Optional[str]=Header(default=None), session: dict=Depends(require_auth)):
    """Get AI usage stats for the current profile.

    Returns the daily limit, today's call count and the remaining budget.
    ``remaining`` is None only when no limit is configured (NULL in the DB).

    Fix: the original used ``if limit`` for the remaining calculation, which
    made a configured limit of 0 report remaining=None (i.e. look unlimited);
    only a NULL limit means unlimited.
    """
    pid = get_pid(x_profile_id)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT ai_limit_day FROM profiles WHERE id=%s", (pid,))
        prof = cur.fetchone()
        limit = prof['ai_limit_day'] if prof else None
        today = datetime.now().date().isoformat()
        cur.execute("SELECT call_count FROM ai_usage WHERE profile_id=%s AND date=%s", (pid, today))
        usage = cur.fetchone()
        used = usage['call_count'] if usage else 0
        return {"limit": limit, "used": used, "remaining": (limit - used) if limit is not None else None}