17. Error Handling
Chapter 17 of 24 · 20 min
Agent systems amplify errors. A single bad message can cascade through multiple agents, corrupting state and generating infinite retry loops. Solid error handling isn't optional.
Error Classification
Not all errors are equal. Classify them to respond appropriately:
from enum import Enum
from dataclasses import dataclass
class ErrorSeverity(Enum):
    """Severity buckets used to decide how to respond to an error."""

    # Network blips, temporary unavailability.
    RETRYABLE = "retryable"

    # Resource exhaustion, load spikes.
    TRANSIENT = "transient"

    # Logic bugs, data corruption.
    FATAL = "fatal"

    # Unexpected errors.
    UNKNOWN = "unknown"
@dataclass
class AgentError(Exception):
    """Exception raised by agent code that carries its own severity.

    Attributes:
        message: Human-readable description of what went wrong.
        severity: The ErrorSeverity the raiser assigned to this error.
        retry_after_seconds: Optional hint for how long to wait before
            retrying; None when no hint is given.
        original_exception: Optional underlying exception that this error
            wraps; None when there is none.
    """

    message: str
    severity: ErrorSeverity
    retry_after_seconds: float | None = None
    original_exception: Exception | None = None

    def __str__(self) -> str:
        """Return the error message."""
        # The dataclass-generated __init__ never calls Exception.__init__,
        # so the inherited __str__ would render the raw positional args
        # tuple (or "" when the error was built with keyword arguments)
        # instead of the message.
        return self.message
def classify_error(error: Exception) -> ErrorSeverity:
    """Map an exception to the ErrorSeverity that should drive its handling.

    Args:
        error: The exception to classify.

    Returns:
        The severity carried by an AgentError; RETRYABLE for connection and
        timeout errors; TRANSIENT for memory and other OS-level errors;
        FATAL for built-in programming/data errors that a retry cannot fix;
        UNKNOWN for everything else.
    """
    # An AgentError states its own severity, so that always takes precedence.
    if isinstance(error, AgentError):
        return error.severity

    # ConnectionError and TimeoutError are OSError subclasses, so they must
    # be tested before the broader OSError check below.
    if isinstance(error, (ConnectionError, TimeoutError)):
        return ErrorSeverity.RETRYABLE

    if isinstance(error, (MemoryError, OSError)):
        return ErrorSeverity.TRANSIENT

    # Logic bugs and malformed data fail the same way on every attempt;
    # classifying them as FATAL stops callers from retrying them.
    fatal_types = (
        AssertionError,
        AttributeError,
        LookupError,  # KeyError, IndexError
        NameError,
        NotImplementedError,
        TypeError,
        ValueError,  # includes decode/parse errors such as UnicodeDecodeError
    )
    if isinstance(error, fatal_types):
        return ErrorSeverity.FATAL

    return ErrorSeverity.UNKNOWN
Retry Logic with Exponential Backoff
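Retrying immediately is almost always wrong: it piles more load onto a service that is already struggling. Back off exponentially between attempts, and add jitter so that many clients do not retry in lockstep: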
import random
import asyncio
async def retry_with_backoff(
    operation,
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
):
    """Await ``operation()`` and retry failures with exponential backoff.

    Args:
        operation: Zero-argument callable returning an awaitable.
        max_attempts: Total number of attempts to make; must be at least 1.
        base_delay: Delay in seconds before the first retry; doubled for
            each subsequent retry.
        max_delay: Upper bound in seconds on any single delay.

    Returns:
        Whatever ``operation()`` returns on the first successful attempt.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        Exception: The exception from ``operation()`` when it is classified
            as FATAL, or from the final attempt once all attempts are used.
    """
    # Without this guard the loop body never runs and there is no exception
    # to re-raise, which previously surfaced as an unrelated TypeError.
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    for attempt in range(max_attempts):
        try:
            return await operation()
        except Exception as exc:
            is_last_attempt = attempt == max_attempts - 1
            # Fatal errors are never retried; on the last attempt there is
            # nothing left to try, so the error propagates unchanged.
            if is_last_attempt or classify_error(exc) == ErrorSeverity.FATAL:
                raise

            # Exponential backoff, scaled to 50-100% of the nominal delay so
            # concurrent callers do not all retry at the same instant.
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay *= 0.5 + random.random() * 0.5

            # Honor an explicit retry hint carried by the error (for example
            # AgentError.retry_after_seconds) as a floor on the wait, while
            # still respecting the caller's max_delay cap.
            retry_after = getattr(exc, "retry_after_seconds", None)
            if retry_after is not None:
                delay = min(max(delay, retry_after), max_delay)

            await asyncio.sleep(delay)
Dead Letter Queues
Messages that fail repeatedly shouldn't block the queue:
class DeadLetterQueue:
    """Parks messages that keep failing so they stop blocking normal work.

    Failures are counted per message id. Until a message has failed
    ``max_retries`` times its error is re-raised with a retry hint; after
    that the message is placed on ``queue`` for separate handling.
    """

    def __init__(self, max_retries: int = 3):
        """Create an empty queue.

        Args:
            max_retries: Number of recorded failures at which a message is
                moved to the dead letter queue.
        """
        self.queue: asyncio.Queue[dict] = asyncio.Queue()
        self.max_retries = max_retries
        # Failure count per message id, for messages not yet dead-lettered.
        self.retry_counts: dict[str, int] = {}

    async def add_failed_message(self, message: dict, error: AgentError):
        """Record a failure for ``message`` and dead-letter or re-raise.

        Args:
            message: The message that failed; its ``'id'`` key identifies it.
            error: The error the failure produced.

        Raises:
            AgentError: ``error`` itself, with ``retry_after_seconds`` set to
                ``min(2 ** failure_count, 30)``, while the message is still
                below ``max_retries`` failures.
        """
        # Imported here so the method does not depend on a module-level
        # `datetime` import being present.
        from datetime import datetime, timezone

        # NOTE(review): messages without an 'id' all share the 'unknown'
        # counter, so unrelated id-less messages push each other toward the
        # dead letter queue - confirm producers always set 'id'.
        msg_id = message.get('id', 'unknown')
        failures = self.retry_counts.get(msg_id, 0) + 1
        self.retry_counts[msg_id] = failures

        if failures < self.max_retries:
            error.retry_after_seconds = min(2 ** failures, 30)
            raise error

        await self.queue.put({
            "message": message,
            "error": str(error),
            "retry_count": failures,
            # Timezone-aware UTC so the timestamp is unambiguous across hosts.
            "failed_at": datetime.now(timezone.utc).isoformat(),
        })
        # Remove from tracking to save memory.
        self.retry_counts.pop(msg_id, None)

    async def process_dead_letters(self, handler):
        """Pass each dead-lettered item to ``handler``, forever.

        Args:
            handler: Async callable invoked with each queued item (a dict
                with "message", "error", "retry_count" and "failed_at").

        Raises:
            Exception: Whatever ``handler`` raises; the loop stops at the
                first handler failure.
        """
        while True:
            item = await self.queue.get()
            try:
                await handler(item)
            finally:
                # Always balance get() with task_done(); otherwise a failing
                # handler leaves queue.join() waiting forever.
                self.queue.task_done()
EXERCISE
Review an existing agent's error handling. Identify scenarios where errors could be swallowed silently. Add dead letter queue processing and alerting for messages that fail repeatedly.