01. RAG Pipeline Anatomy
A RAG pipeline consists of four primary stages: ingestion, chunking, indexing, and retrieval. Each stage introduces latency and potential quality degradation that compounds across the system.
Ingestion involves loading raw documents from storage—file systems, databases, or object stores. Large documents require streaming parsers to avoid memory exhaustion. PDFs present parsing challenges due to layout variability; table extraction often requires specialized libraries.
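The memory point above can be illustrated with a small generator that reads a file in fixed-size blocks rather than loading it whole. This is a minimal sketch for plain-text files only; the function name stream_text_blocks and the default block size are illustrative assumptions, and PDFs or other binary formats would need a format-specific parser instead.

```python
from typing import Iterator


def stream_text_blocks(path: str, block_size: int = 64 * 1024,
                       encoding: str = "utf-8") -> Iterator[str]:
    """Yield a text file in blocks of at most block_size characters.

    Hypothetical helper: only one block is held in memory at a time,
    so peak memory stays bounded regardless of file size.
    """
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    with open(path, "r", encoding=encoding) as handle:
        while True:
            block = handle.read(block_size)
            if not block:  # Empty string signals end of file
                break
            yield block
```

Downstream stages can consume the generator lazily, for example by feeding each block to a chunker as it arrives.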
Chunking splits documents into segments that balance context preservation against retrieval precision. Chunk size directly affects embedding quality and query-document similarity scores. Oversized chunks dilute relevance; undersized chunks lose necessary context.
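One common way to manage the size trade-off is a fixed-size sliding window with overlap, so that context cut at one boundary reappears at the start of the next chunk. The sketch below splits on whitespace for simplicity; the name chunk_words and the default sizes are assumptions chosen for illustration, not recommended values, and a production chunker would more likely count model tokens and respect sentence or section boundaries.

```python
from typing import List


def chunk_words(text: str, chunk_size: int = 200, overlap: int = 40) -> List[str]:
    """Split text into word windows of chunk_size, sharing overlap words.

    Hypothetical helper: sizes are in whitespace-separated words, not tokens.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = text.split()
    step = chunk_size - overlap  # How far the window advances each time
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # This window already reached the end of the text
    return chunks
```

Raising chunk_size gives each chunk more context at the cost of a less focused embedding; raising overlap reduces boundary losses but increases the number of chunks to embed and store.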
Indexing maps chunks to vector representations in a high-dimensional space. The embedding model choice determines what semantic relationships the index captures. Exact brute-force search is adequate for small collections, but at scale dense indices typically rely on approximate nearest neighbor (ANN) structures such as HNSW or IVF to keep queries sub-second.
Retrieval matches user queries against indexed chunks using a similarity metric, typically cosine similarity, which reduces to a plain inner product when the embeddings are normalized to unit length. The retrieved chunks feed a language model that synthesizes an answer.
The critical insight: the output quality of each stage caps what the next stage can achieve, so failures compound rather than average out. A poorly chunked document produces an index that cannot represent its semantic content, and no retrieval algorithm can compensate.
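The relationship between the two metrics can be made concrete: cosine similarity is the inner product of two vectors after each is scaled to unit length. The following is a minimal pure-Python sketch of exact (brute-force) top-k scoring; the helper names are illustrative assumptions, and a real system would use a vector library and an ANN index rather than scanning every vector.

```python
import math
from typing import List, Sequence, Tuple


def normalize(vector: Sequence[float]) -> List[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in vector]


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    """Inner product of two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity, computed as the inner product of unit vectors."""
    return dot(normalize(a), normalize(b))


def top_k_by_cosine(query: Sequence[float],
                    vectors: Sequence[Sequence[float]],
                    k: int) -> List[Tuple[int, float]]:
    """Return (index, score) pairs for the k most similar vectors."""
    unit_query = normalize(query)
    scored = [(i, dot(unit_query, normalize(v))) for i, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]
```

If embeddings are normalized once at indexing time, the per-query work drops to inner products alone, which is why many vector stores expose inner product as the metric for normalized embeddings.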
# Minimal RAG pipeline structure
import uuid
from dataclasses import dataclass
from typing import List


@dataclass
class Chunk:
    text: str
    metadata: dict
    chunk_id: str


@dataclass
class RetrievedChunk:
    chunk: Chunk
    score: float


class RAGPipeline:
    def __init__(self, embedder, vector_store, ranker=None):
        self.embedder = embedder
        self.vector_store = vector_store
        self.ranker = ranker  # Optional cross-encoder

    def index_documents(self, documents: List[str], metadata: List[dict]):
        # Each document is treated as a single chunk here; a fuller pipeline
        # would split documents into smaller chunks before embedding.
        chunks = [Chunk(text=doc, metadata=meta, chunk_id=uuid.uuid4().hex)
                  for doc, meta in zip(documents, metadata)]
        embeddings = self.embedder.encode([c.text for c in chunks])
        self.vector_store.add(embeddings, chunks)

    def retrieve(self, query: str, top_k: int = 10) -> List[RetrievedChunk]:
        query_embedding = self.embedder.encode(query)
        # Over-fetch so the optional reranker has extra candidates to reorder.
        initial_results = self.vector_store.search(query_embedding, top_k * 2)
        if self.ranker:
            reranked = self.ranker.rerank(query, initial_results)
            return reranked[:top_k]
        return initial_results[:top_k]
Profile a simple pipeline on a 10-document sample: wrap each stage in timing instrumentation and identify the slowest one. Prefer time.perf_counter() over time.time() for measuring intervals, since it is monotonic and uses the highest-resolution clock available.
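One way to set up this instrumentation is a small context manager that accumulates elapsed time per named stage. This is a sketch under assumptions: the StageTimer class and the stage names in the usage comment are hypothetical, and the stages being timed are whatever functions your own pipeline provides.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Dict


class StageTimer:
    """Accumulates wall-clock time per named pipeline stage."""

    def __init__(self) -> None:
        self.totals: Dict[str, float] = defaultdict(float)

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record elapsed time even if the stage raises
            self.totals[name] += time.perf_counter() - start

    def slowest(self) -> str:
        """Return the name of the stage with the largest total time."""
        if not self.totals:
            raise ValueError("no stages have been timed")
        return max(self.totals, key=self.totals.get)

    def report(self) -> str:
        """Format per-stage totals, slowest first."""
        rows = sorted(self.totals.items(), key=lambda item: item[1], reverse=True)
        return "\n".join(f"{name}: {seconds * 1000:.2f} ms" for name, seconds in rows)


# Usage (stage bodies are placeholders for your own pipeline calls):
#   timer = StageTimer()
#   with timer.stage("chunking"):
#       ...
#   with timer.stage("embedding"):
#       ...
#   print(timer.report())
```

Because totals accumulate across calls, the same timer can wrap a stage inside a loop over all 10 documents and still report a single figure per stage.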