RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /Business Automation with Local AI
  6. /Ch. 17
Business Automation with Local AI

17. Security and Compliance

Chapter 17 of 18 · 30 min
KEY INSIGHT

Compliance is not a configuration but a continuous process. Implement validation checks at every data boundary and maintain audit trails sufficient to demonstrate control to external auditors.

AI automation systems process sensitive data and make consequential decisions. Security and compliance requirements must be addressed from architecture through operation.

Access Control Architecture

# access_control.py
import json
from datetime import datetime, timedelta
from pathlib import Path
from enum import Enum
from dataclasses import dataclass
from typing import Optional

class Permission(Enum):
    READ = "read"
    WRITE = "write"
    EXECUTE = "execute"
    ADMIN = "admin"

@dataclass
class User:
    user_id: str
    username: str
    roles: list[str]
    mfa_enabled: bool = False
    last_login: Optional[str] = None

class AccessControl:
    def __init__(self, config_path: str = "access_config.json"):
        self.config = self._load_config(config_path)
        self.users = self._load_users()
        self.active_sessions: dict[str, dict] = {}
    
    def _load_config(self, path: str) -> dict:
        with open(path) as f:
            return json.load(f)
    
    def _load_users(self) -> dict[str, User]:
        users_file = Path("users.json")
        if users_file.exists():
            with open(users_file) as f:
                data = json.load(f)
                return {u["user_id"]: User(**u) for u in data["users"]}
        return {}
    
    def authenticate(self, username: str, password: str,
                    mfa_token: str = None) -> Optional[str]:
        """Authenticate user and return session token."""
        user = self._find_user(username)
        if not user:
            return None
        
        # Verify password (use proper password hashing in production)
        if not self._verify_password(username, password):
            self._log_failed_auth(user.user_id)
            return None
        
        # Verify MFA if enabled
        if user.mfa_enabled:
            if not self._verify_mfa(user.user_id, mfa_token):
                self._log_failed_auth(user.user_id, reason="mfa_failed")
                return None
        
        # Create session
        import secrets
        session_token = secrets.token_urlsafe(32)
        self.active_sessions[session_token] = {
            "user_id": user.user_id,
            "created_at": datetime.now().isoformat(),
            "expires_at": (datetime.now() + timedelta(hours=8)).isoformat(),
            "ip_address": None  # Set by caller
        }
        
        user.last_login = datetime.now().isoformat()
        self._save_users()
        
        return session_token
    
    def _find_user(self, username: str) -> Optional[User]:
        for user in self.users.values():
            if user.username == username:
                return user
        return None
    
    def _verify_password(self, username: str, password: str) -> bool:
        """Verify password against stored hash."""
        import hashlib
        import hmac
        
        hash_file = Path(f"secrets/{username}_hash.txt")
        if not hash_file.exists():
            return False
        
        stored_hash = hash_file.read_text().strip()
        computed_hash = hashlib.sha256(password.encode()).hexdigest()
        
        return hmac.compare_digest(stored_hash, computed_hash)
    
    def _verify_mfa(self, user_id: str, token: str) -> bool:
        """Verify MFA token."""
        # Implement TOTP verification
        # This is a placeholder - use pyotp or similar in production
        return True  # Placeholder
    
    def authorize(self, session_token: str, resource: str,
                 action: str) -> bool:
        """Check if session has permission for action on resource."""
        session = self._get_session(session_token)
        if not session:
            return False
        
        user = self.users.get(session["user_id"])
        if not user:
            return False
        
        # Get required permissions for resource/action
        required_perms = self._get_required_permissions(resource, action)
        
        # Check if user has any required role
        for role in user.roles:
            role_perms = self._get_role_permissions(role)
            for perm in required_perms:
                if perm in role_perms:
                    return True
        
        return False
    
    def _get_session(self, token: str) -> Optional[dict]:
        """Retrieve and validate session."""
        session = self.active_sessions.get(token)
        if not session:
            return None
        
        # Check expiration
        expires = datetime.fromisoformat(session["expires_at"])
        if datetime.now() > expires:
            del self.active_sessions[token]
            return None
        
        return session
    
    def _get_required_permissions(self, resource: str, action: str) -> list[Permission]:
        """Determine required permissions for resource/action."""
        # Parse resource path
        parts = resource.split("/")
        base_resource = "/".join(parts[:2]) if len(parts) > 2 else resource
        
        # Map actions to permissions
        action_map = {
            "GET": [Permission.READ],
            "POST": [Permission.WRITE, Permission.EXECUTE],
            "PUT": [Permission.WRITE],
            "DELETE": [Permission.WRITE, Permission.ADMIN],
            "execute": [Permission.EXECUTE]
        }
        
        return action_map.get(action, [Permission.READ])
    
    def _get_role_permissions(self, role: str) -> list[Permission]:
        """Get permissions for role."""
        role_config = self.config.get("roles", {}).get(role, {})
        perm_strings = role_config.get("permissions", [])
        return [Permission(p) for p in perm_strings]
    
    def revoke_session(self, session_token: str):
        """Revoke a session."""
        if session_token in self.active_sessions:
            del self.active_sessions[session_token]
    
    def _log_failed_auth(self, user_id: str, reason: str = "password_failed"):
        """Log failed authentication attempt."""
        # Write to security log
        log_path = Path("security_log.jsonl")
        with open(log_path, "a") as f:
            f.write(json.dumps({
                "timestamp": datetime.now().isoformat(),
                "event": "auth_failure",
                "user_id": user_id,
                "reason": reason
            }) + "\n")

Data Classification and Handling

# data_classification.py
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable
import json

