KEY INSIGHT
Rate limiting in a Nigerian SaaS must balance fair usage across different tiers while handling the bursty traffic patterns common in Lagos and Abuja internet connections, where connection quality varies significantly.
Rate limiting serves two purposes: protecting system resources and enforcing tier boundaries. For a Nigerian AI SaaS, the implementation must handle variance in client behavior while providing clear feedback when limits are approached.
```python
import hashlib
import logging
import random
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Optional

from redis import Redis
from redis.exceptions import ConnectionError as RedisConnectionError
class RateLimiter:
    """Fixed-window request counter backed by Redis.

    Every (tenant, endpoint) pair gets one Redis counter per time window;
    the window index is part of the key, so a new window starts from zero.
    This is a fixed-window scheme, not a token bucket: a client may send up
    to twice the limit in a short span that straddles a window boundary.
    """

    def __init__(self, redis_client: "Redis"):
        """Store the Redis client and the per-tier request limits.

        Args:
            redis_client: Client used for the counters. It must provide
                ``pipeline()`` with ``incr``, ``expire`` and ``execute``.
        """
        self.redis = redis_client
        # 'requests' allowed per 'window' seconds, keyed by subscription tier.
        self.default_limits = {
            'free': {'requests': 100, 'window': 60},  # 100/min
            'starter': {'requests': 1000, 'window': 60},  # 1000/min
            'professional': {'requests': 5000, 'window': 60},  # 5000/min
            'enterprise': {'requests': 20000, 'window': 60},  # 20000/min
        }

    def check_rate_limit(
        self,
        tenant_id: str,
        endpoint: str,
        tier: str
    ) -> dict:
        """Count this request and report the tenant's limit status.

        Args:
            tenant_id: Tenant whose counter is incremented.
            endpoint: Endpoint name; counters are kept per endpoint.
            tier: Subscription tier. Unknown tiers use the 'free' limits.

        Returns:
            A dict with keys 'allowed' (bool), 'limit', 'remaining',
            'reset' (epoch second at which the current window ends) and
            'retry_after' (seconds until the window ends when the request
            is denied, otherwise None).
        """
        limits = self.default_limits.get(tier, self.default_limits['free'])
        max_requests = limits['requests']
        window_seconds = limits['window']

        # Read the clock once so the key, 'reset' and 'retry_after' all
        # describe the same window even if a boundary passes mid-call.
        now = time.time()
        window_index = int(now / window_seconds)
        key = f"ratelimit:{tenant_id}:{endpoint}:{window_index}"

        # Send INCR and EXPIRE together so a counter is never left behind
        # without a TTL if the connection drops between the two commands.
        pipe = self.redis.pipeline()
        pipe.incr(key)
        pipe.expire(key, window_seconds)
        current = int(pipe.execute()[0])

        allowed = current <= max_requests
        retry_after = None
        if not allowed:
            retry_after = max(0, window_seconds - (int(now) % window_seconds))

        return {
            'allowed': allowed,
            'limit': max_requests,
            'remaining': max(0, max_requests - current),
            'reset': (window_index + 1) * window_seconds,
            'retry_after': retry_after,
        }
```
For AI endpoints specifically, rate limiting must account for token consumption, not just request counts. A tenant that sends 50 small requests consumes far fewer resources than one that sends 50 large requests, so the budget below is measured in tokens rather than calls.
```python
class AIEndpointRateLimiter:
    """Monthly token-budget limiter for AI endpoints, backed by Redis.

    Usage is tracked in one Redis counter per tenant per calendar month
    (UTC). A request is admitted only if its prompt and completion tokens
    fit inside what is left of the tier's monthly allowance.
    """

    def __init__(self, redis_client: "Redis", openai_client):
        """Store the clients and the per-tier monthly token allowances.

        Args:
            redis_client: Client used for the usage counters. It must
                provide ``incrby``, ``decrby`` and ``expire``.
            openai_client: Kept on the instance as ``self.openai``; the
                budget check itself does not use it.
        """
        self.redis = redis_client
        self.openai = openai_client
        self.limits = {
            'free': {'monthly_tokens': 100000},
            'starter': {'monthly_tokens': 1000000},
            'professional': {'monthly_tokens': 5000000},
            'enterprise': {'monthly_tokens': 50000000},
        }

    def check_and_consume(
        self,
        tenant_id: str,
        tier: str,
        prompt_tokens: int,
        completion_tokens: int
    ) -> tuple[bool, dict]:
        """Reserve tokens from the tenant's monthly budget if they fit.

        Args:
            tenant_id: Tenant whose monthly counter is charged.
            tier: Subscription tier. Unknown tiers have a budget of 0.
            prompt_tokens: Prompt tokens to charge; must be >= 0.
            completion_tokens: Completion tokens to charge; must be >= 0.

        Returns:
            ``(True, info)`` when the tokens were consumed, with 'current_usage',
            'monthly_limit' and 'remaining' in ``info``; ``(False, info)``
            when the request would exceed the budget, with 'current_usage',
            'monthly_limit', 'requested' and 'shortage' in ``info``.

        Raises:
            ValueError: If either token count is negative, since a negative
                charge would silently credit the budget.
        """
        if prompt_tokens < 0 or completion_tokens < 0:
            raise ValueError("token counts must be non-negative")

        total_tokens = prompt_tokens + completion_tokens
        monthly_limit = self.limits.get(tier, {}).get('monthly_tokens', 0)

        # Drop tzinfo after truncating so the key keeps the naive ISO form
        # (e.g. 2024-05-01T00:00:00) that existing counters already use.
        month_start = datetime.now(timezone.utc).replace(
            day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=None
        )
        key = f"ai_tokens:{tenant_id}:{month_start.isoformat()}"

        # Charge first, then check: INCRBY is atomic, so two concurrent
        # requests cannot both read the same usage and jointly overshoot
        # the budget the way a GET-then-INCRBY sequence allows.
        new_usage = int(self.redis.incrby(key, total_tokens))
        self.redis.expire(key, 86400 * 35)  # Keep for full billing period

        if new_usage > monthly_limit:
            # Over budget: give the tokens back. Until the rollback lands a
            # concurrent request may see the inflated total and be denied,
            # which errs on the side of protecting the budget.
            self.redis.decrby(key, total_tokens)
            current_usage = new_usage - total_tokens
            return False, {
                'current_usage': current_usage,
                'monthly_limit': monthly_limit,
                'requested': total_tokens,
                'shortage': new_usage - monthly_limit,
            }

        return True, {
            'current_usage': new_usage,
            'monthly_limit': monthly_limit,
            'remaining': monthly_limit - new_usage,
        }
```
**Handling Burst Traffic:**
Nigerian internet connections often have inconsistent latency, leading to clients retrying failed requests multiple times. This creates artificial spikes in rate limit consumption.
```python
def handle_burst_gracefully(
    limiter: "RateLimiter",
    tenant_id: str,
    endpoint: str,
    tier: str,
    request_count: int
) -> dict:
    """Check the rate limit and, on denial, suggest a jittered retry delay.

    Args:
        limiter: Object whose ``check_rate_limit(tenant_id, endpoint, tier)``
            returns a dict with at least 'allowed' and 'retry_after'.
        tenant_id: Tenant making the request.
        endpoint: Endpoint being called.
        tier: Subscription tier of the tenant.
        request_count: Accepted for interface compatibility; not used here.

    Returns:
        The limiter's own result when the request is allowed. Otherwise a
        dict with 'allowed' (False), 'retry_after' (seconds, capped at 300)
        and a 'message' quoting that same delay.
    """
    result = limiter.check_rate_limit(tenant_id, endpoint, tier)
    if result['allowed']:
        return result

    # Stretch the wait by a random 10-50% so clients that were denied at
    # the same moment do not all retry at the same moment.
    jitter = random.uniform(1.1, 1.5)
    base_wait = result['retry_after'] or 60
    # Cap at 5 minutes, and cap before building the message so the text
    # and the 'retry_after' field always report the same number.
    suggested_wait = min(int(base_wait * jitter), 300)
    return {
        'allowed': False,
        'retry_after': suggested_wait,
        'message': f"Rate limit exceeded. Retry after {suggested_wait} seconds."
    }
```
**Common Failure Modes:**
A Redis-backed rate limiter fails open if connection errors are swallowed: when Redis is unavailable, every check appears to pass and the system can be overwhelmed. Always implement a fallback that denies requests when the rate limiter cannot confirm allowance.
```python
def check_rate_limit_with_fallback(
    limiter: "RateLimiter",
    tenant_id: str,
    endpoint: str,
    tier: str
) -> dict:
    """Check the rate limit, denying the request if Redis is unreachable.

    Args:
        limiter: Object whose ``check_rate_limit(tenant_id, endpoint, tier)``
            performs the actual check.
        tenant_id: Tenant making the request.
        endpoint: Endpoint being called.
        tier: Subscription tier of the tenant.

    Returns:
        The limiter's result on success. If the check raises a Redis
        connection error, a dict with 'allowed' (False), 'error'
        ('rate_limiter_unavailable') and 'retry_after' (5). Any other
        exception propagates to the caller.
    """
    try:
        return limiter.check_rate_limit(tenant_id, endpoint, tier)
    except RedisConnectionError:
        logging.getLogger(__name__).error(
            "Redis unavailable for rate limit check, tenant %s", tenant_id
        )
        # Fail closed: deny request when limiter unavailable
        return {
            'allowed': False,
            'error': 'rate_limiter_unavailable',
            'retry_after': 5
        }
```