mitai-jinkendo/backend/placeholder_resolver.py
Lars 500de132b9 feat: AI-Prompts flexibilisierung - Backend & Admin UI (Issue #28, Part 1)
Backend complete:
- Migration 017: Add category column to ai_prompts
- placeholder_resolver.py: 20+ placeholders with resolver functions
- Extended routers/prompts.py with CRUD endpoints:
  * POST /api/prompts (create)
  * PUT /api/prompts/:id (update)
  * DELETE /api/prompts/:id (delete)
  * POST /api/prompts/:id/duplicate
  * PUT /api/prompts/reorder
  * POST /api/prompts/preview
  * GET /api/prompts/placeholders
  * POST /api/prompts/generate (KI-assisted generation)
  * POST /api/prompts/:id/optimize (KI analysis)
- Extended models.py with PromptCreate, PromptUpdate, PromptGenerateRequest

Frontend:
- AdminPromptsPage.jsx: Full CRUD UI with category filter, reordering

Meta-Features:
- KI generates prompts from goal description + example data
- KI analyzes and optimizes existing prompts

Next: PromptEditModal, PromptGenerator, api.js integration

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-24 15:32:25 +01:00

309 lines
10 KiB
Python

"""
Placeholder Resolver for AI Prompts
Provides a registry of placeholder functions that resolve to actual user data.
Used for prompt templates and preview functionality.
"""
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable
from db import get_db, get_cursor, r2d
# ── Helper Functions ──────────────────────────────────────────────────────────
def get_profile_data(profile_id: str) -> Dict:
"""Load profile data for a user."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT * FROM profiles WHERE id=%s", (profile_id,))
return r2d(cur.fetchone()) if cur.rowcount > 0 else {}
def get_latest_weight(profile_id: str) -> Optional[str]:
"""Get latest weight entry."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT weight FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 1",
(profile_id,)
)
row = cur.fetchone()
return f"{row['weight']:.1f} kg" if row else "nicht verfügbar"
def get_weight_trend(profile_id: str, days: int = 28) -> str:
"""Calculate weight trend description."""
with get_db() as conn:
cur = get_cursor(conn)
cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
cur.execute(
"""SELECT weight, date FROM weight_log
WHERE profile_id=%s AND date >= %s
ORDER BY date""",
(profile_id, cutoff)
)
rows = [r2d(r) for r in cur.fetchall()]
if len(rows) < 2:
return "nicht genug Daten"
first = rows[0]['weight']
last = rows[-1]['weight']
delta = last - first
if abs(delta) < 0.3:
return "stabil"
elif delta > 0:
return f"steigend (+{delta:.1f} kg in {days} Tagen)"
else:
return f"sinkend ({delta:.1f} kg in {days} Tagen)"
def get_latest_bf(profile_id: str) -> Optional[str]:
"""Get latest body fat percentage from caliper."""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""SELECT bf_jpl FROM caliper_log
WHERE profile_id=%s AND bf_jpl IS NOT NULL
ORDER BY date DESC LIMIT 1""",
(profile_id,)
)
row = cur.fetchone()
return f"{row['bf_jpl']:.1f}%" if row else "nicht verfügbar"
def get_nutrition_avg(profile_id: str, field: str, days: int = 30) -> str:
"""Calculate average nutrition value."""
with get_db() as conn:
cur = get_cursor(conn)
cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
cur.execute(
f"""SELECT AVG({field}) as avg FROM nutrition_log
WHERE profile_id=%s AND date >= %s AND {field} IS NOT NULL""",
(profile_id, cutoff)
)
row = cur.fetchone()
if row and row['avg']:
if field == 'kcal':
return f"{int(row['avg'])} kcal/Tag (Ø {days} Tage)"
else:
return f"{int(row['avg'])}g/Tag (Ø {days} Tage)"
return "nicht verfügbar"
def get_activity_summary(profile_id: str, days: int = 14) -> str:
"""Get activity summary for recent period."""
with get_db() as conn:
cur = get_cursor(conn)
cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
cur.execute(
"""SELECT COUNT(*) as count,
SUM(duration_min) as total_min,
SUM(kcal_active) as total_kcal
FROM activity_log
WHERE profile_id=%s AND date >= %s""",
(profile_id, cutoff)
)
row = r2d(cur.fetchone())
if row['count'] == 0:
return f"Keine Aktivitäten in den letzten {days} Tagen"
avg_min = int(row['total_min'] / row['count']) if row['total_min'] else 0
return f"{row['count']} Einheiten in {days} Tagen (Ø {avg_min} min/Einheit, {int(row['total_kcal'] or 0)} kcal gesamt)"
def calculate_age(dob: Optional[str]) -> str:
"""Calculate age from date of birth."""
if not dob:
return "unbekannt"
try:
birth = datetime.strptime(dob, '%Y-%m-%d')
today = datetime.now()
age = today.year - birth.year - ((today.month, today.day) < (birth.month, birth.day))
return str(age)
except:
return "unbekannt"
def get_trainingstyp_verteilung(profile_id: str, days: int = 14) -> str:
"""Get training type distribution."""
with get_db() as conn:
cur = get_cursor(conn)
cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
cur.execute(
"""SELECT training_category, COUNT(*) as count
FROM activity_log
WHERE profile_id=%s AND date >= %s AND training_category IS NOT NULL
GROUP BY training_category
ORDER BY count DESC""",
(profile_id, cutoff)
)
rows = [r2d(r) for r in cur.fetchall()]
if not rows:
return "Keine kategorisierten Trainings"
total = sum(r['count'] for r in rows)
parts = [f"{r['training_category']}: {int(r['count']/total*100)}%" for r in rows[:3]]
return ", ".join(parts)
# ── Placeholder Registry ──────────────────────────────────────────────────────
PLACEHOLDER_MAP: Dict[str, Callable[[str], str]] = {
# Profil
'{{name}}': lambda pid: get_profile_data(pid).get('name', 'Nutzer'),
'{{age}}': lambda pid: calculate_age(get_profile_data(pid).get('dob')),
'{{height}}': lambda pid: str(get_profile_data(pid).get('height', 'unbekannt')),
'{{geschlecht}}': lambda pid: 'männlich' if get_profile_data(pid).get('sex') == 'm' else 'weiblich',
# Körper
'{{weight_aktuell}}': get_latest_weight,
'{{weight_trend}}': get_weight_trend,
'{{kf_aktuell}}': get_latest_bf,
'{{bmi}}': lambda pid: calculate_bmi(pid),
# Ernährung
'{{kcal_avg}}': lambda pid: get_nutrition_avg(pid, 'kcal', 30),
'{{protein_avg}}': lambda pid: get_nutrition_avg(pid, 'protein', 30),
'{{carb_avg}}': lambda pid: get_nutrition_avg(pid, 'carb', 30),
'{{fat_avg}}': lambda pid: get_nutrition_avg(pid, 'fat', 30),
# Training
'{{activity_summary}}': get_activity_summary,
'{{trainingstyp_verteilung}}': get_trainingstyp_verteilung,
# Zeitraum
'{{datum_heute}}': lambda pid: datetime.now().strftime('%d.%m.%Y'),
'{{zeitraum_7d}}': lambda pid: 'letzte 7 Tage',
'{{zeitraum_30d}}': lambda pid: 'letzte 30 Tage',
'{{zeitraum_90d}}': lambda pid: 'letzte 90 Tage',
}
def calculate_bmi(profile_id: str) -> str:
"""Calculate BMI from latest weight and profile height."""
profile = get_profile_data(profile_id)
if not profile.get('height'):
return "nicht verfügbar"
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT weight FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 1",
(profile_id,)
)
row = cur.fetchone()
if not row:
return "nicht verfügbar"
height_m = profile['height'] / 100
bmi = row['weight'] / (height_m ** 2)
return f"{bmi:.1f}"
# ── Public API ────────────────────────────────────────────────────────────────
def resolve_placeholders(template: str, profile_id: str) -> str:
"""
Replace all {{placeholders}} in template with actual user data.
Args:
template: Prompt template with placeholders
profile_id: User profile ID
Returns:
Resolved template with placeholders replaced by values
"""
result = template
for placeholder, resolver in PLACEHOLDER_MAP.items():
if placeholder in result:
try:
value = resolver(profile_id)
result = result.replace(placeholder, str(value))
except Exception as e:
# On error, replace with error message
result = result.replace(placeholder, f"[Fehler: {placeholder}]")
return result
def get_unknown_placeholders(template: str) -> List[str]:
"""
Find all placeholders in template that are not in PLACEHOLDER_MAP.
Args:
template: Prompt template
Returns:
List of unknown placeholder names (without {{}})
"""
# Find all {{...}} patterns
found = re.findall(r'\{\{(\w+)\}\}', template)
# Filter to only unknown ones
known_names = {p.strip('{}') for p in PLACEHOLDER_MAP.keys()}
unknown = [p for p in found if p not in known_names]
return list(set(unknown)) # Remove duplicates
def get_available_placeholders(categories: Optional[List[str]] = None) -> Dict[str, List[str]]:
"""
Get available placeholders, optionally filtered by categories.
Args:
categories: Optional list of categories to filter (körper, ernährung, training, etc.)
Returns:
Dict mapping category to list of placeholders
"""
placeholder_categories = {
'profil': [
'{{name}}', '{{age}}', '{{height}}', '{{geschlecht}}'
],
'körper': [
'{{weight_aktuell}}', '{{weight_trend}}', '{{kf_aktuell}}', '{{bmi}}'
],
'ernährung': [
'{{kcal_avg}}', '{{protein_avg}}', '{{carb_avg}}', '{{fat_avg}}'
],
'training': [
'{{activity_summary}}', '{{trainingstyp_verteilung}}'
],
'zeitraum': [
'{{datum_heute}}', '{{zeitraum_7d}}', '{{zeitraum_30d}}', '{{zeitraum_90d}}'
]
}
if not categories:
return placeholder_categories
# Filter to requested categories
return {k: v for k, v in placeholder_categories.items() if k in categories}
def get_placeholder_example_values(profile_id: str) -> Dict[str, str]:
"""
Get example values for all placeholders using real user data.
Args:
profile_id: User profile ID
Returns:
Dict mapping placeholder to example value
"""
examples = {}
for placeholder, resolver in PLACEHOLDER_MAP.items():
try:
examples[placeholder] = resolver(profile_id)
except Exception as e:
examples[placeholder] = f"[Fehler: {str(e)}]"
return examples