11. Scheduling Reports
Chapter 11 of 18 · 25 min
Automated reporting transforms raw data pipelines into actionable intelligence. A well-designed reporting scheduler handles execution timing, error recovery, and distribution without manual intervention.
Cron-Based Scheduling
The simplest scheduling approach uses system cron with wrapper scripts.
# /etc/cron.d/ai-reports
# Field order on each entry below:
#   minute hour day-of-month month day-of-week user command
# Every entry runs as root and appends stdout and stderr to its own log file.
# Daily sales report at 6 AM
0 6 * * * root /opt/reports/run_report.sh sales_daily >> /var/log/reports/sales.log 2>&1
# Weekly summary every Monday at 7 AM
0 7 * * 1 root /opt/reports/run_report.sh weekly_summary >> /var/log/reports/weekly.log 2>&1
# Hourly data freshness check (minute 0 of every hour)
0 * * * * root /opt/reports/check_freshness.sh >> /var/log/reports/freshness.log 2>&1
#!/bin/bash
# run_report.sh
# Usage: run_report.sh <report_type>
#
# Runs generate_report.py for the given report type from /opt/reports,
# appends progress and output to /var/log/reports/<report_type>.log, and
# calls alert_failure.sh when the report exits non-zero.
# Exit status: 0 on success, 1 on failure, 2 when no report type is given.
REPORT_TYPE="${1}"
REPORT_DIR="/opt/reports"
LOG_FILE="/var/log/reports/${REPORT_TYPE}.log"

# Without a report type the log path would collapse to ".log"; stop early.
if [ -z "$REPORT_TYPE" ]; then
    echo "Usage: $0 <report_type>" >&2
    exit 2
fi

echo "$(date): Starting ${REPORT_TYPE} report" >> "$LOG_FILE"
cd "$REPORT_DIR" || exit 1

python3 generate_report.py "$REPORT_TYPE" >> "$LOG_FILE" 2>&1
# Capture the status immediately: every later command, including the `[`
# test below, overwrites $?.
STATUS=$?

if [ "$STATUS" -eq 0 ]; then
    echo "$(date): Completed successfully" >> "$LOG_FILE"
else
    echo "$(date): Failed with exit code ${STATUS}" >> "$LOG_FILE"
    # Trigger alerting
    /opt/reports/alert_failure.sh "$REPORT_TYPE"
    exit 1
fi
Python Scheduler with Job Management
For more complex scheduling logic, implement a Python-based scheduler.
# scheduler.py
import json
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class ScheduledJob:
    """Configuration and last-known state of one scheduled job.

    Instances are built from the entries of the scheduler's JSON config.

    Raises:
        ValueError: if ``schedule`` does not have exactly five
            whitespace-separated fields.
    """

    name: str
    command: str  # command line executed through the shell
    schedule: str  # cron format: "minute hour day month weekday"
    timeout_seconds: int = 3600
    retry_count: int = 3
    notification_webhook: Optional[str] = None  # alert URL; None disables alerts
    last_run: Optional[str] = None  # ISO-8601 start time of the latest run
    last_status: Optional[str] = None  # status string of the latest run

    def __post_init__(self) -> None:
        # Fail fast at config-load time: a malformed schedule would otherwise
        # only surface as an unpacking error inside the scheduler loop.
        field_count = len(self.schedule.split())
        if field_count != 5:
            raise ValueError(
                f"Job {self.name!r}: schedule must have 5 fields "
                f"(minute hour day month weekday), got {field_count}: "
                f"{self.schedule!r}"
            )
