18. Business Automation Suite Project

Chapter 18 of 18 · 35 min

This capstone project integrates all previous chapters into a cohesive business automation system. Build a complete solution that handles document processing, approval workflows, alerting, and audit logging.

System Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     Business Automation Suite                    │
├─────────────────────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────────┐  │
│  │   Document   │  │   Approval   │  │     Reporting        │  │
│  │   Ingestion  │──│   Workflow   │──│     Engine           │  │
│  └──────────────┘  └──────────────┘  └──────────────────────┘  │
│         │                  │                    │               │
│         ▼                  ▼                    ▼               │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                    AI Processing Layer                     │  │
│  │         (Ollama + Local Models)                          │  │
│  └──────────────────────────────────────────────────────────┘  │
│         │                  │                    │               │
│         ▼                  ▼                    ▼               │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────────────┐  │
│  │   Alerting   │  │   Audit      │  │   Dashboard          │  │
│  │   System     │  │   Logger     │  │   Updater            │  │
│  └──────────────┘  └──────────────┘  └──────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

Core Implementation

# business_automation_suite.py
"""
Business Automation Suite
A complete automation system integrating document processing, 
approvals, AI analysis, alerting, and audit logging.
"""

import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import ollama

# Import components from previous chapters
from error_handling import ErrorHandler, AutomationError, ModelError
from alerting_system import AlertingSystem, Severity
from audit_logger import AuditLogger, AuditEventType, audited
from approval_workflow import ApprovalWorkflow
from data_classification import DataClassifier, DataClassification
from circuit_breaker import CircuitBreaker

