21. Performance Benchmarks

Chapter 21 of 22 · 25 min

Systematic benchmarking quantifies voice pipeline capabilities and identifies optimization opportunities.

Benchmark Framework

import time
import statistics
import psutil
import torch
from dataclasses import dataclass
from typing import Callable

@dataclass
class BenchmarkResult:
    """Latency summary for one benchmarked callable.

    All ``*_ms`` fields are per-call wall-clock times in milliseconds.
    """

    name: str
    mean_ms: float
    std_ms: float  # sample standard deviation; 0.0 when only one sample exists
    p50_ms: float
    p95_ms: float
    p99_ms: float
    throughput: float  # items per second


class VoicePipelineBenchmark:
    """Latency and memory micro-benchmarks for voice-pipeline callables."""

    def __init__(self, pipeline):
        self.pipeline = pipeline
        # Latest BenchmarkResult per benchmarked callable, keyed by its name.
        self.results = {}

    def benchmark_latency(
        self,
        func: Callable,
        inputs: list,
        iterations: int = 100,
        warmup: int = 10
    ) -> BenchmarkResult:
        """Time ``func`` over exactly ``iterations`` calls, cycling through ``inputs``.

        Args:
            func: Callable invoked with one element of ``inputs`` per call.
            inputs: Non-empty sequence of inputs; reused round-robin.
            iterations: Number of timed calls (at least 1).
            warmup: Untimed calls made with ``inputs[0]`` before measuring.

        Returns:
            The latency statistics, which are also stored in ``self.results``
            under the callable's name.

        Raises:
            ValueError: If ``inputs`` is empty or ``iterations`` is below 1.
        """
        if len(inputs) == 0:
            raise ValueError("inputs must contain at least one item")
        if iterations < 1:
            raise ValueError("iterations must be at least 1")

        use_cuda = torch.cuda.is_available()

        # Warmup
        for _ in range(warmup):
            func(inputs[0])
        if use_cuda:
            # Drain queued warm-up kernels so they are not billed to the
            # first timed call.
            torch.cuda.synchronize()

        latencies = []
        for i in range(iterations):
            # Index round-robin so the number of timed calls is exactly
            # `iterations`, even when it is not a multiple of len(inputs).
            input_data = inputs[i % len(inputs)]
            start = time.perf_counter()
            func(input_data)
            if use_cuda:
                torch.cuda.synchronize()
            latencies.append((time.perf_counter() - start) * 1000)

        # functools.partial objects and callable instances have no __name__.
        name = getattr(func, "__name__", None) or repr(func)
        result = self._compute_stats(name, latencies)
        self.results[name] = result
        return result

    def _compute_stats(self, name: str, latencies: list[float]) -> BenchmarkResult:
        """Summarise latency samples (milliseconds) into a BenchmarkResult.

        Percentiles are linearly interpolated between the observed minimum
        and maximum (``statistics.quantiles`` with ``method="inclusive"``).

        Raises:
            ValueError: If ``latencies`` is empty.
        """
        if not latencies:
            raise ValueError("cannot compute statistics without latency samples")

        mean_ms = statistics.mean(latencies)
        if len(latencies) == 1:
            # stdev/quantiles need two points; a lone sample has no spread.
            std_ms = 0.0
            p50 = p95 = p99 = latencies[0]
        else:
            std_ms = statistics.stdev(latencies)
            cuts = statistics.quantiles(latencies, n=100, method="inclusive")
            p50, p95, p99 = cuts[49], cuts[94], cuts[98]

        return BenchmarkResult(
            name=name,
            mean_ms=mean_ms,
            std_ms=std_ms,
            p50_ms=p50,
            p95_ms=p95,
            p99_ms=p99,
            # Guard against a zero mean from a timer too coarse to register.
            throughput=1000 / mean_ms if mean_ms > 0 else float("inf")
        )

    def benchmark_memory(self, func: Callable, input_data) -> dict:
        """Run ``func(input_data)`` once and report memory usage in MiB.

        Returns:
            A dict with ``system_memory_delta_mb`` (change in
            ``psutil.virtual_memory().used`` across the call) and
            ``peak_gpu_memory_mb`` (peak CUDA memory allocated by PyTorch
            during the call, or 0 without CUDA).

        Note:
            ``psutil.virtual_memory()`` reports system-wide usage, so the
            delta also reflects whatever other processes did meanwhile.
        """
        use_cuda = torch.cuda.is_available()
        if use_cuda:
            torch.cuda.reset_peak_memory_stats()
            torch.cuda.empty_cache()

        initial_memory = psutil.virtual_memory().used

        func(input_data)

        if use_cuda:
            peak_gpu_memory = torch.cuda.max_memory_allocated() / 1024**2
        else:
            peak_gpu_memory = 0

        final_memory = psutil.virtual_memory().used

        return {
            "system_memory_delta_mb": (final_memory - initial_memory) / 1024**2,
            "peak_gpu_memory_mb": peak_gpu_memory
        }

