15. Rate Limiting
Chapter 15 of 18 · 25 min
Rate limiting protects local model infrastructure from overload while ensuring fair resource allocation across users and requests. Without it, a single aggressive client can degrade service for everyone.
Token Bucket Algorithm
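The token bucket algorithm provides flexible rate limiting: tokens refill at a fixed rate up to a maximum capacity and each request spends tokens, so short bursts are allowed while the long-run rate stays bounded: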
import time
import threading
from dataclasses import dataclass
from typing import Callable
@dataclass
class RateLimitConfig:
    """Per-client limits used to configure a rate limiter.

    Attributes:
        requests_per_minute: Sustained request rate, in requests per minute.
        tokens_per_minute: Token budget for the model, in tokens per minute.
        burst_size: Allowance for short bursts above the sustained limit.
    """

    requests_per_minute: int
    tokens_per_minute: int
    burst_size: int = 5
class TokenBucket:
    """Token bucket for request rate limiting.

    The bucket starts full with ``capacity`` tokens and is refilled at
    ``rate`` tokens per second, never exceeding ``capacity``. Refill happens
    lazily, each time ``consume`` is called. All state changes in ``consume``
    are made while holding an internal lock.
    """

    def __init__(self, rate: float, capacity: int):
        """Create a full bucket.

        Args:
            rate: Refill speed in tokens per second.
            capacity: Maximum number of tokens the bucket can hold.
        """
        self.rate = rate  # Tokens per second
        self.capacity = capacity
        self.tokens = capacity  # Start full so an initial burst is possible
        # Use a monotonic clock: wall-clock time can jump (NTP, manual
        # changes), which would produce negative or huge refill amounts.
        self.last_update = time.monotonic()
        self._lock = threading.Lock()

    def consume(self, tokens: int = 1) -> bool:
        """Attempt to consume tokens.

        Args:
            tokens: Number of tokens to take; must not be negative.

        Returns:
            True if the tokens were available and have been deducted,
            False if the balance was too low (nothing is deducted).

        Raises:
            ValueError: If ``tokens`` is negative, which would otherwise
                add tokens to the bucket instead of spending them.
        """
        if tokens < 0:
            raise ValueError("tokens must be non-negative")
        with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            # Refill tokens based on elapsed time, capped at capacity
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate,
            )
            self.last_update = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
class RateLimiter:
    """Limit one client by request count and by model-token usage.

    Two token buckets are kept: ``request_bucket`` counts requests and
    ``token_bucket`` counts estimated model tokens. A request is allowed
    only when both buckets can pay for it.
    """

    def __init__(self, config: RateLimitConfig):
        """Build both buckets from ``config``.

        Args:
            config: Per-minute limits and burst size for this client.
        """
        self.request_bucket = TokenBucket(
            rate=config.requests_per_minute / 60,  # per-minute -> per-second
            capacity=config.burst_size,
        )
        self.token_bucket = TokenBucket(
            rate=config.tokens_per_minute / 60,  # per-minute -> per-second
            capacity=config.tokens_per_minute,  # Allow burst up to full minute budget
        )
        self.config = config

    def allow_request(self, estimated_tokens: int) -> tuple[bool, str]:
        """Check if request is allowed under rate limits.

        Args:
            estimated_tokens: Estimated model tokens the request will use.

        Returns:
            ``(True, "")`` when the request is allowed, otherwise
            ``(False, reason)`` naming the limit that was exceeded.
        """
        if not self.request_bucket.consume(1):
            return False, "Request rate limit exceeded"
        if not self.token_bucket.consume(estimated_tokens):
            # The request slot was already spent above; give it back so a
            # request rejected for its token cost does not also use up
            # request quota.
            bucket = self.request_bucket
            with bucket._lock:
                bucket.tokens = min(bucket.capacity, bucket.tokens + 1)
            return False, "Token rate limit exceeded"
        return True, ""

    def wait_time(self, estimated_tokens: int) -> float:
        """Calculate seconds to wait before request allowed.

        Args:
            estimated_tokens: Estimated model tokens the request will use.

        Returns:
            The longer of the two bucket waits in seconds, never negative.
        """
        # Buckets only refill inside consume(); consume(0) always succeeds
        # and brings each balance up to date so the estimate is not stale.
        self.request_bucket.consume(0)
        self.token_bucket.consume(0)
        request_wait = (1 - self.request_bucket.tokens) / self.request_bucket.rate
        token_wait = (
            estimated_tokens - self.token_bucket.tokens
        ) / self.token_bucket.rate
        return max(0.0, request_wait, token_wait)
Per-User Rate Limiting
Track rate limits per user or API key:
from collections import defaultdict
class MultiUserRateLimiter:
    """Keep one ``RateLimiter`` per user ID, created on first use.

    Users without an explicit override get a limiter built from
    ``default_config``. Access to the registry is guarded by a lock.
    """

    def __init__(self, default_config: RateLimitConfig):
        """Start with an empty registry and remember the default limits."""
        self._lock = threading.Lock()
        self.limiters: dict[str, RateLimiter] = {}
        self.default_config = default_config

    def get_limiter(self, user_id: str) -> RateLimiter:
        """Return the limiter for ``user_id``, creating it if missing."""
        with self._lock:
            try:
                return self.limiters[user_id]
            except KeyError:
                created = RateLimiter(self.default_config)
                self.limiters[user_id] = created
                return created

    def check_request(self, user_id: str, tokens: int) -> tuple[bool, str]:
        """Ask the user's limiter whether a request of ``tokens`` is allowed."""
        return self.get_limiter(user_id).allow_request(tokens)

    def set_user_limit(self, user_id: str, config: RateLimitConfig):
        """Replace the user's limiter with a fresh one built from ``config``."""
        replacement = RateLimiter(config)
        with self._lock:
            self.limiters[user_id] = replacement
Middleware Integration
Integrate rate limiting with FastAPI:
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
# Shared limiter used by the middleware; these are the default per-user limits.
rate_limiter = MultiUserRateLimiter(
    default_config=RateLimitConfig(
        requests_per_minute=60,
        tokens_per_minute=100_000,
        burst_size=10,
    )
)
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Apply per-user rate limits to every request except ``/health``.

    Rejected requests get a 429 response with a ``Retry-After`` header.
    Allowed requests are passed on, and the response is annotated with
    ``X-RateLimit-Remaining``.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The downstream response, or a 429 ``JSONResponse`` when limited.
    """
    import math  # Local import: only needed to round Retry-After up

    # Skip rate limiting for health checks
    if request.url.path == "/health":
        return await call_next(request)

    # Extract user ID from header or use IP.
    # request.client can be None when the server does not provide a peer
    # address, so fall back to a shared bucket instead of raising.
    # NOTE(review): X-User-ID is supplied by the client, so a caller can
    # pick a fresh value per request to dodge its limit. Prefer an
    # authenticated identity if one is available.
    client_host = request.client.host if request.client else "unknown"
    user_id = request.headers.get("X-User-ID", client_host)

    # Estimate tokens from request body
    body = await request.body()
    estimated_tokens = estimate_token_count(body)

    # Resolve the limiter once so the decision, the wait estimate and the
    # remaining count all come from the same object.
    limiter = rate_limiter.get_limiter(user_id)
    allowed, reason = limiter.allow_request(estimated_tokens)
    if not allowed:
        wait_seconds = limiter.wait_time(estimated_tokens)
        # Round up, with a floor of one second: truncating would advertise
        # "Retry-After: 0" for sub-second waits and invite immediate retries.
        retry_after = max(1, math.ceil(wait_seconds))
        return JSONResponse(
            status_code=429,
            headers={
                "Retry-After": str(retry_after),
                "X-RateLimit-Remaining": "0",
            },
            content={"error": reason, "retry_after": retry_after},
        )

    response = await call_next(request)

    # Add rate limit headers to response
    remaining = limiter.request_bucket.tokens
    response.headers["X-RateLimit-Remaining"] = str(int(remaining))
    return response
def estimate_token_count(body: bytes, bytes_per_token: int = 4) -> int:
    """Roughly estimate how many model tokens a request body contains.

    Args:
        body: Raw request body.
        bytes_per_token: Average number of bytes assumed per token. The
            default of 4 is the rough "~4 characters per token" estimate.

    Returns:
        ``len(body)`` divided by ``bytes_per_token``, rounded down.

    Raises:
        ValueError: If ``bytes_per_token`` is less than 1.
    """
    if bytes_per_token < 1:
        raise ValueError("bytes_per_token must be at least 1")
    return len(body) // bytes_per_token
Ollama-Specific Rate Limits
Ollama applies its own limits on the server side, so check that the server is responding before forwarding a request:
def check_ollama_load(
    base_url: str = "http://localhost:11434",
    timeout: float = 5.0,
) -> dict:
    """Check Ollama server load before sending request.

    Args:
        base_url: Scheme, host and port of the Ollama server.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot block the caller indefinitely.

    Returns:
        ``{"can_accept_requests": True, "current_load": ...}`` when the
        server answers with status 200 and a JSON object, otherwise
        ``{"can_accept_requests": False, "reason": ...}``.
    """
    # NOTE(review): the "/api/info" path and the "model" field are kept from
    # the original; confirm against the Ollama API reference that the target
    # server version actually serves them.
    try:
        response = requests.get(f"{base_url}/api/info", timeout=timeout)
    except requests.RequestException:
        # Connection errors and timeouts: the server could not be reached.
        return {"can_accept_requests": False, "reason": "server unreachable"}

    if response.status_code != 200:
        # The server did answer, so report the status rather than
        # claiming it is unreachable.
        return {
            "can_accept_requests": False,
            "reason": f"unexpected status {response.status_code}",
        }

    try:
        info = response.json()
    except ValueError:
        info = None
    if not isinstance(info, dict):
        return {"can_accept_requests": False, "reason": "invalid response body"}

    return {
        "can_accept_requests": True,
        "current_load": info.get("model", "unknown"),
    }
EXERCISE
Implement per-user rate limiting with a configurable burst allowance. Verify that a single user can burst above the sustained rate but is throttled when they stay over the limit.