RUNLOCALAIv38
->Will it run?Best GPUCompareTroubleshootStartLearnPulseModelsHardwareToolsBench
Run check
RUNLOCALAI

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
DIR
  • Models
  • Hardware
  • Tools
  • Benchmarks
TOOLS
  • Will it run?
  • Compare hardware
  • Cost vs cloud
  • Choose my GPU
  • Prompting kits
  • Quick answers
REF
  • All buyer guides
  • Learn local AI
  • Methodology
  • Glossary
  • Errors KB
  • Trust
EDITOR
  • About
  • Author
  • How we make money
  • Editorial policy
  • Contact
LEGAL
  • Privacy
  • Terms
  • Sitemap
MAIL · MONTHLY DIGEST
Get monthly local AI changes
Monthly recap. No spam.
DISCLOSURE

Some links on this site are affiliate links (Amazon Associates and other first-class retailers). When you buy through them, we earn a small commission at no extra cost to you. Affiliate links do not influence our verdicts — there are cards we rate highly that we don't have affiliate relationships with, and cards that sell well that we refuse to recommend. Read more →

© 2026 runlocalai.coIndependently operated
RUNLOCALAI · v38
  1. >
  2. Home
  3. /Learn
  4. /Courses
  5. /OpenCLaw: Building a Personal AI Agent
  6. /Ch. 12
OpenCLaw: Building a Personal AI Agent

12. Scheduled Tasks

Chapter 12 of 24 · 20 min
KEY INSIGHT

Scheduled tasks integrate with the agent's task system, enabling background execution with proper state management, error handling, and result reporting. The scheduler manages tasks that run at specified times or intervals. Each scheduled task includes the task logic, schedule definition, and configuration for retries, timeouts, and notifications. Scheduled Task Structure ```python from dataclasses import dataclass, field from enum import Enum from typing import Callable, Awaitable, Any import asyncio import uuid class TaskStatus(Enum): PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" @dataclass class ScheduledTask: """Definition of a scheduled task.""" id: str = field(default_factory=lambda: str(uuid.uuid4())) name: str = "" description: str = "" schedule: CronSchedule handler: Callable[..., Awaitable[Any]] args: tuple = field(default_factory=tuple) kwargs: dict = field(default_factory=dict) # Execution configuration max_retries: int = 3 retry_delay_seconds: int = 60 timeout_seconds: int = 300 # State status: TaskStatus = TaskStatus.PENDING last_run: datetime | None = None next_run: datetime | None = None last_error: str | None = None run_count: int = 0 class TaskScheduler: """Manages scheduled task execution.""" def __init__(self, agent: "OpenCLawAgent"): self.agent = agent self.tasks: dict[str, ScheduledTask] = {} self._running_tasks: set[str] = set() self._should_stop = False def schedule(self, task: ScheduledTask) -> str: """Add a task to the schedule.""" task.next_run = task.schedule.next_run(datetime.now()) self.tasks[task.id] = task return task.id async def run(self) -> None: """Main scheduler loop.""" while not self._should_stop: try: await self._check_and_run_tasks() except Exception as e: self.agent.logger.error(f"Scheduler error: {e}") await asyncio.sleep(10) # Check every 10 seconds async def _check_and_run_tasks(self) -> None: """Check for tasks that need to run and execute them.""" now = datetime.now() for task_id, task in self.tasks.items(): if task.status == TaskStatus.RUNNING: continue if task.next_run and task.next_run <= now: await self._execute_task(task) async def _execute_task(self, task: ScheduledTask) -> None: """Execute a task with error handling and retries.""" task.status = TaskStatus.RUNNING self._running_tasks.add(task.id) for attempt in range(task.max_retries + 1): try: result = await asyncio.wait_for( task.handler(*task.args, **task.kwargs), timeout=task.timeout_seconds ) task.status = TaskStatus.COMPLETED task.last_run = datetime.now() task.next_run = task.schedule.next_run(task.last_run) task.last_error = None task.run_count += 1 break except asyncio.TimeoutError: task.last_error = f"Timeout after {task.timeout_seconds}s" except Exception as e: task.last_error = str(e) if attempt < task.max_retries: await asyncio.sleep(task.retry_delay_seconds) else: task.status = TaskStatus.FAILED self._running_tasks.discard(task.id) ``` Task Persistence Scheduled tasks and their state must persist across agent restarts. The scheduler saves task definitions and state to the database. On restart, it loads tasks and recalculates next run times based on the current time. ```python async def save_state(self) -> None: """Persist scheduler state to database.""" for task in self.tasks.values(): await self.agent.db.execute(""" INSERT OR REPLACE INTO scheduled_tasks (id, name, status, last_run, next_run, last_error, run_count) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( task.id, task.name, task.status.value, task.last_run, task.next_run, task.last_error, task.run_count )) async def load_state(self) -> None: """Restore scheduler state from database.""" rows = await self.agent.db.execute( "SELECT * FROM scheduled_tasks" ) for row in rows: # Reconstruct task and recalculate next_run task = self._reconstruct_task(row) task.next_run = task.schedule.next_run(datetime.now()) self.tasks[task.id] = task ```

EXERCISE

Design a scheduled task that sends a daily summary email to the user. Include the task implementation, cron schedule, error handling for email delivery failures, and retry logic. Explain how you would test this without actually sending emails. ```

← Chapter 11
Cron Scheduling
Chapter 13 →
Learning from Feedback