class BusinessAutomationSuite:
    """
    Main orchestrator for business automation.
    Integrates all components into cohesive system.
    """
    
    def __init__(self, config_path: str = "suite_config.json"):
        self.config = self._load_config(config_path)
        self._initialize_components()
        self._setup_directories()
    
    def _load_config(self, path: str) -> dict:
        with open(path) as f:
            return json.load(f)
    
    def _initialize_components(self):
        """Initialize all subsystems."""
        # Error handling
        self.error_handler = ErrorHandler()
        
        # Audit logging
        self.audit_logger = AuditLogger(
            audit_path=self.config.get("audit_path", "audit_log.jsonl"),
            retention_days=self.config.get("retention_days", 365)
        )
        
        # Alerting
        self.alerting = AlertingSystem()
        
        # Approval workflow
        self.approval_workflow = ApprovalWorkflow()
        
        # Data classification
        self.data_classifier = DataClassifier()
        
        # Circuit breakers for external services
        self.model_circuit = CircuitBreaker(failure_threshold=5)
        
    def _setup_directories(self):
        """Create required directories."""
        dirs = [
            self.config.get("document_path", "documents"),
            self.config.get("output_path", "output"),
            self.config.get("log_path", "logs"),
            self.config.get("temp_path", "temp")
        ]
        for dir_path in dirs:
            Path(dir_path).mkdir(parents=True, exist_ok=True)
    
    def process_document(self, document_path: str, 
                         classification: str = "internal") -> dict:
        """
        Process incoming document through AI pipeline.
        Handles ingestion, analysis, classification, and action routing.
        """
        audit_context = {
            "actor": "system",
            "actor_type": "service",
            "session_id": None
        }
        
        # Log ingestion
        self.audit_logger.log(
            event_type=AuditEventType.DATA_ACCESS,
            actor="system",
            actor_type="service",
            resource=document_path,
            action="document_ingestion",
            outcome="success",
            details={"classification": classification},
            session_id=None
        )
        
        try:
            # Classify data
            policy = self.data_classifier.classify(classification)
            
            # Load document
            with open(document_path, "r") as f:
                content = f.read()
            
            # Check for sensitive data
            sensitive = self._detect_sensitive_content(content)
            if sensitive:
                self.alerting.send_alert(Alert(
                    alert_id=f"sensitive_{datetime.now().timestamp()}",
                    source="document_processor",
                    metric="pii_detected",
                    value=len(sensitive),
                    threshold=0,
                    severity=Severity.WARNING,
                    message=f"Sensitive data found in document",
                    created_at=datetime.now()
                ))
            
            # Process through AI with circuit breaker
            result = self.model_circuit.call(
                self._analyze_document,
                content,
                policy
            )
            
            # Route to appropriate workflow
            workflow_action = self._determine_action(result, policy)
            
            if workflow_action["requires_approval"]:
                approval_id = self.approval_workflow.trigger_approval(
                    action_type=workflow_action["type"],
                    action_data={
                        "document": document_path,
                        "result": result,
                        "action": workflow_action
                    }
                )
                return {
                    "status": "pending_approval",
                    "approval_id": approval_id,
                    "result": result
                }
            else:
                # Execute action directly
                self._execute_action(workflow_action, result)
                return {
                    "status": "completed",
                    "result": result
                }
                
        except Exception as e:
            error_result = self.error_handler.handle(
                e, "document_processing", {"document": document_path}
            )
            
            self.alerting.send_alert(Alert(
                alert_id=f"error_{datetime.now().timestamp()}",
                source="document_processor",
                metric="processing_error",
                value=1,
                threshold=0,
                severity=Severity.CRITICAL,
                message=f"Document processing failed: {str(e)}",
                created_at=datetime.now()
            ))
            
            return {
                "status": "failed",
                "error": str(e),
                "recovery": error_result.get("recovery", {})
            }
    
    def _analyze_document(self, content: str, policy) -> dict:
        """Analyze document using local AI model."""
        model = self.config.get("ai_model", "llama3")
        
        prompt = f"""Analyze this business document and extract:
1. Key entities (people, organizations, amounts)
2. Document type and purpose
3. Recommended actions
4. Risk assessment (low/medium/high)
5. Summary (3 sentences max)

Document:
{content[:5000]}"""

        response = ollama.chat(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            options={"temperature": 0.3}
        )
        
        return {
            "analysis": response["message"]["content"],
            "model_used": model,
            "timestamp": datetime.now().isoformat()
        }
    
    def _detect_sensitive_content(self, content: str) -> list[str]:
        """Detect sensitive data in content."""
        import re
        
        patterns = {
            "ssn": r'\b\d{3}-\d{2}-\d{4}\b',
            "credit_card": r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
            "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        }
        
        detected = []
        for pattern_name, pattern in patterns.items():
            matches = re.findall(pattern, content)
            if matches:
                detected.append(f"{pattern_name}: {len(matches)} occurrences")
        
        return detected
    
    def _determine_action(self, result: dict, policy) -> dict:
        """Determine workflow action based on analysis."""
        # Simple heuristic - extend with AI in production
        requires_approval = policy.classification == DataClassification.CONFIDENTIAL
        
        return {
            "type": "document_classification",
            "requires_approval": requires_approval,
            "auto_route": not requires_approval,
            "retention_days": policy.retention_days
        }
    
    def _execute_action(self, action: dict, result: dict):
        """Execute automated action."""
        output_path = Path(self.config.get("output_path", "output"))
        
        output_file = output_path / f"processed_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, "w") as f:
            json.dump({
                "action": action,
                "result": result,
                "executed_at": datetime.now().isoformat()
            }, f, indent=2)
        
        self.audit_logger.log(
            event_type=AuditEventType.DATA_MODIFICATION,
            actor="system",
            actor_type="service",
            resource=str(output_file),
            action="automated_action",
            outcome="success",
            details=action
        )
    
    def run_report(self, report_type: str, period: str = "daily") -> dict:
        """Generate scheduled reports."""
        report_config = self.config.get("reports", {}).get(report_type, {})
        
        if not report_config:
            return {"error": f"Unknown report type: {report_type}"}
        
        # Execute report generation
        command = report_config.get("command")
        if not command:
            return {"error": "No command configured for report"}
        
        import subprocess
        try:
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=report_config.get("timeout", 300)
            )
            
            output = {
                "report_type": report_type,
                "period": period,
                "status": "success" if result.returncode == 0 else "failed",
                "output": result.stdout,
                "generated_at": datetime.now().isoformat()
            }
            
            if result.returncode != 0:
                output["error"] = result.stderr
            
            # Log report generation
            self.audit_logger.log(
                event_type=AuditEventType.SYSTEM_ERROR if result.returncode != 0 
                          else AuditEventType.DATA_ACCESS,
                actor="scheduler",
                actor_type="system",
                resource=f"report:{report_type}",
                action="report_generation",
                outcome="success" if result.returncode == 0 else "failure",
                details=output
            )
            
            return output
            
        except subprocess.TimeoutExpired:
            return {
                "report_type": report_type,
                "status": "timeout",
                "error": f"Report exceeded {report_config.get('timeout', 300)}s"
            }
    
    def get_system_health(self) -> dict:
        """Return system health status."""
        health = {
            "timestamp": datetime.now().isoformat(),
            "components": {}
        }
        
        # Check Ollama
        try:
            ollama.list()
            health["components"]["ollama"] = {"status": "healthy"}
        except Exception as e:
            health["components"]["ollama"] = {
                "status": "unhealthy",
                "error": str(e)
            }
        
        # Check audit log integrity
        integrity = self.audit_logger.verify_integrity()
        health["components"]["audit"] = {
            "status": "healthy" if integrity["verified"] else "degraded",
            "total_entries": integrity["total_entries"],
            "issues": len(integrity["issues"])
        }
        
        # Check error rates
        health["components"]["error_handler"] = {
            "status": "healthy",
            "error_counts": self.error_handler.error_counts
        }
        
        # Overall status
        unhealthy = [k for k, v in health["components"].items() 
                     if v.get("status") != "healthy"]
        health["overall"] = "healthy" if not unhealthy else "degraded"
        health["issues"] = unhealthy
        
        return health
    
    def export_compliance_report(self, start_date: str, 
                                end_date: str) -> dict:
        """Export compliance report for audit period."""
        from compliance_validator import ComplianceValidator
        
        validator = ComplianceValidator(self.audit_logger, self.data_classifier)
        
        return validator.generate_compliance_report(start_date, end_date)

