HOW-TO · SUP

How to implement parallel agent execution

Advanced · 30 min · By Fredoline Eruo
Target environment
Ubuntu 24.04 · Ollama 0.4.x
PREREQUISITES

Multi-agent system, Python asyncio or threading

What this does

Parallel agent execution runs multiple AI agents concurrently, which can cut total pipeline latency substantially when each agent spends most of its time waiting on I/O such as an LLM API call. This guide covers task fan-out, result aggregation, and partial failure handling using Python asyncio.

Steps

Step 1: Install dependencies

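pip install aiohttp

asyncio is part of the Python standard library, so do not install it with pip; asyncio.run(), used below, requires Python 3.7+. aiohttp is only needed if your agents make async HTTP calls. The examples in this guide run on the standard library alone.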

Step 2: Define the parallel executor

import asyncio
from typing import Any, Callable, Coroutine, Dict, List, Optional

class ParallelAgentExecutor:
    """Execute multiple agent tasks concurrently with result aggregation."""

    def __init__(self, max_concurrency: int = 10):
        self.max_concurrency = max_concurrency
        # Caps how many agent calls are in flight at the same time
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def execute(
        self,
        tasks: List[Dict[str, Any]],
        agent_fn: Callable[..., Coroutine],
        aggregator: Optional[Callable[[List[Dict]], Dict]] = None
    ) -> Dict[str, Any]:
        """
        Fan out tasks to concurrent agents.
        Args:
            tasks: List of task dictionaries passed to agent_fn.
            agent_fn: Async function defining one agent's logic.
            aggregator: Optional function to combine agent results.
        Returns:
            Dict containing results list and summary statistics.
        """
        async def run_task(task: Dict, task_id: int) -> Dict:
            async with self.semaphore:
                try:
                    result = await agent_fn(task)
                    return {"task_id": task_id, "status": "success", "result": result}
                except Exception as e:
                    return {"task_id": task_id, "status": "failed", "error": str(e)}

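        # run_task catches agent exceptions itself, so gather() returns one
        # result dict per task, in the same order as the input tasks.
        coros = [run_task(t, i) for i, t in enumerate(tasks)]
        results = await asyncio.gather(*coros, return_exceptions=False)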

        aggregated = {}
        if aggregator:
            aggregated = aggregator(results)

        success_count = sum(1 for r in results if r["status"] == "success")
        failed_count = len(results) - success_count

        return {
            "individual_results": results,
            "aggregated": aggregated,
            "summary": {
                "total": len(results),
                "succeeded": success_count,
                "failed": failed_count
            }
        }

Step 3: Define agent task functions

async def classification_agent(task: Dict) -> Dict:
    """Example agent that classifies a text snippet."""
    await asyncio.sleep(0.1)  # Simulate LLM call
    text = task["text"]
    return {"label": "positive" if len(text) % 2 else "negative", "text": text}

async def summarization_agent(task: Dict) -> Dict:
    """Example agent that summarizes a text snippet."""
    await asyncio.sleep(0.1)
    text = task["text"]
    return {"summary": text[:50], "text": text}
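
The agents above simulate the LLM call with asyncio.sleep. If the client you actually use is synchronous, one option is to offload it to a worker thread so the event loop stays free. The following is a minimal sketch, assuming Python 3.9+ for asyncio.to_thread; blocking_llm_call and threaded_agent are hypothetical names, and the function body is a stand-in, not a real client API.

```python
import asyncio
import time
from typing import Dict, List


def blocking_llm_call(prompt: str) -> str:
    """Hypothetical stand-in for a synchronous client call (not a real API)."""
    time.sleep(0.1)  # Blocks the calling thread, as a sync HTTP request would
    return f"echo: {prompt}"


async def threaded_agent(task: Dict) -> Dict:
    """Agent that runs the blocking call in a worker thread."""
    reply = await asyncio.to_thread(blocking_llm_call, task["text"])
    return {"reply": reply, "text": task["text"]}


async def demo() -> List[Dict]:
    tasks = [{"text": f"doc {i}"} for i in range(5)]
    start = time.perf_counter()
    # The five blocking calls overlap because each runs in its own thread
    results = await asyncio.gather(*(threaded_agent(t) for t in tasks))
    elapsed = time.perf_counter() - start
    print(f"{len(results)} calls finished in {elapsed:.2f}s")
    return list(results)


if __name__ == "__main__":
    asyncio.run(demo())
```

threaded_agent has the same signature as the agents in this step, so it could be passed as agent_fn to the executor unchanged. A native async client is usually preferable where one exists, since threads add overhead.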

Step 4: Execute fan-out with result aggregation

async def main():
    executor = ParallelAgentExecutor(max_concurrency=5)

    tasks = [{"text": f"Sample document number {i}"} for i in range(20)]

    # Fan out to multiple agents of the same type
    results = await executor.execute(
        tasks=tasks,
        agent_fn=classification_agent,
        aggregator=lambda r: {
            "total_classified": len(r),
            "labels": [x["result"]["label"] for x in r if x["status"] == "success"]
        }
    )

    print(f"Total: {results['summary']['total']}")
    print(f"Success: {results['summary']['succeeded']}")
    print(f"Failed: {results['summary']['failed']}")
    print(f"Labels: {results['aggregated']['labels'][:5]}")

asyncio.run(main())
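
To see the latency effect of the fan-out, it can help to time the same workload run one task at a time and run concurrently. The sketch below is self-contained and does not use the executor class; fake_agent, run_sequential, run_concurrent, and compare are hypothetical names, and the 0.1 s sleep is an assumed stand-in for an LLM call.

```python
import asyncio
import time
from typing import Dict, List


async def fake_agent(task: Dict) -> Dict:
    """Hypothetical agent: sleeps 0.1 s to stand in for an LLM call."""
    await asyncio.sleep(0.1)
    return {"text": task["text"]}


async def run_sequential(tasks: List[Dict]) -> float:
    """Await each task in turn and return the elapsed seconds."""
    start = time.perf_counter()
    for task in tasks:
        await fake_agent(task)
    return time.perf_counter() - start


async def run_concurrent(tasks: List[Dict], max_concurrency: int) -> float:
    """Fan out with a semaphore cap and return the elapsed seconds."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(task: Dict) -> Dict:
        async with semaphore:
            return await fake_agent(task)

    start = time.perf_counter()
    await asyncio.gather(*(bounded(t) for t in tasks))
    return time.perf_counter() - start


async def compare() -> Dict[str, float]:
    tasks = [{"text": f"Sample document number {i}"} for i in range(20)]
    sequential = await run_sequential(tasks)
    concurrent = await run_concurrent(tasks, max_concurrency=5)
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
    return {"sequential": sequential, "concurrent": concurrent}


if __name__ == "__main__":
    asyncio.run(compare())
```

With 20 tasks and a cap of 5, the concurrent run proceeds in roughly four waves, so it should take on the order of 0.4 s against about 2 s sequentially. Exact timings vary by machine.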

Step 5: Handle partial failures

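The executor continues even if individual agents fail: run_task catches the exception and returns that task with "status": "failed" and an error message instead of a result. To compute partial results from the agents that succeeded, filter individual_results (inside main(), after the execute call):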

successful_results = [r["result"] for r in results["individual_results"] if r["status"] == "success"]
partial_aggregate = sum(item["label"] == "positive" for item in successful_results)
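
The partial-failure path can be exercised end to end with an agent that fails on purpose. This is a minimal, self-contained sketch that reuses the capture pattern from run_task without the executor class; flaky_agent, run_all, and summarize are hypothetical names, and the "fail every fourth task" rule is an assumption chosen only to make the outcome predictable.

```python
import asyncio
from typing import Any, Dict, List


async def flaky_agent(task: Dict) -> Dict:
    """Hypothetical agent that fails on every fourth task id."""
    await asyncio.sleep(0.01)
    if task["id"] % 4 == 0:
        raise ValueError(f"simulated failure for task {task['id']}")
    return {"label": "positive" if task["id"] % 2 else "negative"}


async def run_task(task: Dict) -> Dict:
    """Convert an agent exception into a 'failed' record instead of raising."""
    try:
        result = await flaky_agent(task)
        return {"task_id": task["id"], "status": "success", "result": result}
    except Exception as e:
        return {"task_id": task["id"], "status": "failed", "error": str(e)}


async def run_all() -> List[Dict]:
    tasks = [{"id": i} for i in range(8)]
    return list(await asyncio.gather(*(run_task(t) for t in tasks)))


def summarize(results: List[Dict]) -> Dict[str, Any]:
    """Compute partial aggregates from the successful records only."""
    succeeded = [r for r in results if r["status"] == "success"]
    failed = [r for r in results if r["status"] == "failed"]
    return {
        "succeeded": len(succeeded),
        "failed_ids": [r["task_id"] for r in failed],
        "positives": sum(r["result"]["label"] == "positive" for r in succeeded),
    }


if __name__ == "__main__":
    print(summarize(asyncio.run(run_all())))
```

Tasks 0 and 4 fail, the other six succeed, and the aggregate is computed from those six alone. Keeping the failed task ids makes it straightforward to hand them to a retry step later.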

Verification

Run the script and verify:

Total: 20
Success: 20
Failed: 0
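Labels: ['negative', 'negative', 'negative', 'negative', 'negative']

The first ten sample texts are 24 characters long, so len(text) % 2 is 0 and the placeholder classifier labels them negative. Documents 10 to 19 are 25 characters long and come back positive.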

For partial failure verification, inject a failure by raising inside one agent call. The output should still show a non-zero succeeded count and a non-empty results list.

Common failures

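  1. Blocking calls inside async functions. Using time.sleep() or a synchronous HTTP client inside agent_fn blocks the event loop, so every other agent stalls until the call returns. Use await asyncio.sleep() and an async HTTP client such as aiohttp, or move unavoidable blocking code to a worker thread with asyncio.to_thread() (Python 3.9+).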

  2. Exceeding max concurrency silently. The semaphore prevents overwhelming the LLM API, but a burst of tasks above the limit queues requests and increases latency. Monitor queue depth and scale max_concurrency based on API rate limits.

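  3. Unhandled exceptions in gather. With return_exceptions=False (the default), the first exception raised by any awaitable propagates out of gather() and the entire execute call fails, while the remaining tasks keep running unobserved. Either catch exceptions inside the per-task wrapper, as run_task does, or pass return_exceptions=True so that exceptions come back as values in the result list.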

  • Version mismatch - The installed package or runtime differs from the command shown; check the version first and rerun the smallest verification command.
  • Local environment drift - Another service, virtual environment, model, or path is being used; print the active binary path and configuration before changing the guide steps.
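
For failure 3 above, the alternative to a per-task try/except is to let gather() collect exceptions as values. A minimal sketch, assuming agents that have no error handling of their own; unguarded_agent, gather_all, and split are hypothetical names.

```python
import asyncio
from typing import Any, List, Tuple


async def unguarded_agent(n: int) -> int:
    """Hypothetical agent with no try/except; fails when n == 2."""
    await asyncio.sleep(0.01)
    if n == 2:
        raise RuntimeError("agent 2 crashed")
    return n * 10


async def gather_all() -> List[Any]:
    # return_exceptions=True places the exception object in the result list
    # at the failing task's position instead of raising it out of gather().
    return await asyncio.gather(
        *(unguarded_agent(n) for n in range(4)), return_exceptions=True
    )


def split(outcomes: List[Any]) -> Tuple[List[Any], List[BaseException]]:
    """Separate normal return values from captured exceptions."""
    values = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = [o for o in outcomes if isinstance(o, BaseException)]
    return values, errors


if __name__ == "__main__":
    values, errors = split(asyncio.run(gather_all()))
    print(f"values={values}, errors={[str(e) for e in errors]}")
```

The wrapper approach used by the executor has the advantage of attaching a task_id to each failure; with return_exceptions=True the position in the result list is the only link back to the originating task.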

Related guides

  • Setup Agent Error Recovery and Retry Logic - Pair this with retry logic so failed agent tasks are automatically rescheduled without losing the parallel execution benefit.
  • Implement Streaming Responses in AI APIs - Combine streaming output with parallel execution for real-time multi-agent dashboards.