Five new admin routers:
1. routers/features.py
- GET/POST/PUT/DELETE /api/features
- Feature registry CRUD
- Allows adding new limitable features without schema changes
2. routers/tiers_mgmt.py
- GET/POST/PUT/DELETE /api/tiers
- Subscription tier management
- Price configuration, sort order
3. routers/tier_limits.py
- GET /api/tier-limits - Complete Tier x Feature matrix
- PUT /api/tier-limits - Update single limit
- PUT /api/tier-limits/batch - Batch update
- DELETE /api/tier-limits - Remove limit (fallback to default)
- Matrix editor backend
4. routers/user_restrictions.py
- GET/POST/PUT/DELETE /api/user-restrictions
- User-specific feature overrides
- Highest priority in access hierarchy
- Includes reason field for documentation
5. routers/access_grants.py
- GET /api/access-grants - List grants with filters
- POST /api/access-grants - Manual grant creation
- PUT /api/access-grants/{id} - Extend/pause grants
- DELETE /api/access-grants/{id} - Revoke access
- Activity logging
All endpoints require admin authentication.
Completes backend API for v9c Phase 2.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
193 lines
5.4 KiB
Python
193 lines
5.4 KiB
Python
"""
|
|
Access Grants Management Endpoints for Mitai Jinkendo

Admin-only access grants history and manual grant creation.
|
|
"""
|
|
from datetime import datetime, timedelta
|
|
from fastapi import APIRouter, HTTPException, Depends
|
|
|
|
from db import get_db, get_cursor, r2d
|
|
from auth import require_admin
|
|
|
|
# Router collecting the access-grant endpoints defined in this module;
# every route registered on it is served under the /api/access-grants prefix.
router = APIRouter(prefix="/api/access-grants", tags=["access-grants"])
|
|
|
|
|
|
@router.get("")
def list_access_grants(
    profile_id: str = None,
    active_only: bool = False,
    session: dict = Depends(require_admin)
):
    """
    Admin: List access grants, joined with tier and profile details.

    Query params:
    - profile_id: Restrict the result to grants of this profile
    - active_only: Only return grants that are flagged active and whose
      valid_until lies in the future

    Returns a list of dicts (one per grant), ordered by valid_until,
    newest expiry first.
    """
    # Collect optional WHERE clauses and their bound parameters side by side
    # so the placeholder order always matches the parameter order.
    filters = []
    args = []

    if profile_id:
        filters.append("ag.profile_id = %s")
        args.append(profile_id)

    if active_only:
        filters.extend([
            "ag.is_active = true",
            "ag.valid_until > CURRENT_TIMESTAMP",
        ])

    sql = """
        SELECT
            ag.*,
            t.name as tier_name,
            p.name as profile_name,
            p.email as profile_email
        FROM access_grants ag
        JOIN tiers t ON t.id = ag.tier_id
        JOIN profiles p ON p.id = ag.profile_id
    """
    if filters:
        sql += " WHERE " + " AND ".join(filters)
    sql += " ORDER BY ag.valid_until DESC"

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(sql, args)
        rows = cur.fetchall()
        return [r2d(row) for row in rows]
|
|
|
|
|
|
@router.post("")
def create_access_grant(data: dict, session: dict = Depends(require_admin)):
    """
    Admin: Manually create an access grant and record it in the activity log.

    Body:
    {
        "profile_id": "uuid",
        "tier_id": "premium",
        "duration_days": 30,
        "reason": "Compensation for bug"
    }

    The grant starts now and ends ``duration_days`` days later. ``reason`` is
    optional and is only written to the activity-log entry.

    Returns:
        {"ok": True, "id": <id of the new grant>, "valid_until": <ISO string>}

    Raises:
        HTTPException(400): a required field is missing, or duration_days is
            not a positive number that yields a representable end date.
    """
    import json  # local import so this block does not depend on file-level imports

    profile_id = data.get('profile_id')
    tier_id = data.get('tier_id')
    duration_days = data.get('duration_days')
    # Normalise a missing or null reason to an empty string.
    reason = str(data.get('reason') or '')

    if not profile_id or not tier_id or not duration_days:
        raise HTTPException(400, "profile_id, tier_id und duration_days fehlen")

    # Reject non-numeric and non-positive durations up front instead of
    # failing inside timedelta() (TypeError) or storing an already-expired
    # grant. bool is an int subclass, so it is excluded explicitly.
    if (
        isinstance(duration_days, bool)
        or not isinstance(duration_days, (int, float))
        or duration_days <= 0
    ):
        raise HTTPException(400, "duration_days muss eine positive Zahl sein")

    valid_from = datetime.now()
    try:
        valid_until = valid_from + timedelta(days=duration_days)
    except (OverflowError, ValueError):
        # Duration too large for timedelta/datetime, or NaN.
        raise HTTPException(400, "duration_days ist zu gross") from None

    # Serialise with json.dumps so quotes/backslashes in user-supplied text
    # cannot break or inject into the stored JSON document.
    details = json.dumps(
        {"tier": str(tier_id), "duration_days": duration_days, "reason": reason},
        ensure_ascii=False,
    )

    with get_db() as conn:
        cur = get_cursor(conn)

        # Create grant
        cur.execute("""
            INSERT INTO access_grants (
                profile_id, tier_id, granted_by, valid_from, valid_until
            )
            VALUES (%s, %s, 'admin', %s, %s)
            RETURNING id
        """, (profile_id, tier_id, valid_from, valid_until))

        grant_id = cur.fetchone()['id']

        # Log activity
        cur.execute("""
            INSERT INTO user_activity_log (profile_id, action, details)
            VALUES (%s, 'access_grant_created', %s)
        """, (profile_id, details))

        # Grant and log entry are committed together.
        conn.commit()

    return {
        "ok": True,
        "id": grant_id,
        "valid_until": valid_until.isoformat()
    }
