21. Production Migration
Chapter 21 of 24 · 20 min
Migrating a RAG system to production requires careful orchestration of data migration, traffic shifting, and rollback procedures. Prematurely moving all traffic leads to incidents with no easy recovery path.
The migration strategy uses canary deployment:
from dataclasses import dataclass
import hashlib
@dataclass
class MigrationConfig:
source_vector_db: str # "qdrant-old"
target_vector_db: str # "qdrant-new"
migration_batch_size: int = 1000
canary_percentage: float = 5.0
health_check_interval_seconds: int = 60
rollback_threshold_errors_per_min: int = 10
class RAGMigrationOrchestrator:
def __init__(self, config: MigrationConfig):
self.config = config
self.migration_state = {
"phase": "idle",
"canary_traffic_percent": 0,
"errors_since_last_check": 0
}
def begin_migration(self):
"""Phase 1: Start data migration and initial canary"""
self.migration_state["phase"] = "data_migration"
# Migrate historical data in batches
source_collection = self._get_collection(self.config.source_vector_db)
total_vectors = source_collection.count()
for offset in range(0, total_vectors, self.config.migration_batch_size):
batch = source_collection.scroll(offset=offset, limit=self.config.migration_batch_size)
self._write_batch(self.config.target_vector_db, batch)
# Progress logging
progress = (offset + len(batch)) / total_vectors
print(f"Migration progress: {progress:.1%}")
self.migration_state["phase"] = "canary_testing"
self._shift_traffic(self.config.canary_percentage)
def _shift_traffic(self, percentage: float):
"""Shift traffic using consistent hashing by user hash"""
routing_config = {
"routes": {
"rag-prod-old": 100 - percentage,
"rag-prod-new": percentage
}
}
# Push to load balancer configuration
self._update_load_balancer(routing_config)
self.migration_state["canary_traffic_percent"] = percentage
def check_canary_health(self) -> bool:
"""Phase 2: Monitor canary metrics"""
metrics = self._fetch_metrics(
self.config.target_vector_db,
window_minutes=5
)
error_rate = metrics["errors"] / metrics["requests"]
p99_latency = metrics["p99_latency_ms"]
cache_hit_rate = metrics["cache_hit_rate"]
# Check health thresholds
healthy = (
error_rate < 0.01 and
p99_latency < 500 and
cache_hit_rate > 0.3
)
if healthy:
self.migration_state["errors_since_last_check"] = 0
else:
self.migration_state["errors_since_last_check"] += metrics["errors"]
return healthy
def promote_or_rollback(self) -> str:
"""Phase 3: Decide migration outcome"""
if (self.migration_state["errors_since_last_check"] >
self.config.rollback_threshold_errors_per_min):
self._rollback()
return "rollback"
self._shift_traffic(100)
self.migration_state["phase"] = "completed"
return "promoted"
Rollback procedure:
def _rollback(self):
"""Immediately revert all traffic to source"""
self._shift_traffic(0)
print("ALERT: Rolled back to source. Investigate before retrying.")
# Notify on-call
self._send_alert(
severity="high",
message="RAG migration rolled back due to error threshold",
recipients=["[email protected]"]
)
Failure Modes:
- Schema drift: Target database uses different index parameters (HNSW M/effective) causing different search quality. Validate result set overlap before any traffic shift.
- Incomplete migration: New vectors inserted during migration batch processing aren't migrated. Use dual-write during migration.
- Traffic flip without warm-up: New system cold caches cause latency spike. Always warm cache before full promotion.
- Orphaned writes: Application still writing to old system after traffic shift. Use feature flags, not just routing.
Migration runbooks should include exact commands, monitoring dashboard URLs, and escalation procedures.
EXERCISE
Design a migration plan moving from Pinecone to Qdrant. Include data validation steps, traffic shift percentages, and rollback triggers.