KEY INSIGHT
Scheduled tasks integrate with the agent's task system, enabling background execution with proper state management, error handling, and result reporting.
The scheduler manages tasks that run at specified times or intervals. Each scheduled task includes the task logic, schedule definition, and configuration for retries, timeouts, and notifications.
Scheduled Task Structure
```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Awaitable, Any
import asyncio
import uuid
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class ScheduledTask:
"""Definition of a scheduled task."""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
description: str = ""
schedule: CronSchedule
handler: Callable[..., Awaitable[Any]]
args: tuple = field(default_factory=tuple)
kwargs: dict = field(default_factory=dict)
# Execution configuration
max_retries: int = 3
retry_delay_seconds: int = 60
timeout_seconds: int = 300
# State
status: TaskStatus = TaskStatus.PENDING
last_run: datetime | None = None
next_run: datetime | None = None
last_error: str | None = None
run_count: int = 0
class TaskScheduler:
"""Manages scheduled task execution."""
def __init__(self, agent: "OpenCLawAgent"):
self.agent = agent
self.tasks: dict[str, ScheduledTask] = {}
self._running_tasks: set[str] = set()
self._should_stop = False
def schedule(self, task: ScheduledTask) -> str:
"""Add a task to the schedule."""
task.next_run = task.schedule.next_run(datetime.now())
self.tasks[task.id] = task
return task.id
async def run(self) -> None:
"""Main scheduler loop."""
while not self._should_stop:
try:
await self._check_and_run_tasks()
except Exception as e:
self.agent.logger.error(f"Scheduler error: {e}")
await asyncio.sleep(10) # Check every 10 seconds
async def _check_and_run_tasks(self) -> None:
"""Check for tasks that need to run and execute them."""
now = datetime.now()
for task_id, task in self.tasks.items():
if task.status == TaskStatus.RUNNING:
continue
if task.next_run and task.next_run <= now:
await self._execute_task(task)
async def _execute_task(self, task: ScheduledTask) -> None:
"""Execute a task with error handling and retries."""
task.status = TaskStatus.RUNNING
self._running_tasks.add(task.id)
for attempt in range(task.max_retries + 1):
try:
result = await asyncio.wait_for(
task.handler(*task.args, **task.kwargs),
timeout=task.timeout_seconds
)
task.status = TaskStatus.COMPLETED
task.last_run = datetime.now()
task.next_run = task.schedule.next_run(task.last_run)
task.last_error = None
task.run_count += 1
break
except asyncio.TimeoutError:
task.last_error = f"Timeout after {task.timeout_seconds}s"
except Exception as e:
task.last_error = str(e)
if attempt < task.max_retries:
await asyncio.sleep(task.retry_delay_seconds)
else:
task.status = TaskStatus.FAILED
self._running_tasks.discard(task.id)
```
Task Persistence
Scheduled tasks and their state must persist across agent restarts. The scheduler saves task definitions and state to the database. On restart, it loads tasks and recalculates next run times based on the current time.
```python
async def save_state(self) -> None:
"""Persist scheduler state to database."""
for task in self.tasks.values():
await self.agent.db.execute("""
INSERT OR REPLACE INTO scheduled_tasks
(id, name, status, last_run, next_run, last_error, run_count)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
task.id, task.name, task.status.value,
task.last_run, task.next_run, task.last_error, task.run_count
))
async def load_state(self) -> None:
"""Restore scheduler state from database."""
rows = await self.agent.db.execute(
"SELECT * FROM scheduled_tasks"
)
for row in rows:
# Reconstruct task and recalculate next_run
task = self._reconstruct_task(row)
task.next_run = task.schedule.next_run(datetime.now())
self.tasks[task.id] = task
```