35. Batch Processing with Progress

Chapter 35 of 36 · 15 min

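Large-scale batch processing needs progress tracking, error handling, resumability, and reporting. A plain `for` loop provides none of these. That gap starts to hurt once a run takes hours, because a single crash can throw away all the work done so far.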

A production-grade batch processor:

#!/usr/bin/env python3
"""Resumable batch processor with checkpointing."""

import hashlib
import json
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, as_completed, wait
from dataclasses import asdict, dataclass, field, fields
from datetime import datetime
from pathlib import Path
from typing import Iterator

from rich.console import Console
from rich.progress import (
    BarColumn,
    DownloadColumn,
    MofNCompleteColumn,
    Progress,
    TextColumn,
    TimeRemainingColumn,
)
from rich.table import Table

console = Console()

@dataclass
class BatchState:
    """Tracks processing state for resumability."""
    total: int = 0
    completed: int = 0
    failed: int = 0
    errors: list[dict] = field(default_factory=list)
    started_at: str = ""
    completed_at: str = ""
    checkpoints: list[str] = field(default_factory=list)

class BatchProcessor:
    def __init__(self, workers: int = 8, checkpoint_interval: int = 50):
        self.workers = workers
        self.checkpoint_interval = checkpoint_interval
        self.state = BatchState()
        self.state_file = Path(".batch_state.json")
    
    def load_state(self):
        """Load previous state if exists."""
        if self.state_file.exists():
            with open(self.state_file) as f:
                data = json.load(f)
                self.state = BatchState(**data)
            console.print(f"[dim]Resuming from checkpoint: {self.state.completed}/{self.state.total}[/dim]")
    
    def save_state(self):
        """Save current state for resumability."""
        with open(self.state_file, "w") as f:
            json.dump(self.state.__dict__, f, indent=2)
    
    def process_items(self, items: list[dict]) -> Iterator[tuple[int, dict, str | None]]:
        """Process items with progress tracking and error handling."""
        self.state.total = len(items)
        self.state.started_at = datetime.now().isoformat()
        
        with Progress(
            DownloadColumn(),
            TimeRemainingColumn(),
            console=console
        ) as progress:
            task = progress.add_task("[cyan]Processing...", total=self.state.total)
            
            with ThreadPoolExecutor(max_workers=self.workers) as executor:
                futures: dict[Future, tuple[int, dict]] = {}
                
                for idx, item in enumerate(items):
                    future = executor.submit(self._process_single, item)
                    futures[future] = (idx, item)
                    
                    # Throttle submissions to prevent memory explosion
                    if len(futures) >= self.workers * 2:
                        done_futures = [f for f in futures if f.done()]
                        for future in done_futures:
                            idx, item, error = futures.pop(future).result()
                            yield idx, item, error
                            progress.update(task, advance=1)
                
                # Collect remaining
                for future in as_completed(futures):
                    idx, item, error = futures[future].result()
                    yield idx, item, error
                    progress.update(task, advance=1)
    
    def _process_single(self, item: dict) -> tuple[int, dict, str | None]:
        """Process one item, return (index, item, error_message)."""
        try:
            # Simulate work
            time.sleep(0.1)
            
            # Simulate occasional failure
            if hash(item.get("id", "unknown")) % 20 == 0:
                return (item.get("_idx", 0), item, "Simulated failure")
            
            return (item.get("_idx", 0), item, None)
        except Exception as e:
            return (item.get("_idx", 0), item, str(e))
    
    def run(self, items: list[dict]):
        """Run complete batch with error collection."""
        self.load_state()
        
        results = []
        for idx, item, error in self.process_items(items):
            if error:
                self.state.failed += 1
                self.state.errors.append({"id": item.get("id"), "error": error})
            else:
                self.state.completed += 1
                results.append(item)
            
            # Checkpoint periodically
            if self.state.completed % self.checkpoint_interval == 0:
                self.save_state()
        
        self.state.completed_at = datetime.now().isoformat()
        self.save_state()
        
        return results
    
    def report(self):
        """Generate processing report."""
        table = Table(title="Batch Processing Report")
        table.add_column("Metric", style="cyan")
        table.add_column("Value", style="green", justify="right")
        
        table.add_row("Total", str(self.state.total))
        table.add_row("Completed", str(self.state.completed))
        table.add_row("Failed", str(self.state.failed))
        table.add_row("Success Rate", f"{self.state.completed/max(self.state.total,1)*100:.1f}%")
        
        console.print(table)
        
        if self.state.errors:
            console.print(f"[red]Errors:[/red] {len(self.state.errors)}")
            for err in self.state.errors[:5]:
                console.print(f"  [red]•[/red] {err['id']}: {err['error']}")

if __name__ == "__main__":
    # Demo batch: 100 synthetic documents, each carrying its own position.
    demo_items = [{"id": f"doc_{n}", "_idx": n} for n in range(100)]

    batch = BatchProcessor(workers=8, checkpoint_interval=25)
    results = batch.run(demo_items)
    batch.report()

Knowledge transfer checkpoint

Connect batch processing with progress tracking back to the local-AI decision you are learning to make. The practical question is not only whether the code or concept works. It is whether it still works when the model, runtime, hardware budget, privacy requirement, and latency target are real constraints.

Before moving on, write down four things: the local runtime or deployment surface involved, the memory or throughput constraint that could change the design, the verification signal that proves the lesson worked, and the failure mode you would check first if the result looked wrong. That turns this chapter from background knowledge into an operator habit.

A good answer should be specific enough that another reader could repeat the decision on their own machine. Name the model or component when there is one, record the relevant context or token budget, and prefer a measurable check over a vague statement such as "it seems faster" or "the setup is fine."

EXERCISE

Add a `--resume` flag to the processor that skips already-completed items (matched by ID). Test it by interrupting a run partway (or running a partial batch), then running again and verifying that the already-processed items are skipped.