RUNLOCALAI · v38

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.co · Independently operated
RAG Systems: Part 2

18. Production Pipeline

Chapter 18 of 22 · 20 min
KEY INSIGHT

Production pipelines combine retrieval stages with error handling, fallbacks, and health checks to ensure reliable operation under failure conditions.

A production RAG system connects query embedding, retrieval, re-ranking, context compression, and generation into one pipeline, then wraps that pipeline in metrics, error handling, and health checks.

Pipeline Architecture

from dataclasses import dataclass
from typing import Optional
import logging
import time  # Used for the latency measurements in RAGPipeline.query

@dataclass
class PipelineConfig:
    chunk_size: int = 500
    overlap: int = 50
    retrieval_top_k: int = 20
    rerank_top_k: int = 10
    compression_threshold: float = 0.7
    max_context_tokens: int = 4000

class RAGPipeline:
    def __init__(self, config: PipelineConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)
        
        # Initialize components
        self.embedder = EmbeddingModel()
        self.vector_store = VectorStore()
        self.reranker = Reranker()
        self.compressor = ContextCompressor()
        self.generator = LLMClient()
        
        # Metrics
        self.metrics = {
            "retrieval_latency": [],
            "total_latency": [],
            "retrieval_errors": 0,
            "generation_errors": 0
        }
    
    async def query(self, question: str, user_id: Optional[str] = None) -> dict:
        """Process a single query through the pipeline."""
        
        start_time = time.time()
        # Track the current stage so failures are counted against the right metric
        stage = "embedding"
        
        try:
            # Stage 1: Embed query
            query_embedding = self.embedder.encode(question)
            
            # Stage 2: Retrieve candidates
            stage = "retrieval"
            retrieval_start = time.time()
            candidates = self.vector_store.search(
                query_embedding,
                top_k=self.config.retrieval_top_k
            )
            
            retrieval_time = time.time() - retrieval_start
            self.metrics["retrieval_latency"].append(retrieval_time)
            
            if not candidates:
                return {"answer": "I couldn't find relevant information.", 
                        "sources": [],
                        "retrieval_time": retrieval_time}
            
            # Stage 3: Re-rank and keep only the best rerank_top_k candidates
            # (assumes rerank() returns a list sorted best-first)
            stage = "rerank"
            reranked = self.reranker.rerank(question, candidates)
            reranked = reranked[:self.config.rerank_top_k]
            
            # Stage 4: Compress context
            stage = "compression"
            compressed = self.compressor.compress(
                question, 
                reranked,
                max_tokens=self.config.max_context_tokens
            )
            
            # Stage 5: Generate answer
            stage = "generation"
            answer = self.generator.generate(
                question=question,
                context=compressed["context"],
                sources=compressed["sources"]
            )
            
            total_time = time.time() - start_time
            self.metrics["total_latency"].append(total_time)
            
            return {
                "answer": answer,
                "sources": compressed["sources"],
                "retrieval_time": retrieval_time,
                "total_time": total_time
            }
            
        except Exception as e:
            self.logger.error(f"Pipeline error during {stage}: {e}")
            # Embedding and retrieval failures count as retrieval errors;
            # failures in any later stage count as generation errors
            if stage in ("embedding", "retrieval"):
                self.metrics["retrieval_errors"] += 1
            else:
                self.metrics["generation_errors"] += 1
            raise
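
The pipeline above times stages and counts errors by hand. One way to keep that bookkeeping in a single place is a small context manager that wraps each stage. The sketch below is an illustration under assumptions, not part of the chapter's pipeline: `StageMetrics` and its `stage()` method are hypothetical names, and it records latency per stage rather than only for retrieval and the total.

```python
import time
from collections import defaultdict
from contextlib import contextmanager


class StageMetrics:
    """Hypothetical helper: per-stage latency samples and error counts."""

    def __init__(self):
        self.latency = defaultdict(list)  # stage name -> list of seconds
        self.errors = defaultdict(int)    # stage name -> failure count

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        except Exception:
            # Attribute the failure to the stage that was running, then re-raise
            self.errors[name] += 1
            raise
        finally:
            # Latency is recorded whether the stage succeeded or failed
            self.latency[name].append(time.perf_counter() - start)


metrics = StageMetrics()

with metrics.stage("retrieval"):
    candidates = ["doc-1", "doc-2"]  # stand-in for vector_store.search(...)

try:
    with metrics.stage("generation"):
        raise RuntimeError("LLM timed out")  # simulated failure
except RuntimeError:
    pass

print(dict(metrics.errors))
```

Because the error is attributed by the enclosing `with` block, no string matching on the exception message is needed to decide which counter to increment.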

Error Handling and Fallbacks

class ResilientPipeline(RAGPipeline):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.fallback_retriever = BM25Retriever()
    
    async def query(self, question: str, user_id: Optional[str] = None) -> dict:
        """Query with fallback on vector retrieval failure."""
        
        try:
            return await super().query(question, user_id)
        except VectorStoreError as e:
            self.logger.warning(f"Vector store failed, using fallback: {e}")
            
            # Fall back to keyword retrieval
            candidates = self.fallback_retriever.search(question)
            
            if not candidates:
                return {"answer": "I couldn't find relevant information.", 
                        "sources": [],
                        "fallback_used": True}
            
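            # Call the generator with the same keyword arguments as the main path.
            # Assumes each fallback candidate is a dict with "text" and "source" keys.
            sources = [c["source"] for c in candidates]
            context = "\n\n".join(c["text"] for c in candidates)
            return {
                "answer": self.generator.generate(
                    question=question, context=context, sources=sources),
                "sources": sources,
                "fallback_used": True
            }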
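
The fallback pattern can be tried in isolation before wiring it into the full pipeline. The following is a minimal, self-contained sketch with stand-ins: `vector_search` always fails to simulate an outage, `keyword_search` uses plain term overlap rather than real BM25 scoring, and `VectorStoreError`, `CORPUS`, and `retrieve_with_fallback` are hypothetical names defined here for illustration.

```python
import asyncio


class VectorStoreError(Exception):
    """Hypothetical error raised when the vector store is unavailable."""


async def vector_search(question: str) -> list[dict]:
    # Stand-in for the primary retriever; always fails to simulate an outage
    raise VectorStoreError("connection refused")


def keyword_search(question: str, corpus: list[dict]) -> list[dict]:
    # Simple term-overlap scoring, used here in place of BM25
    terms = set(question.lower().split())
    scored = []
    for doc in corpus:
        overlap = len(terms & set(doc["text"].lower().split()))
        if overlap:
            scored.append((overlap, doc))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [doc for _, doc in scored]


async def retrieve_with_fallback(question: str, corpus: list[dict]):
    """Return (documents, fallback_used)."""
    try:
        return await vector_search(question), False
    except VectorStoreError:
        return keyword_search(question, corpus), True


CORPUS = [
    {"source": "a.md", "text": "Reranking improves retrieval precision"},
    {"source": "b.md", "text": "Health checks report component status"},
]

docs, fallback_used = asyncio.run(
    retrieve_with_fallback("how do health checks work", CORPUS)
)
print(fallback_used, [d["source"] for d in docs])  # prints: True ['b.md']
```

Returning the `fallback_used` flag alongside the documents lets callers and dashboards see how often the degraded path is serving traffic.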

Health Checks

def health_check(pipeline: RAGPipeline) -> dict:
    """Check pipeline health."""
    
    checks = {}
    
    # Check vector store
    try:
        pipeline.vector_store.ping()
        checks["vector_store"] = "healthy"
    except Exception as e:
        checks["vector_store"] = f"unhealthy: {e}"
    
    # Check embedder
    try:
        pipeline.embedder.encode("health check")
        checks["embedder"] = "healthy"
    except Exception as e:
        checks["embedder"] = f"unhealthy: {e}"
    
    # Check LLM (same keyword arguments as RAGPipeline.query uses)
    try:
        pipeline.generator.generate(
            question="Respond with OK", context="test context", sources=[])
        checks["llm"] = "healthy"
    except Exception as e:
        checks["llm"] = f"unhealthy: {e}"
    
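    # Metrics summary (averages are None until at least one query is recorded,
    # which avoids averaging an empty list)
    retrieval = pipeline.metrics["retrieval_latency"]
    total = pipeline.metrics["total_latency"]
    checks["avg_retrieval_latency"] = sum(retrieval) / len(retrieval) if retrieval else None
    checks["avg_total_latency"] = sum(total) / len(total) if total else None
    checks["retrieval_errors"] = pipeline.metrics["retrieval_errors"]
    checks["generation_errors"] = pipeline.metrics["generation_errors"]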
    
    return checks
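
A monitoring endpoint usually needs one overall verdict rather than a dict of per-component strings. The sketch below reduces the output of `health_check` to a single status. It rests on a design assumption that is not stated in the chapter: the embedder and LLM are treated as critical, while a vector store failure only degrades service because keyword fallback can still answer. `summarize_health`, `CRITICAL`, and `OPTIONAL` are hypothetical names.

```python
# Assumption: the pipeline cannot answer at all without these components
CRITICAL = ("embedder", "llm")
# Assumption: keyword fallback covers for these components
OPTIONAL = ("vector_store",)


def summarize_health(checks: dict) -> dict:
    """Collapse per-component checks into one status and an HTTP code."""
    failing = [name for name in CRITICAL + OPTIONAL
               if checks.get(name) != "healthy"]

    if any(name in CRITICAL for name in failing):
        status = "down"
    elif failing:
        status = "degraded"
    else:
        status = "ok"

    return {
        "status": status,
        "failing": failing,
        # 503 tells a load balancer to stop routing traffic to this instance
        "http_status": 503 if status == "down" else 200,
    }


example = {
    "vector_store": "unhealthy: connection refused",
    "embedder": "healthy",
    "llm": "healthy",
}
print(summarize_health(example))
```

A component missing from `checks` is treated as failing, so a health check that crashed partway through does not report `ok`.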
EXERCISE

Build a complete RAG pipeline with retrieval, reranking, compression, and generation. Add health checks and a fallback to keyword search when vector retrieval fails.
