""" Activity Metrics Data Layer Provides structured data for training tracking and analysis. Functions: - get_activity_summary_data(): Count, total duration, calories, averages - get_activity_detail_data(): Detailed activity log entries - get_training_type_distribution_data(): Training category percentages All functions return structured data (dict) without formatting. Use placeholder_resolver.py for formatted strings for AI. Phase 0c: Multi-Layer Architecture Version: 1.0 """ from typing import Dict, List, Optional from datetime import datetime, timedelta, date from db import get_db, get_cursor, r2d from data_layer.utils import calculate_confidence, safe_float, safe_int def get_activity_summary_data( profile_id: str, days: int = 14 ) -> Dict: """ Get activity summary statistics. Args: profile_id: User profile ID days: Analysis window (default 14) Returns: { "activity_count": int, "total_duration_min": int, "total_kcal": int, "avg_duration_min": int, "avg_kcal_per_session": int, "sessions_per_week": float, "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_activity_summary(pid, days) formatted string NEW: Structured data with all metrics """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT COUNT(*) as count, SUM(duration_min) as total_min, SUM(kcal_active) as total_kcal FROM activity_log WHERE profile_id=%s AND date >= %s""", (profile_id, cutoff) ) row = cur.fetchone() if not row or row['count'] == 0: return { "activity_count": 0, "total_duration_min": 0, "total_kcal": 0, "avg_duration_min": 0, "avg_kcal_per_session": 0, "sessions_per_week": 0.0, "confidence": "insufficient", "days_analyzed": days } activity_count = row['count'] total_min = safe_int(row['total_min']) total_kcal = safe_int(row['total_kcal']) avg_duration = int(total_min / activity_count) if activity_count > 0 else 0 avg_kcal = int(total_kcal / activity_count) if activity_count > 0 else 0 sessions_per_week = (activity_count / days * 7) if days > 0 else 0.0 confidence = calculate_confidence(activity_count, days, "general") return { "activity_count": activity_count, "total_duration_min": total_min, "total_kcal": total_kcal, "avg_duration_min": avg_duration, "avg_kcal_per_session": avg_kcal, "sessions_per_week": round(sessions_per_week, 1), "confidence": confidence, "days_analyzed": days } def get_activity_detail_data( profile_id: str, days: int = 14, limit: int = 50 ) -> Dict: """ Get detailed activity log entries. Args: profile_id: User profile ID days: Analysis window (default 14) limit: Maximum entries to return (default 50) Returns: { "activities": [ { "date": date, "activity_type": str, "duration_min": int, "kcal_active": int, "hr_avg": int | None, "training_category": str | None }, ... ], "total_count": int, "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_activity_detail(pid, days) formatted string list NEW: Structured array with all fields """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') cur.execute( """SELECT date, activity_type, duration_min, kcal_active, hr_avg, training_category FROM activity_log WHERE profile_id=%s AND date >= %s ORDER BY date DESC LIMIT %s""", (profile_id, cutoff, limit) ) rows = cur.fetchall() if not rows: return { "activities": [], "total_count": 0, "confidence": "insufficient", "days_analyzed": days } activities = [] for row in rows: activities.append({ "date": row['date'], "activity_type": row['activity_type'], "duration_min": safe_int(row['duration_min']), "kcal_active": safe_int(row['kcal_active']), "hr_avg": safe_int(row['hr_avg']) if row.get('hr_avg') else None, "training_category": row.get('training_category') }) confidence = calculate_confidence(len(activities), days, "general") return { "activities": activities, "total_count": len(activities), "confidence": confidence, "days_analyzed": days } def get_training_type_distribution_data( profile_id: str, days: int = 14 ) -> Dict: """ Calculate training category distribution. Args: profile_id: User profile ID days: Analysis window (default 14) Returns: { "distribution": [ { "category": str, "count": int, "percentage": float }, ... ], "total_sessions": int, "categorized_sessions": int, "uncategorized_sessions": int, "confidence": str, "days_analyzed": int } Migration from Phase 0b: OLD: get_trainingstyp_verteilung(pid, days) top 3 formatted NEW: Complete distribution with percentages """ with get_db() as conn: cur = get_cursor(conn) cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d') # Get categorized activities cur.execute( """SELECT training_category, COUNT(*) as count FROM activity_log WHERE profile_id=%s AND date >= %s AND training_category IS NOT NULL GROUP BY training_category ORDER BY count DESC""", (profile_id, cutoff) ) rows = cur.fetchall() # Get total activity count (including uncategorized) cur.execute( """SELECT COUNT(*) as total FROM activity_log WHERE profile_id=%s AND date >= %s""", (profile_id, cutoff) ) total_row = cur.fetchone() total_sessions = total_row['total'] if total_row else 0 if not rows or total_sessions == 0: return { "distribution": [], "total_sessions": total_sessions, "categorized_sessions": 0, "uncategorized_sessions": total_sessions, "confidence": "insufficient", "days_analyzed": days } categorized_count = sum(row['count'] for row in rows) uncategorized_count = total_sessions - categorized_count distribution = [] for row in rows: count = row['count'] percentage = (count / total_sessions * 100) if total_sessions > 0 else 0 distribution.append({ "category": row['training_category'], "count": count, "percentage": round(percentage, 1) }) confidence = calculate_confidence(categorized_count, days, "general") return { "distribution": distribution, "total_sessions": total_sessions, "categorized_sessions": categorized_count, "uncategorized_sessions": uncategorized_count, "confidence": confidence, "days_analyzed": days }