RUNLOCALAI · v38
Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.co · Independently operated
Custom Agent Frameworks

17. Error Handling

Chapter 17 of 24 · 20 min
KEY INSIGHT

The worst failure mode is silent failure. Every error should either be resolved or moved to a dead letter queue with alerting. Errors that "just get retried forever" waste resources and hide problems.

Agent systems amplify errors. A single bad message can cascade through multiple agents, corrupting state and generating infinite retry loops. Solid error handling isn't optional.
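
As a minimal sketch of the difference between a silent failure and a visible one, compare the two wrappers below. The function names, the logger name, and the plain list standing in for a dead letter queue are all assumptions made for illustration, not part of any framework.

```python
import logging

logger = logging.getLogger("agent.errors")  # hypothetical logger name

def handle_silently(handler, message):
    """Anti-pattern: the failure disappears and the caller only sees None."""
    try:
        return handler(message)
    except Exception:
        return None  # swallowed: no log, no metric, no dead letter

def handle_visibly(handler, message, dead_letters: list):
    """Every failure is logged and recorded where someone can find it."""
    try:
        return handler(message)
    except Exception as exc:
        logger.error("message %r failed: %r", message.get("id"), exc)
        # A plain list stands in for a real dead letter queue here.
        dead_letters.append({"message": message, "error": repr(exc)})
        return None
```

Both wrappers return None on failure, but only the second leaves a trace that alerting can act on.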

Error Classification

Not all errors are equal. Classify them to respond appropriately:

from enum import Enum
from dataclasses import dataclass

class ErrorSeverity(Enum):
    RETRYABLE = "retryable"        # Network blips, temporary unavailability
    TRANSIENT = "transient"        # Resource exhaustion, load spikes
    FATAL = "fatal"                # Logic bugs, data corruption
    UNKNOWN = "unknown"            # Unexpected errors

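@dataclass(eq=False)  # eq=False keeps the exception hashable
class AgentError(Exception):
    message: str
    severity: ErrorSeverity
    retry_after_seconds: float | None = None
    original_exception: Exception | None = None

    def __post_init__(self):
        # The generated __init__ skips Exception.__init__, so pass the
        # message along explicitly; otherwise str(error) can be empty.
        super().__init__(self.message)

def classify_error(error: Exception) -> ErrorSeverity:
    # An AgentError already carries its own severity.
    if isinstance(error, AgentError):
        return error.severity
    # Check these before OSError: ConnectionError and TimeoutError are
    # subclasses of OSError, so the broader test would otherwise win.
    if isinstance(error, (ConnectionError, TimeoutError)):
        return ErrorSeverity.RETRYABLE
    if isinstance(error, (MemoryError, OSError)):
        return ErrorSeverity.TRANSIENT
    return ErrorSeverity.UNKNOWN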
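
Classification only pays off once each severity maps to a different response. The sketch below shows one way to do that with a lookup table. The attempt limits and the `next_action` helper are hypothetical values chosen for illustration, and the enum is repeated only so the snippet runs on its own.

```python
from enum import Enum

class ErrorSeverity(Enum):  # mirrors the enum defined above
    RETRYABLE = "retryable"
    TRANSIENT = "transient"
    FATAL = "fatal"
    UNKNOWN = "unknown"

# Hypothetical limits: total attempts allowed before giving up.
MAX_ATTEMPTS = {
    ErrorSeverity.RETRYABLE: 5,
    ErrorSeverity.TRANSIENT: 3,
    ErrorSeverity.FATAL: 1,    # never retried
    ErrorSeverity.UNKNOWN: 2,  # one cautious retry, then escalate
}

def next_action(severity: ErrorSeverity, attempts_made: int) -> str:
    """Return "retry" while attempts remain, otherwise "dead_letter"."""
    if attempts_made < MAX_ATTEMPTS[severity]:
        return "retry"
    return "dead_letter"
```

Keeping the limits in one table makes the policy easy to review and change without touching the retry loop.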

Retry Logic with Exponential Backoff

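Retrying immediately is almost always wrong: the dependency that just failed is probably still failing, and a burst of instant retries adds load exactly when it can least absorb it. Wait longer after each failure, and add jitter so that many clients don't retry in lockstep: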

import random
import asyncio

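async def retry_with_backoff(
    operation,
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
):
    """Await operation() up to max_attempts times, sleeping between failures."""
    if max_attempts < 1:
        # Without this guard the final raise would receive None
        raise ValueError("max_attempts must be at least 1")
    last_exception = None

    for attempt in range(max_attempts):
        try:
            return await operation()
        except Exception as e:
            last_exception = e
            severity = classify_error(e)

            if severity == ErrorSeverity.FATAL:
                raise  # Don't retry fatal errors

            if attempt < max_attempts - 1:
                # Exponential backoff, capped at max_delay
                delay = min(base_delay * (2 ** attempt), max_delay)
                # Jitter: scale to 50-100% of the computed delay
                delay *= (0.5 + random.random() * 0.5)
                await asyncio.sleep(delay)

    # Every attempt failed: surface the last error instead of hiding it
    raise last_exception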
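
To make the delay formula concrete, the sketch below computes the range each sleep can fall in, using the same defaults as `retry_with_backoff`. The helper name `backoff_bounds` is made up for this illustration.

```python
def backoff_bounds(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0):
    """Return (lowest, highest) possible sleep for a zero-based attempt.

    The jitter factor is drawn from [0.5, 1.0), so the upper bound
    itself is never quite reached.
    """
    capped = min(base_delay * (2 ** attempt), max_delay)
    return 0.5 * capped, capped

for attempt in range(4):
    low, high = backoff_bounds(attempt)
    print(f"after attempt {attempt}: sleep between {low}s and {high}s")

# The cap takes over once base_delay * 2**attempt exceeds max_delay:
print(backoff_bounds(10))  # (30.0, 60.0)
```

With `max_attempts=5` there are four sleeps (after attempts 0 through 3), so the worst-case total wait stays under 1 + 2 + 4 + 8 = 15 seconds.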

Dead Letter Queues

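Messages that fail repeatedly shouldn't block the queue. Once a message has failed max_retries times, move it to a separate dead letter queue where it can be inspected and alerted on: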

import asyncio
from datetime import datetime, timezone

class DeadLetterQueue:
    def __init__(self, max_retries: int = 3):
        self.queue: asyncio.Queue[dict] = asyncio.Queue()
        self.max_retries = max_retries
        self.retry_counts: dict[str, int] = {}

    async def add_failed_message(self, message: dict, error: AgentError):
        # Note: messages without an 'id' all share the 'unknown' counter
        msg_id = message.get('id', 'unknown')
        self.retry_counts[msg_id] = self.retry_counts.get(msg_id, 0) + 1

        if self.retry_counts[msg_id] >= self.max_retries:
            await self.queue.put({
                "message": message,
                "error": str(error),
                "retry_count": self.retry_counts[msg_id],
                # Timezone-aware timestamp so logs compare across hosts
                "failed_at": datetime.now(timezone.utc).isoformat()
            })
            # Remove from tracking to save memory
            self.retry_counts.pop(msg_id, None)
        else:
            # Tell the caller how long to wait, capped at 30 seconds
            error.retry_after_seconds = min(2 ** self.retry_counts[msg_id], 30)
            raise error

    async def process_dead_letters(self, handler):
        while True:
            item = await self.queue.get()
            try:
                await handler(item)
            finally:
                # Always mark the item done so queue.join() can't hang
                self.queue.task_done()
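
A dead letter queue only helps if something reads it. The sketch below shows one possible handler that logs each dead letter at ERROR level, plus a helper that drains whatever is currently queued. The names `alert_handler` and `drain` and the logger name are assumptions for illustration; a real setup would route the log record to whatever alerting system is already in place.

```python
import asyncio
import logging

logger = logging.getLogger("agent.dlq")  # hypothetical logger name

async def alert_handler(item: dict) -> None:
    """Log a dead letter loudly so existing log-based alerting can fire."""
    logger.error(
        "dead letter: id=%s error=%s retries=%s",
        item["message"].get("id"), item["error"], item["retry_count"],
    )

async def drain(queue: asyncio.Queue, handler) -> int:
    """Handle every item currently queued and return how many were handled."""
    handled = 0
    while not queue.empty():
        item = queue.get_nowait()
        try:
            await handler(item)
        finally:
            queue.task_done()  # keep queue.join() from hanging
        handled += 1
    return handled
```

`alert_handler` takes the same item shape that `add_failed_message` puts on the queue, so it could also be passed to `process_dead_letters` as the `handler` argument.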
EXERCISE

Review an existing agent's error handling. Identify scenarios where errors could be swallowed silently. Add dead letter queue processing and alerting for messages that fail repeatedly.
