Java中Condition简介

Condition对象是由某个显式锁Lock创建的,一个显式锁Lock可以创建多个Condition对象与之关联,Condition的作用在于控制锁并且判断某个条件(临界值)是否满足,如果不满足,那么使用该锁的线程将会被挂起等待另外的线程将其唤醒,与此同时被挂起的线程将会进入阻塞队列中并且释放对显式锁Lock的持有,这一点与对象监视器的wait()方法非常类似。

介绍

condition接口提供了类似Object的监视器的方法,用来实现等待通知模式。但是与Object监视器又有一定的区别与不同。

Condition是Java中的接口,提供了与Object#waitObject#notify相同的功能。

Doug Lea在Condition接口的描述中提到了这点:

Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to “wait”) until notified by another thread that some state condition may now be true.

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 声明锁的对象
ReentrantLock lock = new ReentrantLock();
// 获取监视器
Condition condition = lock.newCondition();

// 获取锁
lock.lock();

// 等待
condition.await();
// 通知
condition.signal();

//释放锁
lock.unlock();

当调用await()方法之后,当前线程会释放锁并在此等待,其他线程调用Condition对象signal()方法,通知当前线程后,当前线程才从await()返回,但是返回对前提也就是需要获得锁。

既然ConditionObject提供的等待与唤醒功能相同,那么它们的用法是不是也很相似呢?

与调用Object#waitObject#notifyAll必须处于synchronized修饰的代码中一样(获取Monitor),调用Condition#awaitCondition#signalAll的前提是要先获取锁。但不同的是,使用Condition前,需要先通过锁去创建Condition

ReentrantLock中提供的Condition为例,首先是创建Condition对象:

1
2
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

然后是获取锁并调用await方法:

1
2
3
4
5
6
7
8
9
new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lock.unlock();
}

最后,通过调用singalAll唤醒全部阻塞中的线程:

1
2
3
4
5
new Thread(() -> {
lock.lock();
condition.signalAll();
lock.unlock();
}

等待/通知分析

每一个Condition对象包含一个等待队列,该队列是Condition实现的关键,用来实现等待/通知的功能。在Condition中可以拥有多个等待队列。

  • 调用await()方法加入条件等待队列
  • 调用signal()方法从条件等待队列中移出,加入同步队列开始锁的获取

等待

当调用await方法时,当前线程会进入等待队列,并且释放锁,线程状态会变为等待状态(CONDITION :-2);

当调用await()方法时,相当同步队列的首节点移动到了等待队列当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final void await() throws InterruptedException {
// 线程被中断,恢复中断 并抛出移除
if (Thread.interrupted())
throw new InterruptedException();
// 添加到等待队列当中
Node node = addConditionWaiter();
// 释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;


// 判断是否在等待队列当中 不在推出出循环
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

// 唤醒同步队列中到后继节点;node节点开始自旋获取
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

通知

调用Condition的signal()方法,会唤醒在等待队列中等待时候最长的节点,然后移动到同步队列当中。

1
2
3
4
5
6
7
8
9
10
public final void signal() {
// 判断当前线程释放获取锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();

Node first = firstWaiter;
if (first != null)
// 从等待队列中移出 并且添加到同步队列当中
doSignal(first);
}

当执行了doSignal方法后,当前线程从等待队列中移除,await方法中将继续执行,调用acquireQueued(node),内部进行自旋获取锁,获取锁当条件:

  • 1、前节点是头节点
  • 2、获取到同步状态

Condition接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface Condition {
void await() throws InterruptedException;

void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;

void signal();

void signalAll();
}

Condition只提供了两个功能:等待(await)和唤醒(signal),与Object提供的等待与唤醒时相似的:

1
2
3
4
5
6
7
8
9
10
11
public final void wait() throws InterruptedException;

public final void wait(long timeoutMillis, int nanos) throws InterruptedException;

public final native void wait(long timeoutMillis) throws InterruptedException;

@HotSpotIntrinsicCandidate
public final native void notify();

@HotSpotIntrinsicCandidate
public final native void notifyAll();

唤醒功能上,ConditionObject的差异并不大:

  • Condition#signalObject#notify
  • Condition#signalAll=Object#notifyAll

多个线程处于等待状态时,Object#notify()是“随机”唤醒线程,而Condition#signal则由具体实现决定如何唤醒线程,如:ConditionObject唤醒的是最早进入等待的线程但两个方法均只唤醒一个线程。

等待功能上,ConditionObject的共同点是:都会释放持有的资源Condition释放锁Object释放Monitor,即进入等待状态后允许其他线程获取锁/监视器。

主要的差异体现在Condition支持了更加丰富的场景,通过一张表格来对比下:

Condition方法Object方法解释
Condition#await()Object#wait()暂停线程,抛出线程中断异常
Condition#awaitUninterruptibly()/暂停线程,不抛出线程中断异常
Condition#await(time, unit)Object#wait(timeoutMillis, nanos)暂停线程,直到被唤醒或等待指定时间后,超时后自动唤醒返回false,否则返回true
Condition#awaitUntil(deadline)/暂停线程,直到被唤醒或到达指定时间点,超时后自动唤醒返回false,否则返回true
Condition#awaitNanos(nanosTimeout)/暂停线程,直到被唤醒或等待指定时间后,返回值表示被唤醒时的剩余时间(nanosTimeout-耗时),结果为负数表示超时

除了以上差异外,Condition还支持创建多个等待队列,即同一把锁拥有多个等待队列,线程在不同队列中等待,而Object只有一个等待队列。

源码分析

作为接口Condition非常惨,因为在Java中只有AQS中的内部类ConditionObject实现了Condition接口:

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;

private transient Node lastWaiter;
}

static final class Node {
// 省略
}
}