ASR Benchmark

class ASRBenchmark:
    """Measures transcription latency and word error rate (WER) of an ASR model."""

    def __init__(self, model):
        self.model = model
        self.test_audio = self._load_test_set()

    def _load_test_set(self) -> list[dict]:
        """Return the evaluation set: audio path plus reference transcript."""
        return [
            {"path": "test/audio/clean_1.wav", "expected": "hello world"},
            {"path": "test/audio/clean_2.wav", "expected": "how are you today"},
            # ... more test files
        ]

    def benchmark(self, iterations: int = 50) -> dict:
        """Transcribe every test file ``iterations`` times and summarise.

        Args:
            iterations: Timed transcriptions per file (at least 1).

        Returns:
            A dict with ``avg_latency_ms`` (mean of the per-file mean
            latencies), ``avg_wer``, ``max_latency_ms`` (slowest single
            transcription) and ``detail`` (one entry per file).

        Raises:
            ValueError: If ``iterations`` is below 1.
        """
        if iterations < 1:
            raise ValueError("iterations must be at least 1")

        use_cuda = torch.cuda.is_available()
        results = []

        for sample in self.test_audio:
            # Loading happens once, outside the timed region.
            audio = load_audio(sample["path"])

            latencies = []
            for _ in range(iterations):
                # Measure transcription time
                start = time.perf_counter()
                # NOTE(review): assumes transcribe() returns a plain string;
                # confirm against the model in use.
                transcription = self.model.transcribe(audio)
                if use_cuda:
                    torch.cuda.synchronize()
                latencies.append((time.perf_counter() - start) * 1000)

            results.append({
                "file": sample["path"],
                "time_ms": statistics.mean(latencies),
                "max_time_ms": max(latencies),
                "transcription": transcription,
                "expected": sample["expected"],
                "wer": self._compute_wer(transcription, sample["expected"])
            })

        avg_latency = statistics.mean(r["time_ms"] for r in results)
        avg_wer = statistics.mean(r["wer"] for r in results)

        return {
            "avg_latency_ms": avg_latency,
            "avg_wer": avg_wer,  # Word Error Rate
            "max_latency_ms": max(r["max_time_ms"] for r in results),
            "detail": results
        }

    def _compute_wer(self, hypothesis: str, reference: str) -> float:
        """Return the word error rate of ``hypothesis`` against ``reference``.

        WER is the word-level Levenshtein distance divided by the number of
        reference words. Comparison is case-insensitive and splits on
        whitespace. For an empty reference the divisor is clamped to 1, so
        the result is 0.0 when the hypothesis is empty too and otherwise the
        number of hypothesis words.
        """
        hyp_tokens = hypothesis.lower().split()
        ref_tokens = reference.lower().split()

        # Rolling-row Levenshtein: `prev` holds distances for the previous
        # hypothesis prefix against every reference prefix.
        prev = list(range(len(ref_tokens) + 1))
        for i, hyp_word in enumerate(hyp_tokens, start=1):
            curr = [i]
            for j, ref_word in enumerate(ref_tokens, start=1):
                if hyp_word == ref_word:
                    curr.append(prev[j - 1])
                else:
                    curr.append(1 + min(prev[j], curr[j - 1], prev[j - 1]))
            prev = curr

        # Clamp the divisor so an empty reference cannot divide by zero.
        return prev[-1] / max(len(ref_tokens), 1)

