KEY INSIGHT
Server-Sent Events transport enables MCP over HTTP by using standard request-response for client calls and SSE streams for server notifications.
The HTTP+SSE transport combines two patterns: standard HTTP requests flow from client to server, while Server-Sent Events stream data from server to client. This hybrid approach supports all MCP capabilities.
**Transport Architecture:**
```
Client → Server: HTTP POST (JSON-RPC requests)
Server → Client: HTTP GET + SSE stream (notifications)
```
**Server Implementation:**
```python
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette import EventSourceResponse
import asyncio
app = Starlette(
routes=[
Route("/mcp", handle_mcp_messages, methods=["POST"]),
Route("/sse", handle_sse, methods=["GET"]),
]
)
# Track connected clients for notifications
connected_clients: dict[str, asyncio.Queue] = {}
async def handle_mcp_messages(request):
body = await request.json()
response = await server.handle_json(body)
return JSONResponse(response)
async def handle_sse(request):
client_id = request.query_params.get("client_id", str(id(request)))
queue = asyncio.Queue()
connected_clients[client_id] = queue
async def event_generator():
while True:
event = await queue.get()
yield {"event": event.get("type", "message"), "data": json.dumps(event)}
return EventSourceResponse(event_generator())
```
**Session Management:**
```python
class MCPSession:
def __init__(self, session_id: str, request_handler, notification_sender):
self.session_id = session_id
self.request_handler = request_handler
self.notification_sender = notification_sender
self._capabilities = {}
async def send_notification(self, notification_type: str, data: dict):
await self.notification_sender({
"type": notification_type,
"session_id": self.session_id,
**data
})
async def initialize(self, client_info: dict):
result = await self.request_handler({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": client_info
})
self._capabilities = result.get("capabilities", {})
return result
```
**Client Implementation:**
```python
import httpx
import sseclient
class MCPHTTPClient:
def __init__(self, base_url: str):
self.base_url = base_url
self._client = httpx.AsyncClient(base_url=base_url)
self._notification_handlers = {}
async def connect(self, client_info: dict):
# SSE connection for notifications
self._event_source = self._client.stream(
"GET",
"/sse",
params={"client_id": str(uuid.uuid4())}
)
# Initialize via standard request
response = await self._client.post("/mcp", json={
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": client_info
})
return response.json()
async def call_tool(self, name: str, args: dict):
response = await self._client.post("/mcp", json={
"jsonrpc": "2.0",
"id": generate_id(),
"method": "tools/call",
"params": {"name": name, "arguments": args}
})
return response.json()
async def listen_notifications(self):
# Parse SSE stream
chunks = self._event_source.content.iter_any()
events = sseclient.EventSource(chunks)
for event in events:
await self._handle_notification(json.loads(event.data))
```
**Health Checks:**
```python
@app.route("/health")
async def health(request):
return JSONResponse({
"status": "healthy",
"connected_clients": len(connected_clients),
"transport": "http+sse"
})
```
**CORS Configuration:**
```python
from starlette.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_methods=["*"],
allow_headers=["*"],
)
```
**Connection Cleanup:**
```python
@app.on_event("shutdown")
async def shutdown():
for client_id in list(connected_clients.keys()):
del connected_clients[client_id]
await self._client.aclose()
```