Data Layer Extension Guide
Version: 1.0
Created: March 28, 2026
Audience: Developers, Claude Code
Phase: Post Phase 0c
Overview
This guide describes how to extend the Data Layer system with:
- New modules
- New functions in existing modules
- New calculation logic
- New aggregation methods
Prerequisite: Phase 0c completed (multi-layer architecture implemented)
Module Structure
Existing Modules (Phase 0c)
backend/data_layer/
├── __init__.py            # Exports all functions
├── body_metrics.py        # Weight, FM, LBM, circumferences
├── nutrition_metrics.py   # Calories, protein, macros
├── activity_metrics.py    # Training, volume, abilities
├── recovery_metrics.py    # Sleep, RHR, HRV, recovery score
├── health_metrics.py      # BP, VO2Max, health stability
├── goals.py               # Active goals, progress
├── correlations.py        # Lag analysis, plateau detection
└── utils.py               # Shared: confidence, baseline, outliers
Module Naming Conventions
- Singular: body_metrics.py (not bodies_metrics.py)
- Domain-focused: one module per functional domain
- Max ~500 lines: above 500 lines, consider a split
Adding a New Function
Template
# backend/data_layer/<module>.py
from db import get_db, get_cursor
from data_layer.utils import calculate_confidence


def get_<metric>_data(
    profile_id: str,
    days: int = 28,
    **kwargs
) -> dict:
    """
    [One line: what does this function return?]

    [Optional: detailed description of the calculation]

    Args:
        profile_id: User profile ID
        days: Analysis window (default 28)
        **kwargs: Additional parameters (e.g., goal_mode)

    Returns:
        {
            "<field>": <type>,       # Main result
            "confidence": str,       # REQUIRED: "high"/"medium"/"low"/"insufficient"
            "data_points": int,      # REQUIRED: Number of data points used
            "<additional>": <type>   # Any additional data
        }

    Confidence Rules:
        - "high": >= X points
        - "medium": >= Y points
        - "low": >= Z points
        - "insufficient": < Z points

    Example:
        >>> data = get_<metric>_data("profile_123", days=28)
        >>> print(data['<field>'])
        42.0
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # 1. DATA RETRIEVAL
        # The day count is bound as a parameter and multiplied by a fixed
        # interval, so no placeholder sits inside a quoted SQL literal.
        cur.execute("""
            SELECT ...
            FROM ...
            WHERE profile_id = %s
              AND date >= NOW() - %s * INTERVAL '1 day'
            ORDER BY date
        """, (profile_id, days))
        rows = cur.fetchall()

    # 2. CONFIDENCE CALCULATION
    confidence = calculate_confidence(
        data_points=len(rows),
        days_requested=days,
        metric_type="general"  # or "correlation" or "trend"
    )

    # 3. EARLY RETURN IF INSUFFICIENT
    if confidence == 'insufficient':
        return {
            "confidence": "insufficient",
            "data_points": len(rows),
            # Include all fields with safe defaults
            "<field>": 0.0,
        }

    # 4. CALCULATION
    # ... your logic here ...

    # 5. RETURN STRUCTURED DATA
    return {
        "<field>": result,
        "confidence": confidence,
        "data_points": len(rows),
        # Additional fields as needed
    }
Required Fields
Every function MUST return:
{
    "confidence": str,   # "high" | "medium" | "low" | "insufficient"
    "data_points": int,  # Number of data points used
}
Why?
- Confidence: the UI can warn the user when data quality is low
- Data points: debugging and monitoring
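The actual classification lives in calculate_confidence in utils.py, and its thresholds are not reproduced in this guide. As a rough illustration of how data coverage could map to the four labels, here is a minimal sketch. The name classify_confidence and the default threshold fractions are assumptions for illustration only, not the project's values.

```python
def classify_confidence(
    data_points: int,
    days_requested: int,
    thresholds: tuple[float, float, float] = (0.8, 0.5, 0.25),
) -> str:
    """Map coverage of the requested window to a confidence label.

    The default thresholds are illustrative placeholders (assumption).
    """
    if days_requested <= 0 or data_points <= 0:
        return "insufficient"

    high, medium, low = thresholds
    coverage = data_points / days_requested  # fraction of days with data

    if coverage >= high:
        return "high"
    if coverage >= medium:
        return "medium"
    if coverage >= low:
        return "low"
    return "insufficient"
```

Expressing the thresholds as a fraction of the window keeps the rule independent of the days parameter; whether the real helper works this way is not stated in this guide.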
Optional Fields (Best Practices)
{
    "first_date": date,     # Oldest data point
    "last_date": date,      # Newest data point
    "avg": float,           # Mean
    "std_dev": float,       # Standard deviation
    "min": float,           # Minimum
    "max": float,           # Maximum
    "outliers": list[int],  # Indices of outliers
}
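One way to produce these optional fields consistently is a small shared helper. The following is a sketch under assumptions: the name summarize_series is hypothetical, rows are assumed to be (date, value) tuples, and the outlier rule (more than two standard deviations from the mean) is only a placeholder for whatever rule the project uses.

```python
import statistics
from datetime import date


def summarize_series(rows: list[tuple[date, float]]) -> dict:
    """Build the optional summary fields from (date, value) rows."""
    if not rows:
        return {
            "first_date": None, "last_date": None,
            "avg": 0.0, "std_dev": 0.0, "min": 0.0, "max": 0.0,
            "outliers": [],
        }

    ordered = sorted(rows, key=lambda row: row[0])  # oldest first
    values = [float(value) for _, value in ordered]

    avg = statistics.mean(values)
    # Sample standard deviation needs at least two values
    std_dev = statistics.stdev(values) if len(values) > 1 else 0.0

    # Assumption: flag values more than 2 standard deviations from the mean
    outliers = [
        index for index, value in enumerate(values)
        if std_dev > 0 and abs(value - avg) > 2 * std_dev
    ]

    return {
        "first_date": ordered[0][0],
        "last_date": ordered[-1][0],
        "avg": avg,
        "std_dev": std_dev,
        "min": min(values),
        "max": max(values),
        "outliers": outliers,  # indices into the date-sorted series
    }
```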
Adding New Calculation Logic
1. Statistics Functions (utils.py)
If you need a new statistical calculation:
# backend/data_layer/utils.py
def calculate_<statistic>(
    values: list[float],
    **kwargs
) -> float:
    """
    [Description of the statistic]

    Args:
        values: List of measurements
        **kwargs: Additional parameters

    Returns:
        Calculated statistic (float)

    Example:
        >>> calculate_<statistic>([1.0, 2.0, 3.0])
        2.0
    """
    # Implementation
    ...
Examples:
import statistics


def calculate_median_absolute_deviation(values: list[float]) -> float:
    """
    MAD = median(|xi - median(x)|)

    More robust than standard deviation for outlier detection.
    Returns 0.0 for an empty list.
    """
    if not values:
        return 0.0
    median = statistics.median(values)
    deviations = [abs(x - median) for x in values]
    return statistics.median(deviations)


def calculate_coefficient_of_variation(values: list[float]) -> float:
    """
    CV = (std_dev / mean) * 100

    Measures relative variability. Returns 0.0 for fewer than two values
    (the sample standard deviation is undefined) or when the mean is zero.
    """
    if len(values) < 2:
        return 0.0
    mean = statistics.mean(values)
    std_dev = statistics.stdev(values)
    return (std_dev / mean) * 100 if mean != 0 else 0.0


def calculate_z_score(value: float, mean: float, std_dev: float) -> float:
    """
    Z = (x - μ) / σ

    Standardized score. Returns 0.0 when std_dev is zero.
    """
    return (value - mean) / std_dev if std_dev != 0 else 0.0
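To show how the MAD can feed outlier detection, the sketch below uses the modified z-score (Iglewicz and Hoaglin), which scales the deviation from the median by 0.6745 / MAD and commonly flags values above 3.5. The function name find_outlier_indices is hypothetical, and this is one possible technique, not necessarily the one used in utils.py.

```python
import statistics


def find_outlier_indices(values: list[float], threshold: float = 3.5) -> list[int]:
    """Return indices whose modified z-score exceeds the threshold."""
    if len(values) < 3:
        return []  # too few points to judge

    median = statistics.median(values)
    mad = statistics.median([abs(v - median) for v in values])
    if mad == 0:
        return []  # at least half the values equal the median; score undefined

    return [
        index for index, v in enumerate(values)
        if abs(0.6745 * (v - median) / mad) > threshold
    ]
```

Because median and MAD barely move when a single extreme value is added, one bad measurement does not mask itself the way it can with a mean and standard deviation.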
2. Aggregation Functions (utils.py)
New aggregation methods for goal types:
# backend/data_layer/utils.py
def aggregate_data(
    values: list[tuple],  # [(date, value), ...]
    method: str,
    **kwargs
) -> float:
    """
    Aggregate data points using specified method.

    Args:
        values: List of (date, value) tuples
        method: Aggregation method (see below)
        **kwargs: Method-specific parameters

    Returns:
        Aggregated value (float)

    Supported Methods:
        - "latest": Most recent value
        - "avg_7d": Average last 7 days
        - "avg_30d": Average last 30 days
        - "avg_90d": Average last 90 days
        - "sum_7d": Sum last 7 days
        - "sum_30d": Sum last 30 days
        - "count_7d": Count last 7 days
        - "count_30d": Count last 30 days
        - "min_30d": Minimum last 30 days
        - "max_30d": Maximum last 30 days
        - "median_7d": Median last 7 days
        - "median_30d": Median last 30 days
        - "rolling_avg": Rolling average (window from kwargs)
        - "percentile": Nth percentile (n from kwargs, 1-99)

    Example:
        >>> values = [(date1, 85.0), (date2, 84.5), ...]
        >>> aggregate_data(values, "avg_7d")
        84.7
    """
    from datetime import date, timedelta
    import statistics

    if not values:
        return 0.0

    # Sort by date (most recent first)
    sorted_values = sorted(values, key=lambda x: x[0], reverse=True)

    if method == "latest":
        return float(sorted_values[0][1])

    elif method.startswith("avg_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        return statistics.mean(recent) if recent else 0.0

    elif method.startswith("sum_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        return sum(recent)

    elif method.startswith("count_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        return len([v for d, v in sorted_values if d >= cutoff])

    elif method.startswith("min_") or method.startswith("max_"):
        func_name, days_str = method.split("_")
        days = int(days_str.replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        if not recent:
            return 0.0
        return min(recent) if func_name == "min" else max(recent)

    elif method.startswith("median_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        return statistics.median(recent) if recent else 0.0

    elif method == "rolling_avg":
        window = kwargs.get("window", 7)
        # Slicing already handles series shorter than the window
        recent = [v for _, v in sorted_values[:window]]
        return statistics.mean(recent)

    elif method == "percentile":
        n = kwargs.get("n", 50)  # Default: median
        if not 1 <= n <= 99:
            raise ValueError("percentile n must be between 1 and 99")
        values_only = [v for _, v in sorted_values]
        if len(values_only) < 2:
            return float(values_only[0])
        # quantiles(n=100) returns 99 cut points; index n - 1 is the nth percentile
        return statistics.quantiles(values_only, n=100)[n - 1]

    else:
        raise ValueError(f"Unknown aggregation method: {method}")
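The windowed branches above all parse the method name the same way, and a name that starts with avg_ but does not follow the <function>_<N>d pattern (for example avg_per_week_30d) would enter the avg_ branch and fail on int("per"). A small parser can make the pattern explicit. This is a sketch only; parse_window_method is a hypothetical helper, not part of utils.py.

```python
import re
from typing import Optional

# Matches names like "avg_7d" or "median_30d": function name, then day count
_WINDOW_METHOD = re.compile(r"^(avg|sum|count|min|max|median)_(\d+)d$")


def parse_window_method(method: str) -> Optional[tuple[str, int]]:
    """Split a windowed method name into (function, days), or None if it does not match."""
    match = _WINDOW_METHOD.match(method)
    if match is None:
        return None
    return match.group(1), int(match.group(2))
```

A dispatcher could call this first and fall through to the special-case methods ("latest", "rolling_avg", "percentile", custom names) only when it returns None.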
3. Correlation Functions (correlations.py)
New correlation analyses:
# backend/data_layer/correlations.py
def get_<metric_a>_<metric_b>_correlation(
    profile_id: str,
    days: int = 90,
    max_lag: int = 7
) -> dict:
    """
    Correlation between <metric_a> and <metric_b> with lag analysis.

    Args:
        profile_id: User profile ID
        days: Analysis window
        max_lag: Maximum lag in days to test

    Returns:
        {
            "correlation": float,     # Pearson r at best lag
            "best_lag": int,          # Days of lag
            "p_value": float,         # Statistical significance
            "confidence": str,
            "paired_points": int,
            "interpretation": str     # "strong"/"moderate"/"weak"/"none"
        }

    Interpretation:
        |r| > 0.7: "strong"
        |r| > 0.5: "moderate"
        |r| > 0.3: "weak"
        |r| <= 0.3: "none"
    """
    # Implementation using scipy.stats or numpy
    ...
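The lag analysis itself can be sketched in plain Python. The example below is a minimal illustration under assumptions: both series are dicts keyed by date, the helper names (pearson_r, interpret_correlation, best_lag_correlation) and the min_pairs parameter are hypothetical, and the p-value and confidence fields are left out (a p-value could come from scipy.stats.pearsonr, for instance).

```python
import math
from datetime import date, timedelta


def pearson_r(xs: list[float], ys: list[float]) -> float:
    """Pearson correlation coefficient; 0.0 if undefined."""
    n = len(xs)
    if n < 2 or n != len(ys):
        return 0.0
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return 0.0  # a constant series has no defined correlation
    return cov / math.sqrt(var_x * var_y)


def interpret_correlation(r: float) -> str:
    """Apply the interpretation bands from the docstring above."""
    strength = abs(r)
    if strength > 0.7:
        return "strong"
    if strength > 0.5:
        return "moderate"
    if strength > 0.3:
        return "weak"
    return "none"


def best_lag_correlation(
    series_a: dict[date, float],
    series_b: dict[date, float],
    max_lag: int = 7,
    min_pairs: int = 3,  # assumption: skip lags with too few pairs
) -> dict:
    """Pair a(day) with b(day + lag) for each lag and keep the strongest |r|."""
    best = {"correlation": 0.0, "best_lag": 0, "paired_points": 0}
    for lag in range(max_lag + 1):
        shift = timedelta(days=lag)
        pairs = [
            (a_value, series_b[day + shift])
            for day, a_value in series_a.items()
            if day + shift in series_b
        ]
        if len(pairs) < min_pairs:
            continue
        r = pearson_r([a for a, _ in pairs], [b for _, b in pairs])
        if abs(r) > abs(best["correlation"]):
            best = {"correlation": r, "best_lag": lag, "paired_points": len(pairs)}
    best["interpretation"] = interpret_correlation(best["correlation"])
    return best
```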
Creating a New Module
When Is a New Module Warranted?
Create a new module when:
- ✅ New functional domain (e.g., stress_metrics.py, hormone_metrics.py)
- ✅ Existing module exceeds 500 lines
- ✅ A clear thematic separation is possible
Do NOT create a new module when:
- ❌ Only 1-2 functions (add them to an existing module)
- ❌ Strong dependency on an existing module (merge instead of split)
Module Template
# backend/data_layer/<new_module>.py
"""
<Module Name> - <Brief description>

This module provides data functions for <domain>.

Functions:
    - get_<metric1>_data()
    - get_<metric2>_data()
    - ...

Usage:
    from data_layer.<new_module> import get_<metric>_data
    data = get_<metric>_data(profile_id="123", days=28)
"""
from typing import Optional, List, Dict, Tuple
from datetime import date, timedelta

from db import get_db, get_cursor


# ── PUBLIC FUNCTIONS ─────────────────────────────────────────────
def get_<metric>_data(
    profile_id: str,
    days: int = 28,
    **kwargs
) -> dict:
    """
    [Docstring as per template above]
    """
    ...


# ── PRIVATE HELPERS ──────────────────────────────────────────────
def _calculate_<internal_metric>(values: list[float]) -> float:
    """
    Internal helper for <module>.
    NOT exported from module.
    """
    ...


def _validate_<data>(data: dict) -> bool:
    """
    Internal validation helper.
    """
    ...
Exports in __init__.py
# backend/data_layer/__init__.py
# Existing modules
from .body_metrics import *
from .nutrition_metrics import *
from .activity_metrics import *
from .recovery_metrics import *
from .health_metrics import *
from .goals import *
from .correlations import *
from .utils import *

# NEW MODULE
from .<new_module> import *

__all__ = [
    # Existing exports...
    # NEW MODULE exports
    'get_<metric1>_data',
    'get_<metric2>_data',
]
Integration with Goal Types
Goal Type with a New Aggregation Method
Scenario: you have implemented a new aggregation method, avg_per_week_30d.
1. Implement in utils.py
# backend/data_layer/utils.py
def aggregate_data(values, method, **kwargs):
    # ... existing methods ...

    # NOTE: place this branch before the generic method.startswith("avg_")
    # branch; otherwise that branch tries to parse "per" as a day count
    # and raises ValueError.
    elif method == "avg_per_week_30d":
        # Keep the last 30 days, group by week, then average the weekly averages
        from collections import defaultdict
        cutoff = date.today() - timedelta(days=30)
        weeks = defaultdict(list)
        for d, v in values:
            if d < cutoff:
                continue
            week_start = d - timedelta(days=d.weekday())  # Monday of that week
            weeks[week_start].append(v)
        week_avgs = [sum(vals) / len(vals) for vals in weeks.values()]
        return sum(week_avgs) / len(week_avgs) if week_avgs else 0.0
    # ...
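Because the branch above depends on date.today(), it is awkward to unit test. One option is to factor the logic into a standalone function with an injectable reference date. The sketch below assumes this refactoring; the name average_per_week and the today and window_days parameters are hypothetical.

```python
from collections import defaultdict
from datetime import date, timedelta
from typing import Optional


def average_per_week(
    values: list[tuple[date, float]],
    window_days: int = 30,
    today: Optional[date] = None,  # injectable for deterministic tests
) -> float:
    """Average of weekly averages over the last window_days days."""
    reference = today or date.today()
    cutoff = reference - timedelta(days=window_days)

    weeks: dict[date, list[float]] = defaultdict(list)
    for day, value in values:
        if day < cutoff:
            continue  # outside the window
        week_start = day - timedelta(days=day.weekday())  # Monday of that week
        weeks[week_start].append(float(value))

    if not weeks:
        return 0.0
    week_avgs = [sum(vals) / len(vals) for vals in weeks.values()]
    return sum(week_avgs) / len(week_avgs)
```

Averaging the weekly averages gives each week equal weight, so a week with many entries does not dominate a week with few.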
2. Use in goal_utils.py
# backend/goal_utils.py
from typing import Optional

from data_layer.utils import aggregate_data


def _fetch_by_aggregation_method(
    cur,
    profile_id: str,
    source_table: str,
    source_column: str,
    aggregation_method: str,
    date_column: str = 'date',
    filter_conditions: Optional[dict] = None
) -> Optional[float]:
    """
    Fetch current value using aggregation method.

    Now supports:
        - latest, avg_7d, avg_30d, sum_30d, count_7d, etc.
        - avg_per_week_30d (NEW)
    """
    # Fetch data.
    # Table and column names cannot be bound as parameters, so they must
    # come from trusted goal type configuration, never from user input.
    cur.execute(f"""
        SELECT {date_column}, {source_column}
        FROM {source_table}
        WHERE profile_id = %s
        ORDER BY {date_column} DESC
        LIMIT 100
    """, (profile_id,))
    rows = cur.fetchall()

    if not rows:
        return None

    # Use aggregate_data from utils
    return aggregate_data(rows, aggregation_method)
3. Make Available in the Frontend
// frontend/src/pages/AdminGoalTypesPage.jsx
const AGGREGATION_METHODS = [
  { value: 'latest', label: 'Aktuellster Wert' },
  { value: 'avg_7d', label: 'Durchschnitt 7 Tage' },
  { value: 'avg_30d', label: 'Durchschnitt 30 Tage' },
  { value: 'sum_30d', label: 'Summe 30 Tage' },
  { value: 'avg_per_week_30d', label: 'Durchschnitt pro Woche (30d)' }, // NEW
  // ...
]
Testing Strategy
Unit Tests for New Functions
# backend/tests/test_data_layer.py
import pytest

from data_layer.<module> import get_<metric>_data


@pytest.fixture
def test_profile_with_data(db_connection):
    """Create test profile with sample data"""
    # Setup
    profile_id = "test_profile_123"
    # Insert test data into relevant tables
    ...
    yield profile_id
    # Teardown
    ...


def test_get_metric_data_sufficient(test_profile_with_data):
    """Test with sufficient data points"""
    data = get_<metric>_data(test_profile_with_data, days=28)
    assert data['confidence'] in ['high', 'medium', 'low']
    assert data['data_points'] >= 18
    assert '<field>' in data
    assert isinstance(data['<field>'], float)


def test_get_metric_data_insufficient():
    """Test with insufficient data"""
    data = get_<metric>_data("no_data_profile", days=28)
    assert data['confidence'] == 'insufficient'
    assert data['data_points'] == 0


def test_get_metric_data_edge_cases(test_profile_with_data):
    """Test edge cases: outliers, missing values, etc."""
    # Test with extreme values
    # Test with gaps in data
    # Test with all same values
    ...


def test_get_metric_data_parameters(test_profile_with_data):
    """Test different parameter combinations"""
    # Test different days values
    for days in [7, 28, 90]:
        data = get_<metric>_data(test_profile_with_data, days=days)
        assert data is not None

    # Test additional parameters
    data = get_<metric>_data(test_profile_with_data, days=28, goal_mode="strength")
    assert data is not None
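Since every data layer function shares the same required fields, the tests above could reuse a single contract check instead of repeating the assertions. The following is a sketch; assert_data_layer_contract is a hypothetical test helper, and the Decimal check reflects the "Decimal → float" item from the checklist in this guide.

```python
from decimal import Decimal

VALID_CONFIDENCE = {"high", "medium", "low", "insufficient"}


def assert_data_layer_contract(result: dict) -> None:
    """Raise AssertionError if a result violates the required-field contract."""
    assert isinstance(result, dict), "result must be a dict"
    assert result.get("confidence") in VALID_CONFIDENCE, "invalid or missing confidence"

    data_points = result.get("data_points")
    # bool is a subclass of int, so exclude it explicitly
    assert isinstance(data_points, int) and not isinstance(data_points, bool), \
        "data_points must be an int"
    assert data_points >= 0, "data_points must not be negative"

    for key, value in result.items():
        assert not isinstance(value, Decimal), f"{key} must be converted to float"
```

In a test this would be called as assert_data_layer_contract(data) right after fetching the result.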
Integration Tests
# backend/tests/test_charts_integration.py
def test_chart_uses_data_layer(client, auth_token):
    """Test that chart endpoint uses data layer correctly"""
    response = client.get(
        "/api/charts/<metric>",
        headers={"X-Auth-Token": auth_token}
    )
    assert response.status_code == 200
    data = response.json()

    # Verify Chart.js structure
    assert 'chart_type' in data
    assert 'data' in data
    assert 'metadata' in data

    # Verify metadata includes confidence
    assert 'confidence' in data['metadata']
Performance Considerations
1. Query Optimization
Problem: N+1 Queries
# ❌ BAD: one query per goal
for goal_id in goal_ids:
    cur.execute("SELECT * FROM goals WHERE id = %s", (goal_id,))
    # ... process each goal ...

# ✅ GOOD: one query for all goals
# goal_ids must be a list so the driver adapts it to a PostgreSQL array
cur.execute("SELECT * FROM goals WHERE id = ANY(%s)", (goal_ids,))
Problem: Unindexed Columns
-- Add index if querying frequently by date range
CREATE INDEX IF NOT EXISTS idx_weight_log_profile_date
    ON weight_log(profile_id, date DESC);
2. Caching
For expensive calculations:
from functools import lru_cache


@lru_cache(maxsize=128)
def get_expensive_calculation(profile_id: str, days: int) -> dict:
    """Cache results for up to 128 distinct argument combinations (least recently used are evicted)"""
    ...
Note: The in-memory cache resets on restart. For a persistent cache → Redis (later).
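One caveat with lru_cache is that cached results become stale once new measurements are logged. The sketch below shows the standard cache_info() and cache_clear() methods of a cached function; the on_new_measurement hook and the CALLS counter are hypothetical and only stand in for a real write path and a real query.

```python
from functools import lru_cache

CALLS = {"count": 0}  # counts how often the "expensive" body really runs


@lru_cache(maxsize=128)
def get_expensive_calculation(profile_id: str, days: int) -> dict:
    """Stand-in for an expensive query plus calculation."""
    CALLS["count"] += 1
    return {"value": 42.0, "confidence": "high", "data_points": days}


def on_new_measurement(profile_id: str) -> None:
    """Hypothetical write hook: drop cached results so readers see fresh data."""
    # lru_cache cannot evict a single key, so the whole cache is cleared
    get_expensive_calculation.cache_clear()
```

Note that the cached dict object is shared between callers, so callers should treat it as read-only or copy it before modifying.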
3. Pagination
For large datasets:
def get_<metric>_data(
    profile_id: str,
    days: int = 28,
    limit: int = 1000,
    offset: int = 0
) -> dict:
    """
    Paginated data retrieval.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT ...
            FROM ...
            WHERE profile_id = %s
            ORDER BY date DESC
            LIMIT %s OFFSET %s
        """, (profile_id, limit, offset))
        rows = cur.fetchall()
    # ... confidence calculation and structured return as in the template ...
    ...
Checklist: New Function
[ ] Correct module chosen (or new module created)
[ ] Function implemented with the correct signature
[ ] Docstring complete (Args, Returns, Example)
[ ] Confidence calculation included
[ ] Returns structured data (dict with primitives)
[ ] NO formatting (no strings with units)
[ ] Decimal → float conversion where needed
[ ] Safe dict access (.get() with defaults)
[ ] SQL parameter binding (no string concatenation)
[ ] Unit tests written (sufficient/insufficient/edge cases)
[ ] Integration test written (if chart/API endpoint)
[ ] Performance checked (< 500 ms)
[ ] Exported in __init__.py
[ ] Documentation updated (CLAUDE.md)
[ ] Commit with a meaningful message
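Two checklist items, Decimal → float conversion and safe dict access, can be covered by a pair of small helpers. This is a sketch under the assumption that rows arrive as dict-like objects (for example from a dict-returning cursor) and that numeric columns may come back as Decimal; the names to_float and extract_values are hypothetical.

```python
from decimal import Decimal
from typing import Any


def to_float(value: Any, default: float = 0.0) -> float:
    """Convert Decimal/int/float to float; use the default for anything else."""
    if isinstance(value, bool):
        return default  # bool is an int subclass but not a measurement
    if isinstance(value, (Decimal, int, float)):
        return float(value)
    return default


def extract_values(rows: list[dict], column: str) -> list[float]:
    """Collect one column as floats, skipping rows where it is missing or NULL."""
    return [
        to_float(row[column])
        for row in rows
        if row.get(column) is not None
    ]
```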
Common Mistakes
1. Forgetting to Calculate Confidence
# ❌ WRONG:
return {"value": result}

# ✅ CORRECT:
from data_layer.utils import calculate_confidence

confidence = calculate_confidence(len(rows), days, "general")
return {"value": result, "confidence": confidence, "data_points": len(rows)}
2. Formatting in the Data Layer
# ❌ WRONG (Data Layer):
return {"slope": f"{slope:.2f} kg/Woche"}

# ✅ CORRECT (Data Layer):
return {"slope": 0.23}  # Just the number

# ✅ FORMATTING (AI Layer):
return f"{data['slope']:.2f} kg/Woche"
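Put together, the separation could look like the sketch below: the data layer returns plain numbers, and a separate presentation function adds units and handles the insufficient case. Both function names, the hardcoded sample result, and the "n/a" fallback are assumptions for illustration.

```python
def get_weight_trend_data() -> dict:
    """Data layer stand-in: numbers only, no units, no formatting."""
    return {"slope": 0.2345, "confidence": "high", "data_points": 20}


def format_slope(data: dict, unit: str = "kg/Woche") -> str:
    """Presentation layer: turn the raw number into a display string."""
    if data.get("confidence") == "insufficient":
        return "n/a"  # assumption: fallback text when data is insufficient
    return f"{data.get('slope', 0.0):.2f} {unit}"
```

Keeping units out of the data layer means the same result can feed a chart, a prompt, or an API response without string parsing.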
3. Hardcoded Thresholds
# ❌ WRONG:
if len(rows) < 18:  # Magic number
    return {"confidence": "insufficient"}

# ✅ CORRECT:
confidence = calculate_confidence(len(rows), days, "general")
if confidence == "insufficient":
    return {"confidence": "insufficient", ...}
Support & Help
For questions:
- Read PLACEHOLDER_DEVELOPMENT_GUIDE.md
- Check existing functions as examples
- Ask the team or create a Gitea issue
Debugging:
- Write a unit test
- Print intermediate results
- Check the SQL query with EXPLAIN ANALYZE
- Profile with cProfile if there is a performance problem
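For the profiling step, a small wrapper around the standard library's cProfile and pstats modules is usually enough. The sketch below profiles a single call and returns the top entries by cumulative time; the helper name profile_call is hypothetical.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, top: int = 10, **kwargs):
    """Run func under cProfile; return (result, text report of the top entries)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args, **kwargs)

    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats("cumulative").print_stats(top)  # slowest call paths first
    return result, buffer.getvalue()
```

A data layer function could then be inspected with result, report = profile_call(some_function, "profile_123", days=28) followed by print(report), where some_function is whichever function is under investigation.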
Author: Claude Sonnet 4.5 Version: 1.0 Last updated: March 28, 2026