10. Embedding Pipeline

Chapter 10 of 22 · 30 min

Embedding converts text into numerical vectors that capture semantic meaning. The quality of your embeddings determines retrieval quality. Bad embeddings produce irrelevant results regardless of how sophisticated your retrieval algorithm is.

Understanding embedding models

An embedding model maps text to a vector space where similar texts cluster together. "Return policy" and "refund procedure" should have similar vectors. "Return policy" and "product specifications" should be distant.

Modern embedding models are transformer-based neural networks trained on massive text corpora using contrastive learning: similar text pairs are pulled together, dissimilar pairs are pushed apart.

Sentence transformers for local embedding

The sentence-transformers library provides high-quality open-source embedding models.

from sentence_transformers import SentenceTransformer

# Instantiate the pretrained model by its name
model = SentenceTransformer("all-MiniLM-L6-v2")

# Passing one string returns one vector
embedding = model.encode("What is the return policy?")

# Passing a list of strings returns one vector per string
texts = [
    "Return policy information",
    "Product specifications",
    "Customer support contacts",
]
embeddings = model.encode(texts)

# Inspect the resulting array shapes
print(f"Single embedding shape: {embedding.shape}")
print(f"Batch embedding shape: {embeddings.shape}")

all-MiniLM-L6-v2 produces 384-dimensional vectors, so the single embedding above has shape (384,) and the batch has shape (3, 384). Small, fast models like this one are good for prototyping. Larger models like BAAI/bge-base-en-v1.5 (768 dimensions) generally perform better on retrieval benchmarks.

Embedding dimensions and quality

More dimensions capture more nuanced relationships but require more storage and compute. For most use cases, 384 to 768 dimensions is the sweet spot.

# Output dimensionality of a few commonly used embedding models
models = {
    "all-MiniLM-L6-v2": 384,
    "all-mpnet-base-v2": 768,
    "bge-base-en-v1.5": 768,
    "text-embedding-3-large": 3072,
}

# Print one line per model, in insertion order
for name in models:
    dims = models[name]
    print(f"{name}: {dims} dimensions")

Building the embedding pipeline

from sentence_transformers import SentenceTransformer
import tiktoken
from typing import Iterator

class EmbeddingPipeline:
    """Embed text chunks with a sentence-transformers model in token-bounded batches.

    Token counts are computed with tiktoken's ``cl100k_base`` encoding. That
    is not the tokenizer the embedding model itself uses, so the counts are
    an estimate that only bounds how much text goes into one ``encode`` call.
    """

    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        batch_size: int = 32,
        max_tokens_per_batch: int = 8192
    ):
        """Load the embedding model and the token counter.

        Args:
            model_name: Name passed to ``SentenceTransformer``.
            batch_size: ``batch_size`` forwarded to ``model.encode``.
            max_tokens_per_batch: Upper bound on the summed token count of
                the chunks grouped into one ``model.encode`` call.
        """
        self.model = SentenceTransformer(model_name)
        self.batch_size = batch_size
        self.max_tokens_per_batch = max_tokens_per_batch
        self.encoder = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, text: str) -> int:
        """Return the number of ``cl100k_base`` tokens in ``text``."""
        # With tiktoken's default (disallowed_special="all"), encode() raises
        # ValueError when the text contains a special-token string such as
        # "<|endoftext|>". Documents are arbitrary text, so treat those
        # strings as ordinary characters instead of failing.
        return len(self.encoder.encode(text, disallowed_special=()))

    def batch_by_token_limit(
        self,
        chunks: list[str]
    ) -> Iterator[list[str]]:
        """Group consecutive chunks into batches that respect the token limit.

        Chunk order is preserved. A chunk that on its own exceeds
        ``max_tokens_per_batch`` is still emitted, in a batch by itself.

        Args:
            chunks: Texts to group.

        Yields:
            Non-empty lists of chunks.
        """
        current_batch: list[str] = []
        current_tokens = 0

        for chunk in chunks:
            chunk_tokens = self.count_tokens(chunk)

            # Flush before adding a chunk that would overflow the batch. The
            # emptiness check lets an oversized chunk start a batch alone;
            # the next chunk then always triggers this flush.
            if current_batch and current_tokens + chunk_tokens > self.max_tokens_per_batch:
                yield current_batch
                current_batch = []
                current_tokens = 0

            current_batch.append(chunk)
            current_tokens += chunk_tokens

        if current_batch:
            yield current_batch

    def embed_chunks(
        self,
        chunks: list[str],
        show_progress: bool = True
    ) -> list[list[float]]:
        """Embed ``chunks`` and return one vector per chunk, in input order.

        Args:
            chunks: Texts to embed. An empty list returns an empty list
                without calling the model.
            show_progress: Forwarded to ``model.encode`` as
                ``show_progress_bar`` for every batch.

        Returns:
            Embeddings as plain Python lists of floats.
        """
        all_embeddings: list[list[float]] = []

        for batch in self.batch_by_token_limit(chunks):
            batch_embeddings = self.model.encode(
                batch,
                show_progress_bar=show_progress,
                batch_size=self.batch_size
            )
            # Convert the model's array output to plain nested lists
            all_embeddings.extend(batch_embeddings.tolist())

        return all_embeddings

Handling long texts

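Most embedding models have a context limit, typically between 256 and 512 tokens for sentence-transformers models (all-MiniLM-L6-v2 truncates input at 256 word pieces by default). Text exceeding this limit must be truncated or chunked before embedding. Note that the helpers below count tokens with tiktoken's cl100k_base encoding, which is not the tokenizer the embedding model uses, so treat the counts as an approximation and leave some headroom.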

def embed_with_truncation(
    text: str,
    model: SentenceTransformer,
    max_tokens: int = 512
) -> list[float]:
    """Embed ``text`` after cutting it to at most ``max_tokens`` tokens.

    Tokens are counted with tiktoken's ``cl100k_base`` encoding, which is an
    approximation of the embedding model's own tokenization.

    Args:
        text: Text to embed.
        model: Model whose ``encode`` method produces the embedding.
        max_tokens: Maximum number of ``cl100k_base`` tokens to keep.

    Returns:
        The embedding as a plain list of floats.

    Raises:
        ValueError: If ``max_tokens`` is less than 1.
    """
    if max_tokens < 1:
        # A negative value would silently slice tokens off the end instead
        raise ValueError("max_tokens must be at least 1")

    encoder = tiktoken.get_encoding("cl100k_base")
    # disallowed_special=() encodes strings like "<|endoftext|>" as ordinary
    # text; the default setting raises ValueError when the input contains one.
    tokens = encoder.encode(text, disallowed_special=())

    # Only round-trip through decode() when something is actually cut off
    if len(tokens) > max_tokens:
        text = encoder.decode(tokens[:max_tokens])

    return model.encode(text).tolist()

def embed_long_text_by_sections(
    text: str,
    model: SentenceTransformer,
    max_tokens_per_section: int = 512
) -> list[list[float]]:
    """Split ``text`` into fixed-size token sections and embed each one.

    Sections are consecutive, non-overlapping windows of
    ``max_tokens_per_section`` ``cl100k_base`` tokens, so a boundary can fall
    in the middle of a word or sentence.

    Args:
        text: Text to embed.
        model: Model whose ``encode`` method produces the embeddings.
        max_tokens_per_section: Number of tokens per section.

    Returns:
        One embedding per section, in document order. Empty text yields an
        empty list.

    Raises:
        ValueError: If ``max_tokens_per_section`` is less than 1.
    """
    if max_tokens_per_section < 1:
        # range() rejects a zero step and silently yields nothing for a
        # negative one; fail loudly in both cases.
        raise ValueError("max_tokens_per_section must be at least 1")

    encoder = tiktoken.get_encoding("cl100k_base")
    # disallowed_special=() encodes strings like "<|endoftext|>" as ordinary
    # text; the default setting raises ValueError when the input contains one.
    tokens = encoder.encode(text, disallowed_special=())

    sections = [
        encoder.decode(tokens[start:start + max_tokens_per_section])
        for start in range(0, len(tokens), max_tokens_per_section)
    ]
    if not sections:
        return []

    # Encode all sections in one call so the model can batch them
    return model.encode(sections).tolist()

Embedding normalization

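Cosine similarity itself ignores vector length, but once vectors are normalized to unit length the plain dot product equals cosine similarity. That lets you use fast inner-product search in a vector store and makes scores comparable across vectors. Normalize embeddings to unit length before storing them.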

import numpy as np

def normalize_embeddings(embeddings: list[list[float]]) -> list[list[float]]:
    """L2-normalize each embedding to unit length.

    Args:
        embeddings: Vectors of equal length, one per row.

    Returns:
        The normalized vectors as plain lists. An empty input returns an
        empty list, and an all-zero vector is returned unchanged.
    """
    # len() rather than truthiness so an array argument also works
    if len(embeddings) == 0:
        return []

    embeddings_array = np.asarray(embeddings, dtype=float)
    norms = np.linalg.norm(embeddings_array, axis=1, keepdims=True)
    # A zero vector has no direction. Dividing by 1 instead of 0 keeps it as
    # zeros rather than producing NaNs and a RuntimeWarning.
    safe_norms = np.where(norms == 0.0, 1.0, norms)
    return (embeddings_array / safe_norms).tolist()

# Normalize right after embedding so every vector has unit length
raw_embeddings = pipeline.embed_chunks(chunks)
embeddings = normalize_embeddings(raw_embeddings)

Processing documents end-to-end

Combine ingestion, chunking, and embedding into one pipeline.

from dataclasses import dataclass
from typing import Optional

@dataclass
class EmbeddedChunk:
    """One chunk of document text together with its embedding and metadata."""

    # Identifier of the chunk; DocumentEmbeddingPipeline.process_document
    # builds it as "<document_id>_<chunk_index>"
    chunk_id: str
    # The chunk's text
    text: str
    # Embedding vector computed for the text
    embedding: list[float]
    # Caller-supplied metadata; process_document also adds "document_id"
    # and "chunk_index"
    metadata: dict

class DocumentEmbeddingPipeline:
    """Split documents into chunks and embed every chunk."""

    def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
        """Create the embedding pipeline and the text splitter.

        Args:
            embedding_model: Model name forwarded to ``EmbeddingPipeline``.
        """
        self.embedding_pipeline = EmbeddingPipeline(model_name=embedding_model)
        # NOTE(review): chunk_size=512 is presumably measured in tokens;
        # confirm it fits the chosen embedding model's input limit.
        self.splitter = TokenAwareRecursiveSplitter(chunk_size=512, chunk_overlap=50)

    def process_document(
        self,
        text: str,
        document_id: str,
        metadata: Optional[dict] = None
    ) -> list[EmbeddedChunk]:
        """Split one document into chunks and embed each chunk.

        Args:
            text: Full document text.
            document_id: Identifier used in chunk ids and chunk metadata.
            metadata: Optional metadata copied into every chunk's metadata.

        Returns:
            One ``EmbeddedChunk`` per chunk, in document order. Chunk ids
            have the form ``"<document_id>_<chunk_index>"``.
        """
        chunks = self.splitter.split_text(text)
        embeddings = self.embedding_pipeline.embed_chunks(chunks)
        base_metadata = metadata or {}

        return [
            EmbeddedChunk(
                chunk_id=f"{document_id}_{i}",
                text=chunk_text,
                embedding=embedding,
                # Listed after the caller's metadata so these two keys
                # always reflect the actual document and position.
                metadata={
                    **base_metadata,
                    "document_id": document_id,
                    "chunk_index": i,
                },
            )
            for i, (chunk_text, embedding) in enumerate(zip(chunks, embeddings))
        ]

    def process_documents(
        self,
        documents: list[dict]
    ) -> list[EmbeddedChunk]:
        """Process several documents and return all their chunks.

        Args:
            documents: Dicts with a required ``"text"`` key and optional
                ``"id"`` and ``"metadata"`` keys.

        Returns:
            The embedded chunks of all documents, in input order.
        """
        all_chunks: list[EmbeddedChunk] = []

        for index, doc in enumerate(documents):
            # The fallback id uses the document's position, matching
            # process_documents_with_progress, rather than the running
            # chunk count.
            all_chunks.extend(self.process_document(
                text=doc["text"],
                document_id=doc.get("id", f"doc_{index}"),
                metadata=doc.get("metadata", {}),
            ))

        return all_chunks

Batch processing for large document sets

When processing thousands of documents, batch operations and progress tracking matter.

from tqdm import tqdm

def process_documents_with_progress(
    pipeline: DocumentEmbeddingPipeline,
    documents: list[dict],
    checkpoint_path: Optional[str] = None
) -> list[EmbeddedChunk]:
    """Process documents with a progress bar and optional checkpointing.

    A document that fails is reported and skipped; processing continues with
    the next one. When ``checkpoint_path`` is given, progress is saved as
    JSON after every 100 documents, and an existing checkpoint is loaded so
    the run resumes after the last saved document.

    Args:
        pipeline: Pipeline used to chunk and embed each document.
        documents: Dicts with a required ``"text"`` key and optional ``"id"``
            and ``"metadata"`` keys.
        checkpoint_path: Optional path of the JSON checkpoint file. Chunk
            metadata must be JSON-serializable for the checkpoint to be
            written.

    Returns:
        The embedded chunks of all successfully processed documents,
        including any restored from the checkpoint.
    """
    # Imported locally so the function works regardless of which names the
    # surrounding module imports.
    import json
    from dataclasses import asdict
    from pathlib import Path

    checkpoint_file = Path(checkpoint_path) if checkpoint_path else None
    all_chunks = []
    start = 0

    # Resume from checkpoint if it exists
    if checkpoint_file is not None and checkpoint_file.exists():
        checkpoint = json.loads(checkpoint_file.read_text())
        # The checkpoint stores plain dicts; rebuild the dataclass instances
        all_chunks = [EmbeddedChunk(**item) for item in checkpoint["chunks"]]
        start = checkpoint["processed"]

    remaining = documents[start:]
    progress = tqdm(remaining, desc="Processing documents")

    # index is the document's position in the full list, so fallback ids and
    # the saved offset stay correct after a resume or a failed document.
    for index, doc in enumerate(progress, start=start):
        try:
            chunks = pipeline.process_document(
                text=doc["text"],
                document_id=doc.get("id", f"doc_{index}"),
                metadata=doc.get("metadata", {})
            )
        except Exception as e:
            # Best effort: report the failure and move on
            print(f"Error processing document {index}: {e}")
        else:
            all_chunks.extend(chunks)

        # Count failed documents as consumed too; otherwise a resume would
        # start too early and process earlier documents a second time.
        consumed = index + 1

        # Checkpoint every 100 documents
        if checkpoint_file is not None and consumed % 100 == 0:
            try:
                # EmbeddedChunk objects are not JSON-serializable as-is
                payload = json.dumps({
                    "chunks": [asdict(chunk) for chunk in all_chunks],
                    "processed": consumed
                })
                # Write to a temporary file and rename it, so an interrupted
                # write cannot leave a truncated checkpoint behind.
                temp_file = checkpoint_file.with_name(checkpoint_file.name + ".tmp")
                temp_file.write_text(payload)
                temp_file.replace(checkpoint_file)
            except (OSError, TypeError, ValueError) as e:
                # Losing a checkpoint should not abort a long-running job
                print(f"Error writing checkpoint after document {index}: {e}")

    return all_chunks

Verifying embedding quality

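Always test your embeddings before building a production system. The example below assumes `cosine_similarity` has already been imported, for instance from scikit-learn (`from sklearn.metrics.pairwise import cosine_similarity`).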

def test_embedding_quality(model: SentenceTransformer):
    """Print query-to-document cosine similarities for a small sample set.

    For each sample query, every sample document is listed with its
    similarity score.
    """
    queries = [
        "return policy for electronics",
        "refund procedure",
        "warranty information",
    ]
    documents = [
        "Electronics can be returned within 30 days of purchase.",
        "To get a refund, submit a request through the portal.",
        "All products come with a one-year warranty.",
    ]

    query_embeddings = model.encode(queries)
    doc_embeddings = model.encode(documents)

    # Score each query against all documents, one query at a time
    for query, query_embedding in zip(queries, query_embeddings):
        scores = cosine_similarity([query_embedding], doc_embeddings)[0]

        print(f"\nQuery: '{query}'")
        for score, doc in zip(scores, documents):
            print(f"  [{score:.3f}] {doc}")
EXERCISE

Take 10 sentences from different domains (3 technology, 3 finance, 4 general). Create embeddings for all 10. Compute the cosine similarity matrix. Verify that sentences from the same domain have higher similarity to each other than to sentences from different domains. Report the average within-domain vs between-domain similarity.