TTS Benchmark

class TTSBenchmark:
    """Measures synthesis latency and real-time factor (RTF) of a TTS model."""

    def __init__(self, model, sample_rate: int = 16000):
        """Store the model and the fixed set of benchmark sentences.

        Args:
            model: Object exposing ``synthesize(text)``; the returned audio
                must support ``len()`` giving the number of samples.
            sample_rate: Output sample rate in Hz, used to convert the
                sample count into seconds.

        Raises:
            ValueError: If ``sample_rate`` is not positive.
        """
        if sample_rate <= 0:
            raise ValueError("sample_rate must be positive")
        self.model = model
        self.sample_rate = sample_rate
        self.test_texts = [
            "Short sentence.",
            "This is a medium length sentence with multiple words.",
            "A much longer sentence that contains many more words and should take longer to synthesize, especially when using more complex models with attention mechanisms.",
            " ".join(["word"] * 100)  # Very long text
        ]

    def benchmark(self, iterations: int = 20) -> dict:
        """Synthesize each test text ``iterations`` times and summarise.

        ``rtf`` is the real-time factor: mean synthesis time divided by the
        duration of the produced audio. Values below 1.0 mean synthesis is
        faster than real time; lower is better.

        Args:
            iterations: Timed synthesis calls per text (at least 1).

        Returns:
            A dict with per-text ``results`` plus ``avg_rtf`` and
            ``avg_latency_ms`` averaged over the texts.

        Raises:
            ValueError: If ``iterations`` is below 1.
        """
        if iterations < 1:
            raise ValueError("iterations must be at least 1")

        use_cuda = torch.cuda.is_available()
        results = []

        for text in self.test_texts:
            latencies = []

            for _ in range(iterations):
                start = time.perf_counter()
                audio = self.model.synthesize(text)
                if use_cuda:
                    torch.cuda.synchronize()
                latencies.append((time.perf_counter() - start) * 1000)

            mean_latency_ms = statistics.mean(latencies)
            # Duration is taken from the last synthesized output.
            audio_duration_s = len(audio) / self.sample_rate
            if audio_duration_s > 0:
                rtf = (mean_latency_ms / 1000) / audio_duration_s
            else:
                # No audio produced: the ratio is undefined, report as inf.
                rtf = float("inf")

            results.append({
                "text_length": len(text),
                "word_count": len(text.split()),
                "audio_duration_s": audio_duration_s,
                "rtf": rtf,
                "latency_ms": mean_latency_ms
            })

        return {
            "results": results,
            "avg_rtf": statistics.mean(r["rtf"] for r in results),
            "avg_latency_ms": statistics.mean(r["latency_ms"] for r in results)
        }

Full Pipeline Benchmark

