KEY INSIGHT
Decomposition quality determines system performance more than agent sophistication; well-structured tasks enable parallelism while poorly structured tasks create bottlenecks.
Task decomposition breaks complex requests into manageable subtasks. A supervisor agent receives a complex goal and decides how to divide work among specialized agents. Effective decomposition enables parallel execution, isolates failures, and simplifies each agent's job.
Decomposition strategies depend on task structure. Sequential tasks with dependencies require careful ordering; independent subtasks can parallelize. Tasks requiring different capabilities naturally decompose along capability boundaries. Tasks with nested scope require hierarchical decomposition.
Goal-oriented decomposition starts from the final deliverable and works backward. What final output does the user need? What inputs are required to produce it? What intermediate results feed those inputs? This recursive decomposition continues until reaching tasks that existing agents can execute directly.
Capability-based decomposition maps tasks to agent capabilities. Each agent declares what it can do; decomposition selects agents whose capabilities match subtasks. This approach scales as new agents join—decomposition logic automatically leverages new capabilities.
Decomposition must handle uncertainty about task difficulty. Some requests decompose into obvious subtasks; others reveal complexity only during execution. Adaptive decomposition adjusts granularity based on early results—simple subtasks may not need further decomposition while complex ones break down further.
Subtask specification requires clear interfaces. Each subtask should define inputs, expected outputs, quality criteria, and failure modes. Vague subtask definitions lead to mismatched expectations and integration failures.
```python
from dataclasses import dataclass, field
from typing import Optional, Any, Callable
from enum import Enum
import asyncio
class TaskType(Enum):
    """Classification of a subtask by how it gets carried out."""

    # Can be executed by an existing agent as-is.
    PRIMITIVE = "primitive"
    # Requires further decomposition before it can run.
    COMPOSITE = "composite"
    # Branches based on runtime conditions.
    CONDITIONAL = "conditional"
@dataclass
class Subtask:
    """A single node in a decomposed task tree.

    The first five fields are required; the remaining ones default to
    "no dependencies, no capability requirements, no children".
    """

    # --- identity -------------------------------------------------------
    id: str
    name: str
    type: TaskType

    # --- interface contract ---------------------------------------------
    input_spec: dict
    output_spec: dict

    # --- scheduling / routing -------------------------------------------
    # IDs of other subtasks this one depends on.
    dependencies: list[str] = field(default_factory=list)
    # Capability names an agent must offer to take this subtask.
    required_capabilities: list[str] = field(default_factory=list)
    # Defaults to 0.8; presumably a minimum acceptable quality score in
    # [0, 1] -- TODO confirm against the code that consumes it.
    quality_threshold: float = 0.8

    # --- hierarchy ------------------------------------------------------
    # Nested subtasks; each instance gets its own fresh list.
    children: list["Subtask"] = field(default_factory=list)
@dataclass
class DecompositionResult:
    """Bundle returned by a decomposition run."""

    # Synthetic top-level task whose children are the expanded subtasks.
    root_task: Subtask
    # Ordered batches; tasks inside one batch are parallelizable.
    execution_plan: list[list[Subtask]]
    # Estimated wall-clock time for the whole plan, in seconds.
    estimated_duration_seconds: float
class TaskDecomposer:
    """Break a goal into subtasks and arrange them into parallelizable batches.

    The pipeline is: analyze the goal, expand composite tasks that no single
    agent can handle, build a dependency graph, order it into batches, and
    estimate the total duration.
    """

    def __init__(self, capability_registry: dict[str, list[str]]):
        """Store the registry mapping agent_name -> supported capabilities."""
        self.capabilities = capability_registry  # agent_name -> supported_capabilities

    async def decompose(
        self,
        goal: str,
        constraints: dict
    ) -> DecompositionResult:
        """Main decomposition entry point.

        Args:
            goal: Natural-language description of what must be achieved.
            constraints: Extra constraints forwarded to goal analysis.

        Returns:
            A DecompositionResult whose execution plan holds Subtask objects
            grouped into batches that can run concurrently.

        Raises:
            ValueError: If the subtasks contain a circular dependency.
        """
        # Phase 1: Initial decomposition based on goal analysis
        initial_tasks = await self._analyze_goal(goal, constraints)
        # Phase 2: Expand composite tasks
        expanded = await self._expand_composites(initial_tasks)
        # Phase 3: Analyze dependencies
        dependency_graph = self._build_dependency_graph(expanded)
        # Phase 4: Generate execution plan (batched for parallelization).
        # The planner works on task IDs; map them back to Subtask objects
        # because that is what DecompositionResult declares and what an
        # executor needs in order to read each task's fields.
        id_batches = self._generate_execution_plan(dependency_graph)
        tasks_by_id = {task.id: task for task in expanded}
        execution_plan = [
            [tasks_by_id[task_id] for task_id in batch]
            for batch in id_batches
        ]
        # Phase 5: Estimate duration
        duration = self._estimate_duration(execution_plan)
        root = Subtask(
            id="root",
            name=goal,
            type=TaskType.COMPOSITE,
            input_spec={},
            output_spec={},
            children=expanded
        )
        return DecompositionResult(
            root_task=root,
            execution_plan=execution_plan,
            estimated_duration_seconds=duration
        )

    async def _analyze_goal(self, goal: str, constraints: dict) -> list[Subtask]:
        """Initial goal analysis and task identification."""
        # Prompt-based analysis (would use actual LLM in production)
        analysis_prompt = (
            "Analyze this goal and identify subtasks:\n"
            f"Goal: {goal}\n"
            f"Constraints: {constraints}\n"
            "Identify:\n"
            "1. Main phases or steps\n"
            "2. Required capabilities for each step\n"
            "3. Clear interfaces between steps\n"
            "4. Potential parallelization opportunities"
        )
        # In production, this calls an analysis model
        return await self._call_analysis_model(analysis_prompt)

    def _has_capable_agent(self, required_capabilities: list[str]) -> bool:
        """Return True if one registered agent covers every required capability.

        A task that declares no capabilities cannot be matched to an agent,
        so an empty requirement list yields False.
        """
        if not required_capabilities:
            return False
        needed = set(required_capabilities)
        # A single agent must cover ALL capabilities: the task is handed to
        # one agent, so a partial match is not enough.
        return any(
            needed.issubset(supported)
            for supported in self.capabilities.values()
        )

    async def _expand_composites(self, tasks: list[Subtask]) -> list[Subtask]:
        """Recursively expand composite tasks that no single agent can handle.

        Non-composite tasks, and composite tasks that some agent fully
        covers, are kept as they are. Other composite tasks are replaced by
        their recursively expanded subtasks.
        """
        expanded: list[Subtask] = []
        for task in tasks:
            if task.type != TaskType.COMPOSITE or self._has_capable_agent(
                task.required_capabilities
            ):
                expanded.append(task)
                continue
            # Need further decomposition
            subtasks = await self._decompose_further(task)
            if not subtasks:
                # Nothing to split into: keep the task so the work is not
                # silently lost from the plan.
                expanded.append(task)
                continue
            expanded.extend(await self._expand_composites(subtasks))
        return expanded

    def _build_dependency_graph(self, tasks: list[Subtask]) -> dict[str, list[str]]:
        """Map each task ID to the IDs of the tasks it declares as dependencies."""
        # Simple implementation - real version would analyze data flow.
        # Tasks without declared dependencies map to an empty list and can
        # run immediately.
        return {task.id: list(task.dependencies) for task in tasks}

    def _generate_execution_plan(
        self, dependency_graph: dict[str, list[str]]
    ) -> list[list[str]]:
        """Topologically sort task IDs into batches of parallelizable tasks.

        Args:
            dependency_graph: task ID -> IDs of the tasks it depends on.

        Returns:
            Batches of task IDs. Every task appears after all of its
            in-graph dependencies; tasks within one batch do not depend on
            each other. Batch contents follow the graph's insertion order.

        Raises:
            ValueError: If some tasks can never be scheduled because their
                dependencies form a cycle.
        """
        # Dependencies still unmet per task. IDs that are not part of the
        # graph cannot be scheduled here, so they do not block anything.
        unmet = {
            task_id: {dep for dep in deps if dep in dependency_graph}
            for task_id, deps in dependency_graph.items()
        }
        # Reverse edges: prerequisite -> tasks waiting on it.
        dependents: dict[str, list[str]] = {task_id: [] for task_id in dependency_graph}
        for task_id, deps in unmet.items():
            for dep in deps:
                dependents[dep].append(task_id)

        plan: list[list[str]] = []
        scheduled = 0
        ready = [task_id for task_id, deps in unmet.items() if not deps]
        while ready:
            plan.append(ready)
            scheduled += len(ready)
            next_ready: list[str] = []
            for task_id in ready:
                for waiting in dependents[task_id]:
                    unmet[waiting].discard(task_id)
                    if not unmet[waiting]:
                        next_ready.append(waiting)
            ready = next_ready

        if scheduled != len(dependency_graph):
            blocked = [task_id for task_id, deps in unmet.items() if deps]
            raise ValueError(
                f"Circular dependency detected; cannot schedule tasks: {blocked}"
            )
        return plan

    def _estimate_duration(
        self,
        plan: list[list[Subtask]],
        seconds_per_batch: float = 30.0
    ) -> float:
        """Estimate total execution time from the number of batches.

        Only the batch count is used, so the batch elements may be tasks or
        task IDs.
        """
        # Simplified - real implementation would use historical timing
        return len(plan) * seconds_per_batch

    async def _call_analysis_model(self, prompt: str) -> list[Subtask]:
        """Call analysis model to identify task structure."""
        # Placeholder - in production, calls actual model
        return []

    async def _decompose_further(self, task: Subtask) -> list[Subtask]:
        """Further decompose a composite task."""
        # Placeholder - recursive decomposition logic
        return []
# Execution engine for decomposed tasks
class TaskExecutor:
    """Run an execution plan batch by batch, one capable agent per task.

    Each registry value is an agent object exposing an awaitable
    ``execute(input_spec, context=...)``. Its supported capabilities are
    read from a ``capabilities`` attribute when present; otherwise the
    agent object itself is used as the capability container.
    """

    def __init__(self, agent_registry: dict[str, Any]):
        """Store the registry mapping agent name -> agent object."""
        self.agents = agent_registry

    async def execute_plan(self, plan: list[list[Subtask]]) -> dict[str, Any]:
        """Execute the batches in order, running each batch concurrently.

        Returns:
            task ID -> ``{"status": "success", "result": ...}`` or
            ``{"status": "failed", "error": <message>}``. One task failing
            does not stop the other tasks in its batch.
        """
        results: dict[str, Any] = {}
        for batch in plan:
            # Execute all tasks in batch concurrently
            batch_results = await asyncio.gather(
                *(self._execute_task(task, results) for task in batch),
                return_exceptions=True
            )
            for task, outcome in zip(batch, batch_results):
                # gather may hand back any BaseException (CancelledError is
                # not an Exception subclass), so test against the base type.
                if isinstance(outcome, BaseException):
                    results[task.id] = {
                        "status": "failed",
                        "error": str(outcome) or type(outcome).__name__,
                    }
                else:
                    results[task.id] = {"status": "success", "result": outcome}
        return results

    async def _execute_task(
        self,
        task: Subtask,
        prior_results: dict[str, Any]
    ) -> Any:
        """Execute a single task using an appropriate agent.

        Args:
            task: The task to run.
            prior_results: Results recorded so far, keyed by task ID.

        Returns:
            Whatever the selected agent's ``execute`` returns.

        Raises:
            RuntimeError: If a dependency of this task is recorded as failed.
            ValueError: If no agent supports the required capabilities.
        """
        from collections.abc import Mapping

        # Gather inputs from dependencies
        inputs: dict[str, Any] = {}
        for dep_id in task.dependencies:
            record = prior_results.get(dep_id)
            if record is None:
                # Dependency has no recorded result; it contributes nothing.
                continue
            if record.get("status") == "failed":
                # Running on top of a failed prerequisite would silently
                # operate on missing inputs, so fail this task explicitly.
                raise RuntimeError(
                    f"Dependency {dep_id!r} failed; cannot run task {task.id!r}"
                )
            output = record.get("result")
            if isinstance(output, Mapping):
                inputs.update(output)
            elif output is not None:
                # Non-mapping outputs cannot be merged key by key; expose
                # them under the producing task's ID instead.
                inputs[dep_id] = output
        # Find capable agent
        agent = self._find_agent(task.required_capabilities)
        if agent is None:
            raise ValueError(f"No agent found for capabilities: {task.required_capabilities}")
        # Execute with combined inputs
        return await agent.execute(
            task.input_spec,
            context=inputs
        )

    def _find_agent(self, required_capabilities: list[str]) -> Optional[Any]:
        """Find the first agent supporting all required capabilities.

        Returns None when no registered agent qualifies. With an empty
        requirement list the first registered agent is returned.
        """
        for agent in self.agents.values():
            supported = getattr(agent, "capabilities", agent)
            if all(cap in supported for cap in required_capabilities):
                return agent
        return None
```