KEY INSIGHT
Audit trails provide forensic accountability for ML systems—what data was used, what models were trained, who approved deployment, and what predictions were made. Complete audit trails are non-negotiable for regulated industries and essential for post-incident investigation.
### Audit Trail Requirements
Audit trails serve multiple purposes: regulatory compliance, incident investigation, bias investigation, and operational improvement. Each purpose requires different data granularity.
**Regulatory compliance** typically requires immutable logs of data access, model versions served, and decision records for affected individuals. GDPR's provisions on automated decision-making (often summarized as a "right to explanation") require being able to identify which model version produced a specific prediction. HIPAA requires being able to identify which training datasets contained protected health information.
**Incident investigation** requires reconstructing exact conditions at incident time—model version, input data distribution, serving configuration, and performance metrics.
**Bias investigation** requires tracking model behavior across demographic groups over time, enabling identification of when discriminatory patterns emerged.
### Audit Log Implementation
```python
# Python: Immutable audit logging for ML systems
import hashlib
import json
import sqlite3
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
import threading
class AuditLogType:
    """String constants naming the kinds of event an audit entry can record.

    The values are plain ``str`` objects so they can be stored directly in
    the ``event_type`` field of an audit entry and compared against query
    filters without conversion.
    """

    # Data and model lifecycle events
    DATA_ACCESS = "data_access"
    MODEL_TRAINING = "model_training"
    MODEL_EVALUATION = "model_evaluation"
    MODEL_PROMOTION = "model_promotion"
    MODEL_DEPLOYMENT = "model_deployment"

    # Serving-time events
    INFERENCE_REQUEST = "inference_request"
    INFERENCE_RESPONSE = "inference_response"

    # Operational events
    CONFIGURATION_CHANGE = "configuration_change"
@dataclass
class AuditEntry:
    """Single audit log entry carrying a self-computed integrity checksum.

    ``checksum`` is derived data: ``__post_init__`` always recomputes it from
    every other field, so any value passed by the caller is overwritten. It
    therefore defaults to an empty string and may be omitted at construction.

    ``details`` must be JSON-serializable, because the checksum is computed
    over a JSON rendering of the entry; otherwise ``json.dumps`` raises
    ``TypeError`` during construction.
    """

    timestamp: str
    entry_id: str  # UUID or sequential
    event_type: str
    actor: str  # User, system process, automated pipeline
    resource_type: str  # model, dataset, configuration
    resource_id: str  # Versioned identifier
    action: str  # read, write, train, deploy
    details: dict  # Additional context
    checksum: str = ""  # Integrity verification; always recomputed below

    def __post_init__(self):
        """Compute the checksum over all fields except the checksum itself."""
        payload = {
            name: value
            for name, value in asdict(self).items()
            if name != "checksum"  # the hash cannot cover its own output
        }
        # sort_keys gives a canonical rendering, so the same field values
        # always produce the same digest regardless of dict ordering.
        canonical = json.dumps(payload, sort_keys=True).encode()
        # Only the first 16 hex characters (64 bits) of the digest are kept.
        self.checksum = hashlib.sha256(canonical).hexdigest()[:16]
class ImmutableAuditLog:
    """
    Append-only audit log with integrity verification.
    Designed for compliance and forensic purposes.

    Entries are stored in a single SQLite table. This class only exposes
    insert and read operations, and ``verify_integrity`` recomputes every
    entry's checksum to detect rows altered after they were written. The
    append-only property is enforced by this API alone: the schema contains
    no triggers or constraints that block UPDATE or DELETE statements issued
    directly against the database file.
    """

    # Explicit column list used by every SELECT so that result rows always
    # line up with these names, independent of the table's physical layout.
    _COLUMNS = (
        "entry_id", "timestamp", "event_type", "actor", "resource_type",
        "resource_id", "action", "details", "checksum", "written_at",
    )

    def __init__(self, db_path: str):
        """Open (creating if needed) the audit database at ``db_path``.

        Missing parent directories are created. Writes issued through this
        instance are serialized with a per-instance lock.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()
        self._init_db()

    def _init_db(self):
        """Create the audit table and its indexes if they do not exist."""
        conn = sqlite3.connect(str(self.db_path))
        try:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS audit_log (
                    entry_id TEXT PRIMARY KEY,
                    timestamp TEXT NOT NULL,
                    event_type TEXT NOT NULL,
                    actor TEXT NOT NULL,
                    resource_type TEXT NOT NULL,
                    resource_id TEXT NOT NULL,
                    action TEXT NOT NULL,
                    details TEXT NOT NULL,
                    checksum TEXT NOT NULL,
                    written_at TEXT NOT NULL
                )
            """)
            # Create indexes for efficient querying
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_audit_time
                ON audit_log(timestamp)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_audit_resource
                ON audit_log(resource_type, resource_id)
            """)
            # Enable WAL mode for concurrent reads during writes
            cursor.execute("PRAGMA journal_mode=WAL")
            conn.commit()
        finally:
            # Close even when a statement fails so the handle is not leaked.
            conn.close()

    def append(self, entry: "AuditEntry") -> bool:
        """Append an audit entry.

        Args:
            entry: The entry to persist. ``entry.details`` is stored as JSON
                text.

        Returns:
            ``True`` if the row was written, ``False`` if an entry with the
            same ``entry_id`` already exists. In that case the stored row is
            left untouched and the new entry is NOT recorded; callers that
            must not lose events should check this value.
        """
        with self._lock:
            conn = sqlite3.connect(str(self.db_path))
            try:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT OR IGNORE INTO audit_log
                    (entry_id, timestamp, event_type, actor, resource_type,
                     resource_id, action, details, checksum, written_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    entry.entry_id,
                    entry.timestamp,
                    entry.event_type,
                    entry.actor,
                    entry.resource_type,
                    entry.resource_id,
                    entry.action,
                    json.dumps(entry.details),
                    entry.checksum,
                    # Naive local time, as returned by datetime.now().
                    datetime.now().isoformat(),
                ))
                conn.commit()
                # rowcount is 0 when OR IGNORE skipped a duplicate key.
                return cursor.rowcount == 1
            finally:
                conn.close()

    def query(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        event_type: Optional[str] = None,
        start_time: Optional[str] = None,
        end_time: Optional[str] = None,
        limit: int = 1000
    ) -> list[dict]:
        """Query audit log entries, newest ``timestamp`` first.

        Every supplied (truthy) filter must match. ``start_time`` and
        ``end_time`` are inclusive and are compared as strings against the
        stored ``timestamp`` column.

        Args:
            resource_type: Exact match on ``resource_type``.
            resource_id: Exact match on ``resource_id``.
            event_type: Exact match on ``event_type``.
            start_time: Lower bound on ``timestamp``.
            end_time: Upper bound on ``timestamp``.
            limit: Maximum number of rows to return.

        Returns:
            One dict per row keyed by column name; ``details`` is returned as
            the stored JSON text, not decoded.
        """
        filters = (
            ("resource_type = ?", resource_type),
            ("resource_id = ?", resource_id),
            ("event_type = ?", event_type),
            ("timestamp >= ?", start_time),
            ("timestamp <= ?", end_time),
        )
        conditions = [clause for clause, value in filters if value]
        params = [value for _, value in filters if value]

        sql = f"SELECT {', '.join(self._COLUMNS)} FROM audit_log"
        if conditions:
            sql += " WHERE " + " AND ".join(conditions)
        # Bind the limit instead of formatting it into the statement so a
        # non-integer value can never be interpreted as SQL.
        sql += " ORDER BY timestamp DESC LIMIT ?"
        params.append(int(limit))

        conn = sqlite3.connect(str(self.db_path))
        try:
            rows = conn.execute(sql, params).fetchall()
        finally:
            conn.close()
        return [dict(zip(self._COLUMNS, row)) for row in rows]

    def verify_integrity(self) -> tuple[bool, list[str]]:
        """Verify checksums for all entries.

        Each row is rebuilt into an ``AuditEntry`` and its checksum is
        recomputed from the stored field values. A row counts as corrupted
        when the recomputed value differs from the stored one, or when its
        ``details`` column can no longer be decoded as JSON.

        Returns:
            ``(ok, corrupted_ids)`` where ``ok`` is ``True`` only if no
            corrupted entries were found.
        """
        conn = sqlite3.connect(str(self.db_path))
        try:
            rows = conn.execute(
                f"SELECT {', '.join(self._COLUMNS)} FROM audit_log"
            ).fetchall()
        finally:
            conn.close()

        corrupted = []
        for row in rows:
            record = dict(zip(self._COLUMNS, row))
            try:
                details = json.loads(record["details"])
            except (TypeError, ValueError):
                # Undecodable details is itself evidence of tampering or
                # damage: report the row instead of aborting the whole scan.
                corrupted.append(record["entry_id"])
                continue
            temp_entry = AuditEntry(
                timestamp=record["timestamp"],
                entry_id=record["entry_id"],
                event_type=record["event_type"],
                actor=record["actor"],
                resource_type=record["resource_type"],
                resource_id=record["resource_id"],
                action=record["action"],
                details=details,
                checksum=record["checksum"]  # Expected from stored
            )
            # Recompute exactly the way the entry computes its own checksum.
            content_for_hash = json.dumps(
                {k: v for k, v in asdict(temp_entry).items() if k != "checksum"},
                sort_keys=True
            ).encode()
            computed = hashlib.sha256(content_for_hash).hexdigest()[:16]
            if computed != record["checksum"]:
                corrupted.append(record["entry_id"])
        return not corrupted, corrupted
# Usage: Audit logging in serving context
def log_inference_request(
    audit_log: ImmutableAuditLog,
    request_id: str,
    model_id: str,
    model_version: str,
    input_features: dict,
    actor: str = "user_system"
):
    """Log an inference request for audit purposes.

    Only a truncated SHA-256 hash of the input features is recorded, never
    the raw values.

    Args:
        audit_log: Log the entry is appended to.
        request_id: Used as the entry's ``entry_id``.
        model_id: Combined with ``model_version`` into the ``resource_id``
            as ``"<model_id>:<model_version>"``.
        model_version: Version of the model that served the request.
        input_features: Request features; must be JSON-serializable because
            they are hashed via their JSON rendering.
        actor: Who issued the request.
    """
    # sort_keys makes the hash independent of the dict's insertion order.
    feature_hash = hashlib.sha256(
        json.dumps(input_features, sort_keys=True).encode()
    ).hexdigest()[:16]
    entry = AuditEntry(
        timestamp=datetime.now().isoformat(),
        entry_id=request_id,
        event_type=AuditLogType.INFERENCE_REQUEST,
        actor=actor,
        resource_type="model",
        resource_id=f"{model_id}:{model_version}",
        action="predict",
        details={
            "input_feature_hash": feature_hash
            # Do not log raw input—may contain PII
        },
        # Placeholder only: AuditEntry computes the real checksum itself in
        # __post_init__. Passing it explicitly keeps this call valid while
        # the field has no default.
        checksum="",
    )
    audit_log.append(entry)
```
### What Not to Log
Audit trails sometimes over-collect, logging sensitive information that itself becomes a liability. Do not log raw personal data in audit entries—log hashes or aggregate indicators for correlation purposes. Avoid logging information that would expose your model architecture or proprietary processing logic to potential attackers.
### Retention and Rotation
Audit logs grow continuously, and their storage and query costs grow with them. Establish retention policies based on regulatory requirements—financial services may require 7+ years, while general applications may need only 90 days. Implement log rotation, archiving entries appropriately before they are deleted.