18. Business Automation Suite Project
Chapter 18 of 18 · 35 min
This capstone project integrates all previous chapters into a cohesive business automation system. Build a complete solution that handles document processing, approval workflows, alerting, and audit logging.
System Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Business Automation Suite │
├─────────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Document │ │ Approval │ │ Reporting │ │
│ │ Ingestion │──│ Workflow │──│ Engine │ │
│ └──────────────┘ └──────────────┘ └──────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ AI Processing Layer │ │
│ │ (Ollama + Local Models) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Alerting │ │ Audit │ │ Dashboard │ │
│ │ System │ │ Logger │ │ Updater │ │
│ └──────────────┘ └──────────────┘ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Core Implementation
# business_automation_suite.py
"""
Business Automation Suite
A complete automation system integrating document processing,
approvals, AI analysis, alerting, and audit logging.
"""
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import ollama
# Import components from previous chapters
from error_handling import ErrorHandler, AutomationError, ModelError
from alerting_system import AlertingSystem, Severity
from audit_logger import AuditLogger, AuditEventType, audited
from approval_workflow import ApprovalWorkflow
from data_classification import DataClassifier, DataClassification
from circuit_breaker import CircuitBreaker
class BusinessAutomationSuite:
    """
    Main orchestrator for business automation.

    Integrates all components into a cohesive system: an ErrorHandler,
    AuditLogger, AlertingSystem, ApprovalWorkflow, DataClassifier and a
    CircuitBreaker guarding model calls are created from a JSON config
    file. The public entry points are process_document, run_report,
    get_system_health and export_compliance_report.
    """
def __init__(self, config_path: str = "suite_config.json"):
    """
    Build the suite from a JSON configuration file.

    Args:
        config_path: Path of the JSON config to load.
    """
    # The config has to exist first: both bootstrap steps read self.config.
    self.config = self._load_config(config_path)
    for bootstrap_step in (self._initialize_components,
                           self._setup_directories):
        bootstrap_step()
def _load_config(self, path: str) -> dict:
with open(path) as f:
return json.load(f)
def _initialize_components(self):
    """Create every subsystem the suite delegates to."""
    # Settings for the audit logger, with the same fallbacks as before.
    audit_path = self.config.get("audit_path", "audit_log.jsonl")
    retention_days = self.config.get("retention_days", 365)

    self.error_handler = ErrorHandler()
    self.audit_logger = AuditLogger(
        audit_path=audit_path,
        retention_days=retention_days,
    )
    self.alerting = AlertingSystem()
    self.approval_workflow = ApprovalWorkflow()
    self.data_classifier = DataClassifier()
    # Guards calls into the AI model (see process_document).
    self.model_circuit = CircuitBreaker(failure_threshold=5)
def _setup_directories(self):
"""Create required directories."""
dirs = [
self.config.get("document_path", "documents"),
self.config.get("output_path", "output"),
self.config.get("log_path", "logs"),
self.config.get("temp_path", "temp")
]
for dir_path in dirs:
Path(dir_path).mkdir(parents=True, exist_ok=True)
def process_document(self, document_path: str,
                     classification: str = "internal") -> dict:
    """
    Process incoming document through AI pipeline.

    Records the ingestion in the audit log, resolves the classification
    policy, reads the file, raises a WARNING alert when sensitive patterns
    are detected, runs the analysis through the model circuit breaker and
    finally either requests approval or executes the action directly.

    Args:
        document_path: Path of the text document to process.
        classification: Classification label handed to the data classifier.

    Returns:
        A dict whose "status" is "pending_approval" (with "approval_id"
        and "result"), "completed" (with "result"), or "failed" (with
        "error" and "recovery"). Failures inside the pipeline are
        reported through the return value, not raised.
    """
    # Alert is constructed below but is not among this module's top-level
    # imports; without this import both alert paths raise NameError.
    from alerting_system import Alert

    # Log ingestion
    self.audit_logger.log(
        event_type=AuditEventType.DATA_ACCESS,
        actor="system",
        actor_type="service",
        resource=document_path,
        action="document_ingestion",
        outcome="success",
        details={"classification": classification},
        session_id=None
    )
    try:
        # Classify data
        policy = self.data_classifier.classify(classification)

        # Load document
        with open(document_path, "r") as f:
            content = f.read()

        # Check for sensitive data
        sensitive = self._detect_sensitive_content(content)
        if sensitive:
            self.alerting.send_alert(Alert(
                alert_id=f"sensitive_{datetime.now().timestamp()}",
                source="document_processor",
                metric="pii_detected",
                value=len(sensitive),
                threshold=0,
                severity=Severity.WARNING,
                message="Sensitive data found in document",
                created_at=datetime.now()
            ))

        # Process through AI with circuit breaker
        result = self.model_circuit.call(
            self._analyze_document,
            content,
            policy
        )

        # Route to appropriate workflow
        workflow_action = self._determine_action(result, policy)
        if workflow_action["requires_approval"]:
            approval_id = self.approval_workflow.trigger_approval(
                action_type=workflow_action["type"],
                action_data={
                    "document": document_path,
                    "result": result,
                    "action": workflow_action
                }
            )
            return {
                "status": "pending_approval",
                "approval_id": approval_id,
                "result": result
            }

        # Execute action directly
        self._execute_action(workflow_action, result)
        return {
            "status": "completed",
            "result": result
        }
    except Exception as e:
        # Pipeline boundary: convert any failure into a "failed" result
        # after handing it to the error handler and raising an alert.
        error_result = self.error_handler.handle(
            e, "document_processing", {"document": document_path}
        )
        self.alerting.send_alert(Alert(
            alert_id=f"error_{datetime.now().timestamp()}",
            source="document_processor",
            metric="processing_error",
            value=1,
            threshold=0,
            severity=Severity.CRITICAL,
            message=f"Document processing failed: {str(e)}",
            created_at=datetime.now()
        ))
        return {
            "status": "failed",
            "error": str(e),
            "recovery": error_result.get("recovery", {})
        }
