23. Multi-Modal Pipeline
End-to-end multimodal pipelines orchestrate multiple models and processing stages. Pipeline design determines overall system latency, throughput, and reliability.
Pipeline architectures include linear (sequential stages), parallel (branch processing), and hybrid (parallel branches feeding merged stages) topologies. Video analytics commonly uses parallel branches for audio and video processing that merge at the classification stage.
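The three topologies can be sketched with plain functions standing in for models. This is a minimal illustration, not a production scheduler: `run_linear`, `run_parallel`, `run_hybrid`, and the four toy stages are hypothetical names introduced here, and a thread pool is only one of several ways to run branches side by side.

```python
from concurrent.futures import ThreadPoolExecutor


def run_linear(stages, item):
    """Linear topology: each stage consumes the previous stage's output."""
    for stage in stages:
        item = stage(item)
    return item


def run_parallel(branches, item):
    """Parallel topology: every branch receives the same input independently."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = [pool.submit(branch, item) for branch in branches]
        # Results are collected in branch order, not completion order.
        return [f.result() for f in futures]


def run_hybrid(branches, merge_stages, item):
    """Hybrid topology: parallel branches feed a merged linear tail."""
    branch_outputs = run_parallel(branches, item)
    return run_linear(merge_stages, branch_outputs)


# Hypothetical stand-ins for real models: plain functions on a dict.
def video_branch(clip):
    return sum(clip["frames"]) / len(clip["frames"])


def audio_branch(clip):
    return max(clip["samples"])


def fuse(features):
    video_feat, audio_feat = features
    return video_feat + audio_feat


def classify(score):
    return "event" if score > 1.0 else "background"


clip = {"frames": [0.2, 0.4, 0.6], "samples": [0.1, 0.9, 0.3]}
label = run_hybrid([video_branch, audio_branch], [fuse, classify], clip)
print(label)
```

Here the video and audio branches run independently and merge at `fuse`, which mirrors the video analytics layout described above.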
import torch


class VideoAudioPipeline:
    def __init__(self, video_model, audio_model, fusion_layer):
        self.video_model = video_model
        self.audio_model = audio_model
        self.fusion_layer = fusion_layer

    def process_stream(self, video_frames, audio_samples):
        """
        video_frames: Tensor of shape [T, C, H, W]
        audio_samples: Tensor of shape [T, N]
        """
        # Run one branch per modality. As written these calls execute
        # sequentially; real overlap needs threads, asyncio, or CUDA streams.
        video_features = self.video_model(video_frames)
        audio_features = self.audio_model(audio_samples)

        # Synchronize modalities by pooling each over the time axis
        # Video features: [T, D_v], Audio features: [T, D_a]
        video_emb = video_features.mean(dim=0)  # [D_v]
        audio_emb = audio_features.mean(dim=0)  # [D_a]

        # Late fusion: concatenate to [D_v + D_a], then apply the fusion layer
        combined = torch.cat([video_emb, audio_emb])
        return self.fusion_layer(combined)
Asynchronous pipeline stages prevent blocking when one modality processes faster than another. Python's asyncio coordinates stages without explicit thread management, provided that blocking inference calls are offloaded (for example, to a worker thread or process) so they do not stall the event loop. CUDA streams provide GPU-side parallelism when operations can overlap.
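A minimal sketch of asynchronous coordination between two branches follows. It assumes Python 3.9 or later for `asyncio.to_thread`; `video_stage`, `audio_stage`, and `process_chunk` are hypothetical names, and `time.sleep` stands in for model inference. Because a blocking call would stall the event loop, each stage is handed to a worker thread and the coroutine simply awaits both.

```python
import asyncio
import time


def video_stage(frames):
    # Hypothetical blocking stage; sleep stands in for the slower model.
    time.sleep(0.05)
    return f"video({len(frames)} frames)"


def audio_stage(samples):
    # Hypothetical blocking stage; sleep stands in for the faster model.
    time.sleep(0.01)
    return f"audio({len(samples)} samples)"


async def process_chunk(frames, samples):
    # to_thread moves each blocking call off the event loop;
    # gather waits for both branches and preserves argument order.
    video_out, audio_out = await asyncio.gather(
        asyncio.to_thread(video_stage, frames),
        asyncio.to_thread(audio_stage, samples),
    )
    return video_out, audio_out


result = asyncio.run(process_chunk([0] * 8, [0] * 1600))
print(result)
```

The faster audio branch finishes first but does not hold up the event loop while the video branch completes. Whether the two branches truly overlap on real models depends on the backend releasing the GIL during inference, so treat this as a coordination pattern rather than a guaranteed speedup.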
Backpressure handling prevents pipeline collapse when the input rate exceeds processing capacity. Bounded buffers with an explicit drop policy (for example, dropping the oldest frame in a live stream) keep memory use and queueing delay bounded, so the pipeline remains responsive under load. Dead letter queues capture failed items for later analysis.
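The buffering and dead letter ideas can be sketched in a few lines. This is an illustrative single-threaded version under stated assumptions: `BoundedBuffer`, `drain`, and `flaky_stage` are hypothetical names, and drop-oldest is only one possible policy (drop-newest or blocking the producer are equally valid choices depending on the workload).

```python
from collections import deque


class BoundedBuffer:
    """Fixed-capacity buffer that drops the oldest item when full."""

    def __init__(self, max_size):
        self.items = deque()
        self.max_size = max_size
        self.dropped = 0

    def put(self, item):
        if len(self.items) >= self.max_size:
            self.items.popleft()  # drop-oldest policy: favor fresh frames
            self.dropped += 1
        self.items.append(item)

    def get(self):
        return self.items.popleft()

    def __len__(self):
        return len(self.items)


def drain(buffer, stage, dead_letters):
    """Process buffered items; failures are recorded instead of raised."""
    results = []
    while len(buffer) > 0:
        item = buffer.get()
        try:
            results.append(stage(item))
        except Exception as exc:
            dead_letters.append((item, repr(exc)))
    return results


def flaky_stage(frame_id):
    # Hypothetical stage that fails on one specific input.
    if frame_id == 7:
        raise ValueError("corrupt frame")
    return frame_id * 10


buffer = BoundedBuffer(max_size=4)
for frame_id in range(10):  # producer outpaces the consumer
    buffer.put(frame_id)

dead_letters = []
results = drain(buffer, flaky_stage, dead_letters)
print(results, buffer.dropped, dead_letters)
```

Tracking the `dropped` counter alongside the dead letter list makes it possible to tell load shedding apart from genuine processing failures when reviewing a run.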
Local verification checkpoint
Run the smallest example from this chapter in a local workspace and record the package version, runtime, data path, and observed output. If the result depends on model size, vector count, CPU/GPU backend, or available memory, note that constraint beside the exercise so the lesson remains reproducible.
Build a video+audio classification pipeline. Measure per-stage latency and identify the bottleneck. Apply one optimization and verify throughput improvement.
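As a possible starting point for the latency measurement, the sketch below accumulates wall-clock time per named stage and reports the slowest one. `StageTimer` and the three toy stages are hypothetical, with `time.sleep` standing in for decoding and inference cost; substitute the real stages of your pipeline.

```python
import time
from collections import defaultdict


class StageTimer:
    """Accumulates wall-clock time per named stage."""

    def __init__(self):
        self.totals = defaultdict(float)
        self.counts = defaultdict(int)

    def run(self, name, fn, *args):
        start = time.perf_counter()
        out = fn(*args)
        self.totals[name] += time.perf_counter() - start
        self.counts[name] += 1
        return out

    def mean_latency(self):
        return {name: self.totals[name] / self.counts[name] for name in self.totals}

    def bottleneck(self):
        means = self.mean_latency()
        return max(means, key=means.get)


# Hypothetical stages; sleep stands in for real work.
def decode(x):
    time.sleep(0.002)
    return x


def video_infer(x):
    time.sleep(0.02)
    return x


def audio_infer(x):
    time.sleep(0.005)
    return x


timer = StageTimer()
for item in range(5):
    decoded = timer.run("decode", decode, item)
    timer.run("video", video_infer, decoded)
    timer.run("audio", audio_infer, decoded)

for name, seconds in timer.mean_latency().items():
    print(f"{name}: {seconds * 1000:.1f} ms")
print("bottleneck:", timer.bottleneck())
```

When a stage runs on a GPU, kernels launch asynchronously, so the device should be synchronized (for example with `torch.cuda.synchronize()`) before each clock read; otherwise the timer measures launch time rather than execution time.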