14. Text Clustering
Chapter 14 of 18 · 20 min
Text clustering groups similar documents without supervision. Effective clustering combines appropriate embeddings with suitable clustering algorithms tuned to text data.
Embeddings + Clustering Pipeline
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.metrics import silhouette_score, calinski_harabasz_score
from sentence_transformers import SentenceTransformer
import numpy as np
class TextClusterer:
    """Embed documents with a sentence-transformer model and cluster them.

    After :meth:`fit_predict` has run, ``embeddings`` holds the encoded
    documents and ``cluster_labels`` holds one cluster label per document.
    Both are ``None`` until then.
    """

    def __init__(self, embedding_model="all-MiniLM-L6-v2"):
        """Load the sentence-transformer model named by *embedding_model*."""
        self.embedder = SentenceTransformer(embedding_model)
        self.cluster_labels = None
        self.embeddings = None

    def fit_predict(self, documents: list[str], n_clusters: int = None,
                    method: str = "kmeans") -> np.ndarray:
        """Cluster documents and return labels.

        Args:
            documents: Texts to embed and cluster.
            n_clusters: Number of clusters. When ``None`` it is chosen by
                :meth:`_find_optimal_clusters`.
            method: ``"kmeans"`` or ``"hierarchical"``.

        Returns:
            One cluster label per document, in input order.

        Raises:
            ValueError: If *method* is unknown or *documents* is empty.
        """
        # Validate cheap arguments first so a typo in `method` does not cost
        # a full embedding pass plus a k-search before failing.
        if method not in ("kmeans", "hierarchical"):
            raise ValueError(f"Unknown method: {method}")
        if len(documents) == 0:
            raise ValueError("documents must not be empty")

        self.embeddings = self.embedder.encode(documents, show_progress_bar=True)

        if n_clusters is None:
            n_clusters = self._find_optimal_clusters()

        if method == "kmeans":
            clusterer = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        else:
            clusterer = AgglomerativeClustering(n_clusters=n_clusters)

        self.cluster_labels = clusterer.fit_predict(self.embeddings)
        return self.cluster_labels

    def _find_optimal_clusters(self, max_k: int = 20) -> int:
        """Find optimal k using silhouette score.

        Tries every k from 2 up to ``min(max_k, n_samples - 1)`` with KMeans
        and returns the k with the highest silhouette score (the smallest
        such k on ties). When no candidate k can be scored -- e.g. fewer
        than three documents -- a single cluster is returned instead of
        failing.
        """
        n_samples = len(self.embeddings)
        scores = []
        for k in range(2, min(max_k + 1, n_samples)):
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            labels = kmeans.fit_predict(self.embeddings)
            # The silhouette score is undefined when everything collapses
            # into a single cluster, so such a k is skipped.
            if len(set(labels)) > 1:
                scores.append((k, silhouette_score(self.embeddings, labels)))

        if not scores:
            # Nothing could be evaluated; there is no evidence for a split.
            return 1

        optimal_k, best_score = max(scores, key=lambda pair: pair[1])
        print(f"Optimal clusters: {optimal_k} (silhouette: {best_score:.3f})")
        return optimal_k

    def get_cluster_summary(self, documents: list[str]) -> list[dict]:
        """Generate summary for each cluster.

        Args:
            documents: The same documents, in the same order, that were
                passed to :meth:`fit_predict`.

        Returns:
            One dict per cluster, ordered by cluster id, with keys
            ``cluster_id``, ``size``, ``representative`` (the document
            closest to the cluster centroid, truncated to 200 characters
            with a trailing ``"..."`` only when it was actually cut) and
            ``sample_docs`` (the first three documents of the cluster).

        Raises:
            RuntimeError: If no clustering has been computed yet.
            ValueError: If *documents* does not line up with the labels.
        """
        if self.cluster_labels is None or self.embeddings is None:
            raise RuntimeError("Call fit_predict() before get_cluster_summary()")

        labels = np.asarray(self.cluster_labels)
        embeddings = np.asarray(self.embeddings)
        if len(documents) != len(labels):
            # Pairing by position would silently misattribute documents.
            raise ValueError(
                f"Expected {len(labels)} documents, got {len(documents)}"
            )

        summaries = []
        for cluster_id in np.unique(labels):  # np.unique returns sorted ids
            member_idx = np.flatnonzero(labels == cluster_id)
            cluster_docs = [documents[i] for i in member_idx]
            cluster_embeddings = embeddings[member_idx]
            centroid = cluster_embeddings.mean(axis=0)

            # Find representative document (closest to centroid)
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            representative = cluster_docs[int(distances.argmin())]
            if len(representative) > 200:
                representative = representative[:200] + "..."

            summaries.append({
                "cluster_id": int(cluster_id),
                "size": len(cluster_docs),
                "representative": representative,
                "sample_docs": cluster_docs[:3],
            })
        return summaries
