22. Part 2 Final Project
Chapter 22 of 22 · 25 min
Build a complete production RAG system that handles multi-hop queries, uses re-ranking, compresses context, and includes monitoring.
Project Requirements
Build a RAG system with the following components:
- Multi-hop retrieval - Handle questions requiring 2+ hops
- Re-ranking - Use a cross-encoder to reorder the top-20 retrieved results and keep the best 10
- Context compression - Reduce context to relevant sentences
- Caching - Cache retrieval results per query, with a semantic-similarity fallback so near-duplicate queries also hit the cache
- Monitoring - Track retrieval quality and latency
- A/B testing - Compare two retrieval strategies
Dataset
Use the TechQA dataset or create your own with 100 questions requiring multi-hop reasoning. Each question should need information from 2+ documents to answer.
Implementation
# main.py - Complete RAG pipeline
import asyncio
import hashlib
import time
from dataclasses import dataclass
from typing import Optional

import numpy as np
# Configuration
@dataclass
class Config:
    """Tunable parameters for the RAG pipeline.

    Values are validated on construction so that a bad configuration fails
    immediately instead of surfacing later as an odd retrieval result.

    Raises:
        ValueError: If any field is outside its usable range.
    """

    chunk_size: int = 500  # size of each document chunk
    overlap: int = 50  # amount shared between neighbouring chunks
    retrieval_top_k: int = 20  # candidates fetched from the vector store
    rerank_top_k: int = 10  # candidates kept after re-ranking
    max_hops: int = 2  # retrieval rounds per question
    compression_threshold: float = 0.5  # min cosine similarity to keep a sentence
    cache_ttl_seconds: int = 3600  # lifetime of a cache entry

    def __post_init__(self):
        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        # An overlap as large as the chunk means a chunker can never advance.
        if not 0 <= self.overlap < self.chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
        if self.retrieval_top_k < 1:
            raise ValueError("retrieval_top_k must be at least 1")
        # The re-ranker cannot keep more candidates than were retrieved.
        if not 1 <= self.rerank_top_k <= self.retrieval_top_k:
            raise ValueError(
                "rerank_top_k must satisfy 1 <= rerank_top_k <= retrieval_top_k"
            )
        if self.max_hops < 1:
            raise ValueError("max_hops must be at least 1")
        # Cosine similarity is bounded by [-1, 1].
        if not -1.0 <= self.compression_threshold <= 1.0:
            raise ValueError("compression_threshold must be within [-1, 1]")
        if self.cache_ttl_seconds <= 0:
            raise ValueError("cache_ttl_seconds must be positive")
