17. Basic Generation Pipeline
This chapter assembles a complete end-to-end RAG pipeline that combines retrieval and generation. Every component interacts with the others: changes in chunking affect retrieval, which affects context assembly, which in turn affects generation.
Pipeline Architecture
User Query → Query Rewriting → Retrieval → Context Assembly → Generation → Response
                    ↓              ↓              ↓
             Improved Query  Top K Chunks  Formatted Prompt
End-to-End Implementation
from your_rag_library import (
    QueryRewriter,
    HybridRetriever,
    ContextAssembler,
    LLMGenerator,
    Config,
)


class RAGPipeline:
    def __init__(self, config: Config):
        # Keep the config so query() can read per-request settings from it.
        self.config = config
        self.rewriter = QueryRewriter(config.rewrite_strategy)
        self.retriever = HybridRetriever(
            vector_store=config.vector_store,
            bm25_index=config.bm25_index,
            dense_weight=config.dense_weight,
            sparse_weight=config.sparse_weight,
        )
        self.assembler = ContextAssembler(
            max_tokens=config.max_context_tokens
        )
        self.generator = LLMGenerator(
            model=config.llm_model,
            temperature=config.temperature,
        )

    def query(self, user_question: str) -> dict:
        # Stage 1: Rewrite query
        rewritten = self.rewriter.rewrite(user_question)

        # Stage 2: Retrieve using the rewritten query
        retrieved_chunks = self.retriever.search(
            query=rewritten,
            top_k=self.config.top_k,
            filters=self.config.metadata_filters,
        )

        # Stage 3: Assemble context against the original question
        context = self.assembler.assemble(
            query=user_question,
            chunks=retrieved_chunks,
        )

        # Stage 4: Generate
        response = self.generator.generate(
            prompt=self.config.prompt_template,
            context=context,
            question=user_question,
        )

        # Return intermediate outputs alongside the answer for debugging
        return {
            "answer": response.text,
            "sources": response.citations,
            "retrieved_chunks": retrieved_chunks,
            "query_rewrite": rewritten,
        }
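The `dense_weight` and `sparse_weight` arguments passed to `HybridRetriever` suggest a weighted combination of dense and BM25 scores. How `your_rag_library` combines them is not shown here, so the following is only a sketch of one common approach: min-max normalization followed by a weighted sum. The function names `min_max` and `fuse_scores` and the sample scores are hypothetical.

```python
def min_max(scores: dict[str, float]) -> dict[str, float]:
    """Rescale scores to [0, 1] so dense and sparse values are comparable."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        # All scores equal: treat every candidate as equally relevant.
        return {doc_id: 1.0 for doc_id in scores}
    return {doc_id: (s - lo) / (hi - lo) for doc_id, s in scores.items()}


def fuse_scores(dense, sparse, dense_weight=0.6, sparse_weight=0.4, top_k=3):
    """Weighted sum of normalized scores; a missing score counts as 0."""
    dense_n, sparse_n = min_max(dense), min_max(sparse)
    fused = {
        doc_id: dense_weight * dense_n.get(doc_id, 0.0)
        + sparse_weight * sparse_n.get(doc_id, 0.0)
        for doc_id in set(dense_n) | set(sparse_n)
    }
    return sorted(fused.items(), key=lambda kv: kv[1], reverse=True)[:top_k]


# Hypothetical raw scores: cosine similarity (dense) and BM25 (sparse).
dense_scores = {"chunk_a": 0.82, "chunk_b": 0.75, "chunk_c": 0.40}
sparse_scores = {"chunk_b": 12.1, "chunk_c": 3.0, "chunk_d": 9.5}

ranked = fuse_scores(dense_scores, sparse_scores)
for doc_id, score in ranked:
    print(f"{doc_id}: {score:.3f}")
```

Normalizing first matters because BM25 scores are unbounded while cosine similarities are not; without it, the weights would not mean what they appear to mean.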
Configuration Management
Production pipelines require explicit configuration:
config = Config(
    # Retrieval
    dense_model="BAAI/bge-large-en-v1.5",
    vector_dimension=1024,  # must match the embedding model's output size
    dense_weight=0.6,
    sparse_weight=0.4,
    top_k=50,
    bm25_k1=1.5,
    bm25_b=0.75,
    # Context
    max_context_tokens=4000,
    chunk_overlap_tokens=100,
    # Generation
    llm_model="gpt-4o-mini",
    temperature=0.2,
    max_tokens=500,
    # Pipeline behavior
    enable_reranking=True,
    reranker_model="cross-encoder/ms-marco-MiniLM-L-6-v2",
    enable_query_rewrite=False,
    # RAGPipeline also reads rewrite_strategy, vector_store, bm25_index,
    # metadata_filters, and prompt_template; supply them here as well.
)

pipeline = RAGPipeline(config)
Store configuration in environment variables or config files rather than hardcoding it in source. This lets you A/B test different configurations without code changes.
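One way to keep settings out of source, sketched with only the standard library, is to read defaults from a JSON file and let environment variables override individual keys. The `RAG_` prefix, the `load_settings` helper, and the file name are assumptions for illustration; the resulting dict could be passed on as `Config(**settings)` if `Config` accepts keyword arguments as in the example above.

```python
import json
import os
import tempfile


def load_settings(path: str, env_prefix: str = "RAG_") -> dict:
    """Load settings from a JSON file, then apply environment overrides.

    An environment variable such as RAG_TOP_K overrides the "top_k" key.
    Override values are parsed as JSON when possible (so "20" becomes 20
    and "true" becomes True) and kept as plain strings otherwise.
    """
    with open(path, encoding="utf-8") as f:
        settings = json.load(f)

    for name, raw in os.environ.items():
        if not name.startswith(env_prefix):
            continue
        key = name[len(env_prefix):].lower()
        try:
            settings[key] = json.loads(raw)
        except json.JSONDecodeError:
            settings[key] = raw
    return settings


# Demo: write a small settings file, then override one value via the environment.
with tempfile.TemporaryDirectory() as tmp:
    config_path = os.path.join(tmp, "rag_config.json")
    with open(config_path, "w", encoding="utf-8") as f:
        json.dump(
            {"top_k": 50, "temperature": 0.2, "llm_model": "gpt-4o-mini"}, f
        )

    os.environ["RAG_TOP_K"] = "20"  # e.g. the B arm of an A/B test
    settings = load_settings(config_path)
    del os.environ["RAG_TOP_K"]

print(settings["top_k"], settings["temperature"], settings["llm_model"])
```

With this layout, the two arms of an A/B test can run the same code and differ only in their environment.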
Pipeline Testing
Before production deployment, run the pipeline against queries with known expected chunks and answers, and check retrieval and answer quality separately:
def test_pipeline(pipeline: RAGPipeline, test_queries: list[dict]) -> list[dict]:
    """Run each test query and record retrieval and answer checks."""
    results = []
    for query_data in test_queries:
        result = pipeline.query(query_data["question"])

        # Check retrieval quality: fraction of expected chunks retrieved
        retrieved_ids = {c["id"] for c in result["retrieved_chunks"]}
        expected_ids = set(query_data.get("expected_chunks", []))
        # Avoid dividing by zero when a query lists no expected chunks
        hit_rate = (
            len(retrieved_ids & expected_ids) / len(expected_ids)
            if expected_ids
            else None
        )

        # Exact match is strict: a correct answer worded differently fails
        answer_correct = (
            result["answer"].strip().lower()
            == query_data["expected_answer"].strip().lower()
        )

        results.append({
            "query": query_data["question"],
            "answer": result["answer"],
            "retrieval_hit_rate": hit_rate,
            "answer_correct": answer_correct,
        })
    return results


test_queries = [
    {
        "question": "How do I reset the admin password?",
        "expected_chunks": ["admin_guide.md"],
        "expected_answer": "Use the --reset-password flag with the admin CLI command",
    },
    # Add 20-50 test queries covering diverse query types
]
A passing test mainly confirms that the pipeline runs without errors. It does not validate answer quality: the exact-match check above is too strict, since a correct answer can be worded in many ways. Build evaluation datasets with ground-truth answers for quality assessment.
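A softer score than exact matching is often more informative. The sketch below computes token-level F1 between a generated answer and an expected one, a metric commonly used in extractive question answering. It is one option among several and not a substitute for human or model-based grading. The `tokenize` and `token_f1` helpers and the sample generated answer are assumptions for illustration.

```python
import re
from collections import Counter


def tokenize(text: str) -> list[str]:
    """Lowercase and split on non-alphanumeric characters."""
    return re.findall(r"[a-z0-9]+", text.lower())


def token_f1(predicted: str, expected: str) -> float:
    """Harmonic mean of token precision and recall, in [0, 1]."""
    pred, gold = tokenize(predicted), tokenize(expected)
    if not pred or not gold:
        # Both empty counts as a match; one empty counts as a miss.
        return float(pred == gold)
    # Multiset intersection counts each shared token at most as often
    # as it appears in both answers.
    overlap = sum((Counter(pred) & Counter(gold)).values())
    if overlap == 0:
        return 0.0
    precision = overlap / len(pred)
    recall = overlap / len(gold)
    return 2 * precision * recall / (precision + recall)


expected = "Use the --reset-password flag with the admin CLI command"
generated = "Run the admin CLI command with the --reset-password flag."

exact = generated.lower() == expected.lower()
score = token_f1(generated, expected)
print(f"exact match: {exact}, token F1: {score:.2f}")
```

Here the reworded answer fails the exact-match check but shares nine of its ten tokens with the expected answer, so it still scores highly. Token overlap ignores word order and meaning, so treat it as a screening signal rather than a verdict.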
Build a complete RAG pipeline from scratch using the components in this chapter. Query it with 10 test questions and manually evaluate answer quality. Identify which failure mode explains each poor answer.
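For the exercise, a first-pass triage can separate retrieval failures from generation failures before manual review. The sketch below assumes result dicts shaped like the output of `test_pipeline` above. The category labels, the `triage` helper, and the sample results are hypothetical, and manual inspection is still needed to pin down the exact cause.

```python
from collections import Counter


def triage(result: dict) -> str:
    """Assign a coarse failure label to one test_pipeline-style result."""
    if result["answer_correct"]:
        return "ok"
    hit_rate = result["retrieval_hit_rate"]
    if hit_rate is None:
        return "unlabeled"          # no expected chunks to compare against
    if hit_rate == 0:
        return "retrieval_miss"     # no expected chunk reached the generator
    if hit_rate < 1:
        return "partial_retrieval"  # some expected evidence was missing
    return "generation_error"       # evidence was retrieved, answer still wrong


# Hypothetical results for illustration only.
sample_results = [
    {"query": "q1", "answer_correct": True, "retrieval_hit_rate": 1.0},
    {"query": "q2", "answer_correct": False, "retrieval_hit_rate": 0.0},
    {"query": "q3", "answer_correct": False, "retrieval_hit_rate": 1.0},
    {"query": "q4", "answer_correct": False, "retrieval_hit_rate": 0.5},
]

summary = Counter(triage(r) for r in sample_results)
for label, count in summary.most_common():
    print(f"{label}: {count}")
```

A skew toward `retrieval_miss` points at chunking, indexing, or query rewriting, while a skew toward `generation_error` points at the prompt template or context assembly.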