读书人

Java多线程(八)之Se地图hore、Count

发布时间: 2013-08-13 16:43:28 作者: rapoo

Java多线程(八)之Semaphore、CountDownLatch、CyclicBarrier、Exchanger (转)
一、引言?Semaphore ? ? ? ? ? ? ? :一个计数信号量CountDownLatch ? ? ? ? ?:一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。?CyclicBarrier ? ? ? ? ? :一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点?? ?Exchanger ? ? ? ? ? ? ? :方便了两个共同操作线程之间的双向交换?二、Semaphore?

Semaphore?是一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个?acquire(),然后再获取该许可。每个?release()?添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore?只对可用许可的号码进行计数,并采取相应的行动。

说 白了,Semaphore是一个计数器,在计数器不为0的时候对线程就放行,一旦达到0,那么所有请求资源的新线程都会被阻塞,包括增加请求到许可的线 程,也就是说Semaphore不是可重入的。每一次请求一个许可都会导致计数器减少1,同样每次释放一个许可都会导致计数器增加1,一旦达到了0,新的 许可请求线程将被挂起。

缓存池整好使用此思想来实现的,比如链接池、对象池等。

?

计数信号可以用于限制有权对资源进行并发访问的线程数。该方法对于实现资源池或限制 Web 爬虫(Web crawler)中的输出 socket 连接非常有用。

?

清单1 对象池

?

[java] view plaincopy
  1. package?xylz.study.concurrency.lock;????
  2. import?java.util.concurrent.Semaphore;??import?java.util.concurrent.locks.Lock;??
  3. import?java.util.concurrent.locks.ReentrantLock;????
  4. public?class?ObjectCache<T>?{????
  5. ????public?interface?ObjectFactory<T>?{????
  6. ????????T?makeObject();??????}??
  7. ??????class?Node?{??
  8. ??????????T?obj;??
  9. ??????????Node?next;??
  10. ????}????
  11. ????final?int?capacity;????
  12. ????final?ObjectFactory<T>?factory;????
  13. ????final?Lock?lock?=?new?ReentrantLock();????
  14. ????final?Semaphore?semaphore;????
  15. ????private?Node?head;????
  16. ????private?Node?tail;????
  17. ????public?ObjectCache(int?capacity,?ObjectFactory<T>?factory)?{??????????this.capacity?=?capacity;??
  18. ????????this.factory?=?factory;??????????this.semaphore?=?new?Semaphore(this.capacity);??
  19. ????????this.head?=?null;??????????this.tail?=?null;??
  20. ????}????
  21. ????public?T?getObject()?throws?InterruptedException?{??????????semaphore.acquire();??
  22. ????????return?getNextObject();??????}??
  23. ??????private?T?getNextObject()?{??
  24. ????????lock.lock();??????????try?{??
  25. ????????????if?(head?==?null)?{??????????????????return?factory.makeObject();??
  26. ????????????}?else?{??????????????????Node?ret?=?head;??
  27. ????????????????head?=?head.next;??????????????????if?(head?==?null)?tail?=?null;??
  28. ????????????????ret.next?=?null;//help?GC??????????????????return?ret.obj;??
  29. ????????????}??????????}?finally?{??
  30. ????????????lock.unlock();??????????}??
  31. ????}????
  32. ????private?void?returnObjectToPool(T?t)?{??????????lock.lock();??
  33. ????????try?{??????????????Node?node?=?new?Node();??
  34. ????????????node.obj?=?t;??????????????if?(tail?==?null)?{??
  35. ????????????????head?=?tail?=?node;??????????????}?else?{??
  36. ????????????????tail.next?=?node;??????????????????tail?=?node;??
  37. ????????????}????
  38. ????????}?finally?{??????????????lock.unlock();??
  39. ????????}??????}??
  40. ??????public?void?returnObject(T?t)?{??
  41. ????????returnObjectToPool(t);??????????semaphore.release();??
  42. ????}????
  43. }??

清 单1描述了一个基于信号量Semaphore的对象池实现。此对象池最多支持capacity个对象,这在构造函数中传入。对象池有一个基于FIFO的队 列,每次从对象池的头结点开始取对象,如果头结点为空就直接构造一个新的对象返回。否则将头结点对象取出,并且头结点往后移动。特别要说明的如果对象的个 数用完了,那么新的线程将被阻塞,直到有对象被返回回来。返还对象时将对象加入FIFO的尾节点并且释放一个空闲的信号量,表示对象池中增加一个可用对 象。

实际上对象池、线程池的原理大致上就是这样的,只不过真正的对象池、线程池要处理比较复杂的逻辑,所以实现起来还需要做很多的工作,例如超时机制,自动回收机制,对象的有效期等等问题。

