RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /Advanced Multi-Modal Systems
  6. /Ch. 12
Advanced Multi-Modal Systems

12. Real-Time Processing

Chapter 12 of 24 · 20 min
KEY INSIGHT

Real-time multi-modal processing is fundamentally about managing latency budgets. The system must prioritize timely output over thorough analysis. When resources are constrained, reducing computation (fewer frames, simpler models) beats letting latency grow unbounded.

Real-time multi-modal processing enables interactive applications: live video analysis, voice assistants, robotics, and autonomous systems. This chapter covers latency management, streaming architectures, and optimization techniques.

Streaming Architecture

Real-time systems process data as it arrives, maintaining state across the stream. The key components: input buffering, incremental inference, and output streaming.

import asyncio
import queue
from dataclasses import dataclass

@dataclass
class StreamConfig:
    video_buffer_size: int = 8
    audio_buffer_size: int = 1600  # ~100ms at 16kHz
    inference_interval_ms: int = 100
    max_latency_ms: int = 500

class StreamingMultiModalProcessor:
    def __init__(self, model, config: StreamConfig):
        self.model = model
        self.config = config
        
        # Buffers for each modality
        self.video_buffer = queue.Queue(maxsize=config.video_buffer_size)
        self.audio_buffer = queue.Queue(maxsize=config.audio_buffer_size)
        
        # State maintained across stream
        self.video_state = None
        self.audio_state = None
        self.last_output_time = 0
    
    async def process_video_stream(self, video_reader):
        """Process video stream with bounded latency."""
        
        async def capture_loop():
            async for frame in video_reader:
                # Drop frames if buffer is full (prevents latency buildup)
                if self.video_buffer.full():
                    try:
                        self.video_buffer.get_nowait()
                    except queue.Empty:
                        pass
                
                self.video_buffer.put_nowait(frame)
        
        async def inference_loop():
            while True:
                if not self.video_buffer.empty():
                    frames = []
                    while not self.video_buffer.empty():
                        frames.append(self.video_buffer.get())
                    
                    # Process batch
                    result = await self._process_video_batch(frames)
                    
                    # Handle backpressure: skip if processing too slow
                    if self._check_latency_budget(result.timestamp):
                        await self._emit_output(result)
                    else:
                        print(f"WARNING: Skipping frame due to latency ({result.latency_ms}ms)")
                
                await asyncio.sleep(self.config.inference_interval_ms / 1000)
        
        await asyncio.gather(capture_loop(), inference_loop())
    
    async def _process_video_batch(self, frames):
        """Process a batch of video frames."""
        start_time = time.time()
        
        # Decode frames
        frame_tensors = [self._decode_frame(f) for f in frames]
        batch = torch.stack(frame_tensors)
        
        # Run inference
        with torch.no_grad():
            output = self.model(batch)
        
        latency_ms = (time.time() - start_time) * 1000
        
        return StreamResult(
            output=output,
            timestamp=frames[-1].timestamp,
            latency_ms=latency_ms
        )
    
    def _check_latency_budget(self, timestamp):
        """Ensure total latency stays within budget."""
        current_time = time.time() * 1000
        age = current_time - timestamp
        
        return age < self.config.max_latency_ms

Optimization Techniques

Real-time processing requires careful optimization. The main bottlenecks: data transfer, model inference, and memory bandwidth.

def optimize_for_inference(model, input_shape):
    """Apply common inference optimizations."""
    
    # 1. Quantization: reduce weight precision
    # This typically provides 2-4x speedup with <1% accuracy loss
    quantized_model = torch.quantization.quantize_dynamic(
        model,
        {torch.nn.Linear, torch.nn.Conv2d},
        dtype=torch.qint8
    )
    
    # 2. TorchScript for eager compilation bypass
    scripted_model = torch.jit.script(model)
    
    # 3. Attention KV-cache for autoregressive models
    # Reduces repeated computation on long sequences
    model.use_cache = True
    
    return quantized_model

def benchmark_latency(model, input_shape, num_runs=100):
    """Measure inference latency distribution."""
    
    # Warmup
    for _ in range(10):
        _ = model(torch.randn(*input_shape))
    
    latencies = []
    for _ in range(num_runs):
        start = time.perf_counter()
        _ = model(torch.randn(*input_shape))
        latencies.append((time.perf_counter() - start) * 1000)
    
    return {
        "mean_ms": np.mean(latencies),
        "p50_ms": np.percentile(latencies, 50),
        "p95_ms": np.percentile(latencies, 95),
        "p99_ms": np.percentile(latencies, 99),
    }

Handling Variable Frame Rates

Real-world video streams have variable frame rates. The processing system must adapt without accumulating latency.

def adaptive_frame_processing(frame, last_processed_time, target_fps=30):
    """Decide whether to process a frame based on timing."""
    
    current_time = time.time()
    time_since_last = current_time - last_processed_time
    target_interval = 1.0 / target_fps
    
    # Skip if too soon (would exceed target fps)
    if time_since_last < target_interval * 0.8:
        return {"process": False, "reason": "too_soon"}
    
    # Process if at or past target interval
    if time_since_last >= target_interval:
        return {"process": True, "frame": frame, "age_ms": time_since_last * 1000}
    
    # Near target: decide based on motion
    motion_score = compute_motion_score(frame, self.last_frame)
    
    if motion_score > 0.5:  # High motion, process anyway
        return {"process": True, "frame": frame, "age_ms": time_since_last * 1000}
    
    return {"process": False, "reason": "low_motion"}
EXERCISE

Implement a real-time video frame processor using OpenCV that reads from a webcam, runs inference at configurable target FPS, and displays the processed output with latency overlay. Test with target FPS of 5, 15, and 30. Report observed latency at each setting.

← Chapter 11
Video Agent
Chapter 13 →
Streaming Video