14. Edge-Cloud Hybrid
Chapter 14 of 18 · 20 min
Hybrid inference distributes computation between edge devices and cloud servers based on task complexity, latency requirements, and resource availability. Simple queries execute on-device while challenging cases escalate to cloud resources with larger models.
Architecture decision framework:
from dataclasses import dataclass
from typing import Optional, Callable
import numpy as np
@dataclass
class InferenceRequest:
    """A single inference request handed to the hybrid router.

    Attributes:
        input_data: Model input passed unchanged to the edge or cloud model.
        priority: Request priority; 1 is the highest and 3 the lowest.
        confidence_threshold: Minimum confidence the caller accepts.
        deadline_ms: Latency the caller tolerates, in milliseconds.
        session_id: Identifier of the session that issued the request.
    """

    # NOTE(review): the dataclass-generated __eq__ compares fields with ==,
    # which is not well defined for multi-element ndarrays; compare fields
    # individually if request equality is ever needed.
    input_data: np.ndarray
    priority: int
    confidence_threshold: float
    deadline_ms: float
    session_id: str
@dataclass
class InferenceResult:
    """Outcome of one inference call.

    Attributes:
        output_data: Raw model output.
        confidence: Confidence score derived from the output.
        model_source: Which model produced the result: 'edge' or 'cloud'.
        latency_ms: Measured inference time in milliseconds.
        inference_id: Identifier used to correlate the result with its request.
    """

    # NOTE(review): the dataclass-generated __eq__ compares fields with ==,
    # which is not well defined for multi-element ndarrays; compare fields
    # individually if result equality is ever needed.
    output_data: np.ndarray
    confidence: float
    model_source: str
    latency_ms: float
    inference_id: str
class HybridRouter:
    """Route inference requests between an on-device model and a cloud client.

    Every request is first run on the edge model. Only lowest-priority
    requests with a strict confidence requirement are escalated to the cloud
    when the edge result is inadequate.

    Attributes:
        edge_model: Object exposing ``predict(input_data)``; its output is
            passed through a softmax to derive a confidence score.
        cloud_client: Object exposing ``predict(input_data)``.
        confidence_baseline: Default confidence level (0.7). Stored for
            callers; not consulted by the routing logic in this class.
        latency_budget_ms: Default latency budget (500). Stored for callers;
            not consulted by the routing logic in this class.
    """

    def __init__(self, edge_model, cloud_client):
        self.edge_model = edge_model
        self.cloud_client = cloud_client
        self.confidence_baseline = 0.7
        self.latency_budget_ms = 500

    def route_inference(self, request: InferenceRequest) -> InferenceResult:
        """Decide where to run inference based on request characteristics.

        Args:
            request: The request to serve.

        Returns:
            The edge result when the request has priority 1, or when the edge
            result satisfies both the confidence threshold and the deadline.
            Otherwise the cloud result for priority-3 requests whose
            confidence threshold exceeds 0.8, and the (inadequate) edge
            result for everything else.
        """
        # Edge inference always runs first: priority-1 requests return it
        # unconditionally for speed, all others use it as the first attempt.
        edge_result = self._edge_inference(request)
        if request.priority == 1:
            return edge_result

        meets_confidence = edge_result.confidence >= request.confidence_threshold
        meets_deadline = edge_result.latency_ms <= request.deadline_ms
        if meets_confidence and meets_deadline:
            return edge_result

        # Escalate only lowest-priority requests that demand high confidence.
        if request.priority == 3 and request.confidence_threshold > 0.8:
            return self._cloud_inference(request)

        # Best effort: hand back the edge result even though it fell short.
        return edge_result

    def _edge_inference(self, request) -> InferenceResult:
        """Run the edge model and wrap its output with timing and confidence."""
        import time

        start = time.perf_counter()
        output = self.edge_model.predict(request.input_data)
        confidence = self._compute_confidence(output)
        # Latency covers both the prediction and the confidence computation.
        elapsed_ms = (time.perf_counter() - start) * 1000
        return InferenceResult(
            output_data=output,
            confidence=confidence,
            model_source='edge',
            latency_ms=elapsed_ms,
            inference_id=request.session_id,
        )

    def _cloud_inference(self, request) -> InferenceResult:
        """Forward the request input to the cloud client.

        NOTE(review): the client's return value is passed through unchanged;
        this assumes ``cloud_client.predict`` already returns an
        ``InferenceResult`` -- confirm against the client implementation.
        """
        return self.cloud_client.predict(request.input_data)

    def _compute_confidence(self, output):
        """Return the largest softmax probability of ``output`` as a float.

        The maximum is subtracted before exponentiation so that large scores
        do not overflow to ``inf`` (which would make the result NaN); the
        shift does not change the softmax value.

        Raises:
            ValueError: If ``output`` is empty (raised by ``np.max``).
        """
        scores = np.asarray(output)
        exp_scores = np.exp(scores - np.max(scores))
        probs = exp_scores / np.sum(exp_scores)
        return float(np.max(probs))
