mitai-jinkendo/.claude/docs/technical/AGGREGATION_METHODS.md
Lars 7940dc7560 docs: Struktur .claude/docs versionieren, working/, Gitea-Index, Regeln
- .gitignore: .claude/docs, rules, commands tracken; settings.local weiter ignorieren
- DOCUMENTATION.md: verbindliche Ablage functional/technical/working/issues
- .claude/README.md: Agent-Einstieg; GITEA_ISSUES_INDEX aus MCP (Stand 2026-04-08)
- Arbeitspapiere von docs/ nach .claude/docs/working/ verschoben
- docs/MEMBERSHIP_SYSTEM.md als Stub; kanonisch technical/MEMBERSHIP_SYSTEM.md
- CLAUDE.md Pflichtlektüre und Links angepasst; docs/README.md vereinfacht

Made-with: Cursor
2026-04-08 13:01:49 +02:00

11 KiB
Raw Permalink Blame History

Aggregation Methods Goal Value Calculation

Zweck: Dokumentation für Entwicklung und Erweiterung von Aggregationsmethoden im Goal-System.

Datum: 2026-03-28 Version: 1.0 Modul: backend/goal_utils.py_fetch_by_aggregation_method()


Übersicht

Aggregationsmethoden berechnen den current_value von Goals aus Rohdaten (z.B. Trainings, Gewicht, Ernährung). Sie sind der Kern des dynamischen Goal-Tracking-Systems.

Beispiel:

Goal: "Trainingshäufigkeit Krafttraining"
  source_table: activity_log
  source_column: id  (nur für COUNT relevant)
  aggregation_method: avg_per_week_30d
  filter_conditions: {"training_category": "strength"}

 Berechnet: Durchschnittliche Anzahl Krafttrainings pro Woche (über 30 Tage)

Architektur

1. Wo sind Methoden definiert?

Datei: backend/goal_utils.py Funktion: _fetch_by_aggregation_method(conn, profile_id, table, column, method, filter_conditions)

Aufruf-Hierarchie:

goal_utils.fetch_goal_value()
  └─> _fetch_by_aggregation_method()
      └─> SQL Query mit method-spezifischer Logik

2. Verfügbare Methoden (Stand: 2026-03-28)

Methode Beschreibung SQL Aggregat Zeitfenster Use Case
latest Aktuellster Wert SELECT {column} ORDER BY date DESC LIMIT 1 Gewicht, Körperfett, VO2max
avg_7d 7-Tage-Durchschnitt AVG({column}) 7 Tage Durchschn. Ruhepuls, HRV
avg_30d 30-Tage-Durchschnitt AVG({column}) 30 Tage Durchschn. Kalorien, Protein
sum_30d 30-Tage-Summe SUM({column}) 30 Tage Gesamtkalorien, Trainingsminuten
count_7d Anzahl Einträge (7d) COUNT(*) 7 Tage Trainings letzte Woche
count_30d Anzahl Einträge (30d) COUNT(*) 30 Tage Trainings letzter Monat
min_30d Minimum (30d) MIN({column}) 30 Tage Niedrigster Ruhepuls
max_30d Maximum (30d) MAX({column}) 30 Tage Höchster VO2max
avg_per_week_30d Durchschn. pro Woche COUNT(*) / 4.3 30 Tage Trainingsfrequenz/Woche

3. Filter-Mechanismus

Alle Methoden unterstützen optionale Filter via filter_conditions (JSON):

filter_conditions = {"training_category": "strength"}

# Wird zu SQL:
# ... WHERE profile_id = %s AND training_category = %s

Unterstützte Filter-Typen:

  • Equality: {"column": "value"}WHERE column = 'value'
  • IN-Clause: {"column": ["val1", "val2"]}WHERE column IN ('val1', 'val2')

Neue Aggregationsmethode hinzufügen

Schritt 1: Anforderungen definieren

Checkliste:

  • Name: Eindeutig, beschreibend (z.B. avg_per_week_30d)
  • SQL-Aggregat: Welche Funktion? (COUNT, AVG, SUM, MIN, MAX, oder Custom)
  • Zeitfenster: Fixed (7d, 30d) oder dynamisch?
  • Spaltentyp: Numerisch (DECIMAL, INT) oder UUID/TEXT (nur COUNT)?
  • Filter-Support: Ja/Nein?
  • Return-Typ: float oder None

Schritt 2: Code-Template

Location: backend/goal_utils.py_fetch_by_aggregation_method()

