11. Product Quantization
Product Quantization (PQ) compresses high-dimensional vectors by splitting them into subspaces and representing each subspace with a compact code. This enables storing millions of vectors in RAM while sacrificing some accuracy.
The Core Concept
Given a vector v ∈ R^d, PQ splits it into m subspaces of size d/m each:
v = [v₁, v₂, ..., v_m] where each v_i ∈ R^(d/m)
Each subspace is quantized independently using k-means clustering with k = 256 centroids (8 bits per subspace). A vector becomes a tuple of m centroid IDs, totaling m × 1 byte per vector.
Subspace Encoding
The standard PQ implementation uses 256 centroids per subspace because 8 bits encodes exactly that range and matches typical SIMD vector widths. For a 128-dimensional vector split into 8 subspaces, each subspace has 16 dimensions.
import numpy as np
from sklearn.cluster import MiniBatchKMeans
class ProductQuantizer:
def __init__(self, dim: int, m_subspaces: int = 8, k_centroids: int = 256):
self.dim = dim
self.m = m_subspaces
self.k = k_centroids
self.subspace_dim = dim // m_subspaces
self.codebooks = [] # m arrays of shape (k, subspace_dim)
self.code_dtype = np.uint8 if k_centroids <= 256 else np.uint16
def fit(self, vectors: np.ndarray):
"""
Train PQ on a set of training vectors.
Training vectors should be representative of your data distribution.
"""
assert vectors.shape[1] == self.dim
# Train on a sample if dataset is large (PQ is memory-intensive)
training_sample = vectors if len(vectors) < 1_000_000 else vectors[
np.random.choice(len(vectors), 1_000_000, replace=False)
]
for i in range(self.m):
start = i * self.subspace_dim
end = start + self.subspace_dim
subspace_vectors = training_sample[:, start:end]
kmeans = MiniBatchKMeans(
n_clusters=self.k,
random_state=42,
batch_size=1024
).fit(subspace_vectors)
self.codebooks.append(kmeans.cluster_centers_)
return self
def encode(self, vectors: np.ndarray) -> np.ndarray:
"""
Encode vectors as PQ codes.
Returns shape (n_vectors, m) with centroid IDs.
"""
codes = np.zeros((len(vectors), self.m), dtype=self.code_dtype)
for i in range(self.m):
start = i * self.subspace_dim
end = start + self.subspace_dim
subspace_vectors = vectors[:, start:end]
# Compute distances to all centroids (m, k, subspace_dim)
distances = np.linalg.norm(
subspace_vectors[:, np.newaxis, :] - self.codebooks[i][np.newaxis, :, :],
axis=2
)
codes[:, i] = np.argmin(distances, axis=1)
return codes
def decode(self, codes: np.ndarray) -> np.ndarray:
"""
Reconstruct vectors from PQ codes.
Returns approximate vectors, not exact originals.
"""
reconstructed = np.zeros((len(codes), self.dim), dtype=np.float32)
for i in range(self.m):
start = i * self.subspace_dim
end = start + self.subspace_dim
reconstructed[:, start:end] = self.codebooks[i][codes[:, i]]
return reconstructed
Compression Ratio
Raw float32 storage: d × 4 bytes per vector
PQ storage: m × log₂(k) / 8 bytes per vector
For dim=128, m=8, k=256: 128 × 4 = 512 bytes → 8 bytes. That's 64x compression.
Reconstruction Error
The approximation error depends on the codebook quality and subspace independence assumption. Vectors with high variance in a particular direction need more centroids for that subspace. The recall loss is typically 5-15% compared to exact search.
Implement a PQ encoder and measure reconstruction quality across different m values (4, 8, 16) on a dataset of your choice. Calculate mean squared error between original and reconstructed vectors, and plot how reconstruction quality degrades with fewer bits.
# Target: find minimum m that maintains acceptable reconstruction quality
# Trade-off: smaller m = more compression but worse accuracy