18. Minimal Vector Database Project

Chapter 18 of 18 · 30 min

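This chapter synthesizes everything into a working vector database implementation. By the end, you'll have an HNSW index that persists to disk and is served over an HTTP API. The API handlers are `async` functions that never await, so a single server process handles one request at a time. Filtering is left as an exercise, and true concurrency is listed as a growth area at the end of the chapter.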

Project Structure

minivecdb/
├── index/
│   ├── __init__.py
│   ├── hnsw.py          # HNSW implementation
│   ├── pq.py            # Product quantization
│   └── persistence.py   # Save/load logic
├── search/
│   ├── __init__.py
│   ├── query.py         # Search logic
│   └── filter.py        # Hybrid search
├── api/
│   ├── __init__.py
│   └── server.py        # HTTP API
├── tests/
│   └── test_index.py
└── main.py

Core Implementation (index/hnsw.py and api/server.py; the remaining modules in the tree are built in the exercises)

# minivecdb/index/hnsw.py
"""
Minimal HNSW implementation with persistence.
Supports add, search, and save/load operations.
"""

import numpy as np
import heapq
import random
from dataclasses import dataclass, field
from typing import Optional
from pathlib import Path
import struct
import json

@dataclass
class HNSWConfig:
    """Construction parameters for an HNSW index.

    Attributes:
        dim: dimensionality that every stored and query vector must have.
        m: number of neighbours each newly inserted node is linked to.
        ef_construction: candidate-list width used while inserting; the index
            also uses it as the default search width.
        max_layer: upper bound on the layer a node can be assigned to.
        seed: seed for the random layer assignment, for reproducible builds.

    Raises:
        ValueError: if any parameter is outside its valid range.
    """
    dim: int
    m: int = 16
    ef_construction: int = 200
    max_layer: int = 16
    seed: int = 42

    def __post_init__(self):
        # Reject configurations that cannot produce a usable index up front,
        # instead of failing later inside distance computations or graph search.
        if self.dim < 1:
            raise ValueError(f"dim must be >= 1, got {self.dim}")
        if self.m < 1:
            raise ValueError(f"m must be >= 1, got {self.m}")
        if self.ef_construction < 1:
            raise ValueError(
                f"ef_construction must be >= 1, got {self.ef_construction}")
        if self.max_layer < 0:
            raise ValueError(f"max_layer must be >= 0, got {self.max_layer}")

class HNSWIndex:
    """Hierarchical Navigable Small World (HNSW) index over L2 distance.

    Vectors receive consecutive integer IDs in insertion order. Each node is
    assigned a random top layer and linked to its nearest neighbours on every
    layer from that top layer down to layer 0. Search descends greedily
    through the upper layers, then runs a beam search of width ``ef`` on
    layer 0.
    """

    def __init__(self, config: "HNSWConfig"):
        self.config = config
        self.dim = config.dim
        self.m = config.m
        self.ef_construction = config.ef_construction
        self.max_layer = config.max_layer

        # Storage: vectors[i] is the float32 vector with ID i; graph[i] maps
        # layer -> list of (neighbor_id, distance) edges for node i.
        self.vectors: list[np.ndarray] = []
        self.graph: list[dict[int, list[tuple[int, float]]]] = []

        # Entry point: a node that lives on the highest populated layer.
        self.entry_point: Optional[int] = None
        self.max_added_layer = 0

        # Private RNG: level sampling is reproducible for a given seed without
        # reseeding (and clobbering) the process-wide random/numpy RNGs.
        self._rng = random.Random(config.seed)

    def _distance(self, a: np.ndarray, b: np.ndarray) -> float:
        """Return the Euclidean (L2) distance between ``a`` and ``b``."""
        return float(np.linalg.norm(a - b))

    def _max_degree(self, layer: int) -> int:
        """Return the maximum number of edges a node keeps on ``layer``.

        Layer 0 contains every node, so it allows 2*m edges (the usual
        ``M_max0`` choice from the HNSW paper); upper layers allow m.
        """
        return 2 * self.m if layer == 0 else self.m

    def _sample_layer(self) -> int:
        """Sample a node's top layer from a geometric distribution.

        Each additional layer is reached with probability 1/m, so
        P(level >= l) = m**-l. This matches the normalisation mL = 1/ln(m)
        recommended by the HNSW paper. The result is capped at ``max_layer``.
        """
        promote = 1.0 / max(self.m, 2)
        level = 0
        while level < self.max_layer and self._rng.random() < promote:
            level += 1
        return level

    def _search_layer(self, query: np.ndarray, ep: int, ef: int,
                      layer: int) -> list[tuple[float, int]]:
        """Beam-search one layer of the graph, starting from node ``ep``.

        Returns:
            Up to ``ef`` (distance, node_id) pairs sorted by ascending
            distance; the first element is the closest node found.
        """
        ep_dist = self._distance(query, self.vectors[ep])
        visited = {ep}
        candidates = [(ep_dist, ep)]   # min-heap: nearest unexpanded node first
        results = [(-ep_dist, ep)]     # max-heap (negated): furthest kept node first

        while candidates:
            dist, node = heapq.heappop(candidates)
            # All remaining candidates are at least this far away, and this one
            # is already worse than the furthest kept result: stop expanding.
            if dist > -results[0][0]:
                break

            for neighbor, _ in self.graph[node].get(layer, []):
                if neighbor in visited:
                    continue
                visited.add(neighbor)
                n_dist = self._distance(query, self.vectors[neighbor])
                if len(results) < ef or n_dist < -results[0][0]:
                    heapq.heappush(candidates, (n_dist, neighbor))
                    heapq.heappush(results, (-n_dist, neighbor))
                    if len(results) > ef:
                        heapq.heappop(results)  # evict the current furthest

        return sorted((-neg_dist, node) for neg_dist, node in results)

    def _neighbor_selection(self, query: np.ndarray, neighbors: list[int],
                            m: int) -> list[tuple[int, float]]:
        """Select the m neighbours closest to ``query``, as (id, distance)."""
        scored = []
        for n in neighbors:
            d = self._distance(query, self.vectors[n])
            scored.append((d, n))

        scored.sort()
        return [(n, d) for d, n in scored[:m]]

    def add(self, vector: np.ndarray) -> int:
        """Insert ``vector`` and return its ID (its position in ``self.vectors``).

        Raises:
            ValueError: if ``vector`` does not have shape ``(dim,)``.
        """
        # np.array copies, so later mutation of the caller's array cannot
        # silently change the stored vector.
        vector = np.array(vector, dtype=np.float32)
        if vector.shape != (self.dim,):
            raise ValueError(
                f"Expected vector of shape ({self.dim},), got {vector.shape}")

        vector_id = len(self.vectors)
        level = self._sample_layer()
        self.vectors.append(vector)
        # The new node is a member of every layer 0..level.
        self.graph.append({layer: [] for layer in range(level + 1)})

        if self.entry_point is None:
            # First element: it becomes the entry point on its own.
            self.entry_point = vector_id
            self.max_added_layer = level
            return vector_id

        # Greedy descent (ef=1) through layers above the new node's top layer.
        ep = self.entry_point
        for layer in range(self.max_added_layer, level, -1):
            ep = self._search_layer(vector, ep, 1, layer)[0][1]

        # Link the node on each of its layers that already has members, from
        # the top down; the nearest hit on one layer seeds the next.
        for layer in range(min(level, self.max_added_layer), -1, -1):
            candidates = self._search_layer(vector, ep, self.ef_construction, layer)
            max_degree = self._max_degree(layer)
            for dist, neighbor in candidates[:self.m]:
                self.graph[vector_id][layer].append((neighbor, dist))
                back_edges = self.graph[neighbor].setdefault(layer, [])
                back_edges.append((vector_id, dist))
                # Keep only the closest edges so hub degrees stay bounded.
                if len(back_edges) > max_degree:
                    back_edges.sort(key=lambda edge: edge[1])
                    del back_edges[max_degree:]
            ep = candidates[0][1]

        # A node that reaches a new top layer becomes the entry point.
        if level > self.max_added_layer:
            self.max_added_layer = level
            self.entry_point = vector_id

        return vector_id

    def search(self, query: np.ndarray, k: int = 10,
               ef: Optional[int] = None) -> tuple[np.ndarray, np.ndarray]:
        """Return ``(distances, ids)`` of the ``k`` nearest stored vectors.

        ``ef`` is the layer-0 beam width. It defaults to ``ef_construction``
        and is raised to at least ``k`` so that ``k`` results can be returned.
        Results are sorted by ascending L2 distance. Both arrays are empty when
        the index is empty or ``k <= 0``.

        Raises:
            ValueError: if ``query`` does not have shape ``(dim,)``.
        """
        if not self.vectors or k <= 0:
            return np.array([], dtype=np.float64), np.array([], dtype=np.int64)

        query = np.asarray(query, dtype=np.float32)
        # Without this check a length-1 query would silently broadcast.
        if query.shape != (self.dim,):
            raise ValueError(
                f"Expected query of shape ({self.dim},), got {query.shape}")
        ef = max(ef or self.ef_construction, k)

        # Greedy descent to the best entry point for layer 0.
        ep = self.entry_point
        for layer in range(self.max_added_layer, 0, -1):
            ep = self._search_layer(query, ep, 1, layer)[0][1]

        top_k = self._search_layer(query, ep, ef, 0)[:k]
        distances = np.array([d for d, _ in top_k], dtype=np.float64)
        indices = np.array([i for _, i in top_k], dtype=np.int64)
        return distances, indices

    def save(self, path: Path):
        """Persist the index to the directory ``path`` (created if missing).

        Writes three files:

        - ``vectors.bin``: the raw float32 vectors, row-major, with no header.
        - ``graph.json``: the adjacency lists.
        - ``meta.json``: the configuration and entry-point state.
        """
        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)

        vectors_array = np.array(self.vectors, dtype=np.float32)
        vectors_array.tofile(str(path / "vectors.bin"))

        # JSON object keys must be strings, so node IDs and layers are
        # stringified; edge tuples serialise as [neighbor, distance] lists.
        serializable_graph = {
            str(node_id): {str(layer): neighbors
                           for layer, neighbors in layers.items()}
            for node_id, layers in enumerate(self.graph)
        }
        with open(path / "graph.json", 'w', encoding="utf-8") as f:
            json.dump(serializable_graph, f)

        meta = {
            'dim': self.dim,
            'm': self.m,
            'ef_construction': self.ef_construction,
            'max_layer': self.max_layer,
            'seed': self.config.seed,
            'entry_point': self.entry_point,
            'max_added_layer': self.max_added_layer
        }
        with open(path / "meta.json", 'w', encoding="utf-8") as f:
            json.dump(meta, f)

    @classmethod
    def load(cls, path: Path) -> 'HNSWIndex':
        """Load an index previously written by :meth:`save`.

        The layer-sampling RNG restarts from the saved seed, so levels drawn
        for vectors added after loading are not a continuation of the original
        random sequence.

        Raises:
            FileNotFoundError: if any of the three files is missing.
            ValueError: if the files are inconsistent with each other.
        """
        path = Path(path)

        with open(path / "meta.json", encoding="utf-8") as f:
            meta = json.load(f)

        # Restore every saved construction parameter; older saves lack 'seed',
        # so optional fields fall back to HNSWConfig's defaults when absent.
        optional = {key: meta[key]
                    for key in ('ef_construction', 'max_layer', 'seed')
                    if key in meta}
        config = HNSWConfig(dim=meta['dim'], m=meta['m'], **optional)
        index = cls(config)

        dim = meta['dim']
        vectors_data = np.fromfile(str(path / "vectors.bin"), dtype=np.float32)
        if vectors_data.size % dim:
            raise ValueError(
                f"vectors.bin holds {vectors_data.size} floats, "
                f"not a multiple of dim={dim}")
        index.vectors = list(vectors_data.reshape(-1, dim))

        with open(path / "graph.json", encoding="utf-8") as f:
            raw_graph = json.load(f)

        # Order nodes by numeric ID rather than relying on JSON key order, and
        # restore edges as (int, float) tuples to match the in-memory format.
        index.graph = [
            {int(layer): [(int(n), float(d)) for n, d in neighbors]
             for layer, neighbors in layers.items()}
            for _, layers in sorted(raw_graph.items(),
                                    key=lambda item: int(item[0]))
        ]
        if len(index.graph) != len(index.vectors):
            raise ValueError(
                f"graph.json has {len(index.graph)} nodes but vectors.bin "
                f"has {len(index.vectors)} vectors")

        index.entry_point = meta['entry_point']
        index.max_added_layer = meta['max_added_layer']

        return index

API Server

# minivecdb/api/server.py
"""
HTTP API for vector database.
"""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import numpy as np
from pathlib import Path

from minivecdb.index.hnsw import HNSWIndex, HNSWConfig

app = FastAPI()

# Process-wide index shared by every endpoint; None until /init or /load runs.
# The handlers are async and never await, so on one event loop they run one at
# a time and index mutations never interleave. Each server process holds its
# own independent copy of this global.
index: Optional[HNSWIndex] = None

# Request body for POST /add. `id` is accepted but currently ignored:
# add_vector never reads it and returns the index-assigned integer ID instead.
class AddRequest(BaseModel):
    vector: list[float]  # length must equal index.dim (checked in add_vector)
    id: Optional[str] = None

# Response for POST /add: the integer ID returned by HNSWIndex.add.
class AddResponse(BaseModel):
    id: int

# Request body for POST /search: the query vector and how many neighbours to return.
class SearchRequest(BaseModel):
    vector: list[float]
    k: int = 10  # number of nearest neighbours requested

# Response for POST /search: neighbour IDs and their L2 distances, nearest
# first; ids[i] corresponds to distances[i].
class SearchResponse(BaseModel):
    ids: list[int]
    distances: list[float]

@app.post("/add", response_model=AddResponse)
async def add_vector(req: AddRequest):
    """Insert ``req.vector`` into the global index and return its new ID.

    Raises:
        HTTPException 503: no index has been initialised or loaded.
        HTTPException 400: the vector length differs from the index dimension.
    """
    current = index
    if current is None:
        raise HTTPException(503, "Index not initialized")

    vec = np.array(req.vector, dtype=np.float32)
    got = len(vec)
    if got != current.dim:
        raise HTTPException(400, f"Expected dim={current.dim}, got {got}")

    return AddResponse(id=current.add(vec))

@app.post("/search", response_model=SearchResponse)
async def search(req: SearchRequest):
    """Return the ``req.k`` stored vectors nearest to ``req.vector``.

    Raises:
        HTTPException 503: no index has been initialised or loaded.
        HTTPException 400: the query length differs from the index dimension,
            or ``k < 1``.
    """
    if index is None:
        raise HTTPException(503, "Index not initialized")

    query = np.array(req.vector, dtype=np.float32)
    # Validate up front. A mismatched dimension otherwise either fails deep in
    # the distance computation (opaque 500) or, for a length-1 query,
    # broadcasts silently and returns meaningless neighbours.
    if len(query) != index.dim:
        raise HTTPException(400, f"Expected dim={index.dim}, got {len(query)}")
    if req.k < 1:
        raise HTTPException(400, f"k must be >= 1, got {req.k}")

    distances, ids = index.search(query, req.k)

    return SearchResponse(
        ids=ids.tolist(),
        distances=distances.tolist()
    )

@app.post("/save")
async def save_index(path: str = "index_data"):
    """Persist the global index to the directory ``path``.

    SECURITY NOTE(review): ``path`` comes straight from the request, so any
    client can make the server create or overwrite files in any directory the
    process can write to. Restrict it to a fixed data root before exposing
    this endpoint.

    Raises:
        HTTPException 503: no index has been initialised or loaded.
        HTTPException 500: writing the files failed.
    """
    if index is None:
        raise HTTPException(503, "Index not initialized")

    try:
        index.save(Path(path))
    except OSError as err:
        raise HTTPException(500, f"Failed to save index to {path!r}: {err}") from err
    return {"status": "saved", "path": path}

@app.post("/load")
async def load_index(path: str = "index_data"):
    """Replace the global index with one loaded from the directory ``path``.

    The current index is only replaced if loading succeeds.

    SECURITY NOTE(review): ``path`` is client-controlled, so any readable
    directory on the server can be probed. Restrict it to a fixed data root
    before exposing this endpoint.

    Raises:
        HTTPException 404: no saved index exists at ``path``.
        HTTPException 400: the saved files are malformed or inconsistent.
    """
    global index
    try:
        loaded = HNSWIndex.load(Path(path))
    except FileNotFoundError as err:
        raise HTTPException(404, f"No saved index at {path!r}") from err
    except (KeyError, ValueError) as err:
        # json.JSONDecodeError subclasses ValueError; KeyError means a
        # required metadata field is missing.
        raise HTTPException(400, f"Saved index at {path!r} is invalid: {err}") from err
    index = loaded
    return {"status": "loaded", "vectors": len(index.vectors)}

@app.post("/init")
async def init_index(dim: int = 128, m: int = 16):
    """Create a fresh, empty index, discarding any current (unsaved) index.

    ``dim`` and ``m`` are query parameters (e.g. ``/init?dim=128&m=16``).

    Raises:
        HTTPException 400: ``dim`` or ``m`` is less than 1.
    """
    global index
    if dim < 1:
        raise HTTPException(400, f"dim must be >= 1, got {dim}")
    if m < 1:
        raise HTTPException(400, f"m must be >= 1, got {m}")

    config = HNSWConfig(dim=dim, m=m)
    index = HNSWIndex(config)
    return {"status": "initialized", "dim": dim}

Running the System

# Install dependencies
pip install fastapi uvicorn numpy

# Start server
uvicorn minivecdb.api.server:app --host 0.0.0.0 --port 8000

# Test with curl
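# /init takes query parameters; /add and /search take a JSON body.
# JSON has no list-repetition syntax, so let Python print the 128-element array.
curl -X POST "http://localhost:8000/init?dim=128"
curl -X POST http://localhost:8000/add -H "Content-Type: application/json" \
     -d "{\"vector\": $(python -c 'print([0.1] * 128)')}"
curl -X POST http://localhost:8000/search -H "Content-Type: application/json" \
     -d "{\"vector\": $(python -c 'print([0.1] * 128)'), \"k\": 5}"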

Exercise: Extend the Minimal DB

  1. Add product quantization: Implement PQ encoding for the stored vectors to reduce memory usage by 4-8x.

  2. Add filtering: Extend the search endpoint to accept filter criteria and return only matching results.

  3. Add automatic checkpointing: The index can already be saved on demand via /save; implement periodic checkpointing so recent inserts survive crashes.

  4. Add batch operations: Add /add_batch endpoint that inserts 1000+ vectors efficiently.

  5. Add metrics: Track query latency, index size, and recall metrics. Expose via /metrics endpoint.

# Benchmark script to validate your implementation
import numpy as np
import time

def benchmark(index, n_queries=1000, k=10, seed=None):
    """Time ``index.search`` on uniform random queries and report the results.

    Args:
        index: any object exposing ``dim`` and ``search(query, k)``.
        n_queries: number of queries to run; must be >= 1.
        k: neighbours requested per query.
        seed: optional seed for the query generator, for reproducible runs.

    Returns:
        dict with ``qps`` (queries/second), ``p50_ms``, ``p99_ms`` and
        ``mean_ms`` (per-query latency in milliseconds).

    Raises:
        ValueError: if ``n_queries < 1``.
    """
    if n_queries < 1:
        raise ValueError(f"n_queries must be >= 1, got {n_queries}")

    rng = np.random.default_rng(seed)
    queries = rng.random((n_queries, index.dim), dtype=np.float32)

    # Time each query individually: a percentile needs the per-query
    # distribution, not just the total elapsed time (which only gives the mean).
    latencies = np.empty(n_queries, dtype=np.float64)
    for i, q in enumerate(queries):
        start = time.perf_counter()
        index.search(q, k)
        latencies[i] = time.perf_counter() - start

    total = float(latencies.sum())
    stats = {
        "qps": n_queries / total if total > 0 else float("inf"),
        "p50_ms": float(np.percentile(latencies, 50)) * 1000,
        "p99_ms": float(np.percentile(latencies, 99)) * 1000,
        "mean_ms": total / n_queries * 1000,
    }

    print(f"Throughput: {stats['qps']:.0f} queries/second")
    print(f"Latency p50: {stats['p50_ms']:.2f} ms")
    print(f"Latency p99: {stats['p99_ms']:.2f} ms")
    print(f"Latency mean: {stats['mean_ms']:.2f} ms")
    return stats
EXERCISE

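Complete the minimal vector database by implementing all extensions listed above. Run a benchmark comparing your implementation against FAISS or Annoy on a standard dataset such as SIFT1M. Start with a subset (e.g. the first 100K base vectors), because pure-Python insertion into the full million-vector set is very slow. Document where your implementation excels and where optimization is needed.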

# Final deliverable: working vector database with documented performance characteristics
# Expected: recall comparable to research implementations on small datasets (<100K vectors); pure-Python throughput will be far below FAISS/hnswlib
# Growth areas: large-scale performance, concurrency, fault tolerance