16. Audit Logging
Chapter 16 of 18 · 25 min
Audit logs provide the historical record necessary for compliance, debugging, and accountability. Every significant action in an automation system must leave a traceable entry.
Structured Audit Log Design
# audit_logger.py
import hashlib
import json
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Optional
class AuditEventType(Enum):
    """Closed set of event categories an audit entry can carry.

    The string value of each member is what gets written into an entry's
    ``event_type`` field.
    """

    # Reads and writes of data.
    DATA_ACCESS = "data_access"
    DATA_MODIFICATION = "data_modification"

    # Both halves of a model call.
    AI_REQUEST = "ai_request"
    AI_RESPONSE = "ai_response"

    # Operational and administrative events.
    CONFIGURATION_CHANGE = "configuration_change"
    USER_ACTION = "user_action"
    SYSTEM_ERROR = "system_error"
    SECURITY_EVENT = "security_event"
@dataclass
class AuditEntry:
    """One record of the audit log.

    The nine leading fields are required; the trailing three default to
    ``None``. Field order is significant for positional construction.
    """

    # --- identity of the record ---
    event_id: str
    timestamp: str
    event_type: str

    # --- who acted ---
    actor: str
    actor_type: str  # "user" | "system" | "service"

    # --- what was done, and how it went ---
    resource: str
    action: str
    outcome: str  # "success" | "failure" | "partial"
    details: dict

    # --- optional request context ---
    session_id: Optional[str] = None
    ip_address: Optional[str] = None

    # --- filled in after the other fields are populated ---
    checksum: Optional[str] = None