ConditionObject只有两个Node类型的字段,分别是链式结构中的头尾节点,ConditionObject就是通过它们实现的等待队列。那么ConditionObject的等待队列起到了怎样的作用呢?是类似于AQS中的排队机制吗?带着这两个问题,我们正是开始源码的分析。

await方法的实现

Condition接口中定义了4个线程等待的方法:

  • void await() throws InterruptedException
  • void awaitUninterruptibly();
  • long awaitNanos(long nanosTimeout) throws InterruptedException;
  • boolean await(long time, TimeUnit unit) throws InterruptedException;
  • boolean awaitUntil(Date deadline) throws InterruptedException;

方法虽然很多,但它们之间的差异较小,只体现在时间的处理上,我们看其中最常用的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final void await() throws InterruptedException {
// 线程中断,抛出异常
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 注释1:加入到Condition的等待队列中
Node node = addConditionWaiter();
// 注释2:释放持有锁(调用AQS的release)
int savedState = fullyRelease(node);
int interruptMode = 0;
// 注释3:判断是否在AQS的等待队列中
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 中断时退出方法
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}

// 加入到AQS的等待队列中,调用AQS的acquireQueued方法
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}

// 断开与Condition队列的联系
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}

if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
}

注释1的部分,调用addConditionWaiter方法添加到Condition队列中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private Node addConditionWaiter() {
// 判断当前线程是否为持有锁的线程
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}

// 获取Condition队列的尾节点
Node t = lastWaiter;
// 断开不再位于Condition队列的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}

// 创建Node.CONDITION模式的Node节点
Node node = new Node(Node.CONDITION);
if (t == null) {
// 队列为空的场景,将node设置为头节点
firstWaiter = node;
} else {
// 队列不为空的场景,将node添加到尾节点的后继节点上
t.nextWaiter = node;
}
// 更新尾节点
lastWaiter = node;
return node;
}

可以看到,Condition的队列是一个朴实无华的双向链表,每次调用addConditionWaiter方法,都会加入到Condition队列的尾部。

注释2的部分,释放线程持有的锁,同时移出AQS的队列,内部调用了AQS的release方法:

1
2
3
4
5
6
7
8
9
10
11
12
=final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState)) {
return savedState;
}
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}

因为已经分析过AQS的release方法和ReentrantLock实现的tryRelease方法,这里我们就不过多赘述了。

注释3的部分,isOnSyncQueue判断当前线程是否在AQS的等待队列中,我们来看此时存在的情况:

  • 如果isOnSyncQueue返回false,即线程不在AQS的队列中,进入自旋,调用LockSupport#park暂停线程;
  • 如果isOnSyncQueue返回true,即线程在AQS的队列中,不进入自旋,执行后续逻辑。

结合注释1和注释2的部分,Condition#await的实现原理了就很清晰了:

  • Condition与AQS分别维护了一个等待队列,而且是互斥的,即同一个节点只会出现在一个队列中
  • 当调用Condition#await时,将线程添加到Condition的队列中(注释1),同时从AQS队列中移出(注释2);
  • 接着判断线程位于的队列:
    • 位于Condition队列中,该线程需要被暂停,调用LockSupport#park
    • 位于AQS队列中,该线程正在等待获取锁。

基于以上的结论,我们已经能够猜到唤醒方法Condition#signalAll的原理了:

  • 将线程从Condition队列中移出,并添加到AQS的队列中;
  • 调用LockSupport.unpark唤醒线程。

至于这个猜想是否正确,我们接着来看唤醒方法的实现。

