11. Retry Logic
Retry logic turns fragile function calls into reliable operations. Without it, a single transient failure can cascade into a complete system failure. With it, the system rides out temporary issues while remaining responsive.
Exponential Backoff
The standard retry pattern uses exponential backoff—waiting progressively longer between attempts. This prevents hammering a failing service while still giving transient failures a chance to resolve.
import time
import random
def exponential_backoff(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
    exponential_base: float = 2.0,
) -> float:
    """
    Calculate delay for retry attempt with exponential backoff.

    The uncapped delay is ``base_delay * exponential_base ** attempt``. It is
    clamped to ``max_delay`` and, when ``jitter`` is enabled, scaled by a
    random factor in [0.75, 1.25]. The returned value never exceeds
    ``max_delay``, with or without jitter.

    Args:
        attempt: Zero-indexed attempt number
        base_delay: Base delay in seconds
        max_delay: Maximum delay cap
        jitter: Add randomness to prevent thundering herd
        exponential_base: Growth factor applied once per attempt

    Returns:
        Delay in seconds before next retry
    """
    try:
        delay = base_delay * (exponential_base ** attempt)
    except OverflowError:
        # The uncapped value no longer fits in a float. Any positive base
        # delay would be clamped to the cap anyway; a zero base stays zero.
        delay = max_delay if base_delay > 0 else 0.0
    delay = min(delay, max_delay)
    if jitter:
        # Add ±25% randomness, then clamp again so that upward jitter on an
        # already-capped delay cannot push the result past max_delay.
        delay = min(delay * random.uniform(0.75, 1.25), max_delay)
    return delay
Retry Configuration
Not all errors warrant retries. A file-not-found error will not resolve itself by waiting; a network timeout might. Configure which error types are retryable:
from dataclasses import dataclass
from typing import Callable, Type
@dataclass
class RetryConfig:
    """Settings that describe a retry policy.

    Raises:
        ValueError: If ``max_attempts`` is below 1 or a delay is negative.
    """

    # Total number of calls to make, counting the first one.
    max_attempts: int = 3
    # Base delay in seconds handed to the backoff calculation.
    base_delay: float = 1.0
    # Upper bound in seconds handed to the backoff calculation.
    max_delay: float = 30.0
    # Only exceptions that are instances of these types are retried.
    retryable_exceptions: tuple[Type[Exception], ...] = (TimeoutError, ConnectionError)
    # Growth factor for the backoff curve.
    # NOTE(review): RetryExecutor in this file does not read this field - confirm intended use.
    exponential_base: float = 2.0

    def __post_init__(self) -> None:
        # Fail at construction time instead of deep inside the retry loop,
        # where these values only surface as obscure errors.
        if self.max_attempts < 1:
            raise ValueError(
                f"max_attempts must be at least 1, got {self.max_attempts}"
            )
        if self.base_delay < 0:
            raise ValueError(
                f"base_delay must be non-negative, got {self.base_delay}"
            )
        if self.max_delay < 0:
            raise ValueError(
                f"max_delay must be non-negative, got {self.max_delay}"
            )
class RetryExecutor:
    """Call functions, retrying the failures that the config marks as retryable."""

    def __init__(self, config: "RetryConfig"):
        self.config = config

    def execute_with_retry(
        self,
        func: Callable,
        *args,
        **kwargs
    ):
        """
        Call ``func(*args, **kwargs)``, retrying on retryable exceptions.

        Between attempts the executor sleeps for the delay returned by
        ``exponential_backoff`` for the current zero-indexed attempt.

        Args:
            func: Callable to invoke
            *args: Positional arguments forwarded to ``func``
            **kwargs: Keyword arguments forwarded to ``func``

        Returns:
            Whatever ``func`` returns on its first successful call

        Raises:
            ValueError: If ``config.max_attempts`` is less than 1
            Exception: The exception raised by ``func`` when it is not
                retryable or when the final attempt fails
        """
        # Imported locally on purpose: this file also runs
        # `from time import time`, which rebinds the module-level name `time`
        # to a function, so `time.sleep` cannot be relied on here.
        from time import sleep

        max_attempts = self.config.max_attempts
        if max_attempts < 1:
            # With no attempts there is neither a result nor an exception to
            # report, so reject the configuration explicitly.
            raise ValueError(
                f"max_attempts must be at least 1, got {max_attempts}"
            )

        # isinstance needs a tuple; this also tolerates a list in the config.
        retryable = tuple(self.config.retryable_exceptions)
        final_attempt = max_attempts - 1

        for attempt in range(max_attempts):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                # Non-retryable errors and the last failure propagate as-is.
                if attempt == final_attempt or not isinstance(exc, retryable):
                    raise
                sleep(
                    exponential_backoff(
                        attempt,
                        self.config.base_delay,
                        self.config.max_delay
                    )
                )
Circuit Breaker Pattern
For persistent failures, a circuit breaker prevents further attempts while the service recovers:
from enum import Enum
from time import time
import threading
class CircuitState(Enum):
    """States a CircuitBreaker moves between."""

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery


class CircuitOpenError(Exception):
    """Raised by CircuitBreaker.call when the circuit is open and the call is rejected."""


class CircuitBreaker:
    """
    Stop calling a function that keeps failing until a timeout has passed.

    The breaker starts CLOSED. Each failed call increments ``failure_count``;
    once it reaches ``failure_threshold`` the breaker becomes OPEN and
    rejects calls with CircuitOpenError. After ``recovery_timeout`` seconds
    since the last failure, the next call switches the breaker to HALF_OPEN
    and is let through: success closes the breaker and resets the counter,
    failure opens it again.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        half_open_attempts: int = 1
    ):
        """
        Args:
            failure_threshold: Failure count at which the breaker opens
            recovery_timeout: Seconds after the last failure before a
                trial call is allowed
            half_open_attempts: Stored on the instance; the logic in this
                class does not read it
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_attempts = half_open_attempts
        self.failure_count = 0
        # Timestamp from time() of the most recent failure, None until one occurs.
        self.last_failure_time: float | None = None
        self.state = CircuitState.CLOSED
        # Guards state, failure_count and last_failure_time.
        self._lock = threading.Lock()

    def call(self, func: Callable, *args, **kwargs):
        """
        Invoke ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``func`` returns

        Raises:
            CircuitOpenError: If the breaker is open and the recovery
                timeout has not elapsed
            Exception: Anything ``func`` raises, after the failure is recorded
        """
        with self._lock:
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_reset():
                    raise CircuitOpenError("Circuit breaker is open")
                self.state = CircuitState.HALF_OPEN
        # func runs outside the lock: the bookkeeping helpers below take the
        # same non-reentrant lock themselves.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        """Return True once recovery_timeout seconds have passed since the last failure."""
        if self.last_failure_time is None:
            return True
        return time() - self.last_failure_time >= self.recovery_timeout

    def _on_success(self):
        """Reset the failure counter and close the breaker."""
        with self._lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Record a failure and open the breaker when warranted."""
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time()
            # A failed trial call reopens the breaker outright rather than
            # depending on the counter still being at or above the threshold.
            if (
                self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold
            ):
                self.state = CircuitState.OPEN
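Integration with Tool Execution
The executor below keeps one circuit breaker per tool and runs the retry loop inside the breaker, so the breaker records a single failure only when the retry executor gives up on a call.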
class ResilientToolExecutor:
    """Run tool callables with retries, guarded by one circuit breaker per tool name."""

    def __init__(self):
        self.retry_executor = RetryExecutor(RetryConfig())
        # Breakers are created lazily, the first time a tool name is seen.
        self.circuit_breakers: dict[str, CircuitBreaker] = {}

    def execute(self, tool_name: str, func: Callable, *args, **kwargs):
        """
        Run ``func(*args, **kwargs)`` for the named tool.

        The retrying call is passed to the tool's circuit breaker as a single
        unit, so the breaker sees one outcome per ``execute`` invocation.

        Args:
            tool_name: Key selecting which circuit breaker guards the call
            func: Callable to invoke
            *args: Positional arguments forwarded to ``func``
            **kwargs: Keyword arguments forwarded to ``func``

        Returns:
            The value produced by the breaker-wrapped, retried call
        """
        try:
            breaker = self.circuit_breakers[tool_name]
        except KeyError:
            breaker = CircuitBreaker()
            self.circuit_breakers[tool_name] = breaker

        def run_with_retry():
            return self.retry_executor.execute_with_retry(func, *args, **kwargs)

        return breaker.call(run_with_retry)
Add a circuit breaker to your tool executor that opens after 3 consecutive failures on any single tool. Verify that subsequent calls to the failing tool are rejected immediately while calls to other tools proceed normally.