21. RAG Pipeline Optimization

Chapter 21 of 22 · 25 min

Production RAG systems must balance quality, latency, and cost. This chapter covers latency profiling, caching, parallel retrieval, model cascading, and background index updates.

Latency Profiling

Before optimizing, measure where time is spent:

import time
from contextlib import contextmanager

@contextmanager
def timer(name: str):
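    # perf_counter is monotonic and has higher resolution than time.time()
    start = time.perf_counter()
    try:
        yield
    finally:
        # Report the timing even if the stage raises
        elapsed = time.perf_counter() - start
        print(f"{name}: {elapsed:.3f}s")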

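# Profile pipeline stages. rewriter, vector_store, bm25, reranker, assembler, and llm
# stand for your own components; query_vector and tokenized_query are derived from the query.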
with timer("Query rewriting"):
    rewritten_query = rewriter.rewrite(user_query)

with timer("Dense retrieval"):
    dense_results = vector_store.search(query_vector, top_k=50)

with timer("Sparse retrieval"):
    sparse_results = bm25.search(tokenized_query, top_k=50)

with timer("Hybrid merge"):
    merged_results = reciprocal_rank_fusion(dense_results, sparse_results)

with timer("Reranking"):
    reranked = reranker.rank(rewritten_query, merged_results[:20])

with timer("Context assembly"):
    context = assembler.assemble(reranked)

with timer("LLM generation"):
    response = llm.generate(prompt, context)

# Output:
# Query rewriting: 0.045s
# Dense retrieval: 0.128s
# Sparse retrieval: 0.089s
# Hybrid merge: 0.012s
# Reranking: 0.287s
# Context assembly: 0.034s
# LLM generation: 1.423s

Reranking (0.287s) and LLM generation (1.423s) together account for about 85% of the 2.018s total, so optimize these first. Query rewriting (0.045s) and the hybrid merge (0.012s) are negligible.
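To make that ranking explicit, the stage timings can be converted into shares of total latency. The sketch below uses only the numbers from the sample output above; the helper name latency_shares is illustrative.

```python
# Stage timings copied from the sample profiler output above (seconds).
stage_seconds = {
    "Query rewriting": 0.045,
    "Dense retrieval": 0.128,
    "Sparse retrieval": 0.089,
    "Hybrid merge": 0.012,
    "Reranking": 0.287,
    "Context assembly": 0.034,
    "LLM generation": 1.423,
}


def latency_shares(timings: dict[str, float]) -> dict[str, float]:
    """Return each stage's fraction of the summed latency."""
    total = sum(timings.values())
    return {name: seconds / total for name, seconds in timings.items()}


shares = latency_shares(stage_seconds)

# Print stages from most to least expensive.
for name, share in sorted(shares.items(), key=lambda item: item[1], reverse=True):
    print(f"{name:<18} {share:6.1%}")
```

On these numbers, LLM generation is roughly 70% of the total and reranking roughly 14%, which is why they are the first targets.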

Caching Strategies

As long as the index is unchanged, identical queries return identical retrieval results, so those results can be cached:

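from functools import lru_cache

@lru_cache(maxsize=10000)
def cached_vector_search(query: str, k: int):
    """Cache vector search results, keyed on the query text and k."""
    # lru_cache hashes its arguments itself, so no manual hashing is needed.
    # A SHA-256 digest cannot be decoded back into the query, so the query
    # text itself is the cache key.
    return vector_store.search(query, top_k=k)

def get_cached_retrieval(query: str, top_k: int):
    # Collapse whitespace so trivially different inputs share one cache entry
    normalized_query = " ".join(query.split())
    # Callers share the cached object, so treat the result as read-only
    return cached_vector_search(normalized_query, top_k)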

Note: Only cache for read-heavy workloads where documents change infrequently. For frequently updated content, use TTL-based caching with cache invalidation on document updates.
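The TTL-plus-invalidation approach mentioned in the note can be sketched as follows. This is a minimal in-memory illustration, not a production cache: TTLCache, fake_search, and the 300-second TTL are all assumptions made for the example, and fake_search merely stands in for the real vector store call.

```python
import time
from typing import Any, Callable, Hashable


class TTLCache:
    """Tiny in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: dict[Hashable, tuple[float, Any]] = {}

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self.ttl_seconds:
            return entry[1]  # fresh hit
        value = compute()  # miss or expired: recompute and store
        self._entries[key] = (now, value)
        return value

    def invalidate_all(self) -> None:
        """Call after a document update so stale results are not served."""
        self._entries.clear()


# Hypothetical stand-in for the vector store search; counts real lookups.
calls = {"count": 0}


def fake_search(query: str, top_k: int) -> list[str]:
    calls["count"] += 1
    return [f"doc-{i} for {query!r}" for i in range(top_k)]


cache = TTLCache(ttl_seconds=300)


def cached_retrieval(query: str, top_k: int) -> list[str]:
    return cache.get_or_compute((query, top_k), lambda: fake_search(query, top_k))


cached_retrieval("reset password", 3)  # miss: runs the search
cached_retrieval("reset password", 3)  # hit: served from the cache
cache.invalidate_all()                 # e.g. triggered by an index update
cached_retrieval("reset password", 3)  # miss again after invalidation
```

Clearing the whole cache on every update is the simplest correct policy. Finer-grained invalidation (only entries whose results include the changed document) saves recomputation but needs a reverse index from documents to cache keys.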

Query Parallelization

Dense and sparse retrieval do not depend on each other, so they can run concurrently:

import asyncio

async def parallel_retrieval(query: str, top_k: int):
    # dense_search and sparse_search are blocking calls, so run each in a
    # worker thread (asyncio.to_thread requires Python 3.9+)
    dense_task = asyncio.to_thread(dense_search, query, top_k)
    sparse_task = asyncio.to_thread(sparse_search, query, top_k)

    # gather starts both and waits for the slower one to finish
    dense_results, sparse_results = await asyncio.gather(
        dense_task, sparse_task
    )

    return merge_results(dense_results, sparse_results)

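# Wall-clock time is roughly max(dense_time, sparse_time) instead of their sum

With the stage timings profiled earlier, retrieval drops from 0.217s (0.128s + 0.089s) to roughly 0.128s, a reduction of about 40%. The gain is largest when the two stages take similar time and shrinks as one of them dominates.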
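The effect is easy to check with simulated stages. In the sketch below, dense_search and sparse_search are hypothetical stand-ins that just sleep for roughly the profiled durations; real timings depend on your retrievers and on thread scheduling overhead.

```python
import asyncio
import time


# Hypothetical stand-ins: time.sleep simulates blocking retrieval calls.
def dense_search(query: str, top_k: int) -> list[str]:
    time.sleep(0.12)
    return [f"dense-{i}" for i in range(top_k)]


def sparse_search(query: str, top_k: int) -> list[str]:
    time.sleep(0.08)
    return [f"sparse-{i}" for i in range(top_k)]


def timed_sequential(query: str, top_k: int) -> float:
    """Run both searches one after the other and return elapsed seconds."""
    start = time.perf_counter()
    dense_search(query, top_k)
    sparse_search(query, top_k)
    return time.perf_counter() - start


async def timed_parallel(query: str, top_k: int) -> float:
    """Run both searches in worker threads and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(
        asyncio.to_thread(dense_search, query, top_k),
        asyncio.to_thread(sparse_search, query, top_k),
    )
    return time.perf_counter() - start


sequential_elapsed = timed_sequential("example query", 5)
parallel_elapsed = asyncio.run(timed_parallel("example query", 5))

print(f"sequential: {sequential_elapsed:.3f}s")
print(f"parallel:   {parallel_elapsed:.3f}s")
```

Threads help here because the simulated work (like most network or I/O-bound retrieval calls) releases the GIL while waiting. CPU-bound retrieval in pure Python would need processes or a native library instead.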

Model Selection for Latency

The choice of LLM affects both latency and cost:

# latency_p50 is in seconds; cost_per_1k is USD per 1,000 tokens (illustrative figures)
model_comparison = {
    "gpt-4-turbo": {"latency_p50": 2.1, "cost_per_1k": 0.03},
    "gpt-4o-mini": {"latency_p50": 0.8, "cost_per_1k": 0.0006},
    # Smaller models respond faster but usually with lower quality
}

# Cascading approach: try the fast model first and escalate if its confidence is low.
# Assumption: the llm wrapper returns a response object with a `confidence` score in [0, 1].
async def cascading_generation(prompt: str, context: str):
    # The fast model is expected to handle most queries (about 80% here)
    fast_response = await llm.generate(
        model="gpt-4o-mini",
        prompt=prompt,
        context=context
    )

    # Escalate to the slower model when the fast model is uncertain.
    # Escalated queries pay the latency of both calls.
    if fast_response.confidence < 0.7:
        return await llm.generate(
            model="gpt-4-turbo",
            prompt=prompt,
            context=context
        )

    return fast_response

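Using the p50 latencies above and assuming 20% of queries escalate, average latency is about 0.8s + 0.2 × 2.1s = 1.22s, roughly 40% below the 2.1s of always calling gpt-4-turbo. The saving shrinks as the escalation rate rises, and answer quality depends on how well the confidence threshold separates easy queries from hard ones.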
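The trade-off can be explored with a short calculation. This sketch treats the p50 figures from model_comparison as if they were mean latencies, which is an approximation, and the function name expected_cascade_latency is illustrative.

```python
def expected_cascade_latency(
    fast_latency: float, slow_latency: float, escalation_rate: float
) -> float:
    """Average latency when every query pays the fast model and
    escalated queries additionally pay the slow model."""
    if not 0.0 <= escalation_rate <= 1.0:
        raise ValueError("escalation_rate must be between 0 and 1")
    return fast_latency + escalation_rate * slow_latency


fast, slow = 0.8, 2.1  # seconds, taken from model_comparison above

for rate in (0.1, 0.2, 0.3, 0.5):
    average = expected_cascade_latency(fast, slow, rate)
    saving = 1 - average / slow  # relative to always using the slow model
    print(f"escalation {rate:.0%}: average {average:.2f}s, saving {saving:.0%}")
```

With these figures the cascade stops saving latency once more than about 62% of queries escalate (1.3 / 2.1), so the escalation rate is worth monitoring in production.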

Batch Processing for Index Updates

Indexing new documents should not block serving. Use batch updates during off-peak hours:

# your_rag_library is a placeholder; substitute your framework's batch indexer
from your_rag_library import BatchIndexing

batch_indexer = BatchIndexing(
    batch_size=500,
    schedule="off-peak",  # Index during low-traffic windows
    vector_store=vector_store
)

# Queue documents for background indexing
batch_indexer.queue(documents=new_documents)

# Documents become searchable within 5-30 minutes depending on size
# The system keeps serving from the existing index while the update runs
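
If your framework has no batch indexer, the core idea (queue documents, write them in fixed-size batches on a background thread) can be sketched with the standard library. Everything here is hypothetical: InMemoryStore stands in for a vector store with a bulk-insert call, and off-peak scheduling is left out.

```python
import queue
import threading
from typing import Iterator


def batched(items: list[str], batch_size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


class InMemoryStore:
    """Hypothetical stand-in for a vector store with a bulk-insert call."""

    def __init__(self) -> None:
        self.documents: list[str] = []
        self.batches_written = 0

    def add_batch(self, batch: list[str]) -> None:
        self.documents.extend(batch)
        self.batches_written += 1


def indexing_worker(pending: queue.Queue, store: InMemoryStore, batch_size: int) -> None:
    """Consume queued document lists until a None sentinel arrives."""
    while True:
        documents = pending.get()
        try:
            if documents is None:  # sentinel: shut the worker down
                break
            for batch in batched(documents, batch_size):
                store.add_batch(batch)
        finally:
            pending.task_done()


store = InMemoryStore()
pending: queue.Queue = queue.Queue()
worker = threading.Thread(
    target=indexing_worker, args=(pending, store, 500), daemon=True
)
worker.start()

# Queueing returns immediately, so request handling is not blocked.
pending.put([f"doc-{i}" for i in range(1200)])
pending.put(None)

# Waiting is only for this demo; a serving process would not block here.
pending.join()
print(store.batches_written, len(store.documents))
```

A real deployment would also need retry handling for failed batches and a way to make new batches visible to searches atomically.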

EXERCISE

Profile your pipeline's latency over 100 consecutive queries and calculate how much time each stage takes. Then implement query parallelization and measure the latency improvement. Aim to reduce total latency by 25%.
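
One possible starting point for the exercise is a profiler that records every sample per stage rather than printing each one. The sketch below is an assumption-laden scaffold: StageProfiler is a hypothetical helper, and the two time.sleep calls stand in for your real retrieval and generation stages.

```python
import statistics
import time
from collections import defaultdict
from contextlib import contextmanager


class StageProfiler:
    """Collect per-stage timings across many queries."""

    def __init__(self) -> None:
        self.samples: dict[str, list[float]] = defaultdict(list)

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            self.samples[name].append(time.perf_counter() - start)

    def report(self) -> dict[str, dict[str, float]]:
        """Return mean, p50, p95, and share of total mean time per stage."""
        means = {name: statistics.fmean(v) for name, v in self.samples.items()}
        total = sum(means.values())
        result = {}
        for name, values in self.samples.items():
            # quantiles needs at least two samples; n=20 gives 5% steps
            p95 = statistics.quantiles(values, n=20)[-1] if len(values) >= 2 else values[0]
            result[name] = {
                "mean": means[name],
                "p50": statistics.median(values),
                "p95": p95,
                "share": means[name] / total,
            }
        return result


profiler = StageProfiler()

for _ in range(100):
    with profiler.stage("retrieval"):
        time.sleep(0.001)  # placeholder for the real retrieval stage
    with profiler.stage("generation"):
        time.sleep(0.002)  # placeholder for the real generation stage

report = profiler.report()
for name, stats in report.items():
    print(
        f"{name:<12} mean={stats['mean'] * 1000:.2f}ms "
        f"p50={stats['p50'] * 1000:.2f}ms p95={stats['p95'] * 1000:.2f}ms "
        f"share={stats['share']:.0%}"
    )
```

Run it once before and once after adding parallel retrieval, then compare the summed means to see whether the 25% target was met.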