KEY INSIGHT
An API gateway consolidates authentication, rate limiting, and routing—building one demonstrates integration of every concept from this course.
This capstone project combines logging, error handling, health checks, documentation, client libraries, load testing, caching, and production hardening into a single deployable system.
The gateway accepts requests, validates authentication, applies rate limits, routes to backend services, caches responses, and returns formatted results. All interactions are logged for debugging. All errors follow the RFC 7807 problem-details format.
```python
import hashlib
import json
import logging
import time
import uuid
from contextvars import ContextVar
from typing import Optional

import httpx
import redis.asyncio as redis
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
# Application object and its rate limiter. The limiter is keyed by
# ``get_remote_address`` and is also attached to ``app.state``.
app = FastAPI(title="AI Gateway v1.0")
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

# Logger shared by the middleware and the exception handlers.
logger = logging.getLogger("gateway")

# Holds the ID of the request being processed; the logging middleware sets it.
request_id_var: ContextVar[str] = ContextVar("request_id")
class GatewayConfig:
    """Static gateway settings, exposed as class-level defaults."""

    # Base URL of the backend that chat requests are forwarded to.
    backend_url: str = "http://localhost:11434"
    # Connection URL handed to the Redis client at startup.
    redis_url: str = "redis://localhost:6379"
    # Rate-limit string in "<count>/<period>" form.
    rate_limit: str = "100/minute"
    # Expiry, in seconds, applied to cached completion responses.
    cache_ttl: int = 3600


# Single shared settings instance used by the rest of the module.
config = GatewayConfig()
@app.on_event("startup")
async def startup():
    """Create the Redis client from the configured URL and keep it on ``app.state``."""
    redis_client = await redis.from_url(config.redis_url)
    app.state.redis = redis_client
@app.on_event("shutdown")
async def shutdown():
    """Close the Redis client that is stored on ``app.state``."""
    redis_client = app.state.redis
    await redis_client.close()
@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    """Tag each request with an ID, time it, and log one completion record.

    The ID comes from the incoming ``X-Request-ID`` header when it is present
    and non-empty; otherwise a fresh UUID4 is generated. The ID is stored in
    ``request_id_var`` for the duration of the request and echoed back in the
    ``X-Request-ID`` response header.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that runs the rest of the stack and returns the
            response.

    Returns:
        The downstream response with ``X-Request-ID`` added.

    Raises:
        Whatever the downstream stack raises. The completion record is still
        logged (with status 500) before the exception propagates.
    """
    # ``or`` also covers an empty header value and avoids generating a UUID
    # that would be thrown away when the client supplied an ID.
    request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    token = request_id_var.set(request_id)
    start = time.perf_counter()
    # Assume failure until a response is actually produced, so the record
    # written from ``finally`` carries a meaningful status on exceptions.
    status_code = 500
    try:
        response = await call_next(request)
        status_code = response.status_code
        response.headers["X-Request-ID"] = request_id
        return response
    finally:
        duration = (time.perf_counter() - start) * 1000
        logger.info(
            "request_complete",
            extra={
                "request_id": request_id,
                "method": request.method,
                "path": request.url.path,
                "status": status_code,
                "duration_ms": round(duration, 1),
            },
        )
        # Restore the previous value so the ID cannot leak past this request.
        request_id_var.reset(token)
@app.exception_handler(RateLimitExceeded)
async def rate_limit_exceeded(request: Request, exc: RateLimitExceeded):
    """Render a rate-limit rejection as an RFC 7807 problem document.

    Args:
        request: The request that exceeded its limit; its URL becomes the
            problem ``instance``.
        exc: The raised exception; ``exc.detail`` becomes the problem
            ``detail``.

    Returns:
        A 429 ``JSONResponse`` served as ``application/problem+json``.
    """
    problem = {
        "type": "https://example.com/errors/rate-limit",
        "title": "Too Many Requests",
        "status": 429,
        "detail": str(exc.detail),
        "instance": str(request.url),
    }
    # RFC 7807 problem documents use their own media type, not plain JSON.
    return JSONResponse(
        status_code=429,
        content=problem,
        media_type="application/problem+json",
    )
@app.exception_handler(Exception)
async def generic_exception(request: Request, exc: Exception):
    """Log an unhandled exception and return a generic RFC 7807 problem.

    The response never includes exception details; the traceback goes to the
    log only.

    Args:
        request: The request being handled; its URL becomes the problem
            ``instance``.
        exc: The unhandled exception.

    Returns:
        A 500 ``JSONResponse`` served as ``application/problem+json``.
    """
    # Pass the exception explicitly so the traceback is recorded even if the
    # handler is invoked outside an active ``except`` block.
    logger.exception("Unhandled exception", exc_info=exc)
    problem = {
        "type": "https://example.com/errors/internal",
        "title": "Internal Server Error",
        "status": 500,
        "detail": "An unexpected error occurred",
        "instance": str(request.url),
    }
    # RFC 7807 problem documents use their own media type, not plain JSON.
    return JSONResponse(
        status_code=500,
        content=problem,
        media_type="application/problem+json",
    )
class CompletionRequest(BaseModel):
    """Request body accepted by the chat completions endpoint."""

    # Name of the backend model to run. ``examples`` replaces the ``example``
    # keyword, which pydantic v2 treats as a deprecated extra argument.
    model: str = Field(..., examples=["llama3.2:latest"])
    # Chat messages to send; at least one is required.
    messages: list[dict] = Field(..., min_length=1)
    # Sampling temperature, constrained to [0, 2]; defaults to 0.7.
    temperature: Optional[float] = Field(0.7, ge=0, le=2)
def verify_api_key(request: Request) -> str:
    """Extract the bearer token from the ``Authorization`` header.

    The scheme is matched case-insensitively and an empty token is rejected.

    NOTE(review): this only checks that a token is present and well-formed.
    It does not compare the token against any store of issued keys; add that
    lookup here before relying on this for real authentication.

    Args:
        request: The incoming request whose headers are inspected.

    Returns:
        The token that follows the ``Bearer`` scheme, with surrounding
        whitespace removed.

    Raises:
        HTTPException: 401 when the header is missing, uses another scheme,
            or carries an empty token.
    """
    auth_header = request.headers.get("Authorization", "")
    scheme, _, token = auth_header.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        raise HTTPException(
            status_code=401,
            detail="Missing or invalid authorization",
            # A 401 response should tell the client which scheme to use.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return token
# The route decorator must be outermost: with ``@limiter.limit`` on top, the
# function registered by ``@app.post`` is the unwrapped one and the limit is
# never enforced. The limit string comes from ``config`` rather than being
# duplicated here.
@app.post("/v1/chat/completions", summary="Generate chat completions")
@limiter.limit(config.rate_limit)
async def chat_completions(
    request: Request,
    body: CompletionRequest,
    api_key: str = Depends(verify_api_key),
):
    """Serve a chat completion from the cache, or from the backend on a miss.

    The cache key is a SHA-256 digest of the full request body, so requests
    that differ in any field (including ``temperature``) are cached
    separately.

    Args:
        request: The incoming request; the rate limiter requires this
            parameter on the endpoint.
        body: The validated completion request.
        api_key: Bearer token returned by ``verify_api_key``; it is not used
            further in this handler.

    Returns:
        A dict with ``model``, ``content``, ``tokens_used`` and
        ``finish_reason``. Cache hits additionally carry ``cached: True``.

    Raises:
        httpx.HTTPStatusError: When the backend answers with a 4xx/5xx status.
        httpx.HTTPError: On transport failures or timeout (30 s).
    """
    serialized_body = json.dumps(body.model_dump(), sort_keys=True)
    digest = hashlib.sha256(serialized_body.encode()).hexdigest()
    cache_key = f"cache:completion:{digest}"

    cached = await app.state.redis.get(cache_key)
    if cached:
        response = json.loads(cached)
        response["cached"] = True
        return response

    backend_payload = {
        "model": body.model,
        "messages": body.messages,
        # Ask for one JSON document; the backend streams newline-delimited
        # chunks by default, which ``.json()`` below cannot parse.
        "stream": False,
    }
    # Forward the temperature so it actually affects generation; it is
    # already part of the cache key.
    if body.temperature is not None:
        backend_payload["options"] = {"temperature": body.temperature}

    async with httpx.AsyncClient(timeout=30.0) as client:
        backend_response = await client.post(
            f"{config.backend_url}/api/chat",
            json=backend_payload,
        )
        backend_response.raise_for_status()
        result = backend_response.json()

    response = {
        "model": body.model,
        "content": result.get("message", {}).get("content", ""),
        "tokens_used": result.get("eval_count", 0),
        "finish_reason": "stop",
    }
    await app.state.redis.setex(cache_key, config.cache_ttl, json.dumps(response))
    return response
@app.get("/health/live")
async def liveness():
    """Report that the process is up; no dependencies are checked."""
    payload = {"status": "alive"}
    return payload
@app.get("/health/ready")
async def readiness():
    """Report whether the gateway can reach Redis.

    Returns:
        ``{"status": "healthy"}`` when the Redis ping succeeds; otherwise a
        503 ``JSONResponse`` with ``{"status": "unhealthy"}``.
    """
    try:
        await app.state.redis.ping()
    except Exception:
        # Boundary catch: any failure means "not ready". Log it so the cause
        # is visible instead of being silently swallowed.
        logger.warning("Readiness check failed: Redis ping error", exc_info=True)
        return JSONResponse(status_code=503, content={"status": "unhealthy"})
    return {"status": "healthy"}
```
The gateway combines all hardening techniques. An authentication dependency checks every completion request before business logic runs. A rate-limiting decorator enforces fair usage. Logging middleware captures all interactions for debugging. Error handlers format all failures consistently.
Test the gateway end-to-end: authenticate successfully, hit rate limits, observe cached responses, verify health checks, and generate OpenAPI documentation. Load test with `locust` to validate performance under concurrent traffic.