18. Hybrid Gateway Project
This capstone project implements a production-ready hybrid AI gateway that combines the concepts from the previous chapters. The architecture balances cost, performance, and reliability through intelligent routing, thorough monitoring, and dependable security.
The implementation requires three main components: a FastAPI gateway service, a local inference server, and a cloud provider adapter. These components communicate through well-defined interfaces that enable testing and swapping of implementations.
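One way to express such an interface in Python is a `typing.Protocol` that both the local inference client and the cloud adapter satisfy. The sketch below is illustrative only: `InferenceBackend`, `Completion`, and `EchoBackend` are hypothetical names that do not appear in the project code, and the fields on `Completion` are an assumed shape.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass
class Completion:
    """Provider-agnostic result (hypothetical shape, not the project's model)."""
    text: str
    provider: str            # for example "local" or "cloud"
    prompt_tokens: int
    completion_tokens: int


@runtime_checkable
class InferenceBackend(Protocol):
    """Interface that any backend must satisfy to be used by the gateway."""

    async def complete(self, prompt: str, model: str) -> Completion: ...


class EchoBackend:
    """Test double: echoes the prompt back without any network call."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def complete(self, prompt: str, model: str) -> Completion:
        tokens = len(prompt.split())  # crude whitespace token count
        return Completion(prompt, self.name, tokens, tokens)


if __name__ == "__main__":
    backend = EchoBackend("local")
    print(asyncio.run(backend.complete("hello hybrid gateway", "any-model")))
```

Because the protocol is structural, a test double such as `EchoBackend` can stand in for a real backend without inheriting from anything, which is what makes swapping implementations cheap.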
# gateway/main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from gateway.router import HybridRouter
from gateway.monitoring import MetricsCollector
from gateway.security import AuthMiddleware
from gateway.tracker import RequestTracker
from gateway.cost import CostAnalyzer

# The load_*_config helpers, TimescaleDBConnection, the token helpers
# (extract_bearer_token, verify_token, parse_token), calculate_cost, and the
# ChatRequest/ChatResponse models are assumed to be defined elsewhere in the
# gateway package.


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: build shared components once and attach them to app.state
    app.state.router = HybridRouter(
        local_models=load_local_config(),
        cloud_providers=load_cloud_config()
    )
    app.state.metrics = MetricsCollector()
    app.state.tracker = RequestTracker(storage=TimescaleDBConnection())
    app.state.cost_analyzer = CostAnalyzer(
        pricing=load_pricing_config(),
        gpu_config=load_gpu_config()
    )
    yield
    # Shutdown: release the tracker's storage connection
    await app.state.tracker.close()


app = FastAPI(lifespan=lifespan)


# FastAPI's middleware decorator only accepts the "http" middleware type
@app.middleware("http")
async def authenticate(request: Request, call_next):
    token = extract_bearer_token(request)
    if not token or not await verify_token(token):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    request.state.auth = parse_token(token)
    return await call_next(request)


@app.post("/v1/chat/completions")
async def chat_completions(request: ChatRequest, req: Request):
    request_id = await app.state.tracker.begin_request(req)
    await app.state.metrics.increment("requests_total", {"model": request.model})
    try:
        # Time only the routing call, not the bookkeeping around it
        with app.state.metrics.time("request_duration", {"model": request.model}):
            response = await app.state.router.route(request)
        await app.state.tracker.complete_request(
            request_id, response,
            calculate_cost(response, app.state.cost_analyzer)
        )
        return ChatResponse.from_provider(response)
    except Exception as e:
        # Record the failure, then let FastAPI produce the error response
        await app.state.tracker.fail_request(request_id, e)
        raise
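The listing above delegates to `HybridRouter.route`, whose internals are not shown in this section. As a rough illustration of the fallback behavior it is expected to provide, the following sketch tries backends in order and returns the first successful result. The function `route_with_fallback`, the `AllBackendsFailed` exception, and the stub backends are hypothetical; a real router would also weigh cost and request complexity.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, Tuple

# A backend is modeled as an async callable: prompt -> response text.
Backend = Callable[[str], Awaitable[str]]


class AllBackendsFailed(Exception):
    """Raised when every backend in the chain has failed."""


async def route_with_fallback(
    prompt: str,
    chain: Sequence[Tuple[str, Backend]],
    timeout_s: float = 30.0,
) -> Tuple[str, str]:
    """Try each (name, backend) in order; return (name, text) of the first success."""
    errors: List[str] = []
    for name, backend in chain:
        try:
            text = await asyncio.wait_for(backend(prompt), timeout=timeout_s)
            return name, text
        except Exception as exc:  # includes timeouts raised by wait_for
            errors.append(f"{name}: {exc!r}")
    raise AllBackendsFailed("; ".join(errors))


# Stub backends for demonstration only.
async def failing_local(prompt: str) -> str:
    raise ConnectionError("local inference unavailable")


async def cloud(prompt: str) -> str:
    return f"cloud answer to: {prompt}"


if __name__ == "__main__":
    chain = [("local", failing_local), ("cloud", cloud)]
    print(asyncio.run(route_with_fallback("hi", chain)))
```

Ordering the chain local-first keeps cost low in the common case, while the per-backend timeout bounds how long a stalled local server can delay the fallback.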
Local inference integration requires container deployment and health monitoring. The inference server exposes a standardized API that the gateway calls regardless of the underlying model type. Container orchestration handles scaling and failover.
# docker-compose.yml for local inference
version: '3.8'
services:
  gateway:
    build: ./gateway
    ports:
      - "8080:8080"
    environment:
      - LOCAL_INFERENCE_URL=http://inference:8081
      - CLOUD_PROVIDER=azure
    depends_on:
      - inference
    deploy:
      resources:
        limits:
          memory: 2G
  inference:
    build: ./inference-server
    ports:
      - "8081:8081"
    environment:
      - MODEL_NAME=mistral-7b-instruct
      - GPU_DEVICE=0
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
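The Compose file starts the inference container but does not by itself tell the gateway whether the model server is responsive. A minimal sketch of gateway-side health tracking follows, assuming the inference server exposes some HTTP health URL (the `/health` path in the usage comment is an assumption, as are the names `HealthMonitor` and `http_probe`). The monitor marks the backend unhealthy only after several consecutive failed probes, which avoids flapping on a single slow response.

```python
import urllib.request
from dataclasses import dataclass
from typing import Callable


@dataclass
class HealthMonitor:
    """Tracks backend health from the results of repeated probes."""
    probe: Callable[[], bool]       # returns True when the backend responds
    failure_threshold: int = 3      # consecutive failures before marking unhealthy
    consecutive_failures: int = 0
    healthy: bool = True

    def check(self) -> bool:
        """Run one probe, update state, and return the current health."""
        try:
            ok = self.probe()
        except Exception:
            ok = False  # connection errors and timeouts count as failures
        if ok:
            self.consecutive_failures = 0
            self.healthy = True
        else:
            self.consecutive_failures += 1
            if self.consecutive_failures >= self.failure_threshold:
                self.healthy = False
        return self.healthy


def http_probe(url: str, timeout_s: float = 2.0) -> Callable[[], bool]:
    """Build a probe that succeeds when `url` answers with HTTP 200."""
    def probe() -> bool:
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            return resp.status == 200
    return probe


# Example wiring (assumed endpoint path):
# monitor = HealthMonitor(probe=http_probe("http://inference:8081/health"))
```

A router could call `monitor.check()` on a timer and skip the local backend while `monitor.healthy` is false.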
The monitoring stack aggregates metrics and provides visualization. Prometheus scrapes gateway and inference endpoints; Grafana displays dashboards. Alertmanager routes notifications to appropriate channels based on severity.
# prometheus/prometheus.yml
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: 'gateway'
    static_configs:
      - targets: ['gateway:8080']
    metrics_path: /metrics
  - job_name: 'inference'
    static_configs:
      - targets: ['inference:8081']
    metrics_path: /metrics
Testing validates the complete system. Integration tests verify routing logic, fallback behavior, and error handling. Load tests confirm performance under stress. Security tests validate authentication and authorization boundaries.
# tests/test_gateway.py
import pytest
import pytest_asyncio
from httpx import ASGITransport, AsyncClient

from gateway.main import app

# mock_local_failure() and valid_token() are assumed to be test helpers
# defined elsewhere (for example in conftest.py).


@pytest_asyncio.fixture
async def client():
    # Send requests to the app in-process instead of over the network.
    # ASGITransport does not run lifespan events, so app.state must be
    # populated by the test setup before requests are sent.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


@pytest.mark.asyncio
async def test_fallback_on_local_failure(client):
    # Mock local inference to fail
    mock_local_failure()
    response = await client.post(
        "/v1/chat/completions",
        json={"prompt": "test", "model": "gpt-4"},
        headers={"Authorization": f"Bearer {valid_token()}"},
    )
    assert response.status_code == 200
    assert response.json()["provider"] == "cloud"  # Fell back


@pytest.mark.asyncio
async def test_local_routing_for_simple_tasks(client):
    response = await client.post(
        "/v1/chat/completions",
        json={"prompt": "classify: positive or negative", "model": "auto"},
        headers={"Authorization": f"Bearer {valid_token()}"},
    )
    # Should route to local based on complexity
    assert response.json()["provider"] == "local"


@pytest.mark.asyncio
async def test_unauthorized_request_rejected(client):
    response = await client.post(
        "/v1/chat/completions",
        json={"prompt": "test"},
        headers={"Authorization": "Bearer invalid_token"},
    )
    assert response.status_code == 401
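The tests above cover routing and authentication but not load. A minimal load-test sketch is shown below, assuming the request itself is supplied as an async callable (`send`) so that the harness can be exercised without a running gateway. The names `run_load_test` and `percentile` are hypothetical, and dedicated tools would normally be used for serious load testing.

```python
import asyncio
import math
import time
from typing import Awaitable, Callable, Dict, List, Optional


def percentile(sorted_values: List[float], fraction: float) -> float:
    """Nearest-rank percentile of a pre-sorted, non-empty list."""
    index = max(0, math.ceil(fraction * len(sorted_values)) - 1)
    return sorted_values[index]


async def run_load_test(
    send: Callable[[], Awaitable[object]],
    total_requests: int,
    concurrency: int,
) -> Dict[str, Optional[float]]:
    """Issue `total_requests` calls with at most `concurrency` in flight."""
    semaphore = asyncio.Semaphore(concurrency)
    latencies: List[float] = []
    failures = 0

    async def one_request() -> None:
        nonlocal failures
        async with semaphore:
            start = time.perf_counter()
            try:
                await send()
                latencies.append(time.perf_counter() - start)
            except Exception:
                failures += 1  # failed requests are counted, not timed

    await asyncio.gather(*(one_request() for _ in range(total_requests)))
    latencies.sort()
    return {
        "succeeded": len(latencies),
        "failed": failures,
        "p50_s": percentile(latencies, 0.50) if latencies else None,
        "p95_s": percentile(latencies, 0.95) if latencies else None,
    }


if __name__ == "__main__":
    async def fake_send() -> None:
        await asyncio.sleep(0.01)  # stands in for an HTTP call to the gateway

    print(asyncio.run(run_load_test(fake_send, total_requests=50, concurrency=10)))
```

In a real run, `send` would wrap a `client.post("/v1/chat/completions", ...)` call against a deployed gateway, and the p95 latency would be compared against the service's latency target.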
Deployment automation provisions infrastructure, deploys containers, and configures monitoring. Terraform manages cloud resources; Kubernetes handles container orchestration; Helm charts package configurations for reuse.
Implement the complete hybrid gateway. Configure local inference with a 7B model, connect a cloud provider, implement fallback chains, add thorough monitoring, and deploy to a Kubernetes cluster. Validate behavior through integration tests.