18. Error Handling
Chapter 18 of 22 · 25 min
Voice AI systems must handle hardware failures, network issues, model errors, and unexpected audio conditions gracefully.
Error Classification
from enum import Enum
from typing import Union
class VoiceErrorType(Enum):
    """Closed set of failure categories used to tag voice-pipeline errors.

    Each member's value is a lowercase string identifier, suitable for
    inclusion in serialized error payloads.
    """

    # Input side and transport
    AUDIO_CAPTURE = "audio_capture"
    NETWORK = "network"

    # Model execution and host resources
    MODEL_INFERENCE = "model_inference"
    RESOURCE_EXHAUSTION = "resource_exhaustion"

    # Request-level problems
    INVALID_INPUT = "invalid_input"
    TIMEOUT = "timeout"
class VoicePipelineError(Exception):
    """Base exception for failures raised inside the voice pipeline.

    Attributes:
        error_type: The ``VoiceErrorType`` category of the failure.
        message: Human-readable description (also passed to ``Exception``).
        recoverable: Whether the failed operation may be attempted again.
    """

    def __init__(self, error_type: VoiceErrorType, message: str, recoverable: bool = True):
        # Initialise the Exception machinery first so ``str(err)`` and
        # ``err.args`` carry the message.
        super().__init__(message)
        self.error_type = error_type
        self.message = message
        self.recoverable = recoverable

    def to_dict(self) -> dict:
        """Return the error as a plain dict with keys type, message, recoverable."""
        return dict(
            type=self.error_type.value,
            message=self.message,
            recoverable=self.recoverable,
        )
# Specific error types
class AudioCaptureError(VoicePipelineError):
    """Pipeline error tagged ``AUDIO_CAPTURE``, optionally naming the device.

    ``recoverable`` is left at the base-class default (True).
    """

    def __init__(self, message: str, device: str = None):
        # Identifier of the capture device involved, or None if not given.
        self.device = device
        super().__init__(error_type=VoiceErrorType.AUDIO_CAPTURE, message=message)
class NetworkError(VoicePipelineError):
    """Recoverable pipeline error tagged ``NETWORK``.

    Args:
        message: Human-readable description of the failure.
        retry_after: Suggested delay in seconds before retrying. When omitted
            (None) it defaults to 1.0.
    """

    def __init__(self, message: str, retry_after: float = None):
        super().__init__(VoiceErrorType.NETWORK, message, recoverable=True)
        # Compare against None explicitly: ``retry_after or 1.0`` would
        # silently replace a deliberate 0.0 ("retry immediately") with 1.0.
        self.retry_after = 1.0 if retry_after is None else retry_after
class InferenceError(VoicePipelineError):
    """Pipeline error tagged ``MODEL_INFERENCE``; always marked non-recoverable."""

    def __init__(self, message: str, model_name: str = None):
        # Name of the model that failed, or None if not given.
        self.model_name = model_name
        super().__init__(
            error_type=VoiceErrorType.MODEL_INFERENCE,
            message=message,
            recoverable=False,
        )
class ResourceError(VoicePipelineError):
    """Pipeline error tagged ``RESOURCE_EXHAUSTION``; marked recoverable."""

    def __init__(self, message: str, resource_type: str):
        # Caller-supplied label for the exhausted resource (required).
        self.resource_type = resource_type
        super().__init__(
            error_type=VoiceErrorType.RESOURCE_EXHAUSTION,
            message=message,
            recoverable=True,
        )
