RUNLOCALAIv38
Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.co · Independently operated
Function Calling for Local Models

12. Streaming with Tools

Chapter 12 of 18 · 25 min
KEY INSIGHT

Streaming tool calls require buffering partial model outputs and asynchronously executing tools without blocking the token stream to the client.

Streaming responses from LLMs creates a responsive user experience, but integrating function calls into streaming pipelines introduces complexity. The model generates text incrementally, may decide to call a tool mid-stream, and the UI must handle partial tool calls gracefully.

Streaming Architecture

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   LLM       │───▶│  Parser     │───▶│  Dispatcher │
│  (streaming)│    │ (incremental)│    │  (async)    │
└─────────────┘    └─────────────┘    └─────────────┘
                                              │
                   ┌──────────────────────────┤
                   ▼                          ▼
            ┌─────────────┐           ┌─────────────┐
            │   Tool      │           │   Token     │
            │  Executor   │           │   Buffer    │
            └─────────────┘           └─────────────┘
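
The dispatcher stage in the diagram can be sketched as a small fan-out: text events go to a token buffer, tool calls become background tasks. This is a minimal illustration, not the chapter's implementation; `fake_tool` and `dispatch` are hypothetical names, and the event dictionaries use the `content` / `tool_call` shapes described later in this chapter.

```python
import asyncio


async def fake_tool(name: str, params: dict) -> dict:
    """Hypothetical stand-in for a real tool; just echoes its input."""
    await asyncio.sleep(0.01)
    return {"tool": name, "echo": params}


async def dispatch(events: list[dict]) -> tuple[str, list[dict]]:
    """Route parsed events to a token buffer or to background tool tasks."""
    token_buffer: list[str] = []
    tasks: list[asyncio.Task] = []

    for event in events:
        if event["type"] == "content":
            token_buffer.append(event["text"])
        elif event["type"] == "tool_call":
            # Start the tool without waiting for it to finish
            tasks.append(
                asyncio.create_task(fake_tool(event["tool"], event["params"]))
            )
        # Yield control so already-started tool tasks can make progress
        await asyncio.sleep(0)

    results = await asyncio.gather(*tasks)
    return "".join(token_buffer), list(results)


if __name__ == "__main__":
    demo_events = [
        {"type": "content", "text": "Hello "},
        {"type": "tool_call", "tool": "get_weather", "params": {"city": "Paris"}},
        {"type": "content", "text": "world"},
    ]
    print(asyncio.run(dispatch(demo_events)))
```

The key design choice is that a tool call never pauses the loop that handles text events; results are collected only once the event source is exhausted.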

SSE-Based Streaming

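Ollama's native /api/chat endpoint streams newline-delimited JSON (one object per line, the last with "done": true) rather than Server-Sent Events, so the dispatcher reads the response line by line. SSE framing is applied later, when events are forwarded to the client:

import json
from typing import Iterator

import requests


class StreamingToolDispatcher:
    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url

    def stream_with_tools(
        self,
        model: str,
        messages: list[dict],
        tools: list[dict],
        system_prompt: str | None = None
    ) -> Iterator[dict]:
        """
        Stream responses while detecting tool calls.

        Yields:
            - {"type": "content", "text": "..."} for text tokens
            - {"type": "tool_call", "tool": "...", "params": {...}} when tool detected

        This is a blocking generator. Executing the tool and emitting a
        "tool_result" event is the caller's job.
        """
        if system_prompt:
            # /api/chat takes the system prompt as a message, not a top-level field
            messages = [{"role": "system", "content": system_prompt}, *messages]

        payload = {
            "model": model,
            "messages": messages,
            "tools": tools,
            "stream": True
        }

        response = requests.post(
            f"{self.base_url}/api/chat",
            json=payload,
            stream=True
        )
        response.raise_for_status()

        tool_call_buffer = ""
        current_tool = None

        for line in response.iter_lines():
            if not line:
                continue

            data = json.loads(line)
            message = data.get("message") or {}

            # Structured tool calls reported by the server
            for call in message.get("tool_calls") or []:
                function = call.get("function", {})
                yield {
                    "type": "tool_call",
                    "tool": function.get("name"),
                    "params": function.get("arguments") or {}
                }

            content = message.get("content") or ""
            if content:
                if current_tool is None:
                    # Check if this looks like tool call syntax
                    current_tool = self._detect_tool_call_start(content)
                    if current_tool is None:
                        # Regular text
                        yield {"type": "content", "text": content}
                    else:
                        # Partial tool call detected
                        tool_call_buffer = content
                else:
                    # Continue accumulating tool call
                    tool_call_buffer += content

                # Flush the buffered call once its closing parenthesis arrives
                if current_tool and tool_call_buffer.rstrip().endswith(")"):
                    yield {
                        "type": "tool_call",
                        "tool": current_tool,
                        "params": self._parse_params(tool_call_buffer)
                    }
                    tool_call_buffer = ""
                    current_tool = None

            if data.get("done"):
                break

    def _detect_tool_call_start(self, text: str) -> str | None:
        """Detect if text appears to be start of tool call."""
        # Heuristic: only fires when the name and "(" arrive in the same chunk
        for tool_name in ["get_weather", "calculate", "search", "read_file"]:
            if tool_name in text and "(" in text:
                return tool_name
        return None

    @staticmethod
    def _parse_params(call_text: str) -> dict:
        """Parse the arguments of a text-format tool call.

        Assumes the model writes name({...JSON...}); adapt this to the
        format your model actually produces.
        """
        args = call_text[call_text.index("(") + 1:call_text.rstrip().rindex(")")]
        try:
            params = json.loads(args)
        except json.JSONDecodeError:
            return {}
        return params if isinstance(params, dict) else {}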

Async Tool Execution

Tools should execute asynchronously to avoid blocking the stream:

import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


class AsyncToolExecutor:
    def __init__(self, max_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.tools: dict[str, Callable[..., Any]] = {}

    async def execute_async(
        self,
        tool_name: str,
        parameters: dict
    ) -> dict:
        """Execute a tool without blocking the event loop."""

        if tool_name not in self.tools:
            return {"success": False, "error": f"Unknown tool: {tool_name}"}

        func = self.tools[tool_name]

        try:
            if inspect.iscoroutinefunction(func):
                # Async tools run directly on the event loop
                result = await func(**parameters)
            else:
                # Blocking tools run in the thread pool
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    self.executor, lambda: func(**parameters)
                )
            return {"success": True, "result": result}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def register(self, name: str, func: Callable[..., Any]):
        self.tools[name] = func
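
To see why the thread pool matters, the following self-contained sketch runs a blocking function through `run_in_executor` while the event loop keeps ticking. The names `slow_tool` and `demo` are hypothetical, and the sleep durations are arbitrary values chosen for illustration.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_tool(seconds: float) -> str:
    """Hypothetical blocking tool; time.sleep stands in for real work."""
    time.sleep(seconds)
    return "done"


async def demo() -> list[str]:
    log: list[str] = []
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The blocking call runs in a worker thread
        future = loop.run_in_executor(pool, slow_tool, 0.2)
        # Meanwhile the event loop stays free for other work
        while not future.done():
            log.append("tick")
            await asyncio.sleep(0.05)
        log.append(await future)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Calling `slow_tool` directly inside the coroutine would freeze the loop for the full duration and no ticks would be recorded before the result.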

Streaming Loop

Putting it together with proper streaming:

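import asyncio
import json


async def chat_stream(
    model: str,
    messages: list[dict],
    tools: list[dict],
    tool_executor: AsyncToolExecutor
):
    dispatcher = StreamingToolDispatcher()
    stream = dispatcher.stream_with_tools(model, messages, tools)
    end_of_stream = object()
    pending: list[tuple[dict, asyncio.Task]] = []

    while True:
        # The dispatcher is a blocking generator, so advance it in a worker thread
        event = await asyncio.to_thread(next, stream, end_of_stream)
        if event is end_of_stream:
            break

        if event["type"] == "content":
            yield f"data: {json.dumps(event)}\n\n"

        elif event["type"] == "tool_call":
            # Start the tool in the background; tokens keep flowing meanwhile
            task = asyncio.create_task(
                tool_executor.execute_async(event["tool"], event["params"])
            )
            pending.append((event, task))

    for event, task in pending:
        result = await task

        # Add tool result to conversation
        messages.append({
            "role": "tool",
            "name": event["tool"],
            "content": json.dumps(result)
        })

        # "type" comes after **event so "tool_call" does not overwrite it
        payload = {**event, "type": "tool_result", "result": result}
        yield f"data: {json.dumps(payload)}\n\n"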
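
On the receiving side, the `data: ...` frames emitted by `chat_stream` have to be reassembled, because network chunks do not line up with frame boundaries. The helper below is a minimal sketch of that step; `iter_sse_events` is a hypothetical name, and it handles only the single-line `data:` frames this chapter produces, not the full SSE format (multi-line data, `event:` or `id:` fields).

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(chunks: Iterable[str]) -> Iterator[dict]:
    """Decode 'data: <json>' frames separated by blank lines."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # A frame is complete once its terminating blank line has arrived
        while "\n\n" in buffer:
            frame, buffer = buffer.split("\n\n", 1)
            for line in frame.split("\n"):
                if line.startswith("data: "):
                    yield json.loads(line[len("data: "):])


if __name__ == "__main__":
    raw = [
        'data: {"type": "content", "te',
        'xt": "Hi"}\n\ndata: {"type": "tool_result", "tool": "calculate", "result": 4}\n',
        "\n",
    ]
    for event in iter_sse_events(raw):
        print(event)
```

Buffering until the blank line is what lets the client tolerate a frame that arrives split across several reads.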
EXERCISE

Implement a streaming pipeline that displays text tokens as they arrive and executes a long-running tool (with asyncio.sleep to simulate) without blocking the stream. Verify that text tokens continue flowing during tool execution.

← Chapter 11: Retry Logic
Chapter 13: LangChain Tools Integration →