mitai-jinkendo/backend/data_layer/activity_metrics.py
Lars bf84e3c2a5
All checks were successful
Deploy Development / deploy (push) Successful in 51s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
feat: enhance fitness dashboard with new metrics and insights
- Refactored the `calculate_proxy_internal_load_7d` function to `calculate_proxy_internal_load_window`, allowing for dynamic day range input.
- Introduced new functions for calculating training volume deltas and building fitness progress insights, enhancing user feedback on training metrics.
- Updated the fitness dashboard to include new charts for quality sessions and load monitoring, improving data visualization.
- Integrated these new metrics into the fitness dashboard overview, providing users with comprehensive insights into their training performance.
- Streamlined the router to utilize the new chart-building functions, ensuring consistency and maintainability across the application.
2026-04-20 08:04:50 +02:00

1538 lines
50 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Activity Metrics Data Layer
Provides structured data for training tracking and analysis.
Functions:
- get_activity_summary_data(): Count, total duration, calories, averages
- get_activity_detail_data(): Detailed activity log entries
- get_training_type_distribution_data(): Training category percentages
- get_training_frequency_by_type_data(): Häufigkeit & Intensität pro activity_type
- get_training_inter_session_gap_data(): Pausen zwischen Einheiten (Stunden)
- get_training_sessions_recent_weeks_data(): Wochen-JSON für KI-Kontext
- get_training_parameters_ki_glossary_data(): Parameter-Katalog (Feld, Namen, Beschreibungen) für KI
All functions return structured data (dict) without formatting.
Use placeholder_resolver.py for formatted strings for AI.
Phase 0c: Multi-Layer Architecture
Version: 1.0
"""
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta, date, time
import statistics
from db import get_db, get_cursor, r2d
from data_layer.activity_session_metrics import enrich_sessions_with_metrics
from data_layer.utils import calculate_confidence, safe_float, safe_int, serialize_dates
from data_layer.prompt_output_compact import (
normalize_prompt_number,
session_metrics_list_to_key_value_compact,
)
def get_activity_summary_data(
profile_id: str,
days: int = 14
) -> Dict:
"""
Get activity summary statistics.
Args:
profile_id: User profile ID
days: Analysis window (default 14)
Returns:
{
"activity_count": int,
"total_duration_min": int,
"total_kcal": int,
"avg_duration_min": int,
"avg_kcal_per_session": int,
"sessions_per_week": float,
"confidence": str,
"days_analyzed": int
}
Migration from Phase 0b:
OLD: get_activity_summary(pid, days) formatted string
NEW: Structured data with all metrics
"""
with get_db() as conn:
cur = get_cursor(conn)
cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
cur.execute(
"""SELECT
COUNT(*) as count,
SUM(duration_min) as total_min,
SUM(kcal_active) as total_kcal
FROM activity_log
WHERE profile_id=%s AND date >= %s""",
(profile_id, cutoff)
)
row = cur.fetchone()
if not row or row['count'] == 0:
return {
"activity_count": 0,
"total_duration_min": 0,
"total_kcal": 0,
"avg_duration_min": 0,
"avg_kcal_per_session": 0,
"sessions_per_week": 0.0,
"confidence": "insufficient",
"days_analyzed": days
}
activity_count = row['count']
total_min = safe_int(row['total_min'])
total_kcal = safe_int(row['total_kcal'])
avg_duration = int(total_min / activity_count) if activity_count > 0 else 0
avg_kcal = int(total_kcal / activity_count) if activity_count > 0 else 0
sessions_per_week = (activity_count / days * 7) if days > 0 else 0.0
confidence = calculate_confidence(activity_count, days, "general")
return {
"activity_count": activity_count,
"total_duration_min": total_min,
"total_kcal": total_kcal,
"avg_duration_min": avg_duration,
"avg_kcal_per_session": avg_kcal,
"sessions_per_week": round(sessions_per_week, 1),
"confidence": confidence,
"days_analyzed": days
}
def get_activity_detail_data(
profile_id: str,
days: int = 14,
limit: int = 50
) -> Dict:
"""
Get detailed activity log entries.
Args:
profile_id: User profile ID
days: Analysis window (default 14)
limit: Maximum entries to return (default 50)
Returns:
{
"activities": [
{
"date": date,
"activity_type": str,
"duration_min": int,
"kcal_active": int,
"hr_avg": int | None,
"training_category": str | None,
"session_metrics": list | None, # EAV (enrich_sessions_with_metrics)
},
...
],
"total_count": int,
"confidence": str,
"days_analyzed": int
}
Migration from Phase 0b:
OLD: get_activity_detail(pid, days) formatted string list
NEW: Structured array with all fields
"""
with get_db() as conn:
cur = get_cursor(conn)
cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
cur.execute(
"""SELECT
id,
date,
activity_type,
duration_min,
kcal_active,
hr_avg,
training_category
FROM activity_log
WHERE profile_id=%s AND date >= %s
ORDER BY date DESC
LIMIT %s""",
(profile_id, cutoff, limit),
)
rows = cur.fetchall()
if not rows:
return {
"activities": [],
"total_count": 0,
"confidence": "insufficient",
"days_analyzed": days,
}
activities = []
for row in rows:
activities.append(
{
"id": str(row["id"]),
"date": row["date"],
"activity_type": row["activity_type"],
"duration_min": safe_int(row["duration_min"]),
"kcal_active": safe_int(row["kcal_active"]),
"hr_avg": safe_int(row["hr_avg"]) if row.get("hr_avg") else None,
"training_category": row.get("training_category"),
}
)
enrich_sessions_with_metrics(cur, activities)
confidence = calculate_confidence(len(activities), days, "general")
return {
"activities": activities,
"total_count": len(activities),
"confidence": confidence,
"days_analyzed": days,
}
def get_training_type_distribution_data(
profile_id: str,
days: int = 14
) -> Dict:
"""
Calculate training category distribution.
Args:
profile_id: User profile ID
days: Analysis window (default 14)
Returns:
{
"distribution": [
{
"category": str,
"count": int,
"percentage": float
},
...
],
"total_sessions": int,
"categorized_sessions": int,
"uncategorized_sessions": int,
"confidence": str,
"days_analyzed": int
}
Migration from Phase 0b:
OLD: get_trainingstyp_verteilung(pid, days) top 3 formatted
NEW: Complete distribution with percentages
"""
with get_db() as conn:
cur = get_cursor(conn)
cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
# Get categorized activities
cur.execute(
"""SELECT
training_category,
COUNT(*) as count
FROM activity_log
WHERE profile_id=%s
AND date >= %s
AND training_category IS NOT NULL
GROUP BY training_category
ORDER BY count DESC""",
(profile_id, cutoff)
)
rows = cur.fetchall()
# Get total activity count (including uncategorized)
cur.execute(
"""SELECT COUNT(*) as total
FROM activity_log
WHERE profile_id=%s AND date >= %s""",
(profile_id, cutoff)
)
total_row = cur.fetchone()
total_sessions = total_row['total'] if total_row else 0
if not rows or total_sessions == 0:
return {
"distribution": [],
"total_sessions": total_sessions,
"categorized_sessions": 0,
"uncategorized_sessions": total_sessions,
"confidence": "insufficient",
"days_analyzed": days
}
categorized_count = sum(row['count'] for row in rows)
uncategorized_count = total_sessions - categorized_count
distribution = []
for row in rows:
count = row['count']
percentage = (count / total_sessions * 100) if total_sessions > 0 else 0
distribution.append({
"category": row['training_category'],
"count": count,
"percentage": round(percentage, 1)
})
confidence = calculate_confidence(categorized_count, days, "general")
return {
"distribution": distribution,
"total_sessions": total_sessions,
"categorized_sessions": categorized_count,
"uncategorized_sessions": uncategorized_count,
"confidence": confidence,
"days_analyzed": days
}
# ============================================================================
# Calculated Metrics (migrated from calculations/activity_metrics.py)
# ============================================================================
# These functions return simple values for placeholders and scoring.
# Use get_*_data() functions above for structured chart data.
def calculate_training_minutes_week(profile_id: str) -> Optional[int]:
"""Calculate total training minutes last 7 days"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT SUM(duration_min) as total_minutes
FROM activity_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '7 days'
""", (profile_id,))
row = cur.fetchone()
return int(row['total_minutes']) if row and row['total_minutes'] else None
def calculate_training_frequency_7d(profile_id: str) -> Optional[int]:
"""Calculate number of training sessions last 7 days"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT COUNT(*) as session_count
FROM activity_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '7 days'
""", (profile_id,))
row = cur.fetchone()
return int(row['session_count']) if row else None
def calculate_quality_sessions_pct(profile_id: str, days: int = 28) -> Optional[int]:
"""Anteil qualitativ guter Sessions (quality_label) im Zeitfenster ``days``."""
if days < 1:
days = 28
cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE quality_label IN ('excellent', 'very_good', 'good')) as quality_count
FROM activity_log
WHERE profile_id = %s
AND date >= %s
""",
(profile_id, cutoff),
)
row = cur.fetchone()
if not row or row["total"] == 0:
return None
pct = (row["quality_count"] / row["total"]) * 100
return int(pct)
# ============================================================================
# A2: Intensity Distribution (Proxy-based)
# ============================================================================
def calculate_intensity_proxy_distribution(profile_id: str) -> Optional[Dict]:
"""
Calculate intensity distribution (proxy until HR zones available)
Returns dict: {'low': X, 'moderate': Y, 'high': Z} in minutes
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT duration_min, hr_avg, hr_max
FROM activity_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '28 days'
""", (profile_id,))
activities = cur.fetchall()
if not activities:
return None
low_min = 0
moderate_min = 0
high_min = 0
for activity in activities:
duration = activity['duration_min']
avg_hr = activity['hr_avg']
max_hr = activity['hr_max']
# Simple proxy classification
if avg_hr:
# Rough HR-based classification (assumes max HR ~190)
if avg_hr < 120:
low_min += duration
elif avg_hr < 150:
moderate_min += duration
else:
high_min += duration
else:
# Fallback: assume moderate
moderate_min += duration
return {
'low': low_min,
'moderate': moderate_min,
'high': high_min
}
# ============================================================================
# A4: Ability Balance Calculations
# ============================================================================
def calculate_ability_balance(profile_id: str) -> Optional[Dict]:
"""
Calculate ability balance from training_types.abilities
Returns dict with scores per ability dimension (0-100)
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT a.duration_min, tt.abilities
FROM activity_log a
JOIN training_types tt ON a.training_category = tt.category
WHERE a.profile_id = %s
AND a.date >= CURRENT_DATE - INTERVAL '28 days'
AND tt.abilities IS NOT NULL
""", (profile_id,))
activities = cur.fetchall()
if not activities:
return None
# Accumulate ability load (duration × ability weight)
ability_loads = {
'strength': 0,
'endurance': 0,
'mental': 0,
'coordination': 0,
'mobility': 0
}
for activity in activities:
duration = activity['duration_min']
abilities = activity['abilities'] # JSONB
if not abilities:
continue
for ability, weight in abilities.items():
if ability in ability_loads:
ability_loads[ability] += duration * weight
# Normalize to 0-100 scale
max_load = max(ability_loads.values()) if ability_loads else 1
if max_load == 0:
return None
normalized = {
ability: int((load / max_load) * 100)
for ability, load in ability_loads.items()
}
return normalized
def calculate_ability_balance_strength(profile_id: str) -> Optional[int]:
"""Get strength ability score"""
balance = calculate_ability_balance(profile_id)
return balance['strength'] if balance else None
def calculate_ability_balance_endurance(profile_id: str) -> Optional[int]:
"""Get endurance ability score"""
balance = calculate_ability_balance(profile_id)
return balance['endurance'] if balance else None
def calculate_ability_balance_mental(profile_id: str) -> Optional[int]:
"""Get mental ability score"""
balance = calculate_ability_balance(profile_id)
return balance['mental'] if balance else None
def calculate_ability_balance_coordination(profile_id: str) -> Optional[int]:
"""Get coordination ability score"""
balance = calculate_ability_balance(profile_id)
return balance['coordination'] if balance else None
def calculate_ability_balance_mobility(profile_id: str) -> Optional[int]:
"""Get mobility ability score"""
balance = calculate_ability_balance(profile_id)
return balance['mobility'] if balance else None
# ============================================================================
# A5: Load Monitoring (Proxy-based)
# ============================================================================
def calculate_proxy_internal_load_window(profile_id: str, days: int = 7) -> Optional[float]:
"""
Proxy-Last über die letzten ``days`` Kalendertage (gleiche Formel wie bisher nur für 7 Tage).
"""
if days < 1:
days = 7
intensity_factors = {'low': 1.0, 'moderate': 1.5, 'high': 2.0}
quality_factors = {
'excellent': 1.15,
'very_good': 1.05,
'good': 1.0,
'acceptable': 0.9,
'poor': 0.75,
'excluded': 0.0
}
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
SELECT duration_min, hr_avg, rpe
FROM activity_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - (%s::int * INTERVAL '1 day')
""",
(profile_id, days),
)
activities = cur.fetchall()
if not activities:
return None
total_load = 0
for activity in activities:
duration = activity['duration_min']
avg_hr = activity['hr_avg']
# Map RPE to quality (rpe 8-10 = excellent, 6-7 = good, 4-5 = moderate, <4 = poor)
rpe = activity.get('rpe')
if rpe and rpe >= 8:
quality = 'excellent'
elif rpe and rpe >= 6:
quality = 'good'
elif rpe and rpe >= 4:
quality = 'moderate'
else:
quality = 'good' # default
# Determine intensity
if avg_hr:
if avg_hr < 120:
intensity = 'low'
elif avg_hr < 150:
intensity = 'moderate'
else:
intensity = 'high'
else:
intensity = 'moderate'
load = float(duration) * intensity_factors[intensity] * quality_factors.get(quality, 1.0)
total_load += load
return float(total_load)
def calculate_proxy_internal_load_7d(profile_id: str) -> Optional[float]:
"""Letzte 7 Tage — Kompatibilität mit Platzhaltern / älteren Aufrufern."""
return calculate_proxy_internal_load_window(profile_id, 7)
def calculate_monotony_score(profile_id: str) -> Optional[float]:
"""
Calculate training monotony (last 7 days)
Monotony = mean daily load / std dev daily load
Higher = more monotonous
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT date, SUM(duration_min) as daily_duration
FROM activity_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY date
ORDER BY date
""", (profile_id,))
daily_loads = [float(row['daily_duration']) for row in cur.fetchall() if row['daily_duration']]
if len(daily_loads) < 4:
return None
mean_load = sum(daily_loads) / len(daily_loads)
std_dev = statistics.stdev(daily_loads)
if std_dev == 0:
return None
monotony = mean_load / std_dev
return round(monotony, 2)
def calculate_strain_score(profile_id: str) -> Optional[int]:
"""
Calculate training strain (last 7 days)
Strain = weekly load × monotony
"""
weekly_load = calculate_proxy_internal_load_7d(profile_id)
monotony = calculate_monotony_score(profile_id)
if weekly_load is None or monotony is None:
return None
strain = weekly_load * monotony
return int(strain)
# ============================================================================
# A6: Activity Goal Alignment Score (Dynamic Focus Areas)
# ============================================================================
def calculate_activity_score(profile_id: str, focus_weights: Optional[Dict] = None) -> Optional[int]:
"""
Activity goal alignment score 0-100
Weighted by user's activity-related focus areas
"""
if focus_weights is None:
from data_layer.scores import get_user_focus_weights
focus_weights = get_user_focus_weights(profile_id)
# Activity-related focus areas (English keys from DB); Gewichte float (kein Decimal×float)
strength = float(focus_weights.get('strength', 0) or 0)
strength_endurance = float(focus_weights.get('strength_endurance', 0) or 0)
power = float(focus_weights.get('power', 0) or 0)
total_strength = strength + strength_endurance + power
aerobic = float(focus_weights.get('aerobic_endurance', 0) or 0)
anaerobic = float(focus_weights.get('anaerobic_endurance', 0) or 0)
cardiovascular = float(focus_weights.get('cardiovascular_health', 0) or 0)
total_cardio = aerobic + anaerobic + cardiovascular
flexibility = float(focus_weights.get('flexibility', 0) or 0)
mobility = float(focus_weights.get('mobility', 0) or 0)
balance = float(focus_weights.get('balance', 0) or 0)
reaction = float(focus_weights.get('reaction', 0) or 0)
rhythm = float(focus_weights.get('rhythm', 0) or 0)
coordination = float(focus_weights.get('coordination', 0) or 0)
total_ability = flexibility + mobility + balance + reaction + rhythm + coordination
total_activity_weight = total_strength + total_cardio + total_ability
if total_activity_weight == 0:
return None # No activity goals
components = []
# 1. Weekly minutes (general activity volume)
minutes = calculate_training_minutes_week(profile_id)
if minutes is not None:
# WHO: 150-300 min/week
if 150 <= minutes <= 300:
minutes_score = 100
elif minutes < 150:
minutes_score = max(40, (minutes / 150) * 100)
else:
minutes_score = max(80, 100 - ((minutes - 300) / 10))
# Volume relevant for all activity types (20% base weight)
components.append(('minutes', minutes_score, total_activity_weight * 0.2))
# 2. Quality sessions (always relevant)
quality_pct = calculate_quality_sessions_pct(profile_id)
if quality_pct is not None:
# Quality gets 10% base weight
components.append(('quality', quality_pct, total_activity_weight * 0.1))
# 3. Strength presence (if strength focus active)
if total_strength > 0:
strength_score = _score_strength_presence(profile_id)
if strength_score is not None:
components.append(('strength', strength_score, total_strength))
# 4. Cardio presence (if cardio focus active)
if total_cardio > 0:
cardio_score = _score_cardio_presence(profile_id)
if cardio_score is not None:
components.append(('cardio', cardio_score, total_cardio))
# 5. Ability balance (if mobility/coordination focus active)
if total_ability > 0:
balance_score = _score_ability_balance(profile_id)
if balance_score is not None:
components.append(('balance', balance_score, total_ability))
if not components:
return None
# Weighted average (float: DB-Aggregate können Decimal sein)
total_score = sum(float(score) * float(weight) for _, score, weight in components)
total_weight = sum(float(weight) for _, _, weight in components)
return int(total_score / total_weight)
def _score_strength_presence(profile_id: str) -> Optional[int]:
"""Score strength training presence (0-100)"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT COUNT(DISTINCT date) as strength_days
FROM activity_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '7 days'
AND training_category = 'strength'
""", (profile_id,))
row = cur.fetchone()
if not row:
return None
strength_days = row['strength_days']
# Target: 2-4 days/week
if 2 <= strength_days <= 4:
return 100
elif strength_days == 1:
return 60
elif strength_days == 5:
return 85
elif strength_days == 0:
return 0
else:
return 70
def _score_cardio_presence(profile_id: str) -> Optional[int]:
"""Score cardio training presence (0-100)"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT COUNT(DISTINCT date) as cardio_days, SUM(duration_min) as cardio_minutes
FROM activity_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '7 days'
AND training_category = 'cardio'
""", (profile_id,))
row = cur.fetchone()
if not row:
return None
# psycopg2: SUM() → oft Decimal — vor Mix mit float konvertieren
cardio_days = int(row['cardio_days'] or 0)
cardio_minutes = float(row['cardio_minutes'] or 0)
# Target: 3-5 days/week, 150+ minutes
day_score = min(100.0, (cardio_days / 4) * 100)
minute_score = min(100.0, (cardio_minutes / 150) * 100)
return int((day_score + minute_score) / 2)
def _score_ability_balance(profile_id: str) -> Optional[int]:
"""Score ability balance (0-100)"""
balance = calculate_ability_balance(profile_id)
if not balance:
return None
# Good balance = all abilities > 40, std_dev < 30
values = list(balance.values())
min_value = min(values)
std_dev = statistics.stdev(values) if len(values) > 1 else 0
# Score based on minimum coverage and balance
min_score = min(100, min_value * 2) # Want all > 50
balance_score = max(0, 100 - (std_dev * 2)) # Want low std_dev
return int((min_score + balance_score) / 2)
# ============================================================================
# A7: Rest Day Compliance
# ============================================================================
def calculate_rest_day_compliance(profile_id: str) -> Optional[int]:
"""
Calculate rest day compliance percentage (last 28 days)
Returns percentage of planned rest days that were respected
"""
with get_db() as conn:
cur = get_cursor(conn)
# Get planned rest days
cur.execute("""
SELECT date, rest_config->>'focus' as rest_type
FROM rest_days
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '28 days'
""", (profile_id,))
rest_days = {row['date']: row['rest_type'] for row in cur.fetchall()}
if not rest_days:
return None
# Check if training occurred on rest days
cur.execute("""
SELECT date, training_category
FROM activity_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '28 days'
""", (profile_id,))
training_days = {}
for row in cur.fetchall():
if row['date'] not in training_days:
training_days[row['date']] = []
training_days[row['date']].append(row['training_category'])
# Count compliance
compliant = 0
total = len(rest_days)
for rest_date, rest_type in rest_days.items():
if rest_date not in training_days:
# Full rest = compliant
compliant += 1
else:
# Check if training violates rest type
categories = training_days[rest_date]
if rest_type == 'strength_rest' and 'strength' not in categories:
compliant += 1
elif rest_type == 'cardio_rest' and 'cardio' not in categories:
compliant += 1
# If rest_type == 'recovery', any training = non-compliant
compliance_pct = (compliant / total) * 100
return int(compliance_pct)
# ============================================================================
# A8: VO2max Development
# ============================================================================
def calculate_vo2max_trend_28d(profile_id: str) -> Optional[float]:
"""Calculate VO2max trend (change over 28 days)"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT vo2_max, date
FROM vitals_baseline
WHERE profile_id = %s
AND vo2_max IS NOT NULL
AND date >= CURRENT_DATE - INTERVAL '28 days'
ORDER BY date DESC
""", (profile_id,))
measurements = cur.fetchall()
if len(measurements) < 2:
return None
recent = measurements[0]['vo2_max']
oldest = measurements[-1]['vo2_max']
change = recent - oldest
return round(change, 1)
# ============================================================================
# Data Quality Assessment
# ============================================================================
def calculate_activity_data_quality(profile_id: str) -> Dict[str, any]:
"""
Assess data quality for activity metrics
Returns dict with quality score and details
"""
with get_db() as conn:
cur = get_cursor(conn)
# Activity entries last 28 days
cur.execute("""
SELECT COUNT(*) as total,
COUNT(hr_avg) as with_hr,
COUNT(rpe) as with_quality
FROM activity_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '28 days'
""", (profile_id,))
counts = cur.fetchone()
total_entries = counts['total']
hr_coverage = counts['with_hr'] / total_entries if total_entries > 0 else 0
quality_coverage = counts['with_quality'] / total_entries if total_entries > 0 else 0
# Score components
frequency_score = min(100, (total_entries / 15) * 100) # 15 = ~4 sessions/week
hr_score = hr_coverage * 100
quality_score = quality_coverage * 100
# Overall score
overall_score = int(
frequency_score * 0.5 +
hr_score * 0.25 +
quality_score * 0.25
)
if overall_score >= 80:
confidence = "high"
elif overall_score >= 60:
confidence = "medium"
else:
confidence = "low"
return {
"overall_score": overall_score,
"confidence": confidence,
"measurements": {
"activities_28d": total_entries,
"hr_coverage_pct": int(hr_coverage * 100),
"quality_coverage_pct": int(quality_coverage * 100)
},
"component_scores": {
"frequency": int(frequency_score),
"hr": int(hr_score),
"quality": int(quality_score)
}
}
def _session_sort_ts(row: Dict) -> datetime:
"""Einheitlicher Zeitstempel für Sortierung und Pausenberechnung."""
d = row["date"]
if isinstance(d, str):
d = datetime.strptime(d[:10], "%Y-%m-%d").date()
st = row.get("start_time")
if st is None:
t = time(12, 0, 0)
else:
t = st
return datetime.combine(d, t)
def get_training_frequency_by_type_data(
profile_id: str,
days: int = 28,
) -> Dict[str, Any]:
"""
Pro activity_type (Roh-Label aus Import/Anzeige): Häufigkeit & Intensitätskennzahlen.
Returns:
{
"days_analyzed": int,
"confidence": str,
"by_type": [
{
"activity_type": str,
"session_count": int,
"sessions_per_week": float,
"avg_duration_min": float | None,
"avg_kcal_active": float | None,
"avg_hr_avg": float | None,
"avg_hr_max": float | None,
"avg_rpe": float | None,
"avg_kcal_per_min": float | None, # grobe Intensität, wenn kcal & Dauer
},
...
],
}
"""
weeks = max(days / 7.0, 0.01)
with get_db() as conn:
cur = get_cursor(conn)
cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
cur.execute(
"""
SELECT
activity_type,
COUNT(*)::int AS session_count,
AVG(duration_min)::float AS avg_duration_min,
AVG(kcal_active)::float AS avg_kcal_active,
AVG(hr_avg)::float AS avg_hr_avg,
AVG(hr_max)::float AS avg_hr_max,
AVG(rpe)::float AS avg_rpe,
SUM(COALESCE(duration_min, 0))::float AS sum_duration,
SUM(COALESCE(kcal_active, 0))::float AS sum_kcal
FROM activity_log
WHERE profile_id = %s AND date >= %s
GROUP BY activity_type
ORDER BY session_count DESC
""",
(profile_id, cutoff),
)
rows = [r2d(r) for r in cur.fetchall()]
if not rows:
return {
"days_analyzed": days,
"confidence": "insufficient",
"by_type": [],
}
by_type = []
for r in rows:
sc = int(r["session_count"])
sum_dur = float(r["sum_duration"] or 0)
sum_kcal = float(r["sum_kcal"] or 0)
kcal_per_min = (sum_kcal / sum_dur) if sum_dur > 0 else None
by_type.append(
{
"activity_type": r["activity_type"],
"session_count": sc,
"sessions_per_week": round(sc / weeks, 2),
"avg_duration_min": r["avg_duration_min"],
"avg_kcal_active": r["avg_kcal_active"],
"avg_hr_avg": r["avg_hr_avg"],
"avg_hr_max": r["avg_hr_max"],
"avg_rpe": r["avg_rpe"],
"avg_kcal_per_min": round(kcal_per_min, 2) if kcal_per_min is not None else None,
}
)
total_sessions = sum(x["session_count"] for x in by_type)
confidence = calculate_confidence(total_sessions, days, "general")
return {
"days_analyzed": days,
"confidence": confidence,
"by_type": by_type,
}
def get_training_inter_session_gap_data(
profile_id: str,
days: int = 28,
) -> Dict[str, Any]:
"""
Mittlere/median Pausen zwischen aufeinanderfolgenden Trainingseinheiten (Stunden).
Sortierung: Datum + start_time (fehlend → 12:00), dann created.
"""
with get_db() as conn:
cur = get_cursor(conn)
cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
cur.execute(
"""
SELECT date, start_time, created
FROM activity_log
WHERE profile_id = %s AND date >= %s
ORDER BY date ASC, start_time ASC NULLS LAST, created ASC
""",
(profile_id, cutoff),
)
rows = [r2d(r) for r in cur.fetchall()]
if len(rows) < 2:
return {
"days_analyzed": days,
"confidence": "insufficient",
"gap_hours_median": None,
"gap_hours_mean": None,
"gap_hours_min": None,
"gaps_count": 0,
}
gaps = []
prev_ts = None
for r in rows:
ts = _session_sort_ts(r)
if prev_ts is not None:
gaps.append((ts - prev_ts).total_seconds() / 3600.0)
prev_ts = ts
if not gaps:
return {
"days_analyzed": days,
"confidence": "insufficient",
"gap_hours_median": None,
"gap_hours_mean": None,
"gap_hours_min": None,
"gaps_count": 0,
}
gaps_sorted = sorted(gaps)
mid = len(gaps_sorted) // 2
median = (
gaps_sorted[mid]
if len(gaps_sorted) % 2
else (gaps_sorted[mid - 1] + gaps_sorted[mid]) / 2.0
)
confidence = calculate_confidence(len(rows), days, "general")
return {
"days_analyzed": days,
"confidence": confidence,
"gap_hours_median": round(median, 1),
"gap_hours_mean": round(statistics.mean(gaps), 1),
"gap_hours_min": round(min(gaps), 1),
"gaps_count": len(gaps),
}
def get_training_sessions_recent_weeks_data(
profile_id: str,
weeks: int = 4,
) -> Dict[str, Any]:
"""
Letzte Wochen mit Einzeltrainings für KI-Kontext (Dauer, kcal, HF, Typ).
weeks: Anzahl zurückliegender ISO-Kalenderwochen (Default 4).
session_metrics pro Einheit: kompaktes Objekt ``{key: Wert}`` (keine wiederholten
Namen/Beschreibungen). Bedeutung der Keys: Platzhalter ``{{training_parameters_glossary_md}}``.
Zahlen werden für Prompt-Token kompakt gerundet.
"""
days = max(weeks * 7, 7)
with get_db() as conn:
cur = get_cursor(conn)
cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
cur.execute(
"""
SELECT
a.id,
a.date,
a.start_time,
a.activity_type,
a.training_category,
a.duration_min,
a.kcal_active,
a.hr_avg,
a.hr_max,
a.rpe,
tt.name_de AS training_type_name
FROM activity_log a
LEFT JOIN training_types tt ON tt.id = a.training_type_id
WHERE a.profile_id = %s AND a.date >= %s
ORDER BY a.date ASC, a.start_time ASC NULLS LAST, a.created ASC
""",
(profile_id, cutoff),
)
rows = [r2d(r) for r in cur.fetchall()]
enrich_sessions_with_metrics(cur, rows)
if not rows:
return {
"weeks": [],
"meta": {
"weeks_requested": weeks,
"days_loaded": days,
"session_count": 0,
"confidence": "insufficient",
"session_metrics_shape": "key_value",
"metric_semantics_placeholder": "{{training_parameters_glossary_md}}",
},
}
by_week: Dict[str, List[Dict]] = {}
for r in rows:
d = r["date"]
if isinstance(d, str):
d = datetime.strptime(d[:10], "%Y-%m-%d").date()
iso = d.isocalendar()
wk = f"{iso.year}-W{iso.week:02d}"
if wk not in by_week:
by_week[wk] = []
dur = r.get("duration_min")
dur_f = float(dur) if dur is not None else None
kcal = r.get("kcal_active")
kcal_f = float(kcal) if kcal is not None else None
hr_a = r.get("hr_avg")
hr_m = r.get("hr_max")
sm_compact = session_metrics_list_to_key_value_compact(r.get("session_metrics"))
by_week[wk].append(
{
"id": str(r["id"]),
"date": d,
"start_time": str(r["start_time"]) if r.get("start_time") is not None else None,
"activity_type": r.get("activity_type"),
"training_category": r.get("training_category"),
"training_type_name": r.get("training_type_name"),
"duration_min": normalize_prompt_number(dur_f) if dur_f is not None else None,
"kcal_active": normalize_prompt_number(kcal_f) if kcal_f is not None else None,
"hr_avg": int(hr_a) if hr_a is not None else None,
"hr_max": int(hr_m) if hr_m is not None else None,
"rpe": int(r["rpe"]) if r.get("rpe") is not None else None,
"session_metrics": sm_compact,
}
)
week_keys = sorted(by_week.keys())
weeks_out = [{"week_iso": wk, "sessions": by_week[wk]} for wk in week_keys]
confidence = calculate_confidence(len(rows), days, "general")
return serialize_dates(
{
"weeks": weeks_out,
"meta": {
"weeks_requested": weeks,
"days_loaded": days,
"session_count": len(rows),
"confidence": confidence,
"session_metrics_shape": "key_value",
"metric_semantics_placeholder": "{{training_parameters_glossary_md}}",
},
}
)
def get_training_parameters_ki_glossary_data(profile_id: str) -> Dict[str, Any]:
"""
Alle aktiven ``training_parameters`` für KI-Kontext (z. B. neben ``training_sessions_recent_json``).
Enthält technischen key, name_de/name_en, description_de/description_en, data_type, unit, category.
Args:
profile_id: Reserviert für spätere Einschränkung (z. B. nur im Profil vorkommende Keys);
aktuell ungenutzt, Signatur bleibt für Platzhalter-Resolver.
"""
_ = profile_id
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
SELECT key, name_de, name_en, description_de, description_en,
data_type, unit, category
FROM training_parameters
WHERE is_active = true
ORDER BY category, key
"""
)
rows = [r2d(r) for r in cur.fetchall()]
return {
"parameters": rows,
"meta": {"count": len(rows), "scope": "global_active_catalog"},
}
# ============================================================================
# Chart payloads (Phase 0c / Layer 1) — gemeinsam mit charts-Router und Layer-2b-Bundles
# ============================================================================
def build_training_volume_chart_payload(profile_id: str, weeks: int) -> Dict[str, Any]:
"""
Wöchentliches Trainingsvolumen (Minuten) — gleiche Logik wie GET /api/charts/training-volume.
"""
if weeks < 4:
weeks = 4
if weeks > 52:
weeks = 52
cutoff = (datetime.now() - timedelta(weeks=weeks)).strftime("%Y-%m-%d")
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""SELECT
DATE_TRUNC('week', date) as week_start,
SUM(duration_min) as total_minutes,
COUNT(*) as session_count
FROM activity_log
WHERE profile_id=%s AND date >= %s
GROUP BY week_start
ORDER BY week_start""",
(profile_id, cutoff),
)
rows = cur.fetchall()
if not rows:
return {
"chart_type": "bar",
"data": {"labels": [], "datasets": []},
"metadata": {
"confidence": "insufficient",
"data_points": 0,
"message": "Keine Aktivitätsdaten vorhanden",
},
}
labels = [row["week_start"].strftime("KW %V") for row in rows]
values = [safe_float(row["total_minutes"]) for row in rows]
confidence = calculate_confidence(len(rows), weeks * 7, "general")
return {
"chart_type": "bar",
"data": {
"labels": labels,
"datasets": [
{
"label": "Trainingsminuten",
"data": values,
"backgroundColor": "#1D9E75",
"borderColor": "#085041",
"borderWidth": 1,
}
],
},
"metadata": serialize_dates(
{
"confidence": confidence,
"data_points": len(rows),
"avg_minutes_week": round(sum(values) / len(values), 1) if values else 0,
"total_sessions": sum(row["session_count"] for row in rows),
}
),
}
def build_training_type_distribution_chart_payload(profile_id: str, days: int) -> Dict[str, Any]:
"""
Trainingstyp-Verteilung — gleiche Logik wie GET /api/charts/training-type-distribution.
"""
dist_data = get_training_type_distribution_data(profile_id, days)
if dist_data["confidence"] == "insufficient":
return {
"chart_type": "pie",
"data": {"labels": [], "datasets": []},
"metadata": {
"confidence": "insufficient",
"data_points": 0,
"message": "Keine Trainingstypen-Daten",
},
}
labels = [item["category"] for item in dist_data["distribution"]]
values = [item["count"] for item in dist_data["distribution"]]
colors = [
"#1D9E75",
"#3B82F6",
"#F59E0B",
"#EF4444",
"#8B5CF6",
"#10B981",
"#F97316",
"#06B6D4",
]
return {
"chart_type": "pie",
"data": {
"labels": labels,
"datasets": [
{
"data": values,
"backgroundColor": colors[: len(values)],
"borderWidth": 2,
"borderColor": "#fff",
}
],
},
"metadata": {
"confidence": dist_data["confidence"],
"total_sessions": dist_data["total_sessions"],
"categorized_sessions": dist_data["categorized_sessions"],
"uncategorized_sessions": dist_data["uncategorized_sessions"],
},
}
def get_training_volume_two_week_delta(profile_id: str) -> Dict[str, Any]:
"""
Trainingsminuten: letzte 7 Kalendertage vs. die 7 Tage davor (Fortschritt Volumen).
"""
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
SELECT
COALESCE(SUM(duration_min) FILTER (WHERE date >= CURRENT_DATE - INTERVAL '7 days'), 0)::bigint AS last7,
COALESCE(SUM(duration_min) FILTER (
WHERE date < CURRENT_DATE - INTERVAL '7 days'
AND date >= CURRENT_DATE - INTERVAL '14 days'), 0)::bigint AS prev7
FROM activity_log
WHERE profile_id = %s
AND date >= CURRENT_DATE - INTERVAL '14 days'
""",
(profile_id,),
)
row = cur.fetchone()
if not row:
return {"last7_min": 0, "prior7_min": 0, "delta_pct": None, "has_data": False}
last7 = int(row["last7"] or 0)
prev7 = int(row["prev7"] or 0)
if last7 == 0 and prev7 == 0:
return {"last7_min": 0, "prior7_min": 0, "delta_pct": None, "has_data": False}
delta_pct: Optional[float] = None
if prev7 > 0:
delta_pct = round((last7 - prev7) / float(prev7) * 100.0, 1)
return {
"last7_min": last7,
"prior7_min": prev7,
"delta_pct": delta_pct,
"has_data": True,
}
def build_quality_sessions_chart_payload(profile_id: str, days: int) -> Dict[str, Any]:
"""Qualitäts-Sessions vs. regulär — gleiche Logik wie GET /api/charts/quality-sessions."""
if days < 7:
days = 7
if days > 90:
days = 90
quality_pct = calculate_quality_sessions_pct(profile_id, days)
cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""SELECT COUNT(*) as total
FROM activity_log
WHERE profile_id=%s AND date >= %s""",
(profile_id, cutoff),
)
row = cur.fetchone()
total_sessions = row["total"] if row else 0
if total_sessions == 0:
return {
"chart_type": "bar",
"data": {"labels": [], "datasets": []},
"metadata": {
"confidence": "insufficient",
"data_points": 0,
"message": "Keine Aktivitätsdaten",
},
}
q = float(quality_pct or 0)
quality_count = int(round(q / 100.0 * total_sessions))
quality_count = max(0, min(quality_count, total_sessions))
regular_count = total_sessions - quality_count
return {
"chart_type": "bar",
"data": {
"labels": ["Qualitäts-Sessions", "Reguläre Sessions"],
"datasets": [
{
"label": "Anzahl",
"data": [quality_count, regular_count],
"backgroundColor": ["#1D9E75", "#888"],
"borderColor": "#085041",
"borderWidth": 1,
}
],
},
"metadata": {
"confidence": calculate_confidence(total_sessions, days, "general"),
"data_points": total_sessions,
"quality_pct": round(q, 1),
"quality_count": quality_count,
"regular_count": regular_count,
},
}
def build_load_monitoring_chart_payload(profile_id: str, days: int) -> Dict[str, Any]:
"""Tages-Load-Zeitreihe + ACWR — gleiche Logik wie GET /api/charts/load-monitoring."""
if days < 14:
days = 14
if days > 90:
days = 90
acute_load = calculate_proxy_internal_load_window(profile_id, 7)
chronic_load = calculate_proxy_internal_load_window(profile_id, 28)
acwr = (
(acute_load / chronic_load) if acute_load is not None and chronic_load and chronic_load > 0 else 0.0
)
cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""SELECT
date,
SUM(duration_min * COALESCE(rpe, 5)) as daily_load
FROM activity_log
WHERE profile_id=%s AND date >= %s
GROUP BY date
ORDER BY date""",
(profile_id, cutoff),
)
rows = cur.fetchall()
if not rows:
return {
"chart_type": "line",
"data": {"labels": [], "datasets": []},
"metadata": {
"confidence": "insufficient",
"data_points": 0,
"message": "Keine Load-Daten",
},
}
labels = [row["date"].isoformat() for row in rows]
values = [safe_float(row["daily_load"]) for row in rows]
al = float(acute_load) if acute_load is not None else 0.0
cl = float(chronic_load) if chronic_load is not None else 0.0
return {
"chart_type": "line",
"data": {
"labels": labels,
"datasets": [
{
"label": "Tages-Load",
"data": values,
"borderColor": "#1D9E75",
"backgroundColor": "rgba(29, 158, 117, 0.1)",
"borderWidth": 2,
"tension": 0.3,
"fill": True,
}
],
},
"metadata": serialize_dates(
{
"confidence": calculate_confidence(len(rows), days, "general"),
"data_points": len(rows),
"acute_load_7d": round(al, 1),
"chronic_load_28d": round(cl, 1),
"acwr": round(acwr, 2),
"acwr_status": "optimal" if 0.8 <= acwr <= 1.3 else "suboptimal",
}
),
}