RUNLOCALAIv38

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

© 2026 runlocalai.co · Independently operated

How to implement streaming responses in AI APIs

intermediate·20 min·By Fredoline Eruo
Target environment
Ubuntu 24.04 · Ollama 0.4.x
PREREQUISITES

FastAPI with SSE support, LLM endpoint

What this does

Streaming responses deliver LLM output token-by-token via Server-Sent Events (SSE), so a chat interface can show the first words as soon as they are generated instead of waiting for the full completion. This guide implements a FastAPI endpoint that streams LLM completions in real time and handles connection drops gracefully.

Steps

Step 1: Install dependencies

pip install fastapi uvicorn sse-starlette

fastapi provides the web framework. uvicorn is the ASGI server. sse-starlette offers SSE helpers on top of Starlette.

Step 2: Define the streaming endpoint

from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import json

app = FastAPI(title="Streaming LLM API")

# Conversation history storage (use Redis or a database in production)
conversation_history: dict[str, list[dict]] = {}

@app.get("/health")
def health():
    return {"status": "ok"}

@app.post("/v1/chat/stream")
async def chat_stream(request: Request):
    """
    Stream LLM responses token-by-token via SSE.
    Returns events with 'delta' field containing new tokens.
    """
    body = await request.json()
    message = body.get("message", "")
    session_id = body.get("session_id", "default")

    if session_id not in conversation_history:
        conversation_history[session_id] = []

    conversation_history[session_id].append({"role": "user", "content": message})

    async def event_generator():
        full_response = ""
        chunk_count = 0

        try:
            # Replace this with actual LLM API call
            async for token in simulate_llm_stream(message):
                full_response += token
                chunk_count += 1

                # Yield SSE event with token delta
                yield {
                    "event": "message",
                    "data": json.dumps({
                        "delta": token,
                        "done": False,
                        "chunk_index": chunk_count
                    })
                }

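        except asyncio.CancelledError:
            # Client disconnected mid-stream: nothing more can be delivered,
            # so log the partial response and re-raise so cancellation completes.
            print(f"[Stream] Client disconnected. Partial response: {full_response[:50]}")
            raise

        except Exception as exc:
            # The LLM call failed while the client is still connected: report it and stop.
            yield {
                "event": "error",
                "data": json.dumps({"message": str(exc), "partial": full_response})
            }
            return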

        # Stream complete
        conversation_history[session_id].append({"role": "assistant", "content": full_response})

        yield {
            "event": "message",
            "data": json.dumps({"delta": "", "done": True, "total_chunks": chunk_count})
        }

    return EventSourceResponse(event_generator())

Step 3: Simulate LLM streaming for testing

async def simulate_llm_stream(prompt: str, delay: float = 0.05):
    """Simulate token-by-token LLM response for development/testing."""
    response_template = (
        f"Based on your query about '{prompt}', here is the analysis: "
        "This topic involves multiple key components that interact in complex ways. "
        "The primary considerations include resource allocation, performance metrics, "
        "and systematic evaluation of the underlying mechanisms at play."
    )
    for word in response_template.split():
        await asyncio.sleep(delay)
        yield word + " "
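
Once the simulated stream works, the placeholder in Step 2 can be swapped for a real model call. The following is a minimal sketch, not a definitive implementation. It assumes a local Ollama server at its default address, a model already pulled under the placeholder name llama3.2, and the httpx package, which is not part of Step 1 and must be installed separately. Ollama's /api/chat endpoint streams one JSON object per line. The helper names extract_token and ollama_stream are illustrative only.

```python
import json
from typing import AsyncIterator, Optional

OLLAMA_URL = "http://localhost:11434/api/chat"  # assumption: default local Ollama address
MODEL_NAME = "llama3.2"  # assumption: replace with a model you have pulled


def extract_token(line: str) -> Optional[str]:
    """Return the text fragment from one line of Ollama's streamed JSON, or None."""
    line = line.strip()
    if not line:
        return None
    chunk = json.loads(line)
    # The final chunk (done: true) carries an empty content string, so it maps to None.
    return chunk.get("message", {}).get("content") or None


async def ollama_stream(messages: list[dict]) -> AsyncIterator[str]:
    """Yield text fragments from Ollama for a list of chat messages."""
    import httpx  # imported lazily; assumption: installed with `pip install httpx`

    payload = {"model": MODEL_NAME, "messages": messages, "stream": True}
    # timeout=None because generation can outlast httpx's default read timeout.
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", OLLAMA_URL, json=payload) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                token = extract_token(line)
                if token:
                    yield token
```

In Step 2, the loop would then read `async for token in ollama_stream(conversation_history[session_id]):` in place of the simulate_llm_stream call, so the model sees the whole session history rather than only the latest message.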

Step 4: Verify the streaming endpoint

Save the code from Steps 2 and 3 in a file named main.py, then start the server and test with curl:

uvicorn main:app --reload --port 8000

In a separate terminal:

curl -N -X POST http://localhost:8000/v1/chat/stream \
  -H "Content-Type: application/json" \
  -d '{"message": "explain caching", "session_id": "test_001"}'

Step 5: Verify expected output

Expected SSE format. Events are separated by a blank line, and total_chunks counts the whitespace-separated words of the simulated reply (39 for this prompt):

event: message
data: {"delta": "Based ", "done": false, "chunk_index": 1}

event: message
data: {"delta": "on ", "done": false, "chunk_index": 2}

event: message
data: {"delta": "your ", "done": false, "chunk_index": 3}

...

event: message
data: {"delta": "", "done": true, "total_chunks": 39}

Each event consists of an event: message line followed by a data: line carrying a JSON object, and ends with a blank line. The client appends each delta to rebuild the full response and stops when it reads done: true.
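
The client-side accumulation described above can be sketched in a few lines of standard-library Python. This is an illustration under simplifying assumptions: the function name accumulate_sse is hypothetical, and it only handles the single-line data: fields this endpoint emits, not the full SSE specification (no multi-line data, id, or retry fields). Error events carry no delta, so they contribute nothing to the result.

```python
import json
from typing import Iterable


def accumulate_sse(lines: Iterable[str]) -> str:
    """Rebuild the full response text from raw SSE lines."""
    parts: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        # Skip blank separators, comment lines (":"), and the event: field.
        if not line.startswith("data:"):
            continue
        payload = json.loads(line[len("data:"):].strip())
        if payload.get("done"):
            break  # final event: stop reading
        parts.append(payload.get("delta", ""))
    return "".join(parts)


sample = [
    "event: message",
    'data: {"delta": "Based ", "done": false, "chunk_index": 1}',
    "",
    "event: message",
    'data: {"delta": "on ", "done": false, "chunk_index": 2}',
    "",
    "event: message",
    'data: {"delta": "", "done": true, "total_chunks": 2}',
    "",
]
print(accumulate_sse(sample))  # prints "Based on " (with a trailing space)
```

To use it against the live endpoint, feed it the decoded lines from any streaming HTTP client instead of the sample list.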

Verification

Common failures

  1. Missing Content-Type: text/event-stream header. Clients cannot detect SSE responses without this header. The sse-starlette library sets it automatically, but a custom StreamingResponse requires explicit media_type="text/event-stream" in the response object.

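  2. Unhandled exceptions in the generator breaking the stream. Any unhandled exception inside event_generator() terminates the SSE connection without telling the client why. Wrap the body in a try/except block and yield an error event before returning. The exception to this rule is asyncio.CancelledError: it means the client is already gone, so log it and re-raise it instead of yielding.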

  3. Buffer accumulation causing latency. When await calls inside the generator have variable latency, tokens pile up behind the slow call and reach the client in bursts. Yield each token as soon as it is produced and avoid awaiting heavy operations mid-stream. Do heavy processing in a separate stage before the stream starts.
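
Failures 1 and 2 can be illustrated together. The following sketch assumes a plain StreamingResponse is used instead of sse-starlette, so the SSE framing and the media type must be written by hand. The names format_sse, guarded_frames, and make_response are hypothetical helpers, not part of FastAPI.

```python
import asyncio
import json
from typing import AsyncIterator


def format_sse(event: str, data: dict) -> str:
    """Frame one SSE event: field lines followed by a blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


async def guarded_frames(tokens: AsyncIterator[str]) -> AsyncIterator[str]:
    """Turn a token stream into SSE frames, reporting failures as an error event."""
    partial = ""
    try:
        async for token in tokens:
            partial += token
            yield format_sse("message", {"delta": token, "done": False})
    except asyncio.CancelledError:
        raise  # client went away: nothing can be sent, let cancellation proceed
    except Exception as exc:
        yield format_sse("error", {"message": str(exc), "partial": partial})
        return
    yield format_sse("message", {"delta": "", "done": True})


def make_response(tokens: AsyncIterator[str]):
    """Wrap the frames in a plain StreamingResponse with the SSE media type."""
    from fastapi.responses import StreamingResponse  # lazy import; requires FastAPI

    return StreamingResponse(guarded_frames(tokens), media_type="text/event-stream")


async def _demo() -> list[str]:
    # A token source that fails after one token, to exercise the error path.
    async def failing_source():
        yield "Hello "
        raise RuntimeError("model backend unavailable")

    return [frame async for frame in guarded_frames(failing_source())]


frames = asyncio.run(_demo())
print(frames[-1])  # the error event, including the partial text "Hello "
```

The demo runs without a server: the failing source produces one message frame and then one error frame, which is what a connected client would receive before the stream closes.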

  • Version mismatch - The installed package or runtime differs from the command shown; check the version first and rerun the smallest verification command.
  • Local environment drift - Another service, virtual environment, model, or path is being used; print the active binary path and configuration before changing the guide steps.

Related guides

  • Implement Parallel Agent Execution - Stream output from multiple agents simultaneously by assigning each agent its own SSE stream and merging results client-side.
  • Setup GPU Memory Optimization for Inference - Optimize the underlying LLM inference to reduce per-token generation time, directly improving streaming responsiveness.