Java多线程(十二)之线程池深入分析(下)(转)
一、数据结构与线程构造方法
?
??
由于已经看到了ThreadPoolExecutor的源码,因此很容易就看到了ThreadPoolExecutor线程池的数据结构。图1描述了这种数据结构。
?

?
图1 ThreadPoolExecutor 数据结构
?
其实,即使没有上述图形描述ThreadPoolExecutor的数据结构,我们根据线程池的要求也很能够猜测出其数据结构出来。
?
- 线程池需要支持多个线程并发执行,因此有一个线程集合Collection<Thread>来执行线程任务;涉及任务的异步执行,因此需要有一个集合来缓存任务队列Collection<Runnable>;很显然在多个线程之间协调多个任务,那么就需要一个线程安全的任务集合,同时还需要支持阻塞、超时操作,那么BlockingQueue是必不可少的;既然是线程池,出发点就是提高系统性能同时降低资源消耗,那么线程池的大小就有限制,因此需要有一个核心线程池大小(线程个数)和一个最大线程池大小(线程个数),有一个计数用来描述当前线程池大小;如果是有限的线程池大小,那么长时间不使用的线程资源就应该销毁掉,这样就需要一个线程空闲时间的计数来描述线程何时被销毁;前面描述过线程池也是有生命周期的,因此需要有一个状态来描述线程池当前的运行状态;线程池的任务队列如果有边界,那么就需要有一个任务拒绝策略来处理过多的任务,同时在线程池的销毁阶段也需要有一个任务拒绝策略来处理新加入的任务;上面种的线程池大小、线程空闲实际那、线程池运行状态等等状态改变都不是线程安全的,因此需要有一个全局的锁(mainLock)来协调这些竞争资源;除了以上数据结构以外,ThreadPoolExecutor还有一些状态用来描述线程池的运行计数,例如线程池运行的任务数、曾经达到的最大线程数,主要用于调试和性能分析。
- public?ScheduledFuture<?>?scheduleAtFixedRate(Runnable?command,????????????????????????????????????????????????????long?initialDelay,??
- ??????????????????????????????????????????????????long?period,????????????????????????????????????????????????????TimeUnit?unit)?{??
- ????????if?(command?==?null?||?unit?==?null)??????????????throw?new?NullPointerException();??
- ????????if?(period?<=?0)??????????????throw?new?IllegalArgumentException();??
- ????????if?(initialDelay?<?0)?initialDelay?=?0;??????????long?triggerTime?=?now()?+?unit.toNanos(initialDelay);??
- ????????RunnableScheduledFuture<?>?t?=?decorateTask(command,??????????????new?ScheduledFutureTask<Object>(command,??
- ????????????????????????????????????????????null,??????????????????????????????????????????????triggerTime,??
- ????????????????????????????????????????????unit.toNanos(period)));??????????delayedExecute(t);??
- ????????return?t;??????}??
- boolean?innerRunAndReset()?{??????????????if?(!compareAndSetState(0,?RUNNING))??
- ????????????????return?false;??????????????try?{??
- ????????????????runner?=?Thread.currentThread();??????????????????if?(getState()?==?RUNNING)??
- ????????????????????callable.call();?//?don't?set?result??????????????????runner?=?null;??
- ????????????????return?compareAndSetState(RUNNING,?0);??????????????}?catch?(Throwable?ex)?{??
- ????????????????innerSetException(ex);??????????????????return?false;??
- ????????????}??????????}??
?
?
?
对于ThreadPoolExecutor而言,一个线程就是一个Worker对象,它与一个线程绑定,当Worker执行完毕就是线程执行完毕,这个在后面详细讨论线程池中线程的运行方式。
?
既然是线程池,那么就首先研究下线程的构造方法。
?
?
?
?
当提交一个任务时,如果需要创建一个线程(何时需 要在下一节中探讨)时,就调用线程工厂创建一个线程,同时将线程绑定到Worker工作队列中。需要说明的是,Worker队列构造的时候带着一个任务 Runnable,因此Worker创建时总是绑定着一个待执行任务。换句话说,创建线程的前提是有必要创建线程(任务数已经超出了线程或者强制创建新的 线程,至于为何强制创建新的线程后面章节会具体分析),不会无缘无故创建一堆空闲线程等着任务。这是节省资源的一种方式。
?
一旦线程池启动线程后(调用线程run())方 法,那么线程工作队列Worker就从第1个任务开始执行(这时候发现构造Worker时传递一个任务的好处了),一旦第1个任务执行完毕,就从线程池的 任务队列中取出下一个任务进行执行。循环如此,直到线程池被关闭或者任务抛出了一个RuntimeException。
?
由此可见,线程池的基本原理其实也很简单,无非预先启动一些线程,线程进入死循环状态,每次从任务队列中获取一个任务进行执行,直到线程池被关闭。如果某个线程因为执行某个任务发生异常而终止,那么重新创建一个新的线程而已。如此反复。
?
其实,线程池原理看起来简单,但是复杂的是各种策略,例如何时该启动一个线程,何时该终止、挂起、唤醒一个线程,任务队列的阻塞与超时,线程池的生命周期以及任务拒绝策略等等。
?
?
?
三、线程池任务执行流程?
??
我们从一个API开始接触Executor是如何处理任务队列的。
?
java.util.concurrent.Executor.execute(Runnable)
?
Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current?RejectedExecutionHandler.
?
线程池中所有任务执行都依赖于此接口。这段话有以下几个意思:
?
- 任务可能在将来某个时刻被执行,有可能不是立即执行。为什么这里有两个“可能”?继续往下面看。任务可能在一个新的线程中执行或者线程池中存在的一个线程中执行。任务无法被提交执行有以下两个原因:线程池已经关闭或者线程池已经达到了容量限制。所有失败的任务都将被“当前”的任务拒绝策略RejectedExecutionHandler 处理。
?
回答上面两个“可能“。任务可能被执行,那不可能的情况就是上面说的情况3;可能不是立即执行,是因为任务可能还在队列中排队,因此还在等待分配线程执行。了解完了字面上的问题,我们再来看具体的实现。
?
?
?
?
老实说这个图比上面步骤更难以理解,那么从何入手呢。
?
流程的入口很简单,我们就是要执行一个任务(Runnable command),那么它的结束点在哪或者有哪几个?
?
根据左边这个图我们知道可能有以下几种出口:
?
(1)图中的P1、P7,我们根据这条路径可以看到,仅仅是将任务加入任务队列(offer(command))了;
?
(2)图中的P3,这条路径不将任务加入任务队列,但是启动了一个新工作线程(Worker)进行扫尾操作,用户处理为空的任务队列;
?
(3)图中的P4,这条路径没有将任务加入任务队列,但是启动了一个新工作线程(Worker),并且工作现场的第一个任务就是当前任务;
?
(4)图中的P5、P6,这条路径没有将任务加入任务队列,也没有启动工作线程,仅仅是抛给了任务拒绝策略。P2是任务加入了任务队列却因为线程池已经关闭于是又从任务队列中删除,并且抛给了拒绝策略。
?
如果上面的解释还不清楚,可以去研究下面两段代码:
?
?
?
?
在Future接口中提供了5个方法。
?
- V get() throws InterruptedException, ExecutionException: 等待计算完成,然后获取其结果。V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。boolean cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。boolean isCancelled():如果在任务正常完成前将其取消,则返回?true。boolean isDone():如果任务已完成,则返回?true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回true。
?
API看起来容易,来研究下异常吧。get()请求获取一个结果会阻塞当前进程,并且可能抛出以下三种异常:
?
- InterruptedException:执行任务的线程被中断则会抛出此异常,此时不能知道任务是否执行完毕,因此其结果是无用的,必须处理此异常。ExecutionException: 任务执行过程中(Runnable#run())方法可能抛出RuntimeException,如果提交的是一个 java.util.concurrent.Callable<V>接口任务,那么 java.util.concurrent.Callable.call()方法有可能抛出任意异常。CancellationException:实际上get()方法还可能抛出一个CancellationException的RuntimeException,也就是任务被取消了但是依然去获取结果。
?
对于get(long timeout, TimeUnit unit)而言,除了get()方法的异常外,由于有超时机制,因此还可能得到一个TimeoutException。
?
boolean cancel(boolean mayInterruptIfRunning)方法比较复杂,各种情况比较多:
?
- 如果任务已经执行完毕,那么返回false。如果任务已经取消,那么返回false。循环直到设置任务为取消状态,对于未启动的任务将永远不再执行,对于正在运行的任务,将根据mayInterruptIfRunning是否中断其运行,如果不中断那么任务将继续运行直到结束。此方法返回后任务要么处于运行结束状态,要么处于取消状态。isDone()将永远返回true,如果cancel()方法返回true,isCancelled()始终返回true。
?
来看看Future接口的实现类java.util.concurrent.FutureTask<V>具体是如何操作的。
?
在FutureTask中使用了一个AQS数据结构来完成各种状态以及加锁、阻塞的实现。
?
在此AQS类java.util.concurrent.FutureTask.Sync中一个任务用4中状态:
?

?
初始情况下任务状态state=0,任务执行(innerRun)后状态变为运行状态RUNNING(state=1),执行完毕后变成运行结束状态RAN(state=2)。任务在初始状态或者执行状态被取消后就变为状态CANCELLED(state=4)。AQS最擅长无锁情况下处理几种简单的状态变更的。
?
?
?
?
先 从功能/结构上分析下。第一种情况假设提交的任务每次执行花费10s,间隔(delay/period)为20s,对于 scheduleAtFixedRate而言,每次执行开始时间20s,对于scheduleWithFixedDelay来说每次执行开始时间30s。 第二种情况假设提交的任务每次执行时间花费20s,间隔(delay/period)为10s,对于scheduleAtFixedRate而言,每次执 行开始时间10s,对于scheduleWithFixedDelay来说每次执行开始时间30s。(具体分析可以参考这里)
?
也就是说scheduleWithFixedDelay的执行开始时间为(delay+cost),而对于scheduleAtFixedRate来说执行开始时间为max(period,cost)。
?
回头再来看上面源码runPeriodic()就很容易了。但特别要提醒的,如果任务的任何一个执行遇到异常,则后续执行都会被取消,这从runPeriodic()就能看出。要强调的第二点就是同一个周期性任务不会被同时执行。就比如说尽管上面第二种情况的scheduleAtFixedRate任务每隔10s执行到达一个时间点,但是由于每次执行时间花费为20s,因此每次执行间隔为20s,只不过执行的任务次数会多一点。但从本质上讲就是每隔20s执行一次,如果任务队列不取消的话。
?
为什么不会同时执行?
?
这是因为ScheduledFutureTask执行的时候会将任务从队列中移除来,执行完毕以后才会添加下一个同序列的任务,因此任务队列中其实最多只有同序列的任务的一份副本,所以永远不会同时执行(尽管要执行的时间在过去)。
?
?
?
ScheduledThreadPoolExecutor使用一个无界(容量无限,整数的最大值)的容器—elayedWorkQueue队列),根据ThreadPoolExecutor的 原理,只要当容器满的时候才会启动一个大于corePoolSize的线程数。因此实际上ScheduledThreadPoolExecutor是一个 固定线程大小的线程池,固定大小为corePoolSize,构造函数里面的Integer.MAX_VALUE其实是不生效的(尽管PriorityQueue使用数组实现有PriorityQueue大小限制,如果你的任务数超过了2147483647就会导致OutOfMemoryError,这个参考PriorityQueue的grow方法)。
?
?
?
再 回头看scheduleAtFixedRate等方法就容易多了。无非就是往任务队列中添加一个未来某一时刻的ScheduledFutureTask任 务,如果是scheduleAtFixedRate那么period/delay就是正数,如果是scheduleWithFixedDelay那么 period/delay就是一个负数,如果是0那么就是一次性任务。直接调用父类ThreadPoolExecutor的execute/submit等方法就相当于period/delay是0,并且initialDelay也是0。
?
?
?
[java] view plaincopy?
另 外需要补充说明的一点,前面说过java.util.concurrent.FutureTask.Sync任务只能执行一次,那么在 runPeriodic()里面怎么又将执行过的任务加入队列中呢?这是因为java.util.concurrent.FutureTask.Sync 提供了一个innerRunAndReset()方法,此方法不仅执行任务还将任务的状态还原成0(初始状态)了,所以此任务就可以重复执行。这就是为什 么runPeriodic()里面调用runAndRest()的缘故。
?
?
?
[java] view plaincopy?
谢谢xylz的文章。?
关于线程池由于时间原因,没有好好整理。?
??
内容来源:?
深入浅出 Java Concurrency (33): 线程池 part 6 线程池的实现及原理 (1)?
http://www.blogjava.net/xylz/archive/2011/01/18/343183.html?
深入浅出 Java Concurrency (33): 线程池 part 6 线程池的实现及原理 (2)?
http://www.blogjava.net/xylz/archive/2011/02/11/344091.html?
深入浅出 Java Concurrency (33): 线程池 part 6 线程池的实现及原理 (3)?
http://www.blogjava.net/xylz/archive/2011/02/13/344207.html?