RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /Business Automation with Local AI
  6. /Ch. 16
Business Automation with Local AI

16. Audit Logging

Chapter 16 of 18 · 25 min
KEY INSIGHT

Audit logs must be tamper-evident and retained long enough to satisfy regulatory requirements. Compute checksums on every entry and implement retention policies before incidents occur.

Audit logs provide the historical record necessary for compliance, debugging, and accountability. Every significant action in an automation system must leave a traceable entry.

Structured Audit Log Design

# audit_logger.py
import json
from datetime import datetime
from pathlib import Path
from typing import Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
import hashlib

class AuditEventType(Enum):
    DATA_ACCESS = "data_access"
    DATA_MODIFICATION = "data_modification"
    AI_REQUEST = "ai_request"
    AI_RESPONSE = "ai_response"
    CONFIGURATION_CHANGE = "configuration_change"
    USER_ACTION = "user_action"
    SYSTEM_ERROR = "system_error"
    SECURITY_EVENT = "security_event"

@dataclass
class AuditEntry:
    event_id: str
    timestamp: str
    event_type: str
    actor: str
    actor_type: str  # "user" | "system" | "service"
    resource: str
    action: str
    outcome: str  # "success" | "failure" | "partial"
    details: dict
    session_id: Optional[str] = None
    ip_address: Optional[str] = None
    checksum: Optional[str] = None

class AuditLogger:
    def __init__(self, audit_path: str = "audit_log.jsonl",
                 retention_days: int = 365):
        self.audit_path = Path(audit_path)
        self.retention_days = retention_days
        self._ensure_log_exists()
    
    def _ensure_log_exists(self):
        """Create log file if not exists."""
        if not self.audit_path.exists():
            self.audit_path.touch()
    
    def log(self, event_type: AuditEventType, actor: str,
            actor_type: str, resource: str, action: str,
            outcome: str, details: dict = None,
            session_id: str = None, ip_address: str = None):
        """Create and persist an audit entry."""
        entry = AuditEntry(
            event_id=self._generate_event_id(),
            timestamp=datetime.utcnow().isoformat(),
            event_type=event_type.value,
            actor=actor,
            actor_type=actor_type,
            resource=resource,
            action=action,
            outcome=outcome,
            details=details or {},
            session_id=session_id,
            ip_address=ip_address,
            checksum=None  # Computed after all fields populated
        )
        
        # Compute tamper-evident checksum
        entry.checksum = self._compute_checksum(entry)
        
        self._persist_entry(entry)
        
        return entry.event_id
    
    def _generate_event_id(self) -> str:
        """Generate unique event identifier."""
        import uuid
        return f"evt_{uuid.uuid4().hex[:16]}"
    
    def _compute_checksum(self, entry: AuditEntry) -> str:
        """Compute SHA-256 checksum for tamper detection."""
        # Exclude checksum itself from computation
        data = f"{entry.event_id}|{entry.timestamp}|{entry.event_type}|{entry.actor}|{entry.actor_type}|{entry.resource}|{entry.action}|{entry.outcome}|{json.dumps(entry.details, sort_keys=True)}"
        return hashlib.sha256(data.encode()).hexdigest()
    
    def _persist_entry(self, entry: AuditEntry):
        """Write entry to audit log."""
        with open(self.audit_path, "a") as f:
            f.write(json.dumps(asdict(entry)) + "\n")
    
    def query(self, filters: dict = None, start_time: str = None,
              end_time: str = None, limit: int = 1000) -> list[dict]:
        """Query audit log with filters."""
        filters = filters or {}
        results = []
        
        with open(self.audit_path) as f:
            for line in f:
                entry = json.loads(line)
                
                # Time range filter
                if start_time and entry["timestamp"] < start_time:
                    continue
                if end_time and entry["timestamp"] > end_time:
                    continue
                
                # Field filters
                match = True
                for key, value in filters.items():
                    if entry.get(key) != value:
                        match = False
                        break
                
                if match:
                    results.append(entry)
                    
                    if len(results) >= limit:
                        break
        
        return results
    
    def verify_integrity(self) -> dict:
        """Verify checksum integrity of audit log."""
        issues = []
        
        with open(self.audit_path) as f:
            for line in f:
                entry = json.loads(line)
                stored_checksum = entry.get("checksum")
                
                # Recompute checksum
                entry_copy = dict(entry)
                entry_copy["checksum"] = None
                
                recomputed = self._compute_checksum(AuditEntry(**entry_copy))
                
                if stored_checksum != recomputed:
                    issues.append({
                        "event_id": entry["event_id"],
                        "timestamp": entry["timestamp"],
                        "issue": "checksum_mismatch"
                    })
        
        return {
            "verified": len(issues) == 0,
            "total_entries": sum(1 for _ in open(self.audit_path)),
            "issues": issues
        }
    
    def archive_old_entries(self, archive_path: str = "audit_archive"):
        """Archive entries older than retention period."""
        archive = Path(archive_path)
        archive.mkdir(exist_ok=True)
        
        cutoff = datetime.utcnow().timestamp() - (self.retention_days * 86400)
        remaining = []
        archived_count = 0
        
        with open(self.audit_path) as f:
            for line in f:
                entry = json.loads(line)
                entry_time = datetime.fromisoformat(entry["timestamp"]).timestamp()
                
                if entry_time < cutoff:
                    # Archive this entry
                    archive_file = archive / f"audit_{entry['timestamp'][:10]}.jsonl"
                    with open(archive_file, "a") as af:
                        af.write(line)
                    archived_count += 1
                else:
                    remaining.append(line)
        
        # Rewrite log with remaining entries
        with open(self.audit_path, "w") as f:
            f.writelines(remaining)
        
        return {"archived": archived_count, "remaining": len(remaining)}

