基于Java的concurrent多线程实践摘录
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
线程池维护线程的最少数量 maximumPoolSiz
线程池维护线程的最大数量 keepAliveTime
线程池维护线程所允许的空闲时间 unit
线程池维护线程所允许的空闲时间的单位 workQueue
线程池所使用的缓冲队列 handler
线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四个选择:
抛出java.util.concurrent.RejectedExecutionException异常 ThreadPoolExecutor.CallerRunsPolicy()
由调用者执行这个任务 ThreadPoolExecutor.DiscardOldestPolicy()
抛弃旧的任务 ThreadPoolExecutor.DiscardPolicy()
抛弃当前的任务
二、一般用法举例
package cn.simplelife.exercise;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class TestThreadPool { private static int produceTaskSleepTime = 2; public static void main(String[] args) { //构造一个线程池 ThreadPoolExecutor producerPool = new ThreadPoolExecutor(2, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(3), new ThreadPoolExecutor.DiscardOldestPolicy()); //每隔produceTaskSleepTime的时间向线程池派送一个任务。 int i=1; while(true){ try { Thread.sleep(produceTaskSleepTime); String task = "task@ " + i; System.out.println("put " + task); producerPool.execute(new ThreadPoolTask(task)); i++; } catch (Exception e) { e.printStackTrace(); } } }}package cn.simplelife.exercise;import java.io.Serializable;/** * 线程池执行的任务 * @author hdpan */public class ThreadPoolTask implements Runnable,Serializable{ //JDK1.5中,每个实现Serializable接口的类都推荐声明这样的一个ID private static final long serialVersionUID = 0; private static int consumeTaskSleepTime = 2000; private Object threadPoolTaskData; ThreadPoolTask(Object tasks){ this.threadPoolTaskData = tasks; } //每个任务的执行过程,现在是什么都没做,除了print和sleep,:) public void run(){ System.out.println("start .."+threadPoolTaskData); try { //便于观察现象,等待一段时间 Thread.sleep(consumeTaskSleepTime); } catch (Exception e) { e.printStackTrace(); } threadPoolTaskData = null; }}?
?**************下面属于Brian Goetz (brian@quiotix.com)Quiotix所写**********
Java? 5.0 第一次让使用 Java 语言开发非阻塞算法成为可能,<!----><!----><!---->java.util.concurrent包充分地利用了这个功能。非阻塞算法属于并发算法,它们可以安全地派生它们的线程,不通过锁定派生,而是通过低级的原子性的硬件原生形式 —— 例如比较和交换。非阻塞算法的设计与实现极为困难,但是它们能够提供更好的吞吐率,对生存问题(例如死锁和优先级反转)也能提供更好的防御。在这期的 Java 理论与实践 中,并发性大师 Brian Goetz 演示了几种比较简单的非阻塞算法的工作方式。在不只一个线程访问一个互斥的变量时,所有线程都必须使用同步,否则就可能会发生一些非常糟糕的事情。Java 语言中主要的同步手段就是
synchronized关键字(也称为内在锁),它强制实行互斥,确保执行synchronized块的线程的动作,能够被后来执行受相同锁保护的synchronized块的其他线程看到。在使用得当的时候,内在锁可以让程序做到线程安全,但是在使用锁定保护短的代码路径,而且线程频繁地争用锁的时候,锁定可能成为相当繁重的操作。在 “流行的原子” 一文中,我们研究了原子变量,原子变量提供了原子性的读-写-修改操作,可以在不使用锁的情况下安全地更新共享变量。原子变量的内存语义与 volatile 变量类似,但是因为它们也可以被原子性地修改,所以可以把它们用作不使用锁的并发算法的基础。
回页首?
回页首?
如 清单 4 所示,插入一个元素涉及两个指针更新,这两个更新都是通过 CAS 进行的:从队列当前的最后节点(C)链接到新节点,并把尾指针移动到新的最后一个节点—)。如果第一步失败,那么队列的状态不变,插入线程会继续重试,直到成功。一旦操作成功,插入被当成生效,其他线程就可以看到修改。还需要把尾指针移动到新节点的位置上,但是这项工作可以看成是 “清理工作”,因为任何处在这种情况下的线程都可以判断出是否需要这种清理,也知道如何进行清理。
队列总是处于两种状态之一:正常状态(或称静止状态,图 1 和 图 3)或中间状态(图 2)。在插入操作之前和第二个 CAS—)成功之后,队列处在静止状态;在第一个 CAS(C)成功之后,队列处在中间状态。在静止状态时,尾指针指向的链接节点的 next 字段总为 null,而在中间状态时,这个字段为非 null。任何线程通过比较
tail.next是否为 null,就可以判断出队列的状态,这是让线程可以帮助其他线程 “完成” 操作的关键。
图 2. 处在插入中间状态的队列,在新元素插入之后,尾指针更新之前插入操作在插入新元素(A)之前,先检查队列是否处在中间状态,如 清单 4 所示。如果是在中间状态,那么肯定有其他线程已经处在元素插入的中途,在步骤(C)和—)之间。不必等候其他线程完成,当前线程就可以 “帮助” 它完成操作,把尾指针向前移动(B)。如果有必要,它还会继续检查尾指针并向前移动指针,直到队列处于静止状态,这时它就可以开始自己的插入了。
第一个 CAS(C)可能因为两个线程竞争访问队列当前的最后一个元素而失败;在这种情况下,没有发生修改,失去 CAS 的线程会重新装入尾指针并再次尝试。如果第二个 CAS—)失败,插入线程不需要重试 —— 因为其他线程已经在步骤(B)中替它完成了这个操作!
图 3. 在尾指针更新后,队列重新处在静止状态
回页首?
结束语
非阻塞算法要比基于锁的算法复杂得多。开发非阻塞算法是相当专业的训练,而且要证明算法的正确也极为困难。但是在 Java 版本之间并发性能上的众多改进来自对非阻塞算法的采用,而且随着并发性能变得越来越重要,可以预见在 Java 平台的未来发行版中,会使用更多的非阻塞算法。

