12. Real-Time Processing
Real-time multi-modal processing enables interactive applications: live video analysis, voice assistants, robotics, and autonomous systems. This chapter covers latency management, streaming architectures, and optimization techniques.
Streaming Architecture
Real-time systems process data as it arrives and maintain state across the stream. The key components are input buffering, incremental inference, and output streaming.
import asyncio
import queue
import time
from dataclasses import dataclass

import numpy as np
import torch
@dataclass
class StreamConfig:
    """Tuning knobs for StreamingMultiModalProcessor."""

    # Maximum number of video frames buffered; when full, the oldest is dropped.
    video_buffer_size: int = 8
    # Maximum number of audio items buffered (1600 samples is ~100 ms at 16 kHz).
    audio_buffer_size: int = 1600
    # Pause between inference passes, in milliseconds.
    inference_interval_ms: int = 100
    # Results whose source frame is older than this many milliseconds are dropped.
    max_latency_ms: int = 500


@dataclass
class StreamResult:
    """One batch-inference result produced by the streaming processor."""

    # Raw model output for the batch.
    output: object
    # Timestamp of the newest frame in the batch. The latency check compares it
    # with ``time.time() * 1000``, i.e. it is expected in epoch milliseconds.
    timestamp: float
    # Time spent decoding the batch and running the model, in milliseconds.
    latency_ms: float
class StreamingMultiModalProcessor:
    """Run a model over a live video stream while bounding end-to-end latency.

    Frames are captured into a small bounded buffer (the oldest frame is
    dropped when it is full), drained in batches every
    ``config.inference_interval_ms``, and results whose source frame is older
    than ``config.max_latency_ms`` are discarded instead of emitted.

    Relies on names not defined in this class: the methods
    ``self._decode_frame`` and ``self._emit_output``, and a module-level
    ``StreamResult`` record with ``output``, ``timestamp`` and ``latency_ms``.
    """

    def __init__(self, model, config: "StreamConfig"):
        self.model = model
        self.config = config
        # Bounded per-modality buffers; the bound is what keeps latency from growing.
        self.video_buffer = queue.Queue(maxsize=config.video_buffer_size)
        self.audio_buffer = queue.Queue(maxsize=config.audio_buffer_size)
        # State carried across the stream (not used by the video path shown here).
        self.video_state = None
        self.audio_state = None
        self.last_output_time = 0

    async def process_video_stream(self, video_reader):
        """Consume ``video_reader`` and run batched inference until it is exhausted.

        Args:
            video_reader: async iterable of frames. Each frame must expose a
                ``timestamp`` attribute (assumed epoch milliseconds -- see
                ``_check_latency_budget``).

        Returns once the reader is exhausted and every buffered frame has been
        processed. If the reader raises, the exception propagates and the
        inference loop still terminates after draining the buffer.
        """
        capture_done = asyncio.Event()

        async def capture_loop():
            try:
                async for frame in video_reader:
                    # Drop the oldest frame if the buffer is full (prevents latency buildup).
                    if self.video_buffer.full():
                        try:
                            self.video_buffer.get_nowait()
                        except queue.Empty:
                            pass
                    self.video_buffer.put_nowait(frame)
            finally:
                # Always release the inference loop, even when the reader fails.
                capture_done.set()

        async def inference_loop():
            # Keep going until capture has finished AND nothing is left to process.
            while not (capture_done.is_set() and self.video_buffer.empty()):
                frames = self._drain_video_buffer()
                if frames:
                    result = await self._process_video_batch(frames)
                    # Backpressure: drop results that are already too stale to be useful.
                    if self._check_latency_budget(result.timestamp):
                        await self._emit_output(result)
                    else:
                        print(f"WARNING: Skipping frame due to latency ({result.latency_ms}ms)")
                await asyncio.sleep(self.config.inference_interval_ms / 1000)

        await asyncio.gather(capture_loop(), inference_loop())

    def _drain_video_buffer(self):
        """Remove and return every currently buffered frame, oldest first (never blocks)."""
        frames = []
        while True:
            try:
                frames.append(self.video_buffer.get_nowait())
            except queue.Empty:
                return frames

    async def _process_video_batch(self, frames):
        """Decode ``frames``, run the model on them as one batch, and time it.

        The model call runs in the event loop's default executor so the loop
        (and therefore frame capture / frame dropping) keeps running during
        inference.

        Returns:
            StreamResult whose ``timestamp`` is that of the newest frame.
        """
        start_time = time.perf_counter()
        frame_tensors = [self._decode_frame(f) for f in frames]
        batch = torch.stack(frame_tensors)
        loop = asyncio.get_running_loop()
        output = await loop.run_in_executor(None, self._run_model, batch)
        latency_ms = (time.perf_counter() - start_time) * 1000
        return StreamResult(
            output=output,
            timestamp=frames[-1].timestamp,
            latency_ms=latency_ms,
        )

    def _run_model(self, batch):
        """Run ``self.model`` on ``batch`` with autograd disabled."""
        # Grad mode is thread-local in PyTorch, so no_grad must be entered in
        # the thread that actually executes the model (the executor worker).
        with torch.no_grad():
            return self.model(batch)

    def _check_latency_budget(self, timestamp):
        """Return True if a result for a frame stamped ``timestamp`` is still fresh.

        ``timestamp`` is compared with ``time.time() * 1000``, so it must be
        wall-clock epoch time in milliseconds. NOTE(review): confirm the frame
        source uses this unit -- a seconds-based timestamp makes every result
        look decades old, so every result would be dropped.
        """
        age_ms = time.time() * 1000 - timestamp
        return age_ms < self.config.max_latency_ms
Optimization Techniques
Real-time processing requires careful optimization. The main bottlenecks are data transfer, model inference, and memory bandwidth.
def optimize_for_inference(model, input_shape, *, script=False):
    """Apply common inference optimizations to ``model``.

    Args:
        model: a ``torch.nn.Module``. Its ``use_cache`` attribute is set to
            True in place, and the returned model carries the flag too.
        input_shape: currently unused; kept for API compatibility.
        script: if True, return ``torch.jit.script`` of the quantized model
            instead of the eager quantized model.

    Returns:
        A dynamically quantized copy of ``model`` (``model`` itself is not
        quantized), optionally compiled with TorchScript.
    """
    # KV-cache flag for autoregressive models (reduces repeated computation on
    # long sequences). Set it BEFORE quantizing: quantize_dynamic works on a
    # deep copy by default, so a flag set afterwards would never reach the
    # returned model.
    # NOTE(review): this only has an effect if the model's forward reads
    # ``self.use_cache``; e.g. Hugging Face models read ``model.config.use_cache``.
    model.use_cache = True

    # 1. Dynamic quantization: int8 weights, activations quantized on the fly.
    # NOTE(review): speedup and accuracy loss depend on model and hardware --
    # measure with benchmark_latency. Also confirm your PyTorch version's default
    # dynamic-quantization mappings cover Conv2d; module types without a mapping
    # stay in floating point.
    quantized_model = torch.quantization.quantize_dynamic(
        model,
        {torch.nn.Linear, torch.nn.Conv2d},
        dtype=torch.qint8,
    )

    # 2. Optional TorchScript compilation of the *quantized* model, so the
    # returned artifact benefits from both optimizations.
    if script:
        return torch.jit.script(quantized_model)
    return quantized_model
def benchmark_latency(model, input_shape, num_runs=100, *, warmup_runs=10):
    """Measure the latency distribution of single forward passes of ``model``.

    One random input of ``input_shape`` is created up front, so tensor
    allocation is not part of the timings. Forward passes run under
    ``torch.no_grad()``, as they would in deployment.

    Args:
        model: callable taking one CPU tensor created by ``torch.randn``.
        input_shape: shape of the example input.
        num_runs: number of timed forward passes (must be >= 1).
        warmup_runs: untimed forward passes executed first.

    Returns:
        dict with "mean_ms", "p50_ms", "p95_ms" and "p99_ms" (floats, milliseconds).

    Raises:
        ValueError: if ``num_runs`` < 1.

    NOTE: CUDA kernels launch asynchronously; for a GPU model, synchronize
    (``torch.cuda.synchronize()``) around each call -- this function does not.
    """
    if num_runs < 1:
        raise ValueError(f"num_runs must be >= 1, got {num_runs}")

    example = torch.randn(*input_shape)
    latencies = []
    with torch.no_grad():
        for _ in range(warmup_runs):
            model(example)
        for _ in range(num_runs):
            start = time.perf_counter()
            model(example)
            latencies.append((time.perf_counter() - start) * 1000)

    samples = np.asarray(latencies)
    return {
        "mean_ms": float(np.mean(samples)),
        "p50_ms": float(np.percentile(samples, 50)),
        "p95_ms": float(np.percentile(samples, 95)),
        "p99_ms": float(np.percentile(samples, 99)),
    }
Handling Variable Frame Rates
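Real-world video streams have variable frame rates. The processing system must adapt without accumulating latency: rather than queueing every frame, it decides per frame whether to run inference, and skips frames instead of letting a backlog build up.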
def adaptive_frame_processing(
    frame,
    last_processed_time,
    target_fps=30,
    *,
    last_frame=None,
    motion_threshold=0.5,
    motion_fn=None,
    now=None,
):
    """Decide whether to process ``frame`` based on timing and, near the deadline, motion.

    Args:
        frame: the candidate frame (passed through unchanged in the result).
        last_processed_time: time of the last processed frame, on the same
            clock as ``now`` (``time.time()`` seconds by default).
        target_fps: desired processing rate; must be positive.
        last_frame: previously processed frame, used as the motion reference.
        motion_threshold: motion score above which a near-deadline frame is
            processed early.
        motion_fn: ``f(frame, last_frame) -> float`` motion score; defaults to
            the module-level ``compute_motion_score``.
        now: current time in seconds; defaults to ``time.time()``.

    Returns:
        ``{"process": True, "frame": frame, "age_ms": ms since last processed}``
        or ``{"process": False, "reason": "too_soon" | "no_reference" | "low_motion"}``.

    Raises:
        ValueError: if ``target_fps`` is not positive.
    """
    if target_fps <= 0:
        raise ValueError(f"target_fps must be positive, got {target_fps}")

    current_time = time.time() if now is None else now
    time_since_last = current_time - last_processed_time
    target_interval = 1.0 / target_fps

    # Skip if too soon (processing now would exceed the target fps).
    if time_since_last < target_interval * 0.8:
        return {"process": False, "reason": "too_soon"}

    # Process if at or past the target interval.
    if time_since_last >= target_interval:
        return {"process": True, "frame": frame, "age_ms": time_since_last * 1000}

    # Within the last 20% of the interval: process early only on high motion.
    if last_frame is None:
        # Nothing to measure motion against; wait for the next frame.
        return {"process": False, "reason": "no_reference"}
    score_fn = compute_motion_score if motion_fn is None else motion_fn
    if score_fn(frame, last_frame) > motion_threshold:
        return {"process": True, "frame": frame, "age_ms": time_since_last * 1000}
    return {"process": False, "reason": "low_motion"}
Exercise: Implement a real-time video frame processor using OpenCV that reads from a webcam, runs inference at a configurable target FPS, and displays the processed output with a latency overlay. Test with target FPS values of 5, 15, and 30, and report the observed latency (for example, mean and p95 in milliseconds) at each setting.