18. Minimal Vector Database Project
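This chapter synthesizes everything into a working vector database implementation. By the end, you'll have a system that builds an HNSW index, persists it to disk, and serves add and search requests over HTTP. Filtering, product quantization, and hardening for concurrent access are left as the exercises at the end of the chapter.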
Project Structure
minivecdb/
├── index/
│ ├── __init__.py
│ ├── hnsw.py # HNSW implementation
│ ├── pq.py # Product quantization
│ └── persistence.py # Save/load logic
├── search/
│ ├── __init__.py
│ ├── query.py # Search logic
│ └── filter.py # Hybrid search
├── api/
│ ├── __init__.py
│ └── server.py # HTTP API
├── tests/
│ └── test_index.py
└── main.py
Complete Implementation
# minivecdb/index/hnsw.py
"""
Minimal HNSW implementation with persistence.
Supports add, search, and save/load operations.
"""
import numpy as np
import heapq
import random
from dataclasses import dataclass, field
from typing import Optional
from pathlib import Path
import struct
import json
@dataclass
class HNSWConfig:
    """Construction parameters for an HNSW index.

    Raises:
        ValueError: if ``dim``, ``m`` or ``ef_construction`` is below 1, or
            ``max_layer`` is negative. Failing here keeps an unusable
            configuration from producing an index that breaks later.
    """

    dim: int                    # number of components in every stored vector
    m: int = 16                 # how many of the nearest candidates a new node is linked to
    ef_construction: int = 200  # candidate-list size while inserting (also the default search ef)
    max_layer: int = 16         # highest layer the level sampler may assign
    seed: int = 42              # seed for the layer-sampling random numbers

    def __post_init__(self) -> None:
        if self.dim < 1:
            raise ValueError(f"dim must be >= 1, got {self.dim}")
        if self.m < 1:
            raise ValueError(f"m must be >= 1, got {self.m}")
        if self.ef_construction < 1:
            raise ValueError(
                f"ef_construction must be >= 1, got {self.ef_construction}"
            )
        if self.max_layer < 0:
            raise ValueError(f"max_layer must be >= 0, got {self.max_layer}")
class HNSWIndex:
    """In-memory HNSW graph over float32 vectors using Euclidean distance.

    Vectors get consecutive integer ids in insertion order. Each node stores,
    per layer, a list of ``(neighbor_id, distance)`` edges. The index can be
    written to and restored from a directory with ``save`` / ``load``.
    """

    def __init__(self, config: "HNSWConfig"):
        """Create an empty index from ``config``."""
        self.config = config
        self.dim = config.dim
        self.m = config.m
        self.ef_construction = config.ef_construction
        self.max_layer = config.max_layer

        # Storage
        self.vectors: list[np.ndarray] = []
        # graph[node][layer] -> [(neighbor_id, distance), ...]
        self.graph: list[dict[int, list[tuple[int, float]]]] = []

        # Entry point: the node searches start from, and the top layer in use.
        self.entry_point: Optional[int] = None
        self.max_added_layer = 0

        # A private generator yields the same level sequence for a given seed
        # without resetting the process-wide random state every time an index
        # is constructed or loaded.
        self._rng = random.Random(config.seed)

    def _distance(self, a: np.ndarray, b: np.ndarray) -> float:
        """Return the Euclidean distance between ``a`` and ``b``."""
        return float(np.linalg.norm(a - b))

    def _sample_layer(self) -> int:
        """Sample a layer: each extra level is taken with probability 0.5,
        capped at ``max_layer``."""
        level = 0
        while level < self.max_layer and self._rng.random() < 0.5:
            level += 1
        return level

    def _search_layer(self, query: np.ndarray, ep: int, ef: int,
                      layer: int) -> list[tuple[float, int]]:
        """Best-first search of one layer starting at node ``ep``.

        Returns up to ``ef`` ``(distance, node)`` pairs sorted by ascending
        distance. The list always contains at least ``ep``.
        """
        ef = max(ef, 1)
        ep_dist = self._distance(query, self.vectors[ep])
        visited = {ep}
        candidates = [(ep_dist, ep)]   # min-heap: closest unexpanded node first
        best = [(-ep_dist, ep)]        # max-heap (negated): worst kept result on top

        while candidates:
            dist, node = heapq.heappop(candidates)
            # Stop only once the result set is full AND the closest remaining
            # candidate cannot improve on the worst kept result.
            if len(best) >= ef and dist > -best[0][0]:
                break
            for neighbor, _ in self.graph[node].get(layer, ()):
                if neighbor in visited:
                    continue
                visited.add(neighbor)
                n_dist = self._distance(query, self.vectors[neighbor])
                if len(best) < ef or n_dist < -best[0][0]:
                    heapq.heappush(candidates, (n_dist, neighbor))
                    heapq.heappush(best, (-n_dist, neighbor))
                    if len(best) > ef:
                        heapq.heappop(best)

        return sorted((-neg_dist, node) for neg_dist, node in best)

    def _neighbor_selection(self, query: np.ndarray, neighbors: list[int],
                            m: int) -> list[tuple[int, float]]:
        """Return the ``m`` ids from ``neighbors`` closest to ``query`` as
        ``(neighbor_id, distance)`` pairs, nearest first."""
        scored = sorted(
            (self._distance(query, self.vectors[n]), n) for n in neighbors
        )
        return [(n, d) for d, n in scored[:m]]

    def add(self, vector: np.ndarray) -> int:
        """Insert ``vector`` and return its id.

        The node is linked on every layer from its sampled level down to 0,
        so it stays reachable from the bottom-layer search. When it reaches a
        layer above the current top it becomes the new entry point.

        Raises:
            ValueError: if ``vector`` does not have ``dim`` components.
        """
        # Copy so later mutation of the caller's array cannot alter the index.
        vector = np.array(vector, dtype=np.float32).reshape(-1)
        if vector.size != self.dim:
            raise ValueError(f"Expected dim={self.dim}, got {vector.size}")

        vector_id = len(self.vectors)
        level = self._sample_layer()
        self.vectors.append(vector)
        self.graph.append({layer: [] for layer in range(level + 1)})

        if self.entry_point is None:
            # First element
            self.entry_point = vector_id
            self.max_added_layer = level
            return vector_id

        ep = self.entry_point
        top = self.max_added_layer

        # Greedy descent through the layers above the new node's level.
        for layer in range(top, level, -1):
            ep = self._search_layer(vector, ep, 1, layer)[0][1]

        # Connect bidirectional edges on each layer the node belongs to.
        for layer in range(min(level, top), -1, -1):
            candidates = self._search_layer(vector, ep, self.ef_construction, layer)
            # Layer 0 carries every node, so it is allowed twice the degree.
            limit = self.m * 2 if layer == 0 else self.m
            for dist, neighbor in candidates[:self.m]:
                self.graph[vector_id][layer].append((neighbor, dist))
                back_edges = self.graph[neighbor].setdefault(layer, [])
                back_edges.append((vector_id, dist))
                if len(back_edges) > limit:
                    # Keep only the closest edges so degrees stay bounded.
                    self.graph[neighbor][layer] = self._neighbor_selection(
                        self.vectors[neighbor],
                        [n for n, _ in back_edges],
                        limit,
                    )
            ep = candidates[0][1]

        if level > top:
            self.entry_point = vector_id
            self.max_added_layer = level
        return vector_id

    def search(self, query: np.ndarray, k: int = 10,
               ef: Optional[int] = None) -> tuple[np.ndarray, np.ndarray]:
        """Search for the ``k`` nearest neighbors of ``query``.

        Args:
            query: vector with ``dim`` components.
            k: number of results wanted; values below 1 yield empty arrays.
            ef: candidate-list size for the bottom layer. Defaults to
                ``ef_construction`` and is raised to ``k`` when smaller.

        Returns:
            ``(distances, ids)`` arrays sorted by ascending distance.

        Raises:
            ValueError: if ``query`` does not have ``dim`` components.
        """
        if not self.vectors or self.entry_point is None or k < 1:
            return np.empty(0, dtype=np.float64), np.empty(0, dtype=np.int64)

        query = np.array(query, dtype=np.float32).reshape(-1)
        if query.size != self.dim:
            raise ValueError(f"Expected dim={self.dim}, got {query.size}")
        ef = max(ef or self.ef_construction, k)

        # Navigate down from top layer
        ep = self.entry_point
        for layer in range(self.max_added_layer, 0, -1):
            ep = self._search_layer(query, ep, 1, layer)[0][1]

        # Search bottom layer; results arrive sorted by distance.
        top_k = self._search_layer(query, ep, ef, 0)[:k]
        distances = np.array([d for d, _ in top_k], dtype=np.float64)
        indices = np.array([i for _, i in top_k], dtype=np.int64)
        return distances, indices

    def save(self, path: Path):
        """Persist the index into directory ``path`` (created if missing).

        Writes ``vectors.bin`` (raw float32), ``graph.json`` and ``meta.json``.
        """
        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)

        # Save vectors
        vectors_array = np.array(self.vectors, dtype=np.float32)
        vectors_array.tofile(str(path / "vectors.bin"))

        # Save graph; JSON object keys must be strings.
        serializable_graph = {
            str(node_id): {str(layer): neighbors
                           for layer, neighbors in layers.items()}
            for node_id, layers in enumerate(self.graph)
        }
        with open(path / "graph.json", 'w', encoding='utf-8') as f:
            json.dump(serializable_graph, f)

        # Save metadata
        meta = {
            'dim': self.dim,
            'm': self.m,
            'ef_construction': self.ef_construction,
            'max_layer': self.max_layer,
            'seed': self.config.seed,
            'entry_point': self.entry_point,
            'max_added_layer': self.max_added_layer
        }
        with open(path / "meta.json", 'w', encoding='utf-8') as f:
            json.dump(meta, f)

    @classmethod
    def load(cls, path: Path) -> 'HNSWIndex':
        """Load an index previously written by ``save``.

        Raises:
            FileNotFoundError: if one of the three files is missing.
            ValueError: if the files are inconsistent with each other.
        """
        path = Path(path)

        # Load metadata
        with open(path / "meta.json", encoding='utf-8') as f:
            meta = json.load(f)
        dim = meta['dim']
        if dim < 1:
            raise ValueError(f"meta.json has invalid dim={dim}")
        # Restore every saved setting; keys absent from older files fall back
        # to the HNSWConfig defaults.
        optional = ('ef_construction', 'max_layer', 'seed')
        config = HNSWConfig(
            dim=dim,
            m=meta['m'],
            **{key: meta[key] for key in optional if key in meta},
        )
        index = cls(config)

        # Load vectors
        flat = np.fromfile(str(path / "vectors.bin"), dtype=np.float32)
        if flat.size % dim:
            raise ValueError(
                f"vectors.bin holds {flat.size} floats, not a multiple of dim={dim}"
            )
        index.vectors = list(flat.reshape(-1, dim))

        # Load graph. Order by numeric node id instead of trusting the order
        # of keys in the file, and turn JSON lists back into tuples.
        with open(path / "graph.json", encoding='utf-8') as f:
            raw_graph = json.load(f)
        index.graph = [
            {
                int(layer): [(int(n), float(d)) for n, d in neighbors]
                for layer, neighbors in raw_graph[node_id].items()
            }
            for node_id in sorted(raw_graph, key=int)
        ]
        if len(index.graph) != len(index.vectors):
            raise ValueError(
                f"graph.json has {len(index.graph)} nodes but vectors.bin "
                f"has {len(index.vectors)} vectors"
            )

        index.entry_point = meta['entry_point']
        index.max_added_layer = meta['max_added_layer']
        return index
API Server
# minivecdb/api/server.py
"""
HTTP API for vector database.
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import numpy as np
from pathlib import Path
from minivecdb.index.hnsw import HNSWIndex, HNSWConfig
# Application object; the route decorators below register handlers on it.
app = FastAPI()
# Single index shared by every handler in this module. It stays None until
# /init or /load assigns it, and handlers answer 503 while it is None.
index: Optional[HNSWIndex] = None
# Request body for POST /add.
class AddRequest(BaseModel):
    # Components of the vector to insert; the handler rejects a length
    # different from the index dimension.
    vector: list[float]
    # Optional caller-supplied identifier.
    # NOTE(review): add_vector never reads this field; the id returned to the
    # client is the integer assigned by the index.
    id: Optional[str] = None
# Response body for POST /add.
class AddResponse(BaseModel):
    # Integer id returned by HNSWIndex.add for the inserted vector.
    id: int
# Request body for POST /search.
class SearchRequest(BaseModel):
    # Query vector.
    vector: list[float]
    # Number of nearest neighbors requested.
    k: int = 10
# Response body for POST /search; the two lists are parallel.
class SearchResponse(BaseModel):
    # Ids of the matching vectors, as returned by HNSWIndex.search.
    ids: list[int]
    # Distance from the query to the vector at the same position in `ids`.
    distances: list[float]
# POST /add: insert one vector into the shared index and report its id.
# Responds 503 while no index exists and 400 when the vector length does not
# match the index dimension.
@app.post("/add", response_model=AddResponse)
async def add_vector(req: AddRequest):
    # Only reads the module-level index, so no `global` declaration is needed.
    if index is None:
        raise HTTPException(503, "Index not initialized")

    vector = np.asarray(req.vector, dtype=np.float32)
    if len(vector) != index.dim:
        raise HTTPException(400, f"Expected dim={index.dim}, got {len(vector)}")

    return AddResponse(id=index.add(vector))
@app.post("/search", response_model=SearchResponse)
async def search(req: SearchRequest):
    """Return the ids and distances of the ``k`` vectors nearest to the query.

    Responds 503 while no index exists, and 400 when ``k`` is below 1 or the
    query length differs from the index dimension (the same check /add makes).
    """
    if index is None:
        raise HTTPException(503, "Index not initialized")
    if req.k < 1:
        raise HTTPException(400, f"k must be >= 1, got {req.k}")

    query = np.array(req.vector, dtype=np.float32)
    # Without this check a wrong-sized query reaches the distance computation
    # and fails there, or is silently broadcast when it has one component.
    if len(query) != index.dim:
        raise HTTPException(400, f"Expected dim={index.dim}, got {len(query)}")

    distances, ids = index.search(query, req.k)
    return SearchResponse(
        ids=ids.tolist(),
        distances=distances.tolist()
    )
# POST /save: write the shared index into the directory named by `path`.
# Responds 503 while no index exists.
# NOTE(review): `path` is taken from the request and passed to the filesystem
# unchanged; confirm this endpoint is only reachable by trusted callers.
@app.post("/save")
async def save_index(path: str = "index_data"):
    # Only reads the module-level index, so no `global` declaration is needed.
    if index is None:
        raise HTTPException(503, "Index not initialized")

    destination = Path(path)
    index.save(destination)
    return dict(status="saved", path=path)
@app.post("/load")
async def load_index(path: str = "index_data"):
    """Replace the shared index with the one stored in directory ``path``.

    Responds 404 when the directory or one of its files is missing, and 500
    when the files cannot be parsed. The current index is left untouched
    unless loading succeeds.
    """
    global index
    try:
        loaded = HNSWIndex.load(Path(path))
    except (FileNotFoundError, NotADirectoryError) as exc:
        raise HTTPException(404, f"No saved index found at {path!r}") from exc
    except (KeyError, TypeError, ValueError) as exc:
        # Covers malformed JSON and missing or inconsistent metadata.
        raise HTTPException(500, f"Saved index at {path!r} is unreadable: {exc}") from exc

    index = loaded
    return {"status": "loaded", "vectors": len(index.vectors)}
@app.post("/init")
async def init_index(dim: int = 128, m: int = 16):
    """Create a fresh, empty index and make it the shared one.

    ``dim`` and ``m`` are plain function parameters, so they are read from
    the query string (for example ``/init?dim=128&m=16``), not from a JSON
    body. Responds 400 when either value is below 1.
    """
    global index
    if dim < 1 or m < 1:
        raise HTTPException(400, f"dim and m must be >= 1, got dim={dim}, m={m}")

    config = HNSWConfig(dim=dim, m=m)
    index = HNSWIndex(config)
    return {"status": "initialized", "dim": dim}
Running the System
# Install dependencies
pip install fastapi uvicorn numpy
# Start server
uvicorn minivecdb.api.server:app --host 0.0.0.0 --port 8000
# Test with curl
# /init takes dim and m from the query string, not from a JSON body
curl -X POST "http://localhost:8000/init?dim=128"
# JSON has no list-repetition syntax, so build the 128-element vector first
VEC=$(python -c 'import json; print(json.dumps([0.1] * 128))')
curl -X POST http://localhost:8000/add -H 'Content-Type: application/json' -d "{\"vector\": $VEC}"
curl -X POST http://localhost:8000/search -H 'Content-Type: application/json' -d "{\"vector\": $VEC, \"k\": 5}"
Exercise: Extend the Minimal DB
Add product quantization: Implement PQ encoding for the stored vectors to reduce memory usage by 4-8x.
Add filtering: Extend the search endpoint to accept filter criteria and return only matching results.
Add persistence: Implement periodic checkpointing so recent inserts survive crashes.
Add batch operations: Add an `/add_batch` endpoint that inserts 1000+ vectors efficiently.
Add metrics: Track query latency, index size, and recall metrics. Expose them via a `/metrics` endpoint.
# Benchmark script to validate your implementation
import numpy as np
import time
def benchmark(index, n_queries=1000, k=10):
    """Time ``n_queries`` random searches against ``index`` and print a summary.

    Args:
        index: object exposing ``dim`` and ``search(query, k)``.
        n_queries: number of random queries to run; must be at least 1.
        k: number of neighbors requested per query.

    Returns:
        dict with ``throughput_qps`` (queries per second over the whole run)
        and ``p50_ms`` (median single-query latency in milliseconds).

    Raises:
        ValueError: if ``n_queries`` is below 1.
    """
    if n_queries < 1:
        raise ValueError(f"n_queries must be >= 1, got {n_queries}")

    queries = np.random.rand(n_queries, index.dim).astype(np.float32)
    latencies = np.empty(n_queries, dtype=np.float64)

    start = time.perf_counter()
    for i, q in enumerate(queries):
        began = time.perf_counter()
        index.search(q, k)
        latencies[i] = time.perf_counter() - began
    elapsed = time.perf_counter() - start

    throughput = n_queries / elapsed if elapsed > 0 else float("inf")
    # p50 is the median of the per-query timings; dividing the total time by
    # the query count would report the mean instead.
    p50_ms = float(np.median(latencies)) * 1000

    print(f"Throughput: {throughput:.0f} queries/second")
    print(f"Latency p50: {p50_ms:.2f} ms")
    return {"throughput_qps": throughput, "p50_ms": p50_ms}
Complete the minimal vector database by implementing all extensions listed above. Run a benchmark comparing your implementation against FAISS or Annoy on a standard dataset like SIFT1M. Document where your implementation excels and where optimization is needed.
# Final deliverable: working vector database with documented performance characteristics
# Expected: competitive with research implementations on small datasets (<100K vectors)
# Growth areas: large-scale performance, concurrency, fault tolerance