RUNLOCALAIv38

Independently operated catalog for local-AI hardware and software. Hand-written verdicts. Source-cited claims. Reproducible commands when we have them.

OP·Fredoline Eruo
© 2026 runlocalai.co · Independently operated
Multi-Agent Systems

09. Task Decomposition

Chapter 9 of 24 · 25 min
KEY INSIGHT

Decomposition quality determines system performance more than agent sophistication; well-structured tasks enable parallelism while poorly structured tasks create bottlenecks.

Task decomposition breaks complex requests into manageable subtasks. A supervisor agent receives a complex goal and decides how to divide work among specialized agents. Effective decomposition enables parallel execution, isolates failures, and simplifies each agent's job.

Decomposition strategies depend on task structure. Sequential tasks with dependencies require careful ordering; independent subtasks can run in parallel. Tasks requiring different capabilities naturally decompose along capability boundaries. Tasks with nested scope require hierarchical decomposition.

Goal-oriented decomposition starts from the final deliverable and works backward. What final output does the user need? What inputs are required to produce it? What intermediate results feed those inputs? This recursive decomposition continues until it reaches tasks that existing agents can execute directly.

Capability-based decomposition maps tasks to agent capabilities. Each agent declares what it can do; decomposition selects agents whose capabilities match subtasks. This approach scales as new agents join, because the decomposition logic picks up newly declared capabilities without changes.

Decomposition must handle uncertainty about task difficulty. Some requests decompose into obvious subtasks; others reveal complexity only during execution. Adaptive decomposition adjusts granularity based on early results: simple subtasks may need no further decomposition, while complex ones are broken down further.

Subtask specification requires clear interfaces. Each subtask should define inputs, expected outputs, quality criteria, and failure modes. Vague subtask definitions lead to mismatched expectations and integration failures.
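
The goal-oriented strategy described above (start from the deliverable, work backward until only directly executable tasks remain) can be sketched in a few lines. This is a minimal illustration, not the chapter's implementation: the `RECIPES` table, the `PRIMITIVES` set, and the task names are all hypothetical, and the function does not report circular recipes (it simply stops revisiting a node).

```python
# Hypothetical recipe table: deliverable -> inputs needed to produce it.
RECIPES = {
    "release_notes": ["changelog", "benchmark_table"],
    "changelog": ["commit_log"],
    "benchmark_table": ["benchmark_runs"],
}

# Hypothetical set of outputs that some existing agent can produce directly.
PRIMITIVES = {"commit_log", "benchmark_runs"}


def decompose_backward(deliverable, recipes, primitives, seen=None):
    """Return subtasks ordered so that inputs come before the outputs that need them."""
    seen = set() if seen is None else seen
    if deliverable in seen:
        return []  # already scheduled via another branch
    seen.add(deliverable)

    if deliverable in primitives:
        return [deliverable]  # an agent can execute this directly; stop recursing
    if deliverable not in recipes:
        raise KeyError(f"no recipe and no agent for {deliverable!r}")

    ordered = []
    for needed in recipes[deliverable]:
        ordered.extend(decompose_backward(needed, recipes, primitives, seen))
    ordered.append(deliverable)  # produce the deliverable once its inputs exist
    return ordered


steps = decompose_backward("release_notes", RECIPES, PRIMITIVES)
print(steps)
# ['commit_log', 'changelog', 'benchmark_runs', 'benchmark_table', 'release_notes']
```

The recursion bottoms out exactly where the text says it should: at tasks an existing agent can run. A deliverable with neither a recipe nor an agent raises immediately, which surfaces a gap in the agent pool at planning time instead of at execution time.
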
```python
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import asyncio


class TaskType(Enum):
    PRIMITIVE = "primitive"      # Can be executed by an existing agent
    COMPOSITE = "composite"      # Requires further decomposition
    CONDITIONAL = "conditional"  # Branches on runtime conditions


@dataclass
class Subtask:
    id: str
    name: str
    type: TaskType
    input_spec: dict
    output_spec: dict
    dependencies: list[str] = field(default_factory=list)
    required_capabilities: list[str] = field(default_factory=list)
    quality_threshold: float = 0.8
    children: list["Subtask"] = field(default_factory=list)


@dataclass
class DecompositionResult:
    root_task: Subtask
    execution_plan: list[list[Subtask]]  # Batches of parallelizable tasks
    estimated_duration_seconds: float


class TaskDecomposer:
    def __init__(self, capability_registry: dict[str, list[str]]):
        # agent_name -> supported capabilities
        self.capabilities = capability_registry

    async def decompose(self, goal: str, constraints: dict) -> DecompositionResult:
        """Main decomposition entry point."""
        # Phase 1: Initial decomposition based on goal analysis
        initial_tasks = await self._analyze_goal(goal, constraints)

        # Phase 2: Expand composite tasks
        expanded = await self._expand_composites(initial_tasks)

        # Phase 3: Analyze dependencies
        dependency_graph = self._build_dependency_graph(expanded)

        # Phase 4: Generate execution plan (batched for parallelization)
        tasks_by_id = {task.id: task for task in expanded}
        execution_plan = self._generate_execution_plan(dependency_graph, tasks_by_id)

        # Phase 5: Estimate duration
        duration = self._estimate_duration(execution_plan)

        root = Subtask(
            id="root",
            name=goal,
            type=TaskType.COMPOSITE,
            input_spec={},
            output_spec={},
            children=expanded,
        )
        return DecompositionResult(
            root_task=root,
            execution_plan=execution_plan,
            estimated_duration_seconds=duration,
        )

    async def _analyze_goal(self, goal: str, constraints: dict) -> list[Subtask]:
        """Initial goal analysis and task identification."""
        # Prompt-based analysis (would use an actual LLM in production)
        analysis_prompt = (
            "Analyze this goal and identify subtasks:\n"
            f"Goal: {goal}\n"
            f"Constraints: {constraints}\n"
            "Identify:\n"
            "1. Main phases or steps\n"
            "2. Required capabilities for each step\n"
            "3. Clear interfaces between steps\n"
            "4. Potential parallelization opportunities"
        )
        # In production, this calls an analysis model
        return await self._call_analysis_model(analysis_prompt)

    async def _expand_composites(self, tasks: list[Subtask]) -> list[Subtask]:
        """Recursively expand composite tasks."""
        expanded = []
        for task in tasks:
            if task.type == TaskType.COMPOSITE:
                # Check whether any registered agent offers a required capability
                has_agent = any(
                    cap in self.capabilities.get(agent, [])
                    for agent in self.capabilities
                    for cap in task.required_capabilities
                )
                if not has_agent:
                    # Needs further decomposition
                    subtasks = await self._decompose_further(task)
                    expanded.extend(await self._expand_composites(subtasks))
                else:
                    expanded.append(task)
            else:
                expanded.append(task)
        return expanded

    def _build_dependency_graph(self, tasks: list[Subtask]) -> dict[str, list[str]]:
        """Map each task id to the ids of the tasks whose outputs it needs."""
        # Simple implementation - a real version would analyze data flow.
        # Tasks without declared dependencies map to an empty list.
        return {task.id: list(task.dependencies) for task in tasks}

    def _generate_execution_plan(
        self,
        dependency_graph: dict[str, list[str]],
        tasks_by_id: dict[str, Subtask],
    ) -> list[list[Subtask]]:
        """Topologically sort tasks and batch them by parallelism potential."""
        # In-degree of a task = number of its dependencies not yet scheduled
        in_degree = {
            task_id: len(set(deps)) for task_id, deps in dependency_graph.items()
        }

        plan = []
        remaining = set(dependency_graph)
        while remaining:
            # All tasks whose dependencies are already scheduled can run together
            ready = sorted(t for t in remaining if in_degree[t] == 0)
            if not ready:
                # Nothing can start: a cycle, or a dependency on an unknown task
                raise ValueError(
                    f"Circular or unresolved dependencies among: {sorted(remaining)}"
                )
            plan.append([tasks_by_id[t] for t in ready])

            # Remove ready tasks and update the in-degree of their dependents
            for task_id in ready:
                remaining.remove(task_id)
                for other_task, deps in dependency_graph.items():
                    if task_id in deps:
                        in_degree[other_task] -= 1
        return plan

    def _estimate_duration(self, plan: list[list[Subtask]]) -> float:
        """Estimate total execution time based on task batches."""
        # Simplified - a real implementation would use historical timing
        return len(plan) * 30.0  # Assume 30 seconds per batch

    async def _call_analysis_model(self, prompt: str) -> list[Subtask]:
        """Call the analysis model to identify task structure."""
        # Placeholder - in production, calls an actual model
        return []

    async def _decompose_further(self, task: Subtask) -> list[Subtask]:
        """Further decompose a composite task."""
        # Placeholder - recursive decomposition logic
        return []


# Execution engine for decomposed tasks
class TaskExecutor:
    def __init__(self, agent_registry: dict[str, Any]):
        self.agents = agent_registry  # agent_name -> agent object

    async def execute_plan(self, plan: list[list[Subtask]]) -> dict[str, Any]:
        results = {}
        for batch in plan:
            # Execute all tasks in the batch concurrently; with
            # return_exceptions=True one failure does not cancel its siblings
            batch_results = await asyncio.gather(
                *[self._execute_task(task, results) for task in batch],
                return_exceptions=True,
            )
            for task, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    results[task.id] = {"status": "failed", "error": str(result)}
                else:
                    results[task.id] = {"status": "success", "result": result}
        return results

    async def _execute_task(
        self, task: Subtask, prior_results: dict[str, Any]
    ) -> Any:
        """Execute a single task using an appropriate agent."""
        # Gather inputs from dependencies (failed dependencies contribute nothing)
        inputs = {}
        for dep_id in task.dependencies:
            inputs.update(prior_results.get(dep_id, {}).get("result", {}))

        # Find a capable agent
        agent = self._find_agent(task.required_capabilities)
        if agent is None:
            raise ValueError(
                f"No agent found for capabilities: {task.required_capabilities}"
            )

        # Execute with combined inputs
        return await agent.execute(task.input_spec, context=inputs)

    def _find_agent(self, required_capabilities: list[str]) -> Optional[Any]:
        """Find the first agent supporting all required capabilities."""
        for agent in self.agents.values():
            # Assumption: each agent object exposes a `capabilities` collection
            supported = getattr(agent, "capabilities", [])
            if all(cap in supported for cap in required_capabilities):
                return agent
        return None
```
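
Because the analysis-model call above is a placeholder, it can help to see what a batched plan looks like for a hand-written graph. The following standalone sketch assumes a dependency map of task id to prerequisite ids; the function name `batch_by_dependencies` and the task names are hypothetical and not part of the chapter's classes.

```python
def batch_by_dependencies(graph: dict[str, list[str]]) -> list[list[str]]:
    """Group task ids into batches; each task's dependencies sit in an earlier batch."""
    pending = {task: set(deps) for task, deps in graph.items()}
    batches = []
    while pending:
        # A task is ready once every dependency has been placed in a batch
        ready = sorted(task for task, deps in pending.items() if not deps)
        if not ready:
            raise ValueError(f"circular or unknown dependency among: {sorted(pending)}")
        batches.append(ready)
        for task in ready:
            del pending[task]
        for deps in pending.values():
            deps.difference_update(ready)  # these prerequisites are now satisfied
    return batches


# Hypothetical graph: two independent fetches, two processing steps, one merge
graph = {
    "fetch_sales": [],
    "fetch_reviews": [],
    "clean_sales": ["fetch_sales"],
    "score_sentiment": ["fetch_reviews"],
    "write_summary": ["clean_sales", "score_sentiment"],
}

plan = batch_by_dependencies(graph)
print(plan)
# [['fetch_reviews', 'fetch_sales'], ['clean_sales', 'score_sentiment'], ['write_summary']]
```

Five tasks collapse into three batches, so under a fixed per-batch cost the plan takes three steps instead of five. The final merge step is the bottleneck the key insight warns about: everything upstream can run in parallel, but nothing can shorten the last batch.
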

EXERCISE

Design a task decomposition strategy for an automated research report system. The system receives a research topic and must produce a thorough report including literature review, data analysis, and conclusions. Specify how you would decompose this into parallelizable subtasks, identify dependencies between tasks, and define quality thresholds at each stage. (12 minutes)