这里特别说明的是信号量只是在信号不够的时候挂起线程,但是并不能保证信号量足够的时候获取对象和返还对象是线程安全的,所以在清单1中仍然需要锁Lock来保证并发的正确性。

将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多?Lock?实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。

上面这段话的意思是说当某个线程A持有信号量数为1的信号量时,其它线程只能等待此线程释放资源才能继续,这时候持有信号量的线程A就相当于持有了“锁”,其它线程的继续就需要这把锁,于是线程A的释放才能决定其它线程的运行,相当于扮演了“锁”的角色。

?

2.1 信号量的公平性?

另外同公平锁非公平锁一样,信号量也有公平性。 如果一个信号量是公平的表示线程在获取信号量时按FIFO的顺序得到许可,也就是按照请求的顺序得到释放。这里特别说明的是:所谓请求的顺序是指在请求信 号量而进入FIFO队列的顺序,有可能某个线程先请求信号而后进去请求队列,那么次线程获取信号量的顺序就会晚于其后请求但是先进入请求队列的线程。这个 在公平锁和非公平锁中谈过很多。

?

除了acquire以外,Semaphore还有几种类似的acquire方法,这些方法可以更好的处理中断和超时或者异步等特性,可以参考JDK API。

按照同样的学习原则,下面对主要的实现进行分析。Semaphore的acquire方法实际上访问的是AQS的acquireSharedInterruptibly(arg)方法。这个可以参考http://blog.csdn.net/a511596982/article/details/8275624??一节。

所以Semaphore的await实现也是比较简单的。与CountDownLatch不同的是,Semaphore区分公平信号和非公平信号。

?

清单2 公平信号获取方法

?

[java] view plaincopy
  1. protected?int?tryAcquireShared(int?acquires)?{??????Thread?current?=?Thread.currentThread();??
  2. ????for?(;;)?{??????????Thread?first?=?getFirstQueuedThread();??
  3. ????????if?(first?!=?null?&&?first?!=?current)??????????????return?-1;??
  4. ????????int?available?=?getState();??????????int?remaining?=?available?-?acquires;??
  5. ????????if?(remaining?<?0?||??????????????compareAndSetState(available,?remaining))??
  6. ????????????return?remaining;??????}??
  7. }??

清单3 非公平信号获取方法

?

[java] view plaincopy
  1. protected?int?tryAcquireShared(int?acquires)?{??????return?nonfairTryAcquireShared(acquires);??
  2. }????
  3. final?int?nonfairTryAcquireShared(int?acquires)?{??????for?(;;)?{??
  4. ????????int?available?=?getState();??????????int?remaining?=?available?-?acquires;??
  5. ????????if?(remaining?<?0?||??????????????compareAndSetState(available,?remaining))??
  6. ????????????return?remaining;??????}??
  7. }??

对 比清单2和清单3可以看到,公平信号和非公平信号在于第一次尝试能否获取信号时,公平信号量总是将当前线程进入AQS的CLH队列进行排队(因为第一次尝 试时队列的头结点线程很有可能不是当前线程,当然不排除同一个线程第二次进入信号量),从而根据AQS的CLH队列的顺序FIFO依次获取信号量;而对于 非公平信号量,第一次立即尝试能否拿到信号量,一旦信号量的剩余数available大于请求数(acquires通常为1),那么线程就立即得到了释 放,而不需要进行AQS队列进行排队。只有remaining<0的时候(也就是信号量不够的时候)才会进入AQS队列。

所 以非公平信号量的吞吐量总是要比公平信号量的吞吐量要大,但是需要强调的是非公平信号量和非公平锁一样存在“饥渴死”的现象,也就是说活跃线程可能总是拿 到信号量,而非活跃线程可能难以拿到信号量。而对于公平信号量由于总是靠请求的线程的顺序来获取信号量,所以不存在此问题。

?

三、CountDownLatch?3.1 闭锁(Latch)?

闭 锁(Latch):一种同步方法,可以延迟线程的进度直到线程到达某个终点状态。通俗的讲就是,一个闭锁相当于一扇大门,在大门打开之前所有线程都被阻 断,一旦大门打开所有线程都将通过,但是一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。也就是说 闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成。

?

CountDownLatch是JDK 5+里面闭锁的一个实现,允许一个或者多个线程等待某个事件的发生。CountDownLatch有一个正数计数器,countDown方法对计数器做减操作,await方法等待计数器达到0。所有await的线程都会阻塞直到计数器为0或者等待线程中断或者超时。

?

CountDownLatch的API如下。

读书人网 >行业软件

热点推荐