RUNLOCALAI · v38

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.co · Independently operated
Business Automation with Local AI

11. Scheduling Reports

Chapter 11 of 18 · 25 min
KEY INSIGHT

Schedule reports for times when recipients are available to act on them. Morning reports work for daily operations; end-of-week reports suit strategic planning.

Automated reporting turns the output of data pipelines into reports that people can act on. A well-designed reporting scheduler handles execution timing, error recovery, and distribution without manual intervention.

Cron-Based Scheduling

The simplest scheduling approach uses system cron with wrapper scripts.

# /etc/cron.d/ai-reports
# Daily sales report at 6 AM
0 6 * * * root /opt/reports/run_report.sh sales_daily >> /var/log/reports/sales.log 2>&1

# Weekly summary every Monday at 7 AM
0 7 * * 1 root /opt/reports/run_report.sh weekly_summary >> /var/log/reports/weekly.log 2>&1

# Hourly data freshness check
0 * * * * root /opt/reports/check_freshness.sh >> /var/log/reports/freshness.log 2>&1

#!/bin/bash
# run_report.sh
# Abort with a usage message if no report type is given
REPORT_TYPE="${1:?usage: run_report.sh <report_type>}"
REPORT_DIR="/opt/reports"
LOG_FILE="/var/log/reports/${REPORT_TYPE}.log"

echo "$(date): Starting ${REPORT_TYPE} report" >> "$LOG_FILE"

cd "$REPORT_DIR" || exit 1
python3 generate_report.py "$REPORT_TYPE" >> "$LOG_FILE" 2>&1
# Capture the exit code immediately; any later command overwrites $?
EXIT_CODE=$?

if [ "$EXIT_CODE" -eq 0 ]; then
    echo "$(date): Completed successfully" >> "$LOG_FILE"
else
    echo "$(date): Failed with exit code $EXIT_CODE" >> "$LOG_FILE"
    # Trigger alerting
    /opt/reports/alert_failure.sh "$REPORT_TYPE"
    exit 1
fi
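
The wrapper above relies on generate_report.py signalling failure through its exit code. The chapter does not show that script, so the following is only a minimal sketch under assumptions: the REPORT_BUILDERS mapping and the build_* functions are hypothetical placeholders, not the course's implementation.

```python
# generate_report.py (hypothetical sketch; the real script is not shown in this chapter)
import sys


def build_sales_daily() -> str:
    """Placeholder: a real builder would query data and call the local model."""
    return "sales_daily report body"


def build_weekly_summary() -> str:
    """Placeholder for the weekly summary builder."""
    return "weekly_summary report body"


# Assumed mapping from the name passed by run_report.sh to a builder function
REPORT_BUILDERS = {
    "sales_daily": build_sales_daily,
    "weekly_summary": build_weekly_summary,
}


def main(argv: list[str]) -> int:
    """Return 0 on success and a non-zero code on failure so wrappers can react."""
    if len(argv) != 2 or argv[1] not in REPORT_BUILDERS:
        print(f"usage: {argv[0]} <{'|'.join(REPORT_BUILDERS)}>", file=sys.stderr)
        return 2
    try:
        print(REPORT_BUILDERS[argv[1]]())
    except Exception as exc:  # any builder error becomes a failed run
        print(f"report failed: {exc}", file=sys.stderr)
        return 1
    return 0

# As a script entry point, end the file with: sys.exit(main(sys.argv))
```

Returning distinct codes (2 for bad usage, 1 for a failed build) keeps the reason visible in the wrapper's log line.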

Python Scheduler with Job Management

When jobs need per-job timeouts, retry settings, failure notifications, or an execution history, a Python-based scheduler gives more control than cron alone.

# scheduler.py
import json
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class ScheduledJob:
    name: str
    command: str
    schedule: str  # cron format: "minute hour day month weekday"
    timeout_seconds: int = 3600
    retry_count: int = 3
    notification_webhook: Optional[str] = None
    last_run: Optional[str] = None
    last_status: Optional[str] = None
    
class ReportScheduler:
    def __init__(self, config_path: str = "jobs.json"):
        self.jobs = self._load_jobs(config_path)
        self.running_jobs = {}
        self.execution_log = Path("execution_log.jsonl")
    
    def _load_jobs(self, path: str) -> list[ScheduledJob]:
        with open(path) as f:
            data = json.load(f)
        return [ScheduledJob(**job) for job in data["jobs"]]
    
    def _matches_schedule(self, job: ScheduledJob) -> bool:
        """Check if job should run now based on cron schedule."""
        now = datetime.now()
        minute, hour, day, month, weekday = job.schedule.split()
        
        if not self._match_cron_field(minute, now.minute):
            return False
        if not self._match_cron_field(hour, now.hour):
            return False
        if not self._match_cron_field(day, now.day):
            return False
        if not self._match_cron_field(month, now.month):
            return False
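        # cron counts Sunday as 0; datetime.weekday() counts Monday as 0,
        # so convert with isoweekday() % 7 (Mon..Sat -> 1..6, Sun -> 0)
        if not self._match_cron_field(weekday, now.isoweekday() % 7):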
            return False
        
        return True
    
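    def _match_cron_field(self, pattern: str, value: int) -> bool:
        """Match one cron field: "*", lists (1,15), ranges (1-5), steps (*/15)."""
        for part in pattern.split(","):
            base, _, step = part.partition("/")
            step = int(step) if step else 1
            if base == "*":
                # Steps on "*" count from 0, which is exact for minute and hour fields
                if value % step == 0:
                    return True
                continue
            start, _, end = base.partition("-")
            low = int(start)
            high = int(end) if end else low
            if low <= value <= high and (value - low) % step == 0:
                return True
        return False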
    
    def run_job(self, job: ScheduledJob) -> dict:
        """Execute a scheduled job with timeout and error handling."""
        start_time = datetime.now()
        status = "unknown"
        error_message = None
        
        self.running_jobs[job.name] = {
            "started_at": start_time.isoformat(),
            "pid": None
        }
        
        try:
            result = subprocess.run(
                job.command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=job.timeout_seconds
            )
            status = "success" if result.returncode == 0 else "failed"
            error_message = result.stderr if result.returncode != 0 else None
        except subprocess.TimeoutExpired:
            status = "timeout"
            error_message = f"Execution exceeded {job.timeout_seconds}s limit"
        except Exception as e:
            status = "error"
            error_message = str(e)
        finally:
            del self.running_jobs[job.name]
        
        end_time = datetime.now()
        execution_record = {
            "job": job.name,
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "duration_seconds": (end_time - start_time).total_seconds(),
            "status": status,
            "error": error_message
        }
        
        self._log_execution(execution_record)
        job.last_run = start_time.isoformat()
        job.last_status = status
        
        if status != "success" and job.notification_webhook:
            self._send_alert(job, execution_record)
        
        return execution_record
    
    def _log_execution(self, record: dict):
        """Append execution record to log."""
        with open(self.execution_log, "a") as f:
            f.write(json.dumps(record) + "\n")
    
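    def _send_alert(self, job: ScheduledJob, record: dict):
        """Send notification for failed job."""
        import requests  # third-party package: pip install requests
        try:
            requests.post(job.notification_webhook, json={
                "job": job.name,
                "status": record["status"],
                "error": record.get("error"),
                "duration": record.get("duration_seconds")
            }, timeout=10)
        except requests.RequestException as e:
            # A failed alert must not crash the scheduler loop
            print(f"Alert for {job.name} failed: {e}")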
    
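    def run_scheduler_loop(self):
        """Main scheduler loop. Jobs run one at a time, in list order."""
        while True:
            for job in self.jobs:
                if self._matches_schedule(job):
                    # Check if already running
                    if job.name not in self.running_jobs:
                        result = self.run_job(job)
                        print(f"{job.name}: {result['status']}")

            # Sleep until the start of the next minute so the check
            # does not drift and skip or repeat a minute
            time.sleep(60 - datetime.now().second)


if __name__ == "__main__":
    # Entry point used when started as "python3 scheduler.py"
    ReportScheduler().run_scheduler_loop()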
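
The scheduler loads its job definitions from jobs.json. JSON has no comment syntax, so the file must contain only the object below: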
{
  "jobs": [
    {
      "name": "daily_sales_report",
      "command": "python3 /opt/reports/generate_report.py --type sales --date yesterday",
      "schedule": "0 6 * * *",
      "timeout_seconds": 1800,
      "retry_count": 2,
      "notification_webhook": "https://hooks.example.com/alerts"
    },
    {
      "name": "inventory_alert",
      "command": "python3 /opt/reports/check_inventory.py",
      "schedule": "0 8 * * 1-5",
      "timeout_seconds": 600,
      "retry_count": 1
    },
    {
      "name": "weekly_summary",
      "command": "/opt/reports/weekly_summary.sh",
      "schedule": "0 7 * * 1",
      "timeout_seconds": 3600,
      "retry_count": 3,
      "notification_webhook": "https://hooks.example.com/alerts"
    }
  ]
}
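
Each job above carries a retry_count, but run_job as listed executes the command only once. One way to honor that field is a small retry wrapper. The sketch below is an assumption, not part of the chapter's scheduler: the helper name run_with_retries and the base_delay parameter are hypothetical, and retry_count is interpreted as the number of extra attempts after the first one.

```python
import subprocess
import time


def run_with_retries(command: str, retry_count: int, timeout_seconds: int,
                     base_delay: float = 5.0) -> dict:
    """Run a shell command, retrying up to retry_count extra times on failure."""
    attempts = 0
    status = "unknown"
    for attempt in range(retry_count + 1):
        attempts = attempt + 1
        try:
            result = subprocess.run(command, shell=True, capture_output=True,
                                    text=True, timeout=timeout_seconds)
            status = "success" if result.returncode == 0 else "failed"
        except subprocess.TimeoutExpired:
            status = "timeout"
        if status == "success":
            break
        if attempt < retry_count:
            # Exponential backoff between attempts: base_delay, 2x, 4x, ...
            time.sleep(base_delay * (2 ** attempt))
    return {"status": status, "attempts": attempts}
```

Recording the number of attempts alongside the status makes it possible to spot jobs that succeed only after retrying, which is often an early sign of an unreliable data source.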

Monitoring and Health Checks

The Python scheduler is a long-running process, so it needs a watchdog of its own. The script below runs from cron every minute and restarts scheduler.py if its process has died.

#!/bin/bash
# check_scheduler_health.sh
# Run via separate cron: * * * * * /opt/reports/check_scheduler_health.sh

PIDFILE="/var/run/scheduler.pid"
LOGFILE="/var/log/reports/scheduler_health.log"

# Start the scheduler in the background and record its PID
start_scheduler() {
    cd /opt/reports || exit 1
    nohup python3 scheduler.py >> /var/log/reports/scheduler.log 2>&1 &
    echo $! > "$PIDFILE"
}

if [ -f "$PIDFILE" ]; then
    PID=$(cat "$PIDFILE")
    # kill -0 sends no signal; it only tests whether the process exists
    if ! kill -0 "$PID" 2>/dev/null; then
        echo "$(date): Scheduler not running, restarting" >> "$LOGFILE"
        start_scheduler
    fi
else
    echo "$(date): PID file missing, starting scheduler" >> "$LOGFILE"
    start_scheduler
fi
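
The health check only confirms that the scheduler process is alive. The execution_log.jsonl file written by _log_execution can also show whether individual jobs are failing or slowing down. A minimal sketch, assuming a helper named summarize_log and a slow_seconds threshold (both hypothetical):

```python
import json
from collections import Counter
from pathlib import Path


def summarize_log(path: str, slow_seconds: float = 300.0) -> dict:
    """Count statuses per job and list runs slower than slow_seconds."""
    status_counts: dict[str, Counter] = {}
    slow_runs = []
    for line in Path(path).read_text().splitlines():
        if not line.strip():
            continue  # skip blank lines
        record = json.loads(line)
        status_counts.setdefault(record["job"], Counter())[record["status"]] += 1
        if record["duration_seconds"] > slow_seconds:
            slow_runs.append((record["job"], record["start_time"]))
    return {"status_counts": status_counts, "slow_runs": slow_runs}
```

Calling summarize_log("execution_log.jsonl") from a daily job would give a per-job tally of success, failed, timeout, and error runs without any extra infrastructure.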
EXERCISE

Create a scheduler that runs a report generation script every 15 minutes during business hours (9 AM to 5 PM, Monday through Friday) and sends alerts if execution time exceeds 5 minutes.
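
As a starting point, the two conditions in the exercise can be written as plain functions before they are wired into a scheduler. This is a sketch under assumptions: the function names are illustrative, and treating 5 PM as the exclusive end of the window (last run at 16:45) is one possible reading of the task, not a reference solution.

```python
from datetime import datetime

MAX_DURATION_SECONDS = 5 * 60  # alert threshold from the exercise


def should_run(now: datetime) -> bool:
    """True every 15 minutes from 09:00 to 16:45, Monday through Friday."""
    is_weekday = now.weekday() < 5      # Monday=0 ... Friday=4
    in_hours = 9 <= now.hour < 17       # assumption: no run starts at 17:00
    on_quarter = now.minute % 15 == 0
    return is_weekday and in_hours and on_quarter


def needs_alert(duration_seconds: float) -> bool:
    """True when a run took longer than the 5-minute limit."""
    return duration_seconds > MAX_DURATION_SECONDS
```

In cron notation the same window would be `*/15 9-16 * * 1-5`, which depends on the field matcher supporting both ranges and steps.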
