mitai-jinkendo/backend/goal_utils.py
Lars 0e89850df8
All checks were successful
Deploy Development / deploy (push) Successful in 49s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s
fix: add start_date and created_at to get_active_goals query
ROOT CAUSE: get_active_goals() SELECT was missing start_date and created_at
IMPACT: Time-based deviation calculation failed silently for all goals

Now returns:
- start_date: Required for accurate time-based progress calculation
- created_at: Fallback when start_date is not set

This fixes:
- Zielgewicht (weight) should now show +7% ahead
- Körperfett should show time deviation
- All goals with target_date now have time-based tracking
2026-03-28 17:18:53 +01:00

569 lines
19 KiB
Python

"""
Goal Utilities - Abstraction Layer for Focus Weights & Universal Value Fetcher
This module provides:
1. Abstraction layer between goal modes and focus weights (Phase 1)
2. Universal value fetcher for dynamic goal types (Phase 1.5)
Version History:
- V1 (Phase 1): Maps goal_mode to predefined weights
- V1.5 (Phase 1.5): Universal value fetcher for DB-registry goal types
- V2 (future): Reads from focus_areas table with custom user weights
Part of Phase 1 + Phase 1.5: Flexible Goal System
"""
from typing import Dict, Optional, Any, List
from datetime import date, timedelta
from decimal import Decimal
import json
from db import get_cursor, get_db
def get_focus_weights(conn, profile_id: str) -> Dict[str, float]:
"""
Get focus area weights for a profile.
V2 (Goal System v2.0): Reads from focus_areas table with custom user weights.
Falls back to goal_mode mapping if focus_areas not set.
Args:
conn: Database connection
profile_id: User's profile ID
Returns:
Dict with focus weights (sum = 1.0):
{
'weight_loss': 0.3, # Fat loss priority
'muscle_gain': 0.2, # Muscle gain priority
'strength': 0.25, # Strength training priority
'endurance': 0.25, # Cardio/endurance priority
'flexibility': 0.0, # Mobility priority
'health': 0.0 # General health maintenance
}
Example Usage in Phase 0b:
weights = get_focus_weights(conn, profile_id)
# Score calculation considers user's focus
overall_score = (
body_score * weights['weight_loss'] +
strength_score * weights['strength'] +
cardio_score * weights['endurance']
)
"""
cur = get_cursor(conn)
# V2: Try to fetch from focus_areas table
cur.execute("""
SELECT weight_loss_pct, muscle_gain_pct, strength_pct,
endurance_pct, flexibility_pct, health_pct
FROM focus_areas
WHERE profile_id = %s AND active = true
LIMIT 1
""", (profile_id,))
row = cur.fetchone()
if row:
# Convert percentages to weights (0-1 range)
return {
'weight_loss': row['weight_loss_pct'] / 100.0,
'muscle_gain': row['muscle_gain_pct'] / 100.0,
'strength': row['strength_pct'] / 100.0,
'endurance': row['endurance_pct'] / 100.0,
'flexibility': row['flexibility_pct'] / 100.0,
'health': row['health_pct'] / 100.0
}
# V1 Fallback: Use goal_mode if focus_areas not set
cur.execute(
"SELECT goal_mode FROM profiles WHERE id = %s",
(profile_id,)
)
row = cur.fetchone()
if not row:
# Ultimate fallback: balanced health focus
return {
'weight_loss': 0.0,
'muscle_gain': 0.0,
'strength': 0.10,
'endurance': 0.20,
'flexibility': 0.15,
'health': 0.55
}
goal_mode = row['goal_mode']
if not goal_mode:
return {
'weight_loss': 0.0,
'muscle_gain': 0.0,
'strength': 0.10,
'endurance': 0.20,
'flexibility': 0.15,
'health': 0.55
}
# V1: Predefined weight mappings per goal_mode (fallback)
WEIGHT_MAPPINGS = {
'weight_loss': {
'weight_loss': 0.60,
'endurance': 0.20,
'muscle_gain': 0.0,
'strength': 0.10,
'flexibility': 0.05,
'health': 0.05
},
'strength': {
'strength': 0.50,
'muscle_gain': 0.40,
'endurance': 0.10,
'weight_loss': 0.0,
'flexibility': 0.0,
'health': 0.0
},
'endurance': {
'endurance': 0.70,
'health': 0.20,
'flexibility': 0.10,
'weight_loss': 0.0,
'muscle_gain': 0.0,
'strength': 0.0
},
'recomposition': {
'weight_loss': 0.30,
'muscle_gain': 0.30,
'strength': 0.25,
'endurance': 0.10,
'flexibility': 0.05,
'health': 0.0
},
'health': {
'health': 0.50,
'endurance': 0.20,
'flexibility': 0.15,
'strength': 0.10,
'weight_loss': 0.05,
'muscle_gain': 0.0
}
}
return WEIGHT_MAPPINGS.get(goal_mode, WEIGHT_MAPPINGS['health'])
def get_primary_focus(conn, profile_id: str) -> str:
"""
Get the primary focus area for a profile.
Returns the focus area with the highest weight.
Useful for UI labels and simple decision logic.
Args:
conn: Database connection
profile_id: User's profile ID
Returns:
Primary focus area name (e.g., 'weight_loss', 'strength')
"""
weights = get_focus_weights(conn, profile_id)
return max(weights.items(), key=lambda x: x[1])[0]
def get_focus_description(focus_area: str) -> str:
"""
Get human-readable description for a focus area.
Args:
focus_area: Focus area key (e.g., 'weight_loss')
Returns:
German description for UI display
"""
descriptions = {
'weight_loss': 'Gewichtsreduktion & Fettabbau',
'muscle_gain': 'Muskelaufbau & Hypertrophie',
'strength': 'Kraftsteigerung & Performance',
'endurance': 'Ausdauer & aerobe Kapazität',
'flexibility': 'Beweglichkeit & Mobilität',
'health': 'Allgemeine Gesundheit & Erhaltung'
}
return descriptions.get(focus_area, focus_area)
# ============================================================================
# Phase 1.5: Universal Value Fetcher for Dynamic Goal Types
# ============================================================================
def get_goal_type_config(conn, type_key: str) -> Optional[Dict[str, Any]]:
"""
Get goal type configuration from database registry.
Args:
conn: Database connection
type_key: Goal type key (e.g., 'weight', 'meditation_minutes')
Returns:
Dict with config or None if not found/inactive
"""
cur = get_cursor(conn)
cur.execute("""
SELECT type_key, source_table, source_column, aggregation_method,
calculation_formula, filter_conditions, label_de, unit, icon, category
FROM goal_type_definitions
WHERE type_key = %s AND is_active = true
LIMIT 1
""", (type_key,))
return cur.fetchone()
def get_current_value_for_goal(conn, profile_id: str, goal_type: str) -> Optional[float]:
"""
Universal value fetcher for any goal type.
Reads configuration from goal_type_definitions table and executes
appropriate query based on aggregation_method or calculation_formula.
Args:
conn: Database connection
profile_id: User's profile ID
goal_type: Goal type key (e.g., 'weight', 'meditation_minutes')
Returns:
Current value as float or None if not available
"""
config = get_goal_type_config(conn, goal_type)
if not config:
print(f"[WARNING] Goal type '{goal_type}' not found or inactive")
return None
# Complex calculation (e.g., lean_mass)
if config['calculation_formula']:
return _execute_calculation_formula(conn, profile_id, config['calculation_formula'])
# Simple aggregation
return _fetch_by_aggregation_method(
conn,
profile_id,
config['source_table'],
config['source_column'],
config['aggregation_method'],
config.get('filter_conditions')
)
def _fetch_by_aggregation_method(
conn,
profile_id: str,
table: str,
column: str,
method: str,
filter_conditions: Optional[Any] = None
) -> Optional[float]:
"""
Fetch value using specified aggregation method.
Supported methods:
- latest: Most recent value
- avg_7d: 7-day average
- avg_30d: 30-day average
- sum_30d: 30-day sum
- count_7d: Count of entries in last 7 days
- count_30d: Count of entries in last 30 days
- min_30d: Minimum value in last 30 days
- max_30d: Maximum value in last 30 days
Args:
filter_conditions: Optional JSON filters (e.g., {"training_category": "strength"})
"""
# Guard: source_table/column required for simple aggregation
if not table or not column:
print(f"[WARNING] Missing source_table or source_column for aggregation")
return None
# Table-specific date column mapping (some tables use different column names)
DATE_COLUMN_MAP = {
'blood_pressure_log': 'measured_at',
'activity_log': 'date',
'weight_log': 'date',
'circumference_log': 'date',
'caliper_log': 'date',
'nutrition_log': 'date',
'sleep_log': 'date',
'vitals_baseline': 'date',
'rest_days': 'date',
'fitness_tests': 'test_date'
}
date_col = DATE_COLUMN_MAP.get(table, 'date')
# Build filter SQL from JSON conditions
filter_sql = ""
filter_params = []
if filter_conditions:
try:
if isinstance(filter_conditions, str):
filters = json.loads(filter_conditions)
else:
filters = filter_conditions
for filter_col, filter_val in filters.items():
if isinstance(filter_val, list):
# IN clause for multiple values
placeholders = ', '.join(['%s'] * len(filter_val))
filter_sql += f" AND {filter_col} IN ({placeholders})"
filter_params.extend(filter_val)
else:
# Single value equality
filter_sql += f" AND {filter_col} = %s"
filter_params.append(filter_val)
except (json.JSONDecodeError, TypeError, AttributeError) as e:
print(f"[WARNING] Invalid filter_conditions: {e}, ignoring filters")
cur = get_cursor(conn)
try:
if method == 'latest':
params = [profile_id] + filter_params
cur.execute(f"""
SELECT {column} FROM {table}
WHERE profile_id = %s AND {column} IS NOT NULL{filter_sql}
ORDER BY {date_col} DESC LIMIT 1
""", params)
row = cur.fetchone()
return float(row[column]) if row else None
elif method == 'avg_7d':
days_ago = date.today() - timedelta(days=7)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT AVG({column}) as avg_value FROM {table}
WHERE profile_id = %s AND {date_col} >= %s AND {column} IS NOT NULL{filter_sql}
""", params)
row = cur.fetchone()
return float(row['avg_value']) if row and row['avg_value'] is not None else None
elif method == 'avg_30d':
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT AVG({column}) as avg_value FROM {table}
WHERE profile_id = %s AND {date_col} >= %s AND {column} IS NOT NULL{filter_sql}
""", params)
row = cur.fetchone()
return float(row['avg_value']) if row and row['avg_value'] is not None else None
elif method == 'sum_30d':
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT SUM({column}) as sum_value FROM {table}
WHERE profile_id = %s AND {date_col} >= %s AND {column} IS NOT NULL{filter_sql}
""", params)
row = cur.fetchone()
return float(row['sum_value']) if row and row['sum_value'] is not None else None
elif method == 'count_7d':
days_ago = date.today() - timedelta(days=7)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT COUNT(*) as count_value FROM {table}
WHERE profile_id = %s AND {date_col} >= %s{filter_sql}
""", params)
row = cur.fetchone()
return float(row['count_value']) if row else 0.0
elif method == 'count_30d':
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT COUNT(*) as count_value FROM {table}
WHERE profile_id = %s AND {date_col} >= %s{filter_sql}
""", params)
row = cur.fetchone()
return float(row['count_value']) if row else 0.0
elif method == 'min_30d':
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT MIN({column}) as min_value FROM {table}
WHERE profile_id = %s AND {date_col} >= %s AND {column} IS NOT NULL{filter_sql}
""", params)
row = cur.fetchone()
return float(row['min_value']) if row and row['min_value'] is not None else None
elif method == 'max_30d':
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT MAX({column}) as max_value FROM {table}
WHERE profile_id = %s AND {date_col} >= %s AND {column} IS NOT NULL{filter_sql}
""", params)
row = cur.fetchone()
return float(row['max_value']) if row and row['max_value'] is not None else None
elif method == 'avg_per_week_30d':
# Average count per week over 30 days
# Use case: Training frequency per week (smoothed over 4.3 weeks)
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT COUNT(*) as count_value FROM {table}
WHERE profile_id = %s AND {date_col} >= %s{filter_sql}
""", params)
row = cur.fetchone()
if row and row['count_value'] is not None:
# 30 days = 4.285 weeks (30/7)
return round(float(row['count_value']) / 4.285, 2)
return None
else:
print(f"[WARNING] Unknown aggregation method: {method}")
return None
except Exception as e:
# Log detailed error for debugging
print(f"[ERROR] Failed to fetch value from {table}.{column} using {method}: {e}")
print(f"[ERROR] Filter conditions: {filter_conditions}")
print(f"[ERROR] Filter SQL: {filter_sql}")
print(f"[ERROR] Filter params: {filter_params}")
# CRITICAL: Rollback transaction to avoid InFailedSqlTransaction errors
try:
conn.rollback()
print(f"[INFO] Transaction rolled back after query error")
except Exception as rollback_err:
print(f"[WARNING] Rollback failed: {rollback_err}")
# Return None so goal creation can continue without current_value
# (current_value will be NULL in the goal record)
return None
def _execute_calculation_formula(conn, profile_id: str, formula_json: str) -> Optional[float]:
"""
Execute complex calculation formula.
Currently supports:
- lean_mass: weight - (weight * body_fat_pct / 100)
Future: Parse JSON formula and execute dynamically.
Args:
conn: Database connection
profile_id: User's profile ID
formula_json: JSON string with calculation config
Returns:
Calculated value or None
"""
try:
formula = json.loads(formula_json)
calc_type = formula.get('type')
if calc_type == 'lean_mass':
# Get dependencies
cur = get_cursor(conn)
cur.execute("""
SELECT weight FROM weight_log
WHERE profile_id = %s
ORDER BY date DESC LIMIT 1
""", (profile_id,))
weight_row = cur.fetchone()
cur.execute("""
SELECT body_fat_pct FROM caliper_log
WHERE profile_id = %s
ORDER BY date DESC LIMIT 1
""", (profile_id,))
bf_row = cur.fetchone()
if weight_row and bf_row:
weight = float(weight_row['weight'])
bf_pct = float(bf_row['body_fat_pct'])
lean_mass = weight - (weight * bf_pct / 100.0)
return round(lean_mass, 2)
return None
else:
print(f"[WARNING] Unknown calculation type: {calc_type}")
return None
except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
print(f"[ERROR] Formula execution failed: {e}, formula={formula_json}")
return None
# Future V2 Implementation (commented out for reference):
"""
def get_focus_weights_v2(conn, profile_id: str) -> Dict[str, float]:
'''V2: Read from focus_areas table with custom user weights'''
cur = get_cursor(conn)
cur.execute('''
SELECT weight_loss_pct, muscle_gain_pct, endurance_pct,
strength_pct, flexibility_pct, health_pct
FROM focus_areas
WHERE profile_id = %s AND active = true
LIMIT 1
''', (profile_id,))
row = cur.fetchone()
if not row:
# Fallback to V1 behavior
return get_focus_weights(conn, profile_id)
# Convert percentages to weights (0-1 range)
return {
'weight_loss': row['weight_loss_pct'] / 100.0,
'muscle_gain': row['muscle_gain_pct'] / 100.0,
'endurance': row['endurance_pct'] / 100.0,
'strength': row['strength_pct'] / 100.0,
'flexibility': row['flexibility_pct'] / 100.0,
'health': row['health_pct'] / 100.0
}
"""
def get_active_goals(profile_id: str) -> List[Dict]:
"""
Get all active goals for a profile.
Returns list of goal dicts with id, type, target_value, current_value, etc.
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT id, goal_type, name, target_value, target_date,
current_value, start_value, start_date, progress_pct,
status, is_primary, created_at
FROM goals
WHERE profile_id = %s
AND status IN ('active', 'in_progress')
ORDER BY is_primary DESC, created_at DESC
""", (profile_id,))
return [dict(row) for row in cur.fetchall()]
def get_goal_by_id(goal_id: str) -> Optional[Dict]:
"""Get a single goal by ID"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT id, profile_id, goal_type, target_value, target_date,
current_value, start_value, progress_pct, status, is_primary
FROM goals
WHERE id = %s
""", (goal_id,))
row = cur.fetchone()
return dict(row) if row else None