Hierarchical Clustering for Taxonomy
For document taxonomies, hierarchical clustering preserves the parent–child relationships between clusters, so the same tree can be cut at several depths:
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
from scipy.spatial.distance import pdist
class HierarchicalTextClusterer(TextClusterer):
    """Build a multi-level cluster hierarchy over document embeddings."""

    # Set by fit_hierarchical(); stays None until then so plot_dendrogram()
    # can report a clear error instead of an AttributeError.
    linkage_matrix = None

    def fit_hierarchical(self, documents: list[str], n_levels: int = 3) -> dict:
        """Embed *documents*, build a Ward linkage tree and cut it per level.

        Args:
            documents: Texts to embed and cluster.
            n_levels: Number of taxonomy levels. Level ``i`` (1-based) cuts
                the tree into at most ``2 ** i`` clusters.

        Returns:
            Mapping of level number to the flat cluster labels produced by
            ``fcluster`` for that level (one label per document).

        Raises:
            ValueError: If fewer than two documents are supplied.
        """
        if len(documents) < 2:
            raise ValueError("fit_hierarchical requires at least two documents")

        self.embeddings = self.embedder.encode(documents, show_progress_bar=True)

        # Ward linkage is only correctly defined for Euclidean distances, so
        # cosine distances must not be fed to it directly. L2-normalising the
        # vectors keeps the cosine geometry: for unit vectors the squared
        # Euclidean distance equals 2 * cosine distance.
        vectors = np.asarray(self.embeddings, dtype=float)
        norms = np.linalg.norm(vectors, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # leave all-zero embeddings as they are
        unit_vectors = vectors / norms

        # Compute linkage matrix
        distances = pdist(unit_vectors, metric='euclidean')
        linkage_matrix = linkage(distances, method='ward')
        self.linkage_matrix = linkage_matrix

        # Cut tree at different levels
        return {
            level: fcluster(linkage_matrix, 2 ** level, criterion='maxclust')
            for level in range(1, n_levels + 1)
        }

    def plot_dendrogram(self, output_path: str = "dendrogram.png"):
        """Plot cluster dendrogram.

        Draws the linkage tree, truncated to its last 50 merged clusters,
        and saves the figure to *output_path*.

        Raises:
            RuntimeError: If fit_hierarchical() has not been called yet.
        """
        if self.linkage_matrix is None:
            raise RuntimeError("Call fit_hierarchical() before plot_dendrogram()")

        import matplotlib.pyplot as plt

        fig = plt.figure(figsize=(20, 10))
        try:
            dendrogram(
                self.linkage_matrix,
                leaf_rotation=90,
                leaf_font_size=8,
                truncate_mode='lastp',
                p=50,
            )
            plt.title("Document Cluster Dendrogram")
            plt.savefig(output_path)
        finally:
            # Always release the figure, even if drawing or saving failed.
            plt.close(fig)
EXERCISE
Build a document organization system that automatically categorizes incoming documents into a hierarchical taxonomy. Implement the ability to merge/split clusters as the collection evolves.