读书人

Java Concurrent Programming (四)

发布时间: 2012-08-27 21:21:57 作者: rapoo

Java Concurrent Programming (4)
4 锁
锁是递归的,是基于每线程的。锁提供了两种主要特性:互斥(mutual exclusion)和可见性(visibility)。互斥即一次只允许一个线程持有某个特定的锁,因此可使用该特性实现对共享数据的协调访问协议,这样,一次就只有一个线程能够使用该共享数据。可见性要更加复杂一些,它必须确保释放锁之前对共享数据做出的更改对于随后获得该锁的另一个线程是可见的。Java语言中,使用synchronized关键字可以实现锁机制,但是它有很多限制:
1)无法中断一个正在等候获得锁的线程。
2)没有办法改变锁的语意,即重入性,获取锁的公平性等。
3)方法和块内的同步,使得只能够够对严格的块结构使用锁。例如:不能在一个方法中获得锁,而在另外一个方法中释放锁。
在这种情况下,引入了java.util.concurrent.locks具有编写的高质量、高效率、语义上准确的线程控制结构工具包。

4.1 Lock & Condition & ReadWriteLock接口
Lock接口定义如下:

public interface Lock {  void lock();  void lockInterruptibly() throws InterruptedException;  boolean tryLock();  boolean tryLock(long time, TimeUnit unit) throws InterruptedException;  void unlock();  Condition newCondition();}

