15. Error Handling

Chapter 15 of 18 · 30 min

Production AI systems encounter failures constantly. Robust error handling separates reliable automation from fragile prototypes. Design for failure at every integration point.

Exception Hierarchy

# error_handling.py
from typing import Optional, Callable
from datetime import datetime
from pathlib import Path
import traceback
import json

class AutomationError(Exception):
    """Root of the automation exception hierarchy.

    Besides the message, every instance carries a ``context`` dict (empty
    when none is supplied) and a ``timestamp`` taken at construction time.
    """

    def __init__(self, message: str, context: dict = None):
        super().__init__(message)
        # Local wall-clock time at which the error object was created.
        self.timestamp = datetime.now()
        # A falsy context (None or empty) is replaced by a fresh dict.
        self.context = context if context else {}

class ModelError(AutomationError):
    """Signals that an AI model operation failed."""

class DataError(AutomationError):
    """Signals that a data operation failed."""

class IntegrationError(AutomationError):
    """Signals that an integration with an external service failed."""

class ErrorHandler:
    """Log failures to a JSON-lines file and choose a recovery strategy.

    Configuration is read once from a JSON file. The keys consulted are
    ``error_log``, ``fallback_model``, ``circuit_break_duration`` and
    ``alert_channels``; each has a built-in default.
    """

    def __init__(self, config_path: str = "error_config.json"):
        """Load the configuration and start with empty error counters.

        Args:
            config_path: Path of the JSON configuration file.

        Raises:
            OSError: If the configuration file cannot be opened.
            json.JSONDecodeError: If the file does not contain valid JSON.
        """
        self.config = self._load_config(config_path)
        self.error_log = Path(self.config.get("error_log", "errors.jsonl"))
        # Number of errors handled so far, keyed by exception class name.
        self.error_counts: dict[str, int] = {}

    def _load_config(self, path: str) -> dict:
        """Return the parsed JSON configuration stored at ``path``."""
        with open(path, encoding="utf-8") as f:
            return json.load(f)

    def handle(self, error: Exception, operation: str,
               context: dict = None) -> dict:
        """Record ``error`` and return the recovery plan for it.

        Args:
            error: The exception to handle.
            operation: Name of the operation that failed.
            context: Optional extra details stored with the log entry and
                consulted when choosing the recovery action.

        Returns:
            A dict with the keys ``handled``, ``error_type``, ``recovery``
            and ``logged``.
        """
        error_type = type(error).__name__
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1

        # Log error
        log_entry = self._create_log_entry(error, operation, context)
        self._persist_error(log_entry)

        # Determine recovery action
        recovery = self._determine_recovery(error, operation, context)

        return {
            "handled": True,
            "error_type": error_type,
            "recovery": recovery,
            "logged": True
        }

    def _create_log_entry(self, error: Exception, operation: str,
                          context: dict = None) -> dict:
        """Create structured error log entry.

        The traceback is rendered from ``error`` itself rather than from
        whatever exception happens to be active, so the entry is correct
        even when this is called outside an ``except`` block or while a
        different exception is being handled.
        """
        formatted_traceback = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        return {
            "timestamp": datetime.now().isoformat(),
            "error_type": type(error).__name__,
            "message": str(error),
            "operation": operation,
            "context": context or {},
            "traceback": formatted_traceback,
            "error_count": self.error_counts.get(type(error).__name__, 0)
        }

    def _persist_error(self, log_entry: dict):
        """Append ``log_entry`` to the error log as one JSON line."""
        # default=str is deliberate: context values are caller-supplied and
        # may not be JSON-serialisable (datetimes, paths, ...). Failing here
        # would replace the original error with a logging error.
        with open(self.error_log, "a", encoding="utf-8") as f:
            f.write(json.dumps(log_entry, default=str) + "\n")

    def _determine_recovery(self, error: Exception, operation: str,
                            context: dict = None) -> dict:
        """Determine appropriate recovery action.

        The decision depends only on the class of ``error``; ``context`` is
        consulted solely for the ``service`` name of integration errors.
        """
        # Model errors - retry with fallback
        if isinstance(error, ModelError):
            return {
                "action": "retry_with_fallback",
                "max_attempts": 3,
                "fallback_model": self.config.get("fallback_model", "llama3"),
                "backoff_seconds": 5
            }

        # Data errors - skip and continue
        if isinstance(error, DataError):
            return {
                "action": "skip_and_log",
                "continue_processing": True
            }

        # Integration errors - circuit break
        if isinstance(error, IntegrationError):
            service = context.get("service", "unknown") if context else "unknown"
            return {
                "action": "circuit_break",
                "service": service,
                "open_duration_seconds": self.config.get("circuit_break_duration", 60)
            }

        # Unknown errors - alert and halt
        return {
            "action": "alert_and_halt",
            "alert_channels": self.config.get("alert_channels", [])
        }

Retry Logic with Exponential Backoff

