17. Batch Processing
Chapter 17 of 22 · 20 min
Production RAG systems handle many queries concurrently. Batch processing improves throughput by grouping requests, so that a single API call or model invocation serves many inputs at once.
Query Batching
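Group queries that arrive close together in time into batches, so that many queries are embedded with a single API call. A batch is sent when it reaches a size limit or when a short wait window expires.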
import asyncio
from collections import defaultdict
async def batch_embed(queries: list[str], batch_size: int = 100, *,
                      model: str = "text-embedding-3-small",
                      client=None) -> list[list]:
    """Embed many queries using one embeddings API call per batch.

    Args:
        queries: Texts to embed. An empty list returns ``[]`` without
            touching the API client.
        batch_size: Maximum number of texts sent in a single request.
            Must be >= 1.
        model: Embedding model name passed to ``embeddings.create``.
        client: Object exposing ``embeddings.create(model=..., input=...)``.
            Defaults to the module-level ``openai`` package. Both async
            clients (whose ``create`` returns an awaitable) and sync clients
            (whose ``create`` returns the response directly) are accepted.

    Returns:
        One embedding per query, in the same order as ``queries``.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    import inspect

    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if not queries:
        return []
    if client is None:
        # Resolved lazily so callers that inject a client need no global.
        client = openai

    embeddings = []
    for start in range(0, len(queries), batch_size):
        batch = queries[start:start + batch_size]
        # Single API call for the entire batch.
        response = client.embeddings.create(model=model, input=batch)
        # The module-level ``openai`` client (openai>=1.0) is synchronous and
        # returns the response itself; AsyncOpenAI returns a coroutine. Only
        # await when there is something to await.
        if inspect.isawaitable(response):
            response = await response
        embeddings.extend(item.embedding for item in response.data)
    return embeddings
class QueryBatcher:
    """Coalesce individual ``submit`` calls into batched embedding requests.

    The current batch is sent as soon as one of these happens:
    ``max_batch_size`` queries are pending, ``max_wait_ms`` milliseconds have
    passed since the batch's first query arrived, or ``flush`` is called.
    """

    def __init__(self, max_wait_ms: int = 100, max_batch_size: int = 50, *,
                 embed_fn=None):
        """
        Args:
            max_wait_ms: Longest time a query waits for companions before its
                (possibly partial) batch is sent.
            max_batch_size: Pending-query count that triggers an immediate send.
            embed_fn: Async callable taking ``list[str]`` and returning one
                embedding per query. Defaults to ``batch_embed``.
        """
        self.max_wait_ms = max_wait_ms
        self.max_batch_size = max_batch_size
        self.embed_fn = embed_fn
        # (query, future) pairs waiting for the next batch.
        self.pending = []
        # Futures of the currently pending batch; reset whenever a batch is
        # taken so the list does not grow without bound.
        self.futures = []
        # Task that flushes the pending batch after max_wait_ms, or None.
        self._timer = None

    async def submit(self, query: str) -> list:
        """Queue ``query`` and return its embedding once its batch completes.

        Raises whatever the embedding call raised if the batch failed.
        """
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self.pending.append((query, future))
        self.futures.append(future)
        if len(self.pending) >= self.max_batch_size:
            # Size limit reached: send now.
            await self._process_batch()
        elif self._timer is None:
            # First query of a new batch: make sure it is sent within
            # max_wait_ms even if the batch never fills up.
            self._timer = loop.create_task(self._flush_after_delay())
        return await future

    async def _flush_after_delay(self):
        """Send the pending batch once ``max_wait_ms`` has elapsed."""
        await asyncio.sleep(self.max_wait_ms / 1000)
        # Clear before processing so _process_batch does not cancel this task.
        self._timer = None
        await self._process_batch()

    async def _process_batch(self):
        """Embed all pending queries and resolve their futures.

        Embedding errors are delivered to the waiting futures instead of being
        raised here, so no waiter is left hanging.
        """
        if self._timer is not None:
            # This batch is being sent now; its delayed flush is obsolete.
            self._timer.cancel()
            self._timer = None
        if not self.pending:
            return
        batch = self.pending
        self.pending = []
        self.futures = []
        queries = [q for q, _ in batch]
        futures = [f for _, f in batch]

        embed = self.embed_fn if self.embed_fn is not None else batch_embed
        try:
            embeddings = await embed(queries)
        except Exception as exc:
            for future in futures:
                if not future.done():
                    future.set_exception(exc)
            return
        except BaseException:
            # Cancelled mid-batch: release the other waiters, then propagate.
            for future in futures:
                if not future.done():
                    future.cancel()
            raise

        for future, embedding in zip(futures, embeddings):
            # A waiter may have been cancelled while the batch was in flight.
            if not future.done():
                future.set_result(embedding)
        for future in futures[len(embeddings):]:
            if not future.done():
                future.set_exception(RuntimeError(
                    f"embedding call returned {len(embeddings)} results "
                    f"for {len(futures)} queries"))

    async def flush(self):
        """Send any pending queries immediately."""
        await self._process_batch()
Parallel Document Processing
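Process multiple documents concurrently for indexing. Threads speed this up when the per-document work is I/O-bound or runs in native code that releases the GIL (as many embedding models do); pure-Python CPU work needs processes instead.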
from concurrent.futures import ThreadPoolExecutor
import threading
def parallel_index(documents: list[dict],
                   workers: int = 8,
                   chunk_size: int = 500) -> list[dict]:
    """Chunk and embed documents concurrently on a thread pool.

    Each document's ``"content"`` is split with ``sliding_window_chunks``.
    Every chunk is then tagged with the parent ``doc_id`` and ``source``
    (``"unknown"`` when the document has none) and given an ``"embedding"``
    from ``embed_model.encode``.

    Args:
        documents: Dicts with ``"id"`` and ``"content"`` keys and an optional
            ``"source"`` key.
        workers: Number of worker threads.
        chunk_size: Forwarded to ``sliding_window_chunks``.

    Returns:
        All chunk dicts, grouped by document in input order. Nothing is
        written to a vector store here; callers must persist the result.
    """
    import itertools

    def process_doc(doc):
        chunks = sliding_window_chunks(doc["content"], chunk_size)
        if not chunks:
            # Nothing to embed; skip the model call for an empty batch.
            return chunks
        for chunk in chunks:
            chunk["doc_id"] = doc["id"]
            chunk["source"] = doc.get("source", "unknown")
        # NOTE(review): all threads share one embed_model; assumes encode() is
        # thread-safe and returns one vector per input text, in order — confirm.
        embeddings = embed_model.encode([c["text"] for c in chunks])
        for chunk, embedding in zip(chunks, embeddings):
            chunk["embedding"] = embedding
        return chunks

    with ThreadPoolExecutor(max_workers=workers) as executor:
        # executor.map preserves input order.
        per_doc_chunks = list(executor.map(process_doc, documents))
    return list(itertools.chain.from_iterable(per_doc_chunks))
Rate Limiting
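Respect API rate limits with a token bucket (shown below) or a semaphore. A token bucket caps how many calls start per second; a semaphore caps how many calls are in flight at once.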
import time
class RateLimitedClient:
    """Token-bucket rate limiter for async API calls.

    Allows a burst of up to ``max_calls_per_second`` calls, then spaces calls
    ``1 / max_calls_per_second`` seconds apart. Waiting uses ``asyncio.sleep``,
    so the event loop keeps serving other tasks while a caller is throttled.
    """

    def __init__(self, max_calls_per_second: float = 10):
        """
        Args:
            max_calls_per_second: Sustained call rate and burst capacity.
                Must be > 0.

        Raises:
            ValueError: If ``max_calls_per_second`` is not positive.
        """
        if max_calls_per_second <= 0:
            raise ValueError(
                f"max_calls_per_second must be > 0, got {max_calls_per_second}")
        self.max_calls = max_calls_per_second
        self.tokens = max_calls_per_second
        # Monotonic clock: unaffected by wall-clock jumps (NTP, manual changes).
        self.last_update = time.monotonic()
        # Guards only the token arithmetic; never held across a sleep or await.
        self.lock = threading.Lock()

    async def call(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` once a rate-limit token is available.

        Each call reserves its token immediately. The balance may go negative,
        and the deficit decides how long this caller sleeps. Concurrent callers
        are therefore queued at evenly spaced times, and the time spent waiting
        is never credited back as extra tokens.
        """
        with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            self.last_update = now
            # Replenish, capped at burst capacity, then take this call's token.
            self.tokens = min(self.max_calls, self.tokens + elapsed * self.max_calls)
            self.tokens -= 1
            deficit = -self.tokens
        if deficit > 0:
            await asyncio.sleep(deficit / self.max_calls)
        return await func(*args, **kwargs)
EXERCISE
Index 1,000 documents using parallel workers. Measure throughput (documents per second) as you vary the worker count from 1 to 16.