KEY INSIGHT
Nigerian SaaS compliance requires adherence to NDPR for data protection, NDIC guidelines for financial transactions, and NCC regulations for telecommunications, with documentation and audit trails essential for legal operations.
Compliance in the Nigerian context extends beyond technical security to encompass regulatory requirements specific to data handling, payment processing, and telecommunications.
```python
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
import hashlib
@dataclass
class ComplianceRequirement:
regulation: str
requirement: str
implementation: str
audit_frequency: str
documentation_required: list[str]
class NigerianComplianceManager:
"""Manage compliance requirements for Nigerian SaaS."""
def __init__(self, db_session, audit_service):
self.db = db_session
self.audit = audit_service
self.requirements = self._load_requirements()
def _load_requirements(self) -> dict:
"""Load compliance requirements for Nigerian regulations."""
return {
'ndpr': {
'regulation': 'Nigeria Data Protection Regulation',
'key_requirements': [
'Lawful basis for processing',
'Data minimization',
'Purpose limitation',
'Storage limitation',
'Accuracy',
'Integrity and confidentiality',
'Accountability'
],
'implementation_guide': self._implement_ndpr
},
'ndic': {
'regulation': 'NDIC Guidelines',
'key_requirements': [
'KYC compliance',
'Transaction monitoring',
'Fraud detection',
'Customer verification'
],
'implementation_guide': self._implement_ndic_compliance
},
'ncc': {
'regulation': 'NCC Regulations',
'key_requirements': [
'Service availability',
'Quality of service',
'Consumer protection',
'Data retention'
],
'implementation_guide': self._implement_ncc_compliance
}
}
def _implement_ndpr(self) -> dict:
"""Implement NDPR compliance requirements."""
implementation = {
'data_classification': self._setup_data_classification(),
'consent_management': self._setup_consent_management(),
'data_subject_rights': self._setup_data_subject_rights(),
'dpoc_appointment': self._setup_dpoc(),
'breach_notification': self._setup_breach_notification()
}
return implementation
def _setup_data_classification(self) -> dict:
"""Implement data classification for NDPR."""
classification_levels = [
{
'level': 'public',
'description': 'Information intended for public disclosure',
'handling': 'No restrictions',
'encryption': False
},
{
'level': 'internal',
'description': 'Internal business information',
'handling': 'Internal access only',
'encryption': True
},
{
'level': 'confidential',
'description': 'Sensitive business or customer data',
'handling': 'Restricted access, audit required',
'encryption': True
},
{
'level': 'restricted',
'description': 'Highly sensitive personal or financial data',
'handling': 'Maximum protection, encrypted at rest and in transit',
'encryption': True,
'additional_controls': ['mfa', 'ip_whitelist', 'audit_trail']
}
]
return {
'levels': classification_levels,
'classification_policy': 'Data classification required for all new data assets',
'review_frequency': 'Quarterly'
}
```
**Data Subject Rights Implementation:**
```python
class NDPRDataSubjectRights:
"""Implement NDPR data subject rights."""
def __init__(self, db_session, storage_service):
self.db = db_session
self.storage = storage_service
def handle_access_request(self, data_subject_id: str, request_id: str) -> dict:
"""Handle data access request within 30 days."""
data_subject = self.db.query(DataSubject).filter(
DataSubject.id == data_subject_id
).first()
if not data_subject:
raise ValueError("Data subject not found")
request = DataAccessRequest(
id=request_id,
data_subject_id=data_subject_id,
request_type='access',
received_at=datetime.utcnow(),
deadline=datetime.utcnow() + timedelta(days=30),
status='in_progress'
)
tenant_data = self._collect_subject_data(data_subject)
data_package = self._compile_data_package(tenant_data)
secure_link = self.storage.create_secure_link(
data_package,
expiry_seconds=86400 * 7
)
request.completed_at = datetime.utcnow()
request.data_link = secure_link
request.status = 'completed'
self.db.commit()
self._send_completion_notification(data_subject.email, secure_link)
return {
'request_id': request_id,
'status': 'completed',
'data_link': secure_link,
'expires_in': '7 days'
}
def handle_deletion_request(self, data_subject_id: str, request_id: str) -> dict:
"""Handle data deletion request."""
data_subject = self.db.query(DataSubject).filter(
DataSubject.id == data_subject_id
).first()
legal_holds = self.db.query(LegalHold).filter(
LegalHold.data_subject_id == data_subject_id,
LegalHold.status == 'active'
).all()
if legal_holds:
return {
'request_id': request_id,
'status': 'blocked',
'reason': 'Legal hold exists',
'hold_ids': [h.id for h in legal_holds]
}
deletion_tasks = []
deletion_tasks.append(self._schedule_database_deletion(data_subject_id))
deletion_tasks.append(self._schedule_file_deletion(data_subject_id))
deletion_tasks.append(self._schedule_backup_deletion(data_subject_id))
request = DeletionRequest(
id=request_id,
data_subject_id=data_subject_id,
received_at=datetime.utcnow(),
tasks=deletion_tasks,
status='scheduled',
scheduled_completion=datetime.utcnow() + timedelta(days=30)
)
self.db.add(request)
self.db.commit()
return {
'request_id': request_id,
'status': 'scheduled',
'tasks': deletion_tasks,
'estimated_completion': request.scheduled_completion
}
def _collect_subject_data(self, data_subject: DataSubject) -> dict:
"""Collect all data for a data subject."""
data = {
'profile': data_subject.to_dict(),
'usage_records': self.db.query(UsageRecord).filter(
UsageRecord.tenant_id == data_subject.tenant_id
).all(),
'api_logs': self.db.query(APILog).filter(
APILog.tenant_id == data_subject.tenant_id
).all(),
'payments': self.db.query(Payment).filter(
Payment.tenant_id == data_subject.tenant_id
).all()
}
return data
```
**Audit Trail Implementation:**
```python
class ComplianceAuditTrail:
"""Implement thorough audit trail for compliance."""
def __init__(self, audit_db):
self.audit_db = audit_db
def log_security_event(
self,
event_type: str,
actor_id: str,
resource_type: str,
resource_id: str,
action: str,
outcome: str,
metadata: dict = None
) -> str:
"""Log security event for compliance audit."""
event = AuditEvent(
id=self._generate_event_id(),
timestamp=datetime.utcnow(),
event_type=event_type,
actor_id=actor_id,
actor_type='user',
resource_type=resource_type,
resource_id=resource_id,
action=action,
outcome=outcome,
ip_address=metadata.get('ip_address') if metadata else None,
user_agent=metadata.get('user_agent') if metadata else None,
metadata=metadata,
compliance_tags=['ndpr', 'ndic']
)
self.audit_db.add(event)
self.audit_db.commit()
if event_type in ['login_failed', 'permission_denied', 'data_access']:
self._check_anomaly(event)
return event.id
def generate_compliance_report(
self,
start_date: datetime,
end_date: datetime,
regulations: list[str]
) -> dict:
"""Generate compliance report for specified regulations."""
events = self.audit_db.query(AuditEvent).filter(
AuditEvent.timestamp >= start_date,
AuditEvent.timestamp <= end_date,
AuditEvent.compliance_tags.overlap(regulations)
).all()
report = {
'period': {'start': start_date, 'end': end_date},
'regulations': regulations,
'total_events': len(events),
'event_breakdown': self._categorize_events(events),
'access_requests': self._report_access_requests(events),
'deletion_requests': self._report_deletion_requests(events),
'security_incidents': self._report_security_incidents(events),
'generated_at': datetime.utcnow()
}
return report
def _check_anomaly(self, event: AuditEvent):
"""Check for anomalous patterns."""
recent_events = self.audit_db.query(AuditEvent).filter(
AuditEvent.actor_id == event.actor_id,
AuditEvent.timestamp >= datetime.utcnow() - timedelta(hours=1)
).count()
if recent_events > 10:
self._trigger_security_alert(event.actor_id, 'high_event_frequency')
```
**Common Failure Modes:**
Compliance documentation that isn't updated when systems change creates liability gaps. Implement automated compliance checks that verify documentation matches actual system configurations.
```python
def verify_compliance_state():
"""Verify current compliance state against requirements."""
checks = [
{
'requirement': 'Encryption at rest',
'verify': lambda: check_rds_encryption() == True,
'evidence': 'AWS RDS encryption screenshot'
},
{
'requirement': 'MFA for all admin access',
'verify': lambda: verify_mfa_enabled('admin'),
'evidence': 'IAM policy screenshot'
},
{
'requirement': 'Data retention policy documented',
'verify': lambda: check_policy_exists('retention'),
'evidence': 'Policy document location'
}
]
results = []
for check in checks:
results.append({
'requirement': check['requirement'],
'status': 'pass' if check['verify']() else 'fail',
'evidence': check['evidence']
})
return results
```