18. Production Pipeline
Chapter 18 of 22 · 20 min
Building a production RAG system means connecting retrieval, post-retrieval processing (re-ranking and context compression), and generation into a single reliable pipeline, with monitoring, error handling, and fallbacks for when a component fails.
Pipeline Architecture
import logging
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class PipelineConfig:
    """Tuning knobs for the RAG pipeline.

    Invalid combinations are rejected at construction time (``ValueError``)
    instead of surfacing later as empty results or runaway loops.
    """

    # Chunking parameters; presumably consumed by an ingestion/chunking step
    # that is not part of this chapter's code — confirm units (chars vs tokens).
    chunk_size: int = 500
    overlap: int = 50
    # Number of candidates requested from the vector store.
    retrieval_top_k: int = 20
    # Number of candidates intended to survive re-ranking.
    rerank_top_k: int = 10
    # NOTE(review): not referenced by the pipeline code shown in this chapter;
    # presumably a relevance cut-off for the compressor — confirm its range.
    compression_threshold: float = 0.7
    # Token budget passed as ``max_tokens`` to the context compressor.
    max_context_tokens: int = 4000

    def __post_init__(self):
        """Validate the configuration.

        Raises:
            ValueError: if a size/count field is not positive, or if
                ``overlap`` is negative or not smaller than ``chunk_size``
                (consecutive chunks could then never advance).
        """
        for field_name in (
            "chunk_size",
            "retrieval_top_k",
            "rerank_top_k",
            "max_context_tokens",
        ):
            value = getattr(self, field_name)
            if value <= 0:
                raise ValueError(f"{field_name} must be positive, got {value}")
        if not 0 <= self.overlap < self.chunk_size:
            raise ValueError(
                f"overlap must satisfy 0 <= overlap < chunk_size "
                f"({self.chunk_size}), got {self.overlap}"
            )
class RAGPipeline:
    """End-to-end RAG pipeline: embed -> retrieve -> re-rank -> compress -> generate.

    Per-stage latencies (seconds) and error counters are accumulated in
    ``self.metrics`` so health checks and dashboards can read them.
    """

    def __init__(self, config: "PipelineConfig"):
        """Create the pipeline and instantiate its components.

        Args:
            config: Top-k sizes, context token budget and other tuning knobs.
        """
        self.config = config
        self.logger = logging.getLogger(__name__)

        # Initialize components
        self.embedder = EmbeddingModel()
        self.vector_store = VectorStore()
        self.reranker = Reranker()
        self.compressor = ContextCompressor()
        self.generator = LLMClient()

        # Metrics: latency lists hold one duration in seconds per query.
        # NOTE(review): these lists grow without bound for the lifetime of the
        # pipeline; consider a bounded collections.deque or periodic export.
        self.metrics = {
            "retrieval_latency": [],
            "generation_latency": [],
            "total_latency": [],
            "retrieval_errors": 0,
            "generation_errors": 0,
        }

    async def query(self, question: str, user_id: Optional[str] = None) -> dict:
        """Process a single query through the pipeline.

        Args:
            question: The user's natural-language question.
            user_id: Accepted for API compatibility; not used by this method.

        Returns:
            A dict with ``answer``, ``sources``, ``retrieval_time`` and
            ``total_time`` (seconds). ``generation_time`` is added when the
            generator was actually called.

        Raises:
            Exception: any component error is logged with its traceback,
                counted in ``metrics`` (``retrieval_errors`` if it happened
                before the LLM call, ``generation_errors`` otherwise) and
                re-raised unchanged.
        """
        # NOTE(review): every component call below is synchronous and blocks the
        # event loop while it runs; if they perform network I/O, consider async
        # clients or asyncio.to_thread.
        start_time = time.perf_counter()  # monotonic clock for durations
        # Attribute failures by the stage that raised rather than by guessing
        # from the exception message.
        error_key = "retrieval_errors"
        stage = "retrieval"
        try:
            # Stage 1: Embed query
            query_embedding = self.embedder.encode(question)

            # Stage 2: Retrieve candidates
            retrieval_start = time.perf_counter()
            candidates = self.vector_store.search(
                query_embedding,
                top_k=self.config.retrieval_top_k,
            )
            retrieval_time = time.perf_counter() - retrieval_start
            self.metrics["retrieval_latency"].append(retrieval_time)

            if not candidates:
                total_time = time.perf_counter() - start_time
                self.metrics["total_latency"].append(total_time)
                return {
                    "answer": "I couldn't find relevant information.",
                    "sources": [],
                    "retrieval_time": retrieval_time,
                    "total_time": total_time,
                }

            # Stage 3: Re-rank and keep only the best rerank_top_k candidates.
            # Assumes rerank() returns a sequence ordered best-first.
            reranked = self.reranker.rerank(question, candidates)
            reranked = reranked[: self.config.rerank_top_k]

            # Stage 4: Compress context to fit the token budget
            compressed = self.compressor.compress(
                question,
                reranked,
                max_tokens=self.config.max_context_tokens,
            )

            # Stage 5: Generate answer
            error_key = "generation_errors"
            stage = "generation"
            generation_start = time.perf_counter()
            answer = self.generator.generate(
                question=question,
                context=compressed["context"],
                sources=compressed["sources"],
            )
            generation_time = time.perf_counter() - generation_start
            self.metrics["generation_latency"].append(generation_time)

            total_time = time.perf_counter() - start_time
            self.metrics["total_latency"].append(total_time)

            return {
                "answer": answer,
                "sources": compressed["sources"],
                "retrieval_time": retrieval_time,
                "generation_time": generation_time,
                "total_time": total_time,
            }
        except Exception as exc:
            self.logger.exception("Pipeline error (%s stage): %s", stage, exc)
            self.metrics[error_key] += 1
            raise
