11. Retry Logic

Chapter 11 of 18 · 25 min

Retry logic makes fragile function calls more reliable. Without it, a single transient failure, such as a dropped connection or a brief timeout, can fail the whole operation. With it, the system survives temporary issues while remaining responsive.

Exponential Backoff

The standard retry pattern uses exponential backoff: the wait doubles after each failed attempt, up to a cap. This prevents hammering a failing service while still giving transient failures a chance to resolve.

import time
import random

def exponential_backoff(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True
) -> float:
    """
    Calculate delay for retry attempt with exponential backoff.
    
    Args:
        attempt: Zero-indexed attempt number
        base_delay: Base delay in seconds
        max_delay: Cap applied before jitter; jitter can raise the result up to 25% above it
        jitter: Add randomness to prevent thundering herd
    
    Returns:
        Delay in seconds before next retry
    """
    delay = base_delay * (2 ** attempt)
    delay = min(delay, max_delay)
    
    if jitter:
        # Add ±25% randomness
        delay = delay * (0.75 + random.random() * 0.5)
    
    return delay
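
To make the schedule concrete, the sketch below prints the delays for the first few attempts with jitter left out. It repeats a compact copy of the delay formula so that it runs on its own; the names `backoff_delay` and `backoff_schedule` are illustrative assumptions, not part of the chapter's code.

```python
def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Jitter-free version of the formula above: base_delay * 2**attempt, capped."""
    return min(base_delay * (2 ** attempt), max_delay)


def backoff_schedule(attempts: int, base_delay: float = 1.0, max_delay: float = 60.0) -> list[float]:
    """Hypothetical helper: delays for attempts 0 .. attempts - 1."""
    return [backoff_delay(a, base_delay, max_delay) for a in range(attempts)]


print(backoff_schedule(8))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With jitter enabled, each value is scaled by a random factor between 0.75 and 1.25, so the first retry waits somewhere between 0.75 and 1.25 seconds.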

Retry Configuration

Not all errors warrant retries. A file-not-found error will not resolve by waiting; a network timeout might. Configure which exception types are retried, along with the attempt limit and delays:

from dataclasses import dataclass
from typing import Callable, Type

@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    retryable_exceptions: tuple[Type[Exception], ...] = (TimeoutError, ConnectionError)
    exponential_base: float = 2.0  # Not yet used: exponential_backoff() hardcodes a base of 2

class RetryExecutor:
    def __init__(self, config: RetryConfig):
        self.config = config
    
    def execute_with_retry(
        self, 
        func: Callable, 
        *args, 
        **kwargs
    ):
        last_exception = None
        
        for attempt in range(self.config.max_attempts):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                
                # Check if error is retryable (isinstance accepts a tuple of types)
                is_retryable = isinstance(e, self.config.retryable_exceptions)
                
                if not is_retryable or attempt == self.config.max_attempts - 1:
                    raise
                
                delay = exponential_backoff(
                    attempt,
                    self.config.base_delay,
                    self.config.max_delay
                )
                time.sleep(delay)
        
        raise last_exception
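
The difference between retryable and non-retryable errors is easiest to see with two simulated tools. The sketch below uses a compact stand-in for the executor (the `retry` function, its injectable `sleep` parameter, and both tool functions are assumptions made for illustration), and it records the delays instead of actually sleeping.

```python
import time
from typing import Callable


def retry(func: Callable, retryable=(TimeoutError, ConnectionError),
          max_attempts: int = 3, base_delay: float = 1.0,
          sleep: Callable[[float], None] = time.sleep):
    """Compact stand-in for RetryExecutor; `sleep` is injectable for tests."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))


calls = {"flaky": 0, "missing": 0}


def flaky_tool() -> str:
    """Hypothetical tool that times out twice, then succeeds."""
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


def missing_file_tool() -> str:
    """Hypothetical tool that fails with a non-retryable error."""
    calls["missing"] += 1
    raise FileNotFoundError("simulated missing file")


waits: list[float] = []
result = retry(flaky_tool, sleep=waits.append)  # Record delays instead of sleeping

try:
    retry(missing_file_tool, sleep=waits.append)
except FileNotFoundError:
    pass  # Raised on the first attempt, with no retry

print(result, calls, waits)
# ok {'flaky': 3, 'missing': 1} [1.0, 2.0]
```

The flaky tool is called three times with two waits in between, while the missing-file tool is called once and its error propagates immediately.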

Circuit Breaker Pattern

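For persistent failures, a circuit breaker prevents further attempts while the service recovers. After a run of failures it opens and rejects calls immediately; once the recovery timeout has passed it lets a trial call through (half-open) and closes again if that call succeeds: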

from enum import Enum
import threading
import time  # Module import: `from time import time` would shadow the module used by time.sleep() above
from typing import Callable

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        half_open_attempts: int = 1
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_attempts = half_open_attempts  # Stored but not enforced in this version
        
        self.failure_count = 0
        self.last_failure_time: float | None = None
        self.state = CircuitState.CLOSED
        self._lock = threading.Lock()
    
    def call(self, func: Callable, *args, **kwargs):
        with self._lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitOpenError("Circuit breaker is open")
        
        # Run the call outside the lock so a slow call does not block other callers
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise
    
    def _should_attempt_reset(self) -> bool:
        if self.last_failure_time is None:
            return True
        # monotonic() is unaffected by system clock adjustments
        return time.monotonic() - self.last_failure_time >= self.recovery_timeout
    
    def _on_success(self):
        with self._lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
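
The state transitions are easier to follow when time is under the caller's control. The sketch below is a deliberately reduced, single-threaded version of the breaker above: the class `MiniBreaker`, its `clock` parameter, and the string state names are assumptions made for illustration. It uses a hand-advanced fake clock so the walk from closed to open and back does not require waiting.

```python
from typing import Callable


class MiniBreaker:
    """Hypothetical single-threaded reduction of the breaker, with an injectable clock."""

    def __init__(self, failure_threshold: int, recovery_timeout: float,
                 clock: Callable[[], float]):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failure_count = 0
        self.opened_at = 0.0
        self.state = "closed"

    def call(self, func: Callable):
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open")  # Rejected without calling func
            self.state = "half_open"  # Timeout elapsed: allow a trial call
        try:
            result = func()
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        self.failure_count = 0
        self.state = "closed"
        return result


now = [0.0]  # Fake clock, advanced by hand
breaker = MiniBreaker(failure_threshold=2, recovery_timeout=30.0, clock=lambda: now[0])


def failing():
    raise ConnectionError("simulated outage")


states = []
for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
states.append(breaker.state)  # Open after two failures

try:
    breaker.call(lambda: "never runs")
except RuntimeError:
    states.append("rejected")  # Still inside the recovery window

now[0] = 31.0  # Move past the recovery timeout
states.append(breaker.call(lambda: "recovered"))
states.append(breaker.state)
print(states)
# ['open', 'rejected', 'recovered', 'closed']
```

Injecting the clock is one way to unit-test a breaker; the full class above could take the same kind of parameter, defaulting to `time.monotonic`.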

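Integration with Tool Execution

The executor below keeps one circuit breaker per tool and runs the retry loop inside it, so the breaker records a failure only after every retry attempt for a call has failed: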

class ResilientToolExecutor:
    def __init__(self):
        self.circuit_breakers: dict[str, CircuitBreaker] = {}
        self.retry_executor = RetryExecutor(RetryConfig())
    
    def execute(self, tool_name: str, func: Callable, *args, **kwargs):
        if tool_name not in self.circuit_breakers:
            self.circuit_breakers[tool_name] = CircuitBreaker()
        
        breaker = self.circuit_breakers[tool_name]
        return breaker.call(
            lambda: self.retry_executor.execute_with_retry(func, *args, **kwargs)
        )

EXERCISE

Add a circuit breaker to your tool executor that opens after 3 consecutive failures on any single tool. Verify that subsequent calls to the failing tool are rejected immediately while calls to other tools proceed normally.