22. Part 2 Final Project

Chapter 22 of 22 · 25 min

Build a complete production RAG system that handles multi-hop queries, uses re-ranking, compresses context, and includes monitoring.

Project Requirements

Build a RAG system with the following components:

  1. Multi-hop retrieval - Handle questions requiring 2+ hops
  2. Re-ranking - Use cross-encoder to reorder top-20 results
  3. Context compression - Reduce context to relevant sentences
  4. Caching - Cache embeddings with semantic fallback
  5. Monitoring - Track retrieval quality and latency
  6. A/B testing - Compare two retrieval strategies

Dataset

Use the TechQA dataset or create your own with 100 questions requiring multi-hop reasoning. Each question should need information from 2+ documents to answer.

Implementation

# main.py - Complete RAG pipeline

import asyncio
from dataclasses import dataclass
from typing import Optional
import hashlib

# Configuration
@dataclass
class Config:
    chunk_size: int = 500
    overlap: int = 50
    retrieval_top_k: int = 20
    rerank_top_k: int = 10
    max_hops: int = 2
    compression_threshold: float = 0.5
    cache_ttl_seconds: int = 3600

class ProductionRAG:
    """Complete production RAG system."""
    
    def __init__(self, config: Config = None):
        self.config = config or Config()
        
        # Initialize components
        self.vector_store = MilvusVectorStore()  # or your choice
        self.embedder = SentenceTransformerEmbedder()
        self.reranker = CrossEncoderReranker()
        self.cache = RedisSemanticCache()
        self.monitor = QualityMonitor()
        
        # A/B test
        self.ab_tester = ABTester(Experiment(
            name="retrieval_strategy",
            variants=["baseline", "reranked"],
            weights=[0.5, 0.5]
        ))
    
    async def query(self, question: str, user_id: str) -> dict:
        """Query with full pipeline."""
        
        # Get variant for A/B test
        variant = self.ab_tester.get_variant(user_id)
        
        # Multi-hop retrieval
        if variant == "reranked":
            context_chunks = await self.multi_hop_reranked(question)
        else:
            context_chunks = await self.multi_hop_baseline(question)
        
        # Compress context
        compressed = self.compress_context(question, context_chunks)
        
        # Generate answer
        answer = self.generator.generate(question, compressed)
        
        # Record metrics
        self.monitor.record(RetrievalMetrics(
            query=question,
            expected_docs=[],  # Fill in for eval set
            retrieved_docs=[c["id"] for c in context_chunks],
            latencies={}
        ))
        
        return {
            "answer": answer,
            "sources": [c["source"] for c in context_chunks],
            "variant": variant,
            "hops_used": len(context_chunks) // 3  # Approximate
        }
    
    async def multi_hop_reranked(self, question: str) -> list:
        """Multi-hop with re-ranking at each step."""
        
        all_chunks = []
        current_question = question
        
        for hop in range(self.config.max_hops):
            # Check cache first
            cached = self.cache.get_similar(current_question)
            if cached:
                all_chunks.extend(cached)
                current_question = self.refine_question(question, all_chunks)
                continue
            
            # Retrieve candidates
            query_emb = self.embedder.encode(current_question)
            candidates = self.vector_store.search(query_emb, k=20)
            
            # Re-rank
            reranked = self.reranker.rerank(current_question, candidates, k=10)
            
            # Cache results
            self.cache.store(current_question, reranked)
            
            all_chunks.extend(reranked)
            
            # Refine question for next hop
            current_question = self.generate_subquery(question, reranked)
        
        return self.deduplicate_chunks(all_chunks)
    
    async def multi_hop_baseline(self, question: str) -> list:
        """Multi-hop baseline without re-ranking."""
        
        all_chunks = []
        current_question = question
        
        for hop in range(self.config.max_hops):
            query_emb = self.embedder.encode(current_question)
            chunks = self.vector_store.search(query_emb, k=10)
            all_chunks.extend(chunks)
            
            current_question = self.generate_subquery(question, chunks)
        
        return self.deduplicate_chunks(all_chunks)
    
    def compress_context(self, question: str, chunks: list) -> str:
        """Compress chunks to most relevant content."""
        
        # Sentence-level extraction
        all_sentences = []
        sentence_sources = []
        
        for chunk in chunks:
            sentences = chunk["text"].split(". ")
            all_sentences.extend(sentences)
            sentence_sources.extend([chunk["id"]] * len(sentences))
        
        # Score sentences
        question_emb = self.embedder.encode(question)
        sentence_embs = self.embedder.encode(all_sentences)
        
        similarities = cosine_similarity([question_emb], sentence_embs)[0]
        
        # Select top sentences up to threshold
        threshold = self.config.compression_threshold
        top_indices = np.argsort(similarities)[-20:]  # Top 20 sentences
        
        selected = [all_sentences[i] for i in sorted(top_indices)]
        
        return ". ".join(selected)
    
    def deduplicate_chunks(self, chunks: list) -> list:
        """Remove duplicate chunks."""
        seen_ids = set()
        unique = []
        
        for chunk in chunks:
            if chunk["id"] not in seen_ids:
                seen_ids.add(chunk["id"])
                unique.append(chunk)
        
        return unique
    
    def refine_question(self, original: str, context: list) -> str:
        """Generate refined query for next hop."""
        
        context_text = " ".join([c["text"] for c in context[:5]])
        
        response = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "Given the original question and retrieved context, generate a new search query to find missing information. Be specific."},
                {"role": "user", "content": f"Original: {original}\nContext: {context_text}\nNew query:"}
            ]
        )
        
        return response.choices[0].message.content
    
    def generate_subquery(self, original: str, context: list) -> str:
        """Generate subquery for next hop."""
        # Simplified version
        return f"{original} more details"

Evaluation Script

# evaluate.py

async def evaluate_system(system: ProductionRAG, test_set: list) -> dict:
    """Evaluate the complete system."""
    
    results = []
    
    for item in test_set:
        result = await system.query(item["question"], f"user_{len(results)}")
        
        # Check answer quality
        is_correct = check_answer(item["expected_answer"], result["answer"])
        
        # Check retrieval quality
        retrieved_ids = result.get("retrieved_doc_ids", [])
        
        results.append({
            "question": item["question"],
            "answer_correct": is_correct,
            "variant": result["variant"],
            "hops": result["hops_used"]
        })
    
    # A/B comparison
    baseline_results = [r for r in results if r["variant"] == "baseline"]
    reranked_results = [r for r in results if r["variant"] == "reranked"]
    
    return {
        "overall_accuracy": sum(r["answer_correct"] for r in results) / len(results),
        "baseline_accuracy": sum(r["answer_correct"] for r in baseline_results) / len(baseline_results),
        "reranked_accuracy": sum(r["answer_correct"] for r in reranked_results) / len(reranked_results),
        "avg_hops_used": np.mean([r["hops"] for r in results]),
        "per_question": results
    }

Success Criteria

Your implementation is complete when:

  • Multi-hop queries return context from 2+ documents
  • Re-ranked results score higher on precision than baseline
  • Context compression reduces token count by 50%+ while preserving answer accuracy
  • Cache hit rate exceeds 20% for repeated queries
  • A/B test shows statistically significant difference (p < 0.05) between variants
  • Latency P99 stays under 2 seconds for end-to-end query
EXERCISE

Run your complete pipeline on 100 test queries. Report accuracy, cache hit rate, compression ratio, and A/B test results. Identify the top 5 queries where the system fails and diagnose why.