15. Distributed Search

Chapter 15 of 18 · 25 min

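Scaling vector search beyond a single machine requires distributing both the index and the query workload. Two techniques do this, and production systems usually combine them: partitioning (sharding), which splits the vectors across nodes so each node indexes only part of the data, and replication, which copies each shard to several nodes for fault tolerance and read throughput.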

Partition-Based Distribution

Split vectors across nodes either by ID range, as in the layout below, or by locality, so that similar vectors land on the same node:

Node 0: vectors 0-999,999
Node 1: vectors 1,000,000-1,999,999
Node 2: vectors 2,000,000-2,999,999
Node 3: vectors 3,000,000-3,999,999
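
The ID-range layout above reduces to integer division, and locality-based placement can be sketched as nearest-centroid assignment. This is a minimal sketch assuming 1,000,000 vectors per node as in the layout. The helper names (`shard_for_id`, `local_offset`, `global_id`, `shard_by_locality`) are hypothetical, and the centroids are assumed to come from something like k-means on a sample of the data.

```python
import numpy as np

SHARD_SIZE = 1_000_000  # vectors per node, matching the ID-range layout above

def shard_for_id(vector_id: int, shard_size: int = SHARD_SIZE) -> int:
    """ID-range partitioning: the owning node is vector_id // shard_size."""
    return vector_id // shard_size

def local_offset(vector_id: int, shard_size: int = SHARD_SIZE) -> int:
    """Position of the vector inside its node's local index."""
    return vector_id % shard_size

def global_id(node_id: int, local_index: int, shard_size: int = SHARD_SIZE) -> int:
    """Inverse mapping, used when merging per-node results."""
    return node_id * shard_size + local_index

def shard_by_locality(vectors: np.ndarray, centroids: np.ndarray) -> np.ndarray:
    """Locality partitioning: assign each vector to the node with the nearest centroid."""
    # Squared Euclidean distance from every vector to every centroid, shape (n, n_nodes)
    d2 = ((vectors[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=-1)
    return d2.argmin(axis=1)
```

For example, vector 2,500,000 lives on node 2 at local offset 500,000, and `global_id` maps that pair back to 2,500,000.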
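
A coordinator sends each query to every node concurrently and merges their answers (the scatter-gather pattern):

```python
import asyncio
from dataclasses import dataclass
from typing import List

import aiohttp
import numpy as np

@dataclass
class SearchResult:
    node_id: int
    indices: np.ndarray    # indices local to the node's shard
    distances: np.ndarray

class DistributedSearchCoordinator:
    def __init__(self, node_urls: List[str], n_shards: int):
        self.node_urls = node_urls  # one search endpoint per node
        self.n_shards = n_shards

    async def search(self, query: np.ndarray, k: int
                     ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Scatter-gather pattern: send the query to all nodes, merge the results.
        Returns (distances, node_ids, indices) of the global top-k.
        """
        async with aiohttp.ClientSession() as session:
            # Scatter: one concurrent request per node
            tasks = [
                self._search_node(session, url, node_id, query, k)
                for node_id, url in enumerate(self.node_urls)
            ]
            # Gather: completes only when the slowest node has answered
            results = await asyncio.gather(*tasks)

        # Merge results from all nodes; node IDs disambiguate the local indices
        all_distances = np.concatenate([r.distances for r in results])
        all_node_ids = np.concatenate([
            np.full(len(r.indices), r.node_id) for r in results
        ])
        all_indices = np.concatenate([r.indices for r in results])

        # Global top-k; argpartition requires kth < len(array), so clamp k first
        k = min(k, len(all_distances))
        if k == 0:
            return all_distances[:0], all_node_ids[:0], all_indices[:0]
        top_k = np.argpartition(all_distances, k - 1)[:k]
        top_k = top_k[np.argsort(all_distances[top_k])]

        return all_distances[top_k], all_node_ids[top_k], all_indices[top_k]

    async def _search_node(self, session: aiohttp.ClientSession, url: str,
                           node_id: int, query: np.ndarray, k: int) -> SearchResult:
        # Each node exposes POST /search and returns {"indices": [...], "distances": [...]}
        async with session.post(
            f"{url}/search",
            json={"vector": query.tolist(), "k": k},
        ) as response:
            response.raise_for_status()  # fail loudly on HTTP errors from a node
            data = await response.json()
            return SearchResult(
                node_id=node_id,
                indices=np.array(data["indices"], dtype=np.int64),
                distances=np.array(data["distances"], dtype=float),
            )
```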
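
Merging per-node top-k lists is exact when each node returns its exact local top-k: any vector in the global top-k is also among the k nearest vectors of its own shard, so it appears in that node's list. The in-process sketch below checks this without a network. The brute-force `local_search` stands in for each node's index (an assumption), and local indices are converted to global IDs with ID-range offsets.

```python
import numpy as np

def local_search(shard: np.ndarray, query: np.ndarray, k: int):
    """Exact k-NN inside one shard (hypothetical stand-in for a node's index); k >= 1."""
    dists = np.linalg.norm(shard - query, axis=1)
    k = min(k, len(dists))
    idx = np.argpartition(dists, k - 1)[:k]
    idx = idx[np.argsort(dists[idx])]
    return dists[idx], idx

def scatter_gather(shards, query: np.ndarray, k: int):
    """Query every shard, convert local indices to global IDs, keep the global top-k."""
    all_d, all_ids = [], []
    offset = 0
    for shard in shards:
        d, local = local_search(shard, query, k)
        all_d.append(d)
        all_ids.append(local + offset)  # ID-range partitioning: global = offset + local
        offset += len(shard)
    d = np.concatenate(all_d)
    ids = np.concatenate(all_ids)
    order = np.argsort(d)[:k]
    return d[order], ids[order]
```

With approximate per-node indexes such as HNSW, the merged list is no longer guaranteed exact; it inherits the recall of the node-level searches.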

Approximate Distributed HNSW

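HNSW does not partition cleanly. Its multi-layer search walks a single graph whose edges cross any partition boundary, so splitting one graph across nodes would turn many steps of every traversal into network calls. The practical solution is approximate coordination: build an independent HNSW index on each shard and send each query only to the shards most likely to hold its neighbors.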

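```python
from typing import Dict, List, Protocol

import numpy as np

class HNSWIndex(Protocol):
    """Any per-shard index whose search returns (distances, local indices)."""
    def search(self, query: np.ndarray, k: int) -> tuple[np.ndarray, np.ndarray]: ...

class RoutingIndex(Protocol):
    """Scores nodes by how likely they hold the query's neighbors (higher = better)."""
    def score_nodes(self, query: np.ndarray, n_probe: int) -> Dict[int, float]: ...

class ApproximateDistributedHNSW:
    """
    Each node maintains a full HNSW index for its shard.
    A lightweight routing index (built on a sketch or sample of the data)
    directs each query to the most relevant shards.
    """

    def __init__(self, nodes: List[HNSWIndex], routing_index: RoutingIndex):
        self.nodes = nodes
        self.routing_index = routing_index  # e.g., random projection tree

    def search(self, query: np.ndarray, k: int, n_probe: int = 2
               ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        n_probe: number of nodes to query based on routing.
        Higher n_probe = better recall, more latency.
        Returns (distances, node_ids, local_indices) of the merged top-k.
        """
        # Rank nodes by routing score and keep the n_probe best
        node_scores = self.routing_index.score_nodes(query, n_probe)
        top_nodes = sorted(node_scores, key=node_scores.get, reverse=True)[:n_probe]

        # Search only the selected nodes (sequential here; a real
        # deployment would issue these requests concurrently)
        all_results = []
        for node_id in top_nodes:
            distances, indices = self.nodes[node_id].search(query, k)
            all_results.append((node_id, distances, indices))

        return self._merge_results(all_results, k)

    @staticmethod
    def _merge_results(all_results, k: int):
        """Concatenate per-node results and keep the k smallest distances."""
        distances = np.concatenate([d for _, d, _ in all_results])
        node_ids = np.concatenate([np.full(len(d), n) for n, d, _ in all_results])
        indices = np.concatenate([i for _, _, i in all_results])
        order = np.argsort(distances)[:k]
        return distances[order], node_ids[order], indices[order]
```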
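
The routing index only needs to provide `score_nodes`. The class above suggests a random projection tree; the sketch below swaps in something simpler, a hypothetical `CentroidRouter` that summarizes each node by the mean of its vectors and returns a dict mapping node ID to score (negative centroid distance), matching how `search` sorts `node_scores`.

```python
from typing import Dict, List

import numpy as np

class CentroidRouter:
    """Hypothetical routing index: one centroid per node; a closer centroid scores higher."""

    def __init__(self, shards: List[np.ndarray]):
        # Summarize each node's shard by the mean of its vectors
        self.centroids = np.stack([s.mean(axis=0) for s in shards])

    def score_nodes(self, query: np.ndarray, n_probe: int) -> Dict[int, float]:
        # Negative distance, so sorting scores in descending order picks the nearest centroids
        d = np.linalg.norm(self.centroids - query, axis=1)
        nearest = np.argsort(d)[:n_probe]
        return {int(i): float(-d[i]) for i in nearest}
```

A single centroid per node is only informative when shards are built by locality. With ID-range sharding over arbitrarily assigned IDs, every shard's mean sits near the global mean, so routing degenerates toward picking nodes at random.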

Consistency and Replication

For production, replicate shards for fault tolerance:

Primary Shard 0 → Replica A Shard 0, Replica B Shard 0
Primary Shard 1 → Replica A Shard 1, Replica B Shard 1

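Use eventual consistency for search: reads can go to any replica, at the cost of occasionally missing a vector written moments earlier. Use strong consistency for writes: route every insert, update, and delete through the shard's primary, which orders them and replicates them to the other copies.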
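
A toy, in-memory model of this split, assuming a single primary per shard and asynchronous log shipping to replicas. The `ReplicatedShard` class and its `sync` step are hypothetical; production systems use a proper replication protocol instead. Writes are ordered by the primary, while reads rotate across replicas and can be stale until the log is applied.

```python
from typing import Dict, List, Optional

class ReplicatedShard:
    """Toy model: writes go through the primary; replicas catch up asynchronously."""

    def __init__(self, n_replicas: int):
        self.primary: Dict[int, list] = {}
        self.replicas: List[Dict[int, list]] = [{} for _ in range(n_replicas)]
        self.log: List[tuple] = []  # writes not yet applied to the replicas
        self._next = 0              # round-robin read cursor

    def write(self, vector_id: int, vector: list) -> None:
        # A single writer (the primary) gives all writes one global order
        self.primary[vector_id] = vector
        self.log.append((vector_id, vector))

    def sync(self) -> None:
        # Ship pending log entries to every replica (normally a background task)
        for vector_id, vector in self.log:
            for replica in self.replicas:
                replica[vector_id] = vector
        self.log.clear()

    def read(self, vector_id: int) -> Optional[list]:
        # Reads hit any replica (round-robin here) and may be stale before sync()
        replica = self.replicas[self._next % len(self.replicas)]
        self._next += 1
        return replica.get(vector_id)
```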

Failure Modes

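Coordinator as a single point of failure: If the coordinator node fails, every search fails with it. Run several stateless coordinators behind request routing, or elect a new coordinator on failure, so that any surviving coordinator can serve the query.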
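
Stateless coordinators keep no per-query state, so any of them can serve any request, and because search is read-only a failed request can simply be retried elsewhere. A minimal client-side failover sketch, assuming each coordinator is a callable that raises `ConnectionError` when unreachable (`search_with_failover` and `AllCoordinatorsFailed` are hypothetical names):

```python
from typing import Callable, List, Sequence

class AllCoordinatorsFailed(Exception):
    """Raised when no coordinator could answer; carries the individual errors."""

def search_with_failover(coordinators: Sequence[Callable[[list, int], list]],
                         query: list, k: int) -> list:
    """Try interchangeable, stateless coordinators in order until one answers."""
    errors: List[Exception] = []
    for coordinator in coordinators:
        try:
            return coordinator(query, k)
        except ConnectionError as exc:  # treat connection failures as "coordinator down"
            errors.append(exc)
    raise AllCoordinatorsFailed(errors)
```

A load balancer with health checks can play the same role without any client-side retry logic.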

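Network latency dominates: A network round trip takes on the order of hundreds of microseconds inside one data center, and longer across zones, which is orders of magnitude slower than a local memory access (around 100 ns). Because scatter-gather waits for every queried node, each query is as slow as its slowest node. For latency-critical workloads, keep each query's path inside one availability zone, for example by placing a full set of shard replicas in each zone and routing queries to the local copies.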
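
One way to keep queries inside a zone without giving up zone-level fault tolerance: keep replicas in several zones and have each coordinator prefer the copy in its own zone, falling back to another zone only when no local copy is healthy. A sketch, assuming each shard's replicas are described by a hypothetical `Replica` record with a zone label and a health flag:

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Replica:
    url: str
    zone: str
    healthy: bool = True

def pick_replica(replicas: List[Replica], local_zone: str) -> Replica:
    """Prefer a healthy replica in the coordinator's zone; else any healthy replica."""
    healthy = [r for r in replicas if r.healthy]
    if not healthy:
        raise RuntimeError("no healthy replica for this shard")
    same_zone = [r for r in healthy if r.zone == local_zone]
    return (same_zone or healthy)[0]
```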

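Partition skew: When queries are routed to a subset of nodes (as with n_probe above), popular regions of the vector space can overload the nodes that hold them. Monitor per-node query rates and rebalance, for example by splitting hot shards or adding replicas for them, when the busiest node receives more than about twice the average load.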
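
A skew check over per-node query counts, reading the 2x threshold as the busiest node's rate divided by the mean rate. That reading is an assumption; a max-to-min ratio is another reasonable one. `needs_rebalance` is a hypothetical helper:

```python
import numpy as np

def needs_rebalance(queries_per_node, threshold: float = 2.0) -> bool:
    """Flag skew when the busiest node serves more than `threshold` x the mean rate."""
    rates = np.asarray(queries_per_node, dtype=float)
    mean = rates.mean()
    if mean == 0:
        return False  # no traffic, nothing to rebalance
    return bool(rates.max() / mean > threshold)
```

For counts of `[100, 100, 100, 500]` the mean is 200, so the busiest node runs at 2.5x the mean and the check fires.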


EXERCISE

Implement a 4-node distributed search system using Python's multiprocessing or gRPC. Measure latency and recall as you increase the number of probed nodes from 1 to 4. Plot the latency-recall trade-off curve.

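# Expected with locality-based shards: recall improves rapidly with the first few probes, then plateaus
# (with randomly assigned ID-range shards, recall grows roughly in proportion to the fraction of nodes probed)
# Latency grows roughly linearly with probes if nodes are queried one after another;
# with concurrent scatter-gather it grows more slowly, tracking the slowest probed node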
# Find the optimal operating point for your use case
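
A starting point for the exercise that runs in a single process, so it measures only search compute, not network time. It is a sketch under several assumptions: brute-force shards stand in for per-node HNSW indexes, queries are routed by nearest shard centroid (a swapped-in router), ground truth comes from an exact scan, and `recall_at_k`, `probe_search`, and `sweep` are hypothetical names.

```python
import time

import numpy as np

def recall_at_k(approx_ids, exact_ids) -> float:
    """Fraction of the exact top-k neighbors that the approximate search returned."""
    exact = set(np.asarray(exact_ids).tolist())
    return len(exact & set(np.asarray(approx_ids).tolist())) / len(exact)

def probe_search(shards, shard_ids, centroids, query, k, n_probe):
    """Brute-force search of the n_probe shards whose centroids are nearest the query."""
    probed = np.argsort(np.linalg.norm(centroids - query, axis=1))[:n_probe]
    cand = np.concatenate([shards[s] for s in probed])
    cand_ids = np.concatenate([shard_ids[s] for s in probed])
    d = np.linalg.norm(cand - query, axis=1)
    return cand_ids[np.argsort(d)[:k]]

def sweep(data, assignment, n_nodes, queries, k=10):
    """Return {n_probe: (mean recall@k, mean latency in ms)} for n_probe = 1..n_nodes."""
    shards = [data[assignment == n] for n in range(n_nodes)]
    shard_ids = [np.flatnonzero(assignment == n) for n in range(n_nodes)]
    centroids = np.stack([s.mean(axis=0) for s in shards])
    # Ground truth from an exact scan over the whole dataset
    exact = [np.argsort(np.linalg.norm(data - q, axis=1))[:k] for q in queries]
    results = {}
    for n_probe in range(1, n_nodes + 1):
        recalls, latencies = [], []
        for q, truth in zip(queries, exact):
            start = time.perf_counter()
            ids = probe_search(shards, shard_ids, centroids, q, k, n_probe)
            latencies.append((time.perf_counter() - start) * 1e3)
            recalls.append(recall_at_k(ids, truth))
        results[n_probe] = (float(np.mean(recalls)), float(np.mean(latencies)))
    return results
```

Replacing `probe_search` with calls to worker processes or gRPC servers adds the network cost that the exercise asks you to measure.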