15. Agent Discovery

Chapter 15 of 24 · 15 min

Static agent configurations break in dynamic environments. When agents scale, restart, or migrate, hardcoded references become liabilities.

Service Registry Pattern

A service registry tracks live agents and their capabilities:

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
import asyncio

@dataclass
class AgentRegistration:
    agent_id: str
    capabilities: list[str]
    endpoint: str
    registered_at: datetime = field(default_factory=datetime.now)
    last_heartbeat: datetime = field(default_factory=datetime.now)
    ttl_seconds: int = 60

class AgentRegistry:
    def __init__(self, ttl_seconds: int = 60):
        self.agents: dict[str, AgentRegistration] = {}
        self.ttl_seconds = ttl_seconds
        self._cleanup_task: Optional[asyncio.Task] = None
    
    async def register(self, agent_id: str, capabilities: list[str], endpoint: str):
        self.agents[agent_id] = AgentRegistration(
            agent_id=agent_id,
            capabilities=capabilities,
            endpoint=endpoint
        )
    
    async def unregister(self, agent_id: str):
        self.agents.pop(agent_id, None)
    
    async def heartbeat(self, agent_id: str):
        if agent_id in self.agents:
            self.agents[agent_id].last_heartbeat = datetime.now()
    
    def find_by_capability(self, capability: str) -> list[AgentRegistration]:
        """Find all agents with a specific capability."""
        return [
            agent for agent in self.agents.values()
            if capability in agent.capabilities and self._is_alive(agent)
        ]
    
    def _is_alive(self, agent: AgentRegistration) -> bool:
        elapsed = (datetime.now() - agent.last_heartbeat).total_seconds()
        return elapsed < self.ttl_seconds
    
    async def start_cleanup(self):
        """Periodically remove stale registrations."""
        async def cleanup_loop():
            while True:
                await asyncio.sleep(self.ttl_seconds // 2)
                stale = [
                    agent_id for agent_id, agent in self.agents.items()
                    if not self._is_alive(agent)
                ]
                for agent_id in stale:
                    await self.unregister(agent_id)
        
        self._cleanup_task = asyncio.create_task(cleanup_loop())

Failure Mode: Split Brain

If you run multiple registry instances without coordination, agents might see different views of the system. For most use cases, a single registry with a hot standby is sufficient. Distributed registries require consensus protocols and add significant complexity.

Discovery Protocol

Agents should discover each other through the registry, not hardcoded addresses:

async def find_agent_for_task(registry: AgentRegistry, required_capability: str) -> Optional[str]:
    candidates = registry.find_by_capability(required_capability)
    if not candidates:
        return None
    # Simple load balancing: pick the least recently registered
    return min(candidates, key=lambda a: a.registered_at).endpoint
EXERCISE

Implement a capability negotiation protocol where an agent can ask the registry to find agents matching multiple capabilities and return the best fit based on availability.