HOW-TO · RAG
How to Build a Multi-Agent Orchestration System
Target environment
Ubuntu 24.04 · Ollama 0.4.x
PREREQUISITES
A working single agent, messaging infrastructure, Python 3.10+
What this does
Multi-agent orchestration coordinates multiple specialized agents that work together on complex tasks. An orchestrator agent delegates sub-tasks, monitors progress, and aggregates results.
Steps
- Define agent roles and message protocol. Each agent has a name, role, and capability list.
from dataclasses import dataclass
from enum import Enum

class MessageType(Enum):
    TASK = "task"
    RESULT = "result"
    ERROR = "error"
    STATUS = "status"

@dataclass
class AgentMessage:
    sender: str
    recipient: str
    msg_type: MessageType
    content: dict
    task_id: str
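The step mentions a name, role, and capability list for each agent, while the code above only defines the message. One possible way to describe an agent is sketched below. `AgentSpec` and `can_handle` are illustrative names assumed for this example, not part of the guide.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentSpec:
    """Hypothetical descriptor for one agent: who it is and what it can do."""
    name: str
    role: str
    capabilities: tuple[str, ...] = ()

    def can_handle(self, task: str) -> bool:
        # Keyword-in-text check in the spirit of the guide's router,
        # made case-insensitive here (an assumption of this sketch).
        lowered = task.lower()
        return any(cap.lower() in lowered for cap in self.capabilities)


search_agent = AgentSpec("search", "Looks up facts", ("search", "find"))
calc_agent = AgentSpec("calc", "Does arithmetic", ("math", "calculate"))

print(search_agent.can_handle("Find population of France"))
print(calc_agent.can_handle("Find population of France"))
```

Keeping the descriptor immutable means a registry of agents can be shared between the orchestrator and workers without one side changing it under the other.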
- Create an orchestrator agent. It receives user requests, decomposes them, and assigns sub-tasks.
import asyncio
import uuid

class Orchestrator:
    def __init__(self):
        self.agents = {}
        self.task_queue = asyncio.Queue()
        self.result_store = {}

    def register_agent(self, name: str, capabilities: list[str]):
        self.agents[name] = capabilities

    def route_task(self, task: str) -> str:
        # Score each agent by how many of its capability keywords appear in the task text.
        # On a tie, max() returns the first registered agent.
        best_agent = max(self.agents.items(),
                         key=lambda a: sum(1 for c in a[1] if c in task))
        return best_agent[0]

    async def run(self, user_request: str):
        # decompose_task is assumed to be supplied by you and to return a list of sub-task strings.
        steps = decompose_task(user_request)
        for step in steps:
            agent = self.route_task(step)
            msg = AgentMessage(
                sender="orchestrator", recipient=agent,
                msg_type=MessageType.TASK, content={"task": step},
                task_id=str(uuid.uuid4()),  # AgentMessage declares task_id as str
            )
            await self.task_queue.put(msg)
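The `run` method calls `decompose_task`, which the steps leave to the reader. A naive placeholder is sketched below so the orchestrator can be exercised end to end. The splitting rules are an assumption made for illustration; a real orchestrator would more likely ask an LLM to produce the plan.

```python
import re


def decompose_task(user_request: str) -> list[str]:
    """Hypothetical placeholder: split a request into ordered sub-tasks.

    Splits after '.' or ';' followed by whitespace, and on the connectors
    'then' / 'and then'. This is a stand-in, not a planning algorithm.
    """
    pattern = r"(?:[.;]\s+|,?\s+(?:and\s+)?then\s+)"
    parts = re.split(pattern, user_request.strip())
    # Drop empty fragments and trailing punctuation.
    return [p.strip(" .") for p in parts if p.strip(" .")]


steps = decompose_task("Find the population of France, then calculate its density.")
print(steps)
```

Each returned string is what `route_task` matches against capability keywords, so sub-tasks need to keep those keywords intact.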
- Build a worker agent. Workers listen for tasks and return results.
class WorkerAgent:
    def __init__(self, name: str, llm_client, tools: dict):
        self.name = name
        self.llm = llm_client
        self.tools = tools

    async def process_task(self, task: AgentMessage) -> AgentMessage:
        try:
            # run_agent_loop is assumed to be the single-agent loop from the prerequisites.
            result = await self.run_agent_loop(task.content["task"])
            return AgentMessage(
                sender=self.name, recipient="orchestrator",
                msg_type=MessageType.RESULT, content={"output": result}, task_id=task.task_id
            )
        except Exception as e:
            # Report the failure to the orchestrator instead of raising.
            return AgentMessage(
                sender=self.name, recipient="orchestrator",
                msg_type=MessageType.ERROR, content={"error": str(e)}, task_id=task.task_id
            )
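The steps so far put messages on `task_queue` but do not show anything taking them off. A minimal sketch of such a dispatch loop follows. It uses a trimmed message type so it runs on its own, and `Msg`, `EchoWorker`, `dispatch`, and the `None` shutdown sentinel are all assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Msg:
    """Trimmed stand-in for AgentMessage so this sketch is self-contained."""
    sender: str
    recipient: str
    msg_type: str
    content: dict
    task_id: str


class EchoWorker:
    """Hypothetical worker: upper-cases the task instead of calling an LLM."""

    def __init__(self, name: str):
        self.name = name

    async def process_task(self, task: Msg) -> Msg:
        await asyncio.sleep(0)  # yield to the event loop once
        output = task.content["task"].upper()
        return Msg(self.name, "orchestrator", "result", {"output": output}, task.task_id)


async def dispatch(queue: asyncio.Queue, workers: dict, results: dict) -> None:
    """Pull messages until a None sentinel arrives; store replies by task_id."""
    while True:
        msg = await queue.get()
        if msg is None:
            break
        worker = workers.get(msg.recipient)
        if worker is None:
            # Unknown recipient: record an error rather than dropping the task.
            results[msg.task_id] = Msg("orchestrator", "orchestrator", "error",
                                       {"error": f"no agent named {msg.recipient}"},
                                       msg.task_id)
            continue
        results[msg.task_id] = await worker.process_task(msg)


async def main() -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    results: dict = {}
    await queue.put(Msg("orchestrator", "search", "task", {"task": "find x"}, "t1"))
    await queue.put(Msg("orchestrator", "ghost", "task", {"task": "noop"}, "t2"))
    await queue.put(None)  # sentinel: no more work
    await dispatch(queue, {"search": EchoWorker("search")}, results)
    return results


results = asyncio.run(main())
print(results["t1"].content)  # {'output': 'FIND X'}
print(results["t2"].msg_type)  # error
```

This loop handles one task at a time. Running workers concurrently would need tasks spawned with `asyncio.create_task` and a way to collect them, which is where completion order starts to differ from submission order.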
- Handle result aggregation. The orchestrator combines partial results.
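    def aggregate_results(self, results: list[dict]) -> str:
        # Each result dict is expected to carry "agent" and "output" keys.
        # self.llm is assumed to be an LLM client set on the orchestrator; __init__ does not create one.
        context = "\n\n".join(
            f"### Step by {r['agent']}:\n{r['output']}" for r in results
        )
        final_prompt = f"Combine these results into a final answer:\n\n{context}"
        return self.llm.invoke(final_prompt)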
Verification
python -c "
# Test basic routing
agents = {'search': ['search', 'find'], 'calc': ['math', 'calculate']}
def route(task):
    return max(agents, key=lambda a: sum(1 for c in agents[a] if c in task))
print(route('find population of France'))
# Expected: search
"
Common failures
- Agent starvation. Some agents never receive tasks because the router always picks the same one; when scores tie, max() returns the first agent it sees. Implement a round-robin fallback for ties.
- Deadlock from circular delegation. Agent A delegates to B, B to C, and C back to A. Add a maximum delegation depth and detect cycles.
- Result ordering mismatch. The orchestrator expects results in submission order, but agents complete at different speeds. Tag each task with a sequence number.
- Version mismatch. The installed package or runtime differs from the one this guide targets. Check the version first, then rerun the smallest verification command.
- Local environment drift. Another service, virtual environment, model, or path is being used. Print the active binary path and configuration before changing the guide steps.
Related guides
- How to Implement Agent-to-Agent Communication
- How to Design Specialized Agent Roles