15. Rate Limiting

Chapter 15 of 18 · 25 min

Rate limiting protects local model infrastructure from overload while ensuring fair resource allocation across users and requests. Without it, a single aggressive client can degrade service for everyone.

Token Bucket Algorithm

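The token bucket algorithm provides flexible rate limiting: a bucket refills with tokens at a steady rate up to a fixed capacity, and each request spends tokens, so short bursts are allowed while the long-run rate stays bounded: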

import time
import threading
from dataclasses import dataclass
from typing import Callable

@dataclass
class RateLimitConfig:
    """Limits applied to one client: request rate, model-token rate, and burst."""

    # Sustained number of requests permitted per minute.
    requests_per_minute: int
    # Model-token budget permitted per minute.
    tokens_per_minute: int
    # Number of requests that may arrive back-to-back before throttling begins.
    burst_size: int = 5

class TokenBucket:
    """Thread-safe token bucket for request rate limiting.

    The bucket starts full, refills continuously at ``rate`` tokens per
    second, and never holds more than ``capacity`` tokens.
    """

    def __init__(self, rate: float, capacity: int):
        """Create a full bucket.

        Args:
            rate: Refill speed in tokens per second.
            capacity: Maximum number of tokens the bucket can hold.
        """
        self.rate = rate  # Tokens per second
        self.capacity = capacity
        self.tokens = capacity
        # Monotonic clock: wall-clock adjustments (NTP steps, manual changes)
        # must not be able to drain or flood the bucket.
        self.last_update = time.monotonic()
        self._lock = threading.Lock()

    def consume(self, tokens: int = 1) -> bool:
        """Attempt to consume tokens. Returns True if allowed.

        The bucket is refilled for the time elapsed since the previous call
        before the check is made. A request for more than ``capacity`` tokens
        can never succeed. ``consume(0)`` always succeeds and only refills.

        Raises:
            ValueError: If ``tokens`` is negative (that would add tokens).
        """
        if tokens < 0:
            raise ValueError("tokens must be non-negative")

        with self._lock:
            now = time.monotonic()
            # Clamp so a timestamp ahead of the clock can never remove tokens.
            elapsed = max(0.0, now - self.last_update)

            # Refill tokens based on elapsed time, capped at capacity
            self.tokens = min(
                self.capacity,
                self.tokens + elapsed * self.rate
            )
            self.last_update = now

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

class RateLimiter:
    """Apply a request-count limit and a model-token limit to one client."""

    def __init__(self, config: RateLimitConfig):
        """Build both buckets from ``config``; both start full.

        Per-minute limits are converted to per-second refill rates.
        """
        self.request_bucket = TokenBucket(
            rate=config.requests_per_minute / 60,
            capacity=config.burst_size
        )
        self.token_bucket = TokenBucket(
            rate=config.tokens_per_minute / 60,
            capacity=config.tokens_per_minute  # Allow burst up to full minute budget
        )
        self.config = config
        # Serializes allow_request so the pre-checks and the two consumes
        # below act as a single decision for this limiter.
        self._lock = threading.Lock()

    def allow_request(self, estimated_tokens: int) -> tuple[bool, str]:
        """Check if request is allowed under rate limits.

        Returns ``(True, "")`` when the request is admitted, otherwise
        ``(False, reason)``. A rejected request consumes nothing: the token
        budget is checked before the request slot is spent, so a request
        refused for tokens does not also use up a request slot.
        """
        with self._lock:
            # consume(0) always succeeds; it is used here only to refill the
            # buckets so the .tokens values below are current.
            self.request_bucket.consume(0)
            self.token_bucket.consume(0)

            if self.request_bucket.tokens < 1:
                return False, "Request rate limit exceeded"

            if self.token_bucket.tokens < estimated_tokens:
                return False, "Token rate limit exceeded"

            # The consume() results stay authoritative in case a bucket is
            # also drained by code that bypasses this method.
            if not self.request_bucket.consume(1):
                return False, "Request rate limit exceeded"

            if not self.token_bucket.consume(estimated_tokens):
                return False, "Token rate limit exceeded"

        return True, ""

    def wait_time(self, estimated_tokens: int) -> float:
        """Calculate seconds to wait before request allowed.

        Returns 0.0 when both buckets already hold enough. If
        ``estimated_tokens`` exceeds the token bucket's capacity the request
        can never be admitted, and the value returned is only the time the
        deficit would take to refill at the configured rate.

        Raises:
            ZeroDivisionError: If either bucket has a refill rate of zero.
        """
        # Refill first: .tokens is only updated inside consume(), so without
        # this the estimate would ignore time elapsed since the last request.
        self.request_bucket.consume(0)
        self.token_bucket.consume(0)

        request_wait = (1 - self.request_bucket.tokens) / self.request_bucket.rate
        token_wait = (estimated_tokens - self.token_bucket.tokens) / self.token_bucket.rate

        return max(0.0, request_wait, token_wait)

Per-User Rate Limiting

Track rate limits per user or API key:

from collections import defaultdict

class MultiUserRateLimiter:
    """Keep one RateLimiter per user ID, created on first use."""

    def __init__(self, default_config: RateLimitConfig):
        """Remember the config given to users without a custom limit."""
        self._lock = threading.Lock()
        self.limiters: dict[str, RateLimiter] = {}
        self.default_config = default_config

    def get_limiter(self, user_id: str) -> RateLimiter:
        """Return the limiter for ``user_id``, creating it from the default config if absent."""
        with self._lock:
            try:
                return self.limiters[user_id]
            except KeyError:
                created = RateLimiter(self.default_config)
                self.limiters[user_id] = created
                return created

    def check_request(self, user_id: str, tokens: int) -> tuple[bool, str]:
        """Ask the user's limiter whether a request costing ``tokens`` is allowed."""
        return self.get_limiter(user_id).allow_request(tokens)

    def set_user_limit(self, user_id: str, config: RateLimitConfig):
        """Install a fresh limiter built from ``config`` for ``user_id``.

        Any limiter already stored for that user is replaced, so its
        accumulated bucket state is discarded.
        """
        with self._lock:
            self.limiters[user_id] = RateLimiter(config)

Middleware Integration

Integrate rate limiting with FastAPI:

from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse

# Shared limiter for the app. Users without a custom limit get 60 requests
# and 100,000 model tokens per minute, with bursts of up to 10 requests.
rate_limiter = MultiUserRateLimiter(
    default_config=RateLimitConfig(
        requests_per_minute=60,
        tokens_per_minute=100000,
        burst_size=10,
    )
)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Reject over-limit requests with HTTP 429; otherwise forward them.

    Requests to ``/health`` bypass rate limiting. Every other request is
    charged one request slot plus a token estimate derived from its body.
    Rejected requests get a ``Retry-After`` header; forwarded requests get
    ``X-RateLimit-Remaining`` added to the response.
    """
    import math  # Local import: only needed to round Retry-After up.

    # Skip rate limiting for health checks
    if request.url.path == "/health":
        return await call_next(request)

    # Extract user ID from header or use IP.
    # NOTE(review): X-User-ID is supplied by the client, so a caller can
    # sidestep its limit by varying the header unless an upstream proxy or
    # auth layer sets/verifies it — confirm before relying on this.
    # request.client may be None, so only dereference it when present.
    client_host = request.client.host if request.client else "unknown"
    user_id = request.headers.get("X-User-ID", client_host)

    # Estimate tokens from request body
    body = await request.body()
    estimated_tokens = estimate_token_count(body)

    allowed, reason = rate_limiter.check_request(user_id, estimated_tokens)

    if not allowed:
        wait_seconds = rate_limiter.get_limiter(user_id).wait_time(estimated_tokens)
        # Round up: truncating e.g. 0.4s to 0 would tell the client to retry
        # immediately, only to be rejected again.
        retry_after = math.ceil(wait_seconds)

        return JSONResponse(
            status_code=429,
            headers={
                "Retry-After": str(retry_after),
                "X-RateLimit-Remaining": "0"
            },
            content={"error": reason, "retry_after": retry_after}
        )

    response = await call_next(request)

    # Add rate limit headers to response
    remaining = rate_limiter.get_limiter(user_id).request_bucket.tokens
    response.headers["X-RateLimit-Remaining"] = str(int(remaining))

    return response

def estimate_token_count(body: bytes, bytes_per_token: int = 4) -> int:
    """Roughly estimate how many model tokens ``body`` contains.

    The estimate is ``len(body) // bytes_per_token``. It counts bytes, not
    decoded characters, so multi-byte UTF-8 text is estimated higher than
    ASCII text with the same number of characters.

    Args:
        body: Raw request body.
        bytes_per_token: Assumed average bytes per token (default ~4).

    Returns:
        The estimated token count; 0 for an empty body.

    Raises:
        ValueError: If ``bytes_per_token`` is less than 1.
    """
    if bytes_per_token < 1:
        raise ValueError("bytes_per_token must be a positive integer")
    return len(body) // bytes_per_token

Ollama-Specific Rate Limits

Some Ollama endpoints have built-in rate limiting, so check that the server is reachable and responding before sending it a request:

def check_ollama_load(timeout: float = 5.0) -> dict:
    """Check Ollama server load before sending request.

    Args:
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot block the caller indefinitely.

    Returns:
        ``{"can_accept_requests": True, "current_load": ...}`` when the
        server answers with HTTP 200, where ``current_load`` is the
        ``"model"`` field of the JSON reply (``"unknown"`` if absent).
        Otherwise ``{"can_accept_requests": False, "reason": "server
        unreachable"}`` — this is also what a non-200 reply produces.
    """
    try:
        # NOTE(review): "/api/info" does not appear to be among Ollama's
        # documented endpoints (e.g. "/api/ps", "/api/version") — confirm
        # the path, otherwise this always reports the server as unreachable.
        response = requests.get("http://localhost:11434/api/info", timeout=timeout)
        if response.status_code == 200:
            info = response.json()
            return {
                "can_accept_requests": True,
                "current_load": info.get("model", "unknown")
            }
    except (requests.RequestException, ValueError):
        # Connection failure, timeout, or a body that is not valid JSON.
        # Deliberately best-effort: fall through to the "unreachable" result
        # without swallowing KeyboardInterrupt/SystemExit.
        pass
    return {"can_accept_requests": False, "reason": "server unreachable"}
EXERCISE

Implement per-user rate limiting with a configurable burst allowance. Verify that a single user can burst above the sustained rate but is throttled when their traffic stays over the limit.