mitai-jinkendo/backend/routers/admin.py
Lars 01c0d1745f
All checks were successful
Deploy Development / deploy (push) Successful in 56s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: implement merge_missing_catalog_widgets function to enhance dashboard layout
- Added the `merge_missing_catalog_widgets` function to append missing widget IDs from the catalog to the dashboard layout while preserving the existing order.
- Updated the admin and app dashboard routes to utilize the new function, ensuring that new catalog entries are visible without requiring users to reset their layouts.
- Enhanced tests to validate the functionality of the new merging logic, ensuring proper integration with existing layouts.
- Bumped application version to reflect these changes.
2026-04-22 08:38:38 +02:00

300 lines
10 KiB
Python

"""
Admin Management Endpoints for Mitai Jinkendo

Handles user management, permissions, email testing, the dashboard product
default layout, and widget feature assignments (admin-only).
"""
import os
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
from typing import Any
from fastapi import APIRouter, HTTPException, Depends
from db import get_db, get_cursor, r2d
from auth import require_admin, hash_pin
from models import AdminProfileUpdate
from dashboard_layout_schema import (
ALLOWED_WIDGET_IDS,
DashboardLayoutPayload,
merge_missing_catalog_widgets,
product_default_layout_dict,
)
from dashboard_widget_entitlements import widgets_catalog_admin_payload
from widget_catalog import WIDGET_CATALOG
from widget_feature_requirements_db import (
clear_custom_requirements,
fetch_assignments_bundle,
set_custom_requirements,
)
from system_dashboard_product_default import (
delete_product_default_override,
get_product_default_base_dict,
get_stored_product_default_validated,
upsert_product_default_base,
)
# Every route registered on this router lives under /api/admin and is tagged
# "admin"; the handlers in this module each depend on require_admin.
router = APIRouter(prefix="/api/admin", tags=["admin"])
@router.get("/profiles")
def admin_list_profiles(session: dict = Depends(require_admin)):
    """Admin: List all profiles with stats.

    Returns every row of ``profiles`` (ordered by ``created``) as a dict,
    enriched with three per-profile values:

    - ``weight_count``: number of ``weight_log`` rows for the profile
    - ``ai_insights_count``: number of ``ai_insights`` rows for the profile
    - ``ai_usage_today``: ``call_count`` of the ``ai_usage`` row for today's
      date, or ``0`` when there is no such row
    """
    # Resolved once so that all profiles are measured against the same day,
    # even if the request happens to straddle midnight.
    today = datetime.now().date().isoformat()

    with get_db() as conn:
        cur = get_cursor(conn)
        # NOTE(review): SELECT * returns every column of `profiles`, which
        # includes `pin_hash` — confirm the client really needs all of them.
        cur.execute("SELECT * FROM profiles ORDER BY created")
        profs = [r2d(r) for r in cur.fetchall()]

        for p in profs:
            pid = p['id']

            cur.execute("SELECT COUNT(*) as count FROM weight_log WHERE profile_id=%s", (pid,))
            p['weight_count'] = cur.fetchone()['count']

            cur.execute("SELECT COUNT(*) as count FROM ai_insights WHERE profile_id=%s", (pid,))
            p['ai_insights_count'] = cur.fetchone()['count']

            cur.execute("SELECT call_count FROM ai_usage WHERE profile_id=%s AND date=%s", (pid, today))
            usage = cur.fetchone()
            p['ai_usage_today'] = usage['call_count'] if usage else 0

    return profs
@router.put("/profiles/{pid}")
def admin_update_profile(pid: str, data: AdminProfileUpdate, session: dict = Depends(require_admin)):
    """Admin: Update profile settings.

    Writes only those fields of ``data`` whose value is not ``None`` to the
    profile with id ``pid``. When no field is set, nothing is written.
    Always returns ``{"ok": True}``.
    """
    with get_db() as conn:
        changes = {}
        for field, value in data.model_dump().items():
            if value is not None:
                changes[field] = value

        if not changes:
            return {"ok": True}

        # Column names are the model's field names; values are bound as
        # query parameters, with the profile id last for the WHERE clause.
        set_clause = ", ".join(f"{column}=%s" for column in changes)
        params = list(changes.values())
        params.append(pid)

        cursor = get_cursor(conn)
        cursor.execute(f"UPDATE profiles SET {set_clause} WHERE id=%s", params)

    return {"ok": True}
