RUNLOCALAI · v38
© 2026 runlocalai.co · Independently operated

How to Implement Agent-to-Agent Communication

advanced · 30 min · By Fredoline Eruo
Target environment
Ubuntu 24.04 · Ollama 0.4.x
PREREQUISITES

An existing multi-agent system, a defined message protocol, Python 3.10+, and the pydantic package (imported by the protocol code below)

What this does

Agent-to-agent communication allows specialized agents to exchange messages, request information, delegate sub-tasks, and share intermediate results through a structured protocol.

Steps

  • Define a message bus. All agents publish and subscribe through a central bus.
import asyncio
from collections import defaultdict

class MessageBus:
    def __init__(self):
        # Maps agent name -> list of async callbacks
        self.subscribers = defaultdict(list)
        self.queue = asyncio.Queue()

    def subscribe(self, agent_name: str, callback):
        self.subscribers[agent_name].append(callback)

    async def publish(self, message: dict):
        # Enqueue only. start() is the single place that delivers messages,
        # so an addressed message is never delivered twice.
        await self.queue.put(message)

    async def start(self):
        while True:
            msg = await self.queue.get()
            recipient = msg.get("recipient")
            if recipient is None:
                # No recipient: broadcast to all listeners
                targets = [cb for cbs in self.subscribers.values() for cb in cbs]
            else:
                # Addressed: deliver to the named recipient only.
                # Unknown recipients get no delivery (see Common failures).
                targets = list(self.subscribers.get(recipient, []))
            for cb in targets:
                await cb(msg)
            self.queue.task_done()
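One way to see a bus of this kind in action is a short round trip between two subscribers. The sketch below uses a compact stand-in (`MiniBus`, a hypothetical name) that delivers addressed messages to the named recipient and broadcasts messages without a recipient, so the example runs on its own. The agent names and payloads are made up for illustration.

```python
import asyncio
from collections import defaultdict


class MiniBus:
    """Compact stand-in bus (hypothetical, for illustration only)."""

    def __init__(self):
        self.subscribers = defaultdict(list)
        self.queue = asyncio.Queue()

    def subscribe(self, agent_name, callback):
        self.subscribers[agent_name].append(callback)

    async def publish(self, message: dict):
        await self.queue.put(message)

    async def start(self):
        while True:
            msg = await self.queue.get()
            recipient = msg.get("recipient")
            if recipient is None:
                # No recipient: broadcast to every subscriber
                targets = [cb for cbs in self.subscribers.values() for cb in cbs]
            else:
                # Addressed: deliver only to the named agent, if it exists
                targets = list(self.subscribers.get(recipient, []))
            for cb in targets:
                await cb(msg)
            self.queue.task_done()


async def demo() -> dict:
    bus = MiniBus()
    received = {"planner": [], "researcher": []}

    async def planner_inbox(msg):
        received["planner"].append(msg["payload"])

    async def researcher_inbox(msg):
        received["researcher"].append(msg["payload"])

    bus.subscribe("planner", planner_inbox)
    bus.subscribe("researcher", researcher_inbox)

    # The dispatch loop runs in the background while agents publish
    dispatcher = asyncio.create_task(bus.start())

    await bus.publish({"sender": "planner", "recipient": "researcher",
                       "payload": "find sources"})
    await bus.publish({"sender": "planner", "payload": "status: started"})

    await bus.queue.join()  # wait until both messages have been dispatched
    dispatcher.cancel()
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The key detail is that `start()` must be scheduled as a background task before anything is published; `queue.join()` is used here only so the demo can exit cleanly.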
  • Create a communication protocol. Standardize message format with required headers.
from datetime import datetime

from pydantic import BaseModel, Field

class AgentMessage(BaseModel):
    protocol_version: str = "1.0"
    sender: str
    recipient: str
    message_type: str  # request, response, broadcast, error
    intent: str  # query, delegate, inform, confirm
    payload: dict
    # Stamp each message at creation; a None default would disable the TTL check
    timestamp: datetime = Field(default_factory=datetime.now)
    ttl_seconds: int = 30

    def is_expired(self) -> bool:
        # True once the message is older than its time-to-live
        elapsed = (datetime.now() - self.timestamp).total_seconds()
        return elapsed > self.ttl_seconds
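A receiver would typically run the TTL check before doing any work. The sketch below shows the same arithmetic on a plain dict with `timestamp` and `ttl_seconds` keys, using only the standard library. The standalone `is_expired` function and its `now` parameter are assumptions added here so the check can be exercised without waiting for real time to pass.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_expired(message: dict, now: Optional[datetime] = None) -> bool:
    """Return True when the message is older than its ttl_seconds.

    `now` is injectable for testing (an assumption of this sketch).
    """
    timestamp = message.get("timestamp")
    if timestamp is None:
        return False  # no timestamp, nothing to compare against
    now = now or datetime.now(timezone.utc)
    elapsed = (now - timestamp).total_seconds()
    return elapsed > message.get("ttl_seconds", 30)


sent_at = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
message = {"sender": "agent_a", "recipient": "agent_b",
           "timestamp": sent_at, "ttl_seconds": 30}

fresh = is_expired(message, now=sent_at + timedelta(seconds=10))
stale = is_expired(message, now=sent_at + timedelta(seconds=31))
print(fresh, stale)  # False True
```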
  • Implement request-response pattern. Agent A sends a request, Agent B responds.
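class ResearcherAgent:
    def __init__(self, name: str = "researcher"):
        self.name = name

    async def research(self, question: str) -> str:
        # Placeholder: replace with a real model or tool call
        raise NotImplementedError

    async def handle_query(self, msg: AgentMessage, bus: MessageBus):
        # Perform research, then reply to whoever sent the request
        result = await self.research(msg.payload["question"])
        response = AgentMessage(
            sender=self.name,
            recipient=msg.sender,
            message_type="response",
            intent="inform",
            payload={"answer": result},
        )
        # On Pydantic v2, model_dump() is the non-deprecated equivalent of dict()
        await bus.publish(response.dict())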
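When an agent has several requests in flight, matching a response to its request by agent name alone is ambiguous. One common approach, sketched below on plain dicts, is to tag each request with an identifier and copy it into the response. The `correlation_id` field and the three helper functions are assumptions of this sketch, not part of the protocol defined above.

```python
import uuid


def make_request(sender: str, recipient: str, question: str) -> dict:
    """Build a request dict tagged with a unique correlation_id."""
    return {
        "protocol_version": "1.0",
        "sender": sender,
        "recipient": recipient,
        "message_type": "request",
        "intent": "query",
        "payload": {"question": question},
        "correlation_id": uuid.uuid4().hex,  # hypothetical extra field
    }


def make_response(request: dict, answer: str) -> dict:
    """Build the reply: swap sender and recipient, keep the correlation_id."""
    return {
        "protocol_version": request["protocol_version"],
        "sender": request["recipient"],   # the responder
        "recipient": request["sender"],   # back to the requester
        "message_type": "response",
        "intent": "inform",
        "payload": {"answer": answer},
        "correlation_id": request["correlation_id"],
    }


def matches(request: dict, response: dict) -> bool:
    """True when the response answers this specific request."""
    return (response.get("message_type") == "response"
            and response.get("correlation_id") == request["correlation_id"])
```

With this in place, a requester can keep a dict of pending requests keyed by `correlation_id` and resolve the right one when a response arrives.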
  • Add broadcast and discovery. Agents announce their capabilities on startup.
class DiscoveryService:
    def __init__(self, bus: MessageBus):
        self.bus = bus
        self.registry = {}

    async def register_agent(self, name: str, capabilities: list[str]):
        self.registry[name] = capabilities
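        # Announce with the same field names as AgentMessage; no recipient, so every agent can see it
        await self.bus.publish({
            "sender": name,
            "message_type": "broadcast",
            "intent": "inform",
            "payload": {"event": "announce", "capabilities": capabilities},
        })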

    def find_agent(self, capability: str) -> str | None:
        for name, caps in self.registry.items():
            if capability in caps:
                return name
        return None
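The registry lookup can then drive delegation: find an agent that advertises the capability and address a request to it, or report an error when none is registered. The sketch below works on a plain dict shaped like `registry`. The `build_delegation` helper, its `fallback` parameter, and the capability names are hypothetical.

```python
from typing import Optional


def find_agent(registry: dict, capability: str) -> Optional[str]:
    """Return the first registered agent that lists the capability."""
    for name, caps in registry.items():
        if capability in caps:
            return name
    return None


def build_delegation(registry: dict, sender: str, capability: str,
                     task: str, fallback: Optional[str] = None) -> dict:
    """Address a delegate request to a capable agent, or return an error."""
    recipient = find_agent(registry, capability) or fallback
    if recipient is None:
        # Nobody can do this: tell the sender instead of dropping the task
        return {
            "sender": sender,
            "recipient": sender,
            "message_type": "error",
            "intent": "inform",
            "payload": {"reason": f"no agent offers '{capability}'"},
        }
    return {
        "sender": sender,
        "recipient": recipient,
        "message_type": "request",
        "intent": "delegate",
        "payload": {"capability": capability, "task": task},
    }


registry = {"researcher": ["web_search", "summarize"], "coder": ["python"]}
msg = build_delegation(registry, "planner", "python", "write a parser")
print(msg["recipient"])  # coder
```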
  • Handle message timeouts. If no response arrives within TTL, the sender retries or escalates.
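async def request_with_timeout(bus: MessageBus, msg: AgentMessage, timeout: float = 30) -> dict:
    # Assumes bus.start() is already running as a background task
    future = asyncio.get_running_loop().create_future()

    async def handler(response: dict):
        # Accept only a reply from the agent we asked, and only once
        is_reply = (
            response.get("message_type") in ("response", "error")
            and response.get("sender") == msg.recipient
            and response.get("recipient") == msg.sender
        )
        if is_reply and not future.done():
            future.set_result(response)

    bus.subscribe(msg.sender, handler)
    try:
        await bus.publish(msg.dict())
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        return {"error": "timeout", "message": f"No response from {msg.recipient}"}
    finally:
        # Remove the one-shot handler so handlers do not pile up across calls
        bus.subscribers[msg.sender].remove(handler)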
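The function above returns an error dict on timeout; the retry-or-escalate decision is left to the caller. A minimal sketch of that step follows, assuming a hypothetical `send` coroutine that performs one attempt and returns either a response dict or a dict with an `error` key, and a hypothetical `escalate` callback that is handed the last failure. The attempt count and backoff values are arbitrary.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def request_with_retry(
    send: Callable[[], Awaitable[dict]],
    attempts: int = 3,
    backoff_seconds: float = 0.5,
    escalate: Optional[Callable[[dict], Awaitable[dict]]] = None,
) -> dict:
    """Retry a request a few times, then escalate or return the last error."""
    last = {"error": "timeout", "message": "no attempts made"}
    for attempt in range(attempts):
        last = await send()
        if "error" not in last:
            return last  # success, stop retrying
        if attempt < attempts - 1:
            # Exponential backoff between attempts: 0.5s, 1s, 2s, ...
            await asyncio.sleep(backoff_seconds * 2 ** attempt)
    if escalate is not None:
        return await escalate(last)
    return last
```

In practice `send` could be a small closure such as `lambda: request_with_timeout(bus, msg, timeout=5)`, and `escalate` might forward the task to a supervisor agent or a human.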

Verification

python -c "
from pydantic import BaseModel
class M(BaseModel):
    sender: str
    payload: str
m = M(sender='agent_a', payload='hello')
print(m.sender)
# Expected: agent_a
"

Common failures

  • Dead letter queue not implemented. Messages addressed to non-existent agents are silently dropped. Route all undeliverable messages to a dead letter queue for debugging.
  • Deserialization errors. Different agents may have different message schema versions. Include a protocol_version field and validate before processing.
  • Infinite message loops. Agent A sends to B, B responds to A, A responds to B, repeating forever. Add message depth tracking and reject messages exceeding max depth.
  • Version mismatch. The installed Python or pydantic version differs from what this guide targets. Check versions first (for example with python --version and pip show pydantic), then rerun the verification command.
  • Local environment drift. A different virtual environment, interpreter, service, or model than expected is active. Print the active interpreter path and configuration before changing any of the steps above.
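The first three failures above can be guarded against with a small routing check that runs before any agent sees a message. The sketch below works on plain dicts and is only one possible arrangement: `SUPPORTED_VERSIONS`, `MAX_DEPTH`, the `depth` field, and both function names are assumptions introduced here.

```python
SUPPORTED_VERSIONS = {"1.0"}  # assumption: versions this agent understands
MAX_DEPTH = 8                 # assumption: arbitrary cap on reply chains


def route(message: dict, known_agents: set, dead_letters: list) -> str:
    """Decide what to do with a message before delivery.

    Returns "deliver" or "dead_letter". Undeliverable messages are stored
    in `dead_letters` with a reason instead of being silently dropped.
    """
    recipient = message.get("recipient")
    if message.get("protocol_version") not in SUPPORTED_VERSIONS:
        reason = "unsupported protocol_version"
    elif message.get("depth", 0) > MAX_DEPTH:
        reason = "max depth exceeded"
    elif recipient is not None and recipient not in known_agents:
        reason = "unknown recipient"
    else:
        return "deliver"  # includes broadcasts, which have no recipient
    dead_letters.append({"reason": reason, "message": message})
    return "dead_letter"


def reply_to(message: dict, payload: dict) -> dict:
    """Build a reply that carries the chain depth forward by one."""
    return {
        "protocol_version": message["protocol_version"],
        "sender": message["recipient"],
        "recipient": message["sender"],
        "message_type": "response",
        "intent": "inform",
        "payload": payload,
        "depth": message.get("depth", 0) + 1,  # hypothetical field
    }
```

Because every reply increments `depth`, a ping-pong exchange between two agents ends up in the dead letter list after `MAX_DEPTH` hops, where it can be inspected.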

Related guides

  • How to Build Multi-Agent Orchestration System
  • How to Design Specialized Agent Roles