15. Distributed Search
Scaling vector search beyond a single machine requires distributing both the index and the query workload. There are two primary approaches: partition-based and replicated search.
Partition-Based Distribution
Split vectors across nodes by ID range or locality:
Node 0: vectors 0-999,999
Node 1: vectors 1,000,000-1,999,999
Node 2: vectors 2,000,000-2,999,999
Node 3: vectors 3,000,000-3,999,999
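With a fixed range layout like the one above, finding the node that owns a vector is plain integer arithmetic. The sketch below assumes equal-sized shards of one million vectors on four nodes; the names `SHARD_SIZE`, `N_NODES`, `locate`, and `to_global` are illustrative and not part of any library.

```python
SHARD_SIZE = 1_000_000  # vectors per node, matching the layout above (assumed fixed)
N_NODES = 4

def locate(global_id: int) -> tuple[int, int]:
    """Map a global vector ID to (node_id, local_offset) under range partitioning."""
    if not 0 <= global_id < SHARD_SIZE * N_NODES:
        raise ValueError(f"vector ID {global_id} is outside the partitioned range")
    return divmod(global_id, SHARD_SIZE)

def to_global(node_id: int, local_offset: int) -> int:
    """Inverse mapping, useful when shards report shard-local indices."""
    return node_id * SHARD_SIZE + local_offset

print(locate(2_500_000))  # (2, 500000)
print(to_global(3, 42))   # 3000042
```

Partitioning by locality instead of ID range would replace `locate` with a lookup against whatever clustering assigns vectors to nodes.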
    import asyncio
    from dataclasses import dataclass
    from typing import List

    import aiohttp
    import numpy as np

    @dataclass
    class SearchResult:
        node_id: int
        indices: np.ndarray
        distances: np.ndarray

    class DistributedSearchCoordinator:
        def __init__(self, node_urls: List[str], n_shards: int):
            self.node_urls = node_urls
            self.n_shards = n_shards

        async def search(
            self, query: np.ndarray, k: int
        ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
            """
            Scatter-gather pattern: send query to all nodes, merge results.
            Returns (distances, node_ids, indices), closest first.
            """
            async with aiohttp.ClientSession() as session:
                tasks = [
                    self._search_node(session, url, node_id, query, k)
                    for node_id, url in enumerate(self.node_urls)
                ]
                results = await asyncio.gather(*tasks)

            # Merge results from all nodes
            all_distances = np.concatenate([r.distances for r in results])
            all_node_ids = np.concatenate([
                np.full(len(r.indices), r.node_id) for r in results
            ])
            all_indices = np.concatenate([r.indices for r in results])

            # Get top-k globally. argpartition requires kth < len, so clamp k first.
            k = min(k, len(all_distances))
            if k == 0:
                return all_distances, all_node_ids, all_indices
            top_k = np.argpartition(all_distances, k - 1)[:k]
            order = top_k[np.argsort(all_distances[top_k])]
            # If nodes return shard-local indices, the node ID is needed to
            # identify a vector, so return it alongside the index.
            return all_distances[order], all_node_ids[order], all_indices[order]

        async def _search_node(self, session, url: str, node_id: int,
                               query: np.ndarray, k: int) -> SearchResult:
            async with session.post(
                f"{url}/search",
                json={"vector": query.tolist(), "k": k}
            ) as response:
                response.raise_for_status()  # surface HTTP errors before parsing
                data = await response.json()
                return SearchResult(
                    node_id=node_id,
                    indices=np.array(data["indices"]),
                    distances=np.array(data["distances"])
                )
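The merge step in scatter-gather loses nothing when each shard's local search is exact: any vector in the global top-k is also in the top-k of the shard that holds it. The sketch below checks this in-process with brute-force search over four range partitions. It uses only the standard library, and `shard_top_k` and `scatter_gather` are illustrative names, not part of the coordinator above.

```python
import heapq
import random

def shard_top_k(shard, query, k):
    """Exact top-k within one shard, as a sorted list of (squared_distance, global_id)."""
    scored = (
        (sum((a - b) ** 2 for a, b in zip(vec, query)), gid)
        for gid, vec in shard
    )
    return heapq.nsmallest(k, scored)

def scatter_gather(shards, query, k):
    """Ask every shard for its local top-k, then keep the k best overall."""
    partials = [shard_top_k(shard, query, k) for shard in shards]
    return heapq.nsmallest(k, (hit for part in partials for hit in part))

random.seed(0)
vectors = [(gid, [random.random() for _ in range(8)]) for gid in range(400)]
shards = [vectors[i:i + 100] for i in range(0, 400, 100)]  # 4 range partitions
query = [random.random() for _ in range(8)]

merged = scatter_gather(shards, query, k=5)
single_machine = shard_top_k(vectors, query, k=5)
print(merged == single_machine)  # True
```

When each shard runs an approximate index instead, the merged result can only be as good as the per-shard results it is built from.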
Approximate Distributed HNSW
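A single HNSW graph doesn't partition cleanly: multi-layer traversal follows edges across the whole graph, so each node would need visibility into the full graph. The practical solution is approximate coordination, where each node indexes only its own shard and a routing step decides which shards to query: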
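    from typing import List

    import numpy as np

    class ApproximateDistributedHNSW:
        """
        Each node maintains a full HNSW index for its shard.
        A lightweight routing index (built on sketches or sampling) helps
        direct queries to the most relevant shards.
        """
        def __init__(self, nodes: List["HNSWIndex"], routing_index):
            # Assumed interface: node.search(query, k) -> (distances, indices)
            self.nodes = nodes
            self.routing_index = routing_index  # e.g., random projection tree

        def search(
            self, query: np.ndarray, k: int, n_probe: int = 2
        ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
            """
            n_probe: number of nodes to query based on routing.
            Higher n_probe = better recall, more latency.
            """
            # Determine which nodes are most likely to have relevant vectors
            node_scores = self.routing_index.score_nodes(query, n_probe)
            top_nodes = sorted(node_scores, key=node_scores.get, reverse=True)[:n_probe]

            # Search only top nodes
            all_results = []
            for node_id in top_nodes:
                results = self.nodes[node_id].search(query, k)
                all_results.append((node_id, results))

            # Merge and return top-k
            return self._merge_results(all_results, k)

        def _merge_results(self, all_results, k: int):
            """Merge per-node results into (distances, node_ids, indices), closest first."""
            distances = np.concatenate([np.asarray(d) for _node, (d, _idx) in all_results])
            node_ids = np.concatenate([np.full(len(i), n) for n, (_dist, i) in all_results])
            indices = np.concatenate([np.asarray(i) for _node, (_dist, i) in all_results])
            order = np.argsort(distances)[:k]
            return distances[order], node_ids[order], indices[order]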
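The class above specifies the routing index only through its `score_nodes(query, n_probe)` call. One simple possibility, swapped in here for the random projection tree mentioned in the comment, is centroid routing: store the mean vector of each shard and score shards by how close that mean is to the query. The `CentroidRouter` class below is a hypothetical sketch of that idea using only the standard library. It assumes scores are returned as a dict where higher means more relevant, which matches how `search` sorts them.

```python
import math

class CentroidRouter:
    """Hypothetical routing index: one mean vector (centroid) per shard."""

    def __init__(self, shards):
        # shards: list of shards, each a non-empty list of equal-length vectors
        self.centroids = [
            [sum(col) / len(shard) for col in zip(*shard)] for shard in shards
        ]

    def score_nodes(self, query, n_probe):
        """Return {node_id: score} for the n_probe closest centroids.

        Scores are negated Euclidean distances, so higher means more relevant.
        """
        scores = {
            node_id: -math.dist(query, centroid)
            for node_id, centroid in enumerate(self.centroids)
        }
        best = sorted(scores, key=scores.get, reverse=True)[:n_probe]
        return {node_id: scores[node_id] for node_id in best}

# Two well-separated shards: one near the origin, one near (10, 10).
router = CentroidRouter([
    [[0.0, 0.0], [1.0, 1.0]],
    [[10.0, 10.0], [11.0, 9.0]],
])
print(router.score_nodes([9.5, 9.5], n_probe=1))  # {1: -1.0}
```

Centroid routing works best when vectors are assigned to shards by locality. With ID-range partitioning the centroids of different shards tend to look alike, so routing carries little signal.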
Consistency and Replication
For production, replicate shards for fault tolerance:
Primary Shard 0 → Replica A Shard 0, Replica B Shard 0
Primary Shard 1 → Replica A Shard 1, Replica B Shard 1
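Use eventual consistency for search: reads can hit any replica, at the cost that a newly written vector may not be visible on every replica immediately. Use strong consistency for writes by sending them through the shard's primary.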
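One way to picture the read and write paths is a small selector per shard: writes always target the primary, while reads rotate over whichever copies are currently healthy. The `ReplicatedShard` class and the node names below are hypothetical, and health tracking is reduced to a set that the caller updates by hand.

```python
import itertools

class ReplicatedShard:
    """Hypothetical shard with one primary and a list of read replicas."""

    def __init__(self, primary: str, replicas: list[str]):
        self.primary = primary
        self.replicas = replicas
        self.healthy = set(replicas) | {primary}
        self._cycle = itertools.cycle(replicas + [primary])

    def write_target(self) -> str:
        """Writes always go through the primary."""
        if self.primary not in self.healthy:
            raise RuntimeError("primary unavailable; writes must wait for failover")
        return self.primary

    def read_target(self) -> str:
        """Reads rotate over healthy copies, skipping failed ones."""
        for _ in range(len(self.replicas) + 1):
            candidate = next(self._cycle)
            if candidate in self.healthy:
                return candidate
        raise RuntimeError("no healthy copy of this shard")

shard0 = ReplicatedShard("primary-0", ["replica-a-0", "replica-b-0"])
shard0.healthy.discard("replica-a-0")  # simulate a failed replica
print([shard0.read_target() for _ in range(4)])
# ['replica-b-0', 'primary-0', 'replica-b-0', 'primary-0']
```

A production system would also need replication of the writes themselves and promotion of a replica when the primary fails, neither of which is shown here.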
Failure Modes
Coordinator as a single point of failure: If the coordinator node fails, the entire search fails. Implement coordinator election or stateless request routing, so that any coordinator instance can serve any query.
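Network latency dominates: A 10 ms network hop between nodes is orders of magnitude slower than a local memory access and can exceed the time a shard spends searching its own index. For latency-critical workloads, keep the coordinator and the replicas it queries co-located in the same availability zone.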
Partition skew: Query patterns may overload certain partitions. Monitor per-node query rates and rebalance when load imbalance exceeds 2x, for example when the busiest node serves more than twice the mean query rate.
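The skew check can be sketched in a few lines. This version reads the 2x threshold as "a node's query rate exceeds twice the mean across nodes", which is one interpretation among several. The function name `overloaded_nodes` and the sample rates are made up for illustration.

```python
from statistics import mean

def overloaded_nodes(query_rates: dict[str, float], threshold: float = 2.0) -> list[str]:
    """Return nodes whose query rate exceeds `threshold` times the mean rate."""
    average = mean(query_rates.values())
    if average == 0:
        return []
    return [node for node, rate in query_rates.items() if rate > threshold * average]

# Hypothetical per-node queries per second, as a monitoring system might report them.
rates = {"node-0": 120.0, "node-1": 95.0, "node-2": 900.0, "node-3": 110.0}
print(overloaded_nodes(rates))  # ['node-2']
```

Note that a pure scatter-gather coordinator sends every query to every node, so skew of this kind mainly appears once queries are routed to a subset of shards.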
Implement a 4-node distributed search system using Python's multiprocessing or gRPC. Measure latency and recall as you increase the number of probed nodes from 1 to 4. Plot the latency-recall trade-off curve.
# Expected: recall improves rapidly with first few probes, then plateaus
# Latency increases linearly with probes
# Find the optimal operating point for your use case
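For the measurements, two small helpers are enough to get started: recall@k against an exact (brute-force) result, and a wall-clock timer around a single search call. Both names are illustrative, and the ID lists below are made-up data rather than output from a real index.

```python
import time

def recall_at_k(approx_ids, exact_ids, k):
    """Fraction of the true top-k neighbors that the approximate search returned."""
    return len(set(approx_ids[:k]) & set(exact_ids[:k])) / k

def timed(search_fn, *args):
    """Run search_fn once and return (result, elapsed_milliseconds)."""
    start = time.perf_counter()
    result = search_fn(*args)
    return result, (time.perf_counter() - start) * 1000.0

exact = [7, 3, 9, 1, 4]   # IDs from an exhaustive search
approx = [7, 9, 2, 1, 8]  # IDs from a routed search over fewer shards
print(recall_at_k(approx, exact, k=5))  # 0.6
```

Averaging both numbers over many queries for each `n_probe` value gives the points of the trade-off curve.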