RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.co · Independently operated
RAG Systems: Part 2

17. Batch Processing

Chapter 17 of 22 · 20 min
KEY INSIGHT

Batch processing groups queries to reduce API calls and uses parallel workers for throughput, while rate limiting prevents exceeding API quotas.

Production RAG handles many queries simultaneously. Batch processing improves throughput by grouping requests.

Query Batching

Group queries into batches so that a single embedding API call serves many of them.

import asyncio
from openai import AsyncOpenAI

# Awaiting the call requires the async client (openai>=1.0);
# it reads OPENAI_API_KEY from the environment.
client = AsyncOpenAI()

async def batch_embed(queries: list[str], batch_size: int = 100) -> list[list[float]]:
    """Embed queries in batches of at most `batch_size`."""
    
    embeddings = []
    
    for i in range(0, len(queries), batch_size):
        batch = queries[i:i + batch_size]
        
        # Single API call for the entire batch
        response = await client.embeddings.create(
            model="text-embedding-3-small",
            input=batch
        )
        
        batch_embeddings = [item.embedding for item in response.data]
        embeddings.extend(batch_embeddings)
    
    return embeddings

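class QueryBatcher:
    def __init__(self, max_wait_ms: int = 100, max_batch_size: int = 50):
        self.max_wait_ms = max_wait_ms
        self.max_batch_size = max_batch_size
        self.pending = []   # (query, future) pairs waiting for the next batch
        self._timer = None  # task that flushes a partial batch after max_wait_ms
    
    async def submit(self, query: str) -> list:
        """Submit a query; returns its embedding once its batch completes."""
        future = asyncio.get_running_loop().create_future()
        self.pending.append((query, future))
        
        if len(self.pending) >= self.max_batch_size:
            # Size limit reached: process the batch now
            await self._process_batch()
        elif self._timer is None:
            # First query of a new batch: make sure it is flushed
            # after max_wait_ms even if the batch never fills up
            self._timer = asyncio.create_task(self._flush_after_wait())
        
        return await future
    
    async def _flush_after_wait(self):
        """Flush a partial batch once the wait time has elapsed."""
        await asyncio.sleep(self.max_wait_ms / 1000)
        self._timer = None
        await self._process_batch()
    
    async def _process_batch(self):
        """Process current pending queries as a batch."""
        if not self.pending:
            return
        
        # Take ownership of the pending list before the first await
        batch, self.pending = self.pending, []
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        
        try:
            embeddings = await batch_embed([q for q, _ in batch])
        except Exception as exc:
            # Propagate the failure so callers do not wait forever
            for _, future in batch:
                if not future.done():
                    future.set_exception(exc)
            return
        
        # Resolve futures with results
        for (_, future), embedding in zip(batch, embeddings):
            if not future.done():
                future.set_result(embedding)
    
    async def flush(self):
        """Process remaining queries."""
        await self._process_batch()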
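
To make the saving concrete, here is a minimal sketch that replaces the real embedding API with a hypothetical stub, `fake_embed_api`, which only counts how often it is called. The names `fake_embed_api`, `embed_in_batches`, and `count_calls` are illustrative and not part of the course code. The number of calls should come out as the query count divided by the batch size, rounded up.

```python
import asyncio

call_count = 0  # number of simulated API calls made so far

async def fake_embed_api(batch: list[str]) -> list[list[float]]:
    """Hypothetical stand-in for an embedding API: one call per batch."""
    global call_count
    call_count += 1
    await asyncio.sleep(0)  # yield to the event loop, as a network call would
    # Return a dummy one-dimensional "embedding" per input text
    return [[float(len(text))] for text in batch]

async def embed_in_batches(queries: list[str], batch_size: int) -> list[list[float]]:
    """Same slicing loop as batch_embed, but calling the stub."""
    embeddings = []
    for i in range(0, len(queries), batch_size):
        embeddings.extend(await fake_embed_api(queries[i:i + batch_size]))
    return embeddings

def count_calls(n_queries: int, batch_size: int) -> int:
    """Embed n_queries dummy queries and report how many API calls were made."""
    global call_count
    call_count = 0
    queries = [f"query {i}" for i in range(n_queries)]
    result = asyncio.run(embed_in_batches(queries, batch_size))
    assert len(result) == n_queries  # every query gets exactly one embedding
    return call_count
```

Calling `count_calls(250, 100)` and `count_calls(250, 1)` compares batched and unbatched call counts for the same 250 queries.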

Parallel Document Processing

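Process multiple documents concurrently for indexing. A thread pool helps most when the per-document work waits on I/O or runs in native code that releases the GIL; pure-Python CPU work gains little from threads.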

from concurrent.futures import ThreadPoolExecutor
import threading

def parallel_index(documents: list[dict], 
                   workers: int = 8,
                   chunk_size: int = 500) -> list[dict]:
    """Index documents in parallel."""
    
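    # sliding_window_chunks and embed_model are not defined in this chapter;
    # they are assumed to come from the earlier chunking and embedding code.
    def process_doc(doc):
        # Split into chunks (each assumed to be a dict with a "text" key)
        chunks = sliding_window_chunks(doc["content"], chunk_size)
        
        # Add metadata
        for chunk in chunks:
            chunk["doc_id"] = doc["id"]
            chunk["source"] = doc.get("source", "unknown")
        
        # Embed all chunks of this document in one call
        embeddings = embed_model.encode([c["text"] for c in chunks])
        
        # Attach each embedding to its chunk; writing to the
        # vector store is left to the caller
        for chunk, embedding in zip(chunks, embeddings):
            chunk["embedding"] = embedding
        
        return chunks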
    
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(process_doc, documents))
    
    # Flatten results
    all_chunks = [chunk for doc_chunks in results for chunk in doc_chunks]
    return all_chunks
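
`parallel_index` calls `sliding_window_chunks`, which this chapter does not define. The following is a hypothetical sketch of what such a helper might look like, assuming character-based windows, a fixed overlap, and chunks returned as dicts with a `"text"` key (the shape `process_doc` expects). The `overlap` parameter and the `"start"` field are assumptions added for illustration; the course's own helper may differ.

```python
def sliding_window_chunks(text: str, chunk_size: int = 500, overlap: int = 50) -> list[dict]:
    """Split text into overlapping character windows (illustrative sketch)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append({"text": text[start:start + chunk_size], "start": start})
        # Stop once a window reaches the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks
```

With an overlap, the tail of one chunk repeats at the head of the next, so a sentence cut at a window boundary still appears whole in at least one chunk when it is shorter than the overlap.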

Rate Limiting

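Respect API rate limits with a token bucket or a semaphore. The class below implements a token bucket: tokens refill at a fixed rate, each call spends one, and callers wait when the bucket is empty.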

import asyncio
import time

class RateLimitedClient:
    def __init__(self, max_calls_per_second: float = 10):
        self.max_calls = max_calls_per_second
        self.tokens = max_calls_per_second
        # Monotonic clock: unaffected by system clock adjustments
        self.last_update = time.monotonic()
        # asyncio.Lock, so waiting callers do not block the event loop
        self.lock = asyncio.Lock()
    
    async def call(self, func, *args, **kwargs):
        """Make a rate-limited API call."""
        
        async with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            self.last_update = now
            
            # Replenish tokens, capped at the bucket size
            self.tokens = min(self.max_calls, self.tokens + elapsed * self.max_calls)
            
            if self.tokens < 1:
                # Wait (without blocking the event loop) until one token accrues
                wait_time = (1 - self.tokens) / self.max_calls
                await asyncio.sleep(wait_time)
                # The token earned while sleeping is spent on this call
                self.last_update = time.monotonic()
                self.tokens = 0
            else:
                self.tokens -= 1
        
        return await func(*args, **kwargs)
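
The semaphore alternative mentioned above is not shown in the chapter. Here is a minimal sketch using `asyncio.Semaphore` from the standard library. Note the difference in what it limits: a semaphore caps how many calls are in flight at once, not how many start per second. The helper names `gather_with_limit`, `fake_api_call`, and `demo_peak_concurrency` are hypothetical, and the API call is simulated with a short sleep.

```python
import asyncio

async def gather_with_limit(call_factories, max_concurrent: int = 5):
    """Run zero-argument coroutine functions, at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(factory):
        # Each call holds one semaphore slot for its whole duration
        async with semaphore:
            return await factory()

    # gather returns results in the order the factories were given
    return await asyncio.gather(*(guarded(f) for f in call_factories))

async def _demo(n_calls: int, max_concurrent: int):
    in_flight = 0  # calls currently running
    peak = 0       # highest in_flight value observed

    async def fake_api_call(i: int) -> int:
        """Simulated API call that records concurrency while it runs."""
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # pretend network latency
        in_flight -= 1
        return i

    factories = [lambda i=i: fake_api_call(i) for i in range(n_calls)]
    results = await gather_with_limit(factories, max_concurrent)
    return results, peak

def demo_peak_concurrency(n_calls: int = 20, max_concurrent: int = 3):
    """Return (results, peak number of simultaneous calls)."""
    return asyncio.run(_demo(n_calls, max_concurrent))
```

A semaphore suits APIs that limit concurrent connections; for a calls-per-second quota, a token bucket is the closer match, and the two can be combined.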
EXERCISE

Process 1000 documents for indexing using parallel workers. Measure throughput as you vary worker count from 1 to 16.
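
One possible starting point for the measurement is sketched below. It does not index real documents: `simulated_process_doc` is a hypothetical stand-in that sleeps for a few milliseconds to mimic waiting on an embedding call, so the numbers only illustrate the method. Swap in the real per-document function to get meaningful results.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def simulated_process_doc(doc: dict) -> dict:
    """Hypothetical stand-in for chunking and embedding one document."""
    time.sleep(0.005)  # pretend to wait on an embedding call
    return {"doc_id": doc["id"], "n_chars": len(doc["content"])}

def measure_throughput(documents: list[dict], workers: int) -> float:
    """Return documents processed per second with the given worker count."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(simulated_process_doc, documents))
    elapsed = time.perf_counter() - start
    assert len(results) == len(documents)  # every document was processed
    return len(documents) / elapsed

def sweep_workers(n_docs: int = 1000, worker_counts=(1, 2, 4, 8, 16)) -> dict:
    """Measure throughput for each worker count on the same document set."""
    documents = [{"id": i, "content": "x" * 100} for i in range(n_docs)]
    return {w: measure_throughput(documents, w) for w in worker_counts}
```

Printing the dict returned by `sweep_workers()` gives one throughput figure per worker count. With a sleep-based stand-in the gain is close to linear; with real embedding work, expect it to flatten once the model, the GIL, or the API quota becomes the bottleneck.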
