13. Task Decomposition

Chapter 13 of 16 · 20 min

Task decomposition breaks a complex goal into smaller, independently executable subtasks. Unlike simple planning (which outputs a step list), decomposition often creates a tree or dependency graph of subgoals.

Recursive decomposition

def decompose_task(task: str, model, depth: int = 3, max_children: int = 5) -> dict:
    """Recursively decompose a task into a tree of subgoals."""
    # Depth budget exhausted: treat whatever remains as a single action.
    if depth == 0:
        return {"type": "action", "description": task}
    
    prompt = (
        f"Break down the following task into between 3 and {max_children} independent subtasks. "
        f"Each subtask should be concrete enough to execute with one or two tool calls. "
        f"If the task is atomic, return it unchanged.\n\nTask: {task}"
    )
    
    response = model.chat([{"role": "user", "content": prompt}])
    
    # parse_task_list and is_atomic are helpers assumed to be defined elsewhere.
    subtasks = parse_task_list(response.content)
    
    # Nothing could be parsed: fall back to running the task as-is.
    if not subtasks:
        return {"type": "action", "description": task}
    
    # The model returned the task unchanged, so it is a leaf.
    if len(subtasks) == 1 and is_atomic(subtasks[0]):
        return {"type": "action", "description": subtasks[0]}
    
    return {
        "type": "composite",
        "description": task,
        "subtasks": [decompose_task(s, model, depth - 1, max_children) for s in subtasks]
    }
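
decompose_task relies on two helpers, parse_task_list and is_atomic, that the chapter does not define. The following is a minimal sketch of what they might look like, assuming the model replies with one subtask per bulleted or numbered line. The function bodies, the regular expression, and the max_words threshold are illustrative assumptions, not a prescribed implementation.

```python
import re

# Hypothetical helpers: the names come from decompose_task above, but these
# bodies are illustrative assumptions, not the chapter's implementation.

# Matches a leading bullet ("-", "*", "•") or number ("1.", "2)") plus whitespace.
_ITEM_PREFIX = re.compile(r"^\s*(?:[-*•]|\d+[.)])\s+")

def parse_task_list(text: str) -> list[str]:
    """Extract one subtask per bulleted or numbered line of a model reply."""
    subtasks = []
    for line in text.splitlines():
        if _ITEM_PREFIX.match(line):
            item = _ITEM_PREFIX.sub("", line).strip()
            if item:
                subtasks.append(item)
    # Assumption: a reply with no list markers is a single, unsplit task.
    if not subtasks and text.strip():
        subtasks.append(text.strip())
    return subtasks

def is_atomic(task: str, max_words: int = 12) -> bool:
    """Crude heuristic: short, with no conjunction hinting at several steps."""
    words = task.lower().split()
    return len(words) <= max_words and not ({"and", "then"} & set(words))
```

A word-count heuristic is only a starting point. In practice, is_atomic could also ask the model itself whether the task can be completed with one or two tool calls.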

Task graph execution

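Once the task is decomposed, the agent walks the resulting tree depth-first, executing subtasks in the order they are listed and aggregating their results at each composite node: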

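def execute_graph(task_node: dict, tools: list, model) -> str:
    # Leaf node: run it directly with the available tools.
    if task_node["type"] == "action":
        return execute_single_action(task_node["description"], tools, model)
    
    # Composite node: run each child in order, then combine the results.
    if task_node["type"] == "composite":
        results = []
        for subtask in task_node["subtasks"]:
            result = execute_graph(subtask, tools, model)
            results.append(result)
        return aggregate_results(results, model)
    
    # Fail loudly instead of silently returning None for a malformed node.
    raise ValueError(f"Unknown node type: {task_node['type']!r}")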
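
To see the traversal order without a model or real tools, the walk can be tried on a hand-built tree. This is a sketch under stated assumptions: execute_single_action and aggregate_results are replaced with hypothetical stubs that only format strings, the tree contents are invented for illustration, and execute_graph is repeated so the block runs on its own.

```python
# Hypothetical stand-ins for the helpers that execute_graph calls. A real agent
# would invoke tools and a model here; these stubs only format strings.

def execute_single_action(description: str, tools: list, model) -> str:
    return f"done: {description}"

def aggregate_results(results: list, model) -> str:
    return "; ".join(results)

def execute_graph(task_node: dict, tools: list, model) -> str:
    if task_node["type"] == "action":
        return execute_single_action(task_node["description"], tools, model)
    if task_node["type"] == "composite":
        results = [execute_graph(s, tools, model) for s in task_node["subtasks"]]
        return aggregate_results(results, model)
    raise ValueError(f"Unknown node type: {task_node['type']!r}")

# Illustrative tree: one leaf plus one nested composite with two leaves.
tree = {
    "type": "composite",
    "description": "Compare exchange rates",
    "subtasks": [
        {"type": "action", "description": "Get USD to EUR rate"},
        {
            "type": "composite",
            "description": "Get rates for GBP",
            "subtasks": [
                {"type": "action", "description": "Get USD to GBP rate"},
                {"type": "action", "description": "Get EUR to GBP rate"},
            ],
        },
    ],
}

summary = execute_graph(tree, tools=[], model=None)
print(summary)
# → done: Get USD to EUR rate; done: Get USD to GBP rate; done: Get EUR to GBP rate
```

The leaves run left to right, and each composite node sees only the aggregated output of its own children.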

Dependency detection

Some subtasks depend on the output of others. A shared context object lets an earlier subtask pass its result to a later one:

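from typing import Any, Optional

class SharedContext:
    """Key-value store for passing results between subtasks."""
    
    def __init__(self):
        self.store = {}
    
    def set(self, key: str, value: Any):
        self.store[key] = value
    
    def get(self, key: str) -> Optional[Any]:
        # Returns None if no subtask has stored this key yet.
        return self.store.get(key)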

When a subtask produces a result that another subtask needs, store it in the shared context:

ctx = SharedContext()

# Subtask 1: Find current exchange rate
# (tools and model are the same arguments that execute_graph takes above)
usd_to_eur = execute_graph({"type": "action", "description": "Get USD to EUR exchange rate"}, tools, model)
ctx.set("exchange_rate", usd_to_eur)

# Subtask 2: Convert amount (reads from context)
convert_task = f"Convert the USD amount to EUR using rate: {ctx.get('exchange_rate')}"
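
When the dependencies between subtasks are known, a topological sort gives an execution order in which every subtask runs after the ones it reads from. The sketch below uses the standard library's graphlib.TopologicalSorter (Python 3.9+). The task names, the dependencies mapping, and the stub run function are hypothetical, chosen only to mirror the exchange-rate example.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each key maps to the set of subtasks it depends on.
dependencies = {
    "get_rate": set(),
    "get_amount": set(),
    "convert": {"get_rate", "get_amount"},
    "report": {"convert"},
}

def run_in_dependency_order(dependencies: dict, run) -> dict:
    """Run each subtask after its prerequisites, storing results by name."""
    results = {}
    # static_order() yields every node after all of its predecessors.
    for name in TopologicalSorter(dependencies).static_order():
        inputs = {dep: results[dep] for dep in dependencies.get(name, ())}
        results[name] = run(name, inputs)
    return results

# Stub runner (an assumption): records which upstream results each subtask saw.
def run(name: str, inputs: dict) -> str:
    return f"{name}({', '.join(sorted(inputs))})"

results = run_in_dependency_order(dependencies, run)
print(results["convert"])  # → convert(get_amount, get_rate)
print(results["report"])   # → report(convert)
```

Here the results dictionary plays the role of the shared context. If the dependencies contain a cycle, TopologicalSorter raises graphlib.CycleError, which is a useful early signal that the decomposition is inconsistent.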

EXERCISE

Decompose the task "Write a summary report comparing renewable energy adoption in three countries." Run the decomposition and draw the resulting task graph. Then execute each leaf node in dependency order, so that every subtask runs only after the subtasks it depends on.