15. Error Handling
Chapter 15 of 18 · 30 min
Production AI systems encounter failures constantly. Robust error handling separates reliable automation from fragile prototypes. Design for failure at every integration point.
Exception Hierarchy
# error_handling.py
from typing import Optional, Callable
from datetime import datetime
from pathlib import Path
import traceback
import json
class AutomationError(Exception):
    """Root exception type for automation failures.

    Besides the message, each instance stores a ``context`` dict and the
    ``timestamp`` at which it was constructed.
    """

    def __init__(self, message: str, context: dict = None):
        """Store the message, the context and the creation time.

        Args:
            message: Human-readable description passed to ``Exception``.
            context: Extra details about the failure. A falsy value
                (``None`` or an empty dict) is replaced by a new empty dict.
        """
        super().__init__(message)
        self.context = context if context else {}
        self.timestamp = datetime.now()
class ModelError(AutomationError):
    """Signals that an AI model operation failed."""
class DataError(AutomationError):
    """Signals that a data operation failed."""
class IntegrationError(AutomationError):
    """Signals that an external service integration failed."""
class ErrorHandler:
    """Count, log and classify errors, suggesting a recovery action.

    Every handled error is counted per exception class name, appended to a
    JSON Lines log file (one JSON object per line) and mapped to a recovery
    description chosen by ``isinstance`` checks against the automation
    exception hierarchy.
    """

    def __init__(self, config_path: str = "error_config.json"):
        """Load configuration and initialise the log path and counters.

        Args:
            config_path: Path to a JSON configuration file. A missing file
                is treated as an empty configuration.
        """
        self.config = self._load_config(config_path)
        self.error_log = Path(self.config.get("error_log", "errors.jsonl"))
        self.error_counts: dict[str, int] = {}

    def _load_config(self, path: str) -> dict:
        """Return the parsed JSON config, or ``{}`` when the file is absent.

        Every setting is read through ``self.config.get(key, default)``, so
        an absent file can safely fall back to defaults. Malformed JSON
        still raises so that a broken config is not silently ignored.
        """
        try:
            with open(path, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def handle(self, error: Exception, operation: str,
               context: Optional[dict] = None) -> dict:
        """Record ``error`` and return the suggested recovery.

        Args:
            error: The exception instance being handled.
            operation: Name of the operation that failed.
            context: Optional extra details; logged and passed to the
                recovery decision.

        Returns:
            A dict with ``handled``, ``error_type``, ``recovery`` and
            ``logged`` keys.

        Raises:
            OSError: If the log file cannot be written.
        """
        error_type = type(error).__name__
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1

        # Log error
        log_entry = self._create_log_entry(error, operation, context)
        self._persist_error(log_entry)

        # Determine recovery action
        recovery = self._determine_recovery(error, operation, context)
        return {
            "handled": True,
            "error_type": error_type,
            "recovery": recovery,
            "logged": True,
        }

    def _create_log_entry(self, error: Exception, operation: str,
                          context: Optional[dict] = None) -> dict:
        """Build the structured log record for ``error``."""
        error_type = type(error).__name__
        # Format the traceback of the exception we were given rather than
        # whatever exception happens to be active: handle() may be called
        # outside an ``except`` block or with a different exception object.
        formatted_tb = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        return {
            "timestamp": datetime.now().isoformat(),
            "error_type": error_type,
            "message": str(error),
            "operation": operation,
            "context": context or {},
            "traceback": formatted_tb,
            "error_count": self.error_counts.get(error_type, 0),
        }

    def _persist_error(self, log_entry: dict):
        """Append ``log_entry`` to the error log as one JSON line."""
        # default=str is deliberate: caller-supplied context may hold values
        # JSON cannot encode (datetimes, paths, ...), and the error logger
        # must not fail while reporting another failure.
        line = json.dumps(log_entry, default=str)
        with open(self.error_log, "a", encoding="utf-8") as f:
            f.write(line + "\n")

    def _determine_recovery(self, error: Exception, operation: str,
                            context: Optional[dict] = None) -> dict:
        """Pick a recovery description based on the exception's class.

        ``ModelError`` maps to a retry with a fallback model, ``DataError``
        to skip-and-continue, ``IntegrationError`` to a circuit break for the
        service named in ``context``; anything else maps to alert-and-halt.
        """
        # Model errors - retry with fallback
        if isinstance(error, ModelError):
            return {
                "action": "retry_with_fallback",
                "max_attempts": 3,
                "fallback_model": self.config.get("fallback_model", "llama3"),
                "backoff_seconds": 5,
            }

        # Data errors - skip and continue
        if isinstance(error, DataError):
            return {
                "action": "skip_and_log",
                "continue_processing": True,
            }

        # Integration errors - circuit break
        if isinstance(error, IntegrationError):
            service = context.get("service", "unknown") if context else "unknown"
            return {
                "action": "circuit_break",
                "service": service,
                "open_duration_seconds": self.config.get("circuit_break_duration", 60),
            }

        # Unknown errors - alert and halt
        return {
            "action": "alert_and_halt",
            "alert_channels": self.config.get("alert_channels", []),
        }
Retry Logic with Exponential Backoff
# retry_handler.py
import time
from functools import wraps
from typing import Callable, Any
class RetryHandler:
    """Retry a callable with capped exponential backoff and jitter."""

    def __init__(self, max_attempts: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0, exponential_base: float = 2.0):
        """Configure the retry policy.

        Args:
            max_attempts: Total number of calls, including the first one.
            base_delay: Delay in seconds before the first retry.
            max_delay: Upper bound in seconds for any single delay.
            exponential_base: Growth factor applied per failed attempt.

        Raises:
            ValueError: If ``max_attempts`` is less than 1.
        """
        # With zero attempts the wrapped function would never be called and
        # there would be no exception to re-raise, so reject it up front.
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    def retry(self, func: Callable = None, exceptions: tuple = (Exception,)):
        """Decorator for retry logic.

        Usable both bare (``@handler.retry``) and with arguments
        (``@handler.retry(exceptions=(...,))``).

        Args:
            func: The function to wrap when used as a bare decorator.
            exceptions: Exception types that trigger a retry; anything else
                propagates immediately.

        Returns:
            The wrapped function, or a decorator when ``func`` is ``None``.
            The wrapper re-raises the last exception once attempts run out.
        """
        def decorator(f: Callable) -> Callable:
            @wraps(f)
            def wrapper(*args, **kwargs) -> Any:
                for attempt in range(1, self.max_attempts + 1):
                    try:
                        return f(*args, **kwargs)
                    except exceptions as e:
                        if attempt >= self.max_attempts:
                            raise
                        delay = self._calculate_delay(attempt)
                        print(f"Attempt {attempt} failed: {e}. Retrying in {delay}s...")
                        time.sleep(delay)
                # Only reachable if max_attempts was changed to < 1 after
                # construction, in which case the function was never called.
                raise ValueError("max_attempts must be at least 1")
            return wrapper

        if func is None:
            return decorator
        return decorator(func)

    def _calculate_delay(self, attempt: int) -> float:
        """Return the delay before the retry that follows ``attempt``.

        The delay grows as ``base_delay * exponential_base ** (attempt - 1)``,
        is capped at ``max_delay``, then gets +/-25% random jitter. The
        result is capped again so jitter never exceeds ``max_delay``.
        """
        import random

        delay = min(self.base_delay * (self.exponential_base ** (attempt - 1)),
                    self.max_delay)
        # Add jitter (±25%)
        jitter = delay * 0.25 * (2 * random.random() - 1)
        return min(delay + jitter, self.max_delay)
# Usage
import requests

retry_handler = RetryHandler(max_attempts=3, base_delay=2.0, max_delay=60.0)


# requests raises its own ConnectionError / Timeout classes, which derive from
# RequestException (an OSError) and NOT from the builtin ConnectionError /
# TimeoutError, so they must be listed explicitly for the retry to trigger.
# The builtins are kept so existing behaviour for them is unchanged.
@retry_handler.retry(exceptions=(
    ConnectionError,
    TimeoutError,
    requests.exceptions.ConnectionError,
    requests.exceptions.Timeout,
))
def call_external_api(data: dict) -> dict:
    """Example API call with retry.

    Posts ``data`` as JSON to the example endpoint with a 30 second timeout
    and returns the decoded JSON response.

    Raises:
        requests.exceptions.HTTPError: On a 4xx/5xx response (not retried).
        requests.exceptions.ConnectionError, requests.exceptions.Timeout:
            Re-raised once all retry attempts are exhausted.
    """
    response = requests.post("https://api.example.com/process",
                             json=data, timeout=30)
    response.raise_for_status()
    return response.json()
Circuit Breaker Pattern
# circuit_breaker.py
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any
class CircuitState(Enum):
    """The three states a circuit breaker can be in."""

    # Normal operation
    CLOSED = "closed"
    # Failing, reject calls
    OPEN = "open"
    # Testing recovery
    HALF_OPEN = "half_open"
class CircuitBreaker:
    """Route calls through a failure-counting gate.

    While CLOSED, calls pass through and failures are counted. Once the
    count reaches ``failure_threshold`` the breaker goes OPEN and rejects
    calls until ``recovery_timeout`` seconds have elapsed since the last
    failure; the next call then runs in HALF_OPEN as a trial.
    """

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 expected_exception: type = Exception):
        """Start in the CLOSED state with no recorded failures.

        Args:
            failure_threshold: Failure count at which the breaker opens.
            recovery_timeout: Seconds after the last failure before a trial
                call is permitted.
            expected_exception: Exception type counted as a failure; other
                exceptions propagate without touching the breaker state.
        """
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: datetime | None = None
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Invoke ``func(*args, **kwargs)`` subject to the breaker state.

        Raises:
            CircuitOpenError: If the breaker is OPEN and the recovery
                timeout has not yet elapsed.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                raise CircuitOpenError("Circuit breaker is OPEN")
            self.state = CircuitState.HALF_OPEN

        try:
            outcome = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return outcome

    def _should_attempt_reset(self) -> bool:
        """Tell whether the recovery timeout has elapsed since the last failure."""
        previous_failure = self.last_failure_time
        if previous_failure is None:
            return True
        seconds_waited = (datetime.now() - previous_failure).total_seconds()
        return seconds_waited >= self.recovery_timeout

    def _on_success(self):
        """Clear the failure count and close the circuit."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0

    def _on_failure(self):
        """Record a failure and open the circuit once the threshold is hit."""
        self.last_failure_time = datetime.now()
        self.failure_count += 1
        if self.failure_count < self.failure_threshold:
            return
        self.state = CircuitState.OPEN
        print(f"Circuit breaker opened after {self.failure_count} failures")
class CircuitOpenError(Exception):
    """Signals that a call was rejected because the circuit breaker is open."""
# Integration with error handler
def circuit_protected(func: Callable, service_name: str,
                      error_handler: ErrorHandler) -> Callable:
    """Wrap ``func`` with a circuit breaker and error reporting.

    A dedicated ``CircuitBreaker`` (threshold 5, timeout 60 seconds) is
    created per wrapped function. Every exception escaping the breaker is
    passed to ``error_handler.handle`` under the operation name
    ``call_<service_name>`` and then re-raised to the caller.

    Args:
        func: The callable to protect.
        service_name: Name used in the operation label and log context.
        error_handler: Handler that records each failure.

    Returns:
        A wrapper with the same call signature as ``func``.
    """
    operation = f"call_{service_name}"
    breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

    @wraps(func)
    def guarded(*args, **kwargs):
        try:
            return breaker.call(func, *args, **kwargs)
        except CircuitOpenError:
            # NOTE(review): the rejection is reported as a plain
            # AutomationError; confirm whether IntegrationError was intended.
            rejection = AutomationError(f"Circuit open for {service_name}")
            open_context = {"service": service_name, "circuit_state": "open"}
            error_handler.handle(rejection, operation, open_context)
            raise
        except Exception as exc:
            error_handler.handle(exc, operation, {"service": service_name})
            raise

    return guarded
Graceful Degradation
# degradation.py
class DegradedMode:
    """Manage graceful degradation when AI services are unavailable."""

    def __init__(self, config_path: str = "degradation_config.json"):
        """Load configuration and fallback rules; start in non-degraded mode.

        Args:
            config_path: Path to a JSON configuration file. A missing file
                is treated as an empty configuration.
        """
        self.config = self._load_config(config_path)
        self.is_degraded = False
        self.fallback_rules = self._load_fallback_rules()

    def _load_config(self, path: str) -> dict:
        """Return the parsed JSON config, or ``{}`` when the file is absent.

        Degraded mode has to stay usable when its surroundings are broken,
        so a missing file is tolerated. Malformed JSON still raises.
        """
        try:
            with open(path, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def _load_fallback_rules(self) -> dict:
        """Return the fallback rules section of the configuration.

        NOTE(review): the ``"fallback_rules"`` key is an assumed schema;
        nothing in this class reads the rules yet. Confirm against the
        real configuration format.
        """
        return self.config.get("fallback_rules", {})

    def get_fallback(self, operation: str, context: dict) -> Any:
        """Return appropriate fallback for failed operation.

        Args:
            operation: One of ``"analyze"``, ``"summarize"`` or
                ``"classify"``.
            context: Inputs for the chosen fallback.

        Returns:
            The fallback result dict, or an error dict naming the operation
            when no fallback exists for it.
        """
        fallbacks = {
            "analyze": self._fallback_analysis,
            "summarize": self._fallback_summary,
            "classify": self._fallback_classification,
        }
        fallback = fallbacks.get(operation)
        if fallback is None:
            return {"error": "No fallback available", "original_operation": operation}
        return fallback(context)

    def _fallback_analysis(self, context: dict) -> dict:
        """Simple rule-based fallback for analysis.

        Flags an anomaly when ``context["value"]`` (default 0) exceeds
        ``context["threshold"]`` (default 100).
        """
        # When AI unavailable, apply basic thresholds
        value = context.get("value", 0)
        threshold = context.get("threshold", 100)
        return {
            "analysis": "automated_threshold",
            "result": "anomaly" if value > threshold else "normal",
            "confidence": "low",
            "degraded_mode": True,
        }

    def _fallback_summary(self, context: dict) -> dict:
        """Return minimal summary without AI, based on ``record_count``."""
        return {
            "summary": f"Data contains {context.get('record_count', 0)} records",
            "confidence": "low",
            "degraded_mode": True,
        }

    def _fallback_classification(self, context: dict) -> dict:
        """Return unknown classification without AI."""
        return {
            "category": "unknown",
            "confidence": 0.0,
            "degraded_mode": True,
        }
EXERCISE
Implement error handling for an Ollama API call that retries on connection errors, falls back to a smaller model after 3 failures, and logs a critical alert if all models fail.