Configuration File

// suite_config.json
{
  "ai_model": "llama3",
  "document_path": "documents/incoming",
  "output_path": "output/processed",
  "log_path": "logs",
  "temp_path": "temp",
  "audit_path": "audit_log.jsonl",
  "retention_days": 365,
  "reports": {
    "daily_summary": {
      "command": "python3 /opt/reports/daily_summary.py",
      "timeout": 300,
      "schedule": "0 6 * * *"
    },
    "weekly_metrics": {
      "command": "python3 /opt/reports/weekly_metrics.py",
      "timeout": 600,
      "schedule": "0 7 * * 1"
    }
  },
  "alerting": {
    "webhook_url": "https://hooks.example.com/alerts",
    "severity_thresholds": {
      "critical_response_minutes": 5,
      "warning_response_minutes": 30
    }
  },
  "approval": {
    "threshold_amount": 10000,
    "timeout_hours": 24
  }
}

Main Entry Point

# main.py
#!/usr/bin/env python3
"""
Business Automation Suite - Main Entry Point
"""

import sys
import argparse
from pathlib import Path

def main():
    parser = argparse.ArgumentParser(description="Business Automation Suite")
    parser.add_argument("command", choices=[
        "process", "report", "health", "audit", "compliance"
    ], help="Command to execute")
    parser.add_argument("--file", help="Document file to process")
    parser.add_argument("--report-type", help="Type of report to generate")
    parser.add_argument("--period", help="Report period (daily, weekly, monthly)")
    parser.add_argument("--start-date", help="Start date for reports (ISO format)")
    parser.add_argument("--end-date", help="End date for reports (ISO format)")
    
    args = parser.parse_args()
    
    # Initialize suite
    from business_automation_suite import BusinessAutomationSuite
    suite = BusinessAutomationSuite()
    
    if args.command == "process":
        if not args.file:
            print("Error: --file required for process command")
            sys.exit(1)
        
        result = suite.process_document(args.file)
        print(json.dumps(result, indent=2))
        
    elif args.command == "report":
        if not args.report_type:
            print("Error: --report-type required for report command")
            sys.exit(1)
        
        result = suite.run_report(args.report_type, args.period or "daily")
        print(json.dumps(result, indent=2))
        
    elif args.command == "health":
        health = suite.get_system_health()
        print(json.dumps(health, indent=2))
        
    elif args.command == "audit":
        entries = suite.audit_logger.query(limit=100)
        print(json.dumps(entries, indent=2))
        
    elif args.command == "compliance":
        if not args.start_date or not args.end_date:
            print("Error: --start-date and --end-date required for compliance command")
            sys.exit(1)
        
        report = suite.export_compliance_report(args.start_date, args.end_date)
        print(json.dumps(report, indent=2))