@router.put("/profiles/{pid}/permissions")
def admin_set_permissions(pid: str, data: dict, session: dict = Depends(require_admin)):
    """Admin: Set profile permissions.

    Recognised keys in ``data`` are ``ai_enabled``, ``ai_limit_day``,
    ``export_enabled`` and ``role``; any other key is ignored. Only keys that
    are present are written to the profile with id ``pid``. Always returns
    ``{"ok": True}``.
    """
    permission_columns = ("ai_enabled", "ai_limit_day", "export_enabled", "role")

    with get_db() as conn:
        cursor = get_cursor(conn)

        present = [column for column in permission_columns if column in data]
        if present:
            set_clause = ", ".join(f"{column}=%s" for column in present)
            params = [data[column] for column in present]
            params.append(pid)
            cursor.execute(f"UPDATE profiles SET {set_clause} WHERE id=%s", params)

    return {"ok": True}
@router.put("/profiles/{pid}/email")
def admin_set_email(pid: str, data: dict, session: dict = Depends(require_admin)):
    """Admin: Set profile email.

    Reads ``data["email"]``, strips surrounding whitespace and lower-cases
    it. A missing, ``null`` or blank value stores ``NULL`` for the profile
    with id ``pid``.

    Raises:
        HTTPException: 400 when ``email`` is present but not a string.
    """
    raw = data.get('email')
    if raw is None:
        # An explicit JSON null means "clear the address", same as omitting it.
        raw = ''
    if not isinstance(raw, str):
        # Without this check .strip() would raise and surface as a 500.
        raise HTTPException(400, "E-Mail-Adresse muss ein Text sein")

    email = raw.strip().lower()
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("UPDATE profiles SET email=%s WHERE id=%s", (email if email else None, pid))
    return {"ok": True}
@router.put("/profiles/{pid}/pin")
def admin_set_pin(pid: str, data: dict, session: dict = Depends(require_admin)):
    """Admin: Set profile PIN/password.

    Hashes ``data["pin"]`` with ``hash_pin`` and stores the result in
    ``pin_hash`` of the profile with id ``pid``.

    Raises:
        HTTPException: 400 when ``pin`` is missing, not a string, or shorter
            than 4 characters.
    """
    new_pin = data.get('pin', '')
    # The type check keeps null/numeric values from crashing len() and stops
    # non-string containers with >= 4 items from reaching hash_pin.
    if not isinstance(new_pin, str) or len(new_pin) < 4:
        raise HTTPException(400, "PIN/Passwort muss mind. 4 Zeichen haben")

    new_hash = hash_pin(new_pin)
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("UPDATE profiles SET pin_hash=%s WHERE id=%s", (new_hash, pid))
    return {"ok": True}
@router.get("/email/status")
def admin_email_status(session: dict = Depends(require_admin)):
    """Admin: Check email configuration status.

    Reports whether SMTP_HOST, SMTP_USER and SMTP_PASS are all set in the
    environment, together with the host, the user and APP_URL (which falls
    back to ``http://localhost:3002``). The password itself is not returned.
    """
    host = os.getenv("SMTP_HOST")
    user = os.getenv("SMTP_USER")
    password = os.getenv("SMTP_PASS")

    status = {
        "configured": all((host, user, password)),
        "smtp_host": host if host else "",
        "smtp_user": user if user else "",
        "app_url": os.getenv("APP_URL", "http://localhost:3002"),
    }
    return status
