读书人

jetty6中ThreadPool兑现的代码解析

发布时间: 2012-12-19 14:13:14 作者: rapoo

jetty6中ThreadPool实现的代码解析

QueuedThreadPool承载了jetty6提交任务至线程工作的使命,在没有用concurrent包的情况下,作者实现了漂亮的线程池。

?

QueuedThreadPool内部维护了一个FIFO的job队列,该队列基于数组实现。

?

QueuedThreadPool的核心接口是dispatch(Runnable),正如字面所表达的那样,该方法将派发一个job到一个线程来执行。下面是对该方法的注释解析。

?

?

public boolean dispatch(Runnable job)     {      //前置条件的判断        if (!isRunning() || job==null)            return false;                //可以执行该job的线程        PoolThread thread=null;                //是否需要创建新线程的标识        boolean spawn=false;                    synchronized(_lock)        {            // Look for an idle thread            int idle=_idle.size();            //判断是否有空闲线程,如果有,取出空闲线程列表的最后一个线程            if (idle>0)                thread=(PoolThread)_idle.remove(idle-1);            else//已经没有空闲线程了,将该job放入到job队列中            {                // queue the job            //增加队列的大小,得到最新的队列大小            _queued++;                            //如果最新的队列大小大于最大值,则将最大值设置为最新的队列大小                if (_queued>_maxQueued)                    _maxQueued=_queued;                                //设置插入位置的job为最新dispatch的job,并且递增下一个插入位置,_nextJobSlot记录的是下一个job可以放置的索引                _jobs[_nextJobSlot++]=job;                                //如果已经插入位置的游标已经到达了队列的尾部,则从头开始                if (_nextJobSlot==_jobs.length)                    _nextJobSlot=0;                                //如果队列已经满了,则扩容队列。_nextJob是下一个可以获取job的位置,可以认为是队列的尾部,这是一个FIFO的队列                if (_nextJobSlot==_nextJob)                {                    // Grow the job queue                    Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];                    int split=_jobs.length-_nextJob;                    if (split>0)                        System.arraycopy(_jobs,_nextJob,jobs,0,split);                    if (_nextJob!=0)                        System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);                                        _jobs=jobs;                    _nextJob=0;                    _nextJobSlot=_queued;                }                                  spawn=_queued>_spawnOrShrinkAt;            }        }                //如果得到了可以工作的空闲线程,则将利用该线程执行job        if (thread!=null)        {            thread.dispatch(job);        }        else if (spawn)//如果需要增加新的线程,则创建一个新的线程        {            newThread();        }                //不管怎么说,这个job要么被一个线程运行,要么成功放到了job的队列,我们的工作算是成功做完了。        return true;    }

?

可以看到实现逻辑大致如下:首先判断是否有空闲线程,如果有,从空闲线程列表中移除最后一个线程,用该线程来执行提交的job。如果没有空闲线程,就把该job放到job的队列中。如果放入队列后,队列的长度大于_spawnOrShrinkAt,就说明可以创建一个新的线程。

?

QueuedThreadPool是用PoolThread作为工作线程的。PoolThread是Thread的子类。看看run方法有什么事情要做。

?

?

public void run()        {            boolean idle=false;            Runnable job=null;            try            {            //只要系统还在运行,就不让该线程停止工作                while (isRunning())                {                       //有job要做了                    if (job!=null)                    {                        final Runnable todo=job;                        job=null;                                                //标识这个线程不在空闲,因为我有工作要做了。                        idle=false;                        todo.run();                        //我觉得这个地方应该加上 idle = true;下面if(!idle)改成if(idle)语义上更清楚一些                    }                                        //需要操作FIFO的job队列,给临界代码加锁                    synchronized(_lock)                    {                        //哈哈,有job可以做了                    if (_queued>0)                        {                    //递减队列的长度                            _queued--;                                                        //从队列的头部取出一个job                            job=_jobs[_nextJob];                                                        //将头部的位置值为null,并且递增_nextJob,更新头部                            _jobs[_nextJob++]=null;                                                        //如果到达了数组的尾部,则从数组的头开始                            if (_nextJob==_jobs.length)                                _nextJob=0;                                                        //非常从容地得到了一个需要执行的job,跳到while的开始,执行该job                            continue;                        }                        //如果job队列为空,那是不是我这个线程是多余的线程,job能很快地被执行掉,需要亲手干掉自己?下面做一些判断                                        //所有线程的数量总和                        final int threads=_threads.size();                                                //如果从数量上判断可以清除掉该线程                        if (threads>_minThreads &&                             (threads>_maxThreads ||                              _idle.size()>_spawnOrShrinkAt))                           {                                                //空闲时间的条件是否能满足。如果有一个兄弟在不久之前已经自杀了,我需要再晚点。                            long now = System.currentTimeMillis();                            if ((now-_lastShrink)>getMaxIdleTimeMs())                            {                            //记录我自杀的时间(即最后一个线程自杀的时间)                                _lastShrink=now;                                                                //从空闲队列中移除自己                                _idle.remove(this);                                                                //使命完成,寿终正寝                                return;                            }                        }                                                //如果我做完了自己的job,我已经空闲了,进入空闲线程池里                        if (!idle)                        {                               // Add ourselves to the idle set.                            _idle.add(this);                            idle=true;                        }                    }                    //既然我已经空闲了,如果这个时候还没有job让我来做,我就再等一会吧。                    //如果等待的时间到了,或者等待的时候有新的job分配给我做,我就不再等待。                    synchronized (this)                    {                        if (_job==null)                            this.wait(getMaxIdleTimeMs());                        job=_job;                        _job=null;                    }                }            }            catch (InterruptedException e)            {                Log.ignore(e);            }            //我死掉了(自杀,或者在执行job时发生异常意外死亡),善后工作            finally            {            //从空闲线程池中把我的尸体抬走                synchronized (_lock)                {                    _idle.remove(this);                }                //从总的线程池中把我的尸体抬走。这个地方可以看到锁的分离。_idle和_threads两个集合用不同的锁控制,提高了性能                synchronized (_threadsLock)                {                    _threads.remove(this);                }                synchronized (this)                {                    job=_job;                }                                //如果我在处理job时意外死亡,就把我未完成的事业交给下一个兄弟来处理                if (job!=null)                {                    QueuedThreadPool.this.dispatch(job);                }            }        }        
?

?

?

?

?

读书人网 >编程

热点推荐