class FullPipelineBenchmark:
    """End-to-end latency benchmark for an asynchronous voice pipeline."""

    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.test_cases = self._load_test_cases()

    def _load_test_cases(self) -> list[dict]:
        """Return the audio files to benchmark, each tagged with a type."""
        return [
            {"audio": "test/audio/question.wav", "type": "question"},
            {"audio": "test/audio/statement.wav", "type": "statement"},
            {"audio": "test/audio/complex.wav", "type": "complex"}
        ]

    def benchmark(self, iterations: int = 30) -> dict:
        """Run the benchmark synchronously and return its summary.

        Drives ``benchmark_async`` on a fresh event loop. From code that is
        already running inside an event loop, await ``benchmark_async``
        directly instead.

        Raises:
            ValueError: If ``iterations`` is below 1.
        """
        # Imported here so the method does not depend on a module-level import.
        import asyncio

        return asyncio.run(self.benchmark_async(iterations))

    async def benchmark_async(self, iterations: int = 30) -> dict:
        """Process every test case ``iterations`` times and summarise.

        Returns:
            A dict with ``average_total_ms`` (mean of the per-type mean
            totals), ``by_type`` (per-type mean total/asr/llm/tts latency)
            and ``p95_total_ms`` (95th percentile over every individual
            end-to-end run).

        Raises:
            ValueError: If ``iterations`` is below 1.
        """
        if iterations < 1:
            raise ValueError("iterations must be at least 1")

        results = []
        all_totals = []

        for test_case in self.test_cases:
            audio = load_audio(test_case["audio"])
            latencies = await self._time_case(audio, iterations)
            all_totals.extend(latencies["total"])

            results.append({
                "type": test_case["type"],
                "total": statistics.mean(latencies["total"]),
                "asr": statistics.mean(latencies["asr"]),
                "llm": statistics.mean(latencies["llm"]),
                "tts": statistics.mean(latencies["tts"])
            })

        return self._summarize(results, all_totals)

    async def _time_case(self, audio, iterations: int) -> dict:
        """Run the pipeline ``iterations`` times on ``audio``; collect latencies.

        ``total`` is measured here; the per-stage values are read from the
        ``asr_time_ms`` / ``llm_time_ms`` / ``tts_time_ms`` keys of the
        pipeline result.
        """
        latencies = {"total": [], "asr": [], "llm": [], "tts": []}
        use_cuda = torch.cuda.is_available()

        for _ in range(iterations):
            start = time.perf_counter()
            result = await self.pipeline.process(audio)
            if use_cuda:
                torch.cuda.synchronize()
            total_time = (time.perf_counter() - start) * 1000

            latencies["total"].append(total_time)
            latencies["asr"].append(result["asr_time_ms"])
            latencies["llm"].append(result["llm_time_ms"])
            latencies["tts"].append(result["tts_time_ms"])

        return latencies

    @staticmethod
    def _summarize(results: list, all_totals: list) -> dict:
        """Build the report from per-type means and all raw total latencies."""
        if len(all_totals) > 1:
            # The percentile must come from the individual runs; per-type
            # means hide the tail.
            p95 = statistics.quantiles(all_totals, n=100, method="inclusive")[94]
        else:
            p95 = all_totals[0]

        return {
            "average_total_ms": statistics.mean(r["total"] for r in results),
            "by_type": results,
            "p95_total_ms": p95
        }

Benchmark Report

def generate_benchmark_report(benchmarks: dict) -> str:
    """Render benchmark results as a Markdown report.

    Args:
        benchmarks: Mapping of section title to a result object exposing
            ``mean_ms``, ``std_ms``, ``p50_ms``, ``p95_ms``, ``p99_ms`` and
            ``throughput`` (for example a ``BenchmarkResult``).

    Returns:
        The report as one newline-joined string: an environment header
        (date, OS, Python, PyTorch and, when CUDA is available, GPU and
        CUDA versions) followed by one section per benchmark.
    """
    # Imported here so the function does not rely on module-level imports
    # of these two names.
    import platform
    from datetime import datetime

    report = ["# Voice Pipeline Benchmark Report", ""]
    report.extend([
        f"Date: {datetime.now().isoformat()}",
        f"System: {platform.platform()}",
        f"Python: {platform.python_version()}",
        f"PyTorch: {torch.__version__}",
        "",
    ])

    if torch.cuda.is_available():
        report.extend([
            f"GPU: {torch.cuda.get_device_name(0)}",
            f"CUDA: {torch.version.cuda}",
            "",
        ])

    for name, result in benchmarks.items():
        report.extend([
            f"## {name}",
            f"- Mean: {result.mean_ms:.2f}ms",
            f"- Std: {result.std_ms:.2f}ms",
            f"- P50: {result.p50_ms:.2f}ms",
            f"- P95: {result.p95_ms:.2f}ms",
            f"- P99: {result.p99_ms:.2f}ms",
            f"- Throughput: {result.throughput:.2f} req/s",
            "",
        ])

    return "\n".join(report)
EXERCISE

Implement a benchmark suite for a Whisper ASR model that measures: (1) latency distribution over 100 iterations, (2) peak GPU memory, and (3) WER on a small test set. Generate a formatted report. Time: 15 minutes.