How to implement streaming responses in AI APIs
FastAPI with SSE support, LLM endpoint
What this does
Streaming responses deliver LLM output token-by-token via Server-Sent Events (SSE). This reduces perceived latency in chat interfaces, because users see the first tokens before the full completion is ready. This guide implements a FastAPI endpoint that streams LLM completions in real time while handling connection drops gracefully.
Steps
Step 1: Install dependencies
pip install fastapi uvicorn sse-starlette
fastapi provides the web framework. uvicorn is the ASGI server. sse-starlette offers SSE helpers on top of Starlette.
Step 2: Define the streaming endpoint
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import json

app = FastAPI(title="Streaming LLM API")

# Conversation history storage (use Redis or a database in production)
conversation_history: dict[str, list[dict]] = {}


@app.get("/health")
def health():
    return {"status": "ok"}


@app.post("/v1/chat/stream")
async def chat_stream(request: Request):
    """
    Stream LLM responses token-by-token via SSE.
    Returns events with 'delta' field containing new tokens.
    """
    body = await request.json()
    message = body.get("message", "")
    session_id = body.get("session_id", "default")
    if session_id not in conversation_history:
        conversation_history[session_id] = []
    conversation_history[session_id].append({"role": "user", "content": message})

    async def event_generator():
        full_response = ""
        chunk_count = 0
        try:
            # Replace this with actual LLM API call
            async for token in simulate_llm_stream(message):
                full_response += token
                chunk_count += 1
                # Yield SSE event with token delta
                yield {
                    "event": "message",
                    "data": json.dumps({
                        "delta": token,
                        "done": False,
                        "chunk_index": chunk_count
                    })
                }
        except asyncio.CancelledError:
            # Connection dropped mid-stream. The client is gone, so there is
            # nobody left to receive an error event: log the partial response
            # and re-raise so the cancellation propagates.
            print(f"[Stream] Client disconnected. Partial response: {full_response[:50]}")
            raise
        # Stream complete
        conversation_history[session_id].append({"role": "assistant", "content": full_response})
        yield {
            "event": "message",
            "data": json.dumps({"delta": "", "done": True, "total_chunks": chunk_count})
        }

    return EventSourceResponse(event_generator())
Step 3: Simulate LLM streaming for testing
async def simulate_llm_stream(prompt: str, delay: float = 0.05):
    """Simulate token-by-token LLM response for development/testing."""
    response_template = (
        f"Based on your query about '{prompt}', here is the analysis: "
        "This topic involves multiple key components that interact in complex ways. "
        "The primary considerations include resource allocation, performance metrics, "
        "and systematic evaluation of the underlying mechanisms at play."
    )
    # Emit one whitespace-delimited word per chunk, pausing to mimic generation time
    for word in response_template.split():
        await asyncio.sleep(delay)
        yield word + " "
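Before wiring the generator into HTTP, it can help to drain it directly and check the text and chunk count. The following is a minimal sketch, not part of the guide's server code: collect_tokens and word_stream are hypothetical names, and word_stream is only a stand-in with the same shape as simulate_llm_stream so the snippet runs on its own.

```python
import asyncio
from collections.abc import AsyncIterator


async def collect_tokens(stream: AsyncIterator[str]) -> tuple[str, int]:
    """Drain a token stream and return (full_text, chunk_count)."""
    parts: list[str] = []
    async for token in stream:
        parts.append(token)
    return "".join(parts), len(parts)


async def word_stream(text: str, delay: float = 0.0) -> AsyncIterator[str]:
    """Hypothetical stand-in that yields one word per chunk, like simulate_llm_stream."""
    for word in text.split():
        await asyncio.sleep(delay)
        yield word + " "


if __name__ == "__main__":
    text, count = asyncio.run(collect_tokens(word_stream("explain caching in detail")))
    print(repr(text), count)
```

In the real module, the same helper could be pointed at simulate_llm_stream(prompt, delay=0) to confirm how many chunks a given prompt produces.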
Step 4: Verify the streaming endpoint
Save the code from Steps 2 and 3 in a file named main.py, then start the server and test with curl:
uvicorn main:app --reload --port 8000
In a separate terminal:
curl -N -X POST http://localhost:8000/v1/chat/stream \
-H "Content-Type: application/json" \
-d '{"message": "explain caching", "session_id": "test_001"}'
Step 5: Verify expected output
Expected SSE format:
event: message
data: {"delta": "Based ", "done": false, "chunk_index": 1}

event: message
data: {"delta": "on ", "done": false, "chunk_index": 2}

event: message
data: {"delta": "your ", "done": false, "chunk_index": 3}

...

event: message
data: {"delta": "", "done": true, "total_chunks": 39}
Each event consists of an event: message line followed by a data: line containing a JSON object, and a blank line terminates the event. The client appends each delta to accumulate the full response and finalizes when it reads done: true.
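The client-side handling described above can be sketched in plain Python. This is an illustrative parser under simplifying assumptions, not a full SSE client: it works on an already-buffered string rather than an incremental network stream, and parse_sse, accumulate, and SAMPLE are hypothetical names introduced here.

```python
import json

# Hypothetical sample of what the endpoint might send (three events)
SAMPLE = (
    'event: message\r\ndata: {"delta": "Based ", "done": false, "chunk_index": 1}\r\n\r\n'
    'event: message\r\ndata: {"delta": "on ", "done": false, "chunk_index": 2}\r\n\r\n'
    'event: message\r\ndata: {"delta": "", "done": true, "total_chunks": 2}\r\n\r\n'
)


def parse_sse(raw: str) -> list[dict]:
    """Split buffered SSE text into events with 'event' and 'data' keys."""
    events = []
    # Accept both CRLF and LF line endings; a blank line ends each event
    for block in raw.replace("\r\n", "\n").split("\n\n"):
        event_name = "message"  # default event type when no event: line is present
        data_lines = []
        for line in block.split("\n"):
            if not line or line.startswith(":"):
                continue  # skip empty lines and comment (keep-alive) lines
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space after the colon is dropped
            if field == "event":
                event_name = value
            elif field == "data":
                data_lines.append(value)
        if data_lines:
            events.append({"event": event_name, "data": "\n".join(data_lines)})
    return events


def accumulate(raw: str) -> tuple[str, bool]:
    """Concatenate delta fields and report whether a done marker was seen."""
    text, done = "", False
    for event in parse_sse(raw):
        if event["event"] != "message":
            continue
        payload = json.loads(event["data"])
        text += payload.get("delta", "")
        done = done or payload.get("done", False)
    return text, done


if __name__ == "__main__":
    print(accumulate(SAMPLE))
```

A production client would feed lines to the parser as they arrive instead of waiting for the whole body, but the accumulation logic stays the same.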
Verification
Common failures
- Missing Content-Type: text/event-stream header - Clients cannot detect SSE responses without this header. The sse-starlette library sets it automatically, but a custom StreamingResponse requires an explicit media_type="text/event-stream" in the response object.
- Unhandled exceptions in the generator breaking the stream - Any unhandled exception inside event_generator() collapses the entire SSE connection. Wrap the entire body in a try/except block and yield an error event before returning.
- Buffer accumulation causing latency - When await calls inside the generator have variable latency, tokens queue in the buffer. Use yield immediately after each token and avoid awaiting heavy operations mid-stream. Reserve heavy processing for a separate stage before the stream.
- Version mismatch - The installed package or runtime differs from the command shown; check the version first and rerun the smallest verification command.
- Local environment drift - Another service, virtual environment, model, or path is being used; print the active binary path and configuration before changing the guide steps.
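The advice about unhandled exceptions in the generator can be illustrated with a small wrapper. This is a sketch under assumptions rather than the guide's own implementation: guarded_events and flaky_tokens are hypothetical names, and the event dictionaries simply mirror the shape used by the endpoint in Step 2. It catches Exception only, so asyncio.CancelledError (a BaseException) still propagates when a client disconnects.

```python
import asyncio
import json
from collections.abc import AsyncIterator


async def guarded_events(tokens: AsyncIterator[str]) -> AsyncIterator[dict]:
    """Wrap a token stream so failures surface as a final 'error' event."""
    try:
        async for token in tokens:
            # Yield immediately after each token so nothing queues up
            yield {"event": "message", "data": json.dumps({"delta": token, "done": False})}
    except Exception as exc:
        # Tell the client what went wrong instead of silently dropping the stream
        yield {"event": "error", "data": json.dumps({"message": str(exc)})}
        return
    yield {"event": "message", "data": json.dumps({"delta": "", "done": True})}


async def flaky_tokens() -> AsyncIterator[str]:
    """Hypothetical upstream stream that fails after two tokens."""
    yield "Hello "
    yield "world "
    raise RuntimeError("upstream model timed out")


async def main() -> list[dict]:
    return [event async for event in guarded_events(flaky_tokens())]


if __name__ == "__main__":
    for event in asyncio.run(main()):
        print(event["event"], event["data"])
```

With this shape, the client always receives either a done marker or an error event as the last item, which makes it easier to distinguish a finished response from a broken one.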
Related guides
- Implement Parallel Agent Execution - Stream output from multiple agents simultaneously by assigning each agent its own SSE stream and merging results client-side.
- Setup GPU Memory Optimization for Inference - Optimize the underlying LLM inference to reduce per-token generation time, directly improving streaming responsiveness.