18. Edge Deployment Project

Chapter 18 of 18 · 30 min

This capstone project integrates all edge deployment concepts into a complete production system. The project deploys an object detection model to a Raspberry Pi that operates offline, reports metrics, receives OTA updates, and implements security measures.

Project structure:

edge_deployment_project/
├── models/
│   ├── yolov8n.onnx              # Production model
│   ├── yolov8n_quantized.tflite  # Edge-optimized
│   └── manifest.json             # Version manifest
├── src/
│   ├── __init__.py
│   ├── inference.py              # Core inference engine
│   ├── model_manager.py          # OTA and versioning
│   ├── security.py               # Encryption and validation
│   ├── metrics.py                # Telemetry collection
│   └── hardware_monitor.py       # Power and thermal
├── tests/
│   ├── test_inference.py
│   ├── test_security.py
│   └── test_ota.py
├── config/
│   └── deployment.yaml           # Configuration
├── requirements.txt
└── run_deployment.py

Core inference engine:

# src/inference.py
import numpy as np
import onnxruntime as ort
from typing import Dict, Tuple
import time

class EdgeInferenceEngine:
    """ONNX Runtime CPU session wrapper that also tracks inference latency.

    The model is loaded once in the constructor. Only the first input and
    the first output of the model are used. Running totals are kept so that
    the mean latency can be read at any time via ``average_latency``.
    """

    def __init__(self, model_path: str, session_options: ort.SessionOptions = None):
        """Create the inference session.

        Args:
            model_path: Path of the ONNX model file to load.
            session_options: Optional pre-configured session options. When
                omitted, fresh options are created with four intra-op
                threads. In both cases the graph optimization level of the
                options object is set to ORT_ENABLE_ALL.
        """
        if session_options is None:
            session_options = ort.SessionOptions()
            # The intra-op thread count is a SessionOptions attribute, not a
            # CPU execution-provider option, so it has to be set here to take
            # effect. Caller-supplied options keep their own thread settings.
            session_options.intra_op_num_threads = 4

        self.session_options = session_options
        self.session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        self.session = ort.InferenceSession(
            model_path,
            sess_options=self.session_options,
            providers=[('CPUExecutionProvider', {
                'arena_extend_strategy': 'kSameAsRequested'
            })]
        )

        # Cache tensor metadata
        self.input_meta = self.session.get_inputs()[0]
        self.output_meta = self.session.get_outputs()[0]

        self.inference_count = 0
        self.total_latency_ms = 0.0

    def infer(self, input_tensor: np.ndarray) -> Tuple[np.ndarray, Dict]:
        """Run one inference and return the first output plus timing metadata.

        Args:
            input_tensor: Input array; converted to float32 if it is not
                already float32.

        Returns:
            A tuple ``(output, metadata)`` where ``metadata`` holds
            ``latency_ms`` (wall time of this call, conversion included),
            ``timestamp`` (``time.time()`` at completion) and
            ``inference_id`` (1-based running call counter).
        """
        start = time.perf_counter()

        # copy=False avoids duplicating the tensor when it is already float32.
        feed = {self.input_meta.name: input_tensor.astype(np.float32, copy=False)}
        outputs = self.session.run([self.output_meta.name], feed)

        latency_ms = (time.perf_counter() - start) * 1000
        self.inference_count += 1
        self.total_latency_ms += latency_ms

        return outputs[0], {
            "latency_ms": latency_ms,
            "timestamp": time.time(),
            "inference_id": self.inference_count
        }

    @property
    def average_latency(self) -> float:
        """Mean latency in milliseconds over all calls; 0.0 before the first call."""
        return self.total_latency_ms / max(1, self.inference_count)

Model manager with OTA updates:

# src/model_manager.py
import hashlib
import json
import os
import shutil
from typing import Optional
from dataclasses import dataclass

@dataclass
class ModelVersion:
    """Metadata record describing a single model release."""
    # Version identifier of the release.
    version: str
    # Checksum of the model file; presumably a hex SHA-256 digest like the one
    # ModelManager.load_active_model computes — TODO confirm.
    checksum: str
    # Size of the model file in bytes.
    size_bytes: int
    # NOTE(review): presumably the oldest installed version this release can
    # replace; not used anywhere in the visible code — confirm.
    min_compatible_version: str

