18. Error Handling

Chapter 18 of 22 · 25 min

Voice AI systems must handle hardware failures, network issues, model errors, and unexpected audio conditions gracefully.

Error Classification

from enum import Enum
from typing import Union

class VoiceErrorType(Enum):
    """Closed set of failure categories attached to every VoicePipelineError.

    Each member's value is a lowercase string; ``VoicePipelineError.to_dict``
    emits that string under the ``"type"`` key.
    """

    AUDIO_CAPTURE = "audio_capture"  # used by AudioCaptureError
    NETWORK = "network"  # used by NetworkError
    MODEL_INFERENCE = "model_inference"  # used by InferenceError
    RESOURCE_EXHAUSTION = "resource_exhaustion"  # used by ResourceError
    # The two categories below have no dedicated exception subclass in this
    # chapter; raise VoicePipelineError directly with them.
    INVALID_INPUT = "invalid_input"
    TIMEOUT = "timeout"

class VoicePipelineError(Exception):
    """Base exception for the voice pipeline.

    Carries a category (``error_type``), a human-readable ``message`` and a
    ``recoverable`` flag that retry logic can inspect.
    """

    def __init__(self, error_type: VoiceErrorType, message: str, recoverable: bool = True):
        """Store the error details and initialise ``Exception`` with *message*.

        Args:
            error_type: Category of the failure.
            message: Human-readable description; also becomes ``str(error)``.
            recoverable: Whether the operation may be retried. Defaults to True.
        """
        super().__init__(message)
        self.recoverable = recoverable
        self.message = message
        self.error_type = error_type

    def to_dict(self) -> dict:
        """Return the error as a plain dict with string/bool values only."""
        payload = {}
        payload["type"] = self.error_type.value
        payload["message"] = self.message
        payload["recoverable"] = self.recoverable
        return payload

# Specific error types
class AudioCaptureError(VoicePipelineError):
    """Audio-capture failure; uses the base class's default ``recoverable=True``."""

    def __init__(self, message: str, device: str = None):
        """Create the error.

        Args:
            message: Human-readable description of the failure.
            device: Optional identifier of the device involved; stored as-is.
        """
        self.device = device
        super().__init__(
            error_type=VoiceErrorType.AUDIO_CAPTURE,
            message=message,
        )

class NetworkError(VoicePipelineError):
    """Network failure; always flagged as recoverable."""

    def __init__(self, message: str, retry_after: float = None):
        """Create the error.

        Args:
            message: Human-readable description of the failure.
            retry_after: Suggested delay in seconds before retrying. When
                omitted (``None``) it defaults to 1.0.
        """
        super().__init__(VoiceErrorType.NETWORK, message, recoverable=True)
        # Compare against None explicitly: ``retry_after or 1.0`` would turn a
        # deliberate 0 ("retry immediately") into a one-second delay.
        self.retry_after = 1.0 if retry_after is None else retry_after

class InferenceError(VoicePipelineError):
    """Model-inference failure; always flagged as non-recoverable."""

    def __init__(self, message: str, model_name: str = None):
        """Create the error.

        Args:
            message: Human-readable description of the failure.
            model_name: Optional name of the model involved; stored as-is.
        """
        self.model_name = model_name
        super().__init__(
            error_type=VoiceErrorType.MODEL_INFERENCE,
            message=message,
            recoverable=False,
        )

class ResourceError(VoicePipelineError):
    """Resource-exhaustion failure; always flagged as recoverable."""

    def __init__(self, message: str, resource_type: str):
        """Create the error.

        Args:
            message: Human-readable description of the failure.
            resource_type: Name of the exhausted resource; stored as-is.
        """
        self.resource_type = resource_type
        super().__init__(
            error_type=VoiceErrorType.RESOURCE_EXHAUSTION,
            message=message,
            recoverable=True,
        )

Error Handling Decorators

import functools
import asyncio
import logging

# Module-level logger shared by the retry, circuit-breaker and fallback helpers below.
logger = logging.getLogger(__name__)

