# Change notes:
# - PostgreSQL returns numeric values as Decimal objects.
# - psycopg2.Json() cannot serialize Decimal to JSON.
# - Added convert_decimals() helper function.
# - Converts activity_data, context, and evaluation_result before saving.
# Fixes: batch evaluation errors (31 errors: "Decimal is not JSON serializable").
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
"""
Training Type Profiles - Helper Functions

Utilities for loading parameters, profiles, and running evaluations.

Issue: #15
Date: 2026-03-23
"""
# Standard library
import logging
from decimal import Decimal
from typing import Dict, List, Optional

# Project-local
from db import get_cursor
from profile_evaluator import TrainingProfileEvaluator

# Module-level logger, one per module per logging best practice.
logger = logging.getLogger(__name__)
def convert_decimals(obj):
    """
    Recursively convert Decimal objects to float for JSON serialization.

    PostgreSQL returns numeric columns as decimal.Decimal, which
    psycopg2.extras.Json (json.dumps under the hood) cannot serialize.
    This walks dicts, lists, and tuples, replacing every Decimal with its
    float value.

    Args:
        obj: Any value; containers are traversed recursively.

    Returns:
        A copy of obj with all Decimal instances converted to float.
        Lists stay lists, tuples stay tuples; non-container, non-Decimal
        values are returned unchanged.
    """
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, dict):
        # Keys may also be Decimal (e.g. numeric zone boundaries) and would
        # equally break json.dumps, so convert both sides.
        return {convert_decimals(k): convert_decimals(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_decimals(item) for item in obj]
    if isinstance(obj, tuple):
        # Generalized beyond the original list-only handling: DB rows and
        # fetched records frequently arrive as tuples.
        return tuple(convert_decimals(item) for item in obj)
    return obj
def load_parameters_registry(cur) -> Dict[str, Dict]:
    """
    Load the active training-parameters registry from the database.

    Args:
        cur: Database cursor yielding dict-like rows.

    Returns:
        Dict mapping parameter_key -> parameter config row.
    """
    cur.execute("""
        SELECT key, name_de, name_en, category, data_type, unit,
               description_de, source_field, validation_rules
        FROM training_parameters
        WHERE is_active = true
    """)

    # One row per parameter; key the registry by the parameter's unique key.
    return {entry["key"]: dict(entry) for entry in cur.fetchall()}
def load_training_type_profile(cur, training_type_id: int) -> Optional[Dict]:
    """
    Load the evaluation profile configured for a training type.

    Args:
        cur: Database cursor yielding dict-like rows.
        training_type_id: Training type to look up.

    Returns:
        The profile JSONB, or None when the type is missing or has no
        profile configured.
    """
    cur.execute(
        "SELECT profile FROM training_types WHERE id = %s",
        (training_type_id,)
    )
    record = cur.fetchone()

    # Guard: unknown id or an empty/NULL profile both mean "not configured".
    if not record:
        return None
    return record["profile"] or None
def load_evaluation_context(
    cur,
    profile_id: str,
    activity_date: str,
    lookback_days: int = 30
) -> Dict:
    """
    Load context data for evaluation (user profile + recent activities).

    Args:
        cur: Database cursor yielding dict-like rows.
        profile_id: User profile ID.
        activity_date: Date of the activity being evaluated (ISO date string).
        lookback_days: How many days of history to load.

    Returns:
        {
            "user_profile": {...},
            "recent_activities": [...],
            "historical_activities": [...]
        }
    """
    # Load user profile (empty dict when the profile row is missing).
    cur.execute(
        "SELECT hf_max, sleep_goal_minutes FROM profiles WHERE id = %s",
        (profile_id,)
    )
    user_row = cur.fetchone()
    user_profile = dict(user_row) if user_row else {}

    # Load recent activities (last N days, newest first, capped at 50).
    # BUG FIX: the previous query put the placeholder inside a quoted
    # literal (INTERVAL '%s days'), which psycopg2 documents as unsafe —
    # it only worked by accident for plain ints. Multiplying a
    # parameterized day count by a fixed INTERVAL '1 day' is the
    # documented safe form.
    cur.execute("""
        SELECT id, date, training_type_id, duration_min, hr_avg, hr_max,
               distance_km, kcal_active, rpe
        FROM activity_log
        WHERE profile_id = %s
          AND date >= %s::date - (%s * INTERVAL '1 day')
          AND date < %s::date
        ORDER BY date DESC
        LIMIT 50
    """, (profile_id, activity_date, lookback_days, activity_date))

    recent_activities = [dict(r) for r in cur.fetchall()]

    # Historical activities: same window for the MVP, but a shallow copy so
    # a caller appending to one list cannot silently change the other.
    historical_activities = list(recent_activities)

    return {
        "user_profile": user_profile,
        "recent_activities": recent_activities,
        "historical_activities": historical_activities,
    }
def evaluate_and_save_activity(
    cur,
    activity_id: str,
    activity_data: Dict,
    training_type_id: int,
    profile_id: str
) -> Optional[Dict]:
    """
    Evaluate a single activity and persist the result on its row.

    Args:
        cur: Database cursor.
        activity_id: Activity ID.
        activity_data: Activity data dict.
        training_type_id: Training type ID.
        profile_id: User profile ID.

    Returns:
        The evaluation result dict, or None when the training type has no
        profile configured.
    """
    # Imported lazily so importing this module never requires psycopg2.
    from psycopg2.extras import Json

    # Guard: without a configured profile there is nothing to evaluate.
    profile = load_training_type_profile(cur, training_type_id)
    if not profile:
        logger.info(f"[EVALUATION] No profile for training_type {training_type_id}, skipping")
        return None

    registry = load_parameters_registry(cur)
    raw_context = load_evaluation_context(
        cur, profile_id, activity_data.get("date"), lookback_days=30
    )

    # psycopg2.Json cannot serialize Decimal, so sanitize all evaluator
    # inputs before handing them over.
    evaluator = TrainingProfileEvaluator(registry)
    result = evaluator.evaluate_activity(
        convert_decimals(activity_data),
        profile,
        convert_decimals(raw_context),
    )

    # The evaluator may echo Decimal values back, so sanitize once more
    # before JSON persistence.
    persisted = convert_decimals(result)
    cur.execute("""
        UPDATE activity_log
        SET evaluation = %s,
            quality_label = %s,
            overall_score = %s
        WHERE id = %s
    """, (
        Json(persisted),
        persisted.get("quality_label"),
        persisted.get("overall_score"),
        activity_id,
    ))

    logger.info(
        f"[EVALUATION] Activity {activity_id}: "
        f"{result.get('quality_label')} "
        f"(score: {result.get('overall_score')})"
    )

    return result
def batch_evaluate_activities(
    cur,
    profile_id: str,
    limit: Optional[int] = None
) -> Dict:
    """
    Re-evaluate all activities for a user.

    Useful for:
    - Initial setup after profiles are configured
    - Re-evaluation after profile changes

    Args:
        cur: Database cursor.
        profile_id: User profile ID.
        limit: Optional row limit (useful for testing). An explicit 0 is
            honored literally and evaluates nothing.

    Returns:
        {
            "total": int,
            "evaluated": int,
            "skipped": int,
            "errors": int
        }
        plus an "error_details" list (first 10 failures) when errors occurred.
    """
    # Load all activities, newest first.
    query = """
        SELECT id, profile_id, date, training_type_id, duration_min,
               hr_avg, hr_max, distance_km, kcal_active, kcal_resting,
               rpe, pace_min_per_km, cadence, elevation_gain
        FROM activity_log
        WHERE profile_id = %s
        ORDER BY date DESC
    """
    params = [profile_id]

    # BUG FIX: `if limit:` treated limit=0 as "no limit" and fetched every
    # row; comparing against None honors an explicit zero.
    if limit is not None:
        query += " LIMIT %s"
        params.append(limit)

    cur.execute(query, params)
    activities = cur.fetchall()

    stats = {
        "total": len(activities),
        "evaluated": 0,
        "skipped": 0,
        "errors": 0
    }

    # Track error details so callers can see what failed without log access.
    error_details = []

    for activity in activities:
        activity_dict = dict(activity)
        try:
            result = evaluate_and_save_activity(
                cur,
                activity_dict["id"],
                activity_dict,
                activity_dict["training_type_id"],
                profile_id
            )

            if result:
                stats["evaluated"] += 1
            else:
                # None means no profile configured for this training type.
                stats["skipped"] += 1

        except Exception as e:
            # One bad activity must not abort the whole batch: record and
            # continue with the next row.
            logger.error(f"[BATCH-EVAL] Error evaluating {activity_dict['id']}: {e}")
            error_details.append({
                "activity_id": activity_dict['id'],
                "training_type_id": activity_dict.get('training_type_id'),
                "error": str(e)
            })
            stats["errors"] += 1

    # Surface the first few failures without bloating the payload.
    if error_details:
        stats["error_details"] = error_details[:10]

    logger.info(f"[BATCH-EVAL] Completed: {stats}")
    return stats