feat: add user subscription info endpoints
All checks were successful
Deploy Development / deploy (push) Successful in 55s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s

New router: routers/subscription.py
Endpoints:
- GET /api/subscription/me - Own subscription info (tier, trial, grants)
- GET /api/subscription/usage - Feature usage with limits
- GET /api/subscription/limits - All feature limits for current tier

Features:
- Shows effective tier (considers access_grants)
- Lists active access grants (from coupons, trials)
- Per-feature usage tracking
- Email verification status

Uses new middleware: get_effective_tier(), check_feature_access()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Lars 2026-03-19 13:05:55 +01:00
parent c002cb1e54
commit ae47652d0c
2 changed files with 189 additions and 1 deletions

View File

@ -17,7 +17,7 @@ from db import init_db
# Import routers
from routers import auth, profiles, weight, circumference, caliper
from routers import activity, nutrition, photos, insights, prompts
from routers import admin, stats, exportdata, importdata
from routers import admin, stats, exportdata, importdata, subscription
# ── App Configuration ─────────────────────────────────────────────────────────
DATA_DIR = Path(os.getenv("DATA_DIR", "./data"))
@ -73,6 +73,7 @@ app.include_router(admin.router) # /api/admin/*
app.include_router(stats.router) # /api/stats
app.include_router(exportdata.router) # /api/export/*
app.include_router(importdata.router) # /api/import/*
app.include_router(subscription.router) # /api/subscription/*
# ── Health Check ──────────────────────────────────────────────────────────────
@app.get("/")

View File

@ -0,0 +1,187 @@
"""
User Subscription Endpoints for Mitai Jinkendo
User-facing subscription info (own tier, usage, limits).
"""
from datetime import datetime
from fastapi import APIRouter, Depends
from db import get_db, get_cursor, r2d
from auth import require_auth, get_effective_tier, check_feature_access
router = APIRouter(prefix="/api/subscription", tags=["subscription"])
@router.get("/me")
def get_my_subscription(session: dict = Depends(require_auth)):
"""
Get current user's subscription info.
Returns:
- tier: Current effective tier (considers access_grants)
- profile_tier: Base tier from profile
- trial_ends_at: Trial expiration (if applicable)
- email_verified: Email verification status
- active_grants: List of active access grants (coupons, trials)
"""
profile_id = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
# Get profile info
cur.execute("""
SELECT tier, trial_ends_at, email_verified
FROM profiles
WHERE id = %s
""", (profile_id,))
profile = cur.fetchone()
if not profile:
return {"error": "Profile not found"}
# Get effective tier (considers access_grants)
effective_tier = get_effective_tier(profile_id)
# Get active access grants
cur.execute("""
SELECT
ag.id,
ag.tier_id,
ag.granted_by,
ag.valid_from,
ag.valid_until,
ag.is_active,
ag.paused_by,
ag.remaining_days,
t.name as tier_name
FROM access_grants ag
JOIN tiers t ON t.id = ag.tier_id
WHERE ag.profile_id = %s
AND ag.valid_until > CURRENT_TIMESTAMP
ORDER BY ag.valid_until DESC
""", (profile_id,))
grants = [r2d(r) for r in cur.fetchall()]
# Get tier info
cur.execute("""
SELECT id, name, description, price_monthly_cents, price_yearly_cents
FROM tiers
WHERE id = %s
""", (effective_tier,))
tier_info = r2d(cur.fetchone())
return {
"tier": effective_tier,
"tier_info": tier_info,
"profile_tier": profile['tier'],
"trial_ends_at": profile['trial_ends_at'].isoformat() if profile['trial_ends_at'] else None,
"email_verified": profile['email_verified'],
"active_grants": grants
}
@router.get("/usage")
def get_my_usage(session: dict = Depends(require_auth)):
"""
Get current user's feature usage.
Returns list of features with current usage and limits.
"""
profile_id = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
# Get all active features
cur.execute("""
SELECT id, name, category, limit_type, reset_period
FROM features
WHERE active = true
ORDER BY category, name
""")
features = [r2d(r) for r in cur.fetchall()]
# Get usage for each feature
usage_list = []
for feature in features:
access = check_feature_access(profile_id, feature['id'])
usage_list.append({
"feature_id": feature['id'],
"feature_name": feature['name'],
"category": feature['category'],
"limit_type": feature['limit_type'],
"reset_period": feature['reset_period'],
"allowed": access['allowed'],
"limit": access['limit'],
"used": access['used'],
"remaining": access['remaining'],
"reason": access['reason']
})
return {
"tier": get_effective_tier(profile_id),
"features": usage_list
}
@router.get("/limits")
def get_my_limits(session: dict = Depends(require_auth)):
"""
Get all feature limits for current tier.
Simplified view - just shows what's allowed/not allowed.
"""
profile_id = session['profile_id']
tier_id = get_effective_tier(profile_id)
with get_db() as conn:
cur = get_cursor(conn)
# Get all features with their limits for this tier
cur.execute("""
SELECT
f.id,
f.name,
f.category,
f.limit_type,
COALESCE(tl.limit_value, f.default_limit) as limit_value
FROM features f
LEFT JOIN tier_limits tl ON tl.feature_id = f.id AND tl.tier_id = %s
WHERE f.active = true
ORDER BY f.category, f.name
""", (tier_id,))
features = []
for row in cur.fetchall():
rd = r2d(row)
limit = rd['limit_value']
# Interpret limit
if limit is None:
status = "unlimited"
elif limit == 0:
status = "disabled"
elif rd['limit_type'] == 'boolean':
status = "enabled" if limit == 1 else "disabled"
else:
status = f"limit: {limit}"
features.append({
"feature_id": rd['id'],
"feature_name": rd['name'],
"category": rd['category'],
"limit": limit,
"status": status
})
# Get tier info
cur.execute("SELECT name, description FROM tiers WHERE id = %s", (tier_id,))
tier = cur.fetchone()
return {
"tier_id": tier_id,
"tier_name": tier['name'] if tier else tier_id,
"tier_description": tier['description'] if tier else '',
"features": features
}