def retry_on_error(max_retries: int = 3, backoff_factor: float = 2.0):
    """Decorate an async function so recoverable pipeline errors are retried.

    Only ``VoicePipelineError`` is intercepted; any other exception propagates
    immediately. An error whose ``recoverable`` flag is false is re-raised
    without retrying. Between attempts the wrapper sleeps
    ``backoff_factor ** attempt`` seconds (attempt is 0-based, so the first
    wait is always 1 second).

    Args:
        max_retries: Total number of attempts, including the first. Must be >= 1.
        backoff_factor: Base of the exponential backoff.

    Returns:
        A decorator for coroutine functions.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Previously such a value
            skipped the call entirely and failed with ``TypeError`` from
            ``raise None``.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except VoicePipelineError as e:
                    is_last_attempt = attempt == max_retries - 1
                    if not e.recoverable or is_last_attempt:
                        raise

                    wait_time = backoff_factor ** attempt
                    logger.warning(
                        "Retry %d/%d after %ss: %s",
                        attempt + 1, max_retries, wait_time, e.message,
                    )
                    await asyncio.sleep(wait_time)
            # Not reachable: max_retries >= 1, and the final attempt either
            # returns or re-raises.
        return wrapper
    return decorator

class RetryHandler:
    """Retry an async callable with exponential backoff.

    Unlike ``retry_on_error`` this catches any ``Exception``, not only
    pipeline errors. Exceptions that carry a false ``recoverable`` attribute
    (e.g. ``VoicePipelineError(..., recoverable=False)``) are re-raised
    immediately, matching the decorator's behaviour.
    """

    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0):
        """Configure the handler.

        Args:
            max_retries: Total number of attempts, including the first. Must be >= 1.
            backoff_factor: Base of the exponential backoff.

        Raises:
            ValueError: If ``max_retries`` is less than 1. Previously such a
                value made ``execute`` return ``None`` without calling anything.
        """
        if max_retries < 1:
            raise ValueError(f"max_retries must be >= 1, got {max_retries}")
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    async def execute(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)``, retrying on failure.

        Sleeps ``backoff_factor ** attempt`` seconds between attempts (attempt
        is 0-based).

        Returns:
            Whatever ``func`` returns on the first successful attempt.

        Raises:
            Exception: The last exception raised by ``func`` once attempts are
                exhausted, or immediately if it is marked non-recoverable.
        """
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                is_last_attempt = attempt == self.max_retries - 1
                # Duck-typed so arbitrary exceptions (no flag) stay retryable.
                if is_last_attempt or not getattr(e, "recoverable", True):
                    raise
                wait = self.backoff_factor ** attempt
                logger.error("Attempt %d failed: %s. Retrying in %ss...", attempt + 1, e, wait)
                await asyncio.sleep(wait)

Circuit Breaker Pattern

import time
from collections import deque

class CircuitBreaker:
    """Stop calling a failing dependency after repeated consecutive failures.

    States:
        closed:    calls pass through; consecutive failures are counted.
        open:      calls are rejected with ``NetworkError`` until ``timeout``
                   seconds have passed since the last failure.
        half-open: one trial call is allowed; success closes the breaker,
                   failure re-opens it.

    ``call`` is synchronous: if ``func`` returns an awaitable it is returned
    un-awaited, so errors raised while awaiting it are not seen by the breaker.
    """

    def __init__(self, failure_threshold: int = 5, timeout: float = 30.0):
        """Configure the breaker.

        Args:
            failure_threshold: Consecutive failures that open the breaker.
            timeout: Seconds to stay open before allowing a trial call.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Returns:
            The return value of ``func``.

        Raises:
            NetworkError: If the breaker is open and the timeout has not elapsed.
            Exception: Whatever ``func`` raises (re-raised after bookkeeping).
        """
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half-open"
            else:
                raise NetworkError("Circuit breaker is open", retry_after=5.0)

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = time.time()

            # A failed trial call re-opens the breaker regardless of the count.
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state = "open"
                logger.error("Circuit breaker opened due to repeated failures")

            raise

        # Any success ends the failure streak. Without this reset the breaker
        # would trip on cumulative rather than consecutive failures.
        self.state = "closed"
        self.failures = 0
        return result

    def reset(self):
        """Force the breaker back to its initial closed state."""
        self.state = "closed"
        self.failures = 0
        self.last_failure_time = None

Graceful Degradation

