HOW-TO · RAG
How to Implement Agent-to-Agent Communication
Target environment
Ubuntu 24.04 · Ollama 0.4.x
PREREQUISITES
An existing multi-agent system, a defined message protocol, Python 3.10+, and the pydantic package (imported in the steps below)
What this does
Agent-to-agent communication allows specialized agents to exchange messages, request information, delegate sub-tasks, and share intermediate results through a structured protocol.
Steps
- Define a message bus. All agents publish and subscribe through a central bus.
import asyncio
from collections import defaultdict


class MessageBus:
    def __init__(self):
        self.subscribers = defaultdict(list)
        self.queue = asyncio.Queue()

    def subscribe(self, agent_name: str, callback):
        self.subscribers[agent_name].append(callback)

    async def publish(self, message: dict):
        # Only enqueue; start() delivers, so each message arrives exactly once
        await self.queue.put(message)

    async def start(self):
        while True:
            msg = await self.queue.get()
            recipient = msg.get("recipient")
            if recipient is None:
                # No recipient: broadcast to all listeners
                targets = [cb for cbs in self.subscribers.values() for cb in cbs]
            else:
                # Addressed message: deliver only to the named recipient
                targets = list(self.subscribers.get(recipient, []))
            for cb in targets:
                await cb(msg)
- Create a communication protocol. Standardize message format with required headers.
from datetime import datetime

from pydantic import BaseModel, Field


class AgentMessage(BaseModel):
    protocol_version: str = "1.0"
    sender: str
    recipient: str
    message_type: str  # request, response, broadcast, error
    intent: str  # query, delegate, inform, confirm
    payload: dict
    # Stamp each message at creation so the TTL check has a start time
    timestamp: datetime = Field(default_factory=datetime.now)
    ttl_seconds: int = 30

    def is_expired(self) -> bool:
        elapsed = (datetime.now() - self.timestamp).total_seconds()
        return elapsed > self.ttl_seconds
- Implement request-response pattern. Agent A sends a request, Agent B responds.
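class ResearcherAgent:
    def __init__(self, name: str = "researcher"):
        self.name = name

    async def research(self, question: str) -> str:
        # Placeholder: replace with the agent's own retrieval or model call
        raise NotImplementedError

    async def handle_query(self, msg: AgentMessage, bus: MessageBus):
        if msg.is_expired():
            return  # Drop requests whose TTL has already passed
        # Perform research
        result = await self.research(msg.payload["question"])
        response = AgentMessage(
            sender=self.name,
            recipient=msg.sender,
            message_type="response",
            intent="inform",
            payload={"answer": result},
        )
        # model_dump() is the Pydantic v2 name; on v1 use response.dict()
        await bus.publish(response.model_dump())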
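The request-response step leaves open how a sender tells which response belongs to which request when several are in flight. One common approach is a correlation ID carried in both messages. The sketch below is a minimal illustration of that idea; `PendingRequests`, the `correlation_id` field, and the dict-shaped messages are hypothetical names, not part of the protocol defined above.

```python
import asyncio
import uuid


class PendingRequests:
    """Tracks in-flight requests by correlation ID (hypothetical helper)."""

    def __init__(self):
        self._futures: dict[str, asyncio.Future] = {}

    def open(self) -> tuple[str, asyncio.Future]:
        # Create a fresh ID and a future that resolves when the reply arrives
        correlation_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._futures[correlation_id] = future
        return correlation_id, future

    def resolve(self, response: dict) -> bool:
        # Match the reply to its request; ignore unknown or duplicate replies
        future = self._futures.pop(response.get("correlation_id"), None)
        if future is None or future.done():
            return False
        future.set_result(response)
        return True


async def demo() -> dict:
    pending = PendingRequests()
    first_id, first = pending.open()
    second_id, second = pending.open()
    # Replies arrive out of order; each one still resolves the right future
    pending.resolve({"correlation_id": second_id, "payload": {"answer": "B"}})
    pending.resolve({"correlation_id": first_id, "payload": {"answer": "A"}})
    return {"first": (await first)["payload"], "second": (await second)["payload"]}


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real agent, the sender would copy the ID into the outgoing request and the responder would echo it back unchanged, so `resolve` can be called from the sender's bus callback.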
- Add broadcast and discovery. Agents announce their capabilities on startup.
class DiscoveryService:
    def __init__(self, bus: MessageBus):
        self.bus = bus
        self.registry = {}

    async def register_agent(self, name: str, capabilities: list[str]):
        self.registry[name] = capabilities
        await self.bus.publish({
            "type": "announce",
            "sender": name,
            "capabilities": capabilities,
        })

    def find_agent(self, capability: str) -> str | None:
        # Return the first registered agent that advertises the capability
        for name, caps in self.registry.items():
            if capability in caps:
                return name
        return None
- Handle message timeouts. If no response arrives within TTL, the sender retries or escalates.
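async def request_with_timeout(bus: MessageBus, msg: AgentMessage, timeout: int = 30) -> dict:
    future = asyncio.get_running_loop().create_future()

    async def handler(response: dict):
        # Accept only a response from the agent we asked, and only once
        is_reply = (
            response.get("recipient") == msg.sender
            and response.get("sender") == msg.recipient
            and response.get("message_type") == "response"
        )
        if is_reply and not future.done():
            future.set_result(response)

    bus.subscribe(msg.sender, handler)
    await bus.publish(msg.model_dump())  # Pydantic v2; on v1 use msg.dict()
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        return {"error": "timeout", "message": f"No response from {msg.recipient}"}
    finally:
        # Remove the one-shot handler so handlers do not pile up across requests
        bus.subscribers[msg.sender].remove(handler)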
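The step mentions retrying or escalating, which a single timed wait does not do by itself. The following sketch wraps any awaitable send function in a bounded retry loop with exponential backoff and hands off to an escalation callback once every attempt has timed out. `send_with_retry`, `send`, `escalate`, and the backoff values are illustrative assumptions, not part of the guide's protocol.

```python
import asyncio
from typing import Awaitable, Callable


async def send_with_retry(
    send: Callable[[], Awaitable[dict]],
    escalate: Callable[[int], Awaitable[dict]],
    attempts: int = 3,
    timeout: float = 1.0,
    backoff: float = 0.1,
) -> dict:
    """Try `send` up to `attempts` times, then escalate (hypothetical helper)."""
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(send(), timeout=timeout)
        except asyncio.TimeoutError:
            # Wait a little longer before each new attempt
            if attempt < attempts - 1:
                await asyncio.sleep(backoff * 2**attempt)
    return await escalate(attempts)


async def demo() -> tuple[dict, int]:
    calls = 0

    async def flaky_send() -> dict:
        # Simulated peer: hangs on the first two calls, answers on the third
        nonlocal calls
        calls += 1
        if calls < 3:
            await asyncio.sleep(10)
        return {"answer": "ok"}

    async def escalate(tried: int) -> dict:
        # Simulated escalation: report the failure to a supervisor
        return {"error": "timeout", "attempts": tried}

    result = await send_with_retry(flaky_send, escalate, timeout=0.05, backoff=0.01)
    return result, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Capping the number of attempts keeps a dead recipient from stalling the sender indefinitely; what escalation means (a supervisor agent, a fallback agent, a human) depends on the system.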
Verification
python -c "
from pydantic import BaseModel
class M(BaseModel):
    sender: str
    payload: str
m = M(sender='agent_a', payload='hello')
print(m.sender)
# Expected: agent_a
"
Common failures
- Dead letter queue not implemented. Messages addressed to non-existent agents are silently dropped. Route all undeliverable messages to a dead letter queue for debugging.
- Deserialization errors. Different agents may have different message schema versions. Include a protocol_version field and validate before processing.
- Infinite message loops. Agent A sends to B, B responds to A, A responds to B, repeating forever. Add message depth tracking and reject messages exceeding max depth.
- Version mismatch. The installed package or runtime differs from the command shown; check the version first and rerun the smallest verification command.
- Local environment drift. Another service, virtual environment, model, or path is being used; print the active binary path and configuration before changing the guide steps.
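The dead-letter, schema-version, and message-loop failures listed above can all be guarded against at a single delivery point. The sketch below shows one possible arrangement with plain dicts and synchronous handlers to keep it short. `deliver`, `make_reply`, `SUPPORTED_VERSIONS`, `MAX_DEPTH`, and the `depth` field are hypothetical names introduced for illustration; they are not part of the protocol defined in the steps.

```python
SUPPORTED_VERSIONS = {"1.0"}  # assumption: versions this agent can parse
MAX_DEPTH = 5                 # assumption: longest allowed reply chain


def deliver(message: dict, handlers: dict, dead_letters: list) -> bool:
    """Run the recipient's handler, or park the message with a reason."""
    if message.get("protocol_version") not in SUPPORTED_VERSIONS:
        reason = "unsupported protocol_version"
    elif message.get("depth", 0) > MAX_DEPTH:
        reason = "max depth exceeded"
    elif message.get("recipient") not in handlers:
        reason = "unknown recipient"
    else:
        handlers[message["recipient"]](message)
        return True
    # Keep the undeliverable message for debugging instead of dropping it
    dead_letters.append({"reason": reason, "message": message})
    return False


def make_reply(original: dict, payload: dict) -> dict:
    """Build a reply whose depth is one more than the message it answers."""
    return {
        "protocol_version": original["protocol_version"],
        "sender": original["recipient"],
        "recipient": original["sender"],
        "depth": original.get("depth", 0) + 1,
        "payload": payload,
    }


received = []
dead = []
handlers = {"agent_b": received.append}

ok = {"protocol_version": "1.0", "sender": "agent_a", "recipient": "agent_b", "payload": {}}
deliver(ok, handlers, dead)                                 # handled by agent_b
deliver({**ok, "recipient": "ghost"}, handlers, dead)       # unknown recipient
deliver({**ok, "protocol_version": "9.9"}, handlers, dead)  # wrong schema version
deliver({**ok, "depth": MAX_DEPTH + 1}, handlers, dead)     # runaway conversation
print([d["reason"] for d in dead])
```

Because every reply built with `make_reply` increments `depth`, an A-to-B-to-A loop reaches the limit after a bounded number of hops and lands in the dead letter list, where it can be inspected.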
Related guides
- How to Build Multi-Agent Orchestration System
- How to Design Specialized Agent Roles