AbstractQueuedSynchronizer(4)
Condition是一个条件功能的class,必须放在Lock代码块内,如同wait,notify方法放在synchronized块一样。
Condition的(await,signal)与object的(wait,notify)相比,提供了更为通用和灵活的解决方案,可以让多种条件的线程之间相互通信。
Condition的定义:
public interface Condition{ void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
Lock接口的newCondition方法是实现Condition的一种方式,这样Condition就可以和Lock合作得亲密无间。
在AbstractQueuedSynchronizer中也有Condition的实现ConditionObject:
1. ConditionObject.addConditionWaiter 方法
添加一个waiter到等待队列中
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
如果上一个waiter被取消了,则调用unlinkCancelledWaiters方法去掉队列里面的无效waiters。新建一个waiter并把它放到队列的尾部。
2. ConditionObject.unlinkCancelledWaiters方法
从condition队列中把取消的等待节点删除,目的是为了减少内存占用,提高效率。
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}}
从起始节点firstWaiter出发,遍历到结束节点lastWaiter。
值得一提的是trail变量,保存的是上一个有效节点:
a. 如果trail == null,说明还没有有效的节点,所以把next节点暂且作为firstWaiter
b. 如果当前节点t无效,则把t剔除出列(t.nextWaiter = null),并把trail和next暂且关联起来(trail.nextWaiter = next)
c. 当next == null时,说明已经遍历完整个队列了,把trail作为结束节点
3. ConditionObject.doSignal方法
从开始节点开始,向后搜索直到找到一个不为null的等待节点并把它转移到同步队列上。
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
4. ConditionObject.signal方法
把等待最久的condition节点移到锁的等待队列中。
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
拿到第一个waiter,如果不为空,则调用doSignal方法。
5. ConditionObject.awaitUninterruptibly方法
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); }
首先把当前线程加入到condition的等待队列中,然后试着去释放当前等待节点。循环监测节点是否在锁的等待队列中,如果没有,就park当前线程。
6. ConditionObject.await方法
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
和awaitUninterruptibly类似,这里允许出现Interrupt。
7. ConditionObject.awaitNanos方法
public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); long lastTime = System.nanoTime(); int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return nanosTimeout - (System.nanoTime() - lastTime); }
实现指定时间的等待。
和awaitUninterruptibly类似。每次循环都会检查