ArrayBlockingQueue中对条件变量的使用

ArrayBlockingQueue中对条件变量的使用

Content #

条件变量(java.util.concurrent.Condition),如果说 ReentrantLock 是 synchronized 的替代选择,Condition 则是将 wait、notify、notifyAll 等操作转化为相应的对象,将复杂而晦涩的同步操作转变为直观可控的对象行为。

条件变量最为典型的应用场景就是标准类库中的 ArrayBlockingQueue 等。

我们参考下面的源码,首先,通过再入锁获取条件变量:

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
      throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}

两个条件变量是从同一再入锁创建出来,然后使用在特定操作中,如下面的 take 方法,判断和等待条件满足:

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
      while (count == 0)
          notEmpty.await();
      return dequeue();
  } finally {
      lock.unlock();
  }
}

当队列为空时,试图 take 的线程的正确行为应该是等待入队发生,而不是直接返回,这是 BlockingQueue 的语义,使用条件 notEmpty 就可以优雅地实现这一逻辑。

那么,怎么保证入队触发后续 take 操作呢?请看 enqueue 实现:

private void enqueue(E e) {
  // assert lock.isHeldByCurrentThread();
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  items[putIndex] = e;
  if (++putIndex == items.length) putIndex = 0;
  count++;
  notEmpty.signal(); // 通知等待的线程,非空条件已经满足
}

通过 signal/await 的组合,完成了条件判断和通知等待线程,非常顺畅就完成了状态流转。注意,signal 和 await 成对调用非常重要,不然假设只有 await 动作,线程会一直等待直到被打断(interrupt)。

Viewpoints #

From #

第15讲 | synchronized和ReentrantLock有什么区别呢?