11. Task Planner
The task planner takes a high-level goal and decomposes it into executable steps with clear tool calls and expected outcomes. It's where "help me migrate our database to a new region" becomes "run pg_dump, upload to S3, provision new instance, restore dump."
Decomposition strategy:
class TaskPlanner:
    """Turn a high-level task into an ordered list of single-tool steps via an LLM."""

    def __init__(self, llm: LLMInterface, tools: ToolRegistry):
        self.llm = llm
        self.tools = tools

    async def decompose(
        self,
        task: str,
        context: Optional[dict[str, Any]] = None
    ) -> list[TaskStep]:
        """Ask the LLM to split ``task`` into steps and parse its reply.

        Args:
            task: Natural-language description of the goal.
            context: Optional extra information, JSON-serialized into the
                prompt. ``None`` or an empty dict adds nothing to the prompt.

        Returns:
            The steps parsed from the model's JSON reply, in reply order.

        Raises:
            json.JSONDecodeError: The reply contains no parseable JSON object.
            KeyError: The reply lacks ``steps`` or a step lacks
                ``tool_name`` / ``tool_args``.
        """
        tools_schemas = self.tools.schemas()
        # Hoisted out of the f-string for readability; renders identically.
        context_block = (
            "Additional context: " + json.dumps(context) if context else ""
        )
        prompt = f"""Decompose this task into steps. Each step must use exactly one tool from the available tools.
Task: {task}
Available tools:
{json.dumps(tools_schemas, indent=2)}
{context_block}
Output format:
{{
"steps": [
{{
"tool_name": "tool_name",
"tool_args": {{"param": "value"}},
"expected_outcome": "What this step accomplishes",
"rollback_on_failure": "How to undo if this step fails"
}}
],
"requires_confirmation": true/false,
"risky_steps": ["step descriptions that need human approval"]
}}"""
        response = await self.llm.generate(prompt)
        return self._parse_steps(response)

    def _parse_steps(self, response: str) -> list[TaskStep]:
        """Build ``TaskStep`` objects from the model's JSON reply.

        The reply's ``risky_steps`` / ``requires_confirmation`` fields are
        folded into each step's ``requires_confirmation`` flag so that the
        approval information requested in the prompt is not lost.
        """
        data = self._load_json(response)
        # Index "steps" first so a malformed reply raises the same
        # KeyError/TypeError it always did.
        raw_steps = data["steps"]

        risky = data.get("risky_steps")
        if not isinstance(risky, list):
            risky = []

        steps = []
        for raw in raw_steps:
            outcome = raw.get("expected_outcome", "")
            steps.append(
                TaskStep(
                    tool_name=raw["tool_name"],
                    tool_args=raw["tool_args"],
                    expected_outcome=outcome,
                    rollback=raw.get("rollback_on_failure"),
                    # Risky descriptions are matched verbatim against the
                    # step's expected outcome; an empty outcome never matches.
                    requires_confirmation=bool(outcome) and outcome in risky,
                )
            )

        # Fail closed: if the model asked for confirmation but none of its
        # risky descriptions could be tied to a step, gate every step.
        if data.get("requires_confirmation") and not any(
            step.requires_confirmation for step in steps
        ):
            for step in steps:
                step.requires_confirmation = True
        return steps

    @staticmethod
    def _load_json(response: str) -> Any:
        """Parse ``response`` as JSON, tolerating code fences or prose around it.

        Falls back to the span between the first ``{`` and the last ``}``
        when the whole reply is not valid JSON; re-raises the original
        ``json.JSONDecodeError`` when no such span exists.
        """
        text = response.strip()
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            start = text.find("{")
            end = text.rfind("}")
            if start == -1 or end <= start:
                raise
            return json.loads(text[start:end + 1])
