读书人

AbstractExecutorService任务交付lt;三amp;

发布时间: 2012-10-14 14:55:08 作者: rapoo

AbstractExecutorService任务提交<三>

? ? 最后来看两个invokeAny方法,这个方法和invokeAll的区别在于,invokeAll会阻塞直到所有任务执行完(完成 or 取消 or异常)才会返回(返回的是所有任务的结果),而invokeAny只需要任何一个方法执行完即返回(返回的时候最先执行完的那个任务的结果)。查看代码发现这两个invokeAny方法都是直接调用doInvokeAny方法实现的:?

?

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)        throws InterruptedException, ExecutionException {        try {            return doInvokeAny(tasks, false, 0);        } catch (TimeoutException cannotHappen) {            assert false;            return null;        }    }    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,                           long timeout, TimeUnit unit)        throws InterruptedException, ExecutionException, TimeoutException {        return doInvokeAny(tasks, true, unit.toNanos(timeout));    }

?

? ? ? ??一个有超时限制,一个没有。所以主要代码还得看doInvokeAny方法,首先看签名:?

?

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,                            boolean timed, long nanos)

?

? ? ? ?第一个参数tasks就是一个Callable任务集,第二个参数表示是否有超时限制,第三个参数表示若有超时限制,则时间限制在nanos纳秒内。

再看具体实现部分:?

?

if (tasks == null)            throw new NullPointerException();        int ntasks = tasks.size();        if (ntasks == 0)            throw new IllegalArgumentException();        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);        ExecutorCompletionService<T> ecs =            new ExecutorCompletionService<T>(this);        // For efficiency, especially in executors with limited        // parallelism, check to see if previously submitted tasks are        // done before submitting more of them. This interleaving        // plus the exception mechanics account for messiness of main        // loop.        try {            // Record exceptions so that if we fail to obtain any            // result, we can throw the last exception we got.            ExecutionException ee = null;            long lastTime = (timed)? System.nanoTime() : 0;            Iterator<? extends Callable<T>> it = tasks.iterator();            // Start one task for sure; the rest incrementally            futures.add(ecs.submit(it.next()));            --ntasks;            int active = 1;            for (;;) {                Future<T> f = ecs.poll();                if (f == null) {                    if (ntasks > 0) {                        --ntasks;                        futures.add(ecs.submit(it.next()));                        ++active;                    }                    else if (active == 0)                        break;                    else if (timed) {                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);                        if (f == null)                            throw new TimeoutException();                        long now = System.nanoTime();                        nanos -= now - lastTime;                        lastTime = now;                    }                    else                        f = ecs.take();                }                if (f != null) {                    --active;                    try {                        return f.get();                    } catch (InterruptedException ie) {                        throw ie;                    } catch (ExecutionException eex) {                        ee = eex;                    } catch (RuntimeException rex) {                        ee = new ExecutionException(rex);                    }                }            }            if (ee == null)                ee = new ExecutionException();            throw ee;        } finally {            for (Future<T> f : futures)                f.cancel(true);        }

?

? ? 直接看主体( try{…}finally{…} )部分, ntasks表示当前还有几个任务没有提交,active表示当前已经提交了的任务数。在try中先提交一个,紧接着是一个无穷循环for(;;),一切就在这个无穷循环中了,分析这个无穷循环,可分两部分:

?

<一> 提交并执行任务,if (f == null)部分:

?

ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);

? ? ? ??通过这个ecs来提交任务和获取结果,所以再查看ExecutorCompletionService,特别是当前方法中用到的其submit(task), poll(),poll(nanos, TimeUnit.NANOSECONDS),take()方法。从构造方法看,它hold了一个线程执行器Executor就是AbstractExecutorService传过来的this对象,有一个任务完成的队列completionQueue是LinkedBlockingQueue对象。仿佛已经很明朗了,不管是take()还是poll(),在获取完成的任务的时候,通过这个阻塞队列获取即可。查看代码果然如此:

?

public Future<V> take() throws InterruptedException {        return completionQueue.take();    }    public Future<V> poll() {        return completionQueue.poll();    }    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {        return completionQueue.poll(timeout, unit);    }

?

? ? ? ??那任务执行完是如何放到这个阻塞队列的呢?在细看,发现提交任务的时候,这个类中会把RunnableFuture对象进一步包装成QueueingFuture,而再看这个QueueingFuture继承自FutureTask,显然QueueingFuture作为一个FutureTask需要做额外的事情了,还记得上一篇“AbstractExecutorService之异步任务RunnableFuture ”末尾提到:

?

“当然你想若是想在cancel时做一些自己的事情,可以通过重写FutureTask的done()方法,因为在cancel返回之前会调用该方法,这个方法默认不做任何事情”

?

? ? ??这里补充一下,其实除了cancel之外,在执行出现异常进行设置innerSetException、或执行正常结束设置结果值innerSet时,也都会调用done()方法。所以想在任务执行结束后做一点自己的事情,都可以通过重写这个done()方法。查看QueueingFuture代码,果然不出所料:

?

private class QueueingFuture extends FutureTask<Void> {        QueueingFuture(RunnableFuture<V> task) {            super(task, null);            this.task = task;        }        protected void done() { completionQueue.add(task); }        private final Future<V> task;    }

?

? ? ??可以看到,当任务执行完(完成 or 取消 or异常)后调用上面的done方法会把当前任务加入到这个阻塞队列中。这下明朗了,呵呵!

再细看那三个获取第一个执行完任务的方法,completionQueue.poll()会去获取队头对象,若队列空就返回null,这个方法不会阻塞;completionQueue.poll(timeout, unit)会阻塞时间timeout,若这段时间还是一个空队列,则返回null;completionQueue.take()会一直阻塞直到队列中有东西可以返回为止。

重新回到AbstractExecutorService. doInvokeAny方法,再次看那个无限循环部分代码,先尝试获取一次结果,这次是非阻塞获取,没有结果就马上提交下一个任务,提交完就本次循环结束。直到所有任务都提交了,循环会进入到阻塞获取结果部分的代码:有时间限制就使用poll(timeout, unit)在允许的时间范围内阻塞,否则就用take()永久阻塞方式获取。?而阻塞的逻辑实现就交给了阻塞队列了。

????? 这部分也记录完,感觉有点乱,不知道写的是否明白。

读书人网 >编程

热点推荐