class AuditLogger:
    """Append-only JSON Lines audit log with a SHA-256 checksum per entry.

    Every call to :meth:`log` appends one JSON object (one line) to
    ``audit_path``. Timestamps are written as naive ISO-8601 strings in UTC.

    NOTE(review): the checksum is a plain, unkeyed hash and does not cover
    ``session_id`` or ``ip_address``. It catches corruption and casual edits,
    but anyone able to rewrite the file can also recompute it. The formula is
    deliberately left unchanged so that existing logs keep verifying.
    """

    # Scalar fields covered by the checksum, in hashing order. ``details`` is
    # appended afterwards as key-sorted JSON.
    _CHECKSUM_FIELDS = (
        "event_id", "timestamp", "event_type", "actor",
        "actor_type", "resource", "action", "outcome",
    )

    def __init__(self, audit_path: str = "audit_log.jsonl",
                 retention_days: int = 365):
        """Remember the log location and retention, and create the log file.

        Args:
            audit_path: Path of the JSON Lines log file.
            retention_days: Age in days beyond which
                :meth:`archive_old_entries` moves entries out of the log.
        """
        self.audit_path = Path(audit_path)
        self.retention_days = retention_days
        self._ensure_log_exists()

    def _ensure_log_exists(self):
        """Create the log file (and any missing parent directories)."""
        self.audit_path.parent.mkdir(parents=True, exist_ok=True)
        # touch() leaves an existing file's content alone, so no exists()
        # pre-check (and no check-then-create race) is needed.
        self.audit_path.touch()

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Naive on purpose: it keeps the stored timestamp format identical to
        what ``datetime.utcnow()`` produced, without the deprecated call.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def log(self, event_type: "AuditEventType", actor: str,
            actor_type: str, resource: str, action: str,
            outcome: str, details: Optional[dict] = None,
            session_id: Optional[str] = None,
            ip_address: Optional[str] = None) -> str:
        """Create and persist an audit entry.

        Args:
            event_type: Category of the event; its ``.value`` is stored.
            actor: Identifier of whoever performed the action.
            actor_type: "user" | "system" | "service".
            resource: What was acted upon.
            action: What was done.
            outcome: "success" | "failure" | "partial".
            details: Extra JSON-serialisable context; defaults to ``{}``.
            session_id: Optional session identifier.
            ip_address: Optional client address.

        Returns:
            The generated ``event_id`` of the new entry.
        """
        entry = AuditEntry(
            event_id=self._generate_event_id(),
            timestamp=self._utc_now().isoformat(),
            event_type=event_type.value,
            actor=actor,
            actor_type=actor_type,
            resource=resource,
            action=action,
            outcome=outcome,
            details=details or {},
            session_id=session_id,
            ip_address=ip_address,
            checksum=None,  # Computed after all fields are populated
        )
        entry.checksum = self._compute_checksum(entry)
        self._persist_entry(entry)
        return entry.event_id

    def _generate_event_id(self) -> str:
        """Generate an event identifier: ``evt_`` + 16 hex chars of a UUID4."""
        return f"evt_{uuid.uuid4().hex[:16]}"

    def _compute_checksum(self, entry: "AuditEntry") -> str:
        """Compute the SHA-256 checksum of an entry object.

        The ``checksum`` attribute itself is excluded from the computation.
        """
        fields = {name: getattr(entry, name) for name in self._CHECKSUM_FIELDS}
        fields["details"] = entry.details
        return self._checksum_from_fields(fields)

    @classmethod
    def _checksum_from_fields(cls, fields: dict) -> str:
        """Compute the checksum from a mapping of entry fields.

        Working on a plain mapping lets :meth:`verify_integrity` check raw
        JSON records directly; keys outside the checksummed set are ignored.

        Raises:
            KeyError: If a checksummed field or ``details`` is missing.
        """
        parts = [f"{fields[name]}" for name in cls._CHECKSUM_FIELDS]
        parts.append(json.dumps(fields["details"], sort_keys=True))
        return hashlib.sha256("|".join(parts).encode()).hexdigest()

    def _persist_entry(self, entry: "AuditEntry"):
        """Append the entry to the audit log as one JSON line."""
        with open(self.audit_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(entry)) + "\n")

    def query(self, filters: Optional[dict] = None,
              start_time: Optional[str] = None,
              end_time: Optional[str] = None,
              limit: int = 1000) -> list[dict]:
        """Return entries matching the time range and field filters.

        Args:
            filters: ``{field: value}`` pairs that must all match exactly.
            start_time: Inclusive lower bound on ``timestamp``.
            end_time: Inclusive upper bound on ``timestamp``.
            limit: Maximum number of entries returned; ``<= 0`` yields ``[]``.

        Time bounds are compared to the stored ISO-8601 strings
        lexicographically, so a date-only ``end_time`` such as
        ``"2024-01-31"`` sorts before every timestamp on that day.

        Raises:
            json.JSONDecodeError: If a non-blank line is not valid JSON.
        """
        if limit <= 0:
            return []
        filters = filters or {}
        results = []
        with open(self.audit_path, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue  # tolerate blank lines
                entry = json.loads(line)
                if start_time and entry["timestamp"] < start_time:
                    continue
                if end_time and entry["timestamp"] > end_time:
                    continue
                if all(entry.get(key) == value
                       for key, value in filters.items()):
                    results.append(entry)
                    if len(results) >= limit:
                        break
        return results

    def verify_integrity(self) -> dict:
        """Recompute every entry's checksum and report discrepancies.

        Returns:
            ``{"verified": bool, "total_entries": int, "issues": list}``.
            Each issue carries ``event_id``, ``timestamp``, ``issue`` and the
            1-based ``line`` number. ``issue`` is ``"checksum_mismatch"``, or
            ``"malformed_entry"`` for a line that is not valid JSON or lacks a
            checksummed field. Blank lines are ignored and not counted.
        """
        issues = []
        total_entries = 0
        with open(self.audit_path, encoding="utf-8") as f:
            for line_no, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                total_entries += 1
                entry = None
                try:
                    entry = json.loads(line)
                    recomputed = self._checksum_from_fields(entry)
                except (ValueError, KeyError, TypeError):
                    # An unreadable record is itself evidence of tampering or
                    # corruption: report it rather than abort the whole scan.
                    known = entry if isinstance(entry, dict) else {}
                    issues.append({
                        "event_id": known.get("event_id"),
                        "timestamp": known.get("timestamp"),
                        "issue": "malformed_entry",
                        "line": line_no,
                    })
                    continue
                if entry.get("checksum") != recomputed:
                    issues.append({
                        "event_id": entry["event_id"],
                        "timestamp": entry["timestamp"],
                        "issue": "checksum_mismatch",
                        "line": line_no,
                    })
        return {
            "verified": not issues,
            "total_entries": total_entries,
            "issues": issues,
        }

    def archive_old_entries(self, archive_path: str = "audit_archive"):
        """Move entries older than the retention period into archive files.

        Expired entries are appended to ``audit_<YYYY-MM-DD>.jsonl`` inside
        ``archive_path`` (created if missing). The live log is then replaced
        with the remaining entries via a temporary file, so an interruption
        cannot leave it truncated. Lines whose timestamp cannot be read are
        kept in the live log; blank lines are dropped.

        Returns:
            ``{"archived": int, "remaining": int}``.
        """
        archive = Path(archive_path)
        archive.mkdir(parents=True, exist_ok=True)
        # Compare naive-UTC datetimes directly. Calling .timestamp() on a
        # naive value would interpret it as local time.
        cutoff = self._utc_now() - timedelta(days=self.retention_days)

        remaining = []
        expired_by_file = {}
        with open(self.audit_path, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                if not line.endswith("\n"):
                    line += "\n"  # last line may lack its terminator
                try:
                    stamp = json.loads(line)["timestamp"]
                    entry_time = datetime.fromisoformat(stamp)
                except (ValueError, KeyError, TypeError):
                    remaining.append(line)
                    continue
                if entry_time.tzinfo is not None:
                    # Normalise offset-bearing timestamps to naive UTC.
                    entry_time = entry_time.astimezone(
                        timezone.utc).replace(tzinfo=None)
                if entry_time < cutoff:
                    file_name = f"audit_{stamp[:10]}.jsonl"
                    expired_by_file.setdefault(file_name, []).append(line)
                else:
                    remaining.append(line)

        # Write the archives first: a crash after this point can duplicate
        # entries on the next run but can never lose them.
        archived_count = 0
        for file_name, lines in expired_by_file.items():
            with open(archive / file_name, "a", encoding="utf-8") as af:
                af.writelines(lines)
            archived_count += len(lines)

        tmp_path = self.audit_path.with_name(self.audit_path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.writelines(remaining)
        tmp_path.replace(self.audit_path)

        return {"archived": archived_count, "remaining": len(remaining)}
Automatic Logging Decorator
# audit_decorator.py
from functools import wraps
from typing import Callable
def audited(operation: str, resource: str):
    """Decorator factory that writes an audit entry for every call.

    Args:
        operation: Stored as the entry's ``action``. If it contains "read"
            (case-insensitive) a successful call is logged as DATA_ACCESS,
            otherwise as DATA_MODIFICATION.
        resource: Stored as the entry's ``resource``.

    The wrapped function accepts two extra keyword arguments:

    * ``audit_context``: mapping with optional ``actor``, ``actor_type``,
      ``session_id`` and ``ip_address``. It is always removed before the
      wrapped function is called.
    * ``audit_logger``: logger to write to; a default ``AuditLogger()`` is
      used when absent. It is forwarded to the wrapped function only if that
      function can accept it (declares ``audit_logger`` or ``**kwargs``).

    A call that raises ``Exception`` is logged as SYSTEM_ERROR with outcome
    "failure" and the exception is re-raised.
    """
    def decorator(func: Callable) -> Callable:
        import inspect

        # Decide once whether `audit_logger` may be passed through. Forwarding
        # it to a function that does not declare it would raise TypeError and
        # make the keyword unusable for such functions.
        try:
            parameters = inspect.signature(func).parameters.values()
        except (TypeError, ValueError):
            forwards_logger = True  # signature unknown: keep pass-through
        else:
            forwards_logger = any(
                p.name == "audit_logger"
                or p.kind is inspect.Parameter.VAR_KEYWORD
                for p in parameters
            )

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Actor information comes from the optional audit context.
            ctx = kwargs.pop("audit_context", None) or {}
            common = {
                "actor": ctx.get("actor", "system"),
                "actor_type": ctx.get("actor_type", "system"),
                "resource": resource,
                "action": operation,
                "session_id": ctx.get("session_id"),
                "ip_address": ctx.get("ip_address"),
            }

            if forwards_logger:
                logger = kwargs.get("audit_logger")
            else:
                logger = kwargs.pop("audit_logger", None)
            if logger is None:
                logger = AuditLogger()  # default logger

            # Only the wrapped call is guarded. A failure while writing the
            # success entry must not be recorded as a failure of the
            # operation itself.
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                logger.log(
                    event_type=AuditEventType.SYSTEM_ERROR,
                    outcome="failure",
                    details={"error": str(e), "args": str(args)[:500]},
                    **common,
                )
                raise

            if "read" in operation.lower():
                event_type = AuditEventType.DATA_ACCESS
            else:
                event_type = AuditEventType.DATA_MODIFICATION
            logger.log(
                event_type=event_type,
                outcome="success",
                # Previews are truncated to bound the size of each entry.
                details={"args": str(args)[:500],
                         "result_preview": str(result)[:500]},
                **common,
            )
            return result
        return wrapper
    return decorator
# Usage
@audited(operation="Process AI request", resource="ollama_api")
def call_model(model_name: str, prompt: str):
    """Send ``prompt`` as a single user message to ``model_name``.

    Returns whatever ``ollama.chat`` returns. The ``audited`` decorator
    records the call.
    """
    import ollama

    user_message = {"role": "user", "content": prompt}
    return ollama.chat(model=model_name, messages=[user_message])
Compliance Reporting
# compliance_report.py
from datetime import datetime, timedelta
class ComplianceReporter:
    """Builds summary reports from the entries of an audit logger."""

    def __init__(self, audit_logger: "AuditLogger"):
        """Keep a reference to the logger whose entries are reported on."""
        self.logger = audit_logger

    def generate_report(self, start_date: str, end_date: str,
                        report_type: str = "full") -> dict:
        """Generate a compliance report for a time period.

        Args:
            start_date: Inclusive lower bound, ISO-8601 date or timestamp.
            end_date: Inclusive upper bound. A date-only value
                (``YYYY-MM-DD``) covers that whole day.
            report_type: "full", "security" or "data_access".

        Returns:
            The report dict, or ``{"error": ...}`` for an unknown
            ``report_type``. At most 100000 entries are considered.
        """
        builders = {
            "full": self._full_report,
            "security": self._security_report,
            "data_access": self._data_access_report,
        }
        build = builders.get(report_type)
        if build is None:
            # Reject before scanning the log: no point reading up to 100000
            # entries for a report that cannot be produced.
            return {"error": f"Unknown report type: {report_type}"}

        entries = self.logger.query(
            start_time=start_date,
            end_time=self._inclusive_end(end_date),
            limit=100000,
        )
        return build(entries)

    @staticmethod
    def _inclusive_end(end_date: str) -> str:
        """Extend a date-only end bound to the last instant of that day.

        The logger compares timestamps as strings, so a bare "2024-01-31"
        sorts before "2024-01-31T09:00:00" and would exclude the entire final
        day. Values that are not exactly ``YYYY-MM-DD`` are returned as-is.
        """
        if not end_date or len(end_date) != 10:
            return end_date
        try:
            datetime.strptime(end_date, "%Y-%m-%d")
        except ValueError:
            return end_date
        return f"{end_date}T23:59:59.999999"

    def _full_report(self, entries: list[dict]) -> dict:
        """Count entries by event type, actor and outcome.

        Also embeds the logger's integrity check, which covers the whole log
        rather than just ``entries``.
        """
        by_type = {}
        by_actor = {}
        # The three standard outcomes are always present, even at zero.
        by_outcome = {"success": 0, "failure": 0, "partial": 0}
        for entry in entries:
            event_type = entry["event_type"]
            by_type[event_type] = by_type.get(event_type, 0) + 1
            actor = entry["actor"]
            by_actor[actor] = by_actor.get(actor, 0) + 1
            outcome = entry["outcome"]
            by_outcome[outcome] = by_outcome.get(outcome, 0) + 1
        return {
            "report_period": self._get_date_range(entries),
            "total_events": len(entries),
            "by_event_type": by_type,
            "by_actor": by_actor,
            "by_outcome": by_outcome,
            "integrity_check": self.logger.verify_integrity(),
        }

    def _security_report(self, entries: list[dict]) -> dict:
        """Summarise security events, config changes and failed auth actions.

        A "failed authentication attempt" is any entry with outcome
        "failure" whose action contains "auth" (case-insensitive).
        """
        config_change = AuditEventType.CONFIGURATION_CHANGE.value
        security_types = (AuditEventType.SECURITY_EVENT.value, config_change)
        security_entries = [e for e in entries
                            if e["event_type"] in security_types]
        failed_auth = [e for e in entries
                       if e["outcome"] == "failure"
                       and "auth" in e["action"].lower()]
        return {
            "report_type": "security",
            "total_security_events": len(security_entries),
            "failed_authentication_attempts": len(failed_auth),
            "configuration_changes": sum(
                1 for e in entries if e["event_type"] == config_change),
            "security_events": security_entries,
        }

    def _data_access_report(self, entries: list[dict]) -> dict:
        """Summarise DATA_ACCESS and DATA_MODIFICATION entries.

        Read and write operations are counted by whether the action starts
        with "read" or "write" (case-insensitive). Resource and actor lists
        are de-duplicated and sorted so repeated runs give identical output.
        """
        data_types = (AuditEventType.DATA_ACCESS.value,
                      AuditEventType.DATA_MODIFICATION.value)
        data_access = [e for e in entries if e["event_type"] in data_types]
        actions = [e["action"].lower() for e in data_access]
        return {
            "report_type": "data_access",
            "total_data_events": len(data_access),
            "read_operations": sum(
                1 for a in actions if a.startswith("read")),
            "write_operations": sum(
                1 for a in actions if a.startswith("write")),
            # key=str keeps sorting safe if a value is not a string.
            "resources_accessed": sorted(
                {e["resource"] for e in data_access}, key=str),
            "actors": sorted({e["actor"] for e in data_access}, key=str),
        }

    def _get_date_range(self, entries: list[dict]) -> dict:
        """Return the earliest and latest timestamp found in ``entries``.

        Both values are ``None`` when ``entries`` is empty.
        """
        if not entries:
            return {"start": None, "end": None}
        timestamps = [e["timestamp"] for e in entries]
        return {"start": min(timestamps), "end": max(timestamps)}
EXERCISE
Create an audit logging system that records every Ollama API call including the model used, prompt length, response length, latency, and whether the response was successfully used downstream.