New router: routers/subscription.py

Endpoints:
- GET /api/subscription/me - Own subscription info (tier, trial, grants)
- GET /api/subscription/usage - Feature usage with limits
- GET /api/subscription/limits - All feature limits for current tier

Features:
- Shows effective tier (considers access_grants)
- Lists active access grants (from coupons, trials)
- Per-feature usage tracking
- Email verification status

Uses new middleware: get_effective_tier(), check_feature_access()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
188 lines
5.7 KiB
Python
188 lines
5.7 KiB
Python
"""
User Subscription Endpoints for Mitai Jinkendo

User-facing subscription info (own tier, usage, limits).
"""
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, Depends
|
|
|
|
from db import get_db, get_cursor, r2d
|
|
from auth import require_auth, get_effective_tier, check_feature_access
|
|
|
|
router = APIRouter(prefix="/api/subscription", tags=["subscription"])
|
|
|
|
|
|
@router.get("/me")
def get_my_subscription(session: dict = Depends(require_auth)):
    """
    Get current user's subscription info.

    The profile is identified by ``session['profile_id']`` as supplied by
    the ``require_auth`` dependency.

    Returns:
    - tier: Current effective tier, as reported by get_effective_tier()
      (considers access_grants)
    - tier_info: id, name, description and prices of the effective tier,
      or None when the tiers table has no row for it
    - profile_tier: Base tier from profile
    - trial_ends_at: Trial expiration as an ISO 8601 string (None if unset)
    - email_verified: Email verification status
    - active_grants: Access grants (coupons, trials) whose valid_until lies
      in the future, latest expiry first. The query filters on valid_until
      only, so each entry carries its own is_active / paused_by fields.

    When no profile row exists, the body is ``{"error": "Profile not found"}``.
    """
    profile_id = session['profile_id']

    with get_db() as conn:
        cur = get_cursor(conn)

        # Get profile info
        cur.execute("""
            SELECT tier, trial_ends_at, email_verified
            FROM profiles
            WHERE id = %s
        """, (profile_id,))
        profile = cur.fetchone()

        if not profile:
            # NOTE(review): this is sent with HTTP 200; a 404 would be more
            # conventional, but existing clients may rely on the "error" key,
            # so the response shape is kept as-is - confirm before changing.
            return {"error": "Profile not found"}

        # Get effective tier (considers access_grants)
        effective_tier = get_effective_tier(profile_id)

        # Get access grants that have not expired yet
        cur.execute("""
            SELECT
                ag.id,
                ag.tier_id,
                ag.granted_by,
                ag.valid_from,
                ag.valid_until,
                ag.is_active,
                ag.paused_by,
                ag.remaining_days,
                t.name as tier_name
            FROM access_grants ag
            JOIN tiers t ON t.id = ag.tier_id
            WHERE ag.profile_id = %s
              AND ag.valid_until > CURRENT_TIMESTAMP
            ORDER BY ag.valid_until DESC
        """, (profile_id,))
        grants = [r2d(r) for r in cur.fetchall()]

        # Get tier info
        cur.execute("""
            SELECT id, name, description, price_monthly_cents, price_yearly_cents
            FROM tiers
            WHERE id = %s
        """, (effective_tier,))
        tier_row = cur.fetchone()
        # The effective tier may have no row in tiers; get_my_limits already
        # tolerates that case, so do the same here instead of handing a
        # missing row to r2d().
        tier_info = r2d(tier_row) if tier_row else None

        trial_ends_at = profile['trial_ends_at']

        return {
            "tier": effective_tier,
            "tier_info": tier_info,
            "profile_tier": profile['tier'],
            "trial_ends_at": trial_ends_at.isoformat() if trial_ends_at else None,
            "email_verified": profile['email_verified'],
            "active_grants": grants,
        }
|
|
|
|
|
|
@router.get("/usage")
def get_my_usage(session: dict = Depends(require_auth)):
    """
    Report per-feature usage for the authenticated user.

    Every feature flagged active is looked up via check_feature_access()
    for the profile in ``session['profile_id']``.

    Returns:
    - tier: Effective tier as reported by get_effective_tier()
    - features: One entry per active feature (ordered by category, name)
      with feature_id, feature_name, category, limit_type, reset_period
      plus the allowed / limit / used / remaining / reason values taken
      from the access check.
    """
    profile_id = session['profile_id']
    # Keys copied verbatim from the check_feature_access() result.
    access_fields = ("allowed", "limit", "used", "remaining", "reason")

    with get_db() as conn:
        cur = get_cursor(conn)

        # Load the catalogue of active features
        cur.execute("""
            SELECT id, name, category, limit_type, reset_period
            FROM features
            WHERE active = true
            ORDER BY category, name
        """)
        active_features = list(map(r2d, cur.fetchall()))

        # Combine each feature's static data with its access check
        entries = []
        for feat in active_features:
            access = check_feature_access(profile_id, feat['id'])
            entry = {
                "feature_id": feat['id'],
                "feature_name": feat['name'],
                "category": feat['category'],
                "limit_type": feat['limit_type'],
                "reset_period": feat['reset_period'],
            }
            entry.update({field: access[field] for field in access_fields})
            entries.append(entry)

        current_tier = get_effective_tier(profile_id)
        return {
            "tier": current_tier,
            "features": entries,
        }
|
|
|
|
|
|
def _limit_entry(feature):
    """Build the response entry for one feature row, including its status label."""
    limit = feature['limit_value']

    # Interpret the raw limit: None means no cap, 0 means switched off,
    # boolean features are on only at exactly 1, anything else is a count.
    if limit is None:
        status = "unlimited"
    elif limit == 0:
        status = "disabled"
    elif feature['limit_type'] != 'boolean':
        status = f"limit: {limit}"
    elif limit == 1:
        status = "enabled"
    else:
        status = "disabled"

    return {
        "feature_id": feature['id'],
        "feature_name": feature['name'],
        "category": feature['category'],
        "limit": limit,
        "status": status,
    }


@router.get("/limits")
def get_my_limits(session: dict = Depends(require_auth)):
    """
    List the feature limits that apply to the caller's effective tier.

    Simplified view: for each active feature the tier-specific limit is
    used when one exists, otherwise the feature's default limit, and the
    value is summarised as a status string.

    Returns:
    - tier_id: Effective tier as reported by get_effective_tier()
    - tier_name: Name from the tiers table (falls back to tier_id)
    - tier_description: Description from the tiers table (falls back to '')
    - features: Entries with feature_id, feature_name, category, limit, status
    """
    profile_id = session['profile_id']
    tier_id = get_effective_tier(profile_id)

    with get_db() as conn:
        cur = get_cursor(conn)

        # Active features joined with this tier's overrides
        cur.execute("""
            SELECT
                f.id,
                f.name,
                f.category,
                f.limit_type,
                COALESCE(tl.limit_value, f.default_limit) as limit_value
            FROM features f
            LEFT JOIN tier_limits tl ON tl.feature_id = f.id AND tl.tier_id = %s
            WHERE f.active = true
            ORDER BY f.category, f.name
        """, (tier_id,))
        features = [_limit_entry(r2d(raw)) for raw in cur.fetchall()]

        # Tier display data; the row may be absent
        cur.execute("SELECT name, description FROM tiers WHERE id = %s", (tier_id,))
        tier_row = cur.fetchone()

        if tier_row:
            tier_name = tier_row['name']
            tier_description = tier_row['description']
        else:
            tier_name = tier_id
            tier_description = ''

        return {
            "tier_id": tier_id,
            "tier_name": tier_name,
            "tier_description": tier_description,
            "features": features,
        }
|