读书人

相关Java 5.0+ 并发包的探讨-2 sectio

发布时间: 2012-12-21 12:03:50 作者: rapoo

有关Java 5.0+ 并发包的探讨-2 section -补充-2

Exchanger

Exchanger 类方便了两个共同操作线程之间的双向交换;
Exchanger 通常用于一个线程填充缓冲,而另一个线程清空缓冲的情况。当两个线程在屏障处集合时(交换点),它们交换缓冲,为了解释Exchanger的使用方法,先看看其实现原理,在Exchanger中最核心的方法是exchange, 其JDK的实现是:

 public V exchange(V x) throws InterruptedException {        if (!Thread.interrupted()) {            Object v = doExchange(x == null? NULL_ITEM : x, false, 0);            if (v == NULL_ITEM)                return null;            if (v != CANCEL)                return (V)v;            Thread.interrupted(); // Clear interrupt status on IE throw        }        throw new InterruptedException();    }

?

调用了doExchange子方法,我们来看看其具体的实现:

?

 private Object doExchange(Object item, boolean timed, long nanos) {        Node me = new Node(item);                 // Create in case occupying        int index = hashIndex();                  // Index of current slot        int fails = 0;                            // Number of CAS failures        for (;;) {            Object y;                             // Contents of current slot            Slot slot = arena[index];            if (slot == null)                     // Lazily initialize slots                createSlot(index);                // Continue loop to reread            else if ((y = slot.get()) != null &&  // Try to fulfill                     slot.compareAndSet(y, null)) {                Node you = (Node)y;               // Transfer item                if (you.compareAndSet(null, item)) {                    LockSupport.unpark(you.waiter);                    return you.item;                }                                 // Else cancelled; continue            }            else if (y == null &&                 // Try to occupy                     slot.compareAndSet(null, me)) {                if (index == 0)                   // Blocking wait for slot 0                    return timed? awaitNanos(me, slot, nanos): await(me, slot);                Object v = spinWait(me, slot);    // Spin wait for non-0                if (v != CANCEL)                    return v;                me = new Node(item);              // Throw away cancelled node                int m = max.get();                if (m > (index >>>= 1))           // Decrease index                    max.compareAndSet(m, m - 1);  // Maybe shrink table            }            else if (++fails > 1) {               // Allow 2 fails on 1st slot                int m = max.get();                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))                    index = m + 1;                // Grow on 3rd failed slot                else if (--index < 0)                    index = m;                    // Circularly traverse            }        }    }

?

? Slot类是从AtomicRefrence继承而来的,也就是说交换是基于原子操作的,另外从上可以看到是如何交换数据的!并且我试图补齐在其实现Class上的注释中的例子:

class FillAndEmpty {Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();DataBuffer initialEmptyBuffer = new DataBuffer();DataBuffer initialFullBuffer = new DataBuffer();class FillingLoop implements Runnable {public void run() {       DataBuffer currentBuffer = initialEmptyBuffer;      try {         while (currentBuffer != null) {          addToBuffer(currentBuffer);          if (currentBuffer.full())            currentBuffer = exchanger.exchange(currentBuffer);        }      } catch (InterruptedException ex) { }}}class EmptyingLoop implements Runnable {public void run() {      DataBuffer currentBuffer = initialFullBuffer;       try {         while (currentBuffer != null) {          takeFromBuffer(currentBuffer);          if (currentBuffer.empty())            currentBuffer = exchanger.exchange(currentBuffer);        }      } catch (InterruptedException ex) { }}void start() {new Thread(new FillingLoop()).start();new Thread(new EmptyingLoop()).start();}}

??

?

读书人网 >编程

热点推荐