elif method == 'neue_methode':
    # 1. Zeitfenster definieren (falls relevant)
    days_ago = date.today() - timedelta(days=30)

    # 2. Parameter vorbereiten (inkl. filter_params)
    params = [profile_id, days_ago] + filter_params

    # 3. SQL Query (mit date_col und filter_sql)
    cur.execute(f"""
        SELECT AGG_FUNCTION({column}) as result_value
        FROM {table}
        WHERE profile_id = %s
          AND {date_col} >= %s
          AND {column} IS NOT NULL{filter_sql}
    """, params)

    # 4. Result extrahieren und konvertieren
    row = cur.fetchone()
    return float(row['result_value']) if row and row['result_value'] is not None else None

Schritt 3: Spaltentyp-Validierung

Wichtig: Nur numerische Aggregationen (AVG, SUM, MIN, MAX) auf numerischen Spalten!

Spaltentypen:

  • AVG/SUM/MIN/MAX: DECIMAL, INT, FLOAT
  • AVG/SUM/MIN/MAX: UUID, TEXT, VARCHAR
  • COUNT: Beliebiger Typ (UUID, TEXT, etc.)

Bei Fehlkonfiguration:

# Wird automatisch geloggt + None returned (siehe except-Block Zeile 414-430)
[ERROR] Failed to fetch value from activity_log.id using avg_7d:
        function avg(uuid) does not exist

Schritt 4: Testen

Manueller Test:

from goal_utils import _fetch_by_aggregation_method
from db import get_db

with get_db() as conn:
    result = _fetch_by_aggregation_method(
        conn,
        profile_id='...',
        table='activity_log',
        column='id',
        method='avg_per_week_30d',
        filter_conditions={"training_category": "strength"}
    )
    print(f"Result: {result}")

Unit-Test (TODO):

# backend/tests/test_goal_utils.py
def test_avg_per_week_30d():
    # Setup: Insert 12 activities in last 30 days
    # Expected: 12 / 4.3 ≈ 2.79
    assert result == pytest.approx(2.79, abs=0.1)

Beispiel-Implementierung: avg_per_week_30d

Use Case: Trainingshäufigkeit pro Woche (geglättet über 30 Tage)

Berechnung: (Anzahl Trainings in 30 Tagen) / 4.3 Wochen

Code:

elif method == 'avg_per_week_30d':
    days_ago = date.today() - timedelta(days=30)
    params = [profile_id, days_ago] + filter_params
    cur.execute(f"""
        SELECT COUNT(*) as count_value FROM {table}
        WHERE profile_id = %s AND {date_col} >= %s{filter_sql}
    """, params)
    row = cur.fetchone()
    if row and row['count_value'] is not None:
        # 30 Tage = 4.285 Wochen (30/7)
        return round(float(row['count_value']) / 4.285, 2)
    return None

Warum 4.285?

  • 30 Tage ÷ 7 Tage/Woche = 4.285 Wochen
  • Alternativ: 4.3 (gerundet) für einfachere Rechnung

Best Practices

1. Naming Conventions

Pattern: {aggregat}_{spalte}_{zeitfenster}

  • avg_hr_7d Average heart rate, 7 days
  • count_per_week_30d Count per week, averaged over 30 days
  • sum_calories_30d Sum of calories, 30 days
  • get_training_count Unklar, kein Zeitfenster
  • calc_average Zu generisch

2. Return-Werte

Konsistenz:

  • Erfolg: float (auch bei 0.0)
  • Keine Daten: None (nicht 0.0!)
  • Fehler: None (geloggt im except-Block)

Warum None statt 0.0?

# None = "Keine Daten vorhanden"
# 0.0 = "Gemessen, aber Wert ist tatsächlich 0"

3. Date-Columns

Nicht alle Tabellen nutzen date als Spaltenname:

DATE_COLUMN_MAP = {
    'blood_pressure_log': 'measured_at',  # TIMESTAMP
    'activity_log': 'date',               # DATE
    'fitness_tests': 'test_date',         # DATE
    # ... siehe goal_utils.py Zeile 289-300
}

Nutzung: date_col = DATE_COLUMN_MAP.get(table, 'date')

4. Filter-Safety

SQL-Injection-Schutz:

  • Parametrisierte Queries: WHERE col = %s + params
  • String-Interpolation: WHERE col = '{value}'

Filter-Validierung:

try:
    filters = json.loads(filter_conditions) if isinstance(filter_conditions, str) else filter_conditions
    # ... build filter_sql