TaskStep with confirmation:
@dataclass
class TaskStep:
    """A single planned tool call plus its bookkeeping fields."""

    # Which tool the step uses and the arguments supplied for it.
    tool_name: str
    tool_args: dict[str, Any]
    # Description of what the step is meant to accomplish.
    expected_outcome: str
    # Description of how to undo the step; None when none was provided.
    rollback: Optional[str] = field(default=None)
    # Progress marker; every new step starts out as "pending".
    status: str = field(default="pending")
    # Whether the step needs approval; off unless explicitly set.
    requires_confirmation: bool = field(default=False)
@dataclass
class TaskPlan:
    """An ordered collection of steps plus plan-level approval metadata."""

    # Steps in the order they are meant to run.
    steps: list[TaskStep]
    # Plan-wide approval flag; off unless explicitly set.
    requires_confirmation: bool = field(default=False)
    # Descriptions of steps needing approval; a fresh list per instance.
    risky_steps: list[str] = field(default_factory=list)
Executor with confirmation handling:
class TaskExecutor:
    """Run a ``TaskPlan`` step by step, gating risky steps on approval first."""

    def __init__(self, registry: ToolRegistry):
        self.registry = registry

    async def execute_with_confirmation(
        self,
        plan: TaskPlan,
        confirm_fn: Callable[[list[str]], Awaitable[list[bool]]]
    ) -> ExecutionResult:
        """Confirm risky steps, then execute every step in list order.

        Args:
            plan: Plan whose ``steps`` are executed sequentially.
            confirm_fn: Awaited once with the ``expected_outcome`` of each
                risky step; expected to return one approval flag per
                description, in the same order.

        Returns:
            A failed ``ExecutionResult`` when a risky step is rejected
            (nothing is executed) or when a step raises (carrying the
            results gathered so far); otherwise a successful result holding
            every step's output.
        """
        risky_indices = self._risky_indices(plan)
        if risky_indices:
            confirmations = list(await confirm_fn(
                [plan.steps[i].expected_outcome for i in risky_indices]
            ))
            for position, idx in enumerate(risky_indices):
                # A missing answer counts as a rejection: zip() would have
                # silently skipped the step and let it run unconfirmed.
                approved = (
                    position < len(confirmations) and confirmations[position]
                )
                if not approved:
                    return ExecutionResult(
                        success=False,
                        failed_step=idx,
                        error="User rejected risky step"
                    )

        results = []
        for i, step in enumerate(plan.steps):
            step.status = "executing"
            try:
                tool = self.registry.get(step.tool_name)
                result = await tool.handler(**step.tool_args)
                step.status = "completed"
                results.append(StepResult(step=i, success=True, output=result))
            except Exception as exc:
                step.status = "failed"
                # Attempt rollback.
                # NOTE(review): _execute_rollback is not defined in the
                # visible part of this class — confirm it exists; an error
                # raised by it would propagate and mask ``exc``.
                if step.rollback:
                    await self._execute_rollback(step.rollback, results)
                return ExecutionResult(
                    success=False,
                    failed_step=i,
                    error=str(exc),
                    partial_results=results
                )
        return ExecutionResult(success=True, results=results)

    @staticmethod
    def _risky_indices(plan: TaskPlan) -> list[int]:
        """Return the indices of the steps that need approval before running.

        A step is risky when its own ``requires_confirmation`` flag is set or
        when its ``expected_outcome`` is listed in ``plan.risky_steps``
        (the plan stores descriptions, so comparing the step objects
        themselves can never match).
        """
        flagged = [
            i for i, step in enumerate(plan.steps)
            if step.requires_confirmation
            or step.expected_outcome in plan.risky_steps
        ]
        # Fail closed: the plan demands confirmation but no individual step
        # could be identified, so ask about all of them.
        if plan.requires_confirmation and not flagged:
            return list(range(len(plan.steps)))
        return flagged
Failure mode: missing context. The planner doesn't know the current state of the systems it is planning against. "Provision new instance" might fail if the IP range is exhausted. Build context gathering into the planning phase: have the planner query tools for current state before it commits to a plan.
Design a multi-step task your agent should handle (at least 5 steps). Write the decomposition prompt, then test whether the planner produces a valid, executable tool call for each step.