17. Security and Compliance
Chapter 17 of 18 · 30 min
AI automation systems process sensitive data and make consequential decisions. Security and compliance requirements must be addressed from architecture through operation.
Access Control Architecture
# access_control.py
import json
from datetime import datetime, timedelta
from pathlib import Path
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class Permission(Enum):
    """Permission levels that a role can grant.

    The values are the strings used for permissions in the role
    configuration, so ``Permission("read")`` yields ``Permission.READ``.
    """

    READ = "read"        # view a resource
    WRITE = "write"      # create or modify a resource
    EXECUTE = "execute"  # run an operation
    ADMIN = "admin"      # administrative operations
@dataclass
class User:
    """A user account as stored in the users file."""

    # Unique identifier; also the key under which the user is indexed.
    user_id: str
    # Login name supplied when authenticating.
    username: str
    # Names of the roles assigned to this user.
    roles: list[str]
    # When True, authentication additionally requires an MFA token.
    mfa_enabled: bool = False
    # ISO-8601 timestamp of the last successful login, if any.
    last_login: Optional[str] = None
class AccessControl:
    """Session-based authentication and role-based authorization.

    Users are read from ``users.json`` (``{"users": [{...}, ...]}``), role
    definitions from the JSON config file, and password hashes from
    ``secrets/<username>_hash.txt``. Sessions are kept in memory only.
    All relative paths are resolved against the current working directory.
    """

    # Single source of truth for the user store so that loading and saving
    # can never drift apart.
    _USERS_FILE = Path("users.json")
    _SECURITY_LOG = Path("security_log.jsonl")
    _SESSION_LIFETIME = timedelta(hours=8)

    def __init__(self, config_path: str = "access_config.json"):
        self.config = self._load_config(config_path)
        self.users = self._load_users()
        # session token -> session record (user_id, created_at, expires_at, ip_address)
        self.active_sessions: dict[str, dict] = {}

    def _load_config(self, path: str) -> dict:
        """Load the access-control configuration from a JSON file."""
        with open(path, encoding="utf-8") as f:
            return json.load(f)

    def _load_users(self) -> "dict[str, User]":
        """Load users keyed by ``user_id``; return an empty dict if the file is absent."""
        if not self._USERS_FILE.exists():
            return {}
        with open(self._USERS_FILE, encoding="utf-8") as f:
            data = json.load(f)
        return {u["user_id"]: User(**u) for u in data["users"]}

    def _save_users(self) -> None:
        """Write all users back to the users file in the format ``_load_users`` reads."""
        from dataclasses import asdict

        payload = {"users": [asdict(user) for user in self.users.values()]}
        with open(self._USERS_FILE, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)

    def authenticate(self, username: str, password: str,
                     mfa_token: Optional[str] = None) -> Optional[str]:
        """Authenticate a user and return a new session token.

        Args:
            username: Login name to look up.
            password: Plain-text password to verify.
            mfa_token: One-time token; required when the user has MFA enabled.

        Returns:
            A random URL-safe session token valid for eight hours, or ``None``
            if the user is unknown or any verification step fails.
        """
        import secrets

        user = self._find_user(username)
        if not user:
            return None

        if not self._verify_password(username, password):
            self._log_failed_auth(user.user_id)
            return None

        if user.mfa_enabled and not self._verify_mfa(user.user_id, mfa_token):
            self._log_failed_auth(user.user_id, reason="mfa_failed")
            return None

        # Take one timestamp so created_at, expires_at and last_login agree.
        now = datetime.now()
        session_token = secrets.token_urlsafe(32)
        self.active_sessions[session_token] = {
            "user_id": user.user_id,
            "created_at": now.isoformat(),
            "expires_at": (now + self._SESSION_LIFETIME).isoformat(),
            "ip_address": None  # Set by caller
        }
        user.last_login = now.isoformat()
        self._save_users()
        return session_token

    def _find_user(self, username: str) -> "Optional[User]":
        """Return the user with the given username, or ``None``."""
        return next(
            (user for user in self.users.values() if user.username == username),
            None,
        )

    def _verify_password(self, username: str, password: str) -> bool:
        """Verify a password against the hash stored for ``username``.

        NOTE(review): the stored hash is an unsalted SHA-256 hex digest. That
        format is kept so existing hash files keep working, but production
        code should migrate to a salted, slow KDF (e.g. ``hashlib.scrypt``).
        """
        import hashlib
        import hmac

        # The username becomes part of a file path; refuse anything that
        # carries path components so it cannot escape the secrets directory.
        if Path(username).name != username:
            return False

        hash_file = Path(f"secrets/{username}_hash.txt")
        if not hash_file.exists():
            return False
        stored_hash = hash_file.read_text().strip()
        computed_hash = hashlib.sha256(password.encode()).hexdigest()
        # Constant-time comparison to avoid leaking how many characters match.
        return hmac.compare_digest(stored_hash, computed_hash)

    def _verify_mfa(self, user_id: str, token: Optional[str]) -> bool:
        """Verify an MFA token.

        A missing or empty token is always rejected.

        WARNING: real TOTP verification is still a placeholder - any non-empty
        token is accepted. Use pyotp or similar before production use.
        """
        if not token:
            return False
        return True  # Placeholder

    def authorize(self, session_token: str, resource: str,
                  action: str) -> bool:
        """Check whether a session may perform ``action`` on ``resource``.

        Access is granted when any of the user's roles holds at least one of
        the permissions mapped to the action.

        Returns:
            ``True`` if allowed; ``False`` if the session is missing or
            expired, the user no longer exists, or no role grants a matching
            permission.
        """
        session = self._get_session(session_token)
        if not session:
            return False
        user = self.users.get(session["user_id"])
        if not user:
            return False

        required_perms = self._get_required_permissions(resource, action)
        return any(
            perm in role_perms
            for role_perms in map(self._get_role_permissions, user.roles)
            for perm in required_perms
        )

    def _get_session(self, token: str) -> Optional[dict]:
        """Return the live session for ``token``; drop it and return ``None`` if expired."""
        session = self.active_sessions.get(token)
        if not session:
            return None
        if datetime.now() > datetime.fromisoformat(session["expires_at"]):
            del self.active_sessions[token]
            return None
        return session

    def _get_required_permissions(self, resource: str, action: str) -> "list[Permission]":
        """Map an action to the permissions that can satisfy it.

        HTTP verbs are matched case-insensitively. A permission name
        (``"read"``, ``"write"``, ``"execute"``, ``"admin"``) maps to that
        permission. Any other action yields an empty list, which makes
        ``authorize`` deny the request: an unrecognised action must not be
        treated as a read.

        ``resource`` is accepted for interface compatibility; the mapping
        currently depends on the action only.
        """
        verb_map = {
            "GET": [Permission.READ],
            "POST": [Permission.WRITE, Permission.EXECUTE],
            "PUT": [Permission.WRITE],
            "DELETE": [Permission.WRITE, Permission.ADMIN],
        }
        if not isinstance(action, str):
            return []
        by_verb = verb_map.get(action.upper())
        if by_verb is not None:
            return by_verb
        try:
            return [Permission(action.lower())]
        except ValueError:
            return []

    def _get_role_permissions(self, role: str) -> "list[Permission]":
        """Return the permissions configured for ``role`` (empty if the role is unknown).

        Raises:
            ValueError: If the configuration lists a string that is not a
                valid ``Permission`` value.
        """
        role_config = self.config.get("roles", {}).get(role, {})
        return [Permission(p) for p in role_config.get("permissions", [])]

    def revoke_session(self, session_token: str):
        """Revoke a session; unknown tokens are ignored."""
        self.active_sessions.pop(session_token, None)

    def _log_failed_auth(self, user_id: str, reason: str = "password_failed"):
        """Append a failed-authentication event to the security log (JSON Lines)."""
        event = {
            "timestamp": datetime.now().isoformat(),
            "event": "auth_failure",
            "user_id": user_id,
            "reason": reason
        }
        with open(self._SECURITY_LOG, "a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")