Automatic Logging Decorator

# audit_decorator.py
from functools import wraps
from typing import Callable

def audited(operation: str, resource: str):
    """Decorator to automatically log function calls."""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Extract actor from context if available
            actor = "system"
            actor_type = "system"
            session_id = None
            ip_address = None
            
            # Try to get from kwargs
            if "audit_context" in kwargs:
                ctx = kwargs["audit_context"]
                actor = ctx.get("actor", actor)
                actor_type = ctx.get("actor_type", actor_type)
                session_id = ctx.get("session_id")
                ip_address = ctx.get("ip_address")
                del kwargs["audit_context"]
            
            logger = kwargs.get("audit_logger")
            if not logger:
                # Use default logger
                logger = AuditLogger()
            
            try:
                result = func(*args, **kwargs)
                logger.log(
                    event_type=AuditEventType.DATA_ACCESS if "read" in operation.lower() 
                    else AuditEventType.DATA_MODIFICATION,
                    actor=actor,
                    actor_type=actor_type,
                    resource=resource,
                    action=operation,
                    outcome="success",
                    details={"args": str(args)[:500], "result_preview": str(result)[:500]},
                    session_id=session_id,
                    ip_address=ip_address
                )
                return result
            except Exception as e:
                logger.log(
                    event_type=AuditEventType.SYSTEM_ERROR,
                    actor=actor,
                    actor_type=actor_type,
                    resource=resource,
                    action=operation,
                    outcome="failure",
                    details={"error": str(e), "args": str(args)[:500]},
                    session_id=session_id,
                    ip_address=ip_address
                )
                raise
        
        return wrapper
    return decorator

# Usage
@audited(operation="Process AI request", resource="ollama_api")
def call_model(model_name: str, prompt: str):
    import ollama
    response = ollama.chat(model=model_name, messages=[{"role": "user", "content": prompt}])
    return response

Compliance Reporting

# compliance_report.py
from datetime import datetime, timedelta

class ComplianceReporter:
    def __init__(self, audit_logger: AuditLogger):
        self.logger = audit_logger
    
    def generate_report(self, start_date: str, end_date: str,
                       report_type: str = "full") -> dict:
        """Generate compliance report for time period."""
        entries = self.logger.query(
            start_time=start_date,
            end_time=end_date,
            limit=100000
        )
        
        if report_type == "full":
            return self._full_report(entries)
        elif report_type == "security":
            return self._security_report(entries)
        elif report_type == "data_access":
            return self._data_access_report(entries)
        else:
            return {"error": f"Unknown report type: {report_type}"}
    
    def _full_report(self, entries: list[dict]) -> dict:
        """Generate thorough audit report."""
        by_type = {}
        by_actor = {}
        by_outcome = {"success": 0, "failure": 0, "partial": 0}
        
        for entry in entries:
            event_type = entry["event_type"]
            by_type[event_type] = by_type.get(event_type, 0) + 1
            
            actor = entry["actor"]
            by_actor[actor] = by_actor.get(actor, 0) + 1
            
            outcome = entry["outcome"]
            by_outcome[outcome] = by_outcome.get(outcome, 0) + 1
        
        return {
            "report_period": self._get_date_range(entries),
            "total_events": len(entries),
            "by_event_type": by_type,
            "by_actor": by_actor,
            "by_outcome": by_outcome,
            "integrity_check": self.logger.verify_integrity()
        }
    
    def _security_report(self, entries: list[dict]) -> dict:
        """Generate security-focused report."""
        security_events = [
            AuditEventType.SECURITY_EVENT.value,
            AuditEventType.CONFIGURATION_CHANGE.value
        ]
        
        security_entries = [e for e in entries if e["event_type"] in security_events]
        
        failed_auth = [e for e in entries if e["outcome"] == "failure" 
                      and "auth" in e["action"].lower()]
        
        return {
            "report_type": "security",
            "total_security_events": len(security_entries),
            "failed_authentication_attempts": len(failed_auth),
            "configuration_changes": len([e for e in entries 
                                        if e["event_type"] == AuditEventType.CONFIGURATION_CHANGE.value]),
            "security_events": security_entries
        }
    
    def _data_access_report(self, entries: list[dict]) -> dict:
        """Generate data access report for compliance."""
        data_access = [e for e in entries 
                      if e["event_type"] in [AuditEventType.DATA_ACCESS.value,
                                           AuditEventType.DATA_MODIFICATION.value]]
        
        return {
            "report_type": "data_access",
            "total_data_events": len(data_access),
            "read_operations": len([e for e in data_access 
                                  if e["action"].lower().startswith("read")]),
            "write_operations": len([e for e in data_access 
                                   if e["action"].lower().startswith("write")]),
            "resources_accessed": list(set(e["resource"] for e in data_access)),
            "actors": list(set(e["actor"] for e in data_access))
        }
    
    def _get_date_range(self, entries: list[dict]) -> dict:
        """Extract date range from entries."""
        if not entries:
            return {"start": None, "end": None}
        
        timestamps = [e["timestamp"] for e in entries]
        return {"start": min(timestamps), "end": max(timestamps)}
EXERCISE

Create an audit logging system that records every Ollama API call including the model used, prompt length, response length, latency, and whether the response was successfully used downstream.

← Chapter 15
Error Handling
Chapter 17 →
Security and Compliance