08. Parallel Tool Calls
Models may request multiple tool calls simultaneously when the operations are independent. Parallel execution reduces latency compared to sequential execution.
Detect and extract parallel calls:
def extract_parallel_calls(response: dict) -> list[dict]:
    """Collect every tool call requested in a model response.

    Args:
        response: Response payload. Its ``"message"`` entry may be a list of
            message dicts or a single message dict.

    Returns:
        One ``{"name": ..., "arguments": ...}`` dict per requested call, in
        the order the calls appear. Empty when no tool calls are present.

    Raises:
        json.JSONDecodeError: If a call's string arguments are not valid JSON.
        KeyError: If a call lacks ``function``, ``name`` or ``arguments``.
    """
    messages = response.get("message") or []
    if isinstance(messages, dict):
        # A single message object: iterating it directly would walk its keys
        # (strings) rather than messages, so wrap it in a list first.
        messages = [messages]

    tool_calls = []
    for message in messages:
        if not isinstance(message, dict):
            continue
        # "tool_calls" may be missing or explicitly null when none were made.
        for call in message.get("tool_calls") or []:
            function = call["function"]
            name = function["name"]
            arguments = function["arguments"]
            # Arguments may arrive JSON-encoded or already decoded; only
            # decode when they are still text.
            if isinstance(arguments, (str, bytes, bytearray)):
                arguments = json.loads(arguments)
            tool_calls.append({"name": name, "arguments": arguments})
    return tool_calls
Execute tools in parallel using concurrent.futures:
from concurrent.futures import ThreadPoolExecutor, as_completed
def execute_parallel(
    calls: list[dict],
    registry: ToolRegistry,
    max_workers: int = 4
) -> list[dict]:
    """Run the requested tool calls concurrently on a thread pool.

    Results are returned in the same order as ``calls`` so that any
    position-based pairing (for example index-derived tool call IDs) stays
    correct regardless of which tool finishes first.

    Args:
        calls: Tool calls, each with ``"name"`` and ``"arguments"`` keys.
        registry: Registry queried via ``get_schema(name)`` for each call.
        max_workers: Upper bound on the number of worker threads.

    Returns:
        One dict per call: ``{"tool", "success", "result"}`` when the call
        returned, or ``{"tool", "success": False, "error"}`` when it raised.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit everything up front so the calls run concurrently.
        futures = [
            executor.submit(
                execute_with_retry,
                call["name"],
                call["arguments"],
                registry.get_schema(call["name"]),
            )
            for call in calls
        ]

        # Collect in submission order rather than completion order: each
        # result() blocks only until that particular call is done, so total
        # wall time is unchanged while the output stays aligned with `calls`.
        for call, future in zip(calls, futures):
            try:
                result = future.result()
                results.append({
                    "tool": call["name"],
                    "success": result.get("success", False),
                    "result": result,
                })
            except Exception as e:
                # A failing tool must not abort its siblings; record the
                # failure in this call's slot instead.
                results.append({
                    "tool": call["name"],
                    "success": False,
                    "error": str(e),
                })
    return results
Aggregate parallel results for the model:
def format_parallel_results(results: list[dict]) -> list[dict]:
    """Wrap each tool result in a ``tool``-role chat message.

    Every message is tagged ``call_parallel_<index>``, where the index is the
    result's position in ``results``, and carries the result serialized as
    JSON in its ``content`` field.
    """
    return [
        {
            "role": "tool",
            "tool_call_id": f"call_parallel_{position}",
            "content": json.dumps(outcome),
        }
        for position, outcome in enumerate(results)
    ]
# In orchestration loop: responses with more than one tool call take the
# parallel path.
tool_calls = extract_parallel_calls(response)
if len(tool_calls) > 1:
    # Run all requested tools concurrently.
    parallel_results = execute_parallel(tool_calls, registry)

    # Record the assistant turn that requested the calls; each call is given
    # an ID of the form call_parallel_<index> based on its position.
    assistant_tool_calls = [
        {
            "id": f"call_parallel_{position}",
            "type": "function",
            "function": {
                "name": requested["name"],
                "arguments": json.dumps(requested["arguments"]),
            },
        }
        for position, requested in enumerate(tool_calls)
    ]
    messages.append({
        "role": "assistant",
        "content": None,
        "tool_calls": assistant_tool_calls,
    })

    # Follow it with one tool-role message per result.
    messages.extend(format_parallel_results(parallel_results))
elif tool_calls:
    # Exactly one call: handled by the sequential logic (placeholder here).
    pass
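Parallel execution requires careful resource management. Limit max_workers based on available CPU cores and the nature of the tool operations—CPU-bound tools benefit from fewer workers while I/O-bound tools can use more. Note that in standard CPython the global interpreter lock prevents threads from running Python bytecode in parallel, so a ThreadPoolExecutor mainly speeds up I/O-bound tools; for genuinely CPU-bound tools, consider concurrent.futures.ProcessPoolExecutor instead.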
Handle partial failures in parallel execution:
def check_parallel_completeness(
    results: list[dict],
    expected_count: int
) -> bool:
    """Decide whether a parallel batch produced enough to continue.

    Returns ``True`` when the number of successful results equals
    ``expected_count`` or when at least one result succeeded (partial
    success), and ``False`` when nothing succeeded although successes were
    expected.
    """
    success_count = len([r for r in results if r.get("success", False)])

    # Full success, including the case where zero calls were expected.
    if success_count == expected_count:
        return True

    # Otherwise: total failure aborts, partial success carries on.
    return success_count != 0
Error aggregation formats multiple errors for the model:
def aggregate_errors(results: list[dict]) -> str:
    """Summarize the failed entries of a result batch as text for the model.

    An entry counts as failed when its ``"success"`` value is present and
    falsy; entries without a ``"success"`` key are treated as successful.

    Args:
        results: Result entries, each optionally carrying ``"tool"``,
            ``"success"``, ``"error"`` and ``"result"`` keys.

    Returns:
        A header line followed by one ``<tool>: <error>`` line per failed
        entry, or an empty string when no entry failed (so callers never
        report errors that did not happen).
    """
    error_parts = []
    for result in results:
        if result.get("success", True):
            continue

        if "error" in result:
            detail = result["error"]
        else:
            # Entries for tools that returned a failure (rather than raising)
            # keep the tool's own payload under "result"; presumably that
            # payload carries an "error" field — verify against the tool
            # executor. Fall back to a generic message otherwise.
            payload = result.get("result")
            if isinstance(payload, dict):
                detail = payload.get("error", "Unknown error")
            else:
                detail = "Unknown error"

        error_parts.append(f"{result.get('tool', 'unknown')}: {detail}")

    if not error_parts:
        return ""
    return "Multiple errors occurred:\n" + "\n".join(error_parts)
Create a system with three independent tools (e.g., get time, get date, get weather). Configure a query that triggers all three in parallel. Measure latency and compare against sequential execution.