18. Testing Framework

Chapter 18 of 24 · 20 min

Testing agent systems requires different strategies than testing single components. The complexity comes from state, timing, and emergent behavior across multiple agents.

Unit Testing Individual Agents

Isolate agents from the message bus for unit tests:

import asyncio
import random
import time

import pytest

class MockMessageBus:
    """In-memory stand-in for the message bus that records every sent message.

    Nothing is delivered anywhere: ``send`` only appends to ``messages_sent``
    and ``subscribe`` only hands back a new queue, so a test can inspect the
    recorded traffic afterwards.
    """

    def __init__(self):
        # Every message passed to send(), in call order, tagged with its recipient.
        self.messages_sent: list[dict] = []
        # agent_id -> the queue most recently handed out by subscribe().
        self.subscriptions: dict[str, asyncio.Queue] = {}

    async def send(self, recipient: str, message: dict):
        """Record ``message`` as having been sent to ``recipient``.

        Args:
            recipient: Identifier the message is addressed to.
            message: Payload; its keys are copied into the stored record.
        """
        record = {"recipient": recipient, **message}
        # A payload may carry its own "recipient" field. The routing target
        # must win, otherwise get_messages_to() would misfile the message.
        record["recipient"] = recipient
        self.messages_sent.append(record)

    async def subscribe(self, agent_id: str) -> asyncio.Queue:
        """Create, remember and return a new queue for ``agent_id``.

        Subscribing again with the same id replaces the stored queue.
        """
        queue = asyncio.Queue()
        self.subscriptions[agent_id] = queue
        return queue

    def get_messages_to(self, recipient: str) -> list[dict]:
        """Return the recorded messages addressed to ``recipient``, in send order."""
        return [m for m in self.messages_sent if m.get("recipient") == recipient]

@pytest.fixture
def message_bus():
    """Fixture: build and return a new MockMessageBus instance."""
    bus = MockMessageBus()
    return bus

@pytest.fixture
def test_agent(message_bus):
    """Fixture: yield a started ProcessingAgent bound to ``message_bus``.

    The agent is stopped once the generator is resumed after the yield.
    """
    processing_agent = ProcessingAgent(message_bus=message_bus)
    processing_agent.start()
    yield processing_agent
    # Teardown half of the fixture.
    processing_agent.stop()

def test_agent_processes_valid_input(test_agent, message_bus):
    """A valid "process" request yields exactly one success message to "output"."""
    request = {"type": "process", "data": "valid_input"}
    test_agent.receive(request)

    # Only traffic addressed to "output" is relevant to this check.
    outbound = message_bus.get_messages_to("output")
    assert len(outbound) == 1
    first = outbound[0]
    assert first["status"] == "success"

Integration Testing with Test Harnesses

For integration tests, create deterministic test environments:

class TestHarness:
    """Integration-test environment that attaches agents to one shared test bus.

    Agents are registered by id and attached to the harness's
    ``TestMessageBus``; ``run_until_complete`` then polls the bus's
    subscription queues until they are all empty or a timeout expires.
    """

    # The class name starts with "Test", so pytest would try to collect it
    # (and warn because it defines __init__). Mark it as a helper instead.
    __test__ = False

    def __init__(self):
        self.message_bus = TestMessageBus()
        # agent_id -> registered agent.
        self.agents: dict[str, Agent] = {}
        # Initialised to 0.0 and not otherwise used by this class.
        # NOTE(review): presumably a simulated clock for callers — confirm.
        self.time: float = 0.0

    def register_agent(self, agent_id: str, agent: Agent):
        """Store ``agent`` under ``agent_id`` and attach it to the harness bus."""
        self.agents[agent_id] = agent
        agent.attach_bus(self.message_bus)

    async def run_until_complete(self, timeout: float = 5.0, poll_interval: float = 0.01):
        """Run until all queues are empty or timeout.

        Args:
            timeout: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep before each check of the queues.

        Returns:
            True as soon as a poll finds every ``asyncio.Queue`` in the bus's
            subscriptions empty (subscription values of other types are
            ignored); False if ``timeout`` elapses first.
        """
        # A monotonic clock keeps the timeout correct even if the system
        # wall clock is adjusted while the test is running.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # Sleep first so other tasks get a chance to run before the check.
            await asyncio.sleep(poll_interval)
            all_empty = all(
                q.empty()
                for q in self.message_bus.subscriptions.values()
                if isinstance(q, asyncio.Queue)
            )
            if all_empty:
                return True
        return False

    def get_all_messages(self) -> list[dict]:
        """Return a shallow copy of every message recorded by the bus."""
        return self.message_bus.messages_sent.copy()

Testing Race Conditions

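Race conditions are hard to reproduce. Force deterministic scheduling where you can; otherwise, run many updates concurrently and assert an invariant that only holds when the locking is correct: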

async def test_concurrent_state_updates():
    """Concurrent increments must all be applied; none may be lost."""
    expected_total = 100
    # presumably SharedState starts at 0 — the final assertion relies on it.
    shared = SharedState()

    async def bump():
        await shared.update(lambda current: current + 1)

    # Hand every increment to gather() in a single call.
    pending = [bump() for _ in range(expected_total)]
    await asyncio.gather(*pending)

    # With proper locking, the final value equals the number of updates.
    assert shared.value == expected_total

Chaos Testing

Introduce failures deliberately:

class ChaosInjector:
    """Randomly inject failures so tests can exercise error-handling paths.

    Each call to ``should_fail`` draws a number in [0.0, 1.0) and reports a
    failure when the draw is below ``failure_rate``.

    Args:
        failure_rate: Threshold compared against each draw; 0.0 never fails
            and 1.0 always fails.
        seed: Optional seed (keyword-only). When given, the injector draws
            from its own ``random.Random(seed)`` so a chaos run can be
            replayed exactly. When omitted, the module-level ``random``
            generator is used.
    """

    def __init__(self, failure_rate: float = 0.1, *, seed: "int | None" = None):
        self.failure_rate = failure_rate
        # Number of times should_fail() has returned True.
        self.failures_injected = 0
        # Both the random module and a Random instance expose .random().
        self._rng = random if seed is None else random.Random(seed)

    def should_fail(self) -> bool:
        """Draw once; return True and count it when a failure is to be injected."""
        if self._rng.random() < self.failure_rate:
            self.failures_injected += 1
            return True
        return False

    def inject_network_error(self, operation):
        """Raise an injected network error, or pass ``operation`` through.

        Returns:
            ``operation`` unchanged when no failure is injected. It is not
            called here; invoking it is left to the caller.

        Raises:
            ConnectionError: When ``should_fail`` decides to inject a failure.
        """
        if self.should_fail():
            raise ConnectionError("Chaos: injected network error")
        return operation
EXERCISE

Build a test harness that can replay production message sequences. Use it to reproduce and verify fixes for past production bugs.