RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /Advanced RAG — Chunking, Retrieval, Re-ranking
  6. /Ch. 24
Advanced RAG — Chunking, Retrieval, Re-ranking

24. Production RAG Pipeline Project

Chapter 24 of 24 · 25 min
KEY INSIGHT

A production RAG pipeline integrates indexing, retrieval, optimization, and evaluation as a cohesive system; each stage communicates metrics to inform improvements in other stages. ### Project Overview This chapter assembles a production-grade RAG pipeline combining techniques from Chapters 13-23. The pipeline evaluates itself on defined metrics, surfaces failures, and can be iteratively improved. ### Full Pipeline Architecture ``` User Query │ ├─ Query Rewriting (Chapter 13) ├─ Query Decomposition (Chapter 14) │ └─ Parallel Sub-Query Retrieval │ └─ Context Optimization (Chapter 15) │ ├─ Dedup + Relevance Filter │ ├─ Dynamic Context Window (Chapter 16) │ └─ Context Compression (Chapter 17) │ └─ Answer Generation │ │ │ ├─ RAGAS Faithfulness (Ch. 21) │ ├─ RAGAS Answer Relevance (Ch. 22) │ └─ RAGAS Context Precision (Ch. 23) │ Caching Layer (Chapter 20) ←→ Pipeline Optimization (Chapter 19) Multi-Modal Handler (Ch. 18) for image/table sources ``` ### Complete Implementation ```python import os from dataclasses import dataclass from typing import Optional from datetime import datetime @dataclass class ProductionRAGConfig: similarityThreshold: float = 0.90 maxContextTokens: int = 4096 tokenReserve: int = 512 compressionEnabled: bool = True cacheEnabled: bool = True evaluationEnabled: bool = True class ProductionRAGPipeline: def __init__(self, config: ProductionRAGConfig): self.config = config self.vectorStore = VectorStore() self.semanticCache = SemanticCache(config.similarityThreshold) self.evaluator = RAGASEvaluator() if config.evaluationEnabled else None def query( self, question: str, returnMetrics: bool = True ) -> dict: # Step 1: Cache check cacheHit = None if self.config.cacheEnabled: queryEmbedding = embed_texts([question])[0] cacheHit, found = self.semanticCache.get(question, queryEmbedding) if found: return { "answer": cacheHit, "cache_hit": True, "metrics": {} } # Step 2: Rewrite rewrites = rewrite_query(question) # Step 3: Decompose subQueries = decompose_query(question) allSubQueries = rewrites + subQueries # Step 4: Parallel retrieval across sub-queries retrieved = parallelRetrieve(allSubQueries, top_k=10) # Step 5: Context optimization optimized = optimizeContext(retrieved, top_k=10) # Step 6: Dynamic window allocation budget = self.config.maxContextTokens - self.config.tokenReserve windowedChunks = trimToTokenBudget(optimized, budget) # Step 7: Compression if self.config.compressionEnabled: context = "\n\n".join( compressChunk(c["text"], question)["content"] if isinstance(compressChunk(c["text"], question), dict) else c["text"] for c in windowedChunks ) else: context = "\n\n".join(c["text"] for c in windowedChunks) # Step 8: Generate answer = generateAnswer(context, question) # Step 9: Cache result if self.config.cacheEnabled: self.semanticCache.set(question, queryEmbedding, answer) # Step 10: Evaluate (offline-only in production) metrics = {} if self.config.evaluationEnabled and self.evaluator: faithfulness = computeFaithfulness(answer, context) relevance = computeAnswerRelevance(question, answer, embedModel) precision = computeContextPrecision(optimized, {}, k=10) metrics = { "faithfulness": faithfulness, "answer_relevance": relevance, "context_precision": precision, "timestamp": datetime.utcnow().isoformat() } return { "answer": answer, "cache_hit": False, "metrics": metrics } def healthCheck(self) -> dict: # Verify vector store connectivity storeStatus = "healthy" if self.vectorStore.isConnected() else "degraded" # Verify cache size cacheSize = len(self.semanticCache.cache) cacheMaxAge = max( (datetime.utcnow() - v["timestamp"]).seconds for v in self.semanticCache.cache.values() ) if self.semanticCache.cache else 0 return { "status": storeStatus, "cache_size": cacheSize, "oldest_cache_entry_age_seconds": cacheMaxAge, "config": { "compression": self.config.compressionEnabled, "caching": self.config.cacheEnabled, "evaluation": self.config.evaluationEnabled } } ``` ### Operational Monitoring ```python import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("production_rag") class MonitoredPipeline(ProductionRAGPipeline): def query(self, question: str, returnMetrics: bool = True) -> dict: start = time.perf_counter() result = super().query(question, returnMetrics) elapsed = time.perf_counter() - start logger.info(f"query={question[:50]}... elapsed={elapsed*1000:.0f}ms " f"cache_hit={result['cache_hit']}") if result["metrics"]: logger.info( f"metrics=faithfulness:{result['metrics']['faithfulness']['faithfulness_score']:.2f} " f"relevance:{result['metrics']['answer_relevance']['relevance_score']:.2f} " f"precision:{result['metrics']['context_precision']['precision']:.2f}" ) return result ``` ### Regression Testing ```python def regressionTest(pipeline: ProductionRAGPipeline, testSet: list[dict]) -> dict: """ Run the pipeline against a known test set and check for regressions. """ thresholds = { "faithfulness": 0.70, "relevance": 0.75, "precision": 0.50 } results = [] regressions = [] for item in testSet: result = pipeline.query(item["question"]) # Check for regressions against stored baseline baseline = getBaseline(item["question_id"]) if baseline: for metric in ["faithfulness", "relevance", "precision"]: if result["metrics"].get(metric): current = result["metrics"][metric].get( f"{metric}_score" if metric != "precision" else "precision", result["metrics"][metric].get("score", 0) ) if current < thresholds[metric]: regressions.append({ "question_id": item["question_id"], "metric": metric, "current": current, "threshold": thresholds[metric], "baseline": baseline[metric] }) results.append(result) return { "total_queries": len(results), "regression_count": len(regressions), "regressions": regressions, "results": results } ``` ### Deployment Checklist 1. **Vector store capacity**: Estimate storage at ~768–1536 bytes per embedding at 1M chunks. 2. **Rate limiting**: Add per-user and per-endpoint rate limits to avoid 429 errors from LLM API calls. 3. **Graceful degradation**: If the embedding service is down, fall back to BM25 keyword search. 4. **Cache eviction on corpus update**: Version corpus and invalidate cache entries that predate the version. 5. **Evaluation cadence**: Run RAGAS evaluation every N queries (e.g., every 100) rather than real-time; batch evaluation reduces API cost. ### Failure Modes for Production Production pipelines fail silently when the vector store returns zero results—the system generates an answer from no context, which often looks plausible. Instrument an explicit zero-results guard. ```python if not retrieved: return {"answer": "No relevant information found.", "metrics": {}, "cache_hit": False} ``` Monitoring that measures latency but not answer quality will miss regressions where the right answer is generated efficiently from wrong context. Always couple latency SLOs with weeklyFaithfulness + AnswerRelevance sampling audits.

EXERCISE

Assemble the production pipeline with all modules. Run against 30 test queries, collect all three RAGAS metrics, and identify the one metric with the lowest mean score. Investigate root cause and apply one targeted fix. (30 min)

← Chapter 23
RAGAS Context Precision
Course complete →
Browse all courses