KEY INSIGHT
Supervisor hierarchies provide control and observability, but deep hierarchies create latency and fragility; keep supervisor trees shallow and worker pools large.
Supervisor agents sit atop agent hierarchies, coordinating subordinate workers. The supervisor receives tasks, decomposes them, delegates to workers, monitors progress, handles failures, and assembles results. This pattern mirrors organizational structures—managers oversee specialists who perform execution work.
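The receive, decompose, delegate, assemble cycle can be sketched in a few lines. This is a minimal illustration that assumes workers are plain async functions and decomposition is a simple split of the input. The names `decompose`, `word_count_worker`, and `supervise` are hypothetical.

```python
import asyncio


def decompose(document: str, chunk_size: int) -> list[str]:
    """Hypothetical decomposition: split a document into chunks of `chunk_size` lines."""
    lines = document.splitlines()
    return ["\n".join(lines[i:i + chunk_size]) for i in range(0, len(lines), chunk_size)]


async def word_count_worker(chunk: str) -> int:
    """Hypothetical worker: counts the words in its chunk."""
    await asyncio.sleep(0)  # stand-in for real I/O such as a model or API call
    return len(chunk.split())


async def supervise(document: str, chunk_size: int = 2) -> int:
    subtasks = decompose(document, chunk_size)
    # Delegate every subtask concurrently and wait for all results
    results = await asyncio.gather(*(word_count_worker(c) for c in subtasks))
    # Assemble the partial results into the final answer
    return sum(results)


if __name__ == "__main__":
    doc = "alpha beta\ngamma\ndelta epsilon zeta\neta"
    print(asyncio.run(supervise(doc)))  # 7
```

Here the supervisor owns decomposition and assembly, while workers only see their own chunk.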
Supervisors excel when task decomposition is complex or variable. A customer support supervisor might receive a complaint and decide whether to route to billing, technical support, or retention specialists. That routing logic lives in the supervisor rather than being pre-configured.
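A routing supervisor can be as simple as a classifier plus a dispatch table. The sketch below uses keyword matching as a stand-in for whatever classifier a real system would use (often a model call); the specialist handlers, keywords, and default route are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[str], Awaitable[str]]


async def billing(complaint: str) -> str:
    return "billing: refund review opened"


async def technical(complaint: str) -> str:
    return "technical: troubleshooting ticket opened"


async def retention(complaint: str) -> str:
    return "retention: offer sent"


# Hypothetical routing rules; a real supervisor might ask a model to classify instead
ROUTES: list[tuple[tuple[str, ...], Handler]] = [
    (("charge", "invoice", "refund"), billing),
    (("error", "crash", "login"), technical),
    (("cancel", "leaving", "switch"), retention),
]


def route(complaint: str) -> Handler:
    """Choose a specialist at runtime from the complaint text."""
    text = complaint.lower()
    for keywords, handler in ROUTES:
        if any(k in text for k in keywords):
            return handler
    return technical  # assumed default when no rule matches


async def handle(complaint: str) -> str:
    return await route(complaint)(complaint)
```

For example, `asyncio.run(handle("I was charged twice"))` is routed to `billing`. Changing the rules changes routing without touching any specialist.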
Supervisors can implement priority schemes. High-priority tasks interrupt lower-priority work. The supervisor tracks running tasks, preempts if necessary, and reschedules interrupted work. This pattern suits systems with variable urgency.
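Preemption can be approximated in asyncio by cancelling the running coroutine and re-queuing the interrupted job. This sketch rests on simplifying assumptions: a single execution slot, lower numbers meaning higher priority, and interrupted jobs restarting from scratch rather than resuming from a checkpoint. `PreemptiveRunner`, `make_job`, and `demo` are hypothetical.

```python
import asyncio
import itertools
from typing import Awaitable, Callable, Optional

Job = Callable[[], Awaitable[str]]


class PreemptiveRunner:
    """Single-slot runner: a more urgent job cancels and re-queues the running one."""

    def __init__(self) -> None:
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.seq = itertools.count()  # FIFO tie-breaker within one priority level
        self.current_priority: Optional[int] = None
        self.current_task: Optional[asyncio.Task] = None
        self.log: list[str] = []

    def submit(self, priority: int, name: str, job: Job) -> None:
        self.queue.put_nowait((priority, next(self.seq), name, job))
        # Preempt when the new job is more urgent (smaller number) than the running one
        if self.current_task is not None and priority < self.current_priority:
            self.current_task.cancel()

    async def run_until_empty(self) -> None:
        while not self.queue.empty():
            priority, _, name, job = self.queue.get_nowait()
            task = asyncio.create_task(job())
            self.current_priority, self.current_task = priority, task
            await asyncio.wait({task})  # returns when the job finishes or is cancelled
            self.current_priority, self.current_task = None, None
            if task.cancelled():
                # Interrupted: reschedule at its original priority (restarts from scratch)
                self.log.append(f"{name}:preempted")
                self.queue.put_nowait((priority, next(self.seq), name, job))
            else:
                self.log.append(f"{name}:{task.result()}")


def make_job(seconds: float) -> Job:
    async def job() -> str:
        await asyncio.sleep(seconds)  # stand-in for real work
        return "done"
    return job


async def demo() -> list[str]:
    runner = PreemptiveRunner()
    runner.submit(5, "report", make_job(0.2))
    loop_task = asyncio.create_task(runner.run_until_empty())
    await asyncio.sleep(0.05)                   # let "report" start running
    runner.submit(1, "outage", make_job(0.01))  # urgent work arrives mid-run
    await loop_task
    return runner.log
```

Running `asyncio.run(demo())` returns `['report:preempted', 'outage:done', 'report:done']`. Restarting from scratch is the simplest rescheduling policy; long jobs would usually need checkpoints so preemption does not discard completed work.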
Monitoring and observability concentrate in supervisor logic. The supervisor sees all worker activity, enabling centralized logging, metrics, and alerting. Debugging is simpler because a single component records every state transition.
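Because every transition passes through the supervisor, one event log can feed logging, metrics, and alerting at once. In the sketch below, an in-memory list stands in for a real log or metrics backend; the `TransitionLog` class and its failure-count alert rule are hypothetical.

```python
import logging
import time
from collections import Counter
from dataclasses import dataclass, field

logger = logging.getLogger("supervisor")


@dataclass
class TransitionLog:
    """Central record of task state changes, kept by the supervisor."""
    failure_alert_threshold: int = 3  # hypothetical alerting rule
    events: list[tuple[float, str, str, str]] = field(default_factory=list)
    counts: Counter = field(default_factory=Counter)
    alerts: list[str] = field(default_factory=list)

    def record(self, task_id: str, old: str, new: str) -> None:
        self.events.append((time.time(), task_id, old, new))  # audit trail for debugging
        self.counts[new] += 1                                 # metrics: transitions per state
        logger.info("task %s: %s -> %s", task_id, old, new)   # centralized logging
        if new == "failed" and self.counts["failed"] >= self.failure_alert_threshold:
            self.alerts.append(f"{self.counts['failed']} tasks failed")

    def history(self, task_id: str) -> list[str]:
        """Replay one task's transitions, e.g. when debugging."""
        return [new for _, tid, _, new in self.events if tid == task_id]
```

With `failure_alert_threshold=2`, the second recorded failure raises an alert, and `history("t1")` reconstructs that task's path without consulting any worker.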
The failure model matters for supervisor design. What happens when a worker fails? Options include retrying on the same worker, failing over to a backup worker, escalating to a human, or failing the parent task. The supervisor encodes these policies.
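One way to make those policies explicit is a mapping from error type to action, with a retry budget before escalation. This is a hedged sketch: the exception classes, the `decide` function, and its specific mapping are hypothetical choices, not a prescribed taxonomy.

```python
from enum import Enum


class Action(Enum):
    RETRY_SAME = "retry_same_worker"
    FAILOVER = "fail_over_to_backup"
    ESCALATE = "escalate_to_human"
    FAIL_PARENT = "fail_parent_task"


class TransientError(Exception):
    """E.g. a timeout or rate limit: the same worker may succeed next time."""


class WorkerUnhealthyError(Exception):
    """The worker itself is broken; another worker should take the task."""


class InvalidInputError(Exception):
    """The task can never succeed as submitted."""


def decide(error: Exception, attempts: int, max_attempts: int = 3,
           needs_judgment: bool = False) -> Action:
    """Map a worker failure to one of the supervisor's recovery actions."""
    if isinstance(error, InvalidInputError):
        return Action.FAIL_PARENT  # retrying cannot help
    if attempts >= max_attempts:
        # Budget exhausted: hand off to a person only if the task warrants it
        return Action.ESCALATE if needs_judgment else Action.FAIL_PARENT
    if isinstance(error, WorkerUnhealthyError):
        return Action.FAILOVER
    if isinstance(error, TransientError):
        return Action.RETRY_SAME
    return Action.ESCALATE  # unknown errors get human attention
```

Keeping the policy in one pure function makes it easy to test and to change without touching worker code.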
Deep supervisor hierarchies introduce latency: each level adds processing time. A task passing through five supervisor levels before reaching a worker pays that overhead five times. Flat hierarchies with large worker pools avoid those hops but concentrate load on a single supervisor, which can become a bottleneck.
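A back-of-the-envelope model makes the depth cost concrete. It assumes each supervisor level adds a fixed, sequential overhead; the 200 ms per hop and 1000 ms of worker time below are illustrative figures, not measurements.

```python
def end_to_end_latency_ms(depth: int, per_hop_ms: float, worker_ms: float) -> float:
    """Latency for a task that passes through `depth` supervisor levels.

    Assumes every level adds the same fixed overhead and levels are traversed sequentially.
    """
    return depth * per_hop_ms + worker_ms


for depth in (1, 3, 5):
    print(depth, end_to_end_latency_ms(depth, per_hop_ms=200, worker_ms=1000))
# 1 1200
# 3 1600
# 5 2000
```

Under these assumptions, going from one level to five adds 800 ms to a task whose useful work takes 1000 ms.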
Supervisor availability affects system reliability. If the supervisor crashes, its entire subordinate tree becomes unavailable. Redundant supervisors with leader election prevent single points of failure.
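A common way to run redundant supervisors is lease-based leader election: replicas try to acquire a time-limited lease, and only the current holder dispatches work. In the sketch below, an in-memory `LeaseStore` stands in for a shared coordination service (in practice something like etcd, ZooKeeper, or a database row with atomic updates); the class names, method names, and TTL are hypothetical.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Lease:
    holder: str
    expires_at: float


class LeaseStore:
    """In-memory stand-in for a shared store with atomic compare-and-set."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.lease: Optional[Lease] = None

    def try_acquire(self, candidate: str, ttl: float) -> bool:
        now = self.clock()
        # Grant if there is no lease, it has expired, or the candidate already holds it (renewal)
        if self.lease is None or self.lease.expires_at <= now or self.lease.holder == candidate:
            self.lease = Lease(candidate, now + ttl)
            return True
        return False


class SupervisorReplica:
    def __init__(self, name: str, store: LeaseStore, ttl: float = 5.0) -> None:
        self.name, self.store, self.ttl = name, store, ttl

    def is_leader(self) -> bool:
        """Call periodically, well within ttl; only the leader dispatches tasks."""
        return self.store.try_acquire(self.name, self.ttl)
```

If replica `a` holds the lease and then crashes, it stops renewing; once the TTL lapses, replica `b` acquires the lease and takes over. A production version also needs fencing so that a paused former leader cannot keep dispatching after its lease has expired.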
```python
import asyncio
import itertools
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Awaitable, Callable, Coroutine, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RETRYING = "retrying"


@dataclass
class Task:
    id: str
    priority: int  # lower number = more urgent (asyncio.PriorityQueue pops the smallest)
    payload: Any
    deadline: Optional[datetime] = None  # carried but not enforced in this sketch
    max_retries: int = 3  # retries allowed after the first attempt
    current_retries: int = 0
    status: TaskStatus = TaskStatus.PENDING
    assigned_to: Optional[str] = None
    result: Optional[Any] = None
    error: Optional[str] = None


class Supervisor:
    def __init__(
        self,
        workers: list["Worker"],
        max_parallel: int = 10,
        retry_backoff_base: int = 2,
    ):
        self.workers = workers
        self.max_parallel = max_parallel
        self.backoff_base = retry_backoff_base
        # Entries are (priority, sequence, task): the sequence number keeps FIFO
        # order within a priority and ensures Task objects are never compared.
        self.pending_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._seq = itertools.count()
        self.tasks: dict[str, Task] = {}            # every submitted task, for status lookups
        self.running_tasks: dict[str, Task] = {}
        self.completed_tasks: dict[str, Task] = {}  # terminal tasks only (SUCCESS or FAILED)
        self._background: set[asyncio.Task] = set()

    async def submit(self, task: Task) -> None:
        """Submit a task for execution."""
        self.tasks[task.id] = task
        await self._enqueue(task)

    async def _enqueue(self, task: Task) -> None:
        await self.pending_queue.put((task.priority, next(self._seq), task))

    async def run(self) -> None:
        """Main supervisor loop: dispatch pending tasks in priority order."""
        while True:
            worker = self._select_worker()
            if worker is None or len(self.running_tasks) >= self.max_parallel:
                # No capacity: yield briefly instead of busy-spinning the event loop
                await asyncio.sleep(0.05)
                continue
            # Dequeue only once capacity exists, so a later urgent task is not skipped
            _, _, task = await self.pending_queue.get()
            # Reserve the worker here, before any await, so it cannot be double-assigned
            worker.active_count += 1
            task.status = TaskStatus.RUNNING
            task.assigned_to = worker.name
            self.running_tasks[task.id] = task
            self._spawn(self._execute_with_monitoring(task, worker))

    def _spawn(self, coro: Coroutine[Any, Any, None]) -> None:
        # Keep a strong reference: the event loop holds only weak references to tasks
        t = asyncio.create_task(coro)
        self._background.add(t)
        t.add_done_callback(self._background.discard)

    def _select_worker(self) -> Optional["Worker"]:
        """Select an available worker with the fewest active tasks."""
        available = [w for w in self.workers if w.available]
        if not available:
            return None
        return min(available, key=lambda w: w.active_count)

    async def _execute_with_monitoring(self, task: Task, worker: "Worker") -> None:
        failed = False
        try:
            task.result = await worker.execute(task.payload)
            task.status = TaskStatus.SUCCESS
        except Exception as e:
            task.error = str(e)
            failed = True
        finally:
            # Release the worker and the parallel slot before any backoff wait
            worker.active_count -= 1
            del self.running_tasks[task.id]
        if failed:
            await self._handle_failure(task)
        else:
            self.completed_tasks[task.id] = task

    async def _handle_failure(self, task: Task) -> None:
        task.current_retries += 1
        if task.current_retries <= task.max_retries:
            task.status = TaskStatus.RETRYING
            # Exponential backoff: base**1, base**2, ... seconds
            await asyncio.sleep(self.backoff_base ** task.current_retries)
            # Re-queue with the same priority
            await self._enqueue(task)
        else:
            task.status = TaskStatus.FAILED
            self.completed_tasks[task.id] = task
            # Retry budget exhausted: escalate to human review
            await self._escalate(task)

    async def _escalate(self, task: Task) -> None:
        """Escalate a permanently failed task to human review."""
        # Implementation would integrate with alerting systems
        pass

    async def get_task_status(self, task_id: str) -> Optional[Task]:
        # The registry covers pending, running, retrying, and finished tasks
        return self.tasks.get(task_id)


# Worker implementation
class Worker:
    def __init__(
        self,
        name: str,
        processor: Callable[[Any], Awaitable[Any]],
        capacity: int = 1,
    ):
        self.name = name
        self.processor = processor
        self.capacity = capacity  # concurrent tasks this worker accepts
        self.active_count = 0     # maintained by the Supervisor at assign/release time

    @property
    def available(self) -> bool:
        return self.active_count < self.capacity

    async def execute(self, payload: Any) -> Any:
        return await self.processor(payload)
```