shinkan-jinkendo/backend/club_features.py
Lars 30dc30c7aa
Some checks failed
Deploy Development / deploy (push) Successful in 43s
Test Suite / pytest-backend (push) Failing after 0s
Test Suite / lint-backend (push) Successful in 0s
Test Suite / build-frontend (push) Successful in 14s
Test Suite / k6 /health Baseline (push) Failing after 4m0s
Test Suite / playwright-tests (push) Failing after 3m41s
Enhance Tenant Context and Access Control Features
- Introduced `email_verified` and `account_state` attributes in the `TenantContext` to improve user state management.
- Updated the `resolve_tenant_context` function to dynamically fetch `email_verified` status from the database and determine `account_state` based on user roles and memberships.
- Implemented `assert_min_account_state` checks across various endpoints to enforce access control based on user account status.
- Incremented version to 1.1.0 in version.py to reflect these enhancements in tenant context management and access control.
2026-06-06 21:10:52 +02:00

507 lines
15 KiB
Python

"""
Vereinsbezogene Feature-Limits (Mitai-v9c-Pattern, Subjekt club_id).
Spez: .claude/docs/technical/CLUB_MEMBERSHIP_AND_FEATURES.v1.md
Phase 2 (M2): probe_club_feature_access — JSON-Log, kein HTTP-Block.
Phase 4 (M5+): CLUB_FEATURE_ENFORCE=1 — HTTP 403 + increment.
Legacy profil-zentriert: auth.check_feature_access (001 / Mitai-Überbleibsel) — nicht für Shinkan-Limits nutzen.
"""
from __future__ import annotations
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, TYPE_CHECKING
from fastapi import HTTPException
from db import get_db, get_cursor
if TYPE_CHECKING:
from tenant_context import TenantContext
# Bestands-Features: Verbrauch = Live-Zählung in DB (nicht club_feature_usage)
_INVENTORY_FEATURES = frozenset(
{"exercises", "training_groups", "active_members", "training_programs"}
)
def _calculate_next_reset(reset_period: str, *, now: Optional[datetime] = None) -> Optional[datetime]:
"""Nächster Reset-Zeitpunkt; None bei 'never'."""
ref = now or datetime.now(timezone.utc)
if reset_period == "never":
return None
if reset_period == "daily":
tomorrow = ref.date() + timedelta(days=1)
return datetime.combine(tomorrow, datetime.min.time(), tzinfo=timezone.utc)
if reset_period == "monthly":
if ref.month == 12:
return datetime(ref.year + 1, 1, 1, tzinfo=timezone.utc)
return datetime(ref.year, ref.month + 1, 1, tzinfo=timezone.utc)
return None
def _normalize_limit(raw: Any) -> Optional[int]:
"""NULL = unbegrenzt; -1 (Legacy 001) wird als unbegrenzt behandelt."""
if raw is None:
return None
try:
v = int(raw)
except (TypeError, ValueError):
return None
if v < 0:
return None
return v
def get_effective_club_plan(cur, club_id: int) -> str:
"""
Effektiver Plan für einen Verein.
1. Aktiver club_access_grants mit plan_id (Zeitfenster, neueste ends_at)
2. club_subscriptions.status = 'active' → plan_id
3. Fallback 'free'
"""
cur.execute(
"""
SELECT plan_id
FROM club_access_grants
WHERE club_id = %s
AND plan_id IS NOT NULL
AND starts_at <= NOW()
AND ends_at > NOW()
ORDER BY ends_at DESC
LIMIT 1
""",
(club_id,),
)
grant = cur.fetchone()
if grant and grant.get("plan_id"):
return str(grant["plan_id"])
cur.execute(
"""
SELECT plan_id
FROM club_subscriptions
WHERE club_id = %s AND status = 'active'
LIMIT 1
""",
(club_id,),
)
sub = cur.fetchone()
if sub and sub.get("plan_id"):
return str(sub["plan_id"])
return "free"
def _resolve_club_limit(cur, club_id: int, feature_id: str, feature_row: dict) -> Optional[int]:
"""Limit-Wert: Override > Plan > Feature-Default."""
cur.execute(
"""
SELECT limit_value
FROM club_feature_overrides
WHERE club_id = %s AND feature_id = %s
""",
(club_id, feature_id),
)
override = cur.fetchone()
if override is not None:
return _normalize_limit(override.get("limit_value"))
plan_id = get_effective_club_plan(cur, club_id)
cur.execute(
"""
SELECT limit_value
FROM club_plan_limits
WHERE plan_id = %s AND feature_id = %s
""",
(plan_id, feature_id),
)
plan_lim = cur.fetchone()
if plan_lim is not None:
return _normalize_limit(plan_lim.get("limit_value"))
return _normalize_limit(feature_row.get("default_limit"))
def _live_inventory_count(cur, club_id: int, feature_id: str) -> Optional[int]:
"""Aktueller Bestand für reset_period=never Features."""
if feature_id == "exercises":
cur.execute(
"""
SELECT COUNT(*)::int AS c
FROM exercises
WHERE club_id = %s AND status != 'archived'
""",
(club_id,),
)
elif feature_id == "training_groups":
cur.execute(
"SELECT COUNT(*)::int AS c FROM training_groups WHERE club_id = %s",
(club_id,),
)
elif feature_id == "active_members":
cur.execute(
"""
SELECT COUNT(*)::int AS c
FROM club_members
WHERE club_id = %s AND status = 'active'
""",
(club_id,),
)
elif feature_id == "training_programs":
cur.execute(
"""
SELECT COUNT(*)::int AS c FROM (
SELECT id FROM training_framework_programs WHERE club_id = %s
UNION ALL
SELECT id FROM training_modules WHERE club_id = %s
) t
""",
(club_id, club_id),
)
else:
return None
row = cur.fetchone()
return int(row["c"] or 0) if row else 0
def resolve_club_id_for_probe(
tenant: "TenantContext",
*,
object_club_id: Optional[int] = None,
) -> Optional[int]:
"""Verein für Feature-Probe: explizites Objekt > effective_club_id."""
if object_club_id is not None:
return int(object_club_id)
eff = getattr(tenant, "effective_club_id", None)
return int(eff) if eff is not None else None
def _maybe_reset_usage(cur, conn, club_id: int, feature_id: str, feature_row: dict, usage_row: Optional[dict]) -> int:
"""Setzt Zähler zurück wenn reset_at überschritten; gibt aktuellen used zurück."""
used = int(usage_row.get("usage_count") or 0) if usage_row else 0
reset_at = usage_row.get("reset_at") if usage_row else None
period = (feature_row.get("reset_period") or "never").strip().lower()
if not usage_row or not reset_at or period == "never":
return used
now = datetime.now(timezone.utc)
ra = reset_at
if hasattr(ra, "tzinfo") and ra.tzinfo is None:
ra = ra.replace(tzinfo=timezone.utc)
if ra and now > ra:
next_reset = _calculate_next_reset(period, now=now)
cur.execute(
"""
UPDATE club_feature_usage
SET usage_count = 0, reset_at = %s, updated_at = NOW()
WHERE club_id = %s AND feature_id = %s
""",
(next_reset, club_id, feature_id),
)
conn.commit()
return 0
return used
def check_club_feature_access(
club_id: int,
feature_id: str,
*,
conn=None,
) -> Dict[str, Any]:
"""
Prüft Vereins-Kontingent für ein Feature.
Returns:
allowed, limit, used, remaining, reason, plan_id, reset_at (optional)
"""
if conn is not None:
return _check_club_impl(club_id, feature_id, conn)
with get_db() as c:
return _check_club_impl(club_id, feature_id, c)
def _check_club_impl(club_id: int, feature_id: str, conn) -> Dict[str, Any]:
cur = get_cursor(conn)
cur.execute(
"""
SELECT id, limit_type, reset_period, default_limit, active, enforcement_subject
FROM features
WHERE id = %s AND app = 'shinkan'
""",
(feature_id,),
)
feature = cur.fetchone()
if not feature or not feature.get("active"):
return {
"allowed": False,
"limit": None,
"used": 0,
"remaining": None,
"reason": "feature_not_found",
"plan_id": get_effective_club_plan(cur, club_id),
}
plan_id = get_effective_club_plan(cur, club_id)
limit = _resolve_club_limit(cur, club_id, feature_id, feature)
limit_type = (feature.get("limit_type") or "count").strip().lower()
if limit_type == "boolean":
allowed = limit == 1
return {
"allowed": allowed,
"limit": limit,
"used": 0,
"remaining": None,
"reason": "enabled" if allowed else "feature_disabled",
"plan_id": plan_id,
}
cur.execute(
"""
SELECT usage_count, reset_at
FROM club_feature_usage
WHERE club_id = %s AND feature_id = %s
""",
(club_id, feature_id),
)
usage = cur.fetchone()
used = _maybe_reset_usage(cur, conn, club_id, feature_id, feature, usage)
period = (feature.get("reset_period") or "never").strip().lower()
if period == "never" and feature_id in _INVENTORY_FEATURES:
inv = _live_inventory_count(cur, club_id, feature_id)
if inv is not None:
used = inv
if limit is None:
return {
"allowed": True,
"limit": None,
"used": used,
"remaining": None,
"reason": "unlimited",
"plan_id": plan_id,
"reset_at": usage.get("reset_at") if usage else None,
}
if limit == 0:
return {
"allowed": False,
"limit": 0,
"used": used,
"remaining": 0,
"reason": "feature_disabled",
"plan_id": plan_id,
"reset_at": usage.get("reset_at") if usage else None,
}
allowed = used < limit
return {
"allowed": allowed,
"limit": limit,
"used": used,
"remaining": max(0, limit - used),
"reason": "within_limit" if allowed else "limit_exceeded",
"plan_id": plan_id,
"reset_at": usage.get("reset_at") if usage else None,
}
def club_feature_enforcement_enabled() -> bool:
"""Phase 4: Hard-Block aktiv (Env CLUB_FEATURE_ENFORCE=1)."""
return os.getenv("CLUB_FEATURE_ENFORCE", "0").strip() == "1"
def probe_club_feature_access(
*,
feature_id: str,
action: str,
club_id: Optional[int] = None,
profile_id: Optional[int] = None,
endpoint: Optional[str] = None,
conn=None,
) -> Dict[str, Any]:
"""
Phase 2: Prüft Vereins-Kontingent, schreibt JSON-Log, blockiert standardmäßig nicht.
Bei CLUB_FEATURE_ENFORCE=1: HTTP 403 wenn nicht allowed.
"""
from club_feature_logger import log_club_feature_usage
if club_id is None:
access = {
"allowed": True,
"limit": None,
"used": 0,
"remaining": None,
"reason": "no_club_context",
"plan_id": None,
}
log_club_feature_usage(
club_id=None,
profile_id=profile_id,
feature_id=feature_id,
action=action,
access=access,
endpoint=endpoint,
phase="probe",
)
return access
if conn is not None:
access = check_club_feature_access(club_id, feature_id, conn=conn)
else:
with get_db() as c:
access = check_club_feature_access(club_id, feature_id, conn=c)
log_club_feature_usage(
club_id=club_id,
profile_id=profile_id,
feature_id=feature_id,
action=action,
access=access,
endpoint=endpoint,
phase="enforce" if club_feature_enforcement_enabled() else "probe",
)
if club_feature_enforcement_enabled() and not access.get("allowed"):
limit = access.get("limit")
used = access.get("used", 0)
detail = (
f"Kontingent überschritten für {feature_id} "
f"({used}/{limit if limit is not None else ''}). "
f"Grund: {access.get('reason', 'limit_exceeded')}."
)
raise HTTPException(status_code=403, detail=detail)
return access
def increment_club_feature_usage(
club_id: int,
feature_id: str,
*,
profile_id: Optional[int] = None,
action: Optional[str] = None,
conn=None,
) -> None:
"""Erhöht Vereins-Zähler (nur bei neuem Verbrauch / INSERT-Pfad aufrufen)."""
def _run(c):
cur = get_cursor(c)
cur.execute(
"""
SELECT reset_period, limit_type
FROM features
WHERE id = %s AND app = 'shinkan' AND active = true
""",
(feature_id,),
)
feature = cur.fetchone()
if not feature:
return
if (feature.get("limit_type") or "count").strip().lower() == "boolean":
return
period = (feature.get("reset_period") or "never").strip().lower()
next_reset = _calculate_next_reset(period)
cur.execute(
"""
INSERT INTO club_feature_usage (club_id, feature_id, usage_count, reset_at, last_used_at)
VALUES (%s, %s, 1, %s, NOW())
ON CONFLICT (club_id, feature_id)
DO UPDATE SET
usage_count = club_feature_usage.usage_count + 1,
last_used_at = NOW(),
updated_at = NOW()
""",
(club_id, feature_id, next_reset),
)
if profile_id is not None or action:
cur.execute(
"""
INSERT INTO club_feature_usage_events (club_id, feature_id, profile_id, action)
VALUES (%s, %s, %s, %s)
""",
(club_id, feature_id, profile_id, action or feature_id),
)
c.commit()
if conn is not None:
_run(conn)
else:
with get_db() as c:
_run(c)
def list_club_entitlements(cur, club_id: int, *, conn=None) -> Dict[str, Any]:
"""Alle aktiven Shinkan-Features mit effektivem Limit und Verbrauch (Liste, intern)."""
db_conn = conn if conn is not None else cur.connection
plan_id = get_effective_club_plan(cur, club_id)
cur.execute(
"""
SELECT id, name, category, limit_type, reset_period
FROM features
WHERE app = 'shinkan' AND active = true
ORDER BY category, id
"""
)
rows = cur.fetchall()
features_out = []
for row in rows:
fid = row["id"]
access = _check_club_impl(club_id, fid, db_conn)
features_out.append(
{
"id": fid,
"name": row.get("name"),
"category": row.get("category"),
"limit_type": row.get("limit_type"),
"reset_period": row.get("reset_period"),
"allowed": access.get("allowed"),
"limit": access.get("limit"),
"used": access.get("used"),
"remaining": access.get("remaining"),
"reason": access.get("reason"),
"reset_at": access.get("reset_at"),
}
)
return {"club_id": club_id, "plan_id": plan_id, "features": features_out}
def club_features_map(cur, club_id: int, *, conn=None) -> Dict[str, Any]:
"""Feature-Kontingente als Dict feature_id → Zustand (für /me/entitlements)."""
raw = list_club_entitlements(cur, club_id, conn=conn)
features_dict: Dict[str, Any] = {}
for row in raw.get("features") or []:
fid = row["id"]
features_dict[fid] = {
"name": row.get("name"),
"category": row.get("category"),
"limit_type": row.get("limit_type"),
"reset_period": row.get("reset_period"),
"allowed": row.get("allowed"),
"limit": row.get("limit"),
"used": row.get("used"),
"remaining": row.get("remaining"),
"reason": row.get("reason"),
"reset_at": row.get("reset_at"),
}
return {
"club_id": raw.get("club_id"),
"plan_id": raw.get("plan_id"),
"features": features_dict,
}