12. Streaming with Tools
Chapter 12 of 18 · 25 min
Streaming responses from LLMs creates a responsive user experience, but integrating function calls into streaming pipelines introduces complexity. The model generates text incrementally, may decide to call a tool mid-stream, and the UI must handle partial tool calls gracefully.
Streaming Architecture
┌───────────────┐    ┌───────────────┐    ┌───────────────┐
│      LLM      │───▶│    Parser     │───▶│  Dispatcher   │
│  (streaming)  │    │ (incremental) │    │    (async)    │
└───────────────┘    └───────────────┘    └───────────────┘
                                                  │
                             ┌────────────────────┤
                             ▼                    ▼
                     ┌───────────────┐    ┌───────────────┐
                     │     Tool      │    │     Token     │
                     │   Executor    │    │    Buffer     │
                     └───────────────┘    └───────────────┘
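Streaming the Ollama Chat API
Ollama's native `/api/chat` endpoint does not use Server-Sent Events. With `"stream": true` it returns newline-delimited JSON (NDJSON): one JSON object per line, each carrying a partial `message` (text in `message.content`, structured tool calls in `message.tool_calls`), with `"done": true` on the final object. The `data: ...` / `[DONE]` SSE framing belongs to Ollama's OpenAI-compatible `/v1` endpoints, not to `/api/chat`. Read the stream line by line: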
import asyncio
import inspect
import json
from typing import Any, AsyncGenerator, Callable, Iterable, Iterator

import requests
import sseclient
class StreamingToolDispatcher:
def __init__(self, base_url: str = "http://localhost:11434"):
self.base_url = base_url
def stream_with_tools(
self,
model: str,
messages: list[dict],
tools: list[dict],
system_prompt: str | None = None
) -> AsyncGenerator[dict, None]:
"""
Stream responses while detecting tool calls.
Yields:
- {"type": "content", "text": "..."} for text tokens
- {"type": "tool_call", "tool": "...", "params": {...}} when tool detected
- {"type": "tool_result", "tool": "...", "result": ...} after execution
"""
payload = {
"model": model,
"messages": messages,
"tools": tools,
"stream": True
}
if system_prompt:
payload["system"] = system_prompt
response = requests.post(
f"{self.base_url}/api/chat",
json=payload,
stream=True
)
client = sseclient.SSEClient(response)
tool_call_buffer = ""
current_tool = None
for event in client.events():
if event.data == "[DONE]":
break
data = json.loads(event.data)
if "content" in data:
content = data["content"]
if isinstance(content, str):
# Check if this looks like tool call syntax
tool_indicator = self._detect_tool_call_start(content)
if tool_indicator:
# Partial tool call detected
tool_call_buffer = content
current_tool = tool_indicator
elif current_tool:
# Continue accumulating tool call
tool_call_buffer += content
else:
# Regular text
yield {"type": "content", "text": content}
elif isinstance(content, dict) and content.get("type") == "tool":
# Direct tool call format from model
yield {
"type": "tool_call",
"tool": content["name"],
"params": content.get("parameters", {})
}
current_tool = None
def _detect_tool_call_start(self, text: str) -> str | None:
"""Detect if text appears to be start of tool call."""
for tool_name in ["get_weather", "calculate", "search", "read_file"]:
if tool_name in text and "(" in text:
return tool_name
return None
Async Tool Execution
Tools should execute asynchronously to avoid blocking the stream:
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncToolExecutor:
    """Registry of named tools that runs them without blocking the event loop.

    Plain (blocking) functions run in a thread pool. ``async def`` tools are
    awaited directly on the running loop. Every call returns a result dict
    with a ``"success"`` flag, so callers never need to catch tool errors.
    """

    def __init__(self, max_workers: int = 4):
        # Worker threads shared by all synchronous tool calls.
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # Tool name -> callable, populated via register().
        self.tools: dict[str, Callable[..., Any]] = {}

    async def execute_async(
        self,
        tool_name: str,
        parameters: dict
    ) -> dict:
        """Execute tool without blocking the event loop.

        Args:
            tool_name: Name previously passed to ``register``.
            parameters: Keyword arguments for the tool (None means no arguments).

        Returns:
            ``{"success": True, "result": ...}`` on success, or
            ``{"success": False, "error": "..."}`` if the tool is unknown or raised.
        """
        func = self.tools.get(tool_name)
        if func is None:
            # Same shape as the other failure path so callers can check "success".
            return {"success": False, "error": f"Unknown tool: {tool_name}"}
        kwargs = parameters or {}
        try:
            if inspect.iscoroutinefunction(func):
                # Coroutine tools must be awaited. Handing them to a thread would
                # only return an un-awaited coroutine object.
                result = await func(**kwargs)
            else:
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(self.executor, lambda: func(**kwargs))
        except Exception as e:
            # Deliberate boundary: a failing tool is reported, not allowed to abort the stream.
            return {"success": False, "error": str(e)}
        return {"success": True, "result": result}

    def register(self, name: str, func: Callable[..., Any]):
        """Register *func* under *name*, replacing any tool already using that name."""
        self.tools[name] = func
Streaming Loop
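Putting it together: the loop below relays the model's output to the client as Server-Sent Events (`data: <json>` frames followed by a blank line) and executes the tool calls the model requests: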
async def chat_stream(
    model: str,
    messages: list[dict],
    tools: list[dict],
    tool_executor: "AsyncToolExecutor",
    *,
    max_tool_rounds: int = 0,
    dispatcher: "StreamingToolDispatcher | None" = None,
):
    """Relay a tool-using chat to a client as Server-Sent Events frames.

    Text tokens are yielded as soon as they arrive. Each tool call is started
    as a background task when announced, so tokens keep flowing while tools
    run. The results are awaited after the model's turn ends and yielded in
    call order as ``tool_result`` frames.

    *messages* is updated in place. After a turn that requested tools, an
    assistant message carrying those ``tool_calls`` is appended. It is followed
    by one ``role: "tool"`` message per result, so the history is valid to send
    back to the model.

    Args:
        model, messages, tools: Forwarded to ``dispatcher.stream_with_tools``.
        tool_executor: Runs the requested tools via ``execute_async``.
        max_tool_rounds: How many times to re-query the model with the tool
            results so it can answer using them. With 0 (the default), the
            function stops after the first turn and leaves the follow-up to
            the caller.
        dispatcher: Stream source. Defaults to ``StreamingToolDispatcher()``.

    Yields:
        ``"data: <json>\\n\\n"`` strings.
    """
    if dispatcher is None:
        dispatcher = StreamingToolDispatcher()
    rounds_left = max_tool_rounds
    while True:
        text_parts: list[str] = []
        pending: list[tuple[dict, asyncio.Task]] = []
        try:
            async for event in dispatcher.stream_with_tools(model, messages, tools):
                if event["type"] == "content":
                    text_parts.append(event["text"])
                    yield f"data: {json.dumps(event)}\n\n"
                elif event["type"] == "tool_call":
                    # Start the tool now but don't await it: the stream keeps flowing.
                    task = asyncio.create_task(
                        tool_executor.execute_async(event["tool"], event["params"])
                    )
                    pending.append((event, task))

            if not pending:
                return

            # The assistant turn that requested the tools must precede their results.
            messages.append({
                "role": "assistant",
                "content": "".join(text_parts),
                "tool_calls": [
                    {"function": {"name": call["tool"], "arguments": call["params"]}}
                    for call, _ in pending
                ],
            })
            for call, task in pending:
                result = await task
                messages.append({
                    "role": "tool",
                    "name": call["tool"],
                    "content": json.dumps(result)
                })
                # Spread the call first so its "type": "tool_call" is overridden.
                yield f"data: {json.dumps({**call, 'type': 'tool_result', 'result': result})}\n\n"
        finally:
            # If the client disconnects or the stream fails, don't leave tools running.
            for _, task in pending:
                if not task.done():
                    task.cancel()

        if rounds_left <= 0:
            return
        rounds_left -= 1
EXERCISE
Implement a streaming pipeline that displays text tokens as they arrive and executes a long-running tool (simulate it with asyncio.sleep) without blocking the stream. Verify that text tokens keep flowing while the tool runs.