读书人

经过Executors创建创建的线程池分析I

发布时间: 2012-12-25 16:18:29 作者: rapoo

通过Executors创建创建的线程池分析I

对于线程池主要关心的内容有:线程池的大小控制、池中的队列控制、池饱和时的策略,下面我们从这三个角度来分析一下Executors生成的不同池。

1、ThreadPoolExecutor构造函数介绍:

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), defaultHandler);    }

?第一个corePoolSize:JDK文档里定义为核心线程数,很耐人寻味的一个词,但是很重要,我们在下面的分析中经常会使用它,对于它的理解,我们最后在取总结;

第二个maximumPoolSize:线程池中可容纳的最大线程个数;

第三个keepAliveTime:当当前的线程数大于核心线程数时,多余的空闲线程等待新任务的最长时间,如果没有新任务进入则多余的空闲线程将被销毁;

第四个unit:第三个参数的单位;

第五个workQueue:保持任务的队列(任务队列基本有三种形式:无限队列、有限队列和同步移交);

public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              RejectedExecutionHandler handler) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), handler);    }

相对第一个构造函数多了一个handler,这个参数用来指定线程池饱和时的策略(饱和策略主要有:中止、遗弃、遗弃最旧的、调用者运行);还有两个构造函数这里就不再列出了,不同点就是可以指定threadFactory。

?

2、newCachedThreadPool:

有了上面的知识,我们来分析一下newCachedThreadPool构造的线程池,先看一下方法内容:

public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());}

这里指定了,核心线程数为0,线程池中最大的线程数为0x7fffffff,当前线程个数大于核心线程数时,多余线程的存活时间为60秒,任务队列指定的使用的是SynchronousQueue,饱和策略使用的是AbortPolicy(中止策略)

下面我们来看一下newCachedThreadPool创建的线程池:线程池大小、池中的队列、池饱和时的策略

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {            if (runState == RUNNING && workQueue.offer(command)) {                if (runState != RUNNING || poolSize == 0)                    ensureQueuedTaskHandled(command);            }            else if (!addIfUnderMaximumPoolSize(command))                reject(command); // is shutdown or saturated        }    }

?上面的代码是线程池执行任务的核心代码,我们分析一下,

poolSize记录的是当前线程池中的线程个数,corePoolSize是我们指定的线程池的核心线程个数,在newCachedThreadPool创建的线程池中poolSize >= corePoolSize永远都会成立,因为corePoolSize被我们设置成0,当前线程池的大小永远是大于等于0的,接下来看一下内层代码,先判断一下当前线程池的状态是否在运行状态,然后调用workQueue.offer(command)方法,在newCachedThreadPool创建的线程池中workQueue是SynchronousQueue,因此调用的是SynchronousQueue的如下代码:

public boolean offer(E e) {        if (e == null) throw new NullPointerException();        return transferer.transfer(e, true, 0) != null;}

这里的transferer是TransferStack,由于我们timed和nanos(transfer方法的第二和第三个参数)设置为true和0,因此会永远执行transfer方法的如下代码:

if (timed && nanos <= 0) {      // can't wait      if (h != null && h.isCancelled())             casHead(h, h.next);     // pop cancelled node       else            return null;} 

而这里transferer.transfer(e, true, 0)方法的返回是null,因此workQueue.offer(command)方法永远是false。

这样机只会执行:

else if (!addIfUnderMaximumPoolSize(command))           reject(command);

?这个分支,addIfUnderMaximumPoolSize方法如下:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {        Thread t = null;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            if (poolSize < maximumPoolSize && runState == RUNNING)                t = addThread(firstTask);        } finally {            mainLock.unlock();        }        if (t == null)            return false;        t.start();        return true;}

?这里又会判断poolSize当前线程池中线程个数是否小于maximumPoolSize线程池中可容纳的最大个数,如果小于则创建一个线程来执行对应的任务返回true,如果不小于则会返回false,当返回false时,回去执行reject(command)方法,会根据中止的策略来处理新提交的任务。

这里要说明一下SynchronousQueue:

SynchronousQueue并不是一个队列,只是线程之间移交信息的机制,当我们把一个元素放入到SynchronousQueue中时必须有另一个线程正在等待接受移交的任务,如果没有这样的线程存在,只要当前池的大小还小于最大值,线程池就会创建一个新的线程来执行这个任务,因此使用SynchronousQueue作为线程池队列的前提是,要么池的大小是无限的或者可以接受被拒绝策略。

例子:

public class CachedThreadPoolTest {/** * @param args */public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();((ThreadPoolExecutor)service).setMaximumPoolSize(3);((ThreadPoolExecutor)service).setCorePoolSize(2);((ThreadPoolExecutor)service).setKeepAliveTime(20, TimeUnit.SECONDS);service.submit(new SleepTask());service.submit(new SleepTask());service.submit(new SleepTask());try{service.submit(new SleepTask());--将被拒绝}catch(RejectedExecutionException e){System.out.println("Rejected1");}try{service.submit(new SleepTask());--将被拒绝}catch(RejectedExecutionException e){System.out.println("Rejected2");}}}

?结果:

Rejected1Rejected2pool-1-thread-1pool-1-thread-2pool-1-thread-3

SleepTask代码

public class SleepTask implements Callable<String> {@Overridepublic String call() throws Exception {TimeUnit.SECONDS.sleep(10);System.out.println(Thread.currentThread().getName());return Thread.currentThread().getName();}}
?

?

读书人网 >编程

热点推荐