class ReportScheduler:
    """Run configured jobs on cron-style schedules.

    Jobs are loaded from a JSON config file. Every execution is appended as
    one JSON line to ``execution_log.jsonl`` in the working directory, and a
    job that does not succeed is reported to its webhook when one is set.

    Supported cron field syntax: ``*``, single numbers, ranges (``1-5``),
    steps (``*/15``, ``10-50/20``) and comma-separated lists of those.
    Month and weekday names are not supported.

    NOTE(review): ``ScheduledJob.retry_count`` is loaded from the config but
    is not read anywhere in this class; failed jobs are not retried.
    """

    def __init__(self, config_path: str = "jobs.json"):
        """Load the job definitions from ``config_path``."""
        self.jobs = self._load_jobs(config_path)
        # Jobs currently executing, keyed by job name.
        self.running_jobs = {}
        self.execution_log = Path("execution_log.jsonl")

    def _load_jobs(self, path: str) -> list[ScheduledJob]:
        """Read the ``"jobs"`` list from the JSON file at ``path``."""
        with open(path) as f:
            data = json.load(f)
        return [ScheduledJob(**job) for job in data["jobs"]]

    def _matches_schedule(
        self, job: ScheduledJob, now: Optional[datetime] = None
    ) -> bool:
        """Check if job should run at ``now`` based on its cron schedule.

        Args:
            job: Job whose ``schedule`` holds five cron fields.
            now: Moment to test; defaults to the current local time.

        Returns:
            True when every cron field matches ``now``.
        """
        if now is None:
            now = datetime.now()
        minute, hour, day, month, weekday = job.schedule.split()

        if not self._match_cron_field(minute, now.minute):
            return False
        if not self._match_cron_field(hour, now.hour):
            return False
        if not self._match_cron_field(month, now.month, 1):
            return False

        # Cron numbers weekdays Sunday=0..Saturday=6 (7 is also Sunday),
        # whereas datetime.weekday() is Monday=0. isoweekday() is
        # Monday=1..Sunday=7, so "% 7" yields cron numbering.
        cron_weekday = now.isoweekday() % 7
        day_of_month_ok = self._match_cron_field(day, now.day, 1)
        day_of_week_ok = self._match_cron_field(weekday, cron_weekday) or (
            cron_weekday == 0 and self._match_cron_field(weekday, 7)
        )

        # Standard cron rule: when both day fields are restricted, the job
        # runs if either one matches; otherwise both must match.
        if day != "*" and weekday != "*":
            return day_of_month_ok or day_of_week_ok
        return day_of_month_ok and day_of_week_ok

    def _match_cron_field(
        self, pattern: str, value: int, minimum: int = 0
    ) -> bool:
        """Match a single cron field against a value.

        Args:
            pattern: Cron field such as ``*``, ``5``, ``1-5``, ``*/15``,
                ``10-50/20`` or a comma-separated list of those.
            value: Current value of the corresponding time component.
            minimum: Smallest legal value of the field (1 for day-of-month
                and month); ``*/n`` steps are counted from it.

        Returns:
            True when any comma-separated part of ``pattern`` covers
            ``value``.

        Raises:
            ValueError: if a part is not numeric or a step is below 1.
        """
        for part in pattern.split(","):
            base, _, step_text = part.partition("/")
            step = int(step_text) if step_text else 1
            if step < 1:
                raise ValueError(f"Invalid step in cron field: {pattern!r}")

            if base == "*":
                # Open-ended range starting at the field minimum.
                low, high = minimum, value
            elif "-" in base:
                low_text, high_text = base.split("-", 1)
                low, high = int(low_text), int(high_text)
            else:
                low = int(base)
                # "n/step" is treated as "from n upward in steps"; a bare
                # number matches only itself.
                high = value if step_text else low

            if low <= value <= high and (value - low) % step == 0:
                return True
        return False

    def run_job(self, job: ScheduledJob) -> dict:
        """Execute a scheduled job with timeout and error handling.

        The command runs through the shell with captured output. The result
        is logged, stored on the job, and alerted on when it did not succeed
        and the job has a webhook.

        Returns:
            The execution record: job name, start/end time, duration in
            seconds, status (``success``, ``failed``, ``timeout`` or
            ``error``) and error text.
        """
        start_time = datetime.now()
        status = "unknown"
        error_message = None
        self.running_jobs[job.name] = {
            "started_at": start_time.isoformat(),
            "pid": None
        }
        try:
            # shell=True: the command string comes from the local config
            # file and may rely on shell syntax.
            result = subprocess.run(
                job.command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=job.timeout_seconds
            )
            status = "success" if result.returncode == 0 else "failed"
            error_message = result.stderr if result.returncode != 0 else None
        except subprocess.TimeoutExpired:
            status = "timeout"
            error_message = f"Execution exceeded {job.timeout_seconds}s limit"
        except Exception as e:
            status = "error"
            error_message = str(e)
        finally:
            # pop() cannot raise if the entry was already removed.
            self.running_jobs.pop(job.name, None)

        end_time = datetime.now()
        execution_record = {
            "job": job.name,
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "duration_seconds": (end_time - start_time).total_seconds(),
            "status": status,
            "error": error_message
        }
        self._log_execution(execution_record)
        job.last_run = start_time.isoformat()
        job.last_status = status
        if status != "success" and job.notification_webhook:
            self._send_alert(job, execution_record)
        return execution_record

    def _log_execution(self, record: dict):
        """Append execution record to log."""
        with open(self.execution_log, "a") as f:
            f.write(json.dumps(record) + "\n")

    def _send_alert(self, job: ScheduledJob, record: dict):
        """Send notification for failed job.

        Delivery is best-effort: a slow or unreachable webhook is reported
        on stdout instead of hanging or crashing the scheduler loop.
        """
        import requests
        try:
            requests.post(
                job.notification_webhook,
                json={
                    "job": job.name,
                    "status": record["status"],
                    "error": record.get("error"),
                    "duration": record.get("duration_seconds")
                },
                # requests waits indefinitely unless a timeout is given.
                timeout=10
            )
        except requests.RequestException as exc:
            print(f"{job.name}: alert delivery failed: {exc}")

    def run_scheduler_loop(self):
        """Main scheduler loop.

        Evaluates all jobs once per wall-clock minute and runs the due ones
        sequentially. Never returns.
        """
        last_tick = None
        while True:
            # One timestamp per pass, truncated to the minute, so a slow job
            # cannot make later jobs miss the minute they were due in.
            tick = datetime.now().replace(second=0, microsecond=0)
            if tick != last_tick:  # never evaluate the same minute twice
                last_tick = tick
                for job in self.jobs:
                    if not self._matches_schedule(job, tick):
                        continue
                    # Check if already running
                    if job.name not in self.running_jobs:
                        result = self.run_job(job)
                        print(f"{job.name}: {result['status']}")
            # Sleep until the next minute boundary instead of a flat 60 s,
            # so time spent running jobs does not accumulate as drift.
            now = datetime.now()
            time.sleep(60 - now.second - now.microsecond / 1_000_000)