# retry_handler.py
import time
from functools import wraps
from typing import Callable, Any

class RetryHandler:
    """Retry a callable with exponential backoff and random jitter.

    The delay before retry ``n`` (1-based) is
    ``base_delay * exponential_base ** (n - 1)``, capped at ``max_delay``
    and then varied by up to ±25%. Because jitter is applied after the cap,
    the actual sleep can exceed ``max_delay`` by as much as 25%.
    """

    def __init__(self, max_attempts: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0, exponential_base: float = 2.0):
        """Store the retry policy.

        Args:
            max_attempts: Total number of calls to make, including the first.
            base_delay: Delay in seconds before the first retry.
            max_delay: Upper bound in seconds on the delay before jitter.
            exponential_base: Factor by which the delay grows per attempt.

        Raises:
            ValueError: If ``max_attempts`` is less than 1.
        """
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    def retry(self, func: Callable = None, exceptions: tuple = (Exception,)):
        """Decorator for retry logic.

        Usable both bare (``@handler.retry``) and with arguments
        (``@handler.retry(exceptions=(...))``).

        Args:
            func: The function to wrap when used without parentheses.
            exceptions: Exception classes that trigger a retry; anything
                else propagates immediately.

        Returns:
            The wrapped function, or a decorator when ``func`` is omitted.
            The wrapper re-raises the last exception once ``max_attempts``
            calls have failed.
        """
        def decorator(f: Callable) -> Callable:
            @wraps(f)
            def wrapper(*args, **kwargs) -> Any:
                for attempt in range(1, self.max_attempts + 1):
                    try:
                        return f(*args, **kwargs)
                    except exceptions as e:
                        # Out of attempts: let the caller see the real error.
                        if attempt == self.max_attempts:
                            raise

                        delay = self._calculate_delay(attempt)
                        print(f"Attempt {attempt} failed: {e}. Retrying in {delay}s...")
                        time.sleep(delay)

                # Only reachable when max_attempts was set below 1 after
                # construction, in which case f was never called.
                raise ValueError("max_attempts must be at least 1")

            return wrapper

        if func is None:
            return decorator
        return decorator(func)

    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter."""
        import random
        delay = min(self.base_delay * (self.exponential_base ** (attempt - 1)),
                    self.max_delay)
        # Add jitter (±25%)
        jitter = delay * 0.25 * (2 * random.random() - 1)
        return delay + jitter

# Usage
# Third-party import at module level so the decorator below can name the
# exception classes that requests actually raises.
import requests

retry_handler = RetryHandler(max_attempts=3, base_delay=2.0, max_delay=60.0)


# requests raises its own ConnectionError / Timeout classes, which derive from
# requests.exceptions.RequestException and NOT from the built-in
# ConnectionError / TimeoutError. Listing only the built-ins meant a failed
# request was never retried. The built-ins stay in the tuple for compatibility.
@retry_handler.retry(exceptions=(
    ConnectionError,
    TimeoutError,
    requests.exceptions.ConnectionError,
    requests.exceptions.Timeout,
))
def call_external_api(data: dict) -> dict:
    """Example API call with retry.

    Posts ``data`` as JSON and returns the decoded JSON response. Connection
    failures and timeouts are retried by ``retry_handler``; an HTTP error
    status raised by ``raise_for_status`` is not retried.
    """
    response = requests.post("https://api.example.com/process",
                             json=data, timeout=30)
    response.raise_for_status()
    return response.json()

Circuit Breaker Pattern

# circuit_breaker.py
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any

class CircuitState(Enum):
    """States a circuit breaker can be in."""

    # Normal operation
    CLOSED = "closed"
    # Failing, reject calls
    OPEN = "open"
    # Testing recovery
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Reject calls to a repeatedly failing function until a timeout passes.

    The breaker starts CLOSED. Once ``failure_threshold`` failures have
    accumulated without an intervening success it becomes OPEN and rejects
    calls. After ``recovery_timeout`` seconds the next call is let through
    in HALF_OPEN state; a success closes the breaker again.
    """

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 expected_exception: type = Exception):
        # Runtime state.
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: datetime | None = None
        # Policy.
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Run ``func(*args, **kwargs)`` under the breaker's protection.

        Raises CircuitOpenError without calling ``func`` while the breaker
        is OPEN and the recovery timeout has not elapsed. Exceptions raised
        by ``func`` always propagate; only those matching
        ``expected_exception`` are counted as failures.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                raise CircuitOpenError("Circuit breaker is OPEN")
            # Timeout elapsed: let this one call through as a probe.
            self.state = CircuitState.HALF_OPEN

        try:
            outcome = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise

        self._on_success()
        return outcome

    def _should_attempt_reset(self) -> bool:
        """Tell whether the recovery timeout has elapsed since the last failure."""
        last_failure = self.last_failure_time
        if last_failure is None:
            return True
        return (datetime.now() - last_failure).total_seconds() >= self.recovery_timeout

    def _on_success(self):
        """Close the breaker and clear the failure counter."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0

    def _on_failure(self):
        """Record a failure and open the breaker once the threshold is hit."""
        self.last_failure_time = datetime.now()
        self.failure_count += 1

        if self.failure_count < self.failure_threshold:
            return

        self.state = CircuitState.OPEN
        print(f"Circuit breaker opened after {self.failure_count} failures")

class CircuitOpenError(Exception):
    """Signals that a call was rejected because the circuit breaker is open."""

# Integration with error handler
def circuit_protected(func: Callable, service_name: str,
                     error_handler: ErrorHandler) -> Callable:
    """Wrap ``func`` with a circuit breaker that reports to ``error_handler``.

    This is a wrapper factory rather than a bare decorator: call it as
    ``protected = circuit_protected(func, "billing", handler)``. Each call
    creates its own breaker (5 failures to open, 60 s recovery timeout).

    Args:
        func: The function that talks to the external service.
        service_name: Name used in the logged operation and context.
        error_handler: Handler that receives every failure.

    Returns:
        A function with the same signature as ``func``. Every exception is
        passed to ``error_handler.handle`` and then re-raised, including
        CircuitOpenError when the call is rejected.
    """
    breaker = CircuitBreaker(
        failure_threshold=5,
        recovery_timeout=60
    )

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return breaker.call(func, *args, **kwargs)
        except CircuitOpenError:
            # Reported as IntegrationError (still an AutomationError) so the
            # handler answers with its "circuit_break" recovery and reads the
            # "service" key below. A plain AutomationError falls through to
            # "alert_and_halt".
            error_handler.handle(
                IntegrationError(f"Circuit open for {service_name}"),
                f"call_{service_name}",
                {"service": service_name, "circuit_state": "open"}
            )
            raise
        except Exception as e:
            error_handler.handle(e, f"call_{service_name}",
                                 {"service": service_name})
            raise

    return wrapper

Graceful Degradation

# degradation.py
class DegradedMode:
    """Manage graceful degradation when AI services are unavailable.

    Provides rule-based stand-ins for the ``analyze``, ``summarize`` and
    ``classify`` operations. Every fallback result is marked with
    ``"degraded_mode": True``.
    """

    def __init__(self, config_path: str = "degradation_config.json"):
        """Load the optional configuration and start in non-degraded state.

        Args:
            config_path: Path of a JSON configuration file. A missing file
                is treated as an empty configuration.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        self.config = self._load_config(config_path)
        self.is_degraded = False
        self.fallback_rules = self._load_fallback_rules()

    def _load_config(self, path: str) -> dict:
        """Return the parsed JSON config at ``path``, or ``{}`` if it is missing."""
        # Local import keeps this snippet usable on its own.
        import json

        try:
            with open(path, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            # Fallbacks must keep working even when nothing was configured.
            return {}

    def _load_fallback_rules(self) -> dict:
        """Return the ``fallback_rules`` section of the config (default ``{}``)."""
        return self.config.get("fallback_rules", {})

    def get_fallback(self, operation: str, context: dict) -> Any:
        """Return appropriate fallback for failed operation.

        Args:
            operation: One of ``"analyze"``, ``"summarize"`` or ``"classify"``.
            context: Inputs for the fallback; see the individual methods.

        Returns:
            The fallback result dict, or an error dict naming the operation
            when no fallback exists for it.
        """
        fallbacks = {
            "analyze": self._fallback_analysis,
            "summarize": self._fallback_summary,
            "classify": self._fallback_classification,
        }
        fallback = fallbacks.get(operation)
        if fallback is None:
            return {"error": "No fallback available", "original_operation": operation}
        return fallback(context)

    def _fallback_analysis(self, context: dict) -> dict:
        """Simple rule-based fallback for analysis.

        Compares ``context["value"]`` (default 0) with
        ``context["threshold"]`` (default 100).
        """
        # When AI unavailable, apply basic thresholds
        value = context.get("value", 0)
        threshold = context.get("threshold", 100)

        return {
            "analysis": "automated_threshold",
            "result": "anomaly" if value > threshold else "normal",
            "confidence": "low",
            "degraded_mode": True
        }

    def _fallback_summary(self, context: dict) -> dict:
        """Return minimal summary without AI.

        Reports only ``context["record_count"]`` (default 0).
        """
        return {
            "summary": f"Data contains {context.get('record_count', 0)} records",
            "confidence": "low",
            "degraded_mode": True
        }

    def _fallback_classification(self, context: dict) -> dict:
        """Return unknown classification without AI."""
        return {
            "category": "unknown",
            "confidence": 0.0,
            "degraded_mode": True
        }
EXERCISE

Implement error handling for an Ollama API call that retries on connection errors, falls back to a smaller model after 3 failures, and logs a critical alert if all models fail.