Multi-Agent Systems
Learn multi-agent systems through RunLocalAI's practical lens: agents, multi-agent coordination, orchestration and supervisors, hardware fit, runtime settings, verification habits and local-vs-cloud tradeoffs.
- B016
Why this course matters
Multi-Agent Systems is for builders turning local models into working tools, agents and retrieval systems. It connects agents, multi-agent coordination, orchestration and supervisors to the questions RunLocalAI wants every reader to answer before they install, upgrade or scale a model: will it run, what will it cost in memory, which setting changes the result, and how do you verify the answer instead of trusting a demo?
What you will be able to do
By the end, you should be able to explain the main tradeoffs in plain language, choose a safe next experiment, and use the chapter exercises as a repeatable operator checklist. The course favors local evidence, hardware fit, context limits, latency and failure modes over generic AI vocabulary.
How to use this course
Start at chapter one if the topic is new. If you already have a working stack, scan for chapters such as Why Multi-Agent?, Orchestrator-Worker Pattern, Debate Pattern and Agent Communication Protocols and use those lessons as a quality-control pass before changing a workstation, team workflow or production-like local deployment.
- 01 Why Multi-Agent?
Single-agent systems hit ceilings; multi-agent architectures add parallelism, specialization, and fault tolerance that are hard to get from a monolithic design.

Modern AI applications increasingly demand capabilities beyond what a single agent delivers. A customer service system might need simultaneous product expertise, order tracking, and sentiment analysis. A code generation pipeline benefits from separate planning, implementation, and verification stages. These requirements drive adoption of multi-agent architectures.

Multi-agent systems distribute cognitive load across specialized components. Rather than one model handling every sub-task, each agent focuses on its domain: translating natural language to structured queries, executing database operations, formatting responses, or monitoring quality gates. This separation enables parallel execution: while one agent retrieves data, another analyzes yesterday's trends, and a third prepares presentation templates.

Specialization creates efficiency gains. Smaller models fine-tuned for narrow tasks often outperform general-purpose models on those specific tasks, and at lower cost. A routing agent need not generate creative prose; a code-writer need not maintain conversational personality. Each agent carries only the capabilities its role demands.

Fault tolerance follows from distribution when the system is designed for it. If one agent fails or produces poor output, the rest of the system can keep operating. Redundant verification agents can catch errors before results reach users. Rollback mechanisms replay failed subtasks without restarting entire pipelines.

The design space spans simple hierarchies (a supervisor delegates to workers) through complex peer networks (agents negotiate, debate, vote). The choice depends on task structure, latency requirements, and consistency demands. Early architectures favored central control; newer approaches often favor adaptive meshes where agents self-coordinate.
The orchestration pattern chosen shapes performance, debuggability, and extensibility. Poor pattern selection creates bottlenecks. Understanding these tradeoffs requires examining specific architectures and their appropriate use cases.

```python
# Simple multi-agent dispatch example

class EchoModel:
    """Placeholder so the sketch runs; replace with a real local model client."""

    def generate(self, prompt):
        return f"(model output for: {prompt})"


class Agent:
    def __init__(self, name, model):
        self.name = name
        self.model = model

    def invoke(self, prompt):
        return self.model.generate(prompt)


supervisor_model = router_model = coder_model = EchoModel()

# Supervisor delegates to specialized agents
supervisor = Agent("supervisor", supervisor_model)
router_agent = Agent("router", router_model)
coder_agent = Agent("coder", coder_model)

task = "Fix the authentication bug in the login flow"
context = supervisor.invoke(f"Decompose: {task}")
# Routes to appropriate specialist based on task type
```
15 min
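The parallelism and fault-isolation points in Why Multi-Agent? can be made concrete with a small sketch. It assumes two hypothetical specialists, `retrieve_data` and `analyze_trends` (neither comes from the course), and uses `asyncio.gather` with `return_exceptions=True` so that one failing agent does not cancel the others. Treat it as an illustration of the idea, not a reference design.

```python
import asyncio


async def retrieve_data(query: str) -> str:
    # Hypothetical specialist standing in for a retrieval agent.
    await asyncio.sleep(0.01)
    return f"rows for {query}"


async def analyze_trends(query: str) -> str:
    # Hypothetical specialist that fails, to exercise the fallback path.
    raise RuntimeError("trend model unavailable")


async def run_specialists(query: str) -> dict[str, str]:
    """Run independent specialists concurrently and isolate individual failures."""
    names = ["retrieval", "trends"]
    outcomes = await asyncio.gather(
        retrieve_data(query),
        analyze_trends(query),
        return_exceptions=True,  # a failure is returned as a value, not raised
    )
    results = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            results[name] = f"unavailable ({outcome})"
        else:
            results[name] = outcome
    return results


report = asyncio.run(run_specialists("weekly sales"))
```

The retrieval result survives even though the trend agent raised; whether a partial report is acceptable is a product decision that the caller still has to make.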
- 02 Orchestrator-Worker Pattern
Central orchestrators provide simplicity and controllability at the cost of scalability; use them when task graphs are known upfront and debuggability matters.

The orchestrator-worker pattern places a central coordinator in control. The orchestrator receives incoming requests, decomposes them into subtasks, assigns each to an appropriate worker, collects results, and assembles the final response. Workers remain stateless: they execute assigned tasks and return outputs without awareness of the broader pipeline.

This pattern excels for well-structured workflows with known dependency graphs. Request processing pipelines, document analysis systems, and multi-step form handling map cleanly to orchestrated topologies. The central coordinator maintains visibility into pipeline state, enabling logging, error recovery, and retry logic in one place.

Debugging is simpler when a single component drives execution. Tracing a request means following the orchestrator's logic, not reconstructing distributed causality. New developers can understand system behavior by examining one class rather than mapping agent interactions.

Failure handling is straightforward. When a worker fails, the orchestrator detects the timeout or error response and can retry, route to a backup worker, or escalate to human review. The orchestrator owns retry budgets, backoff strategies, and circuit-breaker state.

The pattern's weakness lies in centralization. The orchestrator becomes a bottleneck: every task passes through it, which adds latency and limits throughput. If the orchestrator crashes, the entire system halts. Scaling means scaling one component that may not parallelize well.

Task decomposition requires the orchestrator to understand enough about each subtask to assign it correctly. In complex domains, building this knowledge is difficult. The orchestrator may become a large, unwieldy prompt or a maze of conditional logic.
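```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Task:
    id: str
    type: str
    payload: Any
    dependencies: list[str]


class Orchestrator:
    def __init__(self, workers: dict[str, Any]):
        self.workers = workers
        self.results: dict[str, Any] = {}

    async def execute(self, task: Task) -> Any:
        # Dependencies must already have results; this sketch does not wait for them
        for dep_id in task.dependencies:
            if dep_id not in self.results:
                raise ValueError(f"Missing dependency: {dep_id}")

        # Route to appropriate worker
        worker = self.workers.get(task.type)
        if not worker:
            raise ValueError(f"No worker for task type: {task.type}")

        result = await worker.process(task.payload)
        self.results[task.id] = result
        return result


class AnalysisWorker:
    """Placeholder worker so the sketch runs; real workers would call a model."""

    async def process(self, payload: Any) -> Any:
        return {"handled_by": type(self).__name__, "input": payload}


class TransformWorker(AnalysisWorker):
    pass


class FormatWorker(AnalysisWorker):
    pass


# Usage
async def main() -> None:
    raw_data = "example input"  # placeholder data
    orchestrator = Orchestrator({
        "analysis": AnalysisWorker(),
        "transformation": TransformWorker(),
        "formatting": FormatWorker(),
    })
    result = await orchestrator.execute(
        Task("t1", "analysis", {"text": raw_data}, [])
    )
    print(result)


asyncio.run(main())
```
15 min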
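The `Orchestrator.execute` method above rejects a task whose dependencies have not produced results yet, so the caller has to submit tasks in a valid order. One way to derive that order is a topological sort of the dependency graph, sketched here with the standard library's `graphlib.TopologicalSorter`. The task names are hypothetical and only illustrate a known-upfront task graph.

```python
from graphlib import TopologicalSorter

# Hypothetical task graph: each task id maps to the ids it depends on.
dependencies = {
    "extract": set(),
    "analyze": {"extract"},
    "summarize": {"extract"},
    "format": {"analyze", "summarize"},
}


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return an order in which every task comes after its dependencies.

    TopologicalSorter raises graphlib.CycleError if the graph contains a cycle.
    """
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
```

Tasks with no ordering constraint between them, such as `analyze` and `summarize` here, are candidates for parallel dispatch to separate workers.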
- 03 Debate Pattern
Adversarial evaluation can beat unilateral confidence: structured debate catches errors that solo reasoning misses by exposing hidden assumptions and contradictory conclusions.

The debate pattern pits agents against each other, forcing explicit argumentation for and against positions. Rather than one agent producing an answer, opposing agents argue from different perspectives, expose weaknesses, and highlight contradictions. A judge or aggregator evaluates the debate and selects the strongest position.

This pattern is effective for high-stakes decisions where errors carry significant cost. Legal reasoning, strategic planning, and safety evaluation benefit from multiple viewpoints challenging assumptions. The adversarial structure prevents premature consensus and surfaces edge cases.

Agents in debate mode need distinct roles: one argues the proposition, another argues the negation, a third synthesizes or judges. These roles require different instruction sets: arguers maximize their position's strength, judges evaluate evidence quality and argument validity.

Debate introduces latency, because multiple agents must complete their reasoning before resolution. Round-based debates multiply this cost. Design decisions about round count, whether arguments are simultaneous or sequential, and how judges weight evidence shape system behavior.

Hidden assumptions surface through opposition. An agent defending a position encounters questions its creator never considered. The defending agent either constructs valid responses (exposing its reasoning) or fails (exposing a flaw). Either outcome improves output quality.

Implementation requires balancing thoroughness against response time. Some systems use preliminary confidence scores to skip debate for obvious cases, so that only low-confidence answers undergo full adversarial evaluation.
```python
import json
from enum import Enum
from dataclasses import dataclass


class Verdict(Enum):
    AFFIRMATIVE = "affirmative"
    NEGATIVE = "negative"
    INCONCLUSIVE = "inconclusive"


@dataclass
class DebateResult:
    verdict: Verdict
    affirmative_arguments: list[str]
    negative_arguments: list[str]
    confidence: float


class Debater:
    """Base class for debate participants"""

    def __init__(self, name: str, stance: str, model):
        self.name = name
        self.stance = stance
        self.model = model

    async def argue(self, topic: str, opposing_arguments: list[str]) -> str:
        prompt = f"""Topic: {topic}
Your stance: {self.stance}
Opposing arguments to rebut: {opposing_arguments}
Present your strongest arguments."""
        # model.generate is assumed to be a synchronous call in this sketch
        return self.model.generate(prompt)


class Judge:
    def __init__(self, model):
        self.model = model

    async def evaluate(self, topic: str, affirmative: str, negative: str) -> DebateResult:
        prompt = f"""Debate Topic: {topic}
Affirmative position: {affirmative}
Negative position: {negative}
Evaluate both positions. Consider:
- Logical consistency
- Evidence quality
- Completeness of reasoning
Output a JSON object with keys verdict (affirmative, negative or inconclusive)
and confidence (0-1)."""
        response = self.model.generate(prompt)
        # Parse structured response
        return self._parse_verdict(response, affirmative, negative)

    def _parse_verdict(self, response: str, affirmative: str, negative: str) -> DebateResult:
        # Assumes the model followed the JSON instruction; otherwise stay inconclusive
        try:
            data = json.loads(response)
            verdict = Verdict(data["verdict"])
            confidence = float(data["confidence"])
        except (ValueError, KeyError, TypeError):
            verdict, confidence = Verdict.INCONCLUSIVE, 0.0
        return DebateResult(verdict, [affirmative], [negative], confidence)


async def run_debate(topic: str, model) -> DebateResult:
    pro = Debater("Advocate", "support", model)
    con = Debater("Critic", "oppose", model)
    judge = Judge(model)

    # Round 1: Initial arguments
    pro_arg = await pro.argue(topic, [])
    con_arg = await con.argue(topic, [])

    # Round 2: Rebuttals
    pro_rebuttal = await pro.argue(topic, [con_arg])
    con_rebuttal = await con.argue(topic, [pro_arg])

    # Final judgment over opening arguments and rebuttals
    return await judge.evaluate(
        topic,
        f"{pro_arg}\n{pro_rebuttal}",
        f"{con_arg}\n{con_rebuttal}",
    )
```
15 min
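The chapter notes that debate multiplies latency and that some systems skip it for obvious cases. A minimal sketch of both ideas follows: a gate that sends only low-confidence drafts to debate, and a count of model calls per debate. The `Draft` type, the 0.8 threshold, and the assumption that a usable 0-1 confidence score is available are all illustrative; the threshold in particular should be tuned against local evidence.

```python
from dataclasses import dataclass


@dataclass
class Draft:
    answer: str
    confidence: float  # assumed 0-1 score from a preliminary pass


def needs_debate(draft: Draft, threshold: float = 0.8) -> bool:
    """Send only low-confidence drafts through the full adversarial pass."""
    return draft.confidence < threshold


def debate_calls(rounds: int, debaters: int = 2, judges: int = 1) -> int:
    """Model calls for one debate: every debater speaks once per round, then judges rule."""
    return rounds * debaters + judges


obvious = Draft("Paris is the capital of France", 0.97)
uncertain = Draft("The contract clause is enforceable", 0.55)
calls_for_two_rounds = debate_calls(rounds=2)
```

With two debaters, two rounds, and one judge, a debate costs five model calls instead of one, which is the overhead the gate is meant to avoid on easy inputs.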
- 04 Agent Communication Protocols
Protocol design determines whether agents interoperate or become isolated silos; clean abstractions survive model changes while brittle interfaces crumble.

Agent communication requires explicit protocols governing message formats, turn-taking, error handling, and state management. Without standardized protocols, each agent pair requires custom integration code. Adding a new agent means updating every existing peer. Protocols prevent this N×M integration problem.

Communication protocols define three layers: syntactic (message structure and serialization), semantic (meaning of message contents and valid transitions), and pragmatic (conversational patterns and context management). Syntactic choices (JSON vs protobuf vs custom formats) matter less than semantic clarity and pragmatic reliability.

Messages carry intents: requests, responses, notifications, errors. Each intent type carries different expectations about acknowledgment, retry behavior, and timeout handling. A request expects a response; a notification does not. Protocols must distinguish these patterns to enable appropriate handling.

State management during conversations requires careful design. Does each agent maintain conversation state? Is there a shared conversation tracker? Does each message carry full context? Stateful protocols support richer interactions but complicate implementation. Stateless protocols scale better but require clients to track context.

Error handling deserves explicit protocol attention. Network failures, agent crashes, malformed messages, and timeout scenarios must all produce deterministic behavior. Protocols should specify retry budgets, backoff strategies, and dead-letter handling.

Protocol versioning enables evolution. Agents supporting multiple protocol versions interoperate with both old and new peers. The protocol should define negotiation mechanisms so agents discover mutual capabilities.
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import json


class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    NOTIFICATION = "notification"
    ERROR = "error"


@dataclass
class Message:
    id: str
    type: MessageType
    sender: str
    receiver: Optional[str]  # None for broadcast
    action: str
    payload: dict[str, Any]
    correlation_id: Optional[str] = None  # Links request/response
    ttl_seconds: int = 30
    metadata: dict[str, Any] = field(default_factory=dict)


class AgentProtocol(ABC):
    @abstractmethod
    async def send(self, message: Message) -> None:
        pass

    @abstractmethod
    async def receive(self) -> Optional[Message]:
        pass

    @abstractmethod
    async def acknowledge(self, message_id: str) -> None:
        pass


class JSONAgentProtocol(AgentProtocol):
    def __init__(self, transport):
        # transport: any object with async send(str) and async receive() -> str
        self.transport = transport
        self.pending: dict[str, Message] = {}  # requests still awaiting a response

    async def send(self, message: Message) -> None:
        serialized = json.dumps({
            "id": message.id,
            "type": message.type.value,
            "sender": message.sender,
            "receiver": message.receiver,
            "action": message.action,
            "payload": message.payload,
            "correlation_id": message.correlation_id,
            "ttl": message.ttl_seconds,
            "metadata": message.metadata,
        })
        if message.type == MessageType.REQUEST:
            self.pending[message.id] = message
        await self.transport.send(serialized)

    async def receive(self) -> Optional[Message]:
        raw = await self.transport.receive()
        if not raw:
            return None
        data = json.loads(raw)
        message = Message(
            id=data["id"],
            type=MessageType(data["type"]),
            sender=data["sender"],
            receiver=data["receiver"],
            action=data["action"],
            payload=data["payload"],
            correlation_id=data.get("correlation_id"),
            ttl_seconds=data.get("ttl", 30),
            metadata=data.get("metadata", {}),
        )
        # A response or error settles the request it correlates with
        if message.type in (MessageType.RESPONSE, MessageType.ERROR) and message.correlation_id:
            self.pending.pop(message.correlation_id, None)
        return message

    async def acknowledge(self, message_id: str) -> None:
        ack = Message(
            id=f"ack_{message_id}",
            type=MessageType.NOTIFICATION,
            sender="local",
            receiver=None,
            action="ack",
            payload={"original_id": message_id},
        )
        await self.send(ack)
```
15 min
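The versioning paragraph in Agent Communication Protocols says agents should negotiate mutual capabilities. A minimal sketch of one negotiation rule, picking the highest version both peers support, is shown here. The integer version numbers and the `negotiate_version` helper are assumptions for illustration and are not part of any specific agent protocol.

```python
from typing import Optional


def negotiate_version(local: set[int], remote: set[int]) -> Optional[int]:
    """Pick the highest protocol version both peers support, or None if there is none."""
    common = local & remote
    return max(common) if common else None


# Hypothetical peers: an older agent and a newer one with overlapping support.
legacy_agent = {1, 2}
current_agent = {2, 3}
future_agent = {4}

agreed = negotiate_version(legacy_agent, current_agent)
no_overlap = negotiate_version(legacy_agent, future_agent)
```

When negotiation returns `None`, the protocol needs a defined outcome, for example an error message of the kind the `MessageType.ERROR` intent above is meant to carry.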
- 05 Message Passing
Direct point-to-point messaging creates tight coupling; intermediary-based passing enables routing, filtering, and temporal decoupling that distributed systems require.

Message passing patterns determine how agents exchange information. Direct point-to-point communication offers simplicity: messages go from sender to receiver with minimal overhead. However, this pattern creates tight coupling: senders must know recipient identities, recipients must handle all incoming messages, and adding new agents requires updating all existing participants.

Message brokers or queues introduce intermediaries. Senders publish messages to topics or queues without knowing recipients. Receivers subscribe to relevant channels. This decoupling enables dynamic participation: agents can join and leave without disrupting the system. Temporal decoupling handles cases where agents operate at different rates; queues buffer messages between fast producers and slow consumers.

Publish-subscribe patterns excel for notifications and state changes. When an agent completes a task or updates shared state, it publishes an event. Interested agents subscribe and react accordingly. This pattern supports event-driven architectures where system behavior emerges from reactions to events rather than explicit orchestration.

Request-reply patterns remain essential for queries requiring responses. A query agent sends a request to a knowledge base agent and expects results. The request-reply pattern requires correlation, that is, matching responses to their originating requests. Message headers typically carry correlation identifiers.

Queue-based systems introduce new failure modes: message loss, duplication, and reordering. Dead-letter queues capture messages that fail processing after multiple retries. Idempotency keys enable safe message deduplication. Sequence numbers or timestamps enable ordering guarantees where required.

Push vs pull patterns affect resource utilization.
Push systems notify consumers immediately (efficient when consumers can handle load); pull systems poll for new work (efficient when load varies or consumers need backpressure control).

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, Optional
import uuid
from enum import Enum


class DeliveryMode(Enum):
    AT_MOST_ONCE = "at_most_once"    # Fire and forget
    AT_LEAST_ONCE = "at_least_once"  # Ack-based retry
    EXACTLY_ONCE = "exactly_once"    # Idempotent operations


@dataclass
class Envelope:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    sender: str = ""
    topic: str = ""
    action: str = ""
    payload: dict = field(default_factory=dict)
    correlation_id: Optional[str] = None
    reply_to: Optional[str] = None
    delivery_mode: DeliveryMode = DeliveryMode.AT_LEAST_ONCE


class MessageBus:
    def __init__(self):
        self.subscribers: dict[str, list[Callable]] = defaultdict(list)
        self.pending: dict[str, Envelope] = {}
        self.dead_letter_queue: list[Envelope] = []
        self.max_retries = 3

    def subscribe(self, topic: str, handler: Callable) -> None:
        self.subscribers[topic].append(handler)

    async def publish(self, envelope: Envelope) -> Optional[Envelope]:
        self.pending[envelope.id] = envelope
        if envelope.reply_to:
            # Request-reply pattern
            return await self._request_reply(envelope)
        await self._broadcast(envelope)
        return None

    async def _broadcast(self, envelope: Envelope) -> None:
        handlers = self.subscribers.get(envelope.topic, [])
        if not handlers:
            self.pending.pop(envelope.id, None)
            self.dead_letter_queue.append(envelope)
            return
        for handler in handlers:
            try:
                await handler(envelope)
                # Remove from pending only on success
                self.pending.pop(envelope.id, None)
            except Exception as e:
                # A retry re-publishes to every subscriber, so handlers must tolerate duplicates
                await self._handle_failure(envelope, e)

    async def _request_reply(self, envelope: Envelope) -> Optional[Envelope]:
        # Listen on the channel the request names, so the responder's reply is matched
        reply_topic = envelope.reply_to
        reply_received = asyncio.Event()
        reply_result = None

        async def reply_handler(reply: Envelope):
            nonlocal reply_result
            reply_result = reply
            reply_received.set()

        self.subscribe(reply_topic, reply_handler)
        try:
            # Send original message to the first subscriber on the request topic
            handlers = self.subscribers.get(envelope.topic, [])
            if not handlers:
                self.dead_letter_queue.append(envelope)
                return None
            await handlers[0](envelope)
            # Wait for reply with timeout
            await asyncio.wait_for(reply_received.wait(), timeout=30)
            return reply_result
        except asyncio.TimeoutError:
            return None
        finally:
            self.subscribers.pop(reply_topic, None)
            self.pending.pop(envelope.id, None)

    async def _handle_failure(self, envelope: Envelope, error: Exception) -> None:
        retry_count = envelope.payload.get("_retry_count", 0)
        if retry_count < self.max_retries:
            envelope.payload["_retry_count"] = retry_count + 1
            # Exponential backoff
            await asyncio.sleep(2 ** retry_count)
            await self.publish(envelope)
        else:
            self.pending.pop(envelope.id, None)
            self.dead_letter_queue.append(envelope)


# Usage example
bus = MessageBus()


async def handle_code_review(envelope: Envelope):
    # Process code review request
    result = {"approved": True, "issues": []}
    reply = Envelope(
        sender="reviewer",
        topic=envelope.reply_to,
        action="review_complete",
        payload=result,
        correlation_id=envelope.id,
    )
    await bus.publish(reply)


bus.subscribe("code_review_requests", handle_code_review)


async def main() -> None:
    # Send request
    request = Envelope(
        sender="pipeline",
        topic="code_review_requests",
        action="review",
        payload={"code": "...", "language": "python"},
        reply_to=f"reply_channel_{uuid.uuid4()}",
    )
    reply = await bus.publish(request)
    print(reply.payload if reply else "no reply")


asyncio.run(main())
```
20 min
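Message Passing lists duplication and reordering as failure modes and names idempotency keys and sequence numbers as the remedies. The sketch below shows both in a few lines. The `IdempotentConsumer` wrapper, the key format, and the `seq` field are hypothetical; a production consumer would also need to persist and expire its set of seen keys.

```python
from typing import Callable


class IdempotentConsumer:
    """Wraps a handler so that redelivered messages are processed only once."""

    def __init__(self, handler: Callable[[dict], None]):
        self.handler = handler
        self.seen: set[str] = set()  # unbounded here; real systems expire old keys

    def deliver(self, key: str, payload: dict) -> bool:
        """Return True if the payload was processed, False if it was a duplicate."""
        if key in self.seen:
            return False
        self.handler(payload)  # if this raises, the key is not recorded and a retry can succeed
        self.seen.add(key)
        return True


def in_order(messages: list[dict]) -> list[dict]:
    """Restore producer order using a hypothetical per-sender 'seq' field."""
    return sorted(messages, key=lambda m: m["seq"])


applied: list[dict] = []
consumer = IdempotentConsumer(applied.append)
first = consumer.deliver("order-42:charge", {"amount": 10})
second = consumer.deliver("order-42:charge", {"amount": 10})  # redelivery of the same message
reordered = in_order([{"seq": 2, "body": "b"}, {"seq": 1, "body": "a"}])
```

Recording the key only after the handler succeeds is a deliberate choice: it favors at-least-once processing over silently dropping a message whose first attempt failed.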
- 06 Shared State
Shared state synchronizes agents at the cost of contention; choose a consistency model that matches task requirements: strong consistency for transactions, eventual consistency for throughput.

Multi-agent systems frequently require shared state: order status visible to fulfillment and customer service agents, product inventory shared across sales and warehouse agents, user session data accessed by multiple service agents. Managing this shared state determines system correctness and performance.

Optimistic concurrency control assumes that conflicts are rare and handles them through retry logic. Agents read state, make local modifications, then attempt to write. If another agent modified the state between read and write, the write fails and the agent retries with fresh data. This model maximizes throughput under low contention but requires operations that are safe to retry.

Pessimistic locking blocks access to prevent conflicts. Agents acquire locks before reading or writing shared state. Other agents wait or receive "busy" responses. This model simplifies correctness reasoning but limits parallelism and creates deadlock risk.

Event sourcing treats state changes as append-only event logs. Agents append events rather than mutating state directly. Current state derives from replaying events. This pattern provides audit trails and supports temporal queries (what was the state at time T?), but complicates current-state access.

Saga patterns coordinate multi-step operations across agents. Each step has a compensating action that undoes its effects if later steps fail. The saga coordinator ensures either all steps complete or all completed steps are compensated. This pattern handles distributed transactions without distributed locks.

Eventual consistency accepts that different agents may temporarily observe different state values. Updates propagate asynchronously; clients eventually see consistent data. This model scales well but complicates scenarios requiring strong consistency.
```python
import asyncio
import copy
from dataclasses import dataclass
from typing import Optional, Any
from datetime import datetime, timezone


@dataclass
class VersionedValue:
    value: Any
    version: int
    last_modified: datetime
    modified_by: str


class OptimisticStore:
    """Optimistic concurrency control shared store"""

    def __init__(self):
        self.data: dict[str, VersionedValue] = {}
        self.locks: dict[str, asyncio.Lock] = {}

    def _get_lock(self, key: str) -> asyncio.Lock:
        if key not in self.locks:
            self.locks[key] = asyncio.Lock()
        return self.locks[key]

    async def read(self, key: str) -> Optional[VersionedValue]:
        return self.data.get(key)

    async def write(self, key: str, value: Any, agent_id: str, expected_version: int) -> bool:
        """Write with optimistic concurrency. Returns True if successful.

        Pass expected_version=0 to create a key that does not exist yet.
        """
        lock = self._get_lock(key)
        async with lock:
            current = self.data.get(key)
            current_version = current.version if current else 0
            if current_version != expected_version:
                return False  # Version conflict
            self.data[key] = VersionedValue(
                value=value,
                version=current_version + 1,
                last_modified=datetime.now(timezone.utc),
                modified_by=agent_id,
            )
            return True

    async def compare_and_swap(
        self, key: str, expected: Any, new_value: Any, agent_id: str
    ) -> bool:
        """Atomic CAS operation"""
        lock = self._get_lock(key)
        async with lock:
            current = self.data.get(key)
            if current is None or current.value != expected:
                return False
            self.data[key] = VersionedValue(
                value=new_value,
                version=current.version + 1,
                last_modified=datetime.now(timezone.utc),
                modified_by=agent_id,
            )
            return True


class SagaState:
    """Manages saga coordination state"""

    def __init__(self, store: OptimisticStore):
        self.store = store
        self.prefix = "saga:"

    async def create_saga(self, saga_id: str, initial_state: list[dict]) -> None:
        key = f"{self.prefix}{saga_id}"
        await self.store.write(key, {
            "state": "pending",
            "steps": initial_state,
            "completed_steps": [],
            "compensated_steps": [],
        }, "saga_manager", 0)

    async def update_step(
        self, saga_id: str, step_name: str, result: str, compensation_action: dict
    ) -> bool:
        key = f"{self.prefix}{saga_id}"
        current = await self.store.read(key)
        if not current:
            return False
        # Deep copy so the stored value is not mutated before the versioned write
        saga_state = copy.deepcopy(current.value)
        if result == "success":
            saga_state["completed_steps"].append({
                "name": step_name,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
        else:
            # Compensate completed steps in reverse order, using each step's own compensation
            compensations = {s["name"]: s.get("compensation") for s in saga_state["steps"]}
            for completed in reversed(saga_state["completed_steps"]):
                comp_key = f"{self.prefix}{saga_id}:comp:{completed['name']}"
                await self.store.write(
                    comp_key,
                    {**compensation_action, "compensation": compensations.get(completed["name"])},
                    "saga_manager",
                    0,
                )
                saga_state["compensated_steps"].append(completed["name"])
            saga_state["state"] = "compensated"
        return await self.store.write(key, saga_state, "saga_manager", current.version)


# Usage example
store = OptimisticStore()
saga = SagaState(store)


async def execute_step(step_name: str, params: dict) -> bool:
    """Placeholder step executor so the sketch runs; real steps would call agents."""
    return True


async def process_order(order_id: str) -> bool:
    saga_id = f"order_{order_id}"
    await saga.create_saga(saga_id, [
        {"name": "reserve_inventory", "compensation": "release_inventory"},
        {"name": "charge_payment", "compensation": "refund_payment"},
        {"name": "schedule_shipping", "compensation": "cancel_shipping"},
    ])
    # Execute steps with compensation on failure
    steps = [
        ("reserve_inventory", {"item_id": order_id}),
        ("charge_payment", {"order_id": order_id}),
        ("schedule_shipping", {"order_id": order_id}),
    ]
    for step_name, params in steps:
        success = await execute_step(step_name, params)
        updated = await saga.update_step(
            saga_id,
            step_name,
            "success" if success else "failed",
            {"action": "compensate", "step": step_name},
        )
        if not updated:
            raise RuntimeError(f"Saga update failed for step {step_name}")
        if not success:
            return False
    return True


print(asyncio.run(process_order("1001")))
```
20 min
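Shared State describes event sourcing but the chapter's code covers only optimistic writes and sagas. The sketch below illustrates the event-sourcing idea: state is never stored directly, it is derived by replaying an append-only log, which also answers "what was the state at time T?". The `Event` shape and the integer timestamps are assumptions chosen to keep the example small.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Event:
    timestamp: int  # hypothetical logical clock; wall-clock time would also work
    item: str
    delta: int      # change in stock level


def replay(events: list[Event], until: Optional[int] = None) -> dict[str, int]:
    """Derive inventory state by folding events in timestamp order.

    If `until` is given, only events with timestamp <= until are applied.
    """
    state: dict[str, int] = {}
    for event in sorted(events, key=lambda e: e.timestamp):
        if until is not None and event.timestamp > until:
            break
        state[event.item] = state.get(event.item, 0) + event.delta
    return state


log = [
    Event(1, "widget", 10),
    Event(2, "widget", -3),
    Event(3, "gadget", 5),
]
current_state = replay(log)
state_at_t1 = replay(log, until=1)
```

Replaying the full log on every read is the "complicates current-state access" cost the chapter mentions; systems that use this pattern commonly cache a snapshot and replay only newer events.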
- 07 Supervisor Agents
Supervisor hierarchies provide control and observability, but deep hierarchies create latency and fragility; keep supervisor trees shallow and worker pools large.

Supervisor agents sit atop agent hierarchies, coordinating subordinate workers. The supervisor receives tasks, decomposes them, delegates to workers, monitors progress, handles failures, and assembles results. This pattern mirrors organizational structures: managers oversee specialists who perform execution work.

Supervisors excel when task decomposition is complex or variable. A customer support supervisor might receive a complaint and decide whether to route to billing, technical support, or retention specialists. That routing logic lives in the supervisor rather than being pre-configured.

Supervisors can implement priority schemes. High-priority tasks interrupt lower-priority work. The supervisor tracks running tasks, preempts if necessary, and reschedules interrupted work. This pattern suits systems with variable urgency.

Monitoring and observability concentrate in supervisor logic. The supervisor sees all worker activity, enabling centralized logging, metrics, and alerting. Debugging benefits from one place tracking all state transitions.

The failure model matters for supervisor design. What happens when a worker fails? Options include: retry on the same worker, fail over to a backup worker, escalate to a human, or fail the parent task. The supervisor encodes these policies.

Deep supervisor hierarchies introduce latency: each level adds processing time. A task passing through five supervisor levels before reaching a worker consumes time at each hop. Flat hierarchies with large worker pools reduce depth but concentrate load.

Supervisor availability affects system reliability. If the supervisor crashes, its entire subordinate tree becomes unavailable. Redundant supervisors with leader election prevent single points of failure.
```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Callable, Any
from enum import Enum
from datetime import datetime


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RETRYING = "retrying"


@dataclass
class Task:
    id: str
    priority: int
    payload: Any
    deadline: Optional[datetime] = None
    max_retries: int = 3
    current_retries: int = 0
    status: TaskStatus = TaskStatus.PENDING
    assigned_to: Optional[str] = None
    result: Optional[Any] = None
    error: Optional[str] = None


class Supervisor:
    def __init__(
        self,
        workers: list["Worker"],
        max_parallel: int = 10,
        retry_backoff_base: int = 2,
    ):
        self.workers = workers
        self.max_parallel = max_parallel
        self.backoff_base = retry_backoff_base
        self.pending_queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.running_tasks: dict[str, Task] = {}
        self.completed_tasks: dict[str, Task] = {}
        # Hold references so in-flight executions are not garbage collected
        self._background: set[asyncio.Task] = set()

    async def submit(self, task: Task) -> None:
        """Submit a task for execution"""
        await self.pending_queue.put((task.priority, task.id, task))

    async def run(self) -> None:
        """Main supervisor loop"""
        while True:
            # Check for available capacity
            while len(self.running_tasks) < self.max_parallel:
                try:
                    _, task_id, task = await asyncio.wait_for(
                        self.pending_queue.get(), timeout=1.0
                    )
                except asyncio.TimeoutError:
                    break
                # Assign to worker
                worker = self._select_worker()
                if not worker:
                    # No free worker: re-queue at the same priority and retry shortly
                    await self.pending_queue.put((task.priority, task_id, task))
                    await asyncio.sleep(0.1)
                    continue
                worker.available = False  # reserve the worker before yielding control
                task.status = TaskStatus.RUNNING
                task.assigned_to = worker.name
                self.running_tasks[task.id] = task
                job = asyncio.create_task(self._execute_with_monitoring(task, worker))
                self._background.add(job)
                job.add_done_callback(self._background.discard)
            # Yield so running tasks can progress instead of spinning at capacity
            await asyncio.sleep(0.05)

    def _select_worker(self) -> Optional["Worker"]:
        """Select available worker with fewest active tasks"""
        available = [w for w in self.workers if w.available]
        if not available:
            return None
        return min(available, key=lambda w: w.active_count)

    async def _execute_with_monitoring(self, task: Task, worker: "Worker") -> None:
        try:
            result = await worker.execute(task.payload)
            task.result = result
            task.status = TaskStatus.SUCCESS
        except Exception as e:
            await self._handle_failure(task, worker, e)
        finally:
            del self.running_tasks[task.id]
            # A task re-queued for retry is not finished yet
            if task.status in (TaskStatus.SUCCESS, TaskStatus.FAILED):
                self.completed_tasks[task.id] = task

    async def _handle_failure(self, task: Task, worker: "Worker", error: Exception) -> None:
        task.error = str(error)
        task.current_retries += 1
        if task.current_retries < task.max_retries:
            task.status = TaskStatus.RETRYING
            # Exponential backoff (the task keeps its running slot while it waits)
            delay = self.backoff_base ** task.current_retries
            await asyncio.sleep(delay)
            # Re-queue with same priority
            await self.pending_queue.put((task.priority, task.id, task))
        else:
            task.status = TaskStatus.FAILED
            # Could escalate to human review here
            await self._escalate(task)

    async def _escalate(self, task: Task) -> None:
        """Escalate failed task to human review"""
        # Implementation would integrate with alert systems
        pass

    async def get_task_status(self, task_id: str) -> Optional[Task]:
        if task_id in self.running_tasks:
            return self.running_tasks[task_id]
        return self.completed_tasks.get(task_id)


# Worker implementation
class Worker:
    def __init__(self, name: str, processor: Callable):
        self.name = name
        self.processor = processor
        self.active_count = 0
        self.available = True

    async def execute(self, payload: Any) -> Any:
        self.active_count += 1
        self.available = False
        try:
            return await self.processor(payload)
        finally:
            self.active_count -= 1
            self.available = True
```
20 min
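Supervisor Agents lists four responses to a worker failure and says the supervisor encodes the policy. One way to make that policy explicit and testable is a small pure function, sketched below. The ordering of the rules (retry first, then fail over, then escalate or fail the parent) and the `critical` flag are assumptions for illustration; the chapter does not prescribe a particular order.

```python
from enum import Enum


class FailureAction(Enum):
    RETRY_SAME = "retry_same"
    FAIL_OVER = "fail_over"
    ESCALATE = "escalate"
    FAIL_PARENT = "fail_parent"


def choose_action(
    attempt: int,
    max_retries: int,
    backup_available: bool,
    critical: bool,
) -> FailureAction:
    """Decide how to respond to a worker failure.

    attempt is the number of attempts already made on the current worker.
    """
    if attempt < max_retries:
        return FailureAction.RETRY_SAME
    if backup_available:
        return FailureAction.FAIL_OVER
    # Out of retries and no backup: involve a human only for critical work
    return FailureAction.ESCALATE if critical else FailureAction.FAIL_PARENT


early_failure = choose_action(attempt=1, max_retries=3, backup_available=True, critical=False)
exhausted_with_backup = choose_action(attempt=3, max_retries=3, backup_available=True, critical=False)
exhausted_critical = choose_action(attempt=3, max_retries=3, backup_available=False, critical=True)
```

Keeping the decision separate from the retry loop means the policy can be reviewed and unit-tested without starting workers or waiting through backoff delays.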
- 08 Specialized Agent Roles
Role specialization enables focused capability building, but requires careful boundary design to avoid both under-specialization (agents do everything poorly) and over-specialization (agents lack context for decisions).

Multi-agent systems derive power from specialization. Each agent focuses on a narrow domain, developing deep capabilities within that scope. A code analysis agent builds expertise in static analysis, AST traversal, and code style detection. A data visualization agent specializes in chart selection, color theory, and accessibility compliance.

Specialization boundaries require careful design. Poor boundaries create either gaps (tasks fall between agent responsibilities) or redundancy (multiple agents handle overlapping concerns). Boundaries should align with natural domain divisions and match how humans conceptualize task categories.

Each role requires defined inputs, outputs, and behavioral expectations. Input specification tells the agent what requests it receives and what context accompanies them. Output specification defines deliverables and their structure. Behavioral expectations encode constraints: safety requirements, quality thresholds, latency limits.

Role documentation includes not just capabilities but also limitations. An agent that cannot handle certain input formats should communicate that clearly. Agents that provide probabilistic outputs should quantify uncertainty. Clear limitation communication prevents misuse.

Role composition enables complex workflows. A pipeline might chain data extraction, transformation, and loading agents. A parallel execution might run multiple analysis agents on the same input and merge results. Role composition requires clear contracts between roles.

Agents operating in specialized roles may still need general context. A product recommendation agent needs user history, inventory state, and business rules, not just product knowledge.
Specialization does not mean isolation; agents must access cross-cutting concerns.
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass
class AgentCapability:
    """Describes what an agent can do"""
    input_types: list[str]  # Supported input content types
    output_type: str  # What the agent produces
    limitations: list[str]  # Known constraints
    quality_bounds: dict[str, tuple[float, float]]  # Min/max quality metrics


@dataclass
class AgentRole(ABC):
    """Base class for agent roles"""
    name: str
    description: str
    capabilities: list[AgentCapability]

    @abstractmethod
    async def process(self, request: dict) -> dict:
        """Handle incoming request"""

    @abstractmethod
    async def describe_limits(self) -> str:
        """Return human-readable limitation description"""


class DataExtractionRole(AgentRole):
    """Specialized role for extracting structured data from unstructured sources"""

    def __init__(self, model: Any):
        self.name = "data_extractor"
        self.description = "Extracts structured data from unstructured text"
        self.capabilities = [
            AgentCapability(
                input_types=["text/plain", "text/html", "application/pdf"],
                output_type="application/json",
                limitations=[
                    "Cannot process images directly (use OCR agent first)",
                    "Maximum input length: 50,000 tokens",
                    "Confidence below 0.7 triggers human review flag",
                ],
                quality_bounds={
                    "precision": (0.85, 1.0),
                    "recall": (0.75, 0.95),
                },
            )
        ]
        self.model = model  # Any client exposing an async generate(prompt) method

    async def process(self, request: dict) -> dict:
        schema = request.get("target_schema")
        source = request.get("source_text")
        extraction_prompt = f"""Extract data according to this schema:
{schema}

Source text:
{source}

Return JSON matching the schema exactly. Include confidence scores per field."""
        # Assumes the model client returns an already-parsed dict
        result = await self.model.generate(extraction_prompt)
        # Post-process to include metadata
        return {
            "extracted_data": result,
            "confidence": self._calculate_confidence(result),
            "schema_version": request.get("schema_version"),
            "agent": self.name,
        }

    def _calculate_confidence(self, result: dict) -> float:
        # Simplified confidence calculation: share of fields that were filled
        field_count = len(result)
        filled_fields = sum(1 for v in result.values() if v is not None)
        return filled_fields / field_count if field_count > 0 else 0.0

    async def describe_limits(self) -> str:
        return (
            "This agent extracts structured data from text sources. "
            "It cannot process images directly; use OCR preprocessing first. "
            "Inputs exceeding 50,000 tokens are truncated. "
            "Results with confidence below 0.7 are flagged for human review."
        )


class DataValidationRole(AgentRole):
    """Specialized role for validating extracted data against business rules"""

    def __init__(self, model: Any):
        self.name = "validator"
        self.description = "Validates data against business rules and constraints"
        self.capabilities = [
            AgentCapability(
                input_types=["application/json"],
                output_type="application/json",
                limitations=[
                    "Requires explicit validation rules in request",
                    "Cannot infer business logic: rules must be provided",
                ],
                quality_bounds={
                    "rule_coverage": (0.9, 1.0),
                    "false_positive_rate": (0.0, 0.05),
                },
            )
        ]
        self.model = model

    async def process(self, request: dict) -> dict:
        data = request.get("data")
        rules = request.get("validation_rules", [])
        validation_results = [self._apply_rule(data, rule) for rule in rules]
        passed = sum(1 for r in validation_results if r["passed"])
        return {
            "validation_results": validation_results,
            "passed_count": passed,
            "failed_count": len(rules) - passed,
            "overall_valid": passed == len(rules),
            "agent": self.name,
        }

    def _apply_rule(self, data: dict, rule: dict) -> dict:
        field = rule["field"]
        condition = rule["condition"]
        value = data.get(field)
        # Rule evaluation logic
        if condition["type"] == "range":
            # A missing value fails the rule instead of raising TypeError
            passed = value is not None and condition["min"] <= value <= condition["max"]
        elif condition["type"] == "enum":
            passed = value in condition["allowed"]
        else:
            # Unknown rule types pass by default
            passed = True
        return {
            "field": field,
            "passed": passed,
            "actual_value": value,
            "expected": condition,
        }

    async def describe_limits(self) -> str:
        return (
            "This agent validates data against provided rules. "
            "It does not infer business rules; all validation criteria must be "
            "explicitly defined in the request."
        )


# Role composition example
class PipelineOrchestrator:
    def __init__(self, extractor: DataExtractionRole, validator: DataValidationRole):
        self.extractor = extractor
        self.validator = validator

    async def process(self, source_text: str, schema: dict, rules: list) -> dict:
        # Stage 1: Extraction
        extraction_request = {"source_text": source_text, "target_schema": schema}
        extraction_result = await self.extractor.process(extraction_request)
        if extraction_result["confidence"] < 0.7:
            return {
                "status": "review_required",
                "reason": "low_confidence",
                "extracted_data": extraction_result["extracted_data"],
            }
        # Stage 2: Validation
        validation_request = {
            "data": extraction_result["extracted_data"],
            "validation_rules": rules,
        }
        validation_result = await self.validator.process(validation_request)
        return {
            "status": "complete" if validation_result["overall_valid"] else "failed",
            "data": extraction_result["extracted_data"],
            "validation": validation_result,
        }
```
20 min
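The point about clear contracts between roles can be checked mechanically before a pipeline is wired together. The sketch below is a minimal illustration, not part of the course code: `RoleContract`, `find_contract_gaps` and the `chart_builder` role are hypothetical names, and the contract is reduced to declared content types only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RoleContract:
    """Hypothetical, trimmed-down contract: what a role accepts and emits."""
    name: str
    input_types: frozenset[str]
    output_type: str


def find_contract_gaps(chain: list[RoleContract]) -> list[str]:
    """Return one message per adjacent pair whose declared types do not line up."""
    gaps = []
    for upstream, downstream in zip(chain, chain[1:]):
        if upstream.output_type not in downstream.input_types:
            gaps.append(
                f"{upstream.name} emits {upstream.output_type}, "
                f"but {downstream.name} accepts {sorted(downstream.input_types)}"
            )
    return gaps


extractor = RoleContract(
    "data_extractor", frozenset({"text/plain", "text/html"}), "application/json"
)
validator = RoleContract("validator", frozenset({"application/json"}), "application/json")
charting = RoleContract("chart_builder", frozenset({"text/csv"}), "image/png")

print(find_contract_gaps([extractor, validator]))            # compatible chain
print(find_contract_gaps([extractor, validator, charting]))  # one mismatch reported
```

Running a check like this at startup surfaces gaps between roles before any request reaches a model, which is cheaper than discovering them from a failed run.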
- 09Task DecompositionDecomposition quality determines system performance more than agent sophistication; well-structured tasks enable parallelism while poorly structured tasks create bottlenecks. Task decomposition breaks complex requests into manageable subtasks. A supervisor agent receives a complex goal and decides how to divide work among specialized agents. Effective decomposition enables parallel execution, isolates failures, and simplifies each agent's job. Decomposition strategies depend on task structure. Sequential tasks with dependencies require careful ordering; independent subtasks can parallelize. Tasks requiring different capabilities naturally decompose along capability boundaries. Tasks with nested scope require hierarchical decomposition. Goal-oriented decomposition starts from the final deliverable and works backward. What final output does the user need? What inputs are required to produce it? What intermediate results feed those inputs? This recursive decomposition continues until reaching tasks that existing agents can execute directly. Capability-based decomposition maps tasks to agent capabilities. Each agent declares what it can do; decomposition selects agents whose capabilities match subtasks. This approach scales as new agents join, because the decomposition logic picks up newly declared capabilities without code changes. Decomposition must handle uncertainty about task difficulty. Some requests decompose into obvious subtasks; others reveal complexity only during execution. Adaptive decomposition adjusts granularity based on early results: simple subtasks may not need further decomposition, while complex ones break down further. Subtask specification requires clear interfaces. Each subtask should define inputs, expected outputs, quality criteria, and failure modes. Vague subtask definitions lead to mismatched expectations and integration failures. 
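Goal-oriented decomposition, as described above, can be sketched as a backward walk from the deliverable to tasks that need no further inputs. This is a minimal illustration under stated assumptions: the `RECIPES` table and its task names are hypothetical, and a real system would derive requirements from a planner or model rather than a hand-written dictionary.

```python
# Hypothetical recipe table: each deliverable lists the intermediate
# results it needs. Entries with no requirements are primitive tasks.
RECIPES: dict[str, list[str]] = {
    "weekly_report": ["sales_summary", "trend_chart"],
    "sales_summary": ["raw_sales"],
    "trend_chart": ["sales_summary"],
    "raw_sales": [],
}


def decompose_backward(deliverable: str, recipes: dict[str, list[str]]) -> list[str]:
    """Return tasks in an order where every input precedes its consumer."""
    ordered: list[str] = []
    in_progress: set[str] = set()

    def visit(task: str) -> None:
        if task in ordered:
            return  # already planned via another branch
        if task in in_progress:
            raise ValueError(f"circular requirement at {task!r}")
        if task not in recipes:
            raise KeyError(f"no recipe or agent known for {task!r}")
        in_progress.add(task)
        for requirement in recipes[task]:
            visit(requirement)  # plan inputs before the task that consumes them
        in_progress.discard(task)
        ordered.append(task)

    visit(deliverable)
    return ordered


print(decompose_backward("weekly_report", RECIPES))
# ['raw_sales', 'sales_summary', 'trend_chart', 'weekly_report']
```

The two error paths correspond to the failure modes the text warns about: a cycle means the decomposition is ill-formed, and a missing recipe means no existing agent can execute that subtask directly.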
```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class TaskType(Enum):
    PRIMITIVE = "primitive"      # Can be executed by existing agent
    COMPOSITE = "composite"      # Requires further decomposition
    CONDITIONAL = "conditional"  # Branch based on runtime conditions


@dataclass
class Subtask:
    id: str
    name: str
    type: TaskType
    input_spec: dict
    output_spec: dict
    dependencies: list[str] = field(default_factory=list)
    required_capabilities: list[str] = field(default_factory=list)
    quality_threshold: float = 0.8
    children: list["Subtask"] = field(default_factory=list)


@dataclass
class DecompositionResult:
    root_task: Subtask
    execution_plan: list[list[Subtask]]  # Batches of parallelizable tasks
    estimated_duration_seconds: float


class TaskDecomposer:
    def __init__(self, capability_registry: dict[str, list[str]]):
        self.capabilities = capability_registry  # agent_name -> supported_capabilities

    async def decompose(self, goal: str, constraints: dict) -> DecompositionResult:
        """Main decomposition entry point"""
        # Phase 1: Initial decomposition based on goal analysis
        initial_tasks = await self._analyze_goal(goal, constraints)
        # Phase 2: Expand composite tasks
        expanded = await self._expand_composites(initial_tasks)
        # Phase 3: Analyze dependencies
        dependency_graph = self._build_dependency_graph(expanded)
        # Phase 4: Generate execution plan (batched for parallelization)
        execution_plan = self._generate_execution_plan(expanded, dependency_graph)
        # Phase 5: Estimate duration
        duration = self._estimate_duration(execution_plan)
        root = Subtask(
            id="root",
            name=goal,
            type=TaskType.COMPOSITE,
            input_spec={},
            output_spec={},
            children=expanded,
        )
        return DecompositionResult(
            root_task=root,
            execution_plan=execution_plan,
            estimated_duration_seconds=duration,
        )

    async def _analyze_goal(self, goal: str, constraints: dict) -> list[Subtask]:
        """Initial goal analysis and task identification"""
        # Prompt-based analysis (would use actual LLM in production)
        analysis_prompt = f"""Analyze this goal and identify subtasks:
Goal: {goal}
Constraints: {constraints}

Identify:
1. Main phases or steps
2. Required capabilities for each step
3. Clear interfaces between steps
4. Potential parallelization opportunities"""
        # In production, this calls an analysis model
        return await self._call_analysis_model(analysis_prompt)

    async def _expand_composites(self, tasks: list[Subtask]) -> list[Subtask]:
        """Recursively expand composite tasks"""
        expanded = []
        for task in tasks:
            if task.type == TaskType.COMPOSITE:
                # Check if any registered agent covers a required capability
                has_agent = any(
                    cap in self.capabilities.get(agent, [])
                    for agent in self.capabilities
                    for cap in task.required_capabilities
                )
                if not has_agent:
                    # Need further decomposition
                    subtasks = await self._decompose_further(task)
                    expanded.extend(await self._expand_composites(subtasks))
                else:
                    expanded.append(task)
            else:
                expanded.append(task)
        return expanded

    def _build_dependency_graph(self, tasks: list[Subtask]) -> dict[str, list[str]]:
        """Map each task ID to the IDs of the tasks whose outputs it needs"""
        # Simple implementation - real version would analyze data flow
        return {task.id: list(task.dependencies) for task in tasks}

    def _generate_execution_plan(
        self, tasks: list[Subtask], dependency_graph: dict[str, list[str]]
    ) -> list[list[Subtask]]:
        """Topologically sort and batch tasks by parallelism potential"""
        tasks_by_id = {task.id: task for task in tasks}
        # Number of unmet dependencies per task
        in_degree = {
            task_id: len(set(deps)) for task_id, deps in dependency_graph.items()
        }
        plan = []
        remaining = set(dependency_graph.keys())
        while remaining:
            # Find all tasks whose dependencies are already satisfied
            ready = sorted(task_id for task_id in remaining if in_degree[task_id] == 0)
            if not ready:
                # Nothing can start: the graph has a cycle or an unknown dependency
                raise ValueError(
                    f"Circular or unresolved dependencies among: {sorted(remaining)}"
                )
            plan.append([tasks_by_id[task_id] for task_id in ready])
            # Remove ready tasks and update the counts of their dependents
            for task_id in ready:
                remaining.remove(task_id)
                for other_task, deps in dependency_graph.items():
                    if task_id in deps:
                        in_degree[other_task] -= 1
        return plan

    def _estimate_duration(self, plan: list[list[Subtask]]) -> float:
        """Estimate total execution time based on task batches"""
        # Simplified - real implementation would use historical timing
        return len(plan) * 30.0  # Assume 30 seconds per batch

    async def _call_analysis_model(self, prompt: str) -> list[Subtask]:
        """Call analysis model to identify task structure"""
        # Placeholder - in production, calls actual model
        return []

    async def _decompose_further(self, task: Subtask) -> list[Subtask]:
        """Further decompose a composite task"""
        # Placeholder - recursive decomposition logic
        return []


# Execution engine for decomposed tasks
class TaskExecutor:
    def __init__(self, agent_registry: dict[str, Any]):
        # agent_name -> agent object; each agent is assumed to expose a
        # `capabilities` collection and an async `execute(input_spec, context=...)`
        self.agents = agent_registry

    async def execute_plan(self, plan: list[list[Subtask]]) -> dict[str, Any]:
        results = {}
        for batch in plan:
            # Execute all tasks in batch concurrently
            batch_results = await asyncio.gather(
                *[self._execute_task(task, results) for task in batch],
                return_exceptions=True,
            )
            for task, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    results[task.id] = {"status": "failed", "error": str(result)}
                else:
                    results[task.id] = {"status": "success", "result": result}
        return results

    async def _execute_task(self, task: Subtask, prior_results: dict[str, Any]) -> Any:
        """Execute a single task using appropriate agent"""
        # Gather inputs from dependencies; skip failed or non-dict results
        inputs = {}
        for dep_id in task.dependencies:
            dep_result = prior_results.get(dep_id, {}).get("result")
            if isinstance(dep_result, dict):
                inputs.update(dep_result)
        # Find capable agent
        agent = self._find_agent(task.required_capabilities)
        if not agent:
            raise ValueError(
                f"No agent found for capabilities: {task.required_capabilities}"
            )
        # Execute with combined inputs
        return await agent.execute(task.input_spec, context=inputs)

    def _find_agent(self, required_capabilities: list[str]) -> Optional[Any]:
        """Find first agent supporting required capabilities"""
        for agent in self.agents.values():
            if all(cap in agent.capabilities for cap in required_capabilities):
                return agent
        return None
```
25 min
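As a cross-check on hand-written batching logic, Python's standard library offers `graphlib.TopologicalSorter` (Python 3.9+), which yields ready tasks level by level. The sketch below assumes the same graph shape used in this chapter, task ID mapped to the IDs it depends on; `plan_batches` and the task names are illustrative only.

```python
from graphlib import CycleError, TopologicalSorter


def plan_batches(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group task IDs into batches; each batch depends only on earlier batches."""
    sorter = TopologicalSorter(dependencies)  # maps task -> tasks it depends on
    sorter.prepare()                          # raises CycleError on a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())    # sorted only for stable output
        batches.append(ready)
        sorter.done(*ready)                   # unlocks tasks waiting on this batch
    return batches


graph = {
    "extract": [],
    "clean": ["extract"],
    "chart": ["clean"],
    "summary": ["clean"],
    "report": ["chart", "summary"],
}
print(plan_batches(graph))
# [['extract'], ['clean'], ['chart', 'summary'], ['report']]
```

Comparing a custom planner's batches against this output on a few small graphs is a cheap way to confirm that dependencies are counted in the right direction.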
- 10Tool SharingShared tool access enables agents to collaborate effectively, but requires careful access control and result validation to prevent cascading failures. Multi-agent systems frequently require shared access to tools: databases, APIs, file systems, computation resources. Agents must not only have access to tools but must share them coherently—results from one agent's tool usage inform another agent's operations. Poor tool sharing creates redundant work, inconsistent state, and error propagation. Tool registration provides a catalog agents query before invoking capabilities. Each tool declares its interface, access requirements, rate limits, and usage cost. Agents select tools based on declared capabilities rather than hardcoded integrations. Access control prevents unauthorized tool usage. Tools may require authentication, enforce quotas, or limit access to specific agents. Registration systems include policy evaluation determining whether requesting agents receive access. Result caching prevents redundant tool invocations. When multiple agents query the same database, cached results serve subsequent requests. Cache invalidation strategies determine how stale cached data becomes before requiring fresh invocation. Tool composition chains tools together. One tool's output becomes another's input. Chain execution requires type compatibility checking, error propagation, and transaction boundaries defining what constitutes success. Result validation ensures tool outputs meet expectations. Tools can return malformed data, incomplete results, or unexpected types. Validators check outputs before passing them to dependent agents, catching failures early. 
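Tool composition, as described above, needs a compatibility check before execution and a clear failure point during it. The following is a minimal sketch under stated assumptions: `ChainStep`, `check_chain` and `run_chain` are hypothetical names, and compatibility is approximated with Python types rather than full JSON schemas.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ChainStep:
    """Hypothetical chain link: a callable plus the types it expects and returns."""
    name: str
    accepts: type
    returns: type
    run: Callable[[Any], Any]


def check_chain(steps: list[ChainStep]) -> None:
    """Fail before running anything if adjacent steps do not fit together."""
    for upstream, downstream in zip(steps, steps[1:]):
        if not issubclass(upstream.returns, downstream.accepts):
            raise TypeError(
                f"{upstream.name} returns {upstream.returns.__name__}, "
                f"{downstream.name} accepts {downstream.accepts.__name__}"
            )


def run_chain(steps: list[ChainStep], value: Any) -> dict:
    """Run steps in order; stop at the first failure and report where it happened."""
    check_chain(steps)
    for step in steps:
        try:
            value = step.run(value)
        except Exception as exc:
            return {"ok": False, "failed_step": step.name, "error": str(exc)}
        if not isinstance(value, step.returns):
            # The tool ran but broke its declared output contract
            return {
                "ok": False,
                "failed_step": step.name,
                "error": f"expected {step.returns.__name__}, got {type(value).__name__}",
            }
    return {"ok": True, "output": value}


fetch = ChainStep("fetch_rows", str, list, lambda query: [{"price": 4}, {"price": 6}])
total = ChainStep("sum_prices", list, int, lambda rows: sum(r["price"] for r in rows))
result = run_chain([fetch, total], "SELECT price FROM products")
print(result)  # {'ok': True, 'output': 10}
```

Treating the whole chain as the unit of success, with the failing step named in the result, gives dependent agents one place to look when an upstream tool misbehaves.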
```python
import asyncio
import hashlib
import json
from dataclasses import dataclass, field, replace
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Callable, Optional


def _utc_now() -> datetime:
    """Timezone-aware current time (datetime.utcnow is deprecated since Python 3.12)"""
    return datetime.now(timezone.utc)


class ToolCategory(Enum):
    DATA_ACCESS = "data_access"
    COMPUTATION = "computation"
    EXTERNAL_API = "external_api"
    FILE_SYSTEM = "file_system"


@dataclass
class ToolSpec:
    name: str
    category: ToolCategory
    description: str
    input_schema: dict
    output_schema: dict
    required_permissions: list[str] = field(default_factory=list)
    rate_limit_per_minute: int = 60
    cost_per_invocation: float = 0.0


@dataclass
class ToolResult:
    tool_name: str
    success: bool
    output: Any
    cached: bool = False
    execution_time_ms: float = 0.0
    timestamp: datetime = field(default_factory=_utc_now)
    error: Optional[str] = None


class ToolRegistry:
    def __init__(self):
        self.tools: dict[str, ToolSpec] = {}
        self.implementations: dict[str, Callable] = {}
        self.cache: dict[str, ToolResult] = {}
        self.cache_ttl = timedelta(minutes=5)

    def register(self, spec: ToolSpec, implementation: Callable) -> None:
        self.tools[spec.name] = spec
        self.implementations[spec.name] = implementation

    def get_tool_spec(self, name: str) -> Optional[ToolSpec]:
        return self.tools.get(name)

    def list_tools(self, category: Optional[ToolCategory] = None) -> list[ToolSpec]:
        if category:
            return [t for t in self.tools.values() if t.category == category]
        return list(self.tools.values())

    def find_tools(self, required_capabilities: list[str]) -> list[ToolSpec]:
        """Find tools matching capability requirements"""
        matching = []
        for tool in self.tools.values():
            # Simple text matching - real version would use capability graphs
            if all(cap.lower() in tool.description.lower() for cap in required_capabilities):
                matching.append(tool)
        return matching


class ToolAccessController:
    def __init__(self, registry: ToolRegistry):
        self.registry = registry
        self.permissions: dict[str, set[str]] = {}  # agent_id -> allowed_tools
        # (agent_id, tool_name) -> timestamps of calls in the last minute
        self.usage_times: dict[tuple[str, str], list[datetime]] = {}

    def grant_permission(self, agent_id: str, tool_names: list[str]) -> None:
        if agent_id not in self.permissions:
            self.permissions[agent_id] = set()
        self.permissions[agent_id].update(tool_names)

    def can_access(self, agent_id: str, tool_name: str) -> bool:
        allowed = self.permissions.get(agent_id, set())
        if "*" in allowed:
            return True
        return tool_name in allowed

    def _recent_calls(self, agent_id: str, tool_name: str) -> list[datetime]:
        """Drop calls older than one minute so the limit is truly per minute"""
        cutoff = _utc_now() - timedelta(minutes=1)
        key = (agent_id, tool_name)
        recent = [t for t in self.usage_times.get(key, []) if t > cutoff]
        self.usage_times[key] = recent
        return recent

    def check_rate_limit(self, agent_id: str, tool_name: str) -> bool:
        spec = self.registry.get_tool_spec(tool_name)
        if not spec:
            return False
        return len(self._recent_calls(agent_id, tool_name)) < spec.rate_limit_per_minute

    def record_usage(self, agent_id: str, tool_name: str) -> None:
        self.usage_times.setdefault((agent_id, tool_name), []).append(_utc_now())


class ToolExecutor:
    def __init__(
        self,
        registry: ToolRegistry,
        controller: ToolAccessController,
        validator: Optional[Any] = None,
    ):
        self.registry = registry
        self.controller = controller
        self.validator = validator

    async def execute(self, agent_id: str, tool_name: str, parameters: dict) -> ToolResult:
        start_time = _utc_now()
        # Access check
        if not self.controller.can_access(agent_id, tool_name):
            return ToolResult(
                tool_name=tool_name,
                success=False,
                output=None,
                error=f"Agent {agent_id} lacks permission for {tool_name}",
            )
        # Rate limit check
        if not self.controller.check_rate_limit(agent_id, tool_name):
            return ToolResult(
                tool_name=tool_name,
                success=False,
                output=None,
                error=f"Rate limit exceeded for {tool_name}",
            )
        # Generate cache key
        cache_key = self._generate_cache_key(tool_name, parameters)
        # Check cache; return a copy so the stored entry is not mutated
        cached_result = self.registry.cache.get(cache_key)
        if cached_result and _utc_now() - cached_result.timestamp < self.registry.cache_ttl:
            return replace(cached_result, cached=True)
        # Execute tool
        spec = self.registry.get_tool_spec(tool_name)
        implementation = self.registry.implementations.get(tool_name)
        if not spec or not implementation:
            return ToolResult(
                tool_name=tool_name,
                success=False,
                output=None,
                error=f"Tool {tool_name} not found",
            )
        try:
            output = await implementation(parameters)
            error = None
            # Validate output if validator present
            if self.validator:
                validation_result = self.validator.validate(
                    tool_name, output, spec.output_schema
                )
                if not validation_result.valid:
                    error = f"Validation failed: {validation_result.errors}"
            result = ToolResult(
                tool_name=tool_name,
                success=error is None,
                output=output,
                error=error,
                execution_time_ms=(_utc_now() - start_time).total_seconds() * 1000,
            )
        except Exception as e:
            result = ToolResult(
                tool_name=tool_name,
                success=False,
                output=None,
                error=str(e),
                execution_time_ms=(_utc_now() - start_time).total_seconds() * 1000,
            )
        # Record usage for every real invocation, successful or not
        self.controller.record_usage(agent_id, tool_name)
        # Cache only successful results so failures are retried, not replayed
        if result.success:
            self.registry.cache[cache_key] = result
        return result

    def _generate_cache_key(self, tool_name: str, parameters: dict) -> str:
        """Generate deterministic cache key for request"""
        canonical = json.dumps(parameters, sort_keys=True, default=str)
        return hashlib.sha256(f"{tool_name}:{canonical}".encode()).hexdigest()


# Example tool definitions and usage
registry = ToolRegistry()
controller = ToolAccessController(registry)
executor = ToolExecutor(registry, controller)


async def database_query_impl(params: dict) -> list[dict]:
    # Implementation would query actual database
    return [{"id": 1, "name": "example"}]


registry.register(
    ToolSpec(
        name="query_database",
        category=ToolCategory.DATA_ACCESS,
        description="Execute SQL queries against the product database",
        input_schema={
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "parameters": {"type": "array"},
            },
            "required": ["query"],
        },
        output_schema={"type": "array", "items": {"type": "object"}},
    ),
    database_query_impl,
)
controller.grant_permission("data_agent", ["query_database"])
controller.grant_permission("analytics_agent", ["query_database"])


async def main() -> None:
    # `await` is only valid inside a coroutine, so the call lives in main()
    result = await executor.execute(
        "data_agent",
        "query_database",
        {"query": "SELECT * FROM products WHERE active = ?", "parameters": [True]},
    )
    print(result.success, result.output)


asyncio.run(main())
```
20 min
- 11Conflict ResolutionMulti-agent conflicts reveal design gaps; effective resolution requires understanding whether conflicts stem from resource contention, goal misalignment, or factual disagreement. Multi-agent systems encounter conflicts when agents produce incompatible outputs, compete for shared resources, or hold contradictory beliefs. Conflict resolution strategies depend on conflict type: resource contention requires scheduling, goal misalignment requires priority or hierarchy, and factual disagreement requires evidence evaluation. Resource conflicts arise when multiple agents need the same limited resource simultaneously. Database connections, API rate limits, and computational capacity create contention. Resolution options include queuing (first-come-first-served), priority ordering (critical tasks first), and fair sharing (rotating access). Goal conflicts emerge when agent objectives contradict. A cost-optimization agent might minimize expenses while a reliability agent maximizes redundancy. These conflicts require explicit priority definitions or higher-level arbitration. Belief conflicts occur when agents derive different conclusions from the same inputs. Two classification agents might assign different labels to the same content. Resolution requires evidence comparison, uncertainty quantification, or human arbitration. Resolution mechanisms operate at different speeds. Synchronous resolution blocks until the conflict resolves, which is appropriate for critical decisions. Asynchronous resolution allows agents to proceed and handles conflicts later, which is appropriate for non-critical work. Resolution strategies should be consistent to avoid confusion. Random resolution creates unpredictable behavior. Agent priority ordering provides stable outcomes. Time-based resolution (for example, earliest request wins) settles transient conflicts in which two agents ask for the same slot moments apart. 
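Belief conflicts are the hardest of the three types to settle automatically. One possible approach, sketched below as an assumption rather than a prescribed method, sums each agent's self-reported confidence per label and hands the decision to a human when the lead is too narrow. The function name `resolve_belief`, the `margin` parameter and the sample agents are all hypothetical.

```python
from collections import defaultdict


def resolve_belief(claims: list[tuple[str, str, float]], margin: float = 0.2) -> dict:
    """Pick the label with the most total confidence, or defer to a human.

    claims: (agent_id, label, confidence in [0, 1]) tuples.
    margin: minimum lead, as a share of all confidence, needed to auto-accept.
    """
    if not claims:
        raise ValueError("at least one claim is required")
    totals: dict[str, float] = defaultdict(float)
    for _agent_id, label, confidence in claims:
        totals[label] += confidence
    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    best_label, best_score = ranked[0]
    runner_up = ranked[1][1] if len(ranked) > 1 else 0.0
    total = sum(totals.values())
    # With no confidence at all, or a narrow lead, do not decide automatically
    if total == 0 or (best_score - runner_up) / total < margin:
        return {"decision": None, "action": "human_review", "scores": dict(totals)}
    return {"decision": best_label, "action": "accept", "scores": dict(totals)}


clear = resolve_belief([
    ("classifier_a", "spam", 0.9),
    ("classifier_b", "spam", 0.8),
    ("classifier_c", "not_spam", 0.6),
])
close = resolve_belief([
    ("classifier_a", "spam", 0.60),
    ("classifier_b", "not_spam", 0.55),
])
print(clear["decision"], clear["action"])   # spam accept
print(close["decision"], close["action"])   # None human_review
```

Self-reported confidence is only as good as each agent's calibration, so the margin is worth tuning against cases where the right answer is already known.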
```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Optional


def _utc_now() -> datetime:
    """Timezone-aware current time (datetime.utcnow is deprecated since Python 3.12)"""
    return datetime.now(timezone.utc)


class ConflictType(Enum):
    RESOURCE = "resource"
    GOAL = "goal"
    BELIEF = "belief"
    DATA = "data"


@dataclass
class Conflict:
    id: str
    type: ConflictType
    participants: list[str]  # Agent IDs involved
    description: str
    timestamp: datetime = field(default_factory=_utc_now)
    severity: int = 1  # 1-5, higher is more severe
    resolution_status: str = "pending"


@dataclass
class Resolution:
    conflict_id: str
    strategy: str
    outcome: Any
    resolver: str  # Agent or system component that resolved
    timestamp: datetime = field(default_factory=_utc_now)


class ConflictResolver:
    def __init__(self, priority_registry: dict[str, int]):
        self.priority_registry = priority_registry  # agent_id -> priority (higher wins)
        self.conflicts: list[Conflict] = []
        self.resolutions: list[Resolution] = []
        self.resolution_strategies: dict[ConflictType, Callable] = {}
        self._register_default_strategies()

    def _register_default_strategies(self) -> None:
        self.resolution_strategies[ConflictType.RESOURCE] = self._resolve_resource
        self.resolution_strategies[ConflictType.GOAL] = self._resolve_goal
        self.resolution_strategies[ConflictType.BELIEF] = self._resolve_belief
        self.resolution_strategies[ConflictType.DATA] = self._resolve_data

    async def register_conflict(self, conflict: Conflict) -> str:
        """Register a new conflict for resolution"""
        self.conflicts.append(conflict)
        return conflict.id

    async def resolve(self, conflict_id: str) -> Optional[Resolution]:
        """Resolve a specific conflict"""
        conflict = self._find_conflict(conflict_id)
        if not conflict:
            return None
        strategy = self.resolution_strategies.get(conflict.type)
        if not strategy:
            return None
        resolution = await strategy(conflict)
        self.resolutions.append(resolution)
        conflict.resolution_status = "resolved"
        return resolution

    async def resolve_all_pending(self) -> list[Resolution]:
        """Resolve all pending conflicts"""
        results = []
        for conflict in self.conflicts:
            if conflict.resolution_status == "pending":
                result = await self.resolve(conflict.id)
                if result:
                    results.append(result)
        return results

    def _find_conflict(self, conflict_id: str) -> Optional[Conflict]:
        for conflict in self.conflicts:
            if conflict.id == conflict_id:
                return conflict
        return None

    async def _resolve_resource(self, conflict: Conflict) -> Resolution:
        """Resolve resource contention by priority; the rest queue in order"""
        # Sort participants by priority (higher wins)
        sorted_participants = sorted(
            conflict.participants,
            key=lambda agent_id: self.priority_registry.get(agent_id, 0),
            reverse=True,
        )
        # Winner gets resource
        winner = sorted_participants[0]
        losers = sorted_participants[1:]
        return Resolution(
            conflict_id=conflict.id,
            strategy="priority_queue",
            outcome={
                "winner": winner,
                "losers": losers,
                "queue_positions": {
                    agent: pos + 1 for pos, agent in enumerate(sorted_participants)
                },
            },
            resolver="system",
        )

    async def _resolve_goal(self, conflict: Conflict) -> Resolution:
        """Resolve goal conflicts using explicit priority rules"""
        priorities = [
            (agent_id, self.priority_registry.get(agent_id, 0))
            for agent_id in conflict.participants
        ]
        # Highest priority wins
        winner = max(priorities, key=lambda x: x[1])[0]
        return Resolution(
            conflict_id=conflict.id,
            strategy="priority_override",
            outcome={
                "primary_agent": winner,
                "secondary_agents": [a for a in conflict.participants if a != winner],
                "compromise_required": True,
            },
            resolver="system",
        )

    async def _resolve_belief(self, conflict: Conflict) -> Resolution:
        """Resolve belief conflicts through evidence comparison"""
        # Placeholder: no evidence is collected here, so the conflict is
        # deferred to a human. A real implementation would gather evidence
        # from participants and weigh it, for example by confidence.
        return Resolution(
            conflict_id=conflict.id,
            strategy="human_review",
            outcome={
                "resolution_type": "requires_human_review",
                "agents_in_dispute": conflict.participants,
            },
            resolver="system",
        )

    async def _resolve_data(self, conflict: Conflict) -> Resolution:
        """Resolve data inconsistencies"""
        return Resolution(
            conflict_id=conflict.id,
            strategy="timestamp_latest",
            outcome={
                "resolution": "use_most_recent_value",
                "requires_manual_review": conflict.severity >= 4,
            },
            resolver="system",
        )


class HierarchicalConflictEscalation:
    """Handles escalation when standard resolution fails"""

    def __init__(self, resolver: ConflictResolver):
        self.resolver = resolver
        self.escalation_levels = {
            1: "agent_level",       # Agents negotiate directly
            2: "supervisor_level",  # Supervisor arbitrates
            3: "system_level",      # System-wide rules apply
            4: "human_review",      # Human intervention required
        }

    async def escalate(self, conflict_id: str) -> str:
        """Escalate conflict to higher resolution level"""
        conflict = self.resolver._find_conflict(conflict_id)
        if not conflict:
            return "unknown"
        # Severity doubles as the escalation level and is capped at human review
        new_level = min(conflict.severity + 1, 4)
        conflict.severity = new_level
        return self.escalation_levels[new_level]


# Usage example
resolver = ConflictResolver({
    "critical_agent": 100,
    "data_agent": 80,
    "analytics_agent": 60,
    "cache_agent": 40,
})


async def handle_agent_conflict(agent1: str, agent2: str, resource: str):
    conflict = Conflict(
        id=f"conflict_{resource}_{_utc_now().timestamp()}",
        type=ConflictType.RESOURCE,
        participants=[agent1, agent2],
        description=f"Both {agent1} and {agent2} require {resource}",
        severity=2,
    )
    await resolver.register_conflict(conflict)
    result = await resolver.resolve(conflict.id)
    return result
```
20 min
- 12Consensus MechanismsConsensus enables agents to agree on facts and actions, but perfect consensus is expensive; choose mechanisms matching the cost of disagreement versus the cost of delay. Multi-agent systems often require agents to agree on shared state, coordinated actions, or authoritative decisions. Consensus mechanisms provide systematic ways for agents to reach agreement even when individual agents have incomplete information or conflicting views. Byzantine fault tolerance handles scenarios where agents may behave arbitrarily—maliciously, incorrectly, or inconsistently. Traditional Byzantine consensus requires large message exchanges and significant time. Practical Byzantine Fault Tolerance (PBFT) reduces overhead while maintaining safety guarantees. Raft consensus provides a simpler model for crash fault tolerance—agents may fail by stopping but not by behaving incorrectly. Raft elects a leader that coordinates operations. Followers replicate leader decisions. Leader failures trigger new elections. Multi-agent coordination can use voting mechanisms. Each agent casts votes based on its view; outcomes depend on voting rules (majority, supermajority, weighted). Voting works for discrete decisions but struggles with continuous value coordination. Gossip protocols spread information across agent populations without central coordination. Agents randomly share state with peers; over time, the population converges to shared understanding. Gossip handles eventual consistency but provides no hard guarantees. Credential propagation enables agents to collectively validate information. Agents share evidence and attestations; collective validation provides higher confidence than individual checks. However, collective validation increases latency. Consensus overhead must match the cost of errors. Low-stakes decisions warrant fast consensus with small quorums. High-stakes decisions warrant slower consensus with Byzantine guarantees. 
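The voting rules mentioned above (majority, supermajority, weighted) differ only in how votes are weighted and what share is required. The sketch below is a minimal illustration; `tally`, its parameters and the agent names are hypothetical, and it assumes each agent casts exactly one vote.

```python
from collections import defaultdict
from typing import Optional


def tally(
    votes: dict[str, str],
    weights: Optional[dict[str, float]] = None,
    threshold: float = 0.5,
) -> Optional[str]:
    """Return the option whose weight share strictly exceeds threshold, else None.

    votes: agent_id -> chosen option.
    weights: agent_id -> voting weight (defaults to 1.0 per agent).
    threshold: 0.5 for simple majority, higher for a supermajority.
    """
    weights = weights or {}
    totals: dict[str, float] = defaultdict(float)
    for agent_id, option in votes.items():
        totals[option] += weights.get(agent_id, 1.0)
    total_weight = sum(totals.values())
    if total_weight == 0:
        return None  # nobody voted, or all weights are zero
    option, weight = max(totals.items(), key=lambda item: item[1])
    return option if weight / total_weight > threshold else None


votes = {"planner": "deploy", "tester": "hold", "reviewer": "deploy"}

print(tally(votes))                          # deploy (2 of 3 votes)
print(tally(votes, threshold=0.75))          # None (2/3 is below 75%)
print(tally(votes, weights={"tester": 3.0})) # hold (3 of 5 weight units)
```

Returning `None` when no option clears the bar makes "no decision" an explicit outcome, which lets the caller fall back to another round, a tie-break rule, or escalation.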
```python
import asyncio
import hashlib
import random
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional


def _utc_now() -> datetime:
    """Timezone-aware timestamp (datetime.utcnow() is deprecated)."""
    return datetime.now(timezone.utc)


def majority(cluster_size: int) -> int:
    """Smallest number of agents that forms a strict majority."""
    return cluster_size // 2 + 1


class ConsensusState(Enum):
    PENDING = "pending"
    PROPOSED = "proposed"
    PREPARE = "prepare"
    COMMIT = "commit"
    DECIDED = "decided"
    REJECTED = "rejected"


@dataclass
class Proposal:
    id: str
    value: Any
    proposer: str
    round: int
    timestamp: datetime = field(default_factory=_utc_now)


@dataclass
class Vote:
    proposal_id: str
    voter: str
    vote_type: str  # "prepare" or "commit"
    round: int
    timestamp: datetime = field(default_factory=_utc_now)


class RaftConsensus:
    """Simplified Raft consensus for agent leader election and state replication."""

    def __init__(self, agent_id: str, peers: list[str]):
        self.agent_id = agent_id
        self.peers = peers
        self.state = "follower"
        self.current_term = 0
        self.voted_for: Optional[str] = None
        self.leader_id: Optional[str] = None
        self.commit_index = 0
        self.last_applied = 0
        self.vote_counts: dict[int, dict[str, bool]] = {}
        self.election_timeout = random.randint(150, 300) / 1000  # seconds

    async def start_election(self) -> bool:
        """Start a leader election and report whether this agent won."""
        self.current_term += 1
        self.state = "candidate"
        self.voted_for = self.agent_id
        votes_received = {self.agent_id}  # a candidate votes for itself

        # Request votes from peers
        for peer in self.peers:
            granted = await self._request_vote(peer)
            if granted:
                votes_received.add(peer)

        # Check whether a majority of the cluster (peers plus self) agreed
        if len(votes_received) >= majority(len(self.peers) + 1):
            self.state = "leader"
            self.leader_id = self.agent_id
            return True
        return False

    async def _request_vote(self, peer: str) -> bool:
        """Request a vote from a peer (stub: always granted)."""
        # Simplified: a real implementation would send an RPC carrying the
        # candidate's term plus its last log index and last log term.
        # The peer grants its vote if:
        #   1. the candidate's term is >= the peer's term
        #   2. the candidate's log is at least as complete as the peer's
        return True

    async def append_entries(self, entries: list[dict]) -> bool:
        """Leader replicates entries to followers and commits on majority."""
        if self.state != "leader":
            return False

        success_count = 1  # include self
        for peer in self.peers:
            success = await self._send_entries(peer, entries)
            if success:
                success_count += 1

        if success_count >= majority(len(self.peers) + 1):
            self.commit_index += len(entries)
            return True
        return False

    async def _send_entries(self, peer: str, entries: list[dict]) -> bool:
        """Send entries to a follower (stub: always acknowledged)."""
        return True


class MultiPaxosVariant:
    """Paxos-style prepare/accept flow for distributed decision making.

    Messaging is stubbed out, so every peer promises and accepts.
    """

    def __init__(self, agent_id: str, peers: list[str]):
        self.agent_id = agent_id
        self.peers = peers
        self.proposals: dict[str, Proposal] = {}
        self.votes: dict[str, list[Vote]] = {}
        self.decisions: dict[str, Any] = {}

    async def propose(self, value: Any) -> Optional[Any]:
        """Propose a value; return it if decided, None otherwise."""
        proposal_id = hashlib.sha256(
            f"{value}{_utc_now().timestamp()}".encode()
        ).hexdigest()[:16]

        proposal = Proposal(
            id=proposal_id, value=value, proposer=self.agent_id, round=0
        )
        self.proposals[proposal_id] = proposal

        # Phase 1: Prepare
        if not await self._send_prepare(proposal):
            return None

        # Phase 2: Accept
        if not await self._send_accept(proposal):
            return None

        # Phase 3: Learn decision
        self.decisions[proposal_id] = value
        return value

    async def _send_prepare(self, proposal: Proposal) -> bool:
        """Send prepare messages and count promises."""
        promises = 1  # self
        for _peer in self.peers:
            # In a real implementation, wait for a promise from the peer
            promise_received = True
            if promise_received:
                promises += 1
        return promises >= majority(len(self.peers) + 1)

    async def _send_accept(self, proposal: Proposal) -> bool:
        """Send accept messages and count accepts."""
        accepts = 1  # self
        for _peer in self.peers:
            # In a real implementation, wait for an accept from the peer
            accept_received = True
            if accept_received:
                accepts += 1
        return accepts >= majority(len(self.peers) + 1)

    async def learn_decision(self, proposal_id: str) -> Optional[Any]:
        """Look up a decided value."""
        return self.decisions.get(proposal_id)


class GossipConsensus:
    """Gossip-based eventual consensus for large agent populations."""

    def __init__(self, agent_id: str, peers: list[str]):
        self.agent_id = agent_id
        self.peers = peers
        self.state: dict[str, Any] = {}
        self.version: dict[str, int] = {}
        self.gossip_interval = 1.0  # seconds

    async def start_gossip(self) -> None:
        """Run the background gossip loop (never returns)."""
        while True:
            await asyncio.sleep(self.gossip_interval)
            if not self.peers:
                continue  # nobody to gossip with yet
            peer = random.choice(self.peers)
            await self._gossip_with(peer)

    async def _gossip_with(self, peer: str) -> None:
        """Exchange state with a peer (stub: nothing is sent)."""
        # Select a subset of state to share
        state_subset = dict(random.sample(
            list(self.state.items()), min(3, len(self.state))
        ))
        # In a real implementation: send state_subset to the peer, receive
        # its updates, merge the incoming state, and update versions.

    async def set_value(self, key: str, value: Any) -> None:
        """Set a value (will eventually propagate to all agents)."""
        self.state[key] = value
        self.version[key] = self.version.get(key, 0) + 1

    async def get_value(self, key: str) -> Optional[Any]:
        """Get the currently known value (may not be globally consistent)."""
        return self.state.get(key)


# Factory function for consensus mechanism selection
def create_consensus(mechanism: str, agent_id: str, peers: list[str]):
    if mechanism == "raft":
        return RaftConsensus(agent_id, peers)
    elif mechanism == "paxos":
        return MultiPaxosVariant(agent_id, peers)
    elif mechanism == "gossip":
        return GossipConsensus(agent_id, peers)
    else:
        raise ValueError(f"Unknown consensus mechanism: {mechanism}")


# Usage
async def main() -> None:
    consensus = create_consensus(
        "raft", "agent_1", ["agent_2", "agent_3", "agent_4"]
    )

    # Leader election
    is_leader = await consensus.start_election()
    if is_leader:
        # Propose state changes
        await consensus.append_entries(
            [{"type": "update", "key": "config", "value": "new_config"}]
        )


if __name__ == "__main__":
    asyncio.run(main())
```
25 min
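The `_gossip_with` stub in the listing above leaves the merge step open. One possible shape for it is sketched here, assuming each key carries an integer version (like the `version` dict in `GossipConsensus`) and that the higher version wins. The names `merge` and `gossip_round` are hypothetical, the exchange happens in memory rather than over a network, and two different values written with the same version are not reconciled.

```python
import random
from typing import Any

# Each replica maps key -> (version, value); the higher version wins on merge.
State = dict[str, tuple[int, Any]]


def merge(local: State, incoming: State) -> None:
    """Fold a peer's entries into local, keeping the higher version per key."""
    for key, (version, value) in incoming.items():
        if key not in local or version > local[key][0]:
            local[key] = (version, value)


def gossip_round(replicas: dict[str, State], rng: random.Random) -> None:
    """Every agent does one push-pull exchange with a randomly chosen peer."""
    ids = list(replicas)
    for agent in ids:
        peer = rng.choice([p for p in ids if p != agent])
        merge(replicas[peer], replicas[agent])  # push our state to the peer
        merge(replicas[agent], replicas[peer])  # pull the peer's state back


# Five agents; two of them hold conflicting writes to the same key.
replicas: dict[str, State] = {f"agent_{i}": {} for i in range(1, 6)}
replicas["agent_1"]["config"] = (1, "v1")
replicas["agent_4"]["config"] = (2, "v2")  # newer write made elsewhere

rng = random.Random(0)
rounds = 0
# Gossip until every replica reports the same entry (capped at 50 rounds).
while len({r.get("config") for r in replicas.values()}) > 1 and rounds < 50:
    gossip_round(replicas, rng)
    rounds += 1
```

After the loop, every replica holds the version 2 entry. The number of rounds this takes depends on which peers happen to be picked, which is the practical meaning of "eventual consistency but no hard guarantees".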
- 13 Agent Observability: Observable agents emit structured state events that enable retrospective analysis and real-time anomaly detection without degrading agent performance. 15 min
- 14 Agent Tracing: Distributed tracing captures causal chains across agent boundaries, enabling engineers to reconstruct exactly how complex multi-agent behaviors emerged from component interactions. 15 min
- 15 Performance Metrics: Multi-agent metrics must capture interaction patterns between agents, not just individual throughput, to reveal coordination bottlenecks and emergent system behaviors. 15 min
- 16 Error Recovery: Error recovery in multi-agent systems requires classification-driven response selection, with retry strategies for transient failures and escalation pathways for semantic or fatal errors. 15 min
- 17 Scaling Agents: Agent scaling requires pool-based worker management with utilization-driven autoscaling that accounts for context transfer costs and variable task durations. 15 min
- 18 Security Considerations: Multi-agent security requires defense in depth: context guards prevent injection attacks, authorization scoping limits tool access, and sandboxing contains adversarial behavior. 15 min
- 19 Multi-Agent Testing: Multi-agent testing requires a layered approach that validates individual agents in isolation, integration patterns between agents, and emergent system behaviors under realistic conditions. 15 min
- 20 Production Deployment: Production deployment requires infrastructure that supports independent agent scaling, versioned deployments with rollback capability, and runtime configuration injection without service disruption. 15 min
- 21 Multi-Agent Content Team: Multi-agent content production uses staged pipelines with distinct responsibilities per agent, human checkpoints for quality control, and iteration loops for revision cycles. 15 min
- 22 Research Agent Team: Research teams benefit from agent specialization where each agent excels at distinct research phases, enabling parallel execution and confidence-weighted synthesis. 15 min
- 23 Customer Support Agents: Customer support multi-agent systems use staged processing with triage classification, diagnostic investigation, automated resolution attempts, and escalation pathways for complex cases. 15 min
- 24 Multi-Agent Platform Project: Multi-agent platforms provide reusable infrastructure that abstracts orchestration, observability, and security concerns, enabling teams to focus on domain-specific agent implementations. 15 min