mitai-jinkendo/backend/profile_evaluator.py
Lars 1b9cd6d5e6
All checks were successful
Deploy Development / deploy (push) Successful in 55s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
feat: Training Type Profiles - Phase 1.1 Foundation (#15)
## Implemented

### DB-Schema (Migrations)
- Migration 013: training_parameters table (16 standard parameters)
- Migration 014: training_types.profile + activity_log.evaluation columns
- Performance metric calculations (avg_hr_percent, kcal_per_km)

### Backend - Rule Engine
- RuleEvaluator: Generic rule evaluation with 9 operators
  - gte, lte, gt, lt, eq, neq, between, in, not_in
  - Weighted scoring system
  - Pass strategies: all_must_pass, weighted_score, at_least_n

- IntensityZoneEvaluator: HR zone analysis
- TrainingEffectsEvaluator: Abilities development

### Backend - Master Evaluator
- TrainingProfileEvaluator: 7-dimensional evaluation
  1. Minimum Requirements (Quality Gates)
  2. Intensity Zones (HR zones)
  3. Training Effects (Abilities)
  4. Periodization (Frequency & Recovery)
  5. Performance Indicators (KPIs)
  6. Safety (Warnings)
  7. AI Context (simplified for MVP)

- evaluation_helper.py: Utilities for loading + saving
- routers/evaluation.py: API endpoints
  - POST /api/evaluation/activity/{id}
  - POST /api/evaluation/batch
  - GET /api/evaluation/parameters

### Integration
- main.py: Router registration

## TODO (Phase 1.2)
- Auto-evaluation on activity INSERT/UPDATE
- Admin-UI for profile editing
- User-UI for results display

## Testing
-  Syntax checks passed
- 🔲 Runtime testing pending (after auto-evaluation)

Part of Issue #15 - Training Type Profiles System
2026-03-23 10:49:26 +01:00

350 lines
12 KiB
Python

"""
Training Type Profiles - Master Evaluator
Comprehensive activity evaluation across all 7 dimensions.
Issue: #15
Date: 2026-03-23
"""
from typing import Dict, Optional, List
from datetime import datetime
import logging
from rule_engine import RuleEvaluator, IntensityZoneEvaluator, TrainingEffectsEvaluator
logger = logging.getLogger(__name__)
class TrainingProfileEvaluator:
"""
Master class for comprehensive activity evaluation.
Evaluates an activity against a training type profile across 7 dimensions:
1. Minimum Requirements (Quality Gates)
2. Intensity Zones (HR zones)
3. Training Effects (Abilities)
4. Periodization (Frequency & Recovery)
5. Performance Indicators (KPIs)
6. Safety (Warnings)
7. AI Context
"""
def __init__(self, parameters_registry: Dict[str, Dict]):
"""
Initialize evaluator with parameter registry.
Args:
parameters_registry: Dict mapping parameter_key -> config
"""
self.parameters_registry = parameters_registry
self.rule_evaluator = RuleEvaluator()
self.zone_evaluator = IntensityZoneEvaluator()
self.effects_evaluator = TrainingEffectsEvaluator()
def evaluate_activity(
self,
activity: Dict,
training_type_profile: Optional[Dict],
context: Optional[Dict] = None
) -> Dict:
"""
Complete evaluation of an activity against its training type profile.
Args:
activity: Activity data dictionary
training_type_profile: Training type profile (JSONB)
context: {
"user_profile": {...},
"recent_activities": [...],
"historical_activities": [...]
}
Returns:
{
"evaluated_at": ISO timestamp,
"profile_version": str,
"rule_set_results": {
"minimum_requirements": {...},
"intensity_zones": {...},
"training_effects": {...},
"periodization": {...},
"performance_indicators": {...},
"safety": {...}
},
"overall_score": float (0-1),
"quality_label": str,
"recommendations": [str],
"warnings": [str]
}
"""
# No profile? Return unvalidated result
if not training_type_profile:
return self._create_unvalidated_result()
rule_sets = training_type_profile.get("rule_sets", {})
context = context or {}
results = {
"evaluated_at": datetime.now().isoformat(),
"profile_version": training_type_profile.get("version", "unknown"),
"rule_set_results": {}
}
# ━━━ 1. MINIMUM REQUIREMENTS ━━━
if "minimum_requirements" in rule_sets:
results["rule_set_results"]["minimum_requirements"] = \
self.rule_evaluator.evaluate_rule_set(
rule_sets["minimum_requirements"],
activity,
self.parameters_registry
)
# ━━━ 2. INTENSITY ZONES ━━━
if "intensity_zones" in rule_sets:
results["rule_set_results"]["intensity_zones"] = \
self.zone_evaluator.evaluate(
rule_sets["intensity_zones"],
activity,
context.get("user_profile", {})
)
# ━━━ 3. TRAINING EFFECTS ━━━
if "training_effects" in rule_sets:
results["rule_set_results"]["training_effects"] = \
self.effects_evaluator.evaluate(
rule_sets["training_effects"],
activity,
results["rule_set_results"].get("intensity_zones")
)
# ━━━ 4. PERIODIZATION ━━━
if "periodization" in rule_sets:
results["rule_set_results"]["periodization"] = \
self._evaluate_periodization(
rule_sets["periodization"],
activity,
context.get("recent_activities", [])
)
# ━━━ 5. PERFORMANCE INDICATORS ━━━
if "performance_indicators" in rule_sets:
results["rule_set_results"]["performance_indicators"] = \
self._evaluate_performance(
rule_sets["performance_indicators"],
activity,
context.get("historical_activities", [])
)
# ━━━ 6. SAFETY WARNINGS ━━━
if "safety" in rule_sets:
results["rule_set_results"]["safety"] = \
self._evaluate_safety(
rule_sets["safety"],
activity
)
# ━━━ OVERALL SCORE & QUALITY LABEL ━━━
overall_score = self._calculate_overall_score(results["rule_set_results"])
results["overall_score"] = overall_score
results["quality_label"] = self._get_quality_label(overall_score)
# ━━━ RECOMMENDATIONS & WARNINGS ━━━
results["recommendations"] = self._generate_recommendations(results)
results["warnings"] = self._collect_warnings(results)
return results
def _create_unvalidated_result(self) -> Dict:
"""Creates result for activities without profile."""
return {
"evaluated_at": datetime.now().isoformat(),
"profile_version": None,
"rule_set_results": {},
"overall_score": None,
"quality_label": None,
"recommendations": ["Kein Trainingsprofil konfiguriert"],
"warnings": []
}
def _evaluate_periodization(
self,
config: Dict,
activity: Dict,
recent_activities: List[Dict]
) -> Dict:
"""
Evaluates periodization compliance (frequency & recovery).
Simplified for MVP - full implementation later.
"""
if not config.get("enabled", False):
return {"enabled": False}
# Basic frequency check
training_type_id = activity.get("training_type_id")
same_type_this_week = sum(
1 for a in recent_activities
if a.get("training_type_id") == training_type_id
)
frequency_config = config.get("frequency", {})
optimal = frequency_config.get("per_week_optimal", 3)
return {
"enabled": True,
"weekly_count": same_type_this_week,
"optimal_count": optimal,
"frequency_status": "optimal" if same_type_this_week <= optimal else "over_optimal",
"recovery_adequate": True, # Simplified for MVP
"warning": None
}
def _evaluate_performance(
self,
config: Dict,
activity: Dict,
historical_activities: List[Dict]
) -> Dict:
"""
Evaluates performance development.
Simplified for MVP - full implementation later.
"""
if not config.get("enabled", False):
return {"enabled": False}
return {
"enabled": True,
"trend": "stable", # Simplified
"metrics_comparison": {},
"benchmark_level": "intermediate"
}
def _evaluate_safety(self, config: Dict, activity: Dict) -> Dict:
"""
Evaluates safety warnings.
"""
if not config.get("enabled", False):
return {"enabled": False, "warnings": []}
warnings_config = config.get("warnings", [])
triggered_warnings = []
for warning_rule in warnings_config:
param_key = warning_rule.get("parameter")
operator = warning_rule.get("operator")
threshold = warning_rule.get("value")
severity = warning_rule.get("severity", "medium")
message = warning_rule.get("message", "")
actual_value = activity.get(param_key)
if actual_value is not None:
operator_func = RuleEvaluator.OPERATORS.get(operator)
if operator_func and operator_func(actual_value, threshold):
triggered_warnings.append({
"severity": severity,
"message": message,
"parameter": param_key,
"actual_value": actual_value,
"threshold": threshold
})
return {
"enabled": True,
"warnings": triggered_warnings
}
def _calculate_overall_score(self, rule_set_results: Dict) -> float:
"""
Calculates weighted overall score.
Weights:
- Minimum Requirements: 40%
- Intensity Zones: 20%
- Periodization: 20%
- Performance: 10%
- Training Effects: 10%
"""
weights = {
"minimum_requirements": 0.4,
"intensity_zones": 0.2,
"periodization": 0.2,
"performance_indicators": 0.1,
"training_effects": 0.1
}
total_score = 0.0
total_weight = 0.0
for rule_set_name, weight in weights.items():
result = rule_set_results.get(rule_set_name)
if result and result.get("enabled"):
score = result.get("score", 0.5)
# Special handling for different result types
if rule_set_name == "intensity_zones":
score = result.get("duration_quality", 0.5)
elif rule_set_name == "periodization":
score = 1.0 if result.get("recovery_adequate", False) else 0.5
total_score += score * weight
total_weight += weight
return round(total_score / total_weight, 2) if total_weight > 0 else 0.5
def _get_quality_label(self, score: Optional[float]) -> Optional[str]:
"""Converts score to quality label."""
if score is None:
return None
if score >= 0.9:
return "excellent"
elif score >= 0.7:
return "good"
elif score >= 0.5:
return "acceptable"
else:
return "poor"
def _generate_recommendations(self, results: Dict) -> List[str]:
"""Generates actionable recommendations."""
recommendations = []
# Check minimum requirements
min_req = results["rule_set_results"].get("minimum_requirements", {})
if min_req.get("enabled") and not min_req.get("passed"):
for failed in min_req.get("failed_rules", []):
param = failed.get("parameter")
actual = failed.get("actual_value")
expected = failed.get("expected_value")
reason = failed.get("reason", "")
symbol = failed.get("operator_symbol", "")
recommendations.append(
f"{param}: {actual} {symbol} {expected} - {reason}"
)
# Check intensity zones
zone_result = results["rule_set_results"].get("intensity_zones", {})
if zone_result.get("enabled") and zone_result.get("recommendation"):
recommendations.append(zone_result["recommendation"])
# Default recommendation if excellent
if results.get("quality_label") == "excellent" and not recommendations:
recommendations.append("Hervorragendes Training! Weiter so.")
return recommendations
def _collect_warnings(self, results: Dict) -> List[str]:
"""Collects all warnings from safety checks."""
safety_result = results["rule_set_results"].get("safety", {})
if not safety_result.get("enabled"):
return []
warnings = []
for warning in safety_result.get("warnings", []):
severity_icon = "🔴" if warning["severity"] == "high" else "⚠️"
warnings.append(f"{severity_icon} {warning['message']}")
return warnings