How to implement parallel agent execution
Multi-agent system, Python asyncio or threading
What this does
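Parallel agent execution runs multiple AI agents concurrently. For I/O-bound agents, such as those waiting on LLM API calls, total pipeline latency moves toward the duration of the slowest task (or the slowest batch, when concurrency is capped) instead of the sum of all task durations. This guide covers task fan-out, result aggregation, and partial failure handling using Python asyncio.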
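To make the latency point concrete, here is a minimal sketch comparing sequential and concurrent execution. The fake_agent function and its 0.05 second delay are assumptions standing in for a real LLM call; actual timings depend on your agents and API.

```python
import asyncio
import time


async def fake_agent(task_id: int, delay: float = 0.05) -> int:
    """Hypothetical stand-in for an I/O-bound agent call (e.g. an LLM request)."""
    await asyncio.sleep(delay)
    return task_id


async def run_sequential(n: int) -> float:
    """Await each agent in turn; elapsed time is roughly the sum of all delays."""
    start = time.perf_counter()
    for i in range(n):
        await fake_agent(i)
    return time.perf_counter() - start


async def run_concurrent(n: int) -> float:
    """Fan out with gather; elapsed time is roughly the slowest single delay."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_agent(i) for i in range(n)))
    return time.perf_counter() - start


async def compare(n: int = 5):
    return await run_sequential(n), await run_concurrent(n)


sequential_s, concurrent_s = asyncio.run(compare())
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

The gap only appears when agents spend their time awaiting I/O; CPU-bound work inside a coroutine does not get faster this way.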
Steps
Step 1: Install dependencies
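pip install aiohttp
asyncio ships with the Python standard library (Python 3.7+ is needed for asyncio.run), so do not install it with pip; the asyncio package on PyPI is an obsolete backport. aiohttp is only needed if your agents make real HTTP calls. The examples in this guide run on the standard library alone.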
Step 2: Define the parallel executor
import asyncio
from typing import Any, Callable, Coroutine, Dict, List, Optional


class ParallelAgentExecutor:
    """Execute multiple agent tasks concurrently with result aggregation."""

    def __init__(self, max_concurrency: int = 10):
        self.max_concurrency = max_concurrency
        # Caps how many agent calls are in flight at once.
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def execute(
        self,
        tasks: List[Dict[str, Any]],
        agent_fn: Callable[..., Coroutine],
        aggregator: Optional[Callable[[List[Dict]], Dict]] = None
    ) -> Dict[str, Any]:
        """
        Fan out tasks to concurrent agents.

        Args:
            tasks: List of task dictionaries passed to agent_fn.
            agent_fn: Async function defining one agent's logic.
            aggregator: Optional function to combine agent results.

        Returns:
            Dict containing results list and summary statistics.
        """
        async def run_task(task: Dict, task_id: int) -> Dict:
            async with self.semaphore:
                try:
                    result = await agent_fn(task)
                    return {"task_id": task_id, "status": "success", "result": result}
                except Exception as e:
                    # Convert the failure into a result so other tasks keep running.
                    return {"task_id": task_id, "status": "failed", "error": str(e)}

        coros = [run_task(t, i) for i, t in enumerate(tasks)]
        # run_task already catches exceptions, so gather never sees one here.
        results = await asyncio.gather(*coros, return_exceptions=False)

        aggregated = {}
        if aggregator:
            aggregated = aggregator(results)

        success_count = sum(1 for r in results if r["status"] == "success")
        failed_count = len(results) - success_count
        return {
            "individual_results": results,
            "aggregated": aggregated,
            "summary": {
                "total": len(results),
                "succeeded": success_count,
                "failed": failed_count
            }
        }
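The semaphore is what keeps max_concurrency honest. As a rough illustration, the sketch below counts how many workers are inside the semaphore at the same time. The measure_peak_concurrency helper and its counters are hypothetical and not part of the executor; the 0.01 second sleep stands in for an agent call.

```python
import asyncio


async def measure_peak_concurrency(num_tasks: int, limit: int) -> int:
    """Run num_tasks workers behind a semaphore and report the peak in-flight count."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def worker() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # Simulated agent call
            in_flight -= 1

    # All workers are scheduled at once; the semaphore decides who proceeds.
    await asyncio.gather(*(worker() for _ in range(num_tasks)))
    return peak


peak = asyncio.run(measure_peak_concurrency(num_tasks=20, limit=5))
print(f"peak concurrent workers: {peak}")
```

The peak never exceeds the limit, which is the property the executor relies on when it wraps agent_fn in async with self.semaphore.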
Step 3: Define agent task functions
async def classification_agent(task: Dict) -> Dict:
    """Example agent that classifies a text snippet."""
    await asyncio.sleep(0.1)  # Simulate LLM call
    text = task["text"]
    return {"label": "positive" if len(text) % 2 else "negative", "text": text}


async def summarization_agent(task: Dict) -> Dict:
    """Example agent that summarizes a text snippet."""
    await asyncio.sleep(0.1)  # Simulate LLM call
    text = task["text"]
    return {"summary": text[:50], "text": text}
Step 4: Execute fan-out with result aggregation
async def main():
    executor = ParallelAgentExecutor(max_concurrency=5)
    tasks = [{"text": f"Sample document number {i}"} for i in range(20)]

    # Fan out to multiple agents of the same type
    results = await executor.execute(
        tasks=tasks,
        agent_fn=classification_agent,
        aggregator=lambda r: {
            "total_classified": len(r),
            "labels": [x["result"]["label"] for x in r if x["status"] == "success"]
        }
    )

    print(f"Total: {results['summary']['total']}")
    print(f"Success: {results['summary']['succeeded']}")
    print(f"Failed: {results['summary']['failed']}")
    print(f"Labels: {results['aggregated']['labels'][:5]}")


asyncio.run(main())
Step 5: Handle partial failures
The executor continues even if individual agents fail. Failed tasks return with status: "failed" and an error message instead of a result. Filter individual_results (from the dict returned by execute) to compute partial results from the successful agents:
successful_results = [r["result"] for r in results["individual_results"] if r["status"] == "success"]
partial_aggregate = sum(item["label"] == "positive" for item in successful_results)
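Because task_id is the task's index in the original tasks list, failed entries can be mapped back to their inputs, for example to log them or hand them to a retry step. The sketch below assumes the result shape produced by the executor; the failed_inputs helper and the sample data are made up for illustration.

```python
from typing import Any, Dict, List


def failed_inputs(
    tasks: List[Dict[str, Any]],
    individual_results: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Return the original task dicts whose results have status 'failed'."""
    return [tasks[r["task_id"]] for r in individual_results if r["status"] == "failed"]


# Illustrative data only, shaped like the executor's individual_results.
tasks = [{"text": "alpha"}, {"text": "beta"}, {"text": "gamma"}]
individual_results = [
    {"task_id": 0, "status": "success", "result": {"label": "positive", "text": "alpha"}},
    {"task_id": 1, "status": "failed", "error": "timeout"},
    {"task_id": 2, "status": "success", "result": {"label": "positive", "text": "gamma"}},
]

to_retry = failed_inputs(tasks, individual_results)
print(to_retry)
```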
Verification
Run the script and verify:
Total: 20
Success: 20
Failed: 0
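Labels: ['negative', 'negative', 'negative', 'negative', 'negative']
(The first ten sample texts are 24 characters long, an even length, so classification_agent labels them all 'negative'.)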
For partial failure verification, inject a failure by raising inside one agent call. The output should still show a non-zero succeeded count and a non-empty results list.
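One way to try this without touching the example agents is a deliberately flaky agent. In the sketch below, run_all is a condensed stand-in for ParallelAgentExecutor.execute with the same per-task try/except, included only so the snippet runs on its own; flaky_agent and its failure condition are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_all(
    tasks: List[Dict[str, Any]],
    agent_fn: Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]],
    max_concurrency: int = 5,
) -> List[Dict[str, Any]]:
    """Condensed stand-in for the executor: fan out and record per-task status."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_task(task: Dict[str, Any], task_id: int) -> Dict[str, Any]:
        async with semaphore:
            try:
                result = await agent_fn(task)
                return {"task_id": task_id, "status": "success", "result": result}
            except Exception as e:
                return {"task_id": task_id, "status": "failed", "error": str(e)}

    return await asyncio.gather(*(run_task(t, i) for i, t in enumerate(tasks)))


async def flaky_agent(task: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical agent that fails on one specific input."""
    await asyncio.sleep(0.01)  # Simulated agent call
    if task["text"].endswith("3"):
        raise RuntimeError("injected failure")
    return {"label": "ok", "text": task["text"]}


tasks = [{"text": f"Sample document number {i}"} for i in range(5)]
results = asyncio.run(run_all(tasks, flaky_agent))
succeeded = [r for r in results if r["status"] == "success"]
failed = [r for r in results if r["status"] == "failed"]
print(f"succeeded={len(succeeded)} failed={len(failed)}")
```

The task ending in 3 is reported as failed while the other four still return results, which is the behavior the verification step is looking for.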
Common failures
- Blocking calls inside async functions - Using time.sleep() or synchronous HTTP clients inside agent_fn blocks the event loop. Always use await asyncio.sleep() and async HTTP clients like aiohttp.
- Exceeding max concurrency silently - The semaphore prevents overwhelming the LLM API, but a burst of tasks above the limit queues requests and increases latency. Monitor queue depth and scale max_concurrency based on API rate limits.
- Unhandled exceptions in gather - If one agent raises an unhandled exception and return_exceptions=False, the entire execute call fails. Either catch exceptions inside each task, as run_task does, or pass return_exceptions=True and check each result for exception instances.
- Version mismatch - The installed package or runtime differs from the command shown; check the version first and rerun the smallest verification command.
- Local environment drift - Another service, virtual environment, model, or path is being used; print the active binary path and configuration before changing the guide steps.
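Two of the failures above can be sketched together. When a synchronous call cannot be replaced with an async client, one option is to push it onto a worker thread with loop.run_in_executor so the event loop stays free. And when tasks do not catch their own exceptions, return_exceptions=True makes gather return them as values. The blocking_lookup function and the failing input are assumptions for illustration.

```python
import asyncio
import time
from typing import List, Tuple


def blocking_lookup(x: int) -> int:
    """Hypothetical synchronous call, standing in for a sync HTTP client."""
    time.sleep(0.05)
    return x * 2


async def agent(x: int) -> int:
    if x == 2:
        raise ValueError("bad input")
    loop = asyncio.get_running_loop()
    # Run the blocking function in the default thread pool instead of the event loop.
    return await loop.run_in_executor(None, blocking_lookup, x)


async def demo() -> Tuple[List[int], List[BaseException]]:
    # With return_exceptions=True, exceptions come back in the result list
    # at the position of the task that raised them.
    outcomes = await asyncio.gather(*(agent(i) for i in range(4)), return_exceptions=True)
    values = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = [o for o in outcomes if isinstance(o, BaseException)]
    return values, errors


values, errors = asyncio.run(demo())
print(values, errors)
```

The executor in Step 2 takes the other route: it catches exceptions inside run_task, so its gather call never receives one.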
Related guides
- Setup Agent Error Recovery and Retry Logic - Pair this with retry logic so failed agent tasks are automatically rescheduled without losing the parallel execution benefit.
- Implement Streaming Responses in AI APIs - Combine streaming output with parallel execution for real-time multi-agent dashboards.