KEY INSIGHT
Complex AI workflows often require coordinating multiple MCP serversΓÇöa hub pattern enables consistent routing, authentication, and error handling across server federations.
Design a hub that routes requests to appropriate servers:
```python
class MCPHub:
def __init__(self):
self.servers: dict[str, MCPServer] = {}
self.router = Router()
def register(self, name: str, server: MCPServer):
self.servers[name] = server
self.router.register_server(name, server.get_capabilities())
async def handle(self, request: dict) -> dict:
tool_name = request.get("params", {}).get("name", "")
server_name = self.router.route(tool_name)
if not server_name:
return error_response("tool_not_found", f"Unknown tool: {tool_name}")
try:
server = self.servers[server_name]
return await server.handle(request)
except Exception as e:
return error_response("internal_error", str(e))
class Router:
def __init__(self):
self.routes: dict[str, str] = {}
def register_server(self, name: str, capabilities: list[str]):
for capability in capabilities:
self.routes[capability] = name
def route(self, tool_name: str) -> str | None:
return self.routes.get(tool_name)
```
Federated discovery lets servers announce capabilities:
```python
from mcp.types import ServerCapabilities, Tool
class FederatedHub:
def __init__(self):
self.local_servers: dict[str, MCPClient] = {}
self.remote_hubs: list[str] = []
self.cache = CapabilityCache(ttl=300)
async def discover_tools(self) -> list[Tool]:
all_tools = []
# Local servers
for name, server in self.local_servers.items():
tools = await server.list_tools()
for tool in tools:
tool.server = name
all_tools.append(tool)
# Remote hubs
for hub_url in self.remote_hubs:
remote_tools = await self.query_remote_hub(hub_url)
all_tools.extend(remote_tools)
return all_tools
async def query_remote_hub(self, url: str) -> list[Tool]:
# Check cache first
cached = self.cache.get(url)
if cached:
return cached
# Query remote hub
response = await self.http_client.get(f"{url}/capabilities")
tools = [Tool(**t) for t in response.json()["tools"]]
self.cache.set(url, tools)
return tools
```
Unified authentication across servers:
```python
class UnifiedAuth:
def __init__(self, providers: dict[str, AuthProvider]):
self.providers = providers
async def authenticate(self, request: Request) -> Session:
token = extract_token(request)
for provider_name, provider in self.providers.items():
if provider.can_handle(token):
session = await provider.validate(token)
session.provider = provider_name
return session
raise PermissionError("No valid authentication")
async def authorize(self, session: Session, tool: str) -> bool:
# Check provider-specific permissions
provider = self.providers[session.provider]
return await provider.check_permission(session, tool)
class Session:
def __init__(self, user_id: str, scopes: list[str]):
self.user_id = user_id
self.scopes = scopes
self.provider = None
```
Error aggregation and retry across federated servers:
```python
async def federated_operation(
hub: FederatedHub,
tool_names: list[str],
arguments: dict,
) -> list[dict]:
results = []
errors = []
for tool_name in tool_names:
try:
result = await hub.forward(tool_name, arguments)
results.append({"tool": tool_name, "status": "success", "result": result})
except Exception as e:
# Check for fallback
fallback = await hub.get_fallback(tool_name)
if fallback:
result = await hub.forward(fallback, arguments)
results.append({
"tool": tool_name,
"status": "fallback_success",
"fallback_to": fallback,
"result": result
})
else:
errors.append({"tool": tool_name, "status": "failed", "error": str(e)})
return {"results": results, "errors": errors}
```