class FallbackManager:
    """Run a primary coroutine and fall back to alternatives when it fails.

    ``self.fallbacks`` maps a task name to an ordered list of fallback names.
    A name is resolved as follows: ``"silence"`` returns a silent audio
    buffer; any other name ``X`` is dispatched to an async method ``_X`` on
    this object (for example ``_secondary_tts``). This class does not define
    those methods itself, so subclasses must supply them; a missing handler
    counts as a failed option.
    """

    def __init__(self):
        self.fallbacks = {
            "primary_tts": ["secondary_tts", "silence"],
            "primary_asr": ["fallback_asr"],
            "primary_llm": ["smaller_llm"]
        }

    async def execute_with_fallback(self, task_name: str, primary_func, *args, **kwargs):
        """Try ``primary_func`` first, then each fallback registered for the task.

        Args:
            task_name: Key into ``self.fallbacks``; unknown names simply have
                no fallbacks.
            primary_func: Coroutine function to attempt first.
            *args, **kwargs: Forwarded to ``primary_func`` and to every
                method-based fallback.

        Returns:
            The result of the first option that succeeds.

        Raises:
            InferenceError: If every option fails; chained to the last error.
        """
        # The primary always goes first. Previously a task with registered
        # fallbacks never invoked primary_func at all.
        options = [primary_func, *self.fallbacks.get(task_name, [])]

        last_error = None
        for option in options:
            try:
                if callable(option):
                    return await option(*args, **kwargs)
                if option == "silence":
                    return self._generate_silence()
                handler = getattr(self, f"_{option}", None)
                if handler is None:
                    # Record the gap instead of silently skipping the option.
                    raise NotImplementedError(f"No handler for fallback '{option}'")
                return await handler(*args, **kwargs)
            except Exception as e:
                last_error = e
                label = getattr(option, "__name__", option)
                logger.warning("%s failed: %s", label, e)

        raise InferenceError(
            f"All fallbacks exhausted for {task_name}: {last_error}"
        ) from last_error

    def _generate_silence(self) -> bytes:
        """Return 32000 zero bytes: ~1 second of 16 kHz, 16-bit mono silence."""
        return bytes(16000 * 2)  # 16kHz, 16-bit mono

Health Monitoring

import psutil

class HealthMonitor:
    """Track error counts and report basic process/system health.

    Lifetime totals per error type are kept in ``error_counts``. Health is
    judged on a sliding window of recent errors, so the monitor can recover
    after a burst instead of staying unhealthy forever.
    """

    def __init__(self, error_window_seconds: float = 60.0, max_recent_errors: int = 100):
        """Initialise counters.

        Args:
            error_window_seconds: Length of the sliding window used by
                ``is_healthy``.
            max_recent_errors: ``is_healthy`` fails once this many errors fall
                inside the window.
        """
        self.error_counts = {e: 0 for e in VoiceErrorType}
        self.start_time = time.time()
        self.error_window_seconds = error_window_seconds
        self.max_recent_errors = max_recent_errors
        # Monotonic timestamps of recent errors, oldest first.
        self._recent_errors = deque()

    def record_error(self, error_type: VoiceErrorType):
        """Count one error of ``error_type`` and timestamp it for the window."""
        self.error_counts[error_type] += 1
        now = time.monotonic()
        self._recent_errors.append(now)
        self._prune_recent_errors(now)

    def get_health_report(self) -> dict:
        """Return a snapshot of uptime, error totals and resource usage.

        Note: ``psutil.cpu_percent(interval=0.1)`` blocks for about 100 ms.
        """
        memory = psutil.virtual_memory()

        return {
            "uptime_seconds": time.time() - self.start_time,
            "errors": self.error_counts,
            # System memory in use, in MiB (this key previously held a percentage).
            "memory_usage_mb": memory.used / 1024 / 1024,
            "memory_percent": memory.percent,
            "gpu_memory_mb": self._gpu_memory_mb(),
            "cpu_percent": psutil.cpu_percent(interval=0.1)
        }

    def is_healthy(self) -> bool:
        """Return True if memory use is below 90% and recent errors are under the limit."""
        self._prune_recent_errors(time.monotonic())
        memory_ok = psutil.virtual_memory().percent < 90
        errors_ok = len(self._recent_errors) < self.max_recent_errors
        return memory_ok and errors_ok

    def _prune_recent_errors(self, now: float) -> None:
        """Drop timestamps that have fallen out of the sliding window."""
        cutoff = now - self.error_window_seconds
        while self._recent_errors and self._recent_errors[0] < cutoff:
            self._recent_errors.popleft()

    @staticmethod
    def _gpu_memory_mb() -> float:
        """Return CUDA memory allocated via torch in MiB, or 0.0 if unavailable."""
        # Imported lazily: torch is optional and was never imported at module
        # level, which made the original report raise NameError.
        try:
            import torch
        except ImportError:
            return 0.0
        if not torch.cuda.is_available():
            return 0.0
        return torch.cuda.memory_allocated() / 1024 / 1024

Effective error handling also requires logging at appropriate levels, alerting on non-recoverable errors, and preserving enough system state (error type, error counts, resource usage) to debug failures after the fact.

EXERCISE

Implement a retry decorator with exponential backoff for network calls. Add a circuit breaker that trips after 5 consecutive failures and test the behavior with simulated failures. Time: 15 minutes.