lock方法与tryLock,都是获取锁,但是不同之处在于,tryLock方法如果获取不到锁,并不会等待,如果获取到锁,需要显示的释放。下面是使用的例子:
Lock lock = new ReentrantLock();      if (lock.tryLock()) {           try {              // manipulate protected state          } finally { // if acquire lock ,must release finally              lock.unlock();          }      } else {          // perform alternative actions      }

lockInterruptibly表示如果当前线程未被中断,则获取锁。Condition是一个具有条件功能变量的class,必须要Lock interface并用。因为这个新的interface是与调用的对象以及lock对象分开的。其定义如下:
public interface Condition{ void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }

其await和signal方法与Object类的wait,notify的使用是类似的。awaitUninterruptibly表示等待条件的发生,与await方法不同是,它的调用不可能被中断。
awaitUnti表示等待条件的发生,如果通知没有在指定的绝对时间前出现,返回false值。一般使用如下:
private final Lock lock = new ReentrantLock();private final Condition condition = lock.newCondition();

接口ReadWriteLock表示一个读写锁,
public interface ReadWriteLock {Lock readLock();Lock writeLock();}

readLock用于只读操作,writeLock用于写入操作。只要没有 writer,读取锁可以由多个 reader线程同时保持。写入锁是独占的。

4.2 ReentrantLock类
ReentrantLock实现了Lock接口,具有与内部锁相同的互斥、重入性和内存可见性的保证,它必须被显式地释放。ReentrantLock是可中断的、可定时的,非块结构锁。在Java5中,ReentrantLock的性能要远远高于内部锁。在Java6中,由于管理内部锁的算法采用了类似于ReentrantLock使用的算法,因此内部锁和ReentrantLock之间的性能差别不大。
reentrant 锁意味着什么呢?简单来说,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放。这模仿了 synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个 synchronized 块时,才释放锁。
ReentrantLock的构造函数有两个:
public ReentrantLock();public ReentrantLock(boolean fair);

fair表示是否是公平性,true表示是公平性获取锁。默认的构造是非公平性获取锁。在公平锁中,如果锁已被其它线程占有,那么请求线程会加入到等待队列中,并按顺序获得锁;在非公平锁中,当请求锁的时候,如果锁的状态是可用,那么请求线程可以直接获得锁,而不管等待队列中是否有线程已经在等待该锁。公平锁的代价是更多的挂起和重新开始线程的性能开销。在多数情况下,非公平锁的性能高于公平锁。一下是一个例子:
public class ReentrantLockDemo{//private int count;private final int CAPACITY = 1;private final Lock lock = new ReentrantLock();private final Condition isEmpty = lock.newCondition();private final Condition isFull = lock.newCondition();private final Object[] cache = new Object[CAPACITY];public ReentrantLockDemo(int count){this.count = count;}public Object get() throws InterruptedException{lock.lock();try {while(isEmpty()){isEmpty.await();}count--;isFull.signal();return cache[count];} finally{lock.unlock();}}public void put() throws InterruptedException{lock.lock();try {while(isFull()){isFull.await();}count++;cache[0] = Thread.currentThread().getName() + "put a message";System.out.println(cache[0]);isEmpty.signal();} finally{lock.unlock();}}public boolean isFull(){return count == cache.length;}public boolean isEmpty(){return count == 0;}}


4.3 ReentrantReadWriteLock类
ReentrantReadWriteLock实现了ReadWriteLock接口,支持与ReentrantLock类似语义的ReadWriteLock实现,将控制的更细粒化,将锁分为读锁和写锁。其也支持公平性锁竞争的策略。当一个线程申请读锁时,只要写入锁没有被其他线程持有,那么会立刻拥有读锁。当一个线程申请写入锁时,其他线程不能持有读锁和写入锁,否则会一直等待。如果当前线程已经持有写入锁,然后再次申请写入锁,会成功,但是需要释放两次锁。以下是一个小例子:
public class ReentrantReadWriteLockDemo {//private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();private final Lock read = rwl.readLock();private final Lock write = rwl.writeLock();private final Map<String,Object> map = new HashMap<String, Object>();public Object get(String key) {read.lock();try {return map.get(key);} finally{read.unlock();}}public void set(String key, Object value){write.lock();try {map.put(key, value);} finally{write.unlock();}}public void clear(){write.lock();try {map.clear();} finally{write.unlock();}}}


4.4 Semaphore
信号量,其维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数。
信号量通常用于限制可以访问某些资源的线程数目。下面这个小程序会帮助您理解Semaphore:
public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(5);for (int index = 0; index < 20; index++) {final int NO = index;Runnable run = new Runnable() {public void run() {try {semaphore.acquire();System.out.println("acquire thread" + NO + "***********");Thread.sleep(3000);semaphore.release();System.out.println("release thread" + NO + "###########");} catch (Exception e) {}}};service.execute(run);}service.shutdown();}


4.5 Mutex
非重入互斥独占锁(non-reentrant mutual exclusion lock),它不提供公平性锁竞争策略已经顺序化资源的保证。如果锁已经被执行acquire的线程持有,当再次acquire后,会形成阻塞。Mutex可以看做信号量为1的Semaphore。其acquire和release方法,可以不用成对在同一个方法中出现。其定义如下:
public class Mutex implemets Sync {    public void acquire() throws InterruptedException;    public void release();    public boolean attempt(long msec) throws InterruptedException;}

如果当前线程在试图acquire和attempt获取锁的过程中被中断,会抛出InterruptedException异常。

4.6 Latch
闭锁,一个release操作将使得所有之前或者之后的acquire操作都恢复执行。
util.concurrent下的CountDownLatch是其的扩展,其主要方法为countDown方法和await方法,前者表示计数减一,后者表示等待计数为0,如果计数不为0,就形成阻塞,一直阻塞。以下是一个例子,模拟百米才跑比赛,所有人到达终点后,比赛结束:
public class CountDownLatchDemo {//private static final int PLAY_AMOUNT = 10;public static void main(String[] args) {CountDownLatch begin = new CountDownLatch(1);CountDownLatch end = new CountDownLatch(PLAY_AMOUNT);Player[] players = new Player[PLAY_AMOUNT];for(int i = 0; i < players.length; i ++){players[i] = new Player(i+1, begin, end);}ExecutorService executor = Executors.newFixedThreadPool(PLAY_AMOUNT);for(Player player : players){executor.execute(player);}System.out.println("game begin ");begin.countDown();try {end.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("game over ");executor.shutdown();}}class Player implements Runnable{//private final int id;private final CountDownLatch begin;private final CountDownLatch end;public Player(int id, CountDownLatch begin, CountDownLatch end){this.id = id;this.begin = begin;this.end = end;}public void run(){try {begin.await();Thread.sleep((long)(Math.random() * 100));System.out.println("Player " + id + " has arrived");} catch (InterruptedException e) {e.printStackTrace();}finally{end.countDown();}}}

4.7 Barrier
Barrier能够阻塞一组线程,其与闭锁的区别在于:闭锁等待的是事件,barrier等待的是线程。CyclicBarrier表示的cyclic就是表示,线程在释放后可以重用。有两个构造函数:
public CyclicBarrier(int parties, Runnable barrierAction);public CyclicBarrier(int parties);

parties表示可阻挡线程数量,barrierAction表示线程数量到达parties后执行的动作,null表示没有执行动作。第二个版本的构造函数默认为null的执行动作。
public int await() throws InterruptedException, BrokenBarrierExceptionpublic int await(long timeout, TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException;public void reset();

barrier的核心是await方法,这个方法的行为与Condition的await方法相似。有一个选项可以选择是等到barrier释放掉thread还是有时限条件。不需要signal()方法是因为通知就是由barrier在有正确数目的参与者等待的时候所完成的。barrier可以被reset()方法重置到初始状态以备下一次使用,如果此时有线程在等待,调用reset后会抛出BrokenBarrierException异常。
然而,发生异常的条件会导致barrier失败,当barrier失败的时候,CyclicBarrier会解开barrier并以BrokenBarrierException释放掉正在await方法上等待的thread。barrier会被数种原因解开,等待中的thread被中断,超时或是因为barrier的动作所抛出的异常。但是在每种异常条件下,barrier只是被打开。barrier中的getNumberWaiting()方法返回在barrier上等待thread的数量,getParties()方法返回构造的parties数量,isBroken()方法查询barrier是否已被打开。
以下这个小例子,会帮助您理解CyclicBarrier,模拟5人去登山,当5人都到齐了,便出发:
public class CyclicBarrierDemo {public static void main(String[] args) {final ExecutorService executor = Executors.newCachedThreadPool();final CyclicBarrier barrier = new CyclicBarrier(5, new Runnable(){public void run(){System.out.println("all people arrived , let's go");}});for(int i = 0; i < 5; i ++){executor.execute(new Runnable(){public void run(){try {Thread.sleep((long)(Math.random() * 100));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getId() + " has arrived");try {barrier.await();  // wait other people to arrive} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}});}executor.shutdown();}}

读书人网 >编程

热点推荐