MCP Server Implementation
Learn MCP server implementation through RunLocalAI's practical lens: the Model Context Protocol (MCP), servers and tools, hardware fit, runtime settings, verification habits and local-vs-cloud tradeoffs.
Why this course matters
MCP Server Implementation is for builders turning local models into working tools, agents and retrieval systems. It connects the Model Context Protocol (MCP), its servers and its tools to the questions RunLocalAI wants every reader to answer before they install, upgrade or scale a model: will it run, what will it cost in memory, which setting changes the result, and how do you verify the answer instead of trusting a demo?
What you will be able to do
By the end, you should be able to explain the main tradeoffs in plain language, choose a safe next experiment, and use the chapter exercises as a repeatable operator checklist. The course favors local evidence, hardware fit, context limits, latency and failure modes over generic AI vocabulary.
How to use this course
Start at chapter one if the topic is new. If you already have a working stack, skim chapters such as "What is MCP?", "MCP Specification Overview", "Architecture: Client vs Server" and "Python SDK Setup", and use those lessons as a quality-control pass before changing a workstation, a team workflow or a production-like local deployment.
- 01 What is MCP? (15 min)

MCP standardizes how AI systems interact with external tools, resources, and prompts through a defined protocol rather than custom integrations.

Model Context Protocol (MCP) is an open specification that defines how AI applications communicate with external capabilities. Instead of building a custom integration for every tool an AI might need, MCP provides a standardized interface that works across AI frameworks and implementations.

The core problem MCP solves is integration fragmentation. Without a protocol, connecting an AI assistant to a file system, database, or web API requires unique code for each connection, and if the AI application changes, those custom integrations break. MCP abstracts this by defining message formats, transport mechanisms, and capability negotiation that stay stable across implementations.

MCP has three primary capability types:

**Resources** expose data to AI applications. A resource might be a file, a database query result, or an API response. AI applications request resources when they need contextual information for their responses.

**Tools** let AI systems perform actions. Unlike resources, tools can have side effects: creating files, executing queries, posting messages. The AI application calls tools based on user requests and can report the results back to the user.

**Prompts** provide reusable prompt templates. These preconfigured prompts help standardize complex workflows so that AI applications respond consistently to common tasks.

The protocol runs over stdio or HTTP, with JSON-RPC 2.0 as the message format. Clients discover a server's capabilities during connection initialization, then use those capabilities through defined method calls.

MCP servers are language-agnostic. Official SDKs exist for TypeScript and Python, among other languages, and any implementation that follows the protocol specification works with any MCP-compatible client.

Understanding MCP matters because it shifts integration work from custom code to configuration and standards. One MCP server provides connectivity to every AI client that speaks the protocol, which insulates the integration from framework changes.
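To make the three capability types concrete, here is what a client's request for each one looks like on the wire. This is a minimal sketch. The method names (`resources/read`, `tools/call`, `prompts/get`) come from the MCP specification. The URI, tool name and prompt arguments are illustrative placeholders, and `make_request` is a hypothetical helper.

```python
import itertools
import json

# Spec method names for *using* each capability type
# (discovery uses resources/list, tools/list and prompts/list).
USE_METHODS = {
    "resource": "resources/read",
    "tool": "tools/call",
    "prompt": "prompts/get",
}

_ids = itertools.count(1)  # JSON-RPC ids only need to be unique per connection

def make_request(method: str, params: dict) -> dict:
    """Build a JSON-RPC 2.0 request envelope with a fresh integer id."""
    return {"jsonrpc": "2.0", "id": next(_ids), "method": method, "params": params}

read_config = make_request(USE_METHODS["resource"], {"uri": "file:///config/app.yaml"})
call_hello = make_request(USE_METHODS["tool"], {"name": "hello", "arguments": {}})
get_review = make_request(
    USE_METHODS["prompt"],
    {"name": "code_review", "arguments": {"language": "python", "diff": "+x = 1"}},
)

for message in (read_config, call_hello, get_review):
    print(json.dumps(message))
```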
- 02 MCP Specification Overview (15 min)

MCP defines a complete lifecycle of connection, capability discovery, and request-response patterns that all implementations must follow.

The MCP specification defines a lifecycle for connections between clients and servers. Understanding this lifecycle is essential before writing any server code.

**Connection Initialization:** When an MCP client connects, it sends an `initialize` request containing the protocol version it wants, its client information, and its capabilities. The server responds with its own name and version, the protocol version it chose, and its capabilities. This exchange establishes which features both sides support. The client then sends a `notifications/initialized` notification to signal that it is ready for normal operation. The server does not send one back.

**Capability Declaration:** Servers declare which features they support in their `initialize` response. The three capability categories map directly to the three feature types:

```json
{
  "capabilities": {
    "resources": { "subscribe": true, "listChanged": true },
    "tools": {},
    "prompts": { "listChanged": true }
  }
}
```

A server declaring `"tools": {}` supports tool operations but does not announce tool-list change notifications. A server that omits the `tools` key entirely does not support tools.

**Request/Response Pattern:** All operations use JSON-RPC 2.0. Requests include an `id` field for correlation. Method names use slash-separated namespaces (`tools/list`, `resources/list`). Each response contains either a `result` object or an `error` object.

**Notification Pattern:** Some messages do not expect a response. These are notifications, and they carry no `id`. For example, when a subscribed resource changes, the server sends `notifications/resources/updated`, and when its resource list changes it sends `notifications/resources/list_changed`. The receiver never replies to a notification.

**Error Handling:** MCP uses the standard JSON-RPC error codes:

- `-32700`: Parse error (invalid JSON)
- `-32600`: Invalid request (wrong structure)
- `-32601`: Method not found
- `-32602`: Invalid parameters
- `-32603`: Internal error

JSON-RPC reserves the range `-32000` to `-32099` for implementation-defined server errors.

**Protocol Versioning:** MCP protocol versions are date strings such as `2024-11-05` or `2025-03-26`, not semantic versions. During `initialize`, the client proposes the latest version it supports. If the server supports that version, it replies with the same one; otherwise it replies with another version it does support. If the client cannot work with the server's choice, it should disconnect.
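The handshake and the version rule above fit in a few lines. This is a minimal sketch of the server side, assuming a hypothetical server that supports two protocol revisions. The field names (`protocolVersion`, `capabilities`, `serverInfo`, `clientInfo`) follow the specification. The server name, client name and version list are placeholders.

```python
# Versions this hypothetical server understands, newest first.
SUPPORTED_VERSIONS = ["2025-03-26", "2024-11-05"]

def negotiate_version(requested: str) -> str:
    """Echo the client's version if we support it, else offer our newest one."""
    return requested if requested in SUPPORTED_VERSIONS else SUPPORTED_VERSIONS[0]

def initialize_response(request: dict) -> dict:
    version = negotiate_version(request["params"]["protocolVersion"])
    return {
        "jsonrpc": "2.0",
        "id": request["id"],  # same id so the client can correlate the reply
        "result": {
            "protocolVersion": version,
            "capabilities": {"tools": {}},  # tools supported, no listChanged
            "serverInfo": {"name": "sketch-server", "version": "0.1.0"},
        },
    }

def supports(capabilities: dict, feature: str) -> bool:
    """A feature is supported when its key is present, even as an empty object."""
    return feature in capabilities

init_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "2024-11-05",
        "capabilities": {},
        "clientInfo": {"name": "sketch-client", "version": "0.1.0"},
    },
}
response = initialize_response(init_request)

# After reading the response, the client sends this notification (no id).
initialized = {"jsonrpc": "2.0", "method": "notifications/initialized"}
```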
- 03 Architecture: Client vs Server (15 min)

MCP follows a strict client-server model: the AI application drives requests, and the server provides capabilities without knowing which AI framework initiated the connection.

MCP architecture separates concerns clearly. The server owns the capabilities and exposes tools, resources, and prompts. The client lives inside the AI application (the host) and decides when to call capabilities and how to use the responses.

**Server Responsibilities:** MCP servers implement the server side of the protocol. They:

- Accept connections on configured transport mechanisms
- Declare capabilities during initialization
- Handle method calls and return results
- Send notifications when state changes (resource updates, tool list changes)
- Handle authentication and authorization if required
- Manage their own state independently of client connections

Servers mostly respond to client calls rather than initiating requests, which keeps their implementation simpler and more predictable. The specification does allow a server to send a few requests back to clients that opt in, such as sampling requests, but the servers in this course do not need them.

**Client Responsibilities:** MCP clients connect to servers and use their capabilities. They:

- Initiate protocol handshakes
- Read server capabilities and adapt their behavior accordingly
- Call methods and process responses
- Handle notifications appropriately
- Cache capability information to avoid repeated discovery

In the specification's terms, each client keeps a one-to-one connection with a single server, and the host application creates one client per server. A single AI application can therefore use many servers at once, with the host mediating between the model and potentially many backend services.

**Transport Options:** MCP defines two standard transport mechanisms:

**stdio transport** uses standard input and output streams. This works well for local integrations where the server runs as a subprocess of the client. Communication is local, low-latency, and requires no network configuration. Messages are newline-delimited JSON-RPC, so stdout must carry nothing but protocol messages; logs belong on stderr.

```python
# stdio transport loop (conceptual; handle_request is a stand-in dispatcher)
import json
import sys

def handle_request(request: dict) -> dict | None:
    if "id" not in request:
        return None  # notifications never get a response
    return {"jsonrpc": "2.0", "id": request["id"],
            "error": {"code": -32601, "message": "Method not found"}}

# Server reads one newline-delimited JSON-RPC message per line from stdin
for line in sys.stdin:
    if line.strip():
        response = handle_request(json.loads(line))
        if response is not None:
            print(json.dumps(response), flush=True)  # stdout carries only protocol messages
```

**HTTP+SSE transport** uses HTTP POST for client-to-server messages and Server-Sent Events for server-to-client messages. This enables remote server deployments and works through firewalls with standard HTTP traffic. The 2025-03-26 revision of the specification replaced HTTP+SSE with the Streamable HTTP transport (chapter 12). HTTP+SSE still matters for older clients.

The transport choice affects deployment, latency, and security characteristics. Local tools usually use stdio; remote services use an HTTP-based transport.

**Connection Lifecycle:** A client establishes one long-lived connection per server. That connection carries every capability interaction. When the AI application shuts down, the client closes the connection gracefully.

This architecture also explains how to think about state. The session itself is stateful: it holds negotiated capabilities and subscriptions. A server should not, however, track conversation state. Each request carries the context it needs in its method parameters, and the conversation itself lives in the client.
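Because the host runs one client per server, something has to decide which connection a tool call goes to. This is a minimal sketch, assuming the host indexes each server's `tools/list` result by tool name. `ToolRouter` and the server names are hypothetical and not part of any SDK.

```python
class ToolRouter:
    """Hypothetical host-side index: tool name -> name of the server exposing it."""

    def __init__(self) -> None:
        self._owner: dict[str, str] = {}

    def register(self, server: str, tool_names: list[str]) -> None:
        # Called with the tool names from each server's tools/list result
        for tool in tool_names:
            if tool in self._owner and self._owner[tool] != server:
                raise ValueError(
                    f"Tool {tool!r} exposed by both {self._owner[tool]!r} and {server!r}"
                )
            self._owner[tool] = server

    def route(self, tool: str) -> str:
        """Return the server whose client connection should receive tools/call."""
        try:
            return self._owner[tool]
        except KeyError:
            raise LookupError(f"No connected server exposes {tool!r}") from None


router = ToolRouter()
router.register("filesystem", ["read_file", "list_directory"])
router.register("database", ["execute_query"])
print(router.route("execute_query"))
```

Failing loudly on a name collision is a deliberate choice for the sketch. Real hosts often prefix tool names with the server name instead.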
- 04 Python SDK Setup (20 min)

The MCP Python SDK abstracts protocol details, letting developers focus on capability implementation rather than message handling.

The MCP Python SDK (the `mcp` package) provides the building blocks for server implementation. Installation and basic project structure come first.

**Installation:** The SDK installs via pip:

```bash
pip install mcp
```

Verify the installation by asking the package metadata for the installed version:

```python
from importlib.metadata import version

print(version("mcp"))
```

**Minimal Server Structure:** A basic MCP server needs three components: capability definitions, request handlers, and transport setup. The example uses the SDK's low-level `Server` class.

```python
import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server

app = Server("my-first-server")

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="hello",
            description="Returns a greeting",
            inputSchema={"type": "object", "properties": {}},
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    # Tool results are a list of content blocks, not a bare string
    if name == "hello":
        return [types.TextContent(type="text", text="Hello from MCP server!")]
    raise ValueError(f"Unknown tool: {name}")
```

**Async Considerations:** MCP servers use async/await throughout. The SDK is built on `anyio` and runs on Python's `asyncio` by default, so handlers must be `async def` functions and should avoid blocking calls.

```python
import asyncio

async def main():
    # stdio_server() wraps this process's stdin/stdout as MCP message streams
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options(),
        )

if __name__ == "__main__":
    asyncio.run(main())
```

**Debugging Setup:** During development, run the server directly and talk to it over stdin and stdout:

```bash
python my_server.py
```

Type JSON-RPC messages one per line. The server expects the handshake first: an `initialize` request, then a `notifications/initialized` notification. Only after that will it answer calls such as `tools/list`. Because the stdio transport writes JSON straight to stdout, each response is easy to inspect.

**Common Installation Failures:**

- Python version mismatch: the MCP SDK requires Python 3.10+
- Missing optional dependencies: `pip install "mcp[cli]"` adds the command-line tools (the quotes stop shells such as zsh from expanding the brackets)
- Virtual environment conflicts: activate the environment before installing

**Testing Without an AI Client:** You do not need a full AI application to exercise a server. The SDK's own client can launch the server as a subprocess over stdio and call it from a script:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def test_server():
    params = StdioServerParameters(command="python", args=["my_server.py"])
    async with stdio_client(params) as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()  # performs the handshake
            tools = await session.list_tools()
            print(tools)

asyncio.run(test_server())
```
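Typing the handshake by hand is error-prone, because each message must sit on exactly one line. This minimal sketch prints the three lines to paste, in order, into a stdio server. The protocol version string is an assumption; use one your SDK release supports. The client name and `handshake_lines` helper are hypothetical.

```python
import json

PROTOCOL_VERSION = "2024-11-05"  # assumption: a version your SDK release accepts

def handshake_lines(client_name: str = "manual-test") -> list[str]:
    """Return JSON-RPC messages to paste, in order, into a stdio MCP server."""
    messages = [
        {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": PROTOCOL_VERSION,
                "capabilities": {},
                "clientInfo": {"name": client_name, "version": "0.0.1"},
            },
        },
        # Notification: no id, and the server sends no reply
        {"jsonrpc": "2.0", "method": "notifications/initialized"},
        {"jsonrpc": "2.0", "id": 2, "method": "tools/list"},
    ]
    # Compact separators keep each message on a single line
    return [json.dumps(m, separators=(",", ":")) for m in messages]

if __name__ == "__main__":
    print("\n".join(handshake_lines()))
```

Start `python my_server.py` in one terminal and paste the lines one at a time. You should see responses to the first and third lines and nothing after the notification.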
- 05 Resources in MCP (20 min)

Resources expose data to AI applications through a URI-based system, with optional change subscriptions that keep AI context current.

Resources in MCP represent data that servers expose to clients. Unlike tools, resources do not perform actions; they provide information. AI applications request resources when a task needs context.

**Resource Structure:** Each resource has a URI and a name, plus an optional description and MIME type. URIs can use a standard scheme such as `file://` or `https://`, or a custom scheme that the server defines.

```python
from mcp.types import Resource, ResourceTemplate

# Single resource
file_resource = Resource(
    uri="file:///config/app.yaml",
    name="application_config",
    description="Current application configuration",
    mimeType="application/yaml",
)

# Resource template (dynamic resources)
config_template = ResourceTemplate(
    uriTemplate="file:///config/{filename}",
    name="dynamic_config",
    description="Configuration files by name",
    mimeType="application/yaml",
)
```

**Implementing Resource Handlers:** Servers implement one handler for listing resources and one for reading them. The SDK passes the requested URI as a pydantic `AnyUrl`, so convert it with `str()` before comparing it to a string.

```python
from pydantic import AnyUrl

@app.list_resources()
async def list_resources() -> list[Resource]:
    return [
        Resource(
            uri="file:///data/status.txt",
            name="status",
            description="System status file",
            mimeType="text/plain",
        )
    ]

@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:
    # The SDK passes an AnyUrl; compare its string form
    if str(uri) == "file:///data/status.txt":
        with open("/data/status.txt", "r") as f:
            return f.read()
    raise ValueError(f"Unknown resource: {uri}")
```

**Resource Templates:** Templates describe families of resources. The client fills in the template variables and sends the resulting concrete URI in an ordinary `resources/read` request. A server has a single read handler, so that handler must dispatch both static and templated URIs. Registering a second `@app.read_resource()` handler replaces the first.

```python
@app.list_resource_templates()
async def list_resource_templates() -> list[ResourceTemplate]:
    return [
        ResourceTemplate(
            uriTemplate="file:///users/{user_id}/profile",
            name="user_profile",
            description="User profile by ID",
        )
    ]

# Replaces the read handler above: one handler serves static and templated URIs
@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:
    text = str(uri)
    if text == "file:///data/status.txt":
        with open("/data/status.txt", "r") as f:
            return f.read()
    # "file:///users/42/profile".split("/") == ["file:", "", "", "users", "42", "profile"]
    parts = text.split("/")
    if text.startswith("file:///users/") and len(parts) == 6 and parts[5] == "profile":
        return get_user_profile(parts[4])  # get_user_profile: your own lookup
    raise ValueError(f"Unknown resource: {uri}")
```

**Change Notifications:** Resources can notify subscribed clients when their content changes, which keeps AI context accurate without repeated polling. Clients opt in per resource with a `resources/subscribe` request, which is only valid if the server declared `"subscribe": true`. With the low-level server, register a handler with `@app.subscribe_resource()` to record subscriptions; the server must then track them and send updates accordingly.

```python
# Only valid inside a request handler, where request_context is set
await app.request_context.session.send_resource_updated(
    uri=AnyUrl("file:///data/status.txt")
)
```

**Failure Modes:**

- A missing resource should produce an error, not an empty string
- Large resources should be split or size-limited, because `resources/read` returns the whole content in one response
- Permission errors should surface as specific error messages, not generic failures
- 06 Tools in MCP (20 min)

Tools are the action layer of MCP. Each tool defines a schema that clients use to construct valid execution requests.

Tools in MCP let AI applications perform operations. Where resources provide data, tools do things, often with side effects. Every tool has a name, a description, and a JSON Schema defining its parameters.

**Tool Definition:**

```python
from mcp.types import Tool

list_files_tool = Tool(
    name="list_files",
    description="List files in a directory matching a pattern",
    inputSchema={
        "type": "object",
        "properties": {
            "directory": {"type": "string", "description": "Directory to list files from"},
            "pattern": {"type": "string", "description": "Glob pattern to filter files"},
        },
        "required": ["directory"],
    },
)
```

**Tool Execution:** The `call_tool` handler receives the tool name and the arguments dictionary and returns a list of content blocks.

```python
from mcp.types import TextContent

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    # list_directory and read_file_content are your own helpers returning str
    match name:
        case "list_files":
            text = list_directory(arguments["directory"], arguments.get("pattern", "*"))
        case "read_file":
            text = read_file_content(arguments["path"])
        case _:
            raise ValueError(f"Tool not found: {name}")
    return [TextContent(type="text", text=text)]
```

**Return Types:** The low-level handler returns a list of content blocks (text, images, embedded resources), so a single result can mix types. Recent SDK versions also accept a dictionary, which they send as structured content.

```python
from mcp.types import ImageContent, TextContent

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent | ImageContent]:
    if name == "generate_chart":
        chart_data = generate_chart_data(arguments)  # your own helper
        return [
            TextContent(type="text", text="Chart generated successfully"),
            # data must be a base64-encoded PNG
            ImageContent(type="image", data=chart_data["base64"], mimeType="image/png"),
        ]
    raise ValueError(f"Unknown tool: {name}")
```

**Error Handling:** Tool errors should be descriptive, because the message is what the model and the user see. In the low-level server, an exception raised from the tool handler is caught and returned to the client as a tool result with `isError: true`, not as a protocol error, so the model can read it and adjust. `ToolError` below is a plain exception class defined for this course. FastMCP ships its own class with that name in `mcp.server.fastmcp.exceptions`.

```python
class ToolError(Exception):
    """A failure the model should see and can act on."""

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    try:
        return execute_tool(name, arguments)  # your own dispatcher
    except PermissionError as e:
        raise ToolError(f"Permission denied: {e}") from e
    except FileNotFoundError as e:
        raise ToolError(f"File not found: {e}") from e
```

**Schema Validation:** The `inputSchema` defines what clients may send, and servers should validate incoming arguments against it before running anything. Recent versions of the SDK's low-level server may already perform this check for tools returned by `list_tools`; confirm for your version before relying on it.

```python
import jsonschema

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    tool = get_tool_definition(name)  # your own lookup of the Tool object
    try:
        jsonschema.validate(instance=arguments, schema=tool.inputSchema)
    except jsonschema.ValidationError as e:
        raise ToolError(f"Invalid arguments: {e.message}") from e
    # proceed with execution
```

**Tool Discovery:** Clients call `tools/list` to discover available tools and their schemas. Servers should return complete, accurate schemas that clients can use for input validation and user interface generation.
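The snippets above keep tool definitions (for `tools/list`) and dispatch (for `tools/call`) in separate places, and the two drift apart easily. One way to keep them together is a registry that both handlers read from. This sketch assumes handlers are plain synchronous functions that return a string. `ToolSpec`, `tool`, `call` and `REGISTRY` are hypothetical names, not SDK APIs.

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class ToolSpec:
    description: str
    input_schema: dict
    handler: Callable[[dict], str]

# Hypothetical single source of truth for tools/list and tools/call
REGISTRY: dict[str, ToolSpec] = {}

def tool(name: str, description: str, input_schema: dict):
    """Decorator that records a tool's schema and handler together."""
    def register(fn: Callable[[dict], str]) -> Callable[[dict], str]:
        REGISTRY[name] = ToolSpec(description, input_schema, fn)
        return fn
    return register

def call(name: str, arguments: dict) -> str:
    spec = REGISTRY.get(name)
    if spec is None:
        raise ValueError(f"Tool not found: {name}")
    # Only checks "required"; a full JSON Schema validator would go here
    missing = [key for key in spec.input_schema.get("required", []) if key not in arguments]
    if missing:
        raise ValueError(f"Missing required arguments: {', '.join(missing)}")
    return spec.handler(arguments)

@tool(
    "echo",
    "Echo the given text back",
    {"type": "object", "properties": {"text": {"type": "string"}}, "required": ["text"]},
)
def echo(arguments: dict) -> str:
    return arguments["text"]
```

In a server, `list_tools` would build `Tool(name=n, description=s.description, inputSchema=s.input_schema)` for each registry entry, and `call_tool` would wrap the string from `call(...)` in a `TextContent`. FastMCP's `@mcp.tool()` decorator follows a similar idea by deriving the schema from the function's type hints.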
- 07 Prompts in MCP (20 min)

Prompts are reusable templates that standardize complex AI interactions across many invocations.

Prompts in MCP provide predefined templates that clients can invoke. Rather than rebuilding a complex prompt each time, clients reference a prompt by name and supply arguments.

**Prompt Definition:**

```python
from mcp.types import Prompt, PromptArgument

review_prompt = Prompt(
    name="code_review",
    description="Generate a code review for the provided diff",
    arguments=[
        PromptArgument(name="language", description="Programming language of the code", required=True),
        PromptArgument(name="diff", description="Git diff to review", required=True),
    ],
)
```

**Implementing Prompt Handlers:**

```python
from mcp.types import GetPromptResult, PromptMessage, TextContent

@app.list_prompts()
async def list_prompts() -> list[Prompt]:
    return [
        review_prompt,
        Prompt(
            name="debug_assistant",
            description="Systematic debugging assistance",
            arguments=[
                PromptArgument(name="error_message", description="Error to investigate", required=True)
            ],
        ),
    ]

@app.get_prompt()
async def get_prompt(name: str, arguments: dict[str, str] | None) -> GetPromptResult:
    arguments = arguments or {}  # clients may omit arguments entirely
    match name:
        case "code_review":
            return GetPromptResult(
                messages=[
                    PromptMessage(
                        role="user",
                        content=TextContent(
                            type="text",
                            text=f"Review this {arguments['language']} code:\n\n{arguments['diff']}",
                        ),
                    )
                ]
            )
        # further cases, such as the ones below, go here
    raise ValueError(f"Unknown prompt: {name}")
```

**Prompt Composition:** Prompts can include multiple messages. MCP prompt messages accept only the roles `user` and `assistant`; there is no `system` role, so standing instructions go in a user message. Alternating user and assistant messages can supply few-shot examples.

```python
        case "debug_assistant":
            return GetPromptResult(
                messages=[
                    PromptMessage(
                        role="user",  # instructions ride in a user message
                        content=TextContent(
                            type="text",
                            text="You are a debugging specialist. Ask clarifying questions before proposing solutions.",
                        ),
                    ),
                    PromptMessage(
                        role="user",
                        content=TextContent(
                            type="text",
                            text=f"Error encountered: {arguments['error_message']}",
                        ),
                    ),
                ]
            )
```

**Static Prompts:** Some prompts take no arguments. These work well for fixed, standing instructions.

```python
security_prompt = Prompt(
    name="security_reminder",
    description="Standard security reminder for sensitive operations",
    arguments=[],
)
```

**Dynamic Prompts:** Prompts can pull in current resources or state at the moment they are requested:

```python
        case "status_report":
            current_status = get_system_status()  # your own helper; fetches live data
            return GetPromptResult(
                messages=[
                    PromptMessage(
                        role="user",
                        content=TextContent(type="text", text=f"System status:\n{current_status}"),
                    )
                ]
            )
```

**Use Cases:** Prompts work well for code review templates, debugging workflows, documentation generation, and testing checklists. Any repetitive prompt structure benefits from being extracted into a prompt.
- 08 File System Server (20 min)

A file system MCP server needs careful permission scoping, path validation, and error handling to prevent unauthorized access while still enabling useful operations.

Building a file system MCP server demonstrates real-world integration challenges. This chapter covers implementation decisions that apply to most servers that expose many resources.

**Security Foundation:** Never expose arbitrary absolute paths. Establish a root directory that bounds all operations, and resolve every requested path before checking it, so that `..` segments and symlinks cannot escape the root.

```python
from pathlib import Path

class FileSystemServer:
    def __init__(self, root_path: str):
        self.root = Path(root_path).resolve()
        # Verify the root exists and is a directory
        if not self.root.is_dir():
            raise ValueError(f"Root path is not a directory: {self.root}")

    def _resolve_path(self, relative_path: str) -> Path:
        # resolve() collapses ".." and follows symlinks before the check
        requested = (self.root / relative_path).resolve()
        # is_relative_to avoids the string-prefix bug where /srv/data2 passes a /srv/data check
        if not requested.is_relative_to(self.root):
            raise PermissionError("Path outside root directory")
        return requested
```

**Resource Implementation:** The handlers below use a module-level `FileSystemServer` instance and a low-level `Server` named `server`.

```python
from mcp.server import Server
from mcp.types import Resource, ResourceTemplate

server = Server("filesystem")
fs = FileSystemServer("/srv/project")  # example root directory

@server.list_resources()
async def list_resources() -> list[Resource]:
    return [
        Resource(uri="fs://README.md", name="readme", description="Project README", mimeType="text/markdown"),
        Resource(uri="fs://config.yaml", name="config", description="Application configuration", mimeType="application/yaml"),
    ]

@server.list_resource_templates()
async def list_templates() -> list[ResourceTemplate]:
    return [ResourceTemplate(uriTemplate="fs://{filepath}", name="file", description="Access files by path")]

@server.read_resource()
async def read_resource(uri) -> str:
    # uri arrives as a pydantic AnyUrl; work with its string form
    path = fs._resolve_path(str(uri).removeprefix("fs://"))
    if path.is_dir():
        return "\n".join(sorted(f.name for f in path.iterdir()))
    return path.read_text()
```

**Tool Implementation:**

```python
from mcp.types import TextContent

def text(s: str) -> list[TextContent]:
    return [TextContent(type="text", text=s)]

@server.call_tool()
async def call_tool(name: str, args: dict) -> list[TextContent]:
    match name:
        case "read_file":
            return text(fs._resolve_path(args["path"]).read_text())
        case "write_file":
            # In a real server, refuse unless FileSystemConfig.write_enabled is set
            path = fs._resolve_path(args["path"])
            path.parent.mkdir(parents=True, exist_ok=True)
            written = path.write_text(args["content"])  # returns characters written
            return text(f"Wrote {written} characters to {args['path']}")
        case "list_directory":
            path = fs._resolve_path(args["path"])
            return text("\n".join(sorted(f.name for f in path.iterdir())))
        case "file_exists":
            return text("yes" if fs._resolve_path(args["path"]).exists() else "no")
        case _:
            raise ValueError(f"Unknown tool: {name}")
```

**Error Handling:** Translate low-level exceptions into messages that name the problem. `ResourceError` here is a plain exception class; FastMCP defines one with the same name in `mcp.server.fastmcp.exceptions`.

```python
class ResourceError(Exception):
    pass

# Replaces the read handler above; a server has exactly one
@server.read_resource()
async def read_resource(uri) -> str:
    try:
        path = fs._resolve_path(str(uri).removeprefix("fs://"))
        return path.read_text()
    except FileNotFoundError:
        raise ResourceError(f"File not found: {uri}") from None
    except IsADirectoryError:
        raise ResourceError(f"Path is a directory: {uri}") from None
    except PermissionError:
        raise ResourceError(f"Access denied: {uri}") from None
```

**Configuration Options:**

```python
from dataclasses import dataclass

@dataclass
class FileSystemConfig:
    root_path: str
    max_file_size: int = 1024 * 1024  # 1 MiB default
    allowed_extensions: list[str] | None = None  # None allows any extension
    write_enabled: bool = False  # read-only unless explicitly enabled
```
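- 09 Database Query Server (20 min)

Database MCP servers must balance query flexibility with security. They use parameterized queries, strict schema definitions, and read-only access to contain injection and runaway queries.

Connecting AI applications to databases through MCP requires careful design. The goal is to let the model answer natural-language questions with queries while keeping the database safe and responsive.

**Connection Management:**

```python
import asyncpg
from contextlib import asynccontextmanager

class DatabaseServer:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self._pool: asyncpg.Pool | None = None

    async def connect(self):
        self._pool = await asyncpg.create_pool(
            self.connection_string, min_size=2, max_size=10
        )

    async def disconnect(self):
        if self._pool:
            await self._pool.close()

    @asynccontextmanager
    async def transaction(self):
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                yield conn
```

**Schema Introspection:** Servers should expose table structure as a resource so that the model can write valid queries. These handlers assume a module-level `db = DatabaseServer(...)` instance on which `connect()` has already been awaited, and a low-level `Server` named `server`.

```python
import json

from mcp.types import Resource

@server.list_resources()
async def list_resources() -> list[Resource]:
    return [
        Resource(
            uri="db://schema",
            name="database_schema",
            description="Database schema overview",
            mimeType="application/json",
        )
    ]

@server.read_resource()
async def read_resource(uri) -> str:
    if str(uri) == "db://schema":
        async with db._pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT table_name, column_name, data_type
                FROM information_schema.columns
                WHERE table_schema = 'public'
                ORDER BY table_name, ordinal_position
            """)
        return json.dumps([dict(row) for row in rows], indent=2)
    raise ValueError(f"Unknown resource: {uri}")
```

**Query Tool Implementation:** Free-form SQL is the riskiest tool a server can offer. Checking that the text starts with `SELECT` is only a first filter, because a `SELECT` can still call functions with side effects. Run the query inside a read-only transaction with a statement timeout and a row cap.

```python
import json

from mcp.types import TextContent

class ToolError(Exception):
    """Message returned to the model as a failed tool result."""

ALLOWED_TABLES = {"customers", "orders"}  # example whitelist
MAX_ROWS = 500

@server.call_tool()
async def call_tool(name: str, args: dict) -> list[TextContent]:
    match name:
        case "execute_query":
            result = await db._execute_select(args["sql"])
        case "get_table_data":
            result = await db._get_table(args["table"], args.get("limit", 100), args.get("offset", 0))
        case "get_table_schema":
            result = await db._get_schema(args["table"])
        case _:
            raise ToolError(f"Unknown tool: {name}")
    return [TextContent(type="text", text=result)]

# The following are methods of DatabaseServer
async def _execute_select(self, sql: str) -> str:
    # Cheap first filter; the read-only transaction below is the real guard
    if not sql.strip().upper().startswith("SELECT"):
        raise ToolError("Only SELECT queries allowed")
    async with self._pool.acquire() as conn:
        # readonly=True makes PostgreSQL reject writes inside this transaction
        async with conn.transaction(readonly=True):
            await conn.execute("SET LOCAL statement_timeout = '10s'")
            cursor = await conn.cursor(sql)
            rows = await cursor.fetch(MAX_ROWS)  # never pull more than MAX_ROWS
    # default=str serializes dates, decimals and UUIDs
    return json.dumps([dict(row) for row in rows], indent=2, default=str)

async def _get_table(self, table: str, limit: int, offset: int) -> str:
    # Values are bound as $1/$2, but identifiers cannot be: whitelist the name
    if table not in ALLOWED_TABLES:
        raise ToolError(f"Table not allowed: {table}")
    limit = max(0, min(int(limit), MAX_ROWS))
    async with self._pool.acquire() as conn:
        rows = await conn.fetch(
            f'SELECT * FROM "{table}" LIMIT $1 OFFSET $2', limit, max(0, int(offset))
        )
    return json.dumps([dict(row) for row in rows], indent=2, default=str)
```

**Security Considerations:**

- Whitelist allowed tables
- Run queries under a database role with read-only privileges
- Implement query timeouts
- Limit result set sizes
- Log all queries for audit
- Validate identifier names against the whitelist, because identifiers cannot be passed as `$n` parameters

**Connection Failures:**

```python
import asyncio

async def _safe_query(self, sql: str, *args):
    try:
        async with self._pool.acquire() as conn:
            return await conn.fetch(sql, *args, timeout=10)  # seconds
    except (OSError, asyncpg.PostgresConnectionError):
        raise ToolError("Database connection failed") from None
    except asyncpg.UndefinedTableError:
        raise ToolError("Table not found") from None
    except asyncio.TimeoutError:
        raise ToolError("Query timed out; add filters or a LIMIT") from None
```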
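The paged table read combines three separate safeguards: a whitelisted identifier, bound values, and a clamped page size. Pulling the query construction into a pure function makes those rules testable without a database. This is a minimal sketch, and `build_table_query` is a hypothetical helper. The `$1`/`$2` placeholders follow asyncpg's PostgreSQL parameter style.

```python
def build_table_query(
    table: str, limit: int, offset: int, allowed: set[str], max_rows: int = 500
) -> tuple[str, list[int]]:
    """Return (sql, params) for a paged read of one whitelisted table."""
    if table not in allowed:
        # Rejects injection attempts such as 'orders"; DROP TABLE x; --'
        raise ValueError(f"Table not allowed: {table!r}")
    limit = max(0, min(int(limit), max_rows))  # clamp the page size
    offset = max(0, int(offset))               # no negative offsets
    # The identifier comes from the whitelist; only values are bound as $n
    return f'SELECT * FROM "{table}" LIMIT $1 OFFSET $2', [limit, offset]


sql, params = build_table_query("orders", 10_000, -5, {"orders", "customers"})
print(sql, params)
```

The caller passes the result straight through, as in `await conn.fetch(sql, *params)`.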
- 10 API Integration Server (20 min)

API MCP servers translate between the MCP protocol and external HTTP APIs, handling authentication, rate limiting, and response transformation.

Integrating an external API into MCP means converting between the protocol's message formats and each API's own behavior. This chapter covers the patterns needed for reliable API integration.

**HTTP Client Setup:**

```python
import httpx

class ToolError(Exception):
    """Message returned to the model as a failed tool result."""

class APIServer:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url.rstrip("/")
        # One shared client reuses connections across tool calls
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=30.0,
        )

    async def close(self):
        await self._client.aclose()
```

**Pagination Handling:** Many APIs return paginated results. MCP tools should handle pagination transparently and cap the total, because everything a tool returns ends up in the model's context.

```python
# APIServer method
async def fetch_all_pages(self, endpoint: str, max_items: int = 1000) -> list[dict]:
    results = []
    cursor = None
    while len(results) < max_items:
        params = {"limit": 100}
        if cursor:
            params["cursor"] = cursor
        response = await self._client.get(endpoint, params=params)
        response.raise_for_status()
        data = response.json()
        # Assumes the API returns "items" or "results" plus a "next_cursor"
        results.extend(data.get("items", data.get("results", [])))
        cursor = data.get("next_cursor")
        if not cursor:
            break
    return results[:max_items]
```

**Error Translation:** Map HTTP failures to messages the model can act on. Treat every 2xx status as success, not only 200, so that a `201 Created` from a POST is not reported as an error.

```python
# APIServer method
async def safe_request(self, method: str, path: str, **kwargs):
    try:
        response = await self._client.request(method, path, **kwargs)
    except httpx.TimeoutException:
        raise ToolError("API request timed out") from None
    except httpx.ConnectError:
        raise ToolError("API connection failed") from None
    if response.is_success:
        # 204 No Content has no body to parse
        return response.json() if response.content else None
    status = response.status_code
    if status == 401:
        raise ToolError("API authentication failed")
    if status == 403:
        raise ToolError("API access forbidden")
    if status == 429:
        raise ToolError("API rate limit exceeded, retry later")
    if status >= 500:
        raise ToolError(f"API server error: {status}")
    raise ToolError(f"API error: {status}")
```

**Tool Implementation:** The handler below assumes a module-level `APIServer` instance and a low-level `Server` named `server`; the base URL and the `API_KEY` environment variable are examples.

```python
import json
import os

from mcp.types import TextContent

api = APIServer("https://api.example.com", os.environ["API_KEY"])

@server.call_tool()
async def call_tool(name: str, args: dict) -> list[TextContent]:
    match name:
        case "list_repos":
            text = json.dumps(await api.fetch_all_pages("/api/repos"), indent=2)
        case "get_issues":
            issues = await api.safe_request(
                "GET",
                f"/api/repos/{args['repo']}/issues",
                params={"state": args.get("state", "open")},
            )
            text = json.dumps(issues, indent=2)
        case "create_issue":
            result = await api.safe_request(
                "POST",
                f"/api/repos/{args['repo']}/issues",
                json={
                    "title": args["title"],
                    "body": args.get("body", ""),
                    "labels": args.get("labels", []),
                },
            )
            text = f"Created issue #{result['number']}"
        case _:
            raise ToolError(f"Unknown tool: {name}")
    return [TextContent(type="text", text=text)]
```

**Rate Limiting:** A sliding-window limiter keeps the server under the upstream API's quota. This version uses a monotonic clock, which is immune to wall-clock changes, and a lock so that concurrent tool calls cannot overshoot the limit.

```python
import asyncio
import time

class RateLimiter:
    def __init__(self, max_requests: int, per_seconds: float):
        self.max_requests = max_requests
        self.per_seconds = per_seconds
        self._requests: list[float] = []  # monotonic timestamps of recent requests
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            while True:
                now = time.monotonic()
                window_start = now - self.per_seconds
                self._requests = [t for t in self._requests if t > window_start]
                if len(self._requests) < self.max_requests:
                    self._requests.append(now)
                    return
                # Sleep until the oldest request leaves the window, then re-check
                await asyncio.sleep(self._requests[0] - window_start)
```

**Response Caching:** `functools.lru_cache` does not work on `async def` functions. It caches the coroutine object, and a coroutine can only be awaited once. A small time-based cache keyed by path avoids that problem:

```python
import time

class TTLCache:
    def __init__(self, max_age: float = 60.0):
        self.max_age = max_age
        # key -> (stored_at, value); unbounded, so add eviction for large key spaces
        self._entries: dict[str, tuple[float, object]] = {}

    async def get_or_fetch(self, key: str, fetch):
        entry = self._entries.get(key)
        if entry and time.monotonic() - entry[0] < self.max_age:
            return entry[1]  # still fresh
        value = await fetch()  # fetch: zero-argument async callable
        self._entries[key] = (time.monotonic(), value)
        return value
```

For example: `await cache.get_or_fetch(path, lambda: api.safe_request("GET", path))`.
- 11 SSE Transport (20 min)

The HTTP+SSE transport runs MCP over HTTP: clients POST their messages, and the server sends responses and notifications back over a Server-Sent Events stream.

The HTTP+SSE transport, defined in the 2024-11-05 revision of the specification, combines two channels. The client first opens a long-lived SSE stream with an HTTP GET, and the server's first event (`endpoint`) tells the client which URL to POST its JSON-RPC messages to. The client then sends every request and notification as an HTTP POST to that URL. The server acknowledges each POST and delivers responses and notifications over the SSE stream. The 2025-03-26 revision replaced this transport with Streamable HTTP (next chapter), but many clients still speak it.

**Transport Architecture:**

```
Client → Server: HTTP GET /sse             (opens the stream; first event: endpoint)
Client → Server: HTTP POST /messages?...   (JSON-RPC requests and notifications)
Server → Client: SSE events on the stream  (JSON-RPC responses and notifications)
```

**Server Implementation:** The sketch below shows the moving parts with Starlette and `sse-starlette`. `dispatch` is a stand-in for your JSON-RPC handler. For real deployments, the SDK's `mcp.server.sse.SseServerTransport` implements this transport for a low-level `Server`.

```python
import asyncio
import json
import uuid

from sse_starlette import EventSourceResponse
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Route

# session id -> queue of outgoing JSON-RPC messages for that client's SSE stream
connected_clients: dict[str, asyncio.Queue] = {}

async def dispatch(message: dict) -> dict | None:
    """Stand-in JSON-RPC handler; replace with real method dispatch."""
    if "id" not in message:
        return None  # notifications get no response
    return {"jsonrpc": "2.0", "id": message["id"],
            "error": {"code": -32601, "message": "Method not found"}}

async def handle_sse(request):
    session_id = uuid.uuid4().hex
    queue: asyncio.Queue = asyncio.Queue()
    connected_clients[session_id] = queue

    async def event_generator():
        # The first event tells the client where to POST its messages
        yield {"event": "endpoint", "data": f"/messages?session_id={session_id}"}
        try:
            while True:
                message = await queue.get()
                yield {"event": "message", "data": json.dumps(message)}
        finally:
            connected_clients.pop(session_id, None)  # stream closed

    return EventSourceResponse(event_generator())

async def handle_messages(request):
    queue = connected_clients.get(request.query_params.get("session_id", ""))
    if queue is None:
        return Response("Unknown session", status_code=404)
    response = await dispatch(await request.json())
    if response is not None:
        await queue.put(response)  # the reply travels over the SSE stream
    return Response(status_code=202)  # Accepted

# Routes are declared after the handlers they reference exist
app = Starlette(routes=[
    Route("/sse", handle_sse, methods=["GET"]),
    Route("/messages", handle_messages, methods=["POST"]),
])
```

**Session Management:**

```python
class MCPSession:
    def __init__(self, session_id: str, request_handler, notification_sender):
        self.session_id = session_id
        self.request_handler = request_handler  # assumed to return the result object
        self.notification_sender = notification_sender
        self._capabilities = {}

    async def send_notification(self, notification_type: str, data: dict):
        await self.notification_sender({
            "type": notification_type,
            "session_id": self.session_id,
            **data,
        })

    async def initialize(self, client_info: dict):
        # client_info: the full initialize params (protocolVersion, capabilities, clientInfo).
        # id 1 suits the one-off handshake; later requests need their own unique ids.
        result = await self.request_handler({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": client_info,
        })
        self._capabilities = result.get("capabilities", {})
        return result
```

**Client Implementation:** A hand-written SSE client has to read the `endpoint` event, POST to that URL, and match responses on the stream back to request ids. The SDK's client does all of that:

```python
import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client

async def main():
    async with sse_client("http://localhost:8000/sse") as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            result = await session.call_tool("hello", {})
            print(result.content)

asyncio.run(main())
```

**Health Checks:**

```python
from starlette.responses import JSONResponse
from starlette.routing import Route

async def health(request):
    return JSONResponse({
        "status": "healthy",
        "connected_clients": len(connected_clients),
        "transport": "http+sse",
    })

app.router.routes.append(Route("/health", health, methods=["GET"]))
```

**CORS Configuration:**

```python
from starlette.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # configure appropriately for production
    allow_methods=["*"],
    allow_headers=["*"],
)
# Local servers should also bind to 127.0.0.1 and validate the Origin header
# to guard against DNS rebinding attacks.
```

**Connection Cleanup:** Starlette's `on_event` hooks are deprecated in favor of a lifespan context manager:

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    yield  # the server runs while this is suspended
    connected_clients.clear()  # drop per-client queues on shutdown

# Pass it when constructing the app: Starlette(routes=[...], lifespan=lifespan)
```
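- 12 Streamable HTTP Transport (15 min): The Streamable HTTP transport carries MCP's request-response messages over HTTP and lets the same endpoint stream real-time updates and the results of long-running operations.

A Streamable HTTP server can answer any request with a stream of Server-Sent Events (SSE) instead of a single JSON body. That suits operations that produce output over time, which is a common case for AI agents that process files, query databases, or run jobs with variable completion times.

The transport exposes a single MCP endpoint (for example `/mcp`). Clients send JSON-RPC messages to it with POST, and they can optionally open a GET request on it to receive a standalone SSE stream of server-initiated messages. For each POST, the server chooses whether to reply with `Content-Type: application/json` or with a `text/event-stream` that can carry progress notifications before the final response. Either way, it can start responding while the operation is still running.

```python
# server.py
import asyncio

from mcp.server.fastmcp import Context, FastMCP

mcp = FastMCP("Streamable Server")


@mcp.tool()
async def long_running_analysis(data: str, ctx: Context) -> str:
    """Simulate an analysis that reports progress while it works."""
    results = []
    total = 5
    for i in range(total):
        await asyncio.sleep(1)  # simulate work
        results.append(f"Batch {i}: processed")
        # Sent as a notifications/progress message if the client supplied a progressToken
        await ctx.report_progress(i + 1, total)
    return "\n".join(results)


if __name__ == "__main__":
    # Serve the Streamable HTTP transport (the SDK's default endpoint path is /mcp)
    mcp.run(transport="streamable-http")
```

The client sends a POST with the standard JSON-RPC payload and an `Accept` header listing both `application/json` and `text/event-stream`. When the server decides to stream, it replies with content type `text/event-stream` and sends events as they become available. Each event follows the SSE format: a `data:` line holding one JSON-RPC message, optionally preceded by `event:` and `id:` fields.

For clients, the streaming pattern requires handling partial responses:

```python
# client_streaming.py
import json

import httpx

BASE_URL = "http://127.0.0.1:8000"  # adjust to where your server runs


async def stream_request(tool_name: str, arguments: dict, session_id: str | None = None):
    """Yield every JSON-RPC message the server sends for one tools/call request.

    A real client must complete the initialize handshake first and echo the
    Mcp-Session-Id header the server returned (passed in here as session_id).
    """
    headers = {"Accept": "application/json, text/event-stream"}
    if session_id:
        headers["Mcp-Session-Id"] = session_id
    payload = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
        "id": 1,
    }
    async with httpx.AsyncClient(timeout=None) as client:
        # client.stream() yields data as it arrives; client.post() would buffer the whole body
        async with client.stream("POST", f"{BASE_URL}/mcp", json=payload, headers=headers) as response:
            response.raise_for_status()
            if response.headers.get("content-type", "").startswith("application/json"):
                # The server chose a plain JSON reply instead of a stream
                yield json.loads(await response.aread())
                return
            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    yield json.loads(line[5:].strip())
```

The protocol also supports session management. The server may assign a session ID in the `Mcp-Session-Id` header of its initialize response, and the client must then send that header on every later request. This lets the server keep per-client state across requests, such as cached data or an authenticated context.

Failure handling in streaming differs from request-response. If a connection drops mid-stream, the client must detect the incomplete response and either retry or report an error. A stream is incomplete when it ended without delivering a JSON-RPC response whose `id` matches the request. The JSON-RPC `id` correlates that final response with its request. For resumption, the specification lets servers attach SSE event IDs, so a client can reconnect with a `Last-Event-ID` header and receive the messages it missed, provided the server supports redelivery.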
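The client above only looks at single `data:` lines, which is enough when every event holds one compact JSON message. A fuller parser groups raw SSE lines into whole events. The sketch below follows the core SSE parsing rules; `parse_sse` is a hypothetical helper, and it expects lines with their line endings already stripped, which is the form httpx's `iter_lines()` yields.

```python
from collections.abc import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Group raw SSE lines into events of the form {"event", "data", "id"}.

    Fields accumulate until a blank line ends the event. Several data: lines
    are joined with newlines, lines starting with ":" are comments, and the
    event type falls back to "message" when no event: field was sent.
    """
    event, data, event_id = "message", [], None
    for line in lines:
        if line == "":
            if data:  # a blank line dispatches the pending event, if it has data
                yield {"event": event, "data": "\n".join(data), "id": event_id}
            event, data = "message", []  # the last event id deliberately persists
            continue
        if line.startswith(":"):
            continue  # comment or keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one space after the colon is not part of the value
        if field == "event":
            event = value
        elif field == "data":
            data.append(value)
        elif field == "id":
            event_id = value
```

A trailing event that never receives its blank line is discarded, matching how browsers treat a truncated stream. For an `async for` loop over `aiter_lines()`, the same logic can be ported to an async generator.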
- 13 Security Best Practices (20 min): MCP servers run code on behalf of AI agents, so security is foundational rather than optional. Every exposed endpoint is a potential attack vector.

The primary attack surface for an MCP server has four parts:

- injection through tool parameters
- path traversal in file operations
- command injection in shell tools
- resource exhaustion via unbounded operations

Each vector requires specific defenses.

Input validation forms the first line of defense. Never trust client-supplied data, regardless of apparent format. Tool arguments are generated by a model that may have read attacker-controlled content.

```python
from pathlib import Path

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("secure-server")
BASE_DIR = Path("/data").resolve()  # the only directory tools may touch


def sanitize_path(user_input: str) -> Path:
    """Safely resolve a relative path inside BASE_DIR, preventing traversal."""
    # Reject absolute paths and ".." components early (a name like "a..b" is still allowed)
    candidate = Path(user_input)
    if candidate.is_absolute() or ".." in candidate.parts:
        raise ValueError("Invalid path format")
    # resolve() also follows symlinks, so a link pointing outside BASE_DIR is caught below
    resolved = (BASE_DIR / candidate).resolve()
    if not resolved.is_relative_to(BASE_DIR):  # Python 3.9+
        raise ValueError("Path escapes base directory")
    return resolved


@mcp.tool()
def read_file(path: str) -> str:
    safe_path = sanitize_path(path)
    return safe_path.read_text()
```

Shell command execution requires extreme caution. If shell tools are necessary, pass `subprocess.run` an argument list and keep the default `shell=False` rather than building a command string. The operating system then hands each argument to the program unchanged, so shell syntax in the input has no effect.

```python
import re
import subprocess


@mcp.tool()
def safe_grep(pattern: str, directory: str) -> list[str]:
    """Search with grep using an argument list (no shell involved)."""
    # Validate the pattern (Python and grep -E regex dialects overlap but are not identical)
    try:
        re.compile(pattern)
    except re.error as e:
        raise ValueError(f"Invalid regex: {e}")
    safe_dir = sanitize_path(directory)

    # Each list element reaches grep verbatim, so no quoting is needed:
    # shlex.quote here would make grep search for the literal quote characters.
    # "--" stops grep from reading a pattern such as "-f" as an option.
    result = subprocess.run(
        ["grep", "-rE", "--", pattern, str(safe_dir)],
        capture_output=True,
        text=True,
        timeout=30,
    )
    # grep exits 0 on matches, 1 on no matches, 2 on errors
    if result.returncode not in (0, 1):
        raise RuntimeError(f"grep failed: {result.stderr}")
    return result.stdout.splitlines()
```

Rate limiting prevents resource exhaustion. Implement per-client throttling:

```python
import time
from collections import defaultdict


class RateLimiter:
    """Sliding window: at most max_calls per client within `window` seconds."""

    def __init__(self, max_calls: int, window: float):
        self.max_calls = max_calls
        self.window = window
        self.calls = defaultdict(list)

    def check(self, client_id: str) -> bool:
        now = time.monotonic()  # unaffected by wall-clock adjustments
        # Keep only the timestamps still inside the window
        self.calls[client_id] = [
            t for t in self.calls[client_id] if now - t < self.window
        ]
        if len(self.calls[client_id]) >= self.max_calls:
            return False
        self.calls[client_id].append(now)
        return True


rate_limiter = RateLimiter(max_calls=100, window=60)


async def rate_limited_call(client_id: str, func, *args, **kwargs):
    if not rate_limiter.check(client_id):
        raise PermissionError("Rate limit exceeded")
    return await func(*args, **kwargs)
```

Network exposure requires TLS. Local servers should bind to localhost (127.0.0.1) rather than all interfaces, and public deployments need proper certificates. Never transmit credentials unencrypted. For HTTP transports, the MCP specification also requires servers to validate the `Origin` header of incoming connections. That check blocks DNS-rebinding attacks, in which a malicious web page reaches a server listening on localhost.
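- 14 Authentication (20 min): Authentication verifies identity before granting access. Without it, every tool becomes a public interface open to unauthorized use.

MCP deployments can authenticate clients in several ways. For HTTP transports, the specification standardizes an OAuth 2.1-based authorization flow. Stdio servers usually take credentials from their environment instead. Static API keys are not part of the spec, but they are a common and simple option for straightforward, private scenarios:

```python
import secrets

from fastapi import Depends, FastAPI, HTTPException, Request

app = FastAPI()

# Illustration only: load real keys from configuration or a secret store
API_KEYS = {
    "agent-key-001": {"name": "Development Agent", "scope": "full"},
    "agent-key-002": {"name": "Production Agent", "scope": "limited"},
}


async def authenticate_request(request: Request) -> dict:
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing or invalid authorization")
    key = auth_header[7:]  # Strip "Bearer "
    # Constant-time comparison avoids leaking key contents through response timing
    for known_key, identity in API_KEYS.items():
        if secrets.compare_digest(key.encode(), known_key.encode()):
            return identity
    raise HTTPException(status_code=401, detail="Invalid API key")


# Apply to endpoints as a FastAPI dependency
@app.post("/mcp")
async def mcp_endpoint(request: Request, caller: dict = Depends(authenticate_request)):
    ...
```

For production deployments, OAuth provides delegated access with fine-grained permissions. The flow has three steps:

1. Redirect the user to an authorization server.
2. Handle the callback, which carries an authorization code.
3. Exchange that code for access tokens.

```python
from authlib.integrations.starlette_client import OAuth
from starlette.middleware.sessions import SessionMiddleware

# authorize_redirect keeps OAuth state in the session, so SessionMiddleware is required
app.add_middleware(SessionMiddleware, secret_key=SESSION_SECRET)  # loaded from config (not shown)

oauth = OAuth()
oauth.register(
    name="mcp",
    client_id=OAUTH_CLIENT_ID,          # these three values come from your
    client_secret=OAUTH_CLIENT_SECRET,  # configuration (not shown)
    server_metadata_url=CONFIG["well_known_url"],
    client_kwargs={"scope": "mcp:read mcp:write mcp:admin"},  # request only what you need
)


@app.get("/auth/login")
async def login(request: Request):
    return await oauth.mcp.authorize_redirect(
        request, request.url_for("auth_callback")
    )


@app.get("/auth/callback")
async def auth_callback(request: Request):  # function name must match url_for("auth_callback")
    token = await oauth.mcp.authorize_access_token(request)
    # Store the token server-side (e.g. keyed by session) rather than exposing it to the browser
    return {"access_token": token["access_token"]}
```

JWT tokens provide stateless authentication. After validating credentials, issue a signed token containing claims:

```python
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT
from fastapi import HTTPException

# SECRET_KEY: load from a secret store; never hard-code it


def create_token(subject: str, scopes: list[str]) -> str:
    now = datetime.now(timezone.utc)  # datetime.utcnow() is deprecated since Python 3.12
    payload = {
        "sub": subject,
        "scopes": scopes,
        "iat": now,
        "exp": now + timedelta(hours=1),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")


def verify_token(token: str) -> dict:
    try:
        # Pin the accepted algorithms so a token cannot choose its own
        return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise HTTPException(401, "Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid token")
```

Token refresh handles long-lived sessions. Keep access tokens short-lived and issue a separate, longer-lived refresh token alongside them. When the access token expires, the client presents the refresh token to a refresh endpoint. The server issues a new access token if the refresh token is still valid and has not been revoked.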
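Refresh tokens are safer when each one is single-use and only a hash of it is stored. The standard-library sketch below assumes an in-memory store, which a real deployment would replace with a database; `RefreshTokenStore` and its methods are hypothetical names. After `rotate()` returns a subject, the server would mint a fresh access token, for example with `create_token(subject, scopes)`.

```python
import hashlib
import secrets
import time


class RefreshTokenStore:
    """In-memory refresh-token rotation (illustrative).

    Only SHA-256 hashes are kept, and each refresh token is single-use:
    redeeming it revokes it and returns a replacement.
    """

    def __init__(self, ttl_seconds: float = 7 * 24 * 3600, clock=time.time):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable for tests
        self._tokens: dict[str, tuple[str, float]] = {}  # hash -> (subject, expiry)

    @staticmethod
    def _hash(token: str) -> str:
        return hashlib.sha256(token.encode()).hexdigest()

    def issue(self, subject: str) -> str:
        token = secrets.token_urlsafe(32)
        self._tokens[self._hash(token)] = (subject, self.clock() + self.ttl)
        return token

    def rotate(self, token: str) -> tuple[str, str]:
        """Redeem a refresh token; return (subject, new_refresh_token)."""
        entry = self._tokens.pop(self._hash(token), None)  # pop makes it single-use
        if entry is None:
            raise PermissionError("Unknown or already-used refresh token")
        subject, expiry = entry
        if self.clock() >= expiry:
            raise PermissionError("Refresh token expired")
        return subject, self.issue(subject)
```

If an already-used token is presented again, the error is a useful signal that a token may have leaked.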
- 15 Authorization Scopes (20 min): Authentication identifies who you are; authorization determines what you can do. Scope-based access control limits tool exposure based on verified identity.

Define scopes that map to tool capabilities. Fine-grained scopes enable the principle of least privilege. Keep two mappings separate, one for the tools each scope grants and one for the scopes each scope implies, so a check never confuses a scope name with a tool name:

```python
SCOPES = {
    "files:read": {"read_file", "list_directory", "search_files"},
    "files:write": {"write_file", "delete_file", "create_directory"},
    "files:admin": {"format_disk"},
    "database:query": {"execute_select", "list_tables"},
    "database:mutate": {"execute_insert", "execute_update"},
}

# Scopes that a broader scope implies
SCOPES_IMPLICATIONS = {
    "files:admin": ["files:read", "files:write"],
    "database:mutate": ["database:query"],
    "admin": list(SCOPES),  # every defined scope
}


def check_scope(token_scopes: list[str], required_tool: str) -> bool:
    # expand_scopes (defined below) adds every implied scope
    for scope in expand_scopes(token_scopes):
        if required_tool in SCOPES.get(scope, ()):
            return True
    return False
```

Enforce scopes at the tool level. FastMCP calls a tool with its declared arguments only, so the decorator cannot take a `request` parameter. Instead, it reads the verified token from wherever your auth layer stores it:

```python
from functools import wraps

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("scoped-server")


def require_scopes(*required: str):
    """Decorator that rejects a call unless the caller holds every required scope."""
    def decorator(func):
        @wraps(func)  # keeps the signature FastMCP uses to build the tool's schema
        async def wrapper(*args, **kwargs):
            # get_current_token() is a hypothetical helper returning the claims
            # your auth middleware verified for this request (or None)
            token = get_current_token()
            if not token:
                raise PermissionError("Authentication required")
            granted = expand_scopes(token.get("scopes", []))
            missing = [scope for scope in required if scope not in granted]
            if missing:
                raise PermissionError(f"{func.__name__} requires: {', '.join(missing)}")
            return await func(*args, **kwargs)
        return wrapper
    return decorator


@mcp.tool()
@require_scopes("files:write")
async def write_file(path: str, content: str) -> dict:
    ...  # Implementation
```

Hierarchical scopes simplify management. A scope like `files:admin` implies `files:read` and `files:write`, and expansion follows implications transitively:

```python
def expand_scopes(requested_scopes: list[str]) -> set[str]:
    """Expand scopes to include all implied scopes."""
    expanded = set()
    to_process = list(requested_scopes)
    while to_process:
        scope = to_process.pop()
        if scope in expanded:
            continue
        expanded.add(scope)
        implied = SCOPES_IMPLICATIONS.get(scope, [])
        to_process.extend(implied)
    return expanded
```

Resource-level authorization adds another dimension. Beyond checking scope, verify the specific resource:

```python
import re


def check_resource_permission(user: dict, action: str, resource: str) -> bool:
    # `action` is unused here; extend the grants per action as needed
    # Check explicit grants
    if resource in user.get("allowed_resources", []):
        return True
    # Check pattern-based grants; fullmatch stops a pattern from matching only a prefix
    for pattern in user.get("resource_patterns", []):
        if re.fullmatch(pattern, resource):
            return True
    return False
```
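Resource checks should run on a normalized path. Otherwise a grant for `/projects/public/*` can be satisfied by a request for `/projects/public/../secret/keys.txt`. The minimal sketch below assumes POSIX-style resource paths and uses shell-style glob grants via `fnmatch`, which is a deliberate swap for the regex patterns above. `is_resource_allowed` is a hypothetical name.

```python
import fnmatch
import posixpath


def is_resource_allowed(resource: str, grants: list[str]) -> bool:
    """Return True if the normalized resource path matches any glob grant."""
    # normpath collapses "..", "." and repeated slashes before any matching happens
    normalized = posixpath.normpath(resource)
    if not normalized.startswith("/"):
        return False  # only absolute resource paths are considered
    # fnmatchcase is case-sensitive on every OS; note that "*" also matches "/"
    return any(fnmatch.fnmatchcase(normalized, grant) for grant in grants)
```

Because `*` crosses directory separators in `fnmatch`, a grant like `/projects/public/*` covers the whole subtree. Use a stricter matcher if grants must stop at one directory level.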
- 16 Claude Code Integration (20 min): MCP servers extend Claude Code by exposing local tools as native features, letting the model interact with your systems through a standardized protocol.

Claude Code launches local MCP servers as subprocesses and talks to them over the stdio transport. It can also connect to remote HTTP servers. Register each server in Claude Code's configuration, for example in a project-level `.mcp.json` file or with the `claude mcp add` command. The server commands below are placeholders for whatever your servers document:

```json
{
  "mcpServers": {
    "filesystem": {
      "command": "uv",
      "args": ["run", "mcp-server-filesystem", "--path", "/projects"]
    },
    "database": {
      "command": "docker",
      "args": ["run", "-i", "--rm", "-e", "DB_URL=postgres://localhost/mydb", "mcp-database-server"]
    }
  }
}
```

The `-i` flag keeps the container's stdin open, which the stdio transport needs. Also note that `localhost` inside a container refers to the container itself, not your host.

After configuration, Claude Code discovers the available tools automatically. Run `/mcp` inside Claude Code to see which servers are connected and what tools they provide.

Design tools with the model in mind. Tool descriptions become part of the model's prompt, so write them as instructions rather than API documentation:

```python
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("claude-code-tools")


@mcp.tool(description="""Read the contents of a file.

Args:
- path: Absolute path to the file (required)

Returns the complete file contents as text. Fails if the file exceeds 1MB or is a binary format.

Example use:
- Reading source code before editing
- Examining configuration files
- Reviewing logs to debug issues""")
def read_file(path: str) -> str:
    ...
```

Schema definitions directly influence how Claude Code requests tool execution. Use descriptive field names and provide constraints:

```python
from typing import Annotated

from pydantic import Field


@mcp.tool(description="Search for files matching criteria")
async def search_files(
    pattern: Annotated[str, Field(description="Glob pattern, e.g. '*.py'")],
    # Pydantic rejects Field(default=...) inside Annotated; put defaults after "="
    directory: Annotated[str, Field(description="Root directory for search")] = "/projects",
    case_sensitive: Annotated[bool, Field(description="Match case exactly")] = False,
) -> list[dict]:
    ...
```

For complex operations, consider tools that return structured data rather than raw text. Labeled fields are easier for the model to interpret reliably than free-form output:

```python
@mcp.tool(description="Query the database with SQL")
async def query_database(
    sql: Annotated[str, Field(description="SELECT statement (read-only)")],
) -> dict:
    """Execute a query and return structured results."""
    # A prefix check is only a first filter; also connect with a read-only database role
    if not sql.strip().upper().startswith("SELECT"):
        raise ValueError("Only SELECT statements allowed")
    results = await execute_query(sql)  # your data-access helper (not shown)
    return {
        "columns": list(results[0].keys()) if results else [],
        "rows": [dict(row) for row in results],
        "count": len(results),
    }
```

Resources expose read-only data. Unlike tools, which the model decides to call, resources are application-controlled: the client chooses when to read them and attach them as context. Register resource providers for static or frequently accessed data:

```python
import json
from pathlib import Path


@mcp.resource("config://app/settings")
def get_app_settings() -> str:
    return json.dumps(AppSettings.current().to_dict())  # AppSettings: your config object


@mcp.resource("file://{path}")
def get_file_resource(path: str) -> str:
    # Templated URIs accept arbitrary input: confine the path (e.g. with the
    # Security chapter's sanitize_path) before reading
    return Path(path).read_text()
```
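A prefix check like `startswith("SELECT")` is easy to bypass, so read-only enforcement belongs in the database itself. The minimal sketch below assumes a SQLite backend, which the chapter does not specify, and uses only the standard-library `sqlite3` module. `PRAGMA query_only` makes the connection reject writes, and `cursor.description` supplies the column names for the structured result. The function names are hypothetical.

```python
import sqlite3


def make_read_only(conn: sqlite3.Connection) -> sqlite3.Connection:
    # After this pragma, any statement that would modify the database fails
    conn.execute("PRAGMA query_only = ON")
    return conn


def run_query(conn: sqlite3.Connection, sql: str) -> dict:
    """Execute one statement and return columns, rows and a row count."""
    cursor = conn.execute(sql)  # execute() runs a single statement only
    columns = [col[0] for col in cursor.description or []]
    rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
    return {"columns": columns, "rows": rows, "count": len(rows)}
```

For a server database such as PostgreSQL, the equivalent is a database role that only has read privileges.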
- 17 Debugging MCP Servers (20 min): Debugging MCP servers requires understanding both the protocol's message flow and the server's runtime behavior; inspecting each level reveals different failure modes.

Start with protocol-level debugging. MCP exchanges JSON-RPC messages, so log every incoming and outgoing message. For stdio servers, send logs to stderr. Stdout carries the protocol itself, and stray output there corrupts the stream.

```python
import json
import logging
import sys
from typing import Any

# stderr, not stdout: a stdio server's stdout is reserved for JSON-RPC messages
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)


class ProtocolLogger:
    def __init__(self, logger: logging.Logger):
        self.logger = logger

    def log_request(self, method: str, params: dict):
        self.logger.debug(f"→ {method}: {json.dumps(params)}")

    def log_response(self, request_id: int, result: Any):
        self.logger.debug(f"← Response {request_id}: {json.dumps(result)}")

    def log_error(self, request_id: int, error: dict):
        self.logger.error(f"✗ Error {request_id}: {error}")


protocol_logger = ProtocolLogger(logging.getLogger("mcp.protocol"))
```

For connection-level issues, verify that the transport is established and that the server completes the initialize handshake. The MCP Inspector gives an interactive view of the same exchange.

```bash
# Check the health endpoint (if your server exposes one)
curl http://localhost:8000/health

# Send an initialize request to the MCP endpoint
curl -X POST http://localhost:8000/mcp \
  -H "Content-Type: application/json" \
  -H "Accept: application/json, text/event-stream" \
  -d '{"jsonrpc": "2.0", "id": 0, "method": "initialize", "params": {"protocolVersion": "2025-03-26", "capabilities": {}, "clientInfo": {"name": "curl", "version": "0.1"}}}'

# Monitor active connections
ss -tlnp | grep 8000
lsof -i :8000

# Interactive testing with the MCP Inspector
npx @modelcontextprotocol/inspector
```

Python's `pdb` and `breakpoint()` work inside async code when the server runs over HTTP in a terminal you control. Under stdio, the debugger would compete with the protocol for stdin and stdout. In that case, and in production, rely on structured logging instead:

```python
import logging

logger = logging.getLogger("mcp.server")


@mcp.tool()
async def complex_operation(data: str) -> str:
    # Conditional breakpoint: pauses only for inputs containing the trigger (HTTP transport only)
    if "DEBUG_TRIGGER" in data:
        breakpoint()
    # Log intermediate states
    logger.info(f"Processing {data[:50]}...")
    result = await process_data(data)  # your processing coroutine (not shown)
    logger.info(f"Completed, result length: {len(result)}")
    return result
```

Memory and performance profiling identify bottlenecks:

```python
import time
import tracemalloc


@mcp.tool()
def profiled_operation(data: list) -> dict:
    tracemalloc.start()
    start = time.perf_counter()
    try:
        result = heavy_computation(data)  # the code under investigation (not shown)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        elapsed = time.perf_counter() - start
        tracemalloc.stop()  # always stop tracing, even if the computation raised
    return {
        "result": result,
        "memory_mb": peak / 1024 / 1024,
        "elapsed_seconds": elapsed,
    }
```

For integration issues, mock the AI client to replay recorded scenarios:

```python
import json


class MockMCPClient:
    """Replay recorded MCP sessions for debugging."""

    def __init__(self, trace_file: str):
        with open(trace_file) as f:
            self.session = json.load(f)

    async def replay(self, server):
        for request in self.session["requests"]:
            # Strip the recorded answer before sending the request again
            expected = request.get("expected_response")
            message = {k: v for k, v in request.items() if k != "expected_response"}
            response = await server.handle(message)  # your server's dispatch entry point
            if expected is not None and response != expected:
                print(f"MISMATCH at {request.get('id')}:")
                print(f"  Expected: {expected}")
                print(f"  Got: {response}")
```
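Replay needs a trace to replay. The sketch below wraps an async JSON-RPC handler and saves each exchange in the `{"requests": [...]}` layout that `MockMCPClient` reads, with each reply stored under `expected_response`. `TraceRecorder` is a hypothetical helper, and it assumes your handler takes and returns plain dicts.

```python
import json


class TraceRecorder:
    """Wrap an async JSON-RPC handler and record each exchange for later replay."""

    def __init__(self, handler):
        self.handler = handler
        self.entries: list[dict] = []

    async def handle(self, request: dict) -> dict:
        response = await self.handler(request)
        # Store the request together with the reply it produced
        self.entries.append({**request, "expected_response": response})
        return response

    def save(self, path: str) -> None:
        with open(path, "w") as f:
            json.dump({"requests": self.entries}, f, indent=2)
```

To use it, route traffic through `recorder.handle` instead of the real handler, reproduce the problem once, and call `save()`. The resulting file can then be replayed against a fixed build.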
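- 18 Testing MCP (20 min): Thorough MCP testing covers protocol compliance, tool behavior, and integration scenarios; each layer reveals different defects that tests must catch.

Unit tests verify individual tool behavior in isolation. Here `create_test_server` and `ToolError` stand for your own server code:

```python
import pytest


@pytest.fixture
def mcp_server():
    return create_test_server()  # builds your server with test configuration (not shown)


@pytest.mark.asyncio
async def test_file_tool_read(mcp_server, tmp_path):
    # Arrange: pytest's tmp_path gives each test its own directory
    test_file = tmp_path / "test.txt"
    test_file.write_text("hello world")
    # Act
    result = await mcp_server.call_tool("read_file", {"path": str(test_file)})
    # Assert
    assert result == "hello world"


@pytest.mark.asyncio
async def test_file_tool_not_found(mcp_server, tmp_path):
    with pytest.raises(ToolError) as exc_info:
        await mcp_server.call_tool("read_file", {"path": str(tmp_path / "missing.txt")})
    assert "not found" in str(exc_info.value)
```

Protocol tests verify JSON-RPC message handling. JSON-RPC defines code -32700 for unparseable JSON and -32600 for an invalid request object. The HTTP status that accompanies them is up to your implementation, so assert on what your server documents:

```python
import httpx
import pytest
import pytest_asyncio

from server import app  # your ASGI app (module name is an assumption)


@pytest_asyncio.fixture
async def test_client():
    # ASGITransport sends requests straight to the app without opening a socket
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        yield client


@pytest.mark.asyncio
async def test_protocol_invalid_json(test_client):
    response = await test_client.post(
        "/mcp",
        content="not valid json",
        headers={"Content-Type": "application/json"},
    )
    assert response.status_code == 400
    assert response.json()["error"]["code"] == -32700  # parse error


@pytest.mark.asyncio
async def test_protocol_missing_method(test_client):
    response = await test_client.post(
        "/mcp", json={"jsonrpc": "2.0", "params": {}, "id": 1}
    )
    assert response.status_code == 400
    assert response.json()["error"]["code"] == -32600  # invalid request
```

Integration tests verify the full request-response cycle. The MCP Python SDK can launch a server over stdio and drive it with a real client session:

```python
import sys

import pytest
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


@pytest.mark.asyncio
async def test_full_tool_invocation(tmp_path):
    target = tmp_path / "hello.txt"
    target.write_text("hi")
    # Start the server under test as a subprocess (script name is an assumption)
    params = StdioServerParameters(command=sys.executable, args=["server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize
            await session.initialize()
            # List tools
            tools = (await session.list_tools()).tools
            assert any(t.name == "read_file" for t in tools)
            # Call tool: the result holds content blocks plus an isError flag
            result = await session.call_tool("read_file", {"path": str(target)})
            assert not result.isError
            assert result.content[0].text == "hi"
    # Cleanup happens when the context managers exit
```

Property-based testing validates edge cases:

```python
from hypothesis import given, strategies as st


@pytest.mark.asyncio
@given(path=st.text(min_size=1, max_size=1000))
async def test_path_handling(path):
    server = create_test_server()
    # Should either succeed or raise a proper error
    try:
        result = await server.call_tool("read_file", {"path": path})
        assert isinstance(result, str)
    except ToolError as e:
        # Errors should be informative
        assert len(str(e)) > 0
```

Contract tests ensure compatibility with MCP protocol versions:

```python
@pytest.mark.asyncio
async def test_protocol_version_handshake():
    client = TestClient()  # your raw JSON-RPC test client (not shown)
    init_response = await client.send_request({
        "jsonrpc": "2.0",
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test", "version": "1.0"},
        },
        "id": 1,
    })
    # The negotiated fields live inside the JSON-RPC "result" object
    result = init_response["result"]
    assert "protocolVersion" in result
    assert "capabilities" in result
    assert "serverInfo" in result
```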
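Protocol-compliance tests can share one checker for the JSON-RPC 2.0 response envelope instead of repeating field assertions. The sketch below encodes the envelope rules from the JSON-RPC 2.0 specification: `jsonrpc` is exactly `"2.0"`, `id` is present, exactly one of `result` or `error` appears, and an error carries an integer `code` and a string `message`. The function name is hypothetical.

```python
def jsonrpc_response_problems(message: dict) -> list[str]:
    """Return the ways a JSON-RPC 2.0 response breaks the envelope rules (empty if valid)."""
    problems = []
    if message.get("jsonrpc") != "2.0":
        problems.append('"jsonrpc" must be exactly "2.0"')
    if "id" not in message:
        problems.append('response must include "id" (null if the request id was unreadable)')
    has_result, has_error = "result" in message, "error" in message
    if has_result == has_error:
        problems.append('exactly one of "result" or "error" must be present')
    if has_error:
        error = message["error"]
        if not isinstance(error, dict):
            problems.append('"error" must be an object')
        else:
            code = error.get("code")
            if not isinstance(code, int) or isinstance(code, bool):
                problems.append('"error.code" must be an integer')
            if not isinstance(error.get("message"), str):
                problems.append('"error.message" must be a string')
    return problems
```

A test can then assert `jsonrpc_response_problems(response.json()) == []`, and any failure message lists every violation at once.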
- 19 Error Handling (20 min): MCP errors follow JSON-RPC conventions with structured codes and messages; consistent error handling enables reliable clients and simplifies debugging.

JSON-RPC defines standard error codes, so map your exceptions to appropriate codes. JSON-RPC reserves the range -32768 to -32000 and sets aside -32000 to -32099 for implementation-defined server errors. The application-specific codes below live in that sub-range:

```python
from typing import Any

ERROR_CODES = {
    "parse_error": (-32700, "Invalid JSON"),
    "invalid_request": (-32600, "Malformed request"),
    "method_not_found": (-32601, "Unknown method"),
    "invalid_params": (-32602, "Invalid parameters"),
    "internal_error": (-32603, "Server error"),
    # Implementation-defined server errors (-32000 to -32099). Check the MCP spec
    # before reusing a code: it assigns -32002 to "resource not found".
    "tool_not_found": (-32001, "Tool unavailable"),
    "validation_failed": (-32002, "Input validation failed"),
    "permission_denied": (-32003, "Access denied"),
    "rate_limited": (-32004, "Too many requests"),
}


def error_response(code: str, message: str, data: Any = None, request_id: Any = None) -> dict:
    code_num, _ = ERROR_CODES.get(code, ERROR_CODES["internal_error"])
    error = {"code": code_num, "message": message}
    if data is not None:
        error["data"] = data  # "data" is optional in JSON-RPC
    # Echo the request's id; JSON-RPC uses null only when it could not be determined
    return {"jsonrpc": "2.0", "error": error, "id": request_id}
```

Distinguish protocol errors from tool failures. The MCP specification asks servers to report failures that occur while a tool runs inside the tool result, with `isError: true`, so the model can see the failure and react. JSON-RPC errors are meant for problems such as unknown tools or malformed requests. FastMCP, for example, converts exceptions raised inside a tool into such results. Structured errors help clients handle both kinds of failure:

```python
from pathlib import Path


class ToolError(Exception):
    def __init__(self, message: str, code: str = "internal_error", details: dict | None = None):
        super().__init__(message)
        self.message = message  # Python 3 exceptions have no built-in .message
        self.code = code
        self.details = details or {}

    def to_response(self, request_id: Any = None) -> dict:
        code_num, _ = ERROR_CODES.get(self.code, ERROR_CODES["internal_error"])
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code_num, "message": self.message, "data": self.details},
        }


# Usage in tools
@mcp.tool()
async def read_file(path: str) -> str:
    if not Path(path).exists():
        raise ToolError(
            f"File not found: {path}",
            code="invalid_params",  # the tool exists; its argument is what failed
            details={"path": path, "attempted": "read"},
        )
    return Path(path).read_text()
```

Global exception handling prevents protocol violations:

```python
import logging

from pydantic import ValidationError

logger = logging.getLogger("mcp.server")


async def handle_request(request: dict) -> dict:
    request_id = request.get("id")
    try:
        result = await dispatch(request)  # routes to your method handlers (not shown)
        return {"jsonrpc": "2.0", "id": request_id, "result": result}
    except ToolError as e:
        return e.to_response(request_id)
    except ValidationError as e:
        # errors() is a method on pydantic's ValidationError
        return error_response("invalid_params", str(e), e.errors(), request_id)
    except PermissionError as e:
        return error_response("permission_denied", str(e), request_id=request_id)
    except Exception as e:
        logger.exception(f"Unhandled error in request {request_id}")
        # Hide internals from the client; the log keeps the traceback
        return error_response(
            "internal_error",
            "An unexpected error occurred",
            {"type": type(e).__name__},
            request_id,
        )
```

Retry logic handles transient failures:

```python
import asyncio


class TransientError(Exception):
    """Raise this (hypothetical) type for failures worth retrying, such as timeouts."""


async def retry_with_backoff(
    func,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
):
    for attempt in range(max_attempts):
        try:
            return await func()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff: 1s, 2s, 4s, ... capped at max_delay
            delay = min(base_delay * (2 ** attempt), max_delay)
            await asyncio.sleep(delay)
```

Error recovery strategies handle partial failures:

```python
async def batch_operation(items: list[dict]) -> dict:
    results = []
    errors = []
    for item in items:
        try:
            result = await process_item(item)  # your per-item coroutine (not shown)
            results.append({"item": item["id"], "status": "success", "result": result})
        except Exception as e:
            errors.append({"item": item.get("id"), "status": "failed", "error": str(e)})
    return {
        "completed": len(results),
        "failed": len(errors),
        "results": results,
        "errors": errors,
    }
```
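The difference between a protocol error and a tool failure can be sketched without any SDK. The helper below is a hypothetical stand-in for what an SDK typically does for you. It runs a synchronous tool and always returns a dict shaped like the spec's `CallToolResult`, with a `content` list and an `isError` flag, whether the tool succeeds or raises.

```python
from typing import Any, Callable


def tool_result(text: str, is_error: bool = False) -> dict:
    """Build a CallToolResult-shaped dict with a single text content block."""
    return {"content": [{"type": "text", "text": text}], "isError": is_error}


def call_tool_safely(func: Callable[..., Any], **arguments: Any) -> dict:
    """Run a tool and report failures inside the result instead of as a JSON-RPC error."""
    try:
        return tool_result(str(func(**arguments)))
    except Exception as e:
        # The model sees this message and can adjust its next call
        return tool_result(f"{type(e).__name__}: {e}", is_error=True)
```

Because the failure travels inside a successful JSON-RPC response, the client forwards it to the model rather than treating the whole request as broken.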
- 20 Logging and Monitoring (20 min): Production MCP servers require structured logging and metrics collection; observability enables debugging, performance tuning, and reliability assurance.

Structured logging captures queryable fields:

```python
import sys
import time

import structlog

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    # Write to stderr: a stdio server's stdout carries the protocol
    logger_factory=structlog.PrintLoggerFactory(file=sys.stderr),
)
logger = structlog.get_logger()


@mcp.tool()
async def database_query(sql: str) -> dict:
    log = logger.bind(tool="database_query")
    # Log only the statement prefix to keep data and secrets out of the logs
    log.info("tool_invocation", sql_type=sql.strip().upper()[:20])
    start = time.perf_counter()
    try:
        result = await execute_query(sql)  # your data-access helper (not shown)
        elapsed = time.perf_counter() - start
        log.info(
            "tool_completed",
            rows=len(result),
            elapsed_ms=round(elapsed * 1000, 2),
        )
        return {"rows": result}
    except Exception as e:
        log.error("tool_failed", error=str(e))
        raise
```

Prometheus metrics provide performance visibility:

```python
import time

from prometheus_client import Counter, Gauge, Histogram, start_http_server

REQUEST_COUNT = Counter("mcp_requests_total", "Total MCP requests", ["tool", "status"])
REQUEST_DURATION = Histogram("mcp_request_duration_seconds", "Request duration", ["tool"])
ACTIVE_REQUESTS = Gauge("mcp_active_requests", "Currently processing requests")

# Expose /metrics for Prometheus to scrape (port chosen for illustration)
start_http_server(9100)


@mcp.tool()
async def monitored_tool(data: str) -> str:
    ACTIVE_REQUESTS.inc()
    start = time.perf_counter()
    status = "error"
    try:
        result = await heavy_operation(data)  # your workload (not shown)
        status = "success"
        return result
    finally:
        # Count each call once with its final status, and time failures too
        REQUEST_DURATION.labels(tool="monitored_tool").observe(time.perf_counter() - start)
        REQUEST_COUNT.labels(tool="monitored_tool", status=status).inc()
        ACTIVE_REQUESTS.dec()
```

Distributed tracing correlates one request's work across services. Until an OpenTelemetry SDK and exporter are configured, `get_tracer` returns a no-op tracer, so no spans are collected:

```python
from opentelemetry import trace

tracer = trace.get_tracer(__name__)


@mcp.tool()
async def traced_operation(record_id: str) -> dict:
    with tracer.start_as_current_span(
        "mcp.tool",
        # Record identifiers, not full payloads, as span attributes
        attributes={"tool.name": "traced_operation", "tool.param.id": record_id},
    ) as span:
        span.add_event("Processing started")
        result = await compute(record_id)  # your workload (not shown)
        span.set_attribute("result.rows", len(result))
        span.add_event("Processing completed")
        return result
```

Health endpoints let monitoring systems and load balancers check the server:

```python
import time

from starlette.responses import JSONResponse
from starlette.routing import Route

START_TIME = time.monotonic()


async def health_check(request):
    # The check_* helpers are yours to implement; each returns True when healthy
    checks = {
        "database": await check_database_health(),
        "disk_space": check_disk_space(),
        "memory": check_memory_usage(),
    }
    healthy = all(checks.values())
    return JSONResponse(
        {
            "status": "healthy" if healthy else "unhealthy",
            "checks": checks,
            "uptime_seconds": round(time.monotonic() - START_TIME, 1),
        },
        # 503 tells load balancers and orchestrators to stop routing traffic here
        status_code=200 if healthy else 503,
    )


health_route = Route("/health", health_check, methods=["GET"])  # add to your app's routes
```

Log aggregation requires consistent formatting. Ship logs to a central system:

```python
import json
import logging
import logging.handlers


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })


logger = logging.getLogger("mcp")
logger.setLevel(logging.INFO)

# JSON file handler for local debugging (the directory must already exist)
json_handler = logging.handlers.RotatingFileHandler(
    "/var/log/mcp/server.json", maxBytes=10_000_000, backupCount=5
)
json_handler.setFormatter(JsonFormatter())
logger.addHandler(json_handler)

# Syslog handler for centralized collection (/dev/log exists on most Linux systems)
syslog_handler = logging.handlers.SysLogHandler(address="/dev/log")
syslog_handler.setFormatter(logging.Formatter("mcp: %(levelname)s %(message)s"))
logger.addHandler(syslog_handler)
```
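The start, stop, and status bookkeeping repeated in the tool examples above can live in a single decorator. This is a minimal standard-library sketch; `log_tool_call` and its record fields are illustrative, not a library API. It writes to stderr by default, which keeps it safe for stdio servers. With FastMCP, apply `@mcp.tool()` above it so the registered function is the wrapped one; `functools.wraps` preserves the signature FastMCP reads.

```python
import functools
import json
import sys
import time


def log_tool_call(stream=sys.stderr):
    """Decorator: write one JSON line per async tool call with duration and outcome."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            status = "error"
            try:
                result = await func(*args, **kwargs)
                status = "success"
                return result
            finally:
                # Runs for successes and failures alike, then lets exceptions propagate
                record = {
                    "event": "tool_call",
                    "tool": func.__name__,
                    "status": status,
                    "elapsed_ms": round((time.perf_counter() - start) * 1000, 2),
                }
                stream.write(json.dumps(record) + "\n")
        return wrapper
    return decorator
```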
- 21 Multi-Server Orchestration (20 min): Complex AI workflows often require coordinating multiple MCP servers; a hub pattern enables consistent routing, authentication, and error handling across a federation of servers.

Design a hub that routes requests to the appropriate server:

```python
class MCPHub:
    """Route tools/call requests to the server that registered each tool.

    MCPServer stands for your own server wrapper, exposing get_capabilities()
    (the tool names it serves) and an async handle(request) method.
    """

    def __init__(self):
        self.servers: dict[str, "MCPServer"] = {}
        self.router = Router()

    def register(self, name: str, server: "MCPServer"):
        self.servers[name] = server
        self.router.register_server(name, server.get_capabilities())

    def _error(self, code: str, message: str, request_id) -> dict:
        response = error_response(code, message)  # from the Error Handling chapter
        response["id"] = request_id  # keep the caller's id so it can match the reply
        return response

    async def handle(self, request: dict) -> dict:
        request_id = request.get("id")
        tool_name = request.get("params", {}).get("name", "")
        server_name = self.router.route(tool_name)
        if not server_name:
            return self._error("tool_not_found", f"Unknown tool: {tool_name}", request_id)
        try:
            return await self.servers[server_name].handle(request)
        except Exception as e:
            return self._error("internal_error", str(e), request_id)


class Router:
    def __init__(self):
        self.routes: dict[str, str] = {}

    def register_server(self, name: str, capabilities: list[str]):
        for capability in capabilities:
            # Two servers exposing the same tool name would otherwise shadow each other silently
            owner = self.routes.get(capability)
            if owner is not None and owner != name:
                raise ValueError(f"Tool {capability!r} is already provided by {owner!r}")
            self.routes[capability] = name

    def route(self, tool_name: str) -> str | None:
        return self.routes.get(tool_name)
```

Federated discovery lets servers announce their capabilities:

```python
import httpx
from mcp import ClientSession
from mcp.types import Tool


class FederatedHub:
    def __init__(self):
        self.local_servers: dict[str, ClientSession] = {}  # initialized sessions by name
        self.remote_hubs: list[str] = []
        self.cache = CapabilityCache(ttl=300)  # small TTL cache (not shown)
        self.http_client = httpx.AsyncClient(timeout=10)

    async def discover_tools(self) -> list[tuple[str, Tool]]:
        """Return (origin, tool) pairs; origin is a local server name or a hub URL."""
        all_tools = []
        # Local servers
        for name, session in self.local_servers.items():
            result = await session.list_tools()
            all_tools.extend((name, tool) for tool in result.tools)
        # Remote hubs
        for hub_url in self.remote_hubs:
            remote_tools = await self.query_remote_hub(hub_url)
            all_tools.extend((hub_url, tool) for tool in remote_tools)
        return all_tools

    async def query_remote_hub(self, url: str) -> list[Tool]:
        # Check cache first ("is not None" so an empty tool list still counts as cached)
        cached = self.cache.get(url)
        if cached is not None:
            return cached
        # /capabilities is this hub design's own endpoint, not part of the MCP spec
        response = await self.http_client.get(f"{url}/capabilities")
        response.raise_for_status()
        tools = [Tool(**t) for t in response.json()["tools"]]
        self.cache.set(url, tools)
        return tools
```

Unified authentication across servers:

```python
from __future__ import annotations  # lets annotations name Session before it is defined


class UnifiedAuth:
    """AuthProvider and extract_token are placeholders for your own provider interface."""

    def __init__(self, providers: dict[str, AuthProvider]):
        self.providers = providers

    async def authenticate(self, request) -> Session:
        token = extract_token(request)
        for provider_name, provider in self.providers.items():
            if provider.can_handle(token):
                session = await provider.validate(token)
                session.provider = provider_name
                return session
        raise PermissionError("No valid authentication")

    async def authorize(self, session: Session, tool: str) -> bool:
        # Check provider-specific permissions
        provider = self.providers[session.provider]
        return await provider.check_permission(session, tool)


class Session:
    def __init__(self, user_id: str, scopes: list[str]):
        self.user_id = user_id
        self.scopes = scopes
        self.provider: str | None = None
```

Error aggregation and fallback across federated servers:

```python
async def federated_operation(
    hub: FederatedHub,
    tool_names: list[str],
    arguments: dict,
) -> dict:
    # hub.forward() and hub.get_fallback() are assumed hub methods (not shown above)
    results = []
    errors = []
    for tool_name in tool_names:
        try:
            result = await hub.forward(tool_name, arguments)
            results.append({"tool": tool_name, "status": "success", "result": result})
        except Exception as e:
            # Check for a fallback, and guard it too so one failure cannot abort the batch
            fallback = await hub.get_fallback(tool_name)
            if fallback is None:
                errors.append({"tool": tool_name, "status": "failed", "error": str(e)})
                continue
            try:
                result = await hub.forward(fallback, arguments)
                results.append({
                    "tool": tool_name,
                    "status": "fallback_success",
                    "fallback_to": fallback,
                    "result": result,
                })
            except Exception as fallback_error:
                errors.append({
                    "tool": tool_name,
                    "status": "failed",
                    "error": str(e),
                    "fallback_error": str(fallback_error),
                })
    return {"results": results, "errors": errors}
```
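`CapabilityCache` is left undefined above. Here is a minimal sketch of the interface that `query_remote_hub` relies on: `get` returns `None` on a miss or after expiry, and `set` stores a value. It assumes a single-process hub with in-memory storage, and the injectable clock exists only to make expiry testable.

```python
import time


class CapabilityCache:
    """Tiny TTL cache: each entry expires `ttl` seconds after it is set."""

    def __init__(self, ttl: float, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock  # injectable for tests
        self._entries: dict[str, tuple[float, object]] = {}

    def get(self, key: str):
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._entries[key]  # evict stale entries lazily, on read
            return None
        return value

    def set(self, key: str, value) -> None:
        self._entries[key] = (self.clock() + self.ttl, value)
```

Lazy eviction keeps the class small. A long-running hub that talks to many remote hubs might also sweep expired entries periodically.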
- 22 Custom MCP Tool Suite Project (25 min): Building a complete MCP tool suite integrates everything learned (protocol handling, security, testing, and observability) into a cohesive, production-ready system.

This capstone project creates an MCP-powered development assistant with file management, code search, and task execution capabilities.

**Project Structure:**

```
dev-assistant-mcp/
├── pyproject.toml
├── src/
│   ├── __init__.py
│   ├── server.py          # Main MCP server
│   ├── errors.py          # ToolError (from the Error Handling chapter)
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── files.py       # File operations
│   │   ├── search.py      # Code search
│   │   └── tasks.py       # Task execution
│   ├── auth/
│   │   ├── __init__.py
│   │   └── token.py       # JWT authentication
│   ├── middleware/
│   │   ├── __init__.py
│   │   └── logging.py     # Request logging
│   └── tests/
│       ├── __init__.py
│       ├── test_tools.py
│       └── test_auth.py
└── config/
    └── settings.yaml
```

**Core Implementation:**

```python
# server.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP

from .auth.token import AuthMiddleware
from .tools.files import register_file_tools
from .tools.search import register_search_tools
from .tools.tasks import register_task_tools

logger = logging.getLogger("dev_assistant")

mcp = FastMCP("Dev Assistant")

# Register tools
register_file_tools(mcp)
register_search_tools(mcp)
register_task_tools(mcp)


@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Starting Dev Assistant MCP server")
    # A mounted Streamable HTTP app needs its session manager running in the parent's lifespan
    async with mcp.session_manager.run():
        yield
    logger.info("Shutting down Dev Assistant MCP server")


app = FastAPI(lifespan=lifespan)

# Add auth middleware
app.add_middleware(AuthMiddleware)


@app.get("/health")
async def health():
    return {"status": "healthy"}


# The MCP sub-app serves its own /mcp path, so mounting it at "/" keeps the endpoint at /mcp.
# Routes declared above (such as /health) are matched first.
app.mount("/", mcp.streamable_http_app())
```

**File Tools:**

```python
# tools/files.py
import os
from pathlib import Path
from typing import Annotated

from mcp.server.fastmcp import FastMCP
from pydantic import Field

from ..errors import ToolError

# All file operations are confined to this directory (env var name is an assumption)
WORKSPACE = Path(os.environ.get("DEV_ASSISTANT_WORKSPACE", ".")).resolve()


def resolve_in_workspace(path: str) -> Path:
    p = (WORKSPACE / path).resolve()  # an absolute input replaces WORKSPACE, then gets checked
    if not p.is_relative_to(WORKSPACE):
        raise ToolError(f"Path outside workspace: {path}", code="permission_denied")
    return p


def register_file_tools(mcp: FastMCP):
    @mcp.tool(description="Read file contents safely")
    def read_file(
        path: Annotated[str, Field(description="File path inside the workspace")]
    ) -> str:
        p = resolve_in_workspace(path)
        if not p.exists():
            raise ToolError(f"File not found: {path}", code="not_found")
        if p.stat().st_size > 1_000_000:
            raise ToolError("File too large (max 1MB)", code="size_exceeded")
        return p.read_text()

    @mcp.tool(description="List directory contents")
    def list_directory(
        path: Annotated[str, Field(description="Directory path inside the workspace")],
        pattern: str = "*",
    ) -> list[dict]:
        p = resolve_in_workspace(path)
        return [
            {"name": f.name, "type": "dir" if f.is_dir() else "file"}
            for f in p.glob(pattern)
            if f.resolve().is_relative_to(WORKSPACE)  # a "../*" pattern cannot escape
        ]

    @mcp.tool(description="Write content to file")
    def write_file(
        path: Annotated[str, Field(description="File path inside the workspace")],
        content: str,
    ) -> dict:
        p = resolve_in_workspace(path)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(content)
        return {"written": len(content), "path": str(p)}
```

**Search Tools:**

```python
# tools/search.py
import subprocess

from mcp.server.fastmcp import FastMCP

from ..errors import ToolError
from .files import resolve_in_workspace


def register_search_tools(mcp: FastMCP):
    @mcp.tool(description="Search files using grep")
    def grep(pattern: str, path: str = ".", file_pattern: str = "*") -> list[str]:
        target = resolve_in_workspace(path)
        # Argument list, no shell: values are passed verbatim (no shlex.quote),
        # and "--" stops grep from reading the pattern as an option
        result = subprocess.run(
            ["grep", "-r", f"--include={file_pattern}", "--", pattern, str(target)],
            capture_output=True,
            text=True,
            timeout=30,
        )
        if result.returncode not in (0, 1):  # 1 only means "no matches"
            raise ToolError(result.stderr, code="search_failed")
        return result.stdout.splitlines()

    @mcp.tool(description="Search code structure")
    def find_symbol(
        name: str,
        kind: str = "function",  # function, class, variable
    ) -> list[dict]:
        # Implementation using tree-sitter or similar
        raise NotImplementedError("find_symbol is not implemented yet")
```

**Task Tools:**

```python
# tools/tasks.py
import shlex
import subprocess

from mcp.server.fastmcp import FastMCP

from ..errors import ToolError
from .files import resolve_in_workspace

# Only these programs may run (an illustrative allowlist; adjust it to your project)
ALLOWED_COMMANDS = {"pytest", "ruff", "ls"}


def register_task_tools(mcp: FastMCP):
    @mcp.tool(description="Execute an allowlisted command with timeout")
    def execute(command: str, cwd: str = ".", timeout: int = 60) -> dict:
        # Split into an argument list and skip the shell, so ;, |, and $() have no effect
        argv = shlex.split(command)
        if not argv or argv[0] not in ALLOWED_COMMANDS:
            raise ToolError(f"Command not allowed: {command}", code="permission_denied")
        try:
            result = subprocess.run(
                argv,
                cwd=resolve_in_workspace(cwd),
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            raise ToolError(f"Command timed out after {timeout}s", code="timeout")
        return {
            "exit_code": result.returncode,
            "stdout": result.stdout,
            "stderr": result.stderr,
        }
```

**Authentication:**

```python
# auth/token.py
import os

import jwt
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

SECRET_KEY = os.environ["JWT_SECRET_KEY"]  # env var name is an assumption; never hard-code


class AuthMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Skip auth for health endpoint
        if request.url.path == "/health":
            return await call_next(request)
        # Extract token
        auth = request.headers.get("Authorization", "")
        if not auth.startswith("Bearer "):
            return JSONResponse({"error": "Unauthorized"}, status_code=401)
        token = auth[7:]
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
            request.state.user = payload
        except jwt.ExpiredSignatureError:
            return JSONResponse({"error": "Token expired"}, status_code=401)
        except jwt.InvalidTokenError:
            return JSONResponse({"error": "Invalid token"}, status_code=401)
        return await call_next(request)
```

**Testing:**

```python
# tests/test_tools.py
import pytest

from ..errors import ToolError


@pytest.mark.asyncio
async def test_read_file_success():
    # create_temp_file and call_tool are test helpers (not shown); files live inside WORKSPACE
    create_temp_file("test.txt", "hello")
    result = await call_tool("read_file", {"path": "test.txt"})
    assert result == "hello"


@pytest.mark.asyncio
async def test_read_file_not_found():
    with pytest.raises(ToolError) as exc:
        await call_tool("read_file", {"path": "nonexistent.txt"})
    assert exc.value.code == "not_found"
```

**Running the Server:**

```bash
# Development (auto-reload); export JWT_SECRET_KEY first
uv run uvicorn src.server:app --reload

# Production: bind to localhost and put a TLS-terminating reverse proxy in front
uv run uvicorn src.server:app --host 127.0.0.1 --port 8000
```