15. Agent Discovery
Static agent configurations break in dynamic environments. When agents scale, restart, or migrate, hardcoded references become liabilities.
Service Registry Pattern
A service registry tracks live agents and their capabilities:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import asyncio


@dataclass
class AgentRegistration:
    agent_id: str
    capabilities: list[str]
    endpoint: str
    registered_at: datetime = field(default_factory=datetime.now)
    last_heartbeat: datetime = field(default_factory=datetime.now)
    ttl_seconds: int = 60


class AgentRegistry:
    def __init__(self, ttl_seconds: int = 60):
        self.agents: dict[str, AgentRegistration] = {}
        self.ttl_seconds = ttl_seconds
        self._cleanup_task: Optional[asyncio.Task] = None

    async def register(self, agent_id: str, capabilities: list[str], endpoint: str):
        # Re-registering an existing agent_id replaces its entry and resets its timestamps.
        self.agents[agent_id] = AgentRegistration(
            agent_id=agent_id,
            capabilities=capabilities,
            endpoint=endpoint,
            ttl_seconds=self.ttl_seconds,
        )

    async def unregister(self, agent_id: str):
        self.agents.pop(agent_id, None)

    async def heartbeat(self, agent_id: str) -> bool:
        """Refresh an agent's liveness timestamp.

        Returns False if the agent is unknown (for example, already evicted),
        so the caller can decide to register again.
        """
        if agent_id not in self.agents:
            return False
        self.agents[agent_id].last_heartbeat = datetime.now()
        return True

    def find_by_capability(self, capability: str) -> list[AgentRegistration]:
        """Find all live agents with a specific capability."""
        return [
            agent for agent in self.agents.values()
            if capability in agent.capabilities and self._is_alive(agent)
        ]

    def _is_alive(self, agent: AgentRegistration) -> bool:
        # An agent is alive if its last heartbeat is more recent than its TTL.
        elapsed = (datetime.now() - agent.last_heartbeat).total_seconds()
        return elapsed < agent.ttl_seconds

    async def start_cleanup(self):
        """Periodically remove stale registrations."""
        # Avoid starting a second loop if one is already running.
        if self._cleanup_task is not None and not self._cleanup_task.done():
            return

        async def cleanup_loop():
            while True:
                # Sweep twice per TTL, but never sleep for less than one second:
                # with a TTL of 1, integer halving would give a zero-second busy loop.
                await asyncio.sleep(max(self.ttl_seconds / 2, 1))
                # Collect ids first so the dict is not mutated while iterating.
                stale = [
                    agent_id for agent_id, agent in self.agents.items()
                    if not self._is_alive(agent)
                ]
                for agent_id in stale:
                    await self.unregister(agent_id)

        self._cleanup_task = asyncio.create_task(cleanup_loop())
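The registry only works if agents keep their registrations fresh. The following is a minimal sketch of the agent side of that contract, not part of the registry above. It assumes a hypothetical stand-in class, TinyRegistry, whose heartbeat call returns a boolean so an evicted agent can register again; the names heartbeat_loop and demo, and the choice of heartbeating at about a third of the TTL, are likewise assumptions made for illustration.

```python
import asyncio
import time


class TinyRegistry:
    """Hypothetical stand-in for a registry: maps agent_id to its last heartbeat time."""

    def __init__(self, ttl_seconds: float):
        self.ttl_seconds = ttl_seconds
        self.last_seen: dict[str, float] = {}

    def register(self, agent_id: str) -> None:
        self.last_seen[agent_id] = time.monotonic()

    def heartbeat(self, agent_id: str) -> bool:
        # Returns False for unknown agents so the caller knows to register again.
        if agent_id not in self.last_seen:
            return False
        self.last_seen[agent_id] = time.monotonic()
        return True

    def is_alive(self, agent_id: str) -> bool:
        seen = self.last_seen.get(agent_id)
        return seen is not None and time.monotonic() - seen < self.ttl_seconds


async def heartbeat_loop(registry: TinyRegistry, agent_id: str, interval: float) -> None:
    """Agent-side loop: refresh the registration, registering again if it was evicted."""
    while True:
        if not registry.heartbeat(agent_id):
            registry.register(agent_id)
        await asyncio.sleep(interval)


async def demo() -> tuple[bool, bool]:
    registry = TinyRegistry(ttl_seconds=0.3)
    registry.register("worker-1")
    registry.register("worker-2")  # never sends heartbeats

    # Heartbeat at about a third of the TTL so one delayed beat does not expire the agent.
    task = asyncio.create_task(heartbeat_loop(registry, "worker-1", interval=0.1))
    await asyncio.sleep(0.5)
    result = (registry.is_alive("worker-1"), registry.is_alive("worker-2"))
    task.cancel()
    return result
```

Running asyncio.run(demo()) reports whether each worker is still considered alive after half a second. The sketch uses time.monotonic() rather than wall-clock time so that system clock adjustments cannot expire or revive an agent.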
Failure Mode: Split Brain
If you run multiple registry instances without coordination, agents might see different views of the system. For most use cases, a single registry with a hot standby is sufficient. Distributed registries require consensus protocols and add significant complexity.
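To make the divergence concrete, here is a small sketch of two registry instances that do not replicate to each other. LocalRegistry, divergent_agents, the agent ids, and the endpoint addresses are all hypothetical names invented for this illustration.

```python
from typing import Optional


class LocalRegistry:
    """Hypothetical in-memory registry that does not replicate to its peers."""

    def __init__(self, name: str):
        self.name = name
        self.endpoints: dict[str, str] = {}

    def register(self, agent_id: str, endpoint: str) -> None:
        self.endpoints[agent_id] = endpoint

    def lookup(self, agent_id: str) -> Optional[str]:
        return self.endpoints.get(agent_id)


def divergent_agents(a: LocalRegistry, b: LocalRegistry) -> set[str]:
    """Agent ids on which two registries disagree (missing entry or different endpoint)."""
    all_ids = a.endpoints.keys() | b.endpoints.keys()
    return {agent_id for agent_id in all_ids if a.lookup(agent_id) != b.lookup(agent_id)}


registry_a = LocalRegistry("a")
registry_b = LocalRegistry("b")

# Both registries start with the same view of one agent.
registry_a.register("summarizer-1", "http://10.0.0.5:8000")
registry_b.register("summarizer-1", "http://10.0.0.5:8000")

# The agent migrates and registers again, but the update only reaches registry A.
registry_a.register("summarizer-1", "http://10.0.0.9:8000")

# A new agent registers only with registry B.
registry_b.register("translator-1", "http://10.0.0.7:8000")

disagreements = divergent_agents(registry_a, registry_b)
```

Here disagreements contains both agent ids: a client asking registry B for summarizer-1 gets an endpoint that no longer exists, and a client asking registry A never learns about translator-1.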
Discovery Protocol
Agents should discover each other through the registry, not hardcoded addresses:
async def find_agent_for_task(registry: AgentRegistry, required_capability: str) -> Optional[str]:
    candidates = registry.find_by_capability(required_capability)
    if not candidates:
        return None
    # Deterministic choice: pick the oldest registration. This returns the
    # same agent for as long as it stays alive, so it does not spread load.
    return min(candidates, key=lambda a: a.registered_at).endpoint
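Selecting by minimum registered_at returns the same agent on every call while that agent stays alive. If the goal is to spread requests across candidates, one possible alternative is round-robin selection. The sketch below is a swapped-in technique, not the selection rule used above; Candidate and RoundRobinSelector are hypothetical names, and Candidate stands in for whatever registration record the registry returns.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Candidate:
    """Hypothetical minimal view of a registration: only what selection needs."""
    agent_id: str
    endpoint: str


class RoundRobinSelector:
    """Rotate through candidates per capability so repeated lookups are spread out."""

    def __init__(self) -> None:
        # Next rotation index for each capability.
        self._next: dict[str, int] = {}

    def pick(self, capability: str, candidates: list[Candidate]) -> Optional[str]:
        if not candidates:
            return None
        # Sort by agent_id so the rotation order is stable across calls,
        # regardless of the order in which the registry lists candidates.
        ordered = sorted(candidates, key=lambda c: c.agent_id)
        index = self._next.get(capability, 0)
        self._next[capability] = index + 1
        # The modulo keeps the index valid when the candidate set shrinks or grows.
        return ordered[index % len(ordered)].endpoint
```

Round-robin ignores how busy each agent actually is. It only guarantees that successive requests for the same capability rotate through the live candidates.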
Implement a capability negotiation protocol where an agent can ask the registry to find agents matching multiple capabilities and return the best fit based on availability.
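One possible starting point, offered as a sketch rather than a reference solution: filter to agents whose capability set covers every requirement, then rank the survivors. The text does not define "availability", so this sketch assumes a hypothetical active_tasks counter reported by each agent; AgentInfo and negotiate are illustrative names as well.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AgentInfo:
    """Hypothetical registration record for this sketch."""
    agent_id: str
    capabilities: set[str]
    endpoint: str
    active_tasks: int = 0  # assumed availability signal: fewer means more available


def negotiate(agents: list[AgentInfo], required: set[str]) -> Optional[AgentInfo]:
    """Return the most available agent that has every required capability."""
    # Subset test: the agent must offer all required capabilities.
    matching = [a for a in agents if required <= a.capabilities]
    if not matching:
        return None
    # Rank by load first, then prefer the narrowest match (fewest unused
    # capabilities), then agent_id so that ties resolve deterministically.
    return min(
        matching,
        key=lambda a: (a.active_tasks, len(a.capabilities - required), a.agent_id),
    )


agents = [
    AgentInfo("a1", {"ocr", "translate"}, "http://a1", active_tasks=3),
    AgentInfo("a2", {"ocr", "translate", "summarize"}, "http://a2", active_tasks=1),
    AgentInfo("a3", {"ocr"}, "http://a3", active_tasks=0),
]
best = negotiate(agents, {"ocr", "translate"})
```

In this example best is a2: a3 is idle but lacks "translate", and a1 matches but is busier. A fuller solution would also need liveness filtering and a way for agents to report their load to the registry.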