## Implemented
### DB Schema (Migrations)
- Migration 013: training_parameters table (16 standard parameters)
- Migration 014: training_types.profile + activity_log.evaluation columns
- Performance metric calculations (avg_hr_percent, kcal_per_km)
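A minimal sketch of the two derived metrics, assuming avg_hr_percent is the average heart rate relative to the user's maximum HR and kcal_per_km is calories per kilometre; the actual formulas live in the migration/backend and may differ:

```python
from typing import Optional

def avg_hr_percent(avg_hr: float, max_hr: float) -> Optional[float]:
    """Average heart rate as a percentage of the user's maximum HR (assumed formula)."""
    if not max_hr:
        return None
    return round(avg_hr / max_hr * 100, 1)

def kcal_per_km(calories: float, distance_km: float) -> Optional[float]:
    """Energy expenditure per kilometre (assumed formula)."""
    if not distance_km:
        return None
    return round(calories / distance_km, 2)
```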
### Backend - Rule Engine
- RuleEvaluator: Generic rule evaluation with 9 operators
  - gte, lte, gt, lt, eq, neq, between, in, not_in
  - Weighted scoring system
  - Pass strategies: all_must_pass, weighted_score, at_least_n (example config after this list)
- IntensityZoneEvaluator: HR zone analysis
- TrainingEffectsEvaluator: Ability development analysis
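A hypothetical rule-set snippet as the RuleEvaluator might consume it; the field names (`pass_strategy`, `pass_threshold`, `weight`, ...) are illustrative, not the exact profile schema:

```python
# Illustrative rule set: three weighted rules, evaluated via the weighted-score strategy.
example_rule_set = {
    "enabled": True,
    "pass_strategy": "weighted_score",  # or "all_must_pass", "at_least_n"
    "pass_threshold": 0.7,
    "rules": [
        {"parameter": "duration_minutes", "operator": "gte", "value": 30, "weight": 2.0},
        {"parameter": "avg_hr_percent", "operator": "between", "value": [65, 80], "weight": 1.0},
        {"parameter": "sport_type", "operator": "in", "value": ["run", "trail_run"], "weight": 0.5},
    ],
}
```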
### Backend - Master Evaluator
- TrainingProfileEvaluator: 7-dimensional evaluation
  1. Minimum Requirements (Quality Gates)
  2. Intensity Zones (HR zones)
  3. Training Effects (Abilities)
  4. Periodization (Frequency & Recovery)
  5. Performance Indicators (KPIs)
  6. Safety (Warnings)
  7. AI Context (simplified for MVP)
- evaluation_helper.py: Utilities for loading and saving
- routers/evaluation.py: API endpoints (example request below)
  - POST /api/evaluation/activity/{id}
  - POST /api/evaluation/batch
  - GET /api/evaluation/parameters
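A quick request against the single-activity endpoint; the base URL and the exact response envelope are assumptions, but `overall_score`, `quality_label`, and `recommendations` come from the evaluator's result structure:

```python
import requests  # assumes the API runs locally on port 8000 without auth

resp = requests.post("http://localhost:8000/api/evaluation/activity/1234")
resp.raise_for_status()
evaluation = resp.json()

print(evaluation["overall_score"])   # e.g. 0.82
print(evaluation["quality_label"])   # "excellent" / "good" / "acceptable" / "poor"
for rec in evaluation.get("recommendations", []):
    print("-", rec)
```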
### Integration
- main.py: Router registration
## TODO (Phase 1.2)
- Auto-evaluation on activity INSERT/UPDATE
- Admin-UI for profile editing
- User-UI for results display
## Testing
- ✅ Syntax checks passed
- 🔲 Runtime testing pending (after auto-evaluation)
Part of Issue #15 - Training Type Profiles System
The master evaluator module (~350 lines, 12 KiB, Python):
"""
|
|
Training Type Profiles - Master Evaluator
|
|
Comprehensive activity evaluation across all 7 dimensions.
|
|
|
|
Issue: #15
|
|
Date: 2026-03-23
|
|
"""
|
|
from typing import Dict, Optional, List
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
from rule_engine import RuleEvaluator, IntensityZoneEvaluator, TrainingEffectsEvaluator
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TrainingProfileEvaluator:
|
|
"""
|
|
Master class for comprehensive activity evaluation.
|
|
|
|
Evaluates an activity against a training type profile across 7 dimensions:
|
|
1. Minimum Requirements (Quality Gates)
|
|
2. Intensity Zones (HR zones)
|
|
3. Training Effects (Abilities)
|
|
4. Periodization (Frequency & Recovery)
|
|
5. Performance Indicators (KPIs)
|
|
6. Safety (Warnings)
|
|
7. AI Context
|
|
"""
|
|
|
|
def __init__(self, parameters_registry: Dict[str, Dict]):
|
|
"""
|
|
Initialize evaluator with parameter registry.
|
|
|
|
Args:
|
|
parameters_registry: Dict mapping parameter_key -> config
|
|
"""
|
|
self.parameters_registry = parameters_registry
|
|
self.rule_evaluator = RuleEvaluator()
|
|
self.zone_evaluator = IntensityZoneEvaluator()
|
|
self.effects_evaluator = TrainingEffectsEvaluator()
|
|
|
|
def evaluate_activity(
|
|
self,
|
|
activity: Dict,
|
|
training_type_profile: Optional[Dict],
|
|
context: Optional[Dict] = None
|
|
) -> Dict:
|
|
"""
|
|
Complete evaluation of an activity against its training type profile.
|
|
|
|
Args:
|
|
activity: Activity data dictionary
|
|
training_type_profile: Training type profile (JSONB)
|
|
context: {
|
|
"user_profile": {...},
|
|
"recent_activities": [...],
|
|
"historical_activities": [...]
|
|
}
|
|
|
|
Returns:
|
|
{
|
|
"evaluated_at": ISO timestamp,
|
|
"profile_version": str,
|
|
"rule_set_results": {
|
|
"minimum_requirements": {...},
|
|
"intensity_zones": {...},
|
|
"training_effects": {...},
|
|
"periodization": {...},
|
|
"performance_indicators": {...},
|
|
"safety": {...}
|
|
},
|
|
"overall_score": float (0-1),
|
|
"quality_label": str,
|
|
"recommendations": [str],
|
|
"warnings": [str]
|
|
}
|
|
"""
|
|
# No profile? Return unvalidated result
|
|
if not training_type_profile:
|
|
return self._create_unvalidated_result()
|
|
|
|
rule_sets = training_type_profile.get("rule_sets", {})
|
|
context = context or {}
|
|
|
|
results = {
|
|
"evaluated_at": datetime.now().isoformat(),
|
|
"profile_version": training_type_profile.get("version", "unknown"),
|
|
"rule_set_results": {}
|
|
}
|
|
|
|
# ━━━ 1. MINIMUM REQUIREMENTS ━━━
|
|
if "minimum_requirements" in rule_sets:
|
|
results["rule_set_results"]["minimum_requirements"] = \
|
|
self.rule_evaluator.evaluate_rule_set(
|
|
rule_sets["minimum_requirements"],
|
|
activity,
|
|
self.parameters_registry
|
|
)
|
|
|
|
# ━━━ 2. INTENSITY ZONES ━━━
|
|
if "intensity_zones" in rule_sets:
|
|
results["rule_set_results"]["intensity_zones"] = \
|
|
self.zone_evaluator.evaluate(
|
|
rule_sets["intensity_zones"],
|
|
activity,
|
|
context.get("user_profile", {})
|
|
)
|
|
|
|
# ━━━ 3. TRAINING EFFECTS ━━━
|
|
if "training_effects" in rule_sets:
|
|
results["rule_set_results"]["training_effects"] = \
|
|
self.effects_evaluator.evaluate(
|
|
rule_sets["training_effects"],
|
|
activity,
|
|
results["rule_set_results"].get("intensity_zones")
|
|
)
|
|
|
|
# ━━━ 4. PERIODIZATION ━━━
|
|
if "periodization" in rule_sets:
|
|
results["rule_set_results"]["periodization"] = \
|
|
self._evaluate_periodization(
|
|
rule_sets["periodization"],
|
|
activity,
|
|
context.get("recent_activities", [])
|
|
)
|
|
|
|
# ━━━ 5. PERFORMANCE INDICATORS ━━━
|
|
if "performance_indicators" in rule_sets:
|
|
results["rule_set_results"]["performance_indicators"] = \
|
|
self._evaluate_performance(
|
|
rule_sets["performance_indicators"],
|
|
activity,
|
|
context.get("historical_activities", [])
|
|
)
|
|
|
|
# ━━━ 6. SAFETY WARNINGS ━━━
|
|
if "safety" in rule_sets:
|
|
results["rule_set_results"]["safety"] = \
|
|
self._evaluate_safety(
|
|
rule_sets["safety"],
|
|
activity
|
|
)
|
|
|
|
# ━━━ OVERALL SCORE & QUALITY LABEL ━━━
|
|
overall_score = self._calculate_overall_score(results["rule_set_results"])
|
|
results["overall_score"] = overall_score
|
|
results["quality_label"] = self._get_quality_label(overall_score)
|
|
|
|
# ━━━ RECOMMENDATIONS & WARNINGS ━━━
|
|
results["recommendations"] = self._generate_recommendations(results)
|
|
results["warnings"] = self._collect_warnings(results)
|
|
|
|
return results
|
|
|
|
def _create_unvalidated_result(self) -> Dict:
|
|
"""Creates result for activities without profile."""
|
|
return {
|
|
"evaluated_at": datetime.now().isoformat(),
|
|
"profile_version": None,
|
|
"rule_set_results": {},
|
|
"overall_score": None,
|
|
"quality_label": None,
|
|
"recommendations": ["Kein Trainingsprofil konfiguriert"],
|
|
"warnings": []
|
|
}
|
|
|
|
def _evaluate_periodization(
|
|
self,
|
|
config: Dict,
|
|
activity: Dict,
|
|
recent_activities: List[Dict]
|
|
) -> Dict:
|
|
"""
|
|
Evaluates periodization compliance (frequency & recovery).
|
|
|
|
Simplified for MVP - full implementation later.
|
|
"""
|
|
if not config.get("enabled", False):
|
|
return {"enabled": False}
|
|
|
|
# Basic frequency check
|
|
training_type_id = activity.get("training_type_id")
|
|
same_type_this_week = sum(
|
|
1 for a in recent_activities
|
|
if a.get("training_type_id") == training_type_id
|
|
)
|
|
|
|
frequency_config = config.get("frequency", {})
|
|
optimal = frequency_config.get("per_week_optimal", 3)
|
|
|
|
return {
|
|
"enabled": True,
|
|
"weekly_count": same_type_this_week,
|
|
"optimal_count": optimal,
|
|
"frequency_status": "optimal" if same_type_this_week <= optimal else "over_optimal",
|
|
"recovery_adequate": True, # Simplified for MVP
|
|
"warning": None
|
|
}
|
|
|
|
def _evaluate_performance(
|
|
self,
|
|
config: Dict,
|
|
activity: Dict,
|
|
historical_activities: List[Dict]
|
|
) -> Dict:
|
|
"""
|
|
Evaluates performance development.
|
|
|
|
Simplified for MVP - full implementation later.
|
|
"""
|
|
if not config.get("enabled", False):
|
|
return {"enabled": False}
|
|
|
|
return {
|
|
"enabled": True,
|
|
"trend": "stable", # Simplified
|
|
"metrics_comparison": {},
|
|
"benchmark_level": "intermediate"
|
|
}
|
|
|
|
def _evaluate_safety(self, config: Dict, activity: Dict) -> Dict:
|
|
"""
|
|
Evaluates safety warnings.
|
|
"""
|
|
if not config.get("enabled", False):
|
|
return {"enabled": False, "warnings": []}
|
|
|
|
warnings_config = config.get("warnings", [])
|
|
triggered_warnings = []
|
|
|
|
for warning_rule in warnings_config:
|
|
param_key = warning_rule.get("parameter")
|
|
operator = warning_rule.get("operator")
|
|
threshold = warning_rule.get("value")
|
|
severity = warning_rule.get("severity", "medium")
|
|
message = warning_rule.get("message", "")
|
|
|
|
actual_value = activity.get(param_key)
|
|
|
|
if actual_value is not None:
|
|
operator_func = RuleEvaluator.OPERATORS.get(operator)
|
|
if operator_func and operator_func(actual_value, threshold):
|
|
triggered_warnings.append({
|
|
"severity": severity,
|
|
"message": message,
|
|
"parameter": param_key,
|
|
"actual_value": actual_value,
|
|
"threshold": threshold
|
|
})
|
|
|
|
return {
|
|
"enabled": True,
|
|
"warnings": triggered_warnings
|
|
}
|
|
|
|
def _calculate_overall_score(self, rule_set_results: Dict) -> float:
|
|
"""
|
|
Calculates weighted overall score.
|
|
|
|
Weights:
|
|
- Minimum Requirements: 40%
|
|
- Intensity Zones: 20%
|
|
- Periodization: 20%
|
|
- Performance: 10%
|
|
- Training Effects: 10%
|
|
"""
|
|
weights = {
|
|
"minimum_requirements": 0.4,
|
|
"intensity_zones": 0.2,
|
|
"periodization": 0.2,
|
|
"performance_indicators": 0.1,
|
|
"training_effects": 0.1
|
|
}
|
|
|
|
total_score = 0.0
|
|
total_weight = 0.0
|
|
|
|
for rule_set_name, weight in weights.items():
|
|
result = rule_set_results.get(rule_set_name)
|
|
if result and result.get("enabled"):
|
|
score = result.get("score", 0.5)
|
|
|
|
# Special handling for different result types
|
|
if rule_set_name == "intensity_zones":
|
|
score = result.get("duration_quality", 0.5)
|
|
elif rule_set_name == "periodization":
|
|
score = 1.0 if result.get("recovery_adequate", False) else 0.5
|
|
|
|
total_score += score * weight
|
|
total_weight += weight
|
|
|
|
return round(total_score / total_weight, 2) if total_weight > 0 else 0.5
|
|
|
|
def _get_quality_label(self, score: Optional[float]) -> Optional[str]:
|
|
"""Converts score to quality label."""
|
|
if score is None:
|
|
return None
|
|
|
|
if score >= 0.9:
|
|
return "excellent"
|
|
elif score >= 0.7:
|
|
return "good"
|
|
elif score >= 0.5:
|
|
return "acceptable"
|
|
else:
|
|
return "poor"
|
|
|
|
def _generate_recommendations(self, results: Dict) -> List[str]:
|
|
"""Generates actionable recommendations."""
|
|
recommendations = []
|
|
|
|
# Check minimum requirements
|
|
min_req = results["rule_set_results"].get("minimum_requirements", {})
|
|
if min_req.get("enabled") and not min_req.get("passed"):
|
|
for failed in min_req.get("failed_rules", []):
|
|
param = failed.get("parameter")
|
|
actual = failed.get("actual_value")
|
|
expected = failed.get("expected_value")
|
|
reason = failed.get("reason", "")
|
|
symbol = failed.get("operator_symbol", "")
|
|
|
|
recommendations.append(
|
|
f"{param}: {actual} {symbol} {expected} - {reason}"
|
|
)
|
|
|
|
# Check intensity zones
|
|
zone_result = results["rule_set_results"].get("intensity_zones", {})
|
|
if zone_result.get("enabled") and zone_result.get("recommendation"):
|
|
recommendations.append(zone_result["recommendation"])
|
|
|
|
# Default recommendation if excellent
|
|
if results.get("quality_label") == "excellent" and not recommendations:
|
|
recommendations.append("Hervorragendes Training! Weiter so.")
|
|
|
|
return recommendations
|
|
|
|
def _collect_warnings(self, results: Dict) -> List[str]:
|
|
"""Collects all warnings from safety checks."""
|
|
safety_result = results["rule_set_results"].get("safety", {})
|
|
if not safety_result.get("enabled"):
|
|
return []
|
|
|
|
warnings = []
|
|
for warning in safety_result.get("warnings", []):
|
|
severity_icon = "🔴" if warning["severity"] == "high" else "⚠️"
|
|
warnings.append(f"{severity_icon} {warning['message']}")
|
|
|
|
return warnings
|
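A minimal usage sketch for the evaluator above; the module name, parameter registry, and profile are illustrative stand-ins for what evaluation_helper.py loads from the database:

```python
from training_profile_evaluator import TrainingProfileEvaluator  # module name assumed

# Hypothetical registry and profile; the real ones come from the
# training_parameters table and training_types.profile.
parameters_registry = {
    "duration_minutes": {"unit": "min"},
    "avg_hr_percent": {"unit": "%"},
}
profile = {
    "version": "1.0",
    "rule_sets": {
        "minimum_requirements": {
            "enabled": True,
            "pass_strategy": "all_must_pass",
            "rules": [{"parameter": "duration_minutes", "operator": "gte", "value": 30}],
        },
    },
}
activity = {"training_type_id": 5, "duration_minutes": 42, "avg_hr_percent": 71}

evaluator = TrainingProfileEvaluator(parameters_registry)
result = evaluator.evaluate_activity(activity, profile, context={"recent_activities": []})
print(result["overall_score"], result["quality_label"])
```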