Phase 2 Complete - Backend Refactoring: - Extracted all endpoints to dedicated router modules - main.py: 1878 → 75 lines (-96% reduction) - Created modular structure for maintainability Router Structure (60 endpoints total): ├── auth.py - 7 endpoints (login, logout, password reset) ├── profiles.py - 7 endpoints (CRUD + current user) ├── weight.py - 5 endpoints (tracking + stats) ├── circumference.py - 4 endpoints (body measurements) ├── caliper.py - 4 endpoints (skinfold tracking) ├── activity.py - 6 endpoints (workouts + Apple Health import) ├── nutrition.py - 4 endpoints (diet + FDDB import) ├── photos.py - 3 endpoints (progress photos) ├── insights.py - 8 endpoints (AI analysis + pipeline) ├── prompts.py - 2 endpoints (AI prompt management) ├── admin.py - 7 endpoints (user management) ├── stats.py - 1 endpoint (dashboard stats) ├── exportdata.py - 3 endpoints (CSV/JSON/ZIP export) └── importdata.py - 1 endpoint (ZIP import) Core modules maintained: - db.py: PostgreSQL connection + helpers - auth.py: Auth functions (hash, verify, sessions) - models.py: 11 Pydantic models Benefits: - Self-contained modules with clear responsibilities - Easier to navigate and modify specific features - Improved code organization and readability - 100% functional compatibility maintained - All syntax checks passed Updated CLAUDE.md with new architecture documentation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
158 lines
5.3 KiB
Python
158 lines
5.3 KiB
Python
"""
|
|
Admin Management Endpoints for Mitai Jinkendo
|
|
|
|
Handles user management, permissions, and email testing (admin-only).
|
|
"""
|
|
import os
|
|
import smtplib
|
|
from email.mime.text import MIMEText
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends
|
|
|
|
from db import get_db, get_cursor, r2d
|
|
from auth import require_admin, hash_pin
|
|
from models import AdminProfileUpdate
|
|
|
|
router = APIRouter(prefix="/api/admin", tags=["admin"])
|
|
|
|
|
|
@router.get("/profiles")
|
|
def admin_list_profiles(session: dict=Depends(require_admin)):
|
|
"""Admin: List all profiles with stats."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT * FROM profiles ORDER BY created")
|
|
profs = [r2d(r) for r in cur.fetchall()]
|
|
|
|
for p in profs:
|
|
pid = p['id']
|
|
cur.execute("SELECT COUNT(*) as count FROM weight_log WHERE profile_id=%s", (pid,))
|
|
p['weight_count'] = cur.fetchone()['count']
|
|
cur.execute("SELECT COUNT(*) as count FROM ai_insights WHERE profile_id=%s", (pid,))
|
|
p['ai_insights_count'] = cur.fetchone()['count']
|
|
|
|
today = datetime.now().date().isoformat()
|
|
cur.execute("SELECT call_count FROM ai_usage WHERE profile_id=%s AND date=%s", (pid, today))
|
|
usage = cur.fetchone()
|
|
p['ai_usage_today'] = usage['call_count'] if usage else 0
|
|
|
|
return profs
|
|
|
|
|
|
@router.put("/profiles/{pid}")
|
|
def admin_update_profile(pid: str, data: AdminProfileUpdate, session: dict=Depends(require_admin)):
|
|
"""Admin: Update profile settings."""
|
|
with get_db() as conn:
|
|
updates = {k:v for k,v in data.model_dump().items() if v is not None}
|
|
if not updates:
|
|
return {"ok": True}
|
|
|
|
cur = get_cursor(conn)
|
|
cur.execute(f"UPDATE profiles SET {', '.join(f'{k}=%s' for k in updates)} WHERE id=%s",
|
|
list(updates.values()) + [pid])
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
@router.put("/profiles/{pid}/permissions")
|
|
def admin_set_permissions(pid: str, data: dict, session: dict=Depends(require_admin)):
|
|
"""Admin: Set profile permissions."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
updates = []
|
|
values = []
|
|
if 'ai_enabled' in data:
|
|
updates.append('ai_enabled=%s')
|
|
values.append(data['ai_enabled'])
|
|
if 'ai_limit_day' in data:
|
|
updates.append('ai_limit_day=%s')
|
|
values.append(data['ai_limit_day'])
|
|
if 'export_enabled' in data:
|
|
updates.append('export_enabled=%s')
|
|
values.append(data['export_enabled'])
|
|
if 'role' in data:
|
|
updates.append('role=%s')
|
|
values.append(data['role'])
|
|
|
|
if updates:
|
|
cur.execute(f"UPDATE profiles SET {', '.join(updates)} WHERE id=%s", values + [pid])
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
@router.put("/profiles/{pid}/email")
|
|
def admin_set_email(pid: str, data: dict, session: dict=Depends(require_admin)):
|
|
"""Admin: Set profile email."""
|
|
email = data.get('email', '').strip().lower()
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("UPDATE profiles SET email=%s WHERE id=%s", (email if email else None, pid))
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
@router.put("/profiles/{pid}/pin")
|
|
def admin_set_pin(pid: str, data: dict, session: dict=Depends(require_admin)):
|
|
"""Admin: Set profile PIN/password."""
|
|
new_pin = data.get('pin', '')
|
|
if len(new_pin) < 4:
|
|
raise HTTPException(400, "PIN/Passwort muss mind. 4 Zeichen haben")
|
|
|
|
new_hash = hash_pin(new_pin)
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, pid))
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
@router.get("/email/status")
|
|
def admin_email_status(session: dict=Depends(require_admin)):
|
|
"""Admin: Check email configuration status."""
|
|
smtp_host = os.getenv("SMTP_HOST")
|
|
smtp_user = os.getenv("SMTP_USER")
|
|
smtp_pass = os.getenv("SMTP_PASS")
|
|
app_url = os.getenv("APP_URL", "http://localhost:3002")
|
|
|
|
configured = bool(smtp_host and smtp_user and smtp_pass)
|
|
|
|
return {
|
|
"configured": configured,
|
|
"smtp_host": smtp_host or "",
|
|
"smtp_user": smtp_user or "",
|
|
"app_url": app_url
|
|
}
|
|
|
|
|
|
@router.post("/email/test")
|
|
def admin_test_email(data: dict, session: dict=Depends(require_admin)):
|
|
"""Admin: Send test email."""
|
|
email = data.get('to', '')
|
|
if not email:
|
|
raise HTTPException(400, "E-Mail-Adresse fehlt")
|
|
|
|
try:
|
|
smtp_host = os.getenv("SMTP_HOST")
|
|
smtp_port = int(os.getenv("SMTP_PORT", 587))
|
|
smtp_user = os.getenv("SMTP_USER")
|
|
smtp_pass = os.getenv("SMTP_PASS")
|
|
smtp_from = os.getenv("SMTP_FROM")
|
|
|
|
if not smtp_host or not smtp_user or not smtp_pass:
|
|
raise HTTPException(500, "SMTP nicht konfiguriert")
|
|
|
|
msg = MIMEText("Dies ist eine Test-E-Mail von Mitai Jinkendo.")
|
|
msg['Subject'] = "Test-E-Mail"
|
|
msg['From'] = smtp_from
|
|
msg['To'] = email
|
|
|
|
with smtplib.SMTP(smtp_host, smtp_port) as server:
|
|
server.starttls()
|
|
server.login(smtp_user, smtp_pass)
|
|
server.send_message(msg)
|
|
|
|
return {"ok": True, "message": f"Test-E-Mail an {email} gesendet"}
|
|
except Exception as e:
|
|
raise HTTPException(500, f"Fehler beim Senden: {str(e)}")
|