What's New on Java 7 Phaser
1 Overview
?? Java 7的并发包中推出了Phaser,其功能跟CyclicBarrier和CountDownLatch有些重叠,但是提供了更灵活的用法,例如支持动态调整注册任务的数量等。本文在Phaser自带的示例代码基础上进行一下简单的分析。
?
2 Glossary
2.1 Registration
??? Phaser支持通过register()和bulkRegister(int parties)方法来动态调整注册任务的数量,此外也支持通过其构造函数进行指定初始数量。在适当的时机,Phaser支持减少注册任务的数量,例如arriveAndDeregister()。单个Phaser实例允许的注册任务数的上限是65535。
2.2 Arrival
??? 正如Phaser类的名字所暗示,每个Phaser实例都会维护一个phase number,初始值为0。每当所有注册的任务都到达Phaser时,phase number累加,并在超过Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于记录到达,arriveAndAwaitAdvance()方法用于记录到达,并且等待其它未到达的任务。
2.3 Termination
??? Phaser支持终止。Phaser终止之后,调用register()和bulkRegister(int parties)方法没有任何效果,arriveAndAwaitAdvance()方法也会立即返回。触发终止的时机是在protected boolean onAdvance(int phase, int registeredParties)方法返回时,如果该方法返回true,那么Phaser会被终止。默认实现是在注册任务数为0时返回true(即return registeredParties == 0;)。此外,forceTermination()方法用于强制终止,isTerminated()方法用于判断是否已经终止。
2.4 Tiering
??? Phaser支持层次结构,即通过构造函数Phaser(Phaser parent)和Phaser(Phaser parent, int parties)构造一个树形结构。这有助于减轻因在单个的Phaser上注册过多的任务而导致的竞争,从而提升吞吐量,代价是增加单个操作的开销。
?
3 Sample Usage
3.1 Sample 1
??? 在有些场景下,我们希望控制多个线程的启动时机:例如在并发相关的单元测试中,有时需要控制线程的启动时机,以期获得最大程度的并发,通常我们会使用CountDownLatch,以下是使用Phaser的版本。
?? 以上例子中,由于线程是在一个循环中start,因此start的时机有一定的间隔。本例中这些线程实际开始工作的时机是在所有的线程都调用了phaser.arriveAndAwaitAdvance()之后。
??? 此外,如果留心arriveAndAwaitAdvance()方法的签名,会发现它并没有抛出InterruptedException,实际上,即使当前线程被中断,arriveAndAwaitAdvance()方法也不会返回,而是继续等待。如果在等待时希望可中断,或者可超时,那么需要使用以下方法:
??? 以上例子中,只有当用户按下回车之后,任务才真正开始执行。需要注意的是,arriveAndDeregister()方法不会被阻塞,并且返回到达时的phase number(arrive方法也是如此)。
?
3.3 Sample 3
??? CyclicBarrier支持barrier action, Phaser同样也支持。不同之处是Phaser的barrier action需要改写onAdvance方法来进行定制。
?? 本例中的barrier action只是简单地打印了一条信息,此外在超过指定的迭代次数后终止了Phaser。
3.4 Sample 4
??? 在Smaple 3的例子中,主线程在其它工作线程结束之前已经终止。如果希望主线程等待这些工作线程结束,除了使用Thread.join()之外,也可以尝试以下的方式:
?? 如果希望主线程在特定的phase结束之后终止,那么可以在主线程中调用下述方法:
??? 需要注意的是,TASKS_PER_PHASER的值取决于具体的Task实现。对于Task执行时间很短的场景(也就是竞争相对激烈),可以考虑使用较小的TASKS_PER_PHASER值,例如4。反之可以适当增大TASKS_PER_PHASER。
1 楼 childz 2012-08-07 非常不错,感谢