Error Handling Decorators
import functools
import asyncio
import logging
logger = logging.getLogger(__name__)
def retry_on_error(max_retries: int = 3, backoff_factor: float = 2.0):
    """Build a decorator that retries an async callable on recoverable errors.

    Only ``VoicePipelineError`` is intercepted; any other exception propagates
    immediately. An error whose ``recoverable`` flag is False is re-raised
    without retrying.

    Args:
        max_retries: Total number of attempts, including the first call.
            Must be at least 1.
        backoff_factor: Base of the exponential delay. The wait after the
            attempt with 0-based index ``k`` is ``backoff_factor ** k`` seconds.

    Returns:
        A decorator for ``async def`` functions.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Previously the wrapped
            function was never called and ``raise None`` produced a TypeError.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except VoicePipelineError as e:
                    is_last_attempt = attempt == max_retries - 1
                    if not e.recoverable or is_last_attempt:
                        # Bare raise keeps the original traceback intact.
                        raise
                    wait_time = backoff_factor ** attempt
                    logger.warning(
                        "Retry %s/%s after %ss: %s",
                        attempt + 1,
                        max_retries,
                        wait_time,
                        e.message,
                    )
                    await asyncio.sleep(wait_time)
            # Not reachable: with max_retries >= 1 the final attempt either
            # returns or re-raises.

        return wrapper

    return decorator
class RetryHandler:
    """Retry an async callable with exponential backoff.

    Unlike a decorator, the handler is applied per call via ``execute``.

    Args:
        max_retries: Total number of attempts, including the first call.
            Must be at least 1.
        backoff_factor: Base of the exponential delay. The wait after the
            attempt with 0-based index ``k`` is ``backoff_factor ** k`` seconds.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Previously ``execute``
            returned None without ever calling the function.
    """

    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0):
        if max_retries < 1:
            raise ValueError(f"max_retries must be >= 1, got {max_retries}")
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    async def execute(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)``, retrying on failure.

        Returns:
            Whatever ``func`` returns on the first successful attempt.

        Raises:
            Exception: The last exception raised by ``func`` once attempts are
                exhausted, or immediately if the exception carries
                ``recoverable = False``.
        """
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                out_of_attempts = attempt == self.max_retries - 1
                # Exceptions without a ``recoverable`` attribute are treated
                # as retryable; an explicit False stops retrying at once.
                if out_of_attempts or not getattr(e, "recoverable", True):
                    raise
                wait = self.backoff_factor ** attempt
                logger.error(
                    "Attempt %s failed: %s. Retrying in %ss...",
                    attempt + 1,
                    e,
                    wait,
                )
                await asyncio.sleep(wait)
