11. Batch Processing Architecture
Processing one document is straightforward. Processing one thousand requires architectural decisions about concurrency, memory management, and failure recovery. This chapter covers patterns for scaling document processing reliably.
The Batch Processing Challenge
Each document consumes memory during processing. A naive implementation that loads every document before processing any of them exhausts memory on large batches. At the other extreme, processing documents strictly one at a time leaves CPU capacity idle while each document waits on I/O such as disk reads.
Worker Pool Pattern
Distribute work across multiple processes using concurrent.futures:
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

def process_document(filepath):
    # Imported inside the function, so the import runs in the worker process
    from your_processor import process_single
    return process_single(filepath)

def batch_process(document_paths, max_workers=4):
    results = []
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the path it was submitted for
        futures = {executor.submit(process_document, p): p for p in document_paths}
        # Collect results in completion order, not submission order
        for future in as_completed(futures):
            filepath = futures[future]
            try:
                result = future.result()
                results.append({"path": filepath, "status": "success", "data": result})
            except Exception as e:
                results.append({"path": filepath, "status": "error", "error": str(e)})
    return results
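ProcessPoolExecutor runs at most max_workers worker processes at a time; when max_workers is omitted, it defaults to the number of processors available on the machine. The operating system schedules these processes across CPU cores automatically.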
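Choosing max_workers is a judgment call that depends on the workload. The sketch below shows one possible policy, not a recommendation from the library: cap the requested count at the number of CPUs minus a small reserve. The function name pick_worker_count and the reserve parameter are hypothetical, introduced only for illustration.

```python
import os

def pick_worker_count(requested=None, reserve=1):
    """Return a worker count between 1 and the number of CPUs minus `reserve`."""
    # os.cpu_count() can return None when the count cannot be determined
    available = os.cpu_count() or 1
    # Assumed policy: leave `reserve` cores for the parent process and the OS
    ceiling = max(1, available - reserve)
    if requested is None:
        return ceiling
    # Clamp the caller's request into the range [1, ceiling]
    return max(1, min(requested, ceiling))
```

The result can be passed straight through, for example batch_process(paths, max_workers=pick_worker_count()).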
Memory Management with Generators
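For very large batches, a generator yields paths lazily instead of building the full list up front. Note that batch_process as written still submits every path before collecting any results, so the generator only avoids the intermediate list:
def document_paths(directory):
    # Walk the directory tree lazily, yielding one PDF path at a time
    for path in Path(directory).rglob("*.pdf"):
        yield path

for result in batch_process(document_paths("large-batch")):
    print(f"Processed: {result['path']}")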
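One way to cap the number of tasks in flight, so that a lazily generated batch is never fully materialized as futures, is to interleave submission with result collection. The following is a minimal sketch under that assumption; bounded_map and max_in_flight are hypothetical names, and the function takes any concurrent.futures executor rather than creating one.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def bounded_map(executor, func, items, max_in_flight=8):
    """Yield (item, result, error) tuples with at most max_in_flight tasks pending."""
    pending = {}  # future -> item it was submitted for
    iterator = iter(items)
    exhausted = False
    while True:
        # Top up the pending set from the (possibly lazy) input
        while not exhausted and len(pending) < max_in_flight:
            try:
                item = next(iterator)
            except StopIteration:
                exhausted = True
                break
            pending[executor.submit(func, item)] = item
        if not pending:
            return
        # Block until at least one pending task finishes
        done, _ = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            item = pending.pop(future)
            try:
                result, error = future.result(), None
            except Exception as exc:
                result, error = None, exc
            yield item, result, error
```

The sketch works with ThreadPoolExecutor as well as ProcessPoolExecutor. With a process pool, func has to be a picklable module-level function such as process_document.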
Progress Tracking
Long-running batches need progress visibility:
from tqdm import tqdm

def batch_process_with_progress(document_paths, max_workers=4):
    # Materialize the input so len() also works when a generator is passed
    document_paths = list(document_paths)
    total = len(document_paths)
    completed = 0
    errors = []
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_document, p): p for p in document_paths}
        # tqdm advances the bar once per finished future
        for future in tqdm(as_completed(futures), total=total, desc="Processing"):
            filepath = futures[future]
            try:
                future.result()
            except Exception as e:
                errors.append({"path": filepath, "error": str(e)})
            completed += 1
    # "completed" reports only the documents that finished without error
    return {"completed": completed - len(errors), "errors": errors}
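tqdm already displays a time estimate in its progress bar. For contexts where only a log line is wanted, the arithmetic behind a rate-based estimate can be sketched as follows. The helper estimate_remaining is a hypothetical name, and the sketch assumes the average rate so far is representative of the remaining documents.

```python
def estimate_remaining(completed, total, elapsed_seconds):
    """Estimate the seconds remaining from the average processing rate so far."""
    if completed <= 0 or elapsed_seconds <= 0:
        return None  # no rate can be computed yet
    rate = completed / elapsed_seconds  # documents per second
    return (total - completed) / rate
```

For example, 50 of 200 documents finished in 10 seconds gives a rate of 5 documents per second, so the 150 remaining documents are estimated at 30 seconds.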
Chunk-Based Processing
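When the results of a full batch are too large to hold in memory at once, process the documents in chunks and handle each chunk's results before starting the next:
def chunk_process(documents, chunk_size=100):
    # documents must be a sequence: it needs len() and slicing
    for i in range(0, len(documents), chunk_size):
        chunk = documents[i:i + chunk_size]
        results = batch_process(chunk)
        yield results
        # Drop this generator's reference so the chunk's results can be
        # garbage-collected once the caller is done with them
        del results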
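chunk_process relies on len() and slicing, so it needs a list or another sequence. When the input is a generator, chunks can be drawn from it with itertools.islice instead. This is a minimal sketch; iter_chunks is a hypothetical helper name, not part of the code above.

```python
from itertools import islice

def iter_chunks(iterable, chunk_size=100):
    """Yield lists of up to chunk_size items from any iterable, including generators."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(iterable)
    while True:
        # islice consumes at most chunk_size items from the shared iterator
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk
```

Each chunk is an ordinary list, so it can be passed to batch_process directly, for example: for chunk in iter_chunks(paths, 100): results = batch_process(chunk).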
Modify the batch processor to support configurable concurrency levels, display estimated time remaining based on processing rate, and write a summary report to disk upon completion.