class ModelManager:
    """Locate, verify and replace the on-device model described by a manifest.

    The manifest is ``<models_dir>/manifest.json``. ``load_active_model``
    reads its ``active_model`` (file name inside ``models_dir``) and
    ``checksum`` (hex SHA-256 of that file) keys. A manifest passed to
    ``apply_update`` additionally needs ``model_file``, the name the new
    model is stored under.
    """

    # Read size used while hashing, so large models are never held in RAM whole.
    _HASH_CHUNK_BYTES = 1 << 20

    def __init__(self, models_dir: str, current_version: str):
        self.models_dir = models_dir
        self.current_version = current_version
        # Path of the last successfully verified model; None until one is loaded.
        self.active_model_path: Optional[str] = None

    def load_active_model(self) -> str:
        """Locate and verify active model.

        Returns:
            Path of the verified model file.

        Raises:
            FileNotFoundError: The manifest or the model file is missing.
            ValueError: The model's SHA-256 differs from the manifest checksum.
        """
        manifest_path = os.path.join(self.models_dir, "manifest.json")

        if not os.path.exists(manifest_path):
            raise FileNotFoundError("No model manifest found")

        with open(manifest_path) as f:
            manifest = json.load(f)

        model_path = os.path.join(self.models_dir, manifest["active_model"])

        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model file not found: {model_path}")

        # Verify checksum, streaming the file in chunks
        digest = hashlib.sha256()
        with open(model_path, 'rb') as f:
            for chunk in iter(lambda: f.read(self._HASH_CHUNK_BYTES), b""):
                digest.update(chunk)

        if digest.hexdigest() != manifest["checksum"]:
            raise ValueError("Model checksum mismatch")

        self.active_model_path = model_path
        return model_path

    def apply_update(self, new_model_path: str, manifest: dict) -> bool:
        """Apply new model version, restoring the previous state on failure.

        The new model is copied to ``manifest["model_file"]`` inside
        ``models_dir``, the manifest is replaced, and the result is verified
        with ``load_active_model``. If any step fails, the previous model
        file, manifest and ``active_model_path`` are restored and the
        original exception is re-raised. On success the previously active
        model file is deleted.

        Args:
            new_model_path: Path of the downloaded model file.
            manifest: New manifest content (see the class docstring for keys).

        Returns:
            True when the update was applied and verified.

        Raises:
            KeyError: ``manifest`` lacks a required key.
            FileNotFoundError, ValueError: See ``load_active_model``.
            OSError: A file operation failed.
        """
        manifest_path = os.path.join(self.models_dir, "manifest.json")
        dest_path = os.path.join(self.models_dir, manifest["model_file"])

        # Snapshot everything the update touches so it can be undone.
        previous_active = self.active_model_path
        previous_manifest = None
        if os.path.exists(manifest_path):
            with open(manifest_path) as f:
                previous_manifest = f.read()
        dest_existed = os.path.exists(dest_path)

        # Maps original path -> backup path for every file moved aside.
        backups: dict = {}

        try:
            # Create backups of the active model and of any file about to be
            # overwritten (these are the same file when the name is reused).
            for path in (previous_active, dest_path):
                if path and path not in backups and os.path.exists(path):
                    os.replace(path, path + ".backup")
                    backups[path] = path + ".backup"

            # Move new model into place
            shutil.copy(new_model_path, dest_path)

            # Update manifest
            self._write_text_atomic(manifest_path, json.dumps(manifest, indent=2))

            # Verify new model
            self.load_active_model()

        except Exception:
            # Rollback on failure: drop the file we created, put the backups
            # back, and restore the manifest so it matches the restored model.
            if not dest_existed and os.path.exists(dest_path):
                os.remove(dest_path)
            for original, backup in backups.items():
                os.replace(backup, original)
            if previous_manifest is not None:
                self._write_text_atomic(manifest_path, previous_manifest)
            elif os.path.exists(manifest_path):
                os.remove(manifest_path)
            self.active_model_path = previous_active
            raise

        # Remove backups on success. The recorded backup paths are used here
        # because active_model_path now points at the new model.
        for backup in backups.values():
            os.remove(backup)

        return True

    @staticmethod
    def _write_text_atomic(path: str, text: str) -> None:
        """Write ``text`` to ``path`` via a temp file and an atomic rename."""
        tmp_path = path + ".tmp"
        with open(tmp_path, 'w') as f:
            f.write(text)
        os.replace(tmp_path, path)

