Advanced RAG — Chunking, Retrieval, Re-ranking
Learn advanced RAG (chunking, retrieval and re-ranking) through RunLocalAI's practical lens: hybrid search and reranking, hardware fit, runtime settings, verification habits and local-vs-cloud tradeoffs.
- B013
- B014
Why this course matters
This course is for builders turning local models into working tools, agents and retrieval systems. It connects RAG, chunking, hybrid search and reranking to the questions RunLocalAI wants every reader to answer before they install, upgrade or scale a model: will it run, what will it cost in memory, what setting changes the result, and how do you verify the answer instead of trusting a demo?
What you will be able to do
By the end, you should be able to explain the main tradeoffs in plain language, choose a safe next experiment, and use the chapter exercises as a repeatable operator checklist. The course favors local evidence, hardware fit, context limits, latency and failure modes over generic AI vocabulary.
How to use this course
Start at chapter one if the topic is new. If you already have a working stack, scan for chapters such as RAG Pipeline Anatomy, Semantic Chunking at Scale, Fixed-Size vs Semantic Tradeoffs and Multi-Strategy Retrieval and use those lessons as a quality-control pass before changing a workstation, team workflow or production-like local deployment.
- 01 RAG Pipeline Anatomy: Pipeline quality is multiplicative across stages; optimizing only retrieval ignores compounding failures upstream. (15 min)
- 02 Semantic Chunking at Scale: Semantic chunking preserves topic coherence at boundaries, allowing retrieval to return whole arguments rather than fragments. (15 min)
- 03 Fixed-Size vs Semantic Tradeoffs: The optimal strategy depends on document structure complexity; simple documents benefit little from semantic chunking overhead. (15 min)
- 04 Multi-Strategy Retrieval: Routing queries to appropriate retrieval strategies reduces noise and improves signal in the initial result set. (15 min)
- 05 Dense Retrieval Deep Dive: Embedding model selection has larger impact on retrieval quality than index implementation details. (15 min)
- 06 Sparse Retrieval BM25: BM25 excels at exact term matching where dense retrieval struggles; it provides complementary recall that hybrid systems leverage. (15 min)
- 07 Hybrid Search with RRF: RRF provides a reliable fusion method that is indifferent to score magnitudes, focusing instead on rank agreement across strategies. (15 min)
- 08 Weighted Hybrid Strategies: Weights should be tuned on task-specific benchmarks; generic weights rarely optimize for specific retrieval domains. (15 min)
- 09 Cross-Encoder Setup: Cross-encoders provide more accurate relevance signals than bi-encoder similarity, but the computational cost limits their use to reranking post-retrieval candidates. (15 min)
- 10 Local Cross-Encoder Models: Local cross-encoders enable offline reranking but require hardware-aware model selection to balance latency and quality. (15 min)
- 11 Two-Stage Retrieval: Two-stage retrieval achieves quality close to full scoring while maintaining retrieval speed proportional to index size. (15 min)
- 12 Query Classification: Query classification enables retrieval system adaptation but classifiers require validation and updating as query patterns evolve. (15 min)
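
The rank-agreement idea behind chapter 07 can be illustrated with a minimal sketch of Reciprocal Rank Fusion. The function name `rrf_fuse`, the document ids, and the constant `k = 60` (a commonly used default) are assumptions for illustration, not settings taken from the course.

```python
from collections import defaultdict


def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse ranked id lists: each list contributes 1 / (k + rank) per document."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by id so the order is stable
    return sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))


# Hypothetical result lists from a dense and a sparse retriever
dense = ["d3", "d1", "d2"]
sparse = ["d1", "d4", "d3"]
fused = rrf_fuse([dense, sparse])
print([doc_id for doc_id, _ in fused])  # ['d1', 'd3', 'd4', 'd2']
```

Only ranks enter the formula, so the raw score scales of the two retrievers never need to be reconciled.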
- 13 Query Rewriting: LLM-generated queries often fail to match how content was indexed; rewriting aligns the query language with the retrieval corpus. (20 min)

### Why Queries Drift

A user asking "how does consensus work in Raft" might only get hits from general distributed systems documentation, whereas a retrieval query such as "consensus algorithm Raft implementation details" bridges the vocabulary gap between the user's intent and the indexed content. Query rewriting uses an LLM to translate user queries into retrieval-friendly language before the search step.

### Implementation

Rewrite the raw query by asking an LLM to reformulate it into 2-3 cleaner retrieval queries. The rewrite should strip conversational filler, expand abbreviations, and optionally include domain-specific terminology.

```python
from openai import OpenAI

client = OpenAI()


def rewrite_query(raw_query: str, model: str = "gpt-4o-mini") -> list[str]:
    system_prompt = (
        "You are a query rewriting assistant. Given a user query, "
        "produce 2-3 alternative retrieval queries that are optimized "
        "for semantic search against a technical document corpus. "
        "Return only the queries, one per line. Be specific and include "
        "domain terminology."
    )
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": raw_query},
        ],
        temperature=0.0,  # Low-variance output for retrieval consistency
        max_tokens=256,
    )
    rewrites = response.choices[0].message.content.strip().split("\n")
    return [q.strip() for q in rewrites if q.strip()]


# Example usage
raw = "what happens when postgres runs out of connections"
queries = rewrite_query(raw)
# Illustrative output (actual rewrites vary by model):
# ['PostgreSQL connection pool exhaustion handling',
#  'PostgreSQL max_connections configuration error',
#  'database connection limit exceeded postgres solutions']
```

### HyDE: Hypothetical Document Embeddings

A more advanced approach is HyDE (Hypothetical Document Embeddings), where an LLM generates a hypothetical answer document, and both the user query and the generated document are embedded for similarity search. This works because the generated document often uses vocabulary closer to the actual indexed content.

```python
import numpy as np
from openai import OpenAI
from embeddings import embed_texts  # your embedding function

client = OpenAI()

# vector_store is assumed to be an already-initialized vector index client.


def hyde_retrieval(query: str, top_k: int = 5) -> list[dict]:
    # Step 1: Generate a hypothetical answer (not final output)
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Generate a concise hypothetical answer to the question. "
                                          "Base it on general knowledge. Be factual and specific."},
            {"role": "user", "content": query},
        ],
        temperature=0.7,
        max_tokens=512,
    )
    hypothetical_doc = response.choices[0].message.content

    # Step 2: Embed both query and hypothetical doc (as arrays, so they can be averaged)
    query_emb = np.asarray(embed_texts([query])[0])
    doc_emb = np.asarray(embed_texts([hypothetical_doc])[0])

    # Step 3: Average the two embeddings so the search reflects both the
    # user's wording and the hypothetical answer's vocabulary
    combined_emb = (query_emb + doc_emb) / 2
    results = vector_store.similarity_search(embedding=combined_emb, top_k=top_k)
    return results
```

### Failure Modes

Rewrite outputs can drift from user intent if the system prompt is ambiguous. Always validate that rewrites stay semantically aligned: run cosine similarity between the original and rewritten query embeddings as a sanity check. HyDE can hallucinate confident-sounding fictitious content, which distorts retrieval when the embedding model amplifies confident-but-wrong language.
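
The sanity check described for query rewriting can be sketched as follows. This is a minimal illustration that works on precomputed embeddings; the function names and the `0.8` cutoff are assumptions, and a real threshold should be tuned per embedding model.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def keep_aligned_rewrites(
    original_emb: list[float],
    rewrites: list[tuple[str, list[float]]],
    min_similarity: float = 0.8,  # assumed cutoff, tune per embedding model
) -> list[str]:
    """Keep only rewrites whose embedding stays close to the original query."""
    return [
        text for text, emb in rewrites
        if cosine(original_emb, emb) >= min_similarity
    ]


# Toy 2-d embeddings standing in for real model output
original = [1.0, 0.0]
candidates = [
    ("PostgreSQL max_connections exhausted", [0.9, 0.1]),  # close to the original
    ("history of relational databases", [0.0, 1.0]),       # drifted off-topic
]
aligned = keep_aligned_rewrites(original, candidates)
```

Rewrites that fail the check can be dropped or logged for prompt review rather than sent to the retriever.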
- 14 Query Decomposition: Complex multi-constraint queries retrieve poorly unless broken into sub-queries whose results are merged. (20 min)

### The Multi-Constraint Problem

A query like "compare the latency, throughput, and fault tolerance of Kafka versus RabbitMQ for stream processing" asks for a multi-dimensional comparison. A single embedding query against a document corpus rarely returns all relevant passages across these three concerns. Query decomposition breaks this into targeted sub-queries that separately retrieve each dimension, then merges results with weighted scoring.

### Sub-Query Generation

Decomposition uses an LLM to identify semantic "facets" of a complex query and produce targeted sub-queries. Each sub-query retrieves independently.

```python
from openai import OpenAI

client = OpenAI()

# vector_store is assumed to be an already-initialized vector index client.


def decompose_query(query: str) -> list[str]:
    system_prompt = (
        "Given a complex query, decompose it into 3-6 independent "
        "sub-queries that each retrieve a distinct piece of information. "
        "Each sub-query should be self-contained and retrieval-friendly. "
        "Return one sub-query per line."
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ],
        temperature=0.0,
        max_tokens=256,
    )
    sub_queries = response.choices[0].message.content.strip().split("\n")
    return [q.strip() for q in sub_queries if q.strip()]


def retrieveDecomposed(query: str, top_k_per: int = 5) -> list[dict]:
    sub_queries = decompose_query(query)
    all_chunks = {}
    for sq in sub_queries:
        results = vector_store.similarity_search(sq, top_k=top_k_per)
        for i, chunk in enumerate(results):
            # Score by position within each sub-query result set
            chunk_id = chunk["id"]
            base_score = (top_k_per - i) / top_k_per
            weight = 1.0 / len(sub_queries)  # Equal weight across facets
            score = base_score * weight
            if chunk_id in all_chunks:
                all_chunks[chunk_id]["score"] += score
                all_chunks[chunk_id]["facets"].add(sq)
            else:
                all_chunks[chunk_id] = {"chunk": chunk, "score": score, "facets": {sq}}
    # Sort by aggregated score
    ranked = sorted(all_chunks.values(), key=lambda x: x["score"], reverse=True)
    return ranked
```

### Facet Coverage Verification

A key advantage of decomposition is accountability: you can verify which facets retrieved results and which did not.

```python
def reportFacetCoverage(query: str, top_k_per: int = 5) -> dict:
    sub_queries = decompose_query(query)
    coverage = {}
    for sq in sub_queries:
        results = vector_store.similarity_search(sq, top_k=top_k_per)
        coverage[sq] = {
            "retrieved": len(results),
            "top_score": results[0]["score"] if results else 0.0,
        }
    missing_facets = [f for f, v in coverage.items() if v["retrieved"] == 0]
    if missing_facets:
        print(f"WARNING: No results retrieved for facets: {missing_facets}")
    return coverage
```

### Failure Modes

Decomposition can over-segment when the LLM generates sub-queries that are too granular, leaving some facets with no results. It can under-segment when different facets overlap heavily in a corpus, causing redundant retrieval. Tune the number of sub-queries based on query complexity analysis, and measure the average overlap between retrieved chunk sets as a diagnostic.
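
The overlap diagnostic suggested for query decomposition can be sketched as a mean pairwise Jaccard overlap between the chunk-id sets returned for each sub-query. The function name and the sample ids are hypothetical; the choice of Jaccard as the overlap measure is an assumption.

```python
from itertools import combinations


def mean_pairwise_overlap(results_by_subquery: dict[str, list[str]]) -> float:
    """Average Jaccard overlap between the chunk-id sets of every sub-query pair."""
    id_sets = [set(ids) for ids in results_by_subquery.values()]
    pairs = list(combinations(id_sets, 2))
    if not pairs:
        return 0.0

    def jaccard(a: set, b: set) -> float:
        union = a | b
        return len(a & b) / len(union) if union else 0.0

    return sum(jaccard(a, b) for a, b in pairs) / len(pairs)


# Hypothetical retrieval results keyed by sub-query
retrieved = {
    "Kafka vs RabbitMQ latency": ["c1", "c2", "c3"],
    "Kafka vs RabbitMQ throughput": ["c2", "c3", "c4"],
    "Kafka vs RabbitMQ fault tolerance": ["c7", "c8", "c9"],
}
overlap = mean_pairwise_overlap(retrieved)
```

A value near 1.0 suggests the sub-queries are redundant (under-segmentation), while a value near 0.0 combined with empty facets points toward over-segmentation.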
- 15 Context Optimization: Raw retrieved chunks are often noisy; optimization filters and reorders context before generation. (20 min)

### The Retrieval Noise Problem

Vector similarity search returns chunks that are topically relevant but may include filler sentences, malformed code blocks, or off-topic tangents. Context optimization runs post-processing on retrieval results to improve their quality before feeding them to the answer-generating LLM.

### Techniques

**Dedup**: Remove near-duplicate chunks that appear across multiple result slots.

```python
from difflib import SequenceMatcher


def removeNearDuplicates(chunks: list[dict], threshold: float = 0.85) -> list[dict]:
    deduped = []
    for chunk in chunks:
        is_duplicate = False
        for kept in deduped:
            ratio = SequenceMatcher(None, chunk["text"], kept["text"]).ratio()
            if ratio >= threshold:
                is_duplicate = True
                # Keep the one with higher relevance score
                if chunk.get("score", 0) > kept.get("score", 0):
                    deduped.remove(kept)
                    deduped.append(chunk)
                break
        if not is_duplicate:
            deduped.append(chunk)
    return deduped
```

**Noise Reduction**: Remove chunks below a relevance score threshold.

```python
RELEVANCE_THRESHOLD = 0.65


def filterByRelevance(chunks: list[dict], threshold: float = RELEVANCE_THRESHOLD) -> list[dict]:
    return [c for c in chunks if c.get("score", 0) >= threshold]
```

**Diversity Boost**: Re-rank by MMR (Maximum Marginal Relevance) to penalize retrieving too many chunks from the same section.

```python
import numpy as np


def cosineSimilarity(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def mmrRerank(chunks: list[dict], lambda_param: float = 0.5, top_k: int = 10) -> list[dict]:
    """
    lambda_param: 0 = pure diversity, 1 = pure relevance.
    Each chunk needs a "score" and an "embedding" field.
    """
    selected = []
    # Sort by relevance so the first pick is the most relevant chunk
    remaining = sorted(chunks, key=lambda c: c.get("score", 0), reverse=True)
    while len(selected) < top_k and remaining:
        if not selected:
            selected.append(remaining.pop(0))
            continue
        best_score = -float("inf")
        best_idx = None
        for i, chunk in enumerate(remaining):
            relevance = chunk.get("score", 0)
            # Max similarity to already selected chunks (diversity penalty)
            max_sim = max(
                cosineSimilarity(chunk["embedding"], s["embedding"]) for s in selected
            )
            mmr_score = lambda_param * relevance - (1 - lambda_param) * max_sim
            if mmr_score > best_score:
                best_score = mmr_score
                best_idx = i
        if best_idx is None:
            break
        selected.append(remaining.pop(best_idx))
    return selected
```

### Chaining Optimizations

```python
def optimizeContext(chunks: list[dict], top_k: int = 10) -> list[dict]:
    # 1. Filter by relevance first
    filtered = filterByRelevance(chunks)
    # 2. Remove near-duplicates
    deduped = removeNearDuplicates(filtered)
    # 3. Re-rank with MMR for diversity
    reranked = mmrRerank(deduped, lambda_param=0.7, top_k=top_k)
    return reranked
```

### Failure Modes

Over-aggressive filtering can remove relevant low-scoring chunks that happen to be the only source for a specific fact. Always check for missing-entity scenarios where a fact appears only in a low-relevance chunk. The MMR lambda often requires per-domain adjustment; a dataset collected from real queries is the most reliable tuning signal.
- 16 Dynamic Context Windows: Context window allocation should adapt to query complexity rather than using fixed limits. (20 min)

### Fixed vs. Dynamic Allocation

A naive RAG system uses a fixed token budget (e.g., 4096 tokens) or fixed chunk count (e.g., top 5 chunks) for the context window. This fails when simple queries need 2 chunks and complex queries need 15. Dynamic context windows allocate retrieval tokens proportionally to query complexity.

### Complexity Scoring

Query complexity can be estimated via multiple signals:

| Signal | Metric | Complexity Indicator |
|---|---|---|
| Token count | Tokenizer length of the query | Higher → more variables to answer |
| Sub-query count | Number of facets detected | More facets → more context needed |
| Entity count | NER-extracted named entities | More entities → more cross-reference needed |
| Question type | "compare", "analyze", "list" | Compare/list → multi-document synthesis |
| Score variance | Std dev of retrieval scores | High variance → confident single answer; low variance → no clear winner |

```python
import numpy as np
import tiktoken

# Maximum attainable raw score: 2 points (token count) + 1 point (score variance).
# Normalizing by this value keeps the "complex" tier reachable.
MAX_RAW_SCORE = 3.0


def estimateQueryComplexity(query: str, retrieval_results: list[dict]) -> dict:
    encoding = tiktoken.get_encoding("cl100k_base")
    tokens = len(encoding.encode(query))
    score = 0.0
    # Signal 1: Token count
    score += min(tokens / 100, 2.0)  # Cap at 2 points
    # Signal 2: Score variance of top results
    if retrieval_results:
        scores = [r.get("score", 0) for r in retrieval_results]
        # Flat scores (low variance) mean no clear winner, which is harder
        score += max(1.0 - np.std(scores) * 3, 0)
    else:
        score += 1.0  # No results = uncertain, add complexity
    # Normalize to 0-1 range
    normalized = min(score / MAX_RAW_SCORE, 1.0)
    return {
        "raw_score": score,
        "normalized": normalized,
        "tier": "simple" if normalized < 0.3 else "medium" if normalized < 0.6 else "complex",
    }
```

### Window Allocation Strategy

```python
MAX_TOKENS = 4096
TOKEN_RESERVE = 512  # Reserve for answer generation
SIMPLE_CHUNKS = 2
MEDIUM_CHUNKS = 5
COMPLEX_CHUNKS = 10


def countTokens(text: str) -> int:
    return len(tiktoken.get_encoding("cl100k_base").encode(text))


def allocateContext(
    query: str, retrieval_results: list[dict], model: str = "gpt-4o-mini"
) -> list[dict]:
    complexity = estimateQueryComplexity(query, retrieval_results)
    tier = complexity["tier"]
    if tier == "simple":
        top_k = SIMPLE_CHUNKS
    elif tier == "medium":
        top_k = MEDIUM_CHUNKS
    else:
        top_k = COMPLEX_CHUNKS
    allocated = retrieval_results[:top_k]
    total_tokens = sum(countTokens(c["text"]) for c in allocated)
    # If even top-k exceeds budget, pull back
    if total_tokens > MAX_TOKENS - TOKEN_RESERVE:
        allocated = trimToTokenBudget(allocated, MAX_TOKENS - TOKEN_RESERVE)
    return allocated


def trimToTokenBudget(chunks: list[dict], budget: int) -> list[dict]:
    trimmed = []
    for chunk in chunks:
        tokens = countTokens(chunk["text"])
        if budget - tokens >= 0:
            trimmed.append(chunk)
            budget -= tokens
        else:
            break
    return trimmed
```

### Failure Modes

Complexity scoring can misclassify queries that are short but semantically dense (e.g., "explain the concept" where "concept" has heavy domain ambiguity). Validate complexity tiers against actual answer quality metrics, not just internal routing labels. Over-allocation wastes tokens on irrelevant content; under-allocation drops key facts.
- 17 Context Compression: Irrelevant tokens in retrieved context dilute generation quality; compressing to relevant sentences improves signal-to-noise ratio. (20 min)

### The Noise-in-Context Problem

Retrieved chunks often contain the answer but also surrounding context that misdirects or confuses the LLM. Context compression uses an LLM to extract only the relevant sentences from each chunk, reducing token cost and improving relevance.

### LLM-Based Compression

```python
from openai import OpenAI

client = OpenAI()


def compressChunk(chunk_text: str, query: str, model: str = "gpt-4o-mini") -> str:
    system_prompt = (
        "You are a precise technical assistant. Given a document passage "
        "and a user question, extract ONLY the sentences directly relevant "
        "to answering the question. Discard background, commentary, and "
        "tangential content. Preserve code blocks if relevant. "
        "Return the compressed passage with no additional text."
    )
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Question: {query}\n\nPassage: {chunk_text}"},
        ],
        temperature=0.0,
        max_tokens=512,
    )
    return response.choices[0].message.content.strip()


def compressContext(chunks: list[dict], query: str, model: str = "gpt-4o-mini") -> list[dict]:
    compressed = []
    for chunk in chunks:
        compressed_text = compressChunk(chunk["text"], query, model)
        if compressed_text:  # Only include non-empty results
            compressed.append({
                **chunk,
                "text": compressed_text,
                "original_length": len(chunk["text"]),      # characters
                "compressed_length": len(compressed_text),  # characters
            })
    return compressed
```

### Token Savings Measurement

```python
def measureCompressionSavings(
    original_chunks: list[dict], compressed_chunks: list[dict]
) -> dict:
    # Lengths are stored in characters on each compressed chunk;
    # original_chunks is kept for interface compatibility but not needed here.
    original_chars = sum(c["original_length"] for c in compressed_chunks)
    compressed_chars = sum(c["compressed_length"] for c in compressed_chunks)
    # Use rough estimate: 4 chars per token
    orig_est = original_chars // 4
    comp_est = compressed_chars // 4
    savings = (orig_est - comp_est) / orig_est if orig_est > 0 else 0
    return {
        "original_tokens_estimate": orig_est,
        "compressed_tokens_estimate": comp_est,
        "savings_percent": round(savings * 100, 1),
    }
```

### Condensing via Extract-and-Synthesize

A two-pass approach: extract relevant facts first, then synthesize into a dense paragraph.

```python
def condenseContext(chunks: list[dict], query: str) -> str:
    # Pass 1: Extract all relevant snippets
    extracted = [compressChunk(c["text"], query) for c in chunks]
    combined = "\n\n".join(extracted)
    # Pass 2: Synthesize into coherent context
    synthesis_prompt = (
        "Synthesize the following extracted passages into a single "
        "coherent technical summary that answers the question. "
        "Preserve all factual claims. Output only the summary."
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": synthesis_prompt},
            {"role": "user", "content": f"Question: {query}\n\n{combined}"},
        ],
        temperature=0.0,
        max_tokens=1024,
    )
    return response.choices[0].message.content.strip()
```

### Failure Modes

Compression can over-compress, removing context needed for cross-sentence references or pronouns. Always verify that the compressed context still includes the entity names the generation step needs. Latency increases significantly with per-chunk LLM calls; batching chunks into fewer calls or issuing the calls concurrently mitigates this.
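
The entity check recommended for compressed context can be sketched as a fallback that restores the original chunk whenever compression dropped a required entity. This is a minimal illustration: the function name is hypothetical, the entity list is assumed to come from the query or an NER step, and both chunk lists are assumed to have the same length and order.

```python
def restore_dropped_entities(
    original_chunks: list[dict],
    compressed_chunks: list[dict],
    entities: list[str],
) -> list[dict]:
    """Fall back to the original text when compression removed a needed entity."""
    restored = []
    for original, compressed in zip(original_chunks, compressed_chunks):
        original_text = original["text"].lower()
        compressed_text = compressed["text"].lower()
        dropped = [
            e for e in entities
            if e.lower() in original_text and e.lower() not in compressed_text
        ]
        if dropped:
            # Keep the metadata but swap the uncompressed text back in
            restored.append({**compressed, "text": original["text"], "restored_for": dropped})
        else:
            restored.append(compressed)
    return restored


# Toy chunks: compression turned "Raft" into the pronoun "It"
originals = [
    {"text": "Raft elects a leader. The leader replicates the log."},
    {"text": "Kafka retains messages on disk."},
]
compressed = [
    {"text": "It replicates the log."},
    {"text": "Kafka retains messages on disk."},
]
checked = restore_dropped_entities(originals, compressed, ["Raft", "Kafka"])
```

Substring matching is crude; it misses aliases and abbreviations, so treat it as a first-pass guard rather than a complete check.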
- 18 Multi-Modal RAG: Retrieval pipelines must handle text, images, tables, and code as distinct modalities with modality-appropriate embeddings and generation. (25 min)

### Beyond Text-Only Retrieval

Production RAG systems face documents that contain figures, diagrams, screenshots, code blocks, and tables. A text-only embedding pipeline discards the visual structure. Multi-modal RAG retrieves across modalities and generates answers that synthesize text with visual evidence.

### Modality-Specific Processing

```python
import pdfplumber
import pypdf


def formatTable(table: list[list]) -> str:
    """Serialize table rows as pipe-delimited text; empty cells become ''."""
    return "\n".join(" | ".join(str(cell or "") for cell in row) for row in table)


def extractModalContent(pdf_path: str) -> list[dict]:
    """
    Extract text, images, and tables from a PDF with modality labels.
    """
    content = []
    reader = pypdf.PdfReader(pdf_path)
    # pypdf has no table extractor, so tables are read with pdfplumber
    with pdfplumber.open(pdf_path) as plumber_pdf:
        for page_num, page in enumerate(reader.pages):
            # Extract text
            text = page.extract_text()
            if text.strip():
                content.append({
                    "modality": "text",
                    "content": text,
                    "page": page_num + 1,
                    "source": f"{pdf_path}#page={page_num + 1}",
                })
            # Extract images
            for img_idx, image in enumerate(page.images):
                img_bytes = image.data
                img_desc = describeImage(img_bytes)  # LLM or vision model
                content.append({
                    "modality": "image",
                    "content": img_desc,
                    "raw_bytes": img_bytes,
                    "page": page_num + 1,
                    "source": f"{pdf_path}#page={page_num + 1}&image={img_idx}",
                })
            # Extract tables
            tables = plumber_pdf.pages[page_num].extract_tables()
            for tbl_idx, table in enumerate(tables):
                content.append({
                    "modality": "table",
                    "content": formatTable(table),
                    "raw_table": table,
                    "page": page_num + 1,
                    "source": f"{pdf_path}#page={page_num + 1}&table={tbl_idx}",
                })
    return content
```

### Describing Images for Retrieval

```python
import base64

from openai import OpenAI

client = OpenAI()


def describeImage(image_bytes: bytes) -> str:
    """
    Use vision model to generate a text description for image retrieval.
    """
    # Data URLs need base64 text, not raw bytes.
    # Assumes PNG data; adjust the MIME type to match the extracted image.
    image_b64 = base64.b64encode(image_bytes).decode("ascii")
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": [
                {"type": "image_url",
                 "image_url": {"url": f"data:image/png;base64,{image_b64}"}},
                {"type": "text",
                 "text": "Describe this image with enough technical detail "
                         "that a semantic search system can retrieve it for a RAG query. "
                         "Include labels, values, and relationships visible in the diagram."},
            ]}
        ],
        temperature=0.0,
        max_tokens=256,
    )
    return response.choices[0].message.content.strip()
```

### Multi-Modal Indexing

```python
from embeddings import embed_texts  # your embedding function


def indexByModality(content: list[dict], vector_store) -> None:
    """
    Index every item in one store, tagging each with its modality so
    retrieval can filter or route by modality later.
    """
    for item in content:
        embedding = embed_texts([item["content"]])[0]
        vector_store.insert(
            embedding=embedding,
            text=item["content"],
            modality=item["modality"],
            source=item["source"],
            metadata={"page": item["page"]},
        )
```

### Multi-Modal Generation

```python
def multiModalAnswer(query: str, retrieved: list[dict]) -> dict:
    """
    Generate an answer from text chunks plus the text descriptions of
    retrieved images and tables.
    """
    text_chunks = [c for c in retrieved if c["modality"] == "text"]
    image_items = [c for c in retrieved if c["modality"] == "image"]
    table_items = [c for c in retrieved if c["modality"] == "table"]
    synthesis_parts = [c["content"] for c in text_chunks]
    # Include image descriptions inline
    for img in image_items:
        synthesis_parts.append(f"[Image: {img['content']}]")
    # Include table representations inline
    for tbl in table_items:
        synthesis_parts.append(f"[Table: {tbl['content']}]")
    synthesis_context = "\n\n".join(synthesis_parts)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system",
             "content": "Answer using both the text and image/table descriptions provided."},
            {"role": "user", "content": f"Question: {query}\n\nContext: {synthesis_context}"},
        ],
        temperature=0.0,
    )
    return {"answer": response.choices[0].message.content, "sources": retrieved}
```

### Failure Modes

Image descriptions are lossy: the vision model may miss small axis labels, faint grid lines, or color-coded legend items. Always include the image source link so users can verify. Table boundary detection (rows vs. columns) varies by PDF structure; validate on a sample before production. Multi-modal pipelines can take several times longer than text-only pipelines because of the extra vision and table-extraction passes; parallelize embedding calls across modalities.
- 19 Pipeline Optimization: Slow pipeline stages create bottlenecks; profiling and parallelizing independent stages is the fastest path to throughput gains. (25 min)

### Where Pipelines Slow Down

RAG pipelines have these typical stages: query parsing, rewriting/decomposition, embedding generation, vector search, reranking, context processing, and answer generation. The time cost is not distributed evenly: "embedding + search" dominates at high scale, while "answer generation" dominates for long contexts.

### Profiling the Pipeline

```python
import time
from functools import wraps


def profileStage(func):
    """Wrap a stage so it returns (result, elapsed_seconds) and logs its timing."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"[PROFILE] {func.__name__}: {elapsed*1000:.1f}ms")
        return result, elapsed
    return wrapper


# Instrumented pipeline. The stage functions (rewriteQueryBatch, generateEmbeddings,
# vectorSearch, rerankResults, generateAnswer) are your own implementations.
def ragPipeline(query: str) -> tuple[str, dict]:
    timings = {}
    # Stage 1: Rewrite
    rewritten_queries, t1 = profileStage(rewriteQueryBatch)([query])
    timings["rewrite"] = t1
    # Stage 2: Embed
    embeddings, t2 = profileStage(generateEmbeddings)(rewritten_queries)
    timings["embedding"] = t2
    # Stage 3: Search
    results, t3 = profileStage(vectorSearch)(embeddings, top_k=10)
    timings["search"] = t3
    # Stage 4: Re-rank
    reranked, t4 = profileStage(rerankResults)(results, query)
    timings["rerank"] = t4
    # Stage 5: Generate
    answer, t5 = profileStage(generateAnswer)(reranked, query)
    timings["generate"] = t5
    total = sum(timings.values())
    breakdown = {k: f"{v*1000:.1f}ms" for k, v in timings.items()}
    print(f"[PROFILE] Total: {total*1000:.1f}ms")
    print(f"[PROFILE] Breakdown: {breakdown}")
    return answer, {"timings": timings, "total": total}
```

### Parallelizing Independent Stages

Rewrite and embedding are independent per-query but are often called serially. If decomposing into multiple sub-queries, embed them in parallel:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parallelEmbed(queryBatch: list[str], max_workers: int = 10) -> list[list[float]]:
    results = [None] * len(queryBatch)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(embed_texts, [q]): i for i, q in enumerate(queryBatch)}
        for future in as_completed(futures):
            idx = futures[future]  # Preserve input order
            results[idx] = future.result()[0]
    return results

# Before: serial embedding
# for q in queries:
#     emb = embed_texts([q])  # blocking per query
# After: parallel embedding
# embeddings = parallelEmbed(queries, max_workers=10)
# Up to ~10x throughput for 10 I/O-bound queries, subject to API rate limits
```

### Async Wait Times

For production HTTP-based calls (embedding API, generation API), use `asyncio` with `aiohttp` to overlap network I/O:

```python
import asyncio

import aiohttp


async def asyncEmbedBatch(
    queries: list[str], api_url: str, api_key: str, batch_size: int = 100
) -> list[list[float]]:
    # batch_size caps the number of requests in flight at once
    semaphore = asyncio.Semaphore(batch_size)
    headers = {"Authorization": f"Bearer {api_key}"}

    # One shared session reuses connections across requests
    async with aiohttp.ClientSession(headers=headers) as session:

        async def embedSingle(query: str) -> list[float]:
            async with semaphore:
                async with session.post(api_url, json={"input": query}) as resp:
                    data = await resp.json()
                    # Response shape depends on your embedding API
                    return data["embedding"]

        return await asyncio.gather(*(embedSingle(q) for q in queries))
```

### Async Answer Generation

```python
async def asyncPipeline(query: str) -> str:
    # The stages depend on each other, so they run in sequence; each blocking
    # call runs in a worker thread so the event loop can serve other requests.
    rewritten = await asyncio.to_thread(rewriteQueryBatch, [query])
    embeddings = await asyncio.to_thread(generateEmbeddings, rewritten)
    results = await asyncio.to_thread(vectorSearch, embeddings, top_k=10)
    reranked = await asyncio.to_thread(rerankWithCrossEncoder, results, query)
    answer = await asyncio.to_thread(generateAnswer, reranked, query)
    return answer
```

### Failure Modes

Thread pool sizes that exceed the API rate limit cause 429 errors. Cache embeddings server-side when possible to avoid redundant computation. Async code introduces subtle race conditions, so ensure thread safety on shared vector store connections. For local models, GIL contention limits parallelization of CPU-bound embedding runs; use batch inference instead.
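
One way to handle the 429 failure mode noted for pipeline optimization is to retry with exponential backoff and jitter. The sketch below is client-agnostic: `RateLimited` is a hypothetical exception you would raise when your client sees HTTP 429, and the retry count and delays are assumed defaults.

```python
import random
import time


class RateLimited(Exception):
    """Hypothetical signal raised by your client wrapper on HTTP 429."""


def call_with_backoff(fn, *args, max_retries=5, base_delay=0.5, sleep=time.sleep, **kwargs):
    """Call fn, retrying on RateLimited with exponential backoff plus jitter."""
    for attempt in range(max_retries + 1):
        try:
            return fn(*args, **kwargs)
        except RateLimited:
            if attempt == max_retries:
                raise  # Retries exhausted, surface the error
            # Delay doubles each attempt; jitter spreads out concurrent retries
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            sleep(delay)
```

Passing `sleep` as a parameter keeps the function testable without real waiting; in production the default `time.sleep` applies. Lowering `max_workers` to stay under the rate limit remains the first fix, with backoff as a safety net.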
- 20 Caching Strategies: The same queries repeat; caching at the semantic-similarity level instead of exact match yields more cache hits. (20 min)

### Exact Caching Is Too Restrictive

Queries like "how does consensus work" and "explain Raft consensus" can ask for the same thing while sharing few tokens. Exact-match caching misses these hits. Semantic caching stores results for queries with cosine similarity above a threshold.

```python
import hashlib
from datetime import datetime, timedelta, timezone
from typing import Any

import numpy as np


def cosineSimilarity(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


class SemanticCache:
    def __init__(self, similarityThreshold: float = 0.95):
        self.cache = {}  # query_hash -> {"query", "embedding", "result", "timestamp"}
        self.similarityThreshold = similarityThreshold

    def _hashQuery(self, query: str) -> str:
        return hashlib.sha256(query.encode()).hexdigest()

    def get(self, query: str, queryEmbedding: list[float]) -> tuple[Any, bool]:
        # Exact match first
        token_hash = self._hashQuery(query)
        if token_hash in self.cache:
            return self.cache[token_hash]["result"], True
        # Then semantic lookup (linear scan; use an ANN index for large caches)
        for cached_entry in self.cache.values():
            sim = cosineSimilarity(
                np.array(queryEmbedding), np.array(cached_entry["embedding"])
            )
            if sim >= self.similarityThreshold:
                print(f"[CACHE HIT] Semantic match: '{cached_entry['query']}' -> this query")
                return cached_entry["result"], True
        return None, False

    def set(self, query: str, queryEmbedding: list[float], result: Any) -> None:
        self.cache[self._hashQuery(query)] = {
            "query": query,
            "embedding": queryEmbedding,
            "result": result,
            "timestamp": datetime.now(timezone.utc),
        }

    def evictStale(self, max_age_seconds: int = 3600) -> int:
        now = datetime.now(timezone.utc)
        stale_keys = [
            k for k, v in self.cache.items()
            if (now - v["timestamp"]) > timedelta(seconds=max_age_seconds)
        ]
        for k in stale_keys:
            del self.cache[k]
        return len(stale_keys)
```

### Cache Warming

On service startup, pre-load the cache with high-traffic queries identified from logging:

```python
def warmCache(cache: SemanticCache, hotQueries: list[tuple[str, Any]]) -> None:
    loaded = 0
    for query, expected_result in hotQueries:
        embedding = embed_texts([query])[0]
        cache.set(query, embedding, expected_result)
        loaded += 1
    print(f"[CACHE] Warmed with {loaded} entries")
```

### TTL and Eviction Policies

```python
class LRUCache:
    def __init__(self, maxEntries: int = 1000, ttlSeconds: int = 3600):
        self.maxEntries = maxEntries
        self.ttlSeconds = ttlSeconds
        self.cache = {}
        self.accessOrder = []  # Least recently used first

    def get(self, key: str) -> Any:
        if key in self.cache:
            entry = self.cache[key]
            if self._isFresh(entry["timestamp"]):
                self._markAccessed(key)
                return entry["result"]
            # Expired: drop the entry
            del self.cache[key]
            self.accessOrder.remove(key)
        return None

    def set(self, key: str, result: Any) -> None:
        if key in self.cache:
            # Overwrite in place; avoid a duplicate accessOrder entry
            self.accessOrder.remove(key)
        elif len(self.cache) >= self.maxEntries:
            oldest = self.accessOrder.pop(0)
            del self.cache[oldest]
        self.cache[key] = {"result": result, "timestamp": datetime.now(timezone.utc)}
        self.accessOrder.append(key)

    def _isFresh(self, timestamp: datetime) -> bool:
        return (datetime.now(timezone.utc) - timestamp).total_seconds() < self.ttlSeconds

    def _markAccessed(self, key: str) -> None:
        self.accessOrder.remove(key)
        self.accessOrder.append(key)
```

### Failure Modes

A semantic cache with a low similarity threshold (<0.90) can return cached results for queries that are lexically similar but semantically different, degrading generation quality. Use a high threshold (0.95+) and validate on logged queries. Cache invalidation on corpus updates is tricky: clearing the entire cache on any index update is safe but wasteful, while incremental invalidation by versioned corpus segments is more efficient but complex to implement.
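
The incremental invalidation idea mentioned for caching can be sketched with per-segment version counters: each cached entry remembers the versions of the corpus segments it was built from and is treated as stale once any of them changes. The class name, method names, and segment labels are hypothetical.

```python
from typing import Any, Optional


class VersionedCache:
    """Cache whose entries expire when any corpus segment they used is re-indexed."""

    def __init__(self) -> None:
        self._entries: dict[str, tuple[Any, dict[str, int]]] = {}
        self._versions: dict[str, int] = {}  # segment -> current version

    def bump(self, segment: str) -> None:
        """Call after re-indexing a segment."""
        self._versions[segment] = self._versions.get(segment, 0) + 1

    def set(self, key: str, result: Any, segments: list[str]) -> None:
        # Snapshot the versions of every segment that contributed to the result
        snapshot = {s: self._versions.get(s, 0) for s in segments}
        self._entries[key] = (result, snapshot)

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        result, snapshot = entry
        if any(self._versions.get(s, 0) != v for s, v in snapshot.items()):
            del self._entries[key]  # Stale: a source segment changed
            return None
        return result


cache = VersionedCache()
cache.set("how does raft elect a leader", "answer A", ["docs/raft"])
cache.set("kafka retention settings", "answer B", ["docs/kafka"])
cache.bump("docs/raft")  # Only the Raft segment was re-indexed
```

After the bump, the Raft entry is dropped on its next lookup while the Kafka entry survives, which is the saving over clearing the whole cache.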
- 21 RAGAS Faithfulness: Faithfulness measures whether the generated answer stays within the bounds of what the retrieved context actually supports. (20 min)

### Definition

An answer is faithful if it can be entirely attributed to the retrieved context without introducing facts that are absent from, or contradicted by, the context. A faithfulness score of 1.0 means every claim in the answer maps to a supporting passage in the context. A score of 0.5 means half the claims are unsupported.

### Why It Matters

High retrieval relevance scores do not guarantee answer faithfulness. A retrieval system can return relevant chunks, but the LLM can still confabulate related-but-absent facts. Faithfulness validation catches this failure mode.

### RAGAS Faithfulness Implementation

```python
from openai import OpenAI

client = OpenAI()


def computeFaithfulness(answer: str, context: str, model: str = "gpt-4o-mini") -> dict:
    """
    RAGAS faithfulness: break answer into claims, check each against context.
    """
    # Step 1: Decompose answer into atomic claims
    claims_prompt = (
        "Break the following answer into independent factual claims. "
        "Each claim should be a single verifiable statement. "
        "List one per line.\n\nAnswer:\n{answer}"
    )
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You extract factual claims accurately."},
            {"role": "user", "content": claims_prompt.format(answer=answer)},
        ],
        temperature=0.0,
        max_tokens=512,
    )
    claims = [c.strip() for c in response.choices[0].message.content.split("\n") if c.strip()]
    print(f"[FAITHFULNESS] Found {len(claims)} claims")

    # Step 2: Verify each claim against context
    supported = 0
    claim_results = []
    for claim in claims:
        verification_prompt = (
            "Given the context below, determine if the claim is supported "
            "by the context. Answer YES if the claim is entailed by the context. "
            "Answer NO if the claim contradicts the context or is not supported.\n\n"
            "Context:\n{context}\n\nClaim:\n{claim}"
        )
        verification = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "Answer only YES or NO."},
                {"role": "user",
                 "content": verification_prompt.format(context=context, claim=claim)},
            ],
            temperature=0.0,
            max_tokens=32,
        )
        verdict = verification.choices[0].message.content.strip().upper()
        is_supported = "YES" in verdict
        supported += 1 if is_supported else 0
        claim_results.append({"claim": claim, "supported": is_supported, "verdict": verdict})

    faithfulnessScore = supported / len(claims) if claims else 0.0
    return {
        "faithfulness_score": faithfulnessScore,
        "total_claims": len(claims),
        "supported_claims": supported,
        "claim_details": claim_results,
    }
```

### Aggregated Evaluation

```python
from statistics import mean


def evaluateFaithfulnessOnDataset(dataset: list[dict], verbose: bool = True) -> dict:
    """
    dataset: list of {"question", "answer", "context"}
    """
    scores = []
    for i, item in enumerate(dataset):
        result = computeFaithfulness(item["answer"], item["context"])
        scores.append(result["faithfulness_score"])
        if verbose:
            status = "✅" if result["faithfulness_score"] == 1.0 else "⚠️"
            print(f"{status} Q{i+1}: faithfulness={result['faithfulness_score']:.2f} "
                  f"({result['supported_claims']}/{result['total_claims']} claims)")
    return {
        "mean_faithfulness": mean(scores),
        "min_faithfulness": min(scores),
        "max_faithfulness": max(scores),
        "scores": scores,
    }
```

### Failure Modes

The claim decomposition step can merge multiple facts into one claim, making the verdict ambiguous (partially true, partially false). Refine the decomposition prompt to generate shorter atomic claims. Claim fragments that are partially implied but not explicitly stated confuse the YES/NO classifier; add a "PARTIAL" category in a production evaluation pipeline.
- 22 RAGAS Answer Relevance

Answer relevance measures how directly the generated answer addresses the specific question asked, penalizing tangents and padding.

### Definition

An answer is relevant when its content directly addresses all aspects of the question. If the question is "compare latency of Kafka and RabbitMQ" and the answer covers only throughput, it scores low on relevance even if it is factually correct. RAGAS computes relevance by generating question-like probes from the answer and measuring cosine similarity between the probe questions and the original question.

### Implementation

```python
import numpy as np
from openai import OpenAI

client = OpenAI()


def cosineSimilarity(a, b) -> float:
    """Cosine similarity between two embedding vectors (0.0 if either is all zeros)."""
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    return float(a @ b / denom) if denom else 0.0


def generateQuestionProbes(
    answer: str,
    numProbes: int = 3,
    model: str = "gpt-4o-mini"
) -> list[str]:
    """
    Generate alternative questions that the answer would answer.
    """
    # Every piece that interpolates a variable must be an f-string,
    # otherwise "{answer}" is sent to the model literally.
    prompt = (
        f"Given the answer below, generate {numProbes} different questions "
        "that this answer would respond to. The questions should vary in phrasing "
        "but all be answerable by the provided answer content. "
        f"Return one question per line.\n\nAnswer:\n{answer}"
    )
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "Be concise and varied in phrasing."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.8,  # Some diversity in probe generation
        max_tokens=256
    )
    probes = [p.strip() for p in response.choices[0].message.content.split("\n") if p.strip()]
    return probes[:numProbes]


def computeAnswerRelevance(
    originalQuestion: str,
    answer: str,
    embeddingModel
) -> dict:
    probes = generateQuestionProbes(answer, numProbes=3)

    # Compute similarity between original question and each probe
    originalEmb = embeddingModel.encode(originalQuestion)
    similarities = []
    for probe in probes:
        probeEmb = embeddingModel.encode(probe)
        sim = cosineSimilarity(originalEmb, probeEmb)
        similarities.append(sim)

    relevanceScore = float(np.mean(similarities)) if similarities else 0.0
    return {
        "relevance_score": round(relevanceScore, 3),
        "probes": probes,
        "probe_similarities": [round(s, 3) for s in similarities]
    }
```

### Handling Long Answers

Verbose answers that include correct information plus tangents score lower, because probes generated from the tangents do not match the original question. This is the intended behavior: the metric penalizes padding, not correctness. The dataset-level helper averages the per-answer score:

```python
from statistics import mean


def computeRelevanceOnDataset(
    dataset: list[dict],
    embeddingModel
) -> dict:
    scores = []
    for item in dataset:
        result = computeAnswerRelevance(
            item["question"], item["answer"], embeddingModel
        )
        scores.append(result["relevance_score"])
    return {
        "mean_relevance": mean(scores),
        "scores": scores
    }
```

### Tuning the Probe Count

More probes make the measurement more stable but increase API calls and cost. At numProbes=3, the standard deviation across repeated runs on the same answer is typically 0.02–0.05, which is acceptable for production evaluation. At numProbes=1, variance is too high; at numProbes=10, the marginal accuracy gain does not justify the cost.
### Failure Modes

Probe generation can produce questions that the answer does address but that are near-paraphrases of one another. Low probe diversity makes the score look more stable than it is, because several similar probes behave like a single probe. Add a diversity-promoting instruction to the probe generation prompt.

If the answer is completely off-topic, the probes describe the off-topic content and their similarity to the original question is near zero, so off-topic answers are reliably scored low. Still, spot-check a sample by hand for face validity.

20 min
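Besides changing the prompt, near-duplicate probes can be filtered after generation. The sketch below is one hedged way to do that: it uses token-set Jaccard overlap as a cheap stand-in for embedding similarity, and the helper names and the 0.8 cutoff are assumptions for illustration, not values from RAGAS.

```python
def jaccard(a: str, b: str) -> float:
    """Token-set overlap between two questions, case-insensitive."""
    ta, tb = set(a.lower().split()), set(b.lower().split())
    union = ta | tb
    return len(ta & tb) / len(union) if union else 0.0


def drop_near_duplicates(probes: list[str], max_overlap: float = 0.8) -> list[str]:
    """Keep a probe only if it is not too similar to any probe already kept."""
    kept: list[str] = []
    for probe in probes:
        if all(jaccard(probe, k) <= max_overlap for k in kept):
            kept.append(probe)
    return kept


probes = [
    "How fast is Kafka?",
    "how fast is kafka?",             # same question, different casing
    "What limits RabbitMQ latency?",
]
unique_probes = drop_near_duplicates(probes)
```

If the filter leaves fewer probes than requested, that is itself a useful signal that the generation prompt is not producing enough variety.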
- 23 RAGAS Context Precision

Context precision measures whether the top-ranked chunks in the context window contain what is needed to answer, penalizing relevant content buried in lower positions.

### Definition

Context precision evaluates the "density" of relevant information in the retrieval ordering. Of the chunks needed to fully answer the question, how many appear in the top-N positions of the retrieved result set? Perfect precision means the needed chunks occupy the top positions with no irrelevant chunk ranked above them. Poor precision means needed chunks are pushed to positions 7–10 of a top-10 window.

### Implementation

```python
from statistics import mean


def computeContextPrecision(
    retrievedChunks: list[dict],
    relevantChunkIds: set[str],
    k: int = 10
) -> dict:
    """
    Compute position-weighted context precision over the top-k chunks.

    retrievedChunks: list of {"id", "text", "score"} ordered by retrieval rank
    relevantChunkIds: set of chunk IDs known to answer the question
    k: evaluation window size
    """
    retrievedK = retrievedChunks[:k]
    totalRelevant = len(relevantChunkIds)

    precisions = []
    relevantItems = 0
    for i, chunk in enumerate(retrievedK):
        position = i + 1  # 1-indexed
        if chunk["id"] in relevantChunkIds:
            relevantItems += 1
            # Precision at this position = relevant items seen so far / position
            precisions.append(relevantItems / position)

    if totalRelevant == 0 or relevantItems == 0:
        return {
            "precision": 0.0,
            "retrieved_relevant": relevantItems,
            "total_relevant": totalRelevant
        }

    # Average-precision form: mean of precision@position over the relevant hits
    # in the window. It is 1.0 only when every relevant hit outranks every
    # irrelevant chunk.
    precisionScore = sum(precisions) / relevantItems
    return {
        "precision": round(precisionScore, 3),
        "retrieved_relevant": relevantItems,
        "total_relevant": totalRelevant,
        "position_breakdown": [
            {"position": i + 1, "id": c["id"], "relevant": c["id"] in relevantChunkIds}
            for i, c in enumerate(retrievedK)
        ]
    }
```
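To see how rank position changes a position-weighted score, the sketch below scores two orderings of the same four chunks using one common position-weighted form, average precision over the relevant hits. The helper name `rankWeightedPrecision` and the boolean-flag input are assumptions made for the example.

```python
def rankWeightedPrecision(relevanceFlags: list[bool]) -> float:
    """Average of precision@position taken at each relevant position."""
    seen = 0
    precisions = []
    for position, isRelevant in enumerate(relevanceFlags, start=1):
        if isRelevant:
            seen += 1
            precisions.append(seen / position)
    return sum(precisions) / seen if seen else 0.0


# Same two relevant chunks, ranked first versus ranked last.
relevantFirst = rankWeightedPrecision([True, True, False, False])
relevantLast = rankWeightedPrecision([False, False, True, True])
```

With the relevant chunks first, both precision values are 1/1 and 2/2; with them last, the values are 1/3 and 2/4, so the second ordering scores well below the first even though both windows contain the same chunks.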
### Computing Relevant Chunk IDs

For evaluation, relevant chunks are identified by an oracle or by LLM annotation. In practice, use the ground-truth answer from the evaluation set to identify which chunks contain the facts needed:

```python
def identifyRelevantChunks(
    chunks: list[dict],
    groundTruthAnswer: str,
    question: str,
    model: str = "gpt-4o-mini"
) -> set[str]:
    """
    Use an LLM to identify which retrieved chunks contain facts needed
    to answer the question.
    """
    check_prompt = (
        "Given the question and the ground truth answer, determine if "
        "the following passage contains at least one fact needed to answer the question.\n\n"
        "Question: {question}\n\nAnswer: {answer}\n\nPassage:\n{passage}\n\n"
        "Answer YES or NO."
    )
    relevant = set()
    for chunk in chunks:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": "Answer only YES or NO."},
                {"role": "user", "content": check_prompt.format(
                    question=question,
                    answer=groundTruthAnswer,
                    passage=chunk["text"]
                )}
            ],
            temperature=0.0,
            max_tokens=16
        )
        verdict = response.choices[0].message.content.strip().upper()
        if verdict.startswith("YES"):
            relevant.add(chunk["id"])
    return relevant
```

### Aggregated Context Precision

```python
def evaluateContextPrecisionOnDataset(
    dataset: list[dict],
    k: int = 10
) -> dict:
    results = []
    for item in dataset:
        retrieved = item["retrieved_chunks"]  # already ordered by rank
        relevant = identifyRelevantChunks(
            retrieved, item["ground_truth"], item["question"]
        )
        precResult = computeContextPrecision(retrieved, relevant, k=k)
        results.append(precResult)
    return {
        "mean_precision": mean(r["precision"] for r in results),
        "min_precision": min(r["precision"] for r in results),
        "max_precision": max(r["precision"] for r in results),
        "results": results
    }
```

### Failure Modes

Plain, unweighted precision at k does not differentiate between a needed chunk at position 3 and one at position 10 if both are in the window, and binary relevance cannot express that one chunk is more useful than another. For higher-fidelity evaluation, use NDCG or DCG with relevance grades instead of binary relevance.

Oracle-identified relevant chunks are approximations; human annotation remains the gold standard. Chunk boundaries also affect precision: a single fact split across two retrieved chunks may be counted twice in evaluation.

20 min
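The NDCG alternative mentioned above can be sketched in a few lines. This is a minimal version under stated assumptions: grades are supplied per retrieved chunk in rank order (for example 0 = irrelevant, 1 = partially useful, 2 = essential; the scale is an illustrative choice), and the ideal ordering is built only from the chunks that were actually retrieved.

```python
import math


def dcg(grades: list[float]) -> float:
    """Discounted cumulative gain: each grade is divided by log2(rank + 1)."""
    return sum(g / math.log2(i + 2) for i, g in enumerate(grades))


def ndcg_at_k(grades: list[float], k: int) -> float:
    """DCG of the actual top-k ordering divided by DCG of the best possible ordering."""
    ideal_dcg = dcg(sorted(grades, reverse=True)[:k])
    return dcg(grades[:k]) / ideal_dcg if ideal_dcg > 0 else 0.0


best_order = ndcg_at_k([2, 1, 0], k=3)   # essential chunk ranked first
worst_order = ndcg_at_k([0, 1, 2], k=3)  # essential chunk ranked last
```

Because the ideal ordering here ignores relevant chunks that retrieval missed entirely, this variant measures ranking quality only; recall has to be tracked separately.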
- 24 Production RAG Pipeline Project

A production RAG pipeline integrates indexing, retrieval, optimization, and evaluation as a cohesive system; each stage reports metrics that inform improvements in the other stages.

### Project Overview

This chapter assembles a production-grade RAG pipeline combining techniques from Chapters 13-23. The pipeline evaluates itself on defined metrics, surfaces failures, and can be iteratively improved.

### Full Pipeline Architecture

```
User Query
 │
 ├─ Query Rewriting (Chapter 13)
 ├─ Query Decomposition (Chapter 14)
 │   └─ Parallel Sub-Query Retrieval
 │       └─ Context Optimization (Chapter 15)
 │           ├─ Dedup + Relevance Filter
 │           ├─ Dynamic Context Window (Chapter 16)
 │           └─ Context Compression (Chapter 17)
 │               └─ Answer Generation
 │                   │
 │                   ├─ RAGAS Faithfulness (Ch. 21)
 │                   ├─ RAGAS Answer Relevance (Ch. 22)
 │                   └─ RAGAS Context Precision (Ch. 23)
 │
Caching Layer (Chapter 20) ←→ Pipeline Optimization (Chapter 19)
Multi-Modal Handler (Ch. 18) for image/table sources
```

### Complete Implementation

```python
from dataclasses import dataclass
from typing import Optional
from datetime import datetime

# VectorStore, SemanticCache, RAGASEvaluator, embed_texts, embedModel and the
# pipeline helpers called below are assumed to be defined in earlier chapters.


@dataclass
class ProductionRAGConfig:
    similarityThreshold: float = 0.95  # Chapter 20 recommends 0.95+
    maxContextTokens: int = 4096
    tokenReserve: int = 512
    compressionEnabled: bool = True
    cacheEnabled: bool = True
    evaluationEnabled: bool = True


class ProductionRAGPipeline:
    def __init__(self, config: ProductionRAGConfig):
        self.config = config
        self.vectorStore = VectorStore()
        self.semanticCache = SemanticCache(config.similarityThreshold)
        self.evaluator = RAGASEvaluator() if config.evaluationEnabled else None

    def query(
        self,
        question: str,
        returnMetrics: bool = True,
        relevantChunkIds: Optional[set] = None
    ) -> dict:
        # Step 1: Cache check
        queryEmbedding = None
        if self.config.cacheEnabled:
            queryEmbedding = embed_texts([question])[0]
            cacheHit, found = self.semanticCache.get(question, queryEmbedding)
            if found:
                return {"answer": cacheHit, "cache_hit": True, "metrics": {}}

        # Step 2: Rewrite
        rewrites = rewrite_query(question)

        # Step 3: Decompose
        subQueries = decompose_query(question)
        allSubQueries = rewrites + subQueries

        # Step 4: Parallel retrieval across sub-queries
        retrieved = parallelRetrieve(allSubQueries, top_k=10)

        # Step 5: Context optimization
        optimized = optimizeContext(retrieved, top_k=10)

        # Step 6: Dynamic window allocation
        budget = self.config.maxContextTokens - self.config.tokenReserve
        windowedChunks = trimToTokenBudget(optimized, budget)

        # Step 7: Compression (call compressChunk once per chunk)
        if self.config.compressionEnabled:
            parts = []
            for c in windowedChunks:
                compressed = compressChunk(c["text"], question)
                parts.append(compressed["content"] if isinstance(compressed, dict) else c["text"])
            context = "\n\n".join(parts)
        else:
            context = "\n\n".join(c["text"] for c in windowedChunks)

        # Step 8: Generate
        answer = generateAnswer(context, question)

        # Step 9: Cache result
        if self.config.cacheEnabled:
            self.semanticCache.set(question, queryEmbedding, answer)

        # Step 10: Evaluate (offline-only in production)
        metrics = {}
        if returnMetrics and self.config.evaluationEnabled and self.evaluator:
            metrics = {
                "faithfulness": computeFaithfulness(answer, context),
                "answer_relevance": computeAnswerRelevance(question, answer, embedModel),
                "timestamp": datetime.utcnow().isoformat()
            }
            # Context precision needs labeled relevant chunk IDs, which only
            # exist for offline test sets; skip it for live queries.
            if relevantChunkIds is not None:
                metrics["context_precision"] = computeContextPrecision(
                    optimized, relevantChunkIds, k=10
                )

        return {"answer": answer, "cache_hit": False, "metrics": metrics}

    def healthCheck(self) -> dict:
        # Verify vector store connectivity
        storeStatus = "healthy" if self.vectorStore.isConnected() else "degraded"

        # Verify cache size and age; total_seconds() does not wrap at one day
        cacheSize = len(self.semanticCache.cache)
        cacheMaxAge = max(
            int((datetime.utcnow() - v["timestamp"]).total_seconds())
            for v in self.semanticCache.cache.values()
        ) if self.semanticCache.cache else 0

        return {
            "status": storeStatus,
            "cache_size": cacheSize,
            "oldest_cache_entry_age_seconds": cacheMaxAge,
            "config": {
                "compression": self.config.compressionEnabled,
                "caching": self.config.cacheEnabled,
                "evaluation": self.config.evaluationEnabled
            }
        }
```

### Operational Monitoring

```python
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("production_rag")


class MonitoredPipeline(ProductionRAGPipeline):
    def query(self, question: str, returnMetrics: bool = True, relevantChunkIds=None) -> dict:
        start = time.perf_counter()
        result = super().query(question, returnMetrics, relevantChunkIds)
        elapsed = time.perf_counter() - start

        logger.info(f"query={question[:50]}... elapsed={elapsed*1000:.0f}ms "
                    f"cache_hit={result['cache_hit']}")

        metrics = result["metrics"]
        if metrics:
            parts = [
                f"faithfulness:{metrics['faithfulness']['faithfulness_score']:.2f}",
                f"relevance:{metrics['answer_relevance']['relevance_score']:.2f}",
            ]
            # Precision is only present when labeled chunk IDs were supplied
            if "context_precision" in metrics:
                parts.append(f"precision:{metrics['context_precision']['precision']:.2f}")
            logger.info("metrics=" + " ".join(parts))

        return result
```

### Regression Testing

```python
def regressionTest(pipeline: ProductionRAGPipeline, testSet: list[dict]) -> dict:
    """
    Run the pipeline against a known test set and flag scores below threshold.
    Run with caching disabled: cache hits return no metrics.
    """
    thresholds = {
        "faithfulness": 0.70,
        "relevance": 0.75,
        "precision": 0.50
    }
    # metric name -> (key in result["metrics"], score field inside that entry)
    metricKeys = {
        "faithfulness": ("faithfulness", "faithfulness_score"),
        "relevance": ("answer_relevance", "relevance_score"),
        "precision": ("context_precision", "precision")
    }

    results = []
    regressions = []
    for item in testSet:
        result = pipeline.query(
            item["question"],
            relevantChunkIds=item.get("relevant_chunk_ids")
        )

        # Only items with a stored baseline are checked; getBaseline is
        # assumed to be provided by the test harness.
        baseline = getBaseline(item["question_id"])
        if baseline:
            for metric, (resultKey, scoreField) in metricKeys.items():
                entry = result["metrics"].get(resultKey)
                if entry is None:
                    continue
                current = entry[scoreField]
                if current < thresholds[metric]:
                    regressions.append({
                        "question_id": item["question_id"],
                        "metric": metric,
                        "current": current,
                        "threshold": thresholds[metric],
                        "baseline": baseline.get(metric)
                    })

        results.append(result)

    return {
        "total_queries": len(results),
        "regression_count": len(regressions),
        "regressions": regressions,
        "results": results
    }
```

### Deployment Checklist

1. **Vector store capacity**: Estimate storage at roughly 3–6 KB per float32 embedding (768–1536 dimensions × 4 bytes), which is about 3–6 GB for 1M chunks before index overhead.
2. **Rate limiting**: Add per-user and per-endpoint rate limits to avoid 429 errors from LLM API calls.
3. **Graceful degradation**: If the embedding service is down, fall back to BM25 keyword search.
4. **Cache eviction on corpus update**: Version the corpus and invalidate cache entries that predate the current version.
5. **Evaluation cadence**: Run RAGAS evaluation every N queries (e.g., every 100) rather than in real time; batch evaluation reduces API cost.

### Failure Modes for Production

Production pipelines fail silently when the vector store returns zero results: the system generates an answer from no context, and that answer often looks plausible. Add an explicit zero-results guard immediately after retrieval.

```python
if not retrieved:
    return {"answer": "No relevant information found.", "metrics": {}, "cache_hit": False}
```

Monitoring that measures latency but not answer quality will miss regressions where an answer is generated efficiently from the wrong context. Always pair latency SLOs with weekly faithfulness and answer-relevance sampling audits.

25 min
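The vector store capacity item in the checklist reduces to simple arithmetic. The sketch below assumes uncompressed float32 vectors (4 bytes per dimension) and ignores index structures, metadata, and replication, all of which add to the real footprint; the function name is a hypothetical helper.

```python
def embedding_storage_bytes(num_chunks: int, dims: int, bytes_per_value: int = 4) -> int:
    """Raw vector storage: chunks x dimensions x bytes per stored value."""
    return num_chunks * dims * bytes_per_value


for dims in (768, 1536):
    total = embedding_storage_bytes(1_000_000, dims)
    print(f"{dims} dims: {total / 1e9:.2f} GB for 1M chunks")
```

Passing `bytes_per_value=2` or `1` gives the corresponding estimate for float16 or int8 quantized vectors, which is often the deciding factor on memory-constrained local hardware.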