RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /MCP Server Implementation
  6. /Ch. 11
MCP Server Implementation

11. SSE Transport

Chapter 11 of 22 · 20 min
KEY INSIGHT

Server-Sent Events transport enables MCP over HTTP by using standard request-response for client calls and SSE streams for server notifications. The HTTP+SSE transport combines two patterns: standard HTTP requests flow from client to server, while Server-Sent Events stream data from server to client. This hybrid approach supports all MCP capabilities. **Transport Architecture:** ``` Client → Server: HTTP POST (JSON-RPC requests) Server → Client: HTTP GET + SSE stream (notifications) ``` **Server Implementation:** ```python from starlette.applications import Starlette from starlette.routing import Route from sse_starlette import EventSourceResponse import asyncio app = Starlette( routes=[ Route("/mcp", handle_mcp_messages, methods=["POST"]), Route("/sse", handle_sse, methods=["GET"]), ] ) # Track connected clients for notifications connected_clients: dict[str, asyncio.Queue] = {} async def handle_mcp_messages(request): body = await request.json() response = await server.handle_json(body) return JSONResponse(response) async def handle_sse(request): client_id = request.query_params.get("client_id", str(id(request))) queue = asyncio.Queue() connected_clients[client_id] = queue async def event_generator(): while True: event = await queue.get() yield {"event": event.get("type", "message"), "data": json.dumps(event)} return EventSourceResponse(event_generator()) ``` **Session Management:** ```python class MCPSession: def __init__(self, session_id: str, request_handler, notification_sender): self.session_id = session_id self.request_handler = request_handler self.notification_sender = notification_sender self._capabilities = {} async def send_notification(self, notification_type: str, data: dict): await self.notification_sender({ "type": notification_type, "session_id": self.session_id, **data }) async def initialize(self, client_info: dict): result = await self.request_handler({ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": client_info }) self._capabilities = result.get("capabilities", {}) return result ``` **Client Implementation:** ```python import httpx import sseclient class MCPHTTPClient: def __init__(self, base_url: str): self.base_url = base_url self._client = httpx.AsyncClient(base_url=base_url) self._notification_handlers = {} async def connect(self, client_info: dict): # SSE connection for notifications self._event_source = self._client.stream( "GET", "/sse", params={"client_id": str(uuid.uuid4())} ) # Initialize via standard request response = await self._client.post("/mcp", json={ "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": client_info }) return response.json() async def call_tool(self, name: str, args: dict): response = await self._client.post("/mcp", json={ "jsonrpc": "2.0", "id": generate_id(), "method": "tools/call", "params": {"name": name, "arguments": args} }) return response.json() async def listen_notifications(self): # Parse SSE stream chunks = self._event_source.content.iter_any() events = sseclient.EventSource(chunks) for event in events: await self._handle_notification(json.loads(event.data)) ``` **Health Checks:** ```python @app.route("/health") async def health(request): return JSONResponse({ "status": "healthy", "connected_clients": len(connected_clients), "transport": "http+sse" }) ``` **CORS Configuration:** ```python from starlette.middleware.cors import CORSMiddleware app.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure appropriately for production allow_methods=["*"], allow_headers=["*"], ) ``` **Connection Cleanup:** ```python @app.on_event("shutdown") async def shutdown(): for client_id in list(connected_clients.keys()): del connected_clients[client_id] await self._client.aclose() ```

EXERCISE

Build an MCP server using SSE transport that implements a real-time notification system. The server should send periodic updates via SSE when resources change, and clients should receive these notifications through the event stream.

← Chapter 10
API Integration Server
Chapter 12 →
Streamable HTTP Transport