KEY INSIGHT
Efficient batch processing requires memory management, parallel processing strategies, and error handling. Processing multiple images sequentially with proper resource cleanup maintains throughput without crashes.
Production deployments typically process many images in batches. Memory management becomes critical—loading large models repeatedly wastes resources, but keeping them in memory may cause crashes with limited VRAM. Strategic batching maintains throughput.
```python
import gc
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchImageProcessor:
    """Run one shared vision-language model over many images.

    Every image yields a plain result dict, so one bad file never aborts a
    whole run. ``Image`` (PIL) and ``torch`` are not imported in this snippet
    and must already be in scope at module level.
    """

    def __init__(self, model, processor, batch_size=4):
        """Store the model/processor pair that every image will reuse.

        Args:
            model: Object exposing ``generate(...)`` and a ``device`` attribute.
            processor: Object exposing ``apply_chat_template``, ``__call__``
                and ``batch_decode``.
            batch_size: Kept on the instance; no method of this class reads it.
        """
        self.model = model
        self.processor = processor
        self.batch_size = batch_size
        self.device = model.device

    def _generate_text(self, image_path, prompt):
        """Load one image, run generation, and return the decoded text."""
        # The context manager closes the file handle as soon as the RGB copy
        # exists instead of waiting for garbage collection.
        with Image.open(image_path) as source:
            image = source.convert("RGB")

        conversation = [
            {"role": "user", "content": [{"type": "image"}, {"type": "text", "text": prompt}]}
        ]
        prompt_text = self.processor.apply_chat_template(
            conversation, add_generation_prompt=True
        )
        inputs = self.processor(
            images=image,
            text=prompt_text,
            return_tensors="pt",
        ).to(self.device)

        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_new_tokens=200,
                do_sample=False,
            )
        # NOTE(review): the full `generate` output is decoded, which presumably
        # includes the prompt tokens as well as the reply -- confirm.
        return self.processor.batch_decode(output, skip_special_tokens=True)[0]

    def process_image(self, image_path, prompt):
        """Process single image with given prompt.

        Args:
            image_path: Path (``str`` or ``Path``) of the image to open.
            prompt: Text instruction sent alongside the image.

        Returns:
            ``{"path", "result", "status": "success"}`` on success, or
            ``{"path", "result": None, "status": "error", "error"}`` on any
            failure. This method does not raise for ordinary exceptions.
        """
        path_str = str(image_path)
        try:
            outcome = {
                "path": path_str,
                "result": self._generate_text(image_path, prompt),
                "status": "success",
            }
        except Exception as e:
            outcome = {"path": path_str, "result": None, "status": "error", "error": str(e)}

        # Cleanup runs after BOTH outcomes: a failed image (e.g. out of memory)
        # is exactly when cached memory most needs releasing. All per-image
        # objects lived in _generate_text's frame, so nothing here still
        # references them.
        try:
            gc.collect()
            torch.cuda.empty_cache()
        except Exception as e:
            # Keep the first error if inference already failed; otherwise
            # report the cleanup failure in the usual error shape.
            if outcome["status"] == "success":
                outcome = {"path": path_str, "result": None, "status": "error", "error": str(e)}
        return outcome

    def process_batch(self, image_paths, prompt, max_workers=2):
        """Process multiple images with worker pool.

        Args:
            image_paths: Iterable of image paths; consumed once.
            prompt: Text instruction applied to every image.
            max_workers: Thread count handed to ``ThreadPoolExecutor``.

        Returns:
            One result dict per input path, in the same order as
            ``image_paths`` (progress lines are still printed as each image
            finishes).
        """
        # NOTE(review): every worker shares self.model; whether concurrent
        # generate() calls are safe depends on the model/backend -- confirm.
        paths = list(image_paths)
        results = [None] * len(paths)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_index = {
                executor.submit(self.process_image, path, prompt): index
                for index, path in enumerate(paths)
            }
            for future in as_completed(future_to_index):
                result = future.result()
                # Slot by submission index so output order matches input order.
                results[future_to_index[future]] = result
                print(f"Processed: {result['path']} - {result['status']}")
        return results

    def process_directory(self, directory, prompt, pattern="*.jpg", max_workers=2):
        """Process all matching images in directory.

        Args:
            directory: Directory to search (non-recursive unless ``pattern``
                says otherwise).
            prompt: Text instruction applied to every image.
            pattern: Glob pattern selecting the files.
            max_workers: Forwarded to ``process_batch``.

        Returns:
            The list returned by ``process_batch`` for the matched files.
        """
        image_dir = Path(directory)
        # glob() order is filesystem-dependent; sorting makes runs reproducible.
        image_paths = sorted(image_dir.glob(pattern))
        print(f"Found {len(image_paths)} images to process")
        return self.process_batch(image_paths, prompt, max_workers)
```
Memory-efficient processing sequence:
```python
def memory_safe_batch_processing(image_paths, model, processor, batch_size=2, checkpoint_file="checkpoint.json"):
    """Process with checkpointing to recover from crashes.

    Images already listed in the checkpoint file are skipped. Each remaining
    image is passed to ``process_single_image`` (defined elsewhere), and the
    checkpoint is updated after every successful image.

    Args:
        image_paths: Iterable of image paths; compared to the checkpoint by
            their ``str()`` form.
        model: Passed through unchanged to ``process_single_image``.
        processor: Passed through unchanged to ``process_single_image``.
        batch_size: Number of images handled between memory cleanups.
        checkpoint_file: JSON file holding the list of completed paths.

    Returns:
        The result dicts for the images processed in this call only (images
        skipped because of the checkpoint contribute nothing).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    import json
    from pathlib import Path

    # A negative step would make the range below empty and silently skip
    # every image, so reject it the same way a zero step is rejected.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    checkpoint_path = Path(checkpoint_file)
    # New checkpoint content goes to this sibling file first and is then
    # renamed over the real one, so a crash mid-write cannot leave a
    # truncated checkpoint that would fail to load on the next run.
    staging_path = checkpoint_path.with_name(checkpoint_path.name + ".tmp")

    # Load existing checkpoints
    if checkpoint_path.exists():
        completed = set(json.loads(checkpoint_path.read_text()))
    else:
        completed = set()

    pending = [p for p in image_paths if str(p) not in completed]
    print(f"Processing {len(pending)} pending images")

    results = []
    for start in range(0, len(pending), batch_size):
        for path in pending[start:start + batch_size]:
            result = process_single_image(path, model, processor)
            results.append(result)
            # Checkpoint after each image; failed images stay pending so a
            # later run retries them.
            if result["status"] == "success":
                completed.add(str(path))
                staging_path.write_text(json.dumps(list(completed)))
                staging_path.replace(checkpoint_path)
        # Memory cleanup between batches
        gc.collect()
        torch.cuda.empty_cache()
    return results
```
Common batch processing failures:
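- **OOM errors**: Reduce `batch_size` or `max_workers`, or lower `max_new_tokens` or the image resolution; gradient checkpointing helps only during training, not for inference under `torch.no_grad()`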
- **Timeout stalls**: Set generation timeout, implement retry logic
- **Model degradation**: Clear the cache periodically and watch for signs of memory leaks
- **Partial results**: Implement checkpointing for crash recovery