# Data Layer Extension Guide

**Version:** 1.0  
**Created:** 28 March 2026  
**Audience:** Developers, Claude Code  
**Phase:** Post Phase 0c

---

## Overview

This guide describes how to extend the Data Layer system with:
- New modules
- New functions in existing modules
- New calculation logic
- New aggregation methods

**Prerequisite:** Phase 0c is complete (multi-layer architecture implemented)

---

## Module Structure

### Existing Modules (Phase 0c)

```
backend/data_layer/
├── __init__.py              # Exports all functions
├── body_metrics.py          # Weight, FM, LBM, circumferences
├── nutrition_metrics.py     # Calories, protein, macros
├── activity_metrics.py      # Training, volume, abilities
├── recovery_metrics.py      # Sleep, RHR, HRV, recovery score
├── health_metrics.py        # BP, VO2Max, health stability
├── goals.py                 # Active goals, progress
├── correlations.py          # Lag analysis, plateau detection
└── utils.py                 # Shared: confidence, baseline, outliers
```

### Module Naming Conventions

- **Singular:** `body_metrics.py` (not `bodies_metrics.py`)
- **Domain-focused:** one module per functional domain
- **Max ~500 lines:** above 500 lines, consider splitting the module

---

## Adding a New Function

### Template

```python
# backend/data_layer/<module>.py

def get_<metric>_data(
    profile_id: str,
    days: int = 28,
    **kwargs
) -> dict:
    """
    [One line: what does this function return?]

    [Optional: detailed description of the calculation]

    Args:
        profile_id: User profile ID
        days: Analysis window (default 28)
        **kwargs: Additional parameters (e.g. goal_mode)

    Returns:
        {
            "<main_field>": <type>,   # Main result
            "confidence": str,        # REQUIRED: "high"/"medium"/"low"/"insufficient"
            "data_points": int,       # REQUIRED: Number of data points used
            "<extra_field>": <type>   # Any additional data
        }

    Confidence Rules:
        - "high": >= X points
        - "medium": >= Y points
        - "low": >= Z points
        - "insufficient": < Z points

    Example:
        >>> data = get_<metric>_data("profile_123", days=28)
        >>> print(data['<main_field>'])
        42.0
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # 1.
        # DATA RETRIEVAL
        # The day count is bound as a parameter and multiplied by a one-day
        # interval; a placeholder inside a quoted literal ('%s days') only
        # works by accident of client-side interpolation.
        cur.execute("""
            SELECT ...
            FROM ...
            WHERE profile_id = %s
              AND date >= NOW() - %s * INTERVAL '1 day'
            ORDER BY date
        """, (profile_id, days))
        rows = cur.fetchall()

        # 2. CONFIDENCE CALCULATION
        from data_layer.utils import calculate_confidence
        confidence = calculate_confidence(
            data_points=len(rows),
            days_requested=days,
            metric_type="general"  # or "correlation" or "trend"
        )

        # 3. EARLY RETURN IF INSUFFICIENT
        if confidence == 'insufficient':
            return {
                "confidence": "insufficient",
                "data_points": len(rows),
                # Include all fields with safe defaults
                "<main_field>": 0.0,
            }

        # 4. CALCULATION
        # ... your logic here ...

        # 5. RETURN STRUCTURED DATA
        return {
            "<main_field>": result,
            "confidence": confidence,
            "data_points": len(rows),
            # Additional fields as needed
        }
```

### Required Fields

**Every function MUST return:**

```python
{
    "confidence": str,   # "high" | "medium" | "low" | "insufficient"
    "data_points": int,  # Number of data points used
}
```

**Why?**
- Confidence: the UI can warn the user when data quality is low
- Data points: debugging and monitoring

### Optional Fields (Best Practices)

```python
{
    "first_date": date,      # Oldest data point
    "last_date": date,       # Newest data point
    "avg": float,            # Average
    "std_dev": float,        # Standard deviation
    "min": float,            # Minimum
    "max": float,            # Maximum
    "outliers": list[int],   # Indices of outliers
}
```

---

## Adding New Calculation Logic

### 1. Statistics Functions (utils.py)

**If you need a new statistical calculation:**

```python
# backend/data_layer/utils.py

def calculate_<statistic>(
    values: list[float],
    **kwargs
) -> float:
    """
    [Description of the statistic]

    Args:
        values: List of measurements
        **kwargs: Additional parameters

    Returns:
        Calculated statistic (float)

    Example:
        >>> calculate_<statistic>([1.0, 2.0, 3.0])
        2.0
    """
    # Implementation
    ...
```

**Examples:**

```python
import statistics


def calculate_median_absolute_deviation(values: list[float]) -> float:
    """
    MAD = median(|xi - median(x)|)
    More robust than standard deviation for outlier detection.
    """
    if not values:
        return 0.0  # median([]) would raise StatisticsError
    median = statistics.median(values)
    deviations = [abs(x - median) for x in values]
    return statistics.median(deviations)


def calculate_coefficient_of_variation(values: list[float]) -> float:
    """
    CV = (std_dev / mean) * 100
    Measures relative variability.
    """
    if len(values) < 2:
        return 0.0  # stdev() needs at least two values
    mean = statistics.mean(values)
    std_dev = statistics.stdev(values)
    return (std_dev / mean) * 100 if mean != 0 else 0.0


def calculate_z_score(value: float, mean: float, std_dev: float) -> float:
    """
    Z = (x - μ) / σ
    Standardized score.
    """
    return (value - mean) / std_dev if std_dev != 0 else 0.0
```

### 2. Aggregation Functions (utils.py)

**New aggregation methods for goal types:**

```python
# backend/data_layer/utils.py

def aggregate_data(
    values: list[tuple],  # [(date, value), ...]
    method: str,
    **kwargs
) -> float:
    """
    Aggregate data points using specified method.

    Args:
        values: List of (date, value) tuples
        method: Aggregation method (see below)
        **kwargs: Method-specific parameters

    Returns:
        Aggregated value (float)

    Supported Methods:
        - "latest": Most recent value
        - "avg_7d": Average last 7 days
        - "avg_30d": Average last 30 days
        - "avg_90d": Average last 90 days
        - "sum_7d": Sum last 7 days
        - "sum_30d": Sum last 30 days
        - "count_7d": Count last 7 days
        - "count_30d": Count last 30 days
        - "min_30d": Minimum last 30 days
        - "max_30d": Maximum last 30 days
        - "median_7d": Median last 7 days
        - "median_30d": Median last 30 days
        - "rolling_avg": Rolling average (window from kwargs)
        - "percentile": Nth percentile (n from kwargs)

    Example:
        >>> values = [(date1, 85.0), (date2, 84.5), ...]
        >>> aggregate_data(values, "avg_7d")
        84.7
    """
    from datetime import date, timedelta
    import statistics

    if not values:
        return 0.0

    # Sort by date (most recent first)
    sorted_values = sorted(values, key=lambda x: x[0], reverse=True)

    if method == "latest":
        return float(sorted_values[0][1])

    elif method.startswith("avg_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        return statistics.mean(recent) if recent else 0.0

    elif method.startswith("sum_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        return sum(recent)

    elif method.startswith("count_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        # Cast to float to match the declared return type
        return float(len([v for d, v in sorted_values if d >= cutoff]))

    elif method.startswith(("min_", "max_")):
        func_name, days_str = method.split("_")
        days = int(days_str.replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        if not recent:
            return 0.0
        return min(recent) if func_name == "min" else max(recent)

    elif method.startswith("median_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        return statistics.median(recent) if recent else 0.0

    elif method == "rolling_avg":
        window = kwargs.get("window", 7)
        if len(sorted_values) < window:
            return statistics.mean([v for _, v in sorted_values])
        recent = sorted_values[:window]
        return statistics.mean([v for _, v in recent])

    elif method == "percentile":
        n = kwargs.get("n", 50)  # Default: median
        if not 1 <= n <= 99:
            # quantiles(n=100) yields 99 cut points, so only 1..99 are valid
            raise ValueError(f"Percentile must be between 1 and 99, got {n}")
        values_only = [v for _, v in sorted_values]
        if len(values_only) < 2:
            return values_only[0]
        return statistics.quantiles(values_only, n=100)[n - 1]

    else:
        raise ValueError(f"Unknown aggregation method: {method}")
```

### 3.
Correlation Functions (correlations.py)

**New correlation analyses:**

```python
# backend/data_layer/correlations.py

def get_<metric_a>_<metric_b>_correlation(
    profile_id: str,
    days: int = 90,
    max_lag: int = 7
) -> dict:
    """
    Correlation between <metric_a> and <metric_b> with lag analysis.

    Args:
        profile_id: User profile ID
        days: Analysis window
        max_lag: Maximum lag in days to test

    Returns:
        {
            "correlation": float,    # Pearson r at best lag
            "best_lag": int,         # Days of lag
            "p_value": float,        # Statistical significance
            "confidence": str,
            "paired_points": int,
            "interpretation": str    # "strong"/"moderate"/"weak"/"none"
        }

    Interpretation:
        |r| > 0.7: "strong"
        |r| > 0.5: "moderate"
        |r| > 0.3: "weak"
        |r| <= 0.3: "none"
    """
    # Implementation using scipy.stats or numpy
    ...
```

---

## Creating a New Module

### When to Create a New Module?

**Create a new module when:**
- ✅ There is a new functional domain (e.g. `stress_metrics.py`, `hormone_metrics.py`)
- ✅ An existing module exceeds 500 lines
- ✅ A clear thematic separation is possible

**Do NOT create a new module when:**
- ❌ There are only 1-2 functions (add them to an existing module)
- ❌ There is a strong dependency on an existing module (merge instead of split)

### Module Template

```python
# backend/data_layer/<new_module>.py
"""
<Module title> - <short description>

This module provides data functions for <domain>.

Functions:
    - get_<metric_a>_data()
    - get_<metric_b>_data()
    - ...

Usage:
    from data_layer.<new_module> import get_<metric_a>_data
    data = get_<metric_a>_data(profile_id="123", days=28)
"""

from typing import Optional, List, Dict, Tuple
from datetime import date, timedelta
from db import get_db, get_cursor


# ── PUBLIC FUNCTIONS ─────────────────────────────────────────────

def get_<metric_a>_data(
    profile_id: str,
    days: int = 28,
    **kwargs
) -> dict:
    """
    [Docstring as per template above]
    """
    ...


# ── PRIVATE HELPERS ──────────────────────────────────────────────

def _calculate_<something>(values: list[float]) -> float:
    """
    Internal helper for <purpose>.
    NOT exported from module.
    """
    ...


def _validate_<something>(data: dict) -> bool:
    """
    Internal validation helper.
    """
    ...
```

### Exports in __init__.py

```python
# backend/data_layer/__init__.py

# Existing modules
from .body_metrics import *
from .nutrition_metrics import *
from .activity_metrics import *
from .recovery_metrics import *
from .health_metrics import *
from .goals import *
from .correlations import *
from .utils import *

# NEW MODULE
from .<new_module> import *

__all__ = [
    # Existing exports...

    # NEW MODULE exports
    'get_<metric_a>_data',
    'get_<metric_b>_data',
]
```

---

## Integration with Goal Types

### Goal Type with a New Aggregation Method

**Scenario:** You have implemented a new aggregation method, `avg_per_week_30d`.

#### 1. Implement It in utils.py

```python
# backend/data_layer/utils.py

def aggregate_data(values, method, **kwargs):
    # ... imports, empty check and sorting as before ...

    # Check this exact name BEFORE the generic "avg_" prefix branch:
    # "avg_per_week_30d" also starts with "avg_", and the prefix branch
    # would fail on int("per").
    if method == "avg_per_week_30d":
        from collections import defaultdict

        # Keep only the last 30 days, as the method name promises
        cutoff = date.today() - timedelta(days=30)
        weeks = defaultdict(list)
        for d, v in values:
            if d >= cutoff:
                # Group by week (key = Monday of that week)
                week_start = d - timedelta(days=d.weekday())
                weeks[week_start].append(v)
        # Average per week, then average of the weekly averages
        week_avgs = [sum(vals) / len(vals) for vals in weeks.values()]
        return sum(week_avgs) / len(week_avgs) if week_avgs else 0.0

    # ... existing methods ("latest", "avg_7d", ...) follow ...
```

#### 2. Use It in goal_utils.py

```python
# backend/goal_utils.py
from typing import Optional

from data_layer.utils import aggregate_data


def _fetch_by_aggregation_method(
    cur,
    profile_id: str,
    source_table: str,
    source_column: str,
    aggregation_method: str,
    date_column: str = 'date',
    filter_conditions: Optional[dict] = None
) -> Optional[float]:
    """
    Fetch current value using aggregation method.

    Now supports:
    - latest, avg_7d, avg_30d, sum_30d, count_7d, etc.
    - avg_per_week_30d (NEW)
    """
    # Fetch data.
    # NOTE: table and column names cannot be bound as parameters and are
    # interpolated into the SQL text. They must come from trusted
    # configuration (for example a whitelist), never from user input.
    cur.execute(f"""
        SELECT {date_column}, {source_column}
        FROM {source_table}
        WHERE profile_id = %s
        ORDER BY {date_column} DESC
        LIMIT 100
    """, (profile_id,))
    rows = cur.fetchall()

    if not rows:
        return None

    # Use aggregate_data from utils
    return aggregate_data(rows, aggregation_method)
```

#### 3.
Expose It in the Frontend

```javascript
// frontend/src/pages/AdminGoalTypesPage.jsx

const AGGREGATION_METHODS = [
  { value: 'latest', label: 'Aktuellster Wert' },
  { value: 'avg_7d', label: 'Durchschnitt 7 Tage' },
  { value: 'avg_30d', label: 'Durchschnitt 30 Tage' },
  { value: 'sum_30d', label: 'Summe 30 Tage' },
  { value: 'avg_per_week_30d', label: 'Durchschnitt pro Woche (30d)' },  // NEW
  // ...
]
```

---

## Testing Strategy

### Unit Tests for New Functions

```python
# backend/tests/test_data_layer.py
import pytest
from data_layer.<module> import get_<metric>_data


@pytest.fixture
def test_profile_with_data(db_connection):
    """Create test profile with sample data"""
    # Setup
    profile_id = "test_profile_123"
    # Insert test data into relevant tables
    ...

    yield profile_id

    # Teardown
    ...


def test_get_metric_data_sufficient(test_profile_with_data):
    """Test with sufficient data points"""
    data = get_<metric>_data(test_profile_with_data, days=28)

    assert data['confidence'] in ['high', 'medium', 'low']
    assert data['data_points'] >= 18
    assert '<main_field>' in data
    assert isinstance(data['<main_field>'], float)


def test_get_metric_data_insufficient():
    """Test with insufficient data"""
    data = get_<metric>_data("no_data_profile", days=28)

    assert data['confidence'] == 'insufficient'
    assert data['data_points'] == 0


def test_get_metric_data_edge_cases(test_profile_with_data):
    """Test edge cases: outliers, missing values, etc."""
    # Test with extreme values
    # Test with gaps in data
    # Test with all same values
    ...

def test_get_metric_data_parameters(test_profile_with_data):
    """Test different parameter combinations"""
    # Test different days values
    for days in [7, 28, 90]:
        data = get_<metric>_data(test_profile_with_data, days=days)
        assert data is not None

    # Test additional parameters
    data = get_<metric>_data(test_profile_with_data, days=28, goal_mode="strength")
    assert data is not None
```

### Integration Tests

```python
# backend/tests/test_charts_integration.py

def test_chart_uses_data_layer(client, auth_token):
    """Test that chart endpoint uses data layer correctly"""
    response = client.get(
        "/api/charts/",
        headers={"X-Auth-Token": auth_token}
    )

    assert response.status_code == 200
    data = response.json()

    # Verify Chart.js structure
    assert 'chart_type' in data
    assert 'data' in data
    assert 'metadata' in data

    # Verify metadata includes confidence
    assert 'confidence' in data['metadata']
```

---

## Performance Considerations

### 1. Query Optimization

**Problem:** N+1 Queries

```python
# ❌ BAD: one query per goal
for goal_id in goal_ids:
    cur.execute("SELECT * FROM goals WHERE id = %s", (goal_id,))
    # ... process each goal ...

# ✅ GOOD: one query for all goals
# (goal_ids must be a Python list so the driver adapts it to a PostgreSQL array)
cur.execute("SELECT * FROM goals WHERE id = ANY(%s)", (goal_ids,))
```

**Problem:** Unindexed Columns

```sql
-- Add index if querying frequently by date range
CREATE INDEX IF NOT EXISTS idx_weight_log_profile_date
ON weight_log(profile_id, date DESC);
```

### 2. Caching

**For expensive calculations:**

```python
from functools import lru_cache

@lru_cache(maxsize=128)
def get_expensive_calculation(profile_id: str, days: int) -> dict:
    """Cache results for the 128 most recently used argument combinations"""
    ...
```

**Note:** The in-memory cache resets on restart. `lru_cache` also never expires entries on its own, so a cached result stays unchanged after new data is logged until `cache_clear()` is called or the process restarts. For a persistent cache → Redis (later).

### 3. Pagination

**For large datasets:**

```python
def get_<metric>_data(
    profile_id: str,
    days: int = 28,
    limit: int = 1000,
    offset: int = 0
) -> dict:
    """
    Paginated data retrieval.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT ...
            FROM ...
            WHERE profile_id = %s
              AND date >= NOW() - %s * INTERVAL '1 day'
            ORDER BY date DESC
            LIMIT %s OFFSET %s
        """, (profile_id, days, limit, offset))
```

---

## Checklist: New Function

```
[ ] Correct module chosen (or new module created)
[ ] Function implemented with the correct signature
[ ] Docstring complete (Args, Returns, Example)
[ ] Confidence calculation included
[ ] Returns structured data (dict with primitives)
[ ] NO formatting (no strings with units)
[ ] Decimal → float conversion where needed
[ ] Safe dict access (.get() with defaults)
[ ] SQL parameter binding (no string concatenation)
[ ] Unit tests written (sufficient/insufficient/edge cases)
[ ] Integration test written (if chart/API endpoint)
[ ] Performance checked (< 500ms)
[ ] Exported in __init__.py
[ ] Documentation updated (CLAUDE.md)
[ ] Commit with a meaningful message
```

---

## Common Mistakes

### 1. Forgetting to Calculate Confidence

```python
# ❌ WRONG:
return {"value": result}

# ✅ CORRECT:
from data_layer.utils import calculate_confidence
confidence = calculate_confidence(len(rows), days, "general")
return {"value": result, "confidence": confidence, "data_points": len(rows)}
```

### 2. Formatting in the Data Layer

```python
# ❌ WRONG (Data Layer):
return {"slope": f"{slope:.2f} kg/Woche"}

# ✅ CORRECT (Data Layer):
return {"slope": 0.23}  # Just the number

# ✅ FORMATTING (AI Layer):
return f"{data['slope']:.2f} kg/Woche"
```

### 3. Hardcoded Thresholds

```python
# ❌ WRONG:
if len(rows) < 18:  # Magic number
    return {"confidence": "insufficient"}

# ✅ CORRECT:
confidence = calculate_confidence(len(rows), days, "general")
if confidence == "insufficient":
    # Plus safe defaults for all other fields
    return {"confidence": "insufficient", "data_points": len(rows)}
```

---

## Support & Help

**If you have questions:**
1. Read PLACEHOLDER_DEVELOPMENT_GUIDE.md
2. Check existing functions as examples
3. Ask the team or create a Gitea issue

**Debugging:**
1. Write a unit test
2. Print intermediate results
3. Check the SQL query with `EXPLAIN ANALYZE`
4.
   Profile with `cProfile` if there is a performance problem

---

**Author:** Claude Sonnet 4.5  
**Version:** 1.0  
**Last updated:** 28 March 2026
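
The function template leaves the confidence thresholds open (X, Y, Z), and the real `calculate_confidence` in `utils.py` is not shown in this guide. The following is a minimal sketch of what such a helper could look like, matching only the call signature used above. The coverage ratios and the per-type table `COVERAGE_THRESHOLDS` are assumptions for illustration, not the project's actual rules.

```python
# Hypothetical thresholds: fraction of the requested days that must have data.
# These numbers are assumptions for illustration, not the project's real rules.
COVERAGE_THRESHOLDS = {
    "general": (0.8, 0.5, 0.25),      # (high, medium, low)
    "trend": (0.8, 0.6, 0.4),
    "correlation": (0.9, 0.7, 0.5),
}


def calculate_confidence(data_points: int, days_requested: int,
                         metric_type: str = "general") -> str:
    """Map data coverage to "high" / "medium" / "low" / "insufficient"."""
    if days_requested <= 0 or data_points <= 0:
        return "insufficient"
    # Unknown metric types fall back to the "general" thresholds
    high, medium, low = COVERAGE_THRESHOLDS.get(
        metric_type, COVERAGE_THRESHOLDS["general"]
    )
    coverage = data_points / days_requested
    if coverage >= high:
        return "high"
    if coverage >= medium:
        return "medium"
    if coverage >= low:
        return "low"
    return "insufficient"
```

Keeping the thresholds in one table means data functions never compare `len(rows)` against a magic number themselves, which is the point of the "Hardcoded Thresholds" mistake described above.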