07. Batch vs Streaming Ingestion
Batch and streaming ingestion serve different purposes. Batch handles bulk operations: initial corpus load, bulk updates, re-embedding with a new model. Streaming handles continuous updates: new documents, edits, deletions.
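Batch ingestion prioritizes throughput over latency: the goal is to process millions of documents overnight, not to make any single document searchable quickly. Parallelize aggressively, within the rate limits of your embedding API and database. If your vector database supports it, disable replication during the load to speed up writes, then re-enable it afterward.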
# Batch ingestion with parallelism control
# (parser, chunker, embedding_batch, vector_db, Document, ProcessingResult,
# and aggregate_results are assumed to be defined elsewhere in the pipeline)
import asyncio

async def batch_ingest(documents: list[Document], max_concurrency: int = 50):
    # Cap the number of documents being processed at any one time
    semaphore = asyncio.Semaphore(max_concurrency)

    async def process_one(doc: Document) -> ProcessingResult:
        async with semaphore:
            try:
                parsed = await parser.extract(doc.content)
                chunks = chunker.split(parsed)
                embeddings = await embedding_batch(chunks)  # Batch API call
                await vector_db.bulk_insert(chunks, embeddings)
                return ProcessingResult.success(doc.id)
            except Exception as e:
                # Record the failure and keep going; one bad document should not abort the run
                return ProcessingResult.failure(doc.id, str(e))

    results = await asyncio.gather(*[process_one(d) for d in documents])
    return aggregate_results(results)
Streaming ingestion prioritizes latency over throughput. Process each document as it arrives. Use smaller batch sizes to reduce per-document delay. Treat end-to-end latency (the time from a document's arrival until it is searchable) as the primary metric, rather than throughput.
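The "smaller batch sizes" advice can be made concrete with a micro-batcher that caps both the batch size and the time the first item waits. This is a minimal sketch, not a production consumer: the names `micro_batches`, `max_size`, and `max_wait_s`, and the use of `None` as an end-of-stream sentinel, are assumptions made for illustration.

```python
import asyncio


async def micro_batches(queue: asyncio.Queue, max_size: int = 8, max_wait_s: float = 0.05):
    """Yield small batches; the first item of a batch waits at most ~max_wait_s."""
    loop = asyncio.get_running_loop()
    while True:
        first = await queue.get()
        if first is None:  # hypothetical sentinel: the producer is done
            return
        batch = [first]
        deadline = loop.time() + max_wait_s
        while len(batch) < max_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break  # waited long enough; flush what we have
            try:
                item = await asyncio.wait_for(queue.get(), timeout=remaining)
            except asyncio.TimeoutError:
                break  # nothing else arrived in time; flush a partial batch
            if item is None:
                yield batch
                return
            batch.append(item)
        yield batch
```

Lowering `max_wait_s` trades embedding-call efficiency for lower per-document delay; raising `max_size` does the opposite.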
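The hybrid model is common: stream small updates immediately, batch large updates nightly. The challenge is maintaining consistency when the same document appears in both paths. For example, a nightly batch built from an older snapshot can overwrite an edit that was streamed in a few minutes earlier.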
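One way to handle the consistency problem is to attach a monotonically increasing version to every document and have the index reject writes that are not newer than what it already holds. The sketch below uses an in-memory stand-in; `VersionedIndex`, `IndexedDoc`, and the `version` field are hypothetical names, and a real vector database would need its own conditional-write mechanism.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class IndexedDoc:
    version: int
    payload: str


class VersionedIndex:
    """In-memory stand-in for an index that rejects stale writes."""

    def __init__(self) -> None:
        self._docs = {}  # doc_id -> IndexedDoc

    def upsert(self, doc_id: str, version: int, payload: str) -> bool:
        current = self._docs.get(doc_id)
        if current is not None and current.version >= version:
            return False  # stale or duplicate write: keep the stored copy
        self._docs[doc_id] = IndexedDoc(version, payload)
        return True

    def get(self, doc_id: str) -> Optional[str]:
        doc = self._docs.get(doc_id)
        return doc.payload if doc is not None else None
```

With this guard, the streaming path and the nightly batch can both write the same document in any order, and the newest version is the one that remains.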
Re-embedding scenarios expose the batch/streaming tension. When you upgrade your embedding model, all existing chunks need re-embedding. This is a batch operation on a massive scale, yet you cannot stop serving queries while it runs, so the old index has to stay live until the new one is complete.
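# Blue-green indexing for zero-downtime re-embedding
# (vector_db, document_store, and embed are assumed to be defined elsewhere)
import uuid

async def reembed_corpus(new_model: str, old_index: str):
    # Create new index with new model; the old index keeps serving queries
    new_index = f"embeddings_{new_model}_{uuid.uuid4()}"
    await vector_db.create_index(new_index)

    # Stream documents through new index
    async for chunk in document_store.iter_chunks():
        vector = await embed(chunk.text, model=new_model)
        await vector_db.insert(new_index, chunk.id, vector, chunk.metadata)

    # Atomic swap, then retire the old index
    # Note: writes that arrive during the re-embed must also reach new_index,
    # otherwise they disappear at the swap
    await vector_db.set_default_index(new_index)
    await vector_db.delete_index(old_index)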
Failure modes differ significantly. Batch failures waste hours of compute. Streaming failures back up the pipeline and create user-visible lag. Batch systems need checkpointing (resume from last successful document). Streaming systems need backpressure (slow producers when consumers are overwhelmed).
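Both safeguards can be sketched in a few lines. The first function persists progress to a JSON file so a failed batch resumes from the last successful item; the second uses a bounded `asyncio.Queue` so the producer is suspended whenever the consumer falls behind. The function names, the checkpoint file layout, and the `None` sentinel are assumptions made for illustration.

```python
import asyncio
import json
from pathlib import Path


def load_checkpoint(path: Path) -> int:
    """Return the index of the next unprocessed item (0 if no checkpoint exists)."""
    if path.exists():
        return json.loads(path.read_text())["next_index"]
    return 0


def run_batch(items, handle, path: Path) -> None:
    """Process items in order, saving progress after each success."""
    start = load_checkpoint(path)
    for i in range(start, len(items)):
        handle(items[i])  # if this raises, the checkpoint still points at i
        path.write_text(json.dumps({"next_index": i + 1}))


async def run_stream(items, handle, max_pending: int = 2) -> int:
    """Feed items through a bounded queue; return the deepest queue size seen."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_pending)
    high_water = 0

    async def producer():
        nonlocal high_water
        for item in items:
            await queue.put(item)  # suspends while the queue is full
            high_water = max(high_water, queue.qsize())
        await queue.put(None)  # hypothetical end-of-stream sentinel

    async def consumer():
        while (item := await queue.get()) is not None:
            await handle(item)

    await asyncio.gather(producer(), consumer())
    return high_water
```

Writing the checkpoint after every item keeps the sketch simple; a real batch job would more likely checkpoint every N documents to avoid making the checkpoint write itself a bottleneck.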
Calculate the infrastructure cost difference between pure streaming (index each document immediately) and hybrid (stream small, batch large) for a system with 10,000 daily documents where 90% are bulk uploads and 10% are individual updates.
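As a starting point for this exercise, the comparison can be framed as a simple model in which each document has a fixed processing cost and each indexing request carries a fixed overhead that batching amortizes. This is only a sketch of one possible model: the function `daily_cost` and its parameters are hypothetical, and the unit costs in the usage lines are arbitrary placeholders, not measurements. Substitute your own figures.

```python
import math


def daily_cost(n_stream: int, n_batch: int, per_doc: int,
               per_request: int, batch_size: int) -> int:
    """Cost in arbitrary units: per-document work plus per-request overhead."""
    # Streamed documents cost one request each; batched ones share requests
    requests = n_stream + math.ceil(n_batch / batch_size)
    return (n_stream + n_batch) * per_doc + requests * per_request


# Workload from the exercise: 10,000 documents, 90% bulk, 10% individual.
# per_doc, per_request, and batch_size are placeholder assumptions.
pure_streaming = daily_cost(10_000, 0, per_doc=1, per_request=5, batch_size=500)
hybrid = daily_cost(1_000, 9_000, per_doc=1, per_request=5, batch_size=500)
difference = pure_streaming - hybrid
```

The model ignores always-on infrastructure, storage, and retries; adding those terms is part of the exercise.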