08. Parallel Tool Calls

Chapter 8 of 18 · 20 min

Models may request multiple tool calls simultaneously when the operations are independent. Parallel execution reduces latency compared to sequential execution.

Detect and extract parallel calls:

def extract_parallel_calls(response: dict) -> list[dict]:
    """Collect every tool call requested in a model response.

    ``response["message"]`` may hold either a single message dict or a list
    of message dicts. Each tool call's ``function.arguments`` may be a JSON
    string (decoded here) or an already-decoded object (passed through).

    Args:
        response: Model response; tool calls are read from
            ``response["message"][...]["tool_calls"]``.

    Returns:
        One ``{"name": ..., "arguments": ...}`` dict per requested call, in
        the order the calls appear in the response. Empty when the response
        carries no tool calls.

    Raises:
        KeyError: If a tool call lacks ``function``, ``name`` or ``arguments``.
        json.JSONDecodeError: If string arguments are not valid JSON.
    """
    messages = response.get("message") or []
    if isinstance(messages, dict):
        # A single message: iterating the dict directly would walk its keys
        # (strings) instead of the message itself.
        messages = [messages]

    tool_calls = []

    for message in messages:
        if not isinstance(message, dict):
            continue
        # ``tool_calls`` may be absent or explicitly None when nothing was
        # requested; both mean "no calls".
        for call in message.get("tool_calls") or []:
            function = call["function"]
            arguments = function["arguments"]
            if isinstance(arguments, str):
                arguments = json.loads(arguments)
            tool_calls.append({
                "name": function["name"],
                "arguments": arguments
            })

    return tool_calls

Execute tools in parallel using concurrent.futures:

from concurrent.futures import ThreadPoolExecutor, as_completed

def execute_parallel(
    calls: list[dict],
    registry: ToolRegistry,
    max_workers: int = 4
) -> list[dict]:
    """Run the requested tool calls concurrently on a thread pool.

    Each call is submitted as
    ``execute_with_retry(name, arguments, registry.get_schema(name))``.

    Args:
        calls: Tool calls as ``{"name": ..., "arguments": ...}`` dicts.
        registry: Source of the per-tool schema passed to each execution.
        max_workers: Upper bound on concurrently running worker threads.

    Returns:
        One entry per call, in the SAME ORDER as ``calls``. A completed call
        yields ``{"tool", "success", "result"}``, where ``success`` is read
        from the tool result (False if absent). A call that raised yields
        ``{"tool", "success": False, "error": <message>}``.
    """
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit everything up front so all calls run concurrently.
        futures = [
            executor.submit(
                execute_with_retry,
                call["name"],
                call["arguments"],
                registry.get_schema(call["name"])
            )
            for call in calls
        ]

        # Collect in submission order rather than completion order: callers
        # pair results with tool calls by position, so result i must belong
        # to calls[i]. Waiting in order costs nothing extra because every
        # future is already running.
        for call, future in zip(calls, futures):
            try:
                result = future.result()
                results.append({
                    "tool": call["name"],
                    "success": result.get("success", False),
                    "result": result
                })
            except Exception as e:
                # One failing tool must not discard the other results.
                results.append({
                    "tool": call["name"],
                    "success": False,
                    "error": str(e)
                })

    return results

Aggregate parallel results for the model:

def format_parallel_results(results: list[dict]) -> list[dict]:
    """Wrap each tool result in a ``role: "tool"`` message.

    Args:
        results: Tool results; each must be JSON-serializable.

    Returns:
        One message per result. The message at position ``i`` gets the id
        ``call_parallel_<i>`` and the JSON-encoded result as its content.
    """
    return [
        {
            "role": "tool",
            "tool_call_id": f"call_parallel_{position}",
            "content": json.dumps(payload),
        }
        for position, payload in enumerate(results)
    ]

# In orchestration loop
tool_calls = extract_parallel_calls(response)

if len(tool_calls) > 1:
    # More than one call requested: run them concurrently.
    parallel_results = execute_parallel(tool_calls, registry)

    # Record the assistant turn that requested the calls. Ids are derived
    # from each call's position so they line up with the ids that
    # format_parallel_results assigns to the tool messages.
    assistant_tool_calls = [
        {
            "id": f"call_parallel_{i}",
            "type": "function",
            "function": {
                "name": call["name"],
                "arguments": json.dumps(call["arguments"]),
            },
        }
        for i, call in enumerate(tool_calls)
    ]
    messages.append({
        "role": "assistant",
        "content": None,
        "tool_calls": assistant_tool_calls,
    })

    # Follow with one tool message per result.
    messages.extend(format_parallel_results(parallel_results))
elif tool_calls:
    # Exactly one call: handled by the sequential path (placeholder here).
    pass

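Parallel execution requires careful resource management. Limit max_workers based on available CPU cores and the nature of the tool operations—CPU-bound tools benefit from fewer workers while I/O-bound tools can use more. Note that in CPython the threads of a ThreadPoolExecutor share the global interpreter lock, so threads mainly speed up I/O-bound tools (network calls, file access); for genuinely CPU-bound tools, use a ProcessPoolExecutor instead.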

Handle partial failures in parallel execution:

def check_parallel_completeness(
    results: list[dict],
    expected_count: int
) -> bool:
    """Decide whether the orchestration should continue after a parallel batch.

    Args:
        results: Per-call result entries; an entry counts as successful when
            its ``"success"`` value is truthy (missing means failed).
        expected_count: Number of calls that were expected to succeed.

    Returns:
        False only when nothing succeeded although something was expected.
        True when everything succeeded or on partial success.
    """
    succeeded = len([entry for entry in results if entry.get("success", False)])

    if succeeded == 0:
        # Nothing succeeded: acceptable only if nothing was expected;
        # otherwise every call failed and the batch should be aborted.
        return expected_count == 0

    # Full or partial success: continue (the caller may warn on partial).
    return True

Error aggregation formats multiple errors for the model:

def aggregate_errors(results: list[dict]) -> str:
    """Summarize the failed entries of a parallel batch as one message.

    An entry is treated as failed when its ``"success"`` value is falsy;
    entries without a ``"success"`` key are treated as successful.

    Args:
        results: Per-call result entries, optionally carrying ``"tool"`` and
            ``"error"`` keys.

    Returns:
        A header line followed by one ``"<tool>: <error>"`` line per failed
        entry, or an empty string when no entry failed (so callers never
        report "errors occurred" for a clean batch).
    """
    error_parts = [
        f"{result.get('tool', 'unknown')}: {result.get('error', 'Unknown error')}"
        for result in results
        if not result.get("success", True)
    ]

    if not error_parts:
        return ""

    return "Multiple errors occurred:\n" + "\n".join(error_parts)
EXERCISE

Create a system with three independent tools (e.g., get time, get date, get weather). Configure a query that triggers all three in parallel. Measure latency and compare against sequential execution.