@router.post("/email/test")
def admin_test_email(data: dict, session: dict = Depends(require_admin)):
    """Admin: Send test email.

    Sends a fixed plain-text test message to ``data["to"]`` using the SMTP
    settings from the environment (SMTP_HOST, SMTP_PORT with default 587,
    SMTP_USER, SMTP_PASS, SMTP_FROM). The connection is upgraded with
    STARTTLS before logging in.

    Raises:
        HTTPException: 400 when ``to`` is missing or empty; 500 when SMTP is
            not configured or sending fails.
    """
    email = data.get('to', '')
    if not email:
        raise HTTPException(400, "E-Mail-Adresse fehlt")

    smtp_host = os.getenv("SMTP_HOST")
    smtp_user = os.getenv("SMTP_USER")
    smtp_pass = os.getenv("SMTP_PASS")
    smtp_from = os.getenv("SMTP_FROM")

    # Checked before the try block: inside it, this HTTPException would be
    # caught by the broad handler below and re-wrapped as a send failure.
    if not smtp_host or not smtp_user or not smtp_pass:
        raise HTTPException(500, "SMTP nicht konfiguriert")

    try:
        # Parsed inside the try so a malformed SMTP_PORT is reported like
        # any other send failure.
        smtp_port = int(os.getenv("SMTP_PORT", 587))

        msg = MIMEText("Dies ist eine Test-E-Mail von Mitai Jinkendo.")
        msg['Subject'] = "Test-E-Mail"
        msg['From'] = smtp_from
        msg['To'] = email

        with smtplib.SMTP(smtp_host, smtp_port) as server:
            server.starttls()
            server.login(smtp_user, smtp_pass)
            server.send_message(msg)
    except Exception as e:
        # Boundary handler: surface any SMTP/network error to the admin UI.
        raise HTTPException(500, f"Fehler beim Senden: {str(e)}") from e

    return {"ok": True, "message": f"Test-E-Mail an {email} gesendet"}
@router.get("/widgets/catalog-full")
def admin_widgets_catalog_full(session: dict = Depends(require_admin)):
    """Return the dashboard widget catalog without feature filtering.

    Intended for configuring the product default layout.
    """
    del session  # declared only so the require_admin dependency is applied
    catalog = widgets_catalog_admin_payload()
    return catalog
@router.get("/dashboard-product-default")
def admin_get_dashboard_product_default(session: dict = Depends(require_admin)):
    """Return the current product dashboard default (from DB or from code).

    The response carries the effective layout (with missing catalog widgets
    merged in), a flag telling whether a validated default is stored in the
    database, and the code-defined default for reference.
    """
    del session  # declared only so the require_admin dependency is applied
    with get_db() as conn:
        base_layout = get_product_default_base_dict(conn)
        effective_layout = merge_missing_catalog_widgets(base_layout)
        stored_default = get_stored_product_default_validated(conn)

    response = {
        "from_database": stored_default is not None,
        "layout": effective_layout,
        "code_reference": product_default_layout_dict(),
    }
    return response
@router.put("/dashboard-product-default")
def admin_put_dashboard_product_default(
    body: dict[str, Any],
    session: dict = Depends(require_admin),
):
    """Persist the system default layout (same JSON as a user layout v1).

    Raises:
        HTTPException: 422 when ``body`` fails ``DashboardLayoutPayload``
            validation; the detail is the validation error text.
    """
    del session  # declared only so the require_admin dependency is applied

    try:
        validated = DashboardLayoutPayload.model_validate(body)
    except Exception as exc:
        raise HTTPException(422, str(exc)) from exc

    layout = validated.to_stored_dict()
    with get_db() as conn:
        upsert_product_default_base(conn, layout)

    result = {"ok": True, "layout": layout, "from_database": True}
    return result
@router.delete("/dashboard-product-default")
def admin_delete_dashboard_product_default(session: dict = Depends(require_admin)):
    """Remove the DB override so the app falls back to the code default.

    Returns the layout that is effective after the deletion, with missing
    catalog widgets merged in.
    """
    del session  # declared only so the require_admin dependency is applied
    with get_db() as conn:
        delete_product_default_override(conn)
        base_layout = get_product_default_base_dict(conn)
        effective_layout = merge_missing_catalog_widgets(base_layout)

    return {"ok": True, "layout": effective_layout, "from_database": False}