Main deployment entry point:

# run_deployment.py
#!/usr/bin/env python3
import argparse
import signal
import sys
import time
import logging
from pathlib import Path

from src.inference import EdgeInferenceEngine
from src.model_manager import ModelManager
from src.metrics import MetricsCollector
from src.hardware_monitor import HardwareMonitor
from src.security import SecureInferencePipeline

# Configure the root logger once at import time: INFO level, with each record
# formatted as "<time> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Logger used by everything in this script.
logger = logging.getLogger("edge_deployment")

class Deployment:
    """Top-level service object: loads the model and runs the inference loop.

    Configuration keys read from ``config``:
        models_dir, model_version, metrics_dir: required.
        encryption_enabled: optional; when truthy, ``key_path`` is required
            and inference goes through ``SecureInferencePipeline``.
        warmup_iterations: optional number of warm-up inferences (default 100).
        inference_interval: optional pause in seconds between loop
            iterations (default 0.1).
    """

    def __init__(self, config: dict):
        """Build the collaborators and install SIGTERM/SIGINT handlers."""
        self.config = config
        self.running = False

        # Created by start(). Defined here so that a shutdown signal arriving
        # before the model is loaded does not hit a missing attribute.
        self.inference_engine = None
        self.pipeline = None

        self.model_manager = ModelManager(
            config["models_dir"],
            config["model_version"]
        )

        self.metrics = MetricsCollector(config["metrics_dir"])
        self.hardware_monitor = HardwareMonitor()

        # Setup signal handlers for graceful shutdown
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)

    def start(self):
        """Load the active model, warm up, then block in the inference loop."""
        logger.info("Starting edge deployment...")

        # Load active model
        model_path = self.model_manager.load_active_model()
        logger.info(f"Loaded model: {model_path}")

        # Initialize inference
        self.inference_engine = EdgeInferenceEngine(model_path)

        # Connect secure pipeline if encryption enabled
        if self.config.get("encryption_enabled"):
            self.pipeline = SecureInferencePipeline(
                self.inference_engine,
                self.config["key_path"]
            )
        else:
            self.pipeline = self.inference_engine

        # Hardware warm-up
        logger.info("Warming up hardware...")
        for _ in range(self.config.get("warmup_iterations", 100)):
            dummy_input = self._create_dummy_input()
            self.pipeline.infer(dummy_input)

        # Main inference loop
        self.running = True
        self._inference_loop()

    def _inference_loop(self):
        """Read, infer, record and process until ``running`` becomes False.

        An exception in one iteration is logged and recorded as a metric; the
        loop then continues with the next iteration.
        """
        while self.running:
            try:
                # Read input from sensor/camera
                input_data = self._read_sensor()

                # Run inference with timing
                output, metadata = self.pipeline.infer(input_data)

                # Collect metrics
                self.metrics.record_inference(metadata)
                self.hardware_monitor.sample()

                # Process results
                self._process_output(output)

                # Check for OTA updates
                self._check_updates()

            except Exception as e:
                # logger.exception keeps the traceback, which is the only
                # debugging aid available on an unattended device.
                logger.exception(f"Inference error: {e}")
                self.metrics.record_error(str(e))

            time.sleep(self.config.get("inference_interval", 0.1))

    def _handle_shutdown(self, signum, frame):
        """Signal handler: log totals, flush metrics and exit with status 0."""
        logger.info("Shutdown signal received")
        self.running = False

        # The engine only exists once start() has loaded the model.
        if self.inference_engine is not None:
            logger.info(f"Total inferences: {self.inference_engine.inference_count}")
            logger.info(f"Average latency: {self.inference_engine.average_latency:.2f}ms")

        self.metrics.flush()
        sys.exit(0)

    # Placeholder methods for completeness
    def _create_dummy_input(self):
        """Return a random float32 array of shape (1, 3, 640, 640)."""
        # Imported here because this script has no module-level numpy import.
        import numpy as np

        return np.random.randn(1, 3, 640, 640).astype(np.float32)

    def _read_sensor(self):
        """Placeholder sensor read; returns a dummy input."""
        return self._create_dummy_input()

    def _process_output(self, output):
        """Placeholder for result handling; does nothing."""
        pass

    def _check_updates(self):
        """Placeholder for the OTA update check; does nothing."""
        pass