if __name__ == "__main__":
    main()

Running the Suite

#!/bin/bash
# run_suite.sh

# Set up environment
export PYTHONPATH=/opt/business-automation:$PYTHONPATH

# Ensure Ollama is running
if ! curl -s http://localhost:11434/api/tags > /dev/null 2>&1; then
    echo "Warning: Ollama not responding at http://localhost:11434"
fi

# Run the suite
python3 main.py "$@"
# Example usage:

# Process a document
./run_suite.sh process --file documents/incoming/invoice_2024_001.txt

# Generate daily report
./run_suite.sh report --report-type daily_summary --period daily

# Check system health
./run_suite.sh health

# View recent audit entries
./run_suite.sh audit

# Generate compliance report
./run_suite.sh compliance --start-date 2024-01-01 --end-date 2024-01-31

Systemd Service Configuration

# /etc/systemd/system/business-automation.service
[Unit]
Description=Business Automation Suite
After=network.target ollama.service

[Service]
Type=simple
User=automation
Group=automation
WorkingDirectory=/opt/business-automation
Environment=PYTHONPATH=/opt/business-automation
ExecStart=/usr/bin/python3 /opt/business-automation/main.py serve
Restart=on-failure
RestartSec=10

# Security hardening
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/opt/business-automation/output /opt/business-automation/logs

[Install]
WantedBy=multi-user.target

Testing the System

# tests/test_suite.py
import pytest
import json
from pathlib import Path
from business_automation_suite import BusinessAutomationSuite

class TestBusinessAutomationSuite:
    @pytest.fixture
    def suite(self, tmp_path):
        """Create suite instance with test configuration."""
        config = {
            "ai_model": "llama3",
            "document_path": str(tmp_path / "docs"),
            "output_path": str(tmp_path / "output"),
            "log_path": str(tmp_path / "logs"),
            "temp_path": str(tmp_path / "temp"),
            "audit_path": str(tmp_path / "audit.jsonl"),
            "retention_days": 30
        }
        
        config_file = tmp_path / "config.json"
        with open(config_file, "w") as f:
            json.dump(config, f)
        
        return BusinessAutomationSuite(str(config_file))
    
    def test_initialization(self, suite):
        """Test suite initializes all components."""
        assert suite.error_handler is not None
        assert suite.audit_logger is not None
        assert suite.alerting is not None
    
    def test_health_check(self, suite):
        """Test system health endpoint."""
        health = suite.get_system_health()
        assert "timestamp" in health
        assert "components" in health
        assert "overall" in health
    
    def test_document_processing(self, suite, tmp_path):
        """Test document processing pipeline."""
        # Create test document
        doc_path = tmp_path / "test_doc.txt"
        with open(doc_path, "w") as f:
            f.write("Test document with sample content for processing.")
        
        result = suite.process_document(str(doc_path))
        assert "status" in result
    
    def test_audit_logging(self, suite):
        """Test audit log creation."""
        entries = suite.audit_logger.query(limit=10)
        assert isinstance(entries, list)

Deployment Checklist

  1. Infrastructure

    • Server with adequate resources (8GB+ RAM recommended)
    • Ollama installed and models downloaded
    • Python 3.10+ installed
    • Systemd configured for service management
  2. Configuration

    • Suite configuration file in production location
    • Secrets stored securely (not in config files)
    • Webhook URLs configured for alerting
    • Retention policies set appropriately
  3. Security

    • Access control configured
    • Audit log integrity verified
    • Data classification policies defined
    • MFA enabled for admin accounts
  4. Monitoring

    • Health check endpoint operational
    • Alert routing tested
    • Error rates within acceptable thresholds
  5. Documentation

    • Runbook created for common failures
    • Escalation contacts documented
    • Compliance requirements identified
EXERCISE

Extend the Business Automation Suite to support multiple AI models with automatic fallback. Add configuration that routes requests to different models based on content classification, and implement load balancing across model instances.