mitai-jinkendo/backend/routers/user_restrictions.py
Lars a849d5db9e
All checks were successful
Deploy Development / deploy (push) Successful in 56s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
feat: add admin management routers for subscription system
Five new admin routers:

1. routers/features.py
   - GET/POST/PUT/DELETE /api/features
   - Feature registry CRUD
   - Allows adding new limitable features without schema changes

2. routers/tiers_mgmt.py
   - GET/POST/PUT/DELETE /api/tiers
   - Subscription tier management
   - Price configuration, sort order

3. routers/tier_limits.py
   - GET /api/tier-limits - Complete Tier x Feature matrix
   - PUT /api/tier-limits - Update single limit
   - PUT /api/tier-limits/batch - Batch update
   - DELETE /api/tier-limits - Remove limit (fallback to default)
   - Matrix editor backend

4. routers/user_restrictions.py
   - GET/POST/PUT/DELETE /api/user-restrictions
   - User-specific feature overrides
   - Highest priority in access hierarchy
   - Includes reason field for documentation

5. routers/access_grants.py
   - GET /api/access-grants - List grants with filters
   - POST /api/access-grants - Manual grant creation
   - PUT /api/access-grants/{id} - Extend/pause grants
   - DELETE /api/access-grants/{id} - Revoke access
   - Activity logging

All endpoints require admin authentication.
Completes backend API for v9c Phase 2.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 13:09:33 +01:00

141 lines
4.3 KiB
Python

"""
User Restrictions Management Endpoints for Mitai Jinkendo
Admin-only user-specific feature overrides.
"""
from fastapi import APIRouter, HTTPException, Depends
from db import get_db, get_cursor, r2d
from auth import require_admin
# Router for this module: every route registered on it is served under the
# /api/user-restrictions prefix and tagged "user-restrictions".
router = APIRouter(prefix="/api/user-restrictions", tags=["user-restrictions"])
@router.get("")
def list_user_restrictions(profile_id: str = None, session: dict = Depends(require_admin)):
    """
    Admin: List user-specific feature restrictions.

    Optional query param: ?profile_id=... limits the result to one profile.

    Every row carries all columns of user_feature_restrictions plus
    feature_name, feature_category, profile_name and profile_email, so the
    response shape is the same with and without the filter.

    Args:
        profile_id: Profile to filter by. A missing or empty value lists the
            restrictions of all profiles.
        session: Session supplied by the require_admin dependency; it is not
            read inside this function.

    Returns:
        A list of rows converted with r2d, ordered by profile name, feature
        category and feature name.
    """
    # Only this fixed SQL fragment is interpolated into the statement; the
    # profile id itself always travels as a bound parameter.
    profile_filter = "WHERE ur.profile_id = %s" if profile_id else ""

    # Ordering by p.name first is a no-op when a single profile is selected,
    # so one ORDER BY clause serves both the filtered and unfiltered listing.
    query = f"""
        SELECT
            ur.*,
            f.name as feature_name,
            f.category as feature_category,
            p.name as profile_name,
            p.email as profile_email
        FROM user_feature_restrictions ur
        JOIN features f ON f.id = ur.feature_id
        JOIN profiles p ON p.id = ur.profile_id
        {profile_filter}
        ORDER BY p.name, f.category, f.name
    """

    with get_db() as conn:
        cur = get_cursor(conn)
        if profile_id:
            cur.execute(query, (profile_id,))
        else:
            cur.execute(query)
        return [r2d(row) for row in cur.fetchall()]
