mitai-jinkendo/backend/goal_utils.py
Lars 2303c04123
All checks were successful
Deploy Development / deploy (push) Successful in 49s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 14s
feat: filtered goal types - count specific training types
NEW FEATURE: Filter conditions for goal types
Enables counting/aggregating specific subsets of data.

Example use case: Count only strength training sessions per week
- Create goal type with filter: {"training_type": "strength"}
- count_7d now counts only strength training, not all activities

Implementation:
- Migration 026: filter_conditions JSONB column
- Backend: Dynamic WHERE clause building from JSON filters
- Supports single value: {"training_type": "strength"}
- Supports multiple values: {"training_type": ["strength", "hiit"]}
- Works with all 8 aggregation methods (count, avg, sum, min, max)
- Frontend: JSON textarea with example + validation
- Pydantic models: filter_conditions field added

Technical details:
- SQL injection safe (parameterized queries)
- Graceful degradation (invalid JSON ignored with warning)
- Backward compatible (NULL filters = no filtering)

Answers user question: 'Kann ich Trainingstypen wie Krafttraining separat zählen?'
Answer: YES! 🎯
2026-03-27 08:14:22 +01:00

460 lines
16 KiB
Python

"""
Goal Utilities - Abstraction Layer for Focus Weights & Universal Value Fetcher
This module provides:
1. Abstraction layer between goal modes and focus weights (Phase 1)
2. Universal value fetcher for dynamic goal types (Phase 1.5)
Version History:
- V1 (Phase 1): Maps goal_mode to predefined weights
- V1.5 (Phase 1.5): Universal value fetcher for DB-registry goal types
- V2 (future): Reads from focus_areas table with custom user weights
Part of Phase 1 + Phase 1.5: Flexible Goal System
"""
from typing import Dict, Optional, Any
from datetime import date, timedelta
from decimal import Decimal
import json
from db import get_cursor
def get_focus_weights(conn, profile_id: str) -> Dict[str, float]:
"""
Get focus area weights for a profile.
This is an abstraction layer that will evolve:
- V1 (now): Maps goal_mode → predefined weights
- V2 (later): Reads from focus_areas table → custom user weights
Args:
conn: Database connection
profile_id: User's profile ID
Returns:
Dict with focus weights (sum = 1.0):
{
'weight_loss': 0.3, # Fat loss priority
'muscle_gain': 0.2, # Muscle gain priority
'strength': 0.25, # Strength training priority
'endurance': 0.25, # Cardio/endurance priority
'flexibility': 0.0, # Mobility priority
'health': 0.0 # General health maintenance
}
Example Usage in Phase 0b:
weights = get_focus_weights(conn, profile_id)
# Score calculation considers user's focus
overall_score = (
body_score * weights['weight_loss'] +
strength_score * weights['strength'] +
cardio_score * weights['endurance']
)
"""
cur = get_cursor(conn)
# Fetch current goal_mode
cur.execute(
"SELECT goal_mode FROM profiles WHERE id = %s",
(profile_id,)
)
row = cur.fetchone()
if not row:
# Fallback: balanced health focus
return {
'weight_loss': 0.0,
'muscle_gain': 0.0,
'strength': 0.0,
'endurance': 0.0,
'flexibility': 0.0,
'health': 1.0
}
goal_mode = row['goal_mode'] or 'health'
# V1: Predefined weight mappings per goal_mode
# These represent "typical" focus distributions for each mode
WEIGHT_MAPPINGS = {
'weight_loss': {
'weight_loss': 0.60, # Primary: fat loss
'endurance': 0.20, # Support: cardio for calorie burn
'muscle_gain': 0.0, # Not compatible
'strength': 0.10, # Maintain muscle during deficit
'flexibility': 0.05, # Minor: mobility work
'health': 0.05 # Minor: general wellness
},
'strength': {
'strength': 0.50, # Primary: strength gains
'muscle_gain': 0.40, # Support: hypertrophy
'endurance': 0.10, # Minor: work capacity
'weight_loss': 0.0, # Not compatible with strength focus
'flexibility': 0.0,
'health': 0.0
},
'endurance': {
'endurance': 0.70, # Primary: aerobic capacity
'health': 0.20, # Support: cardiovascular health
'flexibility': 0.10, # Support: mobility for running
'weight_loss': 0.0,
'muscle_gain': 0.0,
'strength': 0.0
},
'recomposition': {
'weight_loss': 0.30, # Equal: lose fat
'muscle_gain': 0.30, # Equal: gain muscle
'strength': 0.25, # Support: progressive overload
'endurance': 0.10, # Minor: conditioning
'flexibility': 0.05, # Minor: mobility
'health': 0.0
},
'health': {
'health': 0.50, # Primary: general wellness
'endurance': 0.20, # Support: cardio health
'flexibility': 0.15, # Support: mobility
'strength': 0.10, # Support: functional strength
'weight_loss': 0.05, # Minor: maintain healthy weight
'muscle_gain': 0.0
}
}
return WEIGHT_MAPPINGS.get(goal_mode, WEIGHT_MAPPINGS['health'])
def get_primary_focus(conn, profile_id: str) -> str:
"""
Get the primary focus area for a profile.
Returns the focus area with the highest weight.
Useful for UI labels and simple decision logic.
Args:
conn: Database connection
profile_id: User's profile ID
Returns:
Primary focus area name (e.g., 'weight_loss', 'strength')
"""
weights = get_focus_weights(conn, profile_id)
return max(weights.items(), key=lambda x: x[1])[0]
def get_focus_description(focus_area: str) -> str:
"""
Get human-readable description for a focus area.
Args:
focus_area: Focus area key (e.g., 'weight_loss')
Returns:
German description for UI display
"""
descriptions = {
'weight_loss': 'Gewichtsreduktion & Fettabbau',
'muscle_gain': 'Muskelaufbau & Hypertrophie',
'strength': 'Kraftsteigerung & Performance',
'endurance': 'Ausdauer & aerobe Kapazität',
'flexibility': 'Beweglichkeit & Mobilität',
'health': 'Allgemeine Gesundheit & Erhaltung'
}
return descriptions.get(focus_area, focus_area)
# ============================================================================
# Phase 1.5: Universal Value Fetcher for Dynamic Goal Types
# ============================================================================
def get_goal_type_config(conn, type_key: str) -> Optional[Dict[str, Any]]:
"""
Get goal type configuration from database registry.
Args:
conn: Database connection
type_key: Goal type key (e.g., 'weight', 'meditation_minutes')
Returns:
Dict with config or None if not found/inactive
"""
cur = get_cursor(conn)
cur.execute("""
SELECT type_key, source_table, source_column, aggregation_method,
calculation_formula, filter_conditions, label_de, unit, icon, category
FROM goal_type_definitions
WHERE type_key = %s AND is_active = true
LIMIT 1
""", (type_key,))
return cur.fetchone()
def get_current_value_for_goal(conn, profile_id: str, goal_type: str) -> Optional[float]:
"""
Universal value fetcher for any goal type.
Reads configuration from goal_type_definitions table and executes
appropriate query based on aggregation_method or calculation_formula.
Args:
conn: Database connection
profile_id: User's profile ID
goal_type: Goal type key (e.g., 'weight', 'meditation_minutes')
Returns:
Current value as float or None if not available
"""
config = get_goal_type_config(conn, goal_type)
if not config:
print(f"[WARNING] Goal type '{goal_type}' not found or inactive")
return None
# Complex calculation (e.g., lean_mass)
if config['calculation_formula']:
return _execute_calculation_formula(conn, profile_id, config['calculation_formula'])
# Simple aggregation
return _fetch_by_aggregation_method(
conn,
profile_id,
config['source_table'],
config['source_column'],
config['aggregation_method'],
config.get('filter_conditions')
)
def _fetch_by_aggregation_method(
conn,
profile_id: str,
table: str,
column: str,
method: str,
filter_conditions: Optional[Any] = None
) -> Optional[float]:
"""
Fetch value using specified aggregation method.
Supported methods:
- latest: Most recent value
- avg_7d: 7-day average
- avg_30d: 30-day average
- sum_30d: 30-day sum
- count_7d: Count of entries in last 7 days
- count_30d: Count of entries in last 30 days
- min_30d: Minimum value in last 30 days
- max_30d: Maximum value in last 30 days
Args:
filter_conditions: Optional JSON filters (e.g., {"training_type": "strength"})
"""
# Guard: source_table/column required for simple aggregation
if not table or not column:
print(f"[WARNING] Missing source_table or source_column for aggregation")
return None
# Build filter SQL from JSON conditions
filter_sql = ""
filter_params = []
if filter_conditions:
try:
if isinstance(filter_conditions, str):
filters = json.loads(filter_conditions)
else:
filters = filter_conditions
for filter_col, filter_val in filters.items():
if isinstance(filter_val, list):
# IN clause for multiple values
placeholders = ', '.join(['%s'] * len(filter_val))
filter_sql += f" AND {filter_col} IN ({placeholders})"
filter_params.extend(filter_val)
else:
# Single value equality
filter_sql += f" AND {filter_col} = %s"
filter_params.append(filter_val)
except (json.JSONDecodeError, TypeError, AttributeError) as e:
print(f"[WARNING] Invalid filter_conditions: {e}, ignoring filters")
cur = get_cursor(conn)
try:
if method == 'latest':
params = [profile_id] + filter_params
cur.execute(f"""
SELECT {column} FROM {table}
WHERE profile_id = %s AND {column} IS NOT NULL{filter_sql}
ORDER BY date DESC LIMIT 1
""", params)
row = cur.fetchone()
return float(row[column]) if row else None
elif method == 'avg_7d':
days_ago = date.today() - timedelta(days=7)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT AVG({column}) as avg_value FROM {table}
WHERE profile_id = %s AND date >= %s AND {column} IS NOT NULL{filter_sql}
""", params)
row = cur.fetchone()
return float(row['avg_value']) if row and row['avg_value'] is not None else None
elif method == 'avg_30d':
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT AVG({column}) as avg_value FROM {table}
WHERE profile_id = %s AND date >= %s AND {column} IS NOT NULL{filter_sql}
""", params)
row = cur.fetchone()
return float(row['avg_value']) if row and row['avg_value'] is not None else None
elif method == 'sum_30d':
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT SUM({column}) as sum_value FROM {table}
WHERE profile_id = %s AND date >= %s AND {column} IS NOT NULL{filter_sql}
""", params)
row = cur.fetchone()
return float(row['sum_value']) if row and row['sum_value'] is not None else None
elif method == 'count_7d':
days_ago = date.today() - timedelta(days=7)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT COUNT(*) as count_value FROM {table}
WHERE profile_id = %s AND date >= %s{filter_sql}
""", params)
row = cur.fetchone()
return float(row['count_value']) if row else 0.0
elif method == 'count_30d':
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT COUNT(*) as count_value FROM {table}
WHERE profile_id = %s AND date >= %s{filter_sql}
""", params)
row = cur.fetchone()
return float(row['count_value']) if row else 0.0
elif method == 'min_30d':
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT MIN({column}) as min_value FROM {table}
WHERE profile_id = %s AND date >= %s AND {column} IS NOT NULL{filter_sql}
""", params)
row = cur.fetchone()
return float(row['min_value']) if row and row['min_value'] is not None else None
elif method == 'max_30d':
days_ago = date.today() - timedelta(days=30)
params = [profile_id, days_ago] + filter_params
cur.execute(f"""
SELECT MAX({column}) as max_value FROM {table}
WHERE profile_id = %s AND date >= %s AND {column} IS NOT NULL{filter_sql}
""", params)
row = cur.fetchone()
return float(row['max_value']) if row and row['max_value'] is not None else None
else:
print(f"[WARNING] Unknown aggregation method: {method}")
return None
except Exception as e:
print(f"[ERROR] Failed to fetch value from {table}.{column} using {method}: {e}")
return None
def _execute_calculation_formula(conn, profile_id: str, formula_json: str) -> Optional[float]:
"""
Execute complex calculation formula.
Currently supports:
- lean_mass: weight - (weight * body_fat_pct / 100)
Future: Parse JSON formula and execute dynamically.
Args:
conn: Database connection
profile_id: User's profile ID
formula_json: JSON string with calculation config
Returns:
Calculated value or None
"""
try:
formula = json.loads(formula_json)
calc_type = formula.get('type')
if calc_type == 'lean_mass':
# Get dependencies
cur = get_cursor(conn)
cur.execute("""
SELECT weight FROM weight_log
WHERE profile_id = %s
ORDER BY date DESC LIMIT 1
""", (profile_id,))
weight_row = cur.fetchone()
cur.execute("""
SELECT body_fat_pct FROM caliper_log
WHERE profile_id = %s
ORDER BY date DESC LIMIT 1
""", (profile_id,))
bf_row = cur.fetchone()
if weight_row and bf_row:
weight = float(weight_row['weight'])
bf_pct = float(bf_row['body_fat_pct'])
lean_mass = weight - (weight * bf_pct / 100.0)
return round(lean_mass, 2)
return None
else:
print(f"[WARNING] Unknown calculation type: {calc_type}")
return None
except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
print(f"[ERROR] Formula execution failed: {e}, formula={formula_json}")
return None
# Future V2 Implementation (commented out for reference):
"""
def get_focus_weights_v2(conn, profile_id: str) -> Dict[str, float]:
'''V2: Read from focus_areas table with custom user weights'''
cur = get_cursor(conn)
cur.execute('''
SELECT weight_loss_pct, muscle_gain_pct, endurance_pct,
strength_pct, flexibility_pct, health_pct
FROM focus_areas
WHERE profile_id = %s AND active = true
LIMIT 1
''', (profile_id,))
row = cur.fetchone()
if not row:
# Fallback to V1 behavior
return get_focus_weights(conn, profile_id)
# Convert percentages to weights (0-1 range)
return {
'weight_loss': row['weight_loss_pct'] / 100.0,
'muscle_gain': row['muscle_gain_pct'] / 100.0,
'endurance': row['endurance_pct'] / 100.0,
'strength': row['strength_pct'] / 100.0,
'flexibility': row['flexibility_pct'] / 100.0,
'health': row['health_pct'] / 100.0
}
"""