@router.get("/widget-feature-assignments")
def admin_list_widget_feature_assignments(session: dict = Depends(require_admin)):
    """List all catalog widgets with their custom and catalog feature lists.

    Returns ``{"widgets": [...], "features": [...]}`` for admin maintenance:
    one entry per ``WIDGET_CATALOG`` item, plus every row of ``features``
    ordered by category (NULLs last) and name.
    """
    del session  # declared only so the require_admin dependency is applied

    with get_db() as conn:
        custom_widget_ids, feature_ids_by_widget = fetch_assignments_bundle(conn)
        cursor = get_cursor(conn)
        cursor.execute(
            """
            SELECT id, name, category, active
            FROM features
            ORDER BY category NULLS LAST, name
            """
        )
        feature_rows = [r2d(row) for row in cursor.fetchall()]

    def describe(entry: dict[str, Any]) -> dict[str, Any]:
        # Build the admin view of a single catalog entry.
        widget_id = entry["id"]
        catalog_feature = entry.get("requires_feature")
        is_custom = widget_id in custom_widget_ids
        if is_custom:
            custom_features = list(feature_ids_by_widget.get(widget_id, []))
        else:
            custom_features = []
        return {
            "id": widget_id,
            "title": entry["title"],
            "description": entry["description"],
            "uses_custom_requirements": is_custom,
            "feature_ids": custom_features,
            "catalog_feature_ids": [catalog_feature] if catalog_feature else [],
        }

    widgets_out: list[dict[str, Any]] = [describe(entry) for entry in WIDGET_CATALOG]
    return {"widgets": widgets_out, "features": feature_rows}
@router.put("/widget-feature-assignments/{widget_id}")
def admin_put_widget_feature_assignment(
    widget_id: str,
    body: dict[str, Any],
    session: dict = Depends(require_admin),
):
    """
    Set how a widget's feature requirements are determined.

    mode=catalog: catalog fallback (``requires_feature`` defined in code).
    mode=custom: AND over ``feature_ids`` (empty list = widget without a
    feature gate). ``mode`` defaults to ``custom`` when omitted.

    Raises:
        HTTPException: 404 for a widget id not in ``ALLOWED_WIDGET_IDS``;
            422 for an unknown mode, a non-list ``feature_ids``, or a
            feature id that does not exist in ``features``.
    """
    del session  # declared only so the require_admin dependency is applied

    if widget_id not in ALLOWED_WIDGET_IDS:
        raise HTTPException(404, "Unbekanntes Widget")

    mode = body.get("mode", "custom")
    if mode != "catalog" and mode != "custom":
        raise HTTPException(422, "mode muss catalog oder custom sein")

    if mode == "catalog":
        with get_db() as conn:
            clear_custom_requirements(conn, widget_id)
        return {
            "ok": True,
            "widget_id": widget_id,
            "uses_custom_requirements": False,
            "feature_ids": [],
        }

    requested = body.get("feature_ids")
    if not isinstance(requested, list):
        raise HTTPException(422, "feature_ids muss Liste sein")

    # Normalise to stripped strings, dropping nulls and blank entries.
    cleaned: list[str] = []
    for item in requested:
        if item is None:
            continue
        text = str(item).strip()
        if text:
            cleaned.append(text)

    with get_db() as conn:
        cursor = get_cursor(conn)
        # Every id must exist before anything is written.
        for feature_id in cleaned:
            cursor.execute("SELECT id FROM features WHERE id = %s", (feature_id,))
            if cursor.fetchone() is None:
                raise HTTPException(422, f"Unbekanntes Feature: {feature_id}")
        set_custom_requirements(conn, widget_id, cleaned)

    return {
        "ok": True,
        "widget_id": widget_id,
        "uses_custom_requirements": True,
        "feature_ids": cleaned,
    }