mitai-jinkendo/.claude/docs/technical/DATA_LAYER_EXTENSION_GUIDE.md

Data Layer Extension Guide

Version: 1.0
Created: March 28, 2026
Audience: developers, Claude Code
Phase: post Phase 0c


Overview

This guide describes how to extend the data layer with:

  • New modules
  • New functions in existing modules
  • New calculation logic
  • New aggregation methods

Prerequisite: Phase 0c is complete (multi-layer architecture implemented).


Module Structure

Existing modules (Phase 0c)

backend/data_layer/
├── __init__.py              # Exports all functions
├── body_metrics.py          # Weight, FM, LBM, circumferences
├── nutrition_metrics.py     # Calories, protein, macros
├── activity_metrics.py      # Training, volume, abilities
├── recovery_metrics.py      # Sleep, RHR, HRV, Recovery Score
├── health_metrics.py        # BP, VO2Max, Health Stability
├── goals.py                 # Active goals, progress
├── correlations.py          # Lag-analysis, plateau detection
└── utils.py                 # Shared: confidence, baseline, outliers

Module naming conventions

  • Singular: body_metrics.py (not bodies_metrics.py)
  • Domain-focused: one module per functional domain
  • Max ~500 lines: consider splitting a module once it exceeds 500 lines

Adding a New Function

Template

# backend/data_layer/<module>.py

from db import get_db, get_cursor

def get_<metric>_data(
    profile_id: str,
    days: int = 28,
    **kwargs
) -> dict:
    """
    [One line: what does this function return?]

    [Optional: detailed description of the calculation]

    Args:
        profile_id: User profile ID
        days: Analysis window (default 28)
        **kwargs: Additional parameters (e.g. goal_mode)

    Returns:
        {
            "<field>": <type>,      # Main result
            "confidence": str,      # REQUIRED: "high"/"medium"/"low"/"insufficient"
            "data_points": int,     # REQUIRED: Number of data points used
            "<additional>": <type>  # Any additional data
        }

    Confidence Rules:
        - "high":          >= X points
        - "medium":        >= Y points
        - "low":           >= Z points
        - "insufficient":  < Z points

    Example:
        >>> data = get_<metric>_data("profile_123", days=28)
        >>> print(data['<field>'])
        42.0
    """
    with get_db() as conn:
        cur = get_cursor(conn)

        # 1. DATA RETRIEVAL
        cur.execute("""
            SELECT ...
            FROM ...
            WHERE profile_id = %s
              AND date >= NOW() - %s * INTERVAL '1 day'
            ORDER BY date
        """, (profile_id, days))
        rows = cur.fetchall()

        # 2. CONFIDENCE CALCULATION
        from data_layer.utils import calculate_confidence
        confidence = calculate_confidence(
            data_points=len(rows),
            days_requested=days,
            metric_type="general"  # or "correlation" or "trend"
        )

        # 3. EARLY RETURN IF INSUFFICIENT
        if confidence == 'insufficient':
            return {
                "confidence": "insufficient",
                "data_points": len(rows),
                # Include all fields with safe defaults
                "<field>": 0.0,
            }

        # 4. CALCULATION
        # ... your logic here ...

        # 5. RETURN STRUCTURED DATA
        return {
            "<field>": result,
            "confidence": confidence,
            "data_points": len(rows),
            # Additional fields as needed
        }

Required fields

Every function MUST return:

{
    "confidence": str,      # "high" | "medium" | "low" | "insufficient"
    "data_points": int,     # Number of data points used
}

Why?

  • Confidence: the UI can warn the user when data quality is low
  • Data points: debugging and monitoring
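
The template calls calculate_confidence from data_layer.utils, but this guide does not show its body. The following is a minimal sketch of what such a helper could look like. The threshold table is an assumption made for illustration: the value 18 for the lowest tier mirrors the number used in the test example of this guide, and every other number is invented. The real thresholds live in utils.py.

```python
# Hypothetical thresholds: (high, medium, low) minimum data points per metric type.
# These numbers are illustrative only and are NOT taken from the project.
ASSUMED_THRESHOLDS = {
    "general": (24, 21, 18),
    "trend": (24, 21, 18),
    "correlation": (40, 30, 20),
}


def calculate_confidence(data_points: int, days_requested: int,
                         metric_type: str = "general") -> str:
    """Map a data point count to "high" / "medium" / "low" / "insufficient"."""
    if metric_type not in ASSUMED_THRESHOLDS:
        raise ValueError(f"Unknown metric type: {metric_type}")

    # days_requested is accepted to match the call in the template;
    # this sketch does not scale the thresholds by the window length.
    high, medium, low = ASSUMED_THRESHOLDS[metric_type]

    if data_points >= high:
        return "high"
    if data_points >= medium:
        return "medium"
    if data_points >= low:
        return "low"
    return "insufficient"
```

Keeping the thresholds in one table means that data functions never compare against raw numbers themselves, which is the point of the "Hardcoded Thresholds" rule later in this guide.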

Optional fields (best practices)

{
    "first_date": date,     # Oldest data point
    "last_date": date,      # Most recent data point
    "avg": float,           # Mean
    "std_dev": float,       # Standard deviation
    "min": float,           # Minimum
    "max": float,           # Maximum
    "outliers": list[int],  # Indices of outliers
}
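
As an illustration, most of the optional fields above can be derived from the fetched rows in one place. The helper name summarize_series and the (date, value) row shape are assumptions for this sketch. The outliers field is left out because it depends on which detection rule the module uses.

```python
import statistics
from datetime import date


def summarize_series(rows: list[tuple[date, float]]) -> dict:
    """Build the optional summary fields from (date, value) rows."""
    if not rows:
        # Safe defaults so callers can always access every key
        return {"first_date": None, "last_date": None,
                "avg": 0.0, "std_dev": 0.0, "min": 0.0, "max": 0.0}

    dates = [d for d, _ in rows]
    values = [float(v) for _, v in rows]  # float() also converts Decimal

    return {
        "first_date": min(dates),
        "last_date": max(dates),
        "avg": statistics.mean(values),
        # The sample standard deviation needs at least two values
        "std_dev": statistics.stdev(values) if len(values) > 1 else 0.0,
        "min": min(values),
        "max": max(values),
    }
```

The result can be merged into the return value of a data function with {**summary, "confidence": confidence, "data_points": len(rows)}.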

Adding New Calculation Logic

1. Statistics functions (utils.py)

If you need a new statistical calculation:

# backend/data_layer/utils.py

def calculate_<statistic>(
    values: list[float],
    **kwargs
) -> float:
    """
    [Description of the statistic]

    Args:
        values: List of measurements
        **kwargs: Additional parameters

    Returns:
        Calculated statistic (float)

    Example:
        >>> calculate_<statistic>([1.0, 2.0, 3.0])
        2.0
    """
    # Implementation
    ...

Examples:

def calculate_median_absolute_deviation(values: list[float]) -> float:
    """
    MAD = median(|xi - median(x)|)

    More robust than standard deviation for outlier detection.
    """
    import statistics
    median = statistics.median(values)
    deviations = [abs(x - median) for x in values]
    return statistics.median(deviations)


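def calculate_coefficient_of_variation(values: list[float]) -> float:
    """
    CV = (std_dev / mean) * 100

    Measures relative variability. Returns 0.0 for fewer than two
    values or a zero mean, where the CV is undefined.
    """
    import statistics
    if len(values) < 2:
        return 0.0  # statistics.stdev() needs at least two data points
    mean = statistics.mean(values)
    std_dev = statistics.stdev(values)
    return (std_dev / mean) * 100 if mean != 0 else 0.0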


def calculate_z_score(value: float, mean: float, std_dev: float) -> float:
    """
    Z = (x - μ) / σ

    Standardized score.
    """
    return (value - mean) / std_dev if std_dev != 0 else 0.0
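
To show how these building blocks combine, here is a sketch of MAD-based outlier detection using the modified z-score, 0.6745 * (x - median) / MAD, with the commonly used cutoff of 3.5. The function name find_outlier_indices is hypothetical, and whether the project's own outlier helper in utils.py uses this rule is not stated in this guide.

```python
import statistics


def find_outlier_indices(values: list[float], threshold: float = 3.5) -> list[int]:
    """Return indices whose modified z-score exceeds the threshold."""
    if len(values) < 3:
        return []  # too few points to call anything an outlier

    median = statistics.median(values)
    mad = statistics.median([abs(x - median) for x in values])
    if mad == 0:
        return []  # no spread around the median, the score is undefined

    return [
        i for i, x in enumerate(values)
        if abs(0.6745 * (x - median) / mad) > threshold
    ]
```

The function returns indices rather than values so that the result fits the optional "outliers": list[int] field.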

2. Aggregation functions (utils.py)

New aggregation methods for goal types:

# backend/data_layer/utils.py

def aggregate_data(
    values: list[tuple],  # [(date, value), ...]
    method: str,
    **kwargs
) -> float:
    """
    Aggregate data points using specified method.

    Args:
        values: List of (date, value) tuples
        method: Aggregation method (see below)
        **kwargs: Method-specific parameters

    Returns:
        Aggregated value (float)

    Supported Methods:
        - "latest":       Most recent value
        - "avg_7d":       Average last 7 days
        - "avg_30d":      Average last 30 days
        - "avg_90d":      Average last 90 days
        - "sum_7d":       Sum last 7 days
        - "sum_30d":      Sum last 30 days
        - "count_7d":     Count last 7 days
        - "count_30d":    Count last 30 days
        - "min_30d":      Minimum last 30 days
        - "max_30d":      Maximum last 30 days
        - "median_7d":    Median last 7 days
        - "median_30d":   Median last 30 days
        - "rolling_avg":  Rolling average (window from kwargs)
        - "percentile":   Nth percentile (n from kwargs)

    Example:
        >>> values = [(date1, 85.0), (date2, 84.5), ...]
        >>> aggregate_data(values, "avg_7d")
        84.7
    """
    from datetime import date, timedelta
    import statistics

    if not values:
        return 0.0

    # Sort by date (most recent first)
    sorted_values = sorted(values, key=lambda x: x[0], reverse=True)

    if method == "latest":
        return float(sorted_values[0][1])

    elif method.startswith("avg_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        return statistics.mean(recent) if recent else 0.0

    elif method.startswith("sum_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        return sum(recent)

    elif method.startswith("count_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        return len([v for d, v in sorted_values if d >= cutoff])

    elif method.startswith("min_") or method.startswith("max_"):
        func_name, days_str = method.split("_")
        days = int(days_str.replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        if not recent:
            return 0.0
        return min(recent) if func_name == "min" else max(recent)

    elif method.startswith("median_"):
        days = int(method.split("_")[1].replace("d", ""))
        cutoff = date.today() - timedelta(days=days)
        recent = [v for d, v in sorted_values if d >= cutoff]
        return statistics.median(recent) if recent else 0.0

    elif method == "rolling_avg":
        window = kwargs.get("window", 7)
        if len(sorted_values) < window:
            return statistics.mean([v for _, v in sorted_values])
        recent = sorted_values[:window]
        return statistics.mean([v for _, v in recent])

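    elif method == "percentile":
        n = kwargs.get("n", 50)  # Default: median
        if not 1 <= n <= 99:
            raise ValueError(f"Percentile n must be between 1 and 99, got {n}")
        values_only = [v for _, v in sorted_values]
        if len(values_only) < 2:
            return float(values_only[0])
        # quantiles(n=100) returns 99 cut points; index n - 1 is the nth percentile
        return statistics.quantiles(values_only, n=100)[n - 1]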

    else:
        raise ValueError(f"Unknown aggregation method: {method}")
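
The prefix checks above (method.startswith("avg_") and similar) accept any name with that prefix, so a name such as avg_per_week_30d would reach int("per") and fail with a ValueError. One possible way to make the parsing stricter is a single regular expression. This is a sketch under that assumption, and parse_window_method is a hypothetical helper name.

```python
import re
from typing import Optional

# Matches names like "avg_7d" or "median_30d" and nothing else
_WINDOW_METHOD = re.compile(r"(avg|sum|count|min|max|median)_(\d+)d")


def parse_window_method(method: str) -> Optional[tuple[str, int]]:
    """Split "avg_30d" into ("avg", 30); return None for any other name."""
    match = _WINDOW_METHOD.fullmatch(method)
    if match is None:
        return None
    return match.group(1), int(match.group(2))
```

With such a helper, aggregate_data could first try the window pattern and fall through to exact-match methods ("latest", "rolling_avg", "percentile") when it returns None.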

3. Correlation functions (correlations.py)

New correlation analyses:

# backend/data_layer/correlations.py

def get_<metric_a>_<metric_b>_correlation(
    profile_id: str,
    days: int = 90,
    max_lag: int = 7
) -> dict:
    """
    Correlation between <metric_a> and <metric_b> with lag analysis.

    Args:
        profile_id: User profile ID
        days: Analysis window
        max_lag: Maximum lag in days to test

    Returns:
        {
            "correlation": float,       # Pearson r at best lag
            "best_lag": int,            # Days of lag
            "p_value": float,           # Statistical significance
            "confidence": str,
            "paired_points": int,
            "interpretation": str       # "strong"/"moderate"/"weak"/"none"
        }

    Interpretation:
        |r| > 0.7: "strong"
        |r| > 0.5: "moderate"
        |r| > 0.3: "weak"
        |r| <= 0.3: "none"
    """
    # Implementation using scipy.stats or numpy
    ...
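
The lag analysis in this template can be sketched in pure Python as follows. It assumes two gap-free daily series of equal length that are already aligned by date, and it tests whether the first series leads the second by 0 to max_lag days. The function names are hypothetical, and p_value is omitted because it would need a statistics library such as scipy.

```python
import math


def pearson(x: list[float], y: list[float]) -> float:
    """Pearson r; returns 0.0 when either series has no variance."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    if var_x == 0 or var_y == 0:
        return 0.0
    return cov / math.sqrt(var_x * var_y)


def interpret(r: float) -> str:
    """Apply the interpretation bands from the docstring above."""
    strength = abs(r)
    if strength > 0.7:
        return "strong"
    if strength > 0.5:
        return "moderate"
    if strength > 0.3:
        return "weak"
    return "none"


def best_lag_correlation(a: list[float], b: list[float],
                         max_lag: int = 7, min_pairs: int = 3) -> dict:
    """Find the lag (a leading b) with the largest absolute correlation."""
    if len(a) != len(b):
        raise ValueError("series must have equal length")

    best_r, best_lag, best_n = 0.0, 0, 0
    for lag in range(max_lag + 1):
        # Pair a[t] with b[t + lag]
        x, y = a[: len(a) - lag], b[lag:]
        if len(x) < min_pairs:
            break
        r = pearson(x, y)
        if abs(r) > abs(best_r):
            best_r, best_lag, best_n = r, lag, len(x)

    return {
        "correlation": best_r,
        "best_lag": best_lag,
        "paired_points": best_n,
        "interpretation": interpret(best_r),
    }
```

Note that every additional day of lag removes one pair, so long lags on short windows rest on few points. The confidence value should reflect paired_points, not the raw row count.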

Creating a New Module

When to create a new module?

Create a new module when:

  • There is a new functional domain (e.g. stress_metrics.py, hormone_metrics.py)
  • An existing module exceeds 500 lines
  • A clear thematic separation is possible

Do NOT create a new module when:

  • There are only 1-2 functions (add them to an existing module)
  • There is a strong dependency on an existing module (merge instead of split)

Module template

# backend/data_layer/<new_module>.py

"""
<Module Name> - <Brief description>

This module provides data functions for <domain>.

Functions:
    - get_<metric1>_data()
    - get_<metric2>_data()
    - ...

Usage:
    from data_layer.<new_module> import get_<metric>_data

    data = get_<metric>_data(profile_id="123", days=28)
"""

from typing import Optional, List, Dict, Tuple
from datetime import date, timedelta
from db import get_db, get_cursor


# ── PUBLIC FUNCTIONS ─────────────────────────────────────────────

def get_<metric>_data(
    profile_id: str,
    days: int = 28,
    **kwargs
) -> dict:
    """
    [Docstring as per template above]
    """
    ...


# ── PRIVATE HELPERS ──────────────────────────────────────────────

def _calculate_<internal_metric>(values: list[float]) -> float:
    """
    Internal helper for <module>.

    NOT exported from module.
    """
    ...


def _validate_<data>(data: dict) -> bool:
    """
    Internal validation helper.
    """
    ...

Exports in __init__.py

# backend/data_layer/__init__.py

# Existing modules
from .body_metrics import *
from .nutrition_metrics import *
from .activity_metrics import *
from .recovery_metrics import *
from .health_metrics import *
from .goals import *
from .correlations import *
from .utils import *

# NEW MODULE
from .<new_module> import *

__all__ = [
    # Existing exports...

    # NEW MODULE exports
    'get_<metric1>_data',
    'get_<metric2>_data',
]
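
Because the package combines star imports with a hand-maintained __all__ list, a name that is listed but not defined only surfaces when someone runs from data_layer import *, which then raises an AttributeError. A small check like the following can catch that earlier, for example in a unit test. The helper name missing_exports is an assumption for this sketch.

```python
import types


def missing_exports(module: types.ModuleType) -> list[str]:
    """Return names listed in module.__all__ that the module does not define."""
    return [
        name for name in getattr(module, "__all__", [])
        if not hasattr(module, name)
    ]
```

In a test this could be used as: import data_layer; assert missing_exports(data_layer) == [].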

Integration with Goal Types

Goal type with a new aggregation method

Scenario: You have implemented a new aggregation method, avg_per_week_30d.

1. Implement it in utils.py

# backend/data_layer/utils.py

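def aggregate_data(values, method, **kwargs):
    # NOTE: this exact-match branch must come BEFORE the generic
    # method.startswith("avg_") branch; otherwise "avg_per_week_30d"
    # is caught there and int("per") raises ValueError.
    if method == "avg_per_week_30d":
        # Group the last 30 days by week, then average the weekly averages
        from collections import defaultdict
        from datetime import date, timedelta
        cutoff = date.today() - timedelta(days=30)
        weeks = defaultdict(list)

        for d, v in values:
            if d < cutoff:
                continue  # outside the 30-day window
            week_start = d - timedelta(days=d.weekday())  # Monday of that week
            weeks[week_start].append(v)

        week_avgs = [sum(vals) / len(vals) for vals in weeks.values()]
        return sum(week_avgs) / len(week_avgs) if week_avgs else 0.0

    # ... existing methods ...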
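
A worked example makes the difference between this method and a plain average visible. The standalone function below is a sketch with the same grouping logic. It takes today as a parameter (an assumption added here so the example is deterministic) instead of calling date.today().

```python
from collections import defaultdict
from datetime import date, timedelta


def avg_per_week(values: list[tuple[date, float]], today: date,
                 window_days: int = 30) -> float:
    """Average of weekly averages over the last window_days days."""
    cutoff = today - timedelta(days=window_days)
    weeks = defaultdict(list)

    for d, v in values:
        if d >= cutoff:
            # Key each value by the Monday of its week
            weeks[d - timedelta(days=d.weekday())].append(float(v))

    if not weeks:
        return 0.0
    week_avgs = [sum(vals) / len(vals) for vals in weeks.values()]
    return sum(week_avgs) / len(week_avgs)


sample = [
    (date(2026, 3, 23), 80.0),   # Monday, week A
    (date(2026, 3, 25), 82.0),   # Wednesday, week A -> weekly average 81.0
    (date(2026, 3, 16), 90.0),   # Monday, week B    -> weekly average 90.0
    (date(2026, 1, 5), 100.0),   # older than 30 days, ignored
]
weekly = avg_per_week(sample, today=date(2026, 3, 28))  # (81.0 + 90.0) / 2 = 85.5
```

A plain mean of the three values inside the window would be 84.0. Averaging per week first gives each week the same weight regardless of how many entries it contains.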

2. Use it in goal_utils.py

# backend/goal_utils.py

from typing import Optional


def _fetch_by_aggregation_method(
    cur,
    profile_id: str,
    source_table: str,
    source_column: str,
    aggregation_method: str,
    date_column: str = 'date',
    filter_conditions: Optional[dict] = None
) -> Optional[float]:
    """
    Fetch current value using aggregation method.

    Now supports:
        - latest, avg_7d, avg_30d, sum_30d, count_7d, etc.
        - avg_per_week_30d (NEW)
    """
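    # Fetch data.
    # NOTE: table and column names cannot be bound as %s parameters, so they
    # are interpolated here. They must come from trusted goal type
    # configuration, never directly from user input.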
    cur.execute(f"""
        SELECT {date_column}, {source_column}
        FROM {source_table}
        WHERE profile_id = %s
        ORDER BY {date_column} DESC
        LIMIT 100
    """, (profile_id,))
    rows = cur.fetchall()

    if not rows:
        return None

    # Use aggregate_data from utils
    from data_layer.utils import aggregate_data
    return aggregate_data(rows, aggregation_method)
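
Since the query above interpolates source_table, source_column and date_column into the SQL text, it can help to validate them against an allowlist before the query is built. The following is a sketch of such a guard. The ALLOWED_SOURCES mapping and the column name weight are hypothetical, and the real list would have to match the tables that goal types are permitted to read.

```python
import re

# Hypothetical allowlist: table name -> columns that goal types may read
ALLOWED_SOURCES = {
    "weight_log": {"date", "weight"},
}

_IDENTIFIER = re.compile(r"[a-z_][a-z0-9_]*")


def validate_source(table: str, *columns: str) -> None:
    """Raise ValueError unless the table and all columns are allowlisted."""
    if table not in ALLOWED_SOURCES:
        raise ValueError(f"Table not allowed: {table!r}")
    for column in columns:
        if not _IDENTIFIER.fullmatch(column) or column not in ALLOWED_SOURCES[table]:
            raise ValueError(f"Column not allowed for {table}: {column!r}")
```

Calling validate_source(source_table, source_column, date_column) at the top of _fetch_by_aggregation_method would reject anything outside the list before any SQL is executed.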

3. Make it available in the frontend

// frontend/src/pages/AdminGoalTypesPage.jsx

const AGGREGATION_METHODS = [
  { value: 'latest', label: 'Aktuellster Wert' },
  { value: 'avg_7d', label: 'Durchschnitt 7 Tage' },
  { value: 'avg_30d', label: 'Durchschnitt 30 Tage' },
  { value: 'sum_30d', label: 'Summe 30 Tage' },
  { value: 'avg_per_week_30d', label: 'Durchschnitt pro Woche (30d)' }, // NEW
  // ...
]

Testing Strategy

Unit tests for new functions

# backend/tests/test_data_layer.py

import pytest
from data_layer.<module> import get_<metric>_data

@pytest.fixture
def test_profile_with_data(db_connection):
    """Create test profile with sample data"""
    # Setup
    profile_id = "test_profile_123"
    # Insert test data into relevant tables
    ...
    yield profile_id
    # Teardown
    ...


def test_get_metric_data_sufficient(test_profile_with_data):
    """Test with sufficient data points"""
    data = get_<metric>_data(test_profile_with_data, days=28)

    assert data['confidence'] in ['high', 'medium', 'low']
    assert data['data_points'] >= 18
    assert '<field>' in data
    assert isinstance(data['<field>'], float)


def test_get_metric_data_insufficient():
    """Test with insufficient data"""
    data = get_<metric>_data("no_data_profile", days=28)

    assert data['confidence'] == 'insufficient'
    assert data['data_points'] == 0


def test_get_metric_data_edge_cases(test_profile_with_data):
    """Test edge cases: outliers, missing values, etc."""
    # Test with extreme values
    # Test with gaps in data
    # Test with all same values
    ...


def test_get_metric_data_parameters(test_profile_with_data):
    """Test different parameter combinations"""
    # Test different days values
    for days in [7, 28, 90]:
        data = get_<metric>_data(test_profile_with_data, days=days)
        assert data is not None

    # Test additional parameters
    data = get_<metric>_data(test_profile_with_data, days=28, goal_mode="strength")
    assert data is not None
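
Several of the tests above only assert that data is not None. A stricter option is a shared helper that checks the required fields of every data layer result. The helper below is a sketch, and the name contract_violations is an assumption.

```python
VALID_CONFIDENCE = ("high", "medium", "low", "insufficient")


def contract_violations(result: dict) -> list[str]:
    """Return a list of problems with the required data layer fields."""
    problems = []

    if result.get("confidence") not in VALID_CONFIDENCE:
        problems.append("confidence must be one of: " + ", ".join(VALID_CONFIDENCE))

    points = result.get("data_points")
    # bool is a subclass of int, so it is excluded explicitly
    if not isinstance(points, int) or isinstance(points, bool) or points < 0:
        problems.append("data_points must be a non-negative int")

    return problems
```

In a test, assert contract_violations(data) == [] then fails with a readable list of what is missing.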

Integration Tests

# backend/tests/test_charts_integration.py

def test_chart_uses_data_layer(client, auth_token):
    """Test that chart endpoint uses data layer correctly"""
    response = client.get(
        "/api/charts/<metric>",
        headers={"X-Auth-Token": auth_token}
    )

    assert response.status_code == 200
    data = response.json()

    # Verify Chart.js structure
    assert 'chart_type' in data
    assert 'data' in data
    assert 'metadata' in data

    # Verify metadata includes confidence
    assert 'confidence' in data['metadata']

Performance Considerations

1. Query Optimization

Problem: N+1 Queries

# ❌ BAD:
for goal_id in goal_ids:
    cur.execute("SELECT * FROM goals WHERE id = %s", (goal_id,))
    # ... process each goal ...

# ✅ GOOD:
cur.execute("SELECT * FROM goals WHERE id = ANY(%s)", (goal_ids,))

Problem: Unindexed Columns

-- Add index if querying frequently by date range
CREATE INDEX IF NOT EXISTS idx_weight_log_profile_date
ON weight_log(profile_id, date DESC);

2. Caching

For expensive calculations:

from functools import lru_cache

@lru_cache(maxsize=128)
def get_expensive_calculation(profile_id: str, days: int) -> dict:
    """Cache results for up to 128 distinct argument combinations (LRU eviction)"""
    ...

Note: In-memory cache resets on restart. For persistent cache → Redis (later).

3. Pagination

For large datasets:

def get_<metric>_data(
    profile_id: str,
    days: int = 28,
    limit: int = 1000,
    offset: int = 0
) -> dict:
    """
    Paginated data retrieval.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT ...
            FROM ...
            WHERE profile_id = %s
            ORDER BY date DESC
            LIMIT %s OFFSET %s
        """, (profile_id, limit, offset))
        rows = cur.fetchall()
        # ... calculation and return as in the function template ...

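On the calling side, a paginated function is usually consumed in a loop that advances the offset until a short or empty page comes back. A generic sketch of that loop follows. iter_pages and the fetch_page callback are hypothetical names, and fetch_page(limit, offset) stands in for whatever function runs the LIMIT/OFFSET query.

```python
from typing import Callable, Iterator


def iter_pages(fetch_page: Callable[[int, int], list],
               page_size: int = 1000) -> Iterator[list]:
    """Yield pages from fetch_page(limit, offset) until the data runs out."""
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        if not page:
            return
        yield page
        if len(page) < page_size:
            return  # a short page means this was the last one
        offset += page_size
```

For example, for page in iter_pages(lambda limit, offset: fetch_rows(profile_id, limit, offset)) would process 1000 rows at a time, where fetch_rows is again a placeholder.
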
Checklist: New Function

[ ] Correct module chosen (or new module created)
[ ] Function implemented with the correct signature
[ ] Docstring complete (Args, Returns, Example)
[ ] Confidence calculation included
[ ] Returns structured data (dict with primitives)
[ ] NO formatting (no strings with units)
[ ] Decimal → float conversion where needed
[ ] Safe dict access (.get() with defaults)
[ ] SQL parameter binding (no string concatenation)
[ ] Unit tests written (sufficient/insufficient/edge cases)
[ ] Integration test written (if there is a chart/API endpoint)
[ ] Performance checked (< 500 ms)
[ ] Exported in __init__.py
[ ] Documentation updated (CLAUDE.md)
[ ] Commit with a meaningful message
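
Two checklist items, "Decimal → float conversion" and "Safe dict access", can be covered by one small helper. PostgreSQL drivers such as psycopg2 return NUMERIC columns as decimal.Decimal, which the standard json module cannot serialize. The helper names below are assumptions, and the sketch assumes rows are available as dicts.

```python
from decimal import Decimal
from typing import Iterable, Optional


def to_float(value, default: Optional[float] = None) -> Optional[float]:
    """Convert Decimal/int/float to float; map None to the default."""
    if value is None:
        return default
    return float(value)


def row_to_floats(row: dict, keys: Iterable[str]) -> dict:
    """Read the given keys with .get() and convert each value to float."""
    return {key: to_float(row.get(key)) for key in keys}
```

Missing keys and NULL columns both come back as None instead of raising a KeyError or TypeError.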

Common Mistakes

1. Forgetting to calculate confidence

# ❌ WRONG:
return {"value": result}

# ✅ CORRECT:
from data_layer.utils import calculate_confidence
confidence = calculate_confidence(len(rows), days, "general")
return {"value": result, "confidence": confidence, "data_points": len(rows)}

2. Formatting in the data layer

# ❌ WRONG (Data Layer):
return {"slope": f"{slope:.2f} kg/Woche"}

# ✅ CORRECT (Data Layer):
return {"slope": 0.23}  # Just the number

# ✅ FORMATTING (AI layer):
return f"{data['slope']:.2f} kg/Woche"

3. Hardcoded Thresholds

# ❌ WRONG:
if len(rows) < 18:  # Magic number
    return {"confidence": "insufficient"}

# ✅ CORRECT:
confidence = calculate_confidence(len(rows), days, "general")
if confidence == "insufficient":
    return {"confidence": "insufficient", "data_points": len(rows)}

Support & Help

If you have questions:

  1. Read PLACEHOLDER_DEVELOPMENT_GUIDE.md
  2. Look at existing functions as examples
  3. Ask the team or create a Gitea issue

Debugging:

  1. Write a unit test
  2. Print intermediate results
  3. Check the SQL query with EXPLAIN ANALYZE
  4. Profile with cProfile if there is a performance problem
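
For the last debugging step, a small wrapper around the standard library profiler is often enough. The sketch below profiles a single call and returns the report as text. profile_call is a hypothetical helper name, while cProfile.Profile and pstats.Stats are the standard library APIs.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, top: int = 10, **kwargs):
    """Run func under cProfile; return (result, report sorted by cumulative time)."""
    profiler = cProfile.Profile()
    profiler.enable()
    try:
        result = func(*args, **kwargs)
    finally:
        profiler.disable()

    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats("cumulative").print_stats(top)
    return result, stream.getvalue()
```

A data function could then be inspected with data, report = profile_call(get_some_data, "profile_123", days=28) followed by print(report), where get_some_data is a placeholder for the function under investigation.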

Author: Claude Sonnet 4.5 Version: 1.0 Last updated: March 28, 2026