KEY INSIGHT
Vision models are computationally intensive. Optimization strategies include caching decoded images, batching requests, reducing resolution for preliminary filtering, and using task-specialized models for routing.
Production vision workloads face latency and cost pressures. Strategic optimizations can reduce costs by up to 10x while maintaining accuracy for production use cases.
```python
import asyncio
import base64
import hashlib
import io
import re
import time
from functools import lru_cache
from pathlib import Path
from typing import Optional
class VisionPerformanceOptimizer:
    """Reduce the cost and latency of vision-model requests.

    Three techniques are combined:

    * caching preprocessed (resized, JPEG-encoded) image bytes,
    * walking long image lists in fixed-size batches, and
    * a two-stage analysis that sends a small image for a cheap relevance
      check and only sends the larger image when that check passes.

    The caches are plain dicts with no eviction, and ``preprocess_image`` is
    synchronous, so it runs on the event-loop thread when called from the
    async methods.
    """

    # Model used for every request issued by this class.
    _MODEL = "gemini-2.0-flash-thinking"

    # Image modes Pillow's JPEG writer accepts as-is; anything else (RGBA, P,
    # LA, ...) must be converted before saving.
    _JPEG_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})

    # A standalone "yes": words such as "eyes" or "yesterday" must not count
    # as an affirmative answer.
    _AFFIRMATIVE = re.compile(r"\byes\b", re.IGNORECASE)

    def __init__(self, client):
        """Store the API client and start with empty caches.

        Args:
            client: Object exposing an awaitable ``messages.create(...)``.
        """
        self.client = client
        # Maps cache key -> preprocessed JPEG bytes.
        self.preprocessed_cache: dict[str, bytes] = {}
        # Not used by any method of this class.
        self.embedding_cache: dict = {}

    def cache_key(
        self,
        image_path: str,
        target_size: Optional[tuple[int, int]] = None,
    ) -> str:
        """Return a SHA-256 hex key for an image file.

        The key covers the path and the file's modification time, so editing
        the file invalidates earlier entries. When ``target_size`` is given
        it is folded into the key as well, so differently sized renditions of
        the same file never share a cache slot.

        Args:
            image_path: Path of the image on disk.
            target_size: Optional ``(width, height)`` bound of the rendition.
                Omitting it yields the path-and-mtime key alone.

        Returns:
            The hexadecimal SHA-256 digest.

        Raises:
            OSError: If the file cannot be stat'ed (e.g. it does not exist).
        """
        stat = Path(image_path).stat()
        material = f"{image_path}{stat.st_mtime}"
        if target_size is not None:
            width, height = target_size
            material += f"|{width}x{height}"
        return hashlib.sha256(material.encode()).hexdigest()

    def preprocess_image(
        self,
        image_path: str,
        target_size: tuple[int, int] = (512, 512)
    ) -> bytes:
        """Return the image downscaled to fit ``target_size`` as JPEG bytes.

        The result is cached per (path, modification time, target size);
        later calls with the same arguments return the cached bytes.

        Args:
            image_path: Path of the image on disk.
            target_size: Maximum ``(width, height)``; aspect ratio is kept.

        Returns:
            JPEG-encoded bytes (quality 85).
        """
        key = self.cache_key(image_path, target_size)
        cached = self.preprocessed_cache.get(key)
        if cached is not None:
            return cached

        # Imported lazily so the class can be used (and cache hits served)
        # without Pillow installed.
        from PIL import Image

        # The context manager closes the underlying file handle.
        with Image.open(image_path) as source:
            # JPEG has no alpha channel or palette; convert first so the
            # save below does not fail. Transparency is dropped.
            if source.mode in self._JPEG_MODES:
                img = source
            else:
                img = source.convert("RGB")
            # Resize in place, preserving aspect ratio.
            img.thumbnail(target_size, Image.LANCZOS)
            # JPEG keeps the payload small.
            buffer = io.BytesIO()
            img.save(buffer, format="JPEG", quality=85)

        processed = buffer.getvalue()
        self.preprocessed_cache[key] = processed
        return processed

    @staticmethod
    def _image_block(image_bytes: bytes) -> dict:
        """Build the image content block for JPEG bytes, base64-encoded."""
        return {
            "type": "image",
            "source": {
                "type": "base64",
                # preprocess_image always emits JPEG.
                "media_type": "image/jpeg",
                "data": base64.b64encode(image_bytes).decode("ascii"),
            },
        }

    @classmethod
    def _is_affirmative(cls, answer: str) -> bool:
        """Return True when ``answer`` contains "yes" as a whole word."""
        return cls._AFFIRMATIVE.search(answer) is not None

    async def _ask(self, image_bytes: bytes, prompt: str) -> str:
        """Send one image plus a text prompt and return the reply text."""
        response = await self.client.messages.create(
            model=self._MODEL,
            messages=[{
                "role": "user",
                "content": [
                    self._image_block(image_bytes),
                    {"type": "text", "text": prompt},
                ],
            }],
        )
        return response.content[0].text

    async def batch_vision_analysis(
        self,
        image_paths: list[str],
        prompt: str,
        batch_size: int = 4
    ) -> list[str]:
        """Run ``prompt`` against every image, working through batches.

        Each batch is preprocessed together; the requests themselves are
        still sent one at a time.

        Args:
            image_paths: Paths of the images to analyse.
            prompt: Text prompt sent alongside each image.
            batch_size: Number of images preprocessed per batch.

        Returns:
            One reply text per image, in the order of ``image_paths``.

        Raises:
            ValueError: If ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            # A negative step would otherwise skip every image silently.
            raise ValueError("batch_size must be at least 1")

        results: list[str] = []
        for start in range(0, len(image_paths), batch_size):
            batch = image_paths[start:start + batch_size]
            processed_batch = [self.preprocess_image(path) for path in batch]
            # Process sequentially for API compatibility.
            for image_bytes in processed_batch:
                results.append(await self._ask(image_bytes, prompt))
        return results

    async def adaptive_quality_analysis(
        self,
        image_path: str,
        query: str
    ) -> dict:
        """Filter with a low-res image, then analyse in high-res if relevant.

        Args:
            image_path: Path of the image on disk.
            query: Question or topic to analyse the image for.

        Returns:
            ``{"relevant": False}`` when the quick check does not answer yes;
            otherwise ``{"relevant": True, "analysis": <reply text>}``.
        """
        # Cheap first pass on a 256px rendition.
        low_res = self.preprocess_image(image_path, target_size=(256, 256))
        verdict = await self._ask(
            low_res,
            f"Is this image relevant to: {query}? Answer yes or no.",
        )
        if not self._is_affirmative(verdict):
            return {"relevant": False}

        # Relevant: pay for the detailed pass on a 1024px rendition.
        high_res = self.preprocess_image(image_path, target_size=(1024, 1024))
        analysis = await self._ask(high_res, query)
        return {"relevant": True, "analysis": analysis}
```
**Failure Modes:**
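- Cache invalidation bugs where stale cached preprocessed images cause incorrect outputs. Include the modification time and every preprocessing parameter (for example, the target size) in cache keys, so a low-resolution rendition is never served in place of a high-resolution one.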
- Batch size too large causing memory exhaustion. Monitor GPU memory during batching.
- Adaptive quality making wrong filtering decisions. Audit precision/recall of filtering stage.