16. Index Persistence

Chapter 16 of 18 · 25 min

Vector indexes are expensive to build. Persistence ensures you can recover from failures without rebuilding from scratch. This chapter covers the mechanics of saving and loading HNSW indexes.

Memory-Mapped Storage

The simplest approach is memory-mapping (mmap) the index files:

import json
import mmap
import struct
from pathlib import Path

import numpy as np

class PersistedHNSWIndex:
    """
    HNSW index with memory-mapped persistence.

    The index is stored as a directory containing three files:

    - ``vectors.bin``: n_vectors × dim float32 values, row-major.
    - ``graph.bin``: a stream of native-endian uint32 values; for each node
      ``[num_edges] [dest1, layer1, dest2, layer2, ...]``.
    - ``meta.json``: ``n_vectors``, ``dim``, ``m`` and ``max_layer``.

    Vectors are memory-mapped on load, so their pages are read lazily.
    The graph is decoded eagerly into Python lists.
    """

    def __init__(self, index_path: Path, m: int = 16, ef_construction: int = 200):
        self.index_path = Path(index_path)
        self.m = m
        self.ef_construction = ef_construction

        # Set by load(): read-only maps of vectors.bin / graph.bin
        # (plain empty arrays when the corresponding file is empty).
        self._vectors_mm = None
        self._graph_mm = None

    def save(self, vectors: np.ndarray, graph: list):
        """
        Save the index to ``index_path``.

        Args:
            vectors: 2-D array of shape (n_vectors, dim); stored as float32.
            graph: one entry per node, each an iterable of (dest, layer)
                pairs of non-negative integers.

        Raises:
            ValueError: if ``vectors`` is not two-dimensional.
        """
        vectors = np.asarray(vectors)
        if vectors.ndim != 2:
            raise ValueError(
                f"vectors must be 2-D (n_vectors, dim), got shape {vectors.shape}"
            )

        self.index_path.mkdir(parents=True, exist_ok=True)

        # Everything is written to *.tmp first and renamed afterwards, so a
        # crash mid-write never leaves a half-written file under a final name.
        vectors_tmp = self.index_path / "vectors.bin.tmp"
        graph_tmp = self.index_path / "graph.bin.tmp"
        meta_tmp = self.index_path / "meta.json.tmp"

        vectors.astype(np.float32).tofile(str(vectors_tmp))

        max_layer = 0
        with open(graph_tmp, 'wb') as f:
            for node_edges in graph:
                # Format: [num_edges] [dest1, layer1, dest2, layer2, ...]
                flat = []
                for dest, layer in node_edges:
                    flat.append(dest)
                    flat.append(layer)
                    max_layer = max(max_layer, int(layer))
                # One write per node instead of one per edge.
                f.write(struct.pack(f'{len(flat) + 1}I', len(flat) // 2, *flat))

        with open(meta_tmp, 'w') as f:
            json.dump({
                'n_vectors': int(vectors.shape[0]),
                'dim': int(vectors.shape[1]),
                'm': self.m,
                'max_layer': max_layer,
            }, f)

        # Publish; meta.json goes last because load() reads it first.
        vectors_tmp.replace(self.index_path / "vectors.bin")
        graph_tmp.replace(self.index_path / "graph.bin")
        meta_tmp.replace(self.index_path / "meta.json")

    def load(self):
        """
        Load the index from ``index_path``.

        Sets ``vectors`` (read-only, shape (n_vectors, dim)), ``graph``
        (list of lists of (dest, layer) tuples), ``n_vectors``, ``dim``,
        ``max_layer`` and ``m``.

        Returns:
            self, to allow ``PersistedHNSWIndex(path).load()``.

        Raises:
            ValueError: if vectors.bin does not hold n_vectors × dim values
                or graph.bin is truncated.
        """
        with open(self.index_path / "meta.json") as f:
            meta = json.load(f)

        self._vectors_mm = self._map_file(
            self.index_path / "vectors.bin", np.float32
        )
        # reshape() also validates the file size against the metadata.
        self.vectors = self._vectors_mm.reshape(meta['n_vectors'], meta['dim'])
        self.n_vectors = meta['n_vectors']
        self.dim = meta['dim']
        self.max_layer = meta.get('max_layer', 0)
        self.m = meta.get('m', self.m)

        self._graph_mm = self._map_file(
            self.index_path / "graph.bin", np.uint32
        )
        self.graph = self._decode_graph(self._graph_mm)

        return self

    @staticmethod
    def _map_file(path, dtype):
        """Memory-map ``path`` read-only as a flat array of ``dtype``."""
        if path.stat().st_size == 0:
            # np.memmap raises ValueError on an empty file.
            return np.empty(0, dtype=dtype)
        return np.memmap(str(path), dtype=dtype, mode='r')

    @staticmethod
    def _decode_graph(raw):
        """Decode the flat uint32 stream written by save() into edge lists."""
        graph = []
        pos, end = 0, len(raw)
        while pos < end:
            n_edges = int(raw[pos])
            pos += 1
            stop = pos + 2 * n_edges
            if stop > end:
                raise ValueError("graph.bin is truncated")
            pairs = raw[pos:stop].reshape(n_edges, 2).tolist()
            graph.append([(dest, layer) for dest, layer in pairs])
            pos = stop
        return graph

Incremental Persistence

For production systems, implement write-ahead logging:

import tempfile
import shutil
from datetime import datetime

class WALPersistedHNSW:
    """
    HNSW with a write-ahead log (WAL) for crash recovery.

    Every insert is appended to ``<base_path>/wal/current.wal`` as
    ``[uint32 byte length][float32 payload]``. Every
    ``checkpoint_interval`` inserts the WAL is rotated and a full snapshot
    is written under ``<base_path>/checkpoints/<timestamp>``.

    The index itself is supplied through ``_add_to_index``,
    ``_save_full_index`` and ``_load_index``, which are not defined in this
    class; ``recover()`` also expects them to maintain ``self.n_vectors``.
    """

    def __init__(self, base_path: Path, checkpoint_interval: int = 10000):
        self.base_path = Path(base_path)
        self.checkpoint_interval = checkpoint_interval
        self.insertions_since_checkpoint = 0

        self.wal_path = self.base_path / "wal"
        # parents=True: base_path itself may not exist yet.
        self.wal_path.mkdir(parents=True, exist_ok=True)
        self.wal_file = open(self.wal_path / "current.wal", 'ab')
        # Number of records appended through this instance.
        self.wal_offset = 0

    def add(self, vector: np.ndarray) -> int:
        """Add ``vector`` to the index, log it to the WAL, return its id."""
        # The index goes first so a vector it rejects never reaches the log.
        # An insert only counts as accepted once add() has returned.
        vector_id = self._add_to_index(vector)

        payload = vector.astype(np.float32).tobytes()
        self.wal_file.write(struct.pack('I', len(payload)) + payload)
        # Without a flush the record sits in the process buffer and is lost
        # if the process dies. Surviving power loss would also need os.fsync.
        self.wal_file.flush()
        self.wal_offset += 1

        self.insertions_since_checkpoint += 1
        if self.insertions_since_checkpoint >= self.checkpoint_interval:
            self._checkpoint()

        return vector_id

    def close(self):
        """Close the WAL file handle. Safe to call more than once."""
        if not self.wal_file.closed:
            self.wal_file.close()

    def _checkpoint(self):
        """Rotate the WAL, then write a full index snapshot."""
        # Microseconds keep names unique when checkpoints are taken within
        # the same second; names still sort chronologically.
        checkpoint_name = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        checkpoints_root = self.base_path / "checkpoints"

        # Rotate first. If we crash before the snapshot is published, the
        # archived WAL is newer than every checkpoint and recover() still
        # replays it; once the snapshot exists, recover() skips it.
        self.wal_file.close()
        shutil.move(
            str(self.wal_path / "current.wal"),
            str(self.wal_path / f"{checkpoint_name}.wal")
        )
        self.wal_file = open(self.wal_path / "current.wal", 'ab')

        # Write the snapshot into a staging directory and rename it into
        # place, so recover() never sees a half-written checkpoint.
        staging = checkpoints_root / f"{checkpoint_name}.tmp"
        staging.mkdir(parents=True)
        self._save_full_index(staging)
        staging.rename(checkpoints_root / checkpoint_name)

        self.insertions_since_checkpoint = 0

    def recover(self) -> int:
        """
        Recover the index from the latest checkpoint plus WAL replay.

        Only WAL data newer than the loaded checkpoint is replayed. When no
        checkpoint exists, the whole WAL is replayed on top of the current
        index state.

        Returns:
            ``self.n_vectors`` after recovery.
        """
        if not self.wal_file.closed:
            self.wal_file.flush()

        checkpoints_root = self.base_path / "checkpoints"
        complete = []
        if checkpoints_root.is_dir():
            complete = sorted(
                p for p in checkpoints_root.iterdir()
                if p.is_dir() and not p.name.endswith(".tmp")
            )

        latest_name = ""
        if complete:
            self._load_index(complete[-1])
            latest_name = complete[-1].name

        # An archived WAL named <= the checkpoint is already contained in
        # that snapshot; replaying it would insert the vectors twice.
        current = self.wal_path / "current.wal"
        pending = sorted(
            p for p in self.wal_path.glob("*.wal")
            if p != current and p.stem > latest_name
        )
        if current.exists():
            pending.append(current)

        replayed = 0
        for wal_file in pending:
            replayed += self._replay_wal(wal_file)

        # Replayed records are still uncheckpointed work.
        self.insertions_since_checkpoint = replayed
        return self.n_vectors

    def _replay_wal(self, wal_file) -> int:
        """Re-insert every complete record of ``wal_file``; return the count."""
        count = 0
        with open(wal_file, 'rb') as f:
            while True:
                header = f.read(4)
                if len(header) < 4:
                    break  # clean EOF or torn length prefix
                (size,) = struct.unpack('I', header)
                payload = f.read(size)
                if len(payload) < size:
                    break  # torn record from a crash mid-write
                # copy(): frombuffer returns a read-only view of the bytes.
                vector = np.frombuffer(payload, dtype=np.float32).copy()
                self._add_to_index(vector)
                count += 1
        return count

S3/Object Storage Persistence

For cloud-native deployments, store indexes in object storage:

import boto3
import hashlib

class S3PersistedIndex:
    """
    Vector index persisted to S3 with local caching.

    Objects are stored at ``<prefix>/<index_name>/index.bin`` with a
    ``manifest.json`` beside it. The local cache keeps one
    ``<index_name>.bin`` file per index.
    """

    def __init__(self, bucket: str, prefix: str, local_cache: Path):
        self.s3 = boto3.client('s3')
        self.bucket = bucket
        self.prefix = prefix
        # Accept str as well as Path, like PersistedHNSWIndex does.
        self.cache = Path(local_cache)
        self.cache.mkdir(parents=True, exist_ok=True)

    def save(self, index_name: str, vectors: np.ndarray, graph: list):
        """
        Upload the index to S3 and refresh the local cache.

        Args:
            index_name: name used in the S3 key and the cache file name.
            vectors: array of vectors; stored as raw float32.
            graph: accepted for interface symmetry but NOT serialized here.
                load() reads index.bin as pure float32, so graph data
                cannot share that object.
                TODO(review): persist the graph as a separate object.
        """
        temp_path = self.cache / f"{index_name}.tmp"
        local_path = self.cache / f"{index_name}.bin"

        try:
            np.asarray(vectors, dtype=np.float32).tofile(str(temp_path))

            # Hash in chunks: no leaked file handle, and the whole index
            # never has to fit in memory a second time.
            digest = hashlib.md5()
            with open(temp_path, 'rb') as f:
                for chunk in iter(lambda: f.read(1 << 20), b''):
                    digest.update(chunk)
            content_hash = digest.hexdigest()

            s3_key = f"{self.prefix}/{index_name}/index.bin"
            self.s3.upload_file(str(temp_path), self.bucket, s3_key)

            # The hash is recorded in the manifest under 'etag'; it is not
            # set as the S3 object's own ETag.
            manifest_key = f"{self.prefix}/{index_name}/manifest.json"
            manifest = {
                's3_key': s3_key,
                'etag': content_hash,
                'n_vectors': len(vectors)
            }
            self.s3.put_object(
                Body=json.dumps(manifest),
                Bucket=self.bucket,
                Key=manifest_key
            )

            # Promote the uploaded bytes to the cache entry; otherwise
            # load() on this host would keep returning the old version.
            temp_path.replace(local_path)
        finally:
            # Only still present if something above failed.
            if temp_path.exists():
                temp_path.unlink()

    def load(self, index_name: str) -> np.ndarray:
        """
        Return the index vectors as a flat float32 array.

        Uses the cached ``<index_name>.bin`` if present, otherwise downloads
        it from S3 first. The cached copy is not checked against the
        manifest.
        """
        local_path = self.cache / f"{index_name}.bin"

        if not local_path.exists():
            s3_key = f"{self.prefix}/{index_name}/index.bin"
            self.s3.download_file(self.bucket, s3_key, str(local_path))

        return np.fromfile(str(local_path), dtype=np.float32)

Failure Modes

Partial write corruption: If a crash occurs during save, you may have an inconsistent index. Always write to a temp file and rename atomically.

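WAL replay slowness: For large WAL files, replay can take hours. Checkpoint often enough to bound replay time. The `checkpoint_interval` shown above counts insertions, not seconds, so for high-write workloads size it so that a checkpoint lands roughly every 10-60 seconds.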

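S3 eventual consistency: Amazon S3 itself has provided strong read-after-write consistency for PUT, GET, and LIST since December 2020, but some S3-compatible object stores and cross-region replication are still eventually consistent. On those, wait for consistency before serving traffic: list operations may return stale results for several seconds.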


EXERCISE

Build an HNSW index with 100K vectors and measure the time to save and load it using different strategies: numpy save, memory-mapped I/O, and S3 upload/download. Report load times and identify bottlenecks.

# Expected: memory-mapped load is fastest (pages are read lazily), S3 is slowest
# Measure incremental cost of larger indexes
# Plot load time vs index size