21. RAG Pipeline Optimization
Production RAG systems must balance quality, latency, and cost. This chapter covers systematic optimization strategies.
Latency Profiling
Before optimizing, measure where time is spent:
import time
from contextlib import contextmanager
@contextmanager
def timer(name: str):
start = time.time()
yield
elapsed = time.time() - start
print(f"{name}: {elapsed:.3f}s")
# Profile pipeline stages
with timer("Query rewriting"):
rewritten_query = rewriter.rewrite(user_query)
with timer("Dense retrieval"):
dense_results = vector_store.search(query_vector, top_k=50)
with timer("Sparse retrieval"):
sparse_results = bm25.search(tokenized_query, top_k=50)
with timer("Hybrid merge"):
merged_results = reciprocal_rank_fusion(dense_results, sparse_results)
with timer("Reranking"):
reranked = reranker.rank(rewritten_query, merged_results[:20])
with timer("Context assembly"):
context = assembler.assemble(reranked)
with timer("LLM generation"):
response = llm.generate(prompt, context)
# Output:
# Query rewriting: 0.045s
# Dense retrieval: 0.128s
# Sparse retrieval: 0.089s
# Hybrid merge: 0.012s
# Reranking: 0.287s
# Context assembly: 0.034s
# LLM generation: 1.423s
Reranking (0.287s) and LLM generation (1.423s) dominate total latency. Optimize these first. Query rewriting and merge operations are negligible.
Caching Strategies
Identical queries return identical results. Cache retrieval results:
from functools import lru_cache
@lru_cache(maxsize=10000)
def cached_vector_search(query_hash: str, k: int):
"""Cache vector search results by query hash."""
return vector_store.search(decoded_query, top_k=k)
# Generate stable query hash
import hashlib
def get_cached_retrieval(query: str, top_k: int):
query_hash = hashlib.sha256(query.encode()).hexdigest()
return cached_vector_search(query_hash, top_k)
Note: Only cache for read-heavy workloads where documents change infrequently. For frequently updated content, use TTL-based caching with cache invalidation on document updates.
Query Parallelization
Parallel retrieval stages execute simultaneously:
import asyncio
async def parallel_retrieval(query: str, top_k: int):
# Run dense and sparse retrieval concurrently
dense_future = asyncio.to_thread(dense_search, query, top_k)
sparse_future = asyncio.to_thread(sparse_search, query, top_k)
dense_results, sparse_results = await asyncio.gather(
dense_future, sparse_future
)
return merge_results(dense_results, sparse_results)
# Execution time: max(dense_time, sparse_time) instead of sum
Parallelization reduces retrieval latency by 30-40% versus sequential execution.
Model Selection for Latency
LLM model choice affects both latency and cost:
model_comparison = {
"gpt-4-turbo": {"latency_p50": 2.1, "cost_per_1k": 0.03},
"gpt-4o-mini": {"latency_p50": 0.8, "cost_per_1k": 0.0006},
# Smaller models have faster responses but lower quality
}
# Use cascading approach: fast model first, escalate if confidence low
async def cascading_generation(prompt: str, context: str) -> dict:
# Fast model handles 80% of queries
fast_response = await llm.generate(
model="gpt-4o-mini",
prompt=prompt,
context=context
)
# Escalate to slower model if fast model expresses uncertainty
if fast_response.confidence < 0.7:
slow_response = await llm.generate(
model="gpt-4-turbo",
prompt=prompt,
context=context
)
return slow_response
return fast_response
Cascading reduces average latency by 40-50% with minimal quality degradation.
Batch Processing for Index Updates
Indexing new documents should not block serving. Use batch updates during off-peak hours:
from your_rag_library import BatchIndexing
batch_indexer = BatchIndexing(
batch_size=500,
schedule="off-peak", # Index during low-traffic windows
vector_store=vector_store
)
# Queue documents for background indexing
batch_indexer.queue(documents=new_documents)
# Documents become searchable within 5-30 minutes depending on size
# System continues serving with existing index during updating
Profile your pipeline latency for 100 consecutive queries. Calculate where time is spent at each stage. Implement query parallelization and measure latency improvement. Target reducing total latency by 25%.