Erste Version - Universal CSV Importer für EAV und activity_log #85

Merged
Lars merged 17 commits from develop into main 2026-04-15 11:46:31 +02:00
4 changed files with 189 additions and 26 deletions
Showing only changes of commit c6e8371d5a - Show all commits

View File

@ -9,9 +9,10 @@ import uuid
import logging
import re
import calendar
from datetime import date
from datetime import date, time as dt_time
from typing import Optional
from dateutil import parser as du_parser
from fastapi import APIRouter, HTTPException, UploadFile, File, Header, Depends, Query
from db import get_db, get_cursor, r2d
@ -38,6 +39,43 @@ def _month_date_bounds(ym: str) -> tuple[date, date]:
last = calendar.monthrange(y, mo)[1]
return date(y, mo, 1), date(y, mo, last)
def _normalize_apple_health_start(start_raw: str) -> tuple[str, Optional[dt_time]]:
"""ISO/Apple-Export Start → (YYYY-MM-DD, TIME ohne μs) für stabile Dedupe + INSERT."""
s = (start_raw or "").strip()
if not s:
return "", None
try:
parsed = du_parser.parse(s, dayfirst=False)
t = parsed.time().replace(microsecond=0)
return parsed.date().isoformat(), t
except (ValueError, TypeError, OverflowError):
if len(s) >= 10:
return s[:10], None
return "", None
# Window specification interpolated into ``ROW_NUMBER() OVER (...)`` by the
# list queries on ``activity_log al``.
# Rows sharing profile, date, activity type, start time, duration (rounded to
# one decimal) and active kcal (rounded to one decimal) form one partition;
# NULLs are mapped to a sentinel value via COALESCE.
# Within a partition the most recently created row (ties: highest id) sorts
# first, so it receives row number 1.
_ACTIVITY_DEDUP_WINDOW = """
PARTITION BY al.profile_id, al.date,
COALESCE(al.activity_type, ''),
COALESCE(al.start_time::text, ''),
COALESCE(ROUND(al.duration_min::numeric, 1), '-999999'::numeric),
COALESCE(ROUND(al.kcal_active::numeric, 1), '-999999'::numeric)
ORDER BY al.created DESC NULLS LAST, al.id DESC
"""
def _activity_rows_after_list_query(cur):
rows = []
for r in cur.fetchall():
d = r2d(r)
if not d:
continue
d.pop("_dup_rn", None)
rows.append(d)
return rows
# Evaluation import with error handling (Phase 1.2)
try:
from evaluation_helper import evaluate_and_save_activity
@ -61,6 +99,10 @@ def list_activity(
False,
description="True = alle Einträge des Profils (ohne quality_label-Filter). Für /activity Erfassung.",
),
collapse_duplicate_sessions: bool = Query(
False,
description="True = Sessions mit gleichem Datum/Typ/Startzeit/Dauer/Kcal falten (neueste Zeile behalten).",
),
session: dict = Depends(require_auth),
):
"""Get activity entries for current profile. Optional *days* filter by calendar window (not the same as *limit*)."""
@ -72,10 +114,12 @@ def list_activity(
# Issue #31: Qualitätsfilter — auf der Erfassungsseite /activity abschaltbar (skip_quality_filter)
if skip_quality_filter:
quality_filter = ""
quality_filter_al = ""
else:
cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
profile = r2d(cur.fetchone())
quality_filter = get_quality_filter_sql(profile or {})
quality_filter = get_quality_filter_sql(profile or {}, "")
quality_filter_al = get_quality_filter_sql(profile or {}, "al.")
if month:
if days is not None:
@ -83,6 +127,25 @@ def list_activity(
if offset != 0:
raise HTTPException(status_code=400, detail="month und offset schließen sich aus")
d0, d1 = _month_date_bounds(month)
if collapse_duplicate_sessions:
cur.execute(
f"""
SELECT d.* FROM (
SELECT al.*, ROW_NUMBER() OVER (
{_ACTIVITY_DEDUP_WINDOW}
) AS _dup_rn
FROM activity_log al
WHERE al.profile_id = %s
{quality_filter_al}
AND al.date >= %s AND al.date <= %s
) d
WHERE d._dup_rn = 1
ORDER BY d.date DESC, d.start_time DESC NULLS LAST, d.id DESC
LIMIT %s
""",
(pid, d0, d1, limit),
)
return _activity_rows_after_list_query(cur)
cur.execute(
f"""
SELECT * FROM activity_log
@ -97,6 +160,25 @@ def list_activity(
return [r2d(r) for r in cur.fetchall()]
if days is not None:
if collapse_duplicate_sessions:
cur.execute(
f"""
SELECT d.* FROM (
SELECT al.*, ROW_NUMBER() OVER (
{_ACTIVITY_DEDUP_WINDOW}
) AS _dup_rn
FROM activity_log al
WHERE al.profile_id = %s
{quality_filter_al}
AND al.date >= (CURRENT_DATE - %s::integer)
) d
WHERE d._dup_rn = 1
ORDER BY d.date DESC, d.start_time DESC NULLS LAST, d.id DESC
LIMIT %s OFFSET %s
""",
(pid, days, limit, offset),
)
return _activity_rows_after_list_query(cur)
cur.execute(
f"""
SELECT * FROM activity_log
@ -109,6 +191,24 @@ def list_activity(
(pid, days, limit, offset),
)
else:
if collapse_duplicate_sessions:
cur.execute(
f"""
SELECT d.* FROM (
SELECT al.*, ROW_NUMBER() OVER (
{_ACTIVITY_DEDUP_WINDOW}
) AS _dup_rn
FROM activity_log al
WHERE al.profile_id = %s
{quality_filter_al}
) d
WHERE d._dup_rn = 1
ORDER BY d.date DESC, d.start_time DESC NULLS LAST, d.id DESC
LIMIT %s OFFSET %s
""",
(pid, limit, offset),
)
return _activity_rows_after_list_query(cur)
cur.execute(
f"""
SELECT * FROM activity_log
@ -230,22 +330,40 @@ def activity_stats(
else:
cur.execute("SELECT * FROM profiles WHERE id=%s", (pid,))
profile = r2d(cur.fetchone())
quality_filter = get_quality_filter_sql(profile or {})
quality_filter = get_quality_filter_sql(profile or {}, "")
cur.execute(
f"SELECT COUNT(*)::bigint AS c FROM activity_log WHERE profile_id=%s {quality_filter}",
(pid,),
)
total_in_profile = int(cur.fetchone()["c"])
cur.execute(
f"""
SELECT * FROM activity_log
WHERE profile_id=%s {quality_filter}
ORDER BY date DESC, start_time DESC NULLS LAST, id DESC
LIMIT 30
""",
(pid,),
)
rows = [r2d(r) for r in cur.fetchall()]
if skip_quality_filter:
cur.execute(
f"""
SELECT d.* FROM (
SELECT al.*, ROW_NUMBER() OVER (
{_ACTIVITY_DEDUP_WINDOW}
) AS _dup_rn
FROM activity_log al
WHERE al.profile_id = %s
) d
WHERE d._dup_rn = 1
ORDER BY d.date DESC, d.start_time DESC NULLS LAST, d.id DESC
LIMIT 30
""",
(pid,),
)
rows = _activity_rows_after_list_query(cur)
else:
cur.execute(
f"""
SELECT * FROM activity_log
WHERE profile_id=%s {quality_filter}
ORDER BY date DESC, start_time DESC NULLS LAST, id DESC
LIMIT 30
""",
(pid,),
)
rows = [r2d(r) for r in cur.fetchall()]
if not rows:
return {
"count": 0,
@ -543,9 +661,11 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional
for row in reader:
wtype = row.get('Workout Type','').strip()
start = row.get('Start','').strip()
if not wtype or not start: continue
try: date = start[:10]
except: continue
if not wtype or not start:
continue
workout_date, workout_start_t = _normalize_apple_health_start(start)
if not workout_date:
continue
dur = row.get('Duration','').strip()
duration_min = None
if dur:
@ -563,11 +683,15 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional
training_type_id, training_category, training_subcategory = get_training_type_for_activity(wtype, pid)
try:
# Check if entry already exists (duplicate detection by date + start_time)
cur.execute("""
# Duplicate detection: normiertes Datum + TIME (Apple-Export kann Start in verschiedenen Formaten liefern)
cur.execute(
"""
SELECT id FROM activity_log
WHERE profile_id = %s AND date = %s AND start_time = %s
""", (pid, date, start))
WHERE profile_id = %s AND date = %s::date
AND start_time IS NOT DISTINCT FROM %s::time
""",
(pid, workout_date, workout_start_t),
)
existing = cur.fetchone()
if existing:
@ -575,7 +699,8 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional
existing_id = existing['id']
cur.execute("""
UPDATE activity_log
SET end_time = %s,
SET start_time = %s,
end_time = %s,
activity_type = %s,
duration_min = %s,
kcal_active = %s,
@ -588,7 +713,7 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional
training_subcategory = %s
WHERE id = %s
""", (
row.get('End',''), wtype, duration_min,
workout_start_t, row.get('End',''), wtype, duration_min,
kj(row.get('Aktive Energie (kJ)','')),
kj(row.get('Ruheeinträge (kJ)','')),
tf(row.get('Durchschn. Herzfrequenz (count/min)','')),
@ -606,7 +731,7 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional
activity_dict = {
"id": existing_id,
"profile_id": pid,
"date": date,
"date": workout_date,
"training_type_id": training_type_id,
"duration_min": duration_min,
"hr_avg": tf(row.get('Durchschn. Herzfrequenz (count/min)','')),
@ -630,7 +755,7 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional
(id,profile_id,date,start_time,end_time,activity_type,duration_min,kcal_active,kcal_resting,
hr_avg,hr_max,distance_km,source,training_type_id,training_category,training_subcategory,created)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'apple_health',%s,%s,%s,CURRENT_TIMESTAMP)""",
(new_id,pid,date,start,row.get('End',''),wtype,duration_min,
(new_id,pid,workout_date,workout_start_t,row.get('End',''),wtype,duration_min,
kj(row.get('Aktive Energie (kJ)','')),kj(row.get('Ruheeinträge (kJ)','')),
tf(row.get('Durchschn. Herzfrequenz (count/min)','')),
tf(row.get('Max. Herzfrequenz (count/min)','')),
@ -645,7 +770,7 @@ async def import_activity_csv(file: UploadFile=File(...), x_profile_id: Optional
activity_dict = {
"id": new_id,
"profile_id": pid,
"date": date,
"date": workout_date,
"training_type_id": training_type_id,
"duration_min": duration_min,
"hr_avg": tf(row.get('Durchschn. Herzfrequenz (count/min)','')),

View File

@ -344,6 +344,7 @@ export default function ActivityPage() {
chain.map((ym) =>
api.listActivity(ACTIVITY_MONTH_FETCH_LIMIT, undefined, {
skipQualityFilter: true,
collapseDuplicateSessions: true,
month: ym,
})
)
@ -379,6 +380,7 @@ export default function ActivityPage() {
try {
const more = await api.listActivity(ACTIVITY_MONTH_FETCH_LIMIT, undefined, {
skipQualityFilter: true,
collapseDuplicateSessions: true,
month: prev,
})
const newChain = [...chain, prev]

View File

@ -141,13 +141,14 @@ export const api = {
/**
* @param {number} [limit=200]
* @param {number} [days] nur Einträge ab HEUTE − days (Kalendertage), backend-filtert
* @param {{ offset?: number, skipQualityFilter?: boolean, month?: string }} [opts] month = YYYY-MM (schließt days/offset aus)
* @param {{ offset?: number, skipQualityFilter?: boolean, month?: string, collapseDuplicateSessions?: boolean }} [opts] month = YYYY-MM (schließt days/offset aus)
*/
listActivity: (limit=200, days, opts={})=> {
const q = new URLSearchParams({ limit: String(limit) })
if (days != null && days !== '') q.set('days', String(days))
if (opts.month) q.set('month', String(opts.month))
if (opts.offset != null && opts.offset > 0) q.set('offset', String(opts.offset))
if (opts.collapseDuplicateSessions) q.set('collapse_duplicate_sessions', 'true')
if (opts.skipQualityFilter) q.set('skip_quality_filter', 'true')
return req(`/activity?${q}`)
},

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Full PostgreSQL backup in custom format (pg_restore compatible).
# Run on the host where the Postgres container runs (e.g. a Raspberry Pi with docker compose).
#
#   BACKUP_DIR=/path/to/safe/storage ./scripts/backup/mitai_pg_dump.sh
#
# Variables (optional): POSTGRES_CONTAINER, POSTGRES_DB, POSTGRES_USER, BACKUP_DIR

set -euo pipefail

CONTAINER="${POSTGRES_CONTAINER:-mitai-db-prod}"
DB="${POSTGRES_DB:-mitai_prod}"
# Kept in DB_USER (not USER) so the shell's own $USER variable is not overwritten.
DB_USER="${POSTGRES_USER:-mitai_prod}"
OUT_DIR="${BACKUP_DIR:-.}"
STAMP="$(date +%Y%m%d_%H%M%S)"

mkdir -p "${OUT_DIR}"
OUT="${OUT_DIR}/${DB}_${STAMP}.dump"
# The dump is written here first and renamed on success, so an aborted
# pg_dump never leaves a truncated file under the final .dump name.
TMP_OUT="${OUT}.partial"
trap 'rm -f -- "$TMP_OUT"' EXIT

if ! docker inspect "$CONTAINER" &>/dev/null; then
  echo "Container nicht gefunden: $CONTAINER" >&2
  exit 1
fi

docker exec "$CONTAINER" pg_dump -U "$DB_USER" -Fc --no-owner --no-acl "$DB" > "$TMP_OUT"
mv -- "$TMP_OUT" "$OUT"

ls -la "$OUT"
echo "OK: $OUT (zum Zurückspielen: siehe Kommentar unten in diesem Skript)"

# ── Restore (emergency only; stop the backend first, otherwise connections stay open) ──
# docker compose stop backend
# docker cp "$OUT" "$CONTAINER:/tmp/restore.dump"
# docker exec "$CONTAINER" pg_restore -U "$DB_USER" -d "$DB" --clean --if-exists --no-owner --no-acl /tmp/restore.dump
# docker compose start backend
#
# Note: --clean drops objects before restoring them; expect a short interruption of the DB.
# For a read-only backup it is enough to copy the .dump file to external media.