class ProductionRAG:
    """Complete production RAG system.

    Pipeline per query: pick an A/B variant, run multi-hop retrieval
    (optionally cached and re-ranked), compress the retrieved chunks to the
    sentences most similar to the question, generate an answer, and record
    retrieval metrics.

    NOTE(review): the embedder, vector store, re-ranker and generator are
    called synchronously from ``async`` methods, so they block the event loop
    while they run. Confirm whether these clients offer async APIs.
    """

    # Upper bound on the number of sentences kept by compress_context().
    MAX_CONTEXT_SENTENCES = 20

    def __init__(self, config: Optional["Config"] = None, generator=None):
        """Build the pipeline components.

        Args:
            config: Pipeline settings; a default ``Config`` is used if omitted.
            generator: Optional object exposing ``generate(question, context)``.
                When omitted, answers are produced by a direct chat-completion
                call (see ``_generate_answer``). The original code called
                ``self.generator`` without ever assigning it.
        """
        self.config = config or Config()
        # Initialize components
        self.vector_store = MilvusVectorStore()  # or your choice
        self.embedder = SentenceTransformerEmbedder()
        self.reranker = CrossEncoderReranker()
        self.cache = RedisSemanticCache()
        self.monitor = QualityMonitor()
        self.generator = generator
        # A/B test
        self.ab_tester = ABTester(Experiment(
            name="retrieval_strategy",
            variants=["baseline", "reranked"],
            weights=[0.5, 0.5],
        ))

    async def query(self, question: str, user_id: str) -> dict:
        """Answer ``question`` with the full pipeline.

        Args:
            question: The user's question.
            user_id: Identifier used to assign the A/B variant.

        Returns:
            A dict with keys ``answer``, ``sources``, ``retrieved_doc_ids``,
            ``variant`` and ``hops_used`` (the number of retrieval hops that
            contributed at least one chunk not seen in an earlier hop).
        """
        # Get variant for A/B test
        variant = self.ab_tester.get_variant(user_id)

        # Per-stage wall-clock timings in seconds, handed to the monitor.
        latencies = {}
        started = time.perf_counter()

        # Multi-hop retrieval
        context_chunks, hops_used = await self._multi_hop(
            question, rerank=(variant == "reranked")
        )
        latencies["retrieval"] = time.perf_counter() - started

        # Compress context
        stage_started = time.perf_counter()
        compressed = self.compress_context(question, context_chunks)
        latencies["compression"] = time.perf_counter() - stage_started

        # Generate answer
        stage_started = time.perf_counter()
        answer = self._generate_answer(question, compressed)
        latencies["generation"] = time.perf_counter() - stage_started
        latencies["total"] = time.perf_counter() - started

        # Record metrics
        retrieved_ids = [c["id"] for c in context_chunks]
        self.monitor.record(RetrievalMetrics(
            query=question,
            expected_docs=[],  # Fill in for eval set
            retrieved_docs=retrieved_ids,
            latencies=latencies,
        ))

        return {
            "answer": answer,
            "sources": [c["source"] for c in context_chunks],
            "retrieved_doc_ids": retrieved_ids,
            "variant": variant,
            "hops_used": hops_used,
        }

    async def multi_hop_reranked(self, question: str) -> list:
        """Multi-hop with caching and re-ranking at each step."""
        chunks, _ = await self._multi_hop(question, rerank=True)
        return chunks

    async def multi_hop_baseline(self, question: str) -> list:
        """Multi-hop baseline without caching or re-ranking."""
        chunks, _ = await self._multi_hop(question, rerank=False)
        return chunks

    async def _multi_hop(self, question: str, *, rerank: bool) -> tuple[list, int]:
        """Run up to ``config.max_hops`` retrieval rounds.

        Both variants derive follow-up queries the same way
        (``generate_subquery``), so the A/B test isolates the retrieval
        strategy rather than the query-rewriting method.

        Returns:
            ``(chunks, hops_used)`` where ``chunks`` is de-duplicated by id and
            ``hops_used`` counts the hops that added a previously unseen chunk.
        """
        collected = []
        seen_ids = set()
        hops_used = 0
        current_question = question
        last_hop = self.config.max_hops - 1

        for hop in range(self.config.max_hops):
            hop_chunks = self._retrieve_hop(current_question, rerank=rerank)
            new_ids = {chunk["id"] for chunk in hop_chunks} - seen_ids
            if new_ids:
                hops_used += 1
                seen_ids |= new_ids
            collected.extend(hop_chunks)
            # A follow-up query is only needed if another hop will run.
            if hop < last_hop:
                current_question = self.generate_subquery(question, hop_chunks)

        return self.deduplicate_chunks(collected), hops_used

    def _retrieve_hop(self, hop_question: str, *, rerank: bool) -> list:
        """Fetch the chunks for one hop, honouring the configured top-k values."""
        if not rerank:
            query_emb = self.embedder.encode(hop_question)
            # Baseline returns the same number of chunks the re-ranked variant
            # keeps, so both variants feed equally sized contexts downstream.
            return self.vector_store.search(query_emb, k=self.config.rerank_top_k)

        # Check cache first
        cached = self.cache.get_similar(hop_question)
        if cached:
            return cached

        # Retrieve candidates, then re-rank them against the hop question
        query_emb = self.embedder.encode(hop_question)
        candidates = self.vector_store.search(
            query_emb, k=self.config.retrieval_top_k
        )
        reranked = self.reranker.rerank(
            hop_question, candidates, k=self.config.rerank_top_k
        )
        self.cache.store(hop_question, reranked)
        return reranked

    def _generate_answer(self, question: str, context: str) -> str:
        """Produce the final answer from the compressed context.

        Uses the injected generator when one was supplied; otherwise falls
        back to a chat-completion call with the same model that
        ``refine_question`` uses.
        """
        if self.generator is not None:
            return self.generator.generate(question, context)
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "Answer the question using only the provided context. If the context is insufficient, say so."},
                {"role": "user", "content": f"Context: {context}\nQuestion: {question}\nAnswer:"},
            ],
        )
        return response.choices[0].message.content

    def compress_context(self, question: str, chunks: list) -> str:
        """Compress chunks to the sentences most relevant to the question.

        Sentences are scored by cosine similarity to the question. At most
        ``MAX_CONTEXT_SENTENCES`` of the best-scoring sentences that reach
        ``config.compression_threshold`` are kept, in their original order.
        If no sentence reaches the threshold, the single best one is kept so
        the generator never receives an empty context for non-empty input.

        Args:
            question: The user's question.
            chunks: Dicts with a ``"text"`` entry.

        Returns:
            The selected sentences joined into one string, or ``""`` when the
            chunks contain no text.
        """
        # Sentence-level extraction
        sentences = []
        for chunk in chunks:
            for piece in chunk["text"].split(". "):
                piece = piece.strip()
                if piece:
                    sentences.append(piece)
        if not sentences:
            return ""

        # Score sentences (assumes encode() returns one vector for a string
        # and one row per item for a list — TODO confirm against the embedder)
        question_emb = np.asarray(self.embedder.encode(question), dtype=float)
        sentence_embs = np.asarray(self.embedder.encode(sentences), dtype=float)
        norms = np.linalg.norm(sentence_embs, axis=1) * np.linalg.norm(question_emb)
        norms[norms == 0] = 1.0  # a zero vector scores 0 instead of dividing by 0
        similarities = (sentence_embs @ question_emb) / norms

        # Select top sentences that reach the threshold
        threshold = self.config.compression_threshold
        ranked = np.argsort(similarities)[::-1][: self.MAX_CONTEXT_SENTENCES]
        keep = [int(i) for i in ranked if similarities[i] >= threshold]
        if not keep:
            keep = [int(ranked[0])]

        # Restore document order; splitting on ". " dropped the sentence
        # terminators, so put one back where it is missing.
        selected = [sentences[i] for i in sorted(keep)]
        return " ".join(
            s if s.endswith((".", "!", "?")) else s + "." for s in selected
        )

    def deduplicate_chunks(self, chunks: list) -> list:
        """Remove chunks whose ``"id"`` was already seen, keeping first occurrences."""
        seen_ids = set()
        unique = []
        for chunk in chunks:
            if chunk["id"] not in seen_ids:
                seen_ids.add(chunk["id"])
                unique.append(chunk)
        return unique

    def refine_question(self, original: str, context: list) -> str:
        """Ask the LLM for a follow-up search query.

        Not called by the retrieval loop, which uses ``generate_subquery`` for
        both variants; it is kept as an LLM-backed alternative.

        Args:
            original: The user's original question.
            context: Retrieved chunks; only the first five are sent.
        """
        context_text = " ".join([c["text"] for c in context[:5]])
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "Given the original question and retrieved context, generate a new search query to find missing information. Be specific."},
                {"role": "user", "content": f"Original: {original}\nContext: {context_text}\nNew query:"},
            ],
        )
        return response.choices[0].message.content

    def generate_subquery(self, original: str, context: list) -> str:
        """Generate the search query for the next hop.

        Simplified version: ``context`` is ignored and a fixed suffix is
        appended to the original question.
        """
        return f"{original} more details"
