14. Agent Communication

Communication patterns determine how agents share state, coordinate actions, and handle partial failure. Getting this wrong creates systems that work in testing and fail catastrophically in production.

Direct Messaging vs. Message Broker

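Direct messaging (one agent calling another) offers low latency but couples agents tightly: every sender must know its peers and fails when one is unavailable. A message broker decouples them at the cost of an extra component and delivery semantics (ordering, duplicates, eventual consistency) that each agent must handle.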
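
To make the coupling concrete, here is a minimal sketch of direct messaging, assuming all agents live in one process and hold references to each other. `DirectAgent`, `connect`, `send`, and `receive` are hypothetical names for illustration, not part of any library.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class DirectAgent:
    """Hypothetical agent that talks to peers through direct references."""
    name: str
    # repr/compare disabled because peers point back at each other
    peers: dict[str, "DirectAgent"] = field(default_factory=dict, repr=False, compare=False)
    inbox: list[tuple[str, str]] = field(default_factory=list)

    def connect(self, other: "DirectAgent") -> None:
        # Coupling: each side must know the other's identity and lifetime
        self.peers[other.name] = other
        other.peers[self.name] = self

    async def send(self, to: str, text: str) -> None:
        # Low latency: a plain method call, no intermediary.
        # Tight coupling: an unknown peer fails the sender immediately.
        peer = self.peers[to]  # KeyError if the peer was never connected
        await peer.receive(self.name, text)

    async def receive(self, sender: str, text: str) -> None:
        self.inbox.append((sender, text))


async def main() -> list[tuple[str, str]]:
    planner, coder = DirectAgent("planner"), DirectAgent("coder")
    planner.connect(coder)
    await planner.send("coder", "write tests")
    return coder.inbox


print(asyncio.run(main()))  # [('planner', 'write tests')]
```

Nothing sits between the agents, so delivery is as fast as a function call, but the sender has to hold a reference to every peer, and a missing peer surfaces as a `KeyError` in the sender instead of a message waiting in a queue.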

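For most production systems, start with a message broker. The in-memory bus below is a single-process stand-in that shows the interface a broker provides: per-agent inboxes, direct delivery, and broadcast: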

import asyncio
import json
from dataclasses import dataclass
from typing import Any

@dataclass
class AgentMessage:
    sender: str
    recipient: str | None  # None means broadcast
    content: dict[str, Any]
    message_type: str
    reply_to: str | None = None
    correlation_id: str | None = None

class UnknownRecipientError(Exception):
    """A direct message named an agent that never subscribed."""

class InMemoryMessageBus:
    def __init__(self) -> None:
        # One inbox per agent id; agents share nothing else
        self.subscribers: dict[str, asyncio.Queue[AgentMessage]] = {}

    async def subscribe(self, agent_id: str) -> asyncio.Queue[AgentMessage]:
        queue: asyncio.Queue[AgentMessage] = asyncio.Queue()
        self.subscribers[agent_id] = queue
        return queue

    async def publish(self, message: AgentMessage) -> None:
        if message.recipient is None:
            # Broadcast to all subscribers
            for queue in self.subscribers.values():
                await queue.put(message)
        elif message.recipient in self.subscribers:
            await self.subscribers[message.recipient].put(message)
        else:
            # Don't fall back to broadcast: a misaddressed message
            # would otherwise be delivered to every agent
            raise UnknownRecipientError(message.recipient)

The Shared State Problem

Agents that share memory through global state fail intermittently: the result depends on how concurrent reads and writes happen to interleave, so a failure seen once may not recur on the next run. Each agent should own its state and communicate through messages only.
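
One way to enforce this is the actor pattern: each agent keeps its state private and changes it only inside its own message loop. A minimal sketch, assuming an in-process `asyncio.Queue` as the inbox; `CounterAgent`, `Increment`, `Query`, and the `None` shutdown sentinel are hypothetical names.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Increment:
    amount: int


@dataclass
class Query:
    reply: asyncio.Future  # the caller awaits this for the answer


class CounterAgent:
    """Hypothetical agent whose state is touched only by its own loop."""

    def __init__(self) -> None:
        self.inbox: asyncio.Queue = asyncio.Queue()
        self._count = 0  # private: read and written only inside run()

    async def run(self) -> None:
        while True:
            msg = await self.inbox.get()
            if msg is None:  # shutdown sentinel
                break
            if isinstance(msg, Increment):
                self._count += msg.amount
            elif isinstance(msg, Query):
                msg.reply.set_result(self._count)


async def main() -> int:
    agent = CounterAgent()
    task = asyncio.create_task(agent.run())
    # Many concurrent producers, but only the agent's loop mutates _count
    await asyncio.gather(*(agent.inbox.put(Increment(1)) for _ in range(100)))
    reply = asyncio.get_running_loop().create_future()
    await agent.inbox.put(Query(reply))
    total = await reply
    await agent.inbox.put(None)
    await task
    return total


print(asyncio.run(main()))  # 100
```

Because only `run()` touches `_count`, updates are applied one at a time in inbox order, so the total does not depend on how the producers were scheduled. Reads go through the inbox too: the `Query` message carries a future that the agent resolves with its current count.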

Serialization Pitfalls

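Messages must survive process restarts, and the process that reads one may be a redeployed version of the agent or a consumer written in another language. That calls for a portable format such as JSON, not pickle: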

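import json
import pickle
from dataclasses import asdict

# Bad: pickle is Python-only, can break across Python versions or when
# AgentMessage is renamed or moved, and unpickling untrusted bytes can run arbitrary code
data = pickle.dumps(message)

# Good: JSON is plain text that any process or language can parse
data = json.dumps(asdict(message), default=str)  # default=str stringifies unsupported types; they come back as str
restored = AgentMessage(**json.loads(data))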

Backpressure Handling

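Unbounded queues cause memory exhaustion when producers outpace consumers, and asyncio.Queue() without a maxsize is unbounded. Implement backpressure explicitly, for example by rejecting a message once the recipient's queue reaches a limit: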

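class BackpressureError(Exception):
    """Raised when a recipient's queue is at its limit."""

# Method of InMemoryMessageBus; broadcasts (recipient=None) skip the check
async def publish_with_backpressure(self, message: AgentMessage, max_queue_size: int = 1000) -> None:
    queue = self.subscribers.get(message.recipient) if message.recipient else None
    queue_size = queue.qsize() if queue is not None else 0
    if queue_size >= max_queue_size:  # reject at the limit, not one message past it
        raise BackpressureError(f"Queue for {message.recipient} has {queue_size} pending messages")
    await self.publish(message)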
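
Rejecting messages is one option. `asyncio.Queue(maxsize=...)` also supports the other: `put()` suspends the producer until a slot frees up, while `put_nowait()` raises `asyncio.QueueFull` immediately. A minimal sketch of both, with a hypothetical two-slot inbox and a slow consumer:

```python
import asyncio


async def main() -> tuple[int, bool]:
    # maxsize bounds the inbox; asyncio.Queue() with no maxsize is unbounded
    inbox: asyncio.Queue[int] = asyncio.Queue(maxsize=2)

    # Option 1: reject when full (load shedding); the caller learns at once
    inbox.put_nowait(1)
    inbox.put_nowait(2)
    rejected = False
    try:
        inbox.put_nowait(3)
    except asyncio.QueueFull:
        rejected = True

    # Option 2: wait for space (backpressure); put() suspends the producer
    async def slow_consumer() -> None:
        await asyncio.sleep(0.05)
        inbox.get_nowait()  # frees one slot and wakes the waiting producer

    consumer = asyncio.create_task(slow_consumer())
    await asyncio.wait_for(inbox.put(3), timeout=1.0)  # resumes once a slot frees
    await consumer
    return inbox.qsize(), rejected


print(asyncio.run(main()))  # (2, True)
```

Waiting on `put()` throttles producers to the consumer's pace; rejecting sheds load and leaves the retry decision to the sender. If `publish` awaits `put()` on bounded queues, one slow subscriber stalls an entire broadcast, so per-recipient limits or timeouts matter there.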

EXERCISE

Modify the message bus to track delivery receipts. Implement a retry mechanism for agents that don't acknowledge within a timeout window.