14. Agent Communication
Communication patterns determine how agents share state, coordinate actions, and handle partial failure. Getting this wrong creates systems that work in testing and fail catastrophically in production.
Direct Messaging vs. Message Broker
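Direct messaging (agent-to-agent) offers low latency but creates tight coupling: each sender must know its recipients and is affected when they are slow or unavailable. Message brokers decouple agents but introduce eventual consistency headaches, since a message can sit in a queue and be processed after the sender has already moved on.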
For most production systems, start with a message broker:
```python
import asyncio
import json
from dataclasses import dataclass, asdict
from typing import Any


@dataclass
class AgentMessage:
    sender: str
    recipient: str | None  # None means broadcast
    content: dict[str, Any]
    message_type: str
    reply_to: str | None = None
    correlation_id: str | None = None


class InMemoryMessageBus:
    def __init__(self):
        # One inbox per subscribed agent
        self.subscribers: dict[str, asyncio.Queue[AgentMessage]] = {}

    async def subscribe(self, agent_id: str) -> asyncio.Queue[AgentMessage]:
        queue: asyncio.Queue[AgentMessage] = asyncio.Queue()
        self.subscribers[agent_id] = queue
        return queue

    async def publish(self, message: AgentMessage):
        if message.recipient is None:
            # Broadcast to all subscribers
            for queue in self.subscribers.values():
                await queue.put(message)
        elif message.recipient in self.subscribers:
            await self.subscribers[message.recipient].put(message)
        else:
            # Never turn a directed message into a broadcast by accident
            raise LookupError(f"No subscriber named {message.recipient!r}")
```
The Shared Nothing Problem
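Agents that share memory through global state race with each other: whether a bug appears depends on how their reads and writes happen to interleave, so failures are intermittent and nearly impossible to reproduce. Each agent should own its state and communicate through messages only.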
Serialization Pitfalls
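Messages must survive process boundaries and restarts, including upgrades that change the Python version or your class definitions. That means a stable, language-neutral format such as JSON, not pickle: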
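```python
import json
import pickle
from dataclasses import asdict

# Bad: pickle can break when Python versions or class definitions change,
# and unpickling untrusted data can execute arbitrary code
data = pickle.dumps(message)

# Good: JSON is language-neutral and safe to parse across process boundaries
data = json.dumps(asdict(message), default=str)
```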
Backpressure Handling
Unbounded queues cause memory exhaustion when producers outpace consumers, and asyncio.Queue() is unbounded by default. Implement backpressure explicitly:
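```python
class BackpressureError(Exception):
    """Raised when a recipient's inbox is too full to accept more messages."""

# Method on InMemoryMessageBus; a broadcast lands in every inbox, so check them all
async def publish_with_backpressure(self, message: AgentMessage, max_queue_size: int = 1000):
    targets = [message.recipient] if message.recipient is not None else list(self.subscribers)
    for agent_id in targets:
        queue = self.subscribers.get(agent_id)
        if queue is not None and queue.qsize() >= max_queue_size:
            raise BackpressureError(f"Queue for {agent_id} has {queue.qsize()} pending messages")
    await self.publish(message)
```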
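As an alternative to checking qsize() before publishing, asyncio.Queue can enforce the bound itself when created with maxsize. The sketch below shows both standard behaviours of a full bounded queue: put_nowait() raises asyncio.QueueFull (fail fast, much like BackpressureError), while await put() suspends the producer until a consumer frees a slot. The maxsize of 2 and the message strings are arbitrary illustrative values.

```python
import asyncio


async def main() -> tuple[bool, bool, int]:
    # A bounded inbox: asyncio enforces the limit itself
    inbox: asyncio.Queue[str] = asyncio.Queue(maxsize=2)
    inbox.put_nowait("m1")
    inbox.put_nowait("m2")

    # Fail fast: put_nowait raises instead of growing the queue
    rejected = False
    try:
        inbox.put_nowait("m3")
    except asyncio.QueueFull:
        rejected = True

    # Or slow the producer down: put() waits until a consumer makes room
    producer = asyncio.create_task(inbox.put("m3"))
    await asyncio.sleep(0)          # let the producer run; it blocks on the full queue
    blocked = not producer.done()
    inbox.get_nowait()              # a consumer takes "m1", freeing a slot
    await producer                  # now the pending put completes
    return rejected, blocked, inbox.qsize()


if __name__ == "__main__":
    print(asyncio.run(main()))  # (True, True, 2)
```

A blocking put() pushes backpressure onto the producer automatically but can stall a publisher that feeds many agents; failing fast keeps the publisher responsive and leaves the retry-or-drop decision to the caller.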
Modify the message bus to track delivery receipts. Implement a retry mechanism for agents that don't acknowledge within a timeout window.