Data Classification and Handling
# data_classification.py
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable
import json
class DataClassification(Enum):
    """Sensitivity labels that a data policy can carry.

    The values are the strings used in the classification configuration,
    so ``DataClassification("restricted")`` yields ``RESTRICTED``.
    """

    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
@dataclass
class DataPolicy:
    """Handling rules attached to one type of data."""

    # Sensitivity label of the data.
    classification: DataClassification
    # Whether the data must be encrypted.
    encryption_required: bool
    # Retention period in days.
    retention_days: int
    # Actions permitted on the data.
    allowed_actions: list[str]
    # Whether sensitive fields are masked when the policy is applied.
    mask_in_logs: bool = True
class DataClassifier:
    """Look up handling policies for data types and apply field masking.

    Policies and masking rules both come from a JSON configuration file with
    the top-level keys ``"policies"`` and ``"masking_rules"``.
    """

    def __init__(self, config_path: str = "classification_config.json"):
        self.config = self._load_config(config_path)
        self.policies = self._load_policies()

    def _load_config(self, path: str) -> dict:
        """Read the classification configuration from a JSON file."""
        with open(path) as handle:
            return json.load(handle)

    def _load_policies(self) -> dict[str, DataPolicy]:
        """Build one ``DataPolicy`` per entry under the ``"policies"`` config key."""
        return {
            name: DataPolicy(
                classification=DataClassification(spec["classification"]),
                encryption_required=spec["encryption_required"],
                retention_days=spec["retention_days"],
                allowed_actions=spec["allowed_actions"],
                mask_in_logs=spec.get("mask_in_logs", True),
            )
            for name, spec in self.config.get("policies", {}).items()
        }

    def classify(self, data_type: str) -> DataPolicy:
        """Return the policy for ``data_type``, or the default policy if none is configured."""
        fallback = self._default_policy()
        return self.policies.get(data_type, fallback)

    def _default_policy(self) -> DataPolicy:
        """Policy for unclassified data: INTERNAL, unencrypted, 90 days, read/write."""
        return DataPolicy(
            classification=DataClassification.INTERNAL,
            encryption_required=False,
            retention_days=90,
            allowed_actions=["read", "write"],
        )

    def apply_policy(self, data: dict, data_type: str) -> dict:
        """Wrap ``data`` with its classification metadata, masking fields if the policy asks for it.

        Returns:
            A dict with the keys ``"data"``, ``"classification"`` and
            ``"retention_days"``.
        """
        policy = self.classify(data_type)
        payload = data
        if policy.mask_in_logs:
            payload = self._mask_sensitive_fields(data, data_type)
        return {
            "data": payload,
            "classification": policy.classification.value,
            "retention_days": policy.retention_days
        }

    def _mask_sensitive_fields(self, data: dict, data_type: str) -> dict:
        """Return a shallow copy of ``data`` with the configured masking rules applied.

        Supported rules are ``"full"``, ``"partial"`` and ``"hash"``. A field
        whose rule is anything else is left unchanged.
        """
        import hashlib

        rules = self.config.get("masking_rules", {}).get(data_type, {})
        result = dict(data)
        for name, rule in rules.items():
            if name not in result:
                continue
            if rule == "full":
                result[name] = "***REDACTED***"
            elif rule == "partial":
                text = str(result[name])
                # Values of four characters or fewer are masked completely,
                # since showing both ends would reveal the whole value.
                if len(text) > 4:
                    result[name] = text[:2] + "***" + text[-2:]
                else:
                    result[name] = "****"
            elif rule == "hash":
                digest = hashlib.sha256(str(result[name]).encode()).hexdigest()
                result[name] = digest[:16]
        return result