def _analyze_document(self, content: str, policy) -> dict:
    """
    Ask the configured local model to analyze a document.

    Only the first 5000 characters of ``content`` are sent. ``policy`` is
    accepted for the circuit-breaker call signature but is not read here.

    Returns:
        Dict with the model's reply ("analysis"), the model name
        ("model_used") and an ISO "timestamp".
    """
    model = self.config.get("ai_model", "llama3")
    excerpt = content[:5000]
    prompt = "\n".join([
        "Analyze this business document and extract:",
        "1. Key entities (people, organizations, amounts)",
        "2. Document type and purpose",
        "3. Recommended actions",
        "4. Risk assessment (low/medium/high)",
        "5. Summary (3 sentences max)",
        "Document:",
        excerpt,
    ])
    user_message = {"role": "user", "content": prompt}
    response = ollama.chat(
        model=model,
        messages=[user_message],
        options={"temperature": 0.3},
    )
    analysis_text = response["message"]["content"]
    return {
        "analysis": analysis_text,
        "model_used": model,
        "timestamp": datetime.now().isoformat(),
    }
def _detect_sensitive_content(self, content: str) -> list[str]:
"""Detect sensitive data in content."""
import re
patterns = {
"ssn": r'\b\d{3}-\d{2}-\d{4}\b',
"credit_card": r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
}
detected = []
for pattern_name, pattern in patterns.items():
matches = re.findall(pattern, content)
if matches:
detected.append(f"{pattern_name}: {len(matches)} occurrences")
return detected
def _determine_action(self, result: dict, policy) -> dict:
    """
    Decide how a processed document should be routed.

    Approval is required exactly when the policy's classification is
    CONFIDENTIAL; ``result`` is currently not consulted.

    Returns:
        Dict with "type", "requires_approval", "auto_route" and
        "retention_days" (taken from the policy).
    """
    # Simple heuristic - extend with AI in production
    needs_signoff = policy.classification == DataClassification.CONFIDENTIAL
    action = {"type": "document_classification"}
    action["requires_approval"] = needs_signoff
    action["auto_route"] = not needs_signoff
    action["retention_days"] = policy.retention_days
    return action
