KEY INSIGHT
A production RAG pipeline integrates indexing, retrieval, optimization, and evaluation into one cohesive system; each stage emits metrics that guide improvements to the others.
### Project Overview
This chapter assembles a production-grade RAG pipeline that combines techniques from Chapters 13–23. The pipeline evaluates itself on defined metrics, surfaces failures, and can be improved iteratively.
### Full Pipeline Architecture
```
User Query
│
├─ Query Rewriting (Chapter 13)
├─ Query Decomposition (Chapter 14)
│   └─ Parallel Sub-Query Retrieval
│       └─ Context Optimization (Chapter 15)
│           ├─ Dedup + Relevance Filter
│           ├─ Dynamic Context Window (Chapter 16)
│           └─ Context Compression (Chapter 17)
│               └─ Answer Generation
│                   │
│                   ├─ RAGAS Faithfulness (Ch. 21)
│                   ├─ RAGAS Answer Relevance (Ch. 22)
│                   └─ RAGAS Context Precision (Ch. 23)
│
Caching Layer (Chapter 20) ←→ Pipeline Optimization (Chapter 19)
Multi-Modal Handler (Ch. 18) for image/table sources
```
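The fan-out and dedup stages of the diagram can be sketched with the standard library alone. This is a minimal sketch, not the `parallelRetrieve`/`optimizeContext` from Chapters 14–15: it assumes a hypothetical `retrieve(query, top_k)` callable that returns chunks as dicts with `id`, `text`, and `score` keys. Deduplication keeps the highest-scoring copy of each chunk ID, and a hypothetical `min_score` cutoff stands in for the relevance filter.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Callable

Chunk = dict  # hypothetical shape: {"id": str, "text": str, "score": float}


def parallel_retrieve(
    queries: list[str],
    retrieve: Callable[[str, int], list[Chunk]],
    top_k: int = 10,
    max_workers: int = 8,
) -> list[Chunk]:
    """Run one retrieval per sub-query concurrently and concatenate the results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order, one list of chunks per query
        per_query = pool.map(lambda q: retrieve(q, top_k), queries)
        return [chunk for chunks in per_query for chunk in chunks]


def dedup_and_filter(chunks: list[Chunk], min_score: float = 0.3, top_k: int = 10) -> list[Chunk]:
    """Keep the best-scoring copy of each chunk ID, drop weak matches, return the top_k."""
    best: dict[str, Chunk] = {}
    for chunk in chunks:
        current = best.get(chunk["id"])
        if current is None or chunk["score"] > current["score"]:
            best[chunk["id"]] = chunk
    kept = [c for c in best.values() if c["score"] >= min_score]
    return sorted(kept, key=lambda c: c["score"], reverse=True)[:top_k]
```

Threads suit this step because retrieval is I/O-bound (network calls to the vector store); the same chunk often comes back for several rewrites, which is why dedup runs before any token budgeting.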
### Complete Implementation
```python
from dataclasses import dataclass
from datetime import datetime

# VectorStore, SemanticCache, RAGASEvaluator, embed_texts, rewrite_query,
# decompose_query, parallelRetrieve, optimizeContext, trimToTokenBudget,
# compressChunk, generateAnswer, computeFaithfulness, computeAnswerRelevance,
# computeContextPrecision, and embedModel are the components built in
# Chapters 13-23; import them from wherever your project defines them.


@dataclass
class ProductionRAGConfig:
    similarityThreshold: float = 0.90  # cosine similarity required for a semantic-cache hit
    maxContextTokens: int = 4096       # total context window available to the generator
    tokenReserve: int = 512            # tokens held back for the prompt template and answer
    compressionEnabled: bool = True
    cacheEnabled: bool = True
    evaluationEnabled: bool = True


class ProductionRAGPipeline:
    def __init__(self, config: ProductionRAGConfig):
        self.config = config
        self.vectorStore = VectorStore()
        self.semanticCache = SemanticCache(config.similarityThreshold)
        self.evaluator = RAGASEvaluator() if config.evaluationEnabled else None

    def query(self, question: str, returnMetrics: bool = True) -> dict:
        # Step 1: Cache check (semantic match on the query embedding)
        queryEmbedding = None
        if self.config.cacheEnabled:
            queryEmbedding = embed_texts([question])[0]
            cachedAnswer, found = self.semanticCache.get(question, queryEmbedding)
            if found:
                return {"answer": cachedAnswer, "cache_hit": True, "metrics": {}}

        # Step 2: Rewrite the query into alternative phrasings
        rewrites = rewrite_query(question)
        # Step 3: Decompose multi-part questions into sub-queries
        subQueries = decompose_query(question)
        allSubQueries = rewrites + subQueries

        # Step 4: Parallel retrieval across sub-queries
        retrieved = parallelRetrieve(allSubQueries, top_k=10)
        # Step 5: Context optimization (dedup + relevance filter)
        optimized = optimizeContext(retrieved, top_k=10)
        # Step 6: Dynamic window allocation (4096 - 512 = 3584 tokens by default)
        budget = self.config.maxContextTokens - self.config.tokenReserve
        windowedChunks = trimToTokenBudget(optimized, budget)

        # Step 7: Compression. Call compressChunk once per chunk and fall back
        # to the raw text when it does not return a dict.
        if self.config.compressionEnabled:
            parts = []
            for chunk in windowedChunks:
                compressed = compressChunk(chunk["text"], question)
                parts.append(compressed["content"] if isinstance(compressed, dict) else chunk["text"])
            context = "\n\n".join(parts)
        else:
            context = "\n\n".join(c["text"] for c in windowedChunks)

        # Step 8: Generate
        answer = generateAnswer(context, question)

        # Step 9: Cache result
        if self.config.cacheEnabled:
            self.semanticCache.set(question, queryEmbedding, answer)

        # Step 10: Evaluate. Shown inline for clarity; in production, run it
        # offline on sampled batches, since each metric adds model calls.
        metrics = {}
        if returnMetrics and self.evaluator is not None:
            metrics = {
                "faithfulness": computeFaithfulness(answer, context),
                "answer_relevance": computeAnswerRelevance(question, answer, embedModel),
                "context_precision": computeContextPrecision(optimized, {}, k=10),
                "timestamp": datetime.utcnow().isoformat(),
            }

        return {"answer": answer, "cache_hit": False, "metrics": metrics}

    def healthCheck(self) -> dict:
        # Verify vector store connectivity
        storeStatus = "healthy" if self.vectorStore.isConnected() else "degraded"
        # Report cache size and the age of the oldest entry; total_seconds()
        # counts whole days too, unlike timedelta.seconds
        cacheSize = len(self.semanticCache.cache)
        now = datetime.utcnow()
        cacheMaxAge = max(
            ((now - v["timestamp"]).total_seconds() for v in self.semanticCache.cache.values()),
            default=0,
        )
        return {
            "status": storeStatus,
            "cache_size": cacheSize,
            "oldest_cache_entry_age_seconds": cacheMaxAge,
            "config": {
                "compression": self.config.compressionEnabled,
                "caching": self.config.cacheEnabled,
                "evaluation": self.config.evaluationEnabled,
            },
        }
```
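Step 6 leaves `maxContextTokens - tokenReserve = 3584` tokens for retrieved context. A greedy `trimToTokenBudget` could look like the sketch below. It is hypothetical, not the Chapter 16 implementation, and it counts whitespace-separated words as a rough token proxy; a real deployment would count with the generator model's own tokenizer.

```python
from __future__ import annotations


def count_tokens(text: str) -> int:
    """Rough proxy: whitespace-delimited words. Swap in the model's tokenizer in practice."""
    return len(text.split())


def trim_to_token_budget(chunks: list[dict], budget: int) -> list[dict]:
    """Greedily keep ranked chunks until the next one would overflow the budget."""
    kept, used = [], 0
    for chunk in chunks:
        cost = count_tokens(chunk["text"])
        if used + cost > budget:
            break  # stop at the first overflow so the kept chunks stay in rank order
        kept.append(chunk)
        used += cost
    return kept


MAX_CONTEXT_TOKENS = 4096
TOKEN_RESERVE = 512
budget = MAX_CONTEXT_TOKENS - TOKEN_RESERVE  # 3584 tokens left for retrieved context
```

Stopping at the first overflow favors rank fidelity; replacing `break` with `continue` would instead skip an oversized chunk and pack smaller, lower-ranked ones into the remaining space.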
### Operational Monitoring
```python
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("production_rag")


class MonitoredPipeline(ProductionRAGPipeline):
    def query(self, question: str, returnMetrics: bool = True) -> dict:
        start = time.perf_counter()
        result = super().query(question, returnMetrics)
        elapsedMs = (time.perf_counter() - start) * 1000

        # Latency and cache behavior on every request (%.50s truncates the query)
        logger.info("query=%.50s... elapsed=%.0fms cache_hit=%s",
                    question, elapsedMs, result["cache_hit"])

        # Quality metrics only when evaluation ran (never on cache hits)
        metrics = result["metrics"]
        if metrics:
            logger.info(
                "metrics=faithfulness:%.2f relevance:%.2f precision:%.2f",
                metrics["faithfulness"]["faithfulness_score"],
                metrics["answer_relevance"]["relevance_score"],
                metrics["context_precision"]["precision"],
            )
        return result
```
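Per-request log lines are hard to compare against a latency SLO, which is usually stated as a percentile. The sketch below keeps a rolling window of recent latencies and reports nearest-rank percentiles; `LatencyTracker` and its parameters are hypothetical, and `MonitoredPipeline` could feed it by calling `record()` with each `elapsedMs` value.

```python
import math
from collections import deque


class LatencyTracker:
    """Rolling window of recent query latencies for SLO checks (hypothetical helper)."""

    def __init__(self, window: int = 1000):
        self.samples = deque(maxlen=window)  # the oldest sample drops off when full

    def record(self, elapsed_ms: float) -> None:
        self.samples.append(elapsed_ms)

    def percentile(self, p: float) -> float:
        """Nearest-rank percentile over the current window; 0.0 when empty."""
        if not self.samples:
            return 0.0
        ordered = sorted(self.samples)
        rank = max(1, math.ceil(p * len(ordered) / 100))
        return ordered[rank - 1]

    def breaches_slo(self, p95_budget_ms: float) -> bool:
        return self.percentile(95) > p95_budget_ms
```

Sorting the whole window on each call is fine for a few thousand samples; a metrics backend with histogram buckets is the usual choice at higher volume.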
### Regression Testing
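```python
# Maps each short metric name to (key in result["metrics"], score field inside it)
METRIC_FIELDS = {
    "faithfulness": ("faithfulness", "faithfulness_score"),
    "relevance": ("answer_relevance", "relevance_score"),
    "precision": ("context_precision", "precision"),
}


def regressionTest(pipeline: ProductionRAGPipeline, testSet: list[dict],
                   tolerance: float = 0.05) -> dict:
    """
    Run the pipeline against a known test set and flag any metric that falls
    below its absolute threshold or drops more than `tolerance` below the
    stored baseline (0.05 is an assumed default). Run it with caching disabled:
    cache hits return no metrics and would silently skip every check.
    """
    thresholds = {
        "faithfulness": 0.70,
        "relevance": 0.75,
        "precision": 0.50,
    }
    results = []
    regressions = []
    for item in testSet:
        result = pipeline.query(item["question"])
        # getBaseline is a project-specific lookup of stored per-question scores
        baseline = getBaseline(item["question_id"]) or {}
        for metric, (resultKey, scoreKey) in METRIC_FIELDS.items():
            entry = result["metrics"].get(resultKey)
            if not entry:
                continue
            current = entry.get(scoreKey, entry.get("score", 0))
            baselineScore = baseline.get(metric)
            belowThreshold = current < thresholds[metric]
            droppedFromBaseline = (
                baselineScore is not None and current < baselineScore - tolerance
            )
            if belowThreshold or droppedFromBaseline:
                regressions.append({
                    "question_id": item["question_id"],
                    "metric": metric,
                    "current": current,
                    "threshold": thresholds[metric],
                    "baseline": baselineScore,
                })
        results.append(result)
    return {
        "total_queries": len(results),
        "regression_count": len(regressions),
        "regressions": regressions,
        "results": results,
    }
```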
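`regressionTest` calls `getBaseline`, which the chapter leaves undefined. One minimal option is a JSON file keyed by question ID, sketched below. `BaselineStore` is hypothetical; it stores scores under the same short names (`faithfulness`, `relevance`, `precision`) that the thresholds use, and `store.get` could be passed wherever `getBaseline` is expected.

```python
from __future__ import annotations

import json
from pathlib import Path


class BaselineStore:
    """File-backed per-question metric baselines (hypothetical helper)."""

    def __init__(self, path: str | Path):
        self.path = Path(path)
        # Load existing baselines, or start empty on the first run
        self.data: dict[str, dict[str, float]] = (
            json.loads(self.path.read_text()) if self.path.exists() else {}
        )

    def get(self, question_id: str) -> dict[str, float] | None:
        return self.data.get(question_id)

    def update(self, question_id: str, scores: dict[str, float]) -> None:
        """Record new baseline scores and persist the whole store."""
        self.data[question_id] = scores
        self.path.write_text(json.dumps(self.data, indent=2, sort_keys=True))
```

Update baselines only after a reviewed release; refreshing them automatically on every run would absorb a regression into the baseline it should have been measured against.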
### Deployment Checklist
1. **Vector store capacity**: Budget 4 bytes per dimension for float32 embeddings. At 768–1536 dimensions that is roughly 3–6 KB per vector, or about 3–6 GB of raw vectors at 1M chunks, before index overhead and metadata.
2. **Rate limiting**: Add per-user and per-endpoint rate limits so traffic bursts do not trigger HTTP 429 (Too Many Requests) errors from the LLM API.
3. **Graceful degradation**: If the embedding service is down, fall back to BM25 keyword search.
4. **Cache eviction on corpus update**: Version the corpus and invalidate cache entries created against an earlier version.
5. **Evaluation cadence**: Run RAGAS evaluation every N queries (e.g., every 100) rather than in real time; batching reduces API cost.
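Item 4 can be built into the cache itself by stamping each entry with the corpus version it was answered against. The sketch below is hypothetical and may differ from the Chapter 20 `SemanticCache`, though it mirrors the interface the pipeline uses: `get()` returns `(answer, found)`, `set()` takes the question, embedding, and answer, and each entry carries a naive-UTC `timestamp` that `healthCheck` can read. The linear scan is for clarity; a large cache would use an approximate-nearest-neighbor index.

```python
from __future__ import annotations

import math
from datetime import datetime, timezone


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class VersionedSemanticCache:
    """Semantic cache whose entries are tied to a corpus version (hypothetical sketch)."""

    def __init__(self, threshold: float = 0.90, corpus_version: str = "v1"):
        self.threshold = threshold
        self.corpus_version = corpus_version
        self.cache: dict[str, dict] = {}

    def get(self, question: str, embedding: list[float]):
        best_answer, best_sim = None, -1.0
        for entry in self.cache.values():
            if entry["version"] != self.corpus_version:
                continue  # stale: answered against an older corpus
            sim = cosine(embedding, entry["embedding"])
            if sim > best_sim:
                best_answer, best_sim = entry["answer"], sim
        if best_sim >= self.threshold:
            return best_answer, True
        return None, False

    def set(self, question: str, embedding: list[float], answer: str) -> None:
        self.cache[question] = {
            "embedding": embedding,
            "answer": answer,
            "version": self.corpus_version,
            # naive UTC, so it subtracts cleanly from datetime.utcnow()
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None),
        }

    def bump_version(self, new_version: str) -> None:
        """Call after re-indexing: switch versions and evict entries from older ones."""
        self.corpus_version = new_version
        self.cache = {k: v for k, v in self.cache.items() if v["version"] == new_version}
```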
### Failure Modes for Production
Production pipelines fail silently when the vector store returns zero results: the generator answers from an empty context, and the answer often looks plausible. Add an explicit zero-results guard right after retrieval.
```python
# Inside ProductionRAGPipeline.query, immediately after Step 4 (retrieval)
if not retrieved:
    return {"answer": "No relevant information found.", "metrics": {}, "cache_hit": False}
```
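The guard pairs naturally with the graceful-degradation item from the deployment checklist: if dense retrieval fails, fall back to BM25, and if that also finds nothing, return the guard message. The sketch below is hypothetical. It implements Okapi BM25 over whitespace tokens, assumes `dense_retrieve(query, top_k)` returns a list of text chunks and raises `ConnectionError` when the embedding service is unreachable, and takes `generate(context, question)` as a stand-in for `generateAnswer`.

```python
from __future__ import annotations

import math
from collections import Counter
from typing import Callable


def bm25_rank(query: str, docs: list[str], k1: float = 1.5, b: float = 0.75,
              top_k: int = 10) -> list[tuple[int, float]]:
    """Okapi BM25 over whitespace tokens; returns (doc_index, score) pairs with score > 0."""
    tokenized = [d.lower().split() for d in docs]
    if not tokenized:
        return []
    n_docs = len(tokenized)
    avgdl = (sum(len(t) for t in tokenized) / n_docs) or 1.0
    df = Counter(term for toks in tokenized for term in set(toks))  # document frequency
    scored = []
    for i, toks in enumerate(tokenized):
        tf = Counter(toks)
        score = 0.0
        for term in query.lower().split():
            if term not in tf:
                continue
            idf = math.log((n_docs - df[term] + 0.5) / (df[term] + 0.5) + 1)
            norm = tf[term] + k1 * (1 - b + b * len(toks) / avgdl)
            score += idf * tf[term] * (k1 + 1) / norm
        if score > 0:
            scored.append((i, score))
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:top_k]


def retrieve_with_fallback(query: str, dense_retrieve: Callable[[str, int], list[str]],
                           docs: list[str], top_k: int = 10) -> list[str]:
    try:
        return dense_retrieve(query, top_k)
    except ConnectionError:
        # Embedding service unavailable: degrade to keyword search
        return [docs[i] for i, _ in bm25_rank(query, docs, top_k=top_k)]


def answer_or_guard(query: str, dense_retrieve, docs: list[str],
                    generate: Callable[[str, str], str]) -> dict:
    retrieved = retrieve_with_fallback(query, dense_retrieve, docs)
    if not retrieved:  # zero-results guard: never generate from empty context
        return {"answer": "No relevant information found.", "metrics": {}, "cache_hit": False}
    return {"answer": generate("\n\n".join(retrieved), query), "metrics": {}, "cache_hit": False}
```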
Monitoring that measures latency but not answer quality misses regressions in which a fluent answer is generated quickly from the wrong context. Always pair latency SLOs with weekly sampling audits of faithfulness and answer relevance.
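Both the audits and the "every N queries" cadence from the checklist need a way to pick queries for offline scoring. A minimal sketch, assuming a hypothetical `AuditSampler` that queues every Nth query's question, answer, and context, so a weekly job can `drain()` the batch and send it to the RAGAS evaluator in one pass:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class AuditSampler:
    """Queue every Nth production query for offline quality scoring (hypothetical helper)."""

    every_n: int = 100
    seen: int = 0
    batch: list[dict] = field(default_factory=list)

    def observe(self, question: str, answer: str, context: str) -> bool:
        """Count the query; return True if it was selected for the audit batch."""
        self.seen += 1
        if self.seen % self.every_n == 0:
            self.batch.append({"question": question, "answer": answer, "context": context})
            return True
        return False

    def drain(self) -> list[dict]:
        """Hand the accumulated batch to the evaluator and start a fresh one."""
        batch, self.batch = self.batch, []
        return batch
```

Every-Nth selection is deterministic and easy to reason about, but it can alias with periodic traffic (for example, a scheduled job that always issues the same query). Random sampling at rate 1/N avoids that at the cost of a less predictable batch size.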