except (json.JSONDecodeError, TypeError, AttributeError) as e:
    print(f"[WARNING] Invalid filter_conditions: {e}, ignoring filters")

5. Performance

Query-Optimierung:

  • WHERE profile_id = %s ist immer erste Bedingung (Index)
  • AND {column} IS NOT NULL vor Aggregation (reduziert NULL-Handling)
  • ORDER BY {date_col} DESC LIMIT 1 für latest (schneller als MAX)

Erweiterte Methoden (Future)

Statistische Analysen

Median:

elif method == 'median_30d':
    # PostgreSQL: PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY column)
    cur.execute(f"""
        SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY {column}) as median_value
        FROM {table}
        WHERE profile_id = %s AND {date_col} >= %s{filter_sql}
    """, params)

Standard Deviation:

elif method == 'stddev_30d':
    cur.execute(f"""
        SELECT STDDEV({column}) as stddev_value FROM {table}
        WHERE profile_id = %s AND {date_col} >= %s{filter_sql}
    """, params)

Trend (Linear Regression):

elif method == 'trend_30d':
    # Slope via REGR_SLOPE(y, x)
    cur.execute(f"""
        SELECT REGR_SLOPE(
            {column},
            EXTRACT(EPOCH FROM {date_col})
        ) as slope FROM {table}
        WHERE profile_id = %s AND {date_col} >= %s{filter_sql}
    """, params)

Kalenderwoche

elif method == 'count_calendar_week':
    # Montag der aktuellen Woche
    today = date.today()
    monday = today - timedelta(days=today.weekday())

    cur.execute(f"""
        SELECT COUNT(*) as count_value FROM {table}
        WHERE profile_id = %s
          AND {date_col} >= %s
          AND {date_col} < %s + INTERVAL '7 days'{filter_sql}
    """, [profile_id, monday] + filter_params)

Fehlerbehandlung

Exception-Handling

Alle Methoden sind wrapped in try-except (Zeile 329-430):

try:
    # ... method logic
except Exception as e:
    print(f"[ERROR] Failed to fetch value from {table}.{column} using {method}: {e}")
    print(f"[ERROR] Filter conditions: {filter_conditions}")

    # CRITICAL: Rollback transaction
    conn.rollback()

    return None

Warum Rollback?

  • PostgreSQL bleibt in InFailedSqlTransaction bis Rollback
  • Ohne Rollback: Alle nachfolgenden Queries schlagen fehl

Typische Fehler

Fehler Ursache Lösung
function avg(uuid) does not exist AVG auf UUID-Spalte Methode auf count_* ändern
column "xyz" does not exist Falsche source_column Schema prüfen, Spalte korrigieren
division by zero Keine Daten für Durchschnitt None-Check vor Division
UndefinedColumn: training_category Filter-Spalte existiert nicht Filter entfernen oder Spalte anlegen

Migration zu neuer Methode

Szenario: Bestehende Goal-Type-Definition ändern

Beispiel: sport_pro_woche von avg_7d zu avg_per_week_30d

SQL:

UPDATE goal_type_definitions
SET aggregation_method = 'avg_per_week_30d'
WHERE type_key = 'sport_pro_woche';

Wichtig:

  • Bestehende Goals behalten ihre current_value (historisch)
  • Nächste Berechnung nutzt neue Methode
  • UI zeigt dann neuen Wert

Dokumentations-Pflicht

Bei jeder neuen Methode:

  1. Eintrag in dieser Datei (Tabelle "Verfügbare Methoden")
  2. Docstring in _fetch_by_aggregation_method()
  3. Beispiel-Anwendung (Use Case)
  4. Unit-Test (wenn möglich)
  5. Update in goal_types.py Schema-Info (falls relevant für Admin-UI)

Zusammenfassung

Aggregationsmethoden sind:

  • Zentral in goal_utils.py
  • SQL-basiert (PostgreSQL-Funktionen)
  • Filter-fähig (JSON-basiert)
  • Error-safe (Rollback + None-Return)
  • Erweiterbar (einfaches elif-Pattern)

Für neue Methoden:

  1. Name definieren ({aggregat}_{zeitfenster})
  2. SQL Query schreiben (mit filter_sql)
  3. Testen (manuell + Unit-Test)
  4. Dokumentieren (diese Datei)

Bei Fragen:

  • Siehe backend/goal_utils.py Zeile 259-430
  • Siehe bestehende Methoden als Template
  • Siehe .claude/docs/working/GOALS_SYSTEM_UNIFIED_ANALYSIS.md für Kontext