def _execute_action(self, action: dict, result: dict):
    """
    Execute automated action.

    Writes the action and its analysis result to a new JSON file in the
    configured output directory and records the write in the audit log.

    Args:
        action: Action descriptor; it is stored in the file and passed
            to the audit logger as the entry's details.
        result: Analysis result to store alongside the action.
    """
    output_path = Path(self.config.get("output_path", "output"))
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    payload = {
        "action": action,
        "result": result,
        "executed_at": datetime.now().isoformat()
    }

    # The timestamp only has one-second resolution, so two actions in the
    # same second used to overwrite each other. Open with exclusive
    # creation ("x") and add a numeric suffix on collision instead.
    attempt = 0
    while True:
        suffix = f"_{attempt}" if attempt else ""
        output_file = output_path / f"processed_{stamp}{suffix}.json"
        try:
            handle = open(output_file, "x")
        except FileExistsError:
            attempt += 1
            continue
        with handle:
            json.dump(payload, handle, indent=2)
        break

    self.audit_logger.log(
        event_type=AuditEventType.DATA_MODIFICATION,
        actor="system",
        actor_type="service",
        resource=str(output_file),
        action="automated_action",
        outcome="success",
        details=action
    )
def run_report(self, report_type: str, period: str = "daily") -> dict:
    """
    Generate scheduled reports.

    Looks up ``report_type`` under the "reports" section of the config and
    runs its configured command, capturing the output.

    Args:
        report_type: Key of the report under config["reports"].
        period: Label echoed back in the result; it is not passed to the
            report command.

    Returns:
        ``{"error": ...}`` when the report type is unknown or has no
        command; otherwise a dict with "report_type", "period", "status"
        ("success", "failed" or "timeout"), "generated_at", plus "output"
        (stdout) for finished runs and "error" for failed or timed-out
        ones.
    """
    report_config = self.config.get("reports", {}).get(report_type, {})
    if not report_config:
        return {"error": f"Unknown report type: {report_type}"}

    # Execute report generation
    command = report_config.get("command")
    if not command:
        return {"error": "No command configured for report"}

    import subprocess

    timeout = report_config.get("timeout", 300)
    try:
        # NOTE(review): the command string comes from the config file and
        # is run through the shell; keep that file writable only by
        # trusted operators.
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout
        )
    except subprocess.TimeoutExpired:
        # Timeouts used to bypass the audit log and returned a payload
        # without "period"/"generated_at"; report them like any failure.
        output = {
            "report_type": report_type,
            "period": period,
            "status": "timeout",
            "error": f"Report exceeded {timeout}s",
            "generated_at": datetime.now().isoformat()
        }
        self.audit_logger.log(
            event_type=AuditEventType.SYSTEM_ERROR,
            actor="scheduler",
            actor_type="system",
            resource=f"report:{report_type}",
            action="report_generation",
            outcome="failure",
            details=output
        )
        return output

    succeeded = result.returncode == 0
    output = {
        "report_type": report_type,
        "period": period,
        "status": "success" if succeeded else "failed",
        "output": result.stdout,
        "generated_at": datetime.now().isoformat()
    }
    if not succeeded:
        output["error"] = result.stderr

    # Log report generation
    self.audit_logger.log(
        event_type=AuditEventType.DATA_ACCESS if succeeded
        else AuditEventType.SYSTEM_ERROR,
        actor="scheduler",
        actor_type="system",
        resource=f"report:{report_type}",
        action="report_generation",
        outcome="success" if succeeded else "failure",
        details=output
    )
    return output
def get_system_health(self) -> dict:
    """
    Return system health status.

    Probes Ollama, verifies audit-log integrity and reports the error
    handler's counters. A probe that raises is reported as "unhealthy"
    for that component instead of aborting the whole check.

    Returns:
        Dict with "timestamp", per-component entries under "components",
        "overall" ("healthy" or "degraded") and "issues" (names of the
        components whose status is not "healthy").
    """
    health = {
        "timestamp": datetime.now().isoformat(),
        "components": {}
    }

    # Check Ollama
    try:
        ollama.list()
        health["components"]["ollama"] = {"status": "healthy"}
    except Exception as e:
        health["components"]["ollama"] = {
            "status": "unhealthy",
            "error": str(e)
        }

    # Check audit log integrity. Guarded like the Ollama probe: a broken
    # audit log must show up in the report, not crash the health check.
    try:
        integrity = self.audit_logger.verify_integrity()
        health["components"]["audit"] = {
            "status": "healthy" if integrity["verified"] else "degraded",
            "total_entries": integrity["total_entries"],
            "issues": len(integrity["issues"])
        }
    except Exception as e:
        health["components"]["audit"] = {
            "status": "unhealthy",
            "error": str(e)
        }

    # Check error rates
    health["components"]["error_handler"] = {
        "status": "healthy",
        "error_counts": self.error_handler.error_counts
    }

    # Overall status
    unhealthy = [k for k, v in health["components"].items()
                 if v.get("status") != "healthy"]
    health["overall"] = "healthy" if not unhealthy else "degraded"
    health["issues"] = unhealthy
    return health
