""" Placeholder Resolver for AI Prompts Provides a registry of placeholder functions that resolve to actual user data. Used for prompt templates and preview functionality. """ import re from datetime import datetime, timedelta from typing import Dict, List, Optional, Callable from db import get_db, get_cursor, r2d # ── Helper Functions ────────────────────────────────────────────────────────── def get_profile_data(profile_id: str) -> Dict: """Load profile data for a user.""" with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT * FROM profiles WHERE id=%s", (profile_id,)) return r2d(cur.fetchone()) if cur.rowcount > 0 else {} def get_latest_weight(profile_id: str) -> Optional[str]: """Get latest weight entry.""" with get_db() as conn: cur = get_cursor(conn) cur.execute( "SELECT weight FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 1", (profile_id,) ) row = cur.fetchone() return f"{row['weight']:.1f} kg" if row else "nicht verfügbar" def get_weight_trend(profile_id: str, days: int = 28) -> str: """Calculate weight trend description.""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT weight, date FROM weight_log WHERE profile_id=%s AND date >= %s ORDER BY date""", (profile_id, cutoff) ) rows = [r2d(r) for r in cur.fetchall()] if len(rows) < 2: return "nicht genug Daten" first = rows[0]['weight'] last = rows[-1]['weight'] delta = last - first if abs(delta) < 0.3: return "stabil" elif delta > 0: return f"steigend (+{delta:.1f} kg in {days} Tagen)" else: return f"sinkend ({delta:.1f} kg in {days} Tagen)" def get_latest_bf(profile_id: str) -> Optional[str]: """Get latest body fat percentage from caliper.""" with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT body_fat_pct FROM caliper_log WHERE profile_id=%s AND body_fat_pct IS NOT NULL ORDER BY date DESC LIMIT 1""", (profile_id,) ) row = cur.fetchone() return f"{row['body_fat_pct']:.1f}%" if row else "nicht verfügbar" def get_nutrition_avg(profile_id: str, field: str, days: int = 30) -> str: """Calculate average nutrition value.""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') # Map field names to actual column names field_map = { 'protein': 'protein_g', 'fat': 'fat_g', 'carb': 'carbs_g', 'kcal': 'kcal' } db_field = field_map.get(field, field) cur.execute( f"""SELECT AVG({db_field}) as avg FROM nutrition_log WHERE profile_id=%s AND date >= %s AND {db_field} IS NOT NULL""", (profile_id, cutoff) ) row = cur.fetchone() if row and row['avg']: if field == 'kcal': return f"{int(row['avg'])} kcal/Tag (Ø {days} Tage)" else: return f"{int(row['avg'])}g/Tag (Ø {days} Tage)" return "nicht verfügbar" def get_caliper_summary(profile_id: str) -> str: """Get latest caliper measurements summary.""" with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT body_fat_pct, sf_method, date FROM caliper_log WHERE profile_id=%s AND body_fat_pct IS NOT NULL ORDER BY date DESC LIMIT 1""", (profile_id,) ) row = r2d(cur.fetchone()) if cur.rowcount > 0 else None if not row: return "keine Caliper-Messungen" method = row.get('sf_method', 'unbekannt') return f"{row['body_fat_pct']:.1f}% ({method} am {row['date']})" def get_circ_summary(profile_id: str) -> str: """Get latest circumference measurements summary.""" with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT c_neck, c_chest, c_waist, c_belly, c_hip, c_thigh, c_calf, c_arm, date FROM circumference_log WHERE profile_id=%s ORDER BY date DESC LIMIT 1""", (profile_id,) ) row = r2d(cur.fetchone()) if cur.rowcount > 0 else None if not row: return "keine Umfangsmessungen" # Check all 8 circumference points parts = [] if row.get('c_neck'): parts.append(f"Nacken {row['c_neck']:.1f}cm") if row.get('c_chest'): parts.append(f"Brust {row['c_chest']:.1f}cm") if row.get('c_waist'): parts.append(f"Taille {row['c_waist']:.1f}cm") if row.get('c_belly'): parts.append(f"Bauch {row['c_belly']:.1f}cm") if row.get('c_hip'): parts.append(f"Hüfte {row['c_hip']:.1f}cm") if row.get('c_thigh'): parts.append(f"Oberschenkel {row['c_thigh']:.1f}cm") if row.get('c_calf'): parts.append(f"Wade {row['c_calf']:.1f}cm") if row.get('c_arm'): parts.append(f"Arm {row['c_arm']:.1f}cm") return f"{', '.join(parts)} ({row['date']})" if parts else "keine Daten" def get_goal_weight(profile_id: str) -> str: """Get goal weight from profile.""" profile = get_profile_data(profile_id) goal = profile.get('goal_weight') return f"{goal:.1f}" if goal else "nicht gesetzt" def get_goal_bf_pct(profile_id: str) -> str: """Get goal body fat percentage from profile.""" profile = get_profile_data(profile_id) goal = profile.get('goal_bf_pct') return f"{goal:.1f}" if goal else "nicht gesetzt" def get_nutrition_days(profile_id: str, days: int = 30) -> str: """Get number of days with nutrition data.""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT COUNT(DISTINCT date) as days FROM nutrition_log WHERE profile_id=%s AND date >= %s""", (profile_id, cutoff) ) row = cur.fetchone() return str(row['days']) if row else "0" def get_protein_ziel_low(profile_id: str) -> str: """Calculate lower protein target based on current weight (1.6g/kg).""" with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT weight FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 1""", (profile_id,) ) row = cur.fetchone() if row: return f"{int(float(row['weight']) * 1.6)}" return "nicht verfügbar" def get_protein_ziel_high(profile_id: str) -> str: """Calculate upper protein target based on current weight (2.2g/kg).""" with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT weight FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 1""", (profile_id,) ) row = cur.fetchone() if row: return f"{int(float(row['weight']) * 2.2)}" return "nicht verfügbar" def get_activity_summary(profile_id: str, days: int = 14) -> str: """Get activity summary for recent period.""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT COUNT(*) as count, SUM(duration_min) as total_min, SUM(kcal_active) as total_kcal FROM activity_log WHERE profile_id=%s AND date >= %s""", (profile_id, cutoff) ) row = r2d(cur.fetchone()) if row['count'] == 0: return f"Keine Aktivitäten in den letzten {days} Tagen" avg_min = int(row['total_min'] / row['count']) if row['total_min'] else 0 return f"{row['count']} Einheiten in {days} Tagen (Ø {avg_min} min/Einheit, {int(row['total_kcal'] or 0)} kcal gesamt)" def calculate_age(dob) -> str: """Calculate age from date of birth (accepts date object or string).""" if not dob: return "unbekannt" try: # Handle both datetime.date objects and strings if isinstance(dob, str): birth = datetime.strptime(dob, '%Y-%m-%d').date() else: birth = dob # Already a date object from PostgreSQL today = datetime.now().date() age = today.year - birth.year - ((today.month, today.day) < (birth.month, birth.day)) return str(age) except Exception as e: return "unbekannt" def get_activity_detail(profile_id: str, days: int = 14) -> str: """Get detailed activity log for analysis.""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT date, activity_type, duration_min, kcal_active, hr_avg FROM activity_log WHERE profile_id=%s AND date >= %s ORDER BY date DESC LIMIT 50""", (profile_id, cutoff) ) rows = [r2d(r) for r in cur.fetchall()] if not rows: return f"Keine Aktivitäten in den letzten {days} Tagen" # Format as readable list lines = [] for r in rows: hr_str = f" HF={r['hr_avg']}" if r.get('hr_avg') else "" lines.append( f"{r['date']}: {r['activity_type']} ({r['duration_min']}min, {r.get('kcal_active', 0)}kcal{hr_str})" ) return '\n'.join(lines[:20]) # Max 20 entries to avoid token bloat def get_trainingstyp_verteilung(profile_id: str, days: int = 14) -> str: """Get training type distribution.""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT training_category, COUNT(*) as count FROM activity_log WHERE profile_id=%s AND date >= %s AND training_category IS NOT NULL GROUP BY training_category ORDER BY count DESC""", (profile_id, cutoff) ) rows = [r2d(r) for r in cur.fetchall()] if not rows: return "Keine kategorisierten Trainings" total = sum(r['count'] for r in rows) parts = [f"{r['training_category']}: {int(r['count']/total*100)}%" for r in rows[:3]] return ", ".join(parts) def get_sleep_avg_duration(profile_id: str, days: int = 7) -> str: """Calculate average sleep duration in hours.""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT sleep_segments FROM sleep_log WHERE profile_id=%s AND date >= %s ORDER BY date DESC""", (profile_id, cutoff) ) rows = cur.fetchall() if not rows: return "nicht verfügbar" total_minutes = 0 for row in rows: segments = row['sleep_segments'] if segments: # Sum duration_min from all segments for seg in segments: total_minutes += seg.get('duration_min', 0) if total_minutes == 0: return "nicht verfügbar" avg_hours = total_minutes / len(rows) / 60 return f"{avg_hours:.1f}h" def get_sleep_avg_quality(profile_id: str, days: int = 7) -> str: """Calculate average sleep quality (Deep+REM %).""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT sleep_segments FROM sleep_log WHERE profile_id=%s AND date >= %s ORDER BY date DESC""", (profile_id, cutoff) ) rows = cur.fetchall() if not rows: return "nicht verfügbar" total_quality = 0 count = 0 for row in rows: segments = row['sleep_segments'] if segments: # Note: segments use 'phase' key (not 'stage'), stored lowercase (deep, rem, light, awake) deep_rem_min = sum(s.get('duration_min', 0) for s in segments if s.get('phase') in ['deep', 'rem']) total_min = sum(s.get('duration_min', 0) for s in segments) if total_min > 0: quality_pct = (deep_rem_min / total_min) * 100 total_quality += quality_pct count += 1 if count == 0: return "nicht verfügbar" avg_quality = total_quality / count return f"{avg_quality:.0f}% (Deep+REM)" def get_rest_days_count(profile_id: str, days: int = 30) -> str: """Count rest days in the given period.""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT COUNT(DISTINCT date) as count FROM rest_days WHERE profile_id=%s AND date >= %s""", (profile_id, cutoff) ) row = cur.fetchone() count = row['count'] if row else 0 return f"{count} Ruhetage" def get_vitals_avg_hr(profile_id: str, days: int = 7) -> str: """Calculate average resting heart rate.""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT AVG(resting_hr) as avg FROM vitals_baseline WHERE profile_id=%s AND date >= %s AND resting_hr IS NOT NULL""", (profile_id, cutoff) ) row = cur.fetchone() if row and row['avg']: return f"{int(row['avg'])} bpm" return "nicht verfügbar" def get_vitals_avg_hrv(profile_id: str, days: int = 7) -> str: """Calculate average heart rate variability.""" with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT AVG(hrv) as avg FROM vitals_baseline WHERE profile_id=%s AND date >= %s AND hrv IS NOT NULL""", (profile_id, cutoff) ) row = cur.fetchone() if row and row['avg']: return f"{int(row['avg'])} ms" return "nicht verfügbar" def get_vitals_vo2_max(profile_id: str) -> str: """Get latest VO2 Max value.""" with get_db() as conn: cur = get_cursor(conn) cur.execute( """SELECT vo2_max FROM vitals_baseline WHERE profile_id=%s AND vo2_max IS NOT NULL ORDER BY date DESC LIMIT 1""", (profile_id,) ) row = cur.fetchone() if row and row['vo2_max']: return f"{row['vo2_max']:.1f} ml/kg/min" return "nicht verfügbar" # ── Placeholder Registry ────────────────────────────────────────────────────── PLACEHOLDER_MAP: Dict[str, Callable[[str], str]] = { # Profil '{{name}}': lambda pid: get_profile_data(pid).get('name', 'Nutzer'), '{{age}}': lambda pid: calculate_age(get_profile_data(pid).get('dob')), '{{height}}': lambda pid: str(get_profile_data(pid).get('height', 'unbekannt')), '{{geschlecht}}': lambda pid: 'männlich' if get_profile_data(pid).get('sex') == 'm' else 'weiblich', # Körper '{{weight_aktuell}}': get_latest_weight, '{{weight_trend}}': get_weight_trend, '{{kf_aktuell}}': get_latest_bf, '{{bmi}}': lambda pid: calculate_bmi(pid), '{{caliper_summary}}': get_caliper_summary, '{{circ_summary}}': get_circ_summary, '{{goal_weight}}': get_goal_weight, '{{goal_bf_pct}}': get_goal_bf_pct, # Ernährung '{{kcal_avg}}': lambda pid: get_nutrition_avg(pid, 'kcal', 30), '{{protein_avg}}': lambda pid: get_nutrition_avg(pid, 'protein', 30), '{{carb_avg}}': lambda pid: get_nutrition_avg(pid, 'carb', 30), '{{fat_avg}}': lambda pid: get_nutrition_avg(pid, 'fat', 30), '{{nutrition_days}}': lambda pid: get_nutrition_days(pid, 30), '{{protein_ziel_low}}': get_protein_ziel_low, '{{protein_ziel_high}}': get_protein_ziel_high, # Training '{{activity_summary}}': get_activity_summary, '{{activity_detail}}': get_activity_detail, '{{trainingstyp_verteilung}}': get_trainingstyp_verteilung, # Schlaf & Erholung '{{sleep_avg_duration}}': lambda pid: get_sleep_avg_duration(pid, 7), '{{sleep_avg_quality}}': lambda pid: get_sleep_avg_quality(pid, 7), '{{rest_days_count}}': lambda pid: get_rest_days_count(pid, 30), # Vitalwerte '{{vitals_avg_hr}}': lambda pid: get_vitals_avg_hr(pid, 7), '{{vitals_avg_hrv}}': lambda pid: get_vitals_avg_hrv(pid, 7), '{{vitals_vo2_max}}': get_vitals_vo2_max, # Zeitraum '{{datum_heute}}': lambda pid: datetime.now().strftime('%d.%m.%Y'), '{{zeitraum_7d}}': lambda pid: 'letzte 7 Tage', '{{zeitraum_30d}}': lambda pid: 'letzte 30 Tage', '{{zeitraum_90d}}': lambda pid: 'letzte 90 Tage', } def calculate_bmi(profile_id: str) -> str: """Calculate BMI from latest weight and profile height.""" profile = get_profile_data(profile_id) if not profile.get('height'): return "nicht verfügbar" with get_db() as conn: cur = get_cursor(conn) cur.execute( "SELECT weight FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 1", (profile_id,) ) row = cur.fetchone() if not row: return "nicht verfügbar" height_m = profile['height'] / 100 bmi = row['weight'] / (height_m ** 2) return f"{bmi:.1f}" # ── Public API ──────────────────────────────────────────────────────────────── def resolve_placeholders(template: str, profile_id: str) -> str: """ Replace all {{placeholders}} in template with actual user data. Args: template: Prompt template with placeholders profile_id: User profile ID Returns: Resolved template with placeholders replaced by values """ result = template for placeholder, resolver in PLACEHOLDER_MAP.items(): if placeholder in result: try: value = resolver(profile_id) result = result.replace(placeholder, str(value)) except Exception as e: # On error, replace with error message result = result.replace(placeholder, f"[Fehler: {placeholder}]") return result def get_unknown_placeholders(template: str) -> List[str]: """ Find all placeholders in template that are not in PLACEHOLDER_MAP. Args: template: Prompt template Returns: List of unknown placeholder names (without {{}}) """ # Find all {{...}} patterns found = re.findall(r'\{\{(\w+)\}\}', template) # Filter to only unknown ones known_names = {p.strip('{}') for p in PLACEHOLDER_MAP.keys()} unknown = [p for p in found if p not in known_names] return list(set(unknown)) # Remove duplicates def get_available_placeholders(categories: Optional[List[str]] = None) -> Dict[str, List[str]]: """ Get available placeholders, optionally filtered by categories. Args: categories: Optional list of categories to filter (körper, ernährung, training, etc.) Returns: Dict mapping category to list of placeholders """ placeholder_categories = { 'profil': [ '{{name}}', '{{age}}', '{{height}}', '{{geschlecht}}' ], 'körper': [ '{{weight_aktuell}}', '{{weight_trend}}', '{{kf_aktuell}}', '{{bmi}}' ], 'ernährung': [ '{{kcal_avg}}', '{{protein_avg}}', '{{carb_avg}}', '{{fat_avg}}' ], 'training': [ '{{activity_summary}}', '{{trainingstyp_verteilung}}' ], 'zeitraum': [ '{{datum_heute}}', '{{zeitraum_7d}}', '{{zeitraum_30d}}', '{{zeitraum_90d}}' ] } if not categories: return placeholder_categories # Filter to requested categories return {k: v for k, v in placeholder_categories.items() if k in categories} def get_placeholder_example_values(profile_id: str) -> Dict[str, str]: """ Get example values for all placeholders using real user data. Args: profile_id: User profile ID Returns: Dict mapping placeholder to example value """ examples = {} for placeholder, resolver in PLACEHOLDER_MAP.items(): try: examples[placeholder] = resolver(profile_id) except Exception as e: examples[placeholder] = f"[Fehler: {str(e)}]" return examples def get_placeholder_catalog(profile_id: str) -> Dict[str, List[Dict[str, str]]]: """ Get grouped placeholder catalog with descriptions and example values. Args: profile_id: User profile ID Returns: Dict mapping category to list of {key, description, example} """ # Placeholder definitions with descriptions placeholders = { 'Profil': [ ('name', 'Name des Nutzers'), ('age', 'Alter in Jahren'), ('height', 'Körpergröße in cm'), ('geschlecht', 'Geschlecht'), ], 'Körper': [ ('weight_aktuell', 'Aktuelles Gewicht in kg'), ('weight_trend', 'Gewichtstrend (7d/30d)'), ('kf_aktuell', 'Aktueller Körperfettanteil in %'), ('bmi', 'Body Mass Index'), ], 'Ernährung': [ ('kcal_avg', 'Durchschn. Kalorien (30d)'), ('protein_avg', 'Durchschn. Protein in g (30d)'), ('carb_avg', 'Durchschn. Kohlenhydrate in g (30d)'), ('fat_avg', 'Durchschn. Fett in g (30d)'), ], 'Training': [ ('activity_summary', 'Aktivitäts-Zusammenfassung (7d)'), ('trainingstyp_verteilung', 'Verteilung nach Trainingstypen'), ], 'Schlaf & Erholung': [ ('sleep_avg_duration', 'Durchschn. Schlafdauer (7d)'), ('sleep_avg_quality', 'Durchschn. Schlafqualität (7d)'), ('rest_days_count', 'Anzahl Ruhetage (30d)'), ], 'Vitalwerte': [ ('vitals_avg_hr', 'Durchschn. Ruhepuls (7d)'), ('vitals_avg_hrv', 'Durchschn. HRV (7d)'), ('vitals_vo2_max', 'Aktueller VO2 Max'), ], 'Zeitraum': [ ('datum_heute', 'Heutiges Datum'), ('zeitraum_7d', '7-Tage-Zeitraum'), ('zeitraum_30d', '30-Tage-Zeitraum'), ], } catalog = {} for category, items in placeholders.items(): catalog[category] = [] for key, description in items: placeholder = f'{{{{{key}}}}}' # Get example value if resolver exists resolver = PLACEHOLDER_MAP.get(placeholder) if resolver: try: example = resolver(profile_id) except Exception: example = '[Nicht verfügbar]' else: example = '[Nicht implementiert]' catalog[category].append({ 'key': key, 'description': description, 'example': str(example) }) return catalog