Evaluation Script
# evaluate.py
async def evaluate_system(system: ProductionRAG, test_set: list) -> dict:
    """Evaluate the complete system and compare the two A/B variants.

    Args:
        system: The RAG system under test; each item is sent to
            ``system.query`` with a distinct synthetic user id.
        test_set: Items with ``"question"`` and ``"expected_answer"`` entries.

    Returns:
        A dict with ``overall_accuracy``, ``baseline_accuracy``,
        ``reranked_accuracy``, ``avg_hops_used`` and ``per_question``.
        An aggregate is ``None`` when it has no samples (an empty test set, or
        a variant that received no traffic) rather than raising
        ``ZeroDivisionError``.
    """
    results = []
    for index, item in enumerate(test_set):
        result = await system.query(item["question"], f"user_{index}")
        # Check answer quality
        is_correct = check_answer(item["expected_answer"], result["answer"])
        results.append({
            "question": item["question"],
            "answer_correct": is_correct,
            "variant": result["variant"],
            "hops": result["hops_used"],
            # Kept for retrieval-quality analysis; empty if the system does
            # not report retrieved ids.
            "retrieved_doc_ids": result.get("retrieved_doc_ids", []),
        })

    def accuracy(rows):
        # Fraction of correct answers, or None when there is nothing to average.
        if not rows:
            return None
        return sum(bool(r["answer_correct"]) for r in rows) / len(rows)

    # A/B comparison
    baseline_results = [r for r in results if r["variant"] == "baseline"]
    reranked_results = [r for r in results if r["variant"] == "reranked"]

    avg_hops = sum(r["hops"] for r in results) / len(results) if results else None

    return {
        "overall_accuracy": accuracy(results),
        "baseline_accuracy": accuracy(baseline_results),
        "reranked_accuracy": accuracy(reranked_results),
        "avg_hops_used": avg_hops,
        "per_question": results,
    }
Success Criteria
Your implementation is complete when:
- Multi-hop queries return context from 2+ documents
- Re-ranked results score higher on precision than baseline
- Context compression reduces token count by 50%+ while preserving answer accuracy
- Cache hit rate exceeds 20% on a workload that includes repeated or paraphrased queries
- A/B test shows a statistically significant difference between variants (p < 0.05, e.g. from a two-proportion z-test on answer accuracy)
- Latency P99 stays under 2 seconds for end-to-end query
EXERCISE
Run your complete pipeline on 100 test queries. Report accuracy, cache hit rate, compression ratio, and A/B test results. Identify the top 5 queries where the system fails and diagnose why.