def export_compliance_report(self, start_date: str,
                             end_date: str) -> dict:
    """
    Produce the compliance report for an audit period.

    Delegates to ComplianceValidator, built from this suite's audit
    logger and data classifier.

    Args:
        start_date: Start of the period, forwarded unchanged.
        end_date: End of the period, forwarded unchanged.
    """
    # Imported lazily, as before, so the validator is only needed here.
    from compliance_validator import ComplianceValidator

    return ComplianceValidator(
        self.audit_logger, self.data_classifier
    ).generate_compliance_report(start_date, end_date)
Configuration File
// suite_config.json
{
"ai_model": "llama3",
"document_path": "documents/incoming",
"output_path": "output/processed",
"log_path": "logs",
"temp_path": "temp",
"audit_path": "audit_log.jsonl",
"retention_days": 365,
"reports": {
"daily_summary": {
"command": "python3 /opt/reports/daily_summary.py",
"timeout": 300,
"schedule": "0 6 * * *"
},
"weekly_metrics": {
"command": "python3 /opt/reports/weekly_metrics.py",
"timeout": 600,
"schedule": "0 7 * * 1"
}
},
"alerting": {
"webhook_url": "https://hooks.example.com/alerts",
"severity_thresholds": {
"critical_response_minutes": 5,
"warning_response_minutes": 30
}
},
"approval": {
"threshold_amount": 10000,
"timeout_hours": 24
}
}
Main Entry Point
# main.py
#!/usr/bin/env python3
"""
Business Automation Suite - Main Entry Point
"""
import sys
import argparse
from pathlib import Path
def main():
    """
    Parse the command line and dispatch to the matching suite operation.

    Commands: process (needs --file), report (needs --report-type),
    health, audit, compliance (needs --start-date and --end-date). The
    result of the chosen operation is printed as indented JSON. A missing
    required option prints an error and exits with status 1.
    """
    # json is used for all output below but is not among this script's
    # top-level imports; import it here so the commands do not NameError.
    import json

    parser = argparse.ArgumentParser(description="Business Automation Suite")
    parser.add_argument("command", choices=[
        "process", "report", "health", "audit", "compliance"
    ], help="Command to execute")
    parser.add_argument("--file", help="Document file to process")
    parser.add_argument("--report-type", help="Type of report to generate")
    parser.add_argument("--period", help="Report period (daily, weekly, monthly)")
    parser.add_argument("--start-date", help="Start date for reports (ISO format)")
    parser.add_argument("--end-date", help="End date for reports (ISO format)")
    args = parser.parse_args()

    # Validate command-specific options before constructing the suite, so
    # a usage mistake is reported without first loading the config and
    # initializing every component.
    if args.command == "process" and not args.file:
        print("Error: --file required for process command")
        sys.exit(1)
    if args.command == "report" and not args.report_type:
        print("Error: --report-type required for report command")
        sys.exit(1)
    if args.command == "compliance" and (not args.start_date
                                         or not args.end_date):
        print("Error: --start-date and --end-date required for compliance command")
        sys.exit(1)

    # Initialize suite
    from business_automation_suite import BusinessAutomationSuite
    suite = BusinessAutomationSuite()

    if args.command == "process":
        result = suite.process_document(args.file)
        print(json.dumps(result, indent=2))
    elif args.command == "report":
        result = suite.run_report(args.report_type, args.period or "daily")
        print(json.dumps(result, indent=2))
    elif args.command == "health":
        health = suite.get_system_health()
        print(json.dumps(health, indent=2))
    elif args.command == "audit":
        entries = suite.audit_logger.query(limit=100)
        print(json.dumps(entries, indent=2))
    elif args.command == "compliance":
        report = suite.export_compliance_report(args.start_date, args.end_date)
        print(json.dumps(report, indent=2))
if __name__ == "__main__":
main()
Running the Suite
#!/bin/bash
# run_suite.sh
# Wrapper around main.py: sets PYTHONPATH, warns if Ollama is not
# reachable, then forwards all arguments to the suite.
# Set up environment
# Only append the previous PYTHONPATH when it is non-empty; a trailing ":"
# would add the current directory to Python's import path.
export PYTHONPATH="/opt/business-automation${PYTHONPATH:+:$PYTHONPATH}"
# Ensure Ollama is running
if ! curl -s http://localhost:11434/api/tags > /dev/null 2>&1; then
    # Warn on stderr so stdout stays pure JSON for callers that parse it.
    echo "Warning: Ollama not responding at http://localhost:11434" >&2
