RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /Multi-Agent Systems
  6. /Ch. 5
Multi-Agent Systems

05. Message Passing

Chapter 5 of 24 · 20 min
KEY INSIGHT

Direct point-to-point messaging creates tight coupling; intermediary-based passing enables routing, filtering, and temporal decoupling that distributed systems require. Message passing patterns determine how agents exchange information. Direct point-to-point communication offers simplicity—messages go from sender to receiver with minimal overhead. However, this pattern creates tight coupling: senders must know recipient identities, recipients must handle all incoming messages, and adding new agents requires updating all existing participants. Message brokers or queues introduce intermediaries. Senders publish messages to topics or queues without knowing recipients. Receivers subscribe to relevant channels. This decoupling enables dynamic participation—agents can join and leave without disrupting the system. Temporal decoupling handles cases where agents operate at different rates; queues buffer messages between fast producers and slow consumers. Publish-subscribe patterns excel for notifications and state changes. When an agent completes a task or updates shared state, it publishes an event. Interested agents subscribe and react accordingly. This pattern supports event-driven architectures where system behavior emerges from reactions to events rather than explicit orchestration. Request-reply patterns remain essential for queries requiring responses. A query agent sends a request to a knowledge base agent and expects results. The request-reply pattern requires correlation—matching responses to their originating requests. Message headers typically carry correlation identifiers. Queue-based systems introduce new failure modes: message loss, duplication, and reordering. Dead-letter queues capture messages that fail processing after multiple retries. Idempotency keys enable safe message deduplication. Sequence numbers or timestamps enable ordering guarantees where required. Push vs pull patterns affect resource utilization. Push systems notify consumers immediately (efficient when consumers can handle load); pull systems poll for new work (efficient when load varies or consumers need backpressure control). ```python import asyncio from collections import defaultdict from dataclasses import dataclass, field from typing import Callable, Optional import uuid from enum import Enum class DeliveryMode(Enum): AT_MOST_ONCE = "at_most_once" # Fire and forget AT_LEAST_ONCE = "at_least_once" # Ack-based retry EXACTLY_ONCE = "exactly_once" # Idempotent operations @dataclass class Envelope: id: str = field(default_factory=lambda: str(uuid.uuid4())) sender: str = "" topic: str = "" action: str = "" payload: dict = field(default_factory=dict) correlation_id: Optional[str] = None reply_to: Optional[str] = None delivery_mode: DeliveryMode = DeliveryMode.AT_LEAST_ONCE class MessageBus: def __init__(self): self.subscribers: dict[str, list[Callable]] = defaultdict(list) self.pending: dict[str, Envelope] = {} self.dead_letter_queue: list[Envelope] = [] self.max_retries = 3 def subscribe(self, topic: str, handler: Callable) -> None: self.subscribers[topic].append(handler) async def publish(self, envelope: Envelope) -> None: self.pending[envelope.id] = envelope if envelope.reply_to: # Request-reply pattern return await self._request_reply(envelope) return await self._broadcast(envelope) async def _broadcast(self, envelope: Envelope) -> None: handlers = self.subscribers.get(envelope.topic, []) if not handlers: self.dead_letter_queue.append(envelope) return for handler in handlers: try: await handler(envelope) # Remove from pending only on success if envelope.id in self.pending: del self.pending[envelope.id] except Exception as e: await self._handle_failure(envelope, e) async def _request_reply(self, envelope: Envelope) -> Optional[Envelope]: # Create reply channel reply_topic = f"reply_{envelope.id}" reply_received = asyncio.Event() reply_result = None async def reply_handler(reply: Envelope): nonlocal reply_result reply_result = reply reply_received.set() self.subscribe(reply_topic, reply_handler) # Send original message handlers = self.subscribers.get(envelope.topic, []) if handlers: await handlers[0](envelope) # Wait for reply with timeout try: await asyncio.wait_for(reply_received.wait(), timeout=30) return reply_result except asyncio.TimeoutError: return None finally: self.subscribers.pop(reply_topic, None) async def _handle_failure(self, envelope: Envelope, error: Exception) -> None: retry_count = envelope.payload.get("_retry_count", 0) if retry_count < self.max_retries: envelope.payload["_retry_count"] = retry_count + 1 # Exponential backoff await asyncio.sleep(2 ** retry_count) await self.publish(envelope) else: self.dead_letter_queue.append(envelope) # Usage example bus = MessageBus() async def handle_code_review(envelope: Envelope): # Process code review request result = {"approved": True, "issues": []} reply = Envelope( sender="reviewer", topic=envelope.reply_to, action="review_complete", payload=result, correlation_id=envelope.id ) await bus.publish(reply) bus.subscribe("code_review_requests", handle_code_review) # Send request request = Envelope( sender="pipeline", topic="code_review_requests", action="review", payload={"code": "...", "language": "python"}, reply_to=f"reply_channel_{uuid.uuid4()}" ) reply = await bus.publish(request) ```

EXERCISE

Design a message-passing system for a distributed tracing scenario where one agent generates spans, another aggregates them, and a third visualizes results. Implement topic-based routing so aggregation agents receive all span events, and visualization agents receive only completed traces. Handle the case where the visualization agent goes offline for an extended period. (12 minutes)

← Chapter 4
Agent Communication Protocols
Chapter 6 →
Shared State