Compliance Validation
# compliance_validator.py
import json
from datetime import datetime
from typing import Optional
class ComplianceValidator:
    """Validate AI requests against authorization, classification and PII rules."""

    # Regexes used for PII screening. These are heuristics: any nine- or
    # ten-digit number will match the SSN or phone pattern respectively.
    _PII_PATTERNS = {
        # The TLD class is [A-Za-z]; writing it as [A-Z|a-z] would also
        # accept a literal "|" character.
        "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        "ssn": r'\b\d{3}[-]?\d{2}[-]?\d{4}\b',
        "credit_card": r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b'
    }

    def __init__(self, audit_logger, data_classifier: "DataClassifier"):
        self.audit_logger = audit_logger
        self.classifier = data_classifier
        self.violations = []

    def validate_ai_request(self, request: dict, user_id: str,
                            session_token: str) -> dict:
        """Validate an AI request for compliance.

        Args:
            request: Request payload; the keys ``"data_type"``, ``"prompt"``
                and ``"encrypt_output"`` are consulted.
            user_id: ID of the requesting user.
            session_token: Session token of the requesting user.

        Returns:
            A dict with ``"valid"`` (True when there are no violations),
            ``"violations"`` (a list of ``{"type", "message"}`` dicts) and
            ``"timestamp"``.
        """
        violations = []

        if not self._check_authorization(user_id, session_token):
            violations.append({
                "type": "authorization",
                "message": "User not authorized for AI operations"
            })

        data_type = request.get("data_type", "unknown")
        policy = self.classifier.classify(data_type)
        if (policy.classification == DataClassification.RESTRICTED
                and not self._check_restricted_access(user_id)):
            violations.append({
                "type": "data_access",
                "message": f"User not authorized for {data_type} data"
            })

        # A request may carry "prompt": None; treat that like a missing prompt
        # instead of letting the regex search raise TypeError.
        prompt = request.get("prompt") or ""
        sensitive_patterns = self._detect_sensitive_data(prompt)
        if sensitive_patterns:
            # Only the pattern names are reported, never the matched text.
            violations.append({
                "type": "pii_detected",
                "message": f"Potential PII detected: {sensitive_patterns}"
            })

        if policy.encryption_required and not request.get("encrypt_output"):
            violations.append({
                "type": "encryption_required",
                "message": "Output encryption required for this data type"
            })

        return {
            "valid": len(violations) == 0,
            "violations": violations,
            "timestamp": datetime.now().isoformat()
        }

    def _check_authorization(self, user_id: str, session_token: str) -> bool:
        """Check that the user has a valid session.

        WARNING: placeholder - always returns True, so this check never
        produces a violation. Wire it to the access-control layer before
        relying on it.
        """
        return True

    def _check_restricted_access(self, user_id: str) -> bool:
        """Check whether the user may access restricted data.

        NOTE: placeholder - always returns False, so every request for
        RESTRICTED data is reported as a violation.
        """
        return False

    def _detect_sensitive_data(self, text: str) -> list[str]:
        """Return the names of the PII patterns found in ``text``.

        Names are returned in the order email, phone, ssn, credit_card.
        Empty or ``None`` input yields an empty list.
        """
        import re

        if not text:
            return []
        return [
            name
            for name, pattern in self._PII_PATTERNS.items()
            if re.search(pattern, text)
        ]

    def generate_compliance_report(self, start_date: str, end_date: str) -> dict:
        """Summarise the compliance violations logged between two dates.

        The violations are fetched from the audit logger by querying for
        events of type ``"compliance_violation"`` within the given period.
        """
        violations = self.audit_logger.query(
            filters={"event_type": "compliance_violation"},
            start_time=start_date,
            end_time=end_date
        )
        return {
            "report_period": {"start": start_date, "end": end_date},
            "total_violations": len(violations),
            "violations_by_type": self._aggregate_violations(violations),
            "remediation_required": len(violations) > 0,
            "generated_at": datetime.now().isoformat()
        }

    def _aggregate_violations(self, violations: list[dict]) -> dict:
        """Count violations by ``details.type``; entries without one count as ``"unknown"``."""
        by_type = {}
        for v in violations:
            # "details" may be absent or explicitly None in a logged event.
            v_type = (v.get("details") or {}).get("type", "unknown")
            by_type[v_type] = by_type.get(v_type, 0) + 1
        return by_type
EXERCISE
Implement a compliance validation system that checks every Ollama request for PII before processing, blocks requests containing SSN or credit card numbers, and logs compliance checks to the audit system.