10. Embedding Pipeline
Embedding converts text into numerical vectors that capture semantic meaning. The quality of your embeddings determines retrieval quality. Bad embeddings produce irrelevant results regardless of how sophisticated your retrieval algorithm is.
Understanding embedding models
An embedding model maps text to a vector space where similar texts cluster together. "Return policy" and "refund procedure" should have similar vectors. "Return policy" and "product specifications" should be distant.
Modern embedding models are transformer-based neural networks trained on massive text corpora using contrastive learning: similar text pairs are pulled together, dissimilar pairs are pushed apart.
Sentence transformers for local embedding
The sentence-transformers library provides high-quality open-source embedding models.
from sentence_transformers import SentenceTransformer
# Load a small general-purpose model (384-dimensional output)
model = SentenceTransformer("all-MiniLM-L6-v2")

# A single string is encoded to one 1-D vector
embedding = model.encode("What is the return policy?")

# A list of strings is encoded as a batch: one row per input text
sample_texts = [
    "Return policy information",
    "Product specifications",
    "Customer support contacts",
]
embeddings = model.encode(sample_texts)

print(f"Single embedding shape: {embedding.shape}")
print(f"Batch embedding shape: {embeddings.shape}")
all-MiniLM-L6-v2 produces 384-dimensional vectors, so the single embedding above has shape (384,) and the batch of three has shape (3, 384). Small, fast models like this one are good for prototyping; larger models such as BAAI/bge-base-en-v1.5 (768 dimensions) perform better on benchmarks.
Embedding dimensions and quality
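More dimensions can capture more nuanced relationships, but they require more storage and compute. For most use cases, 384 to 768 dimensions is the sweet spot. (text-embedding-3-large in the table below is OpenAI's hosted API model, not a sentence-transformers model you run locally.)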
# Compare model sizes: output vector dimensionality per model
models = dict([
    ("all-MiniLM-L6-v2", 384),
    ("all-mpnet-base-v2", 768),
    ("bge-base-en-v1.5", 768),
    ("text-embedding-3-large", 3072),
])
print("\n".join(f"{name}: {dims} dimensions" for name, dims in models.items()))
Building the embedding pipeline
from sentence_transformers import SentenceTransformer
import tiktoken
from typing import Iterator
class EmbeddingPipeline:
    """Embed text chunks with a sentence-transformers model, batching by a token budget.

    Token counts come from tiktoken's ``cl100k_base`` encoding regardless of
    ``model_name``, so the per-batch budget only approximates what the
    embedding model's own tokenizer would count.
    """

    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        batch_size: int = 32,
        max_tokens_per_batch: int = 8192
    ):
        """Load the model and the token counter.

        Args:
            model_name: Name passed to ``SentenceTransformer``.
            batch_size: Inner batch size forwarded to ``model.encode``.
            max_tokens_per_batch: Token budget for each group of chunks handed
                to a single ``model.encode`` call.
        """
        self.model = SentenceTransformer(model_name)
        self.batch_size = batch_size
        self.max_tokens_per_batch = max_tokens_per_batch
        self.encoder = tiktoken.get_encoding("cl100k_base")

    def count_tokens(self, text: str) -> int:
        """Return how many tokens ``self.encoder`` produces for ``text``."""
        token_ids = self.encoder.encode(text)
        return len(token_ids)

    def batch_by_token_limit(
        self,
        chunks: list[str]
    ) -> Iterator[list[str]]:
        """Yield consecutive groups of chunks whose token total fits the budget.

        Order is preserved. A chunk that alone exceeds
        ``max_tokens_per_batch`` is yielded as a single-element group.
        """
        pending: list[str] = []
        used = 0
        for piece in chunks:
            cost = self.count_tokens(piece)
            if used + cost <= self.max_tokens_per_batch:
                pending.append(piece)
                used += cost
                continue
            if not pending:
                # Oversized chunk with nothing queued: emit it on its own.
                yield [piece]
                continue
            # Current group is full: flush it and start a new one with this chunk.
            yield pending
            pending, used = [piece], cost
        if pending:
            yield pending

    def embed_chunks(
        self,
        chunks: list[str],
        show_progress: bool = True
    ) -> list[list[float]]:
        """Embed ``chunks`` group by group and return one vector per chunk, in order."""
        vectors: list[list[float]] = []
        for group in self.batch_by_token_limit(chunks):
            encoded = self.model.encode(
                group,
                show_progress_bar=show_progress,
                batch_size=self.batch_size
            )
            vectors += encoded.tolist()
        return vectors
Handling long texts
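Most embedding models have a fixed input limit, typically 256 or 512 tokens: all-MiniLM-L6-v2 truncates input longer than 256 word pieces by default, and BERT-based models such as bge-base-en-v1.5 accept at most 512. sentence-transformers silently truncates anything longer, so text that exceeds the limit should be truncated deliberately or split into sections before embedding. The helpers below count tokens with tiktoken's cl100k_base encoding, which only approximates the embedding model's own tokenizer, so leave some headroom.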
def embed_with_truncation(
    text: str,
    model: SentenceTransformer,
    max_tokens: int = 512
) -> list[float]:
    """Embed ``text`` after cutting it to at most ``max_tokens`` cl100k_base tokens.

    Text within the budget is embedded unchanged. Longer text is cut at a
    token boundary and decoded back to a string, which can split a word or,
    because cl100k_base is byte-level, a multi-byte character.

    Returns:
        The embedding as a plain Python list of floats.
    """
    enc = tiktoken.get_encoding("cl100k_base")
    ids = enc.encode(text)
    to_embed = enc.decode(ids[:max_tokens]) if len(ids) > max_tokens else text
    return model.encode(to_embed).tolist()
def embed_long_text_by_sections(
    text: str,
    model: SentenceTransformer,
    max_tokens_per_section: int = 512
) -> list[list[float]]:
    """Split ``text`` into consecutive token windows and embed each one.

    The text is tokenized with tiktoken's cl100k_base encoding and cut into
    non-overlapping windows of ``max_tokens_per_section`` tokens. Each window
    is decoded back to text, and all windows are embedded in a single batched
    ``model.encode`` call.

    Returns:
        One embedding (list of floats) per section, in document order; ``[]``
        for empty text.

    Raises:
        ValueError: if ``max_tokens_per_section`` is less than 1.
    """
    if max_tokens_per_section < 1:
        # 0 previously failed inside range(); negative values silently
        # returned []. Both are caller errors.
        raise ValueError("max_tokens_per_section must be >= 1")

    encoder = tiktoken.get_encoding("cl100k_base")
    tokens = encoder.encode(text)
    sections = [
        encoder.decode(tokens[start:start + max_tokens_per_section])
        for start in range(0, len(tokens), max_tokens_per_section)
    ]
    if not sections:
        return []
    # One batched forward pass instead of one encode() call per section.
    return model.encode(sections).tolist()
Embedding normalization
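Cosine similarity itself does not depend on vector length. However, once embeddings are normalized to unit L2 length, cosine similarity equals the plain dot product, which is cheaper to compute and is the metric many vector indexes use. Normalize embeddings to unit length.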
import numpy as np
def normalize_embeddings(embeddings: list[list[float]]) -> list[list[float]]:
    """L2-normalize each embedding (row) to unit length.

    Args:
        embeddings: A sequence of equal-length vectors (list of lists or a
            2-D array).

    Returns:
        The normalized vectors as nested Python lists. Zero vectors are
        returned unchanged (all zeros) instead of becoming NaN, and an empty
        input returns ``[]``.
    """
    if len(embeddings) == 0:
        # np.array([]) is 1-D, so norm(axis=1) would raise.
        return []
    embeddings_array = np.asarray(embeddings, dtype=float)
    norms = np.linalg.norm(embeddings_array, axis=1, keepdims=True)
    # Divide zero rows by 1 so they stay zero rather than producing NaN.
    safe_norms = np.where(norms == 0, 1.0, norms)
    return (embeddings_array / safe_norms).tolist()
# After embedding: build a pipeline and some chunks so the example runs as-is
pipeline = EmbeddingPipeline()
chunks = [
    "Return policy information",
    "Product specifications",
    "Customer support contacts",
]
embeddings = pipeline.embed_chunks(chunks)
embeddings = normalize_embeddings(embeddings)
# Alternatively, sentence-transformers can normalize at encode time:
# model.encode(texts, normalize_embeddings=True)
Processing documents end-to-end
Combine ingestion, chunking, and embedding into one pipeline.
from dataclasses import dataclass
from typing import Optional
@dataclass
class EmbeddedChunk:
    """One chunk of a source document together with its embedding vector."""

    # Chunk identifier; DocumentEmbeddingPipeline.process_document builds it
    # as f"{document_id}_{chunk_index}".
    chunk_id: str
    # The chunk's text as produced by the splitter.
    text: str
    # Embedding vector for `text`.
    embedding: list[float]
    # Caller-supplied metadata plus "document_id" and "chunk_index" when built
    # by DocumentEmbeddingPipeline.process_document.
    metadata: dict
class DocumentEmbeddingPipeline:
    """Split documents into chunks, embed every chunk, and wrap the results."""

    def __init__(
        self,
        embedding_model: str = "all-MiniLM-L6-v2",
        chunk_size: int = 512,
        chunk_overlap: int = 50,
    ):
        """Create the embedding pipeline and the text splitter.

        Args:
            embedding_model: Model name passed to ``EmbeddingPipeline``.
            chunk_size: Forwarded to ``TokenAwareRecursiveSplitter``.
            chunk_overlap: Forwarded to ``TokenAwareRecursiveSplitter``.
        """
        self.embedding_pipeline = EmbeddingPipeline(model_name=embedding_model)
        # NOTE(review): the splitter's unit is presumably tokens. all-MiniLM-L6-v2
        # truncates input beyond 256 word pieces, so 512-token chunks may be cut
        # silently during encoding. Confirm chunk_size against the model's limit.
        self.splitter = TokenAwareRecursiveSplitter(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )

    def process_document(
        self,
        text: str,
        document_id: str,
        metadata: Optional[dict] = None
    ) -> list[EmbeddedChunk]:
        """Split one document, embed its chunks, and return them as EmbeddedChunks.

        Each chunk's metadata is ``metadata`` (if any) plus ``document_id`` and
        ``chunk_index``; those two keys override same-named caller keys.
        """
        chunks = self.splitter.split_text(text)
        # embed_chunks returns one vector per chunk, in the same order.
        embeddings = self.embedding_pipeline.embed_chunks(chunks)
        base_metadata = metadata or {}
        return [
            EmbeddedChunk(
                chunk_id=f"{document_id}_{i}",
                text=chunk_text,
                embedding=embedding,
                metadata={
                    **base_metadata,
                    "document_id": document_id,
                    "chunk_index": i,
                },
            )
            for i, (chunk_text, embedding) in enumerate(zip(chunks, embeddings))
        ]

    def process_documents(
        self,
        documents: list[dict]
    ) -> list[EmbeddedChunk]:
        """Process multiple documents and concatenate their chunks.

        Each document dict needs a ``"text"`` key and may provide ``"id"`` and
        ``"metadata"``. A missing id defaults to ``f"doc_{index}"``, where
        ``index`` is the document's position in ``documents``.
        """
        all_chunks: list[EmbeddedChunk] = []
        # Use the document's position (not the running chunk count) so default
        # ids are unique and match the document order.
        for index, doc in enumerate(documents):
            all_chunks.extend(self.process_document(
                text=doc["text"],
                document_id=doc.get("id", f"doc_{index}"),
                metadata=doc.get("metadata", {}),
            ))
        return all_chunks
Batch processing for large document sets
When processing thousands of documents, progress tracking and checkpointing matter: a failure partway through should not force you to re-embed everything from the start.
from tqdm import tqdm
def _save_checkpoint(
    checkpoint_path: str,
    chunks: list[EmbeddedChunk],
    processed: int,
) -> None:
    """Write ``chunks`` and the ``processed`` position to ``checkpoint_path`` as JSON."""
    import dataclasses
    import json
    import os
    from pathlib import Path

    target = Path(checkpoint_path)
    tmp = target.with_name(target.name + ".tmp")
    # EmbeddedChunk is not JSON-serializable itself; store plain dicts.
    # Assumes chunk metadata values are JSON-serializable.
    tmp.write_text(json.dumps({
        "chunks": [dataclasses.asdict(chunk) for chunk in chunks],
        "processed": processed,
    }))
    # Write-then-rename (atomic on POSIX) so a crash mid-write cannot leave a
    # truncated checkpoint behind.
    os.replace(tmp, target)


def _load_checkpoint(checkpoint_path: str) -> tuple[list[EmbeddedChunk], int]:
    """Return ``(chunks, processed)`` from a saved checkpoint, or ``([], 0)`` if none exists."""
    import json
    from pathlib import Path

    target = Path(checkpoint_path)
    if not target.exists():
        return [], 0
    state = json.loads(target.read_text())
    return [EmbeddedChunk(**chunk) for chunk in state["chunks"]], state["processed"]


def process_documents_with_progress(
    pipeline: DocumentEmbeddingPipeline,
    documents: list[dict],
    checkpoint_path: Optional[str] = None,
    checkpoint_every: int = 100,
) -> list[EmbeddedChunk]:
    """Process documents with a progress bar and optional periodic checkpoints.

    If ``checkpoint_path`` names an existing checkpoint, its chunks are
    restored and processing resumes at the saved document position.

    A document that raises is reported with ``print`` and skipped. It still
    advances the position, so default ids (``f"doc_{position}"``) and the
    resume offset stay aligned with ``documents``, and a resumed run does not
    retry it.

    Args:
        pipeline: Pipeline used to split and embed each document.
        documents: Dicts with ``"text"`` and optional ``"id"`` / ``"metadata"``.
        checkpoint_path: JSON file to read from and write to; ``None`` disables
            checkpointing.
        checkpoint_every: Write a checkpoint after every this many documents.

    Returns:
        All chunks produced, including any restored from the checkpoint.

    Raises:
        ValueError: if ``checkpoint_every`` is less than 1.
    """
    if checkpoint_every < 1:
        raise ValueError("checkpoint_every must be >= 1")

    if checkpoint_path:
        all_chunks, processed = _load_checkpoint(checkpoint_path)
    else:
        all_chunks, processed = [], 0

    for doc in tqdm(documents[processed:], desc="Processing documents"):
        try:
            chunks = pipeline.process_document(
                text=doc["text"],
                document_id=doc.get("id", f"doc_{processed}"),
                metadata=doc.get("metadata", {})
            )
        except Exception as e:
            # Best-effort batch job: report and move on to the next document.
            print(f"Error processing document {processed}: {e}")
        else:
            all_chunks.extend(chunks)
        processed += 1
        if checkpoint_path and processed % checkpoint_every == 0:
            _save_checkpoint(checkpoint_path, all_chunks, processed)
    return all_chunks
Verifying embedding quality
Always test your embeddings before building a production system.
def test_embedding_quality(
    model: SentenceTransformer,
    queries: Optional[list[str]] = None,
    documents: Optional[list[str]] = None,
) -> list[list[float]]:
    """Print query-to-document cosine similarities as a quick sanity check.

    Args:
        model: Any object with a sentence-transformers-style ``encode`` method.
        queries: Query strings; defaults to three built-in retail queries.
        documents: Document strings; defaults to three built-in retail docs.

    Returns:
        The similarity matrix as nested lists, one row per query and one
        column per document (``[]`` if either list is empty).
    """
    import numpy as np

    if queries is None:
        queries = [
            "return policy for electronics",
            "refund procedure",
            "warranty information"
        ]
    if documents is None:
        documents = [
            "Electronics can be returned within 30 days of purchase.",
            "To get a refund, submit a request through the portal.",
            "All products come with a one-year warranty."
        ]
    if not queries or not documents:
        return []

    query_embeddings = np.asarray(model.encode(queries), dtype=float)
    doc_embeddings = np.asarray(model.encode(documents), dtype=float)

    def _unit_rows(matrix):
        # Scale each row to unit length; zero rows stay zero instead of NaN.
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        return matrix / np.where(norms == 0, 1.0, norms)

    # Cosine similarity = dot product of unit vectors (no sklearn needed).
    similarities = _unit_rows(query_embeddings) @ _unit_rows(doc_embeddings).T

    for query, row in zip(queries, similarities):
        print(f"\nQuery: '{query}'")
        for doc, sim in zip(documents, row):
            print(f" [{sim:.3f}] {doc}")
    return similarities.tolist()
Exercise: Take 10 sentences from different domains (3 technology, 3 finance, 4 general). Create embeddings for all 10 and compute the 10×10 cosine similarity matrix. Verify that sentences from the same domain are more similar to each other than to sentences from other domains, and report the average within-domain versus between-domain similarity.