# Script entry point: parse --config, load the YAML file and run the service.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # Path of the YAML configuration file; mandatory.
    parser.add_argument("--config", required=True)
    args = parser.parse_args()
    
    # yaml is only needed when running as a script, so it is imported here.
    import yaml
    with open(args.config) as f:
        # safe_load builds plain Python objects only.
        config = yaml.safe_load(f)
    
    deployment = Deployment(config)
    # Blocks in the inference loop until a shutdown signal exits the process.
    deployment.start()

Deployment installation script:

#!/bin/bash
# install.sh - Deployment installation script
#
# Run from the project root (the directory containing run_deployment.py and
# requirements.txt) as the user the service should run as. The script:
#   1. installs the required system packages,
#   2. copies the project into $INSTALL_DIR and builds a virtualenv there,
#   3. registers and starts a systemd service that restarts on failure.
#
# Optional environment override:
#   INSTALL_DIR  target directory (default: /opt/edge-inference)

set -euo pipefail

INSTALL_DIR="${INSTALL_DIR:-/opt/edge-inference}"
# The service runs as the invoking user so that file ownership and the
# unit's User= setting always agree.
SERVICE_USER="$(id -un)"
SERVICE_GROUP="$(id -gn)"

# Update system
sudo apt-get update
# python3-venv is needed for `python3 -m venv` on Debian-based systems. The
# distribution's default python3 is installed because that is the interpreter
# the virtualenv below is built from.
sudo apt-get install -y python3 python3-venv python3-pip libgomp1

# Create directories (config included: the service reads its YAML from there)
sudo mkdir -p "$INSTALL_DIR"/{models,metrics,logs,config}
sudo chown -R "$SERVICE_USER:$SERVICE_GROUP" "$INSTALL_DIR"

# Copy the application so that every path named in the unit file exists
cp -r src models config requirements.txt run_deployment.py "$INSTALL_DIR"/

# Create virtual environment inside the install directory
python3 -m venv "$INSTALL_DIR/venv"

# Install dependencies using the virtualenv's own pip (no activation needed)
"$INSTALL_DIR/venv/bin/pip" install --upgrade pip
"$INSTALL_DIR/venv/bin/pip" install -r "$INSTALL_DIR/requirements.txt"

# Register systemd service. The heredoc is unquoted on purpose so that the
# variables above are expanded into the unit file.
sudo tee /etc/systemd/system/edge-inference.service > /dev/null <<EOF
[Unit]
Description=Edge ML Inference Service
After=network.target

[Service]
Type=simple
User=$SERVICE_USER
WorkingDirectory=$INSTALL_DIR
ExecStart=$INSTALL_DIR/venv/bin/python $INSTALL_DIR/run_deployment.py --config $INSTALL_DIR/config/deployment.yaml
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable edge-inference
# restart (rather than start) so that re-running the installer picks up changes
sudo systemctl restart edge-inference

echo "Deployment installed successfully"
echo "View logs: journalctl -u edge-inference -f"
EXERCISE

Deploy the complete edge project onto a Raspberry Pi, configure systemd for auto-restart, instrument all components with metrics, verify OTA update rollback works, and publish the deployment as a reproducible bash script.