COURSE · OPS · A017

Model Compression

Learn model compression through RunLocalAI's practical lens: pruning, distillation and compression pipelines, hardware fit, runtime settings, verification habits and local-vs-cloud tradeoffs.

18 chapters · 12h · Operator track · By Fredoline Eruo
PREREQUISITES
  • I016
  • A012

Why this course matters

Model Compression is for operators making local AI reliable, measurable and cheaper to run. It connects compression, pruning, distillation, pipelines and Pareto tradeoffs to the questions RunLocalAI wants every reader to answer before they install, upgrade or scale a model: will it run, what will it cost in memory, what setting changes the result, and how do you verify the answer instead of trusting a demo?

What you will be able to do

By the end, you should be able to explain the main tradeoffs in plain language, choose a safe next experiment, and use the chapter exercises as a repeatable operator checklist. The course favors local evidence, hardware fit, context limits, latency and failure modes over generic AI vocabulary.

How to use this course

Start at chapter one if the topic is new. If you already have a working stack, scan for chapters such as Why Compression?, Pruning: Unstructured, Pruning: Structured and Magnitude Pruning and use those lessons as a quality-control pass before changing a workstation, team workflow or production-like local deployment.

CHAPTERS
  1. 01 Why Compression?

Model compression reduces computational requirements while preserving predictive performance, enabling deployment on constrained hardware without sacrificing functionality.

Large neural networks achieve top-performing results across domains, but their size creates deployment barriers. A model with 7 billion parameters requires approximately 28GB of memory just to store weights in float32 format (4 bytes per parameter), or about 14GB in float16. Inference demands additional memory for activations and intermediate computations, and training adds gradients on top of that. These requirements rule out deployment on edge devices, mobile hardware, and resource-constrained servers.

Compression addresses three core constraints: memory footprint, inference latency, and computational cost. Memory footprint determines where models can run and how many can serve concurrently. Latency matters for interactive applications where response time affects user experience. Computational cost drives cloud infrastructure expenses and battery consumption on mobile devices.

Several compression techniques exist with different tradeoffs. Pruning removes weights or neurons entirely, creating sparse models. Quantization reduces the bit-width of weights and activations, from float32 to int8 or lower. Knowledge distillation transfers capabilities from large models to smaller ones. Low-rank factorization approximates weight matrices with smaller ones. Each technique offers distinct compression ratios, latency improvements, and accuracy tradeoffs.

The compression ratio versus accuracy tradeoff follows a Pareto pattern. Aggressive compression saves more memory but degrades performance. Moderate compression preserves accuracy while still gaining efficiency. The operator's role is to identify the compression level that meets deployment constraints while maintaining acceptable performance.

A practical reality emerges: not all compression techniques apply universally. Vision transformers compress differently than language models. Recurrent networks respond to different pruning strategies than attention-based architectures. Understanding why compression works requires examining what information models encode and how compression affects that information.

This course examines pruning and distillation as primary compression tools. Pruning removes structural components from trained models. Distillation trains compact models to mimic larger ones. Later chapters cover combining these techniques into optimization pipelines that squeeze maximum efficiency from limited budgets.

15 min
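The weight-memory arithmetic from the Why Compression? chapter can be sketched in a few lines. This is a minimal illustration, not a sizing tool: `BYTES_PER_PARAM` and `weight_memory_gb` are names made up for this example, a gigabyte is taken as 10^9 bytes, and activations and runtime overhead are deliberately ignored.

```python
# Bytes needed to store one parameter in common formats.
BYTES_PER_PARAM = {"float32": 4.0, "float16": 2.0, "int8": 1.0, "int4": 0.5}


def weight_memory_gb(num_params, dtype):
    """Weight storage in GB (10**9 bytes); excludes activations and overhead."""
    return num_params * BYTES_PER_PARAM[dtype] / 1e9


if __name__ == "__main__":
    for dtype in BYTES_PER_PARAM:
        print(f"7B parameters in {dtype}: {weight_memory_gb(7e9, dtype):.1f} GB")
```

Running the same function against the parameter count of a model you plan to install gives a quick lower bound on the memory it needs before any runtime buffers are added.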
  2. 02 Pruning: Unstructured

Unstructured pruning removes individual weights, achieving high sparsity but requiring specialized sparse matrix formats and hardware for speedups.

Traditional pruning sets individual weights to zero. The model topology remains intact (layers, connections, activation functions), but most weights become zero. This approach predates modern deep learning, originating from Optimal Brain Damage research in the late 1980s. The intuition: many weights contribute minimally to predictions, and their elimination should not degrade performance substantially.

Unstructured pruning achieves the highest theoretical sparsity. Sparsity of 90% or higher is achievable on certain architectures, leaving only 10% of the original weights non-zero. A 1GB model then holds roughly 100MB of non-zero values before index overhead is counted, which can substantially improve memory-constrained deployment scenarios.

The implementation challenge lies in storage and computation. Standard dense matrix formats do not exploit zero values: they still allocate memory and perform multiplication. Sparse matrix formats like compressed sparse row (CSR) or compressed sparse column (CSC) store only non-zero values and their indices. However, sparse matrix operations on general-purpose hardware often run slower than equivalent dense operations due to irregular memory access patterns.

Three factors determine whether sparse matrices provide speed advantages. First, the sparsity level must exceed a threshold, typically 80-90%, where the overhead of storing indices becomes worthwhile. Second, the sparse format must match the hardware's memory access patterns. Third, the hardware must support efficient sparse operations, which not all current GPUs do.

Libraries like `scipy.sparse` provide sparse matrix primitives for Python. Deep learning frameworks offer sparse tensor support with varying maturity. Libraries such as TorchSparse and DeepSpeed provide some sparse operations for PyTorch with hardware acceleration on compatible accelerators, though coverage depends on the operation and the sparsity pattern.

A failure mode appears when applying unstructured pruning to batched inference. Batching processes multiple inputs simultaneously, improving hardware utilization through large, regular dense matrix multiplications. Scattered zero weights break that regularity, so a batched sparse kernel can end up slower than the dense kernel it replaces. This batching inefficiency motivated structured pruning approaches.

10 min
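To make the index overhead of sparse formats concrete, the sketch below builds CSR arrays by hand and compares storage sizes. It uses plain Python lists rather than `scipy.sparse` so it runs without dependencies; `to_csr`, `csr_matvec` and `storage_bytes` are illustrative names, and the 4-byte value and index widths are assumptions.

```python
def to_csr(dense):
    """Convert a dense matrix (list of rows) into CSR arrays."""
    values, col_indices, row_ptr = [], [], [0]
    for row in dense:
        for col, v in enumerate(row):
            if v != 0:
                values.append(v)
                col_indices.append(col)
        row_ptr.append(len(values))  # end offset of this row
    return values, col_indices, row_ptr


def csr_matvec(values, col_indices, row_ptr, x):
    """Multiply a CSR matrix by a dense vector, touching only non-zeros."""
    out = []
    for r in range(len(row_ptr) - 1):
        start, end = row_ptr[r], row_ptr[r + 1]
        out.append(sum(values[k] * x[col_indices[k]] for k in range(start, end)))
    return out


def storage_bytes(rows, cols, nnz, value_bytes=4, index_bytes=4):
    """Return (dense_bytes, csr_bytes) for a matrix with nnz non-zeros."""
    dense = rows * cols * value_bytes
    csr = nnz * (value_bytes + index_bytes) + (rows + 1) * index_bytes
    return dense, csr


if __name__ == "__main__":
    dense = [[0.0, 2.0, 0.0, 0.0],
             [0.0, 0.0, 0.0, 0.0],
             [1.5, 0.0, 0.0, -3.0]]
    print(to_csr(dense))
    dense_b, csr_b = storage_bytes(4096, 4096, nnz=int(0.10 * 4096 * 4096))
    print(f"dense: {dense_b} bytes, CSR at 90% sparsity: {csr_b} bytes")
```

Under these assumptions a 90% sparse matrix occupies about 20% of the dense size rather than 10%, because every stored value carries a column index, and CSR only becomes smaller than dense once sparsity passes roughly 50%.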
  3. 03 Pruning: Structured

Structured pruning removes entire neurons, channels, or attention heads, producing dense models that benefit from standard hardware acceleration.

Structured pruning eliminates whole computational units rather than individual weights. Channel pruning removes entire filters from convolutional layers. Head pruning removes complete attention heads from transformer architectures. Neuron pruning removes entire hidden units from feedforward layers. The result: model layers become smaller but remain dense.

The dense format enables standard matrix multiplication without sparse overhead. Removing 50% of the output channels from a convolutional layer roughly halves the FLOPs and memory traffic for that layer. Since no indices need tracking, the reduction translates directly into inference speedups through the hardware's standard acceleration pathways.

Channel pruning in convolutional networks illustrates the mechanism. A convolutional layer with 64 input channels and 64 output channels has a weight tensor of shape (64, 64, K, K), where K is the kernel size. In PyTorch's (out_channels, in_channels, K, K) layout, pruning half the output channels reduces this to (32, 64, K, K). The remaining channels compute normally with dense matrix multiplication.

Implementations sometimes combine structured and unstructured approaches in a hierarchy: coarse structured pruning at the neuron level, followed by fine unstructured pruning (for example, iterative magnitude pruning) within the surviving neurons. This hierarchy balances hardware efficiency with compression granularity.

```python
import torch
import torch.nn as nn
import torch.nn.utils.prune as prune


# Toy model for illustration; the layer names match the calls below
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv_layer = nn.Conv2d(3, 16, kernel_size=3)
        self.linear_layer = nn.Linear(16, 8)


model = Net()

# Structured pruning at neuron level (zero out entire rows)
prune.ln_structured(
    model.linear_layer,
    name="weight",
    amount=0.5,
    n=2,    # L2 norm
    dim=0,  # rows of the (out_features, in_features) matrix = output neurons
)

# Structured pruning at filter level (zero out entire filters)
prune.ln_structured(
    model.conv_layer,
    name="weight",
    amount=0.5,
    n=2,
    dim=0,  # filter indices (output channels)
)

# Note: torch.nn.utils.prune masks the selected rows and filters to zero.
# The tensors keep their shape until the layers are rebuilt at the smaller size.
```

A common failure occurs when structured pruning changes layer dimensions inconsistently. If one layer drops some of its output channels but the next layer still expects the original number of input channels, dimension mismatches arise. Maintaining alignment requires coordinating pruning decisions across layer boundaries or using adaptation layers that project between mismatched dimensions.

15 min
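The cross-layer coordination described in the Pruning: Structured chapter can be sketched on two stacked linear layers. This is a dependency-free illustration using nested lists in place of tensors; `prune_hidden_units` and its arguments are hypothetical names, and ranking by the L2 norm of each first-layer row is one possible importance criterion among several.

```python
import math


def l2_norm(row):
    return math.sqrt(sum(w * w for w in row))


def prune_hidden_units(w1, b1, w2, keep_fraction=0.5):
    """
    Physically remove hidden units shared by two stacked linear layers.

    w1: [hidden][in] weights of the first layer
    b1: [hidden] biases of the first layer
    w2: [out][hidden] weights of the second layer
    Keeps the hidden units whose w1 rows have the largest L2 norm and
    removes the matching input columns of w2 so the shapes stay aligned.
    """
    n_keep = max(1, int(len(w1) * keep_fraction))
    ranked = sorted(range(len(w1)), key=lambda i: l2_norm(w1[i]), reverse=True)
    keep = sorted(ranked[:n_keep])  # preserve the original unit order
    w1_small = [w1[i] for i in keep]
    b1_small = [b1[i] for i in keep]
    w2_small = [[row[i] for i in keep] for row in w2]
    return w1_small, b1_small, w2_small, keep


if __name__ == "__main__":
    w1 = [[0.1, 0.0], [2.0, 1.0], [0.0, 0.05], [1.0, -1.0]]
    b1 = [0.0, 0.1, 0.2, 0.3]
    w2 = [[1, 2, 3, 4], [5, 6, 7, 8]]
    w1_s, b1_s, w2_s, kept = prune_hidden_units(w1, b1, w2)
    print("kept hidden units:", kept)
    print("layer 1 shape:", len(w1_s), "x", len(w1_s[0]))
    print("layer 2 shape:", len(w2_s), "x", len(w2_s[0]))
```

The point of the design is that a single `keep` index list drives both layers, so the output width of the first layer and the input width of the second can never drift apart.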
  4. 04 Magnitude Pruning

Magnitude pruning eliminates low-magnitude weights based on the hypothesis that smaller weights contribute less to model predictions.

Magnitude pruning ranks weights by absolute value and removes the smallest fraction. The intuition: weights near zero have minimal influence on the dot products and transformations that constitute neural network computation. Removing them should have little effect on the model's behavior.

The original magnitude pruning research demonstrated surprising effectiveness. Networks pruned to 50% sparsity after training and fine-tuned for several epochs matched dense network performance. This challenged the assumption that every parameter of a trained network is needed for its performance.

The typical magnitude pruning schedule follows an iterative pattern: train for a period, prune, then re-train the surviving connections. A standard schedule from research: train for one epoch, prune 20% of the remaining weights, re-train for a fraction of an epoch, and repeat. This gradual pruning allows the network to adapt to structural changes while maintaining performance.

```python
import torch
import torch.nn as nn
import torch.nn.utils.prune as prune


def iterative_magnitude_pruning(model, train_loader, optimizer, compute_loss,
                                prune_fraction=0.2, iterations=5,
                                retrain_epochs=1):
    """
    Standard iterative magnitude pruning procedure.

    Args:
        model: Trained PyTorch model
        train_loader: Training data
        optimizer: Optimizer for re-training
        compute_loss: Callable (model, batch) -> scalar loss, supplied by the caller
        prune_fraction: Fraction of the remaining weights to prune per iteration
        iterations: Number of prune-retrain cycles
        retrain_epochs: Whole epochs to re-train after each prune
    """
    # Restrict pruning to layers with weight matrices (skips norms and biases)
    prunable = [m for m in model.modules()
                if isinstance(m, (nn.Linear, nn.Conv2d))]

    for iteration in range(iterations):
        # Global threshold over the weights that are still non-zero, so each
        # cycle removes prune_fraction of what remains
        magnitudes = torch.cat(
            [m.weight.detach().abs().flatten() for m in prunable]
        )
        remaining = magnitudes[magnitudes > 0]
        # torch.quantile can reject very large inputs; subsample for big models
        threshold = torch.quantile(remaining, prune_fraction)

        # Apply unstructured pruning below threshold
        for module in prunable:
            prune.custom_from_mask(
                module, name='weight',
                mask=module.weight.detach().abs() >= threshold
            )

        # Re-train the surviving weights
        model.train()
        for _ in range(retrain_epochs):
            for batch in train_loader:
                optimizer.zero_grad()
                loss = compute_loss(model, batch)
                loss.backward()
                optimizer.step()
```

A subtle design choice concerns which weights to continue from after pruning. Networks sometimes perform better when the surviving weights are rewound to their values from an earlier training stage instead of keeping their final trained values. This rewinding effect suggests that magnitude pruning creates configurations dependent on the training trajectory, and that earlier configurations may generalize better. Rewinding all the way to the initial weights, however, often fails on larger models.

Hardware limitations become apparent at extreme sparsity levels. At around 95% sparsity and beyond, memory bandwidth rather than computation becomes the bottleneck. Magnitude pruning can reach this sparsity, but the practical speedup requires hardware optimized for sparse computation, and such hardware remains uncommon in production environments.

15 min
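Because each cycle in the Magnitude Pruning chapter prunes a fraction of the remaining weights, cumulative sparsity grows geometrically rather than linearly. The sketch below works through that arithmetic; `sparsity_schedule` and `cycles_needed` are illustrative helper names, not part of any library.

```python
import math


def sparsity_schedule(prune_fraction=0.2, iterations=5):
    """Cumulative sparsity after each cycle of pruning the remaining weights."""
    remaining = 1.0
    schedule = []
    for _ in range(iterations):
        remaining *= 1.0 - prune_fraction
        schedule.append(1.0 - remaining)
    return schedule


def cycles_needed(target_sparsity, prune_fraction=0.2):
    """Smallest number of cycles whose cumulative sparsity reaches the target."""
    return math.ceil(
        math.log(1.0 - target_sparsity) / math.log(1.0 - prune_fraction)
    )


if __name__ == "__main__":
    for cycle, s in enumerate(sparsity_schedule(), start=1):
        print(f"after cycle {cycle}: {s:.1%} of weights pruned")
    print("cycles to reach 90% sparsity:", cycles_needed(0.9))
```

Five cycles at 20% therefore leave about a third of the weights in place, which is worth checking before budgeting re-training time for a high sparsity target.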
  5. 05 Movement Pruning

Movement pruning removes weights that remain small throughout training, identifying parameters whose contribution decreases during optimization rather than at a static checkpoint.

Standard magnitude pruning evaluates weights at a single checkpoint, typically after training completes. Movement pruning as presented here tracks weight magnitudes across training, identifying weights that begin small and stay small. These weights never contribute meaningfully to the network's learned function. This is a simplified, magnitude-based variant: the movement pruning method published by Sanh et al. (2020) instead scores each weight by accumulating the product of the weight and its gradient, which measures whether the weight is moving toward or away from zero.

The movement score measures how consistently a weight stays small. Weights that spike during training and return to small values demonstrate dynamic contribution. Weights that remain small throughout indicate persistent dormancy. Movement pruning removes the latter, preserving weights with time-varying importance.

```python
import torch


class MovementPruner:
    """
    Tracks weight magnitudes across training to identify consistently small weights.
    """
    def __init__(self, model, beta=0.9):
        self.model = model
        self.movement_scores = {}
        self.beta = beta  # Exponential moving average decay
        self._init_scores()

    def _prunable_modules(self):
        """Yield (name, module) pairs that own a weight tensor."""
        for name, module in self.model.named_modules():
            if isinstance(getattr(module, 'weight', None), torch.Tensor):
                yield name, module

    def _init_scores(self):
        """Allocate one zero-initialized score tensor per weight (no hooks needed)."""
        for name, module in self._prunable_modules():
            self.movement_scores[name] = torch.zeros_like(module.weight)

    @torch.no_grad()
    def update_scores(self):
        """Update running scores with current magnitudes; call after each optimizer step."""
        for name, module in self._prunable_modules():
            magnitude = module.weight.abs()
            self.movement_scores[name] = (
                self.beta * self.movement_scores[name]
                + (1 - self.beta) * magnitude
            )

    @torch.no_grad()
    def prune(self, sparsity):
        """Zero the weights with the lowest scores in each layer."""
        for name, module in self._prunable_modules():
            scores = self.movement_scores[name]
            threshold = torch.quantile(scores.flatten(), sparsity)
            mask = scores > threshold
            # Zeroed weights can regrow if training continues without
            # re-applying the mask after each optimizer step
            module.weight.mul_(mask.to(module.weight.dtype))
```

Movement pruning offers several advantages over magnitude pruning. First, it identifies weights with consistently low contribution rather than those that happen to be small at evaluation time. Second, it tolerates temporary increases in weight magnitude during training that might later revert. Third, the movement pattern itself provides information about weight importance.

The computational overhead of movement tracking remains modest. After each training step, the pruner updates exponential moving averages of weight magnitudes. No forward passes beyond those already required for training are needed. The scoring happens during the normal training loop.

A failure mode appears when training hyperparameters interact poorly with movement scores. High learning rates cause weights to fluctuate more, reducing the signal-to-noise ratio in movement scores. Very low learning rates cause weights to move less, potentially misclassifying important weights as unimportant. Movement pruning works best with stable training dynamics.

15 min
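The difference between a running score and a single-checkpoint magnitude can be seen on two made-up weight histories. This is a toy illustration of the exponential moving average used in the Movement Pruning chapter; `ema_score` and the magnitude sequences are invented for the example and do not come from a real training run.

```python
def ema_score(magnitude_history, beta=0.9):
    """Exponential moving average of a weight's magnitude over training steps."""
    score = 0.0
    for magnitude in magnitude_history:
        score = beta * score + (1 - beta) * magnitude
    return score


# Two hypothetical weights that end training at the same magnitude (0.01)
steady_small = [0.01] * 50
transient_spike = [0.01] * 20 + [1.0] * 5 + [0.01] * 25

if __name__ == "__main__":
    print("final magnitudes:", steady_small[-1], transient_spike[-1])
    print("EMA score, steady weight:   ", ema_score(steady_small))
    print("EMA score, spiking weight:  ", ema_score(transient_spike))
```

A final-checkpoint magnitude criterion cannot tell these two weights apart, while the running score still remembers the spike. A larger `beta` lengthens that memory and a smaller one shortens it.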
  6. 06 Knowledge Distillation

Knowledge distillation transfers capabilities from a large model to a smaller one by training the compact model to match both the training labels and the large model's soft probability distributions.

The core insight behind knowledge distillation: large models capture richer information than their predictions alone indicate. When a model predicts "cat" with 0.7 probability and "dog" with 0.2 probability, those relative probabilities encode information about semantic similarity between categories. A compact model trained only on hard labels lacks access to this dark knowledge.

The distillation training procedure uses two loss components. The first term is the standard cross-entropy against training labels, ensuring the student learns correct categories. The second term matches the student's soft probability distributions against the teacher's distributions, teaching the student the teacher's generalization behavior.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class DistillationLoss(nn.Module):
    """
    Combines label loss with soft target loss from teacher model.
    """
    def __init__(self, temperature=4.0, alpha=0.5):
        super().__init__()
        self.temperature = temperature
        self.alpha = alpha  # Weight for soft targets vs hard labels

    def forward(self, student_logits, teacher_logits=None, labels=None):
        """
        Args:
            student_logits: Raw logits from student model
            teacher_logits: Raw logits from teacher model (optional)
            labels: Ground truth labels for hard targets (optional)
        """
        if teacher_logits is None and labels is None:
            raise ValueError("Provide teacher_logits, labels, or both")

        total_loss = student_logits.new_zeros(())

        # Hard label loss (standard cross-entropy)
        if labels is not None:
            hard_loss = F.cross_entropy(student_logits, labels)
            total_loss = total_loss + (1 - self.alpha) * hard_loss

        # Soft target loss from teacher (teacher receives no gradient)
        if teacher_logits is not None:
            soft_student = F.log_softmax(student_logits / self.temperature, dim=-1)
            soft_teacher = F.softmax(teacher_logits.detach() / self.temperature, dim=-1)
            soft_loss = F.kl_div(
                soft_student, soft_teacher, reduction='batchmean'
            ) * (self.temperature ** 2)
            total_loss = total_loss + self.alpha * soft_loss

        return total_loss
```

The temperature parameter controls the softness of probability distributions. Higher temperatures spread probability across more classes, exposing the teacher's relative preferences among the non-target classes. Lower temperatures sharpen distributions toward dominant classes. Temperature values between 2 and 10 typically work best, with the specific value depending on how many classes the model distinguishes.

A failure mode involves temperature-sensitive loss scaling. The KL divergence loss term must be scaled by temperature squared to compensate for the softened distributions. Forgetting this scaling shrinks the soft-target gradients by roughly 1/T² relative to the hard-label gradients, so changing the temperature silently changes the balance between the two terms and makes `alpha` hard to tune.

Distillation does not always improve performance. If the student architecture lacks sufficient capacity to represent the teacher's knowledge, distillation cannot make up the difference. The student must be large enough to capture the essential patterns, even if smaller than the teacher.

15 min
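The effect of temperature on a teacher's output distribution, as discussed in the Knowledge Distillation chapter, is easy to see numerically. The sketch below uses plain Python; `softmax_with_temperature` and the three logits (standing in for "cat", "dog" and an unrelated class) are illustrative assumptions.

```python
import math


def softmax_with_temperature(logits, temperature=1.0):
    """Softmax over logits divided by the temperature (numerically stable)."""
    scaled = [z / temperature for z in logits]
    peak = max(scaled)
    exps = [math.exp(s - peak) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]


# Hypothetical teacher logits for three classes
teacher_logits = [4.0, 2.5, 0.5]

if __name__ == "__main__":
    for t in (1.0, 4.0, 10.0):
        probs = softmax_with_temperature(teacher_logits, t)
        print(f"T={t}:", [round(p, 3) for p in probs])
```

As the temperature rises, probability mass moves from the top class to the others while the ranking stays the same, which is what lets the student see how the teacher orders the non-target classes.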
  7. 07 Teacher-Student Setup

Successful knowledge distillation requires careful architecture selection for the student model, balancing capacity constraints against deployment requirements.

The teacher model is typically a pre-trained model already performing well on the target task. Large models with high accuracy serve as effective teachers. The student model must be compact enough for deployment constraints while maintaining sufficient representational capacity.

Architecture selection for students differs from standard model design. Pruning-based distillers derive student architectures by removing structural components from the teacher. Low-rank factorizations create students from factorized teacher layers. Manual architectures provide explicit control over student parameters.

Pruning-based student extraction offers a principled approach. After magnitude pruning the teacher to target sparsity, the remaining architecture defines the student. Training proceeds with distillation from the original teacher. This approach ensures the student starts from a subset of the teacher's own weights.

```python
import torch


def extract_student_by_pruning(teacher_model, target_sparsity=0.5):
    """
    Extract a student by magnitude-pruning a copy of the teacher.
    Assumes the model class can be rebuilt from `teacher_model.config`.
    Returns a student with the teacher's shapes and masked (zeroed) weights.
    """
    student = type(teacher_model)(teacher_model.config)

    # Copy pruned teacher state
    with torch.no_grad():
        for (t_name, t_param), (s_name, s_param) in zip(
            teacher_model.named_parameters(), student.named_parameters()
        ):
            # Prune weight matrices only; 1-D norm weights and biases are copied
            if 'weight' in t_name and t_param.dim() > 1:
                threshold = torch.quantile(t_param.abs().flatten(), target_sparsity)
                mask = t_param.abs() > threshold
                s_param.copy_(t_param * mask)
            else:
                s_param.copy_(t_param)
    return student


def manual_student_architecture(config):
    """
    Create student architecture manually.
    Smaller hidden dimensions, fewer layers.
    StudentModel is a placeholder for your own model class.
    """
    return StudentModel(
        embed_dim=config.embed_dim // 2,    # Half the width
        num_layers=config.num_layers // 2,  # Half the depth
        num_heads=config.num_heads // 2,    # Half the attention heads
        ff_dim=config.ff_dim // 2,
        vocab_size=config.vocab_size,
    )
```

A critical decision point involves intermediate layer matching. Standard distillation matches only final outputs, but intermediate representations also carry information. Intermediate layer distillation adds losses comparing the student's hidden states against the corresponding teacher hidden states, providing more gradient signal during training.

The capacity gap between teacher and student creates a fundamental tension. Students too similar to teachers offer minimal compression. Students too small cannot learn the teacher's behavior. The optimal student architecture represents the minimum capacity needed to capture essential task performance.

A failure mode emerges when the student learns to mimic the teacher without learning the underlying task. This can occur when hard label loss receives insufficient weight or when the teacher's soft targets contain spurious correlations that the student adopts. Regularization and validation monitoring help prevent this degenerate solution.

15 min
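Halving width and depth together shrinks a transformer by much more than half, which matters when sizing a manual student as in the Teacher-Student Setup chapter. The estimate below is a rough sketch: `transformer_params` is an illustrative helper, the dimensions are hypothetical, and biases, layer norms and positional embeddings are ignored.

```python
def transformer_params(embed_dim, num_layers, ff_dim, vocab_size):
    """
    Rough parameter count for a transformer, returned as (blocks, embeddings).
    Ignores biases, layer norms and positional embeddings.
    """
    attention = 4 * embed_dim * embed_dim  # Q, K, V and output projections
    feedforward = 2 * embed_dim * ff_dim   # up and down projections
    blocks = num_layers * (attention + feedforward)
    embeddings = vocab_size * embed_dim
    return blocks, embeddings


if __name__ == "__main__":
    # Hypothetical teacher, and a student with every dimension halved
    teacher = transformer_params(768, 12, 3072, 30000)
    student = transformer_params(384, 6, 1536, 30000)
    print("teacher blocks / embeddings:", teacher)
    print("student blocks / embeddings:", student)
    print("block reduction factor:", teacher[0] / student[0])
```

Under these assumptions the transformer blocks shrink by a factor of eight while the embedding table only halves, so for small students the vocabulary embedding can dominate the parameter budget. The number of attention heads does not change the count, since the heads split the same projection matrices.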
  8. 08 Distillation Loss Functions

Different distillation loss formulations emphasize different aspects of teacher knowledge, and hybrid formulations typically outperform any single approach.

Beyond soft and hard targets, several specialized loss functions extract specific knowledge types from teachers. Response-based distillation matches final outputs. Feature-based distillation matches intermediate representations. Relation-based distillation matches relationships between representations.

Feature-based distillation connects intermediate layers. Hidden states in neural networks encode hierarchical features: early layers capture low-level patterns while later layers encode high-level abstractions. Teaching the student to produce similar intermediate representations transfers structured knowledge about feature hierarchies.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class FeatureDistillationLoss(nn.Module):
    """
    Match intermediate feature representations between teacher and student.
    """
    def __init__(self, student_dim=None, teacher_dim=None):
        super().__init__()
        # Projection layer if hidden sizes differ. It is created here, not
        # lazily in forward(), so the optimizer can see its parameters.
        self.projection = None
        if (student_dim is not None and teacher_dim is not None
                and student_dim != teacher_dim):
            self.projection = nn.Linear(student_dim, teacher_dim)

    def forward(self, student_hidden, teacher_hidden, attention_mask=None):
        """
        Args:
            student_hidden: Student's hidden states [batch, seq, student_dim]
            teacher_hidden: Teacher's hidden states [batch, seq, teacher_dim]
            attention_mask: Optional [batch, seq] mask, 1 for real tokens
        """
        if self.projection is not None:
            student_hidden = self.projection(student_hidden)

        # Cosine distance between representations (no temperature: there is
        # no softmax to soften here)
        student_norm = F.normalize(student_hidden, p=2, dim=-1)
        teacher_norm = F.normalize(teacher_hidden.detach(), p=2, dim=-1)
        cosine_dist = 1 - (student_norm * teacher_norm).sum(dim=-1)

        if attention_mask is not None:
            mask = attention_mask.to(cosine_dist.dtype)
            return (cosine_dist * mask).sum() / mask.sum().clamp(min=1)
        return cosine_dist.mean()
```

Relation-based distillation captures relationships rather than individual values. Instead of matching individual representations, this approach matches relationships between representations. Two representations that are similar for the teacher should remain similar for the student. Gram matrices capture these pairwise relationships efficiently.

A hybrid loss combines multiple distillation objectives:

```python
class HybridDistillationLoss(nn.Module):
    """
    Combines multiple distillation objectives.
    Assumes both models return objects with .logits and .hidden_states, and
    that DistillationLoss from the Knowledge Distillation chapter is in scope.
    """
    def __init__(self, student, teacher, relation_loss,
                 label_weight=0.3, response_weight=0.3,
                 feature_weight=0.2, relation_weight=0.2):
        super().__init__()
        self.student = student
        self.teacher = teacher
        self.label_weight = label_weight
        self.response_weight = response_weight
        self.feature_weight = feature_weight
        self.relation_weight = relation_weight
        self.response_loss = DistillationLoss()
        self.feature_loss = FeatureDistillationLoss()
        # Caller-supplied module, e.g. a Gram-matrix loss; it takes
        # (student_hidden_states, teacher_hidden_states) and returns a scalar
        self.relation_loss = relation_loss

    def forward(self, batch):
        student = self.student(batch)
        with torch.no_grad():  # the teacher is frozen
            teacher = self.teacher(batch)
        total_loss = (
            self.label_weight * F.cross_entropy(student.logits, batch.labels)
            + self.response_weight * self.response_loss(
                student.logits, teacher.logits
            )
            # zip() pairs layers in order and stops at the shorter list
            + self.feature_weight * sum(
                self.feature_loss(s, t)
                for s, t in zip(student.hidden_states, teacher.hidden_states)
            )
            + self.relation_weight * self.relation_loss(
                student.hidden_states, teacher.hidden_states
            )
        )
        return total_loss
```

Weight selection for loss components requires empirical tuning. Too much emphasis on soft targets risks mimicking teacher errors. Too much emphasis on hard labels wastes the teacher's generalization signal. Adaptive weighting schemes adjust loss coefficients during training based on validation performance.

15 min
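The Gram-matrix idea behind relation-based distillation can be shown without tensors. The sketch below is a dependency-free illustration under assumed names (`gram_matrix`, `relation_loss`) and made-up vectors; a real implementation would operate on batched hidden states in the training framework.

```python
import math


def normalize(v):
    norm = math.sqrt(sum(x * x for x in v)) or 1.0  # guard against zero vectors
    return [x / norm for x in v]


def gram_matrix(vectors):
    """Pairwise cosine similarities between a list of representation vectors."""
    unit = [normalize(v) for v in vectors]
    return [[sum(a * b for a, b in zip(u, w)) for w in unit] for u in unit]


def relation_loss(student_vectors, teacher_vectors):
    """Mean squared difference between student and teacher Gram matrices."""
    gs = gram_matrix(student_vectors)
    gt = gram_matrix(teacher_vectors)
    n = len(gs)
    return sum(
        (gs[i][j] - gt[i][j]) ** 2 for i in range(n) for j in range(n)
    ) / (n * n)


# Hypothetical representations of three inputs: teacher is 3-D, students are 2-D
teacher = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [1.0, 1.0, 0.0]]
student_good = [[2.0, 0.0], [0.0, 3.0], [1.0, 1.0]]
student_collapsed = [[1.0, 0.0], [1.0, 0.0], [1.0, 0.0]]

if __name__ == "__main__":
    print("loss, structure preserved:", relation_loss(student_good, teacher))
    print("loss, collapsed student:  ", relation_loss(student_collapsed, teacher))
```

Because each Gram matrix is sized by the number of inputs and not by the hidden width, this loss needs no projection layer when teacher and student dimensions differ.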
  9. 09 Distillation at Scale

Enterprise-scale distillation introduces coordination overhead, computational expense, and distributed training challenges that require systematic infrastructure design.

Large-scale distillation trains students against teachers comprising billions of parameters. The computational cost of running the teacher for every training step can exceed the cost of training the student itself. Efficient distillation pipelines must optimize teacher inference alongside student optimization.

Batch distillation amortizes teacher computation across multiple student updates. Instead of generating teacher outputs for each student step, generate teacher outputs for many student batches and cache them. Student training proceeds using cached outputs, updating without repeatedly running the teacher.

```python
import torch


class CachedDistillationPipeline:
    """
    Caches teacher inference to amortize its computational cost.

    collate_inputs() and fetch_inputs() are dataset-specific hooks that the
    user must implement. The teacher is assumed to return an object exposing
    logits, hidden_states and attentions (as Hugging Face Transformers models
    do when those outputs are enabled).
    """
    def __init__(self, teacher, cache_size=1000, refresh_fraction=0.1):
        self.teacher = teacher
        self.cache = {}  # {batch_idx: teacher_output}
        self.cache_size = cache_size
        self.refresh_fraction = refresh_fraction  # share of entries to recompute on refresh

    def _run_teacher(self, inputs):
        """Run the frozen teacher under mixed precision and keep what distillation needs."""
        with torch.no_grad(), torch.autocast(device_type="cuda"):
            outputs = self.teacher(inputs)
        return {
            'inputs': inputs,
            'logits': outputs.logits,
            'hidden_states': outputs.hidden_states,
            'attention': outputs.attentions,
        }

    def generate_cache(self, dataloader):
        """Pre-compute teacher outputs for up to cache_size batches."""
        self.teacher.eval()
        for batch_idx, batch in enumerate(dataloader):
            if batch_idx >= self.cache_size:
                break
            inputs = self.collate_inputs(batch)
            self.cache[batch_idx] = self._run_teacher(inputs)
        return self.cache

    def get_distillation_batch(self, batch_idx):
        """Retrieve cached teacher output for batch."""
        if batch_idx not in self.cache:
            # Regenerate if cache miss
            self.regenerate_single(batch_idx)
        return self.cache[batch_idx]

    def regenerate_single(self, batch_idx):
        """Regenerate cache entry for single batch."""
        inputs = self.fetch_inputs(batch_idx)
        self.cache[batch_idx] = self._run_teacher(inputs)
```

Distributed distillation splits training across multiple devices. The teacher can reside on one device generating outputs while the student trains on another. Pipeline parallelism overlaps teacher inference with student training stages, hiding latency from teacher computation.

A failure mode involves staleness in cached distillation. Cached outputs stay valid only while the teacher is frozen and the student sees exactly the inputs that were cached. If data augmentation, reshuffled batch composition, or an updated teacher changes either side, the student trains against targets that no longer match its inputs. Periodic cache refreshes mitigate this issue at computational cost.

Multi-teacher distillation trains against multiple teacher models simultaneously. Different teachers capture complementary knowledge: a retrieval teacher and a language understanding teacher, for example. Multi-teacher approaches are more complex but can transfer broader capabilities than single-teacher approaches.

15 min
  10. 10 Prune-Distill-Quantize Pipeline

Combining pruning, knowledge distillation, and quantization in a structured pipeline yields better results than applying these techniques in isolation, but the order and hyperparameters require careful tuning.

The three compression techniques (pruning, distillation, and quantization) each target different aspects of model redundancy. A well-designed pipeline applies them sequentially to progressively reduce model size while preserving accuracy. However, the order significantly impacts final model quality.

### Pipeline Architecture

A typical three-stage pipeline proceeds as follows:

```python
class CompressionPipeline:
    """
    Skeleton of the three-stage pipeline. The stage methods (apply_pruning,
    finetune_after_pruning, collect_teacher_logits, create_student, distill,
    quantize_aware_training) are placeholders to implement for your model.
    config is any object with pruning_ratio and target_precision attributes.
    """
    def __init__(self, model, config):
        self.model = model
        self.config = config

    def compress(self, train_loader, eval_loader):
        # Stage 1: Pruning
        print("Stage 1: Structured pruning")
        pruned_model = self.apply_pruning(
            self.model, self.config.pruning_ratio
        )
        pruned_model = self.finetune_after_pruning(
            pruned_model, train_loader
        )

        # Stage 2: Knowledge Distillation (teacher = original, unpruned model)
        print("Stage 2: Knowledge distillation")
        teacher_outputs = self.collect_teacher_logits(
            self.model, train_loader
        )
        student_model = self.create_student(pruned_model)
        distilled_model = self.distill(
            student_model, teacher_outputs, train_loader
        )

        # Stage 3: Quantization
        print("Stage 3: Quantization-aware training")
        quantized_model = self.quantize_aware_training(
            distilled_model, train_loader, self.config.target_precision
        )
        return quantized_model
```

### Order Considerations

Applying techniques in the wrong order causes problems:

- **Pruning after quantization**: Harder to identify which weights matter when they're already clustered by quantization boundaries
- **Distillation before pruning**: Wastes capacity on weights that will be removed
- **Quantization without calibration data**: Poor activation range estimates

The recommended order (prune → distill → quantize) removes structural redundancy first, then transfers remaining knowledge to a leaner architecture, and finally reduces numerical precision.

### Hyperparameter Tuning

Each stage's hyperparameters affect subsequent stages. A 70% pruned model may not have enough capacity to benefit from distillation, while an 8-bit quantized model may have insufficient gradient signal for effective distillation.

```python
import copy
from types import SimpleNamespace


def find_optimal_sequence(model, train_loader, eval_loader, test_loader):
    """
    Grid search over 27 configurations. evaluate_model() and
    select_pareto_optimal() are placeholders supplied by the caller.
    """
    results = []
    for pruning_ratio in [0.5, 0.6, 0.7]:
        for distill_temperature in [2.0, 4.0, 8.0]:
            for target_bits in [8, 6, 4]:
                config = SimpleNamespace(
                    pruning_ratio=pruning_ratio,
                    distill_temperature=distill_temperature,
                    target_precision=target_bits,
                )
                # Each run starts from a fresh copy of the uncompressed model
                pipeline = CompressionPipeline(copy.deepcopy(model), config)
                result = pipeline.compress(train_loader, eval_loader)
                results.append({
                    'config': vars(config),
                    'metrics': evaluate_model(result, test_loader),
                })
    return select_pareto_optimal(results)
```

### Failure Modes

The pipeline fails when stages interact poorly:

1. **Catastrophic forgetting during finetuning**: Pruned model loses knowledge before distillation recovers it
2. **Distribution mismatch**: Teacher logits from original model don't match student's learning dynamics after pruning
3. **Quantization degradation compounds**: Each stage loses small amounts of accuracy, which accumulates

Using early stopping with validation loss during each stage helps limit excessive degradation.

20 min
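The `select_pareto_optimal` step named in the Prune-Distill-Quantize Pipeline chapter is not defined in the course text. One possible sketch follows, assuming each result carries an `accuracy` metric (higher is better) and a `size_mb` metric (lower is better). Both metric names and the sample numbers are made up for illustration.

```python
def select_pareto_optimal(results):
    """
    Keep the results that no other result dominates.

    Each result is a dict with a 'metrics' dict holding 'accuracy' (higher is
    better) and 'size_mb' (lower is better). A result dominates another when
    it is at least as good on both metrics and strictly better on one.
    """
    def dominates(a, b):
        ma, mb = a["metrics"], b["metrics"]
        no_worse = ma["accuracy"] >= mb["accuracy"] and ma["size_mb"] <= mb["size_mb"]
        better = ma["accuracy"] > mb["accuracy"] or ma["size_mb"] < mb["size_mb"]
        return no_worse and better

    return [r for r in results if not any(dominates(o, r) for o in results)]


# Made-up numbers, for illustration only
sample_results = [
    {"config": "A", "metrics": {"accuracy": 0.90, "size_mb": 100}},
    {"config": "B", "metrics": {"accuracy": 0.88, "size_mb": 60}},
    {"config": "C", "metrics": {"accuracy": 0.85, "size_mb": 80}},
    {"config": "D", "metrics": {"accuracy": 0.80, "size_mb": 30}},
]

if __name__ == "__main__":
    for r in select_pareto_optimal(sample_results):
        print(r["config"], r["metrics"])
```

Configuration C drops out because B is both smaller and more accurate. The remaining points form the frontier from which an operator picks according to the deployment's memory budget.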
  11. 11Combined CompressionJoint optimization of multiple compression techniques often outperforms sequential application, but requires careful gradient coordination to avoid conflicting objectives. While sequential pipelines are simpler to implement, joint compression allows techniques to adapt to each other's effects. This is especially important when compression methods have interdependent effects on the loss landscape. ### Joint Optimization Framework ```python class JointCompression: def __init__(self, model, compression_params): self.model = model # Masks for pruning (binary) self.prune_mask = torch.ones_like(model.weight, dtype=torch.bool) # Scale factors for quantization (learnable) self.quant_scales = nn.Parameter(torch.ones_like(model.weight)) def forward(self, x): # Apply pruning mask weight_pruned = self.model.weight * self.prune_mask.float() # Apply quantization scaling weight_quant = weight_pruned * self.quant_scales # Quantize to target precision weight_discrete = self.round_ste(weight_quant) # STE for gradients # Forward pass with discrete weights return F.linear(x, weight_discrete, self.model.bias) def loss(self, output, target, teacher_output=None): task_loss = F.cross_entropy(output, target) # Pruning regularization: encourage sparsity prune_reg = 0.01 * self.prune_mask.float().mean() # Quantization regularization: encourage scales toward uniform scale_reg = 0.01 * (self.quant_scales.std() + 1e-6) # Distillation loss if teacher available distill_loss = 0 if teacher_output is not None: distill_loss = 0.5 * F.kl_div( F.log_softmax(output / 4.0, dim=-1), F.log_softmax(teacher_output / 4.0, dim=-1) ) return task_loss + prune_reg + scale_reg + distill_loss ``` ### Gradient Coordination When pruning and quantization gradients conflict, optimization becomes unstable. Pruning gradients encourage certain weights to zero, while quantization gradients encourage uniform scaling. Without coordination, the model oscillates. 
```python def compute_coordinated_gradients(loss, model, prune_mask, quant_scales): # Compute gradients for each compression technique separately. # retain_graph=True on every call so loss.backward() can still run afterwards. grad_task = torch.autograd.grad(loss, list(model.parameters()), retain_graph=True) grad_prune = torch.autograd.grad(loss, prune_mask, retain_graph=True) grad_quant = torch.autograd.grad(loss, quant_scales, retain_graph=True) # Detect conflicts: opposite sign gradients prune_conflict = detect_conflicts(grad_prune, grad_quant) if prune_conflict > 0.3: # Threshold for conflict # Reduce learning rate for conflicting components lr_reduction = 0.5 grad_prune = [g * lr_reduction for g in grad_prune] return grad_task, grad_prune, grad_quant ``` ### Practical Implementation Joint compression works best with iterative updates: 1. Initialize all masks and scales uniformly 2. Perform several gradient steps jointly 3. Periodically sharpen masks (push toward binary) and scales 4. Evaluate after each cycle for convergence ```python def joint_compress_loop(model, train_loader, eval_loader, compression_params=None, epochs=100): compressor = JointCompression(model, compression_params) optimizer = torch.optim.Adam([ {'params': model.parameters()}, {'params': compressor.prune_mask}, {'params': compressor.quant_scales, 'lr': 0.01} ], lr=0.001) for epoch in range(epochs): for batch in train_loader: optimizer.zero_grad() output = compressor(batch['input']) loss = compressor.loss(output, batch['target']) # Gradient coordination (must run before backward() frees the graph) _, grad_prune, _ = compute_coordinated_gradients( loss, model, compressor.prune_mask, compressor.quant_scales ) loss.backward() # Replace the mask gradient with the coordinated (possibly damped) one compressor.prune_mask.grad = grad_prune[0] optimizer.step() # Periodic sharpening if epoch % 10 == 0: compressor.sharpen_masks() compressor.evaluate(model, eval_loader) ``` ### When Joint Beats Sequential Joint compression excels when: - Compression techniques compete for the same weights - Target compression ratio is aggressive (>80%) - Limited fine-tuning data makes each technique's accuracy recovery critical Sequential pipelines remain valuable for simpler models or when interpretability of each stage matters.20 min
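Chapter 11's gradient-coordination code calls `detect_conflicts` without defining it. One possible reading, sketched here on plain Python lists so it runs without a framework, is the fraction of components whose gradients point in opposite directions. The function bodies, the `eps` cutoff and the `damp_if_conflicting` helper are assumptions for illustration; the 0.3 threshold and 0.5 damping factor are the values used in the chapter.

```python
def detect_conflicts(grad_a, grad_b, eps=1e-12):
    """Fraction of components where the two gradients have opposite signs."""
    if len(grad_a) != len(grad_b):
        raise ValueError("gradients must have the same length")
    considered = 0
    conflicts = 0
    for a, b in zip(grad_a, grad_b):
        if abs(a) <= eps or abs(b) <= eps:
            continue  # a near-zero component expresses no preference
        considered += 1
        if a * b < 0:
            conflicts += 1
    return conflicts / considered if considered else 0.0


def damp_if_conflicting(grad_prune, grad_quant, threshold=0.3, factor=0.5):
    """Scale the pruning gradient down when the conflict rate exceeds the threshold."""
    rate = detect_conflicts(grad_prune, grad_quant)
    if rate > threshold:
        grad_prune = [g * factor for g in grad_prune]
    return grad_prune, rate
```

With tensors, the same idea would compare the signs of the flattened gradients elementwise; the list version only illustrates the rule.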
  12. 12Pareto Frontier AnalysisThe Pareto frontier reveals the optimal trade-off curve between model accuracy and size, enabling informed decisions about which compression configurations to pursue. Understanding the relationship between accuracy and model size is essential for choosing compression strategies. The Pareto frontier identifies configurations where no improvement in one metric is possible without sacrificing the other. ### Computing the Frontier Generate multiple compression configurations spanning a wide range of target sizes, then plot accuracy versus model size: ```python def compute_pareto_frontier(model, compression_configs, test_loader): results = [] for config in compression_configs: compressed = apply_compression(model, config) accuracy = evaluate(compressed, test_loader) # parameters * bits / 8 gives bytes; divide by 1e6 to get MB model_size = count_parameters(compressed) * config['bits'] / 8 / 1e6 results.append({ 'accuracy': accuracy, 'size_mb': model_size, 'config': config }) # Sort by accuracy descending results.sort(key=lambda x: x['accuracy'], reverse=True) # Identify Pareto-optimal points pareto_frontier = [] for r in results: # A point is dominated if another point is at least as good on # both metrics and strictly better on at least one is_dominated = any( other['accuracy'] >= r['accuracy'] and other['size_mb'] <= r['size_mb'] and (other['accuracy'] > r['accuracy'] or other['size_mb'] < r['size_mb']) for other in results ) if not is_dominated: pareto_frontier.append(r) return pareto_frontier ``` ### Visualization ```python import matplotlib.pyplot as plt def plot_pareto_frontier(results, pareto_points): plt.figure(figsize=(10, 6)) # Plot all points sizes = [r['size_mb'] for r in results] accuracies = [r['accuracy'] for r in results] plt.scatter(sizes, accuracies, alpha=0.5, label='All configurations') # Highlight Pareto frontier frontier_sizes = [p['size_mb'] for p in pareto_points] frontier_accs = [p['accuracy'] for p in pareto_points] plt.plot(frontier_sizes, frontier_accs, 
'r-', linewidth=2, label='Pareto frontier') plt.scatter(frontier_sizes, frontier_accs, c='red', s=100, zorder=5) plt.xlabel('Model Size (MB)') plt.ylabel('Accuracy (%)') plt.legend() plt.grid(True, alpha=0.3) plt.savefig('pareto_frontier.png') ``` ### Interpreting the Frontier The frontier reveals several key insights: 1. **Diminishing returns**: Moving along the frontier from large to small models, accuracy drops slowly at first, then steeply once you pass the frontier's knee 2. **Compression headroom**: Points far from the frontier indicate inefficient compression; these configurations underperform relative to what's achievable 3. **Optimal operating points**: The knee of the frontier (beyond which small further size reductions come at large accuracy costs) often represents the best deployment choice ```python def find_knee(frontier_points): """Find the knee point where the frontier has maximum curvature. Assumes at least three frontier points with distinct sizes.""" import numpy as np sizes = np.array([p['size_mb'] for p in frontier_points]) accuracies = np.array([p['accuracy'] for p in frontier_points]) # Normalize to [0, 1] range sizes_norm = (sizes - sizes.min()) / (sizes.max() - sizes.min()) accuracies_norm = (accuracies - accuracies.min()) / (accuracies.max() - accuracies.min()) # Compute second derivative of accuracy with respect to size (curvature) # Higher curvature = knee region slopes = np.gradient(accuracies_norm, sizes_norm) curvatures = np.gradient(slopes, sizes_norm) knee_idx = np.argmax(np.abs(curvatures)) return frontier_points[knee_idx] ``` ### Multi-Objective Frontier When optimizing beyond size and accuracy (e.g., latency, power consumption), use multi-objective optimization to generate the full Pareto set: ```python from pymoo.algorithms.moo.nsga2 import NSGA2 from pymoo.optimize import minimize from pymoo.problems import get_problem def multi_objective_frontier(): problem = get_problem("dtlz1", n_var=10, n_obj=3) # 3 objectives (synthetic benchmark problem) algorithm = NSGA2( pop_size=100, eliminate_duplicates=False ) result = minimize( problem, algorithm, ('n_gen', 200), seed=1, verbose=False ) return result.F # Pareto front approximation ``` ### Practical 
Usage Before committing to a compression configuration: 1. Generate the Pareto frontier across your design space 2. Identify the knee point as the default choice 3. Adjust toward smaller or larger models based on deployment constraints 4. Verify that chosen configurations remain on the frontier with validation data25 min
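Step 4 of chapter 12's practical checklist (confirming that a chosen configuration stays on the frontier under validation data) can be sketched without any framework. The sweep below is an alternative to the pairwise dominance check: sort by size, then keep each point that is strictly more accurate than every smaller one. The `name` key and the `still_on_frontier` helper are hypothetical additions for illustration.

```python
def pareto_frontier(points):
    """Return non-dominated points, smallest model first.

    Each point is a dict with 'size_mb' (lower is better) and
    'accuracy' (higher is better).
    """
    # Sort by size ascending; on equal size, put the more accurate point first.
    ordered = sorted(points, key=lambda p: (p["size_mb"], -p["accuracy"]))
    frontier = []
    best_accuracy = float("-inf")
    for p in ordered:
        # Every earlier point is no larger, so p belongs on the frontier
        # only if it is strictly more accurate than all of them.
        if p["accuracy"] > best_accuracy:
            frontier.append(p)
            best_accuracy = p["accuracy"]
    return frontier


def still_on_frontier(chosen_name, validation_points):
    """Check that a chosen configuration is non-dominated on validation data."""
    names = {p["name"] for p in pareto_frontier(validation_points)}
    return chosen_name in names
```

If a configuration picked on the test sweep drops off the frontier when the sweep is repeated on validation data, the original choice was probably fitted to noise in a single evaluation set.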
  13. 13Accuracy vs Size TradeoffsUnderstanding how accuracy degrades as model size decreases enables principled selection of compression targets based on acceptable loss thresholds. The relationship between model size and accuracy is rarely linear. Deep understanding of this relationship guides compression decisions and prevents over-compression or under-compression. ### Degradation Patterns Different model architectures exhibit different degradation patterns: ```python def analyze_degradation(model, size_targets, test_loader): """ Analyze how accuracy changes across different model sizes. Returns degradation rate and critical thresholds. """ results = [] baseline_acc = evaluate(model, test_loader) baseline_size = model.num_parameters() * 4 / 1e6 # MB (float32) for target_size_mb in size_targets: # Compute required compression ratio ratio = baseline_size / target_size_mb # Apply compression compressed = compress_to_size(model, ratio) acc = evaluate(compressed, test_loader) # Calculate metrics acc_drop = baseline_acc - acc size_reduction = ratio results.append({ 'target_size': target_size_mb, 'accuracy': acc, 'accuracy_drop': acc_drop, 'compression_ratio': ratio, 'efficiency': acc_drop / (1 - 1/ratio) # accuracy per size unit }) return results def compress_to_size(model, target_ratio): """Iteratively find compression settings to hit target size.""" # Binary search for pruning ratio to hit target size low, high = 0.0, 0.99 for _ in range(20): # Binary search iterations mid = (low + high) / 2 pruned = magnitude_pruning(model, mid) current_ratio = model.num_parameters() / pruned.num_parameters() if current_ratio < target_ratio: low = mid else: high = mid return pruned ``` ### Acceptable Loss Thresholds Different applications tolerate different accuracy losses: ```python def recommend_compression_target(task, baseline_acc): """ Recommend compression ratio based on acceptable accuracy loss. 
""" thresholds = { 'safety_critical': 0.01, # <1% accuracy drop allowed 'medical_diagnosis': 0.02, # <2% drop allowed 'standard_classification': 0.05, # <5% drop allowed 'ranking_recommendation': 0.10, # <10% drop allowed 'generative_creative': 0.15, # <15% drop allowed } max_drop = thresholds.get(task, 0.05) min_acceptable_acc = baseline_acc - max_drop return min_acceptable_acc ``` ### Identifying Critical Layers Some layers degrade faster under compression than others. Identifying these layers allows targeted preservation of important capacity: ```python def identify_critical_layers(model, test_loader): """ Identify layers where pruning causes largest accuracy drops. These layers should be pruned less aggressively. """ original_acc = evaluate(model, test_loader) layer_importance = {} for name, module in model.named_modules(): if isinstance(module, nn.Linear) or isinstance(module, nn.Conv2d): # Test sensitivity by pruning this layer alone pruned = prune_single_layer(model, name, 0.5) pruned_acc = evaluate(pruned, test_loader) layer_importance[name] = original_acc - pruned_acc # Sort by importance (highest first) sorted_importance = sorted( layer_importance.items(), key=lambda x: x[1], reverse=True ) return sorted_importance def compress_with_layer_sensitivity(model, sensitivity_scores, target_ratio): """ Apply variable pruning ratios based on layer sensitivity. Critical layers get higher retention (less pruning). 
""" # Assign pruning ratios inversely proportional to sensitivity # High sensitivity = low pruning ratio # Accept either a dict or the sorted (name, score) list from identify_critical_layers sensitivity_scores = dict(sensitivity_scores) sensitivity_values = list(sensitivity_scores.values()) max_sens = max(sensitivity_values) for name, sens in sensitivity_scores.items(): # Map to [0.3, 0.8] range: the most sensitive layer gets 0.3 # Less critical layers can be pruned more layer_prune_ratio = 0.8 - 0.5 * (sens / max_sens) prune_layer(model, name, layer_prune_ratio) return model ``` ### Degradation Recovery Fine-tuning partially recovers accuracy lost during compression: ```python def gradual_degradation_recovery(model, train_loader, eval_loader): """ Apply compression in stages with recovery between each. """ stages = [0.2, 0.4, 0.6, 0.8] # Progressive pruning ratios current_model = model for stage_ratio in stages: # Apply stage compression current_model = magnitude_pruning(current_model, stage_ratio) # Recovery fine-tuning current_model = finetune_recovery( current_model, train_loader, epochs=5, eval_loader=eval_loader ) acc = evaluate(current_model, eval_loader) size = count_parameters(current_model) print(f"Stage {stage_ratio}: acc={acc:.4f}, size={size/1e6:.2f}M") return current_model ``` ### Common Failure: Over-Compression The most common error is aggressive compression without verifying the accuracy impact: ```python def validate_compression_target(compressed_model, original_model, test_loader, max_accuracy_drop=0.05): """ Validate that compression stayed within acceptable accuracy loss. """ original_acc = evaluate(original_model, test_loader) compressed_acc = evaluate(compressed_model, test_loader) actual_drop = original_acc - compressed_acc if actual_drop > max_accuracy_drop: print(f"WARNING: Accuracy drop {actual_drop:.4f} exceeds threshold " f"{max_accuracy_drop:.4f}") print("Consider reducing compression intensity or using more recovery epochs.") return False return True ```25 min
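Chapter 13's loss thresholds give a minimum acceptable accuracy; turning that into a compression target means picking the smallest configuration that still clears it. A minimal sketch, assuming each result is a dict with `size_mb` and `accuracy` keys (accuracy as a fraction in [0, 1]); the `smallest_acceptable` name is hypothetical:

```python
def smallest_acceptable(results, baseline_acc, max_drop):
    """Pick the smallest configuration whose accuracy drop stays within max_drop.

    Returns None when no configuration meets the threshold, which is the
    signal to compress less aggressively or spend more on recovery.
    """
    min_acceptable = baseline_acc - max_drop
    acceptable = [r for r in results if r["accuracy"] >= min_acceptable]
    if not acceptable:
        return None
    return min(acceptable, key=lambda r: r["size_mb"])
```

Returning `None` rather than the closest miss keeps an over-compressed model from being shipped silently.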
  14. 14Hardware-Aware CompressionCompression choices should account for target hardware characteristics; different devices favor different compression strategies for optimal inference performance. Not all compression techniques improve performance equally across hardware platforms. A 4-bit quantized model may be faster on GPUs with native int8 support but slower on CPUs without vectorized int4 operations. ### Hardware Profiling ```python import time import torch class HardwareProfiler: def __init__(self, device): self.device = device self.results = {} def profile_operation(self, op_name, fn, *args, **kwargs): """ Profile execution time and memory usage of an operation. """ if self.device == 'cuda': torch.cuda.reset_peak_memory_stats() torch.cuda.synchronize() start_time = time.perf_counter() result = fn(*args, **kwargs) if self.device == 'cuda': torch.cuda.synchronize() end_time = time.perf_counter() memory_mb = 0 if self.device == 'cuda': memory_mb = torch.cuda.max_memory_allocated() / 1e6 self.results[op_name] = { 'time_ms': (end_time - start_time) * 1000, 'memory_mb': memory_mb } return result def report(self): for op, metrics in self.results.items(): print(f"{op}: {metrics['time_ms']:.2f}ms, {metrics['memory_mb']:.2f}MB") ``` ### Hardware-Specific Optimization Different targets require different strategies: ```python class HardwareAwareCompressor: def recommend_strategy(self, target_device): """ Recommend compression strategy based on hardware. 
""" strategies = { 'nvidia_gpu': { 'quantization_bits': 8, # INT8 tensor cores available 'pruning_type': 'structured', # Better memory access patterns 'layout': 'NCHW', # Optimized for convolution 'precision': 'fp16' # Tensor core compatible }, 'cpu': { 'quantization_bits': 16, # AVX2 may not have efficient int8 'pruning_type': 'unstructured', # More flexibility 'layout': 'NHWC', # Better cache utilization 'precision': 'bf16' # Better numerical stability on CPU }, 'mobile_npu': { 'quantization_bits': 8, # Fixed-function accelerators 'pruning_type': 'channel', # Matches fixed hardware shapes 'layout': 'NCHW', # Typical for mobile processors 'precision': 'int8' # Hardware natively supports }, 'embedded_mcu': { 'quantization_bits': 4, # Minimal memory 'pruning_type': 'structured', # Predictable access patterns 'layout': 'NCHW', 'precision': 'int4' # Smallest representable } } return strategies.get(target_device, strategies['cpu']) ``` ### Benchmark-Based Selection ```python def benchmark_compression_strategies(model, test_input, target_device): """ Benchmark multiple compression strategies on target hardware. 
""" strategies = [ {'bits': 8, 'pruning': 0.5, 'method': 'int8_quantize'}, {'bits': 8, 'pruning': 0.7, 'method': 'int8_quantize'}, {'bits': 4, 'pruning': 0.5, 'method': 'int4_quantize'}, {'bits': 16, 'pruning': 0.5, 'method': 'bf16_quantize'}, ] results = [] for strategy in strategies: compressed = apply_compression(model, strategy) # Warm-up runs for _ in range(3): compressed(test_input) # Timed runs times = [] for _ in range(10): start = time.perf_counter() output = compressed(test_input) if target_device == 'cuda': torch.cuda.synchronize() times.append(time.perf_counter() - start) results.append({ 'strategy': strategy, 'mean_latency_ms': np.mean(times) * 1000, 'std_ms': np.std(times) * 1000, 'accuracy': evaluate(compressed, test_loader) }) return sorted(results, key=lambda x: x['mean_latency_ms']) ``` ### Memory Bandwidth Considerations Compression effectiveness depends on memory bandwidth constraints: ```python def analyze_memory_bottleneck(model, input_shape): """ Analyze whether model is compute-bound or memory-bound. Determines which compression helps most. 
""" # Count memory accesses per operation input_tensor = torch.randn(input_shape).cuda() model = model.cuda() model.eval() activations_memory = 0 for module in model.modules(): if isinstance(module, nn.Conv2d): # Memory for output activation out_h = input_tensor.shape[2] // module.stride[0] out_w = input_tensor.shape[3] // module.stride[1] activations_memory += out_h * out_w * module.out_channels * 4 # Compute-to-memory ratio total_params = sum(p.numel() for p in model.parameters()) compute_ops = sum( m.weight.numel() * input_tensor.shape[2] // m.stride[0] for m in model.modules() if isinstance(m, nn.Conv2d) ) ratio = compute_ops / (total_params + activations_memory) if ratio < 1.0: print("Memory-bound: Focus on reducing model size (pruning, quantization)") else: print("Compute-bound: Focus on reducing compute (architecture changes)") return ratio ``` ### Device-Specific Failure Modes | Device | Common Failure | Mitigation | |--------|---------------|------------| | GPU | Unstructured pruning causes irregular memory access | Use structured pruning patterns (N:M) | | CPU | int4 quantization without hardware support | Stay at int8 or use CPU-specific kernels | | Mobile NPU | Pruning changes tensor shapes | Use channel pruning to preserve shapes | | MCU | Quantization noise accumulation | Use symmetric quantization, reduce bit width gradually |25 min
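The memory-bound versus compute-bound question in chapter 14 is often framed with a simple roofline estimate: compare a workload's arithmetic intensity (FLOPs per byte moved) with the hardware's ridge point (peak FLOP/s divided by memory bandwidth). The sketch below is an assumption-laden illustration, not a profiler: the function names are hypothetical, the hardware numbers must come from your own device's specifications, and activation traffic is ignored in the usage example.

```python
def arithmetic_intensity(flops, bytes_moved):
    """FLOPs performed per byte read from or written to memory."""
    return flops / bytes_moved


def classify_bottleneck(flops, bytes_moved, peak_flops_per_s, bandwidth_bytes_per_s):
    """Classify a workload with a simple roofline model."""
    intensity = arithmetic_intensity(flops, bytes_moved)
    ridge = peak_flops_per_s / bandwidth_bytes_per_s
    bound = "memory" if intensity < ridge else "compute"
    # Lower bound on run time: the slower of the two resources dominates.
    est_seconds = max(flops / peak_flops_per_s, bytes_moved / bandwidth_bytes_per_s)
    return {"intensity": intensity, "ridge": ridge,
            "bound": bound, "est_seconds": est_seconds}


# Usage: one 4096x4096 linear layer at batch size 1 (weights only).
rows = cols = 4096
flops = 2 * rows * cols                 # one multiply and one add per weight
fp16_bytes = 2 * rows * cols            # 2 bytes per weight
int8_bytes = 1 * rows * cols            # 1 byte per weight
# Hypothetical device: 10 TFLOP/s peak compute, 500 GB/s memory bandwidth.
fp16 = classify_bottleneck(flops, fp16_bytes, 10e12, 500e9)
int8 = classify_bottleneck(flops, int8_bytes, 10e12, 500e9)
```

In this example both variants fall below the ridge point, so the estimate is set by bytes moved, and halving the weight bytes halves the estimated time. That is the case where quantization and pruning help most; a compute-bound layer would see little change from smaller weights alone.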
  15. 15Compression BenchmarkingRigorous benchmarking requires standardized metrics, diverse workloads, and statistical validation to ensure compression results are reproducible and comparable. Benchmarking compression requires more than simple accuracy measurements. Thorough evaluation covers latency, memory, throughput, and accuracy across multiple conditions. ### Benchmarking Framework ```python class CompressionBenchmark: def __init__(self, model, test_data): self.model = model self.test_data = test_data def full_benchmark(self, device='cuda'): """ Run thorough benchmark suite. """ results = { 'accuracy': self.benchmark_accuracy(), 'latency': self.benchmark_latency(device), 'throughput': self.benchmark_throughput(device), 'memory': self.benchmark_memory(device), 'model_size': self.measure_model_size() } return results def benchmark_accuracy(self): """Measure task accuracy with confidence interval.""" self.model.eval() correct = 0 total = 0 all_preds = [] all_targets = [] # nn.Module has no .device attribute; read it from the parameters device = next(self.model.parameters()).device with torch.no_grad(): for batch in self.test_data: inputs = batch['input'].to(device) targets = batch['target'].to(device) outputs = self.model(inputs) preds = outputs.argmax(dim=1) correct += (preds == targets).sum().item() total += targets.shape[0] all_preds.extend(preds.cpu().numpy()) all_targets.extend(targets.cpu().numpy()) accuracy = correct / total # Compute confidence interval from scipy import stats n = len(all_preds) se = np.sqrt(accuracy * (1 - accuracy) / n) ci = stats.t.interval(0.95, n-1, loc=accuracy, scale=se) return { 'accuracy': accuracy, 'confidence_interval': (ci[0], ci[1]), 'std_error': se } def benchmark_latency(self, device, warmup=10, iterations=100): """Measure inference latency with warmup.""" self.model.eval() if device == 'cuda': torch.cuda.empty_cache() test_input = self.test_data[0]['input'].to(device) # Warmup for _ in range(warmup): _ = self.model(test_input) if device == 'cuda': torch.cuda.synchronize() # Measure times = [] for _ in 
range(iterations): start = time.perf_counter() _ = self.model(test_input) if device == 'cuda': torch.cuda.synchronize() end = time.perf_counter() times.append((end - start) * 1000) # ms return { 'mean_ms': np.mean(times), 'p50_ms': np.percentile(times, 50), 'p95_ms': np.percentile(times, 95), 'p99_ms': np.percentile(times, 99), 'std_ms': np.std(times) } ``` ### Throughput Measurement ```python def benchmark_throughput(self, device, duration_seconds=5): """ Measure sustained throughput over a time period. """ self.model.eval() test_input = self.test_data[0]['input'].to(device) batch_size = test_input.shape[0] # Warmup for _ in range(10): _ = self.model(test_input) if device == 'cuda': torch.cuda.synchronize() # Throughput test start_time = time.time() count = 0 while time.time() - start_time < duration_seconds: _ = self.model(test_input) count += 1 if device == 'cuda': torch.cuda.synchronize() elapsed = time.time() - start_time return { 'samples_per_second': count * batch_size / elapsed, 'batches_per_second': count / elapsed, 'batch_size': batch_size } def benchmark_memory(self, device): """Measure peak memory usage.""" if device == 'cuda': torch.cuda.reset_peak_memory_stats() self.model.eval() for batch in self.test_data: inputs = batch['input'].to(device) _ = self.model(inputs) break # Just need one batch for memory measurement if device == 'cuda': peak_memory = torch.cuda.max_memory_allocated() / (1024 ** 2) # MB return {'peak_memory_mb': peak_memory} return {'peak_memory_mb': None} # CPU measurement not available ``` ### Comparative Benchmarking ```python def compare_compression_methods(original_model, compressed_models, test_data): """ Compare multiple compression configurations side-by-side. 
""" benchmark = CompressionBenchmark(original_model, test_data) baseline = benchmark.full_benchmark() results = { 'baseline': baseline, 'comparisons': [] } for name, compressed_model in compressed_models.items(): benchmark = CompressionBenchmark(compressed_model, test_data) compressed_results = benchmark.full_benchmark() comparison = { 'name': name, 'results': compressed_results, 'relative': { 'accuracy_delta': compressed_results['accuracy']['accuracy'] - baseline['accuracy']['accuracy'], 'latency_speedup': baseline['latency']['mean_ms'] / compressed_results['latency']['mean_ms'], 'size_reduction': baseline['model_size'] / compressed_results['model_size'], 'memory_reduction': baseline['memory']['peak_memory_mb'] / compressed_results['memory']['peak_memory_mb'] } } results['comparisons'].append(comparison) return results ``` ### Benchmark Reporting ```python def generate_benchmark_report(results): """ Generate human-readable benchmark report. """ print("=" * 60) print("COMPRESSION BENCHMARK REPORT") print("=" * 60) baseline = results['baseline'] print(f"\nBaseline Model:") print(f" Accuracy: {baseline['accuracy']['accuracy']:.4f}") print(f" Latency (mean): {baseline['latency']['mean_ms']:.2f}ms") print(f" Model size: {baseline['model_size']:.2f}MB") print("\n" + "-" * 60) print(f"{'Method':<20} {'Acc Δ':<10} {'Speedup':<10} {'Size ↓':<10}") print("-" * 60) for comp in results['comparisons']: rel = comp['relative'] print(f"{comp['name']:<20} " f"{rel['accuracy_delta']:+.4f} " f"{rel['latency_speedup']:.2f}x " f"{rel['size_reduction']:.2f}x") print("=" * 60) ```25 min
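Chapter 15's accuracy benchmark builds its confidence interval from a normal-approximation standard error, which can produce bounds outside [0, 1] when accuracy is close to 0 or 1 or the test set is small. As a swapped-in alternative (not the chapter's method), the Wilson score interval stays inside the valid range. A minimal standard-library sketch; the `wilson_interval` name is an assumption:

```python
import math


def wilson_interval(correct, total, z=1.96):
    """Wilson score interval for a proportion (z=1.96 gives roughly 95%)."""
    if total <= 0:
        raise ValueError("total must be positive")
    p = correct / total
    denom = 1 + z * z / total
    center = (p + z * z / (2 * total)) / denom
    half_width = z * math.sqrt(p * (1 - p) / total + z * z / (4 * total * total)) / denom
    return center - half_width, center + half_width
```

When comparing a baseline and a compressed model on the same test set, reporting both intervals makes it easier to see whether an accuracy delta is larger than the measurement noise.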
  16. 16Deploying Compressed ModelsSuccessful deployment requires not just compression but also proper export, runtime configuration, and monitoring infrastructure to maintain model quality in production. Deploying compressed models involves more than converting weights. The entire inference pipeline must adapt to compressed representations while maintaining reliability. ### Model Export ```python import torch import onnx class CompressedModelExporter: def __init__(self, model): self.model = model def export_to_onnx(self, output_path, input_shape, opset_version=13): """ Export compressed model to ONNX format. Handles quantization nodes and pruned tensors. """ self.model.eval() # Create dummy input matching expected shape dummy_input = torch.randn(input_shape) # Export to ONNX torch.onnx.export( self.model, dummy_input, output_path, export_params=True, opset_version=opset_version, do_constant_folding=True, input_names=['input'], output_names=['output'], dynamic_axes={ 'input': {0: 'batch_size'}, 'output': {0: 'batch_size'} } ) # Verify export self._verify_onnx(output_path, input_shape) return output_path def _verify_onnx(self, onnx_path, input_shape): """Verify ONNX model loads and produces valid outputs.""" import onnxruntime as ort session = ort.InferenceSession(onnx_path) # Run inference input_data = np.random.randn(*input_shape).astype(np.float32) output = session.run(None, {'input': input_data}) assert len(output) > 0, "ONNX model produced no outputs" assert not np.any(np.isnan(output[0])), "ONNX output contains NaN" assert not np.any(np.isinf(output[0])), "ONNX output contains Inf" def export_with_quantization(self, output_path, calibration_data): """ Export model with post-training quantization applied. 
""" import torch.quantization as tq # Eager-mode post-training static quantization: attach a qconfig, # insert observers, calibrate, then convert to int8 modules. # Assumes the model wraps its forward pass in QuantStub/DeQuantStub. self.model.eval() self.model.qconfig = tq.get_default_qconfig('fbgemm') prepared_model = tq.prepare(self.model, inplace=False) with torch.no_grad(): for batch in calibration_data: prepared_model(batch['input']) quantized_model = tq.convert(prepared_model, inplace=False) # Export quantized model torch.save({ 'state_dict': quantized_model.state_dict(), 'quantization_config': str(self.model.qconfig), 'architecture': type(self.model).__name__ }, output_path) return output_path ``` ### Runtime Configuration ```python class CompressedInferenceEngine: def __init__(self, model_path, device='cpu'): self.device = device self.model = self._load_model(model_path) def _load_model(self, model_path): """Load model with appropriate runtime settings.""" if model_path.endswith('.onnx'): return self._load_onnx(model_path) elif model_path.endswith('.pt'): return self._load_torch(model_path) else: raise ValueError(f"Unsupported format: {model_path}") def _load_onnx(self, model_path): """Load ONNX model with optimized runtime.""" import onnxruntime as ort providers = { 'cpu': ['CPUExecutionProvider'], 'cuda': ['CUDAExecutionProvider', 'CPUExecutionProvider'], 'tensorrt': ['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider'] } sess_options = ort.SessionOptions() sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL session = ort.InferenceSession( model_path, sess_options, providers=providers.get(self.device, providers['cpu']) ) return session def predict(self, inputs): """ Run inference with error handling. 
""" try: # ONNX Runtime sessions expose get_inputs(); checking for it avoids # needing the `ort` name, which is only imported inside _load_onnx if hasattr(self.model, 'get_inputs'): # ONNX inference input_name = self.model.get_inputs()[0].name output_name = self.model.get_outputs()[0].name outputs = self.model.run([output_name], {input_name: inputs}) return outputs[0] else: # PyTorch inference with torch.no_grad(): return self.model(torch.from_numpy(inputs)).numpy() except Exception as e: logging.error(f"Inference failed: {e}") return self._fallback_predict(inputs) def _fallback_predict(self, inputs): """ Fallback prediction for when primary model fails. Could load a backup model or return cached predictions. """ raise RuntimeError("Both primary and fallback inference failed") ``` ### Monitoring in Production ```python import logging import time import numpy as np class ModelMonitor: def __init__(self, model, metrics_backend, baseline_accuracy): self.model = model self.backend = metrics_backend self.baseline_accuracy = baseline_accuracy # accuracy measured before deployment self.prediction_count = 0 def predict_with_monitoring(self, inputs): """ Run prediction and track metrics. """ start_time = time.perf_counter() # Run inference outputs = self.model.predict(inputs) # Track latency latency_ms = (time.perf_counter() - start_time) * 1000 # Track predictions self.prediction_count += 1 # Report metrics self.backend.gauge('model_latency_ms', latency_ms) self.backend.gauge('predictions_total', self.prediction_count) # Check for anomalies if np.any(np.isnan(outputs)): logging.warning(f"NaN detected in prediction {self.prediction_count}") self.backend.increment('nan_predictions') if np.any(np.abs(outputs) > 100): logging.warning("Unusual output magnitude detected") self.backend.increment('outlier_predictions') return outputs def report_accuracy(self, y_true, y_pred): """Track batch accuracy for monitoring drift.""" accuracy = np.mean(y_true == y_pred) self.backend.gauge('accuracy', accuracy) # Check for accuracy degradation if accuracy < self.baseline_accuracy - 0.05: self._alert_accuracy_degradation() def _alert_accuracy_degradation(self): """Alert when accuracy drops below threshold.""" logging.critical( f"Model 
accuracy degraded below acceptable threshold. " f"Predictions: {self.prediction_count}" ) ``` ### Deployment Checklist Before production deployment: - [ ] Verify accuracy on held-out test set - [ ] Benchmark latency on target hardware - [ ] Check model file size and memory requirements - [ ] Validate ONNX export produces correct outputs - [ ] Test with production traffic patterns - [ ] Set up monitoring and alerting - [ ] Prepare rollback procedure - [ ] Document compression configuration and date20 min
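The "set up monitoring and alerting" item in chapter 16's checklist depends on comparing live accuracy against a stored baseline. A single small batch is noisy, so one option is a rolling window over recent labelled predictions. This is a minimal sketch; the `RollingAccuracyMonitor` class and its `window` and `min_samples` parameters are assumptions, while the 0.05 default drop mirrors the threshold used in the chapter's `report_accuracy`.

```python
from collections import deque


class RollingAccuracyMonitor:
    """Track accuracy over the most recent labelled predictions."""

    def __init__(self, baseline_accuracy, max_drop=0.05, window=500, min_samples=50):
        self.baseline_accuracy = baseline_accuracy
        self.max_drop = max_drop
        self.min_samples = min_samples        # avoid alerting on a handful of samples
        self.outcomes = deque(maxlen=window)  # 1 = correct, 0 = wrong

    def record(self, y_true, y_pred):
        """Store whether one prediction matched its ground-truth label."""
        self.outcomes.append(1 if y_true == y_pred else 0)

    def accuracy(self):
        """Accuracy over the current window, or None if nothing was recorded."""
        if not self.outcomes:
            return None
        return sum(self.outcomes) / len(self.outcomes)

    def degraded(self):
        """True when windowed accuracy falls more than max_drop below baseline."""
        if len(self.outcomes) < self.min_samples:
            return False
        return self.accuracy() < self.baseline_accuracy - self.max_drop
```

Because old outcomes fall out of the window, the monitor also clears itself after a rollback restores accuracy, which keeps alerts tied to current behavior.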
  17. 17Compression EvaluationThorough evaluation of compressed models requires checking not just aggregate metrics but also behavioral consistency with the original model across diverse inputs. Evaluating compression requires more than comparing accuracy numbers. The compressed model must behave consistently with the original across different input types, edge cases, and confidence patterns. ### Behavioral Consistency ```python class BehavioralEvaluator: def __init__(self, original_model, compressed_model, test_data): self.original = original_model self.compressed = compressed_model self.test_data = test_data def evaluate_behavioral_alignment(self): """ Evaluate how well compressed model aligns with original's behavior. """ self.original.eval() self.compressed.eval() results = { 'confidence_calibration': self._check_calibration(), 'prediction_agreement': self._check_prediction_agreement(), 'error_correlation': self._check_error_correlation(), 'adversarial_robustness': self._check_adversarial_robustness() } return results def _check_prediction_agreement(self): """ Check what fraction of predictions match between models. """ agreements = [] confidence_agreement = [] with torch.no_grad(): for batch in self.test_data: inputs = batch['input'] orig_outputs = self.original(inputs) comp_outputs = self.compressed(inputs) # Top-1 agreement orig_pred = orig_outputs.argmax(dim=1) comp_pred = comp_outputs.argmax(dim=1) agreements.append((orig_pred == comp_pred).float().mean().item()) # Confidence alignment orig_conf = F.softmax(orig_outputs, dim=1).max(dim=1)[0] comp_conf = F.softmax(comp_outputs, dim=1).max(dim=1)[0] confidence_agreement.append( torch.abs(orig_conf - comp_conf).mean().item() ) return { 'mean_top1_agreement': np.mean(agreements), 'mean_confidence_diff': np.mean(confidence_agreement) } ``` ### Calibration Analysis ```python def _check_calibration(self): """ Evaluate whether confidence scores match actual accuracy. 
""" from sklearn.calibration import calibration_curve all_probs = [] all_labels = [] all_preds = [] with torch.no_grad(): for batch in self.test_data: inputs = batch['input'] labels = batch['target'] outputs = self.compressed(inputs) probs = F.softmax(outputs, dim=1) all_probs.append(probs.cpu().numpy()) all_labels.append(labels.cpu().numpy()) all_probs = np.concatenate(all_probs) all_labels = np.concatenate(all_labels) all_preds = all_probs.argmax(axis=1) # Expected Calibration Error (ECE) n_bins = 10 bin_boundaries = np.linspace(0, 1, n_bins + 1) ece = 0 total = len(all_labels) for i in range(n_bins): bin_lower = bin_boundaries[i] bin_upper = bin_boundaries[i + 1] in_bin = (all_probs.max(axis=1) >= bin_lower) & \ (all_probs.max(axis=1) < bin_upper) if in_bin.sum() > 0: bin_accuracy = (all_preds[in_bin] == all_labels[in_bin]).mean() bin_confidence = all_probs.max(axis=1)[in_bin].mean() ece += (in_bin.sum() / total) * abs(bin_accuracy - bin_confidence) return { 'expected_calibration_error': ece, 'n_samples': total } ``` ### Critical Failure Detection ```python def detect_critical_failures(self): """ Find cases where compressed model fails badly while original succeeds. 
""" critical_failures = [] with torch.no_grad(): for i, batch in enumerate(self.test_data): inputs = batch['input'] labels = batch['target'] orig_outputs = self.original(inputs) comp_outputs = self.compressed(inputs) orig_pred = orig_outputs.argmax(dim=1) comp_pred = comp_outputs.argmax(dim=1) # Critical failure: original correct, compressed wrong orig_correct = (orig_pred == labels) comp_wrong = (comp_pred != labels) critical_mask = orig_correct & comp_wrong if critical_mask.any(): # as_tuple=True always yields a 1-D index tensor, so the loop # also works when exactly one sample is flagged indices = critical_mask.nonzero(as_tuple=True)[0] for idx in indices: confidence_original = F.softmax( orig_outputs[idx:idx+1], dim=1 ).max().item() confidence_compressed = F.softmax( comp_outputs[idx:idx+1], dim=1 ).max().item() critical_failures.append({ 'sample_idx': i * batch['input'].shape[0] + idx.item(), 'original_confidence': confidence_original, 'compressed_confidence': confidence_compressed, 'gap': confidence_original - confidence_compressed }) # Sort by confidence gap critical_failures.sort(key=lambda x: x['gap'], reverse=True) return critical_failures[:20] # Top 20 most critical ``` ### Fairness Evaluation ```python def evaluate_fairness(self, sensitive_attribute): """ Check if compression affects different groups equally. 
""" group_metrics = {} with torch.no_grad(): for batch in self.test_data: inputs = batch['input'] labels = batch['target'] groups = batch[sensitive_attribute] outputs = self.compressed(inputs) preds = outputs.argmax(dim=1) correct = (preds == labels) for group_id in torch.unique(groups): mask = (groups == group_id) if group_id.item() not in group_metrics: group_metrics[group_id.item()] = {'correct': 0, 'total': 0} group_metrics[group_id.item()]['correct'] += correct[mask].sum().item() group_metrics[group_id.item()]['total'] += mask.sum().item() # Compute per-group accuracy fairness_report = {} accuracies = [] for group_id, metrics in group_metrics.items(): acc = metrics['correct'] / metrics['total'] accuracies.append(acc) fairness_report[group_id] = { 'accuracy': acc, 'samples': metrics['total'] } # Disparate impact ratio min_acc = min(accuracies) max_acc = max(accuracies) fairness_report['disparate_impact_ratio'] = min_acc / max_acc return fairness_report ``` ### Evaluation Summary Report ```python def generate_evaluation_report(evaluator, original_model, compressed_model): """ Generate thorough evaluation report. 
""" results = evaluator.evaluate_behavioral_alignment() critical = evaluator.detect_critical_failures() fairness = evaluator.evaluate_fairness('group_id') report = [] report.append("=" * 60) report.append("COMPRESSION EVALUATION REPORT") report.append("=" * 60) report.append(f"\nBehavioral Alignment:") report.append(f" Prediction Agreement: {results['prediction_agreement']['mean_top1_agreement']:.4f}") report.append(f" Calibration ECE: {results['confidence_calibration']['expected_calibration_error']:.4f}") report.append(f"\nCritical Failures: {len(critical)}") if critical: report.append(f" Top failure confidence gap: {critical[0]['gap']:.4f}") report.append(f"\nFairness Analysis:") for group, metrics in fairness.items(): if isinstance(group, int): report.append(f" Group {group}: {metrics['accuracy']:.4f} ({metrics['samples']} samples)") else: report.append(f" {group}: {metrics:.4f}") report.append("=" * 60) return "\n".join(report) ```25 min
  18. 18 Model Compression Pipeline Project

Building an end-to-end compression pipeline requires integrating multiple techniques, handling edge cases, and validating results at each stage to produce production-ready compressed models.

This final chapter guides you through building a complete model compression pipeline that applies pruning, distillation, and quantization in a coordinated workflow to compress a real model.

### Project Overview

You will compress a ResNet-18 model for image classification, targeting:

- 75% reduction in model size
- Less than 2% accuracy drop from baseline (75.1% top-1 on ImageNet subset)
- Inference latency under 5ms on target hardware

### Starter Code

```python
import io

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torchvision import transforms
from torch.utils.data import DataLoader


class ModelCompressionPipeline:
    def __init__(self, model, config):
        self.model = model
        self.config = config
        self.history = []

    def run(self, train_loader, val_loader, test_loader):
        """
        Execute the full compression pipeline.
        """
        print("=" * 60)
        print("Starting Model Compression Pipeline")
        print("=" * 60)

        # Record the uncompressed baseline first: _print_summary compares
        # the first and last history entries
        print("\n[Baseline]")
        self._evaluate("baseline", test_loader)

        # Snapshot the teacher before pruning so it keeps the original weights
        teacher_model = self._create_teacher_copy()

        # Stage 1: Structured Pruning
        print("\n[Stage 1] Structured Pruning")
        self.model = self.apply_structured_pruning(
            self.model, train_loader, val_loader,
            sparsity=self.config.pruning_sparsity
        )
        self._evaluate("after_pruning", test_loader)

        # Stage 2: Knowledge Distillation
        print("\n[Stage 2] Knowledge Distillation")
        self.model = self.knowledge_distillation(
            self.model, teacher_model, train_loader, val_loader,
            temperature=self.config.distill_temperature
        )
        self._evaluate("after_distillation", test_loader)

        # Stage 3: Quantization
        print("\n[Stage 3] Quantization")
        self.model = self.quantize_model(
            self.model, train_loader, val_loader,
            target_bits=self.config.target_bits
        )
        self._evaluate("after_quantization", test_loader)

        print("\n" + "=" * 60)
        print("Pipeline Complete")
        self._print_summary()
        print("=" * 60)

        return self.model
```

### Stage 1: Structured Pruning

```python
    def apply_structured_pruning(self, model, train_loader, val_loader, sparsity):
        """
        Apply structured channel pruning based on first-order Taylor importance.
        """
        # Compute channel importance using Taylor method
        importance = self._compute_channel_importance(model, train_loader)

        # Determine pruning thresholds per layer
        thresholds = self._compute_pruning_thresholds(importance, sparsity)

        # Create pruning masks
        masks = self._create_structured_masks(model, importance, thresholds)

        # Apply pruning masks
        model = self._apply_masks(model, masks)

        # Recovery fine-tuning
        model = self._finetune_recovery(model, train_loader, val_loader, epochs=5)

        return model

    def _compute_channel_importance(self, model, train_loader):
        """
        Compute channel importance using first-order Taylor approximation.
        """
        model.eval()
        importance = {}

        # Hooks capture the gradient of each conv weight during backward
        gradients = {}

        def compute_grad(name):
            def hook(grad):
                gradients[name] = grad
            return hook

        hooks = []
        for name, module in model.named_modules():
            if isinstance(module, nn.Conv2d):
                handle = module.weight.register_hook(compute_grad(name))
                hooks.append(handle)

        # Collect importance metrics
        importance_sum = {}
        for batch in train_loader:
            inputs, targets = batch
            model.zero_grad()  # do not accumulate .grad across batches
            outputs = model(inputs)
            loss = F.cross_entropy(outputs, targets)
            loss.backward()

            for name, module in model.named_modules():
                if isinstance(module, nn.Conv2d):
                    grad = gradients.get(name)
                    if grad is not None:
                        # First-order Taylor score per output channel:
                        # |sum(weight * grad)| over (in_channels, kH, kW)
                        imp = (module.weight.detach() * grad).sum(dim=(1, 2, 3)).abs()
                        if name not in importance_sum:
                            importance_sum[name] = imp
                        else:
                            importance_sum[name] += imp

        # Clean up hooks
        for handle in hooks:
            handle.remove()

        # Normalize importance so each layer's scores sum to 1
        for name in importance_sum:
            imp = importance_sum[name]
            importance[name] = imp / (imp.sum() + 1e-8)

        return importance
```

### Stage 2: Knowledge Distillation

```python
    def knowledge_distillation(self, student_model, teacher_model,
                               train_loader, val_loader, temperature=4.0):
        """
        Distill knowledge from teacher to student with soft targets.
        """
        teacher_model.eval()

        optimizer = torch.optim.Adam(student_model.parameters(), lr=1e-4)

        for epoch in range(15):
            # Re-enter train mode each epoch in case validation switched to eval
            student_model.train()
            epoch_loss = 0
            for batch in train_loader:
                inputs, targets = batch

                # Teacher predictions
                with torch.no_grad():
                    teacher_outputs = teacher_model(inputs)
                    soft_targets = F.softmax(teacher_outputs / temperature, dim=-1)

                # Student predictions
                student_outputs = student_model(inputs)

                # Distillation loss, scaled by T^2 to keep gradient magnitudes
                # comparable to the hard-target loss
                distill_loss = F.kl_div(
                    F.log_softmax(student_outputs / temperature, dim=-1),
                    soft_targets,
                    reduction='batchmean'
                ) * (temperature ** 2)

                # Hard target loss
                hard_loss = F.cross_entropy(student_outputs, targets)

                # Combined loss
                loss = 0.7 * distill_loss + 0.3 * hard_loss

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()

            # Validate
            val_acc = self._validate(student_model, val_loader)
            print(f"  Epoch {epoch+1}: loss={epoch_loss/len(train_loader):.4f}, val_acc={val_acc:.4f}")

        student_model.eval()
        return student_model
```

### Stage 3: Quantization

```python
    def quantize_model(self, model, train_loader, val_loader, target_bits=8):
        """
        Apply quantization-aware training for specified bit width.
        """
        import torch.quantization as tq

        # The default 'fbgemm' QAT config is 8-bit; other values of
        # target_bits would need a custom qconfig.
        # Eager-mode quantization also expects QuantStub/DeQuantStub at the
        # model boundaries; torchvision.models.quantization.resnet18 provides
        # a quantization-ready variant.

        # Prepare model for quantization-aware training (requires train mode)
        model.train()
        model.qconfig = tq.get_default_qat_qconfig('fbgemm')
        tq.prepare_qat(model, inplace=True)

        # Fine-tune with quantization
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)

        for epoch in range(10):
            model.train()
            for batch in train_loader:
                inputs, targets = batch
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = F.cross_entropy(outputs, targets)
                loss.backward()
                optimizer.step()

            model.eval()
            acc = self._validate(model, val_loader)
            print(f"  QAT Epoch {epoch+1}: val_acc={acc:.4f}")

        # Convert to quantized model (requires eval mode)
        quantized_model = tq.convert(model.eval())

        return quantized_model
```

### Evaluation and Reporting

```python
    def _evaluate(self, stage_name, test_loader):
        """Evaluate model and record results."""
        model = self.model
        model.eval()

        correct = 0
        total = 0
        with torch.no_grad():
            for batch in test_loader:
                inputs, targets = batch
                outputs = model(inputs)
                preds = outputs.argmax(dim=1)
                correct += (preds == targets).sum().item()
                total += targets.shape[0]

        accuracy = correct / total
        size_mb = self._compute_model_size() / 1e6

        self.history.append({
            'stage': stage_name,
            'accuracy': accuracy,
            'size_mb': size_mb
        })

        print(f"  Accuracy: {accuracy:.4f}, Size: {size_mb:.2f}MB")

    def _compute_model_size(self):
        """Calculate serialized model size in bytes."""
        # Quantized modules keep packed weights outside parameters(), so
        # measure the serialized state_dict instead of summing parameters
        buffer = io.BytesIO()
        torch.save(self.model.state_dict(), buffer)
        return buffer.getbuffer().nbytes

    def _print_summary(self):
        """Print final compression summary."""
        print("\nCompression Summary:")
        print("-" * 40)
        for record in self.history:
            print(f"  {record['stage']:<25} | Acc: {record['accuracy']:.4f} | Size: {record['size_mb']:.2f}MB")

        # Compare the first recorded stage (the baseline) against the last one
        baseline_acc = self.history[0]['accuracy']
        final_acc = self.history[-1]['accuracy']
        size_reduction = self.history[0]['size_mb'] / self.history[-1]['size_mb']

        print("-" * 40)
        print(f"Accuracy drop: {(baseline_acc - final_acc)*100:.2f} percentage points")
        print(f"Size reduction: {size_reduction:.2f}x")
```

### Running the Pipeline

```python
from types import SimpleNamespace


def main():
    # Load pretrained model (the weights= argument needs torchvision >= 0.13)
    model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

    # Configuration: the pipeline reads these as attributes (self.config.<name>)
    config = SimpleNamespace(
        pruning_sparsity=0.5,  # Remove 50% of channels
        distill_temperature=4.0,
        target_bits=8,
    )

    pipeline = ModelCompressionPipeline(model, config)

    # Load data (using small subset for demonstration).
    # train_dataset, val_dataset and test_dataset are assumed to be
    # defined elsewhere.
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=64)
    test_loader = DataLoader(test_dataset, batch_size=64)

    # Run pipeline
    compressed_model = pipeline.run(train_loader, val_loader, test_loader)

    # Export
    torch.save(compressed_model.state_dict(), 'compressed_resnet18.pt')

    return compressed_model
```

### Exercise

30 min
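A starting point for checking a run against the project targets from the overview (75% size reduction, less than 2 points of accuracy drop) is to compare the first and last records in the pipeline's `history`. This is a minimal pure-Python sketch, assuming the first record is the uncompressed baseline; the function name `check_targets` and the sample numbers are hypothetical and are not results from a real run.

```python
def check_targets(history, max_accuracy_drop=0.02, min_size_reduction=0.75):
    """Compare the first and last history records against the project targets.

    history is a list of {'stage', 'accuracy', 'size_mb'} dicts; the first
    record is treated as the baseline (an assumption of this sketch).
    """
    baseline, final = history[0], history[-1]
    accuracy_drop = baseline['accuracy'] - final['accuracy']
    # Fraction of the baseline size that was removed
    size_reduction = 1.0 - final['size_mb'] / baseline['size_mb']
    return {
        'accuracy_drop': accuracy_drop,
        'size_reduction': size_reduction,
        'accuracy_ok': accuracy_drop < max_accuracy_drop,
        'size_ok': size_reduction >= min_size_reduction,
    }


# Hypothetical records, for illustration only.
history = [
    {'stage': 'baseline', 'accuracy': 0.751, 'size_mb': 46.8},
    {'stage': 'after_quantization', 'accuracy': 0.738, 'size_mb': 11.2},
]
result = check_targets(history)
for key, value in result.items():
    print(f"{key}: {value}")
```

The latency target is not covered here because it has to be measured on the target hardware.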