23. Compliance Auditing

Chapter 23 of 24 · 20 min

Enterprise RAG systems process sensitive data requiring compliance with SOC 2, HIPAA, GDPR, and industry-specific regulations. Auditing provides evidence of controls and enables forensic investigation.

Audit logging architecture:

import hashlib
import json
import logging
import re
from datetime import datetime, timezone

from cryptography.fernet import Fernet

class ComplianceAuditor:
    """Build audit records, encrypt them with Fernet, and store them in S3.

    Query text and user identifiers are never written verbatim: the query is
    replaced by a truncated SHA-256 digest and the user ID by a hash-derived
    pseudonym before the record is serialized.
    """

    def __init__(self, encryption_key: bytes, audit_sink: str, s3_client=None):
        """Create an auditor.

        Args:
            encryption_key: Fernet key used to encrypt every audit entry.
            audit_sink: Name of the S3 bucket that receives the entries.
            s3_client: Object exposing a boto3-style ``put_object`` method.
                Optional so existing two-argument callers keep working, but
                it must be supplied (or assigned to ``_s3``) before any
                entry is written.
        """
        self.cipher = Fernet(encryption_key)
        self.audit_sink = audit_sink
        self._s3 = s3_client

    def log_query(self, event: dict) -> str:
        """Record a RAG query without storing the query text or the user ID.

        Args:
            event: Query event. Reads ``request_id`` (required), ``query``,
                ``result_count``, ``latency_ms``, ``user_id``,
                ``consent_version`` and, optionally, ``data_classification``
                (defaults to ``"INTERNAL"``).

        Returns:
            The request ID the entry was stored under.

        Raises:
            ValueError: If the event carries no request ID.
            RuntimeError: If no S3 client has been configured.
        """
        audit_entry = {
            "timestamp": self._utc_timestamp(),
            "event_type": "RAG_QUERY",
            "request_id": event.get("request_id"),

            # Digest only: the query text itself may contain PII.
            "query_hash": self._hash_sensitive(event.get("query", "")),
            "result_count": event.get("result_count"),
            "latency_ms": event.get("latency_ms"),

            # Pseudonymized, not anonymized: the value is a stable hash of
            # the user ID, so it still identifies one data subject.
            "user_segment": self._anonymize_user(event.get("user_id")),

            # Compliance-relevant metadata. The caller may override the
            # classification (e.g. SENSITIVE / RESTRICTED).
            "data_classification": event.get("data_classification", "INTERNAL"),
            "consent_version": event.get("consent_version"),
        }

        return self._write_audit_entry(audit_entry)

    def log_data_access(self, event: dict) -> str:
        """Record which documents were retrieved, without their content.

        Args:
            event: Access event. Reads ``request_id`` (required),
                ``document_ids``, ``access_reason``, ``region`` and
                ``doc_classification``.

        Returns:
            The request ID the entry was stored under.

        Raises:
            ValueError: If the event carries no request ID.
            RuntimeError: If no S3 client has been configured.
        """
        audit_entry = {
            "timestamp": self._utc_timestamp(),
            "event_type": "DATA_ACCESS",
            "request_id": event.get("request_id"),

            # Document-level access without content
            "document_ids": event.get("document_ids", []),
            "access_reason": event.get("access_reason"),

            # Data residency tracking
            "region": event.get("region"),
            "data_classification": event.get("doc_classification"),
        }

        return self._write_audit_entry(audit_entry)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string ending in ``Z``."""
        # Timezone-aware clock read; tzinfo is dropped only so isoformat()
        # does not emit "+00:00" in front of the explicit "Z" suffix.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    def _hash_sensitive(self, value: str) -> str:
        """Return the first 16 hex characters of the SHA-256 of ``value``.

        An empty or missing value yields ``""``. The digest is unsalted, so
        short or guessable inputs can be recovered by brute force.
        """
        if not value:
            return ""
        return hashlib.sha256(value.encode()).hexdigest()[:16]

    def _anonymize_user(self, user_id: str) -> str:
        """Map a user ID to a stable pseudonym (``user_`` + 12 hex characters).

        The same user ID always maps to the same pseudonym. A missing user ID
        maps to ``"anonymous"``.
        """
        if not user_id:
            return "anonymous"
        return f"user_{hashlib.sha256(user_id.encode()).hexdigest()[:12]}"

    def _write_audit_entry(self, entry: dict) -> str:
        """Encrypt ``entry`` and store it as one object in the audit bucket.

        Args:
            entry: Audit record containing at least ``timestamp``,
                ``event_type`` and ``request_id``.

        Returns:
            The entry's request ID.

        Raises:
            ValueError: If ``request_id`` is missing or empty.
            RuntimeError: If no S3 client has been configured.
        """
        request_id = entry.get("request_id")
        if not request_id:
            # Without an ID every such entry would land on the same key.
            raise ValueError("audit entry requires a non-empty request_id")
        if self._s3 is None:
            raise RuntimeError(
                "no S3 client configured; pass s3_client to ComplianceAuditor"
            )

        encrypted = self.cipher.encrypt(json.dumps(entry).encode())

        # The date prefix is taken from the entry's own timestamp so the key
        # and the record cannot disagree around midnight. The event type is
        # part of the key so a query entry and a data-access entry for the
        # same request do not overwrite each other.
        # NOTE(review): two events of the same type for one request would
        # still share a key - confirm callers emit at most one of each.
        day = entry["timestamp"][:10]
        log_key = f"audit/{day}/{request_id}_{entry['event_type']}.enc"
        self._s3.put_object(
            Bucket=self.audit_sink,
            Key=log_key,
            Body=encrypted,
            ServerSideEncryption="AES256",
        )

        return request_id

Compliance-specific queries:

    def generate_gdpr_data_subject_report(self, user_pseudonym: str) -> dict:
        """GDPR Article 15: Right of access report.

        Args:
            user_pseudonym: Pseudonym to look up in the ``user_segment``
                field. Must consist only of letters, digits, ``_`` and ``-``.

        Returns:
            A dict with the pseudonym, the number of matching audit entries,
            their timestamps, the retention policy and the UTC generation
            time (ISO-8601 with a ``Z`` suffix).

        Raises:
            ValueError: If ``user_pseudonym`` contains any other character.
        """
        # The pseudonym is spliced into a filter expression, so anything
        # that could close the quoted literal (quotes, spaces, operators)
        # must be rejected; otherwise a crafted value could widen the query
        # to other data subjects' records.
        if not isinstance(user_pseudonym, str) or not re.fullmatch(
            r"[A-Za-z0-9_-]+", user_pseudonym
        ):
            raise ValueError(f"invalid user pseudonym: {user_pseudonym!r}")

        # One value drives both the query window and the reported policy so
        # the two cannot drift apart.
        retention_days = 365

        query_results = self._query_audit_logs(
            filter_expr=f"user_segment = '{user_pseudonym}'",
            time_range_days=retention_days,
        )

        # Explicit UTC marker on the generation time.
        generated_at = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        )

        return {
            "user_segment": user_pseudonym,
            "queries_in_period": len(query_results),
            "access_timestamps": [r["timestamp"] for r in query_results],
            "retention_policy": f"{retention_days} days",
            "generated_at": generated_at,
        }
    
    def generate_soc2_access_report(self, start_date: str, end_date: str) -> dict:
        """SOC 2 CC6.1: Logical and physical access controls.

        Fetches the ``ADMIN_ACCESS`` and ``CONFIG_CHANGE`` audit entries
        between ``start_date`` and ``end_date`` and summarizes them.

        Args:
            start_date: Start of the reporting period, passed through to the
                audit-log query unchanged.
            end_date: End of the reporting period, passed through unchanged.

        Returns:
            A dict holding the period, the number of matching entries, the
            unique-action count reported by ``_count_unique_actions``, and
            the control reference ``"CC6.1"``.
        """
        event_filter = "event_type IN ('ADMIN_ACCESS', 'CONFIG_CHANGE')"
        admin_events = self._query_audit_logs(
            filter_expr=event_filter,
            time_start=start_date,
            time_end=end_date,
        )

        # Assemble the summary field by field, in the reported key order.
        report = {"period": dict(start=start_date, end=end_date)}
        report["privileged_access_events"] = len(admin_events)
        report["unique_admin_actions"] = self._count_unique_actions(admin_events)
        report["compliance_control_ref"] = "CC6.1"
        return report

Failure Modes:

  • PII leakage in logs: Query content logged verbatim may contain SSNs, emails, healthcare information. Always hash or redact sensitive fields.
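  • Audit log tampering: Immutable storage required. S3 Object Lock in Compliance mode prevents any user, including the account root, from overwriting or deleting a locked object version during its retention period. Governance mode is weaker: principals granted s3:BypassGovernanceRetention can still remove the lock, so use it only for testing or where a privileged override is acceptable.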
  • Incomplete consent tracking: Processing data without valid consent is a compliance violation. Consent version must accompany every query.
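  • Retention period mismatches: GDPR does not prescribe a fixed number of days; its storage-limitation principle (Art. 5(1)(e)) requires that personal data be kept no longer than necessary for the purpose it was collected for. Define a retention period per data category (the example above uses 365 days), document the legal basis for it, and make sure the configured value matches what is reported to data subjects. Automated purging policies essential.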
EXERCISE

Implement an audit log filter that redacts email addresses and phone numbers from query text before logging. Verify with test cases containing realistic PII.