# Entry point, so that "python3 scheduler.py" actually starts the loop.
if __name__ == "__main__":
    ReportScheduler().run_scheduler_loop()
// jobs.json
{
"jobs": [
{
"name": "daily_sales_report",
"command": "python3 /opt/reports/generate_report.py --type sales --date yesterday",
"schedule": "0 6 * * *",
"timeout_seconds": 1800,
"retry_count": 2,
"notification_webhook": "https://hooks.example.com/alerts"
},
{
"name": "inventory_alert",
"command": "python3 /opt/reports/check_inventory.py",
"schedule": "0 8 * * 1-5",
"timeout_seconds": 600,
"retry_count": 1
},
{
"name": "weekly_summary",
"command": "/opt/reports/weekly_summary.sh",
"schedule": "0 7 * * 1",
"timeout_seconds": 3600,
"retry_count": 3,
"notification_webhook": "https://hooks.example.com/alerts"
}
]
}
Monitoring and Health Checks
#!/bin/bash
# check_scheduler_health.sh
# Run via separate cron: * * * * * /opt/reports/check_scheduler_health.sh
#
# Starts scheduler.py when its PID file is missing or names a process that
# is no longer running, and records each (re)start in the health log.
PIDFILE="/var/run/scheduler.pid"
LOGFILE="/var/log/reports/scheduler_health.log"

# Launch the scheduler in the background and record its PID.
start_scheduler() {
    # Abort rather than launch from the wrong directory if cd fails.
    cd /opt/reports || {
        echo "$(date): Cannot cd to /opt/reports, scheduler not started" >> "$LOGFILE"
        exit 1
    }
    nohup python3 scheduler.py >> /var/log/reports/scheduler.log 2>&1 &
    echo $! > "$PIDFILE"
}

if [ -f "$PIDFILE" ]; then
    PID=$(cat "$PIDFILE")
    # kill -0 delivers no signal; it only checks that the process exists
    # and can be signalled.
    if ! kill -0 "$PID" 2>/dev/null; then
        echo "$(date): Scheduler not running, restarting" >> "$LOGFILE"
        start_scheduler
    fi
else
    echo "$(date): PID file missing, starting scheduler" >> "$LOGFILE"
    start_scheduler
fi
EXERCISE
Create a scheduler that runs a report generation script every 15 minutes during business hours (9 AM to 5 PM, Monday through Friday) and sends alerts if execution time exceeds 5 minutes.