class DataClassification(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"

@dataclass
class DataPolicy:
    classification: DataClassification
    encryption_required: bool
    retention_days: int
    allowed_actions: list[str]
    mask_in_logs: bool = True

class DataClassifier:
    def __init__(self, config_path: str = "classification_config.json"):
        self.config = self._load_config(config_path)
        self.policies = self._load_policies()
    
    def _load_config(self, path: str) -> dict:
        with open(path) as f:
            return json.load(f)
    
    def _load_policies(self) -> dict[str, DataPolicy]:
        policies = {}
        for name, config in self.config.get("policies", {}).items():
            policies[name] = DataPolicy(
                classification=DataClassification(config["classification"]),
                encryption_required=config["encryption_required"],
                retention_days=config["retention_days"],
                allowed_actions=config["allowed_actions"],
                mask_in_logs=config.get("mask_in_logs", True)
            )
        return policies
    
    def classify(self, data_type: str) -> DataPolicy:
        """Get classification policy for data type."""
        return self.policies.get(data_type, self._default_policy())
    
    def _default_policy(self) -> DataPolicy:
        """Return default policy for unclassified data."""
        return DataPolicy(
            classification=DataClassification.INTERNAL,
            encryption_required=False,
            retention_days=90,
            allowed_actions=["read", "write"]
        )
    
    def apply_policy(self, data: dict, data_type: str) -> dict:
        """Apply classification policy to data."""
        policy = self.classify(data_type)
        
        # Redact sensitive fields if required
        if policy.mask_in_logs:
            data = self._mask_sensitive_fields(data, data_type)
        
        return {
            "data": data,
            "classification": policy.classification.value,
            "retention_days": policy.retention_days
        }
    
    def _mask_sensitive_fields(self, data: dict, data_type: str) -> dict:
        """Mask sensitive fields based on type."""
        masking_rules = self.config.get("masking_rules", {}).get(data_type, {})
        
        masked = dict(data)
        for field, rule in masking_rules.items():
            if field in masked:
                if rule == "full":
                    masked[field] = "***REDACTED***"
                elif rule == "partial":
                    value = str(masked[field])
                    masked[field] = value[:2] + "***" + value[-2:] if len(value) > 4 else "****"
                elif rule == "hash":
                    import hashlib
                    masked[field] = hashlib.sha256(str(masked[field]).encode()).hexdigest()[:16]
        
        return masked

Compliance Validation

# compliance_validator.py
import json
from datetime import datetime
from typing import Optional

class ComplianceValidator:
    def __init__(self, audit_logger, data_classifier: DataClassifier):
        self.audit_logger = audit_logger
        self.classifier = data_classifier
        self.violations = []
    
    def validate_ai_request(self, request: dict, user_id: str,
                          session_token: str) -> dict:
        """Validate AI request for compliance."""
        violations = []
        
        # Check user authorization
        if not self._check_authorization(user_id, session_token):
            violations.append({
                "type": "authorization",
                "message": "User not authorized for AI operations"
            })
        
        # Check data classification
        data_type = request.get("data_type", "unknown")
        policy = self.classifier.classify(data_type)
        
        if policy.classification == DataClassification.RESTRICTED:
            if not self._check_restricted_access(user_id):
                violations.append({
                    "type": "data_access",
                    "message": f"User not authorized for {data_type} data"
                })
        
        # Check prompt for sensitive data
        prompt = request.get("prompt", "")
        sensitive_patterns = self._detect_sensitive_data(prompt)
        if sensitive_patterns:
            violations.append({
                "type": "pii_detected",
                "message": f"Potential PII detected: {sensitive_patterns}"
            })
        
        # Check output handling
        if policy.encryption_required:
            if not request.get("encrypt_output"):
                violations.append({
                    "type": "encryption_required",
                    "message": "Output encryption required for this data type"
                })
        
        return {
            "valid": len(violations) == 0,
            "violations": violations,
            "timestamp": datetime.now().isoformat()
        }
    
    def _check_authorization(self, user_id: str, session_token: str) -> bool:
        """Check user has valid session."""
        # Implementation depends on access control
        return True
    
    def _check_restricted_access(self, user_id: str) -> bool:
        """Check if user can access restricted data."""
        # Check user roles for restricted access
        return False
    
    def _detect_sensitive_data(self, text: str) -> list[str]:
        """Detect potential PII in text."""
        import re
        
        patterns = {
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            "ssn": r'\b\d{3}[-]?\d{2}[-]?\d{4}\b',
            "credit_card": r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b'
        }
        
        detected = []
        for pattern_name, pattern in patterns.items():
            if re.search(pattern, text):
                detected.append(pattern_name)
        
        return detected
    
    def generate_compliance_report(self, start_date: str, end_date: str) -> dict:
        """Generate compliance validation report."""
        violations = self.audit_logger.query(
            filters={"event_type": "compliance_violation"},
            start_time=start_date,
            end_time=end_date
        )
        
        return {
            "report_period": {"start": start_date, "end": end_date},
            "total_violations": len(violations),
            "violations_by_type": self._aggregate_violations(violations),
            "remediation_required": len(violations) > 0,
            "generated_at": datetime.now().isoformat()
        }
    
    def _aggregate_violations(self, violations: list[dict]) -> dict:
        """Aggregate violations by type."""
        by_type = {}
        for v in violations:
            v_type = v.get("details", {}).get("type", "unknown")
            by_type[v_type] = by_type.get(v_type, 0) + 1
        return by_type
EXERCISE

Implement a compliance validation system that checks every Ollama request for PII before processing, blocks requests containing SSN or credit card numbers, and logs compliance checks to the audit system.

← Chapter 16
Audit Logging
Chapter 18 →
Business Automation Suite Project