Request batching across edge and cloud:
import asyncio
class AdaptiveBatchingEngine:
"""Dynamically adjust batch size based on queue depth and latency targets"""
def __init__(self, min_batch=1, max_batch=8, target_latency_ms=100):
self.min_batch = min_batch
self.max_batch = max_batch
self.target_latency = target_latency_ms
self.queue = asyncio.Queue()
async def process_stream(self, generator):
"""Process incoming requests with adaptive batching"""
batch = []
last_batch_time = asyncio.get_event_loop().time()
async for request in generator:
batch.append(request)
# Batch interval logic
time_since_batch = asyncio.get_event_loop().time() - last_batch_time
if len(batch) >= self.max_batch or (
len(batch) >= self.min_batch and
time_since_batch > self.target_latency / 1000
):
await self._process_batch(batch)
batch = []
last_batch_time = asyncio.get_event_loop().time()
async def _process_batch(self, batch):
# Combined inference of batch
inputs = [r.input_data for r in batch]
await self.edge_model.batch_predict(inputs)
Cloud fallback monitoring:
def monitor_hybrid_health(edge_results, cloud_results, threshold=0.05):
    """Detect edge model degradation requiring cloud escalation.

    Compares the mean confidence of edge results with the mean confidence of
    cloud results. The two collections do not need to be the same length.

    Args:
        edge_results: Iterable of objects with a ``confidence`` attribute.
        cloud_results: Iterable of objects with a ``confidence`` attribute.
        threshold: How far the edge mean may fall below the cloud mean before
            the edge model is reported as degraded.

    Returns:
        A dict with ``"status"`` ("degraded" or "healthy") and ``"drift"``
        (edge mean minus cloud mean, as a float). Degraded results also carry
        a ``"recommendation"`` string. If either collection is empty, no
        comparison is possible: drift is NaN and the status is "healthy".
    """
    edge_confs = np.array([r.confidence for r in edge_results], dtype=float)
    cloud_confs = np.array([r.confidence for r in cloud_results], dtype=float)

    if edge_confs.size == 0 or cloud_confs.size == 0:
        # Avoid NumPy's empty-mean warning; NaN never compares as degraded.
        mean_drift = float("nan")
    else:
        # Difference of means equals the mean of paired differences when the
        # lengths match, and stays defined when they do not.
        mean_drift = float(edge_confs.mean() - cloud_confs.mean())

    if mean_drift < -threshold:
        return {
            "status": "degraded",
            "drift": mean_drift,
            "recommendation": "Increase cloud escalation threshold"
        }
    return {"status": "healthy", "drift": mean_drift}
EXERCISE
Implement a hybrid router that runs edge inference immediately and escalates to cloud based on confidence thresholds, then measure bandwidth savings versus pure cloud inference.