|
|
|
|
|
|
@router.put("/{grant_id}")
def update_access_grant(grant_id: str, data: dict, session: dict = Depends(require_admin)):
    """
    Admin: Update access grant (e.g., extend duration, pause/resume).

    Body (both fields optional):
    {
        "is_active": false,                     // Pause grant
        "valid_until": "2026-12-31T23:59:59"    // Extend
    }

    When the grant is paused (is_active is falsy), the number of whole days
    left until its current valid_until is stored in remaining_days and
    paused_at is set to the current database timestamp.

    Returns:
        {"ok": True}

    Raises:
        HTTPException(404): no grant with the given id was updated.
    """
    # Nothing to change: answer without touching the database.
    if 'is_active' not in data and 'valid_until' not in data:
        return {"ok": True}

    with get_db() as conn:
        cur = get_cursor(conn)

        # Column assignments are fixed strings; all values are bound params.
        updates = []
        values = []

        if 'is_active' in data:
            updates.append('is_active = %s')
            values.append(data['is_active'])

            if not data['is_active']:
                # Pausing - calculate remaining days
                cur.execute("SELECT valid_until FROM access_grants WHERE id = %s", (grant_id,))
                grant = cur.fetchone()
                if grant:
                    valid_until = grant['valid_until']
                    if valid_until is not None:
                        # Match "now" to the awareness of the stored value so
                        # the subtraction works for both naive and
                        # timezone-aware timestamps.
                        now = datetime.now(valid_until.tzinfo)
                        updates.append('remaining_days = %s')
                        values.append((valid_until - now).days)
                    updates.append('paused_at = CURRENT_TIMESTAMP')

        if 'valid_until' in data:
            updates.append('valid_until = %s')
            values.append(data['valid_until'])

        updates.append('updated = CURRENT_TIMESTAMP')
        values.append(grant_id)

        cur.execute(
            f"UPDATE access_grants SET {', '.join(updates)} WHERE id = %s",
            values
        )
        matched = cur.rowcount
        conn.commit()

    # Report an unknown grant id instead of silently claiming success.
    if matched == 0:
        raise HTTPException(404, "Access Grant nicht gefunden")

    return {"ok": True}
|
|
|
|
|
|
@router.delete("/{grant_id}")
def revoke_access_grant(grant_id: str, session: dict = Depends(require_admin)):
    """
    Admin: Revoke access grant (hard delete).

    If the grant exists, an 'access_grant_revoked' entry is written to the
    activity log of the owning profile before the row is deleted. Deleting an
    unknown id is a no-op.

    Returns:
        {"ok": True}
    """
    import json  # local import so this block does not depend on file-level imports

    with get_db() as conn:
        cur = get_cursor(conn)

        # Get grant info for logging
        cur.execute("SELECT profile_id, tier_id FROM access_grants WHERE id = %s", (grant_id,))
        grant = cur.fetchone()

        if grant:
            # Build the payload with json.dumps so special characters in the
            # path parameter or tier id cannot corrupt the stored JSON.
            details = json.dumps(
                {"grant_id": grant_id, "tier": str(grant["tier_id"])},
                ensure_ascii=False,
            )

            # Log revocation
            cur.execute("""
                INSERT INTO user_activity_log (profile_id, action, details)
                VALUES (%s, 'access_grant_revoked', %s)
            """, (grant['profile_id'], details))

        # Delete grant
        cur.execute("DELETE FROM access_grants WHERE id = %s", (grant_id,))
        conn.commit()

    return {"ok": True}
|