KEY INSIGHT
Consensus enables agents to agree on facts and actions, but perfect consensus is expensive; choose a mechanism by weighing the cost of disagreement against the cost of delay.
Multi-agent systems often require agents to agree on shared state, coordinated actions, or authoritative decisions. Consensus mechanisms provide systematic ways for agents to reach agreement even when individual agents have incomplete information or conflicting views.
Byzantine fault tolerance handles scenarios where agents may behave arbitrarily—maliciously, incorrectly, or inconsistently. Traditional Byzantine consensus requires large message exchanges and significant time. Practical Byzantine Fault Tolerance (PBFT) reduces overhead while maintaining safety guarantees.
Raft consensus provides a simpler model for crash fault tolerance—agents may fail by stopping but not by behaving incorrectly. Raft elects a leader that coordinates operations. Followers replicate leader decisions. Leader failures trigger new elections.
Multi-agent coordination can use voting mechanisms. Each agent casts votes based on its view; outcomes depend on voting rules (majority, supermajority, weighted). Voting works for discrete decisions but struggles with continuous value coordination.
Gossip protocols spread information across agent populations without central coordination. Agents randomly share state with peers; over time, the population converges on a shared view. Gossip provides eventual consistency but no hard guarantees about when convergence happens.
Credential propagation enables agents to collectively validate information. Agents share evidence and attestations; collective validation provides higher confidence than individual checks. However, collective validation increases latency.
Consensus overhead must match the cost of errors. Low-stakes decisions warrant fast consensus with small quorums. High-stakes decisions warrant slower consensus with Byzantine guarantees.
```python
import asyncio
import hashlib
import random
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional, Callable
class ConsensusState(Enum):
    """Named states of a consensus round.

    Each member's value is its lowercase string label, so a state can be
    looked up from serialized text, e.g. ``ConsensusState("commit")``.
    """

    # Before agreement has been reached.
    PENDING = "pending"
    PROPOSED = "proposed"
    PREPARE = "prepare"
    COMMIT = "commit"

    # Terminal outcomes.
    DECIDED = "decided"
    REJECTED = "rejected"
@dataclass
class Proposal:
    """A value put forward for agreement by one agent.

    Plain data record. ``timestamp`` defaults to the moment of construction
    as a timezone-aware UTC datetime; pass it explicitly to override.
    """

    id: str
    value: Any
    proposer: str
    round: int
    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # value that .timestamp() would interpret as local time, so build an
    # aware UTC "now" instead.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
@dataclass
class Vote:
    """One agent's vote on a proposal in a given round.

    Plain data record. ``timestamp`` defaults to the moment of construction
    as a timezone-aware UTC datetime; pass it explicitly to override.
    """

    proposal_id: str
    voter: str
    vote_type: str  # "prepare" or "commit"
    round: int
    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # value that .timestamp() would interpret as local time, so build an
    # aware UTC "now" instead.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
class RaftConsensus:
    """Simplified Raft consensus for agent leader election and state replication.

    The network layer is stubbed: ``_request_vote`` and ``_send_entries``
    always report success, so out of the box this class only demonstrates
    the control flow. Override those two hooks to plug in real RPCs.
    """

    def __init__(self, agent_id: str, peers: list[str]):
        """Create a node that starts as a follower in term 0.

        Args:
            agent_id: Identifier of this node.
            peers: Identifiers of the other nodes. Assumed not to contain
                ``agent_id``, because the quorum size is derived from
                ``len(peers) + 1``.
        """
        self.agent_id = agent_id
        self.peers = peers
        self.state = "follower"
        self.current_term = 0
        self.voted_for: Optional[str] = None
        self.leader_id: Optional[str] = None
        self.commit_index = 0
        self.last_applied = 0
        self.vote_counts: dict[int, dict[str, bool]] = {}
        self.election_timeout = random.randint(150, 300) / 1000  # seconds

    def _majority(self) -> int:
        """Return the smallest node count that is a strict majority."""
        cluster_size = len(self.peers) + 1  # peers plus this node
        return cluster_size // 2 + 1

    async def start_election(self) -> bool:
        """Run one election round with this node as the candidate.

        Increments the term, votes for itself, and asks every peer for a
        vote.

        Returns:
            True if a majority of the cluster granted its vote and this node
            is now the leader; False otherwise (the node stays a candidate).
        """
        self.current_term += 1
        self.state = "candidate"
        self.voted_for = self.agent_id
        # A candidate has no known leader; drop any stale value so a lost
        # election does not leave this node pointing at the previous leader.
        self.leader_id = None

        votes_received = {self.agent_id}
        # Request votes from peers
        for peer in self.peers:
            if await self._request_vote(peer):
                votes_received.add(peer)

        if len(votes_received) < self._majority():
            return False

        self.state = "leader"
        self.leader_id = self.agent_id
        return True

    async def _request_vote(self, peer: str) -> bool:
        """Request a vote from ``peer``.

        Placeholder that always grants the vote. A real implementation would
        send an RPC, and the peer would grant its vote only if:
          1. the candidate's term is >= the peer's term, and
          2. the candidate's log is at least as complete as the peer's.
        """
        return True

    async def append_entries(self, entries: list[dict]) -> bool:
        """Replicate ``entries`` to the followers (leader only).

        Args:
            entries: Log entries to replicate. An empty list is allowed and
                leaves ``commit_index`` unchanged.

        Returns:
            False if this node is not the leader or fewer than a majority of
            nodes acknowledged; True once the entries are committed.
        """
        if self.state != "leader":
            return False

        acks = 1  # Include self
        for peer in self.peers:
            if await self._send_entries(peer, entries):
                acks += 1

        if acks < self._majority():
            return False

        self.commit_index += len(entries)
        return True

    async def _send_entries(self, peer: str, entries: list[dict]) -> bool:
        """Send ``entries`` to one follower. Placeholder: always succeeds."""
        return True
class MultiPaxosVariant:
    """Multi-Paxos for distributed decision making.

    Messaging is stubbed: ``_request_promise`` and ``_request_accept``
    always succeed, so a proposal is always decided unless those hooks are
    overridden with real peer communication.
    """

    def __init__(self, agent_id: str, peers: list[str]):
        """Create a proposer/learner node.

        Args:
            agent_id: Identifier of this node.
            peers: Identifiers of the other nodes. Assumed not to contain
                ``agent_id``, because the quorum size is derived from
                ``len(peers) + 1``.
        """
        self.agent_id = agent_id
        self.peers = peers
        self.proposals: dict[str, Proposal] = {}
        self.votes: dict[str, list[Vote]] = {}
        self.decisions: dict[str, Any] = {}
        # Mixed into proposal ids so two proposals from this node never
        # share an id, even for equal values on the same clock tick.
        self._proposal_seq = 0

    def _majority(self) -> int:
        """Return the smallest node count that is a strict majority."""
        cluster_size = len(self.peers) + 1  # peers plus this node
        return cluster_size // 2 + 1

    def _new_proposal_id(self, value: Any) -> str:
        """Build a 16-hex-character id for a new proposal of ``value``.

        The id hashes the proposer, a per-instance sequence number, the
        value, and the current UTC time. Hashing only value + time would let
        different agents (or one agent on a coarse clock) produce the same
        id and silently overwrite each other's entries.
        """
        self._proposal_seq += 1
        now = datetime.now(timezone.utc).timestamp()
        seed = f"{self.agent_id}:{self._proposal_seq}:{value}{now}"
        return hashlib.sha256(seed.encode()).hexdigest()[:16]

    async def propose(self, value: Any) -> Optional[Any]:
        """Propose ``value`` for consensus.

        Returns:
            ``value`` if both the prepare and accept phases reach a
            majority; ``None`` if either phase fails. The proposal is
            recorded in ``self.proposals`` in both cases.
        """
        proposal = Proposal(
            id=self._new_proposal_id(value),
            value=value,
            proposer=self.agent_id,
            round=0,
        )
        self.proposals[proposal.id] = proposal

        # Phase 1: Prepare
        if not await self._send_prepare(proposal):
            return None

        # Phase 2: Accept
        if not await self._send_accept(proposal):
            return None

        # Phase 3: Learn decision
        self.decisions[proposal.id] = value
        return value

    async def _send_prepare(self, proposal: Proposal) -> bool:
        """Send prepare messages; return True if a majority promised."""
        promises = 1  # Self
        for peer in self.peers:
            if await self._request_promise(peer, proposal):
                promises += 1
        return promises >= self._majority()

    async def _send_accept(self, proposal: Proposal) -> bool:
        """Send accept messages; return True if a majority accepted."""
        accepts = 1  # Self
        for peer in self.peers:
            if await self._request_accept(peer, proposal):
                accepts += 1
        return accepts >= self._majority()

    async def _request_promise(self, peer: str, proposal: Proposal) -> bool:
        """Ask ``peer`` for a promise. Placeholder: always granted.

        In a real implementation, wait for the promise from the peer.
        """
        return True

    async def _request_accept(self, peer: str, proposal: Proposal) -> bool:
        """Ask ``peer`` to accept ``proposal``. Placeholder: always accepted."""
        return True

    async def learn_decision(self, proposal_id: str) -> Optional[Any]:
        """Return the decided value for ``proposal_id``, or None if unknown."""
        return self.decisions.get(proposal_id)
class GossipConsensus:
    """Gossip-based eventual consensus for large agent populations.

    Peer exchange is a placeholder: ``_gossip_with`` selects the state it
    would share but does not yet send or merge anything.
    """

    def __init__(self, agent_id: str, peers: list[str]):
        """Create a node with empty local state.

        Args:
            agent_id: Identifier of this node.
            peers: Identifiers of the nodes this one may gossip with.
        """
        self.agent_id = agent_id
        self.peers = peers
        self.state: dict[str, Any] = {}
        self.version: dict[str, int] = {}
        self.gossip_interval = 1.0  # seconds

    async def start_gossip(self) -> None:
        """Run the gossip loop forever; cancel the task to stop it.

        Every ``gossip_interval`` seconds one randomly chosen peer is
        contacted. Rounds are skipped while ``peers`` is empty.
        """
        while True:
            await asyncio.sleep(self.gossip_interval)
            if not self.peers:
                # random.choice raises IndexError on an empty sequence,
                # which would end the background loop; wait for peers.
                continue
            await self._gossip_with(random.choice(self.peers))

    async def _gossip_with(self, peer: str) -> None:
        """Exchange state with ``peer`` (placeholder, nothing is sent)."""
        # Select subset of state to share: at most three random entries.
        state_subset = dict(random.sample(
            list(self.state.items()),
            min(3, len(self.state))
        ))
        # In real implementation, send to peer and receive their updates
        # Merge incoming state
        # Update versions

    async def set_value(self, key: str, value: Any) -> None:
        """Set a value locally and bump its version counter.

        The value is intended to reach other agents through gossip.
        """
        self.state[key] = value
        self.version[key] = self.version.get(key, 0) + 1

    async def get_value(self, key: str) -> Optional[Any]:
        """Return the locally known value for ``key``, or None if absent.

        The result may not be globally consistent.
        """
        return self.state.get(key)
# Factory function for consensus mechanism selection
def create_consensus(
    mechanism: str,
    agent_id: str,
    peers: list[str]
):
    """Build the consensus object named by ``mechanism``.

    Args:
        mechanism: One of "raft", "paxos" or "gossip" (exact match).
        agent_id: Identifier passed through to the chosen class.
        peers: Peer identifiers passed through to the chosen class.

    Returns:
        A RaftConsensus, MultiPaxosVariant or GossipConsensus instance.

    Raises:
        ValueError: If ``mechanism`` is not one of the supported names.
    """
    if mechanism == "raft":
        return RaftConsensus(agent_id, peers)
    if mechanism == "paxos":
        return MultiPaxosVariant(agent_id, peers)
    if mechanism == "gossip":
        return GossipConsensus(agent_id, peers)
    raise ValueError(f"Unknown consensus mechanism: {mechanism}")
# Usage
async def main() -> None:
    """Elect a leader among four agents, then replicate one config update."""
    consensus = create_consensus("raft", "agent_1", ["agent_2", "agent_3", "agent_4"])
    # Leader election
    is_leader = await consensus.start_election()
    if is_leader:
        # Propose state changes
        await consensus.append_entries([{"type": "update", "key": "config", "value": "new_config"}])


if __name__ == "__main__":
    # `await` is a SyntaxError at the top level of a regular module, so the
    # example runs inside a coroutine driven by asyncio.run().
    asyncio.run(main())
```