Circuit Breaker Pattern
import time
from collections import deque
class CircuitBreaker:
    """Stop calling a failing dependency after repeated consecutive failures.

    States:
        closed: Calls pass through; consecutive failures are counted.
        open: Calls are rejected with ``NetworkError`` until ``timeout``
            seconds have passed since the last failure.
        half-open: One trial call is let through. Success closes the breaker;
            failure re-opens it.

    Args:
        failure_threshold: Consecutive failures that trip the breaker.
        timeout: Seconds to stay open before allowing a trial call.
    """

    def __init__(self, failure_threshold: int = 5, timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` synchronously through the breaker.

        Returns:
            The return value of ``func``.

        Raises:
            NetworkError: If the breaker is open and the timeout has not elapsed.
            Exception: Any exception raised by ``func`` is recorded and re-raised.
        """
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half-open"
            else:
                # NOTE(review): retry_after is a fixed 5.0s hint, independent
                # of how much of the cool-down actually remains.
                raise NetworkError("Circuit breaker is open", retry_after=5.0)

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = time.time()
            # A failed half-open probe re-opens immediately; in the closed
            # state the breaker opens once the threshold is reached.
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state = "open"
                logger.error("Circuit breaker opened due to repeated failures")
            raise

        # Any success ends the failure streak. Without this reset the breaker
        # would trip on cumulative failures instead of consecutive ones.
        self.state = "closed"
        self.failures = 0
        return result

    def reset(self):
        """Force the breaker back to its initial closed state."""
        self.state = "closed"
        self.failures = 0
        self.last_failure_time = None
Graceful Degradation
class FallbackManager:
    """Run a task's primary implementation, then its fallbacks in order.

    ``fallbacks`` maps a task name to an ordered list of fallback names. A
    name is resolved to a callable through ``handlers`` (see
    ``register_handler``) or, failing that, a method called ``_<name>`` on
    this object. The special name ``"silence"`` returns a buffer of silence.
    """

    def __init__(self):
        self.fallbacks = {
            "primary_tts": ["secondary_tts", "silence"],
            "primary_asr": ["fallback_asr"],
            "primary_llm": ["smaller_llm"],
        }
        # Fallback name -> async callable implementing it.
        self.handlers = {}

    def register_handler(self, name: str, func) -> None:
        """Associate the fallback ``name`` with the async callable ``func``."""
        self.handlers[name] = func

    async def execute_with_fallback(self, task_name: str, primary_func, *args, **kwargs):
        """Await ``primary_func``; on failure try each fallback for ``task_name``.

        Args:
            task_name: Key into ``fallbacks``. Unknown tasks have no fallbacks.
            primary_func: Async callable tried first.
            *args, **kwargs: Forwarded to whichever callable is invoked.

        Returns:
            The result of the first option that succeeds.

        Raises:
            InferenceError: If the primary and every fallback fail. The last
                underlying exception is attached as ``__cause__``.
        """
        # The primary always runs first. Previously a task listed in
        # ``fallbacks`` skipped ``primary_func`` entirely.
        options = [primary_func, *self.fallbacks.get(task_name, [])]
        last_error = None
        for option in options:
            label = getattr(option, "__name__", option)
            try:
                if option == "silence":
                    return self._generate_silence()
                handler = self._resolve(option)
                return await handler(*args, **kwargs)
            except Exception as e:
                last_error = e
                logger.warning("%s failed: %s", label, e)
        raise InferenceError(
            f"All fallbacks exhausted for {task_name}: {last_error}"
        ) from last_error

    def _resolve(self, option):
        """Return the callable for ``option``, or raise LookupError if none exists."""
        if callable(option):
            return option
        handler = self.handlers.get(option) or getattr(self, f"_{option}", None)
        if handler is None:
            # Surface unconfigured fallbacks instead of silently skipping them.
            raise LookupError(f"No handler registered for fallback {option!r}")
        return handler

    def _generate_silence(self) -> bytes:
        """Return 32000 zero bytes (1 second of 16 kHz, 16-bit mono audio)."""
        return bytes(16000 * 2)
Health Monitoring
import psutil
class HealthMonitor:
    """Track per-category error counts and report basic host health.

    Args:
        max_memory_percent: System memory usage (percent) at or above which
            ``is_healthy`` returns False.
        max_total_errors: Cumulative error count at or above which
            ``is_healthy`` returns False. Counts are never reset or aged out,
            so this is a lifetime total, not a recent-window rate.
    """

    def __init__(self, max_memory_percent: float = 90.0, max_total_errors: int = 100):
        self.error_counts = {e: 0 for e in VoiceErrorType}
        self.start_time = time.time()
        self.max_memory_percent = max_memory_percent
        self.max_total_errors = max_total_errors

    def record_error(self, error_type: VoiceErrorType):
        """Increment the counter for ``error_type``."""
        self.error_counts[error_type] += 1

    def get_health_report(self) -> dict:
        """Return a snapshot of uptime, error counts, and resource usage.

        Note: ``psutil.cpu_percent(interval=0.1)`` blocks for about 0.1 s.
        """
        memory = psutil.virtual_memory()
        return {
            "uptime_seconds": time.time() - self.start_time,
            # Copy so callers cannot mutate the live counters via the report.
            "errors": dict(self.error_counts),
            # Megabytes, as the key says. This used to hold the percentage.
            "memory_usage_mb": memory.used / 1024 / 1024,
            "memory_percent": memory.percent,
            "gpu_memory_mb": self._gpu_memory_mb(),
            "cpu_percent": psutil.cpu_percent(interval=0.1),
        }

    def is_healthy(self) -> bool:
        """Return True while memory usage and total errors are under their limits."""
        memory_ok = psutil.virtual_memory().percent < self.max_memory_percent
        errors_ok = sum(self.error_counts.values()) < self.max_total_errors
        return memory_ok and errors_ok

    @staticmethod
    def _gpu_memory_mb():
        """Return CUDA memory allocated by torch in MB, or 0 without torch/CUDA."""
        # Imported lazily: torch is optional, and the original code referenced
        # it without importing it, which raised NameError.
        try:
            import torch
        except ImportError:
            return 0
        if not torch.cuda.is_available():
            return 0
        return torch.cuda.memory_allocated() / 1024 / 1024
Error handling requires logging at appropriate levels, alerting on non-recoverable errors, and maintaining system state for debugging.
EXERCISE
Implement a retry decorator with exponential backoff for network calls. Add a circuit breaker that trips after 5 consecutive failures and test the behavior with simulated failures. Time: 15 minutes.