KEY INSIGHT
DevOps automation for Nigerian SaaS must account for limited bandwidth between Lagos and international cloud regions, with deployment pipelines optimized to minimize data transfer while maintaining security standards.
DevOps automation in a Nigerian SaaS context requires careful consideration of infrastructure costs, local compliance requirements, and the operational realities of managing systems from Lagos.
```python
from dataclasses import dataclass
from typing import Optional
import subprocess
import json
@dataclass
class DeploymentConfig:
environment: str
region: str
instance_type: str
min_instances: int
max_instances: int
autoscaling_threshold: float
class DeploymentAutomation:
"""Automated deployment system for SaaS infrastructure."""
def __init__(self, config_service, secrets_manager, notification_service):
self.config = config_service
self.secrets = secrets_manager
self.notifications = notification_service
self.environments = {
'staging': DeploymentConfig(
environment='staging',
region='eu-west-1',
instance_type='t3.medium',
min_instances=1,
max_instances=2,
autoscaling_threshold=0.7
),
'production': DeploymentConfig(
environment='production',
region='eu-west-1',
instance_type='t3.large',
min_instances=2,
max_instances=10,
autoscaling_threshold=0.6
)
}
def deploy(
self,
environment: str,
version: str,
skip_tests: bool = False
) -> dict:
"""Execute deployment with validation and rollback."""
config = self.environments.get(environment)
if not config:
raise ValueError(f"Unknown environment: {environment}")
deployment_id = self._initialize_deployment(environment, version)
try:
if not skip_tests:
test_result = self._run_tests(version)
if not test_result['passed']:
self._fail_deployment(deployment_id, "Tests failed")
return {'status': 'failed', 'reason': 'tests_failed'}
build_result = self._build_docker_image(version)
self._deploy_to_environment(
config,
build_result['image_tag'],
deployment_id
)
self._run_smoke_tests(environment)
self._finalize_deployment(deployment_id)
return {
'status': 'success',
'deployment_id': deployment_id,
'version': version,
'environment': environment
}
except Exception as e:
self._rollback(deployment_id, config)
return {'status': 'failed', 'reason': str(e)}
def _build_docker_image(self, version: str) -> dict:
"""Build Docker image with caching for bandwidth optimization."""
dockerfile = """
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN python -m compileall -q .
EXPOSE 8000
CMD ["gunicorn", "-c", "gunicorn.conf.py", "app:app"]
"""
build_cmd = [
'docker', 'build',
'--cache-from', f'registry.example.com/saas:{version}',
'--tag', f'registry.example.com/saas:{version}',
'--tag', f'registry.example.com/saas:latest',
'--build-arg', f'BUILD_VERSION={version}'
]
result = subprocess.run(
build_cmd,
capture_output=True,
text=True
)
if result.returncode != 0:
raise BuildError(f"Docker build failed: {result.stderr}")
return {'image_tag': f'registry.example.com/saas:{version}'}
```
**Infrastructure as Code:**
```python
import boto3
from typing import dict
class InfrastructureManager:
"""Manages AWS infrastructure with Nigerian market optimizations."""
def __init__(self, aws_client):
self.ec2 = boto3.client('ec2')
self.ecs = boto3.client('ecs')
self.rds = boto3.client('rds')
def setup_production_infrastructure(self) -> dict:
"""Create production infrastructure with proper configuration."""
vpc = self._create_vpc()
private_subnets = self._create_subnets(
vpc['VpcId'],
availability_zones=['eu-west-1a', 'eu-west-1b'],
public=False
)
nat_gateways = self._setup_nat_gateways(vpc, private_subnets)
ecs_cluster = self._create_ecs_cluster(
'saas-production',
instance_type='t3.large',
min_capacity=2,
max_capacity=10
)
rds_instance = self._create_database(
instance_class='db.t3.medium',
allocated_storage=100,
multi_az=True
)
elasticache = self._create_redis_cluster(
node_type='cache.t3.medium',
num_nodes=2
)
load_balancer = self._create_alb(
vpc['VpcId'],
subnets=private_subnets
)
return {
'vpc': vpc,
'ecs_cluster': ecs_cluster,
'rds': rds_instance,
'elasticache': elasticache,
'load_balancer': load_balancer
}
def _create_database(self, instance_class: str, allocated_storage: int, multi_az: bool) -> dict:
"""Create RDS database with Nigerian compliance considerations."""
db_instance = self.rds.create_db_instance(
DBInstanceIdentifier='saas-production-db',
DBInstanceClass=instance_class,
Engine='postgres',
EngineVersion='15.3',
AllocatedStorage=allocated_storage,
MultiAZ=multi_az,
StorageEncrypted=True,
KmsKeyId='arn:aws:kms:eu-west-1:123456789:key/production-db',
BackupRetentionPeriod=30,
PreferredBackupWindow='03:00-04:00',
PreferredMaintenanceWindow='mon:04:00-mon:05:00',
PublicAccessibility=False,
VPCSecurityGroupIds=[self._get_db_security_group()],
DBParameterGroupName='saas-production-params',
Tags=[
{'Key': 'Environment', 'Value': 'production'},
{'Key': 'Compliance', 'Value': 'soc2'}
]
)
return db_instance
```
**Cost Optimization for Nigerian Operations:**
```python
class CostOptimizationScheduler:
"""Schedule non-production resources to reduce costs."""
def __init__(self, ecs_client, ec2_client):
self.ecs = ecs_client
self.ec2 = ec2_client
def schedule_staging_shutdown(self):
"""Shut down staging environment during off-hours (Nigerian business hours)."""
lagos_tz = pytz.timezone('Africa/Lagos')
now = datetime.now(lagos_tz)
business_hours = (8, 18) # 8 AM to 6 PM Lagos time
weekdays = (0, 1, 2, 3, 4) # Monday to Friday
should_run = (
now.weekday() in weekdays and
business_hours[0] <= now.hour < business_hours[1]
)
if should_run:
self._scale_up_staging()
else:
self._scale_down_staging()
def _scale_down_staging(self):
"""Scale staging to minimum for cost savings."""
self.ecs.update_service(
cluster='saas-staging',
service='saas-staging-api',
desiredCount=1
)
self.ec2.modify_instance_attribute(
InstanceId='staging-instance-id',
InstanceInitiatedShutdownBehavior='terminate'
)
```
**Common Failure Modes:**
Deployment automation that doesn't account for Lagos bandwidth limitations fails during large image pulls. Always implement local registry caching and use multi-stage builds to minimize image sizes.
```python
def optimize_for_bandwidth():
"""Configure Docker for low-bandwidth environments."""
config = {
'registry-mirrors': ['https://registry.example.com'],
'max-concurrent-downloads': 3,
'max-concurrent-uploads': 2
}
daemon_json = json.dumps(config)
subprocess.run([
'sudo', 'sh', '-c',
f'echo "{daemon_json}" > /etc/docker/daemon.json'
])
subprocess.run(['sudo', 'systemctl', 'restart', 'docker'])
```