RUNLOCALAI · v38
Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.co · Independently operated
RAG Systems: Part 1

22. Part 1 Final Project

Chapter 22 of 22 · 30 min
KEY INSIGHT

This capstone integrates all course concepts. A well-organized pipeline with proper configuration management is more maintainable than clever one-liners.

This project builds a complete production-ready RAG system integrating all concepts from the course. Expect to spend 2-4 hours on this capstone.

Project Requirements

Build a RAG pipeline that:

  1. Ingests a corpus of at least 100 documents (20-50KB of text total)
  2. Chunks documents with configurable chunk size and overlap
  3. Implements hybrid retrieval combining dense and sparse methods
  4. Includes reranking with a cross-encoder
  5. Assembles context respecting token limits
  6. Generates responses using a prompt with citation requirements
  7. Achieves Hit Rate@10 > 0.85 and MRR@10 > 0.5 on test queries

Starter Code Structure

# project structure
rag_project/
├── config.py          # All configuration parameters
├── ingestion.py       # Document loading and chunking
├── retrieval.py       # Dense, sparse, hybrid retrieval
├── reranker.py        # Cross-encoder reranking
├── context.py         # Context assembly
├── generation.py      # LLM integration
├── pipeline.py        # End-to-end pipeline
├── evaluation.py      # Hit rate and MRR calculation
└── main.py            # Entry point and CLI

# config.py
from dataclasses import dataclass

@dataclass
class RAGConfig:
    # Ingestion
    chunk_size: int = 500
    chunk_overlap: int = 50
    separator: str = "\n\n"
    
    # Embedding
    embedding_model: str = "BAAI/bge-large-en-v1.5"
    embedding_dimension: int = 1024
    
    # Retrieval
    dense_weight: float = 0.6
    sparse_weight: float = 0.4
    top_k_initial: int = 50
    
    # Reranking
    reranker_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
    top_k_final: int = 10
    
    # Context
    max_context_tokens: int = 4000
    
    # Generation
    llm_model: str = "gpt-4o-mini"
    temperature: float = 0.2
    max_response_tokens: int = 500
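
Because every stage reads from the same config object, a few cross-field checks can catch inconsistent settings before any index is built. The following is a minimal sketch, not part of the starter code: `MiniConfig` is a hypothetical trimmed stand-in for `RAGConfig`, and `validate_config` and its three rules are assumptions about what is worth checking.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class MiniConfig:
    """Hypothetical trimmed stand-in for RAGConfig (same field names and defaults)."""
    chunk_size: int = 500
    chunk_overlap: int = 50
    dense_weight: float = 0.6
    sparse_weight: float = 0.4
    top_k_initial: int = 50
    top_k_final: int = 10


def validate_config(cfg) -> list[str]:
    """Return human-readable problems; an empty list means the config looks sane."""
    problems = []
    # An overlap >= chunk size would make a sliding window stall or move backwards
    if not 0 <= cfg.chunk_overlap < cfg.chunk_size:
        problems.append("chunk_overlap must be >= 0 and smaller than chunk_size")
    # Assumed convention: fusion weights sum to 1 so combined scores stay comparable
    if abs(cfg.dense_weight + cfg.sparse_weight - 1.0) > 1e-6:
        problems.append("dense_weight and sparse_weight should sum to 1.0")
    # The reranker can only keep as many chunks as first-stage retrieval returns
    if cfg.top_k_final > cfg.top_k_initial:
        problems.append("top_k_final cannot exceed top_k_initial")
    return problems


print(validate_config(MiniConfig()))
print(validate_config(replace(MiniConfig(), chunk_overlap=500)))
```

Returning a list of problems instead of raising on the first one lets a CLI report every misconfiguration in a single run.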

Step-by-Step Implementation

Step 1: Document Ingestion (Chapters 4-6)

Load documents from a directory and chunk them:

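# ingestion.py
from pathlib import Path

import tiktoken

from config import RAGConfig

def ingest_documents(directory: str, config: RAGConfig) -> list[dict]:
    """Load every .txt file under `directory` and split it into chunks."""
    documents = []
    
    # sorted() keeps chunk order stable across runs
    for filepath in sorted(Path(directory).rglob("*.txt")):
        # Assumes UTF-8 input; adjust the encoding for other corpora
        text = filepath.read_text(encoding="utf-8")
        
        # Chunk text
        chunks = chunk_text(
            text=text,
            chunk_size=config.chunk_size,
            chunk_overlap=config.chunk_overlap,
            separator=config.separator
        )
        
        # IDs use the file stem, so file names must be unique across subdirectories
        for i, chunk in enumerate(chunks):
            documents.append({
                "id": f"{filepath.stem}_{i}",
                "text": chunk,
                "metadata": {
                    "source": str(filepath),
                    "chunk_index": i
                }
            })
    
    return documents

def chunk_text(text: str, chunk_size: int, chunk_overlap: int, separator: str) -> list[str]:
    """Split text into token windows of chunk_size that share chunk_overlap tokens.

    `separator` is accepted for configuration compatibility but is not used
    by this fixed-size token window.
    """
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    
    encoder = tiktoken.get_encoding("cl100k_base")
    tokens = encoder.encode(text)
    
    chunks = []
    start = 0
    
    while start < len(tokens):
        end = start + chunk_size
        chunks.append(encoder.decode(tokens[start:end]))
        if end >= len(tokens):
            break  # Last window reached; avoid an overlap-only tail chunk
        start += chunk_size - chunk_overlap
    
    return chunks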
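
The window arithmetic is easier to check on plain offsets than on decoded text. The sketch below is an illustration only: `window_spans` is a hypothetical helper that works on a token count instead of real tiktoken output, and it stops as soon as a window reaches the last token, which is a choice made for this sketch.

```python
def window_spans(n_tokens: int, chunk_size: int, chunk_overlap: int) -> list[tuple[int, int]]:
    """Return (start, end) token offsets for each sliding-window chunk."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    spans = []
    start = 0
    while start < n_tokens:
        end = min(start + chunk_size, n_tokens)
        spans.append((start, end))
        if end == n_tokens:
            break  # final window reached
        start += chunk_size - chunk_overlap  # stride = size - overlap
    return spans


# A 1,200-token document with the default 500/50 settings
print(window_spans(1200, 500, 50))  # [(0, 500), (450, 950), (900, 1200)]
```

Each window starts 450 tokens after the previous one, so consecutive chunks share exactly 50 tokens.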

Step 2: Index Building (Chapters 8-10)

Index chunks with both dense vectors and BM25:

# retrieval.py
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer

from config import RAGConfig

class HybridSearchIndex:
    def __init__(self, config: RAGConfig):
        self.config = config
        self.encoder = SentenceTransformer(config.embedding_model)
        self.documents = []
        
    def build(self, chunks: list[dict]):
        self.documents = chunks
        
        # Build dense index (unit-length vectors, so dot product equals cosine similarity)
        texts = [d["text"] for d in chunks]
        self.vectors = self.encoder.encode(texts, normalize_embeddings=True)
        
        # Build BM25 index
        tokenized = [d["text"].split() for d in chunks]
        self.bm25 = BM25Okapi(tokenized)
        
    def search(self, query: str, top_k: int) -> list[dict]:
        # Dense search: one cosine score per indexed chunk
        query_vector = self.encoder.encode(query, normalize_embeddings=True)
        dense_scores = self.vectors @ query_vector
        
        # Sparse search, scaled to [0, 1] by the best BM25 score
        tokenized_query = query.split()
        sparse_scores = self.bm25.get_scores(tokenized_query)
        normalized_sparse = sparse_scores / (sparse_scores.max() + 1e-8)
        
        # Hybrid fusion
        combined_scores = (
            self.config.dense_weight * dense_scores +
            self.config.sparse_weight * normalized_sparse
        )
        
        # Indices of the top_k highest combined scores, best first
        top_indices = combined_scores.argsort()[-top_k:][::-1]
        
        return [
            {"id": self.documents[i]["id"], 
             "text": self.documents[i]["text"],
             "score": float(combined_scores[i])}
            for i in top_indices
        ]
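
To see what the weighted fusion does to a ranking, it helps to run it on a handful of made-up scores. This is a dependency-free sketch of the same formula (max-normalized sparse scores, then a weighted sum); `fuse_scores` and `top_k_indices` are hypothetical names and the score values are invented for illustration.

```python
def fuse_scores(dense: list[float], sparse: list[float],
                dense_weight: float = 0.6, sparse_weight: float = 0.4) -> list[float]:
    """Weighted sum of dense scores and max-normalized sparse scores."""
    top = max(sparse, default=0.0)
    normalized = [s / (top + 1e-8) for s in sparse]
    return [dense_weight * d + sparse_weight * s for d, s in zip(dense, normalized)]


def top_k_indices(scores: list[float], k: int) -> list[int]:
    """Indices of the k highest scores, best first."""
    return sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]


dense = [0.82, 0.75, 0.40]   # made-up cosine similarities
sparse = [1.0, 6.0, 3.0]     # made-up raw BM25 scores
combined = fuse_scores(dense, sparse)

print(top_k_indices(dense, 2))     # [0, 1]
print(top_k_indices(combined, 2))  # [1, 0]
```

Chunk 1 overtakes chunk 0 once its strong keyword match is added, which is the behavior hybrid retrieval is meant to provide for exact-term queries.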

Step 3: Reranking (Chapter 12)

# reranker.py
from sentence_transformers import CrossEncoder

class Reranker:
    def __init__(self, model_name: str):
        self.model = CrossEncoder(model_name)
    
    def rerank(self, query: str, chunks: list[dict], top_k: int = 10) -> list[dict]:
        pairs = [(query, chunk["text"]) for chunk in chunks]
        scores = self.model.predict(pairs)
        
        # Sort by reranker scores
        indexed_scores = list(enumerate(scores))
        reranked_indices = sorted(indexed_scores, key=lambda x: x[1], reverse=True)
        
        return [
            {**chunks[idx], "rerank_score": float(score)}
            for idx, score in reranked_indices[:top_k]
        ]
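
The sort-and-truncate logic can be unit-tested without downloading a cross-encoder by injecting the scoring function. In this sketch `rerank_with` and `overlap_score` are hypothetical names, and word overlap is only a placeholder scorer for tests, not a substitute for the cross-encoder.

```python
from typing import Callable


def rerank_with(score_fn: Callable[[str, str], float], query: str,
                chunks: list[dict], top_k: int = 10) -> list[dict]:
    """Score every (query, chunk) pair, sort descending, keep the top_k chunks."""
    scored = [(score_fn(query, c["text"]), c) for c in chunks]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [{**c, "rerank_score": float(s)} for s, c in scored[:top_k]]


def overlap_score(query: str, text: str) -> float:
    """Placeholder scorer: fraction of query words that appear in the text."""
    q = set(query.lower().split())
    t = set(text.lower().split())
    return len(q & t) / max(len(q), 1)


chunks = [
    {"id": "a_0", "text": "BM25 is a sparse retrieval method"},
    {"id": "b_0", "text": "Cross-encoders score query and passage jointly"},
    {"id": "c_0", "text": "Bananas are rich in potassium"},
]
top = rerank_with(overlap_score, "how do cross-encoders score a passage", chunks, top_k=2)
print([c["id"] for c in top])
```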

Step 4: Context Assembly (Chapter 15)

# context.py
import tiktoken

from config import RAGConfig

def assemble_context(chunks: list[dict], config: RAGConfig) -> str:
    encoder = tiktoken.get_encoding("cl100k_base")
    context_parts = []
    total_tokens = 0
    
    # Order by rerank score descending
    sorted_chunks = sorted(chunks, key=lambda x: x.get("rerank_score", 0), reverse=True)
    
    for chunk in sorted_chunks:
        chunk_tokens = len(encoder.encode(chunk["text"]))
        
        if total_tokens + chunk_tokens > config.max_context_tokens:
            break
        
        context_parts.append(f"[Source: {chunk['id']}]\n{chunk['text']}")
        total_tokens += chunk_tokens
    
    return "\n\n".join(context_parts)
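
The budget loop can be explored with toy data. The sketch below makes two labeled departures from the listing above: it counts tokens with a whitespace split instead of tiktoken so it runs without dependencies, and it charges the `[Source: ...]` header against the budget. `pack_context`, `count_words`, and the `stop_at_first_overflow` flag are hypothetical names.

```python
from typing import Callable


def count_words(text: str) -> int:
    """Whitespace token counter; a rough stand-in for a real tokenizer."""
    return len(text.split())


def pack_context(chunks: list[dict], max_tokens: int,
                 count_tokens: Callable[[str], int] = count_words,
                 stop_at_first_overflow: bool = True) -> list[str]:
    """Return the ids of chunks that fit the budget, best rerank score first."""
    ordered = sorted(chunks, key=lambda c: c.get("rerank_score", 0), reverse=True)
    used, kept = 0, []
    for chunk in ordered:
        block = f"[Source: {chunk['id']}]\n{chunk['text']}"
        cost = count_tokens(block)  # header included in the cost
        if used + cost > max_tokens:
            if stop_at_first_overflow:
                break
            continue  # try smaller, lower-ranked chunks instead
        kept.append(chunk["id"])
        used += cost
    return kept


chunks = [
    {"id": "a_0", "text": "one two three", "rerank_score": 0.9},
    {"id": "b_0", "text": "one two three four five six seven", "rerank_score": 0.7},
    {"id": "c_0", "text": "one two", "rerank_score": 0.4},
]
print(pack_context(chunks, max_tokens=10))
print(pack_context(chunks, max_tokens=10, stop_at_first_overflow=False))
```

Stopping at the first overflow keeps the context strictly in rank order; skipping fills more of the budget but can admit a weaker chunk ahead of a stronger one that did not fit.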

Step 5: Generation (Chapters 16-17)

# generation.py
from openai import OpenAI

from config import RAGConfig

PROMPT_TEMPLATE = """You are a helpful assistant answering questions based ONLY on the provided context.

CONTEXT:
{context}

INSTRUCTIONS:
1. Answer the question using ONLY information from the context above.
2. Cite sources using [Source: ID] notation.
3. If the context does not contain the answer, say so.

QUESTION: {question}

ANSWER:"""

class RAGGenerator:
    def __init__(self, config: RAGConfig):
        self.client = OpenAI()
        self.config = config
    
    def generate(self, question: str, context: str) -> str:
        prompt = PROMPT_TEMPLATE.format(context=context, question=question)
        
        response = self.client.chat.completions.create(
            model=self.config.llm_model,
            messages=[{"role": "user", "content": prompt}],
            temperature=self.config.temperature,
            max_tokens=self.config.max_response_tokens
        )
        
        return response.choices[0].message.content
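
Since the prompt asks for `[Source: ID]` citations, the answer can be checked mechanically against the chunk ids that were actually placed in the context. This is a sketch under the assumption that the model follows that notation exactly; `check_citations` and the sample answer are hypothetical.

```python
import re

# Matches the [Source: ID] notation requested in the prompt template
CITATION_PATTERN = re.compile(r"\[Source:\s*([^\]]+?)\s*\]")


def check_citations(answer: str, provided_ids: list[str]) -> dict:
    """List cited ids and flag any that were not part of the supplied context."""
    cited = []
    for match in CITATION_PATTERN.findall(answer):
        if match not in cited:  # keep first-seen order, drop repeats
            cited.append(match)
    allowed = set(provided_ids)
    return {
        "cited": cited,
        "unknown": [c for c in cited if c not in allowed],
        "has_citation": bool(cited),
    }


answer = "BM25 ranks by term frequency [Source: guide_3]. It ignores word order [Source: notes_9]."
print(check_citations(answer, ["guide_3", "guide_4"]))
```

An id under `unknown` means the model cited something that was never in its context, which is worth logging alongside the retrieval metrics.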

Step 6: End-to-End Pipeline

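# pipeline.py
from config import RAGConfig
from context import assemble_context
from generation import RAGGenerator
from ingestion import ingest_documents
from reranker import Reranker
from retrieval import HybridSearchIndex

class RAGPipeline:
    def __init__(self, config: RAGConfig):
        self.config = config
        self.index = HybridSearchIndex(config)
        self.reranker = Reranker(config.reranker_model)
        self.generator = RAGGenerator(config)
    
    def build_index(self, directory: str):
        # Load and chunk every document under `directory`, then index the chunks
        chunks = ingest_documents(directory, self.config)
        self.index.build(chunks)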
    
    def query(self, question: str) -> dict:
        # Retrieval
        initial_results = self.index.search(question, self.config.top_k_initial)
        
        # Reranking
        reranked_results = self.reranker.rerank(
            question, 
            initial_results, 
            self.config.top_k_final
        )
        
        # Context assembly
        context = assemble_context(reranked_results, self.config)
        
        # Generation
        answer = self.generator.generate(question, context)
        
        return {
            "answer": answer,
            "sources": [r["id"] for r in reranked_results],
            "num_chunks_retrieved": len(reranked_results)
        }
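
The project structure lists main.py as the entry point and CLI, but no listing is given for it. One possible shape, offered as a sketch only: the argument names, `build_parser`, and `config_overrides` are all hypothetical, and the commented continuation assumes `build_index` accepts the corpus directory.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """CLI that indexes a directory and answers one question in a single run."""
    parser = argparse.ArgumentParser(
        description="Index a directory and answer one question (sketch)"
    )
    parser.add_argument("directory", help="folder containing .txt documents")
    parser.add_argument("question", help="question to answer")
    parser.add_argument("--chunk-size", type=int, default=None)
    parser.add_argument("--chunk-overlap", type=int, default=None)
    parser.add_argument("--top-k-final", type=int, default=None)
    return parser


def config_overrides(args: argparse.Namespace) -> dict:
    """Collect only the options the user actually set on the command line."""
    names = ("chunk_size", "chunk_overlap", "top_k_final")
    return {n: getattr(args, n) for n in names if getattr(args, n) is not None}


args = build_parser().parse_args(["docs/", "What is BM25?", "--chunk-size", "300"])
print(config_overrides(args))  # {'chunk_size': 300}

# In main.py this would continue roughly as (not run here, needs the models):
#   pipeline = RAGPipeline(RAGConfig(**config_overrides(args)))
#   pipeline.build_index(args.directory)
#   print(pipeline.query(args.question)["answer"])
```

Indexing and querying happen in one invocation because the index in this chapter lives only in memory.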

Step 7: Evaluation

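# evaluation.py
from pipeline import RAGPipeline

# calculate_hit_rate and calculate_mrr are not defined in this chapter; they are
# assumed to be the helpers from the course's earlier evaluation material.

def evaluate_pipeline(pipeline: RAGPipeline, test_queries: list[dict]) -> dict:
    """Evaluate hit rate and MRR of first-stage (pre-rerank) retrieval."""
    all_retrieved = []
    all_labels = []
    corpus_ids = [d["id"] for d in pipeline.index.documents]
    
    for query_data in test_queries:
        results = pipeline.index.search(query_data["question"], pipeline.config.top_k_initial)
        retrieved_ids = [r["id"] for r in results]
        
        # Create relevance labels, one per corpus chunk in index order
        relevant = set(query_data.get("relevant_chunks", []))
        labels = [2 if chunk_id in relevant else 0 for chunk_id in corpus_ids]
        
        all_retrieved.append(retrieved_ids)
        all_labels.append(labels)
    
    hit_rate = calculate_hit_rate(all_labels, all_retrieved, k=10)
    # Check that calculate_mrr truncates at rank 10 before reporting this as MRR@10
    mrr = calculate_mrr(all_labels, all_retrieved)
    
    return {
        "hit_rate@10": hit_rate,
        "mrr@10": mrr,
    }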
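
For reference, both metrics can be computed directly from ranked id lists and sets of relevant ids. The functions below are a self-contained sketch of the standard definitions; `hit_rate_at_k` and `mrr_at_k` are hypothetical names and take a different input shape than the label-based `calculate_hit_rate` and `calculate_mrr` helpers used above.

```python
def hit_rate_at_k(retrieved: list[list[str]], relevant: list[set[str]], k: int = 10) -> float:
    """Fraction of queries with at least one relevant id in the top k results."""
    hits = sum(1 for ids, rel in zip(retrieved, relevant) if rel & set(ids[:k]))
    return hits / len(retrieved) if retrieved else 0.0


def mrr_at_k(retrieved: list[list[str]], relevant: list[set[str]], k: int = 10) -> float:
    """Mean of 1/rank of the first relevant id within the top k (0 if none)."""
    total = 0.0
    for ids, rel in zip(retrieved, relevant):
        for rank, doc_id in enumerate(ids[:k], start=1):
            if doc_id in rel:
                total += 1.0 / rank
                break  # only the first relevant result counts
    return total / len(retrieved) if retrieved else 0.0


retrieved = [["a", "b", "c"], ["d", "e", "f"], ["g", "h", "i"]]
relevant = [{"a"}, {"f"}, {"z"}]
print(hit_rate_at_k(retrieved, relevant, k=3))  # 2 of 3 queries hit
print(mrr_at_k(retrieved, relevant, k=3))       # (1 + 1/3 + 0) / 3
```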

Submission Criteria

Your submission should include:

  1. Working pipeline code that ingests documents, answers queries, and generates citations
  2. At least 50 test queries with ground truth relevance judgments
  3. Evaluation results showing Hit Rate@10 > 0.85, MRR@10 > 0.5
  4. A README explaining your architecture decisions and any optimizations you applied
  5. Analysis of 5-10 failed queries explaining why retrieval missed relevant chunks
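
The evaluation code reads a `question` and a `relevant_chunks` list from each test query. One way to store the 50+ queries is one JSON object per line; that file format is an assumption of this sketch, as is the `load_test_queries` helper, while the two key names come from evaluation.py.

```python
import json


def load_test_queries(lines: list[str]) -> list[dict]:
    """Parse JSON-lines test queries and reject records the evaluator cannot use."""
    queries = []
    for line_no, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # allow blank lines between records
        record = json.loads(line)
        if not record.get("question"):
            raise ValueError(f"line {line_no}: missing 'question'")
        if not record.get("relevant_chunks"):
            raise ValueError(f"line {line_no}: needs at least one relevant chunk id")
        queries.append(record)
    return queries


sample = [
    '{"question": "What does BM25 score?", "relevant_chunks": ["guide_3"]}',
    '{"question": "What is chunk overlap?", "relevant_chunks": ["intro_0", "intro_1"]}',
]
queries = load_test_queries(sample)
print(len(queries), queries[1]["relevant_chunks"])
```

Rejecting queries without ground truth up front matters because a query with no relevant chunks can never score a hit and silently lowers both metrics.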

Hints and Common Pitfalls

  • If Hit Rate is low, check that the embedding model suits the domain and language of your document corpus
  • If MRR is low but Hit Rate is OK, the relevant chunks are retrieved but ranked too low; add or tune reranking
  • If generation fabricates, lower temperature to 0.1-0.2
  • If context exceeds limits, reduce max_context_tokens and top_k_final (top_k_initial only controls how many candidates reach the reranker)
  • If BM25 returns counterintuitive results, tune the k1 and b parameters
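
The first two hints correspond to two different failure types, and sorting queries into them gives the failure analysis a starting point. A minimal sketch: `classify_query` is a hypothetical helper, and the `good_rank=2` cutoff for a well-ranked hit is an arbitrary assumption.

```python
from collections import Counter


def classify_query(retrieved_ids: list[str], relevant_ids: set[str],
                   k: int = 10, good_rank: int = 2) -> str:
    """Label a query 'ok', 'low_rank' (hit below good_rank), or 'miss' (no hit in top k)."""
    for rank, doc_id in enumerate(retrieved_ids[:k], start=1):
        if doc_id in relevant_ids:
            return "ok" if rank <= good_rank else "low_rank"
    return "miss"


runs = [
    (["a", "b", "c"], {"a"}),   # relevant chunk ranked first
    (["d", "e", "f"], {"f"}),   # found, but only at rank 3
    (["g", "h", "i"], {"z"}),   # never retrieved
]
labels = [classify_query(ids, rel, k=3) for ids, rel in runs]
print(Counter(labels))
```

Queries labeled `miss` pull down Hit Rate and point at the embedding model, chunking, or BM25 settings; queries labeled `low_rank` pull down MRR and are the ones reranking can help.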

Extension Challenges

After meeting base requirements, attempt these optional challenges:

  1. Implement query decomposition for compound questions
  2. Add metadata filtering to support section-scoped queries
  3. Support multi-modal documents (tables, diagrams as images)
  4. Implement response streaming with progressive context reveal
  5. Build a simple web UI for human evaluation of answers
EXERCISE

Complete the final project. Submit evaluation results showing Hit Rate@10 and MRR@10 meeting targets. Including analysis of failure modes demonstrates understanding beyond the happy path.
