19. Performance Benchmarking
Chapter 19 of 24 · 15 min
Benchmarking establishes baseline performance metrics and validates that systems meet latency and throughput requirements. Without benchmarks, regressions go undetected until production incidents occur.
The benchmark suite structure:
import time
import statistics
import asyncio
from concurrent.futures import ThreadPoolExecutor
class RAGBenchmarkSuite:
    """Latency and throughput benchmarks for a RAG retrieval pipeline.

    Both benchmarks call ``self._embed_query(query)``, which is not defined
    in this class; it must be supplied by a subclass or mixin.
    """

    def __init__(self, vector_client, llm_endpoint: str):
        """Store the collaborators used by the benchmarks.

        Args:
            vector_client: Object exposing ``search(...)`` and
                ``search_async(...)``, both called with ``collection``,
                ``query_vector`` and ``limit`` keyword arguments.
            llm_endpoint: Kept for the generation step, which is currently
                mocked, so the value is stored but not used.
        """
        self.vector = vector_client
        self.llm_endpoint = llm_endpoint

    def benchmark_vector_search(
        self,
        query: str,
        k: int = 10,
        iterations: int = 1000,
        warmup: int = 0,
    ) -> dict:
        """Measure the latency distribution of repeated vector searches.

        The query is embedded once, outside the timed region, so only the
        ``search`` call itself is measured.

        Args:
            query: Text to embed and search for.
            k: Number of results requested per search.
            iterations: Number of timed searches; must be at least 1.
            warmup: Number of untimed searches issued first so that
                cold-start effects do not skew the recorded samples.

        Returns:
            Dict with ``iterations`` and the latency statistics
            ``avg_ms``, ``p50_ms``, ``p95_ms``, ``p99_ms``, ``min_ms`` and
            ``max_ms``, all in milliseconds.

        Raises:
            ValueError: If ``iterations`` is below 1 or ``warmup`` is negative.
        """
        if iterations < 1:
            raise ValueError("iterations must be at least 1")
        if warmup < 0:
            raise ValueError("warmup must not be negative")

        embedding = self._embed_query(query)

        for _ in range(warmup):
            self.vector.search(
                collection="chunks",
                query_vector=embedding,
                limit=k,
            )

        latencies = []
        for _ in range(iterations):
            start = time.perf_counter()
            self.vector.search(
                collection="chunks",
                query_vector=embedding,
                limit=k,
            )
            latencies.append((time.perf_counter() - start) * 1000)

        if len(latencies) == 1:
            # A single sample has no spread: every percentile is that sample.
            p50 = p95 = p99 = latencies[0]
        else:
            # quantiles(n=100) yields 99 cut points, so percentile P sits at
            # index P - 1. "inclusive" interpolates only between observed
            # samples, which keeps p99 from exceeding max_ms on small runs.
            cuts = statistics.quantiles(latencies, n=100, method="inclusive")
            p50, p95, p99 = cuts[49], cuts[94], cuts[98]

        return {
            "iterations": iterations,
            "avg_ms": statistics.mean(latencies),
            "p50_ms": p50,
            "p95_ms": p95,
            "p99_ms": p99,
            "min_ms": min(latencies),
            "max_ms": max(latencies),
        }

    def benchmark_e2e_rag(self, queries: list[str], concurrency: int = 10) -> dict:
        """Benchmark the end-to-end RAG pipeline under bounded concurrency.

        At most ``concurrency`` queries are in flight at any time. The
        generation step is a placeholder, so ``generation_ms`` only measures
        the overhead of the timing itself until a real call is added.

        This method drives its own event loop with ``asyncio.run`` and so
        cannot be called from code that is already running inside one.

        Args:
            queries: Query strings to run; must not be empty.
            concurrency: Maximum number of simultaneous in-flight queries;
                must be at least 1.

        Returns:
            Dict with ``concurrency``, ``total_queries``, ``throughput_qps``
            (queries divided by the wall-clock time of the whole batch) and
            the per-query means ``avg_retrieval_ms``, ``avg_generation_ms``
            and ``avg_total_ms``.

        Raises:
            ValueError: If ``queries`` is empty or ``concurrency`` is below 1.
        """
        if not queries:
            raise ValueError("queries must not be empty")
        if concurrency < 1:
            raise ValueError("concurrency must be at least 1")

        async def single_query_latency(
            q: str, limiter: asyncio.Semaphore
        ) -> dict[str, float]:
            async with limiter:
                # Start timing only once a slot is held, so time spent
                # queueing for the semaphore is not counted as latency.
                start = time.perf_counter()

                # Retrieval
                embedding = self._embed_query(q)
                retrieval_results = await self.vector.search_async(
                    collection="chunks",
                    query_vector=embedding,
                    limit=5,
                )
                retrieval_ms = (time.perf_counter() - start) * 1000

                # Augmentation
                # NOTE(review): assumes each hit's payload is a str -- confirm
                # against the vector client in use.
                context = "\n".join([r.payload for r in retrieval_results])
                # Input for the real generation call once it replaces the mock.
                prompt = f"Context: {context}\n\nQuestion: {q}"

                # Generation (mock): swap in the real LLM call between the
                # two timestamps below.
                gen_start = time.perf_counter()
                generation_ms = (time.perf_counter() - gen_start) * 1000

                return {
                    "retrieval_ms": retrieval_ms,
                    "generation_ms": generation_ms,
                    "total_ms": (time.perf_counter() - start) * 1000,
                }

        async def run_concurrent_queries() -> tuple[list[dict[str, float]], float]:
            # Created inside the running loop so the semaphore is bound to it.
            limiter = asyncio.Semaphore(concurrency)
            wall_start = time.perf_counter()
            timings = await asyncio.gather(
                *(single_query_latency(q, limiter) for q in queries)
            )
            return timings, time.perf_counter() - wall_start

        results, wall_seconds = asyncio.run(run_concurrent_queries())

        return {
            "concurrency": concurrency,
            "total_queries": len(queries),
            # Floor the divisor at 1 ms so a near-instant batch cannot
            # divide by zero or report an absurd rate.
            "throughput_qps": len(queries) / max(wall_seconds, 0.001),
            "avg_retrieval_ms": statistics.mean(r["retrieval_ms"] for r in results),
            "avg_generation_ms": statistics.mean(r["generation_ms"] for r in results),
            "avg_total_ms": statistics.mean(r["total_ms"] for r in results),
        }
Failure Modes:
- Cold-start benchmarks: First query after deployment always shows anomalous latency. Always pre-warm with 100+ queries before recording metrics.
- Hardware variance: Cloud VM performance varies by host. Run benchmarks against production-type instances, not shared development hardware.
- Missing connection pooling: Creating a new connection per query inflates measured latency 10-100x. Always use persistent connection pools.
- Measuring non-representative queries: Synthetic queries may not reflect production distribution. Collect anonymized real queries for benchmarking.
Store benchmark results in a time-series database for trend analysis across deployments.
EXERCISE
Write a benchmark that measures vector search p99 latency across different k values (1, 5, 10, 50). Graph the relationship between result set size and latency.