读书人

AbstractExecutorService任务交付lt;二amp;

发布时间: 2012-10-06 17:34:01 作者: rapoo

AbstractExecutorService任务提交<二>

? ? ? ??submit方法分析完毕,接着看两个invokeAll方法,先看第一个:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)        throws InterruptedException {        if (tasks == null)            throw new NullPointerException();        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());        boolean done = false;        try {            for (Callable<T> t : tasks) {                RunnableFuture<T> f = newTaskFor(t);                futures.add(f);                execute(f);            }            for (Future<T> f : futures) {                if (!f.isDone()) {                    try {                        f.get();                    } catch (CancellationException ignore) {                    } catch (ExecutionException ignore) {                    }                }            }            done = true;            return futures;        } finally {            if (!done)                for (Future<T> f : futures)                    f.cancel(true);        }    }

?

?

? ? ? ?第一个for循环,依次把任务包装成RunnableFuture加入到返回结果集List<Future<T>>并提交执行,没有任何阻塞。但是第二个for循环中就会去获取任务执行的结果f.get(),这个方法是会阻塞直到任务执行完。所以invokeAll方法明显不同与submit方法,invokeAll会阻塞主线程。

? ?整段代码都在try中,并在finally中判断是否done,如果没有(这种情况一般只有当任务为null才会发生),则必须把所有任务都cancel掉,底层会中断所有线程,回收资源。

? ? ??关于invokeAll也可以简单测试下,还是使用“AbstractExecutorService任务提交<一>”中的CallableTask:

public static void invokeAllCallableTask()throws Exception{ExecutorService executor = Executors.newCachedThreadPool();List<CallabelTask> tasks = new ArrayList<CallabelTask>(5);for(int i = 0; i < 5; ){tasks.add(new CallabelTask("task_"+(++i)));}List<Future<String>> results = executor.invokeAll(tasks);System.out.println("All the tasks have been submited through invokeAll method!");executor.shutdownNow();for(Future<String> f : results)System.out.println(f.get());}

? ? ? ?这段代码会在executor.invokeAll(tasks)行阻塞当前线程,直到所有任务都执行完(完成 or 取消 or异常),才会继续往下执行println语句进行打印。这里可以调用executor.shutdownNow()立马关闭线程执行器而不会有问题,因为在执行这一句的时候所有任务肯定已经执行完了。而在“AbstractExecutorService任务提交<一>”submit方法测试中,若使用shutdownNow()会有异常,因为它会去cancel正在执行的任务,很容易直到这个异常便是线程的中断异常了,^_^

?

? ? ?关于第二个invokeAll重载方法,逻辑和第一个invokeAll方法是一样的,不同的是这个方法加了时间的限制:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,                                         long timeout, TimeUnit unit)        throws InterruptedException {        if (tasks == null || unit == null)            throw new NullPointerException();        long nanos = unit.toNanos(timeout);        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());        boolean done = false;        try {            for (Callable<T> t : tasks)                futures.add(newTaskFor(t));            long lastTime = System.nanoTime();            // Interleave time checks and calls to execute in case            // executor doesn't have any/much parallelism.            Iterator<Future<T>> it = futures.iterator();            while (it.hasNext()) {                execute((Runnable)(it.next()));                long now = System.nanoTime();                nanos -= now - lastTime;                lastTime = now;                if (nanos <= 0)                    return futures;            }            for (Future<T> f : futures) {                if (!f.isDone()) {                    if (nanos <= 0)                        return futures;                    try {                        f.get(nanos, TimeUnit.NANOSECONDS);                    } catch (CancellationException ignore) {                    } catch (ExecutionException ignore) {                    } catch (TimeoutException toe) {                        return futures;                    }                    long now = System.nanoTime();                    nanos -= now - lastTime;                    lastTime = now;                }            }            done = true;            return futures;        } finally {            if (!done)                for (Future<T> f : futures)                    f.cancel(true);        }    }

? ? ? ?时间限制主要有两部分:

? ? ???第一个循环,是依此提交任务阶段,有时间限制。假如有10个任务要提交,那么循环提交的时每次都会检查是否超时,如果提交到第8个发现超时了,那么第9、10两个任务就不会在调用execute了,而是直接返回这批任务;? ? ??

? ? ? ?在第二个循环中,这时依此取执行结果阶段,每次也会判断是否超时,一旦发现超时就立马返回这批任务。

? ? ? ?当然如果超时返回,将无法执行后续语句done= true; 这样在finally中就会把所有任务都cancel掉。而客户端(主线程)就会收到CancellationException。

?

?

读书人网 >编程

热点推荐