前两篇文章介绍了AQS的核心同步机制,使用CHL同步队列实现线程等待和唤醒,一个int值记录资源量。为上层各式各样的同步器实现画好了模版,像已经介绍到的ReentrantLock,Semaphroe,CountDownLatch都是在模版基础上实现的。花里胡哨,万变不离其宗。
以下是第三部分的内容,尝试写完Condition部分,基本结束AQS源码的学习,不过还是围绕着一个队列(条件队列)来进行的。
Tips
在第一篇文章中介绍Node类的nextWaiter字段的时候已经说明过它的一个字段两用,对于条件队列只有在独占模式下才会有。所以关于Condition的所有实现的一个前提是独占模式,需要谨记在心,对于理解源码非常重要。
队列结构条件队列的数据结构:定义了firstWaiter
指向头节点,lastWaiter
指向尾节点,node中nextWaiter
指向后继节点,没有使用Node结构中的pred和next,Node中维护着waitStatus
和thread
字段。所以条件队列是一个头尾节点有指向的单向链表,如下图所示:
和同步队列不同的是它不需要单独维护的head虚节点,节点的waitStatus只有两种:CONDITION,CANCELLED。
ConditionObject内部方法对于条件队列的操作全部是在ConditionObject内部完成的,先详细阅读好这些方法
addConditionWaiter向条件队列插入新节点,本方法由await调用,await是是获取锁的情况下执行的,所以代码中不需要考虑并发情况,相对来说就简单很多,没有cas,没有自旋,只是一个正常入队操作。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
// 【1】清理节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 【2】创建节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
-
【1】插入的时候先判断了下原尾节点状态,如果不是CONDITION状态,就认为节点已经是取消状态,触发
unlinkCancelledWaiters
清理节点。 -
【2】创建的新节点初始化状态是CONDITION,对于新插入的节点,lastWaiter一定是指向的,因为是队列后面加入的,然后分两种情况:
-
1,如果队列没有节点了,那firstWaiter也得指向这个新插入的节点
-
2,如果只要列表还有节点,那最后节点的next指向新节点
-
本方法只在两种情况下会调用:
- 条件等待(await)的时候发生中断(cancellation occurred),线程被取消了,节点作为本线程的资源自然需要清除
- 条件队列插入新节点的时候检查原尾节点已经CANCELLED状态,插入的时候顺便检查了下发现原尾节点状态不行,触发清除操作
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// 遍历到的最后面的有效节点
Node trail = null;
// 完整遍历
while (t != null) {
Node next = t.nextWaiter;
// 节点状态非CONDITION
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// 遍历开始一直是CANCELLED状态节点
if (trail == null)
firstWaiter = next;
else // 中间段有CANCELLED状态节点
trail.nextWaiter = next;
// 遍历到队列尾部
if (next == null)
lastWaiter = trail;
}
else
// 记录最后面的有效节点
trail = t;
t = next;
}
}
关联知识
这种清除CANCELLED状态节点的操作在等待队列上也是有的,可以回忆一下cancelAcquire
方法,不过它是传入指定节点,会从这个节点往前连续的CANCELLED状态节点进行清除。再比如ThreadLocalMap上也有类似的操作,当发现过期slot就会触发清理操作,清理操作也是执行到出现可用slot位置就会停止,都不会触发完整遍历清除的操作。
这里就不同了,它是完整遍历进行清除,没有什么特定的位置停止的逻辑。想象一下这种场景没有触发signal之前,所有条件队列里的节点是不会被回收的,因为他们需要静静地等待被唤醒,唤醒的时候自然会检查状态,但是加入很长时间没来唤醒,条件队列里又有很多需要清理的节点就会浪费内存,所以好不容易触发一次清理那么就进行完整遍历清理。如此不会在取消节点特别多的时候出现不停触发清理操作。
代码分析-
遍历清理的操作会有以下几种场景:
- 遍历开始一直是CANCELLED状态节点,此时需要注意firstWaiter的指向需要更新
- 中间段有CANCELLED状态节点,把清除的节点前后节点连接起来
遍历是从头至尾开始的,注意
trail
字段表示遍历到的最后面的有效节点1,检测当前节点是取消节点,先把这个节点nextWaiter设置成null,断开和后继节点的关联,然后判断
trail
是否为null,trail
为null表示从头遍历到此时还没有出现一个有效节点,也就是第一种情况,所以firstWaiter
就指向next,表示队列的开始就从next开始,因为前面的节点都是取消状态了,如果trail
不为null,那么就是第二种情况,将取消节点前后节点关联起来即可,而trail
作为此时遍历到最后一个有效节点,自然就是trail
的nextWaiter指向到取消节点的后继节点。如果后继节点是null,表示队列已经遍历结束,那么需要更新lastWaiter
指向到trail
。2,如果不是取消节点,
trail
更新,遍历继续即可
以下图示有三个取消状态节点的队列清除操作的过程:
Conditions 支持方法AQS中有一个专门的Internal support methods for Conditions
被唤醒时,把条件队列的节点换到同步队列的操作
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 【1】节点状态更新为初始化状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//【2】入队操作
Node p = enq(node);
int ws = p.waitStatus;
//【3】
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- 【1】先用cas修改节点状态为初始状态0,但是如果原状态不是CONDITION那就会返回失败,什么场景会有waitStatus为CANCELLED的节点呢?注意这里很关键:在await操作的时候执行release操作失败,就会把新加入条件队列的节点状态改为CANCELLED,并没有直接从调节队列中清除,这种节点是无效节点,这里遇到无效节点就会返回false,调用本方法的代码可以根据返回结果判断是否继续向前遍历条件队列,找到有效节点。
- 【2】,【3】,我们知道single操作是在持有锁的情况下进行的,操作完成后正常都会进行unLock操作,那么也就是说single操作只需要做一件事那就是把条件队列中可用的节点转移到同步队列中即可,所以当我们执行完enq方法已经完成任务。而实际代码做了更多:enq方法结束意味着入队成功,方法返回的是新入队的node的前继节点,然后根据前继节点的状态判断出是否需要进行唤醒当前节点线程。如果前继节点是CANCELLED状态或者状态不能顺利修改成SIGNAL,就会唤醒这个节点的线程。假如触发了唤醒,此时的唤醒的线程是等待在await方法中的unpak操作上的,当唤醒的时候也应该执行那边的代码,并不是像一个普通的同步队列中节点线程唤醒时执行的代码一样,这部分代码在后续分析到唤醒部分的时候描述。
判断节点是否在同步队列中,这个判断后面方法经常需要用到,因为Condition的场景中需要两个队列节点的转换操作,其中有并发的操作的情况需要考虑。
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
// 【1】条件队列判断
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 【2】同步队列判断
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 【3】遍历
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
- 【1】,在条件队列的判断是如果waitStatus只要还是CONDITION或者node.prev为空那么就一定还在条件队列,在进入同步队列时node.prev会被先设置,所以通过node.prev判断是不为空不能保证一定在同步队列,但是为空就一定不在同步队列。
- 【2】,在同步队列的判断是node.next不为空就确定一定在同步队列,我们在已经知道
enq
中进行节点入队时,入队完成的最后一步就是设置next。 - 【3】,遍历保底,经过前面两个判断的过滤,执行到这里的情况就是node.prev != null并且node.next == null,那就是入队入到一半的情况,所以进行一次从尾节点向前遍历找这个节点,确保节点是否真的已经入队到同步队列中。我们再思考下入是尾节点cas指向还没执行或执行失败的情况,那么遍历也是找不到这个节点在同步队列的,还不是认为没有在同步队列吗?的确,所以在此划出关键点:看一个队列是否进入到同步队列就看尾节点cas指向是否成功。 另外我们可以发现所有使用到
isOnSyncQueue
的地方都是while循环
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
这个方法的逻辑:
- 1,先判断是否能通过CAS把状态从CONDITION改为初始状态,如果可以,说明这个节点还在条件队列上,就直接执行enq,把节点迁移到同步队列上去然后返回true。
- 2,如果不能通过CAS把状态从CONDITION改为初始状态,说明节点状态已经不是CONDITION,然后自旋判断节点是否还没有进入同步队列,如果是就让出CPU等待一下,直到节点进入到同步队列,然后返回false
单单从这个代码逻辑上看是有点奇怪的,但是从调用的上层看是合理的,在await方法的分析中会涉及到。
fullyRelease完全释放当前state的值,也就是此时state的值是多少就调用release方法的时候传多少。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
- 这个方法也是在await系列方法里调用,在await的时候会释放锁,前几篇文章已经提及过
ReentrantLock
的可重入特性,就是每次获取锁时将state累加,当await的时候一次性全部释放才行。 - 调用release有可能发生失败返回false,会进入else后抛出
IllegalMonitorStateException
,另外release方法也可能直接抛出异常,比如ReentrantLock
的实现里判断不是持有锁的线程就会抛出一样的IllegalMonitorStateException
。这都会使failed不能更新为false,所以在finally代码块中会把节点的状态改为CANCELLED。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//【1】新增一个节点入队条件队列
Node node = addConditionWaiter();
// 释放锁,如果释放失败,节点状态会变更为取消
int savedState = fullyRelease(node);
int interruptMode = 0;
//【2】
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//【3】唤醒线程后
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
- 【1】首先向条件队列加入新节点,成功后释放锁。
- 【2】自选判断节点不在同步队列中,然后线程就进入等待状态了,直到有线程中断或者signal唤醒。后面唤醒后执行的代码就在signal方法中分析。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
-
唤醒操作的代码逻辑是这样的:
- 1,先把first.nextWaiter置为null就是把firstWaiter从条件队列中脱离,
- 2,在transferForSignal方法中判断这个节点为有效节点,如果是就更新节点状态为初始状态,然后调用enq方法把节点放入同步队列的尾部,然后再把前节点状态改为SIGNAL。
- 3,在transferForSignal方法中判断这个节点为无效节点,就继续从条件队列中脱离出一个firstWaiter节点再执行transferForSignal进行判断直到一个有效节点出现或队列遍历结束。
现在我们回头去看await方法中park位置的代码,继续分析唤醒后执行的代码逻辑:
首先我们需要清楚,一个park的线程唤醒起来有两种情况:
- 1,正常的signal操作触发
- 2,线程interrupt触发
而我们是不清楚这个唤醒是哪个原因触发的,举个例子,当signal操作触发后,线程也出现了interrupt,我们通过中断标记难道就说是interrupt原因导致的unPark的吗?肯定是不准确的。随意就有了一下几种情况:
-
1,没有中断标记,那就不用想了,肯定是signal操作触发
-
2,有中断标记,判断这个中断标记是signal操作前,还是signal操作后
- 如果是signal操作前,算interrupt触发
- 如果是signal操作后,算signal触发
对应着一个字段interruptMode标记来区分这三种情况,在checkInterruptWhileWaiting方法中的返回逻辑:
-
0
-
有中断标记
- REINTERRUPT(1)
- THROW_IE(-1)
那么怎么判断中断是signal操作前还是后呢?关键方法是transferAfterCancelledWait
,根据这个方法的逻辑,被唤醒的线程的节点状态能够从CONDITION改成0,那意味着还没有出发到signal,那就是interrupt触发,就会把节点转移到同步队列上去,标记THROW_IE。如果状态已经不是CONDITION,那么自旋保证节点转移到同步队列成功,标记REINTERRUPT。
线程唤醒后退出自旋后执行以下代码:
// 【1】
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 【2】
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//【3】
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
- 【1】,执行acquireQueued方法,会尝试获取一次锁,没获取到就进同步队列,线程再进入等待,获取到锁的情况返回true,表示在队列中等待后再获取锁并且发现自己已经被标记中断,并且不是THROW_IE标记,就会变更为REINTERRUPT。因为后续是REINTERRUPT会进行一次线程异常标记。这里这样操作的原因是,在进入acquireQueued方法返回true的场景是等待同步锁的时候线程被标记中断,这个时候前面执行的时候interruptMode可能是0,那么就需要补一次执行
selfInterrupt()
。 - 【2】,对条件队列执行清理无效节点
- 【3】,对中断标记类型做处理,THROW_IE:抛出异常,REINTERRUPT:线程标记异常(
selfInterrupt()
)
1,在Lock替换synchronized方法和语句的地方,Condition替换Object监视方法的使用。对于condition的使用,有很多场景,像LinkedBlockingQueue
中就有很典型的使用,所以理解清楚这部分的实现,有助于后续阅读其他源码。
2,对于关键的两个操作,await就是从同步队列节点释放,然后条件队列节点加入;signal就是条件队列节点移除,同步队列节点加入。