RUNLOCALAIv38

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo

© 2026 runlocalai.coIndependently operated
Multi-Agent Systems

07. Supervisor Agents

Chapter 7 of 24 · 20 min
KEY INSIGHT

Supervisor hierarchies provide control and observability, but deep hierarchies create latency and fragility; keep supervisor trees shallow and worker pools large.

Supervisor agents sit atop agent hierarchies, coordinating subordinate workers. The supervisor receives tasks, decomposes them, delegates to workers, monitors progress, handles failures, and assembles results. This pattern mirrors organizational structures: managers oversee specialists who perform execution work.

Supervisors excel when task decomposition is complex or variable. A customer support supervisor might receive a complaint and decide whether to route it to billing, technical support, or retention specialists. That routing logic lives in the supervisor rather than being pre-configured.

Supervisors can implement priority schemes. High-priority tasks interrupt lower-priority work. The supervisor tracks running tasks, preempts if necessary, and reschedules interrupted work. This pattern suits systems with variable urgency.

Monitoring and observability concentrate in supervisor logic. The supervisor sees all worker activity, enabling centralized logging, metrics, and alerting. Debugging benefits from having one place that tracks all state transitions.

The failure model matters for supervisor design. What happens when a worker fails? Options include retrying on the same worker, failing over to a backup worker, escalating to a human, or failing the parent task. The supervisor encodes these policies.

Deep supervisor hierarchies introduce latency, because each level adds processing time. A task passing through five supervisor levels before reaching a worker consumes time at each hop. Flat hierarchies with large worker pools reduce depth but concentrate load on fewer supervisors.

Supervisor availability affects system reliability. If the supervisor crashes, its entire subordinate tree becomes unavailable. Redundant supervisors with leader election remove this single point of failure.
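
The customer support routing described above can be sketched in a few lines. This is a minimal illustration, not a recommended design: the handler functions, the `ROUTES` table, and the keyword rules are all hypothetical, and a real supervisor would more likely use a classifier or a model call to make the decision.

```python
from typing import Callable

# Hypothetical specialist handlers; real workers would call models or services.
def billing(complaint: str) -> str:
    return f"billing handled: {complaint}"

def technical(complaint: str) -> str:
    return f"technical handled: {complaint}"

def retention(complaint: str) -> str:
    return f"retention handled: {complaint}"

# Keyword rules are an assumption made for illustration only.
ROUTES: list[tuple[tuple[str, ...], Callable[[str], str]]] = [
    (("refund", "charge", "invoice"), billing),
    (("cancel", "switch", "leaving"), retention),
]

def route(complaint: str) -> Callable[[str], str]:
    """Pick a specialist at runtime; fall back to technical support."""
    text = complaint.lower()
    for keywords, handler in ROUTES:
        if any(word in text for word in keywords):
            return handler
    return technical

def supervise(complaint: str) -> str:
    """The supervisor owns the routing decision, then delegates."""
    handler = route(complaint)
    return handler(complaint)
```

Because the decision lives in `route`, changing how complaints are classified does not require touching any specialist.
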
```python
import asyncio
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Awaitable, Callable, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RETRYING = "retrying"


@dataclass
class Task:
    id: str
    priority: int  # lower value runs first (asyncio.PriorityQueue is a min-heap)
    payload: Any
    deadline: Optional[datetime] = None
    max_retries: int = 3  # total attempts allowed, including the first
    current_retries: int = 0
    status: TaskStatus = TaskStatus.PENDING
    assigned_to: Optional[str] = None
    result: Optional[Any] = None
    error: Optional[str] = None


class Supervisor:
    def __init__(
        self,
        workers: list["Worker"],
        max_parallel: int = 10,
        retry_backoff_base: int = 2,
    ):
        self.workers = workers
        self.max_parallel = max_parallel
        self.backoff_base = retry_backoff_base
        self.pending_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.running_tasks: dict[str, Task] = {}
        self.completed_tasks: dict[str, Task] = {}
        # Hold references so in-flight asyncio tasks are not garbage-collected
        self._background: set[asyncio.Task] = set()

    async def submit(self, task: Task) -> None:
        """Submit a task for execution"""
        # task.id breaks priority ties, so Task objects are never compared
        await self.pending_queue.put((task.priority, task.id, task))

    async def run(self) -> None:
        """Main supervisor loop"""
        while True:
            # Fill available capacity
            while len(self.running_tasks) < self.max_parallel:
                try:
                    _, task_id, task = await asyncio.wait_for(
                        self.pending_queue.get(), timeout=1.0
                    )
                except asyncio.TimeoutError:
                    break
                worker = self._select_worker()
                if worker is None:
                    # No free worker: re-queue at the same priority and wait
                    await self.pending_queue.put((task.priority, task_id, task))
                    break
                # Reserve the worker now so it is not selected twice before it starts
                worker.available = False
                task.status = TaskStatus.RUNNING
                task.assigned_to = worker.name
                self.running_tasks[task.id] = task
                job = asyncio.create_task(self._execute_with_monitoring(task, worker))
                self._background.add(job)
                job.add_done_callback(self._background.discard)
            # Yield to the event loop so running tasks can make progress
            await asyncio.sleep(0.1)

    def _select_worker(self) -> Optional["Worker"]:
        """Select available worker with fewest active tasks"""
        available = [w for w in self.workers if w.available]
        if not available:
            return None
        return min(available, key=lambda w: w.active_count)

    async def _execute_with_monitoring(self, task: Task, worker: "Worker") -> None:
        try:
            task.result = await worker.execute(task.payload)
            task.status = TaskStatus.SUCCESS
        except Exception as e:
            await self._handle_failure(task, worker, e)
        finally:
            self.running_tasks.pop(task.id, None)
            # Only terminal states are recorded; a retried task is back in the queue
            if task.status in (TaskStatus.SUCCESS, TaskStatus.FAILED):
                self.completed_tasks[task.id] = task

    async def _handle_failure(
        self, task: Task, worker: "Worker", error: Exception
    ) -> None:
        task.error = str(error)
        task.current_retries += 1
        if task.current_retries < task.max_retries:
            task.status = TaskStatus.RETRYING
            # Exponential backoff; the task keeps its slot while it waits
            delay = self.backoff_base ** task.current_retries
            await asyncio.sleep(delay)
            # Re-queue with same priority
            await self.pending_queue.put((task.priority, task.id, task))
        else:
            task.status = TaskStatus.FAILED
            # Could escalate to human review here
            await self._escalate(task)

    async def _escalate(self, task: Task) -> None:
        """Escalate failed task to human review"""
        # Implementation would integrate with alert systems
        pass

    async def get_task_status(self, task_id: str) -> Optional[Task]:
        """Return a running or finished task; queued tasks return None"""
        if task_id in self.running_tasks:
            return self.running_tasks[task_id]
        return self.completed_tasks.get(task_id)


# Worker implementation
class Worker:
    def __init__(self, name: str, processor: Callable[[Any], Awaitable[Any]]):
        self.name = name
        self.processor = processor  # async callable that does the actual work
        self.active_count = 0
        self.available = True

    async def execute(self, payload: Any) -> Any:
        self.active_count += 1
        self.available = False
        try:
            return await self.processor(payload)
        finally:
            self.active_count -= 1
            self.available = True
```
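
The supervisor above only retries and then escalates. The four failure options named in the text (retry on the same worker, fail over to a backup, escalate to a human, fail the parent task) could be encoded as an explicit policy object. The sketch below is one possible arrangement: `FailureAction`, `FailurePolicy`, and the order of the escalation ladder are assumptions for illustration, not part of the chapter's code.

```python
from dataclasses import dataclass
from enum import Enum


class FailureAction(Enum):
    RETRY_SAME = "retry_same_worker"
    FAIL_OVER = "fail_over_to_backup"
    ESCALATE = "escalate_to_human"
    FAIL_PARENT = "fail_parent_task"


@dataclass
class FailurePolicy:
    # All fields are hypothetical knobs chosen for this sketch.
    max_retries: int = 2       # retries allowed on the current worker
    has_backup: bool = True    # whether a backup worker exists
    human_review: bool = True  # whether a human review queue exists

    def decide(self, attempts: int, on_backup: bool) -> FailureAction:
        """Choose the next action after a failure.

        attempts: failures so far on the current worker
        on_backup: True if the failing worker was already the backup
        """
        if attempts <= self.max_retries:
            return FailureAction.RETRY_SAME
        if self.has_backup and not on_backup:
            return FailureAction.FAIL_OVER
        if self.human_review:
            return FailureAction.ESCALATE
        return FailureAction.FAIL_PARENT
```

A supervisor could call `decide` from its failure handler and branch on the result, which keeps the policy testable without running any workers.
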

EXERCISE

Design a supervisor hierarchy for an e-commerce checkout system. The top-level supervisor handles the checkout orchestration. Beneath it, supervisors manage cart validation, payment processing, inventory reservation, and order confirmation. Specify task decomposition, failure handling policies, and how the system handles partial failures (e.g., payment succeeded but inventory reservation failed). (12 minutes)