signal和signalAll方法的实现

来看signalsignalAll的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 唤醒一个处于等待中的线程
public final void signal() {
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 获取Condition队列中的第一个节点
Node first = firstWaiter;
if (first != null) {
// 唤醒第一个节点
doSignal(first);
}
}

// 唤醒全部处于等待中的线程
public final void signalAll() {
if (!isHeldExclusively()){
throw new IllegalMonitorStateException();
}

Node first = firstWaiter;
if (first != null) {
// 唤醒所有节点
doSignalAll(first);
}
}

两个方法唯一的差别在于头节点不为空的场景下,是调用doSignal唤醒一个线程还是调用doSignalAll唤醒所有线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doSignal(Node first) {
do {
// 更新头节点
if ( (firstWaiter = first.nextWaiter) == null) {
// 无后继节点的场景
lastWaiter = null;
}
// 断开节点的连接
first.nextWaiter = null;
// 唤醒头节点
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}

private void doSignalAll(Node first) {
// 将Condition的队列置为空
lastWaiter = firstWaiter = null;
do {
// 断开链接
Node next = first.nextWaiter;
first.nextWaiter = null;
// 唤醒当前头节点
transferForSignal(first);
// 更新头节点
first = next;
} while (first != null);
}

可以看到,无论是doSignal还是doSignalAll都只是将节点移出Condition队列,而真正起到唤醒作用的是transferForSignal方法,从方法名可以看到该方法是通过“转移”进行唤醒的,我们来看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final boolean transferForSignal(Node node) {
// 通过CAS替换node的状态
// 如果替换失败,说明node不处于Node.CONDITION状态,不需要唤醒
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
return false;
}
// 将节点添加到AQS的队列的队尾
// 并返回老队尾节点,即node的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
// 对前驱节点状态的判断
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) {
LockSupport.unpark(node.thread);
}
return true;
}

transferForSignal方法中,调用enq方法将node重新添加到AQS的队列中,并返回node的前驱节点,随后对前驱节点的状态进行判断:

  • 当ws>0时,前驱节点处于Node.CANCELLED状态,前驱节点退出锁的争抢,node可以直接被唤醒;
  • 当ws≤0时,通过CAS修改前驱节点的状态为Node.SIGNAL,设置失败时,直接唤醒node

AQS的今生,构建出JUC的基础》中介绍了waitStatus的5种状态,其中Node.SIGNAL状态表示需要唤醒后继节点。另外,在分析shouldParkAfterFailedAcquire方法的源码时,我们知道在进入AQS的等待队列时,需要将前驱节点的状态更新为Node.SIGNAL

最后来看enq的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(Node node) {
for (;;) {
// 获取尾节点
Node oldTail = tail;
if (oldTail != null) {
// 更新当前节点的前驱节点
node.setPrevRelaxed(oldTail);
// 更新尾节点
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
// 返回当前节点的前驱节点(即老尾节点)
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}

enq的实现就非常简单了,通过CAS更新AQS的队列尾节点,相当于添加到AQS的队列中,并返回尾节点的前驱节点。好了,唤醒方法的源码到这里就结束了,是不是和我们当初的猜想一模一样呢?

原理

功能上,Condition实现了AQS版Object#waitObject#notify,用法上也与之相似,需要先获取锁,即需要在lockunlock之间调用。原理上,简单来说就是线程在AQS的队列和Condition的队列之间的转移。

线程t持有锁

假设有线程t已经获取了ReentrantLock,线程t1,t2和t3正在AQS的队列中等待,我们可以得到这样的结构:

线程t执行Condition#await

如果线程t中调用了Condition#await方法,线程t进入Condition的等待队列中,线程t1获取ReentrantLock,并从AQS的队列中移出,结构如下:

线程t1执行Condition#await

如果线程t1中也执行了Condition#await方法,同样线程t1进入Condition队列中,线程t2获取到ReentrantLock,结构如下:

线程t2执行Condition#signal

如果线程t2执行了Condition#signal,唤醒Condition队列中的第一个线程,此时结构如下:

通过上面的流程,我们就可以得到线程是如何在Condition队列与AQS队列中转移的:

小结

condition可以用来实现等待/通知模式。具体实现逻辑是,等待的线程await加入等待队列,当被通知后,从等待队列中移出,添加到aqs同步队列中,然后继续开始锁到获取(自旋获取)。

本文标题:Java中Condition简介

文章作者:LiJing

发布时间:2023年05月17日 - 18:21:18

最后更新:2023年06月03日 - 10:01:56

原始链接:https://blog-next.xiaojingge.com/posts/1231959411.html

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

-------------------本文结束 感谢您的阅读-------------------