KEY INSIGHT
Direct point-to-point messaging creates tight coupling; intermediary-based passing enables routing, filtering, and temporal decoupling that distributed systems require.
Message passing patterns determine how agents exchange information. Direct point-to-point communication offers simplicity—messages go from sender to receiver with minimal overhead. However, this pattern creates tight coupling: senders must know recipient identities, recipients must handle all incoming messages, and adding new agents requires updating all existing participants.
Message brokers or queues introduce intermediaries. Senders publish messages to topics or queues without knowing recipients. Receivers subscribe to relevant channels. This decoupling enables dynamic participation—agents can join and leave without disrupting the system. Temporal decoupling handles cases where agents operate at different rates; queues buffer messages between fast producers and slow consumers.
Publish-subscribe patterns excel for notifications and state changes. When an agent completes a task or updates shared state, it publishes an event. Interested agents subscribe and react accordingly. This pattern supports event-driven architectures where system behavior emerges from reactions to events rather than explicit orchestration.
Request-reply patterns remain essential for queries requiring responses. A query agent sends a request to a knowledge base agent and expects results. The request-reply pattern requires correlation—matching responses to their originating requests. Message headers typically carry correlation identifiers.
Queue-based systems introduce new failure modes: message loss, duplication, and reordering. Dead-letter queues capture messages that fail processing after multiple retries. Idempotency keys enable safe message deduplication. Sequence numbers or timestamps enable ordering guarantees where required.
Push vs pull patterns affect resource utilization. Push systems notify consumers immediately (efficient when consumers can handle load); pull systems poll for new work (efficient when load varies or consumers need backpressure control).
```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, Optional
import uuid
from enum import Enum
class DeliveryMode(Enum):
AT_MOST_ONCE = "at_most_once" # Fire and forget
AT_LEAST_ONCE = "at_least_once" # Ack-based retry
EXACTLY_ONCE = "exactly_once" # Idempotent operations
@dataclass
class Envelope:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
sender: str = ""
topic: str = ""
action: str = ""
payload: dict = field(default_factory=dict)
correlation_id: Optional[str] = None
reply_to: Optional[str] = None
delivery_mode: DeliveryMode = DeliveryMode.AT_LEAST_ONCE
class MessageBus:
def __init__(self):
self.subscribers: dict[str, list[Callable]] = defaultdict(list)
self.pending: dict[str, Envelope] = {}
self.dead_letter_queue: list[Envelope] = []
self.max_retries = 3
def subscribe(self, topic: str, handler: Callable) -> None:
self.subscribers[topic].append(handler)
async def publish(self, envelope: Envelope) -> None:
self.pending[envelope.id] = envelope
if envelope.reply_to:
# Request-reply pattern
return await self._request_reply(envelope)
return await self._broadcast(envelope)
async def _broadcast(self, envelope: Envelope) -> None:
handlers = self.subscribers.get(envelope.topic, [])
if not handlers:
self.dead_letter_queue.append(envelope)
return
for handler in handlers:
try:
await handler(envelope)
# Remove from pending only on success
if envelope.id in self.pending:
del self.pending[envelope.id]
except Exception as e:
await self._handle_failure(envelope, e)
async def _request_reply(self, envelope: Envelope) -> Optional[Envelope]:
# Create reply channel
reply_topic = f"reply_{envelope.id}"
reply_received = asyncio.Event()
reply_result = None
async def reply_handler(reply: Envelope):
nonlocal reply_result
reply_result = reply
reply_received.set()
self.subscribe(reply_topic, reply_handler)
# Send original message
handlers = self.subscribers.get(envelope.topic, [])
if handlers:
await handlers[0](envelope)
# Wait for reply with timeout
try:
await asyncio.wait_for(reply_received.wait(), timeout=30)
return reply_result
except asyncio.TimeoutError:
return None
finally:
self.subscribers.pop(reply_topic, None)
async def _handle_failure(self, envelope: Envelope, error: Exception) -> None:
retry_count = envelope.payload.get("_retry_count", 0)
if retry_count < self.max_retries:
envelope.payload["_retry_count"] = retry_count + 1
# Exponential backoff
await asyncio.sleep(2 ** retry_count)
await self.publish(envelope)
else:
self.dead_letter_queue.append(envelope)
# Usage example
bus = MessageBus()
async def handle_code_review(envelope: Envelope):
# Process code review request
result = {"approved": True, "issues": []}
reply = Envelope(
sender="reviewer",
topic=envelope.reply_to,
action="review_complete",
payload=result,
correlation_id=envelope.id
)
await bus.publish(reply)
bus.subscribe("code_review_requests", handle_code_review)
# Send request
request = Envelope(
sender="pipeline",
topic="code_review_requests",
action="review",
payload={"code": "...", "language": "python"},
reply_to=f"reply_channel_{uuid.uuid4()}"
)
reply = await bus.publish(request)
```