@router.post("")
def create_user_restriction(data: dict, session: dict = Depends(require_admin)):
    """
    Admin: Create a feature restriction for one specific user.

    Expected body:
    {
        "profile_id": "uuid",
        "feature_id": "weight_entries",
        "limit_value": 10,          // NULL = unlimited, 0 = disabled
        "reason": "Spam prevention"
    }

    Args:
        data: Request body; profile_id and feature_id are required,
            limit_value defaults to None and reason to an empty string.
        session: Session supplied by the require_admin dependency; its
            'profile_id' is stored as created_by.

    Returns:
        {"ok": True, "id": <id of the new restriction>}

    Raises:
        HTTPException: 400 if profile_id or feature_id is missing/empty, or
            if a restriction for this profile/feature pair already exists.
    """
    profile_id, feature_id = data.get('profile_id'), data.get('feature_id')
    if not (profile_id and feature_id):
        raise HTTPException(400, "profile_id und feature_id fehlen")

    limit_value = data.get('limit_value')
    reason = data.get('reason', '')

    exists_sql = (
        "\n"
        "SELECT id FROM user_feature_restrictions\n"
        "WHERE profile_id = %s AND feature_id = %s\n"
    )
    insert_sql = (
        "\n"
        "INSERT INTO user_feature_restrictions (\n"
        "profile_id, feature_id, limit_value, reason, created_by\n"
        ")\n"
        "VALUES (%s, %s, %s, %s, %s)\n"
        "RETURNING id\n"
    )

    with get_db() as conn:
        cur = get_cursor(conn)

        # Refuse duplicates: an existing pair has to be changed through PUT.
        cur.execute(exists_sql, (profile_id, feature_id))
        already_present = cur.fetchone()
        if already_present:
            raise HTTPException(400, "Einschränkung existiert bereits (nutze PUT zum Aktualisieren)")

        # Insert the new row, recording the acting admin as its creator.
        insert_params = (profile_id, feature_id, limit_value, reason, session['profile_id'])
        cur.execute(insert_sql, insert_params)
        new_row = cur.fetchone()
        restriction_id = new_row['id']
        conn.commit()

    return {"ok": True, "id": restriction_id}
@router.put("/{restriction_id}")
def update_user_restriction(restriction_id: str, data: dict, session: dict = Depends(require_admin)):
    """
    Admin: Update an existing user restriction.

    Only the body keys "limit_value" and "reason" are applied; any other keys
    are ignored. A body containing neither key is a no-op that returns
    success without opening a database connection.

    Args:
        restriction_id: Id of the restriction row to update (path parameter).
        data: Request body with the fields to change.
        session: Session supplied by the require_admin dependency; it is not
            read inside this function.

    Returns:
        {"ok": True}

    Raises:
        HTTPException: 404 if no restriction with restriction_id exists.
    """
    # Fixed whitelist: only these column names can ever reach the SQL text,
    # the values themselves are always bound parameters.
    updatable_columns = ("limit_value", "reason")
    changed = [column for column in updatable_columns if column in data]
    if not changed:
        return {"ok": True}

    assignments = [f"{column} = %s" for column in changed]
    assignments.append("updated = CURRENT_TIMESTAMP")
    params = [data[column] for column in changed]
    params.append(restriction_id)

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            f"UPDATE user_feature_restrictions SET {', '.join(assignments)} WHERE id = %s",
            params,
        )
        # Assumes a DB-API cursor: rowcount is the number of rows the UPDATE
        # matched. Zero means the id is unknown, which must not be reported
        # as a successful update.
        if cur.rowcount == 0:
            raise HTTPException(404, "Einschränkung nicht gefunden")
        conn.commit()
    return {"ok": True}
@router.delete("/{restriction_id}")
def delete_user_restriction(restriction_id: str, session: dict = Depends(require_admin)):
    """
    Admin: Remove a user restriction (reverts to tier limit).

    Args:
        restriction_id: Id of the restriction row to delete (path parameter).
        session: Session supplied by the require_admin dependency; it is not
            read inside this function.

    Returns:
        {"ok": True}, also when no row matched the given id.
    """
    delete_sql = "DELETE FROM user_feature_restrictions WHERE id = %s"
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(delete_sql, (restriction_id,))
        conn.commit()
    return {"ok": True}