Error Handling and Fallbacks
class ResilientPipeline(RAGPipeline):
    """RAGPipeline that falls back to keyword (BM25) retrieval when the vector store fails.

    Only the retrieval stage is replaced. Fallback candidates still pass through
    the re-ranker, the ``rerank_top_k`` cut-off and the context compressor. The
    generator therefore receives the same kind of token-bounded context, via
    the same keyword call, as on the main path.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Keyword retriever used only when vector retrieval raises VectorStoreError.
        self.fallback_retriever = BM25Retriever()

    async def query(self, question: str, user_id: Optional[str] = None) -> dict:
        """Query with fallback on vector retrieval failure.

        Args:
            question: The user's natural-language question.
            user_id: Forwarded to the parent pipeline.

        Returns:
            The parent pipeline's response dict, or the fallback response.
            Both carry a ``fallback_used`` flag.

        Raises:
            Exception: anything other than ``VectorStoreError`` from the main
                path, and any error raised by the fallback path itself.
        """
        try:
            result = await super().query(question, user_id)
        except VectorStoreError as exc:
            self.logger.warning("Vector store failed, using fallback: %s", exc)
        else:
            # Give both paths the same response shape.
            result.setdefault("fallback_used", False)
            return result
        # Run the fallback outside the except block so its own failures are
        # not reported as "during handling of VectorStoreError".
        return self._fallback_query(question)

    def _fallback_query(self, question: str) -> dict:
        """Answer *question* from keyword-retrieved candidates."""
        start_time = time.perf_counter()

        # Fall back to keyword retrieval
        candidates = self.fallback_retriever.search(question)
        if not candidates:
            return {
                "answer": "I couldn't find relevant information.",
                "sources": [],
                "total_time": time.perf_counter() - start_time,
                "fallback_used": True,
            }

        # Reuse the main path's downstream stages so the context respects
        # rerank_top_k and max_context_tokens. Assumes keyword candidates have
        # the same shape the re-ranker/compressor accept — TODO confirm.
        reranked = self.reranker.rerank(question, candidates)
        reranked = reranked[: self.config.rerank_top_k]
        compressed = self.compressor.compress(
            question,
            reranked,
            max_tokens=self.config.max_context_tokens,
        )
        answer = self.generator.generate(
            question=question,
            context=compressed["context"],
            sources=compressed["sources"],
        )

        total_time = time.perf_counter() - start_time
        self.metrics["total_latency"].append(total_time)
        return {
            "answer": answer,
            "sources": compressed["sources"],
            "total_time": total_time,
            "fallback_used": True,
        }
Health Checks
def health_check(pipeline: "RAGPipeline") -> dict:
    """Probe each pipeline component and summarize its recorded metrics.

    Each probe is isolated. A failing component is reported as
    ``"unhealthy: <error>"`` instead of aborting the whole check.

    NOTE: the LLM probe issues a real generation request, so every health
    check costs one model call.

    Args:
        pipeline: The pipeline whose components and ``metrics`` are inspected.

    Returns:
        A dict with:

        * a status string for ``vector_store``, ``embedder`` and ``llm``;
        * ``avg_retrieval_latency`` and ``avg_total_latency`` in seconds, or
          ``None`` when no samples have been recorded yet;
        * the ``retrieval_errors`` and ``generation_errors`` counters.
    """

    def _mean(samples):
        # Avoid nan/RuntimeWarning on an empty history (e.g. right after startup).
        return sum(samples) / len(samples) if samples else None

    probes = (
        ("vector_store", lambda: pipeline.vector_store.ping()),
        ("embedder", lambda: pipeline.embedder.encode("health check")),
        ("llm", lambda: pipeline.generator.generate("Respond with OK", "test context")),
    )

    checks = {}
    for component, probe in probes:
        try:
            probe()
            checks[component] = "healthy"
        except Exception as exc:  # a health check must report, never raise
            checks[component] = f"unhealthy: {exc}"

    # Metrics summary
    metrics = pipeline.metrics
    checks["avg_retrieval_latency"] = _mean(metrics["retrieval_latency"])
    checks["avg_total_latency"] = _mean(metrics["total_latency"])
    checks["retrieval_errors"] = metrics["retrieval_errors"]
    checks["generation_errors"] = metrics.get("generation_errors", 0)
    return checks
EXERCISE
Build a complete RAG pipeline with retrieval, reranking, compression, and generation. Add health checks and a fallback to keyword search when vector retrieval fails.