fi
# Run the suite
python3 main.py "$@"
# Example usage:
# Process a document
./run_suite.sh process --file documents/incoming/invoice_2024_001.txt
# Generate daily report
./run_suite.sh report --report-type daily_summary --period daily
# Check system health
./run_suite.sh health
# View recent audit entries
./run_suite.sh audit
# Generate compliance report
./run_suite.sh compliance --start-date 2024-01-01 --end-date 2024-01-31
Systemd Service Configuration
# /etc/systemd/system/business-automation.service
# Runs the suite as the unprivileged "automation" user, started after the
# network and the Ollama service, and restarted 10s after a failure.
[Unit]
Description=Business Automation Suite
After=network.target ollama.service
[Service]
Type=simple
User=automation
Group=automation
WorkingDirectory=/opt/business-automation
Environment=PYTHONPATH=/opt/business-automation
# NOTE(review): main.py as shown in this chapter accepts only
# process/report/health/audit/compliance; "serve" is not among its argparse
# choices, so this command would exit with a usage error and be restarted
# by Restart=on-failure. Confirm a "serve" command exists before enabling.
ExecStart=/usr/bin/python3 /opt/business-automation/main.py serve
Restart=on-failure
RestartSec=10
# Security hardening
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=true
# NOTE(review): the sample suite_config.json also uses the relative paths
# audit_log.jsonl, temp and documents/incoming, which resolve under
# WorkingDirectory. Verify they are covered here if ProtectSystem=strict
# leaves the rest of the tree read-only.
ReadWritePaths=/opt/business-automation/output /opt/business-automation/logs
[Install]
WantedBy=multi-user.target
Testing the System
# tests/test_suite.py
import pytest
import json
from pathlib import Path
from business_automation_suite import BusinessAutomationSuite
class TestBusinessAutomationSuite:
    """Smoke tests that run the suite against a config under tmp_path."""

    @pytest.fixture
    def suite(self, tmp_path):
        """Build a suite whose configured paths all live in tmp_path."""
        folders = (
            ("document_path", "docs"),
            ("output_path", "output"),
            ("log_path", "logs"),
            ("temp_path", "temp"),
        )
        config = {
            "ai_model": "llama3",
            **{key: str(tmp_path / name) for key, name in folders},
            "audit_path": str(tmp_path / "audit.jsonl"),
            "retention_days": 30,
        }
        config_file = tmp_path / "config.json"
        config_file.write_text(json.dumps(config))
        return BusinessAutomationSuite(str(config_file))

    def test_initialization(self, suite):
        """The core components exist after construction."""
        for component in ("error_handler", "audit_logger", "alerting"):
            assert getattr(suite, component) is not None

    def test_health_check(self, suite):
        """The health report carries its three top-level sections."""
        health = suite.get_system_health()
        for section in ("timestamp", "components", "overall"):
            assert section in health

    def test_document_processing(self, suite, tmp_path):
        """Processing a small text file yields a result with a status."""
        # Create test document
        doc_path = tmp_path / "test_doc.txt"
        doc_path.write_text("Test document with sample content for processing.")
        outcome = suite.process_document(str(doc_path))
        assert "status" in outcome

    def test_audit_logging(self, suite):
        """Querying the audit log returns a list."""
        recent = suite.audit_logger.query(limit=10)
        assert isinstance(recent, list)
Deployment Checklist
Infrastructure
- Server with adequate resources (8GB+ RAM recommended)
- Ollama installed and models downloaded
- Python 3.10+ installed
- Systemd configured for service management
Configuration
- Suite configuration file in production location
- Secrets stored securely (not in config files)
- Webhook URLs configured for alerting
- Retention policies set appropriately
Security
- Access control configured
- Audit log integrity verified
- Data classification policies defined
- MFA enabled for admin accounts
Monitoring
- Health check endpoint operational
- Alert routing tested
- Error rates within acceptable thresholds
Documentation
- Runbook created for common failures
- Escalation contacts documented
- Compliance requirements identified
EXERCISE
Extend the Business Automation Suite to support multiple AI models with automatic fallback. Add configuration that routes requests to different models based on content classification, and implement load balancing across model instances.