18. Testing Framework
Chapter 18 of 24 · 20 min
Testing agent systems requires different strategies than testing single components. The complexity comes from state, timing, and emergent behavior across multiple agents.
Unit Testing Individual Agents
Isolate agents from the message bus for unit tests:
import asyncio
import random
import time

import pytest
class MockMessageBus:
    """In-memory stand-in for the message bus that records outgoing traffic.

    Every call to ``send`` is appended to ``messages_sent``; ``subscribe``
    hands out a fresh queue per agent and remembers it in ``subscriptions``.
    Sent messages are only recorded; this class never puts them on a
    subscriber queue.
    """

    def __init__(self):
        # Chronological log of every record produced by send().
        self.messages_sent: list[dict] = []
        # Queue most recently handed out for each subscribed agent id.
        self.subscriptions: dict[str, asyncio.Queue] = {}

    async def send(self, recipient: str, message: dict):
        """Record ``message`` as addressed to ``recipient``.

        The stored record is a shallow copy of ``message`` with a
        ``"recipient"`` key; ``message`` itself is not modified.
        """
        record = {"recipient": recipient, **message}
        # The routing target must win even when the payload carries its own
        # "recipient" field; otherwise get_messages_to() would file the
        # message under the wrong agent.
        record["recipient"] = recipient
        self.messages_sent.append(record)

    async def subscribe(self, agent_id: str) -> asyncio.Queue:
        """Create, remember and return a new queue for ``agent_id``.

        Subscribing the same id again replaces the previously stored queue.
        """
        queue = asyncio.Queue()
        self.subscriptions[agent_id] = queue
        return queue

    def get_messages_to(self, recipient: str) -> list[dict]:
        """Return the recorded messages addressed to ``recipient``, in send order."""
        return [m for m in self.messages_sent if m.get("recipient") == recipient]
@pytest.fixture
def message_bus():
    """Provide a fresh ``MockMessageBus`` to each test that requests it."""
    bus = MockMessageBus()
    return bus
@pytest.fixture
def test_agent(message_bus):
    """Yield a started ``ProcessingAgent`` wired to the mock bus.

    The agent is started before the test body runs and stopped afterwards.
    """
    processing_agent = ProcessingAgent(message_bus=message_bus)
    processing_agent.start()

    # Everything after the yield is the fixture's teardown.
    yield processing_agent

    processing_agent.stop()
def test_agent_processes_valid_input(test_agent, message_bus):
    """A valid "process" request yields exactly one success message to "output"."""
    request = {"type": "process", "data": "valid_input"}
    test_agent.receive(request)

    delivered = message_bus.get_messages_to("output")

    assert len(delivered) == 1
    first = delivered[0]
    assert first["status"] == "success"
Integration Testing with Test Harnesses
For integration tests, create deterministic test environments:
class TestHarness:
    """Integration-test harness that wires agents to one shared test bus."""

    # The class name starts with "Test", so pytest would otherwise try to
    # collect it and warn about the __init__ constructor.
    __test__ = False

    def __init__(self):
        self.message_bus = TestMessageBus()
        # Registered agents keyed by agent id.
        self.agents: dict[str, "Agent"] = {}
        # Initialised to 0.0; not read or advanced by any method in this class.
        self.time: float = 0.0

    def register_agent(self, agent_id: str, agent: "Agent"):
        """Store ``agent`` under ``agent_id`` and attach it to the harness bus."""
        self.agents[agent_id] = agent
        agent.attach_bus(self.message_bus)

    async def run_until_complete(self, timeout: float = 5.0) -> bool:
        """Run until all queues are empty or timeout.

        Polls every 10 ms. Only ``asyncio.Queue`` values in the bus's
        ``subscriptions`` mapping are inspected.

        Returns:
            True as soon as every inspected queue is empty (also when there
            are none), False if ``timeout`` seconds pass first.
        """
        # A monotonic clock keeps the timeout correct even if the system
        # wall clock is adjusted while the test is running.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # Yield to the event loop first so other tasks can make progress.
            await asyncio.sleep(0.01)
            all_empty = all(
                q.empty()
                for q in self.message_bus.subscriptions.values()
                if isinstance(q, asyncio.Queue)
            )
            if all_empty:
                return True
        return False

    def get_all_messages(self) -> list[dict]:
        """Return a shallow copy of every message recorded by the bus."""
        return self.message_bus.messages_sent.copy()
Testing Race Conditions
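Race conditions are hard to reproduce. Rather than relying on lucky timing, run many updates concurrently and assert an invariant that must hold under every interleaving: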
async def test_concurrent_state_updates():
    """Check that no increment is lost when many updates run concurrently."""
    # NOTE(review): as a coroutine test this presumably relies on an async
    # pytest plugin (e.g. pytest-asyncio) being configured — confirm.
    shared = SharedState()
    update_count = 100

    async def bump():
        await shared.update(lambda current: current + 1)

    # Schedule every increment at once so the updates can overlap.
    await asyncio.gather(*(bump() for _ in range(update_count)))

    # A lost update would leave the value short of the number of increments
    # (assumes SharedState starts at 0 — TODO confirm).
    assert shared.value == update_count
Chaos Testing
Introduce failures deliberately:
class ChaosInjector:
    """Randomly injects failures at a configurable rate.

    Args:
        failure_rate: Threshold compared against a draw from ``[0.0, 1.0)``;
            a failure is injected whenever the draw is below it, so ``0.0``
            never fails and ``1.0`` always fails.
        rng: Optional ``random.Random`` instance to draw from. Pass a seeded
            instance to make a chaos run reproducible. When omitted, the
            module-level ``random`` generator is used.
    """

    def __init__(self, failure_rate: float = 0.1, rng: "random.Random | None" = None):
        self.failure_rate = failure_rate
        # Number of times should_fail() has returned True.
        self.failures_injected = 0
        # Fall back to the ``random`` module itself, which exposes the same
        # ``random()`` call as a ``random.Random`` instance.
        self._rng = rng if rng is not None else random

    def should_fail(self) -> bool:
        """Draw once and report whether a failure should be injected now.

        Increments ``failures_injected`` each time the answer is True.
        """
        if self._rng.random() < self.failure_rate:
            self.failures_injected += 1
            return True
        return False

    def inject_network_error(self, operation):
        """Return ``operation`` unchanged, or fail in its place.

        ``operation`` is never called here; it is only passed through.

        Raises:
            ConnectionError: When ``should_fail()`` decides to inject a failure.
        """
        if self.should_fail():
            raise ConnectionError("Chaos: injected network error")
        return operation
EXERCISE
Build a test harness that can replay production message sequences. Use it to reproduce and verify fixes for past production bugs.