mitai-jinkendo/backend/routers/importdata.py
Lars b4a1856f79
All checks were successful
Deploy Development / deploy (push) Successful in 58s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
refactor: modular backend architecture with 14 router modules
Phase 2 Complete - Backend Refactoring:
- Extracted all endpoints to dedicated router modules
- main.py: 1878 → 75 lines (-96% reduction)
- Created modular structure for maintainability

Router Structure (60 endpoints total):
├── auth.py          - 7 endpoints (login, logout, password reset)
├── profiles.py      - 7 endpoints (CRUD + current user)
├── weight.py        - 5 endpoints (tracking + stats)
├── circumference.py - 4 endpoints (body measurements)
├── caliper.py       - 4 endpoints (skinfold tracking)
├── activity.py      - 6 endpoints (workouts + Apple Health import)
├── nutrition.py     - 4 endpoints (diet + FDDB import)
├── photos.py        - 3 endpoints (progress photos)
├── insights.py      - 8 endpoints (AI analysis + pipeline)
├── prompts.py       - 2 endpoints (AI prompt management)
├── admin.py         - 7 endpoints (user management)
├── stats.py         - 1 endpoint (dashboard stats)
├── exportdata.py    - 3 endpoints (CSV/JSON/ZIP export)
└── importdata.py    - 1 endpoint (ZIP import)

Core modules maintained:
- db.py: PostgreSQL connection + helpers
- auth.py: Auth functions (hash, verify, sessions)
- models.py: 11 Pydantic models

Benefits:
- Self-contained modules with clear responsibilities
- Easier to navigate and modify specific features
- Improved code organization and readability
- 100% functional compatibility maintained
- All syntax checks passed

Updated CLAUDE.md with new architecture documentation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 11:15:35 +01:00

268 lines
13 KiB
Python

