AbstractQueuedSynchronizer中的ConditionObject剖析

在多线程环境中, 有时候, 一个线程的执行是需要等待一个条件发生后才能执行的. 在经典的生产者和消费者模式中, 如果缓冲区满后, 生产者是不能向缓冲区投放item的, 它需要等待一个条件: 缓冲区不为满的状态. 同理, 如果缓冲区为空时, 消费者是不能消费item的, 它需要等待一个条件: 缓冲区不为空. 一个线程需要等待一定的条件发生, 这个条件往往是别的线程触发的, 这就是经典的等待/唤醒模式.
在JDK1.5之前要实现这种模式的话, 只能够借助synchronized关键字和Object的对象锁来实现. 在1.5之后, 可以利用基于AQS实现的锁和AQS内部的ConditionObject来实现. 下面以ReentrantLock为例实现一个等待/唤醒模式
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
 | public class ConditionObjectTest {     private ReentrantLock lock = new ReentrantLock();     private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();     private Condition isEmpty = lock.newCondition();     private Condition isFull = lock.newCondition();     private int count;     private static final int MAX_LENGTH = 10;     public void product(int i) {         ReentrantLock reentrantLock = lock;         lock.lock();         if (count >= MAX_LENGTH) {             try {                 isEmpty.await();              } catch (InterruptedException e) {                 e.printStackTrace();             }         }         queue.add(i);         count++;         isFull.signal();          reentrantLock.unlock();     }     public void consume() {         ReentrantLock reentrantLock = lock;         lock.lock();         if (queue.isEmpty()) {             try {                 isFull.await();              } catch (InterruptedException e) {                 e.printStackTrace();             }         }         int i = queue.peek();          count--;         isEmpty.signal();          reentrantLock.unlock();     } }
 | 
在这个例子中定义了两个条件, 一个是缓冲区为空的条件, 另外一个是缓冲区为满的条件. 当队列达到最大长度时, 也就是缓冲区满了, 此时生产者调用了isEmpty.await()来等待一个缓冲区不为满的条件, 因此线程会暂时被挂起. 这个条件是由消费者消费了一个item后调用isEmpty.signal()是触发的. 触发了这个条件后, 会唤醒处于等待的生产者线程, 使它从isEmpty.await()中返回. 至于当缓冲区为满时的情况原理是一样的, 这里不多分析. 下面主要分析AQS内部怎么实现等待通知模式的.
ConditionObject
一般而言, 线程是否需要等待一个条件的判断, 这个判断往往是访问一个共享变量, 在前面的例子中, 这个共享变量是缓冲区. 因此, 每次等待一个条件或者触发一个条件时, 都必须先获得锁. 这也解释为什么ConditionObject会作为AQS的内部类.
ConditionObject的等待/通知方法
ConditionObject中的等待方法支持的类型跟AQS中一样, 都支持不可中断, 可中断, 超时三种类型.
等待
- awaitUninterruptibly(): 不可中断的等待一个条件
 
- await(): 响应中断的等待一个条件
 
- awaitNanos(long nanosTimeout): 超时等待一个条件, 如果超过指定的等待时间的话, 会直接返回. 超时等待还有两个重载方法, 这里只列出一个.
 
通知
awaitUninterruptibly()解读
三种类型的等待方法的实现逻辑跟AQS中的获取同步状态的三种类型差不多, 这里只分析awaitUninterruptibly().
| 1 2 3 4 5 6 7 8 9 10 11 12
 | public final void awaitUninterruptibly() {     Node node = addConditionWaiter();      int savedState = fullyRelease(node);      boolean interrupted = false;     while (!isOnSyncQueue(node)) {          LockSupport.park(this);          if (Thread.interrupted())             interrupted = true;     }     if (acquireQueued(node, savedState) || interrupted)          selfInterrupt(); }
 | 
由于每次调用这个方法时, 必定时已经获取了锁的, 所以不用控制同步, 实现起来比较简单. 首先将当前节点添加进等待队列, 接着释放同步状态, 也就是释放锁, 它准备要被挂起了, 挂起前必须释放同步状态, 不然有可能引起死锁. 然后, 判断节点是否存在同步队列中, 如果不存在的话,证明已经被添加进等待队列中, 此时进入While循环挂起线程. 接下来执行到这里就停了. 需要等待其他线程触发它等待的条件.
signal()解读
| 1 2 3 4 5 6 7
 | public final void signal() {     if (!isHeldExclusively())         throw new IllegalMonitorStateException();     Node first = firstWaiter;     if (first != null)         doSignal(first); }
 | 
每次都是释放等待队列中的第一个节点, 说明等待队列是一个FIFO队列. 释放的主要逻辑都在doSignal(first)中.
| 1 2 3 4 5 6 7 8
 | private void doSignal(Node first) {     do {         if ( (firstWaiter = first.nextWaiter) == null)             lastWaiter = null;         first.nextWaiter = null;     } while (!transferForSignal(first) &&              (first = firstWaiter) != null); }
 | 
在doSignal()中会调用transferForSignal(first)将等待队列中的节点移动到同步队列中
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
 | final boolean transferForSignal(Node node) {           * If cannot change waitStatus, the node has been cancelled.      */     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))         return false;           * Splice onto queue and try to set waitStatus of predecessor to      * indicate that thread is (probably) waiting. If cancelled or      * attempt to set waitStatus fails, wake up to resync (in which      * case the waitStatus can be transiently and harmlessly wrong).      */     Node p = enq(node);     int ws = p.waitStatus;     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))         LockSupport.unpark(node.thread);     return true; }
 | 
首先通过CAS设置节点的状态, 如果设置失败的话, 说明节点已经被取消. 接着调用enq(node)方法, 将节点移动到同步队列中, 然后设置节点的状态为SIGNAL.最后唤醒线程. 唤醒后, 在之前的等待方法中, 会被执行.
| 1 2 3 4 5 6 7 8 9 10 11 12
 | public final void awaitUninterruptibly() {     Node node = addConditionWaiter();      int savedState = fullyRelease(node);      boolean interrupted = false;     while (!isOnSyncQueue(node)) {          LockSupport.park(this);          if (Thread.interrupted())             interrupted = true;     }     if (acquireQueued(node, savedState) || interrupted)          selfInterrupt(); }
 | 
由于节点已经被移动到同步队列中, 所以isOnSyncQueue(node)会返回true跳出循环, 接着调用acquireQueued(node, savedState)来竞争同步状态, 也就是重新获得锁. 如果成功的话, 将从awaitUninterruptibly()中返回.
对于signalAll(), 它通过一个循环, 调用signal()来实现唤醒等待队列中的全部线程.
总结
当一个线程调用等待方法时, 它首先会把自己添加进等待队列中, 接着释放同步状态, 然后被挂起. 直到其他线程调用唤醒的方法, 节点会被移动到同步队列中并且唤醒对应的线程去竞争同步状态, 如果成功的话, 将从等待的方法中返回, 下面是逻辑图:
