KEY INSIGHT
Shared state synchronizes agents at the cost of contention; choose consistency models that match task requirements—eager consistency for transactions, lazy for throughput.
Multi-agent systems frequently require shared state: order status visible to fulfillment and customer service agents, product inventory shared across sales and warehouse agents, user session data accessed by multiple service agents. Managing this shared state determines system correctness and performance.
Optimistic concurrency control assumes that conflicts are rare and handles them through retry logic. Agents read state, make local modifications, then attempt to write. If another agent modified the state between the read and the write, the write fails and the agent retries with fresh data. This model maximizes throughput when contention is low. Because a conflicting write forces the agent to redo its work, that work must be safe to repeat (idempotent or free of external side effects).
Pessimistic locking blocks access to prevent conflicts. Agents acquire locks before reading or writing shared state. Other agents wait or receive "busy" responses. This model simplifies correctness reasoning but limits parallelism. It also creates deadlock risk when agents acquire several locks in inconsistent orders.
Event sourcing treats state changes as append-only event logs. Agents append events rather than mutating state directly. Current state derives from replaying events. This pattern provides audit trails and supports temporal queries (what was state at time T?), but complicates current-state access.
Saga patterns coordinate multi-step operations across agents. Each step has a compensating action that undoes its effects if later steps fail. The saga coordinator ensures that either all steps complete or every step that did complete is compensated, in reverse order. This pattern handles distributed transactions without distributed locks.
Eventual consistency accepts that different agents may temporarily observe different state values. Updates propagate asynchronously; once writes stop, all agents eventually converge on the same value. This model scales well but complicates scenarios requiring strong consistency.
```python
import asyncio
import copy
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Any
@dataclass
class VersionedValue:
    """A stored value plus the concurrency metadata OptimisticStore keeps for it."""

    value: Any
    # Per-key write counter: 1 after the first write, +1 on every later write.
    version: int
    # Time of the last write (timezone-aware UTC).
    last_modified: datetime
    # Identifier of the agent that performed the last write.
    modified_by: str


class OptimisticStore:
    """In-memory shared store with optimistic concurrency control.

    Every key maps to a VersionedValue. Writers pass the version they last
    read; a write is applied only if no one else has written the key since,
    otherwise it returns False and the caller should re-read and retry.
    A key that has never been written is treated as version 0.
    """

    def __init__(self):
        self.data: dict[str, VersionedValue] = {}
        # One lock per key so each check-then-set below stays atomic, even if
        # an await is ever introduced inside the critical section.
        self.locks: dict[str, asyncio.Lock] = {}

    def _get_lock(self, key: str) -> asyncio.Lock:
        """Return the lock guarding *key*, creating it on first use."""
        lock = self.locks.get(key)
        if lock is None:
            lock = self.locks[key] = asyncio.Lock()
        return lock

    def _commit(self, key: str, value: Any, agent_id: str, previous_version: int) -> None:
        """Store *value* under *key* as version ``previous_version + 1``.

        Must be called with the key's lock held.
        """
        self.data[key] = VersionedValue(
            value=value,
            version=previous_version + 1,
            last_modified=datetime.now(timezone.utc),
            modified_by=agent_id,
        )

    async def read(self, key: str) -> Optional[VersionedValue]:
        """Return the current record for *key*, or None if it was never written.

        The returned object is the live stored record, not a copy: callers
        must not mutate ``.value`` in place, or they bypass the version check.
        """
        return self.data.get(key)

    async def write(self, key: str, value: Any, agent_id: str, expected_version: int) -> bool:
        """Write with optimistic concurrency.

        Args:
            key: Key to write.
            value: New value to store.
            agent_id: Recorded as ``modified_by``.
            expected_version: Version the caller last read. Pass 0 to create
                a key that must not exist yet.

        Returns:
            True if the write was applied, False on a version conflict.
        """
        async with self._get_lock(key):
            current = self.data.get(key)
            # A missing key counts as version 0, so a stale/non-zero
            # expectation on an absent key is rejected instead of accepted.
            current_version = current.version if current is not None else 0
            if current_version != expected_version:
                return False  # Version conflict
            self._commit(key, value, agent_id, current_version)
            return True

    async def compare_and_swap(
        self,
        key: str,
        expected: Any,
        new_value: Any,
        agent_id: str
    ) -> bool:
        """Atomically replace the value of *key* if it currently equals *expected*.

        Returns:
            True if the swap happened; False if the key does not exist or its
            value differs from *expected*. CAS never creates a key.
        """
        async with self._get_lock(key):
            current = self.data.get(key)
            if current is None or current.value != expected:
                return False
            self._commit(key, new_value, agent_id, current.version)
            return True
class SagaState:
    """Persists saga coordination state in an OptimisticStore.

    Each saga is stored under ``saga:<saga_id>`` as a dict with keys
    ``state``, ``steps``, ``completed_steps`` and ``compensated_steps``.
    When a step fails, one compensation record per previously completed step
    is written under ``saga:<saga_id>:comp:<step name>``.
    """

    def __init__(self, store: OptimisticStore):
        self.store = store
        self.prefix = "saga:"

    def _key(self, saga_id: str) -> str:
        """Return the store key that holds *saga_id*'s state."""
        return f"{self.prefix}{saga_id}"

    async def create_saga(self, saga_id: str, initial_state: list[dict]) -> bool:
        """Create a new saga in the ``pending`` state.

        Args:
            saga_id: Saga identifier.
            initial_state: Step definitions, stored as-is under ``"steps"``
                (the usage example passes a list of ``{"name", "compensation"}``
                dicts).

        Returns:
            True if the saga was created; False if a saga with this id
            already exists (the write uses expected version 0).
        """
        return await self.store.write(self._key(saga_id), {
            "state": "pending",
            "steps": initial_state,
            "completed_steps": [],
            "compensated_steps": []
        }, "saga_manager", 0)

    async def update_step(
        self,
        saga_id: str,
        step_name: str,
        result: str,
        compensation_action: dict
    ) -> bool:
        """Record the outcome of one saga step.

        If *result* is ``"success"``, the step is appended to
        ``completed_steps`` together with *compensation_action* (the action
        that would undo it). Any other result marks the saga ``compensated``
        and, once that state is committed, writes a compensation record for
        every completed step, newest first. Each record carries that step's
        own recorded compensation action.

        Args:
            saga_id: Saga to update.
            step_name: Name of the step whose outcome is being reported.
            result: ``"success"`` or anything else for failure.
            compensation_action: On success, the undo action to remember for
                this step. On failure, used only for completed steps that have
                no recorded action of their own.

        Returns:
            True if the new saga state was committed. False if the saga does
            not exist, or if another writer changed it since it was read.
        """
        key = self._key(saga_id)
        current = await self.store.read(key)
        if not current:
            return False

        # Deep copy: store.read returns the live stored dict. A shallow copy
        # would share the nested lists, so appending to them would mutate the
        # stored saga even when the versioned write below is rejected.
        saga_state = copy.deepcopy(current.value)
        to_compensate: list[dict] = []

        if result == "success":
            saga_state["completed_steps"].append({
                "name": step_name,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "compensation": copy.deepcopy(compensation_action),
            })
        else:
            # Compensate completed steps in reverse completion order.
            to_compensate = list(reversed(saga_state["completed_steps"]))
            saga_state["compensated_steps"].extend(
                step["name"] for step in to_compensate
            )
            saga_state["state"] = "compensated"

        committed = await self.store.write(
            key,
            saga_state,
            "saga_manager",
            current.version
        )
        if not committed:
            return False

        # Write compensation records only after winning the version check, so
        # a writer that loses the race leaves no compensation side effects.
        # Expected version 0 means a record that already exists is not overwritten.
        for step in to_compensate:
            await self.store.write(
                f"{key}:comp:{step['name']}",
                step.get("compensation", compensation_action),
                "saga_manager",
                0
            )
        return True
# Usage example
store = OptimisticStore()
saga = SagaState(store)


async def process_order(order_id: str) -> bool:
    """Run the three-step order saga for *order_id*.

    Steps run in order. Each outcome is reported to the saga, and execution
    stops at the first failed step.

    Returns:
        True if every step succeeded, False if a step failed.

    Raises:
        RuntimeError: if ``saga.update_step`` reports that the saga state
            could not be updated.
    """
    saga_id = f"order_{order_id}"
    # (step, compensating step, params for execute_step). A single table keeps
    # the saga definition and the execution order from drifting apart.
    plan = [
        ("reserve_inventory", "release_inventory", {"item_id": order_id}),
        ("charge_payment", "refund_payment", {"order_id": order_id}),
        ("schedule_shipping", "cancel_shipping", {"order_id": order_id}),
    ]
    await saga.create_saga(
        saga_id,
        [{"name": name, "compensation": comp} for name, comp, _ in plan],
    )

    # Execute steps with compensation on failure
    for step_name, _, params in plan:
        # NOTE(review): execute_step is not defined in this snippet; presumably
        # provided elsewhere and returns a truthy value on success.
        success = await execute_step(step_name, params)
        outcome = "success" if success else "failed"
        if not await saga.update_step(saga_id, step_name, outcome,
                                      {"action": "compensate", "step": step_name}):
            raise RuntimeError(f"Saga update failed for step {step_name}")
        if not success:
            return False
    return True
```