Condition对象是由某个显式锁Lock创建的,一个显式锁Lock可以创建多个Condition对象与之关联,Condition的作用在于控制锁并且判断某个条件(临界值)是否满足,如果不满足,那么使用该锁的线程将会被挂起等待另外的线程将其唤醒,与此同时被挂起的线程将会进入阻塞队列中并且释放对显式锁Lock的持有,这一点与对象监视器的wait()方法非常类似。
介绍
condition
接口提供了类似Object的监视器的方法,用来实现等待通知模式。但是与Object监视器又有一定的区别与不同。
Condition
是Java中的接口,提供了与Object#wait
和Object#notify
相同的功能。
Doug Lea在Condition
接口的描述中提到了这点:
Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to “wait”) until notified by another thread that some state condition may now be true.
简单使用
1 | // 声明锁的对象 |
当调用await()
方法之后,当前线程会释放锁并在此等待,其他线程调用Condition对象signal()
方法,通知当前线程后,当前线程才从await()
返回,但是返回对前提也就是需要获得锁。
既然Condition
与Object
提供的等待与唤醒功能相同,那么它们的用法是不是也很相似呢?
与调用Object#wait
和Object#notifyAll
必须处于synchronized
修饰的代码中一样(获取Monitor),调用Condition#await
和Condition#signalAll
的前提是要先获取锁。但不同的是,使用Condition
前,需要先通过锁去创建Condition
。
以ReentrantLock
中提供的Condition
为例,首先是创建Condition
对象:
1 | ReentrantLock lock = new ReentrantLock(); |
然后是获取锁并调用await
方法:
1 | new Thread(() -> { |
最后,通过调用singalAll
唤醒全部阻塞中的线程:
1 | new Thread(() -> { |
等待/通知分析
每一个Condition对象包含一个等待队列,该队列是Condition实现的关键,用来实现等待/通知
的功能。在Condition中可以拥有多个等待队列。
- 调用await()方法加入
条件等待队列
- 调用signal()方法从
条件等待队列
中移出,加入同步队列开始锁的获取
等待
当调用await方法时,当前线程会进入等待队列,并且释放锁,线程状态会变为等待状态(CONDITION :-2);
当调用await()方法时,相当同步队列的首节点移动到了等待队列当中。
1 | public final void await() throws InterruptedException { |
通知
调用Condition的signal()方法,会唤醒在等待队列中等待时候最长的节点,然后移动到同步队列当中。
1 | public final void signal() { |
当执行了doSignal方法后,当前线程从等待队列中移除,await方法中将继续执行,调用acquireQueued(node)
,内部进行自旋获取锁,获取锁当条件:
- 1、前节点是头节点
- 2、获取到同步状态
Condition接口
1 | public interface Condition { |
Condition
只提供了两个功能:等待(await)和唤醒(signal)
,与Object
提供的等待与唤醒时相似的:
1 | public final void wait() throws InterruptedException; |
唤醒功能上,Condition
与Object
的差异并不大:
Condition#signal
≈Object#notify
Condition#signalAll
=Object#notifyAll
多个线程处于等待状态时,Object#notify()
是“随机”唤醒线程,而Condition#signal
则由具体实现决定如何唤醒线程,如:ConditionObject
唤醒的是最早进入等待的线程,但两个方法均只唤醒一个线程。
等待功能上,Condition
与Object
的共同点是:都会释放持有的资源,Condition
释放锁,Object
释放Monitor,即进入等待状态后允许其他线程获取锁/监视器。
主要的差异体现在Condition
支持了更加丰富的场景,通过一张表格来对比下:
Condition 方法 | Object 方法 | 解释 |
---|---|---|
Condition#await() | Object#wait() | 暂停线程,抛出线程中断异常 |
Condition#awaitUninterruptibly() | / | 暂停线程,不抛出线程中断异常 |
Condition#await(time, unit) | Object#wait(timeoutMillis, nanos) | 暂停线程,直到被唤醒或等待指定时间后,超时后自动唤醒返回false,否则返回true |
Condition#awaitUntil(deadline) | / | 暂停线程,直到被唤醒或到达指定时间点,超时后自动唤醒返回false,否则返回true |
Condition#awaitNanos(nanosTimeout) | / | 暂停线程,直到被唤醒或等待指定时间后,返回值表示被唤醒时的剩余时间(nanosTimeout-耗时),结果为负数表示超时 |
除了以上差异外,Condition
还支持创建多个等待队列,即同一把锁拥有多个等待队列,线程在不同队列中等待,而Object
只有一个等待队列。
源码分析
作为接口Condition
非常惨,因为在Java中只有AQS中的内部类ConditionObject
实现了Condition
接口:
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { |
ConditionObject
只有两个Node
类型的字段,分别是链式结构中的头尾节点,ConditionObject
就是通过它们实现的等待队列。那么ConditionObject
的等待队列起到了怎样的作用呢?是类似于AQS中的排队机制吗?带着这两个问题,我们正是开始源码的分析。
await方法的实现
Condition
接口中定义了4个线程等待的方法:
void await() throws InterruptedException
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
方法虽然很多,但它们之间的差异较小,只体现在时间的处理上,我们看其中最常用的方法:
1 | public final void await() throws InterruptedException { |
注释1的部分,调用addConditionWaiter
方法添加到Condition
队列中:
1 | private Node addConditionWaiter() { |
可以看到,Condition
的队列是一个朴实无华的双向链表,每次调用addConditionWaiter
方法,都会加入到Condition
队列的尾部。
注释2的部分,释放线程持有的锁,同时移出AQS的队列
,内部调用了AQS的release
方法:
1 | =final int fullyRelease(Node node) { |
因为已经分析过AQS的release
方法和ReentrantLock
实现的tryRelease
方法,这里我们就不过多赘述了。
注释3的部分,isOnSyncQueue
判断当前线程是否在AQS的等待队列中,我们来看此时存在的情况:
- 如果
isOnSyncQueue
返回false
,即线程不在AQS的队列中
,进入自旋,调用LockSupport#park
暂停线程; - 如果
isOnSyncQueue
返回true
,即线程在AQS的队列中
,不进入自旋,执行后续逻辑。
结合注释1和注释2的部分,Condition#await
的实现原理了就很清晰了:
Condition
与AQS分别维护了一个等待队列,而且是互斥的,即同一个节点只会出现在一个队列中
;- 当调用
Condition#await
时,将线程添加到Condition
的队列中(注释1),同时从AQS队列中移出(注释2); - 接着判断线程位于的队列:
- 位于
Condition
队列中,该线程需要被暂停,调用LockSupport#park
; - 位于AQS队列中,该线程正在等待获取锁。
- 位于
基于以上的结论,我们已经能够猜到唤醒方法Condition#signalAll
的原理了:
- 将线程从
Condition
队列中移出,并添加到AQS的队列中; - 调用
LockSupport.unpark
唤醒线程。
至于这个猜想是否正确,我们接着来看唤醒方法的实现。
signal和signalAll方法的实现
来看signal
和signalAll
的源码:
1 | // 唤醒一个处于等待中的线程 |
两个方法唯一的差别在于头节点不为空的场景下,是调用doSignal
唤醒一个线程还是调用doSignalAll
唤醒所有线程:
1 | private void doSignal(Node first) { |
可以看到,无论是doSignal
还是doSignalAll
都只是将节点移出Condition
队列,而真正起到唤醒作用的是transferForSignal
方法,从方法名可以看到该方法是通过“转移”进行唤醒的,我们来看源码:
1 | final boolean transferForSignal(Node node) { |
transferForSignal
方法中,调用enq
方法将node
重新添加到AQS的队列中,并返回node
的前驱节点,随后对前驱节点的状态进行判断:
- 当ws>0时,前驱节点处于
Node.CANCELLED
状态,前驱节点退出锁的争抢,node
可以直接被唤醒; - 当ws≤0时,通过CAS修改前驱节点的状态为
Node.SIGNAL
,设置失败时,直接唤醒node
。
《AQS的今生,构建出JUC的基础》中介绍了waitStatus
的5种状态,其中Node.SIGNAL
状态表示需要唤醒后继节点。另外,在分析shouldParkAfterFailedAcquire
方法的源码时,我们知道在进入AQS的等待队列时,需要将前驱节点的状态更新为Node.SIGNAL
。
最后来看enq
的实现:
1 | private Node enq(Node node) { |
enq
的实现就非常简单了,通过CAS更新AQS的队列尾节点,相当于添加到AQS的队列中,并返回尾节点的前驱节点。好了,唤醒方法的源码到这里就结束了,是不是和我们当初的猜想一模一样呢?
原理
功能上,Condition
实现了AQS版Object#wait
和Object#notify
,用法上也与之相似,需要先获取锁,即需要在lock
与unlock
之间调用。原理上,简单来说就是线程在AQS的队列和Condition
的队列之间的转移。
线程t持有锁
假设有线程t已经获取了ReentrantLock
,线程t1,t2和t3正在AQS的队列中等待,我们可以得到这样的结构:
线程t执行Condition#await
如果线程t中调用了Condition#await
方法,线程t进入Condition
的等待队列中,线程t1获取ReentrantLock
,并从AQS的队列中移出,结构如下:
线程t1执行Condition#await
如果线程t1中也执行了Condition#await
方法,同样线程t1进入Condition
队列中,线程t2获取到ReentrantLock
,结构如下:
线程t2执行Condition#signal
如果线程t2执行了Condition#signal
,唤醒Condition
队列中的第一个线程,此时结构如下:
通过上面的流程,我们就可以得到线程是如何在Condition
队列与AQS队列中转移的:
小结
condition
可以用来实现等待/通知模式
。具体实现逻辑是,等待的线程await加入等待队列
,当被通知后,从等待队列中移出,添加到aqs同步队列
中,然后继续开始锁到获取(自旋获取)。