"""
Data Import Endpoints for Mitai Jinkendo
Handles ZIP import with validation and rollback support.
"""
import os
import csv
import io
import json
import uuid
import zipfile
from pathlib import Path
from typing import Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException, UploadFile, File, Header, Depends
from db import get_db, get_cursor
from auth import require_auth
from routers.profiles import get_pid
router = APIRouter(prefix="/api/import", tags=["import"])
PHOTOS_DIR = Path(os.getenv("PHOTOS_DIR", "./photos"))
@router.post("/zip")
async def import_zip(
file: UploadFile = File(...),
x_profile_id: Optional[str] = Header(default=None),
session: dict = Depends(require_auth)
):
"""
Import data from ZIP export file.
- Validates export format
- Imports missing entries only (ON CONFLICT DO NOTHING)
- Imports photos
- Returns import summary
- Full rollback on error
"""
pid = get_pid(x_profile_id)
# Read uploaded file
content = await file.read()
zip_buffer = io.BytesIO(content)
try:
with zipfile.ZipFile(zip_buffer, 'r') as zf:
# 1. Validate profile.json
if 'profile.json' not in zf.namelist():
raise HTTPException(400, "Ungültiger Export: profile.json fehlt")
profile_data = json.loads(zf.read('profile.json').decode('utf-8'))
export_version = profile_data.get('export_version', '1')
# Stats tracker
stats = {
'weight': 0,
'circumferences': 0,
'caliper': 0,
'nutrition': 0,
'activity': 0,
'photos': 0,
'insights': 0
}
with get_db() as conn:
cur = get_cursor(conn)
try:
# 2. Import weight.csv
if 'data/weight.csv' in zf.namelist():
csv_data = zf.read('data/weight.csv').decode('utf-8-sig')
reader = csv.DictReader(io.StringIO(csv_data), delimiter=';')
for row in reader:
cur.execute("""
INSERT INTO weight_log (profile_id, date, weight, note, source, created)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (profile_id, date) DO NOTHING
""", (
pid,
row['date'],
float(row['weight']) if row['weight'] else None,
row.get('note', ''),
row.get('source', 'import'),
row.get('created', datetime.now())
))
if cur.rowcount > 0:
stats['weight'] += 1
# 3. Import circumferences.csv
if 'data/circumferences.csv' in zf.namelist():
csv_data = zf.read('data/circumferences.csv').decode('utf-8-sig')
reader = csv.DictReader(io.StringIO(csv_data), delimiter=';')
for row in reader:
cur.execute("""
INSERT INTO circumference_log (
profile_id, date, c_waist, c_hip, c_chest, c_neck,
c_arm, c_thigh, c_calf, notes, created
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (profile_id, date) DO NOTHING
""", (
pid,
row['date'],
float(row['waist']) if row.get('waist') else None,
float(row['hip']) if row.get('hip') else None,
float(row['chest']) if row.get('chest') else None,
float(row['neck']) if row.get('neck') else None,
float(row['upper_arm']) if row.get('upper_arm') else None,
float(row['thigh']) if row.get('thigh') else None,
float(row['calf']) if row.get('calf') else None,
row.get('note', ''),
row.get('created', datetime.now())
))
if cur.rowcount > 0:
stats['circumferences'] += 1
# 4. Import caliper.csv
if 'data/caliper.csv' in zf.namelist():
csv_data = zf.read('data/caliper.csv').decode('utf-8-sig')
reader = csv.DictReader(io.StringIO(csv_data), delimiter=';')
for row in reader:
cur.execute("""
INSERT INTO caliper_log (
profile_id, date, sf_chest, sf_abdomen, sf_thigh,
sf_triceps, sf_subscap, sf_suprailiac, sf_axilla,
sf_method, body_fat_pct, notes, created
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (profile_id, date) DO NOTHING
""", (
pid,
row['date'],
float(row['chest']) if row.get('chest') else None,
float(row['abdomen']) if row.get('abdomen') else None,
float(row['thigh']) if row.get('thigh') else None,
float(row['tricep']) if row.get('tricep') else None,
float(row['subscapular']) if row.get('subscapular') else None,
float(row['suprailiac']) if row.get('suprailiac') else None,
float(row['midaxillary']) if row.get('midaxillary') else None,
row.get('method', 'jackson3'),
float(row['bf_percent']) if row.get('bf_percent') else None,
row.get('note', ''),
row.get('created', datetime.now())
))
if cur.rowcount > 0:
stats['caliper'] += 1
# 5. Import nutrition.csv
if 'data/nutrition.csv' in zf.namelist():
csv_data = zf.read('data/nutrition.csv').decode('utf-8-sig')
reader = csv.DictReader(io.StringIO(csv_data), delimiter=';')
for row in reader:
cur.execute("""
INSERT INTO nutrition_log (
profile_id, date, kcal, protein_g, fat_g, carbs_g, source, created
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (profile_id, date) DO NOTHING
""", (
pid,
row['date'],
float(row['kcal']) if row.get('kcal') else None,
float(row['protein']) if row.get('protein') else None,
float(row['fat']) if row.get('fat') else None,
float(row['carbs']) if row.get('carbs') else None,
row.get('source', 'import'),
row.get('created', datetime.now())
))
if cur.rowcount > 0:
stats['nutrition'] += 1
# 6. Import activity.csv
if 'data/activity.csv' in zf.namelist():
csv_data = zf.read('data/activity.csv').decode('utf-8-sig')
reader = csv.DictReader(io.StringIO(csv_data), delimiter=';')
for row in reader:
cur.execute("""
INSERT INTO activity_log (
profile_id, date, activity_type, duration_min,
kcal_active, hr_avg, hr_max, distance_km, notes, source, created
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
pid,
row['date'],
row.get('type', 'Training'),
float(row['duration_min']) if row.get('duration_min') else None,
float(row['kcal']) if row.get('kcal') else None,
float(row['heart_rate_avg']) if row.get('heart_rate_avg') else None,
float(row['heart_rate_max']) if row.get('heart_rate_max') else None,
float(row['distance_km']) if row.get('distance_km') else None,
row.get('note', ''),
row.get('source', 'import'),
row.get('created', datetime.now())
))
if cur.rowcount > 0:
stats['activity'] += 1
# 7. Import ai_insights.json
if 'insights/ai_insights.json' in zf.namelist():
insights_data = json.loads(zf.read('insights/ai_insights.json').decode('utf-8'))
for insight in insights_data:
cur.execute("""
INSERT INTO ai_insights (profile_id, scope, content, created)
VALUES (%s, %s, %s, %s)
""", (
pid,
insight['scope'],
insight['result'],
insight.get('created', datetime.now())
))
stats['insights'] += 1
# 8. Import photos
photo_files = [f for f in zf.namelist() if f.startswith('photos/') and not f.endswith('/')]
for photo_file in photo_files:
# Extract date from filename (format: YYYY-MM-DD_N.jpg)
filename = Path(photo_file).name
parts = filename.split('_')
photo_date = parts[0] if len(parts) > 0 else datetime.now().strftime('%Y-%m-%d')
# Generate new ID and path
photo_id = str(uuid.uuid4())
ext = Path(filename).suffix
new_filename = f"{photo_id}{ext}"
target_path = PHOTOS_DIR / new_filename
# Check if photo already exists for this date
cur.execute("""
SELECT id FROM photos
WHERE profile_id = %s AND date = %s
""", (pid, photo_date))
if cur.fetchone() is None:
# Write photo file
with open(target_path, 'wb') as f:
f.write(zf.read(photo_file))
# Insert DB record
cur.execute("""
INSERT INTO photos (id, profile_id, date, path, created)
VALUES (%s, %s, %s, %s, %s)
""", (photo_id, pid, photo_date, new_filename, datetime.now()))
stats['photos'] += 1
# Commit transaction
conn.commit()
except Exception as e:
# Rollback on any error
conn.rollback()
raise HTTPException(500, f"Import fehlgeschlagen: {str(e)}")
return {
"ok": True,
"message": "Import erfolgreich",
"stats": stats,
"total": sum(stats.values())
}
except zipfile.BadZipFile:
raise HTTPException(400, "Ungültige ZIP-Datei")
except Exception as e:
raise HTTPException(500, f"Import-Fehler: {str(e)}")